"""yt-dlp transcript proxy — HuggingFace Space API

Bypasses HF's YouTube DNS block via DNS-over-HTTPS resolution.
"""

import json
import os
import re
import socket
import ssl
from urllib.request import Request, urlopen

from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import PlainTextResponse
import yt_dlp

app = FastAPI(title="yt-dlp transcript proxy")

YT_URL = "https://www.youtube.com/watch?v={}"

# ---------------------------------------------------------------------------
# DNS-over-HTTPS bootstrap — resolve YouTube IPs at startup
# ---------------------------------------------------------------------------

YOUTUBE_HOSTS = [
    "www.youtube.com",
    "youtube.com",
    "youtubei.googleapis.com",
    "www.google.com",  # for consent redirects
]


def _resolve_via_doh(hostname):
    """Resolve *hostname* via Google DNS-over-HTTPS.

    Returns the ``data`` field of every type-1 (A) record in the JSON
    answer, in the order the resolver returned them. Any failure (network,
    HTTP, malformed JSON) is logged and reported as an empty list so that
    startup never aborts because of a single lookup.
    """
    try:
        url = f"https://dns.google/resolve?name={hostname}&type=A"
        with urlopen(url, timeout=10) as r:
            data = json.loads(r.read())
        return [a["data"] for a in data.get("Answer", []) if a.get("type") == 1]
    except Exception as e:
        # Deliberate best-effort: the caller treats [] as "not resolved".
        print(f"[DNS] DoH failed for {hostname}: {e}")
        return []


def _bootstrap_dns():
    """Patch DNS resolution for the hosts in ``YOUTUBE_HOSTS``.

    Each host is resolved through ``_resolve_via_doh`` and pinned to the
    first address returned. The mapping is appended to ``/etc/hosts``; when
    that file cannot be written, ``socket.getaddrinfo`` is replaced by a
    wrapper that answers for the pinned hosts and delegates everything else
    to the original resolver. Does nothing (beyond a warning) when no host
    could be resolved.
    """
    ip_map = {}
    for host in YOUTUBE_HOSTS:
        ips = _resolve_via_doh(host)
        if ips:
            ip_map[host] = ips[0]
            print(f"[DNS] {host} -> {ips[0]}")

    if not ip_map:
        print("[DNS] WARNING: No YouTube IPs resolved")
        return

    # Try /etc/hosts first
    try:
        with open("/etc/hosts", "a") as f:
            f.write("\n# YouTube DoH resolution\n")
            f.writelines(f"{ip} {host}\n" for host, ip in ip_map.items())
        print("[DNS] Patched /etc/hosts")
        return
    except OSError as e:
        # OSError (not just PermissionError): a read-only or missing hosts
        # file must also fall through to the socket patch instead of
        # crashing the module at import time.
        print(f"[DNS] Could not patch /etc/hosts ({e}); patching socket instead")

    # Fallback: monkey-patch socket.getaddrinfo
    _orig = socket.getaddrinfo

    def _patched(host, port, *args, **kwargs):
        if host in ip_map:
            # Single IPv4/TCP entry for the pinned address; default to 443
            # when the caller passes no port.
            return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (ip_map[host], port or 443))]
        return _orig(host, port, *args, **kwargs)

    socket.getaddrinfo = _patched
    print(f"[DNS] Socket patched for: {list(ip_map.keys())}")


_bootstrap_dns()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _base_opts():
    """Return the yt-dlp options shared by every endpoint."""
    return {
        "quiet": True,
        "no_warnings": True,
        "skip_download": True,
        "socket_timeout": 30,
        "nocheckcertificate": True,
    }


def _ssl_ctx():
    """Return an SSL context with hostname and certificate checks disabled."""
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


def _watch_url(v):
    """Build the watch URL for video ID *v*.

    The ID comes straight from the query string, so it is percent-encoded
    to keep it from injecting extra URL parameters (e.g. ``&list=...``).
    IDs made of letters, digits, ``-`` and ``_`` are left unchanged.
    """
    from urllib.parse import quote

    return YT_URL.format(quote(v, safe=""))


def _extract_info(v):
    """Fetch yt-dlp metadata for video ID *v* without downloading media.

    Raises:
        HTTPException: 400 carrying the yt-dlp message when extraction
            fails with ``DownloadError``.
    """
    try:
        with yt_dlp.YoutubeDL(_base_opts()) as ydl:
            return ydl.extract_info(_watch_url(v), download=False)
    except yt_dlp.utils.DownloadError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


