Arthur Passuello
initial commit
5e1a30c
"""
Main Document Processor Implementation.
This module implements the primary DocumentProcessor interface that
coordinates all document processing sub-components (parsing, chunking,
cleaning) through a configurable pipeline.
Architecture Notes:
- Implements DocumentProcessor interface from core.interfaces
- Coordinates sub-components via pipeline pattern
- Configuration-driven component selection
- Provides unified interface for document processing
- Includes comprehensive error handling and metrics
"""
import time
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional
import sys
# Add project paths for imports
project_root = Path(__file__).parent.parent.parent.parent
sys.path.append(str(project_root))
from src.core.interfaces import DocumentProcessor as DocumentProcessorInterface, Document, HealthStatus
# Forward declaration to avoid circular import
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from src.core.platform_orchestrator import PlatformOrchestrator
from .base import ProcessingPipeline, ConfigurableComponent, ValidationResult
from .pipeline import DocumentProcessingPipeline
from .adapters import PyMuPDFAdapter
from .chunkers import SentenceBoundaryChunker
from .cleaners import TechnicalContentCleaner
logger = logging.getLogger(__name__)
class ModularDocumentProcessor(DocumentProcessorInterface, ConfigurableComponent):
    """
    Modular document processor with configurable sub-components.

    This processor implements the DocumentProcessor interface while
    providing a modular architecture where parsing, chunking, and
    cleaning strategies can be configured independently.

    Features:
    - Configuration-driven component selection
    - Multiple document format support (extensible)
    - Error handling and validation
    - Performance metrics and monitoring
    - Pluggable sub-component architecture

    Configuration Structure:
    {
        "parser": {
            "type": "pymupdf",
            "config": {...}
        },
        "chunker": {
            "type": "sentence_boundary",
            "config": {...}
        },
        "cleaner": {
            "type": "technical",
            "config": {...}
        },
        "pipeline": {
            "enable_validation": bool,
            "enable_metrics": bool,
            "fail_fast": bool
        }
    }

    The accepted ``type`` values are exactly those returned by
    ``_component_registry``.
    """

    # Legacy keyword arguments accepted by __init__, mapped to the
    # configuration section whose ``config`` dict they are written into.
    _LEGACY_KWARG_SECTIONS: Dict[str, str] = {
        'min_chunk_size': 'chunker',
        'enable_quality_filtering': 'chunker',
        'quality_threshold': 'chunker',
        'preserve_layout': 'parser',
        'max_file_size_mb': 'parser',
        'extract_images': 'parser',
        'normalize_whitespace': 'cleaner',
        'remove_artifacts': 'cleaner',
        'preserve_code_blocks': 'cleaner',
    }

    def __init__(self, config: Optional[Dict[str, Any]] = None, chunk_size: Optional[int] = None,
                 chunk_overlap: Optional[int] = None, **kwargs):
        """
        Initialize the modular document processor.

        Built-in defaults are deep-merged first with any legacy parameters and
        then with ``config``, so an explicit ``config`` always wins.

        Args:
            config: Configuration dictionary for all sub-components
            chunk_size: Legacy parameter for chunk size (backwards compatibility)
            chunk_overlap: Legacy parameter for chunk overlap (backwards compatibility)
            **kwargs: Additional legacy parameters (see ``_LEGACY_KWARG_SECTIONS``);
                unrecognized names are ignored with a warning.

        Raises:
            ValueError: If a configured component type is unknown.
        """
        # Default configuration
        self.config = {
            'parser': {
                'type': 'pymupdf',
                'config': {
                    'max_file_size_mb': 100,
                    'preserve_layout': True
                }
            },
            'chunker': {
                'type': 'sentence_boundary',
                'config': {
                    'chunk_size': 1400,
                    'overlap': 200,
                    'quality_threshold': 0.0
                }
            },
            'cleaner': {
                'type': 'technical',
                'config': {
                    'normalize_whitespace': True,
                    'remove_artifacts': True,
                    'preserve_code_blocks': True
                }
            },
            'pipeline': {
                'enable_validation': True,
                'enable_metrics': True,
                'fail_fast': False
            }
        }

        # Translate legacy parameters into the nested config format.
        legacy_config: Dict[str, Any] = {}

        def legacy_section(name: str) -> Dict[str, Any]:
            # Lazily create {'<name>': {'config': {}}} and return the inner dict.
            return legacy_config.setdefault(name, {'config': {}})['config']

        if chunk_size is not None:
            legacy_section('chunker')['chunk_size'] = chunk_size
        if chunk_overlap is not None:
            legacy_section('chunker')['overlap'] = chunk_overlap
        for key, value in kwargs.items():
            section = self._LEGACY_KWARG_SECTIONS.get(key)
            if section is None:
                logger.warning("Ignoring unrecognized legacy parameter %r", key)
                continue
            legacy_section(section)[key] = value

        # Legacy values first, explicit configuration second (higher priority).
        self._merge_config(self.config, legacy_config)
        if config:
            self._merge_config(self.config, config)

        # Sub-components (populated by _initialize_components)
        self.parser = None
        self.chunker = None
        self.cleaner = None
        self.pipeline = None

        # Processing metrics
        self.metrics = {
            'documents_processed': 0,
            'total_processing_time': 0.0,
            'total_chunks_created': 0,
            'total_bytes_processed': 0,
            'errors_encountered': 0,
            'validation_failures': 0,
            'component_metrics': {}
        }

        # Platform services (initialized via initialize_services)
        self.platform: Optional['PlatformOrchestrator'] = None

        self._initialize_components()

    def process(self, file_path: Path) -> List[Document]:
        """
        Process a document through the complete pipeline.

        Only validation and the pipeline call are guarded; metric updates and
        platform telemetry run afterwards, so a telemetry failure can never
        discard successfully processed documents.

        Args:
            file_path: Path to the document file

        Returns:
            List of processed Document objects. When processing fails and
            ``pipeline.fail_fast`` is False, the error is logged and an empty
            list is returned.

        Raises:
            ValueError: If validation fails and ``pipeline.fail_fast`` is True.
            Exception: Any pipeline error is re-raised when ``pipeline.fail_fast`` is True.
        """
        start_time = time.perf_counter()
        try:
            if self.config['pipeline']['enable_validation']:
                validation_result = self.validate_document(file_path)
                if not validation_result.valid:
                    self.metrics['validation_failures'] += 1
                    raise ValueError(f"Document validation failed: {validation_result.errors}")

            documents = self.pipeline.process_document(file_path)
        except Exception as e:
            self.metrics['errors_encountered'] += 1
            if self.platform:
                self._report_performance({
                    "success": False,
                    "processing_time": time.perf_counter() - start_time,
                    "file_path": str(file_path),
                    "error": str(e)
                })

            if self.config['pipeline']['fail_fast']:
                raise

            # Graceful degradation: log and return no documents.
            logger.error("Error processing %s: %s", file_path, e)
            return []

        processing_time = time.perf_counter() - start_time
        self._update_metrics(documents, processing_time, file_path)

        if self.platform:
            self._report_performance({
                "success": True,
                "processing_time": processing_time,
                "file_path": str(file_path),
                "documents_created": len(documents),
                # Counts returned documents whose content is non-empty.
                "total_chunks": sum(1 for doc in documents if doc.content)
            })

        return documents

    def _report_performance(self, details: Dict[str, Any]) -> None:
        """Forward a processing record to the platform, if attached; never raises."""
        if not self.platform:
            return
        try:
            self.platform.track_component_performance(self, "document_processing", details)
        except Exception:
            # Telemetry is best-effort and must not change processing results.
            logger.warning("Failed to report document processing performance", exc_info=True)

    def supported_formats(self) -> List[str]:
        """
        Return list of supported file extensions.

        Returns:
            The active parser's supported formats, or an empty list when no
            parser is initialized.
        """
        if self.parser:
            return self.parser.supported_formats()
        return []

    def validate_document(self, file_path: Path) -> ValidationResult:
        """
        Validate document before processing.

        Delegates to the pipeline when available; otherwise checks that the
        file exists and that its suffix is in ``supported_formats()``.

        Args:
            file_path: Path to the document file (a ``str`` is also accepted
                by the fallback check)

        Returns:
            ValidationResult with validation status and messages
        """
        if self.pipeline:
            return self.pipeline.validate_document(file_path)

        # Fallback validation
        path = Path(file_path)
        errors: List[str] = []
        warnings: List[str] = []

        if not path.exists():
            errors.append(f"File not found: {file_path}")

        if path.suffix.lower() not in self.supported_formats():
            errors.append(f"Unsupported file format: {path.suffix}")

        return ValidationResult(
            valid=not errors,
            errors=errors,
            warnings=warnings
        )

    def configure(self, config: Dict[str, Any]) -> None:
        """
        Configure the processor with new settings.

        The configuration is validated before anything else happens, so an
        invalid configuration is never forwarded to the platform service.

        Args:
            config: Configuration dictionary; must contain ``parser``,
                ``chunker`` and ``cleaner`` sections.

        Raises:
            ValueError: If configuration is invalid
        """
        self._validate_config(config)

        # Use platform configuration service if available
        if self.platform:
            self.platform.update_component_configuration(self, config)

        self._merge_config(self.config, config)

        # Rebuild sub-components from the merged configuration
        self._initialize_components()

    def get_config(self) -> Dict[str, Any]:
        """
        Get current configuration.

        Returns:
            A deep copy of the current configuration dictionary
        """
        return self._deep_copy_dict(self.config)

    def get_component_info(self) -> Dict[str, Any]:
        """
        Get information about active components.

        Returns:
            Dictionary with component information
        """
        return {
            'parser': {
                'type': self.config['parser']['type'],
                'class': self.parser.__class__.__name__ if self.parser else None,
                'supported_formats': self.parser.supported_formats() if self.parser else []
            },
            'chunker': {
                'type': self.config['chunker']['type'],
                'class': self.chunker.__class__.__name__ if self.chunker else None,
                'strategy': self.chunker.get_chunk_strategy() if hasattr(self.chunker, 'get_chunk_strategy') else None
            },
            'cleaner': {
                'type': self.config['cleaner']['type'],
                'class': self.cleaner.__class__.__name__ if self.cleaner else None,
                'quality_factors': self.cleaner.get_quality_factors() if hasattr(self.cleaner, 'get_quality_factors') else []
            },
            'pipeline': {
                'class': self.pipeline.__class__.__name__ if self.pipeline else None,
                'validation_enabled': self.config['pipeline']['enable_validation'],
                'metrics_enabled': self.config['pipeline']['enable_metrics']
            }
        }

    # Standard ComponentBase interface implementation
    def initialize_services(self, platform: 'PlatformOrchestrator') -> None:
        """Initialize platform services for the component.

        Args:
            platform: PlatformOrchestrator instance providing services
        """
        self.platform = platform
        logger.info("ModularDocumentProcessor initialized with platform services")

    def get_health_status(self) -> HealthStatus:
        """Get the current health status of the component.

        Delegates to the platform when attached; otherwise reports the
        component unhealthy if any sub-component is missing.

        Returns:
            HealthStatus object with component health information
        """
        if self.platform:
            return self.platform.check_component_health(self)

        # Fallback if platform services not initialized
        issues = [
            f"{label} not initialized"
            for label, component in (
                ("Parser", self.parser),
                ("Chunker", self.chunker),
                ("Cleaner", self.cleaner),
                ("Pipeline", self.pipeline),
            )
            if not component
        ]

        return HealthStatus(
            is_healthy=not issues,
            issues=issues,
            metrics={
                "sub_components": self.get_component_info(),
                # Deep copy so callers cannot mutate internal metric state.
                "basic_metrics": self._deep_copy_dict(self.metrics)
            },
            component_name=self.__class__.__name__
        )

    def get_metrics(self) -> Dict[str, Any]:
        """Get component-specific metrics.

        Platform analytics are preferred when available. If they are
        unavailable or fail (the failure is logged), local metrics are returned.

        Returns:
            Dictionary containing component metrics
        """
        if self.platform:
            try:
                component_metrics = self.platform.analytics_service.collect_component_metrics(self)
                return {
                    "component_name": component_metrics.component_name,
                    "component_type": component_metrics.component_type,
                    "success_count": component_metrics.success_count,
                    "error_count": component_metrics.error_count,
                    "resource_usage": component_metrics.resource_usage,
                    "performance_metrics": component_metrics.performance_metrics,
                    "timestamp": component_metrics.timestamp
                }
            except Exception:
                logger.warning(
                    "Platform metrics collection failed for %s; falling back to local metrics",
                    self.__class__.__name__,
                    exc_info=True,
                )

        # Local metrics, refreshed with sub-component metrics when enabled
        if self.config['pipeline']['enable_metrics']:
            self.metrics['component_metrics'] = {
                name: component.get_metrics() if hasattr(component, 'get_metrics') else {}
                for name, component in (
                    ('parser', self.parser),
                    ('chunker', self.chunker),
                    ('cleaner', self.cleaner),
                    ('pipeline', self.pipeline),
                )
            }

        return self._deep_copy_dict(self.metrics)

    def get_capabilities(self) -> List[str]:
        """Get list of component capabilities.

        Returns:
            List of capability strings
        """
        capabilities = [
            "document_processing",
            "multi_format_support",
            "modular_architecture",
            "configurable_pipeline",
            "performance_metrics"
        ]

        # Add parser capabilities
        if self.parser and hasattr(self.parser, 'supported_formats'):
            capabilities.extend([f"parser_{fmt}" for fmt in self.parser.supported_formats()])

        # Add chunker capabilities
        if self.chunker:
            capabilities.append(f"chunker_{self.config['chunker']['type']}")

        # Add cleaner capabilities
        if self.cleaner:
            capabilities.append(f"cleaner_{self.config['cleaner']['type']}")

        return capabilities

    @staticmethod
    def _component_registry() -> Dict[str, Dict[str, Any]]:
        """Map each sub-component section to its supported type names and classes.

        Built on every call so the module-level class names are looked up
        lazily (e.g. patched names are honoured). This is the single source of
        truth for both validation and construction.
        """
        return {
            'parser': {'pymupdf': PyMuPDFAdapter},
            'chunker': {'sentence_boundary': SentenceBoundaryChunker},
            'cleaner': {'technical': TechnicalContentCleaner},
        }

    def _initialize_components(self) -> None:
        """Initialize all sub-components based on configuration.

        Raises:
            ValueError: If a configured component type is not registered.
        """
        self.parser = self._build_component('parser')
        self.chunker = self._build_component('chunker')
        self.cleaner = self._build_component('cleaner')

        self.pipeline = DocumentProcessingPipeline(
            parser=self.parser,
            chunker=self.chunker,
            cleaner=self.cleaner,
            config=self.config['pipeline']
        )

    def _build_component(self, section: str) -> Any:
        """Instantiate the registered class for ``self.config[section]['type']``."""
        section_config = self.config[section]
        component_type = section_config['type']
        registry = self._component_registry()[section]
        # Guard against unhashable type values before the dict lookup.
        component_cls = registry.get(component_type) if isinstance(component_type, str) else None
        if component_cls is None:
            raise ValueError(f"Unknown {section} type: {component_type}")
        return component_cls(section_config['config'])

    def _validate_config(self, config: Dict[str, Any]) -> None:
        """
        Validate configuration structure and values.

        Args:
            config: Configuration to validate

        Raises:
            ValueError: If configuration is invalid
        """
        if not isinstance(config, dict):
            raise ValueError("Configuration must be a dictionary")

        # Check required top-level keys
        required_keys = {'parser', 'chunker', 'cleaner'}
        missing_keys = required_keys - set(config.keys())
        if missing_keys:
            raise ValueError(f"Missing required configuration keys: {missing_keys}")

        registry = self._component_registry()
        for section in ('parser', 'chunker', 'cleaner'):
            section_config = config[section]
            if not isinstance(section_config, dict) or 'type' not in section_config:
                raise ValueError(f"{section.capitalize()} configuration must include 'type'")
            component_type = section_config['type']
            if not isinstance(component_type, str) or component_type not in registry[section]:
                raise ValueError(f"Unknown {section} type: {component_type}")

    def _merge_config(self, base: Dict[str, Any], update: Dict[str, Any]) -> None:
        """
        Recursively merge configuration dictionaries.

        Args:
            base: Base configuration dictionary (modified in place)
            update: Update configuration dictionary (not modified)
        """
        for key, value in update.items():
            if key in base and isinstance(base[key], dict) and isinstance(value, dict):
                self._merge_config(base[key], value)
            else:
                # Copy containers so later in-place merges never write back
                # into dictionaries or lists owned by the caller.
                base[key] = self._deep_copy_value(value)

    def _deep_copy_dict(self, d: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create a deep copy of a dictionary.

        Nested dicts and lists (including dicts inside lists) are copied;
        any other values are shared by reference.

        Args:
            d: Dictionary to copy

        Returns:
            Deep copy of the dictionary
        """
        return {key: self._deep_copy_value(value) for key, value in d.items()}

    def _deep_copy_value(self, value: Any) -> Any:
        """Copy dicts/lists recursively; return any other value unchanged."""
        if isinstance(value, dict):
            return self._deep_copy_dict(value)
        if isinstance(value, list):
            return [self._deep_copy_value(item) for item in value]
        return value

    def _update_metrics(self, documents: List[Document], processing_time: float, file_path: Path) -> None:
        """
        Update processing metrics.

        Args:
            documents: Processed documents
            processing_time: Time taken for processing
            file_path: Path to processed file (``str`` also accepted)
        """
        self.metrics['documents_processed'] += 1
        self.metrics['total_processing_time'] += processing_time
        self.metrics['total_chunks_created'] += len(documents)

        # Calculate bytes processed
        try:
            self.metrics['total_bytes_processed'] += Path(file_path).stat().st_size
        except OSError:
            pass  # File might not exist or be accessible

        # Calculate derived metrics
        if self.metrics['total_processing_time'] > 0:
            self.metrics['average_processing_speed'] = (
                self.metrics['total_bytes_processed'] / self.metrics['total_processing_time']
            )
            self.metrics['documents_per_second'] = (
                self.metrics['documents_processed'] / self.metrics['total_processing_time']
            )

        if self.metrics['documents_processed'] > 0:
            self.metrics['average_chunks_per_document'] = (
                self.metrics['total_chunks_created'] / self.metrics['documents_processed']
            )
# Factory functions for easier instantiation
def create_pdf_processor(config: Dict[str, Any] = None) -> ModularDocumentProcessor:
    """
    Build a processor whose preset targets PDF ingestion.

    Preset: PyMuPDF parser with layout preserved and images not extracted;
    sentence-boundary chunker (chunk_size 1400, overlap 200, sentences
    preserved); technical cleaner that keeps code blocks and equations.

    Args:
        config: Optional overrides, deep-merged on top of the preset.

    Returns:
        Configured ModularDocumentProcessor for PDF processing
    """
    parser_section = {
        'type': 'pymupdf',
        'config': {
            'max_file_size_mb': 100,
            'preserve_layout': True,
            'extract_images': False
        }
    }
    chunker_section = {
        'type': 'sentence_boundary',
        'config': {
            'chunk_size': 1400,
            'overlap': 200,
            'quality_threshold': 0.0,
            'preserve_sentences': True
        }
    }
    cleaner_section = {
        'type': 'technical',
        'config': {
            'normalize_whitespace': True,
            'remove_artifacts': True,
            'preserve_code_blocks': True,
            'preserve_equations': True
        }
    }
    preset = {
        'parser': parser_section,
        'chunker': chunker_section,
        'cleaner': cleaner_section
    }

    if not config:
        return ModularDocumentProcessor(preset)

    # With overrides: start from built-in defaults, merge the overrides into
    # the preset, then apply it through configure(), which validates it.
    processor = ModularDocumentProcessor()
    processor._merge_config(preset, config)
    processor.configure(preset)
    return processor
def create_fast_processor(config: Dict[str, Any] = None) -> ModularDocumentProcessor:
    """
    Build a processor whose preset favours throughput over thoroughness.

    Preset: PyMuPDF parser (max_file_size_mb 50, no layout, no images);
    sentence-boundary chunker (chunk_size 1000, overlap 100, quality
    filtering off); minimal technical cleaning; pipeline validation and
    metrics disabled with fail_fast enabled.

    Args:
        config: Optional overrides, deep-merged on top of the preset.

    Returns:
        Configured ModularDocumentProcessor optimized for speed
    """
    parser_section = {
        'type': 'pymupdf',
        'config': {
            'max_file_size_mb': 50,
            'preserve_layout': False,
            'extract_images': False
        }
    }
    chunker_section = {
        'type': 'sentence_boundary',
        'config': {
            'chunk_size': 1000,
            'overlap': 100,
            'quality_threshold': 0.3,
            'enable_quality_filtering': False
        }
    }
    cleaner_section = {
        'type': 'technical',
        'config': {
            'normalize_whitespace': True,
            'remove_artifacts': False,
            'preserve_code_blocks': False,
            'detect_pii': False
        }
    }
    pipeline_section = {
        'enable_validation': False,
        'enable_metrics': False,
        'fail_fast': True
    }
    preset = {
        'parser': parser_section,
        'chunker': chunker_section,
        'cleaner': cleaner_section,
        'pipeline': pipeline_section
    }

    if not config:
        return ModularDocumentProcessor(preset)

    # With overrides: start from built-in defaults, merge the overrides into
    # the preset, then apply it through configure(), which validates it.
    processor = ModularDocumentProcessor()
    processor._merge_config(preset, config)
    processor.configure(preset)
    return processor
def create_high_quality_processor(config: Dict[str, Any] = None) -> ModularDocumentProcessor:
    """
    Build a processor whose preset favours output quality over speed.

    Preset: PyMuPDF parser (max_file_size_mb 200, layout and images kept);
    sentence-boundary chunker (chunk_size 1800, overlap 300, quality
    filtering on with threshold 0.5); thorough technical cleaning including
    PII detection; pipeline validation and metrics enabled, fail_fast off.

    Args:
        config: Optional overrides, deep-merged on top of the preset.

    Returns:
        Configured ModularDocumentProcessor optimized for quality
    """
    parser_section = {
        'type': 'pymupdf',
        'config': {
            'max_file_size_mb': 200,
            'preserve_layout': True,
            'extract_images': True
        }
    }
    chunker_section = {
        'type': 'sentence_boundary',
        'config': {
            'chunk_size': 1800,
            'overlap': 300,
            'quality_threshold': 0.5,
            'enable_quality_filtering': True,
            'preserve_sentences': True
        }
    }
    cleaner_section = {
        'type': 'technical',
        'config': {
            'normalize_whitespace': True,
            'remove_artifacts': True,
            'preserve_code_blocks': True,
            'preserve_equations': True,
            'detect_pii': True,
            'preserve_technical_formatting': True
        }
    }
    pipeline_section = {
        'enable_validation': True,
        'enable_metrics': True,
        'fail_fast': False
    }
    preset = {
        'parser': parser_section,
        'chunker': chunker_section,
        'cleaner': cleaner_section,
        'pipeline': pipeline_section
    }

    if not config:
        return ModularDocumentProcessor(preset)

    # With overrides: start from built-in defaults, merge the overrides into
    # the preset, then apply it through configure(), which validates it.
    processor = ModularDocumentProcessor()
    processor._merge_config(preset, config)
    processor.configure(preset)
    return processor