""" enhanced_gradio_app.py - Enhanced Gradio Interface for Dog Monitoring With SQLite database, dataset curation, and export features """ import gradio as gr import cv2 import numpy as np import pandas as pd from PIL import Image import time import json import zipfile import tempfile from pathlib import Path from typing import List, Dict, Optional, Tuple from datetime import datetime # Import core modules from detection import DogDetector from tracking import SimpleTracker from reid import SimplifiedDogReID from database import DogDatabase from threshold_optimizer import ThresholdOptimizer class EnhancedDogMonitoringApp: """Enhanced app with database and dataset management""" def __init__(self, db_path: str = "dog_monitoring.db"): """Initialize the enhanced monitoring system""" # Core components self.detector = DogDetector(device='cuda') self.tracker = SimpleTracker() self.reid = SimplifiedDogReID(device='cuda') # Database self.db = DogDatabase(db_path) # Threshold optimizer self.threshold_optimizer = ThresholdOptimizer() # Processing parameters self.detection_confidence = 0.45 self.reid_threshold = 0.7 self.process_every_n_frames = 3 # Current session info self.current_video_path = None self.current_frame_count = 0 self.processing_active = False def process_video(self, video_path: str, progress=None): """Process video with database storage""" if not video_path: return None, [], "No video uploaded" self.current_video_path = video_path self.processing_active = True # Reset tracking for new video self.tracker = SimpleTracker() self.reid.reset() # Open video cap = cv2.VideoCapture(video_path) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = cap.get(cv2.CAP_PROP_FPS) # Prepare output width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) output_path = "output_video.mp4" fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) frame_count = 0 dogs_in_video = set() while self.processing_active: ret, frame = cap.read() if not ret: break frame_count += 1 self.current_frame_count = frame_count # Update progress if progress and total_frames > 0: progress(frame_count / total_frames, f"Processing frame {frame_count}/{total_frames}") # Process every Nth frame if frame_count % self.process_every_n_frames == 0: # Detect dogs detections = self.detector.detect(frame) # Update tracker tracks = self.tracker.update(detections) # Process each track for track in tracks: # Re-identify dog_id, confidence = self.reid.match_or_register(track) if dog_id > 0: dogs_in_video.add(dog_id) # Save to database self._save_to_database( dog_id, track, confidence, frame_count, video_path ) # Draw on frame self._draw_track(frame, track, dog_id, confidence) # Feed to optimizer self.threshold_optimizer.add_reid_sample( similarity=confidence, matched_dog_id=dog_id ) # Add overlay self._add_overlay(frame, frame_count, len(dogs_in_video)) # Write frame out.write(frame) cap.release() out.release() self.processing_active = False # Create summary summary = f"Processed {frame_count} frames, detected {len(dogs_in_video)} unique dogs" return output_path, self._get_dog_gallery(), summary def _save_to_database(self, dog_id: int, track, confidence: float, frame_number: int, video_source: str): """Save dog data to database""" # Ensure dog exists in database self.db.add_dog(dog_id) # Get latest detection with image detection = None for det in reversed(track.detections): if det.image_crop is not None: detection = det break if detection: # Save image FIRST and get the image_id image_id = self.db.save_image( dog_id=dog_id, image=detection.image_crop, frame_number=frame_number, video_source=video_source, bbox=detection.bbox, confidence=confidence ) # Extract and save body parts - THIS IS THE KEY ADDITION crops = self.reid.extract_body_parts(detection.image_crop, detection.bbox) confidences = crops.crop_confidences or {} self.db.save_body_parts( dog_id=dog_id, image_id=image_id, # Use the returned image_id head_crop=crops.head_crop, torso_crop=crops.torso_crop, rear_crop=crops.rear_crop, confidences=confidences ) # Save features features = self.reid.dog_database.get(dog_id, []) if features: latest_feature = features[-1] self.db.save_features( dog_id=dog_id, resnet_features=latest_feature.full_features, color_histogram=latest_feature.color_histogram, confidence=confidence ) # Save sighting bbox = detection.bbox position = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2) self.db.add_sighting( dog_id=dog_id, position=position, video_source=video_source, frame_number=frame_number, confidence=confidence ) # Update dog sighting count self.db.update_dog_sighting(dog_id) def _draw_track(self, frame: np.ndarray, track, dog_id: int, confidence: float): """Draw bounding box with dog ID""" bbox = track.bbox x1, y1, x2, y2 = map(int, bbox) # Color based on confidence if confidence > 0.8: color = (0, 255, 0) elif confidence > 0.6: color = (0, 165, 255) else: color = (0, 0, 255) # Draw box and label cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) label = f"Dog #{dog_id} ({confidence:.0%})" label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0] cv2.rectangle(frame, (x1, y1 - label_size[1] - 10), (x1 + label_size[0], y1), color, -1) cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) def _add_overlay(self, frame: np.ndarray, frame_count: int, dog_count: int): """Add info overlay to frame""" h, w = frame.shape[:2] # Semi-transparent background overlay = frame.copy() cv2.rectangle(overlay, (10, 10), (250, 80), (0, 0, 0), -1) frame[:] = cv2.addWeighted(overlay, 0.3, frame, 0.7, 0) # Add text cv2.putText(frame, f"Frame: {frame_count}", (20, 35), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) cv2.putText(frame, f"Dogs: {dog_count}", (20, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1) def _get_dog_gallery(self) -> List[Tuple[np.ndarray, str]]: """Get gallery of detected dogs from database""" gallery = [] dogs = self.db.get_all_dogs() for _, dog in dogs.head(12).iterrows(): images = self.db.get_dog_images(dog['dog_id'], include_discarded=False) if images: img = images[0]['image'] img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) caption = f"Dog #{dog['dog_id']} | Sightings: {dog['total_sightings']}" gallery.append((img_rgb, caption)) return gallery # ========== Dataset Management Functions ========== def get_dog_list(self) -> pd.DataFrame: """Get list of all dogs for management""" dogs = self.db.get_all_dogs() return dogs[['dog_id', 'name', 'first_seen', 'last_seen', 'total_sightings', 'status']] def get_dog_images_for_review(self, dog_id: int) -> Tuple[List, List]: """Get dog images and body parts for review/validation""" if dog_id is None: return [], [] dog_id = int(dog_id) # Get full images images = self.db.get_dog_images(dog_id, include_discarded=True) display_images = [] for img_data in images: img_rgb = cv2.cvtColor(img_data['image'], cv2.COLOR_BGR2RGB) status = "✓" if img_data['is_validated'] else "✗" if img_data['is_discarded'] else "?" display_images.append({ 'image': img_rgb, 'image_id': img_data['image_id'], 'status': status, 'confidence': img_data['confidence'] }) # Get body parts body_parts = self.db.get_body_parts(dog_id, include_discarded=True) display_parts = [] for part_data in body_parts: part_rgb = cv2.cvtColor(part_data['image'], cv2.COLOR_BGR2RGB) status = "✓" if part_data['is_validated'] else "✗" if part_data.get('is_discarded') else "?" display_parts.append({ 'image': part_rgb, 'part_id': part_data['part_id'], 'part_type': part_data['part_type'], 'status': status, 'confidence': part_data['confidence'] }) return display_images, display_parts def validate_body_parts(self, part_ids: List[int], action: str) -> str: """Validate or discard body parts""" if not part_ids: return "No parts selected" count = 0 for part_id in part_ids: # Extract the actual ID from tuple if needed if isinstance(part_id, tuple): part_id = part_id[1] # Get value from (label, value) tuple if action == "validate": self.db.validate_body_part(part_id, is_valid=True) elif action == "discard": self.db.validate_body_part(part_id, is_valid=False) count += 1 return f"Updated {count} body parts" def validate_images(self, dog_id: int, image_ids: List[int], action: str) -> str: """Validate or discard images""" if not image_ids: return "No images selected" count = 0 for img_id in image_ids: # Extract the actual ID from tuple if needed if isinstance(img_id, tuple): img_id = img_id[1] # Get value from (label, value) tuple if action == "validate": self.db.validate_image(img_id, is_valid=True) elif action == "discard": self.db.validate_image(img_id, is_valid=False) count += 1 return f"Updated {count} images" def merge_dogs_handler(self, keep_id: int, merge_id: int) -> str: """Handle dog merging""" if keep_id == merge_id: return "Cannot merge dog with itself" if self.db.merge_dogs(keep_id, merge_id): return f"Successfully merged Dog #{merge_id} into Dog #{keep_id}" else: return "Failed to merge dogs" def export_dataset(self, output_format: str, validated_only: bool) -> str: """Export dataset for training""" try: # Create temporary directory with tempfile.TemporaryDirectory() as temp_dir: # Export dataset metadata = self.db.export_training_dataset( temp_dir, validated_only=validated_only ) # Create zip file timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") zip_path = f"dog_dataset_{timestamp}.zip" with zipfile.ZipFile(zip_path, 'w') as zipf: for root, dirs, files in Path(temp_dir).walk(): for file in files: file_path = Path(root) / file arcname = file_path.relative_to(temp_dir) zipf.write(file_path, arcname) return f"Dataset exported: {zip_path} ({metadata['total_images']} images)" except Exception as e: return f"Export failed: {str(e)}" def reset_database_handler(self, confirm_text: str) -> str: """Handle database reset""" if confirm_text.lower() != "reset": return "Type 'reset' to confirm database reset" if self.db.reset_database(confirm=True): self.reid.reset() return "Database reset successfully" else: return "Database reset failed" def get_database_stats(self) -> str: """Get database statistics""" stats = self.db.get_dog_statistics() return f""" 📊 Database Statistics: â€ĸ Active Dogs: {stats['total_active_dogs']} â€ĸ Total Images: {stats['total_images']} â€ĸ Validated Images: {stats['validated_images']} â€ĸ Total Sightings: {stats['total_sightings']} 🏆 Most Seen: {stats.get('most_seen_dog', {}).get('name', 'None')} ({stats.get('most_seen_dog', {}).get('sightings', 0)} sightings) """ def stop_processing(self): """Stop video processing""" self.processing_active = False return "Processing stopped" # ========== Gradio Interface ========== def create_interface(self) -> gr.Blocks: """Create enhanced Gradio interface""" with gr.Blocks(title="Enhanced Dog Monitoring System", theme=gr.themes.Soft()) as app: gr.Markdown(""" # 🐕 Enhanced Stray Dog Monitoring System **Detection â€ĸ Tracking â€ĸ Re-ID â€ĸ Database â€ĸ Dataset Export** """) with gr.Tabs(): # Tab 1: Video Processing with gr.Tab("📹 Process Video"): with gr.Row(): with gr.Column(scale=1): video_input = gr.Video(label="Upload Video") with gr.Row(): process_btn = gr.Button("â–ļī¸ Process", variant="primary") stop_btn = gr.Button("âšī¸ Stop") # Settings gr.Markdown("### Settings") detection_slider = gr.Slider( 0.1, 0.9, 0.45, step=0.05, label="Detection Confidence" ) reid_slider = gr.Slider( 0.3, 0.95, 0.7, step=0.05, label="ReID Threshold" ) frame_skip = gr.Slider( 1, 10, 3, step=1, label="Process Every N Frames" ) with gr.Column(scale=2): video_output = gr.Video(label="Processed Video") processing_status = gr.Textbox(label="Status") dog_gallery = gr.Gallery( label="Detected Dogs", columns=4, rows=3 ) # Tab 2: Dog Management with gr.Tab("đŸļ Manage Dogs"): with gr.Row(): with gr.Column(scale=1): gr.Markdown("### Dog Registry") refresh_btn = gr.Button("🔄 Refresh List") dog_table = gr.Dataframe( headers=["ID", "Name", "First Seen", "Last Seen", "Sightings", "Status"], interactive=False ) # Merge dogs gr.Markdown("### Merge Dogs") with gr.Row(): keep_dog_id = gr.Number(label="Keep Dog ID", precision=0) merge_dog_id = gr.Number(label="Merge Dog ID", precision=0) merge_btn = gr.Button("🔀 Merge Dogs") merge_status = gr.Textbox(label="Merge Status") with gr.Column(scale=2): gr.Markdown("### Review Dog Images & Body Parts") selected_dog_id = gr.Number( label="Dog ID to Review", precision=0 ) load_images_btn = gr.Button("📷 Load Images") with gr.Tab("Full Images"): dog_images_gallery = gr.Gallery( label="Full Dog Images", columns=4, rows=3, height=300 ) with gr.Row(): selected_images = gr.CheckboxGroup( label="Select Images", choices=[] ) with gr.Row(): validate_imgs_btn = gr.Button("✅ Validate Selected") discard_imgs_btn = gr.Button("❌ Discard Selected") with gr.Tab("Body Parts"): body_parts_gallery = gr.Gallery( label="Body Part Crops", columns=4, rows=3, height=300 ) gr.Markdown(""" **Part Types**: - Head (top 35% of dog) - Torso (middle 40%) - Rear (bottom 40%) Validate correctly cropped parts, discard mixed/wrong crops. """) with gr.Row(): selected_parts = gr.CheckboxGroup( label="Select Parts", choices=[] ) with gr.Row(): validate_parts_btn = gr.Button("✅ Validate Parts") discard_parts_btn = gr.Button("❌ Discard Parts") validation_status = gr.Textbox(label="Validation Status") # Tab 3: Dataset Export with gr.Tab("💾 Export Dataset"): gr.Markdown(""" ### Export Training Dataset Export validated dog images for ResNet fine-tuning """) with gr.Row(): with gr.Column(): export_format = gr.Radio( ["Images + CSV", "COCO Format", "YOLO Format"], value="Images + CSV", label="Export Format" ) validated_only = gr.Checkbox( label="Export validated images only", value=True ) export_btn = gr.Button("đŸ“Ļ Export Dataset", variant="primary") export_status = gr.Textbox(label="Export Status") gr.Markdown(""" ### Dataset Info The exported dataset includes: - Individual dog images organized by ID - CSV file with labels and metadata - Bounding box annotations - Pose keypoints (if available) Use this dataset to fine-tune ResNet for better re-identification! """) with gr.Column(): stats_display = gr.Textbox( label="Database Statistics", lines=10 ) refresh_stats_btn = gr.Button("📊 Refresh Stats") # Tab 4: Database Management with gr.Tab("âš™ī¸ Database"): gr.Markdown("### Database Management") with gr.Row(): with gr.Column(): gr.Markdown(""" âš ī¸ **Warning**: Resetting the database will delete all data! Type 'reset' to confirm. """) reset_confirm = gr.Textbox( label="Type 'reset' to confirm", placeholder="reset" ) reset_btn = gr.Button("đŸ—‘ī¸ Reset Database", variant="stop") reset_status = gr.Textbox(label="Reset Status") gr.Markdown("### Database Optimization") optimize_btn = gr.Button("🔧 Optimize Database") optimize_status = gr.Textbox(label="Optimization Status") # Event handlers # Video processing process_btn.click( self.process_video, inputs=[video_input], outputs=[video_output, dog_gallery, processing_status] ) stop_btn.click( self.stop_processing, outputs=[processing_status] ) # Settings updates detection_slider.change( lambda v: setattr(self, 'detection_confidence', v) or f"Detection: {v:.2f}", inputs=[detection_slider], outputs=[processing_status] ) reid_slider.change( lambda v: self.reid.set_threshold(v) or f"ReID Threshold set to: {v:.2f}", inputs=[reid_slider], outputs=[processing_status] ) frame_skip.change( lambda v: setattr(self, 'process_every_n_frames', int(v)) or f"Skip: {int(v)}", inputs=[frame_skip], outputs=[processing_status] ) # Dog management refresh_btn.click( self.get_dog_list, outputs=[dog_table] ) merge_btn.click( self.merge_dogs_handler, inputs=[keep_dog_id, merge_dog_id], outputs=[merge_status] ) def load_dog_images(dog_id): if dog_id is None: return [], gr.update(choices=[]), [], gr.update(choices=[]), "No dog selected" images, parts = self.get_dog_images_for_review(int(dog_id)) # FIX: Use (label, id) tuples for choices img_gallery = [(img['image'], f"ID: {img['image_id']} | {img['status']} | {img['confidence']:.1%}") for img in images] img_choices = [(f"Image ID: {img['image_id']}", img['image_id']) for img in images] # FIX: Use (label, id) tuples for choices part_gallery = [(p['image'], f"ID: {p['part_id']} | {p['part_type'].upper()} {p['status']}") for p in parts] part_choices = [(f"{p['part_type'].capitalize()} ID: {p['part_id']}", p['part_id']) for p in parts] return (img_gallery, gr.update(choices=img_choices), part_gallery, gr.update(choices=part_choices), f"Loaded {len(images)} images, {len(parts)} body parts for Dog #{int(dog_id)}") load_images_btn.click( load_dog_images, inputs=[selected_dog_id], outputs=[dog_images_gallery, selected_images, body_parts_gallery, selected_parts, validation_status] ) # Validate/discard body parts validate_parts_btn.click( self.validate_body_parts, inputs=[selected_parts, gr.State("validate")], outputs=[validation_status] ) discard_parts_btn.click( self.validate_body_parts, inputs=[selected_parts, gr.State("discard")], outputs=[validation_status] ) # FIX: Add event handlers for image validation validate_imgs_btn.click( self.validate_images, inputs=[selected_dog_id, selected_images, gr.State("validate")], outputs=[validation_status] ) discard_imgs_btn.click( self.validate_images, inputs=[selected_dog_id, selected_images, gr.State("discard")], outputs=[validation_status] ) # Dataset export export_btn.click( self.export_dataset, inputs=[export_format, validated_only], outputs=[export_status] ) refresh_stats_btn.click( self.get_database_stats, outputs=[stats_display] ) # Database management reset_btn.click( self.reset_database_handler, inputs=[reset_confirm], outputs=[reset_status] ) def optimize_database(): self.db.vacuum() return "Database optimized" optimize_btn.click( optimize_database, outputs=[optimize_status] ) # Load initial data app.load( self.get_dog_list, outputs=[dog_table] ) app.load( self.get_database_stats, outputs=[stats_display] ) return app def main(): """Launch the enhanced application""" app = EnhancedDogMonitoringApp() interface = app.create_interface() interface.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True ) if __name__ == "__main__": main()