def _find_json3_url(info, sources, lang):
    """Return the first usable json3 subtitle URL for *lang*, or ``None``.

    *sources* are keys of *info* searched in order. Within a source only
    the first json3 format is considered; if it carries no URL the search
    moves on to the next source.
    """
    for src in sources:
        tracks = info.get(src) or {}
        for f in tracks.get(lang) or []:
            if f.get("ext") == "json3":
                url = f.get("url")
                if url:
                    return url
                break
    return None


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

@app.get("/")
def health():
    """Report service status, yt-dlp version and whether YouTube resolves."""
    try:
        socket.getaddrinfo("www.youtube.com", 443)
        dns_ok = True
    except OSError:
        # socket.gaierror is an OSError; a failed lookup is a reportable
        # state here, not an error.
        dns_ok = False
    return {
        "ok": True,
        "service": "yt-dlp-transcript-proxy",
        "yt_dlp_version": yt_dlp.version.__version__,
        "youtube_dns": dns_ok,
    }


@app.get("/subs")
def list_subs(v: str = Query(..., description="YouTube video ID")):
    """List available subtitle languages.

    Returns the manual subtitle languages with their formats and at most
    the first 20 automatic-caption languages.

    Raises:
        HTTPException: 400 when yt-dlp cannot extract the video.
    """
    info = _extract_info(v)

    manual = info.get("subtitles") or {}
    auto = info.get("automatic_captions") or {}

    return {
        "video_id": v,
        "title": info.get("title"),
        "duration": info.get("duration"),
        "manual": {lang: [f["ext"] for f in fmts] for lang, fmts in manual.items()},
        "auto": list(auto)[:20],
    }


@app.get("/transcript")
def get_transcript(
    v: str = Query(..., description="YouTube video ID"),
    lang: str = Query("en", description="Language code"),
    auto: bool = Query(True, description="Include auto-generated captions"),
    fmt: str = Query("json", description="Output: json or text"),
):
    """Extract transcript with timestamps.

    Manual subtitles are preferred; automatic captions are searched as well
    when *auto* is true. With ``fmt="text"`` the joined transcript is
    returned as plain text; any other value yields the JSON payload.

    Raises:
        HTTPException: 400 when yt-dlp cannot extract the video, 404 when
            no json3 track exists for *lang*, 502 when the subtitle
            download or JSON decoding fails.
    """
    info = _extract_info(v)

    # Find subtitle URL (prefer manual, fallback to auto)
    sources = ["subtitles", "automatic_captions"] if auto else ["subtitles"]
    sub_url = _find_json3_url(info, sources, lang)

    if not sub_url:
        # Report languages from every source that was actually searched,
        # de-duplicated while keeping first-seen order.
        available = list(dict.fromkeys(
            code for src in sources for code in (info.get(src) or {})
        ))
        raise HTTPException(
            status_code=404,
            detail=f"No subtitles for lang={lang}. Available: {available}"
        )

    # Fetch subtitle data
    try:
        req = Request(sub_url, headers={"User-Agent": "Mozilla/5.0"})
        with urlopen(req, context=_ssl_ctx(), timeout=30) as r:
            raw = json.loads(r.read())
    except Exception as e:
        # Boundary handler: any upstream failure becomes a 502 for the client.
        raise HTTPException(status_code=502, detail=f"Subtitle fetch failed: {e}")

    segments = _parse_json3(raw)
    full_text = " ".join(s["text"] for s in segments)

    if fmt == "text":
        return PlainTextResponse(full_text)

    return {
        "video_id": v,
        "title": info.get("title"),
        "channel": info.get("channel"),
        "duration": info.get("duration"),
        "language": lang,
        "segment_count": len(segments),
        "full_text": full_text,
        "segments": segments,
    }


def _parse_json3(raw):
    """Convert a decoded json3 subtitle payload into timed text segments.

    Each event with non-empty text becomes
    ``{"start": seconds, "end": seconds, "text": str}``, with times rounded
    to two decimals. Events without ``segs`` or with blank text are
    skipped. A payload that is not an object, or whose ``events`` is
    missing or null, yields an empty list.
    """
    events = raw.get("events") if isinstance(raw, dict) else None
    segments = []
    for ev in events or []:
        segs = ev.get("segs")
        if not segs:
            continue
        text = "".join(s.get("utf8", "") for s in segs).strip().replace("\n", " ")
        if not text:
            continue
        # Treat a missing or null timing field as 0.
        start_ms = ev.get("tStartMs") or 0
        dur_ms = ev.get("dDurationMs") or 0
        segments.append({
            "start": round(start_ms / 1000, 2),
            "end": round((start_ms + dur_ms) / 1000, 2),
            "text": text,
        })
    return segments