49 lines
1.3 KiB
Python
49 lines
1.3 KiB
Python
"""
network.py - 16 May 2025
"""
|
|
from __future__ import annotations
|
|
import random, structlog
|
|
from functools import lru_cache
|
|
from typing import Optional
|
|
from fake_useragent import UserAgent
|
|
from tls_client import Session as TLSSession
|
|
from backend.web.db_extra import acquire_proxy, release_proxy, queue_proxy_result
|
|
|
|
# Module-level structlog logger used by the helpers in this file.
log = structlog.get_logger()
|
|
|
|
@lru_cache(maxsize=1)
def _stealth_header_items() -> tuple[tuple[str, str], ...]:
    """Build one randomized header set; memoized until the cache is cleared.

    The result is an immutable tuple of ``(name, value)`` pairs so the cached
    value cannot be altered through a dict that was handed to a caller.
    """
    browser = random.choice(("chrome", "firefox", "edge"))
    user_agent = UserAgent()[browser]
    accept_language = random.choice(
        ("en-US,en;q=0.9", "en-GB,en;q=0.9", "en;q=0.8")
    )
    return (
        ("User-Agent", user_agent),
        ("Accept-Language", accept_language),
    )


def stealth_headers(rotate: bool = False) -> dict[str, str]:
    """Return browser-like request headers, reusing one set until rotated.

    Args:
        rotate: When true, discard the cached header set and generate a new
            one. The new set is what subsequent ``rotate=False`` calls return.

    Returns:
        A new dict with ``"User-Agent"`` and ``"Accept-Language"`` keys. Each
        call gets its own dict, so callers may add or change entries freely.
    """
    # ``rotate`` is deliberately NOT part of the cache key.  When it was, the
    # first ``rotate=True`` call was itself cached, so every later
    # ``rotate=True`` call was a cache hit and returned the same stale headers.
    if rotate:
        _stealth_header_items.cache_clear()

    # NOTE(review): the previous version also constructed a TLSSession with a
    # random client identifier here and discarded it immediately; nothing used
    # the object, so it is not built any more.  If the TLS fingerprint is meant
    # to match the User-Agent, the session has to be created by the caller.
    return dict(_stealth_header_items())


# Keep the lru_cache helpers available on the public name for existing callers.
stealth_headers.cache_clear = _stealth_header_items.cache_clear  # type: ignore[attr-defined]
stealth_headers.cache_info = _stealth_header_items.cache_info  # type: ignore[attr-defined]
|
|
|
|
|
|
def get_proxy() -> str:
    """Return a proxy from ``acquire_proxy``, or ``"DIRECT"`` if it yields none.

    Returns:
        The value returned by ``acquire_proxy()`` when it is truthy; otherwise
        the literal string ``"DIRECT"`` (a debug event is logged in that case).
    """
    proxy = acquire_proxy()
    if not proxy:
        # Nothing available: tell the caller to connect without a proxy.
        log.debug("proxy.none", msg="DIRECT fallback")
        return "DIRECT"
    return proxy
|
|
|
|
def record_proxy(px: str, ok: bool) -> None:
    """Queue the outcome for proxy *px* and release it.

    Args:
        px: Proxy identifier. Falsy values and the ``"DIRECT"`` sentinel are
            ignored, since there is no proxy to report on or release.
        ok: Outcome flag forwarded to ``queue_proxy_result`` and
            ``release_proxy``.

    Raises:
        Whatever ``queue_proxy_result`` or ``release_proxy`` raise; errors are
        not swallowed.
    """
    if not px or px == "DIRECT":
        return
    try:
        queue_proxy_result(px, ok)
    finally:
        # Release even when queuing the result fails, so an error while
        # recording the outcome does not leave the proxy un-released.
        release_proxy(px, ok)