import os
import tempfile
import cv2
import numpy as np
from typing import List, Dict, Any, Optional
from urllib.parse import urlparse
import requests
from PIL import Image
import torch
from transformers import (
    BlipProcessor,
    BlipForConditionalGeneration,
    BlipForQuestionAnswering,
    pipeline,
    AutoTokenizer,
    AutoModelForSequenceClassification
)
import yt_dlp
from smolagents import Tool
import whisper
import subprocess
import time
import random


class YouTubeVideoProcessorTool(Tool):
    """Agent tool that answers a question about a YouTube video.

    The video is downloaded with yt-dlp (with a streamlink/ffmpeg fallback),
    sampled frames are passed to a BLIP visual-question-answering model, the
    audio track is transcribed with Whisper, and the transcript is queried
    with an extractive question-answering pipeline. The partial answers are
    combined into a single string.
    """

    name = "youtube_video_processor"
    description = """
    Processes YouTube videos to answer questions about their content, including visual elements,
    people, conversations, actions, and scenes. Takes a YouTube URL and a question as input.
    """
    inputs = {
        "url": {
            "type": "string",
            "description": "YouTube video URL to analyze"
        },
        "questions": {
            "type": "string",
            "description": "Question to answer about the video content"
        }
    }
    output_type = "string"

    def __init__(self):
        """Load the models and build the base yt-dlp options."""
        super().__init__()
        self._setup_models()
        self._setup_yt_dlp()

    def _setup_models(self):
        """Initialize AI models for video analysis."""
        # Visual question answering model. The VQA checkpoint must be loaded
        # with the question-answering head; the captioning head
        # (BlipForConditionalGeneration) does not match this checkpoint.
        self.blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
        self.blip_model = BlipForQuestionAnswering.from_pretrained("Salesforce/blip-vqa-base")

        # Audio transcription model
        self.whisper_model = whisper.load_model("base")

        # Text analysis pipeline
        self.text_analyzer = pipeline(
            "question-answering",
            model="distilbert-base-cased-distilled-squad",
            tokenizer="distilbert-base-cased-distilled-squad"
        )

    def _get_random_user_agent(self):
        """Return one of a fixed set of desktop browser User-Agent strings."""
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0',
        ]
        return random.choice(user_agents)

    def _setup_yt_dlp(self):
        """Configure yt-dlp with anti-blocking measures.

        The resulting ``self.ydl_opts`` is treated as a read-only template:
        download methods copy it before adding per-call settings.
        """
        self.ydl_opts = {
            'format': 'bestaudio/best',
            'extractaudio': True,
            'audioformat': 'wav',
            'outtmpl': '%(title)s.%(ext)s',
            'quiet': True,
            'no_warnings': True,
            # Anti-blocking measures
            'sleep_interval': 2,
            'max_sleep_interval': 3,
            'sleep_interval_requests': 2,
            'sleep_interval_subtitles': 2,
            'extractor_retries': 3,
            'fragment_retries': 3,
            # Headers rotation (the User-Agent is picked once, at setup time)
            'http_headers': {
                'User-Agent': self._get_random_user_agent(),
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language': 'en-us,en;q=0.5',
                'Accept-Encoding': 'gzip,deflate',
                'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
                'Connection': 'keep-alive',
            },
            # Use proxy rotation if available
            'proxy': self._get_random_proxy() if self._has_proxies() else None,
        }

    def _has_proxies(self) -> bool:
        """Check if the proxy list file exists.

        The path comes from the ``PROXY_LIST_FILE`` environment variable and
        defaults to ``proxies.txt``.
        """
        proxy_file = os.environ.get('PROXY_LIST_FILE', 'proxies.txt')
        return os.path.exists(proxy_file)

    def _get_random_proxy(self) -> Optional[str]:
        """Return a random non-blank line from the proxy list file.

        Returns:
            A proxy string, or ``None`` when the file cannot be read or
            contains no non-blank lines.
        """
        proxy_file = os.environ.get('PROXY_LIST_FILE', 'proxies.txt')
        try:
            with open(proxy_file, 'r') as f:
                # Skip blank lines so an empty file never yields '' as a proxy.
                proxies = [line.strip() for line in f if line.strip()]
        except (OSError, UnicodeDecodeError):
            return None
        return random.choice(proxies) if proxies else None

    def _setup_youtube_cookies(self) -> Optional[str]:
        """Write the ``YOUTUBE_COOKIES`` environment variable to a temp file.

        Returns:
            Path of the newly created cookies file, or ``None`` when the
            variable is not set. The caller is responsible for deleting the
            file.
        """
        print("_setup_youtube_cookies called")
        if 'YOUTUBE_COOKIES' in os.environ:
            # Create temporary cookies file
            print("Cookies found in environment variables")
            cookies_content = os.environ['YOUTUBE_COOKIES']
            # delete=False: the file must outlive this method so yt-dlp can read it
            with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
                f.write(cookies_content)
                return f.name
        print("Cookies not found in environment variables")
        return None

    def _download_video(self, url: str, temp_dir: str) -> Dict[str, Optional[str]]:
        """Download video and audio with anti-blocking measures.

        Args:
            url: Video URL handed to yt-dlp.
            temp_dir: Existing directory that receives the downloaded files.

        Returns:
            A dict with keys ``"video"`` and ``"audio"`` whose values are file
            paths or ``None``. If yt-dlp raises outside the video retry loop,
            the result of :meth:`_fallback_download` is returned instead.
        """
        # Add random delay to avoid rate limiting
        time.sleep(random.uniform(1, 3))

        cookies_file = None
        try:
            print("Setting up YouTube cookies...")
            cookies_file = self._setup_youtube_cookies()

            # Work on a per-call copy so the temporary cookie path never
            # lingers in the shared option template after the file is deleted.
            base_opts = dict(self.ydl_opts)
            if cookies_file:
                base_opts['cookiefile'] = cookies_file
                print("Using YouTube cookies from secrets")
            else:
                print("No YouTube cookies found in secrets")

            return self._download_with_yt_dlp(url, temp_dir, base_opts)
        except Exception as e:
            print(f"Trying fallback download method due to error: {str(e)}")
            # Fallback: try alternative extraction method
            return self._fallback_download(url, temp_dir)
        finally:
            # Clean up temporary cookies file on every path, not just success
            if cookies_file and os.path.exists(cookies_file):
                os.unlink(cookies_file)

    def _download_with_yt_dlp(
        self, url: str, temp_dir: str, base_opts: Dict[str, Any]
    ) -> Dict[str, Optional[str]]:
        """Fetch the video and a WAV audio track into ``temp_dir`` via yt-dlp.

        Files are written under fixed stems (``video.<ext>`` / ``audio.wav``)
        rather than the video title, so titles containing path separators,
        ``%`` or the words "video"/"audio" cannot break the lookup.

        Args:
            url: Video URL.
            temp_dir: Destination directory.
            base_opts: yt-dlp options to start from; not modified.

        Returns:
            ``{"video": path_or_None, "audio": path_or_None}``. Both values
            are ``None`` when every video download attempt failed.

        Raises:
            Exception: whatever yt-dlp raises during the metadata probe or
                the audio extraction; the caller handles it.
        """
        with yt_dlp.YoutubeDL(base_opts) as ydl:
            # Probe metadata first so an unavailable video fails before any download
            ydl.extract_info(url, download=False)
        print("Video info extracted, starting download...")

        # '%' is the output-template escape character in yt-dlp
        template_dir = temp_dir.replace('%', '%%')

        video_opts = dict(base_opts)
        # The base template requests audio only; frame analysis needs a video stream.
        video_opts['format'] = 'best[height<=720]/best'
        video_opts['outtmpl'] = os.path.join(template_dir, 'video.%(ext)s')

        max_retries = 3
        for attempt in range(max_retries):
            print(f"Attempt {attempt + 1} of {max_retries} to download video...")
            try:
                with yt_dlp.YoutubeDL(video_opts) as video_ydl:
                    video_ydl.download([url])
                # If download is successful, break the loop
                break
            except Exception as e:
                print(f"Video download attempt {attempt + 1} failed: {str(e)}")
                # If all attempts fail, give up
                if attempt == max_retries - 1:
                    return {"video": None, "audio": None}

        # Find downloaded video file
        video_path = None
        for file_name in sorted(os.listdir(temp_dir)):
            stem, ext = os.path.splitext(file_name)
            if stem == 'video' and ext.lower() in ('.mp4', '.webm', '.mkv'):
                video_path = os.path.join(temp_dir, file_name)
                break
        print(f"Video downloaded: {video_path}")

        # Extract audio separately
        audio_opts = dict(base_opts)
        audio_opts.update({
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
                'preferredquality': '192',
            }],
            'outtmpl': os.path.join(template_dir, 'audio.%(ext)s')
        })
        print(f"Starting audio extraction...")
        with yt_dlp.YoutubeDL(audio_opts) as audio_ydl:
            audio_ydl.download([url])

        # Find audio file
        audio_candidate = os.path.join(temp_dir, 'audio.wav')
        audio_path = audio_candidate if os.path.exists(audio_candidate) else None
        print(f"Audio extracted: {audio_path}")

        return {"video": video_path, "audio": audio_path}

    def _fallback_download(self, url: str, temp_dir: str) -> Dict[str, Optional[str]]:
        """Fallback download using the ``streamlink`` and ``ffmpeg`` executables.

        Commands are run as argument lists (no shell), so the URL is never
        interpreted by a shell.

        Returns:
            Paths of ``fallback_video.mp4`` and ``fallback_audio.wav`` inside
            ``temp_dir``, or ``None`` for both when either command is missing
            or exits with an error.
        """
        video_path = os.path.join(temp_dir, "fallback_video.mp4")
        audio_path = os.path.join(temp_dir, "fallback_audio.wav")
        try:
            # Use streamlink as fallback if available
            subprocess.run(
                ["streamlink", url, "best", "-o", video_path],
                check=True,
                capture_output=True,
            )
            # Extract audio from video: 16 kHz mono 16-bit PCM
            subprocess.run(
                [
                    "ffmpeg", "-i", video_path,
                    "-vn", "-acodec", "pcm_s16le",
                    "-ar", "16000", "-ac", "1",
                    audio_path,
                ],
                check=True,
                capture_output=True,
            )
        except (OSError, subprocess.SubprocessError) as e:
            print(f"Fallback download failed: {str(e)}")
            return {"video": None, "audio": None}
        return {"video": video_path, "audio": audio_path}

    def _extract_frames(self, video_path: str, num_frames: int = 10) -> List[np.ndarray]:
        """Extract up to ``num_frames`` frames at regular intervals.

        Args:
            video_path: Path of the video file.
            num_frames: Maximum number of frames to return.

        Returns:
            Frames converted from BGR to RGB. Empty when the path is missing,
            ``num_frames`` is not positive, or the reported frame count is
            not positive.
        """
        frames = []
        if num_frames <= 0 or not video_path or not os.path.exists(video_path):
            return frames

        cap = cv2.VideoCapture(video_path)
        try:
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if total_frames <= 0:
                return frames

            # Extract frames at regular intervals
            interval = max(1, total_frames // num_frames)
            for i in range(0, total_frames, interval):
                cap.set(cv2.CAP_PROP_POS_FRAMES, i)
                ret, frame = cap.read()
                if not ret:
                    continue
                # Convert BGR to RGB
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                if len(frames) >= num_frames:
                    break
        finally:
            # Release the capture even if reading or conversion raises
            cap.release()
        return frames

    def _transcribe_audio(self, audio_path: str) -> str:
        """Transcribe audio to text using Whisper.

        Returns:
            The transcript text, or ``""`` when the file is missing or
            transcription raises.
        """
        if not audio_path or not os.path.exists(audio_path):
            return ""
        try:
            result = self.whisper_model.transcribe(audio_path)
            return result["text"]
        except Exception as e:
            print(f"Transcription error: {str(e)}")
            return ""

    def _analyze_frames_with_question(self, frames: List[np.ndarray], question: str) -> List[str]:
        """Ask the BLIP model ``question`` about every frame.

        Returns:
            The decoded answers, one per frame, excluding empty answers and
            the literal answers "no", "none" and "nothing". Frames whose
            processing raises are skipped.
        """
        answers = []
        for frame in frames:
            try:
                # Convert numpy array to PIL Image
                pil_image = Image.fromarray(frame)

                # Process with BLIP model
                inputs = self.blip_processor(pil_image, question, return_tensors="pt")
                with torch.no_grad():
                    outputs = self.blip_model.generate(**inputs, max_length=50)
                answer = self.blip_processor.decode(outputs[0], skip_special_tokens=True)

                if answer and answer.lower() not in ['no', 'none', 'nothing']:
                    answers.append(answer)
            except Exception as e:
                print(f"Frame analysis error: {str(e)}")
                continue
        return answers

    def _answer_from_transcript(self, transcript: str, question: str) -> str:
        """Answer ``question`` from the transcript with the QA pipeline.

        The transcript is cut into 512-character chunks; the highest-scoring
        answer across chunks is returned if its score exceeds 0.1.

        Returns:
            The best answer, or ``""`` when there is no transcript or no
            chunk produced a score above 0.1.
        """
        if not transcript:
            return ""
        try:
            # Split transcript into chunks if too long
            max_length = 512
            chunks = [transcript[i:i + max_length] for i in range(0, len(transcript), max_length)]

            best_answer = ""
            best_score = 0
            for chunk in chunks:
                try:
                    result = self.text_analyzer(question=question, context=chunk)
                except Exception as e:
                    # One bad chunk must not discard answers from the others
                    print(f"Transcript chunk analysis error: {str(e)}")
                    continue
                if result['score'] > best_score:
                    best_score = result['score']
                    best_answer = result['answer']

            return best_answer if best_score > 0.1 else ""
        except Exception as e:
            print(f"Transcript analysis error: {str(e)}")
            return ""

    @staticmethod
    def _is_youtube_url(url: str) -> bool:
        """Return True when the URL's host is youtube.com, youtu.be or a subdomain.

        URLs given without a scheme (e.g. ``youtube.com/watch?v=...``) are
        accepted by assuming ``https://``. The check is on the parsed
        hostname, so "youtube.com" appearing only in a path or query string
        does not pass.
        """
        candidate = url.strip()
        if '://' not in candidate:
            candidate = 'https://' + candidate
        try:
            parsed = urlparse(candidate)
            host = (parsed.hostname or '').lower()
        except ValueError:
            return False
        if parsed.scheme not in ('http', 'https'):
            return False
        return host in ('youtube.com', 'youtu.be') or host.endswith(('.youtube.com', '.youtu.be'))

    def forward(self, url: str, questions: str) -> str:
        """Main processing function.

        Args:
            url: YouTube video URL.
            questions: Question to answer about the video.

        Returns:
            The combined answer text, or a message starting with ``"Error"``
            / ``"Could not"`` when validation, download or analysis fails.
        """
        if not url or not questions:
            return "Error: URL and questions are required"

        # Validate URL
        if not self._is_youtube_url(url):
            return "Error: Invalid YouTube URL"

        with tempfile.TemporaryDirectory() as temp_dir:
            try:
                # Download video and audio
                print("Downloading video...")
                paths = self._download_video(url, temp_dir)

                if not paths["video"] and not paths["audio"]:
                    return "Error: Could not download video. YouTube may be blocking requests or the video is unavailable."

                # Extract visual information
                visual_answers = []
                if paths["video"]:
                    print("Processing video frames...")
                    frames = self._extract_frames(paths["video"])
                    if frames:
                        visual_answers = self._analyze_frames_with_question(frames, questions)

                # Extract and analyze audio
                transcript = ""
                audio_answer = ""
                if paths["audio"]:
                    print("Transcribing audio...")
                    transcript = self._transcribe_audio(paths["audio"])
                    if transcript:
                        audio_answer = self._answer_from_transcript(transcript, questions)

                # Combine results
                result_parts = []

                if audio_answer:
                    result_parts.append(f"From transcript: {audio_answer}")

                if visual_answers:
                    # De-duplicate while keeping frame order so the reported
                    # answers are the same on every run.
                    unique_visual = list(dict.fromkeys(visual_answers))
                    result_parts.append(f"From visual analysis: {', '.join(unique_visual[:3])}")

                if transcript and not audio_answer:
                    # Include relevant transcript snippet
                    words = transcript.split()
                    if len(words) > 50:
                        transcript_snippet = ' '.join(words[:50]) + "..."
                    else:
                        transcript_snippet = transcript
                    result_parts.append(f"Transcript excerpt: {transcript_snippet}")

                if not result_parts:
                    return "Could not extract sufficient information from the video to answer the question."

                return "\n\n".join(result_parts)

            except Exception as e:
                return f"Error processing video: {str(e)[:200]}"