# dubswayAgenticV2 / app / utils / enhanced_analysis.py
# Page-header residue from the hosting site: "peace2024's picture",
# commit message "update chat vector", revision 1abe985.
import os
import logging
import asyncio
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import cv2
import numpy as np
from PIL import Image
import torch
from transformers import pipeline, AutoFeatureExtractor, AutoModelForImageClassification
from faster_whisper import WhisperModel
# LangChain imports for advanced RAG
from langchain.agents import Tool, AgentExecutor, create_openai_functions_agent
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage
from langchain.tools import BaseTool
from langchain_core.callbacks import BaseCallbackHandler
# MCP/ACP inspired components
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_community.utilities import WikipediaAPIWrapper
# Module-level logger; the name is hard-coded to this module's dotted path.
logger = logging.getLogger("app.utils.enhanced_analysis")
@dataclass
class VideoFrame:
    """One sampled video frame bundled with the results of its analysis."""

    # Position of the frame within the video, in seconds.
    timestamp: float
    # Zero-based index of the frame in the decoded stream.
    frame_number: int
    # Raw pixel data of the frame.
    image: np.ndarray
    # Detected objects; each entry is a dict that carries at least a 'label'.
    objects: List[Dict[str, Any]]
    # Short natural-language description of the scene.
    scene_description: str
    # Per-frame emotion scores.
    emotions: List[Dict[str, float]]
    # Text generated for the frame by the image-to-text step.
    text_ocr: str
@dataclass
class AudioSegment:
    """One transcribed stretch of audio together with its analysis."""

    # Segment boundaries, in seconds from the start of the media.
    start_time: float
    end_time: float
    # Transcribed text of the segment.
    text: str
    # Language reported for the transcription.
    language: str
    # Score attached to the transcription of this segment.
    confidence: float
    # Emotion label -> score.
    emotions: Dict[str, float]
    # Identifier of the speaker, when known.
    speaker_id: Optional[str] = None
@dataclass
class EnhancedAnalysis:
    """Aggregate result of a full audio + visual analysis of one video."""

    # General facts about the video (e.g. 'duration', 'resolution').
    video_metadata: Dict[str, Any]
    # Transcribed audio segments, in order.
    audio_analysis: List[AudioSegment]
    # Analysed frames, in order.
    visual_analysis: List[VideoFrame]
    # Free-text summary of the content.
    content_summary: str
    # Notable moments; entries carry 'timestamp' and 'description'.
    key_moments: List[Dict[str, Any]]
    # Topics covered by the video.
    topics: List[str]
    # Sentiment label -> score ('positive', 'negative', 'neutral').
    sentiment_analysis: Dict[str, float]
    # Rendered Markdown report.
    formatted_report: str
