""" database.py - SQLite Database Manager for Dog Monitoring System Handles persistent storage of dog data, features, and annotations """ import sqlite3 import json import pickle import base64 import numpy as np import cv2 from datetime import datetime from typing import List, Dict, Optional, Tuple, Any from pathlib import Path import pandas as pd class DogDatabase: """SQLite database manager for dog monitoring system""" def __init__(self, db_path: str = "dog_monitoring.db"): """Initialize database connection and create tables""" self.db_path = db_path self.conn = sqlite3.connect(db_path, check_same_thread=False) self.conn.row_factory = sqlite3.Row self.cursor = self.conn.cursor() # Create tables if they don't exist self._create_tables() def _create_tables(self): """Create all necessary database tables""" # Dogs table - main registry self.cursor.execute(""" CREATE TABLE IF NOT EXISTS dogs ( dog_id INTEGER PRIMARY KEY, name TEXT, first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, total_sightings INTEGER DEFAULT 1, notes TEXT, merged_from TEXT, status TEXT DEFAULT 'active' ) """) # Dog features table self.cursor.execute(""" CREATE TABLE IF NOT EXISTS dog_features ( feature_id INTEGER PRIMARY KEY AUTOINCREMENT, dog_id INTEGER, resnet_features BLOB, color_histogram BLOB, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, confidence REAL, FOREIGN KEY (dog_id) REFERENCES dogs(dog_id) ) """) # Dog images table self.cursor.execute(""" CREATE TABLE IF NOT EXISTS dog_images ( image_id INTEGER PRIMARY KEY AUTOINCREMENT, dog_id INTEGER, image_data BLOB, thumbnail BLOB, width INTEGER, height INTEGER, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, frame_number INTEGER, video_source TEXT, bbox TEXT, confidence REAL, pose_keypoints TEXT, is_validated BOOLEAN DEFAULT 0, is_discarded BOOLEAN DEFAULT 0, FOREIGN KEY (dog_id) REFERENCES dogs(dog_id) ) """) # Body parts table self.cursor.execute(""" CREATE TABLE IF NOT EXISTS body_parts ( part_id INTEGER PRIMARY KEY AUTOINCREMENT, dog_id INTEGER, image_id INTEGER, part_type TEXT, part_image BLOB, crop_bbox TEXT, confidence REAL, is_validated BOOLEAN DEFAULT 0, is_discarded BOOLEAN DEFAULT 0, FOREIGN KEY (dog_id) REFERENCES dogs(dog_id), FOREIGN KEY (image_id) REFERENCES dog_images(image_id) ) """) # Sightings table self.cursor.execute(""" CREATE TABLE IF NOT EXISTS sightings ( sighting_id INTEGER PRIMARY KEY AUTOINCREMENT, dog_id INTEGER, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, position_x REAL, position_y REAL, video_source TEXT, frame_number INTEGER, confidence REAL, FOREIGN KEY (dog_id) REFERENCES dogs(dog_id) ) """) # Processing sessions table self.cursor.execute(""" CREATE TABLE IF NOT EXISTS sessions ( session_id INTEGER PRIMARY KEY AUTOINCREMENT, start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, end_time TIMESTAMP, video_path TEXT, total_frames INTEGER, dogs_detected INTEGER, settings TEXT ) """) # Create indexes self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_dog_features ON dog_features(dog_id)") self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_dog_images ON dog_images(dog_id)") self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_sightings ON sightings(dog_id)") self.conn.commit() # ========== Dog Management ========== def add_dog(self, name: str = None) -> int: """Add new dog to database and return dog_id""" cursor = self.conn.cursor() cursor.execute(''' INSERT INTO dogs (name, first_seen, status) VALUES (?, datetime('now'), 'active') ''', (name,)) self.conn.commit() return cursor.lastrowid def update_dog_sighting(self, dog_id: int): """Update last seen time and increment sighting count""" self.cursor.execute(""" UPDATE dogs SET last_seen = CURRENT_TIMESTAMP, total_sightings = total_sightings + 1 WHERE dog_id = ? """, (dog_id,)) self.conn.commit() def export_training_dataset(self, output_dir: str = "training_dataset") -> Dict: """ Export database images to folder structure for fine-tuning Output structure: training_dataset/ ├── dog_1/ │ ├── img_0000.jpg │ └── ... ├── dog_2/ └── metadata.json Returns: dict with export statistics """ from pathlib import Path import json from datetime import datetime output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) # Get all active dogs dogs_df = self.get_all_dogs(active_only=True) if dogs_df.empty: return { 'success': False, 'message': 'No dogs in database', 'total_dogs': 0, 'total_images': 0 } total_images = 0 export_info = [] for _, dog in dogs_df.iterrows(): dog_id = dog['dog_id'] dog_name = dog['name'] or f"dog_{dog_id}" # Create folder for this dog dog_folder = output_path / dog_name dog_folder.mkdir(exist_ok=True) # Get all non-discarded images images = self.get_dog_images( dog_id=dog_id, validated_only=False, include_discarded=False ) if not images: print(f"Warning: Dog {dog_id} has no images") continue # Save each image as JPG for idx, img_data in enumerate(images): image = img_data['image'] # OpenCV array from BLOB filename = f"img_{idx:04d}.jpg" filepath = dog_folder / filename # Write to disk cv2.imwrite(str(filepath), image) total_images += len(images) export_info.append({ 'dog_id': dog_id, 'name': dog_name, 'image_count': len(images), 'folder': str(dog_folder) }) print(f"Exported {len(images)} images for {dog_name}") # Create metadata file metadata = { 'export_date': datetime.now().isoformat(), 'total_dogs': len(dogs_df), 'total_images': total_images, 'output_directory': str(output_path), 'dogs': export_info } metadata_path = output_path / 'metadata.json' with open(metadata_path, 'w') as f: json.dump(metadata, f, indent=2) print(f"\nExport complete: {len(dogs_df)} dogs, {total_images} images") print(f"Location: {output_path}") return { 'success': True, 'total_dogs': len(dogs_df), 'total_images': total_images, 'output_path': str(output_path), 'dogs': export_info } def merge_dogs(self, keep_id: int, merge_id: int) -> bool: """Merge two dogs, keeping keep_id""" try: self.cursor.execute("UPDATE dog_features SET dog_id = ? WHERE dog_id = ?", (keep_id, merge_id)) self.cursor.execute("UPDATE dog_images SET dog_id = ? WHERE dog_id = ?", (keep_id, merge_id)) self.cursor.execute("UPDATE sightings SET dog_id = ? WHERE dog_id = ?", (keep_id, merge_id)) self.cursor.execute("SELECT merged_from FROM dogs WHERE dog_id = ?", (merge_id,)) row = self.cursor.fetchone() merged_history = json.loads(row['merged_from'] if row and row['merged_from'] else '[]') merged_history.append(merge_id) self.cursor.execute(""" UPDATE dogs SET merged_from = ?, total_sightings = total_sightings + ( SELECT total_sightings FROM dogs WHERE dog_id = ? ) WHERE dog_id = ? """, (json.dumps(merged_history), merge_id, keep_id)) self.cursor.execute("UPDATE dogs SET status = 'merged' WHERE dog_id = ?", (merge_id,)) self.conn.commit() return True except Exception as e: print(f"Error merging dogs: {e}") self.conn.rollback() return False def delete_dog(self, dog_id: int, hard_delete: bool = False): """Delete or mark dog as deleted""" if hard_delete: self.cursor.execute("DELETE FROM dog_features WHERE dog_id = ?", (dog_id,)) self.cursor.execute("DELETE FROM dog_images WHERE dog_id = ?", (dog_id,)) self.cursor.execute("DELETE FROM sightings WHERE dog_id = ?", (dog_id,)) self.cursor.execute("DELETE FROM dogs WHERE dog_id = ?", (dog_id,)) else: self.cursor.execute("UPDATE dogs SET status = 'deleted' WHERE dog_id = ?", (dog_id,)) self.conn.commit() # ========== Features Management ========== def save_features(self, dog_id: int, resnet_features: np.ndarray, color_histogram: np.ndarray, confidence: float): """Save dog features to database""" resnet_blob = pickle.dumps(resnet_features) color_blob = pickle.dumps(color_histogram) self.cursor.execute(""" INSERT INTO dog_features (dog_id, resnet_features, color_histogram, confidence) VALUES (?, ?, ?, ?) """, (dog_id, resnet_blob, color_blob, confidence)) self.conn.commit() def get_features(self, dog_id: int, limit: int = 20) -> List[Dict]: """Get recent features for a dog""" self.cursor.execute(""" SELECT * FROM dog_features WHERE dog_id = ? ORDER BY timestamp DESC LIMIT ? """, (dog_id, limit)) features = [] for row in self.cursor.fetchall(): features.append({ 'resnet_features': pickle.loads(row['resnet_features']), 'color_histogram': pickle.loads(row['color_histogram']), 'confidence': row['confidence'], 'timestamp': row['timestamp'] }) return features # ========== Images Management ========== # ========== Images Management ========== def save_image(self, dog_id: int, image: np.ndarray, frame_number: int, video_source: str, bbox: List[float], confidence: float, pose_keypoints: Optional[List] = None): """Save dog image to database""" _, buffer = cv2.imencode('.jpg', image) image_data = base64.b64encode(buffer).decode('utf-8') thumbnail = cv2.resize(image, (128, 128)) _, thumb_buffer = cv2.imencode('.jpg', thumbnail, [cv2.IMWRITE_JPEG_QUALITY, 70]) thumb_data = base64.b64encode(thumb_buffer).decode('utf-8') h, w = image.shape[:2] self.cursor.execute(""" INSERT INTO dog_images (dog_id, image_data, thumbnail, width, height, frame_number, video_source, bbox, confidence, pose_keypoints) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (dog_id, image_data, thumb_data, w, h, frame_number, video_source, json.dumps(bbox), confidence, json.dumps(pose_keypoints) if pose_keypoints else None)) self.conn.commit() return self.cursor.lastrowid def add_dog_image(self, dog_id: int, image: np.ndarray, timestamp: float, confidence: float, bbox: tuple): """Add a dog image to database (simplified for dataset collection)""" try: # Convert image to base64 for storage _, buffer = cv2.imencode('.jpg', image) image_data = base64.b64encode(buffer).decode('utf-8') # Create thumbnail thumbnail = cv2.resize(image, (128, 128)) _, thumb_buffer = cv2.imencode('.jpg', thumbnail, [cv2.IMWRITE_JPEG_QUALITY, 70]) thumb_data = base64.b64encode(thumb_buffer).decode('utf-8') h, w = image.shape[:2] # Insert into database self.cursor.execute(""" INSERT INTO dog_images (dog_id, image_data, thumbnail, width, height, timestamp, bbox, confidence) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, (dog_id, image_data, thumb_data, w, h, timestamp, json.dumps(bbox), confidence)) # Update dog's last_seen and sighting count self.cursor.execute(""" UPDATE dogs SET last_seen = datetime('now'), total_sightings = total_sightings + 1 WHERE dog_id = ? """, (dog_id,)) self.conn.commit() return self.cursor.lastrowid except Exception as e: self.conn.rollback() print(f"Error adding dog image: {e}") raise def get_dog_images(self, dog_id: int, validated_only: bool = False, include_discarded: bool = False) -> List[Dict]: """Get all images for a dog""" query = "SELECT * FROM dog_images WHERE dog_id = ?" params = [dog_id] if validated_only: query += " AND is_validated = 1" if not include_discarded: query += " AND is_discarded = 0" query += " ORDER BY timestamp DESC" self.cursor.execute(query, params) rows = self.cursor.fetchall() images = [] for row in rows: # ✅ Decode base64 string to bytes first image_bytes = base64.b64decode(row['image_data']) nparr = np.frombuffer(image_bytes, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) images.append({ 'image_id': row['image_id'], 'image': image, 'thumbnail': row['thumbnail'], 'bbox': json.loads(row['bbox']) if row['bbox'] else [], 'confidence': row['confidence'], 'frame_number': row['frame_number'] if 'frame_number' in row.keys() else None, 'video_source': row['video_source'] if 'video_source' in row.keys() else None, 'is_validated': row['is_validated'], 'is_discarded': row['is_discarded'], 'pose_keypoints': json.loads(row['pose_keypoints']) if row['pose_keypoints'] else None }) return images def validate_image(self, image_id: int, is_valid: bool = True): """Mark image as validated or discarded""" if is_valid: self.cursor.execute("UPDATE dog_images SET is_validated = 1 WHERE image_id = ?", (image_id,)) else: self.cursor.execute("UPDATE dog_images SET is_discarded = 1 WHERE image_id = ?", (image_id,)) self.conn.commit() # ========== Body Parts Management ========== def save_body_parts(self, dog_id: int, image_id: int, head_crop: Optional[np.ndarray], torso_crop: Optional[np.ndarray], rear_crop: Optional[np.ndarray], confidences: Dict[str, float]): """Save body part crops to database""" parts = { 'head': head_crop, 'torso': torso_crop, 'rear': rear_crop } for part_type, crop in parts.items(): if crop is not None: _, buffer = cv2.imencode('.jpg', crop) crop_data = base64.b64encode(buffer).decode('utf-8') confidence = confidences.get(part_type, 0.0) self.cursor.execute(""" INSERT INTO body_parts (dog_id, image_id, part_type, part_image, confidence) VALUES (?, ?, ?, ?, ?) """, (dog_id, image_id, part_type, crop_data, confidence)) self.conn.commit() def get_body_parts(self, dog_id: int, part_type: Optional[str] = None, validated_only: bool = False, include_discarded: bool = False) -> List[Dict]: """Get body part crops for a dog""" query = "SELECT * FROM body_parts WHERE dog_id = ?" params = [dog_id] if part_type: query += " AND part_type = ?" params.append(part_type) if validated_only: query += " AND is_validated = 1" if not include_discarded: query += " AND is_discarded = 0" self.cursor.execute(query, params) parts = [] for row in self.cursor.fetchall(): image_bytes = base64.b64decode(row['part_image']) nparr = np.frombuffer(image_bytes, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) parts.append({ 'part_id': row['part_id'], 'part_type': row['part_type'], 'image': image, 'confidence': row['confidence'], 'is_validated': row['is_validated'], 'image_id': row['image_id'] }) return parts def validate_body_part(self, part_id: int, is_valid: bool = True): """Mark body part as validated or discarded""" if is_valid: self.cursor.execute("UPDATE body_parts SET is_validated = 1 WHERE part_id = ?", (part_id,)) else: self.cursor.execute("UPDATE body_parts SET is_discarded = 1 WHERE part_id = ?", (part_id,)) self.conn.commit() def add_sighting(self, dog_id: int, position: Tuple[float, float], video_source: str, frame_number: int, confidence: float): """Record a dog sighting""" self.cursor.execute(""" INSERT INTO sightings (dog_id, position_x, position_y, video_source, frame_number, confidence) VALUES (?, ?, ?, ?, ?, ?) """, (dog_id, position[0], position[1], video_source, frame_number, confidence)) self.conn.commit() # ========== Query Methods ========== def get_all_dogs(self, active_only: bool = True) -> pd.DataFrame: """Get all dogs as DataFrame""" query = "SELECT * FROM dogs" if active_only: query += " WHERE status = 'active'" query += " ORDER BY dog_id" return pd.read_sql_query(query, self.conn) def get_dog_statistics(self) -> Dict: """Get overall statistics""" stats = {} self.cursor.execute("SELECT COUNT(*) FROM dogs WHERE status = 'active'") stats['total_active_dogs'] = self.cursor.fetchone()[0] self.cursor.execute("SELECT COUNT(*) FROM dog_images WHERE is_discarded = 0") stats['total_images'] = self.cursor.fetchone()[0] self.cursor.execute("SELECT COUNT(*) FROM dog_images WHERE is_validated = 1") stats['validated_images'] = self.cursor.fetchone()[0] self.cursor.execute("SELECT COUNT(*) FROM sightings") stats['total_sightings'] = self.cursor.fetchone()[0] self.cursor.execute(""" SELECT d.dog_id, d.name, d.total_sightings FROM dogs d WHERE d.status = 'active' ORDER BY d.total_sightings DESC LIMIT 1 """) row = self.cursor.fetchone() if row: stats['most_seen_dog'] = { 'dog_id': row[0], 'name': row[1] or f"Dog #{row[0]}", 'sightings': row[2] } return stats # ========== Export Methods ========== def export_training_dataset(self, output_dir: str, validated_only: bool = True) -> Dict: """Export dataset with body parts for fine-tuning""" output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) images_dir = output_path / "images" images_dir.mkdir(exist_ok=True) dataset = [] dogs = self.get_all_dogs() for _, dog in dogs.iterrows(): dog_id = dog['dog_id'] dog_dir = images_dir / f"dog_{dog_id}" dog_dir.mkdir(exist_ok=True) for part in ['full', 'head', 'torso', 'rear']: (dog_dir / part).mkdir(exist_ok=True) images = self.get_dog_images(dog_id, validated_only=validated_only) for idx, img_data in enumerate(images): full_path = dog_dir / 'full' / f"img_{idx:04d}.jpg" cv2.imwrite(str(full_path), img_data['image']) parts = self.get_body_parts(dog_id, validated_only=validated_only) part_paths = {} for part_data in parts: if part_data['image_id'] == img_data['image_id']: part_type = part_data['part_type'] part_path = dog_dir / part_type / f"img_{idx:04d}.jpg" cv2.imwrite(str(part_path), part_data['image']) part_paths[part_type] = str(part_path.relative_to(output_path)) dataset_entry = { 'dog_id': dog_id, 'full_image': str(full_path.relative_to(output_path)), 'bbox': img_data['bbox'], 'confidence': img_data['confidence'] } for part_type in ['head', 'torso', 'rear']: dataset_entry[f'{part_type}_image'] = part_paths.get(part_type, None) dataset.append(dataset_entry) dataset_df = pd.DataFrame(dataset) dataset_df.to_csv(output_path / "dataset.csv", index=False) metadata = { 'total_dogs': len(dogs), 'total_images': len(dataset), 'export_date': datetime.now().isoformat(), 'validated_only': validated_only, 'includes_body_parts': True } with open(output_path / "metadata.json", 'w') as f: json.dump(metadata, f, indent=2) from sklearn.model_selection import train_test_split train_df, test_df = train_test_split(dataset_df, test_size=0.2, stratify=dataset_df['dog_id']) train_df.to_csv(output_path / "train.csv", index=False) test_df.to_csv(output_path / "test.csv", index=False) metadata['train_samples'] = len(train_df) metadata['test_samples'] = len(test_df) return metadata # ========== Cleanup Methods ========== def reset_database(self, confirm: bool = False): """Reset entire database""" if not confirm: return False tables = ['sightings', 'dog_images', 'dog_features', 'dogs', 'sessions'] for table in tables: self.cursor.execute(f"DELETE FROM {table}") self.cursor.execute("DELETE FROM sqlite_sequence") self.conn.commit() return True def vacuum(self): """Optimize database file size""" self.conn.execute("VACUUM") def close(self): """Close database connection""" self.conn.close() def __del__(self): """Ensure connection is closed""" if hasattr(self, 'conn'): self.conn.close() # ========== Health Methods (NEW) ========== def save_health_assessment(self, dog_id: int, health_score: float, status: str, posture_score: float = None, gait_score: float = None, body_condition_score: float = None, activity_score: float = None, alerts: List[str] = None, recommendations: List[str] = None, confidence: float = 0.5, video_source: str = None, frame_number: int = None): """Save health assessment to database""" self.cursor.execute(""" INSERT INTO health_assessments (dog_id, health_score, status, posture_score, gait_score, body_condition_score, activity_score, alerts, recommendations, confidence, video_source, frame_number) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (dog_id, health_score, status, posture_score, gait_score, body_condition_score, activity_score, json.dumps(alerts) if alerts else None, json.dumps(recommendations) if recommendations else None, confidence, video_source, frame_number)) self.cursor.execute(""" UPDATE dogs SET last_health_score = ?, health_status = ? WHERE dog_id = ? """, (health_score, status, dog_id)) self.conn.commit() def get_health_history(self, dog_id: int, limit: int = 50) -> List[Dict]: """Get health assessment history for a dog""" self.cursor.execute(""" SELECT * FROM health_assessments WHERE dog_id = ? ORDER BY timestamp DESC LIMIT ? """, (dog_id, limit)) assessments = [] for row in self.cursor.fetchall(): assessments.append({ 'timestamp': row['timestamp'], 'health_score': row['health_score'], 'status': row['status'], 'posture_score': row['posture_score'], 'gait_score': row['gait_score'], 'body_condition_score': row['body_condition_score'], 'activity_score': row['activity_score'], 'alerts': json.loads(row['alerts']) if row['alerts'] else [], 'recommendations': json.loads(row['recommendations']) if row['recommendations'] else [], 'confidence': row['confidence'] }) return assessments def get_health_statistics(self) -> Dict: """Get overall health statistics""" stats = {} self.cursor.execute(""" SELECT AVG(last_health_score) as avg_health, COUNT(CASE WHEN last_health_score >= 8 THEN 1 END) as healthy_count, COUNT(CASE WHEN last_health_score < 6 THEN 1 END) as unhealthy_count, COUNT(*) as total_dogs FROM dogs WHERE status = 'active' """) row = self.cursor.fetchone() if row: stats['average_health'] = round(row['avg_health'] or 5.0, 1) stats['healthy_dogs'] = row['healthy_count'] or 0 stats['unhealthy_dogs'] = row['unhealthy_count'] or 0 stats['total_dogs'] = row['total_dogs'] or 0 self.cursor.execute(""" SELECT dog_id, name, last_health_score, health_status FROM dogs WHERE status = 'active' AND last_health_score < 6 ORDER BY last_health_score ASC LIMIT 10 """) stats['dogs_needing_attention'] = [] for row in self.cursor.fetchall(): stats['dogs_needing_attention'].append({ 'dog_id': row['dog_id'], 'name': row['name'] or f"Dog #{row['dog_id']}", 'health_score': row['last_health_score'], 'status': row['health_status'] }) return stats def save_pose_keypoints(self, dog_id: int, keypoints: np.ndarray, frame_number: int, video_source: str): """Save pose keypoints for a dog""" keypoints_json = json.dumps(keypoints.tolist()) if keypoints is not None else None self.cursor.execute(""" UPDATE sightings SET pose_keypoints = ? WHERE dog_id = ? AND frame_number = ? AND video_source = ? """, (keypoints_json, dog_id, frame_number, video_source)) self.conn.commit() def export_training_dataset(self, output_dir: str = "training_dataset") -> Dict: """ Export database images to folder structure for fine-tuning Output structure: training_dataset/ ├── dog_1/ │ ├── img_0000.jpg │ └── ... ├── dog_2/ └── metadata.json Returns: dict with export statistics """ from pathlib import Path import json from datetime import datetime output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) # Get all active dogs dogs_df = self.get_all_dogs(active_only=True) if dogs_df.empty: return { 'success': False, 'message': 'No dogs in database', 'total_dogs': 0, 'total_images': 0 } total_images = 0 export_info = [] for _, dog in dogs_df.iterrows(): dog_id = dog['dog_id'] dog_name = dog['name'] or f"dog_{dog_id}" # Sanitize folder name (remove special characters) safe_name = "".join(c for c in dog_name if c.isalnum() or c in ('_', '-')) # Create folder for this dog dog_folder = output_path / safe_name dog_folder.mkdir(exist_ok=True) # Get all non-discarded images images = self.get_dog_images( dog_id=dog_id, validated_only=False, include_discarded=False ) if not images: print(f"Warning: Dog {dog_id} has no images") continue # Save each image as JPG for idx, img_data in enumerate(images): image = img_data['image'] # OpenCV array from BLOB filename = f"img_{idx:04d}.jpg" filepath = dog_folder / filename # Write to disk success = cv2.imwrite(str(filepath), image) if not success: print(f"Warning: Failed to write {filepath}") total_images += len(images) export_info.append({ 'dog_id': dog_id, 'name': dog_name, 'folder_name': safe_name, 'image_count': len(images), 'folder': str(dog_folder) }) print(f"Exported {len(images)} images for {dog_name}") # Create metadata file metadata = { 'export_date': datetime.now().isoformat(), 'total_dogs': len(dogs_df), 'total_images': total_images, 'output_directory': str(output_path), 'dogs': export_info } metadata_path = output_path / 'metadata.json' with open(metadata_path, 'w') as f: json.dump(metadata, f, indent=2) print(f"\nExport complete: {len(dogs_df)} dogs, {total_images} images") print(f"Location: {output_path}") return { 'success': True, 'total_dogs': len(dogs_df), 'total_images': total_images, 'output_path': str(output_path), 'dogs': export_info }