Spaces:
Sleeping
Sleeping
| """ | |
| resnet_dataset_creator.py - Fixed Dataset Creation Tool for ResNet Fine-tuning | |
| Changes: Removed head extraction, fixed image gallery display | |
| """ | |
| import gradio as gr | |
| import cv2 | |
| import numpy as np | |
| import pandas as pd | |
| import json | |
| import shutil | |
| import torch | |
| from pathlib import Path | |
| from typing import List, Dict, Optional, Tuple | |
| from datetime import datetime | |
| from PIL import Image | |
| import zipfile | |
| import gc | |
| # Import required modules | |
| from detection import DogDetector | |
| from tracking import SimpleTracker | |
| from reid import SingleModelReID # Using simplified version | |
| from ultralytics import YOLO | |
| # ========== IMAGE QUALITY ANALYZER (unchanged) ========== | |
| class ImageQualityAnalyzer: | |
| """Analyze and score image quality for dataset selection""" | |
| def __init__(self): | |
| self.quality_weights = { | |
| 'sharpness': 0.3, | |
| 'resolution': 0.2, | |
| 'brightness': 0.15, | |
| 'contrast': 0.15, | |
| 'occlusion': 0.2 | |
| } | |
| def calculate_sharpness(self, image: np.ndarray) -> float: | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
| laplacian = cv2.Laplacian(gray, cv2.CV_64F) | |
| return min(100, laplacian.var()) | |
| def calculate_resolution_score(self, image: np.ndarray) -> float: | |
| h, w = image.shape[:2] | |
| pixels = h * w | |
| ideal_pixels = 224 * 224 | |
| return min(100, (pixels / ideal_pixels) * 100) | |
| def calculate_brightness_score(self, image: np.ndarray) -> float: | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
| mean_brightness = np.mean(gray) | |
| return 100 - abs(mean_brightness - 127) * 0.78 | |
| def calculate_contrast_score(self, image: np.ndarray) -> float: | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
| contrast = gray.std() | |
| return min(100, contrast * 2) | |
| def detect_occlusion(self, bbox: List[float], frame_shape: Tuple) -> float: | |
| x1, y1, x2, y2 = bbox | |
| h, w = frame_shape[:2] | |
| edge_penalty = 0 | |
| if x1 <= 5 or y1 <= 5 or x2 >= w-5 or y2 >= h-5: | |
| edge_penalty = 30 | |
| aspect = (x2 - x1) / (y2 - y1) | |
| if aspect < 0.3 or aspect > 3: | |
| edge_penalty += 20 | |
| return 100 - edge_penalty | |
| def calculate_overall_quality(self, image: np.ndarray, bbox: List[float], | |
| frame_shape: Tuple) -> float: | |
| scores = { | |
| 'sharpness': self.calculate_sharpness(image), | |
| 'resolution': self.calculate_resolution_score(image), | |
| 'brightness': self.calculate_brightness_score(image), | |
| 'contrast': self.calculate_contrast_score(image), | |
| 'occlusion': self.detect_occlusion(bbox, frame_shape) | |
| } | |
| total = sum(scores[k] * self.quality_weights[k] for k in scores) | |
| return total | |
| # ========== SMART IMAGE SELECTOR (unchanged) ========== | |
| class SmartImageSelector: | |
| """Intelligently select best images based on quality and diversity""" | |
| def __init__(self): | |
| self.quality_analyzer = ImageQualityAnalyzer() | |
| self.min_temporal_distance = 10 | |
| def select_best_images(self, dog_data: List[Dict], max_images: int = 30, | |
| video_fps: float = 30) -> List[Dict]: | |
| for item in dog_data: | |
| item['quality_score'] = self.quality_analyzer.calculate_overall_quality( | |
| item['crop'], item['bbox'], item['frame'].shape | |
| ) | |
| if len(dog_data) <= max_images: | |
| return dog_data | |
| dog_data.sort(key=lambda x: x['quality_score'], reverse=True) | |
| selected = [] | |
| selected_frames = set() | |
| selected_indices = set() | |
| for idx, item in enumerate(dog_data): | |
| frame_num = item['frame_num'] | |
| too_close = any( | |
| abs(frame_num - f) < self.min_temporal_distance | |
| for f in selected_frames | |
| ) | |
| if not too_close and len(selected) < max_images: | |
| selected.append(item) | |
| selected_frames.add(frame_num) | |
| selected_indices.add(idx) | |
| if len(selected) < max_images: | |
| for idx, item in enumerate(dog_data): | |
| if idx not in selected_indices and len(selected) < max_images: | |
| selected.append(item) | |
| selected_indices.add(idx) | |
| return selected[:max_images] | |
| # ========== MAIN DATASET CREATOR - FIXED ========== | |
| class ResNetDatasetCreator: | |
| """Main application with head extraction removed and gallery display fixed""" | |
| def __init__(self): | |
| # Directories | |
| self.temp_dir = Path("temp_dataset") | |
| self.final_dir = Path("resnet_finetune_dataset") | |
| self.database_dir = Path("permanent_database") | |
| # Components - initialize once | |
| self.detector = DogDetector(device='cuda' if torch.cuda.is_available() else 'cpu') | |
| self.tracker = SimpleTracker() | |
| self.reid = SingleModelReID(device='cuda' if torch.cuda.is_available() else 'cpu') | |
| # REMOVED: self.head_extractor = SimpleHeadExtractor() | |
| self.image_selector = SmartImageSelector() | |
| # Session data - temporary only | |
| self.current_video_path = None | |
| self.current_session = None | |
| self.temp_processed_dogs = {} # Temporary dogs from current video | |
| self.permanent_dogs = {} # Permanently saved dogs | |
| # Create directories | |
| self.temp_dir.mkdir(exist_ok=True) | |
| self.final_dir.mkdir(exist_ok=True) | |
| self.database_dir.mkdir(exist_ok=True) | |
| # Load permanent database | |
| self.load_permanent_database() | |
| def load_permanent_database(self): | |
| """Load only permanently saved dogs""" | |
| db_file = self.database_dir / "database.json" | |
| if db_file.exists(): | |
| with open(db_file, 'r') as f: | |
| data = json.load(f) | |
| self.permanent_dogs = {int(k): v for k, v in data.get('dogs', {}).items()} | |
| print(f"Loaded {len(self.permanent_dogs)} permanently saved dogs") | |
| def save_to_permanent_database(self): | |
| """Save selected dogs to permanent database""" | |
| # Merge temp dogs into permanent | |
| self.permanent_dogs.update(self.temp_processed_dogs) | |
| # Save metadata | |
| db_file = self.database_dir / "database.json" | |
| data = { | |
| 'dogs': {str(k): v for k, v in self.permanent_dogs.items()}, | |
| 'last_updated': datetime.now().isoformat() | |
| } | |
| with open(db_file, 'w') as f: | |
| json.dump(data, f, indent=2) | |
| # Copy images from temp to permanent | |
| for dog_id in self.temp_processed_dogs: | |
| src_dir = self.temp_dir / f"dog_{dog_id:03d}" | |
| dst_dir = self.database_dir / f"dog_{dog_id:03d}" | |
| if src_dir.exists(): | |
| if dst_dir.exists(): | |
| shutil.rmtree(dst_dir) | |
| shutil.copytree(src_dir, dst_dir) | |
| print(f"Saved {len(self.temp_processed_dogs)} dogs to permanent database") | |
| def clear_temp_data(self): | |
| """Clear all temporary data for new video and free memory.""" | |
| # Clear temp directory | |
| if self.temp_dir.exists(): | |
| shutil.rmtree(self.temp_dir) | |
| self.temp_dir.mkdir() | |
| # Clear temp session data | |
| self.current_video_path = None | |
| self.current_session = None | |
| self.temp_processed_dogs = {} | |
| # Reset ReID (clears in-memory dogs) | |
| self.reid.reset_all() | |
| # π ADD THESE TWO LINES FOR MEMORY CLEANUP | |
| gc.collect() | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| print("Temporary data cleared and memory released.") | |
| def clear_all_permanent_data(self): | |
| """Clear entire permanent database""" | |
| if self.database_dir.exists(): | |
| shutil.rmtree(self.database_dir) | |
| self.database_dir.mkdir() | |
| self.permanent_dogs = {} | |
| print("All permanent data cleared") | |
| def process_video(self, video_path: str, reid_threshold: float, | |
| max_images_per_dog: int, sample_rate: int) -> Dict: | |
| """Process video with current settings""" | |
| # Clear previous temp data if new video | |
| if video_path != self.current_video_path: | |
| self.clear_temp_data() | |
| self.current_video_path = video_path | |
| else: | |
| # Re-processing same video - clear and start fresh | |
| self.clear_temp_data() | |
| self.current_video_path = video_path | |
| # Set ReID threshold | |
| self.reid.set_all_thresholds(reid_threshold) | |
| # Storage for dog data | |
| dog_data = {} | |
| # Open video | |
| cap = cv2.VideoCapture(video_path) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| frame_num = 0 | |
| processed_frames = 0 | |
| while cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| # Sample frames | |
| if frame_num % sample_rate == 0: | |
| # Detect dogs | |
| detections = self.detector.detect(frame) | |
| # Update tracking | |
| tracks = self.tracker.update(detections) | |
| # Process each track | |
| for track in tracks: | |
| # Get ReID result | |
| results = self.reid.match_or_register_all(track) | |
| dog_id = results['ResNet50']['dog_id'] | |
| confidence = results['ResNet50']['confidence'] | |
| if dog_id > 0 and confidence > 0.3: | |
| # Get best detection | |
| detection = None | |
| for det in reversed(track.detections): | |
| if det.image_crop is not None: | |
| detection = det | |
| break | |
| if detection: | |
| if dog_id not in dog_data: | |
| dog_data[dog_id] = [] | |
| dog_data[dog_id].append({ | |
| 'frame': frame.copy(), | |
| 'crop': detection.image_crop, | |
| 'bbox': detection.bbox, | |
| 'frame_num': frame_num, | |
| 'reid_confidence': confidence, | |
| 'detection_confidence': detection.confidence, | |
| 'timestamp': frame_num / fps | |
| }) | |
| processed_frames += 1 | |
| frame_num += 1 | |
| # Yield progress | |
| if frame_num % 30 == 0: | |
| progress = int((frame_num / total_frames) * 100) | |
| yield {'progress': progress, 'status': f"Processing: {progress}%"} | |
| cap.release() | |
| # Select best images for each dog | |
| total_images = 0 | |
| new_dogs = {} | |
| for dog_id, images in dog_data.items(): | |
| selected = self.image_selector.select_best_images( | |
| images, max_images_per_dog, fps | |
| ) | |
| # Save to temp directory only - ONLY FULL BODY IMAGES | |
| dog_dir = self.temp_dir / f"dog_{dog_id:03d}" | |
| dog_dir.mkdir(exist_ok=True) | |
| # REMOVED: (dog_dir / 'head').mkdir(exist_ok=True) | |
| saved_count = 0 | |
| for idx, img_data in enumerate(selected): | |
| # Save full crop only | |
| full_path = dog_dir / f"frame_{img_data['frame_num']:06d}.jpg" | |
| cv2.imwrite(str(full_path), img_data['crop']) | |
| # REMOVED: Head extraction and saving | |
| saved_count += 1 | |
| total_images += saved_count | |
| # Store in temp dogs only | |
| new_dogs[dog_id] = { | |
| 'num_images': saved_count, | |
| 'avg_confidence': np.mean([d['reid_confidence'] for d in selected]), | |
| 'quality_scores': [d['quality_score'] for d in selected] | |
| } | |
| # Update temp dogs (not permanent) | |
| self.temp_processed_dogs = new_dogs | |
| # Save session info | |
| self.current_session = { | |
| 'video': video_path, | |
| 'timestamp': datetime.now().isoformat(), | |
| 'num_dogs': len(new_dogs), | |
| 'total_images': total_images, | |
| 'reid_threshold': reid_threshold, | |
| 'dogs': {str(k): v for k, v in new_dogs.items()} | |
| } | |
| # Save metadata to temp | |
| with open(self.temp_dir / 'session.json', 'w') as f: | |
| json.dump(self.current_session, f, indent=2) | |
| yield {'status': 'complete', 'session': self.current_session} | |
| def get_dog_images(self, dog_id: int, from_permanent: bool = False, max_display: int = None) -> List: | |
| """Get images for verification - FIXED to show all or specified number of images""" | |
| if from_permanent: | |
| dog_dir = self.database_dir / f"dog_{dog_id:03d}" | |
| else: | |
| dog_dir = self.temp_dir / f"dog_{dog_id:03d}" | |
| # Check directly in dog directory (no 'full' subdirectory anymore) | |
| if not dog_dir.exists(): | |
| return [] | |
| images = [] | |
| image_files = sorted(dog_dir.glob("*.jpg")) | |
| # If max_display is specified, limit to that number, otherwise show all | |
| if max_display: | |
| image_files = image_files[:max_display] | |
| for img_path in image_files: | |
| img = cv2.imread(str(img_path)) | |
| if img is not None: | |
| img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
| images.append(img_rgb) | |
| return images | |
| def remove_images_by_selection(self, dog_id: int, selected_indices: List, from_permanent: bool = False): | |
| """Remove images based on gallery selection""" | |
| if from_permanent: | |
| dog_dir = self.database_dir / f"dog_{dog_id:03d}" | |
| else: | |
| dog_dir = self.temp_dir / f"dog_{dog_id:03d}" | |
| if not dog_dir.exists(): | |
| return | |
| image_files = sorted(list(dog_dir.glob("*.jpg"))) | |
| # Remove selected images | |
| for idx in selected_indices: | |
| if 0 <= idx < len(image_files): | |
| # Remove image | |
| image_files[idx].unlink(missing_ok=True) | |
| def delete_dog(self, dog_id: int, from_permanent: bool = False): | |
| """Delete entire dog folder""" | |
| if from_permanent: | |
| dog_dir = self.database_dir / f"dog_{dog_id:03d}" | |
| if dog_id in self.permanent_dogs: | |
| del self.permanent_dogs[dog_id] | |
| else: | |
| dog_dir = self.temp_dir / f"dog_{dog_id:03d}" | |
| if dog_id in self.temp_processed_dogs: | |
| del self.temp_processed_dogs[dog_id] | |
| if dog_dir.exists(): | |
| shutil.rmtree(dog_dir) | |
| def save_final_dataset(self, format_type: str = 'both') -> str: | |
| """Export both temp and permanent dogs - UPDATED for full body only""" | |
| if self.final_dir.exists(): | |
| shutil.rmtree(self.final_dir) | |
| self.final_dir.mkdir() | |
| # Combine temp and permanent dogs | |
| all_dog_dirs = [] | |
| # Add temp dogs | |
| for d in self.temp_dir.iterdir(): | |
| if d.is_dir() and d.name.startswith('dog_'): | |
| all_dog_dirs.append(d) | |
| # Add permanent dogs | |
| temp_dogs = {d.name for d in all_dog_dirs} | |
| for d in self.database_dir.iterdir(): | |
| if d.is_dir() and d.name.startswith('dog_') and d.name not in temp_dogs: | |
| all_dog_dirs.append(d) | |
| data_entries = [] | |
| final_id = 1 | |
| for dog_dir in sorted(all_dog_dirs): | |
| if not dog_dir.exists(): | |
| continue | |
| final_dog_dir = self.final_dir / f"dog_{final_id:03d}" | |
| shutil.copytree(dog_dir, final_dog_dir) | |
| for img_path in final_dog_dir.glob("*.jpg"): | |
| data_entries.append({ | |
| 'dog_id': final_id, | |
| 'image_path': str(img_path.relative_to(self.final_dir)), | |
| 'class': final_id | |
| }) | |
| final_id += 1 | |
| if format_type in ['csv', 'both']: | |
| df = pd.DataFrame(data_entries) | |
| if len(df) > 5: | |
| from sklearn.model_selection import train_test_split | |
| train_df, val_df = train_test_split( | |
| df, test_size=0.2, stratify=df['dog_id'], random_state=42 | |
| ) | |
| train_df.to_csv(self.final_dir / 'train.csv', index=False) | |
| val_df.to_csv(self.final_dir / 'val.csv', index=False) | |
| else: | |
| df.to_csv(self.final_dir / 'train.csv', index=False) | |
| metadata = { | |
| 'total_dogs': final_id - 1, | |
| 'total_images': len(data_entries), | |
| 'format': format_type, | |
| 'created': datetime.now().isoformat() | |
| } | |
| with open(self.final_dir / 'metadata.json', 'w') as f: | |
| json.dump(metadata, f, indent=2) | |
| # Create zip | |
| zip_path = self.final_dir.parent / f"resnet_dataset_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip" | |
| with zipfile.ZipFile(zip_path, 'w') as zipf: | |
| for file_path in self.final_dir.rglob('*'): | |
| zipf.write(file_path, file_path.relative_to(self.final_dir)) | |
| return str(zip_path) | |
| def create_interface(self): | |
| """Create Gradio interface with fixes""" | |
| with gr.Blocks( | |
| title="ResNet Fine-tuning Dataset Creator", | |
| theme=gr.themes.Soft() | |
| ) as app: | |
| gr.Markdown(""" | |
| # π― ResNet Fine-tuning Dataset Creator - Full Body Only | |
| ### Creates dataset with full body dog images only (no head extraction) | |
| """) | |
| # States | |
| processing_state = gr.State(None) | |
| selected_indices_state = gr.State([]) | |
| with gr.Tabs() as tabs: | |
| # ========== STEP 1: PROCESS VIDEO ========== | |
| with gr.Tab("πΉ Step 1: Process Video", id=0): | |
| with gr.Row(): | |
| video_input = gr.Video(label="Upload Video") | |
| with gr.Column(): | |
| reid_threshold = gr.Slider( | |
| 0.30, 0.85, 0.40, step=0.05, | |
| label="ReID Threshold", | |
| info="Lower = More lenient (combine similar dogs)" | |
| ) | |
| max_images = gr.Slider( | |
| 10, 50, 30, step=5, | |
| label="Max Images per Dog" | |
| ) | |
| sample_rate = gr.Slider( | |
| 1, 5, 2, step=1, | |
| label="Sample Rate", | |
| info="Process every Nth frame" | |
| ) | |
| process_btn = gr.Button("π Process Video", variant="primary", size="lg") | |
| with gr.Column(): | |
| progress_bar = gr.Textbox(label="Progress", interactive=False) | |
| results_display = gr.HTML(label="Processing Results") | |
| with gr.Row(): | |
| clear_btn = gr.Button( | |
| "π Clear & Reset (Process Again)", | |
| variant="secondary", | |
| size="lg", | |
| visible=False | |
| ) | |
| def process_wrapper(video, threshold, max_img, sample): | |
| """Process with current settings""" | |
| if not video: | |
| return None, "", "Please upload a video", gr.update(visible=False) | |
| # Process video (will auto-clear if needed) | |
| for update in self.process_video(video, threshold, int(max_img), int(sample)): | |
| if 'progress' in update: | |
| yield None, "", update['status'], gr.update(visible=False) | |
| else: | |
| # Format results | |
| session = update['session'] | |
| html = f""" | |
| <div style="padding: 20px; background: #f8f9fa; border-radius: 10px;"> | |
| <h3>π Processing Complete!</h3> | |
| <p><b>Dogs detected:</b> {session['num_dogs']}</p> | |
| <p><b>Total full body images:</b> {session['total_images']}</p> | |
| <p><b>ReID threshold used:</b> {session['reid_threshold']:.2f}</p> | |
| <hr> | |
| <p>β Data is in <b>temporary storage</b>. Review in Step 2 before saving permanently.</p> | |
| </div> | |
| """ | |
| yield session, html, "Complete! β ", gr.update(visible=True) | |
| def clear_and_reset(): | |
| """Clear all temp data for reprocessing""" | |
| self.clear_temp_data() | |
| return None, "", "", gr.update(visible=False) | |
| process_btn.click( | |
| process_wrapper, | |
| inputs=[video_input, reid_threshold, max_images, sample_rate], | |
| outputs=[processing_state, results_display, progress_bar, clear_btn] | |
| ) | |
| clear_btn.click( | |
| clear_and_reset, | |
| outputs=[processing_state, results_display, progress_bar, clear_btn] | |
| ) | |
| # ========== STEP 2: VERIFY & CLEAN ========== | |
| with gr.Tab("β Step 2: Verify & Clean", id=1): | |
| gr.Markdown(""" | |
| Review temporary results. **Nothing is permanently saved until you click Save.** | |
| Click images in the gallery to select them, then use Remove Selected. | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| source_selector = gr.Radio( | |
| choices=["Temporary (Current Video)", "Permanent (Saved)"], | |
| value="Temporary (Current Video)", | |
| label="Data Source" | |
| ) | |
| dog_selector = gr.Dropdown( | |
| label="Select Dog", | |
| choices=[], | |
| interactive=True | |
| ) | |
| refresh_btn = gr.Button("π Refresh List") | |
| image_gallery = gr.Gallery( | |
| label="Full Body Images - Click to select for removal", | |
| show_label=True, | |
| columns=6, | |
| rows=8, # Increased rows for more visibility | |
| object_fit="contain", | |
| height=600, # Fixed height for scrolling | |
| interactive=True, | |
| type="numpy" | |
| ) | |
| with gr.Row(): | |
| selected_info = gr.Textbox( | |
| label="Selected Images", | |
| value="No images selected", | |
| interactive=False | |
| ) | |
| remove_selected_btn = gr.Button("π Remove Selected Images", variant="secondary") | |
| delete_dog_btn = gr.Button("β Delete Entire Dog", variant="stop") | |
| with gr.Row(): | |
| save_to_permanent_btn = gr.Button( | |
| "πΎ Save Current Video Results to Permanent Database", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| clear_permanent_btn = gr.Button( | |
| "β οΈ Clear All Permanent Data", | |
| variant="stop" | |
| ) | |
| status_text = gr.Textbox(label="Status", interactive=False) | |
| def refresh_dogs(source): | |
| """Refresh dog list based on source""" | |
| if source == "Temporary (Current Video)": | |
| if not self.temp_processed_dogs: | |
| return gr.update(choices=[], value=None) | |
| choices = [f"Dog {dog_id}" for dog_id in sorted(self.temp_processed_dogs.keys())] | |
| else: | |
| if not self.permanent_dogs: | |
| return gr.update(choices=[], value=None) | |
| choices = [f"Dog {dog_id}" for dog_id in sorted(self.permanent_dogs.keys())] | |
| if choices: | |
| return gr.update(choices=choices, value=choices[0]) | |
| return gr.update(choices=[], value=None) | |
| def show_dog_images(dog_selection, source): | |
| """Display ALL images for selected dog""" | |
| if not dog_selection: | |
| return [], [], "No dog selected" | |
| dog_id = int(dog_selection.split()[1]) | |
| from_permanent = (source == "Permanent (Saved)") | |
| # Don't limit number of images - show all | |
| images = self.get_dog_images(dog_id, from_permanent) | |
| return images, [], f"Showing {len(images)} images for Dog {dog_id}" | |
| def handle_gallery_select(evt: gr.SelectData, selected_indices): | |
| """Handle gallery selection""" | |
| if evt.index in selected_indices: | |
| selected_indices.remove(evt.index) | |
| else: | |
| selected_indices.append(evt.index) | |
| if selected_indices: | |
| return selected_indices, f"Selected images: {sorted(selected_indices)}" | |
| return [], "No images selected" | |
| def remove_selected_images(dog_selection, source, selected_indices): | |
| """Remove selected images""" | |
| if not dog_selection: | |
| return "No dog selected", [], [] | |
| if not selected_indices: | |
| return "No images selected", gr.update(), selected_indices | |
| dog_id = int(dog_selection.split()[1]) | |
| from_permanent = (source == "Permanent (Saved)") | |
| self.remove_images_by_selection(dog_id, selected_indices, from_permanent) | |
| # Refresh gallery | |
| images = self.get_dog_images(dog_id, from_permanent) | |
| return f"Removed {len(selected_indices)} images", images, [] | |
| def delete_dog(dog_selection, source): | |
| """Delete entire dog""" | |
| if not dog_selection: | |
| return "No dog selected", [] | |
| dog_id = int(dog_selection.split()[1]) | |
| from_permanent = (source == "Permanent (Saved)") | |
| self.delete_dog(dog_id, from_permanent) | |
| return f"Deleted Dog {dog_id}", [] | |
| def save_to_permanent(): | |
| """Save current temp results to permanent database""" | |
| if not self.temp_processed_dogs: | |
| return "No temporary data to save" | |
| self.save_to_permanent_database() | |
| count = len(self.temp_processed_dogs) | |
| self.clear_temp_data() # Clear temp after saving | |
| return f"β Saved {count} dogs to permanent database. Temp data cleared." | |
| def clear_all_permanent(): | |
| """Clear all permanent data""" | |
| self.clear_all_permanent_data() | |
| return "β οΈ All permanent data cleared" | |
| # Event handlers | |
| refresh_btn.click( | |
| refresh_dogs, | |
| inputs=source_selector, | |
| outputs=dog_selector | |
| ) | |
| dog_selector.change( | |
| show_dog_images, | |
| inputs=[dog_selector, source_selector], | |
| outputs=[image_gallery, selected_indices_state, selected_info] | |
| ) | |
| image_gallery.select( | |
| handle_gallery_select, | |
| inputs=selected_indices_state, | |
| outputs=[selected_indices_state, selected_info] | |
| ) | |
| remove_selected_btn.click( | |
| remove_selected_images, | |
| inputs=[dog_selector, source_selector, selected_indices_state], | |
| outputs=[status_text, image_gallery, selected_indices_state] | |
| ) | |
| delete_dog_btn.click( | |
| delete_dog, | |
| inputs=[dog_selector, source_selector], | |
| outputs=[status_text, image_gallery] | |
| ) | |
| save_to_permanent_btn.click( | |
| save_to_permanent, | |
| outputs=status_text | |
| ) | |
| clear_permanent_btn.click( | |
| clear_all_permanent, | |
| outputs=status_text | |
| ) | |
| # ========== STEP 3: EXPORT DATASET ========== | |
| with gr.Tab("πΎ Step 3: Export Dataset", id=2): | |
| gr.Markdown(""" | |
| Export combined dataset (temporary + permanent dogs) for training. | |
| **Dataset contains full body images only.** | |
| """) | |
| format_selector = gr.Radio( | |
| choices=["folder", "csv", "both"], | |
| value="both", | |
| label="Export Format" | |
| ) | |
| export_btn = gr.Button("π¦ Export Final Dataset", variant="primary", size="lg") | |
| export_output = gr.Textbox(label="Export Path", interactive=False) | |
| download_file = gr.File(label="Download Dataset", interactive=False) | |
| stats_display = gr.Markdown() | |
| def export_dataset(format_type): | |
| try: | |
| zip_path = self.save_final_dataset(format_type) | |
| with open(self.final_dir / 'metadata.json', 'r') as f: | |
| metadata = json.load(f) | |
| stats = f""" | |
| ### β Dataset Exported! | |
| - **Total Dogs**: {metadata['total_dogs']} | |
| - **Total Full Body Images**: {metadata['total_images']} | |
| - **Format**: {format_type} | |
| Download the ZIP file below. | |
| """ | |
| return zip_path, zip_path, stats | |
| except Exception as e: | |
| return "", None, f"### β Export Error\n{str(e)}" | |
| export_btn.click( | |
| export_dataset, | |
| inputs=format_selector, | |
| outputs=[export_output, download_file, stats_display] | |
| ) | |
| return app | |
| # Main entry point | |
| if __name__ == "__main__": | |
| creator = ResNetDatasetCreator() | |
| app = creator.create_interface() | |
| app.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False | |
| ) |