"""
Technical Content Cleaner Implementation.
This cleaner implements text normalization and cleaning specifically
optimized for technical documentation. It preserves important technical
content while removing artifacts and normalizing formatting.
Key Features:
- Technical content preservation (code blocks, equations, specifications)
- Whitespace normalization without losing structure
- Artifact removal (headers, footers, navigation elements)
- PII detection placeholder for future implementation
- Configurable cleaning strategies
Architecture Notes:
- Direct implementation (no adapter pattern) as per MASTER-ARCHITECTURE.md
- Focuses on technical documentation requirements
- Preserves formatting critical for technical understanding
"""
import re
from typing import List, Dict, Any, Tuple, Optional
from pathlib import Path
import sys
# Add project paths for imports
project_root = Path(__file__).parent.parent.parent.parent.parent
sys.path.append(str(project_root))
from ..base import ContentCleaner, ConfigurableComponent, QualityAssessment
class TechnicalContentCleaner(ContentCleaner, ConfigurableComponent, QualityAssessment):
"""
Technical documentation content cleaner.
This cleaner is specifically designed for technical documentation,
preserving important technical content while removing artifacts
and normalizing formatting for better retrieval and generation.
Features:
- Preserve code blocks, equations, and technical specifications
- Remove common document artifacts (headers, footers, TOCs)
- Normalize whitespace while preserving structure
- Handle technical formatting (bullet points, numbered lists)
- Basic PII detection (placeholder for future enhancement)
Configuration Options:
- normalize_whitespace: Enable whitespace normalization (default: True)
- remove_artifacts: Remove document artifacts (default: True)
- preserve_code_blocks: Preserve code block formatting (default: True)
- preserve_equations: Preserve mathematical equations (default: True)
- detect_pii: Enable PII detection (default: False)
- pii_action: Action for PII ('redact', 'remove', 'flag') (default: 'flag')
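    Example (illustrative usage; `raw_text` is any document string):
        cleaner = TechnicalContentCleaner({'detect_pii': True, 'pii_action': 'redact'})
        cleaned = cleaner.clean(raw_text)
        score = cleaner.assess_quality(cleaned)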
"""
    def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
Initialize the technical content cleaner.
Args:
config: Configuration dictionary with cleaner settings
"""
# Default configuration
self.config = {
'normalize_whitespace': True,
'remove_artifacts': True,
'preserve_code_blocks': True,
'preserve_equations': True,
'detect_pii': False,
'pii_action': 'flag',
'min_line_length': 10,
'max_consecutive_newlines': 2,
'preserve_technical_formatting': True
}
# Apply provided configuration
if config:
self.config.update(config)
# Cleaning metrics
self.metrics = {
'texts_processed': 0,
'artifacts_removed': 0,
'pii_detected': 0,
'bytes_cleaned': 0,
'cleaning_operations': {
'whitespace_normalized': 0,
'artifacts_removed': 0,
'code_blocks_preserved': 0,
'equations_preserved': 0
}
}
# Quality assessment factors
self.quality_factors = [
'technical_content_preservation',
'formatting_consistency',
'artifact_removal',
'content_completeness',
'readability_improvement'
]
# Compile regex patterns for performance
self._compile_patterns()
def clean(self, text: str) -> str:
"""
Clean and normalize text content.
Args:
text: Input text to be cleaned
Returns:
Cleaned text with normalized formatting
Raises:
ValueError: If text is None or invalid
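        Example (illustrative; exact output depends on configuration):
            cleaner = TechnicalContentCleaner()
            cleaned = cleaner.clean(raw_page_text)  # artifacts removed,
            # technical snippets preserved, runs of newlines capped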
"""
if text is None:
raise ValueError("Text cannot be None")
if not isinstance(text, str):
raise ValueError("Text must be a string")
if not text.strip():
return ""
original_length = len(text)
cleaned_text = text
        # Step 1: Protect important technical content behind placeholders
        protected_content = self._protect_technical_content(cleaned_text)
        # Carry the placeholder text forward even if artifact removal is off,
        # otherwise protected content is never restored
        cleaned_text = protected_content['text']
        # Step 2: Remove document artifacts
        if self.config['remove_artifacts']:
            cleaned_text = self._remove_artifacts(cleaned_text)
            self.metrics['cleaning_operations']['artifacts_removed'] += 1
# Step 3: Normalize whitespace
if self.config['normalize_whitespace']:
cleaned_text = self._normalize_whitespace(cleaned_text)
self.metrics['cleaning_operations']['whitespace_normalized'] += 1
# Step 4: Restore protected content
cleaned_text = self._restore_protected_content(cleaned_text, protected_content)
# Update metrics
self.metrics['texts_processed'] += 1
self.metrics['bytes_cleaned'] += abs(len(cleaned_text) - original_length)
return cleaned_text
def normalize(self, text: str) -> str:
"""
Normalize text formatting and structure.
Args:
text: Input text to normalize
Returns:
Normalized text with consistent formatting
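        Example (illustrative): curly quotes become " and ', en and em
        dashes become -, long ellipsis runs become ..., and bullet
        variants are unified to •.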
"""
if not text:
return ""
normalized = text
# Normalize line endings
normalized = re.sub(r'\r\n|\r', '\n', normalized)
        # Normalize quotation marks (curly/angle variants to ASCII)
        normalized = re.sub(r'[\u201C\u201D\u201E\u00AB\u00BB]', '"', normalized)
        normalized = re.sub(r'[\u2018\u2019\u201A\u2039\u203A]', "'", normalized)
# Normalize dashes
normalized = re.sub(r'[–—]', '-', normalized)
# Normalize ellipsis
normalized = re.sub(r'\.{3,}', '...', normalized)
# Normalize multiple spaces (but preserve intentional spacing)
normalized = re.sub(r' {2,}', ' ', normalized)
# Normalize bullet points
normalized = re.sub(r'[•·‧▪▫]', '•', normalized)
return normalized
def remove_pii(self, text: str) -> Tuple[str, List[Dict[str, Any]]]:
"""
Remove personally identifiable information from text.
Args:
text: Input text potentially containing PII
Returns:
Tuple of (cleaned_text, detected_pii_entities)
Note:
This is a basic implementation that can be enhanced
with more sophisticated PII detection in the future.
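        Example (illustrative; requires detect_pii=True, pii_action='redact'):
            cleaned, entities = cleaner.remove_pii("contact: jane@example.com")
            # cleaned  -> "contact: [REDACTED]"
            # entities -> [{'type': 'email', 'value': 'jane@example.com', ...}]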
"""
if not self.config['detect_pii']:
return text, []
        detected_pii = []
        cleaned_text = text
        # Basic PII patterns (heuristic regexes, not a full PII engine)
        pii_patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
            'ip_address': r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
        }
        for pii_type, pattern in pii_patterns.items():
            # Record matches against the original text so offsets stay valid
            for match in re.finditer(pattern, text):
                detected_pii.append({
                    'type': pii_type,
                    'value': match.group(),
                    'start': match.start(),
                    'end': match.end()
                })
            # Apply the configured PII action once per pattern
            if self.config['pii_action'] == 'redact':
                cleaned_text = re.sub(pattern, '[REDACTED]', cleaned_text)
            elif self.config['pii_action'] == 'remove':
                cleaned_text = re.sub(pattern, '', cleaned_text)
            # 'flag' action just detects without modifying
self.metrics['pii_detected'] += len(detected_pii)
return cleaned_text, detected_pii
def configure(self, config: Dict[str, Any]) -> None:
"""
Configure the cleaner with provided settings.
Args:
config: Configuration dictionary
Raises:
ValueError: If configuration is invalid
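        Example (illustrative):
            cleaner.configure({'detect_pii': True, 'pii_action': 'redact'})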
"""
# Validate configuration
self._validate_config(config)
# Update configuration
self.config.update(config)
# Recompile patterns if needed
self._compile_patterns()
def get_config(self) -> Dict[str, Any]:
"""
Get current configuration.
Returns:
Current configuration dictionary
"""
return self.config.copy()
def assess_quality(self, content: str) -> float:
"""
Assess the quality of cleaned content.
Args:
content: Content to assess
Returns:
Quality score between 0.0 and 1.0
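        The score is a weighted sum of the five quality factors:
            0.30 * technical_preservation + 0.25 * formatting_consistency
            + 0.20 * artifact_removal + 0.15 * content_completeness
            + 0.10 * readability_improvement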
"""
if not content:
return 0.0
quality_score = 0.0
# Factor 1: Technical content preservation (30% weight)
tech_score = self._assess_technical_preservation(content)
quality_score += tech_score * 0.3
# Factor 2: Formatting consistency (25% weight)
format_score = self._assess_formatting_consistency(content)
quality_score += format_score * 0.25
# Factor 3: Artifact removal (20% weight)
artifact_score = self._assess_artifact_removal(content)
quality_score += artifact_score * 0.2
# Factor 4: Content completeness (15% weight)
completeness_score = self._assess_content_completeness(content)
quality_score += completeness_score * 0.15
# Factor 5: Readability improvement (10% weight)
readability_score = self._assess_readability_improvement(content)
quality_score += readability_score * 0.1
return min(1.0, quality_score)
def get_quality_factors(self) -> List[str]:
"""
Get list of quality factors considered.
Returns:
List of quality factor names
"""
return self.quality_factors.copy()
def get_metrics(self) -> Dict[str, Any]:
"""
Get cleaning metrics.
Returns:
Dictionary with cleaning metrics and statistics
"""
return self.metrics.copy()
def _compile_patterns(self) -> None:
"""Compile regex patterns for performance."""
# Common artifacts to remove
self.artifact_patterns = [
# Headers and footers
r'^\s*page \d+\s*$',
r'^\s*\d+\s*$',
r'^\s*chapter \d+\s*$',
r'^\s*section \d+\s*$',
# Table of contents patterns
            r'^\s*\d+\..*?\.{3,}\s*\d+\s*$',  # dot leaders, e.g. "1. Intro .... 7"
r'^\s*contents?\s*$',
r'^\s*table of contents\s*$',
# Navigation elements
r'^\s*next\s*$',
r'^\s*previous\s*$',
r'^\s*back to top\s*$',
# Copyright and legal
r'^\s*copyright \d{4}',
r'^\s*©\s*\d{4}',
r'^\s*all rights reserved',
# Document metadata
r'^\s*document id:',
r'^\s*version:',
r'^\s*last updated:',
r'^\s*created:',
]
# Compile patterns
self.compiled_artifact_patterns = [
re.compile(pattern, re.IGNORECASE | re.MULTILINE)
for pattern in self.artifact_patterns
]
# Code block patterns
self.code_block_patterns = [
re.compile(r'```.*?```', re.DOTALL),
re.compile(r'`[^`]+`'),
re.compile(r'^\s{4,}.*$', re.MULTILINE), # Indented code
re.compile(r'^\t+.*$', re.MULTILINE), # Tab-indented code
]
# Equation patterns
self.equation_patterns = [
re.compile(r'\$\$.*?\$\$', re.DOTALL),
re.compile(r'\$[^$]+\$'),
re.compile(r'\\begin\{.*?\}.*?\\end\{.*?\}', re.DOTALL),
]
def _protect_technical_content(self, text: str) -> Dict[str, Any]:
"""
Protect technical content from cleaning operations.
Args:
text: Input text
Returns:
Dictionary with protected content and placeholders
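        Example (illustrative placeholder scheme):
            "call `malloc()` here" -> "call __PROTECTED_CODE_0__ here"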
"""
protected = {
'text': text,
'code_blocks': [],
'equations': [],
'placeholders': {}
}
placeholder_counter = 0
        # Protect code blocks
        if self.config['preserve_code_blocks']:
            for pattern in self.code_block_patterns:
                # Scan the working text so content already swapped for a
                # placeholder is not matched again by a later pattern
                for match in pattern.finditer(protected['text']):
                    placeholder = f"__PROTECTED_CODE_{placeholder_counter}__"
                    protected['code_blocks'].append(match.group())
                    protected['placeholders'][placeholder] = match.group()
                    protected['text'] = protected['text'].replace(match.group(), placeholder, 1)
                    placeholder_counter += 1
                    self.metrics['cleaning_operations']['code_blocks_preserved'] += 1
        # Protect equations
        if self.config['preserve_equations']:
            for pattern in self.equation_patterns:
                for match in pattern.finditer(protected['text']):
                    placeholder = f"__PROTECTED_EQUATION_{placeholder_counter}__"
                    protected['equations'].append(match.group())
                    protected['placeholders'][placeholder] = match.group()
                    protected['text'] = protected['text'].replace(match.group(), placeholder, 1)
                    placeholder_counter += 1
                    self.metrics['cleaning_operations']['equations_preserved'] += 1
return protected
def _remove_artifacts(self, text: str) -> str:
"""
Remove document artifacts.
Args:
text: Input text
Returns:
Text with artifacts removed
"""
cleaned = text
artifacts_removed = 0
# Remove common artifacts
for pattern in self.compiled_artifact_patterns:
matches = pattern.findall(cleaned)
artifacts_removed += len(matches)
cleaned = pattern.sub('', cleaned)
# Remove short lines that are likely artifacts
lines = cleaned.split('\n')
cleaned_lines = []
for line in lines:
line_stripped = line.strip()
# Keep line if it meets criteria
            if (len(line_stripped) >= self.config['min_line_length'] or
                    not line_stripped or  # Keep empty lines for structure
                    any(keyword in line_stripped.lower()
                        for keyword in ('algorithm', 'equation', 'figure', 'table'))):
cleaned_lines.append(line)
else:
artifacts_removed += 1
self.metrics['artifacts_removed'] += artifacts_removed
return '\n'.join(cleaned_lines)
def _normalize_whitespace(self, text: str) -> str:
"""
Normalize whitespace while preserving structure.
Args:
text: Input text
Returns:
Text with normalized whitespace
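        Example (illustrative, with max_consecutive_newlines=2): a run of
        four newlines collapses to two; trailing spaces and tabs on each
        line are stripped.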
"""
# Remove trailing whitespace from lines
text = re.sub(r'[ \t]+$', '', text, flags=re.MULTILINE)
# Normalize multiple consecutive newlines
max_newlines = self.config['max_consecutive_newlines']
text = re.sub(f'\n{{{max_newlines+1},}}', '\n' * max_newlines, text)
# Remove leading/trailing whitespace from entire text
text = text.strip()
return text
def _restore_protected_content(self, text: str, protected: Dict[str, Any]) -> str:
"""
Restore protected technical content.
Args:
text: Cleaned text with placeholders
protected: Protected content dictionary
Returns:
Text with protected content restored
"""
restored = text
# Restore all protected content
for placeholder, original in protected['placeholders'].items():
restored = restored.replace(placeholder, original)
return restored
def _validate_config(self, config: Dict[str, Any]) -> None:
"""
Validate configuration parameters.
Args:
config: Configuration to validate
Raises:
ValueError: If configuration is invalid
"""
valid_keys = {
'normalize_whitespace', 'remove_artifacts', 'preserve_code_blocks',
'preserve_equations', 'detect_pii', 'pii_action', 'min_line_length',
'max_consecutive_newlines', 'preserve_technical_formatting'
}
invalid_keys = set(config.keys()) - valid_keys
if invalid_keys:
raise ValueError(f"Invalid configuration keys: {invalid_keys}")
# Validate specific values
if 'pii_action' in config and config['pii_action'] not in ['redact', 'remove', 'flag']:
raise ValueError("pii_action must be 'redact', 'remove', or 'flag'")
if 'min_line_length' in config and (not isinstance(config['min_line_length'], int) or config['min_line_length'] < 0):
raise ValueError("min_line_length must be a non-negative integer")
if 'max_consecutive_newlines' in config and (not isinstance(config['max_consecutive_newlines'], int) or config['max_consecutive_newlines'] < 1):
raise ValueError("max_consecutive_newlines must be a positive integer")
def _assess_technical_preservation(self, content: str) -> float:
"""
Assess how well technical content is preserved.
Args:
content: Content to assess
Returns:
Technical preservation score (0.0 to 1.0)
"""
# Look for technical indicators
technical_indicators = [
'algorithm', 'function', 'variable', 'parameter', 'return',
'struct', 'class', 'interface', 'implementation', 'specification',
'register', 'memory', 'processor', 'instruction', 'operation',
'equation', 'formula', 'calculation', 'value', 'result'
]
content_lower = content.lower()
found_indicators = sum(1 for indicator in technical_indicators if indicator in content_lower)
return min(1.0, found_indicators / 10.0)
def _assess_formatting_consistency(self, content: str) -> float:
"""
Assess formatting consistency.
Args:
content: Content to assess
Returns:
Formatting consistency score (0.0 to 1.0)
"""
        lines = content.split('\n')
        if not content.strip():
            return 0.0
# Check for consistent indentation
indentation_levels = set()
for line in lines:
if line.strip():
leading_spaces = len(line) - len(line.lstrip())
indentation_levels.add(leading_spaces)
# Consistent indentation suggests good formatting
consistency_score = 1.0 - min(0.5, len(indentation_levels) / 10.0)
return consistency_score
def _assess_artifact_removal(self, content: str) -> float:
"""
Assess how well artifacts were removed.
Args:
content: Content to assess
Returns:
Artifact removal score (0.0 to 1.0)
"""
# Look for common artifacts that should be removed
artifact_indicators = [
'page ', 'chapter ', 'section ', 'contents', 'copyright',
'next', 'previous', 'back to top', 'document id', 'version:'
]
content_lower = content.lower()
found_artifacts = sum(1 for indicator in artifact_indicators if indicator in content_lower)
# Fewer artifacts = better score
return max(0.0, 1.0 - (found_artifacts / 10.0))
def _assess_content_completeness(self, content: str) -> float:
"""
Assess content completeness.
Args:
content: Content to assess
Returns:
Content completeness score (0.0 to 1.0)
"""
        # Check for sentence completeness (ignore empty chunks left by the split)
        sentences = [s.strip() for s in re.split(r'[.!?]+', content) if s.strip()]
        if not sentences:
            return 0.0
        complete_sentences = [s for s in sentences if len(s) > 5]
        completeness_ratio = len(complete_sentences) / len(sentences)
        return min(1.0, completeness_ratio)
def _assess_readability_improvement(self, content: str) -> float:
"""
Assess readability improvement.
Args:
content: Content to assess
Returns:
Readability improvement score (0.0 to 1.0)
"""
# Simple readability metrics
words = content.split()
if not words:
return 0.0
# Check for reasonable word lengths
avg_word_length = sum(len(word) for word in words) / len(words)
word_length_score = min(1.0, avg_word_length / 8.0)
        # Check for reasonable sentence lengths (ignore empty split chunks)
        sentences = [s for s in re.split(r'[.!?]+', content) if s.strip()]
        if sentences:
            avg_sentence_length = sum(len(sentence.split()) for sentence in sentences) / len(sentences)
            sentence_length_score = min(1.0, avg_sentence_length / 20.0)
        else:
            sentence_length_score = 0.0
return (word_length_score + sentence_length_score) / 2.0
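
if __name__ == '__main__':
    # Minimal illustrative smoke test (the sample text is hypothetical,
    # not part of the pipeline)
    sample = (
        "Page 1\n\n\n\n"
        "The scheduler uses `preempt_disable()` around the critical section.\n"
        "Copyright 2024\n"
    )
    cleaner = TechnicalContentCleaner({'max_consecutive_newlines': 2})
    cleaned = cleaner.clean(sample)
    print(cleaned)
    print(f"Quality: {cleaner.assess_quality(cleaned):.2f}")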