import modal from fastapi import FastAPI, UploadFile, File, Body, Query from fastapi.responses import JSONResponse web_app = FastAPI(title="MCP Video Analysis API") import os import tempfile import io # Used by Whisper for BytesIO import httpx # For downloading videos from URLs from typing import Optional, List, Dict, Any import json import hashlib from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import re # For parsing search results import yt_dlp import asyncio # For concurrent video processing import gradio as gr # Global Configuration (should be at the top of the file) WHISPER_MODEL_NAME = "openai/whisper-large-v3" # Use latest Whisper model CAPTION_MODEL_NAME = "microsoft/xclip-base-patch16" # For SpaceTimeGPT alternative CAPTION_PROCESSOR_NAME = "MCG-NJU/videomae-base" # For SpaceTimeGPT's video encoder # CAPTION_TOKENIZER_NAME = "gpt2" # For SpaceTimeGPT's text decoder (usually part of processor) ACTION_MODEL_NAME = "MCG-NJU/videomae-base-finetuned-kinetics" ACTION_PROCESSOR_NAME = "MCG-NJU/videomae-base" # Or VideoMAEImageProcessor.from_pretrained(ACTION_MODEL_NAME) OBJECT_DETECTION_MODEL_NAME = "facebook/detr-resnet-50" OBJECT_DETECTION_PROCESSOR_NAME = "facebook/detr-resnet-50" # --- Modal Image Definition --- video_analysis_image_v2 = ( modal.Image.debian_slim(python_version="3.10") .apt_install("ffmpeg") .pip_install( "gradio==3.50.2", # Pin Gradio version for stability "transformers[torch]", # For all Hugging Face models and PyTorch "soundfile", # For Whisper "av", # For video frame extraction "Pillow", # For image processing "timm", # Often a dependency for vision models "torchvision", "torchaudio", "fastapi[standard]", # For web endpoints "pydantic", "yt-dlp", # For request body validation "httpx", # For downloading video from URL "cowsay==6.1" # Cache-busting package ) ) # --- Modal App Definition --- app = modal.App(name="video-analysis-gradio-pipeline") # New app name, using App # --- Pydantic model for web endpoint request --- class VideoAnalysisRequestPayload(BaseModel): video_url: Optional[str] = None class TopicAnalysisRequest(BaseModel): topic: str max_videos: int = Query(3, ge=1, le=10) # Default 3, min 1, max 10 videos # --- Constants for Model Names --- # WHISPER_MODEL_NAME = "openai/whisper-large-v3" CAPTION_MODEL_NAME = "Neleac/SpaceTimeGPT" CAPTION_PROCESSOR_NAME = "Neleac/SpaceTimeGPT" # Use processor from SpaceTimeGPT itself # # CAPTION_TOKENIZER_NAME = "gpt2" # For SpaceTimeGPT's text decoder (usually part of processor) # ACTION_MODEL_NAME = "MCG-NJU/videomae-base-finetuned-kinetics" # ACTION_PROCESSOR_NAME = "MCG-NJU/videomae-base" # Or VideoMAEImageProcessor.from_pretrained(ACTION_MODEL_NAME) # OBJECT_DETECTION_MODEL_NAME = "facebook/detr-resnet-50" # OBJECT_DETECTION_PROCESSOR_NAME = "facebook/detr-resnet-50" # --- Modal Distributed Dictionary for Caching --- video_analysis_cache = modal.Dict.from_name("video_analysis_cache", create_if_missing=True) # --- Hugging Face Token Secret --- HF_TOKEN_SECRET = modal.Secret.from_name("my-huggingface-secret") # --- Helper: Hugging Face Login --- def _login_to_hf(): import os from huggingface_hub import login hf_token = os.environ.get("HF_TOKEN") if hf_token: try: login(token=hf_token) print("Successfully logged into Hugging Face Hub.") return True except Exception as e: print(f"Hugging Face Hub login failed: {e}") return False else: print("HF_TOKEN secret not found. Some models might fail to load.") return False # === 1. Transcription with Whisper === @app.function( image=video_analysis_image_v2, secrets=[HF_TOKEN_SECRET], gpu="any", timeout=600 ) def transcribe_video_with_whisper(video_bytes: bytes) -> str: _login_to_hf() import torch from transformers import pipeline import soundfile as sf import av # For robust audio extraction import numpy as np import io print("[Whisper] Starting transcription.") temp_audio_path = None try: # Robust audio extraction using PyAV with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file: tmp_video_file.write(video_bytes) video_path = tmp_video_file.name container = av.open(video_path) audio_stream = next((s for s in container.streams if s.type == 'audio'), None) if audio_stream is None: return "Whisper Error: No audio stream found in video." # Decode and resample audio to 16kHz mono WAV # Store resampled audio in a temporary WAV file with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio_file_for_sf: temp_audio_path = tmp_audio_file_for_sf.name output_container = av.open(temp_audio_path, mode='w') output_stream = output_container.add_stream('pcm_s16le', rate=16000, layout='mono') for frame in container.decode(audio_stream): for packet in output_stream.encode(frame): output_container.mux(packet) # Flush stream for packet in output_stream.encode(): output_container.mux(packet) output_container.close() container.close() os.remove(video_path) # Clean up temp video file pipe = pipeline( "automatic-speech-recognition", model=WHISPER_MODEL_NAME, torch_dtype=torch.float16, device="cuda:0" if torch.cuda.is_available() else "cpu", ) print(f"[Whisper] Pipeline loaded. Transcribing {temp_audio_path}...") # Add robust error handling for the Whisper model try: outputs = pipe(temp_audio_path, chunk_length_s=30, stride_length_s=5, batch_size=8, generate_kwargs={"language": "english"}, return_timestamps=False) except Exception as whisper_err: print(f"[Whisper] Error during transcription: {whisper_err}") # Try again with different settings if the first attempt failed print(f"[Whisper] Attempting fallback transcription with smaller chunk size...") outputs = pipe(temp_audio_path, chunk_length_s=10, stride_length_s=2, batch_size=4, generate_kwargs={"language": "english"}, return_timestamps=False) transcription = outputs["text"] print(f"[Whisper] Transcription successful: {transcription[:100]}...") return transcription except Exception as e: print(f"[Whisper] Error: {e}") import traceback traceback.print_exc() return f"Whisper Error: {str(e)}" finally: if temp_audio_path and os.path.exists(temp_audio_path): os.remove(temp_audio_path) if 'video_path' in locals() and video_path and os.path.exists(video_path): os.remove(video_path) # Ensure temp video is cleaned up if audio extraction failed early # === 2. Captioning with SpaceTimeGPT === @app.function( image=video_analysis_image_v2, secrets=[HF_TOKEN_SECRET], gpu="any", timeout=600 ) def generate_captions_with_spacetimegpt(video_bytes: bytes) -> str: _login_to_hf() import torch from transformers import AutoProcessor, AutoModelForVision2Seq import av import numpy as np import tempfile print("[SpaceTimeGPT] Starting captioning.") video_path = None try: with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file: tmp_video_file.write(video_bytes) video_path = tmp_video_file.name container = av.open(video_path) video_stream = next((s for s in container.streams if s.type == 'video'), None) if video_stream is None: return "SpaceTimeGPT Error: No video stream found." num_frames_to_sample = 16 total_frames = video_stream.frames if total_frames == 0: return "SpaceTimeGPT Error: Video has no frames." indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int) frames = [] for i in indices: container.seek(int(i), stream=video_stream) frame = next(container.decode(video_stream)) frames.append(frame.to_rgb().to_ndarray()) container.close() video_frames_np = np.stack(frames) processor = AutoProcessor.from_pretrained(CAPTION_PROCESSOR_NAME, trust_remote_code=True) # Debug prints print(f"[SpaceTimeGPT] DEBUG: CAPTION_MODEL_NAME is {CAPTION_MODEL_NAME}") print(f"[SpaceTimeGPT] DEBUG: Intending to use model class: {AutoModelForVision2Seq.__name__}") print(f"[SpaceTimeGPT] DEBUG: Type of model class object: {type(AutoModelForVision2Seq)}") model = AutoModelForVision2Seq.from_pretrained(CAPTION_MODEL_NAME, trust_remote_code=True) device = "cuda:0" if torch.cuda.is_available() else "cpu" model.to(device) if hasattr(processor, 'tokenizer'): # Check if tokenizer exists processor.tokenizer.padding_side = "right" print("[SpaceTimeGPT] Model and processor loaded. Generating captions...") inputs = processor(text=None, videos=list(video_frames_np), return_tensors="pt", padding=True).to(device) generated_ids = model.generate(**inputs, max_new_tokens=128) captions = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip() print(f"[SpaceTimeGPT] Captioning successful: {captions}") return captions except Exception as e: print(f"[SpaceTimeGPT] Error: {e}") import traceback traceback.print_exc() return f"SpaceTimeGPT Error: {str(e)}" finally: if video_path and os.path.exists(video_path): os.remove(video_path) # === 3. Action Recognition with VideoMAE === @app.function( image=video_analysis_image_v2, secrets=[HF_TOKEN_SECRET], gpu="any", timeout=600 ) def generate_action_labels(video_bytes: bytes) -> List[Dict[str, Any]]: _login_to_hf() import torch from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification import av import numpy as np import tempfile print("[VideoMAE] Starting action recognition.") video_path = None try: with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file: tmp_video_file.write(video_bytes) video_path = tmp_video_file.name container = av.open(video_path) video_stream = next((s for s in container.streams if s.type == 'video'), None) if video_stream is None: return [{"error": "VideoMAE Error: No video stream found."}] num_frames_to_sample = 16 total_frames = video_stream.frames if total_frames == 0: return [{"error": "VideoMAE Error: Video has no frames."}] indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int) video_frames_list = [] for i in indices: container.seek(int(i), stream=video_stream) frame = next(container.decode(video_stream)) video_frames_list.append(frame.to_rgb().to_ndarray()) container.close() processor = VideoMAEImageProcessor.from_pretrained(ACTION_PROCESSOR_NAME) model = VideoMAEForVideoClassification.from_pretrained(ACTION_MODEL_NAME) device = "cuda:0" if torch.cuda.is_available() else "cpu" model.to(device) print("[VideoMAE] Model and processor loaded. Classifying actions...") inputs = processor(video_frames_list, return_tensors="pt").to(device) with torch.no_grad(): outputs = model(**inputs) logits = outputs.logits top_k = 5 probabilities = torch.softmax(logits, dim=-1) top_probs, top_indices = torch.topk(probabilities, top_k) results = [] for i in range(top_k): label = model.config.id2label[top_indices[0, i].item()] score = top_probs[0, i].item() results.append({"action": label, "confidence": round(score, 4)}) print(f"[VideoMAE] Action recognition successful: {results}") return results except Exception as e: print(f"[VideoMAE] Error: {e}") import traceback traceback.print_exc() return [{"error": f"VideoMAE Error: {str(e)}"}] finally: if video_path and os.path.exists(video_path): os.remove(video_path) # === 4. Object Detection with DETR === @app.function( image=video_analysis_image_v2, secrets=[HF_TOKEN_SECRET], gpu="any", timeout=600 ) def generate_object_detection(video_bytes: bytes) -> List[Dict[str, Any]]: _login_to_hf() import torch from transformers import DetrImageProcessor, DetrForObjectDetection from PIL import Image # Imported but not directly used, av.frame.to_image() is used import av import numpy as np import tempfile print("[DETR] Starting object detection.") video_path = None try: with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file: tmp_video_file.write(video_bytes) video_path = tmp_video_file.name container = av.open(video_path) video_stream = next((s for s in container.streams if s.type == 'video'), None) if video_stream is None: return [{"error": "DETR Error: No video stream found."}] num_frames_to_extract = 3 total_frames = video_stream.frames if total_frames == 0: return [{"error": "DETR Error: Video has no frames."}] frame_indices = np.linspace(0, total_frames - 1, num_frames_to_extract, dtype=int) processor = DetrImageProcessor.from_pretrained(OBJECT_DETECTION_PROCESSOR_NAME) model = DetrForObjectDetection.from_pretrained(OBJECT_DETECTION_MODEL_NAME) device = "cuda:0" if torch.cuda.is_available() else "cpu" model.to(device) print("[DETR] Model and processor loaded.") all_frame_detections = [] for frame_num, target_frame_index in enumerate(frame_indices): container.seek(int(target_frame_index), stream=video_stream) frame = next(container.decode(video_stream)) pil_image = frame.to_image() print(f"[DETR] Processing frame {frame_num + 1}/{num_frames_to_extract} (original index {target_frame_index})...") inputs = processor(images=pil_image, return_tensors="pt").to(device) outputs = model(**inputs) target_sizes = torch.tensor([pil_image.size[::-1]], device=device) results = processor.post_process_object_detection(outputs, target_sizes=target_sizes, threshold=0.7)[0] frame_detections = [] for score, label, box in zip(results["scores"], results["labels"], results["boxes"]): frame_detections.append({ "label": model.config.id2label[label.item()], "confidence": round(score.item(), 3), "box": [round(coord) for coord in box.tolist()] }) if frame_detections: # Only add if detections are present for this frame all_frame_detections.append({ "frame_number": frame_num + 1, "original_frame_index": int(target_frame_index), "detections": frame_detections }) container.close() print(f"[DETR] Object detection successful: {all_frame_detections if all_frame_detections else 'No objects detected with threshold.'}") return all_frame_detections if all_frame_detections else [{"info": "No objects detected with current threshold."}] except Exception as e: print(f"[DETR] Error: {e}") import traceback traceback.print_exc() return [{"error": f"DETR Error: {str(e)}"}] finally: if video_path and os.path.exists(video_path): os.remove(video_path) # === 5. Comprehensive Video Analysis (Orchestrator) === @app.function( image=video_analysis_image_v2, secrets=[HF_TOKEN_SECRET], gpu="any", # Request GPU as some sub-tasks will need it timeout=1800, # Generous timeout for all models # allow_concurrent_inputs=10, # Optional: if you expect many parallel requests # keep_warm=1 # Optional: to keep one instance warm for faster cold starts ) async def analyze_video_comprehensive(video_bytes: bytes) -> Dict[str, Any]: print("[Orchestrator] Starting comprehensive video analysis.") cache_key = hashlib.sha256(video_bytes).hexdigest() try: cached_result = video_analysis_cache.get(cache_key) if cached_result: print(f"[Orchestrator] Cache hit for key: {cache_key}") return cached_result except Exception as e: # Log error but proceed with analysis if cache get fails print(f"[Orchestrator] Cache GET error: {e}. Proceeding with fresh analysis.") print(f"[Orchestrator] Cache miss for key: {cache_key}. Performing full analysis.") results = {} print("[Orchestrator] Calling transcription...") try: # .call() is synchronous in the context of the Modal function execution results["transcription"] = transcribe_video_with_whisper.remote(video_bytes) except Exception as e: print(f"[Orchestrator] Error in transcription: {e}") results["transcription"] = f"Transcription Error: {str(e)}" print("[Orchestrator] Calling captioning...") try: results["caption"] = generate_captions_with_spacetimegpt.remote(video_bytes) except Exception as e: print(f"[Orchestrator] Error in captioning: {e}") results["caption"] = f"Captioning Error: {str(e)}" print("[Orchestrator] Calling action recognition...") try: results["actions"] = generate_action_labels.remote(video_bytes) except Exception as e: print(f"[Orchestrator] Error in action recognition: {e}") results["actions"] = [{"error": f"Action Recognition Error: {str(e)}"}] # Ensure list type for error print("[Orchestrator] Calling object detection...") try: results["objects"] = generate_object_detection.remote(video_bytes) except Exception as e: print(f"[Orchestrator] Error in object detection: {e}") results["objects"] = [{"error": f"Object Detection Error: {str(e)}"}] # Ensure list type for error print("[Orchestrator] All analyses attempted. Storing results in cache.") try: video_analysis_cache.put(cache_key, results) print(f"[Orchestrator] Successfully cached results for key: {cache_key}") except Exception as e: print(f"[Orchestrator] Cache PUT error: {e}") return results # === FastAPI Endpoint for Video Analysis === @web_app.post("/process_video_analysis") def process_video_analysis(payload: VideoAnalysisRequestPayload): """FastAPI endpoint for comprehensive video analysis.""" print(f"[FastAPI Endpoint] Received request for video analysis") video_url = payload.video_url if not video_url: return JSONResponse(status_code=400, content={"error": "video_url must be provided in JSON payload."}) print(f"[FastAPI Endpoint] Processing video_url: {video_url}") try: # Download video using yt-dlp with enhanced options for robustness import yt_dlp import tempfile import os import subprocess import shutil video_bytes = None with tempfile.TemporaryDirectory() as tmpdir: output_base = os.path.join(tmpdir, 'video') output_path = output_base + '.mp4' # Enhanced yt-dlp options for more reliable downloads ydl_opts = { # Request specific formats in priority order 'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best', 'outtmpl': output_base, 'quiet': False, # Temporarily enable output for debugging 'verbose': True, # More verbose output to diagnose issues 'no_warnings': False, # Show warnings for debugging 'noplaylist': True, # Force remux to ensure valid container 'merge_output_format': 'mp4', # Add postprocessors to ensure valid MP4 'postprocessors': [{ 'key': 'FFmpegVideoConvertor', 'preferedformat': 'mp4', 'postprocessor_args': ['-movflags', '+faststart'], }], # Force ffmpeg to create a valid MP4 with moov atom at the beginning 'prefer_ffmpeg': True, 'http_headers': { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36' }, } try: print(f"[FastAPI Endpoint] Downloading video with enhanced yt-dlp options from {video_url}") download_success = False # Try yt-dlp first try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: ydl.download([video_url]) # Find the actual output file (might have a different extension) downloaded_files = [f for f in os.listdir(tmpdir) if f.startswith('video')] if downloaded_files: actual_file = os.path.join(tmpdir, downloaded_files[0]) print(f"[FastAPI Endpoint] Found downloaded file: {actual_file}") download_success = True except Exception as e: print(f"[FastAPI Endpoint] yt-dlp download failed: {e}. Trying direct download...") # Fallback to direct download if it's a direct video URL if not download_success and (video_url.endswith('.mp4') or 'commondatastorage.googleapis.com' in video_url): import requests try: print(f"[FastAPI Endpoint] Attempting direct download for {video_url}") actual_file = os.path.join(tmpdir, 'direct_video.mp4') with requests.get(video_url, stream=True) as r: r.raise_for_status() with open(actual_file, 'wb') as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) print(f"[FastAPI Endpoint] Direct download successful: {actual_file}") download_success = True except Exception as e: print(f"[FastAPI Endpoint] Direct download failed: {e}") # For testing: Try a sample video if all downloads failed (Big Buck Bunny) if not download_success: test_url = "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4" print(f"[FastAPI Endpoint] All downloads failed. Falling back to sample video: {test_url}") import requests try: actual_file = os.path.join(tmpdir, 'fallback_video.mp4') with requests.get(test_url, stream=True) as r: r.raise_for_status() with open(actual_file, 'wb') as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) print(f"[FastAPI Endpoint] Fallback download successful") download_success = True except Exception as e: print(f"[FastAPI Endpoint] Even fallback download failed: {e}") raise Exception("All download methods failed") # Ensure it's a properly formatted MP4 using ffmpeg directly final_output = os.path.join(tmpdir, 'final_video.mp4') try: # Use ffmpeg to re-encode the file, ensuring proper moov atom placement print(f"[FastAPI Endpoint] Reprocessing with ffmpeg to ensure valid MP4 format") subprocess.run( ["ffmpeg", "-i", actual_file, "-c:v", "copy", "-c:a", "copy", "-movflags", "faststart", final_output], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) if os.path.exists(final_output) and os.path.getsize(final_output) > 0: with open(final_output, 'rb') as f: video_bytes = f.read() print(f"[FastAPI Endpoint] Successfully reprocessed video, size: {len(video_bytes)} bytes") else: print(f"[FastAPI Endpoint] ffmpeg reprocessing failed to produce valid output") except subprocess.SubprocessError as se: print(f"[FastAPI Endpoint] ffmpeg reprocessing failed: {se}") # If ffmpeg fails, try with the original file if os.path.exists(actual_file) and os.path.getsize(actual_file) > 0: with open(actual_file, 'rb') as f: video_bytes = f.read() print(f"[FastAPI Endpoint] Using original download, size: {len(video_bytes)} bytes") except yt_dlp.utils.DownloadError: # Fallback to httpx for direct links if yt-dlp fails print(f"[FastAPI Endpoint] yt-dlp failed, falling back to httpx for {video_url}") try: import httpx with httpx.Client() as client: response = client.get(video_url, follow_redirects=True, timeout=60.0) response.raise_for_status() video_bytes = response.content except httpx.RequestError as he: return JSONResponse(status_code=400, content={"error": f"Failed to download video from URL using both yt-dlp and httpx. Details: {he}"}) if not video_bytes: return JSONResponse(status_code=400, content={"error": f"Downloaded video from URL {video_url} is empty or download failed."}) print(f"[FastAPI Endpoint] Successfully downloaded and validated {len(video_bytes)} bytes from {video_url} using enhanced downloader.") # Call comprehensive analysis analysis_results = analyze_video_comprehensive.remote(video_bytes) print("[FastAPI Endpoint] Comprehensive analysis finished.") return JSONResponse(status_code=200, content=analysis_results) except httpx.RequestError as e: print(f"[FastAPI Endpoint] httpx.RequestError downloading video: {e}") return JSONResponse(status_code=400, content={"error": f"Error downloading video from URL: {video_url}. Details: {str(e)}"}) except Exception as e: print(f"[FastAPI Endpoint] Unexpected Exception during analysis: {e}") return JSONResponse(status_code=500, content={"error": f"Unexpected server error during analysis: {str(e)}"}) # === FastAPI Endpoint for Topic Analysis === @web_app.post("/analyze_topic") async def handle_analyze_topic_request(request: TopicAnalysisRequest): """ Handles a request to analyze videos based on a topic. 1. Finds video URLs for the topic using YouTube search. 2. Concurrently analyzes these videos. 3. Returns aggregated results. """ print(f"[TopicAPI] Received request to analyze topic: '{request.topic}', max_videos: {request.max_videos}") try: # Use .aio for async call if the Modal function is async, or just .remote if it's sync # Assuming find_video_urls_for_topic is sync as defined, but can be called with .remote() # If find_video_urls_for_topic itself becomes async, then .remote.aio() is appropriate. # For now, let's assume it's called as a standard remote Modal function. video_urls = await find_video_urls_for_topic.remote.aio(request.topic, request.max_videos) if not video_urls: print(f"[TopicAPI] No video URLs found for topic: '{request.topic}'") return JSONResponse( status_code=404, content={ "status": "error", "message": "No videos found for the specified topic.", "topic": request.topic, "details": "The YouTube search did not return any relevant video URLs." } ) print(f"[TopicAPI] Found {len(video_urls)} URLs for topic '{request.topic}', proceeding to analysis.") # analyze_videos_by_topic is an async Modal function, so use .remote.aio() analysis_results = await analyze_videos_by_topic.remote.aio(video_urls, request.topic) print(f"[TopicAPI] Successfully analyzed videos for topic: '{request.topic}'") return analysis_results except Exception as e: print(f"[TopicAPI] Error during topic analysis for '{request.topic}': {e}") import traceback traceback.print_exc() return JSONResponse( status_code=500, content={ "status": "error", "message": "An internal server error occurred during topic analysis.", "topic": request.topic, "error_details_str": str(e) # Keep it simple for JSON } ) # === 6. Topic-Based Video Search === @app.function( image=video_analysis_image_v2, secrets=[HF_TOKEN_SECRET], timeout=300 ) def find_video_urls_for_topic(topic: str, max_results: int = 3) -> List[str]: """Finds video URLs (YouTube) for a given topic using yt-dlp.""" print(f"[TopicSearch] Finding video URLs for topic: '{topic}', max_results={max_results}") video_urls = [] try: # Add a common user-agent to avoid getting blocked # Let yt-dlp find ffmpeg in the PATH instead of hardcoding it ydl_opts = { 'quiet': True, 'extract_flat': 'discard_in_playlist', 'force_generic_extractor': False, 'default_search': f"ytsearch{max_results}", 'noplaylist': True, 'prefer_ffmpeg': True, 'http_headers': { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36' } } with yt_dlp.YoutubeDL(ydl_opts) as ydl: # extract_info with a search query like 'ytsearchN:query' returns a playlist dictionary search_result = ydl.extract_info(topic, download=False) if search_result and 'entries' in search_result: for entry in search_result['entries']: # Ensure entry is a dictionary and has 'webpage_url' if isinstance(entry, dict) and entry.get('webpage_url'): video_urls.append(entry['webpage_url']) # yt-dlp search might return more than max_results, so we cap it here if len(video_urls) >= max_results: break # Sometimes a single video result might not be in 'entries' elif isinstance(search_result, dict) and search_result.get('webpage_url'): video_urls.append(search_result['webpage_url']) # Ensure we don't exceed max_results if the loop didn't break early enough video_urls = video_urls[:max_results] print(f"[TopicSearch] Found {len(video_urls)} video URLs for topic '{topic}': {video_urls}") except Exception as e: print(f"[TopicSearch] Error finding videos for topic '{topic}': {e}") import traceback traceback.print_exc() return video_urls # Helper function (not a Modal function) to extract video URLs from search results def extract_video_urls_from_search(search_results: List[Dict[str, str]], max_urls: int = 3) -> List[str]: """Extracts video URLs from a list of search result dictionaries.""" video_urls = [] seen_urls = set() # Regex for YouTube, Vimeo, and common video file extensions # Simplified YouTube regex to catch most common video and shorts links youtube_regex = r"(?:https?://)?(?:www\.)?(?:youtube\.com/(?:watch\?v=|embed/|shorts/)|youtu\.be/)([a-zA-Z0-9_-]{11})" vimeo_regex = r"(?:https?://)?(?:www\.)?vimeo\.com/(\d+)" direct_video_regex = r"https?://[^\s]+\.(mp4|mov|avi|webm|mkv)(\?[^\s]*)?" patterns = [ re.compile(youtube_regex), re.compile(vimeo_regex), re.compile(direct_video_regex) ] for item in search_results: url = item.get("link") or item.get("url") # Common keys for URL in search results if not url: continue for pattern in patterns: match = pattern.search(url) if match: # Reconstruct canonical YouTube URL if it's a short link or embed if pattern.pattern == youtube_regex and match.group(1): normalized_url = f"https://www.youtube.com/watch?v={match.group(1)}" else: normalized_url = url if normalized_url not in seen_urls: video_urls.append(normalized_url) seen_urls.add(normalized_url) if len(video_urls) >= max_urls: break if len(video_urls) >= max_urls: break # === 7. Topic-Based Video Analysis Orchestrator === @app.function( image=video_analysis_image_v2, secrets=[HF_TOKEN_SECRET], timeout=1800, ) async def _analyze_video_worker(video_url: str) -> dict: """ Worker function to download a video from a URL and run comprehensive analysis. This is designed to be called concurrently. """ print(f"[Worker] Starting analysis for {video_url}") try: async with httpx.AsyncClient() as client: print(f"[Worker] Downloading video from {video_url}") response = await client.get(video_url, follow_redirects=True, timeout=60.0) response.raise_for_status() video_bytes = await response.aread() print(f"[Worker] Downloaded {len(video_bytes)} bytes from {video_url}") if not video_bytes: raise ValueError("Downloaded video content is empty.") analysis_result = await analyze_video_comprehensive.coro(video_bytes) if isinstance(analysis_result, dict) and any("error" in str(v).lower() for v in analysis_result.values()): print(f"[Worker] Comprehensive analysis for {video_url} reported errors: {analysis_result}") return {"url": video_url, "status": "error", "error_type": "analysis_error", "details": analysis_result} else: return {"url": video_url, "status": "success", "analysis": analysis_result} except httpx.HTTPStatusError as e: print(f"[Worker] HTTP error downloading {video_url}: {e}") return {"url": video_url, "status": "error", "error_type": "download_error", "details": f"HTTP {e.response.status_code}"} except httpx.RequestError as e: print(f"[Worker] Request error downloading {video_url}: {e}") return {"url": video_url, "status": "error", "error_type": "download_error", "details": f"Failed to download: {str(e)}"} except Exception as e: print(f"[Worker] Error processing video {video_url}: {e}") import traceback return {"url": video_url, "status": "error", "error_type": "processing_error", "details": str(e), "traceback": traceback.format_exc()[:1000]} @app.function( image=video_analysis_image_v2, secrets=[HF_TOKEN_SECRET], timeout=3600, gpu="any", ) async def analyze_videos_by_topic(video_urls: List[str], topic: str) -> Dict[str, Any]: """Analyzes a list of videos (by URL) concurrently and aggregates results for a topic.""" print(f"[TopicAnalysis] Starting concurrent analysis for topic: '{topic}' with {len(video_urls)} video(s).") results_aggregator = { "topic": topic, "analyzed_videos": [], "errors": [] } if not video_urls: results_aggregator["errors"].append({"topic_error": "No video URLs provided or found for the topic."}) return results_aggregator # Use .map to run the worker function concurrently on all video URLs # The list() call forces the generator to execute and retrieve all results. individual_results = list(_analyze_video_worker.map(video_urls)) for result in individual_results: if isinstance(result, dict): if result.get("status") == "error": results_aggregator["errors"].append(result) else: results_aggregator["analyzed_videos"].append(result) else: # This case handles unexpected return types from the worker, like exceptions print(f"[TopicAnalysis] Received an unexpected result type from worker: {type(result)}") results_aggregator["errors"].append({"url": "unknown", "error_type": "unexpected_result", "details": str(result)}) print(f"[TopicAnalysis] Finished concurrent analysis for topic '{topic}'.") return results_aggregator # === Gradio Interface === def video_analyzer_gradio_ui(): print("[Gradio] UI function called to define interface.") def analyze_video_all_models(video_filepath): print(f"[Gradio] Received video filepath for analysis: {video_filepath}") if not video_filepath or not os.path.exists(video_filepath): return "Error: Video file path is invalid or does not exist.", "", "[]", "[]" with open(video_filepath, "rb") as f: video_bytes_content = f.read() print(f"[Gradio] Read {len(video_bytes_content)} bytes from video path: {video_filepath}") if not video_bytes_content: return "Error: Could not read video bytes.", "", "[]", "[]" print("[Gradio] Calling Whisper...") transcription = transcribe_video_with_whisper.call(video_bytes_content) print(f"[Gradio] Whisper result length: {len(transcription)}") print("[Gradio] Calling SpaceTimeGPT...") captions = generate_captions_with_spacetimegpt.call(video_bytes_content) print(f"[Gradio] SpaceTimeGPT result: {captions}") print("[Gradio] Calling VideoMAE...") action_labels = generate_action_labels.call(video_bytes_content) print(f"[Gradio] VideoMAE result: {action_labels}") print("[Gradio] Calling DETR...") object_detections = generate_object_detection.call(video_bytes_content) print(f"[Gradio] DETR result: {object_detections}") return transcription, captions, str(action_labels), str(object_detections) with gr.Blocks(title="Comprehensive Video Analyzer", theme=gr.themes.Soft()) as demo: gr.Markdown("# Comprehensive Video Analyzer") gr.Markdown("Upload a video to get transcription, captions, action labels, and object detections.") with gr.Row(): video_input = gr.Video(label="Upload Video", sources=["upload"], type="filepath") submit_button = gr.Button("Analyze Video", variant="primary") with gr.Tabs(): with gr.TabItem("Transcription (Whisper)"): transcription_output = gr.Textbox(label="Transcription", lines=10, interactive=False) with gr.TabItem("Dense Captions (SpaceTimeGPT)"): caption_output = gr.Textbox(label="Captions", lines=10, interactive=False) with gr.TabItem("Action Recognition (VideoMAE)"): action_output = gr.Textbox(label="Predicted Actions (JSON format)", lines=10, interactive=False) with gr.TabItem("Object Detection (DETR)"): object_output = gr.Textbox(label="Detected Objects (JSON format)", lines=10, interactive=False) submit_button.click( fn=analyze_video_all_models, inputs=[video_input], outputs=[transcription_output, caption_output, action_output, object_output] ) gr.Markdown("### Example Video") gr.Markdown("You can test with a short video. Processing may take a few minutes depending on video length and model inference times.") print("[Gradio] UI definition complete.") return gr.routes.App.create_app(demo)