"""
Document Processing Pipeline Implementation.
This module implements the ProcessingPipeline interface to orchestrate
the flow of document processing through parsing, chunking, and cleaning
stages. It coordinates sub-components and handles error recovery.
Key Features:
- Sequential processing pipeline (parse → chunk → clean)
- Error handling with graceful degradation options
- Performance monitoring and metrics collection
- Configurable validation and quality controls
- Comprehensive logging and debugging support
Architecture Notes:
- Implements ProcessingPipeline abstract base class
- Orchestrates DocumentParser, TextChunker, and ContentCleaner
- Provides centralized error handling and metrics
- Supports different processing strategies (fail-fast vs. graceful)
"""
import os
import time
from pathlib import Path
from typing import List, Dict, Any, Optional
import sys
# Add project paths for imports
project_root = Path(__file__).parent.parent.parent.parent
sys.path.append(str(project_root))
from src.core.interfaces import Document
from .base import (
    ProcessingPipeline,
    DocumentParser,
    TextChunker,
    ContentCleaner,
    ValidationResult,
    ConfigurableComponent,
)
class DocumentProcessingPipeline(ProcessingPipeline, ConfigurableComponent):
"""
Orchestrates document processing through multiple stages.
This pipeline coordinates the three main processing stages:
1. Document Parsing - Extract text and metadata from files
2. Text Chunking - Split text into retrieval-sized chunks
3. Content Cleaning - Normalize and clean text content
Features:
- Sequential processing with error handling
- Performance metrics for each stage
- Configurable quality controls
- Graceful degradation on errors
- Comprehensive validation
Configuration Options:
- enable_validation: Validate documents before processing
- enable_metrics: Collect detailed performance metrics
- fail_fast: Stop on first error vs. continue processing
- skip_cleaning: Skip cleaning stage for faster processing
- quality_threshold: Minimum quality score for chunk inclusion
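
    Example (a minimal usage sketch; PDFParser, SentenceChunker, and
    TextCleaner are hypothetical implementations of the base interfaces):

        pipeline = DocumentProcessingPipeline(
            parser=PDFParser(),
            chunker=SentenceChunker(),
            cleaner=TextCleaner(),
            config={'fail_fast': True, 'quality_threshold': 0.5},
        )
        documents = pipeline.process_document(Path('manual.pdf'))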
"""
def __init__(
self,
parser: DocumentParser,
chunker: TextChunker,
cleaner: ContentCleaner,
        config: Optional[Dict[str, Any]] = None
):
"""
Initialize the processing pipeline.
Args:
parser: Document parser instance
chunker: Text chunker instance
cleaner: Content cleaner instance
            config: Optional configuration overrides, merged over the defaults
"""
self.parser = parser
self.chunker = chunker
self.cleaner = cleaner
# Default configuration
self.config = {
'enable_validation': True,
'enable_metrics': True,
'fail_fast': False,
'skip_cleaning': False,
'quality_threshold': 0.0,
'max_chunks_per_document': 1000,
'enable_debug_logging': False
}
# Apply provided configuration
if config:
self.config.update(config)
# Pipeline metrics
self.metrics = {
'documents_processed': 0,
'total_pipeline_time': 0.0,
'stage_times': {
'parsing': 0.0,
'chunking': 0.0,
'cleaning': 0.0,
'validation': 0.0
},
'stage_calls': {
'parsing': 0,
'chunking': 0,
'cleaning': 0,
'validation': 0
},
'errors': {
'parsing_errors': 0,
'chunking_errors': 0,
'cleaning_errors': 0,
'validation_errors': 0
},
'chunks_created': 0,
'chunks_filtered': 0,
'average_chunks_per_document': 0.0
}
def process_document(self, file_path: Path) -> List[Document]:
"""
Process a document through the complete pipeline.
Args:
file_path: Path to the document file
Returns:
List of processed Document objects
        Raises:
            ValueError: If chunking or cleaning fails (only with fail_fast)
            IOError: If parsing fails (only with fail_fast)
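
        Example (illustrative; with the default fail_fast=False, errors are
        logged and an empty list is returned instead of raising):

            docs = pipeline.process_document(Path('report.pdf'))
            print(f"Produced {len(docs)} chunks")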
"""
pipeline_start = time.perf_counter()
try:
# Stage 1: Document Parsing
parsed_data = self._parse_document(file_path)
# Stage 2: Text Chunking
chunks = self._chunk_text(parsed_data, file_path)
# Stage 3: Content Cleaning (optional)
if not self.config['skip_cleaning']:
cleaned_chunks = self._clean_chunks(chunks)
else:
cleaned_chunks = chunks
# Stage 4: Convert to Document objects
documents = self._create_documents(cleaned_chunks, parsed_data, file_path)
# Stage 5: Quality filtering
filtered_documents = self._filter_documents(documents)
# Update metrics
pipeline_time = time.perf_counter() - pipeline_start
self._update_pipeline_metrics(filtered_documents, pipeline_time)
return filtered_documents
except Exception as e:
if self.config['fail_fast']:
raise
# Graceful degradation - log error and return empty list
self._log_error(f"Pipeline error for {file_path}: {str(e)}")
return []
def validate_document(self, file_path: Path) -> ValidationResult:
"""
Validate document before processing.
Args:
file_path: Path to the document file
Returns:
ValidationResult with validation status and messages
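
        Example (illustrative):

            result = pipeline.validate_document(Path('report.pdf'))
            if not result.valid:
                print(result.errors)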
"""
if not self.config['enable_validation']:
return ValidationResult(valid=True)
validation_start = time.perf_counter()
try:
errors = []
warnings = []
# Basic file validation
if not file_path.exists():
errors.append(f"File not found: {file_path}")
if not file_path.is_file():
errors.append(f"Path is not a file: {file_path}")
# Format validation
if file_path.suffix.lower() not in self.parser.supported_formats():
errors.append(f"Unsupported format: {file_path.suffix}")
# Size validation
if file_path.exists():
file_size_mb = file_path.stat().st_size / (1024 * 1024)
if hasattr(self.parser, 'config') and 'max_file_size_mb' in self.parser.config:
max_size = self.parser.config['max_file_size_mb']
if file_size_mb > max_size:
errors.append(f"File too large: {file_size_mb:.1f}MB > {max_size}MB")
elif file_size_mb > max_size * 0.8:
warnings.append(f"Large file: {file_size_mb:.1f}MB")
# Permissions validation
            if file_path.exists() and not os.access(file_path, os.R_OK):
                errors.append(f"File not readable: {file_path}")
result = ValidationResult(
valid=len(errors) == 0,
errors=errors,
warnings=warnings
)
# Update metrics
validation_time = time.perf_counter() - validation_start
self.metrics['stage_times']['validation'] += validation_time
self.metrics['stage_calls']['validation'] += 1
if not result.valid:
self.metrics['errors']['validation_errors'] += 1
return result
except Exception as e:
self.metrics['errors']['validation_errors'] += 1
return ValidationResult(
valid=False,
errors=[f"Validation error: {str(e)}"]
)
def get_metrics(self) -> Dict[str, Any]:
"""
Get processing metrics for monitoring.
Returns:
Dictionary with processing metrics and statistics
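
        Example (illustrative; derived averages are present only after at
        least one document has been processed):

            metrics = pipeline.get_metrics()
            avg_time = metrics['derived_metrics'].get('average_pipeline_time')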
"""
# Calculate derived metrics
derived_metrics = {}
if self.metrics['stage_calls']['parsing'] > 0:
derived_metrics['average_parsing_time'] = (
self.metrics['stage_times']['parsing'] / self.metrics['stage_calls']['parsing']
)
if self.metrics['stage_calls']['chunking'] > 0:
derived_metrics['average_chunking_time'] = (
self.metrics['stage_times']['chunking'] / self.metrics['stage_calls']['chunking']
)
if self.metrics['stage_calls']['cleaning'] > 0:
derived_metrics['average_cleaning_time'] = (
self.metrics['stage_times']['cleaning'] / self.metrics['stage_calls']['cleaning']
)
if self.metrics['documents_processed'] > 0:
derived_metrics['average_pipeline_time'] = (
self.metrics['total_pipeline_time'] / self.metrics['documents_processed']
)
self.metrics['average_chunks_per_document'] = (
self.metrics['chunks_created'] / self.metrics['documents_processed']
)
# Combine base and derived metrics
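        # NOTE: dict.copy() is shallow, so nested dicts (stage_times, errors,
        # ...) still reference internal state; treat the result as read-only.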
result = self.metrics.copy()
result['derived_metrics'] = derived_metrics
return result
def configure(self, config: Dict[str, Any]) -> None:
"""
Configure the pipeline with provided settings.
Args:
config: Configuration dictionary
Raises:
ValueError: If configuration is invalid
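
        Example (illustrative; unknown keys raise ValueError):

            pipeline.configure({'quality_threshold': 0.5})
            pipeline.configure({'not_a_key': 1})  # raises ValueError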
"""
# Validate configuration
valid_keys = {
'enable_validation', 'enable_metrics', 'fail_fast',
'skip_cleaning', 'quality_threshold', 'max_chunks_per_document',
'enable_debug_logging'
}
invalid_keys = set(config.keys()) - valid_keys
if invalid_keys:
raise ValueError(f"Invalid configuration keys: {invalid_keys}")
# Validate specific values
if 'quality_threshold' in config:
if not isinstance(config['quality_threshold'], (int, float)) or not 0 <= config['quality_threshold'] <= 1:
raise ValueError("quality_threshold must be a float between 0 and 1")
if 'max_chunks_per_document' in config:
if not isinstance(config['max_chunks_per_document'], int) or config['max_chunks_per_document'] <= 0:
raise ValueError("max_chunks_per_document must be a positive integer")
# Update configuration
self.config.update(config)
def get_config(self) -> Dict[str, Any]:
"""
Get current configuration.
Returns:
Current configuration dictionary
"""
return self.config.copy()
def _parse_document(self, file_path: Path) -> Dict[str, Any]:
"""
Parse document using the configured parser.
Args:
file_path: Path to document file
Returns:
Parsed document data
"""
parse_start = time.perf_counter()
try:
parsed_data = self.parser.parse(file_path)
# Update metrics
parse_time = time.perf_counter() - parse_start
self.metrics['stage_times']['parsing'] += parse_time
self.metrics['stage_calls']['parsing'] += 1
self._log_debug(f"Parsed {file_path} in {parse_time:.3f}s")
return parsed_data
except Exception as e:
self.metrics['errors']['parsing_errors'] += 1
raise IOError(f"Failed to parse document {file_path}: {str(e)}") from e
def _chunk_text(self, parsed_data: Dict[str, Any], file_path: Path) -> List[Dict[str, Any]]:
"""
Chunk text using the configured chunker.
Args:
parsed_data: Parsed document data
file_path: Original file path for metadata
Returns:
List of text chunks with metadata
"""
chunk_start = time.perf_counter()
try:
text = parsed_data.get('text', '')
if not text:
raise ValueError("No text found in parsed document")
# Prepare metadata for chunking
metadata = {
'source_file': str(file_path),
'document_metadata': parsed_data.get('metadata', {}),
'processing_timestamp': time.time()
}
# Chunk the text
chunks = self.chunker.chunk(text, metadata)
# Convert Chunk objects to dictionaries for consistency
chunk_dicts = []
for chunk in chunks:
chunk_dict = {
'content': chunk.content,
'start_pos': chunk.start_pos,
'end_pos': chunk.end_pos,
'metadata': chunk.metadata
}
chunk_dicts.append(chunk_dict)
# Limit chunks if configured
if len(chunk_dicts) > self.config['max_chunks_per_document']:
chunk_dicts = chunk_dicts[:self.config['max_chunks_per_document']]
self._log_debug(f"Limited chunks to {self.config['max_chunks_per_document']} for {file_path}")
# Update metrics
chunk_time = time.perf_counter() - chunk_start
self.metrics['stage_times']['chunking'] += chunk_time
self.metrics['stage_calls']['chunking'] += 1
self._log_debug(f"Created {len(chunk_dicts)} chunks in {chunk_time:.3f}s")
return chunk_dicts
except Exception as e:
self.metrics['errors']['chunking_errors'] += 1
raise ValueError(f"Failed to chunk text: {str(e)}") from e
def _clean_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Clean chunk content using the configured cleaner.
Args:
chunks: List of chunk dictionaries
Returns:
List of cleaned chunk dictionaries
"""
clean_start = time.perf_counter()
try:
cleaned_chunks = []
for chunk in chunks:
content = chunk.get('content', '')
if not content:
continue
# Clean the content
cleaned_content = self.cleaner.clean(content)
# Update chunk with cleaned content
cleaned_chunk = chunk.copy()
cleaned_chunk['content'] = cleaned_content
cleaned_chunk['metadata'] = chunk.get('metadata', {}).copy()
cleaned_chunk['metadata']['cleaned'] = True
cleaned_chunk['metadata']['original_length'] = len(content)
cleaned_chunk['metadata']['cleaned_length'] = len(cleaned_content)
cleaned_chunks.append(cleaned_chunk)
# Update metrics
clean_time = time.perf_counter() - clean_start
self.metrics['stage_times']['cleaning'] += clean_time
self.metrics['stage_calls']['cleaning'] += 1
self._log_debug(f"Cleaned {len(cleaned_chunks)} chunks in {clean_time:.3f}s")
return cleaned_chunks
except Exception as e:
self.metrics['errors']['cleaning_errors'] += 1
if self.config['fail_fast']:
raise ValueError(f"Failed to clean chunks: {str(e)}") from e
else:
# Return original chunks if cleaning fails
self._log_error(f"Cleaning failed, using original chunks: {str(e)}")
return chunks
def _create_documents(
self,
chunks: List[Dict[str, Any]],
parsed_data: Dict[str, Any],
file_path: Path
) -> List[Document]:
"""
Convert chunks to Document objects.
Args:
chunks: List of chunk dictionaries
parsed_data: Original parsed document data
file_path: Source file path
Returns:
List of Document objects
"""
documents = []
for i, chunk in enumerate(chunks):
content = chunk.get('content', '')
if not content.strip():
continue
# Create comprehensive metadata
metadata = {
# Source information
'source': str(file_path),
'source_name': file_path.name,
'source_type': file_path.suffix.lower().lstrip('.'),
# Chunk information
'chunk_index': i,
'chunk_id': chunk.get('metadata', {}).get('chunk_id', f'chunk_{i}'),
'start_pos': chunk.get('start_pos', 0),
'end_pos': chunk.get('end_pos', len(content)),
# Document metadata
'document_metadata': parsed_data.get('metadata', {}),
'document_page_count': parsed_data.get('page_count', 0),
'document_processing_time': parsed_data.get('extraction_time', 0.0),
# Processing metadata
'processed_by': 'document_processing_pipeline',
'processing_timestamp': time.time(),
'pipeline_config': self.config.copy(),
# Quality information
'quality_score': chunk.get('metadata', {}).get('quality_score', 0.0),
                # Merge chunk-level metadata last so it takes precedence over
                # any duplicate keys set above (e.g. chunk_id, quality_score)
**chunk.get('metadata', {})
}
# Create Document object
document = Document(
content=content,
metadata=metadata,
embedding=None # Embeddings will be added later
)
documents.append(document)
return documents
def _filter_documents(self, documents: List[Document]) -> List[Document]:
"""
Filter documents based on quality threshold.
Args:
documents: List of Document objects
Returns:
List of filtered Document objects
"""
if self.config['quality_threshold'] <= 0:
return documents
filtered = []
filtered_count = 0
for document in documents:
quality_score = document.metadata.get('quality_score', 0.0)
if quality_score >= self.config['quality_threshold']:
filtered.append(document)
else:
filtered_count += 1
self.metrics['chunks_filtered'] += filtered_count
if filtered_count > 0:
self._log_debug(f"Filtered {filtered_count} documents below quality threshold {self.config['quality_threshold']}")
return filtered
def _update_pipeline_metrics(self, documents: List[Document], pipeline_time: float) -> None:
"""
Update overall pipeline metrics.
Args:
documents: Processed documents
pipeline_time: Total pipeline processing time
"""
self.metrics['documents_processed'] += 1
self.metrics['total_pipeline_time'] += pipeline_time
self.metrics['chunks_created'] += len(documents)
def _log_debug(self, message: str) -> None:
"""
Log debug message if debug logging is enabled.
Args:
message: Debug message
"""
if self.config['enable_debug_logging']:
print(f"[PIPELINE DEBUG] {message}")
def _log_error(self, message: str) -> None:
"""
Log error message.
Args:
message: Error message
"""
print(f"[PIPELINE ERROR] {message}")