303 lines
9.2 KiB
Python
303 lines
9.2 KiB
Python
"""app.py – Jul 15 2025"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import mimetypes
|
||
import os
|
||
import secrets
|
||
import shutil
|
||
import signal
|
||
import sysconfig
|
||
import threading
|
||
from pathlib import Path
|
||
from typing import Dict
|
||
|
||
import aiofiles
|
||
import structlog
|
||
from quart import (
|
||
Quart,
|
||
Response,
|
||
jsonify,
|
||
redirect,
|
||
render_template,
|
||
request,
|
||
session,
|
||
url_for, render_template_string,
|
||
)
|
||
|
||
from backend.core.logging import init_logging
|
||
from core.settings import TMP_DIR, DOWNLOAD_DIR
|
||
from core.formats import choose_format, _lookup_cache_sync, _cached_metadata_fetch
|
||
from core.download import download, EST_MB
|
||
from core.formats import choose_format as choose_format_logic
|
||
from core.db_xp import is_ip_banned, ensure_user, get_status
|
||
from core.web.db_extra import invalid_over_limit, init_proxy_seed, start_background_tasks
|
||
from core.db import metadata, engine
|
||
from core import progress_bus
|
||
|
||
init_logging()
|
||
log = structlog.get_logger()
|
||
|
||
|
||
def _frontend_root() -> Path:
|
||
here = Path(__file__).resolve().parent
|
||
dev = here.parent.parent / "frontend"
|
||
return dev if dev.exists() else Path(sysconfig.get_path("data")) / "share" / "s1ne" / "frontend"
|
||
|
||
|
||
FRONTEND_ROOT = _frontend_root()
|
||
app = Quart(
|
||
__name__,
|
||
template_folder=str(FRONTEND_ROOT / "templates"),
|
||
static_folder=str(FRONTEND_ROOT / "static"),
|
||
)
|
||
app.secret_key = os.getenv("SECRET_KEY_WORD")
|
||
|
||
_tasks: Dict[str, asyncio.Task] = {}
|
||
|
||
|
||
async def _cleanup_temp(interval: int = 900) -> None:
|
||
while True:
|
||
cutoff = asyncio.get_event_loop().time() - 60 * 60 * 12
|
||
for p in (TMP_DIR / "ytlocks").glob("*.lock"):
|
||
if p.stat().st_mtime < cutoff:
|
||
p.unlink(missing_ok=True)
|
||
for pattern in ("yt_*", "tmp*"):
|
||
for p in TMP_DIR.glob(pattern):
|
||
if p.is_dir() and p.stat().st_mtime < cutoff:
|
||
shutil.rmtree(p, ignore_errors=True)
|
||
await asyncio.sleep(interval)
|
||
|
||
|
||
async def _file_iter(path: Path, chunk: int = 1 << 15):
|
||
async with aiofiles.open(path, "rb") as f:
|
||
while (blk := await f.read(chunk)):
|
||
yield blk
|
||
|
||
async def _shutdown_waiter():
|
||
await asyncio.sleep(0.1)
|
||
log.info("shutdown.tasks_cancelled")
|
||
|
||
def _graceful_exit() -> None:
|
||
log.info("shutdown.initiated")
|
||
for t in list(_tasks.values()):
|
||
if not t.done():
|
||
t.cancel()
|
||
asyncio.create_task(_shutdown_waiter())
|
||
|
||
def force_exit():
|
||
import time
|
||
time.sleep(5)
|
||
os._exit(1)
|
||
|
||
threading.Thread(target=force_exit, daemon=True).start()
|
||
|
||
|
||
@app.before_serving
|
||
async def _launch_tasks() -> None:
|
||
metadata.create_all(engine)
|
||
await init_proxy_seed()
|
||
start_background_tasks(asyncio.get_running_loop())
|
||
asyncio.create_task(_cleanup_temp())
|
||
loop = asyncio.get_running_loop()
|
||
for sig in (signal.SIGTERM, signal.SIGINT):
|
||
loop.add_signal_handler(sig, _graceful_exit)
|
||
|
||
|
||
@app.route("/")
|
||
async def home():
|
||
if not request.cookies.get("auth"):
|
||
return await render_template("login.html")
|
||
ip = request.remote_addr or "0.0.0.0"
|
||
ensure_user(ip)
|
||
soft_banned = get_status(ip)["soft_banned"]
|
||
return await render_template("index.html", soft_banned=soft_banned)
|
||
|
||
|
||
@app.route("/login", methods=["GET", "POST"])
|
||
async def login():
|
||
if request.method == "GET":
|
||
return await render_template("login.html")
|
||
form = await request.form
|
||
if form.get("password") == os.getenv("MASTER_PASSWORD"):
|
||
resp = redirect(url_for("home"))
|
||
resp.set_cookie("auth", "1", httponly=True, secure=False)
|
||
return resp
|
||
return await render_template("login.html", error_badge="Incorrect password")
|
||
|
||
|
||
@app.route("/logout")
|
||
async def logout():
|
||
session.clear()
|
||
resp = redirect(url_for("login"))
|
||
resp.delete_cookie("auth")
|
||
return resp
|
||
|
||
|
||
@app.route("/choose_format", methods=["POST"])
|
||
async def handle_choose_format() -> Response:
|
||
url: str
|
||
try:
|
||
|
||
if request.content_type and "application/json" in request.content_type:
|
||
data = await request.get_json(silent=True) or {}
|
||
url = (data.get("url") or "").strip()
|
||
else:
|
||
form = await request.form
|
||
url = (form.get("url") or "").strip()
|
||
|
||
if not url:
|
||
#log.warning("choose_format.missing_url")
|
||
return jsonify({"error": "url field required"}), 422
|
||
|
||
|
||
run_id: str = session.get("run_id") or secrets.token_urlsafe(10)
|
||
session["run_id"] = run_id
|
||
#log.info("choose_format.run_id_set", run_id=run_id, url=url)
|
||
|
||
|
||
res: dict = await choose_format_logic(url)
|
||
res["sid"] = run_id
|
||
|
||
if "error" in res:
|
||
#log.warning("choose_format.logic_error", error=res["error"], url=url)
|
||
return jsonify(res), 400
|
||
|
||
log.info("choose_format.success", url=url, title=res.get("title"), platform=res.get("platform"))
|
||
return jsonify(res)
|
||
|
||
except Exception as e:
|
||
log.exception("choose_format.exception", err=str(e))
|
||
return jsonify({"error": "Internal error during format selection"}), 500
|
||
|
||
|
||
@app.route("/download_file")
|
||
async def dl():
|
||
ip = request.remote_addr or "0.0.0.0"
|
||
|
||
|
||
if ip == "127.0.0.1":
|
||
log.info("dev_mode.skip_ban_check", ip=ip)
|
||
else:
|
||
if is_ip_banned(ip):
|
||
log.warning("download.reject.banned", ip=ip)
|
||
return jsonify({"error": "Banned"}), 403
|
||
|
||
|
||
url = request.args.get("url", "").strip()
|
||
fmt = request.args.get("format_id", "").strip()
|
||
sid = request.args.get("sid", "").strip()
|
||
run_id = session.get("run_id")
|
||
|
||
if run_id is None:
|
||
pass
|
||
|
||
if is_ip_banned(ip):
|
||
log.warning("download.reject.banned", ip=ip)
|
||
return jsonify({"error": "Banned"}), 403
|
||
|
||
if not url or not fmt:
|
||
log.warning("download.reject.missing_params", url=url, fmt=fmt)
|
||
return jsonify({"error": "Missing URL or format"}), 400
|
||
|
||
if sid in _tasks and not _tasks[sid].done():
|
||
log.warning("download.reject.already_running", sid=sid)
|
||
return jsonify({"error": "download already running"}), 409
|
||
|
||
if sid != run_id:
|
||
log.warning("download.reject.sid_mismatch", sid=sid, session_run_id=run_id)
|
||
return jsonify({
|
||
"error": "Session mismatch – please refresh the page and select a format again."
|
||
}), 403
|
||
|
||
progress_bus.register(sid)
|
||
|
||
|
||
async def _run_download() -> Path:
|
||
try:
|
||
meta = await asyncio.to_thread(_lookup_cache_sync, url)
|
||
if meta:
|
||
chosen = next((f for f in meta["formats"] if f["format_id"] == fmt), None)
|
||
est = (
|
||
chosen.get("filesize")
|
||
or chosen.get("filesize_approx")
|
||
or 0
|
||
) if chosen else 0
|
||
EST_MB.set(int(est / 1_048_576))
|
||
|
||
log.info("download.starting", sid=sid, url=url, fmt=fmt)
|
||
path_str = await download(url, fmt, ip, sid)
|
||
return Path(path_str)
|
||
|
||
finally:
|
||
_tasks.pop(sid, None)
|
||
|
||
task = asyncio.create_task(_run_download())
|
||
_tasks[sid] = task
|
||
|
||
try:
|
||
tmp_path = await task
|
||
mime = mimetypes.guess_type(tmp_path.name)[0] or "application/octet-stream"
|
||
log.info("download.success", file=str(tmp_path), sid=sid)
|
||
|
||
resp = Response(
|
||
_file_iter(tmp_path),
|
||
headers={
|
||
"Content-Type": mime,
|
||
"Content-Disposition": f'attachment; filename="{tmp_path.name}"',
|
||
},
|
||
)
|
||
|
||
if hasattr(resp, "call_after_response"):
|
||
def _after():
|
||
progress_bus.update(sid, status="finished", pct=100, progress="Done")
|
||
progress_bus.clear(sid)
|
||
if str(tmp_path.parent).startswith(str(TMP_DIR)):
|
||
shutil.rmtree(tmp_path.parent, ignore_errors=True)
|
||
|
||
resp.call_after_response(_after)
|
||
|
||
return resp
|
||
|
||
except asyncio.CancelledError:
|
||
log.warning("download.cancelled", sid=sid)
|
||
progress_bus.update(sid, status="cancelled", progress="Cancelled")
|
||
return jsonify({"error": "Download cancelled"}), 499
|
||
|
||
except Exception as e:
|
||
log.exception("download.failed", sid=sid, err=str(e))
|
||
progress_bus.update(sid, status="error", progress="Error")
|
||
return jsonify({"error": "Download failed"}), 500
|
||
|
||
|
||
@app.route("/cancel_download", methods=["POST"])
|
||
async def cancel_dl():
|
||
sid = request.args.get("sid", "").strip()
|
||
if sid:
|
||
task = _tasks.get(sid)
|
||
if task and not task.done():
|
||
task.cancel()
|
||
progress_bus.update(sid, status="cancelled", progress="Cancelled")
|
||
return jsonify({"status": "cancelled"})
|
||
|
||
|
||
@app.route("/api/progress/<sid>")
|
||
async def progress_stream(sid: str):
|
||
q = progress_bus.subscribe(sid)
|
||
|
||
async def gen():
|
||
while True:
|
||
msg = await q.get()
|
||
yield f"data: {msg}\n\n"
|
||
|
||
return Response(
|
||
gen(),
|
||
content_type="text/event-stream",
|
||
headers={"Cache-Control": "no-store"},
|
||
)
|
||
|
||
|
||
@app.before_serving
|
||
async def _on_startup():
|
||
pass |