| """ | |
| dataset_generator.py - Enhanced Dataset Generation with User Verification | |
| Hybrid workflow for creating clean dog ReID fine-tuning datasets | |
| """ | |

import json
import shutil
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import cv2
import gradio as gr
import numpy as np
import pandas as pd
from ultralytics import YOLO

# Import existing project modules
from detection import DogDetector
from tracking import SimpleTracker
from reid import SingleModelReID


# ========== ENHANCED HEADSHOT EXTRACTOR ==========

class ImprovedHeadExtractor:
    """Enhanced head extraction with multiple strategies."""

    def __init__(self):
        # Try to load the pose model; fall back to geometric cropping if it
        # cannot be loaded
        self.pose_model = None
        try:
            self.pose_model = YOLO('yolov8m-pose.pt')
            self.pose_model.to('cuda')
            print("Pose model loaded for head extraction")
        except Exception as e:
            print(f"Pose model unavailable ({e}); using adaptive geometric head extraction")

        # Dog keypoint indices (head subset)
        self.keypoints_map = {
            'nose': 0, 'left_eye': 1, 'right_eye': 2,
            'left_ear': 3, 'right_ear': 4
        }
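
        # NOTE: these indices assume a pose checkpoint whose first five
        # keypoints are nose/eyes/ears; if your checkpoint uses a different
        # keypoint layout, adjust keypoints_map accordingly.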

    def extract_adaptive_geometric(self, dog_crop: np.ndarray,
                                   aspect_ratio: float) -> Optional[np.ndarray]:
        """Adaptive geometric extraction based on dog orientation."""
        h, w = dog_crop.shape[:2]
        try:
            if aspect_ratio > 1.3:  # Dog is sideways
                # Head is typically in the first 35% horizontally
                head_width = int(w * 0.35)
                head_height = int(h * 0.5)
                head_crop = dog_crop[:head_height, :head_width]
            elif aspect_ratio < 0.7:  # Dog is vertical (sitting/standing, facing camera)
                # Head is the top portion, centered
                head_height = int(h * 0.45)
                margin = int(w * 0.15)
                head_crop = dog_crop[:head_height, margin:w - margin]
            else:  # Normal orientation
                # Standard extraction
                head_height = int(h * 0.4)
                margin = int(w * 0.1)
                head_crop = dog_crop[:head_height, margin:w - margin]

            if head_crop.size == 0:
                return None
            # Resize to a standard size
            return cv2.resize(head_crop, (128, 128))
        except Exception:
            return None

    def extract_with_pose(self, dog_crop: np.ndarray) -> Optional[np.ndarray]:
        """Extract using pose keypoints if available."""
        if self.pose_model is None:
            return None
        try:
            results = self.pose_model(dog_crop, conf=0.25, verbose=False)
            if results and len(results) > 0 and hasattr(results[0], 'keypoints'):
                keypoints = results[0].keypoints
                if keypoints is not None and keypoints.xy is not None and len(keypoints.xy) > 0:
                    kpts = keypoints.xy[0].cpu().numpy()
                    # Collect visible head keypoints (x == 0 means "not detected")
                    head_points = []
                    for key in ['nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear']:
                        idx = self.keypoints_map[key]
                        if idx < len(kpts) and kpts[idx][0] > 0:
                            head_points.append(kpts[idx])
                    if len(head_points) >= 3:  # Need at least 3 points for a stable box
                        head_points = np.array(head_points)
                        # Pad around the keypoints (a little extra below the chin)
                        padding = 30
                        min_x = max(0, int(np.min(head_points[:, 0]) - padding))
                        min_y = max(0, int(np.min(head_points[:, 1]) - padding))
                        max_x = min(dog_crop.shape[1], int(np.max(head_points[:, 0]) + padding))
                        max_y = min(dog_crop.shape[0], int(np.max(head_points[:, 1]) + padding * 1.2))
                        head_crop = dog_crop[min_y:max_y, min_x:max_x]
                        if head_crop.size > 0:
                            return cv2.resize(head_crop, (128, 128))
        except Exception:
            pass
        return None

    def extract_head(self, image: np.ndarray, bbox: List[float]) -> Dict:
        """Main extraction method returning both the head crop and metadata."""
        x1, y1, x2, y2 = map(int, bbox)
        dog_crop = image[y1:y2, x1:x2]
        if dog_crop.size == 0:
            return {'head_crop': None, 'method': 'failed', 'confidence': 0.0}
        aspect_ratio = (x2 - x1) / (y2 - y1)

        # Try pose-based extraction first
        head_crop = self.extract_with_pose(dog_crop)
        method = 'pose'
        # Fall back to adaptive geometric extraction
        if head_crop is None:
            head_crop = self.extract_adaptive_geometric(dog_crop, aspect_ratio)
            method = 'geometric'

        # Quality score: variance of the Laplacian is a standard sharpness
        # proxy (higher = more in focus); /100 is a heuristic normalization
        confidence = 0.0
        if head_crop is not None:
            gray = cv2.cvtColor(head_crop, cv2.COLOR_BGR2GRAY)
            laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
            confidence = min(1.0, laplacian_var / 100)

        return {
            'head_crop': head_crop,
            'method': method,
            'confidence': confidence,
            'bbox': [x1, y1, x2, y2]
        }
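

# Usage sketch for ImprovedHeadExtractor (hypothetical values; assumes a BGR
# frame and an xyxy pixel bbox):
#     extractor = ImprovedHeadExtractor()
#     result = extractor.extract_head(frame, [100, 50, 400, 350])
#     if result['head_crop'] is not None:
#         cv2.imwrite('head.jpg', result['head_crop'])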


# ========== DATASET GENERATOR WITH VERIFICATION ==========

class DatasetGenerator:
    """Generate and manage temporary datasets for verification."""

    def __init__(self, temp_dir: str = "temp_dataset",
                 final_dir: str = "verified_dataset"):
        self.temp_dir = Path(temp_dir)
        self.final_dir = Path(final_dir)
        self.head_extractor = ImprovedHeadExtractor()

        # Create directories
        self.temp_dir.mkdir(exist_ok=True)
        self.final_dir.mkdir(exist_ok=True)

        # Verification tracking
        self.current_session = None    # metadata for the most recently processed video
        self.verification_status = {}  # dog_id -> 'approved'/'cleaned'/'deleted'/'merged'

    def process_video_for_dataset(self, video_path: str, reid_threshold: float = 0.75,
                                  max_images_per_dog: int = 30) -> Dict:
        """Process a video and extract a provisional dataset."""
        # Clear the temp directory
        if self.temp_dir.exists():
            shutil.rmtree(self.temp_dir)
        self.temp_dir.mkdir()

        # Initialize pipeline components
        detector = DogDetector(device='cuda', confidence_threshold=0.45)
        tracker = SimpleTracker()
        reid = SingleModelReID(device='cuda')
        reid.set_all_thresholds(reid_threshold)

        # Storage for dog images: dog_id -> list of image records
        dog_images = {}

        # Process the video
        cap = cv2.VideoCapture(video_path)
        frame_num = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # Process every 3rd frame to avoid collecting near-duplicate images
            if frame_num % 3 == 0:
                # Detect and track
                detections = detector.detect(frame)
                tracks = tracker.update(detections)
                # Process each track
                for track in tracks:
                    # Get the ReID result
                    results = reid.match_or_register_all(track)
                    dog_id = results['ResNet50']['dog_id']
                    confidence = results['ResNet50']['confidence']
                    if dog_id > 0:
                        # Use the most recent detection in the track that has a crop
                        detection = None
                        for det in reversed(track.detections):
                            if det.image_crop is not None:
                                detection = det
                                break
                        if detection:
                            if dog_id not in dog_images:
                                dog_images[dog_id] = []
                            # Store the image data
                            dog_images[dog_id].append({
                                'frame': frame.copy(),
                                'bbox': detection.bbox,
                                'frame_num': frame_num,
                                'confidence': confidence,
                                'detection_conf': detection.confidence
                            })
            frame_num += 1
        cap.release()

        # Extract and save the best images for each dog
        dataset_info = {
            'video_source': video_path,
            'timestamp': datetime.now().isoformat(),
            'dogs': {}
        }
        for dog_id, images in dog_images.items():
            # Rank by combined ReID and detection confidence
            images.sort(key=lambda x: x['confidence'] * x['detection_conf'], reverse=True)
            # Keep the top N images
            selected_images = images[:max_images_per_dog]

            # Create the dog directory with full/head subdirectories
            dog_dir = self.temp_dir / f"dog_{dog_id:03d}_provisional"
            dog_dir.mkdir(exist_ok=True)
            (dog_dir / 'full').mkdir(exist_ok=True)
            (dog_dir / 'head').mkdir(exist_ok=True)

            dog_info = {
                'dog_id': dog_id,
                'num_images': len(selected_images),
                # float() keeps the value JSON-serializable even if the
                # confidences are numpy scalars
                'avg_confidence': float(np.mean([img['confidence'] for img in selected_images])),
                'images': []
            }

            # Save the images
            for img_data in selected_images:
                # Extract the full-body crop
                x1, y1, x2, y2 = map(int, img_data['bbox'])
                full_crop = img_data['frame'][y1:y2, x1:x2]

                # Extract the head crop
                head_result = self.head_extractor.extract_head(
                    img_data['frame'], img_data['bbox']
                )

                # Save the full crop
                full_path = dog_dir / 'full' / f"frame_{img_data['frame_num']:06d}.jpg"
                cv2.imwrite(str(full_path), full_crop)

                # Save the head crop if available
                head_path = None
                if head_result['head_crop'] is not None:
                    head_path = dog_dir / 'head' / f"frame_{img_data['frame_num']:06d}_head.jpg"
                    cv2.imwrite(str(head_path), head_result['head_crop'])

                # Store per-image metadata
                dog_info['images'].append({
                    'frame_num': img_data['frame_num'],
                    'confidence': float(img_data['confidence']),
                    'detection_conf': float(img_data['detection_conf']),
                    'has_head': head_path is not None,
                    'head_method': head_result['method'],
                    'head_confidence': float(head_result['confidence'])
                })
            dataset_info['dogs'][dog_id] = dog_info

        # Save the session metadata
        with open(self.temp_dir / 'dataset_info.json', 'w') as f:
            json.dump(dataset_info, f, indent=2)
        self.current_session = dataset_info
        return dataset_info
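
    # Shape of the returned (and JSON-saved) dataset_info, values illustrative:
    #     {"video_source": "walk.mp4", "timestamp": "...",
    #      "dogs": {1: {"dog_id": 1, "num_images": 30, "avg_confidence": 0.82,
    #                   "images": [{"frame_num": 123, ...}, ...]}}}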

    def get_dog_preview_images(self, dog_id: int, num_images: int = 6) -> List:
        """Get preview images for the verification interface."""
        dog_dir = self.temp_dir / f"dog_{dog_id:03d}_provisional"
        full_dir = dog_dir / 'full'
        if not full_dir.exists():
            return []
        images = []
        for img_path in sorted(full_dir.glob("*.jpg"))[:num_images]:
            img = cv2.imread(str(img_path))
            if img is None:  # skip unreadable files
                continue
            images.append(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        return images

    def verify_dog(self, dog_id: int, action: str,
                   images_to_remove: Optional[List[str]] = None):
        """Process a user verification action."""
        if action == 'approve':
            self.verification_status[dog_id] = 'approved'
        elif action == 'delete':
            self.verification_status[dog_id] = 'deleted'
        elif action == 'remove_images' and images_to_remove:
            # Remove specific images and their corresponding head crops
            dog_dir = self.temp_dir / f"dog_{dog_id:03d}_provisional"
            for img_name in images_to_remove:
                img_path = dog_dir / 'full' / img_name
                if img_path.exists():
                    img_path.unlink()
                head_path = dog_dir / 'head' / img_name.replace('.jpg', '_head.jpg')
                if head_path.exists():
                    head_path.unlink()
            self.verification_status[dog_id] = 'cleaned'

    def merge_dogs(self, dog_id1: int, dog_id2: int):
        """Merge dog_id2's folder into dog_id1's."""
        dir1 = self.temp_dir / f"dog_{dog_id1:03d}_provisional"
        dir2 = self.temp_dir / f"dog_{dog_id2:03d}_provisional"
        if dir1.exists() and dir2.exists():
            # Move all images from dir2 into dir1, renaming on collision
            # (two dogs in the same frame share a filename)
            for subdir in ('full', 'head'):
                for img_path in (dir2 / subdir).glob("*.jpg"):
                    dst = dir1 / subdir / img_path.name
                    if dst.exists():
                        dst = dir1 / subdir / f"merged_{dog_id2:03d}_{img_path.name}"
                    shutil.move(str(img_path), str(dst))
            # Remove dir2
            shutil.rmtree(dir2)
            self.verification_status[dog_id2] = 'merged'
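
    # NOTE: merge_dogs only moves files on disk; the per-dog image counts in
    # current_session / dataset_info.json are not updated for the merged dog.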

    def finalize_dataset(self) -> Dict:
        """Copy verified dogs into the final dataset under sequential IDs."""
        if not self.current_session:
            return {'error': 'No active session'}

        # Clear the final directory
        if self.final_dir.exists():
            shutil.rmtree(self.final_dir)
        self.final_dir.mkdir()

        final_dogs = []
        dog_id_mapping = {}  # provisional_id -> final_id
        final_id = 1
        for dog_id, status in self.verification_status.items():
            if status in ('approved', 'cleaned'):
                # Copy to the final directory
                src_dir = self.temp_dir / f"dog_{dog_id:03d}_provisional"
                dst_dir = self.final_dir / f"dog_{final_id:03d}"
                if src_dir.exists():
                    shutil.copytree(src_dir, dst_dir)
                    dog_id_mapping[dog_id] = final_id
                    final_dogs.append({
                        'final_id': final_id,
                        'provisional_id': dog_id,
                        'num_images': len(list((dst_dir / 'full').glob("*.jpg")))
                    })
                    final_id += 1

        # Create training metadata (train/val CSVs + metadata.json)
        self.create_training_metadata(final_dogs)
        return {
            'total_dogs': len(final_dogs),
            'dogs': final_dogs,
            'dataset_path': str(self.final_dir)
        }
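
    # Resulting verified_dataset/ layout (frame numbers illustrative):
    #     dog_001/full/frame_000123.jpg
    #     dog_001/head/frame_000123_head.jpg
    #     train.csv  val.csv  metadata.json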

    def create_training_metadata(self, dogs: List[Dict]):
        """Create CSV files for fine-tuning."""
        data = []
        for dog_info in dogs:
            dog_dir = self.final_dir / f"dog_{dog_info['final_id']:03d}"
            # Collect all images for this dog
            for img_path in (dog_dir / 'full').glob("*.jpg"):
                head_path = dog_dir / 'head' / img_path.name.replace('.jpg', '_head.jpg')
                data.append({
                    'dog_id': dog_info['final_id'],
                    'full_image': str(img_path.relative_to(self.final_dir)),
                    'head_image': str(head_path.relative_to(self.final_dir)) if head_path.exists() else None
                })

        # Shuffle before splitting; a sequential 80/20 split would put the
        # last dogs entirely in the validation set
        df = pd.DataFrame(data).sample(frac=1, random_state=42).reset_index(drop=True)

        # Split into train/val (80/20)
        train_size = int(len(df) * 0.8)
        train_df = df.iloc[:train_size]
        val_df = df.iloc[train_size:]

        # Save the CSVs
        train_df.to_csv(self.final_dir / 'train.csv', index=False)
        val_df.to_csv(self.final_dir / 'val.csv', index=False)

        # Save dataset metadata
        metadata = {
            'total_dogs': len(dogs),
            'total_images': len(df),
            'train_images': len(train_df),
            'val_images': len(val_df),
            'created': datetime.now().isoformat()
        }
        with open(self.final_dir / 'metadata.json', 'w') as f:
            json.dump(metadata, f, indent=2)
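
    # train.csv / val.csv columns (paths relative to verified_dataset/):
    #     dog_id,full_image,head_image
    #     1,dog_001/full/frame_000123.jpg,dog_001/head/frame_000123_head.jpg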


# ========== GRADIO INTERFACE ==========

class DatasetVerificationApp:
    """Gradio app for dataset verification."""

    def __init__(self):
        self.generator = DatasetGenerator()

    def create_interface(self):
        with gr.Blocks(
            title="Dog Dataset Verification",
            theme=gr.themes.Soft()
        ) as app:
            gr.Markdown("""
            # 🐕 Dog ReID Dataset Generator
            ### Hybrid workflow: AI grouping + Human verification = Clean dataset
            """)

            # Step 1: Process Video
            with gr.Tab("Step 1: Process Video"):
                with gr.Row():
                    video_input = gr.Video(label="Upload Video")
                    with gr.Column():
                        reid_threshold = gr.Slider(
                            0.65, 0.85, 0.75, step=0.05,
                            label="ReID Threshold (Higher = Stricter)"
                        )
                        max_images = gr.Slider(
                            10, 50, 30, step=5,
                            label="Max Images per Dog"
                        )
                process_btn = gr.Button("Process Video", variant="primary")
                process_output = gr.JSON(label="Processing Results")

                def process_video(video_path, threshold, max_imgs):
                    if not video_path:
                        return {"error": "Please upload a video"}
                    return self.generator.process_video_for_dataset(
                        video_path, threshold, int(max_imgs)
                    )

                process_btn.click(
                    process_video,
                    inputs=[video_input, reid_threshold, max_images],
                    outputs=process_output
                )

            # Step 2: Verify Dogs
            with gr.Tab("Step 2: Verify Dogs"):
                gr.Markdown("Review each dog folder and verify/clean the images")
                with gr.Row():
                    dog_selector = gr.Dropdown(
                        label="Select Dog to Review",
                        choices=[]
                    )
                    refresh_btn = gr.Button("Refresh Dog List")
                preview_gallery = gr.Gallery(
                    label="Dog Images Preview",
                    show_label=True,
                    columns=3,
                    rows=2,
                    height="auto"
                )
                with gr.Row():
                    approve_btn = gr.Button("✅ Approve", variant="primary")
                    clean_btn = gr.Button("🧹 Remove Selected", variant="secondary")
                    delete_btn = gr.Button("❌ Delete All", variant="stop")
                with gr.Row():
                    merge_dog1 = gr.Dropdown(label="Merge Dog 1")
                    merge_dog2 = gr.Dropdown(label="With Dog 2")
                    merge_btn = gr.Button("🔗 Merge Dogs")
                verification_status = gr.Textbox(label="Verification Status")

                def refresh_dogs():
                    if not self.generator.current_session:
                        return (gr.update(choices=[]),
                                gr.update(choices=[]),
                                gr.update(choices=[]))
                    dogs = self.generator.current_session['dogs']
                    choices = [f"Dog {dog_id}" for dog_id in dogs.keys()]
                    # Populate the review dropdown and both merge dropdowns
                    return (gr.update(choices=choices),
                            gr.update(choices=choices),
                            gr.update(choices=choices))

                def show_dog_preview(dog_selection):
                    if not dog_selection:
                        return []
                    dog_id = int(dog_selection.split()[1])
                    return self.generator.get_dog_preview_images(dog_id)

                def approve_dog(dog_selection):
                    if not dog_selection:
                        return "No dog selected"
                    dog_id = int(dog_selection.split()[1])
                    self.generator.verify_dog(dog_id, 'approve')
                    return f"✅ Dog {dog_id} approved"

                def delete_dog(dog_selection):
                    if not dog_selection:
                        return "No dog selected"
                    dog_id = int(dog_selection.split()[1])
                    self.generator.verify_dog(dog_id, 'delete')
                    return f"❌ Dog {dog_id} deleted"

                def merge_dogs(dog1, dog2):
                    if not dog1 or not dog2:
                        return "Select both dogs to merge"
                    id1 = int(dog1.split()[1])
                    id2 = int(dog2.split()[1])
                    self.generator.merge_dogs(id1, id2)
                    return f"🔗 Merged Dog {id2} into Dog {id1}"

                # Refresh also populates the merge dropdowns, which otherwise
                # would never receive choices
                refresh_btn.click(refresh_dogs,
                                  outputs=[dog_selector, merge_dog1, merge_dog2])
                dog_selector.change(show_dog_preview, inputs=dog_selector, outputs=preview_gallery)
                approve_btn.click(approve_dog, inputs=dog_selector, outputs=verification_status)
                delete_btn.click(delete_dog, inputs=dog_selector, outputs=verification_status)
                merge_btn.click(merge_dogs, inputs=[merge_dog1, merge_dog2], outputs=verification_status)

            # Step 3: Finalize Dataset
            with gr.Tab("Step 3: Finalize"):
                gr.Markdown("Save verified dogs to the final dataset")
                finalize_btn = gr.Button("📦 Create Final Dataset", variant="primary", size="lg")
                final_output = gr.JSON(label="Dataset Creation Results")
                download_section = gr.Markdown("")

                def finalize():
                    result = self.generator.finalize_dataset()
                    if 'error' not in result:
                        download_html = f"""
                        ### ✅ Dataset Ready!
                        - **Total Dogs:** {result['total_dogs']}
                        - **Location:** `{result['dataset_path']}`
                        - **Files:** `train.csv`, `val.csv`, `metadata.json`

                        Ready for ResNet50 fine-tuning!
                        """
                    else:
                        download_html = f"❌ Error: {result['error']}"
                    return result, download_html

                finalize_btn.click(finalize, outputs=[final_output, download_section])
        return app


# Main entry point
if __name__ == "__main__":
    app = DatasetVerificationApp()
    interface = app.create_interface()
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )