Stray_Dogs / database.py
mustafa2ak's picture
Update database.py
6bba3db verified
raw
history blame
22.7 kB
"""
database.py - SQLite Database Manager for Dog Monitoring System
Handles persistent storage of dog data, features, and annotations
"""
import sqlite3
import json
import pickle
import base64
import numpy as np
import cv2
from datetime import datetime
from typing import List, Dict, Optional, Tuple, Any
from pathlib import Path
import pandas as pd
class DogDatabase:
"""SQLite database manager for dog monitoring system"""
def __init__(self, db_path: str = "dog_monitoring.db"):
"""Initialize database connection and create tables"""
self.db_path = db_path
self.conn = sqlite3.connect(db_path, check_same_thread=False)
self.conn.row_factory = sqlite3.Row
self.cursor = self.conn.cursor()
# Create tables if they don't exist
self._create_tables()
def _create_tables(self):
"""Create all necessary database tables"""
# Dogs table - main registry
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS dogs (
dog_id INTEGER PRIMARY KEY,
name TEXT,
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
total_sightings INTEGER DEFAULT 1,
notes TEXT,
merged_from TEXT, -- JSON list of merged dog IDs
status TEXT DEFAULT 'active' -- active, merged, deleted
)
""")
# Dog features table - stores extracted features
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS dog_features (
feature_id INTEGER PRIMARY KEY AUTOINCREMENT,
dog_id INTEGER,
resnet_features BLOB, -- Pickled numpy array
color_histogram BLOB, -- Pickled numpy array
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
confidence REAL,
FOREIGN KEY (dog_id) REFERENCES dogs(dog_id)
)
""")
# Dog images table - stores actual images
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS dog_images (
image_id INTEGER PRIMARY KEY AUTOINCREMENT,
dog_id INTEGER,
image_data BLOB, -- Base64 encoded image
thumbnail BLOB, -- Small preview
width INTEGER,
height INTEGER,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
frame_number INTEGER,
video_source TEXT,
bbox TEXT, -- JSON [x1, y1, x2, y2]
confidence REAL,
pose_keypoints TEXT, -- JSON of keypoints
is_validated BOOLEAN DEFAULT 0,
is_discarded BOOLEAN DEFAULT 0,
FOREIGN KEY (dog_id) REFERENCES dogs(dog_id)
)
""")
# Body parts table - stores cropped body parts
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS body_parts (
part_id INTEGER PRIMARY KEY AUTOINCREMENT,
dog_id INTEGER,
image_id INTEGER,
part_type TEXT, -- 'head', 'torso', 'rear'
part_image BLOB, -- Base64 encoded crop
crop_bbox TEXT, -- JSON [x1, y1, x2, y2] relative to full image
confidence REAL,
is_validated BOOLEAN DEFAULT 0,
is_discarded BOOLEAN DEFAULT 0,
FOREIGN KEY (dog_id) REFERENCES dogs(dog_id),
FOREIGN KEY (image_id) REFERENCES dog_images(image_id)
)
""")
# Sightings table - tracks when/where dogs were seen
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS sightings (
sighting_id INTEGER PRIMARY KEY AUTOINCREMENT,
dog_id INTEGER,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
position_x REAL,
position_y REAL,
video_source TEXT,
frame_number INTEGER,
confidence REAL,
FOREIGN KEY (dog_id) REFERENCES dogs(dog_id)
)
""")
# Processing sessions table
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS sessions (
session_id INTEGER PRIMARY KEY AUTOINCREMENT,
start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
end_time TIMESTAMP,
video_path TEXT,
total_frames INTEGER,
dogs_detected INTEGER,
settings TEXT -- JSON of processing settings
)
""")
# Create indexes for performance
self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_dog_features ON dog_features(dog_id)")
self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_dog_images ON dog_images(dog_id)")
self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_sightings ON sightings(dog_id)")
self.conn.commit()
# ========== Dog Management ==========
def add_dog(self, dog_id: Optional[int] = None, name: Optional[str] = None) -> int:
"""Add a new dog to the database"""
if dog_id:
self.cursor.execute(
"INSERT OR IGNORE INTO dogs (dog_id, name) VALUES (?, ?)",
(dog_id, name)
)
else:
self.cursor.execute(
"INSERT INTO dogs (name) VALUES (?)",
(name,)
)
dog_id = self.cursor.lastrowid
self.conn.commit()
return dog_id
def update_dog_sighting(self, dog_id: int):
"""Update last seen time and increment sighting count"""
self.cursor.execute("""
UPDATE dogs
SET last_seen = CURRENT_TIMESTAMP,
total_sightings = total_sightings + 1
WHERE dog_id = ?
""", (dog_id,))
self.conn.commit()
def merge_dogs(self, keep_id: int, merge_id: int) -> bool:
"""Merge two dogs, keeping keep_id"""
try:
# Update all references
self.cursor.execute("UPDATE dog_features SET dog_id = ? WHERE dog_id = ?",
(keep_id, merge_id))
self.cursor.execute("UPDATE dog_images SET dog_id = ? WHERE dog_id = ?",
(keep_id, merge_id))
self.cursor.execute("UPDATE sightings SET dog_id = ? WHERE dog_id = ?",
(keep_id, merge_id))
# Get merged_from history
self.cursor.execute("SELECT merged_from FROM dogs WHERE dog_id = ?", (merge_id,))
row = self.cursor.fetchone()
merged_history = json.loads(row['merged_from'] if row and row['merged_from'] else '[]')
merged_history.append(merge_id)
# Update keep_id dog with merge history
self.cursor.execute("""
UPDATE dogs
SET merged_from = ?,
total_sightings = total_sightings + (
SELECT total_sightings FROM dogs WHERE dog_id = ?
)
WHERE dog_id = ?
""", (json.dumps(merged_history), merge_id, keep_id))
# Mark merge_id as merged
self.cursor.execute(
"UPDATE dogs SET status = 'merged' WHERE dog_id = ?",
(merge_id,)
)
self.conn.commit()
return True
except Exception as e:
print(f"Error merging dogs: {e}")
self.conn.rollback()
return False
def delete_dog(self, dog_id: int, hard_delete: bool = False):
"""Delete or mark dog as deleted"""
if hard_delete:
# Hard delete - remove all data
self.cursor.execute("DELETE FROM dog_features WHERE dog_id = ?", (dog_id,))
self.cursor.execute("DELETE FROM dog_images WHERE dog_id = ?", (dog_id,))
self.cursor.execute("DELETE FROM sightings WHERE dog_id = ?", (dog_id,))
self.cursor.execute("DELETE FROM dogs WHERE dog_id = ?", (dog_id,))
else:
# Soft delete - mark as deleted
self.cursor.execute(
"UPDATE dogs SET status = 'deleted' WHERE dog_id = ?",
(dog_id,)
)
self.conn.commit()
# ========== Features Management ==========
def save_features(self, dog_id: int, resnet_features: np.ndarray,
color_histogram: np.ndarray, confidence: float):
"""Save dog features to database"""
resnet_blob = pickle.dumps(resnet_features)
color_blob = pickle.dumps(color_histogram)
self.cursor.execute("""
INSERT INTO dog_features
(dog_id, resnet_features, color_histogram, confidence)
VALUES (?, ?, ?, ?)
""", (dog_id, resnet_blob, color_blob, confidence))
self.conn.commit()
def get_features(self, dog_id: int, limit: int = 20) -> List[Dict]:
"""Get recent features for a dog"""
self.cursor.execute("""
SELECT * FROM dog_features
WHERE dog_id = ?
ORDER BY timestamp DESC
LIMIT ?
""", (dog_id, limit))
features = []
for row in self.cursor.fetchall():
features.append({
'resnet_features': pickle.loads(row['resnet_features']),
'color_histogram': pickle.loads(row['color_histogram']),
'confidence': row['confidence'],
'timestamp': row['timestamp']
})
return features
# ========== Images Management ==========
def save_image(self, dog_id: int, image: np.ndarray,
frame_number: int, video_source: str,
bbox: List[float], confidence: float,
pose_keypoints: Optional[List] = None):
"""Save dog image to database"""
# Encode image as JPEG
_, buffer = cv2.imencode('.jpg', image)
image_data = base64.b64encode(buffer).decode('utf-8')
# Create thumbnail
thumbnail = cv2.resize(image, (128, 128))
_, thumb_buffer = cv2.imencode('.jpg', thumbnail, [cv2.IMWRITE_JPEG_QUALITY, 70])
thumb_data = base64.b64encode(thumb_buffer).decode('utf-8')
h, w = image.shape[:2]
self.cursor.execute("""
INSERT INTO dog_images
(dog_id, image_data, thumbnail, width, height,
frame_number, video_source, bbox, confidence, pose_keypoints)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (dog_id, image_data, thumb_data, w, h,
frame_number, video_source, json.dumps(bbox),
confidence, json.dumps(pose_keypoints) if pose_keypoints else None))
self.conn.commit()
return self.cursor.lastrowid # THIS LINE - make sure it returns the image_id
def get_dog_images(self, dog_id: int, validated_only: bool = False,
include_discarded: bool = False) -> List[Dict]:
"""Get all images for a dog"""
query = "SELECT * FROM dog_images WHERE dog_id = ?"
params = [dog_id]
if validated_only:
query += " AND is_validated = 1"
if not include_discarded:
query += " AND is_discarded = 0"
query += " ORDER BY timestamp DESC"
self.cursor.execute(query, params)
rows = self.cursor.fetchall()
images = []
for row in rows:
# Decode image
image_bytes = base64.b64decode(row['image_data'])
nparr = np.frombuffer(image_bytes, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
images.append({
'image_id': row['image_id'],
'image': image,
'thumbnail': row['thumbnail'],
'bbox': json.loads(row['bbox']),
'confidence': row['confidence'],
'frame_number': row['frame_number'],
'video_source': row['video_source'],
'is_validated': row['is_validated'],
'is_discarded': row['is_discarded'],
'pose_keypoints': json.loads(row['pose_keypoints']) if row['pose_keypoints'] else None
})
return images
def validate_image(self, image_id: int, is_valid: bool = True):
"""Mark image as validated or discarded"""
if is_valid:
self.cursor.execute(
"UPDATE dog_images SET is_validated = 1 WHERE image_id = ?",
(image_id,)
)
else:
self.cursor.execute(
"UPDATE dog_images SET is_discarded = 1 WHERE image_id = ?",
(image_id,)
)
self.conn.commit()
# ========== Body Parts Management ==========
def save_body_parts(self, dog_id: int, image_id: int,
head_crop: Optional[np.ndarray],
torso_crop: Optional[np.ndarray],
rear_crop: Optional[np.ndarray],
confidences: Dict[str, float]):
"""Save body part crops to database"""
parts = {
'head': head_crop,
'torso': torso_crop,
'rear': rear_crop
}
for part_type, crop in parts.items():
if crop is not None:
# Encode crop as JPEG
_, buffer = cv2.imencode('.jpg', crop)
crop_data = base64.b64encode(buffer).decode('utf-8')
confidence = confidences.get(part_type, 0.0)
self.cursor.execute("""
INSERT INTO body_parts
(dog_id, image_id, part_type, part_image, confidence)
VALUES (?, ?, ?, ?, ?)
""", (dog_id, image_id, part_type, crop_data, confidence))
self.conn.commit()
def get_body_parts(self, dog_id: int, part_type: Optional[str] = None,
validated_only: bool = False, include_discarded: bool = False) -> List[Dict]:
"""Get body part crops for a dog"""
query = "SELECT * FROM body_parts WHERE dog_id = ?"
params = [dog_id]
if part_type:
query += " AND part_type = ?"
params.append(part_type)
if validated_only:
query += " AND is_validated = 1"
if not include_discarded:
query += " AND is_discarded = 0"
self.cursor.execute(query, params)
parts = []
for row in self.cursor.fetchall():
# Decode image
image_bytes = base64.b64decode(row['part_image'])
nparr = np.frombuffer(image_bytes, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
parts.append({
'part_id': row['part_id'],
'part_type': row['part_type'],
'image': image,
'confidence': row['confidence'],
'is_validated': row['is_validated'],
'image_id': row['image_id']
})
return parts
def validate_body_part(self, part_id: int, is_valid: bool = True):
"""Mark body part as validated or discarded"""
if is_valid:
self.cursor.execute(
"UPDATE body_parts SET is_validated = 1 WHERE part_id = ?",
(part_id,)
)
else:
self.cursor.execute(
"UPDATE body_parts SET is_discarded = 1 WHERE part_id = ?",
(part_id,)
)
self.conn.commit()
def add_sighting(self, dog_id: int, position: Tuple[float, float],
video_source: str, frame_number: int, confidence: float):
"""Record a dog sighting"""
self.cursor.execute("""
INSERT INTO sightings
(dog_id, position_x, position_y, video_source, frame_number, confidence)
VALUES (?, ?, ?, ?, ?, ?)
""", (dog_id, position[0], position[1], video_source, frame_number, confidence))
self.conn.commit()
# ========== Query Methods ==========
def get_all_dogs(self, active_only: bool = True) -> pd.DataFrame:
"""Get all dogs as DataFrame"""
query = "SELECT * FROM dogs"
if active_only:
query += " WHERE status = 'active'"
query += " ORDER BY dog_id"
return pd.read_sql_query(query, self.conn)
def get_dog_statistics(self) -> Dict:
"""Get overall statistics"""
stats = {}
# Total dogs
self.cursor.execute("SELECT COUNT(*) FROM dogs WHERE status = 'active'")
stats['total_active_dogs'] = self.cursor.fetchone()[0]
# Total images
self.cursor.execute("SELECT COUNT(*) FROM dog_images WHERE is_discarded = 0")
stats['total_images'] = self.cursor.fetchone()[0]
# Validated images
self.cursor.execute("SELECT COUNT(*) FROM dog_images WHERE is_validated = 1")
stats['validated_images'] = self.cursor.fetchone()[0]
# Total sightings
self.cursor.execute("SELECT COUNT(*) FROM sightings")
stats['total_sightings'] = self.cursor.fetchone()[0]
# Most seen dog
self.cursor.execute("""
SELECT d.dog_id, d.name, d.total_sightings
FROM dogs d
WHERE d.status = 'active'
ORDER BY d.total_sightings DESC
LIMIT 1
""")
row = self.cursor.fetchone()
if row:
stats['most_seen_dog'] = {
'dog_id': row[0],
'name': row[1] or f"Dog #{row[0]}",
'sightings': row[2]
}
return stats
# ========== Export Methods ==========
def export_training_dataset(self, output_dir: str, validated_only: bool = True) -> Dict:
"""Export dataset with body parts for fine-tuning"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# Create directories
images_dir = output_path / "images"
images_dir.mkdir(exist_ok=True)
# Export data
dataset = []
dogs = self.get_all_dogs()
for _, dog in dogs.iterrows():
dog_id = dog['dog_id']
# Create directories for each dog
dog_dir = images_dir / f"dog_{dog_id}"
dog_dir.mkdir(exist_ok=True)
# Subdirectories for body parts
for part in ['full', 'head', 'torso', 'rear']:
part_dir = dog_dir / part
part_dir.mkdir(exist_ok=True)
# Get full images
images = self.get_dog_images(dog_id, validated_only=validated_only)
for idx, img_data in enumerate(images):
# Save full image
full_path = dog_dir / 'full' / f"img_{idx:04d}.jpg"
cv2.imwrite(str(full_path), img_data['image'])
# Get and save body parts for this image
parts = self.get_body_parts(dog_id, validated_only=validated_only)
part_paths = {}
for part_data in parts:
if part_data['image_id'] == img_data['image_id']:
part_type = part_data['part_type']
part_path = dog_dir / part_type / f"img_{idx:04d}.jpg"
cv2.imwrite(str(part_path), part_data['image'])
part_paths[part_type] = str(part_path.relative_to(output_path))
# Add to dataset
dataset_entry = {
'dog_id': dog_id,
'full_image': str(full_path.relative_to(output_path)),
'bbox': img_data['bbox'],
'confidence': img_data['confidence']
}
# Add body part paths if available
for part_type in ['head', 'torso', 'rear']:
dataset_entry[f'{part_type}_image'] = part_paths.get(part_type, None)
dataset.append(dataset_entry)
# Save dataset info
dataset_df = pd.DataFrame(dataset)
dataset_df.to_csv(output_path / "dataset.csv", index=False)
# Save metadata
metadata = {
'total_dogs': len(dogs),
'total_images': len(dataset),
'export_date': datetime.now().isoformat(),
'validated_only': validated_only,
'includes_body_parts': True
}
with open(output_path / "metadata.json", 'w') as f:
json.dump(metadata, f, indent=2)
# Create training splits
from sklearn.model_selection import train_test_split
train_df, test_df = train_test_split(dataset_df, test_size=0.2,
stratify=dataset_df['dog_id'])
train_df.to_csv(output_path / "train.csv", index=False)
test_df.to_csv(output_path / "test.csv", index=False)
metadata['train_samples'] = len(train_df)
metadata['test_samples'] = len(test_df)
return metadata
# ========== Cleanup Methods ==========
def reset_database(self, confirm: bool = False):
"""Reset entire database"""
if not confirm:
return False
tables = ['sightings', 'dog_images', 'dog_features', 'dogs', 'sessions']
for table in tables:
self.cursor.execute(f"DELETE FROM {table}")
# Reset autoincrement
self.cursor.execute("DELETE FROM sqlite_sequence")
self.conn.commit()
return True
def vacuum(self):
"""Optimize database file size"""
self.conn.execute("VACUUM")
def close(self):
"""Close database connection"""
self.conn.close()
def __del__(self):
"""Ensure connection is closed"""
if hasattr(self, 'conn'):
self.conn.close()