"""Multi-modal video analysis.

Combines Whisper transcription, Hugging Face vision pipelines and a
Groq-backed LangChain agent into a single ``EnhancedAnalysis`` result with a
Markdown report.
"""

import asyncio
import json
import logging
import os
from collections import Counter
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional

import cv2
import numpy as np
import torch
from faster_whisper import WhisperModel
from PIL import Image
from transformers import pipeline, AutoFeatureExtractor, AutoModelForImageClassification

# LangChain imports for advanced RAG
from langchain.agents import Tool, AgentExecutor, create_openai_functions_agent
from langchain.tools import BaseTool
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_groq import ChatGroq

# MCP/ACP inspired components
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_community.utilities import WikipediaAPIWrapper

logger = logging.getLogger("app.utils.enhanced_analysis")

# System prompt for the analysis agent (kept at module level so the text is
# not affected by method indentation).
_AGENT_SYSTEM_PROMPT = """You are an expert video content analyst with access to multiple tools for enhanced analysis.

Your capabilities include:
- Web search for additional context
- Wikipedia lookups for detailed information
- Sentiment analysis
- Topic extraction and categorization

Analyze the provided video content comprehensively and provide insights that go beyond basic transcription.
Consider context, cultural references, technical details, and broader implications.

Provide detailed, well-structured analysis with clear sections and actionable insights."""

# Request sent to the agent; filled with ``str.format`` so transcript text is
# inserted verbatim.
_ANALYSIS_REQUEST_TEMPLATE = """
Video Content Analysis:

AUDIO TRANSCRIPT:
{audio_text}

VISUAL CONTENT:
{visual_context}

Please provide a comprehensive analysis including:
1. Key topics and themes
2. Sentiment analysis
3. Important visual elements
4. Cultural or technical context
5. Key moments and insights

Format your response in a clear, structured manner with sections and bullet points.
"""


@dataclass
class VideoFrame:
    """Represents a video frame with metadata.

    ``image`` holds the raw frame as returned by ``cv2.VideoCapture.read``,
    ``objects`` the object-detection pipeline output, and ``text_ocr`` the
    text generated by the image-to-text pipeline. ``emotions`` is currently
    always an empty list.
    """

    timestamp: float
    frame_number: int
    image: np.ndarray
    objects: List[Dict[str, Any]]
    scene_description: str
    emotions: List[Dict[str, float]]
    text_ocr: str


@dataclass
class AudioSegment:
    """Represents an audio segment with analysis.

    ``confidence`` carries Whisper's ``avg_logprob`` for the segment and
    ``emotions`` a fixed placeholder distribution.
    """

    start_time: float
    end_time: float
    text: str
    language: str
    confidence: float
    emotions: Dict[str, float]
    speaker_id: Optional[str] = None


@dataclass
class EnhancedAnalysis:
    """Comprehensive video analysis result."""

    video_metadata: Dict[str, Any]
    audio_analysis: List[AudioSegment]
    visual_analysis: List[VideoFrame]
    content_summary: str
    key_moments: List[Dict[str, Any]]
    topics: List[str]
    sentiment_analysis: Dict[str, float]
    formatted_report: str


