enhanced-rag-demo/demo/utils/parallel_processor.py
"""
Parallel Document Processing Utilities
======================================
Optimized document processing for faster Epic 2 system initialization.
"""
import logging
from pathlib import Path
from typing import Dict, List
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import threading
logger = logging.getLogger(__name__)


class ParallelDocumentProcessor:
    """Parallel document processor for faster system initialization"""

    def __init__(self, system, max_workers: int = 2):
        """
        Initialize parallel processor.

        Args:
            system: PlatformOrchestrator instance
            max_workers: Maximum number of parallel workers (reduced to 2 for stability)
        """
        self.system = system
        self.max_workers = max_workers
        self.lock = threading.Lock()  # Thread safety for shared system operations
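
    # Construction sketch (hedged): `orchestrator` stands in for the demo's
    # PlatformOrchestrator instance, built however the demo normally builds it;
    # only the wrapper construction itself is defined in this module.
    #
    #     processor = ParallelDocumentProcessor(orchestrator, max_workers=2)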

    def process_documents_batched(self, pdf_files: List[Path], batch_size: int = 10) -> Dict[str, int]:
        """
        Process documents in batches for better performance and memory management.

        Args:
            pdf_files: List of PDF file paths
            batch_size: Number of documents to process in each batch

        Returns:
            Dictionary mapping file paths to chunk counts
        """
        logger.info(f"Processing {len(pdf_files)} documents in batches of {batch_size}")
        results = {}
        num_batches = (len(pdf_files) + batch_size - 1) // batch_size  # ceiling division

        # Process documents in batches to avoid memory issues
        for i in range(0, len(pdf_files), batch_size):
            batch = pdf_files[i:i + batch_size]
            logger.info(f"Processing batch {i // batch_size + 1}/{num_batches}: {len(batch)} files")

            # Process each batch sequentially for stability
            batch_results = self.system.process_documents(batch)
            results.update(batch_results)

            # Brief pause between batches to avoid overwhelming the system
            time.sleep(0.1)

        total_chunks = sum(results.values())
        logger.info(f"Batch processing complete: {total_chunks} chunks from {len(pdf_files)} files")
        return results
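
    # Usage sketch for the batched path (hypothetical directory; reuses the
    # `processor` from the construction sketch above):
    #
    #     pdf_files = sorted(Path("data/docs").glob("*.pdf"))
    #     chunk_counts = processor.process_documents_batched(pdf_files, batch_size=10)
    #     print(f"Indexed {sum(chunk_counts.values())} chunks from {len(chunk_counts)} files")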

    def process_documents_parallel(self, pdf_files: List[Path]) -> Dict[str, int]:
        """
        Process documents in parallel for faster initialization.

        Args:
            pdf_files: List of PDF file paths

        Returns:
            Dictionary mapping file paths to chunk counts (0 for files that failed)
        """
        logger.info(f"Processing {len(pdf_files)} documents with {self.max_workers} parallel workers")
        results = {}
        failed_files = []

        # Use ThreadPoolExecutor with timeouts for these I/O-bound operations
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all document processing tasks
            future_to_file = {
                executor.submit(self._process_single_document, pdf_file): pdf_file
                for pdf_file in pdf_files
            }

            # Collect results as they complete
            completed = 0
            for future in as_completed(future_to_file, timeout=600):  # 10 minute overall timeout
                pdf_file = future_to_file[future]
                completed += 1
                try:
                    chunk_count = future.result(timeout=120)  # 2 minute timeout per document
                    results[str(pdf_file)] = chunk_count
                    logger.info(f"βœ… Processed {pdf_file.name}: {chunk_count} chunks ({completed}/{len(pdf_files)})")
                except Exception as e:
                    logger.error(f"❌ Failed to process {pdf_file}: {e}")
                    failed_files.append(str(pdf_file))
                    results[str(pdf_file)] = 0

                # Progress logging every 5 files for better feedback
                if completed % 5 == 0:
                    logger.info(f"πŸ“Š Progress: {completed}/{len(pdf_files)} documents processed")

        if failed_files:
            logger.warning(f"Failed to process {len(failed_files)} files")

        return results
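
    # Files that fail are reported as 0 chunks rather than raised, so a caller
    # can separate successes from failures afterwards (sketch):
    #
    #     results = processor.process_documents_parallel(pdf_files)
    #     failed = [path for path, chunks in results.items() if chunks == 0]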

    def _process_single_document(self, pdf_file: Path) -> int:
        """
        Process a single document with thread safety.

        Args:
            pdf_file: Path to PDF file

        Returns:
            Number of chunks created
        """
        try:
            logger.debug(f"πŸ”„ Starting processing: {pdf_file.name}")

            # Chunk and embed first, and defer indexing to the locked section
            # below, to avoid FAISS thread conflicts
            doc_processor = self.system.get_component('document_processor')
            embedder = self.system.get_component('embedder')

            # Process document to get chunks (thread-safe)
            documents = doc_processor.process(pdf_file)

            # Collect chunks that still need embeddings
            texts_to_embed = []
            docs_needing_embedding = []
            for doc in documents:
                if not hasattr(doc, 'embedding') or doc.embedding is None:
                    texts_to_embed.append(doc.content)
                    docs_needing_embedding.append(doc)

            # Batch-embed all texts that need embeddings (thread-safe)
            if texts_to_embed:
                embeddings = embedder.embed(texts_to_embed)
                for doc, embedding in zip(docs_needing_embedding, embeddings):
                    doc.embedding = embedding

            chunk_count = len(documents)

            # Index under the shared lock so only one worker thread touches the
            # FAISS-backed retriever at a time
            with self.lock:
                retriever = self.system.get_component('retriever')
                retriever.index_documents(documents)

            logger.debug(f"βœ… Completed processing: {pdf_file.name} ({chunk_count} chunks)")
            return chunk_count
        except Exception as e:
            logger.error(f"❌ Error processing {pdf_file}: {e}")
            raise
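

# Minimal, self-contained sketch of the locking pattern used in
# _process_single_document: chunking and embedding run concurrently across
# worker threads, while a shared lock serializes writes to a single index.
# The plain list below stands in for the FAISS-backed retriever; this helper
# is purely illustrative and is not used by the demo system.
def _locked_indexing_sketch(num_docs: int = 8) -> int:
    index: List[str] = []
    lock = threading.Lock()

    def process_one(doc_id: int) -> int:
        # Stand-in for chunking + embedding, which can safely run in parallel
        chunks = [f"doc{doc_id}-chunk{i}" for i in range(3)]
        # Only one thread appends to the shared index at a time
        with lock:
            index.extend(chunks)
        return len(chunks)

    with ThreadPoolExecutor(max_workers=2) as executor:
        total = sum(executor.map(process_one, range(num_docs)))
    return total  # equals len(index) once all workers have finished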


def create_optimized_batch_processor(pdf_files: List[Path], batch_size: int = 16) -> List[List[Path]]:
    """
    Create optimized batches for document processing.

    Args:
        pdf_files: List of PDF files
        batch_size: Size of each batch

    Returns:
        List of batches (each batch is a list of file paths)
    """
    # Sort files by size, largest first, for better load balancing; fall back
    # to the original order if any file cannot be stat'd
    try:
        pdf_files_with_size = [(f, f.stat().st_size) for f in pdf_files if f.exists()]
        pdf_files_with_size.sort(key=lambda x: x[1], reverse=True)
        sorted_files = [f for f, _ in pdf_files_with_size]
    except OSError:
        sorted_files = pdf_files

    # Slice the sorted list into fixed-size batches
    batches = []
    for i in range(0, len(sorted_files), batch_size):
        batches.append(sorted_files[i:i + batch_size])
    return batches
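

# Quick, self-contained check of the batching helper using throwaway files of
# different sizes (illustrative only; real runs pass actual PDF paths).
if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_dir:
        paths = []
        for i, size in enumerate([10, 2_000, 500]):
            path = Path(tmp_dir) / f"doc_{i}.pdf"
            path.write_bytes(b"x" * size)
            paths.append(path)

        for batch in create_optimized_batch_processor(paths, batch_size=2):
            # Larger files land in earlier batches because of the size sort
            print([p.name for p in batch])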