gaia-enhanced-agent / agents /mistral_multimodal_agent.py
GAIA Agent Deployment
Deploy Complete Enhanced GAIA Agent with Phase 1-6 Improvements
9a6a4dc
"""
Open Source Multimodal Tools
This module provides multimodal tool capabilities using open-source models:
- BLIP (Salesforce/blip-image-captioning-base) and Mistral Vision (Pixtral) models for image analysis
- Faster-Whisper for European audio transcription
- DistilBERT for document question answering
- Hugging Face transformers for various tasks
- No dependency on proprietary OpenAI models
Key Features:
- Image analysis using BLIP captioning or Mistral Vision (Pixtral)
- Audio transcription using Faster-Whisper (European community-driven)
- Text generation using Mistral models
- Document processing and analysis
- Local models (BLIP, Faster-Whisper, DistilBERT) run without API access; Mistral-backed features require a Mistral API key
"""
import os
import logging
import base64
import io
from typing import Dict, Any, List, Optional, Union
from pathlib import Path
import requests
from PIL import Image
# Environment setup
from utils.environment_setup import get_api_key, has_api_key, should_suppress_warnings
# Mistral and open-source model imports
try:
# Try new API first (recommended)
from mistralai import Mistral as MistralClient
from mistralai import UserMessage
MISTRAL_AVAILABLE = True
MISTRAL_CLIENT_TYPE = "new"
except ImportError:
try:
# Fallback to old API (deprecated)
from mistralai.client import MistralClient
from mistralai import UserMessage
MISTRAL_AVAILABLE = True
MISTRAL_CLIENT_TYPE = "old"
except ImportError:
MistralClient = None
UserMessage = None
MISTRAL_AVAILABLE = False
MISTRAL_CLIENT_TYPE = None
# European Community-Driven Audio Processing
try:
# Faster-Whisper - Community-driven European alternative
# Optimized, CPU-friendly, 4x faster than original Whisper
# Developed by European open-source community
import faster_whisper
FASTER_WHISPER_AVAILABLE = True
except ImportError:
FASTER_WHISPER_AVAILABLE = False
# Audio processing availability (European community solution only)
AUDIO_AVAILABLE = FASTER_WHISPER_AVAILABLE
# Hugging Face transformers for additional capabilities
try:
from transformers import pipeline, AutoProcessor, AutoModel
import torch
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
# AGNO framework
from agno.tools.toolkit import Toolkit
# Response formatting
from utils.response_formatter import (
ResponseFormatter,
ResponseType,
FormatConfig,
FormatStandard,
)
logger = logging.getLogger(__name__)
class OpenSourceMultimodalTools(Toolkit):
"""
Open-source multimodal tools using Mistral and other open models.
This is a tool collection, not an agent. It provides multimodal capabilities
that can be integrated into AGNO agents.
Capabilities:
- Image analysis using BLIP captioning and Mistral Vision (Pixtral)
- Audio transcription using Faster-Whisper (European community-driven)
- Document analysis using DistilBERT
- Text generation using Mistral models
- All using open-source models with no proprietary dependencies
"""
def __init__(self):
    """
    Set up the toolkit: environment, formatter, Mistral client, local models.

    Steps, in order:
    1. Load variables from a local .env file and build the response formatter.
    2. Create a Mistral client when the library is importable and an API key
       is configured; otherwise ``self.mistral_client`` stays None.
    3. Load the locally-run models (audio, vision, document QA).
    4. Record the available capabilities and register the image, audio and
       document tools with the Toolkit base class.
    """
    logger.info("🚀 Initializing Mistral Multimodal Agent (Open Source)...")

    self._load_env_file()
    self._init_response_formatter()

    # The client stays None unless construction below succeeds.
    self.mistral_client = None
    self.mistral_api_key = get_api_key('mistral')

    if self.mistral_api_key and MISTRAL_AVAILABLE and MistralClient:
        # Both client generations take the same constructor argument;
        # only the confirmation message differs.
        if MISTRAL_CLIENT_TYPE == "new":
            ready_message = "✅ Mistral client initialized (new API)"
        else:
            ready_message = "✅ Mistral client initialized (old API - deprecated)"
        try:
            self.mistral_client = MistralClient(api_key=self.mistral_api_key)
            logger.info(ready_message)
        except Exception as exc:
            if not should_suppress_warnings():
                logger.warning(f"⚠️ Mistral client initialization failed: {exc}")
    elif not should_suppress_warnings():
        # Explain why no client was created (library missing vs. key missing).
        if not MISTRAL_AVAILABLE:
            logger.info("ℹ️ Mistral library not available - using fallback models")
        elif not self.mistral_api_key:
            logger.info("ℹ️ MISTRAL_API_KEY not found - using open-source alternatives")

    # Local model slots; filled in by _init_open_source_models when loading works.
    self.whisper_model = None
    self.vision_pipeline = None
    self.document_pipeline = None
    self._init_open_source_models()

    self.capabilities = self._assess_capabilities()

    # Only these three methods are handed to the Toolkit for registration.
    registered_tools = [
        self.analyze_image,
        self.transcribe_audio,
        self.analyze_document,
    ]
    super().__init__(name="multimodal_tools", tools=registered_tools)

    tool_names = [tool.__name__ for tool in registered_tools]
    logger.info("✅ Mistral Multimodal Agent initialized")
    logger.info(f"📊 Available capabilities: {list(self.capabilities.keys())}")
    logger.info(f"🔧 Registered AGNO tools: {tool_names}")
