"""
Migration Utilities for Epic 2 Demo Database
============================================

Utilities to migrate existing pickle-based cache to persistent database
and handle data migrations between versions.
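
Typical usage from code that imports this module (an illustrative sketch; the
file paths and config values below are placeholders, not project defaults):

    from pathlib import Path

    pdfs = [Path("data/example.pdf")]
    if migrate_existing_cache(pdfs, processor_config={}, embedder_config={}):
        upgrade_database()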
"""

import logging
import time
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
import numpy as np

from .database_manager import DatabaseManager, get_database_manager
from .knowledge_cache import KnowledgeCache

logger = logging.getLogger(__name__)


class CacheMigrator:
    """Handles migration from pickle cache to database"""
    
    def __init__(self, db_manager: Optional[DatabaseManager] = None):
        """
        Initialize cache migrator
        
        Args:
            db_manager: Database manager instance (creates default if None)
        """
        self.db_manager = db_manager or get_database_manager()
        self.knowledge_cache = KnowledgeCache()
    
    def migrate_cache_to_database(self, pdf_files: List[Path],
                                  processor_config: Dict[str, Any],
                                  embedder_config: Dict[str, Any]) -> bool:
        """
        Migrate existing pickle cache to database
        
        Args:
            pdf_files: List of PDF files that were processed
            processor_config: Document processor configuration
            embedder_config: Embedder configuration
            
        Returns:
            True if migration successful
        """
        logger.info("Starting migration from pickle cache to database...")
        
        try:
            # Check if cache is valid and has data
            # Note: the full check, knowledge_cache.is_cache_valid(pdf_files, embedder_config),
            # relies on an embedder config hash derived from a live system object, which is not
            # available during migration, so we fall back to the simpler is_valid() check here.
            if not self.knowledge_cache.is_valid():
                logger.warning("Pickle cache is not valid or missing")
                return False
            
            # Load documents and embeddings from pickle cache
            documents, embeddings = self.knowledge_cache.load_knowledge_base()
            
            if not documents or embeddings is None:
                logger.warning("No data found in pickle cache")
                return False
            
            logger.info(f"Loaded {len(documents)} documents and {embeddings.shape} embeddings from pickle cache")
            
            # Convert documents to expected format
            converted_docs = self._convert_documents_format(documents, embeddings)
            
            logger.info(f"Converted {len(converted_docs)} documents for database save")
            
            # Save to database
            success = self.db_manager.save_documents_and_embeddings(
                converted_docs, pdf_files, processor_config, embedder_config
            )
            
            if success:
                logger.info("Migration to database completed successfully")
                
                # Create backup of pickle cache before clearing
                self._backup_pickle_cache()
                
                # Clearing the pickle cache is left as an optional follow-up; here it is only backed up
                logger.info("Migration successful - pickle cache backed up")
                return True
            else:
                logger.error("Failed to save migrated data to database")
                return False
                
        except Exception as e:
            logger.error(f"Cache migration failed: {e}")
            return False
    
    def _convert_documents_format(self, documents: List[Any], embeddings: np.ndarray) -> List[Dict[str, Any]]:
        """Convert documents from pickle format to database format"""
        converted_docs = []
        
        for i, doc in enumerate(documents):
            # Handle different document formats
            if hasattr(doc, '__dict__'):
                # Object format
                converted_doc = {
                    'content': getattr(doc, 'content', ''),
                    'metadata': getattr(doc, 'metadata', {}),
                    'confidence': getattr(doc, 'confidence', 0.8),
                    'embedding': embeddings[i] if i < len(embeddings) else None
                }
            elif isinstance(doc, dict):
                # Dictionary format
                converted_doc = {
                    'content': doc.get('content', ''),
                    'metadata': doc.get('metadata', {}),
                    'confidence': doc.get('confidence', 0.8),
                    'embedding': embeddings[i] if i < len(embeddings) else None
                }
            else:
                # String format
                converted_doc = {
                    'content': str(doc),
                    'metadata': {},
                    'confidence': 0.8,
                    'embedding': embeddings[i] if i < len(embeddings) else None
                }
            
            # Ensure metadata has required fields
            if 'metadata' not in converted_doc:
                converted_doc['metadata'] = {}
            
            # Extract source from metadata or create default
            if 'source' not in converted_doc['metadata']:
                # Try to get source from existing metadata
                if hasattr(doc, 'metadata') and isinstance(doc.metadata, dict) and 'source' in doc.metadata:
                    converted_doc['metadata']['source'] = doc.metadata['source']
                elif isinstance(doc, dict) and 'metadata' in doc and isinstance(doc['metadata'], dict) and 'source' in doc['metadata']:
                    converted_doc['metadata']['source'] = doc['metadata']['source']
                else:
                    converted_doc['metadata']['source'] = f'document_{i}.pdf'
            
            if 'page' not in converted_doc['metadata']:
                converted_doc['metadata']['page'] = 1
            
            converted_docs.append(converted_doc)
        
        logger.info(f"Converted {len(converted_docs)} documents to database format")
        return converted_docs
    
    def _backup_pickle_cache(self) -> None:
        """Create backup of pickle cache files"""
        try:
            cache_dir = self.knowledge_cache.cache_dir
            backup_dir = cache_dir / "backup"
            backup_dir.mkdir(exist_ok=True)
            
            timestamp = int(time.time())
            
            # Backup main cache files
            for cache_file in [self.knowledge_cache.documents_file, 
                             self.knowledge_cache.embeddings_file,
                             self.knowledge_cache.metadata_file]:
                if cache_file.exists():
                    backup_file = backup_dir / f"{cache_file.name}.{timestamp}.bak"
                    backup_file.write_bytes(cache_file.read_bytes())
            
            logger.info(f"Pickle cache backed up to {backup_dir}")
            
        except Exception as e:
            logger.warning(f"Failed to backup pickle cache: {e}")
    
    def verify_migration(self, pdf_files: List[Path]) -> bool:
        """
        Verify that migration was successful by comparing data
        
        Args:
            pdf_files: List of PDF files to verify
            
        Returns:
            True if migration verification successful
        """
        try:
            # Load data from database
            db_docs, db_embeddings = self.db_manager.load_documents_and_embeddings(pdf_files)
            
            if not db_docs or db_embeddings is None:
                logger.error("No data found in database after migration")
                return False
            
            # Basic checks
            if len(db_docs) == 0:
                logger.error("No documents found in database")
                return False
            
            if db_embeddings.shape[0] != len(db_docs):
                logger.error(f"Embedding count mismatch: {db_embeddings.shape[0]} vs {len(db_docs)}")
                return False
            
            # Check that embeddings are valid
            if np.isnan(db_embeddings).any():
                logger.error("Database contains invalid embeddings (NaN values)")
                return False
            
            logger.info(f"Migration verification successful: {len(db_docs)} documents, {db_embeddings.shape} embeddings")
            return True
            
        except Exception as e:
            logger.error(f"Migration verification failed: {e}")
            return False


class DatabaseUpgrader:
    """Handles database schema upgrades and version migrations"""
    
    def __init__(self, db_manager: Optional[DatabaseManager] = None):
        """
        Initialize database upgrader
        
        Args:
            db_manager: Database manager instance
        """
        self.db_manager = db_manager or get_database_manager()
    
    def get_database_version(self) -> str:
        """Get current database version"""
        try:
            with self.db_manager.get_session() as session:
                from .database_schema import SystemCache
                
                version_cache = session.query(SystemCache).filter(
                    SystemCache.cache_key == 'database_version'
                ).first()
                
                if version_cache:
                    return version_cache.cache_value.get('version', '1.0')
                else:
                    # First time setup
                    return '1.0'
                    
        except Exception as e:
            logger.warning(f"Could not get database version: {e}")
            return '1.0'
    
    def set_database_version(self, version: str) -> None:
        """Set database version"""
        try:
            with self.db_manager.get_session() as session:
                from .database_schema import SystemCache
                
                version_cache = session.query(SystemCache).filter(
                    SystemCache.cache_key == 'database_version'
                ).first()
                
                if version_cache:
                    version_cache.cache_value = {'version': version}
                    version_cache.is_valid = True
                else:
                    version_cache = SystemCache(
                        cache_key='database_version',
                        cache_type='system',
                        cache_value={'version': version},
                        cache_hash=self.db_manager._hash_config({'version': version})
                    )
                    session.add(version_cache)
                
                session.commit()
                
        except Exception as e:
            logger.error(f"Could not set database version: {e}")
    
    def upgrade_database(self) -> bool:
        """
        Upgrade database to latest version
        
        Returns:
            True if upgrade successful
        """
        current_version = self.get_database_version()
        target_version = '1.0'  # Current version
        
        logger.info(f"Database version check: current={current_version}, target={target_version}")
        
        if current_version == target_version:
            logger.info("Database is already at latest version")
            return True
        
        try:
            # Apply version-specific upgrades
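            # Note: versions are compared as plain strings, which works for single-step
            # versions like '0.9' vs '1.0' but would mis-order e.g. '1.10' vs '1.2';
            # revisit this comparison if multi-part version numbers are introduced.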
            if current_version < '1.0':
                self._upgrade_to_1_0()
            
            # Set final version
            self.set_database_version(target_version)
            logger.info(f"Database upgraded to version {target_version}")
            return True
            
        except Exception as e:
            logger.error(f"Database upgrade failed: {e}")
            return False
    
    def _upgrade_to_1_0(self) -> None:
        """Upgrade to version 1.0"""
        logger.info("Upgrading database to version 1.0...")
        
        # Version 1.0 is the initial version, so just ensure tables exist
        from .database_schema import DatabaseSchema
        DatabaseSchema.create_all_tables(self.db_manager.engine)
        
        logger.info("Database upgrade to 1.0 complete")


def migrate_existing_cache(pdf_files: List[Path], processor_config: Dict[str, Any],
                           embedder_config: Dict[str, Any]) -> bool:
    """
    High-level function to migrate existing cache to database
    
    Args:
        pdf_files: List of PDF files
        processor_config: Document processor configuration
        embedder_config: Embedder configuration
        
    Returns:
        True if migration successful
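
    Example (illustrative only; file paths and config values are placeholders):
        >>> ok = migrate_existing_cache(
        ...     [Path("data/example.pdf")],
        ...     processor_config={"chunk_size": 1024},
        ...     embedder_config={"model_name": "example-embedder"},
        ... )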
    """
    logger.info("Starting cache migration process...")
    
    try:
        # Initialize migrator
        migrator = CacheMigrator()
        
        # Attempt migration
        success = migrator.migrate_cache_to_database(pdf_files, processor_config, embedder_config)
        
        if success:
            # Verify migration
            if migrator.verify_migration(pdf_files):
                logger.info("Cache migration completed and verified successfully")
                return True
            else:
                logger.error("Migration verification failed")
                return False
        else:
            logger.error("Cache migration failed")
            return False
            
    except Exception as e:
        logger.error(f"Cache migration process failed: {e}")
        return False


def upgrade_database() -> bool:
    """
    High-level function to upgrade database to latest version
    
    Returns:
        True if upgrade successful
    """
    logger.info("Starting database upgrade process...")
    
    try:
        upgrader = DatabaseUpgrader()
        return upgrader.upgrade_database()
        
    except Exception as e:
        logger.error(f"Database upgrade process failed: {e}")
        return False


def get_migration_status() -> Dict[str, Any]:
    """
    Get status of migration and database
    
    Returns:
        Dictionary with migration status information
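
    Example of the returned shape (values illustrative; keys mirror the code below):
        {
            'database_exists': True,
            'database_version': '1.0',
            'database_stats': {...},
            'pickle_cache_exists': False,
            'pickle_cache_info': {...},
        }
    On error, a dict with a single 'error' key is returned instead.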
    """
    try:
        db_manager = get_database_manager()
        upgrader = DatabaseUpgrader(db_manager)
        knowledge_cache = KnowledgeCache()
        
        status = {
            'database_exists': db_manager.is_database_populated(),
            'database_version': upgrader.get_database_version(),
            'database_stats': db_manager.get_database_stats(),
            'pickle_cache_exists': knowledge_cache.is_valid(),
            'pickle_cache_info': knowledge_cache.get_cache_info()
        }
        
        return status
        
    except Exception as e:
        logger.error(f"Failed to get migration status: {e}")
        return {'error': str(e)}