Upozornění:
Veškerý zde vystavený kód je mým autorským dílem a je zveřejněn výhradně za účelem ukázky stylu mé práce.
Kód slouží pouze k nahlédnutí – jakékoliv kopírování, šíření, úpravy či jiné využití bez mého výslovného souhlasu nejsou povoleny.
Zůstává mým duševním vlastnictvím ve smyslu autorského zákona.
Děkuji za pochopení.
← Zpět na seznam
RoSaaRi (Python)
Navrhl, vyvinul a nasadil jsem vlastní aplikaci, kterou používáme v práci. Je dost dobrá, vím, co je potřeba, jsem řidič sanitky :)
Jj, omlouvám se, ale ještě to mám namrskané vše v jednom souboru. Tohle je verze, na které stále pracuji.
from flask import Flask, request, jsonify, send_from_directory
from datetime import datetime, timedelta, date
import sqlite3
import os
import json
import secrets
import smtplib
from email.message import EmailMessage
from dotenv import load_dotenv
import bcrypt
import csv
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
from functools import wraps
from zoneinfo import ZoneInfo
# Application bootstrap: environment, Flask app, database path and token signer.
load_dotenv()
app = Flask(__name__, instance_relative_config=True)
# The SQLite file is kept in Flask's instance folder; make sure the folder exists.
os.makedirs(app.instance_path, exist_ok=True)
DB_NAME = os.path.join(app.instance_path, "zaznamy.db")
app.config["SECRET_KEY"] = os.getenv("SECRET_KEY")
# Fail fast: the secret key is what signs the bearer tokens created below.
if not app.config["SECRET_KEY"]:
    raise RuntimeError("Chybí SECRET_KEY v .env")
serializer = URLSafeTimedSerializer(app.config["SECRET_KEY"])
TOKEN_MAX_AGE = 60 * 60 * 48 # 48 hours, in seconds
def get_db():
    """Open a new SQLite connection to ``DB_NAME`` with the app's standard settings.

    Rows are returned as ``sqlite3.Row``; WAL journaling, a 5 s busy timeout
    and foreign-key enforcement are switched on for the connection.
    The caller owns the returned connection.
    """
    connection = sqlite3.connect(DB_NAME, timeout=5)
    connection.row_factory = sqlite3.Row
    for nastaveni in ("journal_mode=WAL", "busy_timeout=5000", "foreign_keys=ON"):
        connection.execute(f"PRAGMA {nastaveni}")
    return connection