class MultiModalAnalyzer:
    """Advanced multi-modal video analyzer with MCP/ACP capabilities using Groq.

    Combines Whisper transcription, Hugging Face vision pipelines and a
    Groq-hosted LLM agent (with web-search and Wikipedia tools) to build a
    structured analysis and Markdown report for a video file.
    """

    def __init__(self, groq_api_key: Optional[str] = None):
        """Load all models and build the agent.

        Args:
            groq_api_key: Groq API key. When omitted, the ``GROQ_API_KEY``
                environment variable is used.

        Raises:
            ValueError: If no API key is supplied or found in the environment.
        """
        # Validate the key first so a misconfiguration fails fast, before the
        # (slow, memory-hungry) model loads below.
        groq_api_key = groq_api_key or os.getenv("GROQ_API_KEY")
        if not groq_api_key:
            raise ValueError("GROQ_API_KEY environment variable is required")

        self.whisper_model = WhisperModel(
            "base", device="cuda" if torch.cuda.is_available() else "cpu"
        )

        # Visual analysis models
        self.object_detector = pipeline("object-detection", model="facebook/detr-resnet-50")
        self.image_classifier = pipeline("image-classification", model="microsoft/resnet-50")
        # NOTE: despite the attribute name, this is an image-captioning model,
        # so its output is a caption rather than recognised characters.
        self.ocr_reader = pipeline("image-to-text", model="Salesforce/blip-image-captioning-base")

        # Audio analysis
        self.audio_classifier = pipeline("audio-classification", model="facebook/wav2vec2-base")

        # LLM for advanced reasoning - using Groq with Llama3
        self.llm = ChatGroq(
            groq_api_key=groq_api_key,
            model_name="llama-3.3-70b-versatile",
            temperature=0.1,
            max_tokens=2000,
        )

        # Agent tools
        self.search_tool = DuckDuckGoSearchRun()
        self.wikipedia_tool = WikipediaAPIWrapper()

        # Initialize agent
        self.agent = self._create_agent()

    def _create_agent(self):
        """Create an agent executor wired to the search, Wikipedia, sentiment and topic tools."""
        tools = [
            Tool(
                name="web_search",
                func=self.search_tool.run,
                description="Search the web for additional context about topics, people, or concepts mentioned in the video",
            ),
            Tool(
                name="wikipedia_lookup",
                func=self.wikipedia_tool.run,
                description="Look up detailed information on Wikipedia about topics mentioned in the video",
            ),
            Tool(
                name="analyze_sentiment",
                func=self._analyze_sentiment,
                description="Analyze the sentiment and emotional tone of text content",
            ),
            Tool(
                name="extract_key_topics",
                func=self._extract_key_topics,
                description="Extract key topics and themes from text content",
            ),
        ]
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert video content analyst with access to multiple tools for enhanced analysis.
Your capabilities include:
- Web search for additional context
- Wikipedia lookups for detailed information
- Sentiment analysis
- Topic extraction and categorization
Analyze the provided video content comprehensively and provide insights that go beyond basic transcription.
Consider context, cultural references, technical details, and broader implications.
Provide detailed, well-structured analysis with clear sections and actionable insights."""),
            # Callers must supply "chat_history" (possibly empty) on every invoke.
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        agent = create_openai_functions_agent(self.llm, tools, prompt)
        return AgentExecutor(agent=agent, tools=tools, verbose=True)

    async def analyze_video_frames(self, video_path: str, sample_rate: int = 30) -> List["VideoFrame"]:
        """Extract and analyze video frames at regular intervals.

        Args:
            video_path: Path of the video file to read.
            sample_rate: Target number of analysed frames per second of video.
                Every ``int(fps / sample_rate)``-th frame is analysed, and
                every frame when the video's fps does not exceed this value.

        Returns:
            The analysed frames in playback order.

        Raises:
            ValueError: If ``sample_rate`` is not positive or the video
                reports an unusable frame rate.
            IOError: If the video cannot be opened.
        """
        if sample_rate <= 0:
            raise ValueError("sample_rate must be a positive number")

        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                raise IOError(f"Could not open video file: {video_path}")

            fps = cap.get(cv2.CAP_PROP_FPS)
            # `not fps > 0` also rejects NaN, which `fps <= 0` would let through.
            if not fps > 0:
                raise ValueError(f"Video reports an invalid frame rate ({fps}): {video_path}")

            # Clamp to 1: when fps < sample_rate the integer division yields 0
            # and the modulo below would raise ZeroDivisionError.
            frame_interval = max(1, int(fps / sample_rate))

            frames = []
            frame_count = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % frame_interval == 0:
                    frames.append(self._analyze_frame(frame, frame_count, frame_count / fps))
                frame_count += 1
            return frames
        finally:
            # Always free the capture handle, even if a model call fails.
            cap.release()

    def _analyze_frame(self, frame: np.ndarray, frame_number: int, timestamp: float) -> "VideoFrame":
        """Run object detection, classification and captioning on one BGR frame."""
        # The vision pipelines expect RGB PIL images; OpenCV decodes to BGR.
        pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        objects = self.object_detector(pil_image)
        classification = self.image_classifier(pil_image)

        # Text extraction is best-effort: a failure here must not abort the frame.
        try:
            ocr_result = self.ocr_reader(pil_image)
            text_ocr = ocr_result[0]['generated_text'] if ocr_result else ""
        except Exception:
            logger.debug("Image-to-text failed for frame %d", frame_number, exc_info=True)
            text_ocr = ""

        return VideoFrame(
            timestamp=timestamp,
            frame_number=frame_number,
            image=frame,
            objects=objects,
            scene_description=self._generate_scene_description(objects, classification),
            emotions=[],  # Will be enhanced with emotion detection
            text_ocr=text_ocr,
        )

    def _generate_scene_description(self, objects: List[Dict], classification: List[Dict]) -> str:
        """Generate a natural language description of a scene.

        Uses the labels of the first five detected objects and the label of
        the first classification entry ("general" when there is none).
        """
        object_names = [obj['label'] for obj in objects[:5]]  # Top 5 objects
        scene_type = classification[0]['label'] if classification else "general"
        if object_names:
            return f"Scene shows {', '.join(object_names)} in a {scene_type} setting"
        return f"Scene appears to be {scene_type}"

    async def analyze_audio_enhanced(self, video_path: str) -> List["AudioSegment"]:
        """Transcribe the audio track and wrap each segment in an ``AudioSegment``.

        Emotion scores are currently fixed placeholder values, and no speaker
        identification is performed.
        """
        segments, info = self.whisper_model.transcribe(video_path)
        language = info.language if info else "unknown"

        audio_segments = []
        for segment in segments:
            # Enhanced emotion analysis (placeholder - would integrate with emotion detection model)
            emotions = {
                "neutral": 0.5,
                "happy": 0.2,
                "sad": 0.1,
                "angry": 0.1,
                "surprised": 0.1,
            }
            audio_segments.append(AudioSegment(
                start_time=segment.start,
                end_time=segment.end,
                text=segment.text,
                language=language,
                # NOTE(review): avg_logprob is an average log-probability, not a
                # 0-1 score - confirm how consumers interpret `confidence`.
                confidence=segment.avg_logprob,
                emotions=emotions,
            ))
        return audio_segments

    async def generate_enhanced_summary(self, audio_segments: List["AudioSegment"],
                                        video_frames: List["VideoFrame"]) -> str:
        """Generate an enhanced summary using agent capabilities.

        Builds a prompt from the full transcript and the scene descriptions
        of the first ten frames, then asks the agent for a structured
        analysis. If the agent call fails, a truncated plain-text summary is
        returned instead.
        """
        # Prepare context for agent
        audio_text = " ".join(seg.text for seg in audio_segments)
        visual_context = " ".join(frame.scene_description for frame in video_frames[:10])  # First 10 frames
        context = f"""