def _load_env_file(self):
    """
    Load KEY=VALUE pairs from ``./.env`` into ``os.environ``.

    Does nothing when there is no ``.env`` file in the current working
    directory. Blank lines, ``#`` comment lines and lines without ``=`` are
    skipped. For each remaining line:

    - an optional leading ``export `` in front of the key is dropped,
    - one pair of matching single or double quotes around the value is removed,
    - lines whose key is empty (e.g. ``=value``) are ignored instead of
      failing on an invalid environment variable name.

    Values from the file overwrite variables already present in the process
    environment. After loading, the project's environment manager is asked to
    re-read the environment.
    """
    from pathlib import Path

    env_file = Path('.env')
    if not env_file.exists():
        return

    # utf-8-sig reads plain UTF-8 and also swallows a BOM that would
    # otherwise become part of the first key.
    with open(env_file, 'r', encoding='utf-8-sig') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue

            key, value = line.split('=', 1)
            key = key.strip()
            if key.startswith('export '):
                key = key[len('export '):].strip()
            if not key:
                continue

            value = value.strip()
            # KEY="secret" must yield secret, not "secret" with the quotes.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
                value = value[1:-1]

            os.environ[key] = value

    logger.info("✅ Environment variables loaded from .env file")

    # Reload the environment manager to pick up the new variables.
    from utils.environment_setup import env_manager
    env_manager._load_environment()
def _init_response_formatter(self):
    """
    Create ``self.response_formatter`` with the evaluation-oriented settings.

    The formatter is configured for the HF_EVALUATION standard with markdown
    removal, prefix removal, whitespace stripping and space normalisation all
    switched on.
    """
    formatter_options = {
        "format_standard": FormatStandard.HF_EVALUATION,
        "remove_markdown": True,
        "remove_prefixes": True,
        "strip_whitespace": True,
        "normalize_spaces": True,
    }
    self.response_formatter = ResponseFormatter(config=FormatConfig(**formatter_options))
