| """ | |
| Advanced Video Analyzer for GAIA Agent - Phase 5 | |
| Comprehensive video analysis tool for YouTube videos with object detection and temporal tracking. | |
| Features: | |
| - YouTube video downloading and processing | |
| - Advanced object detection using YOLO models | |
| - Bird and animal species identification | |
| - Temporal object tracking across frames | |
| - Simultaneous object counting | |
| - Integration with AGNO framework | |
| """ | |
import os
import logging
import json
import tempfile
import shutil
from typing import Dict, Any, List, Optional, Tuple
from pathlib import Path
from datetime import datetime

import cv2
import numpy as np
import yt_dlp
# Import detection engines
try:
    from .object_detection_engine import ObjectDetectionEngine
    from .video_content_analyzer import create_video_content_analyzer
except ImportError:
    try:
        from object_detection_engine import ObjectDetectionEngine
        from video_content_analyzer import create_video_content_analyzer
    except ImportError:
        ObjectDetectionEngine = None
        create_video_content_analyzer = None

# Configure logging
logger = logging.getLogger(__name__)

class AdvancedVideoAnalyzer:
    """Advanced video analyzer for comprehensive video content analysis."""

    def __init__(self):
        """Initialize the advanced video analyzer."""
        self.available = True
        self.temp_dir = tempfile.mkdtemp()

        # Initialize detection engine
        self.detection_engine = None
        if ObjectDetectionEngine:
            try:
                self.detection_engine = ObjectDetectionEngine()
                if not self.detection_engine.available:
                    logger.warning("⚠️ Object detection engine not available")
            except Exception as e:
                logger.warning(f"⚠️ Failed to initialize object detection engine: {e}")

        # Initialize content analyzer
        self.content_analyzer = None
        if create_video_content_analyzer:
            try:
                self.content_analyzer = create_video_content_analyzer()
                if not self.content_analyzer.available:
                    logger.warning("⚠️ Video content analyzer not available")
            except Exception as e:
                logger.warning(f"⚠️ Failed to initialize video content analyzer: {e}")

        # Analysis parameters
        self.frame_sampling_rate = 1  # Analyze every frame by default
        self.max_frames = 1000  # Maximum frames to analyze
        self.confidence_threshold = 0.3
        self.nms_threshold = 0.4

        logger.info(f"📹 Advanced Video Analyzer initialized - Available: {self.available}")
    def analyze_video(self, video_url: str, question: Optional[str] = None,
                      max_duration: int = 300) -> Dict[str, Any]:
        """
        Analyze a video comprehensively for object detection and counting.

        Args:
            video_url: URL of the video (YouTube supported)
            question: Optional question to guide analysis
            max_duration: Maximum video duration to process (seconds)

        Returns:
            Comprehensive video analysis results
        """
        try:
            logger.info(f"📹 Starting video analysis for: {video_url}")

            # Download video
            video_path = self._download_video(video_url, max_duration)
            if not video_path:
                return {
                    'success': False,
                    'error': 'Failed to download video'
                }

            # Extract video metadata
            metadata = self._extract_video_metadata(video_path)

            # Perform frame-by-frame object detection
            detection_results = self._analyze_video_frames(video_path, question)

            # Perform content analysis
            content_analysis = None
            if self.content_analyzer:
                content_analysis = self.content_analyzer.analyze_video_content(
                    video_path, detection_results.get('frame_detections', []), question
                )

            # Generate comprehensive analysis report
            analysis_report = self._create_analysis_report(
                video_url, metadata, detection_results, content_analysis, question
            )

            # Cleanup
            self._cleanup_temp_files(video_path)

            return analysis_report

        except Exception as e:
            logger.error(f"❌ Video analysis failed: {e}")
            return {
                'success': False,
                'error': f'Video analysis failed: {str(e)}'
            }
    def _download_video(self, video_url: str, max_duration: int = 300) -> Optional[str]:
        """Download video from URL using yt-dlp."""
        try:
            output_path = os.path.join(self.temp_dir, 'video.%(ext)s')

            ydl_opts = {
                'format': 'best[height<=720][ext=mp4]/best[ext=mp4]/best',
                'outtmpl': output_path,
                'quiet': True,
                'no_warnings': True,
                'extract_flat': False,
                'writethumbnail': False,
                'writeinfojson': False,
                'match_filter': lambda info_dict: None if info_dict.get('duration', 0) <= max_duration else "Video too long"
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # Extract info first to check duration
                info = ydl.extract_info(video_url, download=False)
                duration = info.get('duration', 0)

                if duration > max_duration:
                    logger.warning(f"⚠️ Video duration ({duration}s) exceeds maximum ({max_duration}s)")
                    return None

                # Download the video
                ydl.download([video_url])

            # Find the downloaded file
            for file in os.listdir(self.temp_dir):
                if file.startswith('video.') and file.endswith(('.mp4', '.webm', '.mkv')):
                    video_path = os.path.join(self.temp_dir, file)
                    logger.info(f"✅ Video downloaded: {video_path}")
                    return video_path

            logger.error("❌ Downloaded video file not found")
            return None

        except Exception as e:
            logger.error(f"❌ Video download failed: {e}")
            return None
    def _extract_video_metadata(self, video_path: str) -> Dict[str, Any]:
        """Extract video metadata using OpenCV."""
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise Exception("Failed to open video file")

            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            duration = frame_count / fps if fps > 0 else 0

            cap.release()

            metadata = {
                'duration_seconds': duration,
                'fps': fps,
                'frame_count': frame_count,
                'resolution': {'width': width, 'height': height},
                'file_size': os.path.getsize(video_path),
                'analysis_timestamp': datetime.now().isoformat()
            }

            logger.info(f"📊 Video metadata: {duration:.1f}s, {width}x{height}, {fps:.1f} FPS")
            return metadata

        except Exception as e:
            logger.error(f"❌ Failed to extract video metadata: {e}")
            return {}
    def _analyze_video_frames(self, video_path: str, question: Optional[str] = None) -> Dict[str, Any]:
        """Analyze video frames for object detection and tracking."""
        try:
            if not self.detection_engine or not self.detection_engine.available:
                logger.warning("⚠️ Object detection engine not available")
                return {'frame_detections': [], 'summary': {}}

            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise Exception("Failed to open video file")

            frame_detections = []
            frame_count = 0
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = cap.get(cv2.CAP_PROP_FPS)

            # Determine frame sampling rate based on video length
            if total_frames > self.max_frames:
                self.frame_sampling_rate = max(1, total_frames // self.max_frames)
                logger.info(f"📊 Sampling every {self.frame_sampling_rate} frames")

            # Track objects across frames (placeholder state; cross-frame tracking is not yet used below)
            object_tracker = {}
            next_object_id = 0

            while cap.isOpened() and frame_count < total_frames:
                ret, frame = cap.read()
                if not ret:
                    break

                # Sample frames based on sampling rate
                if frame_count % self.frame_sampling_rate == 0:
                    # Detect objects in frame
                    detections = self.detection_engine.detect_objects(
                        frame,
                        confidence_threshold=self.confidence_threshold,
                        nms_threshold=self.nms_threshold
                    )

                    # Add temporal information (guard against zero FPS)
                    timestamp = frame_count / fps if fps > 0 else 0.0
                    for detection in detections:
                        detection['frame_number'] = frame_count
                        detection['timestamp'] = timestamp

                    frame_detections.append(detections)

                    # Progress logging
                    if len(frame_detections) % 50 == 0:
                        progress = (frame_count / total_frames) * 100
                        logger.info(f"📈 Analysis progress: {progress:.1f}% ({len(frame_detections)} frames analyzed)")

                frame_count += 1

                # Break if we've analyzed enough frames
                if len(frame_detections) >= self.max_frames:
                    break

            cap.release()

            # Generate detection summary
            summary = self._generate_detection_summary(frame_detections, question)

            logger.info(f"✅ Frame analysis complete: {len(frame_detections)} frames analyzed")

            return {
                'frame_detections': frame_detections,
                'summary': summary,
                'analysis_params': {
                    'frame_sampling_rate': self.frame_sampling_rate,
                    'confidence_threshold': self.confidence_threshold,
                    'nms_threshold': self.nms_threshold,
                    'frames_analyzed': len(frame_detections)
                }
            }

        except Exception as e:
            logger.error(f"❌ Frame analysis failed: {e}")
            return {'frame_detections': [], 'summary': {}}
    def _generate_detection_summary(self, frame_detections: List[List[Dict[str, Any]]],
                                    question: Optional[str] = None) -> Dict[str, Any]:
        """Generate summary of detection results."""
        try:
            summary = {
                'total_frames_analyzed': len(frame_detections),
                'total_detections': 0,
                'species_counts': {},
                'max_simultaneous_objects': 0,
                'max_simultaneous_birds': 0,
                'max_simultaneous_animals': 0,
                'temporal_patterns': {},
                'answer_analysis': {}
            }

            # Analyze each frame
            simultaneous_counts = []
            bird_counts = []
            animal_counts = []

            for frame_dets in frame_detections:
                summary['total_detections'] += len(frame_dets)

                # Count objects by type
                frame_birds = 0
                frame_animals = 0
                frame_objects = len(frame_dets)

                for detection in frame_dets:
                    species_type = detection.get('species_type', 'unknown')

                    # Update species counts
                    if species_type not in summary['species_counts']:
                        summary['species_counts'][species_type] = 0
                    summary['species_counts'][species_type] += 1

                    # Count birds and animals
                    if species_type == 'bird':
                        frame_birds += 1
                    elif species_type == 'animal':
                        frame_animals += 1

                simultaneous_counts.append(frame_objects)
                bird_counts.append(frame_birds)
                animal_counts.append(frame_animals)

            # Calculate maximums
            if simultaneous_counts:
                summary['max_simultaneous_objects'] = max(simultaneous_counts)
            if bird_counts:
                summary['max_simultaneous_birds'] = max(bird_counts)
            if animal_counts:
                summary['max_simultaneous_animals'] = max(animal_counts)

            # Analyze question-specific patterns
            if question:
                summary['answer_analysis'] = self._analyze_question_specific_patterns(
                    question, frame_detections, bird_counts, animal_counts
                )

            # Generate temporal patterns (cast NumPy scalars to floats so the summary stays JSON-serializable)
            summary['temporal_patterns'] = {
                'avg_objects_per_frame': float(np.mean(simultaneous_counts)) if simultaneous_counts else 0.0,
                'avg_birds_per_frame': float(np.mean(bird_counts)) if bird_counts else 0.0,
                'avg_animals_per_frame': float(np.mean(animal_counts)) if animal_counts else 0.0,
                'object_variance': float(np.var(simultaneous_counts)) if simultaneous_counts else 0.0
            }

            return summary

        except Exception as e:
            logger.error(f"❌ Detection summary generation failed: {e}")
            return {}
    def _analyze_question_specific_patterns(self, question: str,
                                            frame_detections: List[List[Dict[str, Any]]],
                                            bird_counts: List[int],
                                            animal_counts: List[int]) -> Dict[str, Any]:
        """Analyze patterns specific to the question asked."""
        try:
            analysis = {
                'question_type': 'unknown',
                'target_answer': None,
                'confidence': 0.0,
                'reasoning': []
            }

            question_lower = question.lower()

            # Detect question type and provide specific analysis
            if 'bird' in question_lower and ('highest' in question_lower or 'maximum' in question_lower):
                analysis['question_type'] = 'max_birds_simultaneous'
                analysis['target_answer'] = max(bird_counts) if bird_counts else 0
                analysis['confidence'] = 0.9 if bird_counts else 0.1
                analysis['reasoning'].append(f"Maximum simultaneous birds detected: {analysis['target_answer']}")

                # Find frames with maximum birds
                max_bird_count = analysis['target_answer']
                max_frames = [i for i, count in enumerate(bird_counts) if count == max_bird_count]
                analysis['reasoning'].append(f"Maximum occurred in {len(max_frames)} frame(s)")

            elif 'animal' in question_lower and ('highest' in question_lower or 'maximum' in question_lower):
                analysis['question_type'] = 'max_animals_simultaneous'
                analysis['target_answer'] = max(animal_counts) if animal_counts else 0
                analysis['confidence'] = 0.9 if animal_counts else 0.1
                analysis['reasoning'].append(f"Maximum simultaneous animals detected: {analysis['target_answer']}")

            elif 'species' in question_lower and ('highest' in question_lower or 'maximum' in question_lower):
                analysis['question_type'] = 'max_species_simultaneous'

                # For species counting, count the unique species present in each frame
                max_species = 0
                for frame_dets in frame_detections:
                    unique_species = set()
                    for det in frame_dets:
                        species_type = det.get('species_type', 'unknown')
                        if species_type in ['bird', 'animal']:
                            class_name = det.get('class', 'unknown')
                            unique_species.add(class_name)
                    max_species = max(max_species, len(unique_species))

                analysis['target_answer'] = max_species
                analysis['confidence'] = 0.8 if max_species > 0 else 0.1
                analysis['reasoning'].append(f"Maximum simultaneous species detected: {analysis['target_answer']}")

            return analysis

        except Exception as e:
            logger.error(f"❌ Question-specific analysis failed: {e}")
            return {'question_type': 'unknown', 'target_answer': None, 'confidence': 0.0, 'reasoning': []}
    def _create_analysis_report(self, video_url: str, metadata: Dict[str, Any],
                                detection_results: Dict[str, Any],
                                content_analysis: Optional[Dict[str, Any]] = None,
                                question: Optional[str] = None) -> Dict[str, Any]:
        """Create comprehensive analysis report."""
        try:
            report = {
                'success': True,
                'video_url': video_url,
                'question': question,
                'analysis_timestamp': datetime.now().isoformat(),
                'metadata': metadata,
                'detection_results': detection_results,
                'content_analysis': content_analysis,
                'final_answer': None,
                'confidence': 0.0,
                'reasoning': []
            }

            # Extract final answer from detection summary
            summary = detection_results.get('summary', {})
            answer_analysis = summary.get('answer_analysis', {})

            if answer_analysis.get('target_answer') is not None:
                report['final_answer'] = answer_analysis['target_answer']
                report['confidence'] = answer_analysis.get('confidence', 0.0)
                report['reasoning'] = answer_analysis.get('reasoning', [])
            else:
                # Fall back to the general detection maximums
                if question and 'bird' in question.lower():
                    report['final_answer'] = summary.get('max_simultaneous_birds', 0)
                    report['confidence'] = 0.7
                    report['reasoning'] = [f"Maximum simultaneous birds detected: {report['final_answer']}"]
                elif question and 'animal' in question.lower():
                    report['final_answer'] = summary.get('max_simultaneous_animals', 0)
                    report['confidence'] = 0.7
                    report['reasoning'] = [f"Maximum simultaneous animals detected: {report['final_answer']}"]
                else:
                    report['final_answer'] = summary.get('max_simultaneous_objects', 0)
                    report['confidence'] = 0.5
                    report['reasoning'] = [f"Maximum simultaneous objects detected: {report['final_answer']}"]

            # Add analysis insights
            insights = []
            if summary.get('total_frames_analyzed', 0) > 0:
                insights.append(f"Analyzed {summary['total_frames_analyzed']} frames")
            if summary.get('total_detections', 0) > 0:
                insights.append(f"Total detections: {summary['total_detections']}")
            if summary.get('species_counts'):
                species_info = ", ".join([f"{k}: {v}" for k, v in summary['species_counts'].items()])
                insights.append(f"Species distribution: {species_info}")

            report['insights'] = insights

            logger.info("📊 Analysis report generated successfully")
            return report

        except Exception as e:
            logger.error(f"❌ Failed to create analysis report: {e}")
            return {
                'success': False,
                'error': f'Failed to create analysis report: {str(e)}'
            }
    def _cleanup_temp_files(self, video_path: Optional[str] = None):
        """Clean up temporary files."""
        try:
            if video_path and os.path.exists(video_path):
                os.remove(video_path)

            # Remove the temp directory: rmdir works only if it is empty
            if os.path.exists(self.temp_dir):
                try:
                    os.rmdir(self.temp_dir)
                except OSError:
                    # Directory not empty, clean up remaining files recursively
                    shutil.rmtree(self.temp_dir, ignore_errors=True)

        except Exception as e:
            logger.warning(f"⚠️ Cleanup failed: {e}")
    def get_capabilities(self) -> Dict[str, Any]:
        """Get video analyzer capabilities."""
        return {
            'available': self.available,
            'detection_engine_available': self.detection_engine is not None and self.detection_engine.available,
            'content_analyzer_available': self.content_analyzer is not None and self.content_analyzer.available,
            'supported_formats': ['YouTube URLs', 'MP4', 'WebM', 'MKV'],
            'max_duration': 300,
            'max_frames': self.max_frames,
            'features': [
                'YouTube video downloading',
                'Object detection and classification',
                'Bird and animal species identification',
                'Temporal object tracking',
                'Simultaneous object counting',
                'Content analysis and summarization',
                'Question-specific analysis'
            ]
        }

# AGNO Framework Integration Functions
def get_advanced_video_analysis_tools() -> List[AdvancedVideoAnalyzer]:
    """Get advanced video analysis tools for AGNO framework integration."""
    try:
        analyzer = AdvancedVideoAnalyzer()
        if analyzer.available:
            return [analyzer]
        else:
            logger.warning("⚠️ Advanced video analyzer not available")
            return []
    except Exception as e:
        logger.error(f"❌ Failed to create advanced video analysis tools: {e}")
        return []
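
# A minimal sketch of how the factory above might be wired into an agent framework.
# It assumes an `agent` object that accepts plain Python callables as tools;
# `register_tool` is a hypothetical method name, not a confirmed AGNO API - adapt
# it to whatever registration mechanism the framework actually exposes.
#
#   agent_tools = get_advanced_video_analysis_tools()
#   for tool in agent_tools:
#       agent.register_tool(tool.analyze_video)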

if __name__ == "__main__":
    # Test the advanced video analyzer
    analyzer = AdvancedVideoAnalyzer()
    print(f"Video analyzer available: {analyzer.available}")
    print(f"Capabilities: {json.dumps(analyzer.get_capabilities(), indent=2)}")

    # Test with a sample YouTube video (if available)
    test_url = "https://www.youtube.com/watch?v=L1vXCYZAYYM"
    test_question = "What is the highest number of bird species to be on camera simultaneously?"

    print(f"\nTesting with: {test_url}")
    print(f"Question: {test_question}")

    # Note: Actual testing would require running the analyzer
    # result = analyzer.analyze_video(test_url, test_question)
    # print(f"Result: {json.dumps(result, indent=2)}")