63 lines
1.8 KiB
Python
63 lines
1.8 KiB
Python
"""
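progress_bus.py - 07 May 2025

In-memory progress registry that fans state changes out to asyncio queues
(one queue per subscriber, frames are JSON strings).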
"""
|
|
from __future__ import annotations
|
|
import asyncio, json, time
|
|
from typing import Dict, Any
|
|
|
|
# In-memory state, keyed by session id (sid).

# Latest progress record per session.
_progress: Dict[str, Dict[str, Any]] = {}

# Subscriber queues per session; each queue receives JSON-encoded frames.
_watchers: Dict[str, list[asyncio.Queue[str]]] = {}

# Seconds a finished/error record is kept before _gc() removes it (1 h).
_TTL = 60 * 60
|
|
|
|
def _now() -> float:
    """Return the current wall-clock time in seconds since the epoch."""
    return time.time()
|
|
|
|
def register(sid: str) -> None:
    """Create (or reset) the record for *sid* and broadcast it.

    The new record starts at 0 percent, with an empty progress message,
    status ``"running"`` and the current timestamp. Any existing record
    for the same sid is overwritten.
    """
    initial: Dict[str, Any] = {
        "pct": 0,
        "progress": "",
        "status": "running",
        "ts": _now(),
    }
    _progress[sid] = initial
    _broadcast(sid)
|
|
|
|
def update(sid: str, *, pct: float | None = None,
           progress: str | None = None, status: str | None = None) -> None:
    """Merge the supplied fields into the record for *sid* and notify watchers.

    A session that has no record yet is registered first; that registration
    broadcasts the initial state on its own. Arguments left as ``None`` keep
    their stored value. ``ts`` is refreshed on every call, after which the
    updated record is broadcast.
    """
    if sid not in _progress:
        register(sid)
    record = _progress[sid]

    # Only explicitly supplied fields overwrite the stored ones.
    supplied = {"pct": pct, "progress": progress, "status": status}
    record.update(
        {field: value for field, value in supplied.items() if value is not None}
    )

    record["ts"] = _now()
    _broadcast(sid)
|
|
|
|
def get(sid: str) -> Dict[str, Any]:
    """Return a snapshot of the progress record for *sid*.

    Expired finished/error records are garbage-collected first, so a stale
    session reads as idle.

    Returns:
        A shallow copy of the stored record, or ``{"status": "idle"}`` when
        no record exists for *sid*. A copy is returned so that callers who
        modify the result cannot alter the stored state or what later
        broadcasts send out.
    """
    _gc()
    record = _progress.get(sid)
    if record is None:
        return {"status": "idle"}
    return dict(record)
|
|
|
|
def clear(sid: str) -> None:
    """Forget *sid*: drop its progress record and its list of watcher queues.

    Unknown session ids are ignored.
    """
    for table in (_progress, _watchers):
        table.pop(sid, None)
|
|
|
|
# SSE integration
|
|
def subscribe(sid: str) -> asyncio.Queue[str]:
    """Create a watcher queue for *sid*, seeded with the current state.

    The queue is bounded to 16 frames. Each frame is a JSON string holding
    ``sid`` plus the fields of the progress record; the first frame is the
    state at subscription time (``{"status": "idle"}`` for an unknown sid).

    Returns:
        The new queue, already registered to receive later broadcasts.

    NOTE(review): in the code visible here a queue is only ever removed by
    ``clear(sid)``; there is no per-queue unsubscribe.
    """
    # Take the snapshot BEFORE registering the queue: get() runs _gc(), and
    # for an expired finished/error session _gc() calls clear(sid), which
    # would pop the watcher list and orphan a queue registered earlier.
    snapshot = get(sid)

    q: asyncio.Queue[str] = asyncio.Queue(maxsize=16)
    _watchers.setdefault(sid, []).append(q)

    # Immediately push the current state so the consumer has a first frame.
    q.put_nowait(json.dumps({"sid": sid, **snapshot}))
    return q
|
|
|
|
def _broadcast(sid: str) -> None:
    """Push the current record of *sid* to every queue subscribed to it.

    Does nothing when *sid* has no watchers. The frame is a JSON string of
    ``sid`` plus the stored record fields.

    When a subscriber's queue is full, the OLDEST queued frame is evicted to
    make room. Every frame is a complete snapshot, so the newest one
    supersedes the older ones; dropping the newest instead could leave a
    slow consumer without the terminal finished/error state.
    """
    queues = _watchers.get(sid)
    if not queues:
        return

    payload = json.dumps({"sid": sid, **_progress[sid]})
    for q in queues:
        if q.full():
            # Slow consumer: discard its stalest frame; still best-effort.
            q.get_nowait()
        q.put_nowait(payload)
|
|
|
|
# garbage collector
|
|
def _gc() -> None:
    """Remove finished/error records last updated more than _TTL seconds ago.

    Records in any other status are never collected here. Removal goes
    through ``clear()``, so the watcher list of an expired session is
    dropped together with its record.
    """
    current = _now()
    expired = []
    for key, record in _progress.items():
        if record["status"] not in ("finished", "error"):
            continue
        if current - record["ts"] > _TTL:
            expired.append(key)

    # Delete only after the scan so _progress is not resized mid-iteration.
    for key in expired:
        clear(key)
|