"""
Parallel Document Processing Utilities
========================================

Optimized document processing for faster Epic 2 system initialization.
"""

import logging
from pathlib import Path
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import threading

logger = logging.getLogger(__name__)


class ParallelDocumentProcessor:
    """Parallel document processor for faster system initialization"""
    
    def __init__(self, system, max_workers: int = 2):
        """
        Initialize parallel processor
        
        Args:
            system: PlatformOrchestrator instance
            max_workers: Maximum number of parallel workers (defaults to 2 for stability)
        """
        self.system = system
        self.max_workers = max_workers
        self.lock = threading.Lock()  # Thread safety for system operations
    
    def process_documents_batched(self, pdf_files: List[Path], batch_size: int = 10) -> Dict[str, int]:
        """
        Process documents in batches for better performance and memory management
        
        Args:
            pdf_files: List of PDF file paths
            batch_size: Number of documents to process in each batch
            
        Returns:
            Dictionary mapping file paths to chunk counts
        """
        logger.info(f"Processing {len(pdf_files)} documents in batches of {batch_size}")
        
        results = {}
        
        # Process documents in batches to avoid memory issues
        for i in range(0, len(pdf_files), batch_size):
            batch = pdf_files[i:i + batch_size]
            logger.info(f"Processing batch {i//batch_size + 1}/{(len(pdf_files) + batch_size - 1)//batch_size}: {len(batch)} files")
            
            # Process batch sequentially for stability
            batch_results = self.system.process_documents(batch)
            results.update(batch_results)
            
            # Brief pause between batches to avoid overwhelming the system
            time.sleep(0.1)
        
        total_chunks = sum(results.values())
        logger.info(f"Batch processing complete: {total_chunks} chunks from {len(pdf_files)} files")
        
        return results
    
    def process_documents_parallel(self, pdf_files: List[Path]) -> Dict[str, int]:
        """
        Process documents in parallel for faster initialization
        
        Args:
            pdf_files: List of PDF file paths
            
        Returns:
            Dictionary mapping file paths to chunk counts
        """
        logger.info(f"Processing {len(pdf_files)} documents with {self.max_workers} parallel workers")
        
        results = {}
        failed_files = []
        
        # Use ThreadPoolExecutor with timeout for I/O-bound operations
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all document processing tasks
            future_to_file = {
                executor.submit(self._process_single_document, pdf_file): pdf_file 
                for pdf_file in pdf_files
            }
            
            # Collect results as they complete with timeout
            completed = 0
            for future in as_completed(future_to_file, timeout=600):  # 10 minute timeout for the full document set
                pdf_file = future_to_file[future]
                completed += 1
                
                try:
                    chunk_count = future.result()  # already completed; re-raises any worker exception
                    results[str(pdf_file)] = chunk_count
                    logger.info(f"βœ… Processed {pdf_file.name}: {chunk_count} chunks ({completed}/{len(pdf_files)})")
                except Exception as e:
                    logger.error(f"❌ Failed to process {pdf_file}: {e}")
                    failed_files.append(str(pdf_file))
                    results[str(pdf_file)] = 0
                
                # Progress logging every 5 files for better feedback
                if completed % 5 == 0:
                    logger.info(f"πŸ“Š Progress: {completed}/{len(pdf_files)} documents processed")
        
        if failed_files:
            logger.warning(f"Failed to process {len(failed_files)} files")
        
        return results
    
    def _process_single_document(self, pdf_file: Path) -> int:
        """
        Process a single document with thread safety
        
        Args:
            pdf_file: Path to PDF file
            
        Returns:
            Number of chunks created
        """
        try:
            # Process document without indexing first (to avoid FAISS thread conflicts)
            logger.debug(f"πŸ”„ Starting processing: {pdf_file.name}")
            
            # Get document processor and embedder directly
            doc_processor = self.system.get_component('document_processor')
            embedder = self.system.get_component('embedder')
            
            # Process document to get chunks (thread-safe)
            documents = doc_processor.process(pdf_file)
            
            # Generate embeddings for chunks (thread-safe)
            texts_to_embed = []
            docs_needing_embedding = []
            
            for doc in documents:
                if not hasattr(doc, 'embedding') or doc.embedding is None:
                    texts_to_embed.append(doc.content)
                    docs_needing_embedding.append(doc)
            
            # Batch embed all texts that need embeddings
            if texts_to_embed:
                embeddings = embedder.embed(texts_to_embed)
                for doc, embedding in zip(docs_needing_embedding, embeddings):
                    doc.embedding = embedding
            
            # Count the chunks produced for this document
            chunk_count = len(documents)
            
            # Index documents under the lock to serialize FAISS access across worker threads
            with self.lock:
                retriever = self.system.get_component('retriever')
                retriever.index_documents(documents)
            
            logger.debug(f"βœ… Completed processing: {pdf_file.name} ({chunk_count} chunks)")
            return chunk_count
            
        except Exception as e:
            logger.error(f"❌ Error processing {pdf_file}: {e}")
            raise


def create_optimized_batch_processor(pdf_files: List[Path], batch_size: int = 16) -> List[List[Path]]:
    """
    Split PDF files into size-balanced batches (largest first) for document processing
    
    Args:
        pdf_files: List of PDF files
        batch_size: Size of each batch
        
    Returns:
        List of batches (each batch is a list of file paths)
    """
    # Sort files by size for better load balancing
    try:
        pdf_files_with_size = [(f, f.stat().st_size) for f in pdf_files if f.exists()]
        pdf_files_with_size.sort(key=lambda x: x[1], reverse=True)  # Largest first
        sorted_files = [f for f, _ in pdf_files_with_size]
    except OSError:
        sorted_files = pdf_files
    
    # Create batches
    batches = []
    for i in range(0, len(sorted_files), batch_size):
        batch = sorted_files[i:i + batch_size]
        batches.append(batch)
    
    return batches
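

# A minimal usage sketch (assumptions noted inline) showing how the helpers above might be
# combined: size-balanced batches from create_optimized_batch_processor() fed to
# ParallelDocumentProcessor.process_documents_parallel(). The `system` object, its import
# path, the config file, and the corpus directory are hypothetical; this module only assumes
# that `system` exposes the process_documents() and get_component() methods used above.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    corpus_dir = Path("data/pdfs")  # hypothetical corpus location
    pdf_files = sorted(corpus_dir.glob("*.pdf"))
    logger.info("Found %d PDF files in %s", len(pdf_files), corpus_dir)

    # from src.core.platform_orchestrator import PlatformOrchestrator  # hypothetical import path
    # system = PlatformOrchestrator("config/default.yaml")
    # processor = ParallelDocumentProcessor(system, max_workers=2)
    #
    # for batch in create_optimized_batch_processor(pdf_files, batch_size=16):
    #     chunk_counts = processor.process_documents_parallel(batch)
    #     logger.info("Batch complete: %d chunks indexed", sum(chunk_counts.values()))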