""" Document Processing Pipeline Implementation. This module implements the ProcessingPipeline interface to orchestrate the flow of document processing through parsing, chunking, and cleaning stages. It coordinates sub-components and handles error recovery. Key Features: - Sequential processing pipeline (parse → chunk → clean) - Error handling with graceful degradation options - Performance monitoring and metrics collection - Configurable validation and quality controls - Comprehensive logging and debugging support Architecture Notes: - Implements ProcessingPipeline abstract base class - Orchestrates DocumentParser, TextChunker, and ContentCleaner - Provides centralized error handling and metrics - Supports different processing strategies (fail-fast vs. graceful) """ import time from pathlib import Path from typing import List, Dict, Any, Optional import sys # Add project paths for imports project_root = Path(__file__).parent.parent.parent.parent sys.path.append(str(project_root)) from src.core.interfaces import Document from .base import ProcessingPipeline, DocumentParser, TextChunker, ContentCleaner, ValidationResult, ConfigurableComponent class DocumentProcessingPipeline(ProcessingPipeline, ConfigurableComponent): """ Orchestrates document processing through multiple stages. This pipeline coordinates the three main processing stages: 1. Document Parsing - Extract text and metadata from files 2. Text Chunking - Split text into retrieval-sized chunks 3. Content Cleaning - Normalize and clean text content Features: - Sequential processing with error handling - Performance metrics for each stage - Configurable quality controls - Graceful degradation on errors - Comprehensive validation Configuration Options: - enable_validation: Validate documents before processing - enable_metrics: Collect detailed performance metrics - fail_fast: Stop on first error vs. continue processing - skip_cleaning: Skip cleaning stage for faster processing - quality_threshold: Minimum quality score for chunk inclusion """ def __init__( self, parser: DocumentParser, chunker: TextChunker, cleaner: ContentCleaner, config: Dict[str, Any] = None ): """ Initialize the processing pipeline. Args: parser: Document parser instance chunker: Text chunker instance cleaner: Content cleaner instance config: Pipeline configuration """ self.parser = parser self.chunker = chunker self.cleaner = cleaner # Default configuration self.config = { 'enable_validation': True, 'enable_metrics': True, 'fail_fast': False, 'skip_cleaning': False, 'quality_threshold': 0.0, 'max_chunks_per_document': 1000, 'enable_debug_logging': False } # Apply provided configuration if config: self.config.update(config) # Pipeline metrics self.metrics = { 'documents_processed': 0, 'total_pipeline_time': 0.0, 'stage_times': { 'parsing': 0.0, 'chunking': 0.0, 'cleaning': 0.0, 'validation': 0.0 }, 'stage_calls': { 'parsing': 0, 'chunking': 0, 'cleaning': 0, 'validation': 0 }, 'errors': { 'parsing_errors': 0, 'chunking_errors': 0, 'cleaning_errors': 0, 'validation_errors': 0 }, 'chunks_created': 0, 'chunks_filtered': 0, 'average_chunks_per_document': 0.0 } def process_document(self, file_path: Path) -> List[Document]: """ Process a document through the complete pipeline. 
    def process_document(self, file_path: Path) -> List[Document]:
        """
        Process a document through the complete pipeline.

        Args:
            file_path: Path to the document file

        Returns:
            List of processed Document objects

        Raises:
            ValueError: If the document format is not supported or chunking
                fails (raised only when fail_fast is enabled)
            IOError: If the document cannot be parsed (raised only when
                fail_fast is enabled)
        """
        pipeline_start = time.perf_counter()

        try:
            # Stage 1: Document Parsing
            parsed_data = self._parse_document(file_path)

            # Stage 2: Text Chunking
            chunks = self._chunk_text(parsed_data, file_path)

            # Stage 3: Content Cleaning (optional)
            if not self.config['skip_cleaning']:
                cleaned_chunks = self._clean_chunks(chunks)
            else:
                cleaned_chunks = chunks

            # Stage 4: Convert to Document objects
            documents = self._create_documents(cleaned_chunks, parsed_data, file_path)

            # Stage 5: Quality filtering
            filtered_documents = self._filter_documents(documents)

            # Update metrics
            pipeline_time = time.perf_counter() - pipeline_start
            self._update_pipeline_metrics(filtered_documents, pipeline_time)

            return filtered_documents

        except Exception as e:
            if self.config['fail_fast']:
                raise

            # Graceful degradation - log error and return empty list
            self._log_error(f"Pipeline error for {file_path}: {str(e)}")
            return []

    def validate_document(self, file_path: Path) -> ValidationResult:
        """
        Validate document before processing.

        Args:
            file_path: Path to the document file

        Returns:
            ValidationResult with validation status and messages
        """
        if not self.config['enable_validation']:
            return ValidationResult(valid=True)

        validation_start = time.perf_counter()

        try:
            errors = []
            warnings = []

            # Basic file validation
            if not file_path.exists():
                errors.append(f"File not found: {file_path}")
            if not file_path.is_file():
                errors.append(f"Path is not a file: {file_path}")

            # Format validation
            if file_path.suffix.lower() not in self.parser.supported_formats():
                errors.append(f"Unsupported format: {file_path.suffix}")

            # Size validation
            if file_path.exists():
                file_size_mb = file_path.stat().st_size / (1024 * 1024)
                if hasattr(self.parser, 'config') and 'max_file_size_mb' in self.parser.config:
                    max_size = self.parser.config['max_file_size_mb']
                    if file_size_mb > max_size:
                        errors.append(f"File too large: {file_size_mb:.1f}MB > {max_size}MB")
                    elif file_size_mb > max_size * 0.8:
                        warnings.append(f"Large file: {file_size_mb:.1f}MB")

            # Permissions validation (fails only if no read bit is set at all)
            if file_path.exists() and not file_path.stat().st_mode & 0o444:
                errors.append(f"File not readable: {file_path}")

            result = ValidationResult(
                valid=len(errors) == 0,
                errors=errors,
                warnings=warnings
            )

            # Update metrics
            validation_time = time.perf_counter() - validation_start
            self.metrics['stage_times']['validation'] += validation_time
            self.metrics['stage_calls']['validation'] += 1
            if not result.valid:
                self.metrics['errors']['validation_errors'] += 1

            return result

        except Exception as e:
            self.metrics['errors']['validation_errors'] += 1
            return ValidationResult(
                valid=False,
                errors=[f"Validation error: {str(e)}"]
            )
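    # Typical call pattern (sketch, assuming a `pipeline` built as in the
    # usage sketch above): validate first, then process.
    #
    #     result = pipeline.validate_document(path)
    #     if result.valid:
    #         documents = pipeline.process_document(path)
    #     else:
    #         print(result.errors)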
    def get_metrics(self) -> Dict[str, Any]:
        """
        Get processing metrics for monitoring.

        Returns:
            Dictionary with processing metrics and statistics
        """
        # Calculate derived metrics
        derived_metrics = {}

        if self.metrics['stage_calls']['parsing'] > 0:
            derived_metrics['average_parsing_time'] = (
                self.metrics['stage_times']['parsing'] /
                self.metrics['stage_calls']['parsing']
            )

        if self.metrics['stage_calls']['chunking'] > 0:
            derived_metrics['average_chunking_time'] = (
                self.metrics['stage_times']['chunking'] /
                self.metrics['stage_calls']['chunking']
            )

        if self.metrics['stage_calls']['cleaning'] > 0:
            derived_metrics['average_cleaning_time'] = (
                self.metrics['stage_times']['cleaning'] /
                self.metrics['stage_calls']['cleaning']
            )

        if self.metrics['documents_processed'] > 0:
            derived_metrics['average_pipeline_time'] = (
                self.metrics['total_pipeline_time'] /
                self.metrics['documents_processed']
            )
            self.metrics['average_chunks_per_document'] = (
                self.metrics['chunks_created'] /
                self.metrics['documents_processed']
            )

        # Combine base and derived metrics
        result = self.metrics.copy()
        result['derived_metrics'] = derived_metrics

        return result

    def configure(self, config: Dict[str, Any]) -> None:
        """
        Configure the pipeline with provided settings.

        Args:
            config: Configuration dictionary

        Raises:
            ValueError: If configuration is invalid
        """
        # Validate configuration
        valid_keys = {
            'enable_validation', 'enable_metrics', 'fail_fast',
            'skip_cleaning', 'quality_threshold', 'max_chunks_per_document',
            'enable_debug_logging'
        }

        invalid_keys = set(config.keys()) - valid_keys
        if invalid_keys:
            raise ValueError(f"Invalid configuration keys: {invalid_keys}")

        # Validate specific values
        if 'quality_threshold' in config:
            if not isinstance(config['quality_threshold'], (int, float)) or not 0 <= config['quality_threshold'] <= 1:
                raise ValueError("quality_threshold must be a float between 0 and 1")

        if 'max_chunks_per_document' in config:
            if not isinstance(config['max_chunks_per_document'], int) or config['max_chunks_per_document'] <= 0:
                raise ValueError("max_chunks_per_document must be a positive integer")

        # Update configuration
        self.config.update(config)

    def get_config(self) -> Dict[str, Any]:
        """
        Get current configuration.

        Returns:
            Current configuration dictionary
        """
        return self.config.copy()

    def _parse_document(self, file_path: Path) -> Dict[str, Any]:
        """
        Parse document using the configured parser.

        Args:
            file_path: Path to document file

        Returns:
            Parsed document data
        """
        parse_start = time.perf_counter()

        try:
            parsed_data = self.parser.parse(file_path)

            # Update metrics
            parse_time = time.perf_counter() - parse_start
            self.metrics['stage_times']['parsing'] += parse_time
            self.metrics['stage_calls']['parsing'] += 1

            self._log_debug(f"Parsed {file_path} in {parse_time:.3f}s")

            return parsed_data

        except Exception as e:
            self.metrics['errors']['parsing_errors'] += 1
            raise IOError(f"Failed to parse document {file_path}: {str(e)}") from e
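    # Contract note: the parser is assumed to return a dict with at least a
    # 'text' key; 'metadata', 'page_count', and 'extraction_time' are read
    # opportunistically with .get() defaults in the stages below.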
    def _chunk_text(self, parsed_data: Dict[str, Any], file_path: Path) -> List[Dict[str, Any]]:
        """
        Chunk text using the configured chunker.

        Args:
            parsed_data: Parsed document data
            file_path: Original file path for metadata

        Returns:
            List of text chunks with metadata
        """
        chunk_start = time.perf_counter()

        try:
            text = parsed_data.get('text', '')
            if not text:
                raise ValueError("No text found in parsed document")

            # Prepare metadata for chunking
            metadata = {
                'source_file': str(file_path),
                'document_metadata': parsed_data.get('metadata', {}),
                'processing_timestamp': time.time()
            }

            # Chunk the text
            chunks = self.chunker.chunk(text, metadata)

            # Convert Chunk objects to dictionaries for consistency
            chunk_dicts = []
            for chunk in chunks:
                chunk_dict = {
                    'content': chunk.content,
                    'start_pos': chunk.start_pos,
                    'end_pos': chunk.end_pos,
                    'metadata': chunk.metadata
                }
                chunk_dicts.append(chunk_dict)

            # Limit chunks if configured
            if len(chunk_dicts) > self.config['max_chunks_per_document']:
                chunk_dicts = chunk_dicts[:self.config['max_chunks_per_document']]
                self._log_debug(f"Limited chunks to {self.config['max_chunks_per_document']} for {file_path}")

            # Update metrics
            chunk_time = time.perf_counter() - chunk_start
            self.metrics['stage_times']['chunking'] += chunk_time
            self.metrics['stage_calls']['chunking'] += 1

            self._log_debug(f"Created {len(chunk_dicts)} chunks in {chunk_time:.3f}s")

            return chunk_dicts

        except Exception as e:
            self.metrics['errors']['chunking_errors'] += 1
            raise ValueError(f"Failed to chunk text: {str(e)}") from e

    def _clean_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Clean chunk content using the configured cleaner.

        Args:
            chunks: List of chunk dictionaries

        Returns:
            List of cleaned chunk dictionaries
        """
        clean_start = time.perf_counter()

        try:
            cleaned_chunks = []

            for chunk in chunks:
                content = chunk.get('content', '')
                if not content:
                    continue

                # Clean the content
                cleaned_content = self.cleaner.clean(content)

                # Update chunk with cleaned content
                cleaned_chunk = chunk.copy()
                cleaned_chunk['content'] = cleaned_content
                cleaned_chunk['metadata'] = chunk.get('metadata', {}).copy()
                cleaned_chunk['metadata']['cleaned'] = True
                cleaned_chunk['metadata']['original_length'] = len(content)
                cleaned_chunk['metadata']['cleaned_length'] = len(cleaned_content)

                cleaned_chunks.append(cleaned_chunk)

            # Update metrics
            clean_time = time.perf_counter() - clean_start
            self.metrics['stage_times']['cleaning'] += clean_time
            self.metrics['stage_calls']['cleaning'] += 1

            self._log_debug(f"Cleaned {len(cleaned_chunks)} chunks in {clean_time:.3f}s")

            return cleaned_chunks

        except Exception as e:
            self.metrics['errors']['cleaning_errors'] += 1
            if self.config['fail_fast']:
                raise ValueError(f"Failed to clean chunks: {str(e)}") from e
            else:
                # Return original chunks if cleaning fails
                self._log_error(f"Cleaning failed, using original chunks: {str(e)}")
                return chunks
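    # Note: chunk dicts keep the same shape ('content', 'start_pos', 'end_pos',
    # 'metadata') through _chunk_text and _clean_chunks; _create_documents
    # below relies on exactly these keys.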
    def _create_documents(
        self,
        chunks: List[Dict[str, Any]],
        parsed_data: Dict[str, Any],
        file_path: Path
    ) -> List[Document]:
        """
        Convert chunks to Document objects.

        Args:
            chunks: List of chunk dictionaries
            parsed_data: Original parsed document data
            file_path: Source file path

        Returns:
            List of Document objects
        """
        documents = []

        for i, chunk in enumerate(chunks):
            content = chunk.get('content', '')
            if not content.strip():
                continue

            # Create comprehensive metadata
            metadata = {
                # Source information
                'source': str(file_path),
                'source_name': file_path.name,
                'source_type': file_path.suffix.lower().lstrip('.'),

                # Chunk information
                'chunk_index': i,
                'chunk_id': chunk.get('metadata', {}).get('chunk_id', f'chunk_{i}'),
                'start_pos': chunk.get('start_pos', 0),
                'end_pos': chunk.get('end_pos', len(content)),

                # Document metadata
                'document_metadata': parsed_data.get('metadata', {}),
                'document_page_count': parsed_data.get('page_count', 0),
                'document_processing_time': parsed_data.get('extraction_time', 0.0),

                # Processing metadata
                'processed_by': 'document_processing_pipeline',
                'processing_timestamp': time.time(),
                'pipeline_config': self.config.copy(),

                # Quality information
                'quality_score': chunk.get('metadata', {}).get('quality_score', 0.0),

                # Preserve any additional metadata from chunk (note: keys in
                # the chunk metadata override the defaults set above)
                **chunk.get('metadata', {})
            }

            # Create Document object
            document = Document(
                content=content,
                metadata=metadata,
                embedding=None  # Embeddings will be added later
            )

            documents.append(document)

        return documents

    def _filter_documents(self, documents: List[Document]) -> List[Document]:
        """
        Filter documents based on quality threshold.

        Args:
            documents: List of Document objects

        Returns:
            List of filtered Document objects
        """
        if self.config['quality_threshold'] <= 0:
            return documents

        filtered = []
        filtered_count = 0

        for document in documents:
            quality_score = document.metadata.get('quality_score', 0.0)
            if quality_score >= self.config['quality_threshold']:
                filtered.append(document)
            else:
                filtered_count += 1

        self.metrics['chunks_filtered'] += filtered_count

        if filtered_count > 0:
            self._log_debug(
                f"Filtered {filtered_count} documents below quality "
                f"threshold {self.config['quality_threshold']}"
            )

        return filtered

    def _update_pipeline_metrics(self, documents: List[Document], pipeline_time: float) -> None:
        """
        Update overall pipeline metrics.

        Args:
            documents: Processed documents
            pipeline_time: Total pipeline processing time
        """
        self.metrics['documents_processed'] += 1
        self.metrics['total_pipeline_time'] += pipeline_time
        self.metrics['chunks_created'] += len(documents)

    def _log_debug(self, message: str) -> None:
        """
        Log debug message if debug logging is enabled.

        Args:
            message: Debug message
        """
        if self.config['enable_debug_logging']:
            print(f"[PIPELINE DEBUG] {message}")

    def _log_error(self, message: str) -> None:
        """
        Log error message.

        Args:
            message: Error message
        """
        print(f"[PIPELINE ERROR] {message}")
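
if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the pipeline proper. The stub
    # components below are hypothetical stand-ins that satisfy only the
    # duck-typed contracts this module actually calls (parser.parse /
    # parser.supported_formats, chunker.chunk, cleaner.clean); it assumes the
    # interfaces in .base do not enforce isinstance checks on the components.
    import tempfile
    from dataclasses import dataclass, field

    @dataclass
    class _StubChunk:
        content: str
        start_pos: int
        end_pos: int
        metadata: Dict[str, Any] = field(default_factory=dict)

    class _StubParser:
        def supported_formats(self) -> List[str]:
            return ['.txt']

        def parse(self, file_path: Path) -> Dict[str, Any]:
            return {'text': file_path.read_text(), 'metadata': {'parser': 'stub'}}

    class _StubChunker:
        def chunk(self, text: str, metadata: Dict[str, Any]) -> List[_StubChunk]:
            # One naive chunk per line, just enough to exercise the stages.
            chunks, pos = [], 0
            for line in text.splitlines():
                chunks.append(_StubChunk(line, pos, pos + len(line), dict(metadata)))
                pos += len(line) + 1
            return chunks

    class _StubCleaner:
        def clean(self, content: str) -> str:
            return ' '.join(content.split())

    pipeline = DocumentProcessingPipeline(
        parser=_StubParser(),
        chunker=_StubChunker(),
        cleaner=_StubCleaner(),
        config={'enable_debug_logging': True},
    )

    with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as handle:
        handle.write("Hello   world.\nSecond   line.\n")
    sample = Path(handle.name)

    documents = pipeline.process_document(sample)
    print(f"Produced {len(documents)} documents")
    print(pipeline.get_metrics()['derived_metrics'])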