# Spaces:
# Runtime error
# Runtime error
# Thanush
# Refactor transcript.py to support multiple API keys for youtube-transcript.io and enhance error handling; update llm_response functions to return cost alongside error messages.
# 97d69f2
import os | |
import time | |
import requests | |
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound | |
# Proxy configuration passed to `requests` calls only.
#
# The proxy URL can be supplied via the TRANSCRIPT_PROXY_URL environment
# variable so that credentials do not have to live in source control.
# NOTE(security): the fallback below embeds a username/password that has been
# committed to the repository; it is kept only so existing deployments keep
# working. Rotate it and remove the fallback once the environment variable is
# set everywhere.
_DEFAULT_PROXY_URL = "http://Vv8lHp2g:kMhGaCi9XZ@103.172.84.179:50100"
_PROXY_URL = os.environ.get("TRANSCRIPT_PROXY_URL") or _DEFAULT_PROXY_URL

# The same proxy endpoint is used for both plain and TLS traffic.
proxies = {
    "http": _PROXY_URL,
    "https": _PROXY_URL,
}
# Endpoint of the youtube-transcript.io transcripts API.
YTI_API_URL = "https://www.youtube-transcript.io/api/transcripts"

# NOTE(security): these keys are committed to the repository and should be
# treated as compromised; they remain only as a fallback so existing
# deployments keep working. Prefer supplying keys through the YTI_API_KEYS
# environment variable (comma-separated) and rotate the values below.
_DEFAULT_YTI_API_KEYS = [
    "681c60baa1baf5a82dd5f382",
    "681c624386b69cddd17685ed",
    "681c628dde80881429decd76",
    "681c62bfa1baf5a82dd5f3b3",
    "681c62eade80881429decd7f",
    "68244ac37994b78ec23e3089",
    "68244c15f0a725b52f52477e",
    "68244c6d7994b78ec23e30b4",
]