def _init_open_source_models(self):
    """
    Load the locally-run models and store them on the instance.

    - ``self.whisper_model``: Faster-Whisper "base" model on CPU with int8
      compute and a single worker, when the faster_whisper package imported.
    - ``self.vision_pipeline``: transformers "image-to-text" pipeline, on GPU 0
      when CUDA is available, otherwise on CPU.
    - ``self.document_pipeline``: transformers "question-answering" pipeline.

    Each load is attempted independently; a failure is logged as a warning and
    leaves the corresponding attribute unchanged.
    """
    # --- Audio transcription -------------------------------------------------
    self.whisper_model = None
    if FASTER_WHISPER_AVAILABLE:
        whisper_options = {
            "device": "cpu",          # CPU-only configuration
            "compute_type": "int8",   # quantised weights to save memory
            "num_workers": 1,         # conservative resource usage
        }
        try:
            self.whisper_model = faster_whisper.WhisperModel("base", **whisper_options)
            logger.info("✅ Faster-Whisper loaded (European community-driven alternative)")
            logger.info("🇪🇺 Using CPU-optimized configuration for European deployment")
        except Exception as exc:
            logger.warning(f"⚠️ Faster-Whisper loading failed: {exc}")

    if not self.whisper_model:
        logger.warning("⚠️ No audio transcription available")
        logger.info("💡 Install: pip install faster-whisper (European community alternative)")

    # --- transformers-based pipelines ---------------------------------------
    if not TRANSFORMERS_AVAILABLE:
        return

    try:
        # Image captioning checkpoint; device index 0 = first GPU, -1 = CPU.
        target_device = 0 if torch.cuda.is_available() else -1
        self.vision_pipeline = pipeline(
            "image-to-text",
            model="Salesforce/blip-image-captioning-base",
            device=target_device,
        )
        logger.info("✅ Vision pipeline initialized (BLIP-2)")
    except Exception as exc:
        logger.warning(f"⚠️ Vision pipeline initialization failed: {exc}")

    try:
        # Extractive question answering over plain text.
        self.document_pipeline = pipeline(
            "question-answering",
            model="distilbert-base-cased-distilled-squad",
        )
        logger.info("✅ Document analysis pipeline initialized")
    except Exception as exc:
        logger.warning(f"⚠️ Document pipeline initialization failed: {exc}")
def _assess_capabilities(self) -> Dict[str, bool]:
    """
    Report which capabilities are usable given what was loaded.

    Returns:
        Mapping of capability name to availability. Text generation and
        vision reasoning need the Mistral client; image analysis needs the
        vision pipeline or the Mistral client; audio and document analysis
        need their respective local models.
    """
    has_mistral = self.mistral_client is not None
    has_local_vision = self.vision_pipeline is not None
    has_whisper = self.whisper_model is not None
    has_document_qa = self.document_pipeline is not None

    capabilities = {}
    capabilities['text_generation'] = has_mistral
    capabilities['image_analysis'] = has_local_vision or has_mistral
    capabilities['audio_transcription'] = has_whisper
    capabilities['document_analysis'] = has_document_qa
    capabilities['vision_reasoning'] = has_mistral  # Mistral Vision
    return capabilities
