"""
Video Content Analyzer for the GAIA agent.

This module provides tools for extracting and analyzing visual content from YouTube videos,
especially when transcripts are unavailable. It includes:
- Frame extraction from YouTube videos at strategic timestamps
- Visual content analysis using multimodal capabilities
- OCR for extracting text displayed in videos
- Consolidated visual and text analysis results

It serves as a fallback mechanism when YouTube transcript extraction fails,
enabling the agent to still understand video content through visual analysis.
"""

import logging
import traceback
import time
import re
import os
import tempfile
import base64
from typing import Dict, Any, List, Optional, Tuple, Union
from enum import Enum
from pathlib import Path
import json


logger = logging.getLogger("gaia_agent.tools.video_content_analyzer")


class ErrorSeverity(Enum):
    """Enum for categorizing error severity levels."""

    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"


# Optional dependencies: image handling and OCR degrade gracefully when the
# corresponding packages are not installed.
try:
    from PIL import Image
    import numpy as np
except ImportError:
    Image = None
    np = None

try:
    import pytesseract
except ImportError:
    pytesseract = None

from src.gaia.agent.config import get_tool_config, get_model_config
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain_core.messages import HumanMessage
from langchain_core.output_parsers import StrOutputParser


class VideoFrameExtractor:
    """Tool for extracting frames from YouTube videos using browser_action."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the video frame extractor.

        Args:
            config: Optional configuration dictionary
        """
        self.config = config or get_tool_config().get("video_frame_extraction", {})

        # Frame-capture settings, with sensible defaults when not configured.
        self.default_frame_count = self.config.get("default_frame_count", 5)
        self.temp_dir = self.config.get("temp_dir", tempfile.gettempdir())
        self.capture_interval_pct = self.config.get("capture_interval_pct", [0, 0.25, 0.5, 0.75, 0.9])

        logger.info(f"VideoFrameExtractor initialized with temporary directory: {self.temp_dir}")

    def extract_video_id(self, video_id_or_url: str) -> str:
        """
        Extract the YouTube video ID from a URL, or return the ID if one was passed directly.

        Args:
            video_id_or_url: YouTube video ID or URL

        Returns:
            The extracted video ID

        Raises:
            ValueError: If the video ID cannot be extracted
        """
        # A bare 11-character video ID is returned unchanged.
        if re.match(r'^[a-zA-Z0-9_-]{11}$', video_id_or_url):
            return video_id_or_url

        # Otherwise, try the common YouTube URL formats.
        patterns = [
            r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/|youtube\.com/shorts/)([a-zA-Z0-9_-]{11})',
            r'youtube\.com/watch\?.*v=([a-zA-Z0-9_-]{11})'
        ]

        for pattern in patterns:
            match = re.search(pattern, video_id_or_url)
            if match:
                return match.group(1)

        raise ValueError(f"Could not extract video ID from: {video_id_or_url}")
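
    # Illustrative calls covered by the patterns above (the ID "abc123XYZ_-" is a
    # made-up 11-character placeholder, not a real video):
    #
    #   extract_video_id("abc123XYZ_-")                                   -> "abc123XYZ_-"
    #   extract_video_id("https://www.youtube.com/watch?v=abc123XYZ_-")   -> "abc123XYZ_-"
    #   extract_video_id("https://youtu.be/abc123XYZ_-")                  -> "abc123XYZ_-"
    #   extract_video_id("https://www.youtube.com/shorts/abc123XYZ_-")    -> "abc123XYZ_-"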

    def extract_video_frames(self, video_id_or_url: str, frame_count: Optional[int] = None) -> Dict[str, Any]:
        """
        Extract frames from a YouTube video using the browser_action tool.

        This method coordinates the browser interactions needed to:
        1. Open the YouTube video
        2. Capture screenshots at strategic timestamps
        3. Save the screenshots for further analysis

        Args:
            video_id_or_url: YouTube video ID or URL
            frame_count: Optional number of frames to capture; defaults to the configured value

        Returns:
            Dictionary containing extracted frames and metadata
        """
        try:
            video_id = self.extract_video_id(video_id_or_url)
            frames_to_capture = frame_count if frame_count is not None else self.default_frame_count

            # Create a unique working directory for this extraction run.
            timestamp = int(time.time())
            video_frame_dir = os.path.join(self.temp_dir, f"video_frames_{video_id}_{timestamp}")
            os.makedirs(video_frame_dir, exist_ok=True)

            # Build the browser_action instructions; "success" and "completed" remain
            # False until the instructions have been executed externally and the
            # captured frames processed.
            frame_capture_instructions = self._generate_frame_capture_instructions(
                video_id, frames_to_capture, video_frame_dir
            )

            return {
                "video_id": video_id,
                "frame_count": frames_to_capture,
                "frame_dir": video_frame_dir,
                "browser_instructions": frame_capture_instructions,
                "success": False,
                "completed": False,
                "timestamp": timestamp
            }

        except Exception as e:
            logger.error(f"Error preparing video frame extraction: {str(e)}")
            logger.error(traceback.format_exc())

            return {
                "video_id": video_id_or_url,
                "error": f"Failed to prepare video frame extraction: {str(e)}",
                "error_type": type(e).__name__,
                "severity": ErrorSeverity.ERROR.value,
                "success": False,
                "browser_instructions": None
            }

    def _generate_frame_capture_instructions(self, video_id: str, frame_count: int,
                                             output_dir: str) -> List[Dict[str, Any]]:
        """
        Generate instructions for browser_action to capture video frames.

        Args:
            video_id: YouTube video ID
            frame_count: Number of frames to capture
            output_dir: Directory to save frames

        Returns:
            List of browser_action instructions
        """
        # If the requested frame count does not match the configured capture points,
        # spread the capture points evenly across the video instead.
        if frame_count != len(self.capture_interval_pct):
            self.capture_interval_pct = [i / (frame_count - 1) if frame_count > 1 else 0.5
                                         for i in range(frame_count)]

            # Avoid capturing the very end of the video (end screens, suggested videos).
            if self.capture_interval_pct[-1] > 0.95:
                self.capture_interval_pct[-1] = 0.95
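
        # Worked example of the recomputation above (illustrative values, not a fixed
        # requirement): with frame_count=4 the capture points become
        # [0.0, 0.333..., 0.666..., 1.0], and the final value is then clamped to 0.95
        # so the last screenshot is not taken on the end screen.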

        # Use the embedded player so the page contains only the video and its controls.
        embedded_url = f"https://www.youtube.com/embed/{video_id}?autoplay=0&controls=1"

        instructions = [
            {
                "action": "launch",
                "url": embedded_url,
                "description": "Launch YouTube embedded player for the video"
            },
            {
                "action": "wait",
                "seconds": 3,
                "description": "Wait for video player to load"
            }
        ]

        # For each capture point: play, wait until the target position, pause, screenshot.
        for i, interval_pct in enumerate(self.capture_interval_pct):
            instructions.extend([
                {
                    "action": "click",
                    "selector": ".ytp-play-button",
                    "description": "Click play button to start video"
                },
                {
                    "action": "wait_for_duration_pct",
                    "percentage": interval_pct,
                    "description": f"Wait until {interval_pct:.0%} of the video"
                },
                {
                    "action": "click",
                    "selector": ".ytp-play-button",
                    "description": "Pause the video for screenshot"
                },
                {
                    "action": "take_screenshot",
                    "filename": f"{output_dir}/frame_{i+1}_of_{frame_count}.png",
                    "description": f"Take screenshot at {interval_pct:.0%} of video duration"
                }
            ])

        instructions.append({
            "action": "close",
            "description": "Close the browser after capturing all frames"
        })

        return instructions

    def process_captured_frames(self, frame_dir: str) -> Dict[str, Any]:
        """
        Process captured frames after browser_action has completed the extraction.

        Args:
            frame_dir: Directory containing the captured frames

        Returns:
            Dictionary containing processed frame information
        """
        try:
            if not os.path.exists(frame_dir):
                raise FileNotFoundError(f"Frame directory not found: {frame_dir}")

            # Collect the PNG screenshots captured by browser_action.
            frame_files = sorted(
                [f for f in os.listdir(frame_dir) if f.lower().endswith('.png')]
            )

            if not frame_files:
                return {
                    "frame_dir": frame_dir,
                    "error": "No frame images found in directory",
                    "error_type": "NoFramesFound",
                    "severity": ErrorSeverity.ERROR.value,
                    "success": False
                }

            # Derive each frame's relative position in the video from its filename,
            # falling back to the listing order if the filename does not match.
            frame_data = []
            for i, frame_file in enumerate(frame_files):
                frame_path = os.path.join(frame_dir, frame_file)

                match = re.search(r'frame_(\d+)_of_(\d+)', frame_file)
                if match:
                    frame_num = int(match.group(1))
                    total_frames = int(match.group(2))
                    position = (frame_num - 1) / (total_frames - 1) if total_frames > 1 else 0
                else:
                    frame_num = i + 1
                    total_frames = len(frame_files)
                    position = i / (len(frame_files) - 1) if len(frame_files) > 1 else 0

                frame_data.append({
                    "path": frame_path,
                    "filename": frame_file,
                    "frame_number": frame_num,
                    "total_frames": total_frames,
                    "position": position,
                    "timestamp_pct": position
                })

            return {
                "frame_dir": frame_dir,
                "frame_count": len(frame_files),
                "frames": frame_data,
                "success": True,
                "completed": True
            }

        except Exception as e:
            logger.error(f"Error processing captured frames: {str(e)}")
            logger.error(traceback.format_exc())

            return {
                "frame_dir": frame_dir,
                "error": f"Failed to process captured frames: {str(e)}",
                "error_type": type(e).__name__,
                "severity": ErrorSeverity.ERROR.value,
                "success": False
            }
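
    # Note on the contract between the two phases: _generate_frame_capture_instructions
    # names screenshots "frame_<n>_of_<total>.png", which is how this method recovers
    # each frame's relative position. A successful result is shaped roughly like the
    # following (values are illustrative):
    #
    #   {"frame_dir": "/tmp/video_frames_<id>_<timestamp>", "frame_count": 5,
    #    "frames": [{"path": ".../frame_1_of_5.png", "position": 0.0, ...}, ...],
    #    "success": True, "completed": True}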


class VideoContentAnalyzer:
    """Tool for analyzing visual content from YouTube videos."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the video content analyzer.

        Args:
            config: Optional configuration dictionary
        """
        self.config = config or get_tool_config().get("video_content_analysis", {})
        self.model_config = get_model_config()

        # Vision-capable chat model used for per-frame and consolidated analysis.
        self.model = ChatOpenAI(
            model=self.model_config.get("vision_model", "gpt-4o"),
            temperature=self.model_config.get("temperature", 0.1),
            max_tokens=self.model_config.get("max_tokens", 4096)
        )

        # Reuse the frame extractor for video ID parsing and frame handling.
        self.frame_extractor = VideoFrameExtractor(config)

        if pytesseract is None:
            logger.warning("Pytesseract not installed. OCR features will be limited.")

    def analyze_video_content(self, video_id_or_url: str, frame_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Analyze video content from captured frames.

        Args:
            video_id_or_url: YouTube video ID or URL
            frame_data: Dictionary containing frame information from process_captured_frames

        Returns:
            Dictionary containing analysis results
        """
        try:
            video_id = self.frame_extractor.extract_video_id(video_id_or_url)

            if not frame_data.get("success", False):
                return {
                    "video_id": video_id,
                    "error": "Frame extraction was not successful",
                    "error_type": "FrameExtractionFailed",
                    "severity": ErrorSeverity.ERROR.value,
                    "success": False
                }

            frames = frame_data.get("frames", [])
            if not frames:
                return {
                    "video_id": video_id,
                    "error": "No frames available for analysis",
                    "error_type": "NoFramesAvailable",
                    "severity": ErrorSeverity.ERROR.value,
                    "success": False
                }

            # Analyze each captured frame with the vision model.
            frame_analyses = []
            for frame in frames:
                frame_path = frame.get("path")
                frame_position = frame.get("position", 0)

                if not frame_path or not os.path.exists(frame_path):
                    logger.warning(f"Frame file not found: {frame_path}")
                    continue

                frame_analysis = self._analyze_frame(frame_path, frame_position)
                frame_analyses.append({
                    **frame,
                    "analysis": frame_analysis
                })

            # Run OCR across the frames to capture any on-screen text.
            ocr_results = self._extract_text_from_frames(frames)

            # Merge the per-frame analyses into a single video-level summary.
            consolidated_analysis = self._consolidate_frame_analyses(frame_analyses)

            return {
                "video_id": video_id,
                "frame_count": len(frames),
                "frame_analyses": frame_analyses,
                "ocr_results": ocr_results,
                "consolidated_analysis": consolidated_analysis,
                "success": True,
                "has_visual_content": True,
                "analysis_method": "frame_extraction"
            }

        except Exception as e:
            logger.error(f"Error analyzing video content: {str(e)}")
            logger.error(traceback.format_exc())

            return {
                "video_id": video_id_or_url,
                "error": f"Failed to analyze video content: {str(e)}",
                "error_type": type(e).__name__,
                "severity": ErrorSeverity.ERROR.value,
                "success": False
            }

    def _analyze_frame(self, frame_path: str, frame_position: float) -> Dict[str, Any]:
        """
        Analyze a single video frame using the vision model.

        Args:
            frame_path: Path to the frame image file
            frame_position: Position of the frame in the video (0-1)

        Returns:
            Dictionary containing frame analysis
        """
        try:
            if Image is None:
                raise ImportError("PIL not installed. Install with: pip install pillow")

            if not os.path.exists(frame_path):
                raise FileNotFoundError(f"Frame file not found: {frame_path}")

            # Opened as a sanity check; the raw file bytes are what get sent to the model.
            image = Image.open(frame_path)

            # Tailor the prompt to where the frame sits in the video timeline.
            frame_type = "beginning" if frame_position < 0.1 else "end" if frame_position > 0.85 else "middle"

            analysis_prompt = f"""Analyze this frame from the {frame_type} of a YouTube video.

For frames from the beginning of videos, focus on: introductory elements, titles, channel branding.
For frames from the middle of videos, focus on: main content, actions, subjects, visual information.
For frames from the end of videos, focus on: conclusions, call-to-action elements, credits.

Provide a detailed JSON response with:
- frame_description: What's visible in the frame
- visible_text: Any text visible in the frame
- key_elements: Important objects, people, or visual elements
- topic: The apparent topic or subject
- visual_style: Description of the visual presentation

JSON Response:"""

            # Send the frame to the vision model as a base64-encoded image alongside the
            # text prompt; a PIL Image object cannot be passed through a text-only
            # prompt template.
            with open(frame_path, "rb") as image_file:
                image_b64 = base64.b64encode(image_file.read()).decode("utf-8")

            message = HumanMessage(content=[
                {"type": "text", "text": analysis_prompt},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_b64}"}}
            ])
            result = (self.model | StrOutputParser()).invoke([message])

            try:
                parsed_result = json.loads(result)
                return parsed_result
            except json.JSONDecodeError:
                logger.warning("Frame analysis result is not valid JSON, returning as plain text")
                return {
                    "frame_description": result,
                    "analysis_error": "Failed to parse JSON result"
                }

        except Exception as e:
            logger.error(f"Error analyzing frame: {str(e)}")
            logger.error(traceback.format_exc())
            return {
                "analysis_error": f"Frame analysis failed: {str(e)}",
                "error_type": type(e).__name__
            }

    def _extract_text_from_frames(self, frames: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Extract text from frames using OCR.

        Args:
            frames: List of frame information dictionaries

        Returns:
            Dictionary containing OCR results
        """
        try:
            if pytesseract is None:
                return {
                    "success": False,
                    "error": "Pytesseract not installed. Install with: pip install pytesseract",
                    "text_found": False
                }

            if Image is None:
                return {
                    "success": False,
                    "error": "PIL not installed. Install with: pip install pillow",
                    "text_found": False
                }

            ocr_results = []
            all_text = []

            for frame in frames:
                frame_path = frame.get("path")
                frame_position = frame.get("position", 0)

                if not frame_path or not os.path.exists(frame_path):
                    logger.warning(f"Frame file not found for OCR: {frame_path}")
                    continue

                try:
                    image = Image.open(frame_path)

                    # Convert to grayscale, which generally improves OCR accuracy.
                    if image.mode != 'L':
                        image = image.convert('L')

                    extracted_text = pytesseract.image_to_string(image)
                    extracted_text = extracted_text.strip()

                    # Record frames with no detectable text and move on.
                    if not extracted_text:
                        ocr_results.append({
                            "frame_position": frame_position,
                            "text_found": False,
                            "text": ""
                        })
                        continue

                    ocr_results.append({
                        "frame_position": frame_position,
                        "text_found": True,
                        "text": extracted_text
                    })

                    all_text.append(f"[Frame at {frame_position:.0%}]: {extracted_text}")

                except Exception as e:
                    logger.warning(f"OCR failed for frame {frame_path}: {str(e)}")
                    ocr_results.append({
                        "frame_position": frame_position,
                        "text_found": False,
                        "text": "",
                        "error": str(e)
                    })

            return {
                "success": True,
                "frames_processed": len(frames),
                "frames_with_text": sum(1 for r in ocr_results if r.get("text_found", False)),
                "text_found": any(r.get("text_found", False) for r in ocr_results),
                "frame_results": ocr_results,
                "combined_text": "\n\n".join(all_text)
            }

        except Exception as e:
            logger.error(f"Error extracting text from frames: {str(e)}")
            logger.error(traceback.format_exc())

            return {
                "success": False,
                "error": f"Text extraction failed: {str(e)}",
                "error_type": type(e).__name__,
                "text_found": False
            }

    def _consolidate_frame_analyses(self, frame_analyses: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Consolidate individual frame analyses into a comprehensive video analysis.

        Args:
            frame_analyses: List of frame analysis results

        Returns:
            Dictionary containing consolidated analysis
        """
        try:
            if not frame_analyses:
                return {
                    "error": "No frame analyses available for consolidation",
                    "success": False
                }

            # Build a timeline of per-frame descriptions for the consolidation prompt.
            frame_descriptions = []
            for frame_analysis in frame_analyses:
                position = frame_analysis.get("position", 0)
                analysis = frame_analysis.get("analysis", {})
                frame_desc = analysis.get("frame_description", "No description available")
                frame_descriptions.append(f"[Frame at {position:.0%} of video] {frame_desc}")

            all_descriptions = "\n".join(frame_descriptions)

            consolidated_prompt = """Based on the analysis of multiple frames from a YouTube video, provide a comprehensive understanding of the video content.

Frame descriptions:
{frame_descriptions}

Provide a JSON response with:
- video_topic: The main topic or subject of the video
- video_type: The type of video (tutorial, vlog, educational, etc.)
- key_elements: Important visual elements across frames
- visual_style: The overall visual style and production quality
- summary: A summary of what the video appears to be conveying

JSON Response:"""

            prompt_template = PromptTemplate.from_template(consolidated_prompt)
            chain = prompt_template | self.model | StrOutputParser()
            result = chain.invoke({"frame_descriptions": all_descriptions})

            try:
                parsed_result = json.loads(result)
                parsed_result["success"] = True
                return parsed_result
            except json.JSONDecodeError:
                logger.warning("Consolidated analysis result is not valid JSON, returning as plain text")
                return {
                    "summary": result,
                    "success": True,
                    "parsing_error": "Failed to parse JSON result"
                }

        except Exception as e:
            logger.error(f"Error consolidating frame analyses: {str(e)}")
            logger.error(traceback.format_exc())

            return {
                "error": f"Consolidation failed: {str(e)}",
                "error_type": type(e).__name__,
                "success": False
            }

    def analyze_youtube_video(self, video_id_or_url: str, frame_count: Optional[int] = None) -> Dict[str, Any]:
        """
        Complete flow for analyzing YouTube video content through visual extraction.

        This serves as the main entry point for the module, handling the entire process:
        1. Extract frames from the video
        2. Process the captured frames
        3. Analyze the visual content
        4. Extract text using OCR
        5. Provide consolidated results

        Args:
            video_id_or_url: YouTube video ID or URL
            frame_count: Optional number of frames to capture

        Returns:
            Dictionary containing complete analysis results
        """
        try:
            # Step 1: prepare frame extraction and generate browser_action instructions.
            extraction_result = self.frame_extractor.extract_video_frames(
                video_id_or_url, frame_count
            )

            # The browser_action instructions must be executed externally before the
            # frames can be processed. If the frame directory is missing, return the
            # instructions so the caller can run them first and retry.
            frame_dir = extraction_result.get("frame_dir")
            if not frame_dir or not os.path.exists(frame_dir):
                return {
                    "video_id": self.frame_extractor.extract_video_id(video_id_or_url),
                    "error": "Frame extraction not completed or directory not found",
                    "error_type": "FrameExtractionIncomplete",
                    "severity": ErrorSeverity.ERROR.value,
                    "success": False,
                    "browser_instructions": extraction_result.get("browser_instructions"),
                    "extract_frames_first": True,
                    "frame_dir": frame_dir
                }

            # Step 2: process the captured frames.
            frame_data = self.frame_extractor.process_captured_frames(frame_dir)

            # Steps 3-5: analyze visual content, run OCR, and consolidate the results.
            analysis_result = self.analyze_video_content(video_id_or_url, frame_data)

            analysis_result["guidance"] = """
            This analysis is based on visual content extracted from the YouTube video.
            It provides insights into what is shown in the video when a transcript is unavailable.
            The frame analyses show content from different points in the video timeline.
            OCR results capture text visible in the video frames.
            The consolidated analysis summarizes the overall video content based on visual cues.
            """

            return analysis_result

        except Exception as e:
            logger.error(f"Error in complete YouTube video analysis flow: {str(e)}")
            logger.error(traceback.format_exc())

            return {
                "video_id": video_id_or_url,
                "error": f"Complete analysis flow failed: {str(e)}",
                "error_type": type(e).__name__,
                "severity": ErrorSeverity.ERROR.value,
                "success": False
            }


def create_video_frame_extractor() -> VideoFrameExtractor:
    """
    Create an instance of the VideoFrameExtractor tool.

    Returns:
        VideoFrameExtractor: An instance of the video frame extractor tool
    """
    config = get_tool_config().get("video_frame_extraction", {})
    return VideoFrameExtractor(config)


def create_video_content_analyzer() -> VideoContentAnalyzer:
    """
    Create an instance of the VideoContentAnalyzer tool.

    Returns:
        VideoContentAnalyzer: An instance of the video content analyzer tool
    """
    config = get_tool_config().get("video_content_analysis", {})
    return VideoContentAnalyzer(config)
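

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). The module is designed around a two-phase
# flow: the browser_action instructions produced in phase 1 must be executed by
# an external browser automation step (not part of this module) before the
# captured frames can be processed and analyzed in phase 2. The video ID below
# is a made-up placeholder.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    analyzer = create_video_content_analyzer()

    # Phase 1: prepare extraction and obtain the browser_action instructions.
    extraction = analyzer.frame_extractor.extract_video_frames(
        "https://www.youtube.com/watch?v=abc123XYZ_-", frame_count=5
    )
    print(json.dumps(extraction.get("browser_instructions"), indent=2))

    # (External step: execute the instructions so screenshots land in
    # extraction["frame_dir"].)

    # Phase 2: process the captured frames and analyze the video content.
    frame_data = analyzer.frame_extractor.process_captured_frames(extraction["frame_dir"])
    result = analyzer.analyze_video_content("abc123XYZ_-", frame_data)
    print(json.dumps(result.get("consolidated_analysis"), indent=2))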