"""backend/core/formats.py — patched 2025-06-03""" from __future__ import annotations import asyncio import os import re import urllib.parse as _url from datetime import datetime, timezone from functools import lru_cache from pathlib import Path from urllib.parse import urlparse import yt_dlp import structlog from sqlalchemy import select, delete, Table, Column, Text, DateTime, JSON from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.exc import NoResultFound from core.db import SessionLocal, metadata from core.network import get_proxy, record_proxy, stealth_headers from core.settings import FORMAT_CACHE_TTL_SEC log = structlog.get_logger() format_cache = Table( "format_cache", metadata, Column("url", Text, primary_key=True), Column("cached_at", DateTime, nullable=False), Column("info", JSON, nullable=False), ) _YT_PAT = re.compile(r"(youtu\.be/|youtube\.com/(?:watch|shorts))", re.I) _BC_PAT = re.compile(r"\.bandcamp\.com", re.I) _SC_PAT = re.compile(r"(?:soundcloud\.com|on\.soundcloud\.com|m\.soundcloud\.com)", re.I) _TW_PAT = re.compile(r"(?:twitter\.com|x\.com|mobile\.twitter\.com)", re.I) _ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]") # resolve cookie file path from env or fallback to root-relative path COOKIE_FILE = Path(os.getenv("YT_COOKIE_FILE", Path(__file__).resolve().parents[2] / "playwright_cookies.txt")) log.info("cookie_file_resolved", path=str(COOKIE_FILE), exists=COOKIE_FILE.exists()) def _canonical_url(u: str) -> str: u = u.strip() if not u.lower().startswith(("http://", "https://")): return u if _YT_PAT.search(u): parsed = _url.urlparse(u) if "youtu.be" in parsed.netloc: vid = parsed.path.lstrip("/") else: q = _url.parse_qs(parsed.query) vid = (q.get("v") or [None])[0] if not vid and parsed.path.startswith("/shorts/"): vid = parsed.path.split("/")[2] return f"https://www.youtube.com/watch?v={vid}" if vid else u if _BC_PAT.search(u): parsed = _url.urlparse(u) clean = parsed._replace(query="", fragment="") return _url.urlunparse(clean) if _SC_PAT.search(u): u2 = ( u.replace("m.soundcloud.com", "soundcloud.com") .replace("on.soundcloud.com", "soundcloud.com") ) return u2.split("?")[0].split("#")[0] if _TW_PAT.search(u): parsed = _url.urlparse( u.replace("mobile.twitter.com", "x.com").replace("twitter.com", "x.com") ) clean = parsed._replace(query="", fragment="") return _url.urlunparse(clean) parsed = _url.urlparse(u) clean = parsed._replace(query="", fragment="") return _url.urlunparse(clean) def _clean_proxy(proxy: str) -> str: if not proxy or proxy.upper() == "DIRECT": return "DIRECT" parsed = urlparse(proxy) return ( f"{parsed.scheme}://{parsed.hostname}{f':{parsed.port}' if parsed.port else ''}" if parsed.hostname else proxy ) def platform_badge(u: str) -> str: l = u.lower() if "youtu" in l: return "youtube" if "soundcloud" in l: return "soundcloud" if "twitter" in l or "x.com" in l: return "twitterx" if "bandcamp" in l: return "bandcamp" return "other" def user_facing_formats(fmts: list[dict]) -> list[dict]: desired_heights = [1440, 1080, 720, 480, 360] out: list[dict] = [] audio_only = [ f for f in fmts if f.get("vcodec") == "none" and f.get("acodec") != "none" ] if audio_only: best = max(audio_only, key=lambda x: x.get("tbr") or 0) out.append( { "format_id": best["format_id"], "ext": best.get("ext", "mp3"), "label": "Audio (.mp3)", } ) for h in desired_heights: candidates = [f for f in fmts if f.get("height") == h and f.get("vcodec") != "none"] if candidates: best = max(candidates, key=lambda x: x.get("tbr") or 0) out.append( { "format_id": best["format_id"], "ext": best.get("ext", "mp4"), "label": f"{h}p", } ) return out @lru_cache(maxsize=1024) def _cached_metadata_fetch(url: str) -> dict: opts = {"quiet": True, "skip_download": True} try: with yt_dlp.YoutubeDL(opts) as ydl: return ydl.extract_info(url, download=False) except Exception as e: msg = _ansi_escape.sub("", str(e)).strip() log.warning("metadata_fail_direct", url=url, err=msg) raise def _fetch_metadata_sync(url: str, proxy_url: str = "DIRECT") -> dict: opts = { "quiet": True, "skip_download": True, "proxy": None if proxy_url == "DIRECT" else proxy_url, "http_headers": stealth_headers(), "cookiefile": str(COOKIE_FILE), } if not COOKIE_FILE.exists(): log.warning("cookie_file_missing", path=str(COOKIE_FILE)) try: with yt_dlp.YoutubeDL(opts) as ydl: return ydl.extract_info(url, download=False) except Exception as e: clean_proxy = _clean_proxy(proxy_url) msg = _ansi_escape.sub("", str(e)).strip() log.warning("metadata_fail_proxy", url=url, proxy=clean_proxy, err=msg) raise async def _fetch_metadata(url: str) -> dict: if any(x in url.lower() for x in ("youtube.com", "youtu.be", "bandcamp.com")): return await asyncio.to_thread(_cached_metadata_fetch, url) for attempt in range(1, 4): proxy = get_proxy() try: info = await asyncio.to_thread(_fetch_metadata_sync, url, proxy) if not info.get("formats"): raise ValueError("No formats found") record_proxy(proxy, True) return info except Exception as e: record_proxy(proxy, False) err_msg = _ansi_escape.sub("", str(e)).strip() log.warning( "metadata_retry_fail", attempt=attempt, proxy=_clean_proxy(proxy), err=err_msg, ) raise RuntimeError("Format fetch failed after 3 attempts") async def choose_format(url: str) -> dict: url = _canonical_url(url) if not re.match(r"^https?://", url, re.I): return {"error": "Invalid URL"} if any(x in url.lower() for x in ("soundcloud.com", "x.com")): return {"auto_download": True, "fmt_id": "bestaudio", "url": url} info = await asyncio.to_thread(_lookup_cache_sync, url) if info: return { "formats": user_facing_formats(info["formats"]), "title": info.get("title", "Unknown"), "platform": info.get("platform", ""), "url": url, } info_raw = await _fetch_metadata(url) cache_doc = { "title": info_raw.get("title", "Unknown"), "formats": info_raw.get("formats", []), "platform": platform_badge(url), } await asyncio.to_thread(_store_cache_sync, url, cache_doc) return { "formats": user_facing_formats(info_raw.get("formats", [])), "title": cache_doc["title"], "platform": cache_doc["platform"], "url": url, } def _lookup_cache_sync(url: str) -> dict | None: now = datetime.now(timezone.utc) with SessionLocal() as session: try: row = session.execute( select(format_cache.c.info, format_cache.c.cached_at).where( format_cache.c.url == url ) ).one() except NoResultFound: return None info, cached_at = row if cached_at.tzinfo is None: cached_at = cached_at.replace(tzinfo=timezone.utc) if (now - cached_at).total_seconds() > FORMAT_CACHE_TTL_SEC: session.execute(delete(format_cache).where(format_cache.c.url == url)) session.commit() return None return info def _store_cache_sync(url: str, info: dict) -> None: now = datetime.now(timezone.utc) stmt = ( pg_insert(format_cache) .values(url=url, cached_at=now, info=info) .on_conflict_do_update(index_elements=["url"], set_={"cached_at": now, "info": info}) ) with SessionLocal.begin() as session: session.execute(stmt)