def analyze_image(self, image_input: Union[str, bytes, Image.Image, dict], question: Optional[str] = None) -> str:
    """
    Analyze an image and optionally answer a question about it.

    Strategy:
    1. With a question and a Mistral client, ask Mistral Vision directly.
    2. Otherwise (or if step 1 yields nothing) caption the image with the
       local vision pipeline; when a question was given and a Mistral client
       exists, let Mistral reason over the caption.
    3. With no local pipeline and no question, ask Mistral Vision for a
       general description, so a Mistral-only setup can still describe images.

    Args:
        image_input: Local file path or http(s) URL (str), raw image bytes,
            a PIL Image, or a dict with a 'file_path' key.
        question: Optional specific question about the image.

    Returns:
        The analysis text. Failures are reported as strings starting with
        "Error" rather than raised.
    """
    try:
        # --- Normalise the input to a PIL image ------------------------------
        if isinstance(image_input, dict):
            # AGNO tool format: {'file_path': 'image.png'}
            if 'file_path' not in image_input:
                return "Error: Dictionary input must contain 'file_path' key"
            image_path = image_input['file_path']
            if not os.path.exists(image_path):
                return f"Error: Image file not found: {image_path}"
            image = Image.open(image_path)
        elif isinstance(image_input, str):
            if os.path.exists(image_input):
                image = Image.open(image_input)
            elif image_input.lower().startswith(('http://', 'https://')):
                # Bounded wait so an unresponsive host cannot hang the tool,
                # and an explicit status check so a 404 page is not handed
                # to PIL as if it were image data.
                response = requests.get(image_input, timeout=30)
                response.raise_for_status()
                image = Image.open(io.BytesIO(response.content))
            else:
                # Not an existing file and not a URL: report it the same way
                # as the dict branch instead of attempting a download.
                return f"Error: Image file not found: {image_input}"
        elif isinstance(image_input, bytes):
            image = Image.open(io.BytesIO(image_input))
        elif isinstance(image_input, Image.Image):
            image = image_input
        else:
            return "Error: Unsupported image input format"

        # --- 1. Direct visual question answering -----------------------------
        if self.mistral_client and question:
            try:
                result = self._analyze_with_mistral_vision(image, question)
                if result:
                    return result
            except Exception as e:
                logger.warning(f"Mistral Vision failed: {e}")

        # --- 2. Local captioning, optionally followed by text reasoning ------
        if self.vision_pipeline:
            try:
                caption_result = self.vision_pipeline(image)
                caption = caption_result[0]['generated_text'] if caption_result else "Unable to generate caption"

                if not question:
                    return f"Image analysis: {caption}"
                if not self.mistral_client:
                    return f"Image shows: {caption}. Question: {question} (Unable to reason without Mistral API)"

                # Let the text model answer from the caption alone.
                reasoning_prompt = (
                    "\n"
                    f"Image Description: {caption}\n"
                    f"Question: {question}\n"
                    "Based on the image description, please answer the question about the image.\n"
                )
                if MISTRAL_CLIENT_TYPE == "new":
                    response = self.mistral_client.chat.complete(
                        model="mistral-large-latest",
                        messages=[UserMessage(content=reasoning_prompt)]
                    )
                else:
                    # Old API format (deprecated)
                    response = self.mistral_client.chat(
                        model="mistral-large-latest",
                        messages=[UserMessage(content=reasoning_prompt)]
                    )
                return response.choices[0].message.content
            except Exception as e:
                logger.error(f"Vision pipeline failed: {e}")
                return f"Error analyzing image: {e}"

        # --- 3. Mistral-only setup without a question -------------------------
        if self.mistral_client and not question:
            description = self._analyze_with_mistral_vision(image, "Describe this image in detail.")
            if description:
                return description

        return "Error: No image analysis capabilities available"
    except Exception as e:
        logger.error(f"Image analysis failed: {e}")
        return f"Error: {e}"
def _analyze_with_mistral_vision(self, image: Image.Image, question: str) -> Optional[str]:
    """
    Ask the Mistral vision model a question about an image.

    The image is PNG-encoded, base64-wrapped into a data URL and sent together
    with the question in a single user message.

    Args:
        image: PIL Image object
        question: Question about the image

    Returns:
        The model's answer text, or None if anything fails (the failure is
        logged as a warning, never raised).
    """
    try:
        # The PNG writer rejects some PIL modes (for example CMYK, common in
        # JPEGs). Convert those to RGB first so such images are not silently
        # excluded from the vision path.
        if image.mode not in ("1", "L", "LA", "P", "RGB", "RGBA", "I", "I;16"):
            image = image.convert("RGB")

        # Convert image to base64
        buffer = io.BytesIO()
        image.save(buffer, format='PNG')
        image_b64 = base64.b64encode(buffer.getvalue()).decode('ascii')

        # One user message carrying both the question and the inline image.
        messages = [
            UserMessage(
                content=[
                    {
                        "type": "text",
                        "text": question
                    },
                    {
                        "type": "image_url",
                        "image_url": f"data:image/png;base64,{image_b64}"
                    }
                ]
            )
        ]

        # The two client generations expose the chat call differently.
        if MISTRAL_CLIENT_TYPE == "new":
            response = self.mistral_client.chat.complete(
                model="pixtral-12b-2409",  # Mistral's vision model
                messages=messages
            )
        else:
            # Old API format (deprecated)
            response = self.mistral_client.chat(
                model="pixtral-12b-2409",  # Mistral's vision model
                messages=messages
            )
        return response.choices[0].message.content
    except Exception as e:
        logger.warning(f"Mistral Vision analysis failed: {e}")
        return None
