# app/utils/agentic_integration.py
import asyncio
import logging
import os
from typing import Dict, Any, Optional, List
from pathlib import Path
from app.utils.enhanced_analysis import analyze_video_enhanced, EnhancedAnalysis
from app.utils.whisper_llm import analyze as basic_analyze
from app.utils import pdf, s3
logger = logging.getLogger("app.utils.agentic_integration")
class AgenticVideoProcessor:
"""
Advanced video processor that combines basic analysis with MCP/ACP capabilities
for comprehensive multi-modal video understanding using Groq.
"""
    def __init__(self, enable_enhanced_analysis: bool = True, groq_api_key: Optional[str] = None):
self.enable_enhanced_analysis = enable_enhanced_analysis
self.groq_api_key = groq_api_key or os.getenv("GROQ_API_KEY")
self.analysis_cache = {} # Cache for expensive analyses
async def process_video_agentic(self, video_url: str, user_id: int, db) -> Dict[str, Any]:
"""
Process video with agentic capabilities including:
- Multi-modal analysis (audio + visual)
- Context-aware summarization using Groq Llama3
- Beautiful report generation
- Enhanced vector storage
"""
        # Pre-declare fallback values so the error path below is always well-defined
        basic_transcription = None
        basic_summary = None
        try:
logger.info(f"Starting agentic video processing for user {user_id} using Groq")
# Step 1: Basic processing (existing functionality)
basic_transcription, basic_summary = await basic_analyze(video_url, user_id, db)
# Step 2: Enhanced analysis (if enabled)
enhanced_analysis = None
if self.enable_enhanced_analysis and self.groq_api_key:
enhanced_analysis = await self._perform_enhanced_analysis(video_url)
# Step 3: Generate comprehensive report
comprehensive_report = await self._generate_comprehensive_report(
basic_transcription,
basic_summary,
enhanced_analysis
)
# Step 4: Create enhanced PDF
enhanced_pdf_bytes = await self._create_enhanced_pdf(comprehensive_report)
# Step 5: Store enhanced vector embeddings
await self._store_enhanced_embeddings(user_id, comprehensive_report, enhanced_analysis)
return {
"basic_transcription": basic_transcription,
"basic_summary": basic_summary,
"enhanced_analysis": enhanced_analysis,
"comprehensive_report": comprehensive_report,
"enhanced_pdf_bytes": enhanced_pdf_bytes,
"success": True
}
except Exception as e:
logger.error(f"Agentic processing failed: {e}")
return {
"success": False,
"error": str(e),
"fallback_transcription": basic_transcription if 'basic_transcription' in locals() else None,
"fallback_summary": basic_summary if 'basic_summary' in locals() else None
}
async def _perform_enhanced_analysis(self, video_url: str) -> Optional[EnhancedAnalysis]:
"""Perform enhanced multi-modal analysis using Groq"""
try:
# Download video for enhanced analysis
import tempfile
import requests
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
with requests.get(video_url, stream=True, timeout=60) as response:
response.raise_for_status()
for chunk in response.iter_content(chunk_size=8192):
tmp.write(chunk)
                tmp_path = tmp.name
            # Perform enhanced analysis with Groq; always remove the downloaded temp file afterwards
            try:
                enhanced_analysis = await analyze_video_enhanced(tmp_path, self.groq_api_key)
            finally:
                os.unlink(tmp_path)
            return enhanced_analysis
except Exception as e:
logger.error(f"Enhanced analysis failed: {e}")
return None
async def _generate_comprehensive_report(self, transcription: str, summary: str,
enhanced_analysis: Optional[EnhancedAnalysis]) -> str:
"""Generate a comprehensive report combining all analyses"""
if enhanced_analysis:
# Use enhanced analysis report
return enhanced_analysis.formatted_report
else:
# Fallback to basic report with enhanced formatting
return f"""
# 📹 Video Analysis Report
## 🎵 Audio Transcription
{transcription}
## 📝 Summary
{summary}
## 📊 Analysis Details
- **Processing Method**: Basic Analysis
- **Enhanced Features**: Not available (Groq API key required)
- **Recommendation**: Enable enhanced analysis for multi-modal insights
---
*Report generated with basic analysis capabilities*
"""
async def _create_enhanced_pdf(self, report_content: str) -> bytes:
"""Create an enhanced PDF with beautiful formatting"""
try:
# Use existing PDF generation with enhanced content
pdf_bytes = pdf.generate(report_content, "Enhanced Analysis Report")
return pdf_bytes
        except Exception as e:
            logger.error(f"Enhanced PDF generation failed, retrying once: {e}")
            # There is no separate basic renderer available here, so retry the same generation once
            return pdf.generate(report_content, "Enhanced Analysis Report")
async def _store_enhanced_embeddings(self, user_id: int, report_content: str,
enhanced_analysis: Optional[EnhancedAnalysis]):
"""Store enhanced embeddings for better retrieval"""
try:
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document
from langchain_community.vectorstores import FAISS
embeddings = OpenAIEmbeddings()
# Create enhanced document with metadata
enhanced_doc = Document(
page_content=report_content,
metadata={
"user_id": user_id,
"analysis_type": "enhanced" if enhanced_analysis else "basic",
"has_visual_analysis": enhanced_analysis is not None,
"has_audio_analysis": enhanced_analysis is not None,
"topics": enhanced_analysis.topics if enhanced_analysis else [],
"sentiment": enhanced_analysis.sentiment_analysis if enhanced_analysis else {},
"llm_provider": "groq_llama3" if enhanced_analysis else "basic"
}
)
# Store in user's vector database
user_vector_path = f"vector_store/user_{user_id}"
            os.makedirs(user_vector_path, exist_ok=True)
if os.path.exists(os.path.join(user_vector_path, "index.faiss")):
vector_store = FAISS.load_local(user_vector_path, embeddings, allow_dangerous_deserialization=True)
vector_store.add_documents([enhanced_doc])
else:
vector_store = FAISS.from_documents([enhanced_doc], embeddings)
vector_store.save_local(user_vector_path)
logger.info(f"Enhanced embeddings stored for user {user_id}")
except Exception as e:
logger.error(f"Enhanced embedding storage failed: {e}")
class MCPToolManager:
"""
Manages MCP (Model Context Protocol) tools for enhanced video analysis using Groq
"""
    def __init__(self, groq_api_key: Optional[str] = None):
self.groq_api_key = groq_api_key or os.getenv("GROQ_API_KEY")
self.tools = {}
self._register_tools()
def _register_tools(self):
"""Register available MCP tools"""
self.tools = {
"web_search": self._web_search,
"wikipedia_lookup": self._wikipedia_lookup,
"sentiment_analysis": self._sentiment_analysis,
"topic_extraction": self._topic_extraction,
"context_enrichment": self._context_enrichment
}
async def _web_search(self, query: str) -> str:
"""Perform web search for context"""
try:
from langchain_community.tools import DuckDuckGoSearchRun
search = DuckDuckGoSearchRun()
return search.run(query)
except Exception as e:
return f"Web search failed: {e}"
async def _wikipedia_lookup(self, topic: str) -> str:
"""Look up Wikipedia information"""
try:
from langchain_community.utilities import WikipediaAPIWrapper
wiki = WikipediaAPIWrapper()
return wiki.run(topic)
except Exception as e:
return f"Wikipedia lookup failed: {e}"
async def _sentiment_analysis(self, text: str) -> Dict[str, float]:
"""Analyze sentiment of text using Groq if available"""
if self.groq_api_key:
try:
from langchain_groq import ChatGroq
llm = ChatGroq(groq_api_key=self.groq_api_key, model_name="llama-3.3-70b-versatile")
                # Placeholder: a real implementation would prompt `llm` to score the text;
                # static scores are returned until that is wired up
                return {"positive": 0.6, "negative": 0.2, "neutral": 0.2}
            except Exception:
                pass
        # Fallback to static placeholder scores
        return {"positive": 0.6, "negative": 0.2, "neutral": 0.2}
async def _topic_extraction(self, text: str) -> List[str]:
"""Extract key topics from text using Groq if available"""
if self.groq_api_key:
try:
from langchain_groq import ChatGroq
llm = ChatGroq(groq_api_key=self.groq_api_key, model_name="llama-3.3-70b-versatile")
                # Placeholder: a real implementation would prompt the Groq model for topics;
                # static topics are returned until that is wired up
                return ["technology", "innovation", "business"]
            except Exception:
                pass
        # Fallback to static placeholder topics
        return ["technology", "innovation", "business"]
async def _context_enrichment(self, content: str) -> str:
"""Enrich content with additional context using Groq"""
if self.groq_api_key:
try:
from langchain_groq import ChatGroq
llm = ChatGroq(groq_api_key=self.groq_api_key, model_name="llama-3.3-70b-versatile")
                # Placeholder: a real implementation would prompt the Groq model for added context
                return f"Enhanced context for: {content}"
            except Exception:
                pass
        return f"Basic context for: {content}"
# Integration with existing whisper_llm.py
async def analyze_with_agentic_capabilities(video_url: str, user_id: int, db, groq_api_key: Optional[str] = None) -> tuple:
"""
Enhanced version of the analyze function with agentic capabilities using Groq
"""
processor = AgenticVideoProcessor(enable_enhanced_analysis=True, groq_api_key=groq_api_key)
result = await processor.process_video_agentic(video_url, user_id, db)
if result["success"]:
return result["basic_transcription"], result["comprehensive_report"]
else:
# Fallback to basic analysis
logger.warning("Agentic analysis failed, falling back to basic analysis")
return await basic_analyze(video_url, user_id, db)
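# Convenience sketch for synchronous callers (e.g. scripts or one-off jobs). This wrapper is
# an assumption about how you might invoke the async API; it is not used elsewhere in this
# module and requires that no event loop is already running in the calling thread.
def run_agentic_analysis_sync(video_url: str, user_id: int, db) -> tuple:
    """Blocking wrapper around analyze_with_agentic_capabilities (illustration only)."""
    return asyncio.run(analyze_with_agentic_capabilities(video_url, user_id, db))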
# Usage in your existing system
def integrate_agentic_analysis():
"""
Instructions for integrating agentic analysis into your existing system
"""
return """
To integrate agentic analysis into your existing Dubsway system:
1. Set up Groq API key:
- Get API key from https://console.groq.com/
- Set environment variable: GROQ_API_KEY=your_key_here
2. Replace the analyze function call in worker/daemon.py:
- Change: transcription, summary = await whisper_llm.analyze(...)
       - To: transcription, summary = await agentic_integration.analyze_with_agentic_capabilities(...)
         (a hedged example of this swap, process_queue_item_example, follows this function)
3. Add new dependencies to requirements.txt:
- opencv-python
- pillow
- duckduckgo-search
- wikipedia-api
- langchain-groq
4. Update your PDF generation to handle enhanced reports
5. Monitor the enhanced vector store for better retrieval capabilities
Benefits:
- Multi-modal analysis (audio + visual)
- Context-aware summarization using Groq llama-3.3-70b-versatile
- Beautiful, comprehensive reports
- Enhanced vector embeddings for better RAG
- Web search integration for context
- Wikipedia lookups for detailed information
- Open-source model support with Groq
"""