import re
import logging
import unicodedata
from typing import List

logger = logging.getLogger(__name__)
class TextPreprocessor:
    def __init__(self):
        # Common stop words for basic filtering, keyed by language code
        self.stop_words = {
            'en': {
                'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
                'of', 'with', 'by', 'from', 'up', 'about', 'into', 'through', 'during',
                'before', 'after', 'above', 'below', 'between', 'among', 'throughout',
                'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had',
                'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may', 'might',
                'must', 'shall', 'can', 'this', 'that', 'these', 'those', 'i', 'me',
                'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your', 'yours',
            },
        }
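        # Only English is bundled; other languages could be slotted in here
        # (e.g. a hypothetical 'es' entry). extract_keywords() falls back to
        # an empty stop-word set for unknown language codes.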
    def clean_text(self, text: str, aggressive: bool = False) -> str:
        """Clean and normalize text"""
        if not text:
            return ""
        try:
            # Normalize unicode characters
            text = unicodedata.normalize('NFKD', text)
            # Remove excessive whitespace
            text = re.sub(r'\s+', ' ', text)
            # Remove or replace special characters
            if aggressive:
                # More aggressive cleaning for embedding
                text = re.sub(r'[^\w\s\-.,!?;:]', ' ', text)
                text = re.sub(r'[.,!?;:]+', '.', text)
            else:
                # Basic cleaning for readability
                text = re.sub(r'[^\w\s\-.,!?;:()\[\]{}"\']', ' ', text)
            # Remove excessive punctuation
            text = re.sub(r'\.{2,}', '.', text)
            text = re.sub(r'[!?]{2,}', '!', text)
            # Clean up whitespace again
            text = re.sub(r'\s+', ' ', text)
            # Remove leading/trailing whitespace
            text = text.strip()
            return text
        except Exception as e:
            logger.error(f"Error cleaning text: {str(e)}")
            return text
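    # Example (illustrative): given "Hello!!!  World...", clean_text() returns
    # "Hello! World." by default, while clean_text(aggressive=True) returns
    # "Hello. World." because aggressive mode first collapses every punctuation
    # run into a single period.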
    def extract_sentences(self, text: str) -> List[str]:
        """Extract sentences from text"""
        if not text:
            return []
        try:
            # Simple sentence splitting on terminal punctuation
            sentences = re.split(r'[.!?]+', text)
            # Clean and filter sentences
            clean_sentences = []
            for sentence in sentences:
                sentence = sentence.strip()
                if len(sentence) > 10:  # Drop fragments of 10 characters or fewer
                    clean_sentences.append(sentence)
            return clean_sentences
        except Exception as e:
            logger.error(f"Error extracting sentences: {str(e)}")
            return [text]
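    # Example (illustrative): "Dr. Smith arrived. Yes. The meeting starts at noon!"
    # yields ["Smith arrived", "The meeting starts at noon"]: the naive [.!?]+
    # split breaks on the abbreviation "Dr.", and the length filter then drops
    # "Dr" and "Yes" as fragments of 10 characters or fewer.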
    def extract_keywords(self, text: str, language: str = 'en', max_keywords: int = 20) -> List[str]:
        """Extract potential keywords from text"""
        if not text:
            return []
        try:
            # Lowercase and extract alphabetic words of 3+ characters
            words = re.findall(r'\b[a-zA-Z]{3,}\b', text.lower())
            # Remove stop words (unknown languages fall back to an empty set)
            stop_words = self.stop_words.get(language, set())
            keywords = [word for word in words if word not in stop_words]
            # Count word frequency
            word_freq = {}
            for word in keywords:
                word_freq[word] = word_freq.get(word, 0) + 1
            # Sort by frequency and return the top keywords
            sorted_keywords = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
            return [word for word, freq in sorted_keywords[:max_keywords]]
        except Exception as e:
            logger.error(f"Error extracting keywords: {str(e)}")
            return []
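    # Example (illustrative): extract_keywords("The cat sat on the mat. The cat slept.")
    # returns ["cat", "sat", "mat", "slept"]: the \b[a-zA-Z]{3,}\b pattern skips
    # "on" (under 3 letters), the stop-word set removes "the", and "cat" ranks
    # first with two occurrences; ties keep first-seen order since sorted() is stable.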
    def prepare_for_embedding(self, text: str) -> str:
        """Prepare text specifically for embedding generation"""
        if not text:
            return ""
        try:
            # Clean text aggressively for better embeddings
            clean_text = self.clean_text(text, aggressive=True)
            # Remove very short words
            words = clean_text.split()
            filtered_words = [word for word in words if len(word) >= 2]
            # Rejoin and ensure reasonable length
            result = ' '.join(filtered_words)
            # Truncate if too long (most embedding models have token limits)
            if len(result) > 5000:  # Rough character limit
                result = result[:5000] + "..."
            return result
        except Exception as e:
            logger.error(f"Error preparing text for embedding: {str(e)}")
            return text
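    # Note: the 5000-character cap is a rough stand-in for model token limits
    # (on the order of 1000-1500 English tokens at ~4 characters per token);
    # the hard slice can cut the final word in half, which is usually harmless
    # for embedding input.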
    def extract_metadata_from_text(self, text: str) -> dict:
        """Extract metadata from text content"""
        if not text:
            return {}
        try:
            metadata = {}
            words = text.split()
            # Basic statistics
            metadata['character_count'] = len(text)
            metadata['word_count'] = len(words)
            metadata['sentence_count'] = len(self.extract_sentences(text))
            metadata['paragraph_count'] = len([p for p in text.split('\n\n') if p.strip()])
            # Content characteristics
            metadata['avg_word_length'] = sum(len(word) for word in words) / max(1, len(words))
            metadata['avg_sentence_length'] = metadata['word_count'] / max(1, metadata['sentence_count'])
            # Special content detection
            metadata['has_urls'] = bool(re.search(r'https?://\S+', text))
            metadata['has_emails'] = bool(re.search(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', text))
            # US-style 10-digit phone numbers only
            metadata['has_phone_numbers'] = bool(re.search(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', text))
            metadata['has_dates'] = bool(re.search(r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b', text))
            metadata['has_numbers'] = bool(re.search(r'\b\d+\b', text))
            # Language indicators
            metadata['punctuation_density'] = len(re.findall(r'[.,!?;:]', text)) / max(1, len(text))
            metadata['caps_ratio'] = len(re.findall(r'[A-Z]', text)) / max(1, len(text))
            return metadata
        except Exception as e:
            logger.error(f"Error extracting text metadata: {str(e)}")
            return {}
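    # Example (illustrative): for "Contact me at foo@bar.com." the returned
    # dict includes character_count=26, word_count=4, has_emails=True, and
    # has_urls=False, alongside the other keys computed above.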
    def normalize_for_search(self, text: str) -> str:
        """Normalize text for search queries"""
        if not text:
            return ""
        try:
            # Convert to lowercase
            text = text.lower()
            # Remove special characters but keep spaces
            text = re.sub(r'[^\w\s]', ' ', text)
            # Normalize whitespace
            text = re.sub(r'\s+', ' ', text)
            # Strip leading/trailing whitespace
            text = text.strip()
            return text
        except Exception as e:
            logger.error(f"Error normalizing text for search: {str(e)}")
            return text
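
if __name__ == "__main__":
    # Minimal smoke test (illustrative only; the sample strings below are
    # arbitrary examples, not fixtures from any test suite).
    logging.basicConfig(level=logging.INFO)
    pre = TextPreprocessor()
    sample = (
        "Visit https://example.com or email foo@bar.com!!  "
        "The quick brown fox jumps over the lazy dog."
    )
    print(pre.clean_text(sample))
    print(pre.extract_sentences(sample))
    print(pre.extract_keywords(sample))
    print(pre.prepare_for_embedding(sample))
    print(pre.extract_metadata_from_text(sample))
    print(pre.normalize_for_search("The QUICK brown fox!"))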