def transcribe_audio(self, audio_input: Union[str, bytes, dict]) -> str:
    """
    Transcribe audio with the loaded Faster-Whisper model.

    Args:
        audio_input: Audio file path, raw audio bytes, or a dict with a
            'file_path' key. Bytes are written to a temporary ``.wav`` file
            that is removed again whether or not transcription succeeds.

    Returns:
        The transcription text (segments joined with spaces, stripped), or a
        string starting with "Error" when the model is not loaded, the input
        is unsupported, or transcription fails.
    """
    if not self.whisper_model:
        return "Error: Audio transcription not available (Faster-Whisper not loaded)"

    temp_path = None  # set only when this call created a temporary file
    try:
        # Handle different input types from AGNO framework
        if isinstance(audio_input, dict):
            # AGNO passes {'file_path': '/path/to/file'}
            if 'file_path' not in audio_input:
                return "Error: Invalid audio input format - expected 'file_path' key in dict"
            file_path = audio_input['file_path']
        elif isinstance(audio_input, str):
            # Direct file path
            file_path = audio_input
        elif isinstance(audio_input, bytes):
            # The model is given a path, so spill the bytes to disk first.
            import tempfile
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
                temp_path = tmp.name  # remember before writing so a failed write is cleaned up too
                tmp.write(audio_input)
            file_path = temp_path
        else:
            return f"Error: Unsupported audio input type: {type(audio_input)}"

        # The segments are consumed by the join below, so the temporary file
        # must stay in place until the join has finished.
        segments, _info = self.whisper_model.transcribe(file_path)
        transcription = " ".join(segment.text for segment in segments)

        logger.info("🇪🇺 Audio transcribed using Faster-Whisper (European community)")
        return transcription.strip()
    except Exception as e:
        logger.error(f"Audio transcription failed: {e}")
        return f"Error transcribing audio: {e}"
    finally:
        # Runs on success and on failure alike, so the temp file never leaks.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary audio file {temp_path}: {cleanup_error}")
def analyze_document(self, document_text: str, question: str) -> str:
    """
    Answer a question about a document.

    Mistral is tried first when a client is configured (the document is cut
    to its first 4000 characters). If no client exists, or the Mistral call
    fails while a local QA pipeline is loaded, the local pipeline answers
    from the first 1000 characters instead.

    Args:
        document_text: Text content of document
        question: Question about the document

    Returns:
        The answer text, or a string starting with "Error" when no backend is
        available or every available backend failed.
    """
    try:
        # Use Mistral for complex reasoning if available
        if self.mistral_client:
            # The length limit is applied by the slice; no explanatory text
            # is placed inside the prompt, so the model sees document and
            # question only.
            prompt = (
                "\n"
                "Document Content:\n"
                f"{document_text[:4000]}\n"
                f"Question: {question}\n"
                "Please analyze the document and answer the question based on the content provided.\n"
            )
            try:
                if MISTRAL_CLIENT_TYPE == "new":
                    response = self.mistral_client.chat.complete(
                        model="mistral-large-latest",
                        messages=[UserMessage(content=prompt)]
                    )
                else:
                    # Old API format (deprecated)
                    response = self.mistral_client.chat(
                        model="mistral-large-latest",
                        messages=[UserMessage(content=prompt)]
                    )
                return response.choices[0].message.content
            except Exception as e:
                # Without a local fallback the error is reported as before.
                if not self.document_pipeline:
                    raise
                logger.warning(f"Mistral document analysis failed, falling back to local QA pipeline: {e}")

        # Fallback to simple QA pipeline
        if self.document_pipeline:
            result = self.document_pipeline(
                question=question,
                context=document_text[:1000]  # Limit context length
            )
            return result['answer']

        return "Error: Document analysis not available"
    except Exception as e:
        logger.error(f"Document analysis failed: {e}")
        return f"Error analyzing document: {e}"
def generate_text(self, prompt: str, max_tokens: int = 500) -> str:
    """
    Produce a completion for a prompt with the Mistral chat model.

    Args:
        prompt: Input prompt, sent as a single user message
        max_tokens: Maximum tokens to generate

    Returns:
        The generated text, or a string starting with "Error" when no Mistral
        client is configured or the request fails.
    """
    client = self.mistral_client
    if not client:
        return "Error: Text generation not available (Mistral API key required)"

    try:
        # New clients expose chat.complete(...); the deprecated client is
        # called as chat(...). Both take the same keyword arguments here.
        send = client.chat.complete if MISTRAL_CLIENT_TYPE == "new" else client.chat
        reply = send(
            model="mistral-large-latest",
            messages=[UserMessage(content=prompt)],
            max_tokens=max_tokens,
        )
        return reply.choices[0].message.content
    except Exception as exc:
        logger.error(f"Text generation failed: {exc}")
        return f"Error generating text: {exc}"
