""" | |
Technical Content Cleaner Implementation. | |
This cleaner implements text normalization and cleaning specifically | |
optimized for technical documentation. It preserves important technical | |
content while removing artifacts and normalizing formatting. | |
Key Features: | |
- Technical content preservation (code blocks, equations, specifications) | |
- Whitespace normalization without losing structure | |
- Artifact removal (headers, footers, navigation elements) | |
- PII detection placeholder for future implementation | |
- Configurable cleaning strategies | |
Architecture Notes: | |
- Direct implementation (no adapter pattern) as per MASTER-ARCHITECTURE.md | |
- Focuses on technical documentation requirements | |
- Preserves formatting critical for technical understanding | |
""" | |
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Add project paths for imports
project_root = Path(__file__).parent.parent.parent.parent.parent
sys.path.append(str(project_root))

from ..base import ContentCleaner, ConfigurableComponent, QualityAssessment
class TechnicalContentCleaner(ContentCleaner, ConfigurableComponent, QualityAssessment):
    """
    Technical documentation content cleaner.

    This cleaner is specifically designed for technical documentation,
    preserving important technical content while removing artifacts
    and normalizing formatting for better retrieval and generation.

    Features:
    - Preserve code blocks, equations, and technical specifications
    - Remove common document artifacts (headers, footers, TOCs)
    - Normalize whitespace while preserving structure
    - Handle technical formatting (bullet points, numbered lists)
    - Basic PII detection (placeholder for future enhancement)

    Configuration Options:
    - normalize_whitespace: Enable whitespace normalization (default: True)
    - remove_artifacts: Remove document artifacts (default: True)
    - preserve_code_blocks: Preserve code block formatting (default: True)
    - preserve_equations: Preserve mathematical equations (default: True)
    - detect_pii: Enable PII detection (default: False)
    - pii_action: Action for PII ('redact', 'remove', 'flag') (default: 'flag')
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the technical content cleaner.

        Args:
            config: Optional configuration overrides, merged over the
                defaults below. (Annotation fixed: the default is None,
                so the parameter is Optional.)
        """
        # Default configuration; keys documented in the class docstring.
        self.config: Dict[str, Any] = {
            'normalize_whitespace': True,
            'remove_artifacts': True,
            'preserve_code_blocks': True,
            'preserve_equations': True,
            'detect_pii': False,
            'pii_action': 'flag',
            'min_line_length': 10,
            'max_consecutive_newlines': 2,
            'preserve_technical_formatting': True,
        }
        # Apply caller-provided overrides on top of the defaults.
        if config:
            self.config.update(config)

        # Running counters; updated by clean() and remove_pii().
        self.metrics: Dict[str, Any] = {
            'texts_processed': 0,
            'artifacts_removed': 0,
            'pii_detected': 0,
            'bytes_cleaned': 0,
            'cleaning_operations': {
                'whitespace_normalized': 0,
                'artifacts_removed': 0,
                'code_blocks_preserved': 0,
                'equations_preserved': 0,
            },
        }

        # Factor names reported by get_quality_factors(); each has a
        # corresponding _assess_* helper used by assess_quality().
        self.quality_factors: List[str] = [
            'technical_content_preservation',
            'formatting_consistency',
            'artifact_removal',
            'content_completeness',
            'readability_improvement',
        ]

        # Compile regex patterns once up front for performance.
        self._compile_patterns()
