Spaces:
Runtime error
Runtime error
import sqlite3 | |
import base64 | |
import uuid | |
from pathlib import Path | |
from typing import Optional, List, Dict, Tuple | |
from PIL import Image | |
import io | |
import hashlib | |
class ImageDatabase: | |
"""SQLite database for storing images and metadata""" | |
def __init__(self, db_path: str = "images.db"): | |
self.db_path = db_path | |
self.init_database() | |
def init_database(self): | |
"""Initialize the database with required tables""" | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
# Create images table | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS images ( | |
id TEXT PRIMARY KEY, | |
file_hash TEXT UNIQUE NOT NULL, | |
original_path TEXT NOT NULL, | |
filename TEXT NOT NULL, | |
file_extension TEXT NOT NULL, | |
file_size INTEGER NOT NULL, | |
width INTEGER NOT NULL, | |
height INTEGER NOT NULL, | |
image_data BLOB NOT NULL, | |
thumbnail_data BLOB, | |
root_folder TEXT NOT NULL, | |
relative_path TEXT NOT NULL, | |
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
) | |
''') | |
# Create indexes for better performance | |
cursor.execute('CREATE INDEX IF NOT EXISTS idx_file_hash ON images(file_hash)') | |
cursor.execute('CREATE INDEX IF NOT EXISTS idx_root_folder ON images(root_folder)') | |
cursor.execute('CREATE INDEX IF NOT EXISTS idx_relative_path ON images(relative_path)') | |
cursor.execute('CREATE INDEX IF NOT EXISTS idx_filename ON images(filename)') | |
conn.commit() | |
conn.close() | |
def _calculate_file_hash(self, image_data: bytes) -> str: | |
"""Calculate SHA-256 hash of image data""" | |
return hashlib.sha256(image_data).hexdigest() | |
def _create_thumbnail(self, image: Image.Image, size: Tuple[int, int] = (200, 200)) -> bytes: | |
"""Create a thumbnail of the image""" | |
# Create a copy to avoid modifying original | |
thumbnail = image.copy() | |
thumbnail.thumbnail(size, Image.Resampling.LANCZOS) | |
# Convert to bytes | |
img_byte_arr = io.BytesIO() | |
# Save as JPEG for thumbnails to reduce size | |
if thumbnail.mode in ('RGBA', 'LA', 'P'): | |
thumbnail = thumbnail.convert('RGB') | |
thumbnail.save(img_byte_arr, format='JPEG', quality=85, optimize=True) | |
return img_byte_arr.getvalue() | |
def store_image(self, image_path: Path, root_folder: Path) -> Optional[str]: | |
""" | |
Store an image in the database | |
Returns the image ID if successful, None if failed | |
""" | |
try: | |
# Load the image | |
with Image.open(image_path) as image: | |
# Convert to RGB if needed | |
if image.mode in ('RGBA', 'LA', 'P'): | |
image = image.convert('RGB') | |
# Get image data as bytes | |
img_byte_arr = io.BytesIO() | |
image.save(img_byte_arr, format='JPEG', quality=95, optimize=True) | |
image_data = img_byte_arr.getvalue() | |
# Calculate file hash | |
file_hash = self._calculate_file_hash(image_data) | |
# Create thumbnail | |
thumbnail_data = self._create_thumbnail(image) | |
# Calculate relative path | |
relative_path = str(image_path.relative_to(root_folder)) | |
# Prepare metadata | |
image_id = str(uuid.uuid4()) | |
filename = image_path.name | |
file_extension = image_path.suffix.lower() | |
file_size = len(image_data) | |
width, height = image.size | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
# Check if image already exists (by hash) | |
cursor.execute('SELECT id FROM images WHERE file_hash = ?', (file_hash,)) | |
existing = cursor.fetchone() | |
if existing: | |
print(f"Image already exists in database: {filename}") | |
conn.close() | |
return existing[0] | |
# Insert new image | |
cursor.execute(''' | |
INSERT INTO images ( | |
id, file_hash, original_path, filename, file_extension, | |
file_size, width, height, image_data, thumbnail_data, | |
root_folder, relative_path | |
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
''', ( | |
image_id, file_hash, str(image_path.absolute()), filename, | |
file_extension, file_size, width, height, image_data, | |
thumbnail_data, str(root_folder.absolute()), relative_path | |
)) | |
conn.commit() | |
conn.close() | |
print(f"Stored image in database: {filename} (ID: {image_id})") | |
return image_id | |
except Exception as e: | |
print(f"Error storing image {image_path}: {e}") | |
return None | |
def get_image(self, image_id: str) -> Optional[Dict]: | |
"""Get an image by ID""" | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
cursor.execute(''' | |
SELECT id, filename, file_extension, file_size, width, height, | |
image_data, root_folder, relative_path, created_at | |
FROM images WHERE id = ? | |
''', (image_id,)) | |
result = cursor.fetchone() | |
conn.close() | |
if result: | |
return { | |
'id': result[0], | |
'filename': result[1], | |
'file_extension': result[2], | |
'file_size': result[3], | |
'width': result[4], | |
'height': result[5], | |
'image_data': result[6], | |
'root_folder': result[7], | |
'relative_path': result[8], | |
'created_at': result[9] | |
} | |
return None | |
def get_thumbnail(self, image_id: str) -> Optional[bytes]: | |
"""Get thumbnail data for an image""" | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
cursor.execute('SELECT thumbnail_data FROM images WHERE id = ?', (image_id,)) | |
result = cursor.fetchone() | |
conn.close() | |
return result[0] if result else None | |
def get_images_by_folder(self, root_folder: str) -> List[Dict]: | |
"""Get all images from a specific folder""" | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
cursor.execute(''' | |
SELECT id, filename, file_extension, file_size, width, height, | |
root_folder, relative_path, created_at | |
FROM images WHERE root_folder = ? | |
ORDER BY created_at DESC | |
''', (root_folder,)) | |
results = cursor.fetchall() | |
conn.close() | |
return [ | |
{ | |
'id': row[0], | |
'filename': row[1], | |
'file_extension': row[2], | |
'file_size': row[3], | |
'width': row[4], | |
'height': row[5], | |
'root_folder': row[6], | |
'relative_path': row[7], | |
'created_at': row[8] | |
} | |
for row in results | |
] | |
def get_all_images(self) -> List[Dict]: | |
"""Get all images from the database""" | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
cursor.execute(''' | |
SELECT id, filename, file_extension, file_size, width, height, | |
root_folder, relative_path, created_at | |
FROM images | |
ORDER BY created_at DESC | |
''') | |
results = cursor.fetchall() | |
conn.close() | |
return [ | |
{ | |
'id': row[0], | |
'filename': row[1], | |
'file_extension': row[2], | |
'file_size': row[3], | |
'width': row[4], | |
'height': row[5], | |
'root_folder': row[6], | |
'relative_path': row[7], | |
'created_at': row[8] | |
} | |
for row in results | |
] | |
def delete_image(self, image_id: str) -> bool: | |
"""Delete an image from the database""" | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
cursor.execute('DELETE FROM images WHERE id = ?', (image_id,)) | |
deleted = cursor.rowcount > 0 | |
conn.commit() | |
conn.close() | |
return deleted | |
def delete_images_by_folder(self, root_folder: str) -> int: | |
"""Delete all images from a specific folder""" | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
cursor.execute('DELETE FROM images WHERE root_folder = ?', (root_folder,)) | |
deleted_count = cursor.rowcount | |
conn.commit() | |
conn.close() | |
return deleted_count | |
def image_exists_by_path(self, relative_path: str, root_folder: str) -> Optional[str]: | |
"""Check if an image exists by its path, return image ID if exists""" | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
cursor.execute(''' | |
SELECT id FROM images | |
WHERE relative_path = ? AND root_folder = ? | |
''', (relative_path, root_folder)) | |
result = cursor.fetchone() | |
conn.close() | |
return result[0] if result else None | |
def get_database_stats(self) -> Dict: | |
"""Get database statistics""" | |
conn = sqlite3.connect(self.db_path) | |
cursor = conn.cursor() | |
# Total images | |
cursor.execute('SELECT COUNT(*) FROM images') | |
total_images = cursor.fetchone()[0] | |
# Total size | |
cursor.execute('SELECT SUM(file_size) FROM images') | |
total_size = cursor.fetchone()[0] or 0 | |
# Images by folder | |
cursor.execute('SELECT root_folder, COUNT(*) FROM images GROUP BY root_folder') | |
folders = cursor.fetchall() | |
conn.close() | |
return { | |
'total_images': total_images, | |
'total_size_bytes': total_size, | |
'total_size_mb': round(total_size / (1024 * 1024), 2), | |
'folders': {folder: count for folder, count in folders} | |
} | |