def __call__(self, question: str, **kwargs) -> str:
    """
    Route a request to the matching capability and format the result.

    The first of the following keyword arguments that is present decides the
    path: ``image`` (image analysis), ``audio`` (transcribe, then answer with
    text generation), ``document`` (document analysis). With none of them the
    question goes straight to text generation.

    Args:
        question: User question/request
        **kwargs: Optional ``image``, ``audio`` or ``document`` payload

    Returns:
        The formatted response, or "Error processing multimodal request" if
        routing or formatting raises.
    """
    try:
        logger.info(f"🤔 Processing multimodal question: {question[:100]}...")

        # Check for multimodal inputs
        if 'image' in kwargs:
            result = self.analyze_image(kwargs['image'], question)
        elif 'audio' in kwargs:
            # First transcribe, then process
            transcription = self.transcribe_audio(kwargs['audio'])
            if transcription.startswith(("Error:", "Error transcribing audio:")):
                # transcribe_audio reports failures as strings with these
                # prefixes. Surface the failure instead of asking the text
                # model to answer from an error message.
                result = transcription
            else:
                combined_question = f"Audio transcription: {transcription}\nQuestion: {question}"
                result = self.generate_text(combined_question)
        elif 'document' in kwargs:
            result = self.analyze_document(kwargs['document'], question)
        else:
            # Pure text generation
            result = self.generate_text(question)

        # Format response
        formatted_result = self.response_formatter.format_response(
            result,
            response_type=ResponseType.DIRECT_ANSWER
        )
        logger.info(f"📤 Mistral Multimodal Agent response: {formatted_result[:100]}...")
        return formatted_result
    except Exception as e:
        logger.error(f"Multimodal processing failed: {e}")
        return "Error processing multimodal request"
def get_capabilities_status(self) -> Dict[str, Any]:
    """
    Describe which capabilities, models and dependencies are active.

    Returns:
        A dict with:
        - 'agent_type': fixed identifier string,
        - 'capabilities': the mapping computed at initialisation,
        - 'models': model label per capability, or None when that capability
          has no backing model loaded,
        - 'dependencies': booleans for the Mistral client, Faster-Whisper,
          the transformers import and the vision pipeline.
    """
    # Report a vision model only when one is actually usable; the other
    # entries already use None for "not loaded".
    if self.mistral_client:
        vision_model = 'pixtral-12b-2409'
    elif self.vision_pipeline is not None:
        # Label kept as-is for callers that match on it.
        # NOTE(review): the checkpoint loaded in _init_open_source_models is
        # Salesforce/blip-image-captioning-base - confirm whether the label
        # should be updated.
        vision_model = 'BLIP-2'
    else:
        vision_model = None

    return {
        'agent_type': 'mistral_multimodal',
        'capabilities': self.capabilities,
        'models': {
            'text_generation': 'mistral-large-latest' if self.mistral_client else None,
            'vision': vision_model,
            'audio': 'faster-whisper-base' if self.whisper_model else None,
            'document_qa': 'distilbert-base-cased' if self.document_pipeline else None,
        },
        'dependencies': {
            'mistral_api': self.mistral_client is not None,
            'whisper': FASTER_WHISPER_AVAILABLE and self.whisper_model is not None,
            'transformers': TRANSFORMERS_AVAILABLE,
            'vision_pipeline': self.vision_pipeline is not None,
        }
    }
# Convenience function for easy import
def create_mistral_multimodal_agent():
    """Instantiate OpenSourceMultimodalTools with its default setup and return it."""
    toolkit = OpenSourceMultimodalTools()
    return toolkit
def create_open_source_multimodal_tools():
    """Build a new OpenSourceMultimodalTools toolkit and hand it back to the caller."""
    tools_instance = OpenSourceMultimodalTools()
    return tools_instance