# tools/youtube_tools.py (Updated with fixes) """ YouTube Tools Module - Fixed version using pytubefix Addresses network issues, deprecation warnings, and playlist errors """ from pytubefix import YouTube, Playlist from pytubefix.cli import on_progress from typing import Optional, Dict, Any, List import os import time import logging from .utils import logger, validate_file_exists import cv2 import tempfile import os from typing import Optional, Dict, Any, List from PIL import Image import numpy as np class YouTubeTools: """YouTube tools with improved error handling and network resilience""" def __init__(self, max_retries: int = 3, retry_delay: float = 1.0): self.supported_formats = ['mp4', '3gp', 'webm'] self.supported_audio_formats = ['mp3', 'mp4', 'webm'] self.max_retries = max_retries self.retry_delay = retry_delay def _retry_operation(self, operation, *args, **kwargs): """Retry operation with exponential backoff for network issues""" for attempt in range(self.max_retries): try: return operation(*args, **kwargs) except Exception as e: if attempt == self.max_retries - 1: raise e error_msg = str(e).lower() if any(term in error_msg for term in ['network', 'socket', 'timeout', 'connection']): wait_time = self.retry_delay * (2 ** attempt) logger.warning(f"Network error (attempt {attempt + 1}/{self.max_retries}): {e}") logger.info(f"Retrying in {wait_time} seconds...") time.sleep(wait_time) else: raise e def get_video_info(self, url: str) -> Optional[Dict[str, Any]]: """ Retrieve comprehensive metadata about a YouTube video using pytubefix """ try: def _get_info(): yt = YouTube(url, on_progress_callback=on_progress) # Get available streams info with better error handling video_streams = [] try: streams = yt.streams.filter(progressive=True, file_extension='mp4') for stream in streams: try: video_streams.append({ 'resolution': getattr(stream, 'resolution', 'unknown'), 'fps': getattr(stream, 'fps', 'unknown'), 'video_codec': getattr(stream, 'video_codec', 'unknown'), 'audio_codec': getattr(stream, 'audio_codec', 'unknown'), 'filesize': getattr(stream, 'filesize', None), 'mime_type': getattr(stream, 'mime_type', 'unknown') }) except Exception as stream_error: logger.debug(f"Error processing stream: {stream_error}") continue except Exception as e: logger.warning(f"Could not retrieve stream details: {e}") # Get caption languages safely captions_available = [] try: if yt.captions: captions_available = list(yt.captions.keys()) except Exception as e: logger.warning(f"Could not retrieve captions list: {e}") info = { 'title': getattr(yt, 'title', 'Unknown'), 'author': getattr(yt, 'author', 'Unknown'), 'channel_url': getattr(yt, 'channel_url', 'Unknown'), 'length': getattr(yt, 'length', 0), 'views': getattr(yt, 'views', 0), 'description': getattr(yt, 'description', ''), 'thumbnail_url': getattr(yt, 'thumbnail_url', ''), 'publish_date': yt.publish_date.isoformat() if getattr(yt, 'publish_date', None) else None, 'keywords': getattr(yt, 'keywords', []), 'video_id': getattr(yt, 'video_id', ''), 'watch_url': getattr(yt, 'watch_url', url), 'available_streams': video_streams, 'captions_available': captions_available } return info info = self._retry_operation(_get_info) if info is not None: logger.info(f"Retrieved info for video: {info.get('title', 'Unknown')}") return info except Exception as e: logger.error(f"Failed to get video info for {url}: {e}") return None def download_video(self, url: str, output_path: str = './downloads', resolution: str = 'highest', filename: Optional[str] = None) -> Optional[str]: """Download a YouTube video with retry logic""" try: def _download(): os.makedirs(output_path, exist_ok=True) yt = YouTube(url, on_progress_callback=on_progress) # Select stream based on resolution preference if resolution == 'highest': stream = yt.streams.get_highest_resolution() elif resolution == 'lowest': stream = yt.streams.get_lowest_resolution() else: stream = yt.streams.filter(res=resolution, progressive=True, file_extension='mp4').first() if not stream: logger.warning(f"Resolution {resolution} not found, downloading highest instead") stream = yt.streams.get_highest_resolution() if not stream: raise Exception("No suitable stream found for download") # Download with custom filename if provided if filename: safe_filename = "".join(c for c in filename if c.isalnum() or c in (' ', '-', '_')).rstrip() file_path = stream.download(output_path=output_path, filename=f"{safe_filename}.{stream.subtype}") else: file_path = stream.download(output_path=output_path) return file_path file_path = self._retry_operation(_download) logger.info(f"Downloaded video to {file_path}") return file_path except Exception as e: logger.error(f"Failed to download video from {url}: {e}") return None def download_audio(self, url: str, output_path: str = './downloads', filename: Optional[str] = None) -> Optional[str]: """Download only audio from a YouTube video with retry logic""" try: def _download_audio(): os.makedirs(output_path, exist_ok=True) yt = YouTube(url, on_progress_callback=on_progress) audio_stream = yt.streams.get_audio_only() if not audio_stream: raise Exception("No audio stream found") if filename: safe_filename = "".join(c for c in filename if c.isalnum() or c in (' ', '-', '_')).rstrip() file_path = audio_stream.download(output_path=output_path, filename=f"{safe_filename}.{audio_stream.subtype}") else: file_path = audio_stream.download(output_path=output_path) return file_path file_path = self._retry_operation(_download_audio) logger.info(f"Downloaded audio to {file_path}") return file_path except Exception as e: logger.error(f"Failed to download audio from {url}: {e}") return None def get_captions(self, url: str, language_code: str = 'en') -> Optional[str]: """ Get captions/subtitles - FIXED: No more deprecation warning """ try: def _get_captions(): yt = YouTube(url, on_progress_callback=on_progress) if not yt.captions: logger.warning("No captions available for this video") return None # Use modern dictionary-style access instead of deprecated method if language_code in yt.captions: caption = yt.captions[language_code] captions_text = caption.generate_srt_captions() return captions_text else: available_langs = list(yt.captions.keys()) logger.warning(f"Captions not found for language {language_code}. Available: {available_langs}") return None result = self._retry_operation(_get_captions) if result: logger.info(f"Retrieved captions in {language_code}") return result except Exception as e: logger.error(f"Failed to get captions from {url}: {e}") return None def get_playlist_info(self, playlist_url: str) -> Optional[Dict[str, Any]]: """ Get information about a YouTube playlist - FIXED: Better error handling """ try: def _get_playlist_info(): playlist = Playlist(playlist_url) # Get video URLs first (this triggers the playlist loading) video_urls = list(playlist.video_urls) # Safely access playlist properties with fallbacks info = { 'video_count': len(video_urls), 'video_urls': video_urls[:10], # Limit to first 10 for performance 'total_videos': len(video_urls) } # Try to get additional info, but don't fail if unavailable try: info['title'] = getattr(playlist, 'title', 'Unknown Playlist') except: info['title'] = 'Private/Unavailable Playlist' try: info['description'] = getattr(playlist, 'description', '') except: info['description'] = 'Description unavailable' try: info['owner'] = getattr(playlist, 'owner', 'Unknown') except: info['owner'] = 'Owner unavailable' return info info = self._retry_operation(_get_playlist_info) if info is not None: logger.info(f"Retrieved playlist info: {info['title']} ({info['video_count']} videos)") return info except Exception as e: logger.error(f"Failed to get playlist info from {playlist_url}: {e}") return None def get_available_qualities(self, url: str) -> Optional[List[Dict[str, Any]]]: """ Get all available download qualities - FIXED: Better network handling """ try: def _get_qualities(): yt = YouTube(url, on_progress_callback=on_progress) streams = [] # Get progressive streams (video + audio) for stream in yt.streams.filter(progressive=True): try: streams.append({ 'resolution': getattr(stream, 'resolution', 'unknown'), 'fps': getattr(stream, 'fps', 'unknown'), 'filesize_mb': round(stream.filesize / (1024 * 1024), 2) if getattr(stream, 'filesize', None) else None, 'mime_type': getattr(stream, 'mime_type', 'unknown'), 'video_codec': getattr(stream, 'video_codec', 'unknown'), 'audio_codec': getattr(stream, 'audio_codec', 'unknown') }) except Exception as stream_error: logger.debug(f"Error processing stream: {stream_error}") continue # Sort by resolution (numeric part) def sort_key(x): res = x['resolution'] if res and res != 'unknown' and res[:-1].isdigit(): return int(res[:-1]) return 0 return sorted(streams, key=sort_key, reverse=True) return self._retry_operation(_get_qualities) except Exception as e: logger.error(f"Failed to get qualities for {url}: {e}") return None def extract_and_analyze_frames(self, url: str, num_frames: int = 5, analysis_question: str = "Describe what you see in this frame") -> Dict[str, Any]: """ Extract key frames and analyze video content visually Based on search results showing OpenCV and MoviePy approaches """ logger.info(f"Starting frame extraction for {url} with {num_frames} frames") results = { 'video_info': None, 'frames_analyzed': [], 'extraction_method': None, 'total_frames_extracted': 0, 'analysis_summary': None } try: # Get video info first video_info = self.get_video_info(url) if not video_info: return {'error': 'Could not retrieve video information'} results['video_info'] = { 'title': video_info.get('title', 'Unknown'), 'duration': video_info.get('length', 0), 'author': video_info.get('author', 'Unknown') } # Strategy 1: Try full video download and OpenCV frame extraction (local environment) frame_paths = self._strategy_1_opencv_extraction(url, num_frames) if frame_paths: results['extraction_method'] = 'OpenCV Video Download' results['frames_analyzed'] = self._analyze_extracted_frames(frame_paths, analysis_question) results['total_frames_extracted'] = len(frame_paths) # Cleanup downloaded video and frames self._cleanup_files(frame_paths) else: # Strategy 2: Thumbnail analysis fallback (HF Spaces compatible) thumbnail_analysis = self._strategy_2_thumbnail_analysis(url, analysis_question) results['extraction_method'] = 'Thumbnail Analysis (Fallback)' results['frames_analyzed'] = [thumbnail_analysis] results['total_frames_extracted'] = 1 # Generate overall summary results['analysis_summary'] = self._generate_frame_analysis_summary(results) return results except Exception as e: logger.error(f"Error in frame extraction: {e}") return {'error': f'Frame extraction failed: {str(e)}'} def _strategy_1_opencv_extraction(self, url: str, num_frames: int) -> List[str]: """ Strategy 1: Download video and extract frames using OpenCV Based on search result [2] OpenCV approach """ try: # Check if we're in a restricted environment (HF Spaces) if os.getenv("SPACE_ID"): logger.info("Restricted environment detected, skipping video download") return [] # Download video to temporary location temp_dir = tempfile.mkdtemp() video_path = self.download_video(url, output_path=temp_dir, resolution='lowest') if not video_path or not os.path.exists(video_path): logger.warning("Video download failed") return [] # Extract frames using OpenCV (based on search results) frame_paths = self._extract_frames_opencv(video_path, num_frames) # Cleanup video file (keep frame files for analysis) if os.path.exists(video_path): os.remove(video_path) return frame_paths except Exception as e: logger.error(f"Strategy 1 failed: {e}") return [] def _extract_frames_opencv(self, video_path: str, num_frames: int) -> List[str]: """ Extract frames using OpenCV - implementation from search results Based on search result [2] and [4] showing cv2.VideoCapture approach """ frame_paths = [] try: # Load video using OpenCV (from search results) cap = cv2.VideoCapture(video_path) if not cap.isOpened(): logger.error("Error: Could not open video with OpenCV") return [] # Get total frames and calculate intervals (from search results) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) logger.info(f"Total frames in video: {total_frames}") if total_frames == 0: return [] # Calculate frame intervals to get evenly distributed frames if num_frames >= total_frames: frame_intervals = list(range(total_frames)) else: frame_intervals = [int(total_frames * i / (num_frames - 1)) for i in range(num_frames)] frame_intervals[-1] = total_frames - 1 # Ensure we get the last frame # Extract frames at calculated intervals (based on search results pattern) for i, frame_num in enumerate(frame_intervals): cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num) ret, frame = cap.read() if ret: # Save frame as temporary file (from search results) frame_filename = tempfile.mktemp(suffix=f'_frame_{i}.jpg') cv2.imwrite(frame_filename, frame) frame_paths.append(frame_filename) logger.debug(f"Extracted frame {i} at position {frame_num}") else: logger.warning(f"Failed to read frame at position {frame_num}") cap.release() logger.info(f"Successfully extracted {len(frame_paths)} frames using OpenCV") return frame_paths except Exception as e: logger.error(f"OpenCV frame extraction failed: {e}") return [] def _strategy_2_thumbnail_analysis(self, url: str, analysis_question: str) -> Dict[str, Any]: """ Strategy 2: Analyze thumbnail when video download isn't possible Fallback for HF Spaces environment """ try: from .multimodal_tools import MultimodalTools multimodal = MultimodalTools() # Get video info for thumbnail video_info = self.get_video_info(url) if not video_info or not video_info.get('thumbnail_url'): return {'error': 'No thumbnail available'} # Download and analyze thumbnail thumbnail_url = video_info['thumbnail_url'] # Download thumbnail to temporary file import requests response = requests.get(thumbnail_url, timeout=10) response.raise_for_status() with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as tmp_file: tmp_file.write(response.content) thumbnail_path = tmp_file.name # Analyze thumbnail analysis = multimodal.analyze_image( thumbnail_path, f"This is a thumbnail from a YouTube video. {analysis_question}" ) # Cleanup os.unlink(thumbnail_path) return { 'frame_number': 0, 'timestamp': 'thumbnail', 'analysis': analysis, 'extraction_method': 'thumbnail' } except Exception as e: logger.error(f"Thumbnail analysis failed: {e}") return {'error': f'Thumbnail analysis failed: {str(e)}'} def _analyze_extracted_frames(self, frame_paths: List[str], analysis_question: str) -> List[Dict[str, Any]]: """ Analyze extracted frames using multimodal AI """ analyzed_frames = [] try: from .multimodal_tools import MultimodalTools multimodal = MultimodalTools() for i, frame_path in enumerate(frame_paths): try: analysis = multimodal.analyze_image(frame_path, analysis_question) analyzed_frames.append({ 'frame_number': i, 'timestamp': f'frame_{i}', 'analysis': analysis, 'extraction_method': 'opencv' }) except Exception as e: logger.warning(f"Failed to analyze frame {i}: {e}") analyzed_frames.append({ 'frame_number': i, 'timestamp': f'frame_{i}', 'analysis': f'Analysis failed: {str(e)}', 'extraction_method': 'opencv' }) return analyzed_frames except Exception as e: logger.error(f"Frame analysis failed: {e}") return [] def _generate_frame_analysis_summary(self, results: Dict[str, Any]) -> str: """Generate overall summary of frame analysis""" try: if not results.get('frames_analyzed'): return "No frames were successfully analyzed" # Combine all frame analyses all_analyses = [] for frame in results['frames_analyzed']: if isinstance(frame, dict) and 'analysis' in frame: all_analyses.append(frame['analysis']) if not all_analyses: return "No valid frame analyses found" # Use multimodal AI to create summary from .multimodal_tools import MultimodalTools multimodal = MultimodalTools() combined_text = "\n\n".join([f"Frame {i}: {analysis}" for i, analysis in enumerate(all_analyses)]) summary_prompt = f""" Based on these frame analyses from a video titled "{results['video_info']['title']}", create a comprehensive summary of the video's visual content: {combined_text} Provide a concise summary highlighting the main visual elements, actions, and themes. """ summary = multimodal._make_openrouter_request({ "model": multimodal.text_model, "messages": [{"role": "user", "content": summary_prompt}], "temperature": 0, "max_tokens": 512 }) return summary except Exception as e: logger.error(f"Summary generation failed: {e}") return f"Summary generation failed: {str(e)}" def _cleanup_files(self, file_paths: List[str]): """Clean up temporary files""" for file_path in file_paths: try: if os.path.exists(file_path): os.remove(file_path) except Exception as e: logger.warning(f"Could not remove {file_path}: {e}") # Convenience method for specific use cases def analyze_video_slides(self, url: str) -> Dict[str, Any]: """Specialized method for analyzing educational videos with slides""" return self.extract_and_analyze_frames( url, num_frames=8, analysis_question="Is this a presentation slide? If yes, extract the main title and key points. If no, describe the visual content." ) def analyze_video_content(self, url: str, question: str) -> str: """Analyze video content and answer specific questions""" frame_results = self.extract_and_analyze_frames(url, num_frames=5, analysis_question=question) if 'error' in frame_results: return frame_results['error'] return frame_results.get('analysis_summary', 'No analysis available') # Convenience functions (unchanged) def get_video_info(url: str) -> Optional[Dict[str, Any]]: """Standalone function to get video information""" tools = YouTubeTools() return tools.get_video_info(url) def download_video(url: str, output_path: str = './downloads', resolution: str = 'highest', filename: Optional[str] = None) -> Optional[str]: """Standalone function to download a video""" tools = YouTubeTools() return tools.download_video(url, output_path, resolution, filename) def download_audio(url: str, output_path: str = './downloads', filename: Optional[str] = None) -> Optional[str]: """Standalone function to download audio only""" tools = YouTubeTools() return tools.download_audio(url, output_path, filename) def get_captions(url: str, language_code: str = 'en') -> Optional[str]: """Standalone function to get video captions""" tools = YouTubeTools() return tools.get_captions(url, language_code) def get_playlist_info(playlist_url: str) -> Optional[Dict[str, Any]]: """Standalone function to get playlist information""" tools = YouTubeTools() return tools.get_playlist_info(playlist_url)