|
""" |
|
GAIA Multimodal Processor |
|
|
|
This module integrates different processing components for handling multimodal content: |
|
- Images (including chess positions) |
|
- Audio files |
|
- Video content |
|
- Text (including reversed text and word puzzles) |
|
- Data files and tables |
|
|
|
It provides a unified interface for multimodal content detection and processing. |
|
""" |
|
|
|
import os |
|
import re |
|
import logging |
|
import time |
|
from typing import Dict, Any, List, Optional, Union |
|
import traceback |
|
from pathlib import Path |
|
|
|
|
|
from src.gaia.agent.components.image_analyzer import ImageAnalyzer |
|
from src.gaia.agent.components.audio_analyzer import AudioAnalyzer |
|
from src.gaia.agent.components.video_analyzer import VideoAnalyzer |
|
from src.gaia.agent.components.text_analyzer import TextAnalyzer |
|
from src.gaia.agent.components.data_file_handler import DataFileHandler |
|
from src.gaia.agent.components.document_analyzer import DocumentAnalyzer |
|
from src.gaia.agent.components.table_processor import TableProcessor |
|
|
|
|
|
logger = logging.getLogger("gaia_agent.multimodal_processor") |
|
|
|
class MultimodalProcessor:
    """
    Unified processor for multimodal content (images, audio, video, text, data files).

    This class orchestrates the detection and processing of different types of content
    using specialized component handlers.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the multimodal processor with configuration.

        Args:
            config: Optional configuration dictionary for components
        """
        self.config = config or {}

        self._initialize_components()

        # In-memory result cache keyed by "{content_type}:{reference}:{question}".
        # NOTE(review): unbounded — acceptable for short-lived runs; consider an
        # LRU bound if this processor lives for many questions.
        self.processing_cache: Dict[str, Dict[str, Any]] = {}

        logger.info("Multimodal processor initialized")

    def _initialize_components(self) -> None:
        """Initialize specialized processing components.

        Raises:
            RuntimeError: If any component constructor fails; the original
                exception is chained as the cause.
        """
        logger.info("Initializing multimodal processing components")

        try:
            self.image_analyzer = ImageAnalyzer()
            self.audio_analyzer = AudioAnalyzer()
            self.video_analyzer = VideoAnalyzer()
            self.text_analyzer = TextAnalyzer()
            self.data_file_handler = DataFileHandler()
            self.document_analyzer = DocumentAnalyzer()
            self.table_processor = TableProcessor()

            logger.info("All processing components initialized")

        except Exception as e:
            logger.error("Error initializing components: %s", e)
            logger.debug(traceback.format_exc())
            raise RuntimeError(f"Failed to initialize multimodal processing components: {str(e)}") from e

    def detect_content_type(self, question: str) -> str:
        """
        Detect the type of content mentioned in a question.

        Matching is keyword-based and order-sensitive: image checks run before
        audio, video, structured data, document, and special-text checks, so a
        question mentioning several media kinds resolves to the first match.

        Args:
            question: The question to analyze

        Returns:
            str: Content type identifier — one of "chess_image", "image",
                "audio", "youtube_video", "video", "structured_data",
                "document", "special_text", or the fallback "text".
        """
        question_lower = question.lower()

        if any(term in question_lower for term in ["image", "picture", "photo", "diagram", "chess position", "chess board"]):
            # Chess-specific images get a dedicated type so position analysis runs.
            if "chess" in question_lower:
                return "chess_image"
            return "image"

        if any(term in question_lower for term in ["audio", "sound", "mp3", "recording", "listen"]):
            return "audio"

        if any(term in question_lower for term in ["video", "youtube", "watch"]):
            # Only an explicit YouTube URL qualifies as "youtube_video".
            if "youtube.com/watch" in question_lower or "youtu.be/" in question_lower:
                return "youtube_video"
            return "video"

        if any(term in question_lower for term in ["table", "excel", "csv", "database", "spreadsheet"]):
            return "structured_data"

        if any(term in question_lower for term in ["pdf", "document", "article", "paper", "file"]):
            return "document"

        if any(term in question_lower for term in ["reversed", "backwards", "unscramble", "anagram"]):
            return "special_text"

        return "text"

    def extract_content_url(self, question: str, content_type: str) -> Optional[str]:
        """
        Extract URL or file path for content from the question.

        Tries, in order: a YouTube URL (only for "youtube_video"), any
        http(s) URL, then a relative/absolute file path ending in a known
        media/data extension.

        Args:
            question: The question containing content references
            content_type: The detected content type

        Returns:
            str or None: Extracted URL or path, or None if not found
        """
        if content_type == "youtube_video":
            youtube_match = re.search(r'(https?://(?:www\.)?(?:youtube\.com/watch\?v=|youtu\.be/)[a-zA-Z0-9_-]+)', question)
            if youtube_match:
                return youtube_match.group(1)

        url_match = re.search(r'(https?://\S+)', question)
        if url_match:
            return url_match.group(1)

        file_path_match = re.search(r'([/\\]?(?:[a-zA-Z0-9_-]+[/\\])*[a-zA-Z0-9_-]+\.(?:jpg|png|gif|mp3|mp4|pdf|xlsx|csv))', question)
        if file_path_match:
            return file_path_match.group(1)

        return None

    def process_content(self, content_type: str, content_reference: str, question: str) -> Dict[str, Any]:
        """
        Process multimodal content using the appropriate specialized component.

        Results (including failures without exceptions) are cached; a repeated
        (content_type, reference, question) triple returns the cached dict.

        Args:
            content_type: The type of content to process
            content_reference: URL, file path, or content itself
            question: The question about the content

        Returns:
            dict: Processing results with keys "content_type", "reference",
                "question", "answer", "success", "error", "metadata",
                "processing_time".
        """
        cache_key = f"{content_type}:{content_reference}:{question}"
        if cache_key in self.processing_cache:
            logger.info("Using cached result for %s", cache_key)
            return self.processing_cache[cache_key]

        start_time = time.time()

        result: Dict[str, Any] = {
            "content_type": content_type,
            "reference": content_reference,
            "question": question,
            "answer": None,
            "success": False,
            "error": None,
            "metadata": {},
            "processing_time": 0
        }

        try:
            self._process_by_type(result, content_type, content_reference, question)

            # A component may report success without producing an answer string;
            # surface that explicitly rather than returning an empty answer.
            if not result["answer"] and result["success"]:
                result["answer"] = "Analysis was successful, but no specific answer could be generated."

            result["processing_time"] = time.time() - start_time

            self.processing_cache[cache_key] = result

            return result

        except Exception as e:
            logger.error("Error processing %s content: %s", content_type, e)
            logger.debug(traceback.format_exc())

            # Exceptions are not cached, so a transient failure can be retried.
            result["success"] = False
            result["error"] = str(e)
            result["processing_time"] = time.time() - start_time

            return result

    def _check_local_file(self, result: Dict[str, Any], path: str) -> bool:
        """Return True if *path* exists locally; otherwise record an explicit
        "File not found" error in *result* and return False.

        Fixes the original silent failure where a missing file left
        ``success=False`` with ``error=None``.
        """
        if os.path.exists(path):
            return True
        result["error"] = f"File not found: {path}"
        return False

    def _process_by_type(self, result: Dict[str, Any], content_type: str,
                         content_reference: str, question: str) -> None:
        """Dispatch to the specialized component for *content_type*, filling
        ``result["metadata"]``, ``result["answer"]`` and ``result["success"]``
        in place. Unknown types fall through to plain text analysis.
        """
        if content_type in ("image", "chess_image"):
            if not self._check_local_file(result, content_reference):
                return
            analysis = self.image_analyzer.process_image(content_reference, question)
            result["metadata"] = analysis
            result["answer"] = analysis.get("description", "")
            result["success"] = analysis.get("success", False)
            # Chess positions get a dedicated phrasing when an evaluation is present.
            if content_type == "chess_image" and "position_evaluation" in analysis:
                result["answer"] = f"Chess position analysis: {analysis['description']}"

        elif content_type == "audio":
            if not self._check_local_file(result, content_reference):
                return
            analysis = self.audio_analyzer.process_audio(content_reference, question)
            result["metadata"] = analysis
            result["answer"] = analysis.get("description", "")
            # Prefer a transcription over the generic description when available.
            if analysis.get("transcription"):
                result["answer"] = f"Audio content: {analysis['transcription']}"
            result["success"] = analysis.get("success", False)

        elif content_type in ("video", "youtube_video"):
            # Video references may be remote URLs, so no local-file check here.
            analysis = self.video_analyzer.analyze_video_content(content_reference, question)
            result["metadata"] = analysis
            result["answer"] = analysis.get("content", "")
            result["success"] = analysis.get("success", False)

        elif content_type == "structured_data":
            if not self._check_local_file(result, content_reference):
                return
            analysis = self.data_file_handler.process_file(content_reference, question)
            result["metadata"] = analysis
            result["answer"] = analysis.get("summary", "")
            result["success"] = analysis.get("success", False)

        elif content_type == "document":
            if not self._check_local_file(result, content_reference):
                return
            analysis = self.document_analyzer.process_document(content_reference, question)
            result["metadata"] = analysis
            result["answer"] = analysis.get("content", "")
            result["success"] = analysis.get("success", False)

        elif content_type == "special_text":
            # Reversed text / anagrams are solved from the question itself.
            analysis = self.text_analyzer.process_text_question(question)
            result["metadata"] = analysis
            result["answer"] = analysis.get("answer", "")
            result["success"] = analysis.get("success", False)

        else:
            # Fallback: plain text analysis of the question.
            analysis = self.text_analyzer.analyze_text(question)
            result["metadata"] = analysis
            result["answer"] = analysis.get("summary", "")
            result["success"] = analysis.get("success", False)

    def process_question(self, question: str) -> Dict[str, Any]:
        """
        Process a question that may reference multimodal content.

        Detects the content type, extracts a URL/path reference, and delegates
        to :meth:`process_content`. "special_text" questions are processed even
        without a reference, since the question itself is the content.

        Args:
            question: The question to process

        Returns:
            dict: Processing results including answer and metadata; when no
                content reference is found (and the type is not
                "special_text"), ``success`` is False and ``error`` explains why.
        """
        content_type = self.detect_content_type(question)
        logger.info("Detected content type: %s", content_type)

        content_reference = self.extract_content_url(question, content_type)
        logger.info("Extracted content reference: %s", content_reference)

        if content_reference:
            return self.process_content(content_type, content_reference, question)

        if content_type == "special_text":
            # The puzzle text lives in the question itself — use it as the reference.
            return self.process_content(content_type, question, question)

        return {
            "content_type": content_type,
            "reference": None,
            "question": question,
            "answer": None,
            "success": False,
            "error": "No content reference found in the question",
            "metadata": {},
            "processing_time": 0
        }