| """ | |
| database.py - SQLite Database Manager for Dog Monitoring System | |
| Handles persistent storage of dog data, features, and annotations | |
| """ | |
import sqlite3
import json
import pickle
import base64
import numpy as np
import cv2
from datetime import datetime
from typing import List, Dict, Optional, Tuple
from pathlib import Path
import pandas as pd


class DogDatabase:
    """SQLite database manager for the dog monitoring system."""

    def __init__(self, db_path: str = "dog_monitoring.db"):
        """Initialize the database connection and create tables."""
        self.db_path = db_path
        # check_same_thread=False allows use from multiple threads, but the
        # shared connection/cursor are not thread-safe on their own; callers
        # must serialize access (e.g. with a lock).
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.conn.row_factory = sqlite3.Row
        self.cursor = self.conn.cursor()

        # Create tables if they don't exist
        self._create_tables()

    def _create_tables(self):
        """Create all necessary database tables."""
        # Dogs table - main registry
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS dogs (
                dog_id INTEGER PRIMARY KEY,
                name TEXT,
                first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                total_sightings INTEGER DEFAULT 1,
                notes TEXT,
                merged_from TEXT,             -- JSON list of merged dog IDs
                status TEXT DEFAULT 'active'  -- active, merged, deleted
            )
        """)

        # Dog features table - stores extracted features
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS dog_features (
                feature_id INTEGER PRIMARY KEY AUTOINCREMENT,
                dog_id INTEGER,
                resnet_features BLOB,         -- Pickled numpy array
                color_histogram BLOB,         -- Pickled numpy array
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                confidence REAL,
                FOREIGN KEY (dog_id) REFERENCES dogs(dog_id)
            )
        """)

        # Dog images table - stores actual images
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS dog_images (
                image_id INTEGER PRIMARY KEY AUTOINCREMENT,
                dog_id INTEGER,
                image_data BLOB,              -- Base64-encoded JPEG
                thumbnail BLOB,               -- Small preview
                width INTEGER,
                height INTEGER,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                frame_number INTEGER,
                video_source TEXT,
                bbox TEXT,                    -- JSON [x1, y1, x2, y2]
                confidence REAL,
                pose_keypoints TEXT,          -- JSON of keypoints
                is_validated BOOLEAN DEFAULT 0,
                is_discarded BOOLEAN DEFAULT 0,
                FOREIGN KEY (dog_id) REFERENCES dogs(dog_id)
            )
        """)

        # Body parts table - stores cropped body parts
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS body_parts (
                part_id INTEGER PRIMARY KEY AUTOINCREMENT,
                dog_id INTEGER,
                image_id INTEGER,
                part_type TEXT,               -- 'head', 'torso', 'rear'
                part_image BLOB,              -- Base64-encoded JPEG crop
                crop_bbox TEXT,               -- JSON [x1, y1, x2, y2] relative to full image
                confidence REAL,
                is_validated BOOLEAN DEFAULT 0,
                is_discarded BOOLEAN DEFAULT 0,
                FOREIGN KEY (dog_id) REFERENCES dogs(dog_id),
                FOREIGN KEY (image_id) REFERENCES dog_images(image_id)
            )
        """)

        # Sightings table - tracks when/where dogs were seen
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS sightings (
                sighting_id INTEGER PRIMARY KEY AUTOINCREMENT,
                dog_id INTEGER,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                position_x REAL,
                position_y REAL,
                video_source TEXT,
                frame_number INTEGER,
                confidence REAL,
                FOREIGN KEY (dog_id) REFERENCES dogs(dog_id)
            )
        """)

        # Processing sessions table
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                session_id INTEGER PRIMARY KEY AUTOINCREMENT,
                start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                end_time TIMESTAMP,
                video_path TEXT,
                total_frames INTEGER,
                dogs_detected INTEGER,
                settings TEXT                 -- JSON of processing settings
            )
        """)

        # Create indexes for performance (body_parts is also queried by
        # dog_id, so it gets an index like the other child tables)
        self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_dog_features ON dog_features(dog_id)")
        self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_dog_images ON dog_images(dog_id)")
        self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_body_parts ON body_parts(dog_id)")
        self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_sightings ON sightings(dog_id)")

        self.conn.commit()
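
    # Schema overview (from the DDL above): dog_features, dog_images, and
    # sightings each reference dogs(dog_id); body_parts references both
    # dogs(dog_id) and dog_images(image_id); sessions stands alone.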

    # ========== Dog Management ==========

    def add_dog(self, dog_id: Optional[int] = None, name: Optional[str] = None) -> int:
        """Add a new dog to the database and return its ID."""
        if dog_id is not None:  # explicit check so dog_id=0 is honored
            self.cursor.execute(
                "INSERT OR IGNORE INTO dogs (dog_id, name) VALUES (?, ?)",
                (dog_id, name)
            )
        else:
            self.cursor.execute(
                "INSERT INTO dogs (name) VALUES (?)",
                (name,)
            )
            dog_id = self.cursor.lastrowid
        self.conn.commit()
        return dog_id

    def update_dog_sighting(self, dog_id: int):
        """Update the last-seen time and increment the sighting count."""
        self.cursor.execute("""
            UPDATE dogs
            SET last_seen = CURRENT_TIMESTAMP,
                total_sightings = total_sightings + 1
            WHERE dog_id = ?
        """, (dog_id,))
        self.conn.commit()

    def merge_dogs(self, keep_id: int, merge_id: int) -> bool:
        """Merge two dogs, keeping keep_id."""
        try:
            # Reassign all references to the surviving dog
            self.cursor.execute("UPDATE dog_features SET dog_id = ? WHERE dog_id = ?",
                                (keep_id, merge_id))
            self.cursor.execute("UPDATE dog_images SET dog_id = ? WHERE dog_id = ?",
                                (keep_id, merge_id))
            self.cursor.execute("UPDATE body_parts SET dog_id = ? WHERE dog_id = ?",
                                (keep_id, merge_id))
            self.cursor.execute("UPDATE sightings SET dog_id = ? WHERE dog_id = ?",
                                (keep_id, merge_id))

            # Accumulate merge history from both dogs so earlier merges
            # are not lost, then record merge_id itself
            merged_history = []
            for did in (keep_id, merge_id):
                self.cursor.execute("SELECT merged_from FROM dogs WHERE dog_id = ?", (did,))
                row = self.cursor.fetchone()
                if row and row['merged_from']:
                    merged_history.extend(json.loads(row['merged_from']))
            merged_history.append(merge_id)

            # Update the surviving dog with the merge history and combined counts
            self.cursor.execute("""
                UPDATE dogs
                SET merged_from = ?,
                    total_sightings = total_sightings + (
                        SELECT total_sightings FROM dogs WHERE dog_id = ?
                    )
                WHERE dog_id = ?
            """, (json.dumps(merged_history), merge_id, keep_id))

            # Mark the absorbed dog as merged
            self.cursor.execute(
                "UPDATE dogs SET status = 'merged' WHERE dog_id = ?",
                (merge_id,)
            )

            self.conn.commit()
            return True
        except Exception as e:
            print(f"Error merging dogs: {e}")
            self.conn.rollback()
            return False

    def delete_dog(self, dog_id: int, hard_delete: bool = False):
        """Delete a dog, or mark it as deleted."""
        if hard_delete:
            # Hard delete - remove all data, including body-part crops
            self.cursor.execute("DELETE FROM dog_features WHERE dog_id = ?", (dog_id,))
            self.cursor.execute("DELETE FROM body_parts WHERE dog_id = ?", (dog_id,))
            self.cursor.execute("DELETE FROM dog_images WHERE dog_id = ?", (dog_id,))
            self.cursor.execute("DELETE FROM sightings WHERE dog_id = ?", (dog_id,))
            self.cursor.execute("DELETE FROM dogs WHERE dog_id = ?", (dog_id,))
        else:
            # Soft delete - keep the data but hide the dog
            self.cursor.execute(
                "UPDATE dogs SET status = 'deleted' WHERE dog_id = ?",
                (dog_id,)
            )
        self.conn.commit()

    # ========== Features Management ==========

    def save_features(self, dog_id: int, resnet_features: np.ndarray,
                      color_histogram: np.ndarray, confidence: float):
        """Save dog features to the database."""
        # pickle is convenient for numpy arrays, but only safe to load from
        # a database this application wrote itself
        resnet_blob = pickle.dumps(resnet_features)
        color_blob = pickle.dumps(color_histogram)

        self.cursor.execute("""
            INSERT INTO dog_features
            (dog_id, resnet_features, color_histogram, confidence)
            VALUES (?, ?, ?, ?)
        """, (dog_id, resnet_blob, color_blob, confidence))
        self.conn.commit()

    def get_features(self, dog_id: int, limit: int = 20) -> List[Dict]:
        """Get the most recent features for a dog."""
        self.cursor.execute("""
            SELECT * FROM dog_features
            WHERE dog_id = ?
            ORDER BY timestamp DESC
            LIMIT ?
        """, (dog_id, limit))

        features = []
        for row in self.cursor.fetchall():
            features.append({
                'resnet_features': pickle.loads(row['resnet_features']),
                'color_histogram': pickle.loads(row['color_histogram']),
                'confidence': row['confidence'],
                'timestamp': row['timestamp']
            })
        return features

    # ========== Images Management ==========

    def save_image(self, dog_id: int, image: np.ndarray,
                   frame_number: int, video_source: str,
                   bbox: List[float], confidence: float,
                   pose_keypoints: Optional[List] = None) -> int:
        """Save a dog image to the database and return its image_id."""
        # Encode image as JPEG
        _, buffer = cv2.imencode('.jpg', image)
        image_data = base64.b64encode(buffer).decode('utf-8')

        # Create thumbnail (fixed 128x128; aspect ratio is not preserved)
        thumbnail = cv2.resize(image, (128, 128))
        _, thumb_buffer = cv2.imencode('.jpg', thumbnail, [cv2.IMWRITE_JPEG_QUALITY, 70])
        thumb_data = base64.b64encode(thumb_buffer).decode('utf-8')

        h, w = image.shape[:2]

        self.cursor.execute("""
            INSERT INTO dog_images
            (dog_id, image_data, thumbnail, width, height,
             frame_number, video_source, bbox, confidence, pose_keypoints)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (dog_id, image_data, thumb_data, w, h,
              frame_number, video_source, json.dumps(bbox),
              confidence, json.dumps(pose_keypoints) if pose_keypoints else None))
        self.conn.commit()
        return self.cursor.lastrowid
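
    # The returned image_id links a full image to its body-part crops,
    # e.g. (illustrative variable names; see save_body_parts below):
    #   image_id = db.save_image(dog_id, crop, frame_no, "video.mp4", bbox, conf)
    #   db.save_body_parts(dog_id, image_id, head, torso, rear, confidences)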

    def get_dog_images(self, dog_id: int, validated_only: bool = False,
                       include_discarded: bool = False) -> List[Dict]:
        """Get all images for a dog."""
        query = "SELECT * FROM dog_images WHERE dog_id = ?"
        params = [dog_id]

        if validated_only:
            query += " AND is_validated = 1"
        if not include_discarded:
            query += " AND is_discarded = 0"

        query += " ORDER BY timestamp DESC"

        self.cursor.execute(query, params)
        rows = self.cursor.fetchall()

        images = []
        for row in rows:
            # Decode image
            image_bytes = base64.b64decode(row['image_data'])
            nparr = np.frombuffer(image_bytes, np.uint8)
            image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

            images.append({
                'image_id': row['image_id'],
                'image': image,
                'thumbnail': row['thumbnail'],
                'bbox': json.loads(row['bbox']),
                'confidence': row['confidence'],
                'frame_number': row['frame_number'],
                'video_source': row['video_source'],
                'is_validated': row['is_validated'],
                'is_discarded': row['is_discarded'],
                'pose_keypoints': json.loads(row['pose_keypoints']) if row['pose_keypoints'] else None
            })
        return images

    def validate_image(self, image_id: int, is_valid: bool = True):
        """Mark an image as validated or discarded."""
        if is_valid:
            self.cursor.execute(
                "UPDATE dog_images SET is_validated = 1 WHERE image_id = ?",
                (image_id,)
            )
        else:
            self.cursor.execute(
                "UPDATE dog_images SET is_discarded = 1 WHERE image_id = ?",
                (image_id,)
            )
        self.conn.commit()

    # ========== Body Parts Management ==========

    def save_body_parts(self, dog_id: int, image_id: int,
                        head_crop: Optional[np.ndarray],
                        torso_crop: Optional[np.ndarray],
                        rear_crop: Optional[np.ndarray],
                        confidences: Dict[str, float]):
        """Save body part crops to the database."""
        parts = {
            'head': head_crop,
            'torso': torso_crop,
            'rear': rear_crop
        }

        for part_type, crop in parts.items():
            if crop is not None:
                # Encode crop as JPEG
                _, buffer = cv2.imencode('.jpg', crop)
                crop_data = base64.b64encode(buffer).decode('utf-8')

                confidence = confidences.get(part_type, 0.0)

                self.cursor.execute("""
                    INSERT INTO body_parts
                    (dog_id, image_id, part_type, part_image, confidence)
                    VALUES (?, ?, ?, ?, ?)
                """, (dog_id, image_id, part_type, crop_data, confidence))
        self.conn.commit()

    def get_body_parts(self, dog_id: int, part_type: Optional[str] = None,
                       validated_only: bool = False, include_discarded: bool = False) -> List[Dict]:
        """Get body part crops for a dog."""
        query = "SELECT * FROM body_parts WHERE dog_id = ?"
        params = [dog_id]

        if part_type:
            query += " AND part_type = ?"
            params.append(part_type)
        if validated_only:
            query += " AND is_validated = 1"
        if not include_discarded:
            query += " AND is_discarded = 0"

        self.cursor.execute(query, params)

        parts = []
        for row in self.cursor.fetchall():
            # Decode image
            image_bytes = base64.b64decode(row['part_image'])
            nparr = np.frombuffer(image_bytes, np.uint8)
            image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

            parts.append({
                'part_id': row['part_id'],
                'part_type': row['part_type'],
                'image': image,
                'confidence': row['confidence'],
                'is_validated': row['is_validated'],
                'image_id': row['image_id']
            })
        return parts

    def validate_body_part(self, part_id: int, is_valid: bool = True):
        """Mark a body part as validated or discarded."""
        if is_valid:
            self.cursor.execute(
                "UPDATE body_parts SET is_validated = 1 WHERE part_id = ?",
                (part_id,)
            )
        else:
            self.cursor.execute(
                "UPDATE body_parts SET is_discarded = 1 WHERE part_id = ?",
                (part_id,)
            )
        self.conn.commit()

    # ========== Sightings ==========

    def add_sighting(self, dog_id: int, position: Tuple[float, float],
                     video_source: str, frame_number: int, confidence: float):
        """Record a dog sighting."""
        self.cursor.execute("""
            INSERT INTO sightings
            (dog_id, position_x, position_y, video_source, frame_number, confidence)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (dog_id, position[0], position[1], video_source, frame_number, confidence))
        self.conn.commit()

    # ========== Query Methods ==========

    def get_all_dogs(self, active_only: bool = True) -> pd.DataFrame:
        """Get all dogs as a DataFrame."""
        query = "SELECT * FROM dogs"
        if active_only:
            query += " WHERE status = 'active'"
        query += " ORDER BY dog_id"
        return pd.read_sql_query(query, self.conn)

    def get_dog_statistics(self) -> Dict:
        """Get overall statistics."""
        stats = {}

        # Total dogs
        self.cursor.execute("SELECT COUNT(*) FROM dogs WHERE status = 'active'")
        stats['total_active_dogs'] = self.cursor.fetchone()[0]

        # Total images
        self.cursor.execute("SELECT COUNT(*) FROM dog_images WHERE is_discarded = 0")
        stats['total_images'] = self.cursor.fetchone()[0]

        # Validated images
        self.cursor.execute("SELECT COUNT(*) FROM dog_images WHERE is_validated = 1")
        stats['validated_images'] = self.cursor.fetchone()[0]

        # Total sightings
        self.cursor.execute("SELECT COUNT(*) FROM sightings")
        stats['total_sightings'] = self.cursor.fetchone()[0]

        # Most seen dog
        self.cursor.execute("""
            SELECT d.dog_id, d.name, d.total_sightings
            FROM dogs d
            WHERE d.status = 'active'
            ORDER BY d.total_sightings DESC
            LIMIT 1
        """)
        row = self.cursor.fetchone()
        if row:
            stats['most_seen_dog'] = {
                'dog_id': row[0],
                'name': row[1] or f"Dog #{row[0]}",
                'sightings': row[2]
            }
        return stats
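
    # Example return value shape (keys from the code above; values illustrative):
    #   {'total_active_dogs': 3, 'total_images': 42, 'validated_images': 17,
    #    'total_sightings': 120,
    #    'most_seen_dog': {'dog_id': 1, 'name': 'Rex', 'sightings': 57}}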

    # ========== Export Methods ==========

    def export_training_dataset(self, output_dir: str, validated_only: bool = True) -> Dict:
        """Export a dataset with body parts for fine-tuning."""
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Create directories
        images_dir = output_path / "images"
        images_dir.mkdir(exist_ok=True)

        # Export data
        dataset = []
        dogs = self.get_all_dogs()

        for _, dog in dogs.iterrows():
            dog_id = dog['dog_id']

            # Create a directory for each dog
            dog_dir = images_dir / f"dog_{dog_id}"
            dog_dir.mkdir(exist_ok=True)

            # Subdirectories for body parts
            for part in ['full', 'head', 'torso', 'rear']:
                (dog_dir / part).mkdir(exist_ok=True)

            # Fetch images and body parts once per dog (not once per image)
            images = self.get_dog_images(dog_id, validated_only=validated_only)
            parts = self.get_body_parts(dog_id, validated_only=validated_only)

            for idx, img_data in enumerate(images):
                # Save full image
                full_path = dog_dir / 'full' / f"img_{idx:04d}.jpg"
                cv2.imwrite(str(full_path), img_data['image'])

                # Save the body parts belonging to this image
                part_paths = {}
                for part_data in parts:
                    if part_data['image_id'] == img_data['image_id']:
                        part_type = part_data['part_type']
                        part_path = dog_dir / part_type / f"img_{idx:04d}.jpg"
                        cv2.imwrite(str(part_path), part_data['image'])
                        part_paths[part_type] = str(part_path.relative_to(output_path))

                # Add to dataset
                dataset_entry = {
                    'dog_id': dog_id,
                    'full_image': str(full_path.relative_to(output_path)),
                    'bbox': img_data['bbox'],
                    'confidence': img_data['confidence']
                }
                # Add body part paths if available
                for part_type in ['head', 'torso', 'rear']:
                    dataset_entry[f'{part_type}_image'] = part_paths.get(part_type)
                dataset.append(dataset_entry)

        # Save dataset info
        dataset_df = pd.DataFrame(dataset)
        dataset_df.to_csv(output_path / "dataset.csv", index=False)

        metadata = {
            'total_dogs': len(dogs),
            'total_images': len(dataset),
            'export_date': datetime.now().isoformat(),
            'validated_only': validated_only,
            'includes_body_parts': True
        }

        # Create training splits. Stratification needs at least two images
        # per dog, so fall back to a plain split when a dog has only one.
        if len(dataset_df) >= 2:
            from sklearn.model_selection import train_test_split
            counts = dataset_df['dog_id'].value_counts()
            stratify = dataset_df['dog_id'] if counts.min() >= 2 else None
            train_df, test_df = train_test_split(dataset_df, test_size=0.2,
                                                 stratify=stratify)
            train_df.to_csv(output_path / "train.csv", index=False)
            test_df.to_csv(output_path / "test.csv", index=False)
            metadata['train_samples'] = len(train_df)
            metadata['test_samples'] = len(test_df)

        # Save metadata last so the split counts are included in the file
        with open(output_path / "metadata.json", 'w') as f:
            json.dump(metadata, f, indent=2)

        return metadata
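
    # Resulting export layout (sketch of what the method above produces):
    #   <output_dir>/
    #     dataset.csv  train.csv  test.csv  metadata.json
    #     images/dog_<id>/{full,head,torso,rear}/img_0000.jpg, ...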

    # ========== Cleanup Methods ==========

    def reset_database(self, confirm: bool = False):
        """Reset the entire database."""
        if not confirm:
            return False

        # Delete child tables before their parents
        tables = ['body_parts', 'sightings', 'dog_images', 'dog_features', 'dogs', 'sessions']
        for table in tables:
            self.cursor.execute(f"DELETE FROM {table}")

        # Reset autoincrement counters
        self.cursor.execute("DELETE FROM sqlite_sequence")
        self.conn.commit()
        return True

    def vacuum(self):
        """Optimize database file size."""
        self.conn.execute("VACUUM")

    def close(self):
        """Close the database connection."""
        self.conn.close()

    def __del__(self):
        """Ensure the connection is closed."""
        if hasattr(self, 'conn'):
            try:
                self.conn.close()
            except Exception:
                pass  # interpreter may be shutting down; nothing useful to do
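

if __name__ == "__main__":
    # Minimal usage sketch (illustrative, not part of the module proper):
    # exercises the main code paths against a throwaway database file with
    # a synthetic image; "demo.mp4" and all values below are placeholders.
    db = DogDatabase("demo_dog_monitoring.db")

    dog_id = db.add_dog(name="Rex")
    db.save_features(
        dog_id,
        resnet_features=np.random.rand(2048).astype(np.float32),
        color_histogram=np.random.rand(512).astype(np.float32),
        confidence=0.9,
    )

    image = np.zeros((256, 256, 3), dtype=np.uint8)  # stand-in for a real crop
    image_id = db.save_image(
        dog_id, image,
        frame_number=1, video_source="demo.mp4",
        bbox=[10.0, 10.0, 200.0, 200.0], confidence=0.85,
    )
    db.validate_image(image_id)
    db.add_sighting(dog_id, position=(105.0, 105.0),
                    video_source="demo.mp4", frame_number=1, confidence=0.85)

    print(db.get_dog_statistics())
    db.close()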