"""
GAIA Multimodal Processor
This module integrates different processing components for handling multimodal content:
- Images (including chess positions)
- Audio files
- Video content
- Text (including reversed text and word puzzles)
- Data files and tables
It provides a unified interface for multimodal content detection and processing.
"""
import os
import re
import logging
import time
from typing import Dict, Any, Optional
import traceback
# Import specialized components
from src.gaia.agent.components.image_analyzer import ImageAnalyzer
from src.gaia.agent.components.audio_analyzer import AudioAnalyzer
from src.gaia.agent.components.video_analyzer import VideoAnalyzer
from src.gaia.agent.components.text_analyzer import TextAnalyzer
from src.gaia.agent.components.data_file_handler import DataFileHandler
from src.gaia.agent.components.document_analyzer import DocumentAnalyzer
from src.gaia.agent.components.table_processor import TableProcessor
# Set up logging
logger = logging.getLogger("gaia_agent.multimodal_processor")
class MultimodalProcessor:
"""
Unified processor for multimodal content (images, audio, video, text, data files).
This class orchestrates the detection and processing of different types of content
using specialized component handlers.
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
Initialize the multimodal processor with configuration.
Args:
config: Optional configuration dictionary for components
"""
self.config = config or {}
# Initialize specialized components
self._initialize_components()
# Cache for processed results
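# (in-memory and unbounded; keyed by content type, content reference, and question text)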
self.processing_cache = {}
logger.info("Multimodal processor initialized")
def _initialize_components(self):
"""Initialize specialized processing components."""
logger.info("Initializing multimodal processing components")
try:
# Image analyzer for images and chess positions
self.image_analyzer = ImageAnalyzer()
# Audio analyzer for sound files
self.audio_analyzer = AudioAnalyzer()
# Video analyzer for video content
self.video_analyzer = VideoAnalyzer()
# Text analyzer for text processing
self.text_analyzer = TextAnalyzer()
# Data file handler for structured data files
self.data_file_handler = DataFileHandler()
# Document analyzer for PDFs and other documents
self.document_analyzer = DocumentAnalyzer()
# Table processor for tabular data
self.table_processor = TableProcessor()
logger.info("All processing components initialized")
except Exception as e:
logger.error(f"Error initializing components: {str(e)}")
logger.debug(traceback.format_exc())
raise RuntimeError(f"Failed to initialize multimodal processing components: {str(e)}") from e
def detect_content_type(self, question: str) -> str:
"""
Detect the type of content mentioned in a question.
Args:
question: The question to analyze
Returns:
str: Content type identifier
"""
question_lower = question.lower()
# Check for image content
if any(term in question_lower for term in ["image", "picture", "photo", "diagram", "chess position", "chess board"]):
if "chess" in question_lower:
return "chess_image"
return "image"
# Check for audio content
if any(term in question_lower for term in ["audio", "sound", "mp3", "recording", "listen"]):
return "audio"
# Check for video content
if any(term in question_lower for term in ["video", "youtube", "watch"]):
# Extract YouTube URL if present
if "youtube.com/watch" in question_lower or "youtu.be/" in question_lower:
return "youtube_video"
return "video"
# Check for structured data
if any(term in question_lower for term in ["table", "excel", "csv", "database", "spreadsheet"]):
return "structured_data"
# Check for document files
if any(term in question_lower for term in ["pdf", "document", "article", "paper", "file"]):
return "document"
# Check for special text cases
if any(term in question_lower for term in ["reversed", "backwards", "unscramble", "anagram"]):
return "special_text"
# Default to plain text
return "text"
def extract_content_url(self, question: str, content_type: str) -> Optional[str]:
"""
Extract URL or file path for content from the question.
Args:
question: The question containing content references
content_type: The detected content type
Returns:
str or None: Extracted URL or path, or None if not found
"""
if content_type == "youtube_video":
# Extract YouTube URL
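# Note: this pattern captures youtube.com/watch?v=<id> and youtu.be/<id> links only;
# any query parameters after the video id are not included in the match.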
youtube_match = re.search(r'(https?://(?:www\.)?(?:youtube\.com/watch\?v=|youtu\.be/)[a-zA-Z0-9_-]+)', question)
if youtube_match:
return youtube_match.group(1)
# Extract URLs for other content types
url_match = re.search(r'(https?://\S+)', question)
if url_match:
return url_match.group(1)
# Extract potential file paths
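# Note: only the listed extensions are recognized, and path segments are limited to
# letters, digits, underscores, and hyphens (no spaces or dots in directory names).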
file_path_match = re.search(r'([/\\]?(?:[a-zA-Z0-9_-]+[/\\])*[a-zA-Z0-9_-]+\.(?:jpg|png|gif|mp3|mp4|pdf|xlsx|csv))', question)
if file_path_match:
return file_path_match.group(1)
return None
def process_content(self, content_type: str, content_reference: str, question: str) -> Dict[str, Any]:
"""
Process multimodal content using the appropriate specialized component.
Args:
content_type: The type of content to process
content_reference: URL, file path, or content itself
question: The question about the content
Returns:
dict: Processing results including answer and metadata
"""
# Check cache first
cache_key = f"{content_type}:{content_reference}:{question}"
if cache_key in self.processing_cache:
logger.info(f"Using cached result for {cache_key}")
return self.processing_cache[cache_key]
start_time = time.time()
# Initialize result structure
result = {
"content_type": content_type,
"reference": content_reference,
"question": question,
"answer": None,
"success": False,
"error": None,
"metadata": {},
"processing_time": 0
}
try:
# Process based on content type
if content_type in ["image", "chess_image"]:
if os.path.exists(content_reference):
analysis = self.image_analyzer.process_image(content_reference, question)
result["metadata"] = analysis
result["answer"] = analysis.get("description", "")
result["success"] = analysis.get("success", False)
# Handle chess-specific analysis
if content_type == "chess_image" and "position_evaluation" in analysis:
result["answer"] = f"Chess position analysis: {analysis['description']}"
elif content_type == "audio":
if os.path.exists(content_reference):
analysis = self.audio_analyzer.process_audio(content_reference, question)
result["metadata"] = analysis
result["answer"] = analysis.get("description", "")
if analysis.get("transcription"):
result["answer"] = f"Audio content: {analysis['transcription']}"
result["success"] = analysis.get("success", False)
elif content_type in ["video", "youtube_video"]:
analysis = self.video_analyzer.analyze_video_content(content_reference, question)
result["metadata"] = analysis
result["answer"] = analysis.get("content", "")
result["success"] = analysis.get("success", False)
elif content_type == "structured_data":
if os.path.exists(content_reference):
analysis = self.data_file_handler.process_file(content_reference, question)
result["metadata"] = analysis
result["answer"] = analysis.get("summary", "")
result["success"] = analysis.get("success", False)
elif content_type == "document":
if os.path.exists(content_reference):
analysis = self.document_analyzer.process_document(content_reference, question)
result["metadata"] = analysis
result["answer"] = analysis.get("content", "")
result["success"] = analysis.get("success", False)
elif content_type == "special_text":
analysis = self.text_analyzer.process_text_question(question)
result["metadata"] = analysis
result["answer"] = analysis.get("answer", "")
result["success"] = analysis.get("success", False)
else: # Default text processing
analysis = self.text_analyzer.analyze_text(question)
result["metadata"] = analysis
result["answer"] = analysis.get("summary", "")
result["success"] = analysis.get("success", False)
# If we still don't have an answer but processing was successful
if not result["answer"] and result["success"]:
result["answer"] = "Analysis was successful, but no specific answer could be generated."
# Add processing time
result["processing_time"] = time.time() - start_time
# Cache the result
self.processing_cache[cache_key] = result
return result
except Exception as e:
logger.error(f"Error processing {content_type} content: {str(e)}")
logger.debug(traceback.format_exc())
result["success"] = False
result["error"] = str(e)
result["processing_time"] = time.time() - start_time
return result
def process_question(self, question: str) -> Dict[str, Any]:
"""
Process a question that may reference multimodal content.
Args:
question: The question to process
Returns:
dict: Processing results including answer and metadata
"""
# Detect content type
content_type = self.detect_content_type(question)
logger.info(f"Detected content type: {content_type}")
# Extract content reference if applicable
content_reference = self.extract_content_url(question, content_type)
logger.info(f"Extracted content reference: {content_reference}")
# If we have a content reference, process it
if content_reference:
return self.process_content(content_type, content_reference, question)
# For special text without explicit references
if content_type == "special_text":
return self.process_content(content_type, question, question)
# If no content reference found, return a basic result
return {
"content_type": content_type,
"reference": None,
"question": question,
"answer": None,
"success": False,
"error": "No content reference found in the question",
"metadata": {},
"processing_time": 0
}
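# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). It assumes the component modules
# imported at the top of this file are available on the Python path; the
# sample questions, the YouTube URL, and the data/sales.xlsx path below are
# placeholders, and the calls will only fully succeed if the referenced
# files/URLs are actually reachable by the underlying analyzers.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    processor = MultimodalProcessor()

    sample_questions = [
        # Routed to "special_text" by detect_content_type; no file reference is needed.
        "The following sentence is reversed, what does it say? '.tfel eht ot nruT'",
        # Routed to "youtube_video"; the URL is pulled out by extract_content_url.
        "What is discussed in the video at https://www.youtube.com/watch?v=abc123XYZ ?",
        # Routed to "structured_data"; processing only proceeds if the file exists locally.
        "What is the total of column B in the spreadsheet data/sales.xlsx?",
    ]

    for question in sample_questions:
        result = processor.process_question(question)
        print(f"Question:     {question}")
        print(f"Content type: {result['content_type']}")
        print(f"Reference:    {result['reference']}")
        print(f"Success:      {result['success']}")
        print(f"Answer:       {result['answer']}")
        print("-" * 60)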