import re
import logging
import unicodedata
from collections import Counter
from typing import List

logger = logging.getLogger(__name__)


class TextPreprocessor:
    """Utility class for cleaning, tokenizing, and analyzing raw text."""

    def __init__(self):
        # Per-language stop-word sets; only English is bundled here.
        self.stop_words = {
            'en': {
                'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
                'of', 'with', 'by', 'from', 'up', 'about', 'into', 'through', 'during',
                'before', 'after', 'above', 'below', 'between', 'among', 'throughout',
                'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had',
                'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may', 'might',
                'must', 'shall', 'can', 'this', 'that', 'these', 'those', 'i', 'me',
                'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your', 'yours',
            }
        }

    def clean_text(self, text: str, aggressive: bool = False) -> str:
        """Clean and normalize text."""
        if not text:
            return ""

        try:
            # Decompose Unicode characters (ligatures, accented forms) into
            # their compatibility equivalents.
            text = unicodedata.normalize('NFKD', text)

            # Collapse all runs of whitespace into single spaces.
            text = re.sub(r'\s+', ' ', text)

            if aggressive:
                # Keep only word characters, whitespace, hyphens, and basic
                # punctuation, then collapse any punctuation run into a period.
                text = re.sub(r'[^\w\s\-.,!?;:]', ' ', text)
                text = re.sub(r'[.,!?;:]+', '.', text)
            else:
                # Keep a wider character set, including brackets and quotes.
                text = re.sub(r'[^\w\s\-.,!?;:()\[\]{}"\']', ' ', text)

            # Collapse repeated terminal punctuation ("..." -> ".", "?!?" -> "!").
            text = re.sub(r'\.{2,}', '.', text)
            text = re.sub(r'[!?]{2,}', '!', text)

            # Re-collapse whitespace introduced by the substitutions above.
            text = re.sub(r'\s+', ' ', text)
            text = text.strip()

            return text
        except Exception as e:
            logger.error(f"Error cleaning text: {str(e)}")
            return text
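
    # Hand-traced examples (illustrative; not from the original source):
    #   clean_text("Hello!!!   World...")                  -> "Hello! World."
    #   clean_text("Hello!!!   World...", aggressive=True) -> "Hello. World."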

    def extract_sentences(self, text: str) -> List[str]:
        """Extract sentences from text."""
        if not text:
            return []

        try:
            # Split on runs of sentence-ending punctuation.
            sentences = re.split(r'[.!?]+', text)

            # Keep only fragments longer than 10 characters; shorter ones are
            # usually abbreviations, initials, or noise.
            clean_sentences = []
            for sentence in sentences:
                sentence = sentence.strip()
                if len(sentence) > 10:
                    clean_sentences.append(sentence)

            return clean_sentences
        except Exception as e:
            logger.error(f"Error extracting sentences: {str(e)}")
            return [text]
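
    # Hand-traced example (illustrative; not from the original source):
    #   extract_sentences("First full sentence here. Hi. Another one follows!")
    #   -> ["First full sentence here", "Another one follows"]
    #   ("Hi" is dropped by the 10-character minimum.)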

    def extract_keywords(self, text: str, language: str = 'en', max_keywords: int = 20) -> List[str]:
        """Extract potential keywords from text."""
        if not text:
            return []

        try:
            # Pull out lowercase alphabetic tokens of three or more letters.
            words = re.findall(r'\b[a-zA-Z]{3,}\b', text.lower())

            # Drop stop words for the requested language (empty set if unknown).
            stop_words = self.stop_words.get(language, set())
            keywords = [word for word in words if word not in stop_words]

            # Rank the remaining words by frequency and return the most common.
            word_freq = Counter(keywords)
            return [word for word, _ in word_freq.most_common(max_keywords)]
        except Exception as e:
            logger.error(f"Error extracting keywords: {str(e)}")
            return []
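
    # Hand-traced example (illustrative; not from the original source):
    #   extract_keywords("the data pipeline cleans data before the data is indexed")
    #   -> ["data", "pipeline", "cleans", "indexed"]
    #   ("data" appears three times; "the", "before" are stop words; "is" is
    #   under the three-letter minimum.)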

    def prepare_for_embedding(self, text: str) -> str:
        """Prepare text specifically for embedding generation."""
        if not text:
            return ""

        try:
            # Aggressive cleaning strips symbols and collapses punctuation.
            clean_text = self.clean_text(text, aggressive=True)

            # Drop single-character tokens, which carry little signal.
            words = clean_text.split()
            filtered_words = [word for word in words if len(word) >= 2]
            result = ' '.join(filtered_words)

            # Truncate very long inputs to keep them within typical
            # embedding-model context limits.
            if len(result) > 5000:
                result = result[:5000] + "..."

            return result
        except Exception as e:
            logger.error(f"Error preparing text for embedding: {str(e)}")
            return text
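
    # Hand-traced example (illustrative; not from the original source):
    #   prepare_for_embedding("I saw a fox! It ran off...")
    #   -> "saw fox. It ran off."
    #   (the single-character tokens "I" and "a" are dropped)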

    def extract_metadata_from_text(self, text: str) -> dict:
        """Extract metadata from text content."""
        if not text:
            return {}

        try:
            metadata = {}
            words = text.split()

            # Basic size statistics.
            metadata['character_count'] = len(text)
            metadata['word_count'] = len(words)
            metadata['sentence_count'] = len(self.extract_sentences(text))
            metadata['paragraph_count'] = len([p for p in text.split('\n\n') if p.strip()])

            # Average lengths; max(1, ...) guards against division by zero.
            metadata['avg_word_length'] = sum(len(word) for word in words) / max(1, len(words))
            metadata['avg_sentence_length'] = metadata['word_count'] / max(1, metadata['sentence_count'])

            # Content-type flags. Note: the original email pattern used
            # [A-Z|a-z], where the stray "|" matched literal pipes; fixed here.
            metadata['has_urls'] = bool(re.search(r'https?://\S+', text))
            metadata['has_emails'] = bool(re.search(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', text))
            metadata['has_phone_numbers'] = bool(re.search(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', text))
            metadata['has_dates'] = bool(re.search(r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b', text))
            metadata['has_numbers'] = bool(re.search(r'\b\d+\b', text))

            # Style ratios.
            metadata['punctuation_density'] = len(re.findall(r'[.,!?;:]', text)) / max(1, len(text))
            metadata['caps_ratio'] = len(re.findall(r'[A-Z]', text)) / max(1, len(text))

            return metadata
        except Exception as e:
            logger.error(f"Error extracting text metadata: {str(e)}")
            return {}
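
    # Example (illustrative; not from the original source):
    #   extract_metadata_from_text("Contact support@example.com today!")
    #   returns the counts and ratios above plus flags such as
    #   has_emails=True and has_urls=False.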

    def normalize_for_search(self, text: str) -> str:
        """Normalize text for search queries."""
        if not text:
            return ""

        try:
            # Lowercase, replace all punctuation with spaces, and collapse
            # whitespace so queries compare consistently against indexed text.
            text = text.lower()
            text = re.sub(r'[^\w\s]', ' ', text)
            text = re.sub(r'\s+', ' ', text)
            text = text.strip()

            return text
        except Exception as e:
            logger.error(f"Error normalizing text for search: {str(e)}")
            return text
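

# A minimal usage sketch (not part of the original module); the sample text
# is invented for illustration.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    pre = TextPreprocessor()
    sample = "The quick brown fox!!!   It jumped over the lazy dog... Email: demo@example.com"
    print(pre.clean_text(sample))
    print(pre.extract_sentences(sample))
    print(pre.extract_keywords(sample))
    print(pre.prepare_for_embedding(sample))
    print(pre.extract_metadata_from_text(sample))
    print(pre.normalize_for_search(sample))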