""" | |
Migration Utilities for Epic 2 Demo Database | |
============================================ | |
Utilities to migrate existing pickle-based cache to persistent database | |
and handle data migrations between versions. | |
""" | |
import logging
import time
from pathlib import Path
from typing import Dict, Any, List, Optional

import numpy as np

from .database_manager import DatabaseManager, get_database_manager
from .knowledge_cache import KnowledgeCache

logger = logging.getLogger(__name__)


class CacheMigrator:
    """Handles migration from the pickle cache to the database."""

    def __init__(self, db_manager: Optional[DatabaseManager] = None):
        """
        Initialize the cache migrator.

        Args:
            db_manager: Database manager instance (creates the default if None)
        """
        self.db_manager = db_manager or get_database_manager()
        self.knowledge_cache = KnowledgeCache()

    def migrate_cache_to_database(self, pdf_files: List[Path],
                                  processor_config: Dict[str, Any],
                                  embedder_config: Dict[str, Any]) -> bool:
        """
        Migrate the existing pickle cache to the database.

        Args:
            pdf_files: List of PDF files that were processed
            processor_config: Document processor configuration
            embedder_config: Embedder configuration

        Returns:
            True if the migration succeeded
        """
        logger.info("Starting migration from pickle cache to database...")
        try:
            # Check that the cache is valid and has data. Full validation via
            # KnowledgeCache.is_cache_valid(pdf_files, embedder_config) relies
            # on create_embedder_config_hash, which expects a system object
            # that is unavailable here, so migration uses the simplified
            # is_valid() check instead.
            if not self.knowledge_cache.is_valid():
                logger.warning("Pickle cache is not valid or missing")
                return False

            # Load documents and embeddings from the pickle cache
            documents, embeddings = self.knowledge_cache.load_knowledge_base()
            if not documents or embeddings is None:
                logger.warning("No data found in pickle cache")
                return False

            logger.info(f"Loaded {len(documents)} documents and {embeddings.shape} embeddings from pickle cache")

            # Convert documents to the format expected by the database layer
            converted_docs = self._convert_documents_format(documents, embeddings)
            logger.info(f"Converted {len(converted_docs)} documents for database save")

            # Save to the database
            success = self.db_manager.save_documents_and_embeddings(
                converted_docs, pdf_files, processor_config, embedder_config
            )

            if success:
                logger.info("Migration to database completed successfully")
                # Back up the pickle cache before it is (optionally) cleared
                self._backup_pickle_cache()
                logger.info("Migration successful - pickle cache backed up")
                return True
            else:
                logger.error("Failed to save migrated data to database")
                return False
        except Exception as e:
            logger.error(f"Cache migration failed: {e}")
            return False

    def _convert_documents_format(self, documents: List[Any], embeddings: np.ndarray) -> List[Dict[str, Any]]:
        """Convert documents from the pickle format to the database format."""
        converted_docs = []
        for i, doc in enumerate(documents):
            # Handle the different document formats found in older caches
            if hasattr(doc, '__dict__'):
                # Object format
                converted_doc = {
                    'content': getattr(doc, 'content', ''),
                    'metadata': getattr(doc, 'metadata', {}),
                    'confidence': getattr(doc, 'confidence', 0.8),
                    'embedding': embeddings[i] if i < len(embeddings) else None
                }
            elif isinstance(doc, dict):
                # Dictionary format
                converted_doc = {
                    'content': doc.get('content', ''),
                    'metadata': doc.get('metadata', {}),
                    'confidence': doc.get('confidence', 0.8),
                    'embedding': embeddings[i] if i < len(embeddings) else None
                }
            else:
                # Plain-string format
                converted_doc = {
                    'content': str(doc),
                    'metadata': {},
                    'confidence': 0.8,
                    'embedding': embeddings[i] if i < len(embeddings) else None
                }

            # Ensure metadata is a dict with the required fields (the source
            # cache may carry a None or non-dict metadata attribute)
            if not isinstance(converted_doc.get('metadata'), dict):
                converted_doc['metadata'] = {}

            # Extract the source from existing metadata or create a default
            if 'source' not in converted_doc['metadata']:
                if hasattr(doc, 'metadata') and isinstance(doc.metadata, dict) and 'source' in doc.metadata:
                    converted_doc['metadata']['source'] = doc.metadata['source']
                elif isinstance(doc, dict) and isinstance(doc.get('metadata'), dict) and 'source' in doc['metadata']:
                    converted_doc['metadata']['source'] = doc['metadata']['source']
                else:
                    converted_doc['metadata']['source'] = f'document_{i}.pdf'

            if 'page' not in converted_doc['metadata']:
                converted_doc['metadata']['page'] = 1

            converted_docs.append(converted_doc)

        logger.info(f"Converted {len(converted_docs)} documents to database format")
        return converted_docs

    def _backup_pickle_cache(self) -> None:
        """Create a timestamped backup of the pickle cache files."""
        try:
            cache_dir = self.knowledge_cache.cache_dir
            backup_dir = cache_dir / "backup"
            backup_dir.mkdir(exist_ok=True)

            timestamp = int(time.time())
            # Back up the main cache files
            for cache_file in [self.knowledge_cache.documents_file,
                               self.knowledge_cache.embeddings_file,
                               self.knowledge_cache.metadata_file]:
                if cache_file.exists():
                    backup_file = backup_dir / f"{cache_file.name}.{timestamp}.bak"
                    backup_file.write_bytes(cache_file.read_bytes())

            logger.info(f"Pickle cache backed up to {backup_dir}")
        except Exception as e:
            logger.warning(f"Failed to backup pickle cache: {e}")

    def verify_migration(self, pdf_files: List[Path]) -> bool:
        """
        Verify that the migration succeeded by checking the migrated data.

        Args:
            pdf_files: List of PDF files to verify

        Returns:
            True if the verification passed
        """
        try:
            # Load data back from the database
            db_docs, db_embeddings = self.db_manager.load_documents_and_embeddings(pdf_files)
            if not db_docs or db_embeddings is None:
                logger.error("No data found in database after migration")
                return False

            # Basic consistency checks
            if len(db_docs) == 0:
                logger.error("No documents found in database")
                return False

            if db_embeddings.shape[0] != len(db_docs):
                logger.error(f"Embedding count mismatch: {db_embeddings.shape[0]} vs {len(db_docs)}")
                return False

            # Check that the embeddings contain no NaN values
            if np.isnan(db_embeddings).any():
                logger.error("Database contains invalid embeddings (NaN values)")
                return False

            logger.info(f"Migration verification successful: {len(db_docs)} documents, {db_embeddings.shape} embeddings")
            return True
        except Exception as e:
            logger.error(f"Migration verification failed: {e}")
            return False


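# Example usage of CacheMigrator (a minimal sketch; the directory, glob
# pattern, and config dicts below are illustrative placeholders, not the
# project's real values):
#
#     migrator = CacheMigrator()
#     pdfs = sorted(Path("data/pdfs").glob("*.pdf"))
#     if migrator.migrate_cache_to_database(pdfs, {"chunk_size": 512}, {"model": "demo"}):
#         migrator.verify_migration(pdfs)

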
class DatabaseUpgrader:
    """Handles database schema upgrades and version migrations."""

    def __init__(self, db_manager: Optional[DatabaseManager] = None):
        """
        Initialize the database upgrader.

        Args:
            db_manager: Database manager instance (creates the default if None)
        """
        self.db_manager = db_manager or get_database_manager()

    def get_database_version(self) -> str:
        """Get the current database version."""
        try:
            with self.db_manager.get_session() as session:
                from .database_schema import SystemCache
                version_cache = session.query(SystemCache).filter(
                    SystemCache.cache_key == 'database_version'
                ).first()
                if version_cache:
                    return version_cache.cache_value.get('version', '1.0')
                # No version record yet: first-time setup
                return '1.0'
        except Exception as e:
            logger.warning(f"Could not get database version: {e}")
            return '1.0'

    def set_database_version(self, version: str) -> None:
        """Set the database version."""
        try:
            with self.db_manager.get_session() as session:
                from .database_schema import SystemCache
                version_cache = session.query(SystemCache).filter(
                    SystemCache.cache_key == 'database_version'
                ).first()
                if version_cache:
                    version_cache.cache_value = {'version': version}
                    version_cache.is_valid = True
                else:
                    version_cache = SystemCache(
                        cache_key='database_version',
                        cache_type='system',
                        cache_value={'version': version},
                        cache_hash=self.db_manager._hash_config({'version': version})
                    )
                    session.add(version_cache)
                session.commit()
        except Exception as e:
            logger.error(f"Could not set database version: {e}")

    def upgrade_database(self) -> bool:
        """
        Upgrade the database to the latest version.

        Returns:
            True if the upgrade succeeded
        """
        current_version = self.get_database_version()
        target_version = '1.0'  # Latest schema version

        logger.info(f"Database version check: current={current_version}, target={target_version}")
        if current_version == target_version:
            logger.info("Database is already at latest version")
            return True

        try:
            # Apply version-specific upgrades. Versions are compared as
            # numeric tuples rather than strings so that, e.g., '10.0' does
            # not sort below '2.0'.
            if tuple(int(p) for p in current_version.split('.')) < (1, 0):
                self._upgrade_to_1_0()

            # Record the final version
            self.set_database_version(target_version)
            logger.info(f"Database upgraded to version {target_version}")
            return True
        except Exception as e:
            logger.error(f"Database upgrade failed: {e}")
            return False

    def _upgrade_to_1_0(self) -> None:
        """Upgrade to version 1.0."""
        logger.info("Upgrading database to version 1.0...")
        # Version 1.0 is the initial schema, so just ensure the tables exist
        from .database_schema import DatabaseSchema
        DatabaseSchema.create_all_tables(self.db_manager.engine)
        logger.info("Database upgrade to 1.0 complete")


def migrate_existing_cache(pdf_files: List[Path], processor_config: Dict[str, Any],
                           embedder_config: Dict[str, Any]) -> bool:
    """
    High-level function to migrate the existing cache to the database.

    Args:
        pdf_files: List of PDF files
        processor_config: Document processor configuration
        embedder_config: Embedder configuration

    Returns:
        True if the migration succeeded and was verified
    """
    logger.info("Starting cache migration process...")
    try:
        migrator = CacheMigrator()

        # Attempt the migration
        if not migrator.migrate_cache_to_database(pdf_files, processor_config, embedder_config):
            logger.error("Cache migration failed")
            return False

        # Verify the migrated data
        if migrator.verify_migration(pdf_files):
            logger.info("Cache migration completed and verified successfully")
            return True
        logger.error("Migration verification failed")
        return False
    except Exception as e:
        logger.error(f"Cache migration process failed: {e}")
        return False


def upgrade_database() -> bool:
    """
    High-level function to upgrade the database to the latest version.

    Returns:
        True if the upgrade succeeded
    """
    logger.info("Starting database upgrade process...")
    try:
        upgrader = DatabaseUpgrader()
        return upgrader.upgrade_database()
    except Exception as e:
        logger.error(f"Database upgrade process failed: {e}")
        return False


def get_migration_status() -> Dict[str, Any]:
    """
    Get the status of the migration and the database.

    Returns:
        Dictionary with migration status information
    """
    try:
        db_manager = get_database_manager()
        upgrader = DatabaseUpgrader(db_manager)
        knowledge_cache = KnowledgeCache()

        return {
            'database_exists': db_manager.is_database_populated(),
            'database_version': upgrader.get_database_version(),
            'database_stats': db_manager.get_database_stats(),
            'pickle_cache_exists': knowledge_cache.is_valid(),
            'pickle_cache_info': knowledge_cache.get_cache_info()
        }
    except Exception as e:
        logger.error(f"Failed to get migration status: {e}")
        return {'error': str(e)}
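

if __name__ == "__main__":
    # Minimal end-to-end sketch. The PDF directory and both config dicts are
    # illustrative placeholders and must match whatever was used to build the
    # original pickle cache; swap in the project's real settings before
    # running. Because this module uses relative imports, run it as a module
    # (python -m <package>.migration_utils, package name depending on layout).
    logging.basicConfig(level=logging.INFO)

    demo_pdfs = sorted(Path("data").glob("*.pdf"))                    # placeholder path
    demo_processor_config = {"chunk_size": 512, "chunk_overlap": 64}  # placeholder config
    demo_embedder_config = {"model": "example-embedder", "dim": 384}  # placeholder config

    if upgrade_database() and migrate_existing_cache(demo_pdfs, demo_processor_config, demo_embedder_config):
        print(get_migration_status())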