Files
s1ne/backend/web/app.py
2026-03-29 23:50:49 -05:00

303 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""app.py Jul 15 2025"""
from __future__ import annotations
import asyncio
import mimetypes
import os
import secrets
import shutil
import signal
import sysconfig
import threading
from pathlib import Path
from typing import Dict
import aiofiles
import structlog
from quart import (
Quart,
Response,
jsonify,
redirect,
render_template,
request,
session,
url_for, render_template_string,
)
from backend.core.logging import init_logging
from core.settings import TMP_DIR, DOWNLOAD_DIR
from core.formats import choose_format, _lookup_cache_sync, _cached_metadata_fetch
from core.download import download, EST_MB
from core.formats import choose_format as choose_format_logic
from core.db_xp import is_ip_banned, ensure_user, get_status
from core.web.db_extra import invalid_over_limit, init_proxy_seed, start_background_tasks
from core.db import metadata, engine
from core import progress_bus
init_logging()
log = structlog.get_logger()
def _frontend_root() -> Path:
here = Path(__file__).resolve().parent
dev = here.parent.parent / "frontend"
return dev if dev.exists() else Path(sysconfig.get_path("data")) / "share" / "s1ne" / "frontend"
FRONTEND_ROOT = _frontend_root()
app = Quart(
__name__,
template_folder=str(FRONTEND_ROOT / "templates"),
static_folder=str(FRONTEND_ROOT / "static"),
)
app.secret_key = os.getenv("SECRET_KEY_WORD")
_tasks: Dict[str, asyncio.Task] = {}
async def _cleanup_temp(interval: int = 900) -> None:
while True:
cutoff = asyncio.get_event_loop().time() - 60 * 60 * 12
for p in (TMP_DIR / "ytlocks").glob("*.lock"):
if p.stat().st_mtime < cutoff:
p.unlink(missing_ok=True)
for pattern in ("yt_*", "tmp*"):
for p in TMP_DIR.glob(pattern):
if p.is_dir() and p.stat().st_mtime < cutoff:
shutil.rmtree(p, ignore_errors=True)
await asyncio.sleep(interval)
async def _file_iter(path: Path, chunk: int = 1 << 15):
async with aiofiles.open(path, "rb") as f:
while (blk := await f.read(chunk)):
yield blk
async def _shutdown_waiter():
await asyncio.sleep(0.1)
log.info("shutdown.tasks_cancelled")
def _graceful_exit() -> None:
log.info("shutdown.initiated")
for t in list(_tasks.values()):
if not t.done():
t.cancel()
asyncio.create_task(_shutdown_waiter())
def force_exit():
import time
time.sleep(5)
os._exit(1)
threading.Thread(target=force_exit, daemon=True).start()
@app.before_serving
async def _launch_tasks() -> None:
metadata.create_all(engine)
await init_proxy_seed()
start_background_tasks(asyncio.get_running_loop())
asyncio.create_task(_cleanup_temp())
loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, _graceful_exit)
@app.route("/")
async def home():
if not request.cookies.get("auth"):
return await render_template("login.html")
ip = request.remote_addr or "0.0.0.0"
ensure_user(ip)
soft_banned = get_status(ip)["soft_banned"]
return await render_template("index.html", soft_banned=soft_banned)
@app.route("/login", methods=["GET", "POST"])
async def login():
if request.method == "GET":
return await render_template("login.html")
form = await request.form
if form.get("password") == os.getenv("MASTER_PASSWORD"):
resp = redirect(url_for("home"))
resp.set_cookie("auth", "1", httponly=True, secure=False)
return resp
return await render_template("login.html", error_badge="Incorrect password")
@app.route("/logout")
async def logout():
session.clear()
resp = redirect(url_for("login"))
resp.delete_cookie("auth")
return resp
@app.route("/choose_format", methods=["POST"])
async def handle_choose_format() -> Response:
url: str
try:
if request.content_type and "application/json" in request.content_type:
data = await request.get_json(silent=True) or {}
url = (data.get("url") or "").strip()
else:
form = await request.form
url = (form.get("url") or "").strip()
if not url:
#log.warning("choose_format.missing_url")
return jsonify({"error": "url field required"}), 422
run_id: str = session.get("run_id") or secrets.token_urlsafe(10)
session["run_id"] = run_id
#log.info("choose_format.run_id_set", run_id=run_id, url=url)
res: dict = await choose_format_logic(url)
res["sid"] = run_id
if "error" in res:
#log.warning("choose_format.logic_error", error=res["error"], url=url)
return jsonify(res), 400
log.info("choose_format.success", url=url, title=res.get("title"), platform=res.get("platform"))
return jsonify(res)
except Exception as e:
log.exception("choose_format.exception", err=str(e))
return jsonify({"error": "Internal error during format selection"}), 500
@app.route("/download_file")
async def dl():
ip = request.remote_addr or "0.0.0.0"
if ip == "127.0.0.1":
log.info("dev_mode.skip_ban_check", ip=ip)
else:
if is_ip_banned(ip):
log.warning("download.reject.banned", ip=ip)
return jsonify({"error": "Banned"}), 403
url = request.args.get("url", "").strip()
fmt = request.args.get("format_id", "").strip()
sid = request.args.get("sid", "").strip()
run_id = session.get("run_id")
if run_id is None:
pass
if is_ip_banned(ip):
log.warning("download.reject.banned", ip=ip)
return jsonify({"error": "Banned"}), 403
if not url or not fmt:
log.warning("download.reject.missing_params", url=url, fmt=fmt)
return jsonify({"error": "Missing URL or format"}), 400
if sid in _tasks and not _tasks[sid].done():
log.warning("download.reject.already_running", sid=sid)
return jsonify({"error": "download already running"}), 409
if sid != run_id:
log.warning("download.reject.sid_mismatch", sid=sid, session_run_id=run_id)
return jsonify({
"error": "Session mismatch please refresh the page and select a format again."
}), 403
progress_bus.register(sid)
async def _run_download() -> Path:
try:
meta = await asyncio.to_thread(_lookup_cache_sync, url)
if meta:
chosen = next((f for f in meta["formats"] if f["format_id"] == fmt), None)
est = (
chosen.get("filesize")
or chosen.get("filesize_approx")
or 0
) if chosen else 0
EST_MB.set(int(est / 1_048_576))
log.info("download.starting", sid=sid, url=url, fmt=fmt)
path_str = await download(url, fmt, ip, sid)
return Path(path_str)
finally:
_tasks.pop(sid, None)
task = asyncio.create_task(_run_download())
_tasks[sid] = task
try:
tmp_path = await task
mime = mimetypes.guess_type(tmp_path.name)[0] or "application/octet-stream"
log.info("download.success", file=str(tmp_path), sid=sid)
resp = Response(
_file_iter(tmp_path),
headers={
"Content-Type": mime,
"Content-Disposition": f'attachment; filename="{tmp_path.name}"',
},
)
if hasattr(resp, "call_after_response"):
def _after():
progress_bus.update(sid, status="finished", pct=100, progress="Done")
progress_bus.clear(sid)
if str(tmp_path.parent).startswith(str(TMP_DIR)):
shutil.rmtree(tmp_path.parent, ignore_errors=True)
resp.call_after_response(_after)
return resp
except asyncio.CancelledError:
log.warning("download.cancelled", sid=sid)
progress_bus.update(sid, status="cancelled", progress="Cancelled")
return jsonify({"error": "Download cancelled"}), 499
except Exception as e:
log.exception("download.failed", sid=sid, err=str(e))
progress_bus.update(sid, status="error", progress="Error")
return jsonify({"error": "Download failed"}), 500
@app.route("/cancel_download", methods=["POST"])
async def cancel_dl():
sid = request.args.get("sid", "").strip()
if sid:
task = _tasks.get(sid)
if task and not task.done():
task.cancel()
progress_bus.update(sid, status="cancelled", progress="Cancelled")
return jsonify({"status": "cancelled"})
@app.route("/api/progress/<sid>")
async def progress_stream(sid: str):
q = progress_bus.subscribe(sid)
async def gen():
while True:
msg = await q.get()
yield f"data: {msg}\n\n"
return Response(
gen(),
content_type="text/event-stream",
headers={"Cache-Control": "no-store"},
)
@app.before_serving
async def _on_startup():
pass