""" | |
Open Source Multimodal Tools | |
This module provides multimodal tool capabilities using open-source models: | |
- BLIP-2 and Mistral Vision models for image analysis | |
- Faster-Whisper for European audio transcription | |
- DistilBERT for document question answering | |
- Hugging Face transformers for various tasks | |
- No dependency on proprietary OpenAI models | |
Key Features: | |
- Image analysis using BLIP-2 or Mistral Vision | |
- Audio transcription using Faster-Whisper (European community-driven) | |
- Text generation using Mistral models | |
- Document processing and analysis | |
- All capabilities using open-source models with no API dependencies | |
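
Example usage (a minimal sketch; the import path and file names are placeholders):

    from open_source_multimodal_tools import OpenSourceMultimodalTools

    tools = OpenSourceMultimodalTools()
    caption = tools.analyze_image("photo.png", "What is shown in this image?")
    text = tools.transcribe_audio("recording.wav")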
""" | |

import os
import logging
import base64
import io
from typing import Dict, Any, List, Optional, Union
from pathlib import Path

import requests
from PIL import Image

# Environment setup
from utils.environment_setup import get_api_key, has_api_key, should_suppress_warnings

# Mistral and open-source model imports
try:
    # Try the new API first (recommended)
    from mistralai import Mistral as MistralClient
    from mistralai import UserMessage
    MISTRAL_AVAILABLE = True
    MISTRAL_CLIENT_TYPE = "new"
except ImportError:
    try:
        # Fall back to the old API (deprecated)
        from mistralai.client import MistralClient
        from mistralai import UserMessage
        MISTRAL_AVAILABLE = True
        MISTRAL_CLIENT_TYPE = "old"
    except ImportError:
        MistralClient = None
        UserMessage = None
        MISTRAL_AVAILABLE = False
        MISTRAL_CLIENT_TYPE = None

# European community-driven audio processing
try:
    # Faster-Whisper - community-driven European alternative.
    # Optimized, CPU-friendly, up to 4x faster than the original Whisper.
    # Developed by the European open-source community.
    import faster_whisper
    FASTER_WHISPER_AVAILABLE = True
except ImportError:
    FASTER_WHISPER_AVAILABLE = False

# Audio processing availability (European community solution only)
AUDIO_AVAILABLE = FASTER_WHISPER_AVAILABLE

# Hugging Face transformers for additional capabilities
try:
    from transformers import pipeline, AutoProcessor, AutoModel
    import torch
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False

# AGNO framework
from agno.tools.toolkit import Toolkit

# Response formatting
from utils.response_formatter import (
    ResponseFormatter,
    ResponseType,
    FormatConfig,
    FormatStandard,
)

logger = logging.getLogger(__name__)


class OpenSourceMultimodalTools(Toolkit):
    """
    Open-source multimodal tools using Mistral and other open models.

    This is a tool collection, not an agent. It provides multimodal capabilities
    that can be integrated into AGNO agents.

    Capabilities:
    - Image analysis using BLIP and Mistral Vision (Pixtral)
    - Audio transcription using Faster-Whisper (European community-driven)
    - Document analysis using DistilBERT
    - Text generation using Mistral models
    - All using open-source models with no proprietary dependencies
    """

    def __init__(self):
        """Initialize the open-source multimodal toolkit."""
        logger.info("🚀 Initializing Open Source Multimodal Tools...")
        # Load environment variables from .env file
        self._load_env_file()
        # Initialize response formatter
        self._init_response_formatter()
        # Initialize Mistral client
        self.mistral_client = None
        self.mistral_api_key = get_api_key('mistral')
        if self.mistral_api_key and MISTRAL_AVAILABLE and MistralClient:
            try:
                # Both API versions take the key as a keyword argument
                self.mistral_client = MistralClient(api_key=self.mistral_api_key)
                api_note = "new API" if MISTRAL_CLIENT_TYPE == "new" else "old API - deprecated"
                logger.info(f"✅ Mistral client initialized ({api_note})")
            except Exception as e:
                if not should_suppress_warnings():
                    logger.warning(f"⚠️ Mistral client initialization failed: {e}")
        else:
            if not should_suppress_warnings():
                if not MISTRAL_AVAILABLE:
                    logger.info("ℹ️ Mistral library not available - using fallback models")
                elif not self.mistral_api_key:
                    logger.info("ℹ️ MISTRAL_API_KEY not found - using open-source alternatives")
        # Initialize open-source models
        self.whisper_model = None
        self.vision_pipeline = None
        self.document_pipeline = None
        self._init_open_source_models()
        # Track available capabilities
        self.capabilities = self._assess_capabilities()
        # Build the tools list for AGNO registration
        tools = [
            self.analyze_image,
            self.transcribe_audio,
            self.analyze_document,
        ]
        # Initialize the toolkit with auto-registration enabled
        super().__init__(name="multimodal_tools", tools=tools)
        logger.info("✅ Open Source Multimodal Tools initialized")
        logger.info(f"📊 Available capabilities: {list(self.capabilities.keys())}")
        logger.info(f"🔧 Registered AGNO tools: {[tool.__name__ for tool in tools]}")

    def _load_env_file(self):
        """Load environment variables from a .env file if it exists."""
        env_file = Path('.env')
        if env_file.exists():
            with open(env_file, 'r') as f:
                for line in f:
                    line = line.strip()
                    if line and not line.startswith('#') and '=' in line:
                        key, value = line.split('=', 1)
                        os.environ[key.strip()] = value.strip()
            logger.info("✅ Environment variables loaded from .env file")
            # Reload the environment manager to pick up the new variables
            from utils.environment_setup import env_manager
            env_manager._load_environment()

    def _init_response_formatter(self):
        """Initialize the response formatter for consistent output."""
        format_config = FormatConfig(
            format_standard=FormatStandard.HF_EVALUATION,
            remove_markdown=True,
            remove_prefixes=True,
            strip_whitespace=True,
            normalize_spaces=True,
        )
        self.response_formatter = ResponseFormatter(config=format_config)

    def _init_open_source_models(self):
        """Initialize open-source models for multimodal capabilities."""
        # Initialize Faster-Whisper (European community-driven alternative)
        self.whisper_model = None
        if FASTER_WHISPER_AVAILABLE:
            try:
                # CPU-optimized configuration for European deployment
                self.whisper_model = faster_whisper.WhisperModel(
                    "base",               # Lightweight model for efficiency
                    device="cpu",         # CPU-friendly for European servers
                    compute_type="int8",  # Memory-efficient quantization
                    num_workers=1,        # Conservative resource usage
                )
                logger.info("✅ Faster-Whisper loaded (European community-driven alternative)")
                logger.info("🇪🇺 Using CPU-optimized configuration for European deployment")
            except Exception as e:
                logger.warning(f"⚠️ Faster-Whisper loading failed: {e}")
        if not self.whisper_model:
            logger.warning("⚠️ No audio transcription available")
            logger.info("💡 Install: pip install faster-whisper (European community alternative)")
        # Initialize the vision and document pipelines using open models
        if TRANSFORMERS_AVAILABLE:
            try:
                # BLIP for image captioning (open source)
                self.vision_pipeline = pipeline(
                    "image-to-text",
                    model="Salesforce/blip-image-captioning-base",
                    device=0 if torch.cuda.is_available() else -1,
                )
                logger.info("✅ Vision pipeline initialized (BLIP)")
            except Exception as e:
                logger.warning(f"⚠️ Vision pipeline initialization failed: {e}")
            try:
                # Document question-answering pipeline
                self.document_pipeline = pipeline(
                    "question-answering",
                    model="distilbert-base-cased-distilled-squad",
                )
                logger.info("✅ Document analysis pipeline initialized")
            except Exception as e:
                logger.warning(f"⚠️ Document pipeline initialization failed: {e}")

    def _assess_capabilities(self) -> Dict[str, bool]:
        """Assess which multimodal capabilities are available."""
        return {
            'text_generation': self.mistral_client is not None,
            'image_analysis': self.vision_pipeline is not None or self.mistral_client is not None,
            'audio_transcription': self.whisper_model is not None,
            'document_analysis': self.document_pipeline is not None,
            'vision_reasoning': self.mistral_client is not None,  # Mistral Vision (Pixtral)
        }

    def analyze_image(self, image_input: Union[str, bytes, Image.Image, dict], question: Optional[str] = None) -> str:
        """
        Analyze an image using open-source models.

        Args:
            image_input: Image file path, URL, bytes, PIL Image, or dict with a 'file_path' key
            question: Optional specific question about the image

        Returns:
            Analysis result as a string
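
        Example (illustrative; the file name is a placeholder):
            >>> tools = OpenSourceMultimodalTools()
            >>> tools.analyze_image({'file_path': 'chart.png'}, "What does the chart show?")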
""" | |
        try:
            # Convert the input to a PIL Image
            if isinstance(image_input, dict):
                # Handle the AGNO tool format: {'file_path': 'image.png'}
                if 'file_path' in image_input:
                    image_path = image_input['file_path']
                    if os.path.exists(image_path):
                        image = Image.open(image_path)
                    else:
                        return f"Error: Image file not found: {image_path}"
                else:
                    return "Error: Dictionary input must contain 'file_path' key"
            elif isinstance(image_input, str):
                if os.path.exists(image_input):
                    image = Image.open(image_input)
                else:
                    # Assume it's a URL
                    response = requests.get(image_input, timeout=30)
                    response.raise_for_status()
                    image = Image.open(io.BytesIO(response.content))
            elif isinstance(image_input, bytes):
                image = Image.open(io.BytesIO(image_input))
            elif isinstance(image_input, Image.Image):
                image = image_input
            else:
                return "Error: Unsupported image input format"

            # Try Mistral Vision first (if available)
            if self.mistral_client and question:
                try:
                    result = self._analyze_with_mistral_vision(image, question)
                    if result:
                        return result
                except Exception as e:
                    logger.warning(f"Mistral Vision failed: {e}")

            # Fall back to the open-source vision pipeline
            if self.vision_pipeline:
                try:
                    # Generate an image caption
                    caption_result = self.vision_pipeline(image)
                    caption = caption_result[0]['generated_text'] if caption_result else "Unable to generate caption"
                    if question:
                        # Use Mistral to reason about the image based on the caption
                        if self.mistral_client:
                            reasoning_prompt = (
                                f"Image Description: {caption}\n"
                                f"Question: {question}\n"
                                "Based on the image description, please answer the question about the image."
                            )
                            if MISTRAL_CLIENT_TYPE == "new":
                                response = self.mistral_client.chat.complete(
                                    model="mistral-large-latest",
                                    messages=[UserMessage(content=reasoning_prompt)],
                                )
                            else:
                                # Old API format (deprecated)
                                response = self.mistral_client.chat(
                                    model="mistral-large-latest",
                                    messages=[UserMessage(content=reasoning_prompt)],
                                )
                            return response.choices[0].message.content
                        else:
                            return f"Image shows: {caption}. Question: {question} (Unable to reason without Mistral API)"
                    else:
                        return f"Image analysis: {caption}"
                except Exception as e:
                    logger.error(f"Vision pipeline failed: {e}")
                    return f"Error analyzing image: {e}"

            return "Error: No image analysis capabilities available"
        except Exception as e:
            logger.error(f"Image analysis failed: {e}")
            return f"Error: {e}"

    def _analyze_with_mistral_vision(self, image: Image.Image, question: str) -> Optional[str]:
        """
        Analyze an image using the Mistral Vision model (Pixtral).

        Args:
            image: PIL Image object
            question: Question about the image

        Returns:
            Analysis result, or None on failure
        """
        try:
            # Encode the image as base64 for a data URL
            buffer = io.BytesIO()
            image.save(buffer, format='PNG')
            image_b64 = base64.b64encode(buffer.getvalue()).decode()
            # Create the message with the image - compatible with both API versions
            messages = [
                UserMessage(
                    content=[
                        {
                            "type": "text",
                            "text": question,
                        },
                        {
                            "type": "image_url",
                            "image_url": f"data:image/png;base64,{image_b64}",
                        },
                    ]
                )
            ]
            # Use the Mistral Vision model - the call format differs per API version
            if MISTRAL_CLIENT_TYPE == "new":
                response = self.mistral_client.chat.complete(
                    model="pixtral-12b-2409",  # Mistral's vision model
                    messages=messages,
                )
            else:
                # Old API format (deprecated)
                response = self.mistral_client.chat(
                    model="pixtral-12b-2409",  # Mistral's vision model
                    messages=messages,
                )
            return response.choices[0].message.content
        except Exception as e:
            logger.warning(f"Mistral Vision analysis failed: {e}")
            return None

    def transcribe_audio(self, audio_input: Union[str, bytes, dict]) -> str:
        """
        Transcribe audio using Faster-Whisper (European community-driven alternative).

        Args:
            audio_input: Audio file path, bytes, or dict with a 'file_path' key

        Returns:
            Transcription text
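
        Example (illustrative; the file name is a placeholder):
            >>> tools.transcribe_audio('meeting.wav')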
""" | |
        if not self.whisper_model:
            return "Error: Audio transcription not available (Faster-Whisper not loaded)"
        temp_file = None
        try:
            # Handle the different input types from the AGNO framework
            if isinstance(audio_input, dict):
                # AGNO passes {'file_path': '/path/to/file'}
                if 'file_path' in audio_input:
                    file_path = audio_input['file_path']
                else:
                    return "Error: Invalid audio input format - expected 'file_path' key in dict"
            elif isinstance(audio_input, str):
                # Direct file path
                file_path = audio_input
            elif isinstance(audio_input, bytes):
                # Bytes input - save to a temporary file
                import tempfile
                with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
                    tmp.write(audio_input)
                    tmp.flush()
                    file_path = tmp.name
                temp_file = file_path
            else:
                return f"Error: Unsupported audio input type: {type(audio_input)}"
            # Transcribe using Faster-Whisper
            segments, info = self.whisper_model.transcribe(file_path)
            transcription = " ".join(segment.text for segment in segments)
            logger.info("🇪🇺 Audio transcribed using Faster-Whisper (European community)")
            return transcription.strip()
        except Exception as e:
            logger.error(f"Audio transcription failed: {e}")
            return f"Error transcribing audio: {e}"
        finally:
            # Clean up the temporary file if we created one
            if temp_file and os.path.exists(temp_file):
                os.unlink(temp_file)

    def analyze_document(self, document_text: str, question: str) -> str:
        """
        Analyze document content and answer questions about it.

        Args:
            document_text: Text content of the document
            question: Question about the document

        Returns:
            Answer based on the document analysis
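
        Example (illustrative; the document text is a placeholder):
            >>> tools.analyze_document("Q3 revenue was 2.1M EUR ...", "What was Q3 revenue?")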
""" | |
        try:
            # Use Mistral for complex reasoning if available
            if self.mistral_client:
                # Truncate the document so the prompt stays within limits
                prompt = (
                    f"Document Content:\n{document_text[:4000]}\n\n"
                    f"Question: {question}\n\n"
                    "Please analyze the document and answer the question based on the content provided."
                )
                if MISTRAL_CLIENT_TYPE == "new":
                    response = self.mistral_client.chat.complete(
                        model="mistral-large-latest",
                        messages=[UserMessage(content=prompt)],
                    )
                else:
                    # Old API format (deprecated)
                    response = self.mistral_client.chat(
                        model="mistral-large-latest",
                        messages=[UserMessage(content=prompt)],
                    )
                return response.choices[0].message.content
            # Fall back to the simple QA pipeline
            elif self.document_pipeline:
                result = self.document_pipeline(
                    question=question,
                    context=document_text[:1000],  # Limit context length
                )
                return result['answer']
            else:
                return "Error: Document analysis not available"
        except Exception as e:
            logger.error(f"Document analysis failed: {e}")
            return f"Error analyzing document: {e}"

    def generate_text(self, prompt: str, max_tokens: int = 500) -> str:
        """
        Generate text using a Mistral model.

        Args:
            prompt: Input prompt
            max_tokens: Maximum number of tokens to generate

        Returns:
            Generated text
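
        Example (illustrative):
            >>> tools.generate_text("Summarize the benefits of open-source models.", max_tokens=200)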
""" | |
        if not self.mistral_client:
            return "Error: Text generation not available (Mistral API key required)"
        try:
            if MISTRAL_CLIENT_TYPE == "new":
                response = self.mistral_client.chat.complete(
                    model="mistral-large-latest",
                    messages=[UserMessage(content=prompt)],
                    max_tokens=max_tokens,
                )
            else:
                # Old API format (deprecated)
                response = self.mistral_client.chat(
                    model="mistral-large-latest",
                    messages=[UserMessage(content=prompt)],
                    max_tokens=max_tokens,
                )
            return response.choices[0].message.content
        except Exception as e:
            logger.error(f"Text generation failed: {e}")
            return f"Error generating text: {e}"

    def __call__(self, question: str, **kwargs) -> str:
        """
        Main interface for the multimodal toolkit.

        Args:
            question: User question/request
            **kwargs: Additional inputs ('image', 'audio', or 'document')

        Returns:
            Formatted response
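
        Example (illustrative; the file name is a placeholder):
            >>> tools("What is pictured here?", image={'file_path': 'photo.jpg'})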
""" | |
        try:
            logger.info(f"🤔 Processing multimodal question: {question[:100]}...")
            # Check for multimodal inputs
            if 'image' in kwargs:
                result = self.analyze_image(kwargs['image'], question)
            elif 'audio' in kwargs:
                # Transcribe first, then reason over the transcription
                transcription = self.transcribe_audio(kwargs['audio'])
                combined_question = f"Audio transcription: {transcription}\nQuestion: {question}"
                result = self.generate_text(combined_question)
            elif 'document' in kwargs:
                result = self.analyze_document(kwargs['document'], question)
            else:
                # Pure text generation
                result = self.generate_text(question)
            # Format the response
            formatted_result = self.response_formatter.format_response(
                result,
                response_type=ResponseType.DIRECT_ANSWER,
            )
            logger.info(f"📤 Multimodal tools response: {formatted_result[:100]}...")
            return formatted_result
        except Exception as e:
            logger.error(f"Multimodal processing failed: {e}")
            return "Error processing multimodal request"

    def get_capabilities_status(self) -> Dict[str, Any]:
        """Get a detailed status report of the multimodal capabilities."""
        return {
            'agent_type': 'mistral_multimodal',
            'capabilities': self.capabilities,
            'models': {
                'text_generation': 'mistral-large-latest' if self.mistral_client else None,
                'vision': 'pixtral-12b-2409' if self.mistral_client else ('BLIP' if self.vision_pipeline else None),
                'audio': 'faster-whisper-base' if self.whisper_model else None,
                'document_qa': 'distilbert-base-cased-distilled-squad' if self.document_pipeline else None,
            },
            'dependencies': {
                'mistral_api': self.mistral_client is not None,
                'whisper': FASTER_WHISPER_AVAILABLE and self.whisper_model is not None,
                'transformers': TRANSFORMERS_AVAILABLE,
                'vision_pipeline': self.vision_pipeline is not None,
            },
        }


# Convenience functions for easy import
def create_mistral_multimodal_agent():
    """Create and return an open-source multimodal tools instance (legacy alias)."""
    return OpenSourceMultimodalTools()


def create_open_source_multimodal_tools():
    """Create and return an open-source multimodal tools instance."""
    return OpenSourceMultimodalTools()
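

if __name__ == "__main__":
    # Minimal smoke-test sketch. Assumes the optional dependencies
    # (faster-whisper, transformers) and any API keys are configured in the
    # environment; the reported capabilities vary by setup.
    logging.basicConfig(level=logging.INFO)
    tools = create_open_source_multimodal_tools()
    print(tools.get_capabilities_status())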