| """ | |
| database.py - SQLite Database Manager for Dog Monitoring System | |
| Handles persistent storage of dog data, features, and annotations | |
| """ | |
| import sqlite3 | |
| import json | |
| import pickle | |
| import base64 | |
| import numpy as np | |
| import cv2 | |
| from datetime import datetime | |
| from typing import List, Dict, Optional, Tuple, Any | |
| from pathlib import Path | |
| import pandas as pd | |

class DogDatabase:
    """SQLite database manager for dog monitoring system"""

    def __init__(self, db_path: str = "dog_monitoring.db"):
        """Initialize database connection and create tables"""
        self.db_path = db_path
        # check_same_thread=False allows use from multiple threads;
        # callers are responsible for serializing access.
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.conn.row_factory = sqlite3.Row
        self.cursor = self.conn.cursor()

        # Create tables if they don't exist
        self._create_tables()

    def _create_tables(self):
        """Create all necessary database tables"""
        # Dogs table - main registry
        # (last_health_score and health_status are required by the
        # health methods below)
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS dogs (
                dog_id INTEGER PRIMARY KEY,
                name TEXT,
                first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                total_sightings INTEGER DEFAULT 1,
                notes TEXT,
                merged_from TEXT,
                status TEXT DEFAULT 'active',
                last_health_score REAL,
                health_status TEXT
            )
        """)
        # Dog features table
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS dog_features (
                feature_id INTEGER PRIMARY KEY AUTOINCREMENT,
                dog_id INTEGER,
                resnet_features BLOB,
                color_histogram BLOB,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                confidence REAL,
                FOREIGN KEY (dog_id) REFERENCES dogs(dog_id)
            )
        """)
        # Dog images table
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS dog_images (
                image_id INTEGER PRIMARY KEY AUTOINCREMENT,
                dog_id INTEGER,
                image_data BLOB,
                thumbnail BLOB,
                width INTEGER,
                height INTEGER,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                frame_number INTEGER,
                video_source TEXT,
                bbox TEXT,
                confidence REAL,
                pose_keypoints TEXT,
                is_validated BOOLEAN DEFAULT 0,
                is_discarded BOOLEAN DEFAULT 0,
                FOREIGN KEY (dog_id) REFERENCES dogs(dog_id)
            )
        """)
        # Body parts table
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS body_parts (
                part_id INTEGER PRIMARY KEY AUTOINCREMENT,
                dog_id INTEGER,
                image_id INTEGER,
                part_type TEXT,
                part_image BLOB,
                crop_bbox TEXT,
                confidence REAL,
                is_validated BOOLEAN DEFAULT 0,
                is_discarded BOOLEAN DEFAULT 0,
                FOREIGN KEY (dog_id) REFERENCES dogs(dog_id),
                FOREIGN KEY (image_id) REFERENCES dog_images(image_id)
            )
        """)
        # Sightings table
        # (pose_keypoints is required by save_pose_keypoints below)
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS sightings (
                sighting_id INTEGER PRIMARY KEY AUTOINCREMENT,
                dog_id INTEGER,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                position_x REAL,
                position_y REAL,
                video_source TEXT,
                frame_number INTEGER,
                confidence REAL,
                pose_keypoints TEXT,
                FOREIGN KEY (dog_id) REFERENCES dogs(dog_id)
            )
        """)
        # Processing sessions table
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                session_id INTEGER PRIMARY KEY AUTOINCREMENT,
                start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                end_time TIMESTAMP,
                video_path TEXT,
                total_frames INTEGER,
                dogs_detected INTEGER,
                settings TEXT
            )
        """)
        # Health assessments table (required by the health methods below)
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS health_assessments (
                assessment_id INTEGER PRIMARY KEY AUTOINCREMENT,
                dog_id INTEGER,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                health_score REAL,
                status TEXT,
                posture_score REAL,
                gait_score REAL,
                body_condition_score REAL,
                activity_score REAL,
                alerts TEXT,
                recommendations TEXT,
                confidence REAL,
                video_source TEXT,
                frame_number INTEGER,
                FOREIGN KEY (dog_id) REFERENCES dogs(dog_id)
            )
        """)
        # Create indexes
        self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_dog_features ON dog_features(dog_id)")
        self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_dog_images ON dog_images(dog_id)")
        self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_sightings ON sightings(dog_id)")
        self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_health ON health_assessments(dog_id)")
        self.conn.commit()

    # ========== Dog Management ==========
    def add_dog(self, name: Optional[str] = None) -> int:
        """Add new dog to database and return dog_id"""
        cursor = self.conn.cursor()
        cursor.execute("""
            INSERT INTO dogs (name, first_seen, status)
            VALUES (?, datetime('now'), 'active')
        """, (name,))
        self.conn.commit()
        return cursor.lastrowid

    def update_dog_sighting(self, dog_id: int):
        """Update last seen time and increment sighting count"""
        self.cursor.execute("""
            UPDATE dogs
            SET last_seen = CURRENT_TIMESTAMP,
                total_sightings = total_sightings + 1
            WHERE dog_id = ?
        """, (dog_id,))
        self.conn.commit()

    def merge_dogs(self, keep_id: int, merge_id: int) -> bool:
        """Merge two dogs, keeping keep_id"""
        try:
            self.cursor.execute("UPDATE dog_features SET dog_id = ? WHERE dog_id = ?", (keep_id, merge_id))
            self.cursor.execute("UPDATE dog_images SET dog_id = ? WHERE dog_id = ?", (keep_id, merge_id))
            self.cursor.execute("UPDATE body_parts SET dog_id = ? WHERE dog_id = ?", (keep_id, merge_id))
            self.cursor.execute("UPDATE sightings SET dog_id = ? WHERE dog_id = ?", (keep_id, merge_id))

            # Fold the merged dog's history into the surviving dog's record,
            # preserving any history the surviving dog already has
            self.cursor.execute("SELECT merged_from FROM dogs WHERE dog_id = ?", (keep_id,))
            keep_row = self.cursor.fetchone()
            merged_history = json.loads(keep_row['merged_from']) if keep_row and keep_row['merged_from'] else []
            self.cursor.execute("SELECT merged_from FROM dogs WHERE dog_id = ?", (merge_id,))
            merge_row = self.cursor.fetchone()
            if merge_row and merge_row['merged_from']:
                merged_history.extend(json.loads(merge_row['merged_from']))
            merged_history.append(merge_id)

            self.cursor.execute("""
                UPDATE dogs
                SET merged_from = ?,
                    total_sightings = total_sightings + (
                        SELECT total_sightings FROM dogs WHERE dog_id = ?
                    )
                WHERE dog_id = ?
            """, (json.dumps(merged_history), merge_id, keep_id))
            self.cursor.execute("UPDATE dogs SET status = 'merged' WHERE dog_id = ?", (merge_id,))
            self.conn.commit()
            return True
        except Exception as e:
            print(f"Error merging dogs: {e}")
            self.conn.rollback()
            return False

    def delete_dog(self, dog_id: int, hard_delete: bool = False):
        """Delete or mark dog as deleted"""
        if hard_delete:
            self.cursor.execute("DELETE FROM body_parts WHERE dog_id = ?", (dog_id,))
            self.cursor.execute("DELETE FROM dog_features WHERE dog_id = ?", (dog_id,))
            self.cursor.execute("DELETE FROM dog_images WHERE dog_id = ?", (dog_id,))
            self.cursor.execute("DELETE FROM sightings WHERE dog_id = ?", (dog_id,))
            self.cursor.execute("DELETE FROM dogs WHERE dog_id = ?", (dog_id,))
        else:
            self.cursor.execute("UPDATE dogs SET status = 'deleted' WHERE dog_id = ?", (dog_id,))
        self.conn.commit()

    # ========== Features Management ==========
    def save_features(self, dog_id: int, resnet_features: np.ndarray,
                      color_histogram: np.ndarray, confidence: float):
        """Save dog features to database"""
        resnet_blob = pickle.dumps(resnet_features)
        color_blob = pickle.dumps(color_histogram)
        self.cursor.execute("""
            INSERT INTO dog_features
            (dog_id, resnet_features, color_histogram, confidence)
            VALUES (?, ?, ?, ?)
        """, (dog_id, resnet_blob, color_blob, confidence))
        self.conn.commit()

    def get_features(self, dog_id: int, limit: int = 20) -> List[Dict]:
        """Get recent features for a dog"""
        self.cursor.execute("""
            SELECT * FROM dog_features
            WHERE dog_id = ?
            ORDER BY timestamp DESC
            LIMIT ?
        """, (dog_id, limit))
        features = []
        for row in self.cursor.fetchall():
            features.append({
                'resnet_features': pickle.loads(row['resnet_features']),
                'color_histogram': pickle.loads(row['color_histogram']),
                'confidence': row['confidence'],
                'timestamp': row['timestamp']
            })
        return features

    # ========== Images Management ==========
    def save_image(self, dog_id: int, image: np.ndarray,
                   frame_number: int, video_source: str,
                   bbox: List[float], confidence: float,
                   pose_keypoints: Optional[List] = None):
        """Save dog image to database"""
        # Images are stored as base64-encoded JPEG strings;
        # get_dog_images() reverses this encoding when reading them back.
        _, buffer = cv2.imencode('.jpg', image)
        image_data = base64.b64encode(buffer).decode('utf-8')
        thumbnail = cv2.resize(image, (128, 128))
        _, thumb_buffer = cv2.imencode('.jpg', thumbnail, [cv2.IMWRITE_JPEG_QUALITY, 70])
        thumb_data = base64.b64encode(thumb_buffer).decode('utf-8')
        h, w = image.shape[:2]
        self.cursor.execute("""
            INSERT INTO dog_images
            (dog_id, image_data, thumbnail, width, height,
             frame_number, video_source, bbox, confidence, pose_keypoints)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (dog_id, image_data, thumb_data, w, h,
              frame_number, video_source, json.dumps(bbox),
              confidence, json.dumps(pose_keypoints) if pose_keypoints else None))
        self.conn.commit()
        return self.cursor.lastrowid

    def add_dog_image(self, dog_id: int, image: np.ndarray,
                      timestamp: float, confidence: float, bbox: tuple):
        """Add a dog image to database (simplified for dataset collection)"""
        try:
            # Convert image to base64 for storage
            _, buffer = cv2.imencode('.jpg', image)
            image_data = base64.b64encode(buffer).decode('utf-8')
            # Create thumbnail
            thumbnail = cv2.resize(image, (128, 128))
            _, thumb_buffer = cv2.imencode('.jpg', thumbnail, [cv2.IMWRITE_JPEG_QUALITY, 70])
            thumb_data = base64.b64encode(thumb_buffer).decode('utf-8')
            h, w = image.shape[:2]
            # Insert into database
            self.cursor.execute("""
                INSERT INTO dog_images
                (dog_id, image_data, thumbnail, width, height,
                 timestamp, bbox, confidence)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (dog_id, image_data, thumb_data, w, h,
                  timestamp, json.dumps(bbox), confidence))
            # Update dog's last_seen and sighting count
            self.cursor.execute("""
                UPDATE dogs
                SET last_seen = datetime('now'),
                    total_sightings = total_sightings + 1
                WHERE dog_id = ?
            """, (dog_id,))
            self.conn.commit()
            return self.cursor.lastrowid
        except Exception as e:
            self.conn.rollback()
            print(f"Error adding dog image: {e}")
            raise

    def get_dog_images(self, dog_id: int, validated_only: bool = False,
                       include_discarded: bool = False) -> List[Dict]:
        """Get all images for a dog"""
        query = "SELECT * FROM dog_images WHERE dog_id = ?"
        params = [dog_id]
        if validated_only:
            query += " AND is_validated = 1"
        if not include_discarded:
            query += " AND is_discarded = 0"
        query += " ORDER BY timestamp DESC"
        self.cursor.execute(query, params)
        rows = self.cursor.fetchall()
        images = []
        for row in rows:
            # Decode the base64 string to bytes first, then decode the JPEG
            image_bytes = base64.b64decode(row['image_data'])
            nparr = np.frombuffer(image_bytes, np.uint8)
            image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
            images.append({
                'image_id': row['image_id'],
                'image': image,
                'thumbnail': row['thumbnail'],
                'bbox': json.loads(row['bbox']) if row['bbox'] else [],
                'confidence': row['confidence'],
                'frame_number': row['frame_number'] if 'frame_number' in row.keys() else None,
                'video_source': row['video_source'] if 'video_source' in row.keys() else None,
                'is_validated': row['is_validated'],
                'is_discarded': row['is_discarded'],
                'pose_keypoints': json.loads(row['pose_keypoints']) if row['pose_keypoints'] else None
            })
        return images

    def validate_image(self, image_id: int, is_valid: bool = True):
        """Mark image as validated or discarded"""
        if is_valid:
            self.cursor.execute("UPDATE dog_images SET is_validated = 1 WHERE image_id = ?", (image_id,))
        else:
            self.cursor.execute("UPDATE dog_images SET is_discarded = 1 WHERE image_id = ?", (image_id,))
        self.conn.commit()

    # ========== Body Parts Management ==========
    def save_body_parts(self, dog_id: int, image_id: int,
                        head_crop: Optional[np.ndarray],
                        torso_crop: Optional[np.ndarray],
                        rear_crop: Optional[np.ndarray],
                        confidences: Dict[str, float]):
        """Save body part crops to database"""
        parts = {
            'head': head_crop,
            'torso': torso_crop,
            'rear': rear_crop
        }
        for part_type, crop in parts.items():
            if crop is not None:
                _, buffer = cv2.imencode('.jpg', crop)
                crop_data = base64.b64encode(buffer).decode('utf-8')
                confidence = confidences.get(part_type, 0.0)
                self.cursor.execute("""
                    INSERT INTO body_parts
                    (dog_id, image_id, part_type, part_image, confidence)
                    VALUES (?, ?, ?, ?, ?)
                """, (dog_id, image_id, part_type, crop_data, confidence))
        self.conn.commit()

    def get_body_parts(self, dog_id: int, part_type: Optional[str] = None,
                       validated_only: bool = False, include_discarded: bool = False) -> List[Dict]:
        """Get body part crops for a dog"""
        query = "SELECT * FROM body_parts WHERE dog_id = ?"
        params = [dog_id]
        if part_type:
            query += " AND part_type = ?"
            params.append(part_type)
        if validated_only:
            query += " AND is_validated = 1"
        if not include_discarded:
            query += " AND is_discarded = 0"
        self.cursor.execute(query, params)
        parts = []
        for row in self.cursor.fetchall():
            image_bytes = base64.b64decode(row['part_image'])
            nparr = np.frombuffer(image_bytes, np.uint8)
            image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
            parts.append({
                'part_id': row['part_id'],
                'part_type': row['part_type'],
                'image': image,
                'confidence': row['confidence'],
                'is_validated': row['is_validated'],
                'image_id': row['image_id']
            })
        return parts

    def validate_body_part(self, part_id: int, is_valid: bool = True):
        """Mark body part as validated or discarded"""
        if is_valid:
            self.cursor.execute("UPDATE body_parts SET is_validated = 1 WHERE part_id = ?", (part_id,))
        else:
            self.cursor.execute("UPDATE body_parts SET is_discarded = 1 WHERE part_id = ?", (part_id,))
        self.conn.commit()

    # ========== Sightings ==========
    def add_sighting(self, dog_id: int, position: Tuple[float, float],
                     video_source: str, frame_number: int, confidence: float):
        """Record a dog sighting"""
        self.cursor.execute("""
            INSERT INTO sightings
            (dog_id, position_x, position_y, video_source, frame_number, confidence)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (dog_id, position[0], position[1], video_source, frame_number, confidence))
        self.conn.commit()

    # ========== Query Methods ==========
    def get_all_dogs(self, active_only: bool = True) -> pd.DataFrame:
        """Get all dogs as DataFrame"""
        query = "SELECT * FROM dogs"
        if active_only:
            query += " WHERE status = 'active'"
        query += " ORDER BY dog_id"
        return pd.read_sql_query(query, self.conn)

    def get_dog_statistics(self) -> Dict:
        """Get overall statistics"""
        stats = {}
        self.cursor.execute("SELECT COUNT(*) FROM dogs WHERE status = 'active'")
        stats['total_active_dogs'] = self.cursor.fetchone()[0]
        self.cursor.execute("SELECT COUNT(*) FROM dog_images WHERE is_discarded = 0")
        stats['total_images'] = self.cursor.fetchone()[0]
        self.cursor.execute("SELECT COUNT(*) FROM dog_images WHERE is_validated = 1")
        stats['validated_images'] = self.cursor.fetchone()[0]
        self.cursor.execute("SELECT COUNT(*) FROM sightings")
        stats['total_sightings'] = self.cursor.fetchone()[0]
        self.cursor.execute("""
            SELECT d.dog_id, d.name, d.total_sightings
            FROM dogs d
            WHERE d.status = 'active'
            ORDER BY d.total_sightings DESC
            LIMIT 1
        """)
        row = self.cursor.fetchone()
        if row:
            stats['most_seen_dog'] = {
                'dog_id': row[0],
                'name': row[1] or f"Dog #{row[0]}",
                'sightings': row[2]
            }
        return stats

    # ========== Export Methods ==========
    def export_body_parts_dataset(self, output_dir: str, validated_only: bool = True) -> Dict:
        """Export a dataset of full images plus body-part crops for fine-tuning.

        Complements export_training_dataset below, which exports full
        images only.
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        images_dir = output_path / "images"
        images_dir.mkdir(exist_ok=True)
        dataset = []
        dogs = self.get_all_dogs()
        for _, dog in dogs.iterrows():
            dog_id = dog['dog_id']
            dog_dir = images_dir / f"dog_{dog_id}"
            dog_dir.mkdir(exist_ok=True)
            for part in ['full', 'head', 'torso', 'rear']:
                (dog_dir / part).mkdir(exist_ok=True)
            images = self.get_dog_images(dog_id, validated_only=validated_only)
            # Fetch body parts once per dog rather than once per image
            parts = self.get_body_parts(dog_id, validated_only=validated_only)
            for idx, img_data in enumerate(images):
                full_path = dog_dir / 'full' / f"img_{idx:04d}.jpg"
                cv2.imwrite(str(full_path), img_data['image'])
                part_paths = {}
                for part_data in parts:
                    if part_data['image_id'] == img_data['image_id']:
                        part_type = part_data['part_type']
                        part_path = dog_dir / part_type / f"img_{idx:04d}.jpg"
                        cv2.imwrite(str(part_path), part_data['image'])
                        part_paths[part_type] = str(part_path.relative_to(output_path))
                dataset_entry = {
                    'dog_id': dog_id,
                    'full_image': str(full_path.relative_to(output_path)),
                    'bbox': img_data['bbox'],
                    'confidence': img_data['confidence']
                }
                for part_type in ['head', 'torso', 'rear']:
                    dataset_entry[f'{part_type}_image'] = part_paths.get(part_type)
                dataset.append(dataset_entry)
        dataset_df = pd.DataFrame(dataset)
        if dataset_df.empty:
            return {'total_dogs': 0, 'total_images': 0}
        dataset_df.to_csv(output_path / "dataset.csv", index=False)
        # Stratified train/test split (requires at least two images per dog)
        from sklearn.model_selection import train_test_split
        train_df, test_df = train_test_split(dataset_df, test_size=0.2, stratify=dataset_df['dog_id'])
        train_df.to_csv(output_path / "train.csv", index=False)
        test_df.to_csv(output_path / "test.csv", index=False)
        # Write metadata after the split so the sample counts are persisted
        metadata = {
            'total_dogs': len(dogs),
            'total_images': len(dataset),
            'train_samples': len(train_df),
            'test_samples': len(test_df),
            'export_date': datetime.now().isoformat(),
            'validated_only': validated_only,
            'includes_body_parts': True
        }
        with open(output_path / "metadata.json", 'w') as f:
            json.dump(metadata, f, indent=2)
        return metadata

    # ========== Cleanup Methods ==========
    def reset_database(self, confirm: bool = False):
        """Reset entire database"""
        if not confirm:
            return False
        tables = ['sightings', 'body_parts', 'dog_images', 'dog_features',
                  'health_assessments', 'dogs', 'sessions']
        for table in tables:
            self.cursor.execute(f"DELETE FROM {table}")
        self.cursor.execute("DELETE FROM sqlite_sequence")
        self.conn.commit()
        return True

    def vacuum(self):
        """Optimize database file size"""
        self.conn.execute("VACUUM")

    def close(self):
        """Close database connection"""
        self.conn.close()

    def __del__(self):
        """Ensure connection is closed"""
        if hasattr(self, 'conn'):
            self.conn.close()

    # ========== Health Methods (NEW) ==========
    def save_health_assessment(self, dog_id: int, health_score: float, status: str,
                               posture_score: Optional[float] = None,
                               gait_score: Optional[float] = None,
                               body_condition_score: Optional[float] = None,
                               activity_score: Optional[float] = None,
                               alerts: Optional[List[str]] = None,
                               recommendations: Optional[List[str]] = None,
                               confidence: float = 0.5,
                               video_source: Optional[str] = None,
                               frame_number: Optional[int] = None):
        """Save health assessment to database"""
        self.cursor.execute("""
            INSERT INTO health_assessments
            (dog_id, health_score, status, posture_score, gait_score,
             body_condition_score, activity_score, alerts, recommendations,
             confidence, video_source, frame_number)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (dog_id, health_score, status, posture_score, gait_score,
              body_condition_score, activity_score,
              json.dumps(alerts) if alerts else None,
              json.dumps(recommendations) if recommendations else None,
              confidence, video_source, frame_number))
        self.cursor.execute("""
            UPDATE dogs
            SET last_health_score = ?,
                health_status = ?
            WHERE dog_id = ?
        """, (health_score, status, dog_id))
        self.conn.commit()

    def get_health_history(self, dog_id: int, limit: int = 50) -> List[Dict]:
        """Get health assessment history for a dog"""
        self.cursor.execute("""
            SELECT * FROM health_assessments
            WHERE dog_id = ?
            ORDER BY timestamp DESC
            LIMIT ?
        """, (dog_id, limit))
        assessments = []
        for row in self.cursor.fetchall():
            assessments.append({
                'timestamp': row['timestamp'],
                'health_score': row['health_score'],
                'status': row['status'],
                'posture_score': row['posture_score'],
                'gait_score': row['gait_score'],
                'body_condition_score': row['body_condition_score'],
                'activity_score': row['activity_score'],
                'alerts': json.loads(row['alerts']) if row['alerts'] else [],
                'recommendations': json.loads(row['recommendations']) if row['recommendations'] else [],
                'confidence': row['confidence']
            })
        return assessments

    def get_health_statistics(self) -> Dict:
        """Get overall health statistics"""
        stats = {}
        self.cursor.execute("""
            SELECT AVG(last_health_score) as avg_health,
                   COUNT(CASE WHEN last_health_score >= 8 THEN 1 END) as healthy_count,
                   COUNT(CASE WHEN last_health_score < 6 THEN 1 END) as unhealthy_count,
                   COUNT(*) as total_dogs
            FROM dogs
            WHERE status = 'active'
        """)
        row = self.cursor.fetchone()
        if row:
            stats['average_health'] = round(row['avg_health'] or 5.0, 1)
            stats['healthy_dogs'] = row['healthy_count'] or 0
            stats['unhealthy_dogs'] = row['unhealthy_count'] or 0
            stats['total_dogs'] = row['total_dogs'] or 0
        self.cursor.execute("""
            SELECT dog_id, name, last_health_score, health_status
            FROM dogs
            WHERE status = 'active' AND last_health_score < 6
            ORDER BY last_health_score ASC
            LIMIT 10
        """)
        stats['dogs_needing_attention'] = []
        for row in self.cursor.fetchall():
            stats['dogs_needing_attention'].append({
                'dog_id': row['dog_id'],
                'name': row['name'] or f"Dog #{row['dog_id']}",
                'health_score': row['last_health_score'],
                'status': row['health_status']
            })
        return stats

    def save_pose_keypoints(self, dog_id: int, keypoints: np.ndarray,
                            frame_number: int, video_source: str):
        """Save pose keypoints for a dog"""
        keypoints_json = json.dumps(keypoints.tolist()) if keypoints is not None else None
        self.cursor.execute("""
            UPDATE sightings
            SET pose_keypoints = ?
            WHERE dog_id = ? AND frame_number = ? AND video_source = ?
        """, (keypoints_json, dog_id, frame_number, video_source))
        self.conn.commit()

    def export_training_dataset(self, output_dir: str = "training_dataset") -> Dict:
        """
        Export database images to folder structure for fine-tuning

        Output structure:
            training_dataset/
            ├── dog_1/
            │   ├── img_0000.jpg
            │   └── ...
            ├── dog_2/
            └── metadata.json

        Returns:
            dict with export statistics
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Get all active dogs
        dogs_df = self.get_all_dogs(active_only=True)
        if dogs_df.empty:
            return {
                'success': False,
                'message': 'No dogs in database',
                'total_dogs': 0,
                'total_images': 0
            }

        total_images = 0
        export_info = []
        for _, dog in dogs_df.iterrows():
            dog_id = dog['dog_id']
            dog_name = dog['name'] or f"dog_{dog_id}"
            # Sanitize folder name (remove special characters)
            safe_name = "".join(c for c in dog_name if c.isalnum() or c in ('_', '-'))
            # Create folder for this dog
            dog_folder = output_path / safe_name
            dog_folder.mkdir(exist_ok=True)
            # Get all non-discarded images
            images = self.get_dog_images(
                dog_id=dog_id,
                validated_only=False,
                include_discarded=False
            )
            if not images:
                print(f"Warning: Dog {dog_id} has no images")
                continue
            # Save each image as JPG
            for idx, img_data in enumerate(images):
                image = img_data['image']  # OpenCV array from BLOB
                filename = f"img_{idx:04d}.jpg"
                filepath = dog_folder / filename
                # Write to disk
                success = cv2.imwrite(str(filepath), image)
                if not success:
                    print(f"Warning: Failed to write {filepath}")
            total_images += len(images)
            export_info.append({
                'dog_id': dog_id,
                'name': dog_name,
                'folder_name': safe_name,
                'image_count': len(images),
                'folder': str(dog_folder)
            })
            print(f"Exported {len(images)} images for {dog_name}")

        # Create metadata file
        metadata = {
            'export_date': datetime.now().isoformat(),
            'total_dogs': len(dogs_df),
            'total_images': total_images,
            'output_directory': str(output_path),
            'dogs': export_info
        }
        metadata_path = output_path / 'metadata.json'
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)

        print(f"\nExport complete: {len(dogs_df)} dogs, {total_images} images")
        print(f"Location: {output_path}")
        return {
            'success': True,
            'total_dogs': len(dogs_df),
            'total_images': total_images,
            'output_path': str(output_path),
            'dogs': export_info
        }
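

# A minimal usage sketch of the class above. The random feature vectors and
# synthetic frame are hypothetical stand-ins for real detector/extractor
# output; only the DogDatabase API defined in this file is used.
if __name__ == "__main__":
    db = DogDatabase("dog_monitoring.db")

    # Register a dog, then attach features and one image crop
    dog_id = db.add_dog(name="Rex")
    features = np.random.rand(2048).astype(np.float32)   # placeholder ResNet embedding
    histogram = np.random.rand(512).astype(np.float32)   # placeholder color histogram
    db.save_features(dog_id, features, histogram, confidence=0.9)

    frame = np.zeros((480, 640, 3), dtype=np.uint8)      # placeholder BGR crop
    db.save_image(dog_id, frame, frame_number=0, video_source="demo.mp4",
                  bbox=[10.0, 20.0, 200.0, 220.0], confidence=0.85)

    print(db.get_dog_statistics())
    db.close()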