Video Content Analysis:
AUDIO TRANSCRIPT:
{audio_text}
VISUAL CONTENT:
{visual_context}
Please provide a comprehensive analysis including:
1. Key topics and themes
2. Sentiment analysis
3. Important visual elements
4. Cultural or technical context
5. Key moments and insights
Format your response in a clear, structured manner with sections and bullet points.
"""
        try:
            # The prompt declares a required "chat_history" placeholder; without
            # this key the call always raised and only the fallback was returned.
            result = await self.agent.ainvoke({"input": context, "chat_history": []})
            return result["output"]
        except Exception as e:
            logger.error("Agent analysis failed: %s", e)
            # Fallback to simple summary
            return f"Analysis of video content. Audio: {audio_text[:200]}... Visual: {visual_context[:200]}..."

    def _analyze_sentiment(self, text: str) -> Dict[str, float]:
        """Analyze sentiment of text content (placeholder: returns fixed scores)."""
        # This would integrate with a proper sentiment analysis model
        return {
            "positive": 0.6,
            "negative": 0.2,
            "neutral": 0.2,
        }

    def _extract_key_topics(self, text: str) -> List[str]:
        """Extract key topics from text (placeholder: returns a fixed list)."""
        # This would use topic modeling or keyword extraction
        return ["technology", "innovation", "business", "future"]

    async def create_beautiful_report(self, analysis: "EnhancedAnalysis") -> str:
        """Render the analysis as a Markdown report and return it."""
        report_template = f"""