def clean(self, text: str) -> str: | |
""" | |
Clean and normalize text content. | |
Args: | |
text: Input text to be cleaned | |
Returns: | |
Cleaned text with normalized formatting | |
Raises: | |
ValueError: If text is None or invalid | |
""" | |
if text is None: | |
raise ValueError("Text cannot be None") | |
if not isinstance(text, str): | |
raise ValueError("Text must be a string") | |
if not text.strip(): | |
return "" | |
original_length = len(text) | |
cleaned_text = text | |
# Step 1: Preserve important technical content | |
protected_content = self._protect_technical_content(cleaned_text) | |
# Step 2: Remove document artifacts | |
if self.config['remove_artifacts']: | |
cleaned_text = self._remove_artifacts(protected_content['text']) | |
self.metrics['cleaning_operations']['artifacts_removed'] += 1 | |
# Step 3: Normalize whitespace | |
if self.config['normalize_whitespace']: | |
cleaned_text = self._normalize_whitespace(cleaned_text) | |
self.metrics['cleaning_operations']['whitespace_normalized'] += 1 | |
# Step 4: Restore protected content | |
cleaned_text = self._restore_protected_content(cleaned_text, protected_content) | |
# Update metrics | |
self.metrics['texts_processed'] += 1 | |
self.metrics['bytes_cleaned'] += abs(len(cleaned_text) - original_length) | |
return cleaned_text | |
def normalize(self, text: str) -> str: | |
""" | |
Normalize text formatting and structure. | |
Args: | |
text: Input text to normalize | |
Returns: | |
Normalized text with consistent formatting | |
""" | |
if not text: | |
return "" | |
normalized = text | |
# Normalize line endings | |
normalized = re.sub(r'\r\n|\r', '\n', normalized) | |
# Normalize quotation marks | |
normalized = re.sub(r'[""„"«»]', '"', normalized) | |
normalized = re.sub(r"[''‚'‹›]", "'", normalized) | |
# Normalize dashes | |
normalized = re.sub(r'[–—]', '-', normalized) | |
# Normalize ellipsis | |
normalized = re.sub(r'\.{3,}', '...', normalized) | |
# Normalize multiple spaces (but preserve intentional spacing) | |
normalized = re.sub(r' {2,}', ' ', normalized) | |
# Normalize bullet points | |
normalized = re.sub(r'[•·‧▪▫]', '•', normalized) | |
return normalized | |
def remove_pii(self, text: str) -> Tuple[str, List[Dict[str, Any]]]: | |
""" | |
Remove personally identifiable information from text. | |
Args: | |
text: Input text potentially containing PII | |
Returns: | |
Tuple of (cleaned_text, detected_pii_entities) | |
Note: | |
This is a basic implementation that can be enhanced | |
with more sophisticated PII detection in the future. | |
""" | |
if not self.config['detect_pii']: | |
return text, [] | |
detected_pii = [] | |
cleaned_text = text | |
# Basic PII patterns | |
pii_patterns = { | |
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', | |
'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', | |
'ssn': r'\b\d{3}-\d{2}-\d{4}\b', | |
'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', | |
'ip_address': r'\b(?:\d{1,3}\.){3}\d{1,3}\b' | |
} | |
for pii_type, pattern in pii_patterns.items(): | |
matches = re.finditer(pattern, cleaned_text) | |
for match in matches: | |
detected_pii.append({ | |
'type': pii_type, | |
'value': match.group(), | |
'start': match.start(), | |
'end': match.end() | |
}) | |
# Apply PII action | |
if self.config['pii_action'] == 'redact': | |
cleaned_text = cleaned_text.replace(match.group(), '[REDACTED]') | |
elif self.config['pii_action'] == 'remove': | |
cleaned_text = cleaned_text.replace(match.group(), '') | |
# 'flag' action just detects without modifying | |
self.metrics['pii_detected'] += len(detected_pii) | |
return cleaned_text, detected_pii | |
def configure(self, config: Dict[str, Any]) -> None: | |
""" | |
Configure the cleaner with provided settings. | |
Args: | |
config: Configuration dictionary | |
Raises: | |
ValueError: If configuration is invalid | |
""" | |
# Validate configuration | |
self._validate_config(config) | |
# Update configuration | |
self.config.update(config) | |
# Recompile patterns if needed | |
self._compile_patterns() | |
def get_config(self) -> Dict[str, Any]: | |
""" | |
Get current configuration. | |
Returns: | |
Current configuration dictionary | |
""" | |
return self.config.copy() | |
def assess_quality(self, content: str) -> float: | |
""" | |
Assess the quality of cleaned content. | |
Args: | |
content: Content to assess | |
Returns: | |
Quality score between 0.0 and 1.0 | |
""" | |
if not content: | |
return 0.0 | |
quality_score = 0.0 | |
# Factor 1: Technical content preservation (30% weight) | |
tech_score = self._assess_technical_preservation(content) | |
quality_score += tech_score * 0.3 | |
# Factor 2: Formatting consistency (25% weight) | |
format_score = self._assess_formatting_consistency(content) | |
quality_score += format_score * 0.25 | |
# Factor 3: Artifact removal (20% weight) | |
artifact_score = self._assess_artifact_removal(content) | |
quality_score += artifact_score * 0.2 | |
# Factor 4: Content completeness (15% weight) | |
completeness_score = self._assess_content_completeness(content) | |
quality_score += completeness_score * 0.15 | |
# Factor 5: Readability improvement (10% weight) | |
readability_score = self._assess_readability_improvement(content) | |
quality_score += readability_score * 0.1 | |
return min(1.0, quality_score) | |
def get_quality_factors(self) -> List[str]: | |
""" | |
Get list of quality factors considered. | |
Returns: | |
List of quality factor names | |
""" | |
return self.quality_factors.copy() | |
def get_metrics(self) -> Dict[str, Any]: | |
""" | |
Get cleaning metrics. | |
Returns: | |
Dictionary with cleaning metrics and statistics | |
""" | |
return self.metrics.copy() | |
def _compile_patterns(self) -> None: | |
"""Compile regex patterns for performance.""" | |
# Common artifacts to remove | |
self.artifact_patterns = [ | |
# Headers and footers | |
r'^\s*page \d+\s*$', | |
r'^\s*\d+\s*$', | |
r'^\s*chapter \d+\s*$', | |
r'^\s*section \d+\s*$', | |
# Table of contents patterns | |
r'^\s*\d+\..*\.\.\.\.\.\d+\s*$', | |
r'^\s*contents?\s*$', | |
r'^\s*table of contents\s*$', | |
# Navigation elements | |
r'^\s*next\s*$', | |
r'^\s*previous\s*$', | |
r'^\s*back to top\s*$', | |
# Copyright and legal | |
r'^\s*copyright \d{4}', | |
r'^\s*©\s*\d{4}', | |
r'^\s*all rights reserved', | |
# Document metadata | |
r'^\s*document id:', | |
r'^\s*version:', | |
r'^\s*last updated:', | |
r'^\s*created:', | |
] | |
# Compile patterns | |
self.compiled_artifact_patterns = [ | |
re.compile(pattern, re.IGNORECASE | re.MULTILINE) | |
for pattern in self.artifact_patterns | |
] | |
# Code block patterns | |
self.code_block_patterns = [ | |
re.compile(r'```.*?```', re.DOTALL), | |
re.compile(r'`[^`]+`'), | |
re.compile(r'^\s{4,}.*$', re.MULTILINE), # Indented code | |
re.compile(r'^\t+.*$', re.MULTILINE), # Tab-indented code | |
] | |
# Equation patterns | |
self.equation_patterns = [ | |
re.compile(r'\$\$.*?\$\$', re.DOTALL), | |
re.compile(r'\$[^$]+\$'), | |
re.compile(r'\\begin\{.*?\}.*?\\end\{.*?\}', re.DOTALL), | |
] | |
def _protect_technical_content(self, text: str) -> Dict[str, Any]: | |
""" | |
Protect technical content from cleaning operations. | |
Args: | |
text: Input text | |
Returns: | |
Dictionary with protected content and placeholders | |
""" | |
protected = { | |
'text': text, | |
'code_blocks': [], | |
'equations': [], | |
'placeholders': {} | |
} | |
placeholder_counter = 0 | |
# Protect code blocks | |
if self.config['preserve_code_blocks']: | |
for pattern in self.code_block_patterns: | |
matches = pattern.finditer(text) | |
for match in matches: | |
placeholder = f"__PROTECTED_CODE_{placeholder_counter}__" | |
protected['code_blocks'].append(match.group()) | |
protected['placeholders'][placeholder] = match.group() | |
protected['text'] = protected['text'].replace(match.group(), placeholder) | |
placeholder_counter += 1 | |
self.metrics['cleaning_operations']['code_blocks_preserved'] += 1 | |
# Protect equations | |
if self.config['preserve_equations']: | |
for pattern in self.equation_patterns: | |
matches = pattern.finditer(protected['text']) | |
for match in matches: | |
placeholder = f"__PROTECTED_EQUATION_{placeholder_counter}__" | |
protected['equations'].append(match.group()) | |
protected['placeholders'][placeholder] = match.group() | |
protected['text'] = protected['text'].replace(match.group(), placeholder) | |
placeholder_counter += 1 | |
self.metrics['cleaning_operations']['equations_preserved'] += 1 | |
return protected | |
def _remove_artifacts(self, text: str) -> str: | |
""" | |
Remove document artifacts. | |
Args: | |
text: Input text | |
Returns: | |
Text with artifacts removed | |
""" | |
cleaned = text | |
artifacts_removed = 0 | |
# Remove common artifacts | |
for pattern in self.compiled_artifact_patterns: | |
matches = pattern.findall(cleaned) | |
artifacts_removed += len(matches) | |
cleaned = pattern.sub('', cleaned) | |
# Remove short lines that are likely artifacts | |
lines = cleaned.split('\n') | |
cleaned_lines = [] | |
for line in lines: | |
line_stripped = line.strip() | |
# Keep line if it meets criteria | |
if (len(line_stripped) >= self.config['min_line_length'] or | |
not line_stripped or # Keep empty lines for structure | |
any(pattern in line_stripped.lower() for pattern in ['algorithm', 'equation', 'figure', 'table'])): | |
cleaned_lines.append(line) | |
else: | |
artifacts_removed += 1 | |
self.metrics['artifacts_removed'] += artifacts_removed | |
return '\n'.join(cleaned_lines) | |
def _normalize_whitespace(self, text: str) -> str: | |
""" | |
Normalize whitespace while preserving structure. | |
Args: | |
text: Input text | |
Returns: | |
Text with normalized whitespace | |
""" | |
# Remove trailing whitespace from lines | |
text = re.sub(r'[ \t]+$', '', text, flags=re.MULTILINE) | |
# Normalize multiple consecutive newlines | |
max_newlines = self.config['max_consecutive_newlines'] | |
text = re.sub(f'\n{{{max_newlines+1},}}', '\n' * max_newlines, text) | |
# Remove leading/trailing whitespace from entire text | |
text = text.strip() | |
return text | |
def _restore_protected_content(self, text: str, protected: Dict[str, Any]) -> str: | |
""" | |
Restore protected technical content. | |
Args: | |
text: Cleaned text with placeholders | |
protected: Protected content dictionary | |
Returns: | |
Text with protected content restored | |
""" | |
restored = text | |
# Restore all protected content | |
for placeholder, original in protected['placeholders'].items(): | |
restored = restored.replace(placeholder, original) | |
return restored | |
def _validate_config(self, config: Dict[str, Any]) -> None: | |
""" | |
Validate configuration parameters. | |
Args: | |
config: Configuration to validate | |
Raises: | |
ValueError: If configuration is invalid | |
""" | |
valid_keys = { | |
'normalize_whitespace', 'remove_artifacts', 'preserve_code_blocks', | |
'preserve_equations', 'detect_pii', 'pii_action', 'min_line_length', | |
'max_consecutive_newlines', 'preserve_technical_formatting' | |
} | |
invalid_keys = set(config.keys()) - valid_keys | |
if invalid_keys: | |
raise ValueError(f"Invalid configuration keys: {invalid_keys}") | |
# Validate specific values | |
if 'pii_action' in config and config['pii_action'] not in ['redact', 'remove', 'flag']: | |
raise ValueError("pii_action must be 'redact', 'remove', or 'flag'") | |
if 'min_line_length' in config and (not isinstance(config['min_line_length'], int) or config['min_line_length'] < 0): | |
raise ValueError("min_line_length must be a non-negative integer") | |
if 'max_consecutive_newlines' in config and (not isinstance(config['max_consecutive_newlines'], int) or config['max_consecutive_newlines'] < 1): | |
raise ValueError("max_consecutive_newlines must be a positive integer") | |
def _assess_technical_preservation(self, content: str) -> float: | |
""" | |
Assess how well technical content is preserved. | |
Args: | |
content: Content to assess | |
Returns: | |
Technical preservation score (0.0 to 1.0) | |
""" | |
# Look for technical indicators | |
technical_indicators = [ | |
'algorithm', 'function', 'variable', 'parameter', 'return', | |
'struct', 'class', 'interface', 'implementation', 'specification', | |
'register', 'memory', 'processor', 'instruction', 'operation', | |
'equation', 'formula', 'calculation', 'value', 'result' | |
] | |
content_lower = content.lower() | |
found_indicators = sum(1 for indicator in technical_indicators if indicator in content_lower) | |
return min(1.0, found_indicators / 10.0) | |
def _assess_formatting_consistency(self, content: str) -> float: | |
""" | |
Assess formatting consistency. | |
Args: | |
content: Content to assess | |
Returns: | |
Formatting consistency score (0.0 to 1.0) | |
""" | |
lines = content.split('\n') | |
if not lines: | |
return 0.0 | |
# Check for consistent indentation | |
indentation_levels = set() | |
for line in lines: | |
if line.strip(): | |
leading_spaces = len(line) - len(line.lstrip()) | |
indentation_levels.add(leading_spaces) | |
# Consistent indentation suggests good formatting | |
consistency_score = 1.0 - min(0.5, len(indentation_levels) / 10.0) | |
return consistency_score | |
def _assess_artifact_removal(self, content: str) -> float: | |
""" | |
Assess how well artifacts were removed. | |
Args: | |
content: Content to assess | |
Returns: | |
Artifact removal score (0.0 to 1.0) | |
""" | |
# Look for common artifacts that should be removed | |
artifact_indicators = [ | |
'page ', 'chapter ', 'section ', 'contents', 'copyright', | |
'next', 'previous', 'back to top', 'document id', 'version:' | |
] | |
content_lower = content.lower() | |
found_artifacts = sum(1 for indicator in artifact_indicators if indicator in content_lower) | |
# Fewer artifacts = better score | |
return max(0.0, 1.0 - (found_artifacts / 10.0)) | |
def _assess_content_completeness(self, content: str) -> float: | |
""" | |
Assess content completeness. | |
Args: | |
content: Content to assess | |
Returns: | |
Content completeness score (0.0 to 1.0) | |
""" | |
# Check for sentence completeness | |
sentences = re.split(r'[.!?]+', content) | |
complete_sentences = [s.strip() for s in sentences if len(s.strip()) > 5] | |
if not sentences: | |
return 0.0 | |
completeness_ratio = len(complete_sentences) / len(sentences) | |
return min(1.0, completeness_ratio) | |
def _assess_readability_improvement(self, content: str) -> float: | |
""" | |
Assess readability improvement. | |
Args: | |
content: Content to assess | |
Returns: | |
Readability improvement score (0.0 to 1.0) | |
""" | |
# Simple readability metrics | |
words = content.split() | |
if not words: | |
return 0.0 | |
# Check for reasonable word lengths | |
avg_word_length = sum(len(word) for word in words) / len(words) | |
word_length_score = min(1.0, avg_word_length / 8.0) | |
# Check for reasonable sentence lengths | |
sentences = re.split(r'[.!?]+', content) | |
if sentences: | |
avg_sentence_length = sum(len(sentence.split()) for sentence in sentences) / len(sentences) | |
sentence_length_score = min(1.0, avg_sentence_length / 20.0) | |
else: | |
sentence_length_score = 0.0 | |
return (word_length_score + sentence_length_score) / 2.0 |