# dubswayAgenticV2 / app/utils/lightweight_agentic.py

import asyncio
import logging
import os
from typing import Dict, Any, Optional, List
from pathlib import Path
from app.utils.whisper_llm import analyze as basic_analyze
from app.utils import pdf, s3
logger = logging.getLogger("app.utils.lightweight_agentic")
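
# Environment configuration: GROQ_API_KEY enables the enhanced Groq analysis path;
# the FAISS embedding step below uses OpenAIEmbeddings, which expects OPENAI_API_KEY
# to be set in the environment.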


class LightweightAgenticProcessor:
    """
    Lightweight agentic processor that uses Groq for enhanced analysis
    without the heavy computer-vision models that can cause hanging.
    """

    def __init__(self, enable_enhanced_analysis: bool = True, groq_api_key: Optional[str] = None):
        self.enable_enhanced_analysis = enable_enhanced_analysis
        self.groq_api_key = groq_api_key or os.getenv("GROQ_API_KEY")
        self.analysis_cache = {}

    async def process_video_lightweight(self, video_url: str, user_id: int, db) -> Dict[str, Any]:
        """
        Process a video with lightweight agentic capabilities using only Groq.
        """
        # Keep references so the except block can return partial results if a later step fails.
        basic_transcription = None
        basic_summary = None
        try:
            logger.info(f"Starting lightweight agentic video processing for user {user_id}")

            # Step 1: Basic processing (existing functionality)
            basic_transcription, basic_summary = await basic_analyze(video_url, user_id, db)

            # Step 2: Enhanced analysis with Groq only (no heavy CV models)
            enhanced_analysis = None
            if self.enable_enhanced_analysis and self.groq_api_key:
                enhanced_analysis = await self._perform_lightweight_analysis(basic_transcription, basic_summary)

            # Step 3: Generate comprehensive report
            comprehensive_report = await self._generate_lightweight_report(
                basic_transcription,
                basic_summary,
                enhanced_analysis
            )

            # Step 4: Create enhanced PDF
            enhanced_pdf_bytes = await self._create_enhanced_pdf(comprehensive_report)

            # Step 5: Store enhanced vector embeddings
            await self._store_enhanced_embeddings(user_id, comprehensive_report, enhanced_analysis)

            return {
                "basic_transcription": basic_transcription,
                "basic_summary": basic_summary,
                "enhanced_analysis": enhanced_analysis,
                "comprehensive_report": comprehensive_report,
                "enhanced_pdf_bytes": enhanced_pdf_bytes,
                "success": True
            }
        except Exception as e:
            logger.error(f"Lightweight agentic processing failed: {e}")
            return {
                "success": False,
                "error": str(e),
                "fallback_transcription": basic_transcription,
                "fallback_summary": basic_summary
            }

    async def _perform_lightweight_analysis(self, transcription: str, summary: str) -> Optional[Dict[str, Any]]:
        """Perform lightweight enhanced analysis using only Groq."""
        try:
            from langchain_groq import ChatGroq

            # Initialize Groq
            llm = ChatGroq(
                groq_api_key=self.groq_api_key,
                model_name="llama-3.3-70b-versatile",
                temperature=0.1,
                max_tokens=1000
            )

            # Create enhanced analysis prompt
            analysis_prompt = f"""
            Analyze this video content and provide enhanced insights:

            TRANSCRIPTION:
            {transcription}

            BASIC SUMMARY:
            {summary}

            Please provide:
            1. Key topics and themes
            2. Sentiment analysis
            3. Important insights
            4. Recommendations
            5. Context and implications

            Format your response in a clear, structured manner.
            """

            # Get enhanced analysis
            response = await llm.ainvoke(analysis_prompt)
            enhanced_analysis = response.content

            return {
                "enhanced_analysis": enhanced_analysis,
                "topics": ["technology", "innovation", "business"],  # Placeholder
                "sentiment": {"positive": 0.6, "negative": 0.2, "neutral": 0.2},  # Placeholder
                "key_insights": enhanced_analysis[:200] + "..." if len(enhanced_analysis) > 200 else enhanced_analysis
            }
        except Exception as e:
            logger.error(f"Lightweight analysis failed: {e}")
            return None

    async def _generate_lightweight_report(self, transcription: str, summary: str,
                                           enhanced_analysis: Optional[Dict[str, Any]]) -> str:
        """Generate a lightweight comprehensive report."""
        if enhanced_analysis:
            return f"""
# 📹 Video Analysis Report (Enhanced with Groq)

## 🎵 Audio Transcription
{transcription}

## 📝 Basic Summary
{summary}

## 🤖 Enhanced Analysis (Groq llama-3.3-70b-versatile)
{enhanced_analysis.get('enhanced_analysis', 'Analysis not available')}

## 🎯 Key Insights
{enhanced_analysis.get('key_insights', 'No additional insights available')}

## 📊 Analysis Details
- **Processing Method**: Lightweight Agentic Analysis
- **LLM Provider**: Groq llama-3.3-70b-versatile
- **Enhanced Features**: Text-based analysis and reasoning
- **Topics**: {', '.join(enhanced_analysis.get('topics', ['General']))}
- **Sentiment**: {enhanced_analysis.get('sentiment', {})}

---
*Report generated using Groq llama-3.3-70b-versatile*
"""
        else:
            return f"""
# 📹 Video Analysis Report

## 🎵 Audio Transcription
{transcription}

## 📝 Summary
{summary}

## 📊 Analysis Details
- **Processing Method**: Basic Analysis
- **Enhanced Features**: Not available (Groq API key required)
- **Recommendation**: Enable enhanced analysis for intelligent insights

---
*Report generated with basic analysis capabilities*
"""

    async def _create_enhanced_pdf(self, report_content: str) -> bytes:
        """Create an enhanced PDF from the formatted report content."""
        try:
            # Use the existing PDF generation utility
            return pdf.generate(report_content, "Enhanced Analysis Report")
        except Exception as e:
            logger.error(f"Enhanced PDF generation failed: {e}")
            # Retry with the same generator; if it fails again, the exception
            # propagates to process_video_lightweight's error handler.
            return pdf.generate(report_content, "Enhanced Analysis Report")

    async def _store_enhanced_embeddings(self, user_id: int, report_content: str,
                                         enhanced_analysis: Optional[Dict[str, Any]]):
        """Store enhanced embeddings for better retrieval."""
        try:
            from langchain_openai import OpenAIEmbeddings
            from langchain_core.documents import Document
            from langchain_community.vectorstores import FAISS

            embeddings = OpenAIEmbeddings()

            # Create enhanced document with metadata
            enhanced_doc = Document(
                page_content=report_content,
                metadata={
                    "user_id": user_id,
                    "analysis_type": "lightweight_enhanced" if enhanced_analysis else "basic",
                    "has_enhanced_analysis": enhanced_analysis is not None,
                    "topics": enhanced_analysis.get('topics', []) if enhanced_analysis else [],
                    "sentiment": enhanced_analysis.get('sentiment', {}) if enhanced_analysis else {},
                    "llm_provider": "groq_llama3" if enhanced_analysis else "basic"
                }
            )

            # Store in the user's vector database
            user_vector_path = f"vector_store/user_{user_id}"
            os.makedirs(user_vector_path, exist_ok=True)

            if os.path.exists(os.path.join(user_vector_path, "index.faiss")):
                vector_store = FAISS.load_local(user_vector_path, embeddings, allow_dangerous_deserialization=True)
                vector_store.add_documents([enhanced_doc])
            else:
                vector_store = FAISS.from_documents([enhanced_doc], embeddings)

            vector_store.save_local(user_vector_path)
            logger.info(f"Enhanced embeddings stored for user {user_id}")
        except Exception as e:
            logger.error(f"Enhanced embedding storage failed: {e}")


# Integration with existing whisper_llm.py
async def analyze_with_lightweight_agentic(video_url: str, user_id: int, db, groq_api_key: Optional[str] = None) -> tuple:
    """
    Lightweight version of the analyze function with agentic capabilities using Groq.
    """
    processor = LightweightAgenticProcessor(enable_enhanced_analysis=True, groq_api_key=groq_api_key)
    result = await processor.process_video_lightweight(video_url, user_id, db)

    if result["success"]:
        return result["basic_transcription"], result["comprehensive_report"]
    else:
        # Fall back to basic analysis
        logger.warning("Lightweight agentic analysis failed, falling back to basic analysis")
        return await basic_analyze(video_url, user_id, db)
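

# Minimal usage sketch (illustrative only): the URL below is a placeholder and `db=None`
# must be replaced with a real database session, i.e. whatever your app normally passes
# through to basic_analyze.
if __name__ == "__main__":
    async def _demo() -> None:
        transcription, report = await analyze_with_lightweight_agentic(
            "https://example.com/sample.mp4",  # placeholder URL
            user_id=1,
            db=None,  # placeholder: supply an active DB session in real use
        )
        print(report)

    asyncio.run(_demo())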