class MultiModalAnalyzer:
    """Advanced multi-modal video analyzer with MCP/ACP capabilities using Groq."""

    def __init__(self, groq_api_key: Optional[str] = None):
        """Load the models and build the analysis agent.

        Args:
            groq_api_key: Groq API key. Falls back to the ``GROQ_API_KEY``
                environment variable when omitted.

        Raises:
            ValueError: If no API key is supplied or found in the environment.
        """
        # Validate the key first so a misconfiguration fails before any model
        # is loaded.
        groq_api_key = groq_api_key or os.getenv("GROQ_API_KEY")
        if not groq_api_key:
            raise ValueError("GROQ_API_KEY environment variable is required")

        self.whisper_model = WhisperModel(
            "base", device="cuda" if torch.cuda.is_available() else "cpu"
        )

        # Visual analysis models
        self.object_detector = pipeline("object-detection", model="facebook/detr-resnet-50")
        self.image_classifier = pipeline("image-classification", model="microsoft/resnet-50")
        # NOTE(review): this is an image-captioning checkpoint; its output is
        # stored in ``VideoFrame.text_ocr`` -- confirm that is intended.
        self.ocr_reader = pipeline("image-to-text", model="Salesforce/blip-image-captioning-base")

        # Audio analysis (not used by any method in this module)
        self.audio_classifier = pipeline("audio-classification", model="facebook/wav2vec2-base")

        # LLM for advanced reasoning - using Groq with Llama3
        self.llm = ChatGroq(
            groq_api_key=groq_api_key,
            model_name="llama-3.3-70b-versatile",
            temperature=0.1,
            max_tokens=2000,
        )

        # Agent tools
        self.search_tool = DuckDuckGoSearchRun()
        self.wikipedia_tool = WikipediaAPIWrapper()

        # Initialize agent
        self.agent = self._create_agent()

    def _create_agent(self):
        """Create an agent with tools for enhanced analysis.

        Returns:
            An ``AgentExecutor`` wired to the web-search, Wikipedia, sentiment
            and topic tools. The prompt expects the variables ``input`` and
            ``chat_history`` (``agent_scratchpad`` is filled by the agent).
        """
        tools = [
            Tool(
                name="web_search",
                func=self.search_tool.run,
                description="Search the web for additional context about topics, people, or concepts mentioned in the video",
            ),
            Tool(
                name="wikipedia_lookup",
                func=self.wikipedia_tool.run,
                description="Look up detailed information on Wikipedia about topics mentioned in the video",
            ),
            Tool(
                name="analyze_sentiment",
                func=self._analyze_sentiment,
                description="Analyze the sentiment and emotional tone of text content",
            ),
            Tool(
                name="extract_key_topics",
                func=self._extract_key_topics,
                description="Extract key topics and themes from text content",
            ),
        ]

        prompt = ChatPromptTemplate.from_messages([
            ("system", _AGENT_SYSTEM_PROMPT),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])

        agent = create_openai_functions_agent(self.llm, tools, prompt)
        return AgentExecutor(agent=agent, tools=tools, verbose=True)

    @staticmethod
    def _compute_frame_interval(fps: float, sample_rate: int) -> int:
        """Return how many frames to advance between analysed frames.

        Clamped to at least 1: when the video's frame rate is below
        ``sample_rate`` the integer division yields 0, which would make the
        modulo in the read loop raise ``ZeroDivisionError``.
        Callers must pass a positive ``sample_rate``.
        """
        return max(1, int(fps / sample_rate))

    def _analyze_frame(self, frame: np.ndarray, frame_number: int, timestamp: float) -> VideoFrame:
        """Run the visual pipelines on one frame and package the results."""
        # OpenCV delivers BGR; the pipelines are fed an RGB PIL image.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        pil_image = Image.fromarray(rgb_frame)

        objects = self.object_detector(pil_image)
        classification = self.image_classifier(pil_image)

        # Text extraction is best-effort: a failure yields an empty string
        # instead of aborting the whole analysis.
        try:
            ocr_result = self.ocr_reader(pil_image)
            text_ocr = ocr_result[0]['generated_text'] if ocr_result else ""
        except Exception as exc:
            logger.warning("Text extraction failed for frame %d: %s", frame_number, exc)
            text_ocr = ""

        return VideoFrame(
            timestamp=timestamp,
            frame_number=frame_number,
            image=frame,
            objects=objects,
            scene_description=self._generate_scene_description(objects, classification),
            emotions=[],  # Will be enhanced with emotion detection
            text_ocr=text_ocr,
        )

    async def analyze_video_frames(self, video_path: str, sample_rate: int = 30) -> List[VideoFrame]:
        """Extract and analyze video frames at regular intervals.

        Args:
            video_path: Path of the video to read with OpenCV.
            sample_rate: Target number of analysed frames per second of
                video. Every ``int(fps / sample_rate)``-th frame is analysed
                (at least every frame).

        Returns:
            The analysed frames in playback order. Empty when the video
            cannot be opened or reports no usable frame rate.

        Raises:
            ValueError: If ``sample_rate`` is not positive.
        """
        if sample_rate <= 0:
            raise ValueError("sample_rate must be a positive number")

        frames: List[VideoFrame] = []
        cap = cv2.VideoCapture(video_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            # ``not (fps > 0)`` also rejects NaN.
            if not cap.isOpened() or not (fps > 0):
                logger.error("Could not read a video stream from %s (fps=%s)", video_path, fps)
                return frames

            frame_interval = self._compute_frame_interval(fps, sample_rate)
            frame_count = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % frame_interval == 0:
                    frames.append(self._analyze_frame(frame, frame_count, frame_count / fps))
                frame_count += 1
        finally:
            # Release the capture handle even if a pipeline raises.
            cap.release()
        return frames

    def _generate_scene_description(self, objects: List[Dict], classification: List[Dict]) -> str:
        """Generate natural language description of scene.

        Uses the labels of the first five detected objects and the label of
        the first classification entry ("general" when there is none).
        """
        object_names = [obj['label'] for obj in objects[:5]]  # Top 5 objects
        scene_type = classification[0]['label'] if classification else "general"

        if object_names:
            return f"Scene shows {', '.join(object_names)} in a {scene_type} setting"
        return f"Scene appears to be {scene_type}"

    async def analyze_audio_enhanced(self, video_path: str) -> List[AudioSegment]:
        """Enhanced audio analysis with emotion detection and speaker identification.

        Transcribes ``video_path`` with Whisper and wraps each transcript
        segment in an ``AudioSegment``. Emotions are a fixed placeholder and
        ``speaker_id`` is left unset.
        """
        segments, info = self.whisper_model.transcribe(video_path)
        language = info.language if info else "unknown"

        audio_segments = []
        for segment in segments:
            # Enhanced emotion analysis (placeholder - would integrate with emotion detection model)
            emotions = {
                "neutral": 0.5,
                "happy": 0.2,
                "sad": 0.1,
                "angry": 0.1,
                "surprised": 0.1,
            }
            audio_segments.append(
                AudioSegment(
                    start_time=segment.start,
                    end_time=segment.end,
                    text=segment.text,
                    language=language,
                    # NOTE(review): avg_logprob is a log-probability, not a
                    # 0..1 score -- confirm consumers expect that.
                    confidence=segment.avg_logprob,
                    emotions=emotions,
                )
            )
        return audio_segments

    async def generate_enhanced_summary(self, audio_segments: List[AudioSegment], video_frames: List[VideoFrame]) -> str:
        """Generate enhanced summary using agent capabilities.

        Sends the full transcript and the scene descriptions of the first ten
        frames to the agent. If the agent call raises, the error is logged and
        a truncated plain-text summary is returned instead.
        """
        # Prepare context for agent
        audio_text = " ".join([seg.text for seg in audio_segments])
        visual_context = " ".join([frame.scene_description for frame in video_frames[:10]])  # First 10 frames

        context = _ANALYSIS_REQUEST_TEMPLATE.format(
            audio_text=audio_text,
            visual_context=visual_context,
        )

        try:
            # The prompt declares a required ``chat_history`` placeholder;
            # omitting it makes the prompt raise and this method always fall
            # back. Each analysis starts with an empty history.
            result = await self.agent.ainvoke({"input": context, "chat_history": []})
            return result["output"]
        except Exception as e:
            logger.error("Agent analysis failed: %s", e)
            # Fallback to simple summary
            return f"Analysis of video content. Audio: {audio_text[:200]}... Visual: {visual_context[:200]}..."

    def _analyze_sentiment(self, text: str) -> Dict[str, float]:
        """Analyze sentiment of text content.

        Placeholder: returns fixed scores regardless of ``text``.
        """
        # This would integrate with a proper sentiment analysis model
        return {
            "positive": 0.6,
            "negative": 0.2,
            "neutral": 0.2,
        }

    def _extract_key_topics(self, text: str) -> List[str]:
        """Extract key topics from text.

        Placeholder: returns a fixed topic list regardless of ``text``.
        """
        # This would use topic modeling or keyword extraction
        return ["technology", "innovation", "business", "future"]

    async def create_beautiful_report(self, analysis: EnhancedAnalysis) -> str:
        """Generate a beautifully formatted report.

        Returns:
            A Markdown document built from ``analysis``. Missing metadata
            keys and a missing language are rendered as ``Unknown``.
        """
        language = analysis.audio_analysis[0].language if analysis.audio_analysis else 'Unknown'
        generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        lines = [
            "# 📹 Video Analysis Report",
            "",
            "## 📊 Overview",
            f"- **Duration**: {analysis.video_metadata.get('duration', 'Unknown')} seconds",
            f"- **Resolution**: {analysis.video_metadata.get('resolution', 'Unknown')}",
            f"- **Language**: {language}",
            "",
            "## 🎵 Audio Analysis",
            "",
            "### Transcription Summary",
            f"{analysis.content_summary}",
            "",
            "### Key Audio Segments",
            self._format_audio_segments(analysis.audio_analysis),
            "",
            "## 🎬 Visual Analysis",
            "",
            "### Scene Breakdown",
            self._format_visual_analysis(analysis.visual_analysis),
            "",
            "### Key Visual Elements",
            self._format_key_elements(analysis.visual_analysis),
            "",
            "## 🎯 Key Insights",
            "",
            "### Topics Covered",
            self._format_topics(analysis.topics),
            "",
            "### Sentiment Analysis",
            self._format_sentiment(analysis.sentiment_analysis),
            "",
            "### Important Moments",
            self._format_key_moments(analysis.key_moments),
            "",
            "## 📈 Recommendations",
            "",
            "Based on the analysis, consider:",
            "- Content engagement opportunities",
            "- Areas for improvement",
            "- Target audience insights",
            "",
            "---",
            f"*Report generated on {generated_at} using Groq llama-3.3-70b-versatile*",
            "",
        ]
        return "\n".join(lines)

    def _format_audio_segments(self, segments: List[AudioSegment]) -> str:
        """Format audio segments for report (first five, one bullet each)."""
        return "\n".join(
            f"- **{seg.start_time:.1f}s - {seg.end_time:.1f}s**: {seg.text}"
            for seg in segments[:5]  # Top 5 segments
        )

    def _format_visual_analysis(self, frames: List[VideoFrame]) -> str:
        """Format visual analysis for report (first five frames, one bullet each)."""
        return "\n".join(
            f"- **{frame.timestamp:.1f}s**: {frame.scene_description}"
            for frame in frames[:5]  # Top 5 frames
        )

    def _format_key_elements(self, frames: List[VideoFrame]) -> str:
        """Format key visual elements: the five most frequent object labels."""
        object_counts = Counter(
            obj['label'] for frame in frames for obj in frame.objects
        )
        return "\n".join(
            f"- **{obj}**: appears {count} times"
            for obj, count in object_counts.most_common(5)
        )

    def _format_topics(self, topics: List[str]) -> str:
        """Format topics for report, one bullet per topic."""
        return "\n".join([f"- {topic}" for topic in topics])

    def _format_sentiment(self, sentiment: Dict[str, float]) -> str:
        """Format sentiment analysis as percentage bullets (missing keys count as 0)."""
        return "\n".join([
            f"- **Positive**: {sentiment.get('positive', 0):.1%}",
            f"- **Negative**: {sentiment.get('negative', 0):.1%}",
            f"- **Neutral**: {sentiment.get('neutral', 0):.1%}",
        ])

    def _format_key_moments(self, moments: List[Dict[str, Any]]) -> str:
        """Format key moments, one bullet per moment."""
        return "\n".join(
            f"- **{moment.get('timestamp', 'Unknown')}s**: {moment.get('description', 'Unknown')}"
            for moment in moments
        )


def _derive_video_metadata(audio_segments: List[AudioSegment], video_frames: List[VideoFrame]) -> Dict[str, Any]:
    """Build report metadata from the analysed data instead of fixed values.

    Duration is approximated by the later of the last transcript segment's end
    and the last analysed frame's timestamp; resolution comes from the first
    analysed frame. Either value is ``"Unknown"`` when no data is available.
    """
    end_points = []
    if audio_segments:
        end_points.append(audio_segments[-1].end_time)
    if video_frames:
        end_points.append(video_frames[-1].timestamp)
    duration: Any = round(max(end_points), 2) if end_points else "Unknown"

    resolution = "Unknown"
    if video_frames:
        # OpenCV frames are indexed (rows, columns, ...) = (height, width, ...).
        height, width = video_frames[0].image.shape[:2]
        resolution = f"{width}x{height}"

    return {"duration": duration, "resolution": resolution}


# Usage example
async def analyze_video_enhanced(video_path: str, groq_api_key: Optional[str] = None) -> EnhancedAnalysis:
    """Main function for enhanced video analysis using Groq.

    Args:
        video_path: Path of the video file to analyse.
        groq_api_key: Groq API key; falls back to ``GROQ_API_KEY``.

    Returns:
        The populated ``EnhancedAnalysis`` including its Markdown report.
        ``key_moments``, ``topics`` and ``sentiment_analysis`` are fixed
        placeholder values.

    Raises:
        ValueError: If no Groq API key is available.
    """
    analyzer = MultiModalAnalyzer(groq_api_key=groq_api_key)

    # NOTE: both coroutines contain only blocking calls (no awaits), so
    # gather() runs them one after the other rather than in parallel.
    audio_task = analyzer.analyze_audio_enhanced(video_path)
    visual_task = analyzer.analyze_video_frames(video_path)
    audio_segments, video_frames = await asyncio.gather(audio_task, visual_task)

    # Generate enhanced summary
    content_summary = await analyzer.generate_enhanced_summary(audio_segments, video_frames)

    # Create analysis object
    analysis = EnhancedAnalysis(
        video_metadata=_derive_video_metadata(audio_segments, video_frames),
        audio_analysis=audio_segments,
        visual_analysis=video_frames,
        content_summary=content_summary,
        key_moments=[{"timestamp": 0, "description": "Video start"}],
        topics=["technology", "innovation"],
        sentiment_analysis={"positive": 0.6, "negative": 0.2, "neutral": 0.2},
        formatted_report="",
    )

    # Generate beautiful report
    analysis.formatted_report = await analyzer.create_beautiful_report(analysis)

    return analysis