Stray_Dogs / database.py
mustafa2ak's picture
Update database.py
efeb13e verified
"""
database.py - SQLite Database Manager for Dog Monitoring System
Handles persistent storage of dog data, features, and annotations
"""
import sqlite3
import json
import pickle
import base64
import numpy as np
import cv2
from datetime import datetime
from typing import List, Dict, Optional, Tuple, Any
from pathlib import Path
import pandas as pd
class DogDatabase:
"""SQLite database manager for dog monitoring system"""
def __init__(self, db_path: str = "dog_monitoring.db"):
"""Initialize database connection and create tables"""
self.db_path = db_path
self.conn = sqlite3.connect(db_path, check_same_thread=False)
self.conn.row_factory = sqlite3.Row
self.cursor = self.conn.cursor()
# Create tables if they don't exist
self._create_tables()
def _create_tables(self):
"""Create all necessary database tables"""
# Dogs table - main registry
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS dogs (
dog_id INTEGER PRIMARY KEY,
name TEXT,
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
total_sightings INTEGER DEFAULT 1,
notes TEXT,
merged_from TEXT,
status TEXT DEFAULT 'active'
)
""")
# Dog features table
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS dog_features (
feature_id INTEGER PRIMARY KEY AUTOINCREMENT,
dog_id INTEGER,
resnet_features BLOB,
color_histogram BLOB,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
confidence REAL,
FOREIGN KEY (dog_id) REFERENCES dogs(dog_id)
)
""")
# Dog images table
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS dog_images (
image_id INTEGER PRIMARY KEY AUTOINCREMENT,
dog_id INTEGER,
image_data BLOB,
thumbnail BLOB,
width INTEGER,
height INTEGER,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
frame_number INTEGER,
video_source TEXT,
bbox TEXT,
confidence REAL,
pose_keypoints TEXT,
is_validated BOOLEAN DEFAULT 0,
is_discarded BOOLEAN DEFAULT 0,
FOREIGN KEY (dog_id) REFERENCES dogs(dog_id)
)
""")
# Body parts table
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS body_parts (
part_id INTEGER PRIMARY KEY AUTOINCREMENT,
dog_id INTEGER,
image_id INTEGER,
part_type TEXT,
part_image BLOB,
crop_bbox TEXT,
confidence REAL,
is_validated BOOLEAN DEFAULT 0,
is_discarded BOOLEAN DEFAULT 0,
FOREIGN KEY (dog_id) REFERENCES dogs(dog_id),
FOREIGN KEY (image_id) REFERENCES dog_images(image_id)
)
""")
# Sightings table
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS sightings (
sighting_id INTEGER PRIMARY KEY AUTOINCREMENT,
dog_id INTEGER,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
position_x REAL,
position_y REAL,
video_source TEXT,
frame_number INTEGER,
confidence REAL,
FOREIGN KEY (dog_id) REFERENCES dogs(dog_id)
)
""")
# Processing sessions table
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS sessions (
session_id INTEGER PRIMARY KEY AUTOINCREMENT,
start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
end_time TIMESTAMP,
video_path TEXT,
total_frames INTEGER,
dogs_detected INTEGER,
settings TEXT
)
""")
# Create indexes
self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_dog_features ON dog_features(dog_id)")
self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_dog_images ON dog_images(dog_id)")
self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_sightings ON sightings(dog_id)")
self.conn.commit()
# ========== Dog Management ==========
def add_dog(self, name: str = None) -> int:
"""Add new dog to database and return dog_id"""
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO dogs (name, first_seen, status)
VALUES (?, datetime('now'), 'active')
''', (name,))
self.conn.commit()
return cursor.lastrowid
def update_dog_sighting(self, dog_id: int):
"""Update last seen time and increment sighting count"""
self.cursor.execute("""
UPDATE dogs
SET last_seen = CURRENT_TIMESTAMP,
total_sightings = total_sightings + 1
WHERE dog_id = ?
""", (dog_id,))
self.conn.commit()
def export_training_dataset(self, output_dir: str = "training_dataset") -> Dict:
"""
Export database images to folder structure for fine-tuning
Output structure:
training_dataset/
β”œβ”€β”€ dog_1/
β”‚ β”œβ”€β”€ img_0000.jpg
β”‚ └── ...
β”œβ”€β”€ dog_2/
└── metadata.json
Returns:
dict with export statistics
"""
from pathlib import Path
import json
from datetime import datetime
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# Get all active dogs
dogs_df = self.get_all_dogs(active_only=True)
if dogs_df.empty:
return {
'success': False,
'message': 'No dogs in database',
'total_dogs': 0,
'total_images': 0
}
total_images = 0
export_info = []
for _, dog in dogs_df.iterrows():
dog_id = dog['dog_id']
dog_name = dog['name'] or f"dog_{dog_id}"
# Create folder for this dog
dog_folder = output_path / dog_name
dog_folder.mkdir(exist_ok=True)
# Get all non-discarded images
images = self.get_dog_images(
dog_id=dog_id,
validated_only=False,
include_discarded=False
)
if not images:
print(f"Warning: Dog {dog_id} has no images")
continue
# Save each image as JPG
for idx, img_data in enumerate(images):
image = img_data['image'] # OpenCV array from BLOB
filename = f"img_{idx:04d}.jpg"
filepath = dog_folder / filename
# Write to disk
cv2.imwrite(str(filepath), image)
total_images += len(images)
export_info.append({
'dog_id': dog_id,
'name': dog_name,
'image_count': len(images),
'folder': str(dog_folder)
})
print(f"Exported {len(images)} images for {dog_name}")
# Create metadata file
metadata = {
'export_date': datetime.now().isoformat(),
'total_dogs': len(dogs_df),
'total_images': total_images,
'output_directory': str(output_path),
'dogs': export_info
}
metadata_path = output_path / 'metadata.json'
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=2)
print(f"\nExport complete: {len(dogs_df)} dogs, {total_images} images")
print(f"Location: {output_path}")
return {
'success': True,
'total_dogs': len(dogs_df),
'total_images': total_images,
'output_path': str(output_path),
'dogs': export_info
}
def merge_dogs(self, keep_id: int, merge_id: int) -> bool:
"""Merge two dogs, keeping keep_id"""
try:
self.cursor.execute("UPDATE dog_features SET dog_id = ? WHERE dog_id = ?", (keep_id, merge_id))
self.cursor.execute("UPDATE dog_images SET dog_id = ? WHERE dog_id = ?", (keep_id, merge_id))
self.cursor.execute("UPDATE sightings SET dog_id = ? WHERE dog_id = ?", (keep_id, merge_id))
self.cursor.execute("SELECT merged_from FROM dogs WHERE dog_id = ?", (merge_id,))
row = self.cursor.fetchone()
merged_history = json.loads(row['merged_from'] if row and row['merged_from'] else '[]')
merged_history.append(merge_id)
self.cursor.execute("""
UPDATE dogs
SET merged_from = ?,
total_sightings = total_sightings + (
SELECT total_sightings FROM dogs WHERE dog_id = ?
)
WHERE dog_id = ?
""", (json.dumps(merged_history), merge_id, keep_id))
self.cursor.execute("UPDATE dogs SET status = 'merged' WHERE dog_id = ?", (merge_id,))
self.conn.commit()
return True
except Exception as e:
print(f"Error merging dogs: {e}")
self.conn.rollback()
return False
def delete_dog(self, dog_id: int, hard_delete: bool = False):
"""Delete or mark dog as deleted"""
if hard_delete:
self.cursor.execute("DELETE FROM dog_features WHERE dog_id = ?", (dog_id,))
self.cursor.execute("DELETE FROM dog_images WHERE dog_id = ?", (dog_id,))
self.cursor.execute("DELETE FROM sightings WHERE dog_id = ?", (dog_id,))
self.cursor.execute("DELETE FROM dogs WHERE dog_id = ?", (dog_id,))
else:
self.cursor.execute("UPDATE dogs SET status = 'deleted' WHERE dog_id = ?", (dog_id,))
self.conn.commit()
# ========== Features Management ==========
def save_features(self, dog_id: int, resnet_features: np.ndarray,
color_histogram: np.ndarray, confidence: float):
"""Save dog features to database"""
resnet_blob = pickle.dumps(resnet_features)
color_blob = pickle.dumps(color_histogram)
self.cursor.execute("""
INSERT INTO dog_features
(dog_id, resnet_features, color_histogram, confidence)
VALUES (?, ?, ?, ?)
""", (dog_id, resnet_blob, color_blob, confidence))
self.conn.commit()
def get_features(self, dog_id: int, limit: int = 20) -> List[Dict]:
"""Get recent features for a dog"""
self.cursor.execute("""
SELECT * FROM dog_features
WHERE dog_id = ?
ORDER BY timestamp DESC
LIMIT ?
""", (dog_id, limit))
features = []
for row in self.cursor.fetchall():
features.append({
'resnet_features': pickle.loads(row['resnet_features']),
'color_histogram': pickle.loads(row['color_histogram']),
'confidence': row['confidence'],
'timestamp': row['timestamp']
})
return features
# ========== Images Management ==========
# ========== Images Management ==========
def save_image(self, dog_id: int, image: np.ndarray,
frame_number: int, video_source: str,
bbox: List[float], confidence: float,
pose_keypoints: Optional[List] = None):
"""Save dog image to database"""
_, buffer = cv2.imencode('.jpg', image)
image_data = base64.b64encode(buffer).decode('utf-8')
thumbnail = cv2.resize(image, (128, 128))
_, thumb_buffer = cv2.imencode('.jpg', thumbnail, [cv2.IMWRITE_JPEG_QUALITY, 70])
thumb_data = base64.b64encode(thumb_buffer).decode('utf-8')
h, w = image.shape[:2]
self.cursor.execute("""
INSERT INTO dog_images
(dog_id, image_data, thumbnail, width, height,
frame_number, video_source, bbox, confidence, pose_keypoints)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (dog_id, image_data, thumb_data, w, h,
frame_number, video_source, json.dumps(bbox),
confidence, json.dumps(pose_keypoints) if pose_keypoints else None))
self.conn.commit()
return self.cursor.lastrowid
def add_dog_image(self, dog_id: int, image: np.ndarray,
timestamp: float, confidence: float, bbox: tuple):
"""Add a dog image to database (simplified for dataset collection)"""
try:
# Convert image to base64 for storage
_, buffer = cv2.imencode('.jpg', image)
image_data = base64.b64encode(buffer).decode('utf-8')
# Create thumbnail
thumbnail = cv2.resize(image, (128, 128))
_, thumb_buffer = cv2.imencode('.jpg', thumbnail, [cv2.IMWRITE_JPEG_QUALITY, 70])
thumb_data = base64.b64encode(thumb_buffer).decode('utf-8')
h, w = image.shape[:2]
# Insert into database
self.cursor.execute("""
INSERT INTO dog_images
(dog_id, image_data, thumbnail, width, height,
timestamp, bbox, confidence)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (dog_id, image_data, thumb_data, w, h,
timestamp, json.dumps(bbox), confidence))
# Update dog's last_seen and sighting count
self.cursor.execute("""
UPDATE dogs
SET last_seen = datetime('now'),
total_sightings = total_sightings + 1
WHERE dog_id = ?
""", (dog_id,))
self.conn.commit()
return self.cursor.lastrowid
except Exception as e:
self.conn.rollback()
print(f"Error adding dog image: {e}")
raise
def get_dog_images(self, dog_id: int, validated_only: bool = False,
include_discarded: bool = False) -> List[Dict]:
"""Get all images for a dog"""
query = "SELECT * FROM dog_images WHERE dog_id = ?"
params = [dog_id]
if validated_only:
query += " AND is_validated = 1"
if not include_discarded:
query += " AND is_discarded = 0"
query += " ORDER BY timestamp DESC"
self.cursor.execute(query, params)
rows = self.cursor.fetchall()
images = []
for row in rows:
# βœ… Decode base64 string to bytes first
image_bytes = base64.b64decode(row['image_data'])
nparr = np.frombuffer(image_bytes, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
images.append({
'image_id': row['image_id'],
'image': image,
'thumbnail': row['thumbnail'],
'bbox': json.loads(row['bbox']) if row['bbox'] else [],
'confidence': row['confidence'],
'frame_number': row['frame_number'] if 'frame_number' in row.keys() else None,
'video_source': row['video_source'] if 'video_source' in row.keys() else None,
'is_validated': row['is_validated'],
'is_discarded': row['is_discarded'],
'pose_keypoints': json.loads(row['pose_keypoints']) if row['pose_keypoints'] else None
})
return images
def validate_image(self, image_id: int, is_valid: bool = True):
"""Mark image as validated or discarded"""
if is_valid:
self.cursor.execute("UPDATE dog_images SET is_validated = 1 WHERE image_id = ?", (image_id,))
else:
self.cursor.execute("UPDATE dog_images SET is_discarded = 1 WHERE image_id = ?", (image_id,))
self.conn.commit()
# ========== Body Parts Management ==========
def save_body_parts(self, dog_id: int, image_id: int,
head_crop: Optional[np.ndarray],
torso_crop: Optional[np.ndarray],
rear_crop: Optional[np.ndarray],
confidences: Dict[str, float]):
"""Save body part crops to database"""
parts = {
'head': head_crop,
'torso': torso_crop,
'rear': rear_crop
}
for part_type, crop in parts.items():
if crop is not None:
_, buffer = cv2.imencode('.jpg', crop)
crop_data = base64.b64encode(buffer).decode('utf-8')
confidence = confidences.get(part_type, 0.0)
self.cursor.execute("""
INSERT INTO body_parts
(dog_id, image_id, part_type, part_image, confidence)
VALUES (?, ?, ?, ?, ?)
""", (dog_id, image_id, part_type, crop_data, confidence))
self.conn.commit()
def get_body_parts(self, dog_id: int, part_type: Optional[str] = None,
validated_only: bool = False, include_discarded: bool = False) -> List[Dict]:
"""Get body part crops for a dog"""
query = "SELECT * FROM body_parts WHERE dog_id = ?"
params = [dog_id]
if part_type:
query += " AND part_type = ?"
params.append(part_type)
if validated_only:
query += " AND is_validated = 1"
if not include_discarded:
query += " AND is_discarded = 0"
self.cursor.execute(query, params)
parts = []
for row in self.cursor.fetchall():
image_bytes = base64.b64decode(row['part_image'])
nparr = np.frombuffer(image_bytes, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
parts.append({
'part_id': row['part_id'],
'part_type': row['part_type'],
'image': image,
'confidence': row['confidence'],
'is_validated': row['is_validated'],
'image_id': row['image_id']
})
return parts
def validate_body_part(self, part_id: int, is_valid: bool = True):
"""Mark body part as validated or discarded"""
if is_valid:
self.cursor.execute("UPDATE body_parts SET is_validated = 1 WHERE part_id = ?", (part_id,))
else:
self.cursor.execute("UPDATE body_parts SET is_discarded = 1 WHERE part_id = ?", (part_id,))
self.conn.commit()
def add_sighting(self, dog_id: int, position: Tuple[float, float],
video_source: str, frame_number: int, confidence: float):
"""Record a dog sighting"""
self.cursor.execute("""
INSERT INTO sightings
(dog_id, position_x, position_y, video_source, frame_number, confidence)
VALUES (?, ?, ?, ?, ?, ?)
""", (dog_id, position[0], position[1], video_source, frame_number, confidence))
self.conn.commit()
# ========== Query Methods ==========
def get_all_dogs(self, active_only: bool = True) -> pd.DataFrame:
"""Get all dogs as DataFrame"""
query = "SELECT * FROM dogs"
if active_only:
query += " WHERE status = 'active'"
query += " ORDER BY dog_id"
return pd.read_sql_query(query, self.conn)
def get_dog_statistics(self) -> Dict:
"""Get overall statistics"""
stats = {}
self.cursor.execute("SELECT COUNT(*) FROM dogs WHERE status = 'active'")
stats['total_active_dogs'] = self.cursor.fetchone()[0]
self.cursor.execute("SELECT COUNT(*) FROM dog_images WHERE is_discarded = 0")
stats['total_images'] = self.cursor.fetchone()[0]
self.cursor.execute("SELECT COUNT(*) FROM dog_images WHERE is_validated = 1")
stats['validated_images'] = self.cursor.fetchone()[0]
self.cursor.execute("SELECT COUNT(*) FROM sightings")
stats['total_sightings'] = self.cursor.fetchone()[0]
self.cursor.execute("""
SELECT d.dog_id, d.name, d.total_sightings
FROM dogs d
WHERE d.status = 'active'
ORDER BY d.total_sightings DESC
LIMIT 1
""")
row = self.cursor.fetchone()
if row:
stats['most_seen_dog'] = {
'dog_id': row[0],
'name': row[1] or f"Dog #{row[0]}",
'sightings': row[2]
}
return stats
# ========== Export Methods ==========
def export_training_dataset(self, output_dir: str, validated_only: bool = True) -> Dict:
"""Export dataset with body parts for fine-tuning"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
images_dir = output_path / "images"
images_dir.mkdir(exist_ok=True)
dataset = []
dogs = self.get_all_dogs()
for _, dog in dogs.iterrows():
dog_id = dog['dog_id']
dog_dir = images_dir / f"dog_{dog_id}"
dog_dir.mkdir(exist_ok=True)
for part in ['full', 'head', 'torso', 'rear']:
(dog_dir / part).mkdir(exist_ok=True)
images = self.get_dog_images(dog_id, validated_only=validated_only)
for idx, img_data in enumerate(images):
full_path = dog_dir / 'full' / f"img_{idx:04d}.jpg"
cv2.imwrite(str(full_path), img_data['image'])
parts = self.get_body_parts(dog_id, validated_only=validated_only)
part_paths = {}
for part_data in parts:
if part_data['image_id'] == img_data['image_id']:
part_type = part_data['part_type']
part_path = dog_dir / part_type / f"img_{idx:04d}.jpg"
cv2.imwrite(str(part_path), part_data['image'])
part_paths[part_type] = str(part_path.relative_to(output_path))
dataset_entry = {
'dog_id': dog_id,
'full_image': str(full_path.relative_to(output_path)),
'bbox': img_data['bbox'],
'confidence': img_data['confidence']
}
for part_type in ['head', 'torso', 'rear']:
dataset_entry[f'{part_type}_image'] = part_paths.get(part_type, None)
dataset.append(dataset_entry)
dataset_df = pd.DataFrame(dataset)
dataset_df.to_csv(output_path / "dataset.csv", index=False)
metadata = {
'total_dogs': len(dogs),
'total_images': len(dataset),
'export_date': datetime.now().isoformat(),
'validated_only': validated_only,
'includes_body_parts': True
}
with open(output_path / "metadata.json", 'w') as f:
json.dump(metadata, f, indent=2)
from sklearn.model_selection import train_test_split
train_df, test_df = train_test_split(dataset_df, test_size=0.2, stratify=dataset_df['dog_id'])
train_df.to_csv(output_path / "train.csv", index=False)
test_df.to_csv(output_path / "test.csv", index=False)
metadata['train_samples'] = len(train_df)
metadata['test_samples'] = len(test_df)
return metadata
# ========== Cleanup Methods ==========
def reset_database(self, confirm: bool = False):
"""Reset entire database"""
if not confirm:
return False
tables = ['sightings', 'dog_images', 'dog_features', 'dogs', 'sessions']
for table in tables:
self.cursor.execute(f"DELETE FROM {table}")
self.cursor.execute("DELETE FROM sqlite_sequence")
self.conn.commit()
return True
def vacuum(self):
"""Optimize database file size"""
self.conn.execute("VACUUM")
def close(self):
"""Close database connection"""
self.conn.close()
def __del__(self):
"""Ensure connection is closed"""
if hasattr(self, 'conn'):
self.conn.close()
# ========== Health Methods (NEW) ==========
def save_health_assessment(self, dog_id: int, health_score: float, status: str,
posture_score: float = None, gait_score: float = None,
body_condition_score: float = None, activity_score: float = None,
alerts: List[str] = None, recommendations: List[str] = None,
confidence: float = 0.5, video_source: str = None,
frame_number: int = None):
"""Save health assessment to database"""
self.cursor.execute("""
INSERT INTO health_assessments
(dog_id, health_score, status, posture_score, gait_score,
body_condition_score, activity_score, alerts, recommendations,
confidence, video_source, frame_number)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (dog_id, health_score, status, posture_score, gait_score,
body_condition_score, activity_score,
json.dumps(alerts) if alerts else None,
json.dumps(recommendations) if recommendations else None,
confidence, video_source, frame_number))
self.cursor.execute("""
UPDATE dogs
SET last_health_score = ?,
health_status = ?
WHERE dog_id = ?
""", (health_score, status, dog_id))
self.conn.commit()
def get_health_history(self, dog_id: int, limit: int = 50) -> List[Dict]:
"""Get health assessment history for a dog"""
self.cursor.execute("""
SELECT * FROM health_assessments
WHERE dog_id = ?
ORDER BY timestamp DESC
LIMIT ?
""", (dog_id, limit))
assessments = []
for row in self.cursor.fetchall():
assessments.append({
'timestamp': row['timestamp'],
'health_score': row['health_score'],
'status': row['status'],
'posture_score': row['posture_score'],
'gait_score': row['gait_score'],
'body_condition_score': row['body_condition_score'],
'activity_score': row['activity_score'],
'alerts': json.loads(row['alerts']) if row['alerts'] else [],
'recommendations': json.loads(row['recommendations']) if row['recommendations'] else [],
'confidence': row['confidence']
})
return assessments
def get_health_statistics(self) -> Dict:
"""Get overall health statistics"""
stats = {}
self.cursor.execute("""
SELECT AVG(last_health_score) as avg_health,
COUNT(CASE WHEN last_health_score >= 8 THEN 1 END) as healthy_count,
COUNT(CASE WHEN last_health_score < 6 THEN 1 END) as unhealthy_count,
COUNT(*) as total_dogs
FROM dogs
WHERE status = 'active'
""")
row = self.cursor.fetchone()
if row:
stats['average_health'] = round(row['avg_health'] or 5.0, 1)
stats['healthy_dogs'] = row['healthy_count'] or 0
stats['unhealthy_dogs'] = row['unhealthy_count'] or 0
stats['total_dogs'] = row['total_dogs'] or 0
self.cursor.execute("""
SELECT dog_id, name, last_health_score, health_status
FROM dogs
WHERE status = 'active' AND last_health_score < 6
ORDER BY last_health_score ASC
LIMIT 10
""")
stats['dogs_needing_attention'] = []
for row in self.cursor.fetchall():
stats['dogs_needing_attention'].append({
'dog_id': row['dog_id'],
'name': row['name'] or f"Dog #{row['dog_id']}",
'health_score': row['last_health_score'],
'status': row['health_status']
})
return stats
def save_pose_keypoints(self, dog_id: int, keypoints: np.ndarray,
frame_number: int, video_source: str):
"""Save pose keypoints for a dog"""
keypoints_json = json.dumps(keypoints.tolist()) if keypoints is not None else None
self.cursor.execute("""
UPDATE sightings
SET pose_keypoints = ?
WHERE dog_id = ? AND frame_number = ? AND video_source = ?
""", (keypoints_json, dog_id, frame_number, video_source))
self.conn.commit()
def export_training_dataset(self, output_dir: str = "training_dataset") -> Dict:
"""
Export database images to folder structure for fine-tuning
Output structure:
training_dataset/
β”œβ”€β”€ dog_1/
β”‚ β”œβ”€β”€ img_0000.jpg
β”‚ └── ...
β”œβ”€β”€ dog_2/
└── metadata.json
Returns:
dict with export statistics
"""
from pathlib import Path
import json
from datetime import datetime
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# Get all active dogs
dogs_df = self.get_all_dogs(active_only=True)
if dogs_df.empty:
return {
'success': False,
'message': 'No dogs in database',
'total_dogs': 0,
'total_images': 0
}
total_images = 0
export_info = []
for _, dog in dogs_df.iterrows():
dog_id = dog['dog_id']
dog_name = dog['name'] or f"dog_{dog_id}"
# Sanitize folder name (remove special characters)
safe_name = "".join(c for c in dog_name if c.isalnum() or c in ('_', '-'))
# Create folder for this dog
dog_folder = output_path / safe_name
dog_folder.mkdir(exist_ok=True)
# Get all non-discarded images
images = self.get_dog_images(
dog_id=dog_id,
validated_only=False,
include_discarded=False
)
if not images:
print(f"Warning: Dog {dog_id} has no images")
continue
# Save each image as JPG
for idx, img_data in enumerate(images):
image = img_data['image'] # OpenCV array from BLOB
filename = f"img_{idx:04d}.jpg"
filepath = dog_folder / filename
# Write to disk
success = cv2.imwrite(str(filepath), image)
if not success:
print(f"Warning: Failed to write {filepath}")
total_images += len(images)
export_info.append({
'dog_id': dog_id,
'name': dog_name,
'folder_name': safe_name,
'image_count': len(images),
'folder': str(dog_folder)
})
print(f"Exported {len(images)} images for {dog_name}")
# Create metadata file
metadata = {
'export_date': datetime.now().isoformat(),
'total_dogs': len(dogs_df),
'total_images': total_images,
'output_directory': str(output_path),
'dogs': export_info
}
metadata_path = output_path / 'metadata.json'
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=2)
print(f"\nExport complete: {len(dogs_df)} dogs, {total_images} images")
print(f"Location: {output_path}")
return {
'success': True,
'total_dogs': len(dogs_df),
'total_images': total_images,
'output_path': str(output_path),
'dogs': export_info
}