Visual-Image / image_database.py
VesperAI's picture
addede a Production Branch
60444f3
import sqlite3
import base64
import uuid
from pathlib import Path
from typing import Optional, List, Dict, Tuple
from PIL import Image
import io
import hashlib
class ImageDatabase:
"""SQLite database for storing images and metadata"""
def __init__(self, db_path: str = "images.db"):
self.db_path = db_path
self.init_database()
def init_database(self):
"""Initialize the database with required tables"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create images table
cursor.execute('''
CREATE TABLE IF NOT EXISTS images (
id TEXT PRIMARY KEY,
file_hash TEXT UNIQUE NOT NULL,
original_path TEXT NOT NULL,
filename TEXT NOT NULL,
file_extension TEXT NOT NULL,
file_size INTEGER NOT NULL,
width INTEGER NOT NULL,
height INTEGER NOT NULL,
image_data BLOB NOT NULL,
thumbnail_data BLOB,
root_folder TEXT NOT NULL,
relative_path TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create indexes for better performance
cursor.execute('CREATE INDEX IF NOT EXISTS idx_file_hash ON images(file_hash)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_root_folder ON images(root_folder)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_relative_path ON images(relative_path)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_filename ON images(filename)')
conn.commit()
conn.close()
def _calculate_file_hash(self, image_data: bytes) -> str:
"""Calculate SHA-256 hash of image data"""
return hashlib.sha256(image_data).hexdigest()
def _create_thumbnail(self, image: Image.Image, size: Tuple[int, int] = (200, 200)) -> bytes:
"""Create a thumbnail of the image"""
# Create a copy to avoid modifying original
thumbnail = image.copy()
thumbnail.thumbnail(size, Image.Resampling.LANCZOS)
# Convert to bytes
img_byte_arr = io.BytesIO()
# Save as JPEG for thumbnails to reduce size
if thumbnail.mode in ('RGBA', 'LA', 'P'):
thumbnail = thumbnail.convert('RGB')
thumbnail.save(img_byte_arr, format='JPEG', quality=85, optimize=True)
return img_byte_arr.getvalue()
def store_image(self, image_path: Path, root_folder: Path) -> Optional[str]:
"""
Store an image in the database
Returns the image ID if successful, None if failed
"""
try:
# Load the image
with Image.open(image_path) as image:
# Convert to RGB if needed
if image.mode in ('RGBA', 'LA', 'P'):
image = image.convert('RGB')
# Get image data as bytes
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='JPEG', quality=95, optimize=True)
image_data = img_byte_arr.getvalue()
# Calculate file hash
file_hash = self._calculate_file_hash(image_data)
# Create thumbnail
thumbnail_data = self._create_thumbnail(image)
# Calculate relative path
relative_path = str(image_path.relative_to(root_folder))
# Prepare metadata
image_id = str(uuid.uuid4())
filename = image_path.name
file_extension = image_path.suffix.lower()
file_size = len(image_data)
width, height = image.size
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Check if image already exists (by hash)
cursor.execute('SELECT id FROM images WHERE file_hash = ?', (file_hash,))
existing = cursor.fetchone()
if existing:
print(f"Image already exists in database: {filename}")
conn.close()
return existing[0]
# Insert new image
cursor.execute('''
INSERT INTO images (
id, file_hash, original_path, filename, file_extension,
file_size, width, height, image_data, thumbnail_data,
root_folder, relative_path
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
image_id, file_hash, str(image_path.absolute()), filename,
file_extension, file_size, width, height, image_data,
thumbnail_data, str(root_folder.absolute()), relative_path
))
conn.commit()
conn.close()
print(f"Stored image in database: {filename} (ID: {image_id})")
return image_id
except Exception as e:
print(f"Error storing image {image_path}: {e}")
return None
def get_image(self, image_id: str) -> Optional[Dict]:
"""Get an image by ID"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT id, filename, file_extension, file_size, width, height,
image_data, root_folder, relative_path, created_at
FROM images WHERE id = ?
''', (image_id,))
result = cursor.fetchone()
conn.close()
if result:
return {
'id': result[0],
'filename': result[1],
'file_extension': result[2],
'file_size': result[3],
'width': result[4],
'height': result[5],
'image_data': result[6],
'root_folder': result[7],
'relative_path': result[8],
'created_at': result[9]
}
return None
def get_thumbnail(self, image_id: str) -> Optional[bytes]:
"""Get thumbnail data for an image"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('SELECT thumbnail_data FROM images WHERE id = ?', (image_id,))
result = cursor.fetchone()
conn.close()
return result[0] if result else None
def get_images_by_folder(self, root_folder: str) -> List[Dict]:
"""Get all images from a specific folder"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT id, filename, file_extension, file_size, width, height,
root_folder, relative_path, created_at
FROM images WHERE root_folder = ?
ORDER BY created_at DESC
''', (root_folder,))
results = cursor.fetchall()
conn.close()
return [
{
'id': row[0],
'filename': row[1],
'file_extension': row[2],
'file_size': row[3],
'width': row[4],
'height': row[5],
'root_folder': row[6],
'relative_path': row[7],
'created_at': row[8]
}
for row in results
]
def get_all_images(self) -> List[Dict]:
"""Get all images from the database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT id, filename, file_extension, file_size, width, height,
root_folder, relative_path, created_at
FROM images
ORDER BY created_at DESC
''')
results = cursor.fetchall()
conn.close()
return [
{
'id': row[0],
'filename': row[1],
'file_extension': row[2],
'file_size': row[3],
'width': row[4],
'height': row[5],
'root_folder': row[6],
'relative_path': row[7],
'created_at': row[8]
}
for row in results
]
def delete_image(self, image_id: str) -> bool:
"""Delete an image from the database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('DELETE FROM images WHERE id = ?', (image_id,))
deleted = cursor.rowcount > 0
conn.commit()
conn.close()
return deleted
def delete_images_by_folder(self, root_folder: str) -> int:
"""Delete all images from a specific folder"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('DELETE FROM images WHERE root_folder = ?', (root_folder,))
deleted_count = cursor.rowcount
conn.commit()
conn.close()
return deleted_count
def image_exists_by_path(self, relative_path: str, root_folder: str) -> Optional[str]:
"""Check if an image exists by its path, return image ID if exists"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT id FROM images
WHERE relative_path = ? AND root_folder = ?
''', (relative_path, root_folder))
result = cursor.fetchone()
conn.close()
return result[0] if result else None
def get_database_stats(self) -> Dict:
"""Get database statistics"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Total images
cursor.execute('SELECT COUNT(*) FROM images')
total_images = cursor.fetchone()[0]
# Total size
cursor.execute('SELECT SUM(file_size) FROM images')
total_size = cursor.fetchone()[0] or 0
# Images by folder
cursor.execute('SELECT root_folder, COUNT(*) FROM images GROUP BY root_folder')
folders = cursor.fetchall()
conn.close()
return {
'total_images': total_images,
'total_size_bytes': total_size,
'total_size_mb': round(total_size / (1024 * 1024), 2),
'folders': {folder: count for folder, count in folders}
}