"""
Parallel Document Processing Utilities
======================================

Optimized document processing for faster Epic 2 system initialization.
"""

import logging
from pathlib import Path
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import threading

logger = logging.getLogger(__name__)


class ParallelDocumentProcessor:
    """Parallel document processor for faster system initialization"""

    def __init__(self, system, max_workers: int = 2):
        """
        Initialize parallel processor

        Args:
            system: PlatformOrchestrator instance
            max_workers: Maximum number of parallel workers (reduced to 2 for stability)
        """
        self.system = system
        self.max_workers = max_workers
        self.lock = threading.Lock()  # Thread safety for system operations

    def process_documents_batched(self, pdf_files: List[Path], batch_size: int = 10) -> Dict[str, int]:
        """
        Process documents in batches for better performance and memory management

        Args:
            pdf_files: List of PDF file paths
            batch_size: Number of documents to process in each batch

        Returns:
            Dictionary mapping file paths to chunk counts
        """
        logger.info(f"Processing {len(pdf_files)} documents in batches of {batch_size}")

        results = {}
        # Process documents in batches to avoid memory issues
        for i in range(0, len(pdf_files), batch_size):
            batch = pdf_files[i:i + batch_size]
            logger.info(
                f"Processing batch {i // batch_size + 1}/"
                f"{(len(pdf_files) + batch_size - 1) // batch_size}: {len(batch)} files"
            )

            # Process batch sequentially for stability
            batch_results = self.system.process_documents(batch)
            results.update(batch_results)

            # Brief pause between batches to avoid overwhelming the system
            time.sleep(0.1)

        total_chunks = sum(results.values())
        logger.info(f"Batch processing complete: {total_chunks} chunks from {len(pdf_files)} files")
        return results

    def process_documents_parallel(self, pdf_files: List[Path]) -> Dict[str, int]:
        """
        Process documents in parallel for faster initialization

        Args:
            pdf_files: List of PDF file paths

        Returns:
            Dictionary mapping file paths to chunk counts
        """
        logger.info(f"Processing {len(pdf_files)} documents with {self.max_workers} parallel workers")

        results = {}
        failed_files = []

        # Use ThreadPoolExecutor with timeout for I/O-bound operations
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all document processing tasks
            future_to_file = {
                executor.submit(self._process_single_document, pdf_file): pdf_file
                for pdf_file in pdf_files
            }

            # Collect results as they complete with timeout
            completed = 0
            for future in as_completed(future_to_file, timeout=600):  # 10 minute timeout per batch
                pdf_file = future_to_file[future]
                completed += 1
                try:
                    chunk_count = future.result(timeout=120)  # 2 minute timeout per document
                    results[str(pdf_file)] = chunk_count
                    logger.info(f"Processed {pdf_file.name}: {chunk_count} chunks ({completed}/{len(pdf_files)})")
                except Exception as e:
                    logger.error(f"Failed to process {pdf_file}: {e}")
                    failed_files.append(str(pdf_file))
                    results[str(pdf_file)] = 0

                # Progress logging every 5 files for better feedback
                if completed % 5 == 0:
                    logger.info(f"Progress: {completed}/{len(pdf_files)} documents processed")

        if failed_files:
            logger.warning(f"Failed to process {len(failed_files)} files")

        return results

    def _process_single_document(self, pdf_file: Path) -> int:
        """
        Process a single document with thread safety

        Args:
            pdf_file: Path to PDF file

        Returns:
            Number of chunks created
        """
        try:
            # Process document without indexing first (to avoid FAISS thread conflicts)
            logger.debug(f"Starting processing: {pdf_file.name}")

            # Get document processor and embedder directly
            doc_processor = self.system.get_component('document_processor')
            embedder = self.system.get_component('embedder')

            # Process document to get chunks (thread-safe)
            documents = doc_processor.process(pdf_file)

            # Generate embeddings for chunks (thread-safe)
            texts_to_embed = []
            docs_needing_embedding = []
            for doc in documents:
                if not hasattr(doc, 'embedding') or doc.embedding is None:
                    texts_to_embed.append(doc.content)
                    docs_needing_embedding.append(doc)

            # Batch embed all texts that need embeddings
            if texts_to_embed:
                embeddings = embedder.embed(texts_to_embed)
                for doc, embedding in zip(docs_needing_embedding, embeddings):
                    doc.embedding = embedding

            chunk_count = len(documents)

            # Index documents under the lock so FAISS is only touched by one thread at a time
            with self.lock:
                retriever = self.system.get_component('retriever')
                retriever.index_documents(documents)

            logger.debug(f"Completed processing: {pdf_file.name} ({chunk_count} chunks)")
            return chunk_count

        except Exception as e:
            logger.error(f"Error processing {pdf_file}: {e}")
            raise


def create_optimized_batch_processor(pdf_files: List[Path], batch_size: int = 16) -> List[List[Path]]:
    """
    Create optimized batches for document processing

    Args:
        pdf_files: List of PDF files
        batch_size: Size of each batch

    Returns:
        List of batches (each batch is a list of file paths)
    """
    # Sort files by size for better load balancing
    try:
        pdf_files_with_size = [(f, f.stat().st_size) for f in pdf_files if f.exists()]
        pdf_files_with_size.sort(key=lambda x: x[1], reverse=True)  # Largest first
        sorted_files = [f for f, _ in pdf_files_with_size]
    except OSError:
        # Fall back to the original order if file sizes cannot be read
        sorted_files = pdf_files

    # Create batches
    batches = []
    for i in range(0, len(sorted_files), batch_size):
        batch = sorted_files[i:i + batch_size]
        batches.append(batch)

    return batches
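

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module's public API).
# The real `system` is expected to be a PlatformOrchestrator exposing
# process_documents() and get_component(); the stub class below and the
# "data/*.pdf" corpus location are hypothetical stand-ins for this example.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    class _StubSystem:
        """Minimal stand-in for a PlatformOrchestrator, for demonstration only."""

        def process_documents(self, files: List[Path]) -> Dict[str, int]:
            # Pretend each file yields one chunk.
            return {str(f): 1 for f in files}

        def get_component(self, name: str) -> Any:
            raise NotImplementedError("Replace the stub with a real orchestrator")

    pdf_files = sorted(Path("data").glob("*.pdf"))  # hypothetical corpus location
    batches = create_optimized_batch_processor(pdf_files, batch_size=16)

    processor = ParallelDocumentProcessor(_StubSystem(), max_workers=2)
    results: Dict[str, int] = {}
    for batch in batches:
        results.update(processor.process_documents_batched(batch, batch_size=10))

    logger.info(f"Indexed {sum(results.values())} chunks from {len(results)} files")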