def init_db():
    """Create all tables that are missing and apply in-place column migrations.

    Besides the ``CREATE TABLE IF NOT EXISTS`` statements this adds columns
    that older databases may lack:

    * ``dovolena`` - the date/hours/audit columns,
    * ``uzivatele`` - ``dovolena_hodin_rok`` plus ``kod``, ``posun`` and
      ``email``, which other parts of this file query but which the
      ``CREATE TABLE`` statement never defined.
    """
    with get_db() as conn:
        c = conn.cursor()

        def pridej_chybejici_sloupce(tabulka, sloupce):
            """Add every column from ``sloupce`` that ``tabulka`` does not have yet.

            Returns the names of the columns that were actually added.
            Both arguments come from literals in this function, never from user input.
            """
            c.execute(f"PRAGMA table_info({tabulka})")
            existujici = {row[1] for row in c.fetchall()}
            pridane = []
            for nazev, definice in sloupce.items():
                if nazev not in existujici:
                    c.execute(f"ALTER TABLE {tabulka} ADD COLUMN {nazev} {definice}")
                    pridane.append(nazev)
            return pridane

        c.execute('''
            CREATE TABLE IF NOT EXISTS zaznamy (
                id TEXT PRIMARY KEY,
                typ TEXT,
                kod INTEGER,
                cas_nalozeni TEXT,
                mesto_nalozeni TEXT,
                poznamka TEXT,
                cas_vylozeni TEXT,
                mesto_vylozeni TEXT,
                tachometr TEXT,
                smena TEXT,
                datum TEXT,
                ridic TEXT,
                vozidlo TEXT,
                personal_id TEXT,
                pojistovna TEXT,
                ulozeno TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        c.execute('''
            CREATE TABLE IF NOT EXISTS karta_vozidla (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                datum TEXT,
                poradove_cislo INTEGER,
                vozidlo TEXT,
                smena TEXT,
                ujeto_km INTEGER,
                konec_km INTEGER,
                natankovano_l REAL,
                stav_v_nadrzi_l REAL,
                poznamka TEXT,
                ridic TEXT,
                zapsano TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        c.execute('''
            CREATE TABLE IF NOT EXISTS uzivatele (
                username TEXT PRIMARY KEY,
                heslo TEXT,
                ridic TEXT,
                vozidlo TEXT,
                role TEXT DEFAULT 'ridic'
            )
        ''')
        c.execute('''
            CREATE TABLE IF NOT EXISTS vozidla (
                spz TEXT PRIMARY KEY,
                spotreba REAL,
                stav TEXT DEFAULT 'volne'
            )
        ''')
        c.execute("""
            CREATE TABLE IF NOT EXISTS dovolena (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ridic TEXT NOT NULL,
                datum_od TEXT,
                datum_do TEXT,
                hodiny REAL DEFAULT 0,
                stav TEXT NOT NULL DEFAULT 'ceka',
                vytvoril TEXT,
                vytvoreno TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                schvalil TEXT,
                schvaleno TIMESTAMP,
                poznamka TEXT
            )
        """)

        # Migration of the older ``dovolena`` table, which had only rok/mesic/tyden/stav.
        # SQLite refuses ALTER TABLE ... ADD COLUMN with a CURRENT_TIMESTAMP default,
        # so ``vytvoreno`` is added without a default and back-filled once below.
        # NOTE(review): rows inserted later into a migrated table get NULL in
        # ``vytvoreno`` unless the INSERT sets it explicitly.
        pridane_dovolena = pridej_chybejici_sloupce("dovolena", {
            "datum_od": "TEXT",
            "datum_do": "TEXT",
            "hodiny": "REAL DEFAULT 0",
            "vytvoril": "TEXT",
            "vytvoreno": "TIMESTAMP",
            "schvalil": "TEXT",
            "schvaleno": "TIMESTAMP",
            "poznamka": "TEXT",
        })
        if "vytvoreno" in pridane_dovolena:
            c.execute("UPDATE dovolena SET vytvoreno = CURRENT_TIMESTAMP WHERE vytvoreno IS NULL")

        # Yearly leave entitlement in hours: 20 days x 8 hours = 160.
        # ``kod``, ``posun`` and ``email`` are read by the login, shift-rotation
        # and password-reset code; without them a fresh database fails with
        # "no such column".
        pridej_chybejici_sloupce("uzivatele", {
            "dovolena_hodin_rok": "REAL DEFAULT 160",
            "kod": "INTEGER",
            "posun": "INTEGER DEFAULT 0",
            "email": "TEXT",
        })
        conn.commit()
# Repeating shift rotation, indexed by the number of days elapsed since
# BASE_START_DATE (see get_smena). "-" marks a day without a shift.
# The literal is laid out as 16 rows of 7 entries; presumably one row per
# calendar week, Monday first - TODO confirm with the roster owner.
SMENY = [
    "D1", "D7", "D5+HDS", "PG", "-", "-", "-",
    "D7", "D2", "5.00 - 15.00 K1", "D3", "7.00 - 15.30 K2", "-", "-",
    "D5+HDS", "PG", "-", "4.40 - 15.00 K1", "D2", "6.30 - 18.30 R1", "PG",
    "-", "4.40 - 15.00 K1", "D3", "7.00 - 15.30 K2", "D4", "-", "6.30 - 18.30 R1",
    "18.30 - 06.30 NR", "-", "D1", "6.30 - 18.30 R2", "D6", "-", "-",
    "7.00 - 15.30 K2", "6.30 - 18.30 R2", "D4", "D2", "7.00 - 15.30 OS + HDS", "-", "-",
    "6.30 - 18.30 R1", "18.30 - 06.30 NR", "-", "D1", "D3", "HDS", "-",
    "6.30 - 18.30 R2", "7.00 - 15.30 OS + HDS", "D6", "D5+HDS", "PG", "PG", "18.30 - 06.30 NR",
    "-", "6.30 - 18.30 R1", "18.30 - 06.30 NR", "-", "5.00 - 15.00 K1", "-", "-",
    "D3", "D1", "7.00 - 15.30 OS + HDS", "D7", "D5+HDS", "-", "-",
    "D2", "D3", "6.30 - 18.30 R1", "18.30 - 06.30 NR", "-", "-", "-",
    "PG", "-", "6.30 - 18.30 R2", "D4", "D1", "-", "-",
    "7.00 - 15.30 OS + HDS", "D4", "7.00 - 15.30 K2", "D6", "D7", "-", "-",
    "5.00 - 15.00 K1", "D6", "-", "6.30 - 18.30 R1", "18.30 - 06.30 NR", "18.30 - 06.30 NR", "-",
    "D4", "D5+HDS", "PG", "-", "6.30 - 18.30 R2", "OHDS", "-",
    "D6", "7.00 - 15.30 K2", "D2", "7.00 - 15.30 OS + HDS", "6.30 - 18.30 R1", "-", "-"
]
# Day zero of the rotation (31 March 2025 is a Monday); earlier dates yield "-".
BASE_START_DATE = date(2025, 3, 31)
def hash_hesla(heslo):
    """Return a bcrypt hash of ``heslo`` (with a freshly generated salt) as text."""
    heslo_bajty = heslo.encode('utf-8')
    otisk = bcrypt.hashpw(heslo_bajty, bcrypt.gensalt())
    return otisk.decode('utf-8')
def over_heslo(heslo, hash_z_db):
    """Check a plain-text password against a stored bcrypt hash.

    Args:
        heslo: Password supplied by the client.
        hash_z_db: Hash string read from the ``uzivatele.heslo`` column.

    Returns:
        ``True`` only when the password matches. A missing password, a
        missing/empty stored hash, or a stored value that bcrypt cannot parse
        all count as "no match" instead of raising.
    """
    if not isinstance(heslo, str) or not isinstance(hash_z_db, str) or not hash_z_db:
        return False
    try:
        return bcrypt.checkpw(heslo.encode('utf-8'), hash_z_db.encode('utf-8'))
    except ValueError:
        # bcrypt raises ValueError for a stored value that is not a valid hash.
        return False
def vytvor_token(username, role, ridic, vozidlo):
    """Serialize the user's identity into a signed, timestamped token string.

    A falsy ``role`` is stored as ``"ridic"``.
    """
    obsah = dict(
        username=username,
        role=role or "ridic",
        ridic=ridic,
        vozidlo=vozidlo,
    )
    return serializer.dumps(obsah)
def nacti_token():
    """Return the token payload from the ``Authorization: Bearer ...`` header.

    Returns ``None`` when the header is absent, is not a Bearer header, or the
    token is expired or has an invalid signature.
    """
    prefix = "Bearer "
    hlavicka = request.headers.get("Authorization", "")
    if not hlavicka.startswith(prefix):
        return None
    try:
        return serializer.loads(hlavicka[len(prefix):], max_age=TOKEN_MAX_AGE)
    except (SignatureExpired, BadSignature):
        return None
def login_required(fn):
    """Decorator: run the view only for requests carrying a valid token.

    The decoded payload is stored on ``request.user``; otherwise a 401 JSON
    response is returned.
    """
    @wraps(fn)
    def overeny_pohled(*args, **kwargs):
        prihlaseny = nacti_token()
        if prihlaseny:
            request.user = prihlaseny
            return fn(*args, **kwargs)
        return jsonify({"error": "Neoprávněný přístup"}), 401
    return overeny_pohled
def admin_required(fn):
    """Decorator: run the view only for a valid token whose role is ``admin``.

    Responds 401 without a valid token and 403 for any other role. On success
    the payload is stored on ``request.user``.
    """
    @wraps(fn)
    def adminsky_pohled(*args, **kwargs):
        prihlaseny = nacti_token()
        if not prihlaseny:
            return jsonify({"error": "Neoprávněný přístup"}), 401
        je_admin = prihlaseny.get("role") == "admin"
        if not je_admin:
            return jsonify({"error": "Nedostatečná oprávnění"}), 403
        request.user = prihlaseny
        return fn(*args, **kwargs)
    return adminsky_pohled
def zaloguj_udalost(typ, uzivatel, ip_adresa):
    """Append one audit row to ``logs.csv`` in the current working directory.

    The file is semicolon-delimited with the columns typ, uzivatel, cas, ip;
    the header row is written only when the file does not exist yet. ``cas``
    is the local time from ``datetime.now()``.
    """
    cesta = "logs.csv"
    sloupce = ["typ", "uzivatel", "cas", "ip"]
    ted = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    radek = dict(zip(sloupce, (typ, uzivatel, ted, ip_adresa)))
    # Decide about the header before open() creates the file.
    chybi_hlavicka = not os.path.exists(cesta)
    with open(cesta, mode="a", newline="", encoding="utf-8") as soubor:
        zapisovac = csv.DictWriter(soubor, fieldnames=sloupce, delimiter=";")
        if chybi_hlavicka:
            zapisovac.writeheader()
        zapisovac.writerow(radek)
def get_posun_z_db(kod):
    """Return the rotation offset (``posun``) stored for the user with ``kod``.

    Returns 0 when no such user exists or the stored offset is NULL, so the
    result can always be used in the day arithmetic of ``get_smena``.
    The connection comes from ``get_db`` (same timeout and pragmas as the rest
    of the app) and is closed even when the query fails.
    """
    conn = get_db()
    try:
        row = conn.execute("SELECT posun FROM uzivatele WHERE kod = ?", (kod,)).fetchone()
    finally:
        conn.close()
    if row is None or row[0] is None:
        return 0
    return row[0]
def get_smena(datum, kod):
    """Return the shift label from ``SMENY`` for ``datum`` and the user's code.

    The rotation position is the number of days since ``BASE_START_DATE``
    minus the user's stored offset, wrapped around the length of ``SMENY``.
    Dates before ``BASE_START_DATE`` give ``"-"``.
    """
    # The offset is looked up first, exactly as before, even for early dates.
    posun = get_posun_z_db(kod)
    dny_od_zacatku = (datum - BASE_START_DATE).days
    if dny_od_zacatku >= 0:
        return SMENY[(dny_od_zacatku - posun) % len(SMENY)]
    return "-"
def get_smena_a_km(vozidlo):
    """Return today's date (Prague time, ``DD.MM.YYYY``) and the vehicle's start odometer.

    ``start_km`` is ``konec_km`` of the newest ``karta_vozidla`` row for the
    vehicle (ordered by ``datum`` then ``id``), or 0 when there is none or the
    value is empty.
    """
    dnes = datetime.now(ZoneInfo("Europe/Prague")).strftime("%d.%m.%Y")
    dotaz = "SELECT konec_km FROM karta_vozidla WHERE vozidlo = ? ORDER BY datum DESC, id DESC LIMIT 1"
    with get_db() as conn:
        kurzor = conn.cursor()
        kurzor.execute(dotaz, (vozidlo,))
        posledni = kurzor.fetchone()
    if posledni and posledni[0]:
        start_km = posledni[0]
    else:
        start_km = 0
    return {"datum": dnes, "start_km": start_km}
# === Send an e-mail with optional attachments ===
def odesli_email(prijemce, predmet, zprava, prilohy=None):
    """Send a plain-text e-mail through Gmail SMTP (STARTTLS on port 587).

    Args:
        prijemce: Recipient address.
        predmet: Subject line.
        zprava: Plain-text body.
        prilohy: Optional iterable of file paths; each file is attached as
            ``application/octet-stream`` under its base name.

    Returns:
        ``True`` when the message was handed to the server, ``False`` on any
        failure. Failures are printed, never raised, so callers can treat
        mail as best-effort.

    Credentials are read from the ``SMTP_USER`` / ``SMTP_PASSWORD``
    environment variables; without them nothing is attempted.
    """
    SMTP_SERVER = "smtp.gmail.com"
    SMTP_PORT = 587
    SMTP_TIMEOUT = 30  # seconds; without it a stalled server blocks the request forever
    SMTP_USER = os.getenv("SMTP_USER")
    SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
    if not SMTP_USER or not SMTP_PASSWORD:
        print(f"[EMAIL] Chyba při odesílání na {prijemce}: chybí SMTP_USER nebo SMTP_PASSWORD")
        return False
    try:
        msg = EmailMessage()
        msg['Subject'] = predmet
        msg['From'] = SMTP_USER
        msg['To'] = prijemce
        msg.set_content(zprava)
        for pril in prilohy or ():
            with open(pril, 'rb') as f:
                file_data = f.read()
            msg.add_attachment(
                file_data,
                maintype='application',
                subtype='octet-stream',
                filename=os.path.basename(pril),
            )
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=SMTP_TIMEOUT) as server:
            server.starttls()
            server.login(SMTP_USER, SMTP_PASSWORD)
            server.send_message(msg)
        print(f"[EMAIL] E-mail odeslán na {prijemce}")
        return True
    except Exception as e:
        # Deliberate catch-all boundary: mail problems must not break the request.
        print(f"[EMAIL] Chyba při odesílání na {prijemce}: {e}")
        return False
# The URL must carry the plate as a variable part; without "<spz>" Flask
# cannot supply the view's ``spz`` argument.
@app.route("/api/vozidla/<spz>")
@login_required
def get_spotreba_vozidla(spz):
    """Return the stored fuel consumption of the vehicle with plate ``spz``.

    Responds ``{"spotreba": ...}`` or 404 when the vehicle is unknown.
    """
    with get_db() as conn:
        c = conn.cursor()
        c.execute("SELECT spotreba FROM vozidla WHERE spz = ?", (spz,))
        row = c.fetchone()
    if row is None:
        return jsonify({"error": "Vozidlo nenalezeno"}), 404
    return jsonify({"spotreba": row[0]})
@app.route("/api/login", methods=["POST"])
def login():
    """Authenticate a user and return a bearer token plus shift/vehicle hints.

    Expects a JSON object with ``username`` and ``password``. Responds 400
    when either is missing, 401 for an unknown user or wrong password, and on
    success a JSON object with the token, driver name, vehicle, suggested
    shift for today, today's date and the vehicle's start odometer.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"success": False, "msg": "Chybí údaje"}), 400
    username = data.get("username")
    password = data.get("password")
    if not isinstance(username, str) or not isinstance(password, str) or not username or not password:
        return jsonify({"success": False, "msg": "Chybí údaje"}), 400

    with get_db() as conn:
        c = conn.cursor()
        c.execute("SELECT heslo, ridic, vozidlo, role, kod FROM uzivatele WHERE username = ?", (username,))
        row = c.fetchone()
    # NOTE(review): the two different 401 messages let a caller tell existing
    # usernames from unknown ones; kept because the client may display them.
    if not row:
        return jsonify({"success": False, "msg": "Uživatel nenalezen"}), 401
    heslo_db, ridic, vozidlo, role, kod = row
    # An account without a stored hash can never log in.
    if not heslo_db or not over_heslo(password, heslo_db):
        return jsonify({"success": False, "msg": "Neplatné heslo"}), 401

    # Same time zone as get_smena_a_km, so both "today" values agree even when
    # the server clock is not on Prague time.
    today = datetime.now(ZoneInfo("Europe/Prague")).date()
    smena_auto = get_smena(today, kod)
    smena_info = get_smena_a_km(vozidlo)
    zaloguj_udalost("prihlaseni", username, request.remote_addr)
    token = vytvor_token(username, role, ridic, vozidlo)
    return jsonify({
        "success": True,
        "token": token,
        "ridic": ridic,
        "vozidlo": vozidlo,
        "smena_navrzena": smena_auto,
        "datum": smena_info["datum"],
        "start_km": smena_info["start_km"],
        "role": role or "ridic"
    })
@app.route("/api/poradove_cislo")
@login_required
def get_poradove_cislo():
    """Return the next vehicle-card sequence number for the current month.

    The vehicle comes from the ``vozidlo`` query parameter or, failing that,
    from the token. The number is one more than the highest
    ``poradove_cislo`` whose ``datum`` starts with this month's ``YYYY-MM``
    (Prague time); today's ISO date is returned alongside.
    """
    vozidlo = request.args.get("vozidlo") or request.user.get("vozidlo")
    dnes_iso = datetime.now(ZoneInfo("Europe/Prague")).strftime("%Y-%m-%d")
    vzor_mesice = f"{dnes_iso[:7]}%"
    with get_db() as conn:
        kurzor = conn.cursor()
        kurzor.execute("""
            SELECT MAX(poradove_cislo)
            FROM karta_vozidla
            WHERE vozidlo = ? AND datum LIKE ?
        """, (vozidlo, vzor_mesice))
        nejvyssi = kurzor.fetchone()[0]
    posledni = nejvyssi or 0
    return jsonify({
        "poradove_cislo": posledni + 1,
        "datum_iso": dnes_iso
    })
@app.route("/api/zaznamy", methods=["POST"])
@login_required
def prijmi_zaznamy():
    """Store a batch of transport records sent by the client.

    The body must be a JSON list of objects. ``ridic`` and ``vozidlo`` are
    taken from the token, not from the payload. A record that violates a
    constraint (typically an ``id`` that is already stored) is skipped, which
    makes re-sending the same batch harmless. ``count`` in the response is the
    number of records received, not the number inserted.
    """
    user = request.user
    data = request.get_json(silent=True)
    if not isinstance(data, list):
        return jsonify({"error": "Data musí být seznam záznamů"}), 400
    # Validate the whole batch before touching the database.
    if not all(isinstance(z, dict) for z in data):
        return jsonify({"error": "Každý záznam musí být objekt"}), 400

    vloz_sql = '''
        INSERT INTO zaznamy (
            id, typ, kod, cas_nalozeni, mesto_nalozeni, poznamka,
            cas_vylozeni, mesto_vylozeni, tachometr, smena, datum,
            ridic, vozidlo, personal_id, pojistovna
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'''
    with get_db() as conn:
        c = conn.cursor()
        for z in data:
            hodnoty = (
                z.get("id"), z.get("typ"), z.get("kod"), z.get("cas_nalozeni"),
                z.get("mesto_nalozeni"), z.get("poznamka"), z.get("cas_vylozeni"),
                z.get("mesto_vylozeni"), z.get("tachometr"), z.get("smena"),
                z.get("datum"), user.get("ridic"), user.get("vozidlo"),
                z.get("personal_id"), z.get("pojistovna"),
            )
            try:
                c.execute(vloz_sql, hodnoty)
            except sqlite3.IntegrityError:
                # Already stored (duplicate primary key): skip on purpose.
                continue
        conn.commit()
    return jsonify({"status": "OK", "count": len(data)})
@app.route("/api/karta", methods=["POST"])
@login_required
def uloz_kartu():
    """Insert one vehicle-card row from the JSON object in the request body.

    ``datum`` and ``vozidlo`` are required. When ``poradove_cislo`` is missing
    or falsy, the next number for that vehicle and month (first seven
    characters of ``datum``) is assigned. All remaining values, including
    ``ridic``, are taken from the payload.
    """
    karta = request.get_json()
    if not isinstance(karta, dict):
        return jsonify({"error": "Chybná data"}), 400

    datum, vozidlo = karta.get("datum"), karta.get("vozidlo")
    poradove = karta.get("poradove_cislo")
    if not (datum and vozidlo):
        return jsonify({"error": "Chybí datum nebo vozidlo"}), 400

    with get_db() as conn:
        kurzor = conn.cursor()
        if not poradove:
            kurzor.execute("""
                SELECT MAX(poradove_cislo)
                FROM karta_vozidla
                WHERE vozidlo = ? AND datum LIKE ?
            """, (vozidlo, f"{datum[:7]}%"))
            dosavadni_maximum = kurzor.fetchone()[0] or 0
            poradove = dosavadni_maximum + 1
        hodnoty = (
            datum, poradove, vozidlo, karta.get("smena"), karta.get("ujeto_km"),
            karta.get("konec_km"), karta.get("natankovano_l"), karta.get("stav_v_nadrzi_l"),
            karta.get("poznamka"), karta.get("ridic"),
        )
        kurzor.execute("""
            INSERT INTO karta_vozidla (
                datum, poradove_cislo, vozidlo, smena, ujeto_km, konec_km,
                natankovano_l, stav_v_nadrzi_l, poznamka, ridic
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, hodnoty)
        conn.commit()
    return jsonify({"status": "Karta uložena", "poradove": poradove}), 200
@app.route("/tabulky")
@login_required
def get_tabulky():
    """Return the names of all tables in the database except SQLite's internal ones."""
    dotaz = "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
    with get_db() as conn:
        kurzor = conn.cursor()
        kurzor.execute(dotaz)
        nazvy = []
        for radek in kurzor.fetchall():
            nazvy.append(radek[0])
    return jsonify(nazvy)
@app.route("/filtry")
@login_required
def get_filtry():
    """Return the distinct driver names and plates available as filters for a table.

    ``tabulka`` must be one of the four known tables. For ``vozidla`` the
    plates are those currently marked ``volne``; for the other tables they are
    the distinct non-empty ``vozidlo`` values. A table that lacks the queried
    column simply yields an empty list for that filter.
    """
    tabulka = request.args.get("tabulka")
    if tabulka not in ("zaznamy", "karta_vozidla", "uzivatele", "vozidla"):
        return jsonify({"error": "Neznámá tabulka"}), 400

    with get_db() as conn:
        kurzor = conn.cursor()

        try:
            kurzor.execute(f"SELECT DISTINCT ridic FROM {tabulka} WHERE ridic IS NOT NULL")
            jmena = sorted({radek[0] for radek in kurzor.fetchall() if radek[0]})
        except sqlite3.OperationalError:
            jmena = []

        try:
            if tabulka != "vozidla":
                kurzor.execute(f"SELECT DISTINCT vozidlo FROM {tabulka} WHERE vozidlo IS NOT NULL")
                spz = sorted({str(radek[0]) for radek in kurzor.fetchall() if radek[0]})
            else:
                kurzor.execute("SELECT spz FROM vozidla WHERE stav = 'volne'")
                spz = sorted({radek[0] for radek in kurzor.fetchall()})
        except sqlite3.OperationalError:
            spz = []

    return jsonify({"spz": spz, "jmeno": jmena})
@app.route("/api/stav_vozidla")
@login_required
def stav_vozidla():
    """Return the ``stav`` value of the vehicle given by the ``spz`` query parameter.

    Responds 400 without the parameter and 404 for an unknown vehicle.
    """
    spz = request.args.get("spz")
    if not spz:
        return jsonify({"error": "Chybí parametr SPZ"}), 400
    with get_db() as conn:
        kurzor = conn.cursor()
        kurzor.execute("SELECT stav FROM vozidla WHERE spz = ?", (spz,))
        nalezeno = kurzor.fetchone()
    if nalezeno:
        return jsonify({"stav": nalezeno[0]})
    return jsonify({"error": "Vozidlo neexistuje"}), 404
@app.route("/api/rezervuj_vozidlo", methods=["POST"])
@login_required
def rezervuj_vozidlo():
    """Mark the vehicle from the JSON field ``vozidlo`` as ``obsazene``.

    Responds 400 without the field, 404 for an unknown vehicle and 409 when
    the vehicle is already taken.
    """
    data = request.get_json(silent=True)
    spz = data.get("vozidlo") if isinstance(data, dict) else None
    if not spz:
        return jsonify({"error": "Chybí parametr vozidlo"}), 400
    with get_db() as conn:
        c = conn.cursor()
        c.execute("SELECT 1 FROM vozidla WHERE spz = ?", (spz,))
        if not c.fetchone():
            return jsonify({"error": "Vozidlo neexistuje"}), 404
        # Test-and-set in ONE statement: with a separate SELECT and UPDATE two
        # simultaneous requests could both see "free" and both succeed.
        c.execute(
            "UPDATE vozidla SET stav = 'obsazene' "
            "WHERE spz = ? AND (stav IS NULL OR stav != 'obsazene')",
            (spz,),
        )
        rezervovano = c.rowcount == 1
        conn.commit()
    if not rezervovano:
        return jsonify({"error": "Vozidlo je již obsazeno"}), 409
    return jsonify({"ok": True})
@app.route("/api/uvolni_vozidlo", methods=["POST"])
@login_required
def uvolni_vozidlo():
    """Mark the vehicle from the JSON field ``vozidlo`` as ``volne``.

    Responds 400 without the field and 404 for an unknown vehicle. Releasing
    a vehicle that is already free succeeds with a different message.
    """
    telo = request.get_json()
    spz = telo.get("vozidlo")
    if not spz:
        return jsonify({"error": "Chybí parametr vozidlo"}), 400
    with get_db() as conn:
        kurzor = conn.cursor()
        kurzor.execute("SELECT stav FROM vozidla WHERE spz = ?", (spz,))
        nalezeno = kurzor.fetchone()
        if not nalezeno:
            return jsonify({"error": "Vozidlo neexistuje"}), 404
        uz_volne = nalezeno[0] == "volne"
        if uz_volne:
            return jsonify({"ok": True, "msg": "Vozidlo je již volné"})
        kurzor.execute("UPDATE vozidla SET stav = 'volne' WHERE spz = ?", (spz,))
        conn.commit()
    return jsonify({"ok": True, "msg": "Vozidlo bylo uvolněno"})
@app.route("/data")
@login_required
def get_data():
    """Return the rows of one whitelisted table, optionally filtered.

    Query parameters: ``tabulka`` (required) and the optional filters
    ``jmeno``, ``spz`` and ``datum``. A filter is applied only when the chosen
    table actually has the matching column; otherwise it is ignored instead of
    producing an SQL error. Password hashes are never included in the output.
    """
    # Per table: request parameter -> column it filters on.
    sloupce_filtru = {
        "zaznamy": {"jmeno": "ridic", "spz": "vozidlo", "datum": "datum"},
        "karta_vozidla": {"jmeno": "ridic", "spz": "vozidlo", "datum": "datum"},
        "vozidla": {"spz": "spz"},
        "uzivatele": {"jmeno": "ridic", "spz": "vozidlo"},
    }
    tabulka = request.args.get("tabulka")
    if tabulka not in sloupce_filtru:
        return jsonify({"error": "Nepovolená tabulka"}), 403

    filters = []
    params = []
    for parametr, sloupec in sloupce_filtru[tabulka].items():
        hodnota = request.args.get(parametr)
        if hodnota:
            filters.append(f"{sloupec} = ?")
            params.append(hodnota)

    where_clause = " WHERE " + " AND ".join(filters) if filters else ""
    order_clause = " ORDER BY datum DESC, id DESC" if tabulka == "karta_vozidla" else ""
    with get_db() as conn:
        c = conn.cursor()
        # ``tabulka`` is safe to interpolate: it was checked against the whitelist above.
        c.execute(f"SELECT * FROM {tabulka}{where_clause}{order_clause}", params)
        rows = [dict(row) for row in c.fetchall()]

    if tabulka == "uzivatele":
        # Never hand password hashes to the client.
        # NOTE(review): every logged-in user can still list all accounts;
        # consider limiting this table to admins.
        for row in rows:
            row.pop("heslo", None)
    return jsonify(rows)
@app.route("/update", methods=["PUT"])
@admin_required
def update_zaznam():
    """Update one row of a whitelisted table from a JSON object (admin only).

    How the row is identified:

    * ``vozidla`` - by ``spz``;
    * ``uzivatele`` - by ``username`` (``heslo`` and ``posun`` cannot be set here);
    * ``zaznamy`` / ``karta_vozidla`` - by ``id``, or else by
      ``poradove_cislo`` together with ``datum``.

    Every other key of the object is treated as a column to set. Keys are
    checked against the table's real columns before any SQL is built, because
    column names cannot be bound as parameters.
    """
    tabulka = request.args.get("tabulka")
    data = request.get_json()
    if tabulka not in ["zaznamy", "karta_vozidla", "vozidla", "uzivatele"]:
        return jsonify({"error": "Nepovolená tabulka"}), 400
    if not data:
        return jsonify({"error": "Chybí data"}), 400
    if not isinstance(data, dict):
        return jsonify({"error": "Chybná data"}), 400

    if tabulka == "vozidla":
        if "spz" not in data:
            return jsonify({"error": "Chybí SPZ"}), 400
        klic = ("spz",)
        vynechat = {"spz"}
    elif tabulka == "uzivatele":
        if "username" not in data:
            return jsonify({"error": "Chybí username"}), 400
        klic = ("username",)
        vynechat = {"username", "heslo", "posun"}
    else:
        if "id" in data:
            klic = ("id",)
        elif "poradove_cislo" in data and "datum" in data:
            # NOTE(review): sequence numbers are assigned per vehicle, so this
            # pair may match cards of several vehicles.
            klic = ("poradove_cislo", "datum")
        else:
            return jsonify({"error": "Nelze určit unikátní záznam"}), 400
        vynechat = {"id", "poradove_cislo", "datum"}

    fields = [k for k in data if k not in vynechat]
    if not fields:
        return jsonify({"error": "Není co upravit"}), 400

    with get_db() as conn:
        nezname = sorted(set(fields) - _sloupce_tabulky(conn, tabulka))
        if nezname:
            return jsonify({"error": "Neznámý sloupec", "sloupce": nezname}), 400
        set_clause = ", ".join(f"{k} = ?" for k in fields)
        where_clause = " AND ".join(f"{k} = ?" for k in klic)
        params = [data[k] for k in fields] + [data[k] for k in klic]
        conn.execute(f"UPDATE {tabulka} SET {set_clause} WHERE {where_clause}", params)
        conn.commit()
    return jsonify({"status": "upraveno"})


def _sloupce_tabulky(conn, tabulka):
    """Return the set of column names of ``tabulka`` (must already be whitelisted)."""
    return {row[1] for row in conn.execute(f"PRAGMA table_info({tabulka})")}
@app.route("/delete", methods=["DELETE"])
@admin_required
def delete_zaznam():
    """Delete rows from ``zaznamy`` or ``karta_vozidla`` (admin only).

    The JSON body selects the rows either by ``id`` or by ``poradove_cislo``
    together with ``datum``; ``id`` wins when both are present.
    """
    tabulka = request.args.get("tabulka")
    telo = request.get_json()
    if tabulka not in ("zaznamy", "karta_vozidla"):
        return jsonify({"error": "Nepovolená tabulka"}), 400
    if not telo:
        return jsonify({"error": "Chybí data"}), 400

    if "id" in telo:
        podminka, hodnoty = "id = ?", (telo["id"],)
    elif "poradove_cislo" in telo and "datum" in telo:
        podminka = "poradove_cislo = ? AND datum = ?"
        hodnoty = (telo["poradove_cislo"], telo["datum"])
    else:
        return jsonify({"error": "Nelze určit záznam pro smazání"}), 400

    with get_db() as conn:
        kurzor = conn.cursor()
        kurzor.execute(f"DELETE FROM {tabulka} WHERE {podminka}", hodnoty)
        conn.commit()
    return jsonify({"status": "smazano"})
def spocitej_hodiny_dovolene(datum_od, datum_do):
    """Count leave as Monday-Friday days x 8 hours; weekends are free.

    Args:
        datum_od: First day of leave, ``YYYY-MM-DD``.
        datum_do: Last day of leave (inclusive), ``YYYY-MM-DD``.

    Returns:
        ``(hours, working_days)``.

    Raises:
        ValueError: When a date is missing or malformed, or the range is reversed.

    The count is done arithmetically (whole weeks plus the remaining days), so
    the cost does not grow with the range and no date beyond ``datum_do`` is
    ever constructed - stepping day by day overflowed for 9999-12-31.
    """
    try:
        start = datetime.strptime(datum_od, "%Y-%m-%d").date()
        end = datetime.strptime(datum_do, "%Y-%m-%d").date()
    except (TypeError, ValueError):
        raise ValueError("Datum musí být ve formátu YYYY-MM-DD")
    if start > end:
        raise ValueError("Datum od nesmí být později než datum do")

    celkem_dni = (end - start).days + 1
    cele_tydny, zbytek = divmod(celkem_dni, 7)
    # Every full week has exactly five working days; only the leftover
    # (at most six) days need their weekday checked. Monday=0 ... Friday=4.
    prvni_den = start.weekday()
    pracovni_dny = cele_tydny * 5 + sum(
        1 for posun in range(zbytek) if (prvni_den + posun) % 7 < 5
    )
    return pracovni_dny * 8, pracovni_dny
def dovolena_prekryv(ridic, datum_od, datum_do, ignorovat_id=None):
    """Return the driver's existing, non-rejected leave requests overlapping the given range.

    Two ranges overlap when each starts no later than the other ends. Pass
    ``ignorovat_id`` to leave one request (by id) out of the comparison.
    Each result is a dict with ``id``, ``datum_od``, ``datum_do`` and ``stav``.
    """
    dotaz = """
        SELECT id, datum_od, datum_do, stav
        FROM dovolena
        WHERE ridic = ?
        AND datum_od IS NOT NULL
        AND datum_do IS NOT NULL
        AND stav != 'zamitnuto'
        AND datum_od <= ?
        AND datum_do >= ?
    """
    hodnoty = [ridic, datum_do, datum_od]
    if ignorovat_id is not None:
        dotaz = dotaz + " AND id != ?"
        hodnoty = hodnoty + [ignorovat_id]
    with get_db() as conn:
        conn.row_factory = sqlite3.Row
        kurzor = conn.cursor()
        kurzor.execute(dotaz, hodnoty)
        nalezene = kurzor.fetchall()
        return list(map(dict, nalezene))
@app.route("/api/dovolena", methods=["GET", "POST"])
@login_required
def dovolena_api():
    """List leave requests (GET) or create a new one (POST).

    GET takes either ``rok`` or the pair ``od``/``do`` and returns every
    request that overlaps that range. An admin may filter by ``ridic`` or see
    everybody; any other user sees only their own requests.

    POST takes a JSON object with ``datum_od``, ``datum_do`` and an optional
    ``poznamka`` (an admin may also pass ``ridic``). The request is stored in
    the state ``ceka``; it is refused with 409 when it overlaps another
    non-rejected request of the same driver.
    """
    user = request.user
    je_admin = user.get("role") == "admin"

    if request.method == "GET":
        rok = request.args.get("rok", type=int)
        datum_od = request.args.get("od")
        datum_do = request.args.get("do")
        ridic = request.args.get("ridic")
        if rok:
            datum_od = f"{rok}-01-01"
            datum_do = f"{rok}-12-31"
        if not datum_od or not datum_do:
            return jsonify({"error": "Zadej rok nebo rozsah od/do"}), 400

        params = [datum_do, datum_od]
        filters = [
            "datum_od IS NOT NULL",
            "datum_do IS NOT NULL",
            "datum_od <= ?",
            "datum_do >= ?",
        ]
        if not je_admin:
            # A regular employee is ALWAYS restricted to their own name. With a
            # token that has no ``ridic`` this matches nothing, instead of
            # dropping the filter and exposing everybody's leave.
            filters.append("ridic = ?")
            params.append(user.get("ridic"))
        elif ridic:
            filters.append("ridic = ?")
            params.append(ridic)

        with get_db() as conn:
            c = conn.cursor()
            c.execute(f"""
                SELECT id, ridic, datum_od, datum_do, hodiny, stav, vytvoril,
                       vytvoreno, schvalil, schvaleno, poznamka
                FROM dovolena
                WHERE {' AND '.join(filters)}
                ORDER BY datum_od ASC, ridic ASC
            """, params)
            dovolene = [dict(row) for row in c.fetchall()]
        return jsonify(dovolene)

    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({"error": "Data musí být objekt"}), 400
    datum_od = data.get("datum_od")
    datum_do = data.get("datum_do")
    poznamka = data.get("poznamka")
    try:
        hodiny, pracovni_dny = spocitej_hodiny_dovolene(datum_od, datum_do)
    except ValueError as e:
        return jsonify({"error": str(e)}), 400

    # Only an admin may file a request on behalf of somebody else.
    ridic = (data.get("ridic") if je_admin else None) or user.get("ridic")
    if not ridic:
        return jsonify({"error": "Chybí řidič"}), 400

    prekryvy = dovolena_prekryv(ridic, datum_od, datum_do)
    if prekryvy:
        return jsonify({"error": "Žádost se překrývá s existující dovolenou", "prekryvy": prekryvy}), 409

    with get_db() as conn:
        c = conn.cursor()
        c.execute("""
            INSERT INTO dovolena (ridic, datum_od, datum_do, hodiny, stav, vytvoril, poznamka)
            VALUES (?, ?, ?, ?, 'ceka', ?, ?)
        """, (ridic, datum_od, datum_do, hodiny, user.get("username"), poznamka))
        nove_id = c.lastrowid
        conn.commit()
    return jsonify({
        "id": nove_id,
        "ridic": ridic,
        "datum_od": datum_od,
        "datum_do": datum_do,
        "hodiny": hodiny,
        "pracovni_dny": pracovni_dny,
        "stav": "ceka"
    }), 201
# The id must be part of the URL; without "<int:dovolena_id>" Flask cannot
# supply the view's ``dovolena_id`` argument.
@app.route("/api/dovolena/<int:dovolena_id>/stav", methods=["PUT"])
@admin_required
def dovolena_zmen_stav(dovolena_id):
    """Set the state of one leave request (admin only).

    The JSON body carries ``stav``: ``ceka``, ``schvaleno`` or ``zamitnuto``.
    The acting admin's username and the current time are recorded in
    ``schvalil`` / ``schvaleno`` on every change. Responds 404 for an unknown id.
    """
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({"error": "Data musí být objekt"}), 400
    stav = data.get("stav")
    if stav not in ("ceka", "schvaleno", "zamitnuto"):
        return jsonify({"error": "Neplatný stav"}), 400
    with get_db() as conn:
        c = conn.cursor()
        c.execute("SELECT id FROM dovolena WHERE id = ?", (dovolena_id,))
        if not c.fetchone():
            return jsonify({"error": "Dovolená nenalezena"}), 404
        c.execute("""
            UPDATE dovolena
            SET stav = ?, schvalil = ?, schvaleno = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (stav, request.user.get("username"), dovolena_id))
        conn.commit()
    return jsonify({"status": "upraveno", "id": dovolena_id, "stav": stav})
@app.route("/api/dovolena/narok", methods=["GET", "PUT"])
@login_required
def dovolena_narok():
    """Read (GET) or set (PUT, admin only) a driver's yearly leave entitlement in hours.

    GET: an admin names the driver with the ``ridic`` query parameter;
    everybody else gets their own entitlement.
    PUT: JSON object with ``ridic`` and ``dovolena_hodin_rok``, which must be
    a finite, non-negative number (a numeric string is accepted).
    """
    user = request.user
    je_admin = user.get("role") == "admin"

    if request.method == "GET":
        ridic = request.args.get("ridic") if je_admin else user.get("ridic")
        if not ridic:
            return jsonify({"error": "Chybí řidič"}), 400
        with get_db() as conn:
            c = conn.cursor()
            c.execute("""
                SELECT ridic, dovolena_hodin_rok
                FROM uzivatele
                WHERE ridic = ?
            """, (ridic,))
            row = c.fetchone()
        if not row:
            return jsonify({"error": "Řidič nenalezen"}), 404
        return jsonify(dict(row))

    if not je_admin:
        return jsonify({"error": "Nedostatečná oprávnění"}), 403
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Chybí ridic nebo dovolena_hodin_rok"}), 400
    ridic = data.get("ridic")
    hodiny = data.get("dovolena_hodin_rok")
    if not ridic or hodiny is None:
        return jsonify({"error": "Chybí ridic nebo dovolena_hodin_rok"}), 400
    try:
        # bool is a subclass of int; True/False are not meaningful hours.
        if isinstance(hodiny, bool):
            raise ValueError
        hodiny = float(hodiny)
    except (TypeError, ValueError):
        return jsonify({"error": "dovolena_hodin_rok musí být číslo"}), 400
    # The chained comparison is False for NaN as well as for negatives/infinity.
    if not 0 <= hodiny < float("inf"):
        return jsonify({"error": "dovolena_hodin_rok musí být nezáporné číslo"}), 400

    with get_db() as conn:
        c = conn.cursor()
        c.execute("UPDATE uzivatele SET dovolena_hodin_rok = ? WHERE ridic = ?", (hodiny, ridic))
        zmeneno = c.rowcount
        conn.commit()
    if zmeneno == 0:
        return jsonify({"error": "Řidič nenalezen"}), 404
    return jsonify({"status": "upraveno", "ridic": ridic, "dovolena_hodin_rok": hodiny})
@app.route("/api/ridici")
@login_required
def ridici():
    """Return the names of all users with the role ``ridic`` as a JSON list.

    The connection comes from ``get_db`` (same timeout and pragmas as the
    rest of the app) and is closed even when the query fails.
    """
    con = get_db()
    try:
        cur = con.execute("SELECT ridic FROM uzivatele WHERE role = 'ridic' AND ridic IS NOT NULL")
        data = [row[0] for row in cur.fetchall()]
    finally:
        con.close()
    return jsonify(data)
@app.route("/api/ztracene_smeny", methods=["POST"])
@login_required
def ztracene_smeny():
    """Return which of the given dates carry a real shift for a driver.

    JSON body: ``ridic`` (driver name) and ``datumy`` (list of ``YYYY-MM-DD``
    strings). The response lists ``{"datum", "smena"}`` for every date whose
    rotation entry is not ``"-"``. Dates that cannot be parsed are skipped.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Chybí ridic nebo datumy"}), 400
    ridic = data.get("ridic")
    datumy = data.get("datumy", [])
    if not ridic or not datumy or not isinstance(datumy, list):
        return jsonify({"error": "Chybí ridic nebo datumy"}), 400
    with get_db() as conn:
        c = conn.cursor()
        c.execute("SELECT kod FROM uzivatele WHERE ridic = ?", (ridic,))
        row = c.fetchone()
    if not row:
        return jsonify({"error": "Řidič nenalezen"}), 404
    kod = row[0]

    vysledky = []
    for datum_str in datumy:
        # Only a malformed date is skipped. A failure inside get_smena (for
        # example a database error) must surface rather than silently produce
        # an empty answer.
        try:
            datum = datetime.strptime(datum_str, "%Y-%m-%d").date()
        except (TypeError, ValueError):
            continue
        smena = get_smena(datum, kod)
        if smena and smena != "-":
            vysledky.append({"datum": datum_str, "smena": smena})
    return jsonify(vysledky)
# NOTE(review): this endpoint requires a valid token, yet somebody who forgot
# their password cannot obtain one. Confirm whether @login_required is meant
# to be here; if it is removed, add rate limiting first.
@app.route("/api/zapomenute-heslo", methods=["POST"])
@login_required
def zapomenute_heslo():
    """Generate a temporary password for the account with the given e-mail and mail it.

    JSON body: ``email``. The response is ``{"success": true}`` whether or not
    the address is known, so the endpoint does not reveal which addresses
    exist. The stored password is replaced only after the e-mail has been
    accepted for sending, so a mail failure cannot lock the user out.
    """
    data = request.get_json(silent=True)
    email = data.get("email") if isinstance(data, dict) else None
    if not email:
        return jsonify({"success": False, "msg": "Chybí e-mail"}), 400
    with get_db() as conn:
        c = conn.cursor()
        c.execute("SELECT username FROM uzivatele WHERE email = ?", (email,))
        row = c.fetchone()
    if not row:
        return jsonify({"success": True})
    username = row[0]

    nove_heslo = secrets.token_hex(4)
    predmet = "Obnova hesla - Sanitka systém"
    zprava = f"Bylo vám vygenerováno nové dočasné heslo: {nove_heslo}\nPo přihlášení si ho prosím změňte."
    if not odesli_email(email, predmet, zprava):
        # Keep the old password: the user never received the new one.
        zaloguj_udalost("obnova_hesla_chyba", username, request.remote_addr)
        return jsonify({"success": True})

    hashed = hash_hesla(nove_heslo)
    with get_db() as conn:
        c = conn.cursor()
        c.execute("UPDATE uzivatele SET heslo = ? WHERE email = ?", (hashed, email))
        conn.commit()
    zaloguj_udalost("obnova_hesla", username, request.remote_addr)
    return jsonify({"success": True})
@app.route("/api/zmenit-heslo", methods=["POST"])
@login_required
def zmenit_heslo():
    """Change the password of the logged-in user.

    JSON body: ``stare_heslo`` and ``nove_heslo``. The account is taken from
    the token, never from the request body: a ``username`` field sent by older
    clients is accepted but ignored. Matching the body value against the
    non-unique ``ridic`` column could hit another account, or several at once.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"success": False, "msg": "Chybí údaje"}), 400
    username = request.user.get("username")
    stare_heslo = data.get("stare_heslo")
    nove_heslo = data.get("nove_heslo")
    if not all([username, stare_heslo, nove_heslo]):
        return jsonify({"success": False, "msg": "Chybí údaje"}), 400
    if not isinstance(stare_heslo, str) or not isinstance(nove_heslo, str):
        return jsonify({"success": False, "msg": "Chybí údaje"}), 400

    with get_db() as conn:
        c = conn.cursor()
        c.execute("SELECT heslo FROM uzivatele WHERE username = ?", (username,))
        row = c.fetchone()
        if not row:
            return jsonify({"success": False, "msg": "Uživatel nenalezen"}), 404
        stary_hash = row[0]
        # An account without a stored hash cannot prove the old password.
        if not stary_hash or not over_heslo(stare_heslo, stary_hash):
            return jsonify({"success": False, "msg": "Špatné staré heslo"}), 401
        c.execute(
            "UPDATE uzivatele SET heslo = ? WHERE username = ?",
            (hash_hesla(nove_heslo), username),
        )
        conn.commit()
    return jsonify({"success": True, "msg": "Heslo změněno."})
@app.route("/api/odeslat-ics", methods=["POST"])
@login_required
def odeslat_ics():
    """E-mail a driver an .ics calendar with their shifts for one month.

    JSON body: ``smeny`` (list of ``[datum, smena]`` pairs, ``datum`` as
    ``YYYY-MM-DD``), ``jmeno`` (driver's full name, looked up case-
    insensitively in ``uzivatele.ridic``) and ``mesic`` (label used in the
    subject and the attachment name). Entries without a shift, with ``"-"``,
    or that are malformed are left out of the calendar.
    """
    import re
    import tempfile

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"zprava": "Neplatná data"}), 400
    smeny = data.get("smeny")
    jmeno = data.get("jmeno")
    mesic = data.get("mesic")
    if not smeny or not jmeno or not mesic:
        return jsonify({"zprava": "Neplatná data"}), 400
    if not isinstance(smeny, list) or not isinstance(jmeno, str):
        return jsonify({"zprava": "Neplatná data"}), 400
    mesic = str(mesic)

    # The e-mail is stored in 'uzivatele'; the 'ridic' column holds the full name.
    con = get_db()
    try:
        row = con.execute(
            "SELECT email FROM uzivatele WHERE ridic = ? COLLATE NOCASE", (jmeno.strip(),)
        ).fetchone()
    finally:
        con.close()
    if not row or not row[0]:
        return jsonify({"zprava": "E-mail nenalezen"}), 404
    email = row[0]

    casti = ["BEGIN:VCALENDAR\nVERSION:2.0\nCALSCALE:GREGORIAN\n"]
    for zaznam in smeny:
        try:
            datum, smena = zaznam
            if not smena or smena == "-":
                continue
            dt = datetime.strptime(datum, "%Y-%m-%d")
        except (TypeError, ValueError):
            continue
        # NOTE(review): every event is a fixed 08:00-16:00 slot and the "Z"
        # suffix declares it as UTC, so calendars display it shifted from
        # local time. Confirm the intended times before changing this.
        start = datetime(dt.year, dt.month, dt.day, 8, 0)
        end = datetime(dt.year, dt.month, dt.day, 16, 0)
        dt_start = start.strftime("%Y%m%dT%H%M%SZ")
        dt_end = end.strftime("%Y%m%dT%H%M%SZ")
        casti.append(f"BEGIN:VEVENT\nSUMMARY:{smena}\nDTSTART:{dt_start}\nDTEND:{dt_end}\nEND:VEVENT\n")
    casti.append("END:VCALENDAR")
    ics = "".join(casti)

    # ``mesic`` comes from the client, so it must never be used as a path as-is:
    # strip everything that could leave the directory, and write into a private
    # temporary directory that is removed even when sending fails.
    bezpecny_nazev = re.sub(r"[^\w .-]+", "_", mesic).strip(" .")[:64] or "smeny"
    predmet = f"Rozpis směn - {mesic}"
    zprava = f"V příloze naleznete směny pro měsíc {mesic}."
    with tempfile.TemporaryDirectory() as docasny_adresar:
        filename = os.path.join(docasny_adresar, f"{bezpecny_nazev}.ics")
        with open(filename, "w", encoding="utf-8") as f:
            f.write(ics)
        uspesne = odesli_email(email, predmet, zprava, prilohy=[filename])

    if uspesne:
        return jsonify({"zprava": "E-mail úspěšně odeslán."})
    return jsonify({"zprava": "Chyba při odesílání."}), 500
# The path must be a URL variable; without "<path:filename>" Flask cannot
# supply the view's ``filename`` argument (and the rule collides with index).
@app.route('/<path:filename>')
def static_files(filename):
    """Serve a front-end asset from the working directory.

    Only typical web asset types are served. The working directory also holds
    the application source, ``.env`` (secrets), ``logs.csv`` and the instance
    folder with the database, none of which may be downloadable. Anything
    else, including any path containing a dot-file or dot-directory, gets 404.

    NOTE(review): moving the assets into a dedicated directory would be the
    cleaner long-term fix.
    """
    povolene_pripony = {
        ".html", ".css", ".js", ".json", ".webmanifest", ".txt",
        ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", ".webp",
        ".woff", ".woff2", ".ttf",
    }
    pripona = os.path.splitext(filename)[1].lower()
    casti_cesty = filename.replace("\\", "/").split("/")
    if pripona not in povolene_pripony or any(cast.startswith(".") for cast in casti_cesty):
        return jsonify({"error": "Soubor nenalezen"}), 404
    return send_from_directory('.', filename)
@app.route("/")
def index():
    """Serve ``index.html`` from the working directory as the landing page."""
    uvodni_stranka = 'index.html'
    return send_from_directory('.', uvodni_stranka)
# Script entry point. In the visible source init_db() is called only here, so
# the schema is created/migrated when the file is run directly.
if __name__ == "__main__":
    init_db()
    # Listens on all network interfaces, port 8050, with Flask's debugger off.
    app.run(host="0.0.0.0", port=8050, debug=False)
← Zpět na seznam