"""
Open Source Multimodal Tools
This module provides multimodal tool capabilities using open-source models:
- BLIP and Mistral Vision (Pixtral) models for image analysis
- Faster-Whisper (European community-driven) for audio transcription
- DistilBERT for document question answering
- Hugging Face transformers for various tasks
- No dependency on proprietary OpenAI models
Key Features:
- Image analysis using BLIP or Mistral Vision (Pixtral)
- Audio transcription using Faster-Whisper (European community-driven)
- Text generation using Mistral models
- Document processing and analysis
- All capabilities using open-source models with no API dependencies
"""
import os
import logging
import base64
import io
from typing import Dict, Any, List, Optional, Union
from pathlib import Path
import requests
from PIL import Image
# Environment setup
from utils.environment_setup import get_api_key, has_api_key, should_suppress_warnings
# Mistral and open-source model imports
try:
# Try new API first (recommended)
from mistralai import Mistral as MistralClient
from mistralai import UserMessage
MISTRAL_AVAILABLE = True
MISTRAL_CLIENT_TYPE = "new"
except ImportError:
try:
# Fallback to old API (deprecated)
from mistralai.client import MistralClient
from mistralai import UserMessage
MISTRAL_AVAILABLE = True
MISTRAL_CLIENT_TYPE = "old"
except ImportError:
MistralClient = None
UserMessage = None
MISTRAL_AVAILABLE = False
MISTRAL_CLIENT_TYPE = None
# European Community-Driven Audio Processing
try:
# Faster-Whisper - Community-driven European alternative
# Optimized and CPU-friendly; up to 4x faster than the original Whisper
# Developed by European open-source community
import faster_whisper
FASTER_WHISPER_AVAILABLE = True
except ImportError:
FASTER_WHISPER_AVAILABLE = False
# Audio processing availability (European community solution only)
AUDIO_AVAILABLE = FASTER_WHISPER_AVAILABLE
# Hugging Face transformers for additional capabilities
try:
from transformers import pipeline, AutoProcessor, AutoModel
import torch
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
# AGNO framework
from agno.tools.toolkit import Toolkit
# Response formatting
from utils.response_formatter import (
ResponseFormatter,
ResponseType,
FormatConfig,
FormatStandard,
)
logger = logging.getLogger(__name__)
class OpenSourceMultimodalTools(Toolkit):
"""
Open-source multimodal tools using Mistral and other open models.
This is a tool collection, not an agent. It provides multimodal capabilities
that can be integrated into AGNO agents.
Capabilities:
- Image analysis using BLIP and Mistral Vision (Pixtral)
- Audio transcription using Faster-Whisper (European community-driven)
- Document analysis using DistilBERT
- Text generation using Mistral models
- All using open-source models with no proprietary dependencies
"""
def __init__(self):
"""Initialize the Mistral-based multimodal agent."""
logger.info("π Initializing Mistral Multimodal Agent (Open Source)...")
# Load environment variables from .env file
self._load_env_file()
# Initialize response formatter
self._init_response_formatter()
# Initialize Mistral client
self.mistral_client = None
self.mistral_api_key = get_api_key('mistral')
if self.mistral_api_key and MISTRAL_AVAILABLE and MistralClient:
try:
if MISTRAL_CLIENT_TYPE == "new":
# New API initialization
self.mistral_client = MistralClient(api_key=self.mistral_api_key)
logger.info("β
Mistral client initialized (new API)")
else:
# Old API initialization (deprecated)
self.mistral_client = MistralClient(api_key=self.mistral_api_key)
logger.info("β
Mistral client initialized (old API - deprecated)")
except Exception as e:
if not should_suppress_warnings():
logger.warning(f"β οΈ Mistral client initialization failed: {e}")
else:
if not should_suppress_warnings():
if not MISTRAL_AVAILABLE:
logger.info("βΉοΈ Mistral library not available - using fallback models")
elif not self.mistral_api_key:
logger.info("βΉοΈ MISTRAL_API_KEY not found - using open-source alternatives")
# Initialize open-source models
self.whisper_model = None
self.vision_pipeline = None
self.document_pipeline = None
self._init_open_source_models()
# Track available capabilities
self.capabilities = self._assess_capabilities()
# Build tools list for AGNO registration
tools = [
self.analyze_image,
self.transcribe_audio,
self.analyze_document
]
# Initialize the toolkit with auto-registration enabled
super().__init__(name="multimodal_tools", tools=tools)
logger.info("β
Mistral Multimodal Agent initialized")
logger.info(f"π Available capabilities: {list(self.capabilities.keys())}")
logger.info(f"π§ Registered AGNO tools: {[tool.__name__ for tool in tools]}")
def _load_env_file(self):
"""Load environment variables from .env file if it exists."""
from pathlib import Path
env_file = Path('.env')
if env_file.exists():
with open(env_file, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
os.environ[key.strip()] = value.strip()
logger.info("β
Environment variables loaded from .env file")
# Reload the environment manager to pick up new variables
from utils.environment_setup import env_manager
env_manager._load_environment()
def _init_response_formatter(self):
"""Initialize response formatter for consistent output."""
format_config = FormatConfig(
format_standard=FormatStandard.HF_EVALUATION,
remove_markdown=True,
remove_prefixes=True,
strip_whitespace=True,
normalize_spaces=True
)
self.response_formatter = ResponseFormatter(config=format_config)
def _init_open_source_models(self):
"""Initialize open-source models for multimodal capabilities."""
# Initialize Faster-Whisper (European community-driven alternative)
self.whisper_model = None
if FASTER_WHISPER_AVAILABLE:
try:
# Use CPU-optimized configuration for European deployment
self.whisper_model = faster_whisper.WhisperModel(
"base", # Lightweight model for efficiency
device="cpu", # CPU-friendly for European servers
compute_type="int8", # Memory-efficient quantization
num_workers=1 # Conservative resource usage
)
logger.info("β
Faster-Whisper loaded (European community-driven alternative)")
logger.info("πͺπΊ Using CPU-optimized configuration for European deployment")
except Exception as e:
logger.warning(f"β οΈ Faster-Whisper loading failed: {e}")
if not self.whisper_model:
logger.warning("β οΈ No audio transcription available")
logger.info("π‘ Install: pip install faster-whisper (European community alternative)")
# Initialize vision pipeline using open models
if TRANSFORMERS_AVAILABLE:
try:
# Use BLIP for image captioning (open source)
self.vision_pipeline = pipeline(
"image-to-text",
model="Salesforce/blip-image-captioning-base",
device=0 if torch.cuda.is_available() else -1
)
logger.info("β
Vision pipeline initialized (BLIP-2)")
except Exception as e:
logger.warning(f"β οΈ Vision pipeline initialization failed: {e}")
try:
# Document analysis pipeline
self.document_pipeline = pipeline(
"question-answering",
model="distilbert-base-cased-distilled-squad"
)
logger.info("β
Document analysis pipeline initialized")
except Exception as e:
logger.warning(f"β οΈ Document pipeline initialization failed: {e}")
def _assess_capabilities(self) -> Dict[str, bool]:
"""Assess what multimodal capabilities are available."""
return {
'text_generation': self.mistral_client is not None,
'image_analysis': self.vision_pipeline is not None or self.mistral_client is not None,
'audio_transcription': self.whisper_model is not None,
'document_analysis': self.document_pipeline is not None,
'vision_reasoning': self.mistral_client is not None, # Mistral Vision
}
def analyze_image(self, image_input: Union[str, bytes, Image.Image, dict], question: Optional[str] = None) -> str:
"""
Analyze an image using open-source models.
Args:
image_input: Image file path, bytes, PIL Image, or dict with file_path
question: Optional specific question about the image
Returns:
Analysis result as string
"""
try:
# Convert input to PIL Image
if isinstance(image_input, dict):
# Handle AGNO tool format: {'file_path': 'image.png'}
if 'file_path' in image_input:
image_path = image_input['file_path']
if os.path.exists(image_path):
image = Image.open(image_path)
else:
return f"Error: Image file not found: {image_path}"
else:
return "Error: Dictionary input must contain 'file_path' key"
elif isinstance(image_input, str):
if os.path.exists(image_input):
image = Image.open(image_input)
else:
# Assume it's a URL
response = requests.get(image_input, timeout=30)
image = Image.open(io.BytesIO(response.content))
elif isinstance(image_input, bytes):
image = Image.open(io.BytesIO(image_input))
elif isinstance(image_input, Image.Image):
image = image_input
else:
return "Error: Unsupported image input format"
# Try Mistral Vision first (if available)
if self.mistral_client and question:
try:
result = self._analyze_with_mistral_vision(image, question)
if result:
return result
except Exception as e:
logger.warning(f"Mistral Vision failed: {e}")
# Fallback to open-source vision pipeline
if self.vision_pipeline:
try:
# Generate image caption
caption_result = self.vision_pipeline(image)
caption = caption_result[0]['generated_text'] if caption_result else "Unable to generate caption"
if question:
# Use Mistral to reason about the image based on caption
if self.mistral_client:
reasoning_prompt = f"""
Image Description: {caption}
Question: {question}
Based on the image description, please answer the question about the image.
"""
if MISTRAL_CLIENT_TYPE == "new":
response = self.mistral_client.chat.complete(
model="mistral-large-latest",
messages=[UserMessage(content=reasoning_prompt)]
)
else:
# Old API format (deprecated)
response = self.mistral_client.chat(
model="mistral-large-latest",
messages=[UserMessage(content=reasoning_prompt)]
)
return response.choices[0].message.content
else:
return f"Image shows: {caption}. Question: {question} (Unable to reason without Mistral API)"
else:
return f"Image analysis: {caption}"
except Exception as e:
logger.error(f"Vision pipeline failed: {e}")
return f"Error analyzing image: {e}"
return "Error: No image analysis capabilities available"
except Exception as e:
logger.error(f"Image analysis failed: {e}")
return f"Error: {e}"
def _analyze_with_mistral_vision(self, image: Image.Image, question: str) -> Optional[str]:
"""
Analyze image using Mistral Vision model.
Args:
image: PIL Image object
question: Question about the image
Returns:
Analysis result or None if failed
"""
try:
# Convert image to base64
buffer = io.BytesIO()
image.save(buffer, format='PNG')
image_b64 = base64.b64encode(buffer.getvalue()).decode()
# Create message with image - compatible with both API versions
messages = [
UserMessage(
content=[
{
"type": "text",
"text": question
},
{
"type": "image_url",
"image_url": f"data:image/png;base64,{image_b64}"
}
]
)
]
# Use Mistral Vision model - different API call formats
if MISTRAL_CLIENT_TYPE == "new":
response = self.mistral_client.chat.complete(
model="pixtral-12b-2409", # Mistral's vision model
messages=messages
)
else:
# Old API format (deprecated)
response = self.mistral_client.chat(
model="pixtral-12b-2409", # Mistral's vision model
messages=messages
)
return response.choices[0].message.content
except Exception as e:
logger.warning(f"Mistral Vision analysis failed: {e}")
return None
def transcribe_audio(self, audio_input: Union[str, bytes, dict]) -> str:
"""
Transcribe audio using Faster-Whisper (European community-driven alternative).
Args:
audio_input: Audio file path, bytes, or dict with 'file_path' key
Returns:
Transcription text
"""
if not self.whisper_model:
return "Error: Audio transcription not available (Faster-Whisper not loaded)"
try:
# Handle different input types from AGNO framework
if isinstance(audio_input, dict):
# AGNO passes {'file_path': '/path/to/file'}
if 'file_path' in audio_input:
file_path = audio_input['file_path']
else:
return "Error: Invalid audio input format - expected 'file_path' key in dict"
elif isinstance(audio_input, str):
# Direct file path
file_path = audio_input
elif isinstance(audio_input, bytes):
# Handle bytes input - save to temporary file
import tempfile
with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
tmp.write(audio_input)
tmp.flush()
file_path = tmp.name
else:
return f"Error: Unsupported audio input type: {type(audio_input)}"
# Transcribe using Faster-Whisper
segments, info = self.whisper_model.transcribe(file_path)
transcription = " ".join([segment.text for segment in segments])
# Clean up temporary file if we created one
if isinstance(audio_input, bytes):
os.unlink(file_path)
logger.info(f"πͺπΊ Audio transcribed using Faster-Whisper (European community)")
return transcription.strip()
except Exception as e:
logger.error(f"Audio transcription failed: {e}")
return f"Error transcribing audio: {e}"
def analyze_document(self, document_text: str, question: str) -> str:
"""
Analyze document content and answer questions.
Args:
document_text: Text content of document
question: Question about the document
Returns:
Answer based on document analysis
"""
try:
# Use Mistral for complex reasoning if available
if self.mistral_client:
prompt = f"""
Document Content:
{document_text[:4000]} # Limit length
Question: {question}
Please analyze the document and answer the question based on the content provided.
"""
if MISTRAL_CLIENT_TYPE == "new":
response = self.mistral_client.chat.complete(
model="mistral-large-latest",
messages=[UserMessage(content=prompt)]
)
else:
# Old API format (deprecated)
response = self.mistral_client.chat(
model="mistral-large-latest",
messages=[UserMessage(content=prompt)]
)
return response.choices[0].message.content
# Fallback to simple QA pipeline
elif self.document_pipeline:
result = self.document_pipeline(
question=question,
context=document_text[:1000] # Limit context length
)
return result['answer']
else:
return "Error: Document analysis not available"
except Exception as e:
logger.error(f"Document analysis failed: {e}")
return f"Error analyzing document: {e}"
def generate_text(self, prompt: str, max_tokens: int = 500) -> str:
"""
Generate text using Mistral model.
Args:
prompt: Input prompt
max_tokens: Maximum tokens to generate
Returns:
Generated text
"""
if not self.mistral_client:
return "Error: Text generation not available (Mistral API key required)"
try:
if MISTRAL_CLIENT_TYPE == "new":
response = self.mistral_client.chat.complete(
model="mistral-large-latest",
messages=[UserMessage(content=prompt)],
max_tokens=max_tokens
)
else:
# Old API format (deprecated)
response = self.mistral_client.chat(
model="mistral-large-latest",
messages=[UserMessage(content=prompt)],
max_tokens=max_tokens
)
return response.choices[0].message.content
except Exception as e:
logger.error(f"Text generation failed: {e}")
return f"Error generating text: {e}"
def __call__(self, question: str, **kwargs) -> str:
"""
Main interface for the multimodal agent.
Args:
question: User question/request
**kwargs: Additional parameters (image, audio, document, etc.)
Returns:
Formatted response
"""
try:
logger.info(f"π€ Processing multimodal question: {question[:100]}...")
# Check for multimodal inputs
if 'image' in kwargs:
result = self.analyze_image(kwargs['image'], question)
elif 'audio' in kwargs:
# First transcribe, then process
transcription = self.transcribe_audio(kwargs['audio'])
combined_question = f"Audio transcription: {transcription}\nQuestion: {question}"
result = self.generate_text(combined_question)
elif 'document' in kwargs:
result = self.analyze_document(kwargs['document'], question)
else:
# Pure text generation
result = self.generate_text(question)
# Format response
formatted_result = self.response_formatter.format_response(
result,
response_type=ResponseType.DIRECT_ANSWER
)
logger.info(f"π€ Mistral Multimodal Agent response: {formatted_result[:100]}...")
return formatted_result
except Exception as e:
logger.error(f"Multimodal processing failed: {e}")
return "Error processing multimodal request"
def get_capabilities_status(self) -> Dict[str, Any]:
"""Get detailed status of multimodal capabilities."""
return {
'agent_type': 'mistral_multimodal',
'capabilities': self.capabilities,
'models': {
'text_generation': 'mistral-large-latest' if self.mistral_client else None,
'vision': 'pixtral-12b-2409' if self.mistral_client else 'BLIP',
'audio': 'faster-whisper-base' if self.whisper_model else None,
'document_qa': 'distilbert-base-cased-distilled-squad' if self.document_pipeline else None,
},
'dependencies': {
'mistral_api': self.mistral_client is not None,
'whisper': FASTER_WHISPER_AVAILABLE and self.whisper_model is not None,
'transformers': TRANSFORMERS_AVAILABLE,
'vision_pipeline': self.vision_pipeline is not None,
}
}
# Convenience functions for easy import
def create_mistral_multimodal_agent():
"""Create and return an open-source multimodal tools instance."""
return OpenSourceMultimodalTools()
def create_open_source_multimodal_tools():
"""Create and return an open-source multimodal tools instance."""
return OpenSourceMultimodalTools() |
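# Usage sketch (illustrative only): the module path, file names, and AGNO Agent
# wiring below are assumptions and may differ in your project layout.
#
#     from open_source_multimodal_tools import create_open_source_multimodal_tools
#
#     tools = create_open_source_multimodal_tools()
#
#     # Direct tool calls
#     print(tools.analyze_image({"file_path": "example.png"}, "What is shown?"))
#     print(tools.transcribe_audio({"file_path": "example.wav"}))
#     print(tools.analyze_document("Report text...", "What is the main finding?"))
#
#     # Text-only entry point (falls back to generate_text)
#     print(tools("Summarize the report in one sentence."))
#
#     # Or hand the toolkit to an AGNO agent
#     # from agno.agent import Agent
#     # agent = Agent(tools=[tools])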