# Keys are tried in order; blank entries in the environment value are ignored.
# An unset or empty YTI_API_KEYS falls back to the built-in list.
YTI_API_KEYS = [
    key.strip()
    for key in os.environ.get("YTI_API_KEYS", "").split(",")
    if key.strip()
] or list(_DEFAULT_YTI_API_KEYS)
def fetch_transcript(video_id: str, max_retries: int = 2, backoff_factor: float = 2.0) -> str:
    """
    Fetch a YouTube transcript with a three-tier fallback:
      1) YouTubeTranscriptApi (en, hi)
      2) youtube-transcript.io API (each key in YTI_API_KEYS, in order)
      3) YouTube timedtext web endpoint (English)

    Each method is attempted up to ``max_retries`` times with exponential
    backoff (``backoff_factor ** attempt`` seconds between attempts). Values
    of ``max_retries`` below 1 are treated as 1 so that every method is tried
    at least once instead of the function silently returning ``None``.

    Args:
        video_id: The YouTube video id to fetch the transcript for.
        max_retries: Maximum attempts per method (minimum of one is enforced).
        backoff_factor: Base of the exponential wait between attempts.

    Returns:
        The full transcript as a single space-joined string.

    Raises:
        Exception: The last exception raised by the timedtext method if all
            three methods fail.
    """
    # Always try each method at least once, even for max_retries <= 0.
    attempts = max(1, max_retries)
    # Seconds before an HTTP call is abandoned; without a timeout `requests`
    # can block indefinitely on an unresponsive proxy or server.
    request_timeout = 10

    def _retry(fn, *args, **kwargs):
        """Call ``fn`` up to ``attempts`` times, sleeping between failures."""
        for attempt in range(attempts):
            try:
                return fn(*args, **kwargs)
            except Exception:
                if attempt >= attempts - 1:
                    # Last attempt: re-raise to the caller.
                    raise
                wait = backoff_factor ** attempt
                print(f"[Retry] {fn.__name__} failed (attempt {attempt+1}), retrying in {wait}s...")
                time.sleep(wait)

    def _segment_text(seg):
        """Return the text of one transcript segment.

        Segments may be plain dicts or objects exposing a ``text`` attribute,
        depending on the installed youtube_transcript_api version
        (NOTE(review): confirm against the pinned version).
        """
        if isinstance(seg, dict):
            return seg['text']
        return seg.text

    # --- Method 1: youtube_transcript_api ---
    def _yt_api():
        print(f"[Transcript] Trying youtube_transcript_api for video_id={video_id}")
        try:
            # Try direct get_transcript
            segs = YouTubeTranscriptApi.get_transcript(video_id, languages=['en', 'hi'])
        except (TranscriptsDisabled, NoTranscriptFound):
            print(f"[Transcript] youtube_transcript_api direct failed, trying list_transcripts for video_id={video_id}")
            # Fallback to listing then manual/generated selection
            transcripts = YouTubeTranscriptApi.list_transcripts(video_id)
            try:
                t = transcripts.find_transcript(['en'])
            except NoTranscriptFound:
                print(f"[Transcript] No English transcript, trying generated Hindi transcript for video_id={video_id}")
                t = transcripts.find_generated_transcript(['hi'])
            segs = t.fetch()
        print(f"[Transcript] youtube_transcript_api succeeded for video_id={video_id}")
        return " ".join(_segment_text(s) for s in segs)

    try:
        return _retry(_yt_api)
    except Exception as e1:
        print(f"[Fallback 1 failed] {e1}")
        print(f"[Transcript] Falling back to youtube-transcript.io API for video_id={video_id}")

    def _yti_entry(data):
        """Pick this video's entry out of a youtube-transcript.io payload.

        Handles the dict-keyed-by-video-id shape the original code expected,
        and additionally a list of per-video dicts carrying an ``id`` field
        (NOTE(review): list shape is an assumption about the API; confirm).
        """
        if isinstance(data, dict):
            return data.get(video_id, {})
        if isinstance(data, list):
            for item in data:
                if isinstance(item, dict) and item.get("id") == video_id:
                    return item
        return {}

    # --- Method 2: youtube-transcript.io API ---
    def _yti_api():
        print(f"[Transcript] Trying youtube-transcript.io API for video_id={video_id}")
        last_err = None
        total = len(YTI_API_KEYS)
        for idx, api_key in enumerate(YTI_API_KEYS):
            # Identify keys by position only; never write secrets to logs.
            label = f"{idx+1}/{total}"
            print(f"[Transcript] Using API key {label}")
            try:
                resp = requests.post(
                    YTI_API_URL,
                    headers={
                        "Authorization": f"Basic {api_key}",
                        "Content-Type": "application/json",
                    },
                    json={"ids": [video_id]},
                    proxies=proxies,
                    timeout=request_timeout,
                )
                resp.raise_for_status()
                entry = _yti_entry(resp.json())
                # Try 'segments' format first
                segments = entry.get("segments", [])
                if segments:
                    print(f"[Transcript] youtube-transcript.io API succeeded with 'segments' for video_id={video_id}")
                    return " ".join(seg["text"] for seg in segments)
                # Try 'tracks' format: first track's 'transcript' entries
                tracks = entry.get("tracks", [])
                if tracks and tracks[0].get("transcript"):
                    print(f"[Transcript] youtube-transcript.io API succeeded with 'tracks' for video_id={video_id}")
                    return " ".join(item["text"] for item in tracks[0].get("transcript", []))
                print(f"[Transcript] No transcript found in 'segments' or 'tracks' for video_id={video_id} with key {label}")
                last_err = ValueError("No transcript found in 'segments' or 'tracks'")
            except Exception as e:
                # Any failure (HTTP error, bad JSON, unexpected shape) moves on
                # to the next key; the last error is surfaced if all keys fail.
                print(f"[Transcript] API key {label} failed: {e}")
                last_err = e
        raise last_err or RuntimeError("All youtube-transcript.io API keys failed")

    try:
        return _retry(_yti_api)
    except Exception as e2:
        print(f"[Fallback 2 failed] {e2}")
        print(f"[Transcript] Falling back to YouTube timedtext endpoint for video_id={video_id}")

    # --- Method 3: YouTube timedtext endpoint ---
    def _yt_timedtext():
        print(f"[Transcript] Trying YouTube timedtext endpoint for video_id={video_id}")
        # construct the "web" captions endpoint
        url = (
            f"https://video.google.com/timedtext?"
            f"lang=en&v={video_id}"
        )
        resp = requests.get(url, timeout=request_timeout, proxies=proxies)
        resp.raise_for_status()
        # XML format: <transcript><text start="..." dur="...">...text...</text>...</transcript>
        import xml.etree.ElementTree as ET
        root = ET.fromstring(resp.text)
        if not list(root):
            raise ValueError("No captions in timedtext response")
        # combine all text nodes
        text = " ".join(
            node.text.strip().replace('\n', ' ')
            for node in root.findall('text')
            if node.text
        )
        if not text.strip():
            # Elements were present but carried no caption text: treat as a
            # failure rather than returning an empty "transcript".
            raise ValueError("No captions in timedtext response")
        print(f"[Transcript] YouTube timedtext endpoint succeeded for video_id={video_id}")
        return text

    # final attempt; if this fails it'll raise
    return _retry(_yt_timedtext)