# 📹 Video Analysis Report
## 📊 Overview
- **Duration**: {analysis.video_metadata.get('duration', 'Unknown')} seconds
- **Resolution**: {analysis.video_metadata.get('resolution', 'Unknown')}
- **Language**: {analysis.audio_analysis[0].language if analysis.audio_analysis else 'Unknown'}
## 🎵 Audio Analysis
### Transcription Summary
{analysis.content_summary}
### Key Audio Segments
{self._format_audio_segments(analysis.audio_analysis)}
## 🎬 Visual Analysis
### Scene Breakdown
{self._format_visual_analysis(analysis.visual_analysis)}
### Key Visual Elements
{self._format_key_elements(analysis.visual_analysis)}
## 🎯 Key Insights
### Topics Covered
{self._format_topics(analysis.topics)}
### Sentiment Analysis
{self._format_sentiment(analysis.sentiment_analysis)}
### Important Moments
{self._format_key_moments(analysis.key_moments)}
## 📈 Recommendations
Based on the analysis, consider:
- Content engagement opportunities
- Areas for improvement
- Target audience insights
---
*Report generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} using Groq llama-3.3-70b-versatile*
"""
        return report_template

    def _format_audio_segments(self, segments: List["AudioSegment"]) -> str:
        """Format the first five audio segments as Markdown bullets."""
        return "\n".join(
            f"- **{seg.start_time:.1f}s - {seg.end_time:.1f}s**: {seg.text}"
            for seg in segments[:5]
        )

    def _format_visual_analysis(self, frames: List["VideoFrame"]) -> str:
        """Format the first five analysed frames as Markdown bullets."""
        return "\n".join(
            f"- **{frame.timestamp:.1f}s**: {frame.scene_description}"
            for frame in frames[:5]
        )

    def _format_key_elements(self, frames: List["VideoFrame"]) -> str:
        """List the five most frequently detected object labels across all frames."""
        from collections import Counter

        object_counts = Counter(
            obj['label'] for frame in frames for obj in frame.objects
        )
        return "\n".join(
            f"- **{label}**: appears {count} times"
            for label, count in object_counts.most_common(5)
        )

    def _format_topics(self, topics: List[str]) -> str:
        """Format topics as Markdown bullets."""
        return "\n".join(f"- {topic}" for topic in topics)

    def _format_sentiment(self, sentiment: Dict[str, float]) -> str:
        """Format sentiment scores as percentages; missing keys count as 0."""
        return f"""
- **Positive**: {sentiment.get('positive', 0):.1%}
- **Negative**: {sentiment.get('negative', 0):.1%}
- **Neutral**: {sentiment.get('neutral', 0):.1%}
"""

    def _format_key_moments(self, moments: List[Dict[str, Any]]) -> str:
        """Format key moments as Markdown bullets; missing fields show as 'Unknown'."""
        return "\n".join(
            f"- **{moment.get('timestamp', 'Unknown')}s**: {moment.get('description', 'Unknown')}"
            for moment in moments
        )
# Usage example
async def analyze_video_enhanced(video_path: str, groq_api_key: str = None) -> EnhancedAnalysis:
    """Main function for enhanced video analysis using Groq.

    Runs audio and visual analysis, asks the agent for a summary, and
    assembles everything (including a rendered Markdown report) into an
    ``EnhancedAnalysis``.

    Args:
        video_path: Path of the video file to analyse.
        groq_api_key: Groq API key; falls back to the ``GROQ_API_KEY``
            environment variable when omitted.

    Returns:
        The populated analysis. ``key_moments``, ``topics`` and
        ``sentiment_analysis`` are currently fixed placeholder values.
    """
    analyzer = MultiModalAnalyzer(groq_api_key=groq_api_key)

    # NOTE: neither coroutine awaits anything while doing its (blocking) model
    # inference, so gather effectively runs them one after the other.
    audio_segments, video_frames = await asyncio.gather(
        analyzer.analyze_audio_enhanced(video_path),
        analyzer.analyze_video_frames(video_path),
    )

    # Generate enhanced summary
    content_summary = await analyzer.generate_enhanced_summary(audio_segments, video_frames)

    # Derive metadata from what was actually analysed instead of assuming
    # 30 s per transcript segment and a fixed 1920x1080 resolution.
    last_audio_end = audio_segments[-1].end_time if audio_segments else 0.0
    last_frame_time = video_frames[-1].timestamp if video_frames else 0.0
    duration = round(max(last_audio_end, last_frame_time), 2)

    if video_frames:
        # Frame arrays are laid out (height, width, channels).
        height, width = video_frames[0].image.shape[:2]
        resolution = f"{width}x{height}"
    else:
        resolution = "Unknown"

    # Create analysis object
    analysis = EnhancedAnalysis(
        video_metadata={"duration": duration, "resolution": resolution},
        audio_analysis=audio_segments,
        visual_analysis=video_frames,
        content_summary=content_summary,
        key_moments=[{"timestamp": 0, "description": "Video start"}],
        topics=["technology", "innovation"],
        sentiment_analysis={"positive": 0.6, "negative": 0.2, "neutral": 0.2},
        formatted_report="",
    )

    # The report is rendered from the analysis itself, so fill it in afterwards.
    analysis.formatted_report = await analyzer.create_beautiful_report(analysis)
    return analysis