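# NOTE(review): "Spaces: Running / Running" appears to be hosting-page status
# text captured together with this file; it is not part of the module.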
"""
Object Detection Engine for GAIA Agent - Phase 5

Provides robust object detection, classification, and tracking capabilities.

Features:
- Pre-trained model integration (YOLO, DETR, etc.)
- Custom object classification for animals/birds
- Bounding box detection and tracking
- Confidence scoring for detections
- Multi-class object recognition
- Temporal consistency validation
"""
import json
import logging
import math
import os
from collections import Counter
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple

import cv2
import numpy as np
import torch
from PIL import Image
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class ObjectDetectionEngine:
    """Detect, label and track objects using the first backend that initializes.

    Backends are tried in this order when the engine is constructed:

    1. ``yolo`` - an Ultralytics ``YOLO`` model.
    2. ``opencv_dnn`` - despite the name, a Canny-edge/contour heuristic that
       emits generic ``'object'`` detections; no neural network is loaded.
    3. ``basic_cv`` - OpenCV's simple blob detector.

    Every detection is a dict with the keys ``class``, ``confidence``,
    ``weighted_confidence``, ``bbox`` (``[x1, y1, x2, y2]``), ``area``,
    ``center`` and ``species_type``.
    """

    # Fixed confidences reported by the heuristic (non-learned) backends.
    _CONTOUR_CONFIDENCE = 0.5
    _BLOB_CONFIDENCE = 0.3

    # Contour backend: minimum contour area kept, and cap on returned detections.
    _MIN_CONTOUR_AREA = 1000
    _MAX_CONTOUR_DETECTIONS = 10

    # Tracker: a detection joins an existing track only if its center lies
    # closer than this (in bbox coordinate units) to the track's last center.
    _MAX_TRACK_DISTANCE = 100

    # COCO class names, copied onto ``self.coco_classes`` by the contour backend.
    _COCO_CLASSES = (
        'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck',
        'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench',
        'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra',
        'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
        'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove',
        'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup',
        'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange',
        'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
        'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse',
        'remote', 'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink',
        'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
        'toothbrush',
    )

    # (class names, profile) pairs used by ``_classify_bird_species``.
    # 'cardinal' is included because the class mappings list it as a songbird.
    _BIRD_PROFILES = (
        (frozenset({'duck', 'goose', 'swan'}),
         {'category': 'waterfowl', 'habitat': 'aquatic',
          'size': 'medium-large', 'behavior': 'swimming'}),
        (frozenset({'eagle', 'hawk', 'owl', 'falcon'}),
         {'category': 'raptor', 'habitat': 'various',
          'size': 'medium-large', 'behavior': 'hunting'}),
        (frozenset({'sparrow', 'robin', 'finch', 'cardinal'}),
         {'category': 'songbird', 'habitat': 'terrestrial',
          'size': 'small', 'behavior': 'foraging'}),
    )

    # (class names, profile) pairs used by ``_classify_animal_species``.
    _ANIMAL_PROFILES = (
        (frozenset({'cat', 'dog'}),
         {'category': 'domestic', 'habitat': 'human-associated',
          'size': 'small-medium', 'behavior': 'companion'}),
        (frozenset({'horse', 'cow', 'sheep'}),
         {'category': 'livestock', 'habitat': 'agricultural',
          'size': 'large', 'behavior': 'grazing'}),
        (frozenset({'elephant', 'giraffe', 'zebra'}),
         {'category': 'wild_large', 'habitat': 'savanna',
          'size': 'very_large', 'behavior': 'roaming'}),
    )

    def __init__(self):
        """Set defaults, pick a detection backend and build the class mappings."""
        self.available = False
        self.primary_detector = None
        self.fallback_detector = None
        # Defined up front so the attribute exists even when no backend loads.
        self.detector_type = 'none'
        self.class_mappings = {}
        self.confidence_threshold = 0.3
        self.nms_threshold = 0.4

        self._init_detection_models()
        self._init_class_mappings()
        logger.info(f"🔍 Object Detection Engine initialized - Available: {self.available}")

    # ------------------------------------------------------------------
    # Backend initialization
    # ------------------------------------------------------------------
    def _init_detection_models(self):
        """Try each backend in order of preference and keep the first that works."""
        for initializer in (self._init_yolo, self._init_opencv_dnn, self._init_basic_cv):
            if initializer():
                self.available = True
                return
        logger.error("❌ No object detection models available")

    def _init_yolo(self) -> bool:
        """Load a YOLO model through ``ultralytics``.

        Each weight name is tried in turn; the first ``YOLO(name)`` call that
        does not raise becomes ``self.primary_detector``.

        Returns:
            True if a model was loaded, False otherwise (including when
            ``ultralytics`` cannot be imported).
        """
        try:
            from ultralytics import YOLO

            for model_name in ('yolov8n.pt', 'yolov8s.pt', 'yolov5n.pt'):
                try:
                    self.primary_detector = YOLO(model_name)
                    self.detector_type = 'yolo'
                    logger.info(f"✅ YOLO model initialized: {model_name}")
                    return True
                except Exception as e:
                    logger.warning(f"⚠️ Failed to load {model_name}: {e}")
            return False
        except ImportError:
            logger.warning("⚠️ ultralytics not available")
            return False
        except Exception as e:
            logger.warning(f"⚠️ YOLO initialization failed: {e}")
            return False

    def _init_opencv_dnn(self) -> bool:
        """Select the contour-based OpenCV backend.

        No model file is loaded here, so this step only records the backend
        choice and the COCO class-name list; it always returns True.
        """
        self.primary_detector = 'opencv_dnn'
        self.detector_type = 'opencv_dnn'
        self.coco_classes = list(self._COCO_CLASSES)
        logger.info("✅ OpenCV DNN detection initialized")
        return True

    def _init_basic_cv(self) -> bool:
        """Select the blob-detector backend; always returns True."""
        self.primary_detector = 'basic_cv'
        self.detector_type = 'basic_cv'
        logger.info("✅ Basic computer vision detection initialized")
        return True

    def _init_class_mappings(self):
        """Build the species groupings and per-class confidence weights."""
        self.class_mappings = {
            'birds': {
                'bird': ['bird', 'eagle', 'hawk', 'owl', 'duck', 'goose', 'swan'],
                'waterfowl': ['duck', 'goose', 'swan'],
                'raptors': ['eagle', 'hawk', 'owl', 'falcon'],
                'songbirds': ['sparrow', 'robin', 'finch', 'cardinal'],
                'corvids': ['crow', 'raven', 'magpie', 'jay'],
            },
            'animals': {
                'mammals': ['cat', 'dog', 'horse', 'cow', 'sheep', 'pig'],
                'wild_mammals': ['deer', 'bear', 'wolf', 'fox', 'rabbit'],
                'large_mammals': ['elephant', 'giraffe', 'zebra', 'rhinoceros'],
                'domestic': ['cat', 'dog', 'horse', 'cow', 'sheep', 'pig'],
            },
            'confidence_weights': {
                'bird': 1.0,
                'cat': 0.9,
                'dog': 0.9,
                'horse': 0.8,
                'cow': 0.8,
                'sheep': 0.8,
                'elephant': 0.9,
                'bear': 0.8,
                'zebra': 0.8,
                'giraffe': 0.8,
            },
        }

    # ------------------------------------------------------------------
    # Detection
    # ------------------------------------------------------------------
    def _resolve_threshold(self, confidence_threshold: Optional[float]) -> float:
        """Return the caller's threshold, or the engine default when it is None.

        An explicit ``0.0`` is honoured; only ``None`` selects the default.
        """
        if confidence_threshold is None:
            return self.confidence_threshold
        return confidence_threshold

    def detect_objects(self, image: np.ndarray,
                       confidence_threshold: Optional[float] = None) -> List[Dict[str, Any]]:
        """
        Detect objects in an image.

        Args:
            image: Input image as numpy array
            confidence_threshold: Minimum confidence for detections; ``None``
                selects ``self.confidence_threshold``

        Returns:
            List of detection dictionaries. Empty when the engine is
            unavailable, the image is missing/empty, the backend is unknown
            or the backend raised.
        """
        if not self.available:
            return []
        if image is None or (isinstance(image, np.ndarray) and image.size == 0):
            logger.warning("⚠️ Empty image passed to detect_objects")
            return []

        threshold = self._resolve_threshold(confidence_threshold)
        backends = {
            'yolo': self._detect_yolo,
            'opencv_dnn': self._detect_opencv_dnn,
            'basic_cv': self._detect_basic_cv,
        }
        detect = backends.get(getattr(self, 'detector_type', 'none'))
        if detect is None:
            return []

        try:
            return detect(image, threshold)
        except Exception as e:
            logger.error(f"❌ Object detection failed: {e}")
            return []

    @staticmethod
    def _lookup_class_name(names: Any, cls: int) -> str:
        """Map a class index to its name, or ``'unknown'`` if it is not present.

        ``names`` may be a mapping keyed by class index or a plain sequence.
        """
        if isinstance(names, dict):
            return names.get(cls, 'unknown')
        return names[cls] if 0 <= cls < len(names) else 'unknown'

    def _detect_yolo(self, image: np.ndarray, threshold: float) -> List[Dict[str, Any]]:
        """Run the YOLO model and convert its boxes into detection dicts."""
        try:
            results = self.primary_detector.predict(
                image,
                conf=threshold,
                verbose=False
            )

            detections = []
            for result in results:
                boxes = result.boxes
                if boxes is None:
                    continue
                for box in boxes:
                    xyxy = box.xyxy[0].cpu().numpy()
                    conf = float(box.conf[0].cpu().numpy())
                    cls = int(box.cls[0].cpu().numpy())
                    class_name = self._lookup_class_name(result.names, cls)

                    detections.append({
                        'class': class_name,
                        'confidence': conf,
                        'weighted_confidence': self._apply_confidence_weighting(class_name, conf),
                        'bbox': xyxy.tolist(),
                        'area': self._calculate_bbox_area(xyxy),
                        'center': self._calculate_bbox_center(xyxy),
                        'species_type': self._classify_species_type(class_name),
                    })

            return self._apply_nms(detections)
        except Exception as e:
            logger.error(f"❌ YOLO detection failed: {e}")
            return []

    @staticmethod
    def _to_gray(image: np.ndarray) -> np.ndarray:
        """Return a single-channel view/copy of ``image``.

        2-D input is returned unchanged; 1-channel input is squeezed;
        4-channel input is treated as BGRA and anything else as BGR.
        """
        if image.ndim == 2:
            return image
        channels = image.shape[2]
        if channels == 1:
            return image[:, :, 0]
        if channels == 4:
            return cv2.cvtColor(image, cv2.COLOR_BGRA2GRAY)
        return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    def _detect_opencv_dnn(self, image: np.ndarray, threshold: float) -> List[Dict[str, Any]]:
        """Find large external contours and report them as generic objects.

        This is a simplified stand-in: no pre-trained DNN is loaded. Each
        detection carries the fixed confidence ``_CONTOUR_CONFIDENCE``, so
        nothing is returned when ``threshold`` exceeds it.
        """
        try:
            if threshold > self._CONTOUR_CONFIDENCE:
                return []

            gray = self._to_gray(image)
            edges = cv2.Canny(gray, 50, 150)
            # [-2] selects the contour list on both the 2-tuple and the
            # 3-tuple return layouts of cv2.findContours.
            contours = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]

            detections = []
            for contour in contours:
                area = cv2.contourArea(contour)
                if area <= self._MIN_CONTOUR_AREA:
                    continue  # filter small objects
                x, y, w, h = cv2.boundingRect(contour)
                detections.append({
                    'class': 'object',
                    'confidence': self._CONTOUR_CONFIDENCE,
                    'weighted_confidence': self._CONTOUR_CONFIDENCE,
                    'bbox': [x, y, x + w, y + h],
                    'area': area,
                    'center': [x + w // 2, y + h // 2],
                    'species_type': 'unknown',
                })

            # All confidences are equal, so "top" means largest contour area.
            detections.sort(key=lambda det: det['area'], reverse=True)
            return detections[:self._MAX_CONTOUR_DETECTIONS]
        except Exception as e:
            logger.error(f"❌ OpenCV DNN detection failed: {e}")
            return []

    def _detect_basic_cv(self, image: np.ndarray, threshold: float) -> List[Dict[str, Any]]:
        """Report blobs found by ``cv2.SimpleBlobDetector`` as detections.

        Each detection carries the fixed confidence ``_BLOB_CONFIDENCE``, so
        nothing is returned when ``threshold`` exceeds it.
        """
        try:
            if threshold > self._BLOB_CONFIDENCE:
                return []

            gray = self._to_gray(image)

            params = cv2.SimpleBlobDetector_Params()
            params.filterByArea = True
            params.minArea = 500
            params.maxArea = 50000
            detector = cv2.SimpleBlobDetector_create(params)

            detections = []
            for kp in detector.detect(gray):
                x, y = int(kp.pt[0]), int(kp.pt[1])
                size = int(kp.size)
                half = size // 2
                detections.append({
                    'class': 'blob',
                    'confidence': self._BLOB_CONFIDENCE,
                    'weighted_confidence': self._BLOB_CONFIDENCE,
                    'bbox': [x - half, y - half, x + half, y + half],
                    'area': size * size,
                    'center': [x, y],
                    'species_type': 'unknown',
                })
            return detections
        except Exception as e:
            logger.error(f"❌ Basic CV detection failed: {e}")
            return []

    # ------------------------------------------------------------------
    # Detection helpers
    # ------------------------------------------------------------------
    def _apply_confidence_weighting(self, class_name: str, confidence: float) -> float:
        """Scale ``confidence`` by the class weight (1.0 for unlisted classes)."""
        weight = self.class_mappings['confidence_weights'].get(class_name, 1.0)
        return confidence * weight

    def _calculate_bbox_area(self, bbox: np.ndarray) -> float:
        """Return the area of an ``[x1, y1, x2, y2]`` box as a Python float."""
        return float((bbox[2] - bbox[0]) * (bbox[3] - bbox[1]))

    def _calculate_bbox_center(self, bbox: np.ndarray) -> List[float]:
        """Return the ``[x, y]`` center of an ``[x1, y1, x2, y2]`` box as Python floats."""
        return [float((bbox[0] + bbox[2]) / 2), float((bbox[1] + bbox[3]) / 2)]

    def _classify_species_type(self, class_name: str) -> str:
        """Return ``'bird'``, ``'animal'`` or ``'unknown'`` for a class name.

        The lookup is case-insensitive and checks the bird groups first.
        """
        name = class_name.lower()
        if any(name in members for members in self.class_mappings['birds'].values()):
            return 'bird'
        if any(name in members for members in self.class_mappings['animals'].values()):
            return 'animal'
        return 'unknown'

    @staticmethod
    def _xyxy_to_xywh(bbox: Any) -> List[float]:
        """Convert ``[x1, y1, x2, y2]`` into ``[x, y, width, height]``."""
        x1, y1, x2, y2 = (float(value) for value in bbox[:4])
        return [x1, y1, x2 - x1, y2 - y1]

    def _apply_nms(self, detections: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Apply non-maximum suppression to remove overlapping duplicates.

        Boxes are converted to ``[x, y, w, h]`` before being handed to
        ``cv2.dnn.NMSBoxes``. The score threshold is 0.0 because confidence
        filtering has already been done by the detector with the caller's
        threshold; only ``self.nms_threshold`` (overlap) is applied here.
        On any failure the input list is returned unchanged.
        """
        if not detections:
            return detections

        try:
            boxes = [self._xyxy_to_xywh(det['bbox']) for det in detections]
            scores = [float(det['confidence']) for det in detections]

            indices = cv2.dnn.NMSBoxes(boxes, scores, 0.0, self.nms_threshold)
            # Normalizes the possible return shapes (empty tuple, flat, Nx1).
            keep = np.asarray(indices, dtype=int).reshape(-1)
            if keep.size == 0:
                return detections
            return [detections[i] for i in keep]
        except Exception as e:
            logger.warning(f"⚠️ NMS failed, returning original detections: {e}")
            return detections

    # ------------------------------------------------------------------
    # Tracking
    # ------------------------------------------------------------------
    def _find_matching_track(self, tracks: List[Dict[str, Any]],
                             detection: Dict[str, Any],
                             frame_idx: int) -> Optional[Dict[str, Any]]:
        """Return the nearest same-class track for ``detection``, or None.

        Tracks already updated or created in ``frame_idx`` are skipped so two
        detections from one frame never collapse into the same track.
        """
        center = detection['center']
        best_track = None
        best_distance = float('inf')
        for track in tracks:
            if track['class'] != detection['class']:
                continue
            if track['last_frame'] >= frame_idx:
                continue
            last_center = track['centers'][-1]
            distance = math.hypot(last_center[0] - center[0], last_center[1] - center[1])
            if distance < best_distance and distance < self._MAX_TRACK_DISTANCE:
                best_distance = distance
                best_track = track
        return best_track

    def track_objects(self, detections_sequence: List[List[Dict[str, Any]]]) -> Dict[str, Any]:
        """
        Track objects across multiple frames by spatial proximity.

        Args:
            detections_sequence: List of detection lists for each frame

        Returns:
            Dict with:
            ``tracks`` - one dict per track (id, class, species_type, centers,
            confidences, first_frame, last_frame);
            ``max_simultaneous`` - per species type, the largest number of
            detections seen in a single frame;
            ``species_counts`` - per species type, the number of tracks;
            ``temporal_patterns`` - per frame, the track ids and species counts.
            On failure every value is empty.
        """
        try:
            tracks = []
            temporal_patterns = []
            max_simultaneous = {}
            next_track_id = 0

            for frame_idx, detections in enumerate(detections_sequence):
                frame_tracks = []
                for detection in detections:
                    track = self._find_matching_track(tracks, detection, frame_idx)
                    if track is not None:
                        track['centers'].append(detection['center'])
                        track['confidences'].append(detection['confidence'])
                        track['last_frame'] = frame_idx
                    else:
                        track = {
                            'id': next_track_id,
                            'class': detection['class'],
                            'species_type': detection['species_type'],
                            'centers': [detection['center']],
                            'confidences': [detection['confidence']],
                            'first_frame': frame_idx,
                            'last_frame': frame_idx,
                        }
                        tracks.append(track)
                        next_track_id += 1
                    frame_tracks.append(track)

                # Count simultaneous objects by type for this frame.
                frame_counts = dict(Counter(t['species_type'] for t in frame_tracks))
                temporal_patterns.append({
                    'frame': frame_idx,
                    'active_tracks': [t['id'] for t in frame_tracks],
                    'species_counts': frame_counts.copy(),
                })
                for species, count in frame_counts.items():
                    max_simultaneous[species] = max(max_simultaneous.get(species, 0), count)

            return {
                'tracks': tracks,
                'max_simultaneous': max_simultaneous,
                'species_counts': dict(Counter(t['species_type'] for t in tracks)),
                'temporal_patterns': temporal_patterns,
            }
        except Exception as e:
            logger.error(f"❌ Object tracking failed: {e}")
            return {
                'tracks': [],
                'max_simultaneous': {},
                'species_counts': {},
                'temporal_patterns': [],
            }

    # ------------------------------------------------------------------
    # Species classification
    # ------------------------------------------------------------------
    def classify_species(self, detection: Dict[str, Any],
                         image_region: Optional[np.ndarray] = None) -> Dict[str, Any]:
        """
        Classify species for a detected object.

        Args:
            detection: Detection dictionary
            image_region: Optional image region; accepted but not used by the
                current implementation

        Returns:
            A shallow copy of ``detection`` with an added ``species_info``
            entry; the original ``detection`` is returned if anything fails.
        """
        try:
            class_name = detection.get('class', '').lower()
            species_type = detection.get('species_type', 'unknown')

            if species_type == 'bird':
                details = self._classify_bird_species(class_name)
            elif species_type == 'animal':
                details = self._classify_animal_species(class_name)
            else:
                details = {}

            enhanced_detection = detection.copy()
            enhanced_detection['species_info'] = {
                'primary_class': class_name,
                'species_type': species_type,
                'confidence': detection.get('confidence', 0.0),
                'species_details': details,
            }
            return enhanced_detection
        except Exception as e:
            logger.error(f"❌ Species classification failed: {e}")
            return detection

    @staticmethod
    def _lookup_profile(profiles: Tuple, class_name: str) -> Dict[str, Any]:
        """Return the first profile whose name set contains ``class_name``.

        Every field defaults to ``'unknown'`` when no profile matches.
        """
        details = {
            'category': 'unknown',
            'habitat': 'unknown',
            'size': 'unknown',
            'behavior': 'unknown',
        }
        for names, profile in profiles:
            if class_name in names:
                details.update(profile)
                break
        return details

    def _classify_bird_species(self, class_name: str) -> Dict[str, Any]:
        """Return category/habitat/size/behavior details for a bird class name."""
        return self._lookup_profile(self._BIRD_PROFILES, class_name)

    def _classify_animal_species(self, class_name: str) -> Dict[str, Any]:
        """Return category/habitat/size/behavior details for an animal class name."""
        return self._lookup_profile(self._ANIMAL_PROFILES, class_name)

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------
    def get_detection_statistics(self, detections: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Summarize a list of detections.

        Returns:
            Dict with ``total_detections``, ``species_counts``,
            ``class_distribution``, ``confidence_stats`` (mean/std/min/max)
            and ``size_distribution`` (mean/std/min/max of ``area``). The
            numeric values are Python floats. The four sub-dicts are empty
            when there are no detections or the computation fails.
        """
        stats = {
            'total_detections': 0,
            'species_counts': {},
            'confidence_stats': {},
            'size_distribution': {},
            'class_distribution': {},
        }
        try:
            stats['total_detections'] = len(detections)
            if not detections:
                return stats

            stats['species_counts'] = dict(
                Counter(det.get('species_type', 'unknown') for det in detections)
            )
            stats['class_distribution'] = dict(
                Counter(det.get('class', 'unknown') for det in detections)
            )

            confidences = np.asarray(
                [det.get('confidence', 0.0) for det in detections], dtype=float
            )
            stats['confidence_stats'] = {
                'mean': float(confidences.mean()),
                'std': float(confidences.std()),
                'min': float(confidences.min()),
                'max': float(confidences.max()),
            }

            areas = np.asarray([det.get('area', 0) for det in detections], dtype=float)
            stats['size_distribution'] = {
                'mean_area': float(areas.mean()),
                'std_area': float(areas.std()),
                'min_area': float(areas.min()),
                'max_area': float(areas.max()),
            }
            return stats
        except Exception as e:
            logger.error(f"❌ Failed to calculate detection statistics: {e}")
            return {
                'total_detections': 0,
                'species_counts': {},
                'confidence_stats': {},
                'size_distribution': {},
                'class_distribution': {},
            }

    def get_capabilities(self) -> Dict[str, Any]:
        """Describe the active backend, thresholds, weighted classes and features."""
        return {
            'available': self.available,
            'detector_type': getattr(self, 'detector_type', 'none'),
            'confidence_threshold': self.confidence_threshold,
            'nms_threshold': self.nms_threshold,
            'supported_classes': list(self.class_mappings['confidence_weights'].keys()),
            'features': [
                'Object detection',
                'Species classification',
                'Confidence scoring',
                'Bounding box detection',
                'Non-maximum suppression',
                'Object tracking',
                'Statistical analysis',
            ],
        }
# Factory function for creating detection engine | |
def create_object_detection_engine() -> ObjectDetectionEngine:
    """Create and return an object detection engine instance.

    Constructing the engine runs its backend initialization, so this call
    does whatever ``ObjectDetectionEngine()`` does.
    """
    engine = ObjectDetectionEngine()
    return engine
if __name__ == "__main__":
    # Smoke test: build an engine and print what it reports about itself.
    engine = ObjectDetectionEngine()
    capabilities = engine.get_capabilities()
    print(f"Detection engine available: {engine.available}")
    print(f"Capabilities: {json.dumps(capabilities, indent=2)}")