""" | |
Parallel Document Processing Utilities | |
===================================== | |
Optimized document processing for faster Epic 2 system initialization. | |
""" | |
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, List

logger = logging.getLogger(__name__)


class ParallelDocumentProcessor:
    """Parallel document processor for faster system initialization."""

    def __init__(self, system, max_workers: int = 2):
        """
        Initialize the parallel processor.

        Args:
            system: PlatformOrchestrator instance
            max_workers: Maximum number of parallel workers (kept at 2 for stability)
        """
        self.system = system
        self.max_workers = max_workers
        self.lock = threading.Lock()  # Thread safety for system operations

    def process_documents_batched(self, pdf_files: List[Path], batch_size: int = 10) -> Dict[str, int]:
        """
        Process documents in batches for better performance and memory management.

        Args:
            pdf_files: List of PDF file paths
            batch_size: Number of documents to process in each batch

        Returns:
            Dictionary mapping file paths to chunk counts
        """
        logger.info(f"Processing {len(pdf_files)} documents in batches of {batch_size}")
        results = {}

        # Process documents in batches to avoid memory issues
        num_batches = (len(pdf_files) + batch_size - 1) // batch_size
        for i in range(0, len(pdf_files), batch_size):
            batch = pdf_files[i:i + batch_size]
            logger.info(f"Processing batch {i // batch_size + 1}/{num_batches}: {len(batch)} files")

            # Process each batch sequentially for stability
            batch_results = self.system.process_documents(batch)
            results.update(batch_results)

            # Brief pause between batches to avoid overwhelming the system
            time.sleep(0.1)

        total_chunks = sum(results.values())
        logger.info(f"Batch processing complete: {total_chunks} chunks from {len(pdf_files)} files")
        return results

    def process_documents_parallel(self, pdf_files: List[Path]) -> Dict[str, int]:
        """
        Process documents in parallel for faster initialization.

        Args:
            pdf_files: List of PDF file paths

        Returns:
            Dictionary mapping file paths to chunk counts
        """
        logger.info(f"Processing {len(pdf_files)} documents with {self.max_workers} parallel workers")
        results = {}
        failed_files = []

        # Use ThreadPoolExecutor for I/O-bound document processing
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all document processing tasks
            future_to_file = {
                executor.submit(self._process_single_document, pdf_file): pdf_file
                for pdf_file in pdf_files
            }

            # Collect results as they complete, with timeouts
            completed = 0
            for future in as_completed(future_to_file, timeout=600):  # 10 minute timeout for the whole set
                pdf_file = future_to_file[future]
                completed += 1
                try:
                    chunk_count = future.result(timeout=120)  # 2 minute timeout per document
                    results[str(pdf_file)] = chunk_count
                    logger.info(f"Processed {pdf_file.name}: {chunk_count} chunks ({completed}/{len(pdf_files)})")
                except Exception as e:
                    logger.error(f"Failed to process {pdf_file}: {e}")
                    failed_files.append(str(pdf_file))
                    results[str(pdf_file)] = 0

                # Progress logging every 5 files for better feedback
                if completed % 5 == 0:
                    logger.info(f"Progress: {completed}/{len(pdf_files)} documents processed")

        if failed_files:
            logger.warning(f"Failed to process {len(failed_files)} files")

        return results

    def _process_single_document(self, pdf_file: Path) -> int:
        """
        Process a single document with thread safety.

        Args:
            pdf_file: Path to PDF file

        Returns:
            Number of chunks created
        """
        try:
            logger.debug(f"Starting processing: {pdf_file.name}")

            # Get the document processor and embedder directly
            doc_processor = self.system.get_component('document_processor')
            embedder = self.system.get_component('embedder')

            # Process the document into chunks (thread-safe)
            documents = doc_processor.process(pdf_file)

            # Collect chunks that still need embeddings
            texts_to_embed = []
            docs_needing_embedding = []
            for doc in documents:
                if not hasattr(doc, 'embedding') or doc.embedding is None:
                    texts_to_embed.append(doc.content)
                    docs_needing_embedding.append(doc)

            # Batch-embed all texts that need embeddings (thread-safe)
            if texts_to_embed:
                embeddings = embedder.embed(texts_to_embed)
                for doc, embedding in zip(docs_needing_embedding, embeddings):
                    doc.embedding = embedding

            chunk_count = len(documents)

            # Index documents under the lock, since the FAISS index is not thread-safe
            with self.lock:
                retriever = self.system.get_component('retriever')
                retriever.index_documents(documents)

            logger.debug(f"Completed processing: {pdf_file.name} ({chunk_count} chunks)")
            return chunk_count
        except Exception as e:
            logger.error(f"Error processing {pdf_file}: {e}")
            raise


def create_optimized_batch_processor(pdf_files: List[Path], batch_size: int = 16) -> List[List[Path]]:
    """
    Create optimized batches for document processing.

    Args:
        pdf_files: List of PDF files
        batch_size: Size of each batch

    Returns:
        List of batches (each batch is a list of file paths)
    """
    # Sort files by size (largest first) for better load balancing
    try:
        pdf_files_with_size = [(f, f.stat().st_size) for f in pdf_files if f.exists()]
        pdf_files_with_size.sort(key=lambda x: x[1], reverse=True)
        sorted_files = [f for f, _ in pdf_files_with_size]
    except OSError:
        sorted_files = pdf_files

    # Slice the sorted list into batches
    batches = []
    for i in range(0, len(sorted_files), batch_size):
        batches.append(sorted_files[i:i + batch_size])
    return batches
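

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). It assumes a PlatformOrchestrator that
# exposes process_documents() and get_component(), as the class above expects;
# the import path and config file name in the commented lines are guesses,
# not part of this module, so adjust them to the actual project layout.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    pdf_files = sorted(Path("data/documents").glob("*.pdf"))

    # Pre-batch the files (largest first) for more even load balancing.
    batches = create_optimized_batch_processor(pdf_files, batch_size=16)
    logger.info(f"Prepared {len(batches)} batches from {len(pdf_files)} PDF files")

    # Hypothetical orchestrator wiring; uncomment and adapt once the real
    # PlatformOrchestrator import path is known.
    # from core.platform_orchestrator import PlatformOrchestrator
    # system = PlatformOrchestrator("config/default.yaml")
    # processor = ParallelDocumentProcessor(system, max_workers=2)
    # results = processor.process_documents_batched(pdf_files, batch_size=10)
    # # or, for the parallel path:
    # # results = processor.process_documents_parallel(pdf_files)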