""" Main Document Processor Implementation. This module implements the primary DocumentProcessor interface that coordinates all document processing sub-components (parsing, chunking, cleaning) through a configurable pipeline. Architecture Notes: - Implements DocumentProcessor interface from core.interfaces - Coordinates sub-components via pipeline pattern - Configuration-driven component selection - Provides unified interface for document processing - Includes comprehensive error handling and metrics """ import time import logging from pathlib import Path from typing import List, Dict, Any, Optional import sys # Add project paths for imports project_root = Path(__file__).parent.parent.parent.parent sys.path.append(str(project_root)) from src.core.interfaces import DocumentProcessor as DocumentProcessorInterface, Document, HealthStatus # Forward declaration to avoid circular import from typing import TYPE_CHECKING if TYPE_CHECKING: from src.core.platform_orchestrator import PlatformOrchestrator from .base import ProcessingPipeline, ConfigurableComponent, ValidationResult from .pipeline import DocumentProcessingPipeline from .adapters import PyMuPDFAdapter from .chunkers import SentenceBoundaryChunker from .cleaners import TechnicalContentCleaner logger = logging.getLogger(__name__) class ModularDocumentProcessor(DocumentProcessorInterface, ConfigurableComponent): """ Modular document processor with configurable sub-components. This processor implements the DocumentProcessor interface while providing a modular architecture where parsing, chunking, and cleaning strategies can be configured independently. Features: - Configuration-driven component selection - Multiple document format support (extensible) - Comprehensive error handling and validation - Performance metrics and monitoring - Pluggable sub-component architecture Configuration Structure: { "parser": { "type": "pymupdf", "config": {...} }, "chunker": { "type": "sentence_boundary", "config": {...} }, "cleaner": { "type": "technical", "config": {...} } } """ def __init__(self, config: Dict[str, Any] = None, chunk_size: int = None, chunk_overlap: int = None, **kwargs): """ Initialize the modular document processor. 
    """

    def __init__(self,
                 config: Optional[Dict[str, Any]] = None,
                 chunk_size: Optional[int] = None,
                 chunk_overlap: Optional[int] = None,
                 **kwargs):
        """
        Initialize the modular document processor.

        Args:
            config: Configuration dictionary for all sub-components
            chunk_size: Legacy parameter for chunk size (backwards compatibility)
            chunk_overlap: Legacy parameter for chunk overlap (backwards compatibility)
            **kwargs: Additional legacy parameters for backwards compatibility
        """
        # Default configuration
        self.config = {
            'parser': {
                'type': 'pymupdf',
                'config': {
                    'max_file_size_mb': 100,
                    'preserve_layout': True
                }
            },
            'chunker': {
                'type': 'sentence_boundary',
                'config': {
                    'chunk_size': 1400,
                    'overlap': 200,
                    'quality_threshold': 0.0
                }
            },
            'cleaner': {
                'type': 'technical',
                'config': {
                    'normalize_whitespace': True,
                    'remove_artifacts': True,
                    'preserve_code_blocks': True
                }
            },
            'pipeline': {
                'enable_validation': True,
                'enable_metrics': True,
                'fail_fast': False
            }
        }

        # Handle legacy parameters for backwards compatibility
        if chunk_size is not None or chunk_overlap is not None or kwargs:
            # Convert legacy parameters to config format
            legacy_config = {}
            if chunk_size is not None:
                legacy_config['chunker'] = {'config': {'chunk_size': chunk_size}}
            if chunk_overlap is not None:
                if 'chunker' not in legacy_config:
                    legacy_config['chunker'] = {'config': {}}
                legacy_config['chunker']['config']['overlap'] = chunk_overlap

            # Handle other legacy parameters
            for key, value in kwargs.items():
                if key in ['min_chunk_size', 'enable_quality_filtering', 'quality_threshold']:
                    if 'chunker' not in legacy_config:
                        legacy_config['chunker'] = {'config': {}}
                    legacy_config['chunker']['config'][key] = value
                elif key in ['preserve_layout', 'max_file_size_mb', 'extract_images']:
                    if 'parser' not in legacy_config:
                        legacy_config['parser'] = {'config': {}}
                    legacy_config['parser']['config'][key] = value
                elif key in ['normalize_whitespace', 'remove_artifacts', 'preserve_code_blocks']:
                    if 'cleaner' not in legacy_config:
                        legacy_config['cleaner'] = {'config': {}}
                    legacy_config['cleaner']['config'][key] = value

            # Merge legacy config first so an explicit config can override it
            self._merge_config(self.config, legacy_config)
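        # Illustrative mapping (hypothetical values): the legacy call
        #     ModularDocumentProcessor(chunk_size=512, preserve_layout=False)
        # is translated to
        #     {'chunker': {'config': {'chunk_size': 512}},
        #      'parser': {'config': {'preserve_layout': False}}}
        # before being merged into the defaults above.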
        # Apply provided configuration
        if config:
            self._merge_config(self.config, config)

        # Sub-components (created in _initialize_components)
        self.parser = None
        self.chunker = None
        self.cleaner = None
        self.pipeline = None

        # Processing metrics
        self.metrics = {
            'documents_processed': 0,
            'total_processing_time': 0.0,
            'total_chunks_created': 0,
            'total_bytes_processed': 0,
            'errors_encountered': 0,
            'validation_failures': 0,
            'component_metrics': {}
        }

        # Platform services (initialized via initialize_services)
        self.platform: Optional['PlatformOrchestrator'] = None

        # Initialize components
        self._initialize_components()

    def process(self, file_path: Path) -> List[Document]:
        """
        Process a document through the complete pipeline.

        Args:
            file_path: Path to the document file

        Returns:
            List of processed Document objects

        Raises:
            ValueError: If file format is not supported
            IOError: If file cannot be read
        """
        start_time = time.perf_counter()

        try:
            # Validate document
            if self.config['pipeline']['enable_validation']:
                validation_result = self.validate_document(file_path)
                if not validation_result.valid:
                    self.metrics['validation_failures'] += 1
                    raise ValueError(f"Document validation failed: {validation_result.errors}")

            # Process through pipeline
            documents = self.pipeline.process_document(file_path)

            # Update metrics
            processing_time = time.perf_counter() - start_time
            self._update_metrics(documents, processing_time, file_path)

            # Track performance using platform services
            if self.platform:
                self.platform.track_component_performance(
                    self,
                    "document_processing",
                    {
                        "success": True,
                        "processing_time": processing_time,
                        "file_path": str(file_path),
                        "documents_created": len(documents),
                        "total_chunks": sum(1 for doc in documents if doc.content)
                    }
                )

            return documents

        except Exception as e:
            self.metrics['errors_encountered'] += 1

            # Track failure using platform services
            if self.platform:
                processing_time = time.perf_counter() - start_time
                self.platform.track_component_performance(
                    self,
                    "document_processing",
                    {
                        "success": False,
                        "processing_time": processing_time,
                        "file_path": str(file_path),
                        "error": str(e)
                    }
                )

            if self.config['pipeline']['fail_fast']:
                raise

            # Log error and return empty list for graceful degradation
            logger.error(f"Error processing {file_path}: {e}")
            return []

    def supported_formats(self) -> List[str]:
        """
        Return the list of supported file extensions.

        Returns:
            List of supported file extensions
        """
        if self.parser:
            return self.parser.supported_formats()
        return []

    def validate_document(self, file_path: Path) -> ValidationResult:
        """
        Validate a document before processing.

        Args:
            file_path: Path to the document file

        Returns:
            ValidationResult with validation status and messages
        """
        if self.pipeline:
            return self.pipeline.validate_document(file_path)

        # Fallback validation when the pipeline is not initialized
        errors = []
        warnings = []

        if not file_path.exists():
            errors.append(f"File not found: {file_path}")

        if file_path.suffix.lower() not in self.supported_formats():
            errors.append(f"Unsupported file format: {file_path.suffix}")

        return ValidationResult(
            valid=len(errors) == 0,
            errors=errors,
            warnings=warnings
        )

    def configure(self, config: Dict[str, Any]) -> None:
        """
        Configure the processor with new settings.

        Args:
            config: Configuration dictionary; must include the 'parser',
                'chunker', and 'cleaner' sections (see _validate_config)

        Raises:
            ValueError: If configuration is invalid
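
        Example (illustrative values; only the listed component types are
        currently supported):

            processor.configure({
                'parser': {'type': 'pymupdf', 'config': {'max_file_size_mb': 50}},
                'chunker': {'type': 'sentence_boundary', 'config': {'chunk_size': 800}},
                'cleaner': {'type': 'technical', 'config': {}},
            })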
        """
        # Use platform configuration service if available
        if self.platform:
            self.platform.update_component_configuration(self, config)

        # Validate configuration
        self._validate_config(config)

        # Update configuration
        self._merge_config(self.config, config)

        # Reinitialize components with new configuration
        self._initialize_components()

    def get_config(self) -> Dict[str, Any]:
        """
        Get current configuration.

        Returns:
            Current configuration dictionary
        """
        return self._deep_copy_dict(self.config)

    def get_component_info(self) -> Dict[str, Any]:
        """
        Get information about active components.

        Returns:
            Dictionary with component information
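
        Example of the returned shape (values are illustrative):

            {
                'parser': {'type': 'pymupdf', 'class': 'PyMuPDFAdapter',
                           'supported_formats': [...]},
                'chunker': {'type': 'sentence_boundary',
                            'class': 'SentenceBoundaryChunker', 'strategy': ...},
                'cleaner': {'type': 'technical',
                            'class': 'TechnicalContentCleaner', 'quality_factors': [...]},
                'pipeline': {'class': 'DocumentProcessingPipeline',
                             'validation_enabled': True, 'metrics_enabled': True}
            }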
        """
        return {
            'parser': {
                'type': self.config['parser']['type'],
                'class': self.parser.__class__.__name__ if self.parser else None,
                'supported_formats': self.parser.supported_formats() if self.parser else []
            },
            'chunker': {
                'type': self.config['chunker']['type'],
                'class': self.chunker.__class__.__name__ if self.chunker else None,
                'strategy': self.chunker.get_chunk_strategy() if hasattr(self.chunker, 'get_chunk_strategy') else None
            },
            'cleaner': {
                'type': self.config['cleaner']['type'],
                'class': self.cleaner.__class__.__name__ if self.cleaner else None,
                'quality_factors': self.cleaner.get_quality_factors() if hasattr(self.cleaner, 'get_quality_factors') else []
            },
            'pipeline': {
                'class': self.pipeline.__class__.__name__ if self.pipeline else None,
                'validation_enabled': self.config['pipeline']['enable_validation'],
                'metrics_enabled': self.config['pipeline']['enable_metrics']
            }
        }

    # Standard ComponentBase interface implementation

    def initialize_services(self, platform: 'PlatformOrchestrator') -> None:
        """Initialize platform services for the component.

        Args:
            platform: PlatformOrchestrator instance providing services
        """
        self.platform = platform
        logger.info("ModularDocumentProcessor initialized with platform services")

    def get_health_status(self) -> HealthStatus:
        """Get the current health status of the component.

        Returns:
            HealthStatus object with component health information
        """
        if self.platform:
            return self.platform.check_component_health(self)

        # Fallback if platform services are not initialized
        is_healthy = True
        issues = []

        # Check sub-components
        if not self.parser:
            is_healthy = False
            issues.append("Parser not initialized")
        if not self.chunker:
            is_healthy = False
            issues.append("Chunker not initialized")
        if not self.cleaner:
            is_healthy = False
            issues.append("Cleaner not initialized")
        if not self.pipeline:
            is_healthy = False
            issues.append("Pipeline not initialized")

        return HealthStatus(
            is_healthy=is_healthy,
            issues=issues,
            metrics={
                "sub_components": self.get_component_info(),
                "basic_metrics": self.metrics.copy()
            },
            component_name=self.__class__.__name__
        )

    def get_metrics(self) -> Dict[str, Any]:
        """Get component-specific metrics.

        Returns:
            Dictionary containing component metrics
        """
        if self.platform:
            try:
                component_metrics = self.platform.analytics_service.collect_component_metrics(self)
                return {
                    "component_name": component_metrics.component_name,
                    "component_type": component_metrics.component_type,
                    "success_count": component_metrics.success_count,
                    "error_count": component_metrics.error_count,
                    "resource_usage": component_metrics.resource_usage,
                    "performance_metrics": component_metrics.performance_metrics,
                    "timestamp": component_metrics.timestamp
                }
            except Exception:
                # Fall back to local metrics if the platform service fails
                pass

        # Fallback if platform services are not initialized
        if self.config['pipeline']['enable_metrics']:
            self.metrics['component_metrics'] = {
                'parser': self.parser.get_metrics() if hasattr(self.parser, 'get_metrics') else {},
                'chunker': self.chunker.get_metrics() if hasattr(self.chunker, 'get_metrics') else {},
                'cleaner': self.cleaner.get_metrics() if hasattr(self.cleaner, 'get_metrics') else {},
                'pipeline': self.pipeline.get_metrics() if hasattr(self.pipeline, 'get_metrics') else {}
            }

        return self.metrics.copy()

    def get_capabilities(self) -> List[str]:
        """Get the list of component capabilities.

        Returns:
            List of capability strings
        """
        capabilities = [
            "document_processing",
            "multi_format_support",
            "modular_architecture",
            "configurable_pipeline",
            "performance_metrics"
        ]

        # Add parser capabilities
        if self.parser and hasattr(self.parser, 'supported_formats'):
            capabilities.extend([f"parser_{fmt}" for fmt in self.parser.supported_formats()])

        # Add chunker capabilities
        if self.chunker:
            capabilities.append(f"chunker_{self.config['chunker']['type']}")

        # Add cleaner capabilities
        if self.cleaner:
            capabilities.append(f"cleaner_{self.config['cleaner']['type']}")

        return capabilities

    def _initialize_components(self) -> None:
        """Initialize all sub-components based on configuration."""
        # Initialize parser
        parser_config = self.config['parser']
        if parser_config['type'] == 'pymupdf':
            self.parser = PyMuPDFAdapter(parser_config['config'])
        else:
            raise ValueError(f"Unknown parser type: {parser_config['type']}")

        # Initialize chunker
        chunker_config = self.config['chunker']
        if chunker_config['type'] == 'sentence_boundary':
            self.chunker = SentenceBoundaryChunker(chunker_config['config'])
        else:
            raise ValueError(f"Unknown chunker type: {chunker_config['type']}")

        # Initialize cleaner
        cleaner_config = self.config['cleaner']
        if cleaner_config['type'] == 'technical':
            self.cleaner = TechnicalContentCleaner(cleaner_config['config'])
        else:
            raise ValueError(f"Unknown cleaner type: {cleaner_config['type']}")

        # Initialize pipeline
        self.pipeline = DocumentProcessingPipeline(
            parser=self.parser,
            chunker=self.chunker,
            cleaner=self.cleaner,
            config=self.config['pipeline']
        )

    def _validate_config(self, config: Dict[str, Any]) -> None:
        """
        Validate configuration structure and values.

        Args:
            config: Configuration to validate

        Raises:
            ValueError: If configuration is invalid
        """
        # Check required top-level keys
        required_keys = {'parser', 'chunker', 'cleaner'}
        if not all(key in config for key in required_keys):
            missing_keys = required_keys - set(config.keys())
            raise ValueError(f"Missing required configuration keys: {missing_keys}")

        # Validate parser configuration
        parser_config = config['parser']
        if 'type' not in parser_config:
            raise ValueError("Parser configuration must include 'type'")
        if parser_config['type'] not in ['pymupdf']:
            raise ValueError(f"Unknown parser type: {parser_config['type']}")

        # Validate chunker configuration
        chunker_config = config['chunker']
        if 'type' not in chunker_config:
            raise ValueError("Chunker configuration must include 'type'")
        if chunker_config['type'] not in ['sentence_boundary']:
            raise ValueError(f"Unknown chunker type: {chunker_config['type']}")

        # Validate cleaner configuration
        cleaner_config = config['cleaner']
        if 'type' not in cleaner_config:
            raise ValueError("Cleaner configuration must include 'type'")
        if cleaner_config['type'] not in ['technical']:
            raise ValueError(f"Unknown cleaner type: {cleaner_config['type']}")

    def _merge_config(self, base: Dict[str, Any], update: Dict[str, Any]) -> None:
        """
        Recursively merge configuration dictionaries.

        Args:
            base: Base configuration dictionary (modified in place)
            update: Update configuration dictionary
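
        Example (illustrative): merging {'chunker': {'config': {'overlap': 50}}}
        into the defaults only replaces the 'overlap' leaf; sibling keys such
        as 'chunk_size' are preserved because nested dicts are merged rather
        than overwritten.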
        """
        for key, value in update.items():
            if key in base and isinstance(base[key], dict) and isinstance(value, dict):
                self._merge_config(base[key], value)
            else:
                base[key] = value

    def _deep_copy_dict(self, d: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create a deep copy of a dictionary.

        Args:
            d: Dictionary to copy

        Returns:
            Deep copy of the dictionary
        """
        result = {}
        for key, value in d.items():
            if isinstance(value, dict):
                result[key] = self._deep_copy_dict(value)
            elif isinstance(value, list):
                result[key] = value.copy()
            else:
                result[key] = value
        return result

    def _update_metrics(self, documents: List[Document], processing_time: float,
                        file_path: Path) -> None:
        """
        Update processing metrics.

        Args:
            documents: Processed documents
            processing_time: Time taken for processing
            file_path: Path to the processed file
        """
        self.metrics['documents_processed'] += 1
        self.metrics['total_processing_time'] += processing_time
        self.metrics['total_chunks_created'] += len(documents)

        # Calculate bytes processed
        try:
            file_size = file_path.stat().st_size
            self.metrics['total_bytes_processed'] += file_size
        except OSError:
            pass  # File might not exist or be accessible

        # Calculate derived metrics
        if self.metrics['total_processing_time'] > 0:
            self.metrics['average_processing_speed'] = (
                self.metrics['total_bytes_processed'] /
                self.metrics['total_processing_time']
            )
            self.metrics['documents_per_second'] = (
                self.metrics['documents_processed'] /
                self.metrics['total_processing_time']
            )

        if self.metrics['documents_processed'] > 0:
            self.metrics['average_chunks_per_document'] = (
                self.metrics['total_chunks_created'] /
                self.metrics['documents_processed']
            )


# Factory functions for easier instantiation

def create_pdf_processor(config: Optional[Dict[str, Any]] = None) -> ModularDocumentProcessor:
    """
    Create a document processor optimized for PDF processing.

    Args:
        config: Optional configuration overrides

    Returns:
        Configured ModularDocumentProcessor for PDF processing
    """
    default_config = {
        'parser': {
            'type': 'pymupdf',
            'config': {
                'max_file_size_mb': 100,
                'preserve_layout': True,
                'extract_images': False
            }
        },
        'chunker': {
            'type': 'sentence_boundary',
            'config': {
                'chunk_size': 1400,
                'overlap': 200,
                'quality_threshold': 0.0,
                'preserve_sentences': True
            }
        },
        'cleaner': {
            'type': 'technical',
            'config': {
                'normalize_whitespace': True,
                'remove_artifacts': True,
                'preserve_code_blocks': True,
                'preserve_equations': True
            }
        }
    }

    if config:
        processor = ModularDocumentProcessor()
        processor._merge_config(default_config, config)
        processor.configure(default_config)
        return processor

    return ModularDocumentProcessor(default_config)


def create_fast_processor(config: Optional[Dict[str, Any]] = None) -> ModularDocumentProcessor:
    """
    Create a document processor optimized for speed.

    Args:
        config: Optional configuration overrides

    Returns:
        Configured ModularDocumentProcessor optimized for speed
    """
    fast_config = {
        'parser': {
            'type': 'pymupdf',
            'config': {
                'max_file_size_mb': 50,
                'preserve_layout': False,
                'extract_images': False
            }
        },
        'chunker': {
            'type': 'sentence_boundary',
            'config': {
                'chunk_size': 1000,
                'overlap': 100,
                'quality_threshold': 0.3,
                'enable_quality_filtering': False
            }
        },
        'cleaner': {
            'type': 'technical',
            'config': {
                'normalize_whitespace': True,
                'remove_artifacts': False,
                'preserve_code_blocks': False,
                'detect_pii': False
            }
        },
        'pipeline': {
            'enable_validation': False,
            'enable_metrics': False,
            'fail_fast': True
        }
    }

    if config:
        processor = ModularDocumentProcessor()
        processor._merge_config(fast_config, config)
        processor.configure(fast_config)
        return processor

    return ModularDocumentProcessor(fast_config)
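# The presets differ mainly in chunk geometry and pipeline strictness: the
# fast preset above disables validation and metrics and fails fast, while the
# high-quality preset below uses larger chunks (1800/300), quality filtering,
# and PII detection at the cost of some throughput.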
def create_high_quality_processor(config: Optional[Dict[str, Any]] = None) -> ModularDocumentProcessor:
    """
    Create a document processor optimized for quality.

    Args:
        config: Optional configuration overrides

    Returns:
        Configured ModularDocumentProcessor optimized for quality
    """
    quality_config = {
        'parser': {
            'type': 'pymupdf',
            'config': {
                'max_file_size_mb': 200,
                'preserve_layout': True,
                'extract_images': True
            }
        },
        'chunker': {
            'type': 'sentence_boundary',
            'config': {
                'chunk_size': 1800,
                'overlap': 300,
                'quality_threshold': 0.5,
                'enable_quality_filtering': True,
                'preserve_sentences': True
            }
        },
        'cleaner': {
            'type': 'technical',
            'config': {
                'normalize_whitespace': True,
                'remove_artifacts': True,
                'preserve_code_blocks': True,
                'preserve_equations': True,
                'detect_pii': True,
                'preserve_technical_formatting': True
            }
        },
        'pipeline': {
            'enable_validation': True,
            'enable_metrics': True,
            'fail_fast': False
        }
    }

    if config:
        processor = ModularDocumentProcessor()
        processor._merge_config(quality_config, config)
        processor.configure(quality_config)
        return processor

    return ModularDocumentProcessor(quality_config)
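

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the public API): the file
# path below is a placeholder, and running this requires the PyMuPDF-backed
# sub-components to be importable.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    demo = create_fast_processor()
    print("Supported formats:", demo.supported_formats())
    print("Capabilities:", demo.get_capabilities())

    sample = Path("sample.pdf")  # placeholder path; substitute a real document
    if sample.exists():
        chunks = demo.process(sample)
        print(f"Created {len(chunks)} chunks from {sample}")
        print("Metrics:", demo.get_metrics())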