# Intelligent_Content_Organizer/core/text_preprocessor.py
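"""Text preprocessing utilities for the Intelligent Content Organizer:
cleaning, sentence and keyword extraction, embedding preparation, and
basic metadata analysis, using only the Python standard library."""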
import re
import logging
import unicodedata
from collections import Counter
from typing import List
logger = logging.getLogger(__name__)
class TextPreprocessor:
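    """Lightweight, dependency-free text preprocessor built on simple
    regex-based cleaning, tokenization, and metadata heuristics."""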
def __init__(self):
# Common stop words for basic filtering
        self.stop_words = {
            'en': {
                'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
                'of', 'with', 'by', 'from', 'up', 'about', 'into', 'through', 'during',
                'before', 'after', 'above', 'below', 'between', 'among', 'throughout',
                'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had',
                'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may', 'might',
                'must', 'shall', 'can', 'this', 'that', 'these', 'those', 'i', 'me',
                'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your', 'yours'
            }
        }
def clean_text(self, text: str, aggressive: bool = False) -> str:
"""Clean and normalize text"""
if not text:
return ""
try:
# Normalize unicode characters
text = unicodedata.normalize('NFKD', text)
# Remove excessive whitespace
text = re.sub(r'\s+', ' ', text)
# Remove or replace special characters
if aggressive:
# More aggressive cleaning for embedding
text = re.sub(r'[^\w\s\-.,!?;:]', ' ', text)
text = re.sub(r'[.,!?;:]+', '.', text)
else:
# Basic cleaning for readability
text = re.sub(r'[^\w\s\-.,!?;:()\[\]{}"\']', ' ', text)
# Remove excessive punctuation
text = re.sub(r'\.{2,}', '.', text)
text = re.sub(r'[!?]{2,}', '!', text)
# Clean up whitespace again
text = re.sub(r'\s+', ' ', text)
# Remove leading/trailing whitespace
text = text.strip()
return text
except Exception as e:
logger.error(f"Error cleaning text: {str(e)}")
return text
def extract_sentences(self, text: str) -> List[str]:
"""Extract sentences from text"""
if not text:
return []
try:
# Simple sentence splitting
sentences = re.split(r'[.!?]+', text)
# Clean and filter sentences
clean_sentences = []
for sentence in sentences:
sentence = sentence.strip()
                if len(sentence) > 10:  # Filter out very short fragments
                    clean_sentences.append(sentence)
return clean_sentences
except Exception as e:
logger.error(f"Error extracting sentences: {str(e)}")
return [text]
def extract_keywords(self, text: str, language: str = 'en', max_keywords: int = 20) -> List[str]:
"""Extract potential keywords from text"""
if not text:
return []
try:
# Convert to lowercase and split into words
words = re.findall(r'\b[a-zA-Z]{3,}\b', text.lower())
# Remove stop words
stop_words = self.stop_words.get(language, set())
keywords = [word for word in words if word not in stop_words]
            # Count word frequency and return the most frequent keywords
            word_freq = Counter(keywords)
            return [word for word, _ in word_freq.most_common(max_keywords)]
except Exception as e:
logger.error(f"Error extracting keywords: {str(e)}")
return []
def prepare_for_embedding(self, text: str) -> str:
"""Prepare text specifically for embedding generation"""
if not text:
return ""
try:
            # Clean text aggressively for better embeddings
            cleaned = self.clean_text(text, aggressive=True)
            # Remove very short words
            words = cleaned.split()
filtered_words = [word for word in words if len(word) >= 2]
# Rejoin and ensure reasonable length
result = ' '.join(filtered_words)
# Truncate if too long (most embedding models have token limits)
if len(result) > 5000: # Rough character limit
result = result[:5000] + "..."
return result
except Exception as e:
logger.error(f"Error preparing text for embedding: {str(e)}")
return text
def extract_metadata_from_text(self, text: str) -> dict:
"""Extract metadata from text content"""
if not text:
return {}
try:
metadata = {}
# Basic statistics
metadata['character_count'] = len(text)
metadata['word_count'] = len(text.split())
metadata['sentence_count'] = len(self.extract_sentences(text))
metadata['paragraph_count'] = len([p for p in text.split('\n\n') if p.strip()])
# Content characteristics
            metadata['avg_word_length'] = sum(len(word) for word in text.split()) / max(1, metadata['word_count'])
metadata['avg_sentence_length'] = metadata['word_count'] / max(1, metadata['sentence_count'])
# Special content detection
metadata['has_urls'] = bool(re.search(r'https?://\S+', text))
            metadata['has_emails'] = bool(re.search(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', text))
metadata['has_phone_numbers'] = bool(re.search(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', text))
metadata['has_dates'] = bool(re.search(r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b', text))
metadata['has_numbers'] = bool(re.search(r'\b\d+\b', text))
# Language indicators
metadata['punctuation_density'] = len(re.findall(r'[.,!?;:]', text)) / max(1, len(text))
metadata['caps_ratio'] = len(re.findall(r'[A-Z]', text)) / max(1, len(text))
return metadata
except Exception as e:
logger.error(f"Error extracting text metadata: {str(e)}")
return {}
def normalize_for_search(self, text: str) -> str:
"""Normalize text for search queries"""
if not text:
return ""
try:
# Convert to lowercase
text = text.lower()
# Remove special characters but keep spaces
text = re.sub(r'[^\w\s]', ' ', text)
# Normalize whitespace
text = re.sub(r'\s+', ' ', text)
# Strip leading/trailing whitespace
text = text.strip()
return text
except Exception as e:
logger.error(f"Error normalizing text for search: {str(e)}")
return text
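
if __name__ == "__main__":
    # Minimal usage sketch: the sample text below is illustrative and
    # simply exercises each public method end to end.
    pre = TextPreprocessor()
    sample = (
        "Contact us at info@example.com!! Visit https://example.com "
        "on 12/05/2024 for more details... This is a short demo sentence."
    )
    print(pre.clean_text(sample))
    print(pre.extract_sentences(sample))
    print(pre.extract_keywords(sample, max_keywords=5))
    print(pre.prepare_for_embedding(sample))
    print(pre.extract_metadata_from_text(sample))
    print(pre.normalize_for_search(sample))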