# contract-guard-ai/utils/text_processor.py
# DEPENDENCIES
import re
from typing import Any, Dict, List
from difflib import SequenceMatcher
# Advanced NLP (optional but recommended)
try:
import spacy
SPACY_AVAILABLE = True
except ImportError:
SPACY_AVAILABLE = False
print("[TextProcessor] spaCy not available. Install with: pip install spacy && python -m spacy download en_core_web_sm")
# Language detection
try:
from langdetect import detect, LangDetectException
LANGDETECT_AVAILABLE = True
except ImportError:
LANGDETECT_AVAILABLE = False
class TextProcessor:
"""
Text processing and normalization utilities
"""
def __init__(self, use_spacy: bool = True):
"""
Initialize text processor
Arguments:
----------
use_spacy { bool } : Whether to use spaCy for advanced NLP (if available)
"""
self.nlp = None
if use_spacy and SPACY_AVAILABLE:
try:
self.nlp = spacy.load("en_core_web_sm")
print("[TextProcessor] spaCy model loaded successfully")
except OSError:
print("[TextProcessor] spaCy model not found. Run: python -m spacy download en_core_web_sm")
self.nlp = None
@staticmethod
def normalize_text(text: str, lowercase: bool = True, remove_special_chars: bool = False) -> str:
"""
Normalize text for analysis
Arguments:
----------
text { str } : Input text
lowercase { bool } : Convert to lowercase
remove_special_chars { bool } : Remove special characters
Returns:
--------
{ str } : Normalized text
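        Example (illustrative; default arguments):
            >>> TextProcessor.normalize_text("  WHEREAS   the Parties  agree  ")
            'whereas the parties agree'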
"""
if lowercase:
text = text.lower()
# Remove extra whitespace
text = re.sub(r'\s+', ' ', text)
if remove_special_chars:
# Keep alphanumeric and basic punctuation
text = re.sub(r'[^\w\s.,;:!?()\-\'\"&@#$%]', '', text)
return text.strip()
@staticmethod
def split_into_paragraphs(text: str, min_length: int = 20) -> List[str]:
"""
Split text into paragraphs
Arguments:
----------
text { str } : Input text
min_length { int } : Minimum paragraph length in characters
Returns:
--------
{ list } : List of paragraphs
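        Example (illustrative; `contract_text` is any document string):
            paragraphs = TextProcessor.split_into_paragraphs(contract_text)
            # blocks separated by blank lines; blocks shorter than min_length are dropped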
"""
# Split on double newlines
paragraphs = re.split(r'\n\s*\n', text)
# Filter short and empty paragraphs
return [p.strip() for p in paragraphs if len(p.strip()) >= min_length]
@staticmethod
def extract_sentences(text: str, min_length: int = 10) -> List[str]:
"""
Extract sentences from text (basic method)
Arguments:
----------
text { str } : Input text
min_length { int } : Minimum sentence length in characters
Returns:
--------
{ list } : List of sentences
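        Example (illustrative; terminal punctuation is consumed by the split):
            >>> TextProcessor.extract_sentences("This is the first sentence. And here is a second!")
            ['This is the first sentence', 'And here is a second']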
"""
# Simple sentence splitting on .!?
sentences = re.split(r'[.!?]+', text)
# Clean and filter
sentences = [s.strip() for s in sentences if len(s.strip()) >= min_length]
return sentences
def extract_sentences_advanced(self, text: str) -> List[Dict[str, Any]]:
"""
Extract sentences with NER and metadata using spaCy
Args:
text: Input text
Returns:
List of sentence dictionaries with entities and metadata
"""
        if not self.nlp:
            # Fallback to basic extraction (no character offsets without spaCy)
            basic_sentences = self.extract_sentences(text)
            return [{"text" : s, "entities" : [], "start_char" : 0, "end_char" : 0, "tokens" : s.split()} for s in basic_sentences]
# Limit to 100K chars for performance
doc = self.nlp(text[:100000])
sentences = list()
for sent in doc.sents:
sentences.append({"text" : sent.text.strip(),
"entities" : [(ent.text, ent.label_) for ent in sent.ents],
"start_char" : sent.start_char,
"end_char" : sent.end_char,
"tokens" : [token.text for token in sent],
})
return sentences
@staticmethod
def extract_legal_entities(text: str) -> Dict[str, List[str]]:
"""
Extract legal-specific entities (parties, dates, amounts, references)
Arguments:
----------
text { str } : Input text
Returns:
--------
{ dict } : Dictionary of extracted entities by type
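        Example (illustrative; results are deduplicated, so order is not guaranteed):
            >>> ents = TextProcessor.extract_legal_entities("Acme Corp Inc. shall pay $1,000 under Section 2.1")
            >>> sorted(ents["amounts"]), sorted(ents["references"])
            (['$1,000'], ['Section 2.1'])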
"""
entities = {"parties" : [],
"dates" : [],
"amounts" : [],
"addresses" : [],
"references" : [],
"emails" : [],
"phone_numbers" : [],
}
# Party names (PARTY A, "the Employee", Company Name Inc.)
party_patterns = [r'(?:PARTY|Party)\s+[A-Z]',
r'"the\s+\w+"',
r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\s+(?:Inc|LLC|Corp|Ltd|Limited|Company)\.?',
r'(?:the\s+)?(Employer|Employee|Consultant|Contractor|Client|Vendor|Supplier|Landlord|Tenant|Buyer|Seller)',
]
for pattern in party_patterns:
matches = re.findall(pattern, text)
entities["parties"].extend(matches)
# Dates (various formats)
date_patterns = [r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b',
r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}\b',
r'\b\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4}\b'
]
for pattern in date_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
entities["dates"].extend(matches)
# Legal references (Section 5.2, Clause 11.1, Article III)
ref_patterns = [r'(?:Section|Clause|Article|Paragraph|Exhibit|Schedule|Appendix)\s+(?:\d+(?:\.\d+)*|[IVXLCDM]+)']
for pattern in ref_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
entities["references"].extend(matches)
# Monetary amounts
entities["amounts"] = TextProcessor.extract_monetary_amounts(text)
# Email addresses
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
entities["emails"] = re.findall(email_pattern, text)
# Phone numbers (US format)
phone_pattern = r'\b(?:\+?1[-.\s]?)?\(?([0-9]{3})\)?[-.\s]?([0-9]{3})[-.\s]?([0-9]{4})\b'
phone_matches = re.findall(phone_pattern, text)
entities["phone_numbers"] = ['-'.join(match) for match in phone_matches]
# Deduplicate
for key in entities:
entities[key] = list(set(entities[key]))
return entities
@staticmethod
def count_words(text: str) -> int:
"""
Count words in text
"""
return len(text.split())
@staticmethod
def extract_numbers(text: str) -> List[str]:
"""
        Extract all digit sequences from text (note: a decimal like '3.5' yields ['3', '5'])
"""
return re.findall(r'\d+', text)
@staticmethod
def extract_monetary_amounts(text: str) -> List[str]:
"""
Extract monetary amounts from text
Returns:
--------
{ list } : List of monetary amounts (e.g., ['$1,000', '$2,500.00'])
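        Example (illustrative; '$' amounts are collected before currency-code amounts):
            >>> TextProcessor.extract_monetary_amounts("A fee of $1,000 plus USD 250.00")
            ['$1,000', 'USD 250.00']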
"""
# Match patterns like $1,000 or $1000.00 or USD 1,000
patterns = [r'\$[\d,]+(?:\.\d{2})?',
r'USD\s*[\d,]+(?:\.\d{2})?',
r'EUR\s*[\d,]+(?:\.\d{2})?',
r'GBP\s*[\d,]+(?:\.\d{2})?'
]
amounts = list()
for pattern in patterns:
amounts.extend(re.findall(pattern, text, re.IGNORECASE))
return amounts
@staticmethod
def extract_durations(text: str) -> List[Dict[str, str]]:
"""
Extract time durations (e.g., "6 months", "2 years")
Returns:
--------
{ list } : List of duration dictionaries with 'amount' and 'unit'
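        Example (illustrative):
            >>> TextProcessor.extract_durations("a term of 6 months, renewable for 2 years")
            [{'amount': '6', 'unit': 'month'}, {'amount': '2', 'unit': 'year'}]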
"""
pattern = r'(\d+)\s*(day|week|month|year)s?'
matches = re.findall(pattern, text, re.IGNORECASE)
return [{"amount": m[0], "unit": m[1].lower()} for m in matches]
@staticmethod
def extract_percentages(text: str) -> List[str]:
"""
Extract percentages from text
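        Example (illustrative):
            >>> TextProcessor.extract_percentages("interest of 5% or 7.25% per annum")
            ['5%', '7.25%']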
"""
return re.findall(r'\d+(?:\.\d+)?%', text)
@staticmethod
def chunk_text_for_embedding(text: str, chunk_size: int = 512, overlap: int = 50) -> List[Dict[str, Any]]:
"""
Chunk text with overlap for embedding models (preserves sentence boundaries)
Arguments:
----------
text { str } : Input text
chunk_size { int } : Maximum chunk size in words
overlap { int } : Number of words to overlap between chunks
Returns:
--------
{ list } : List of chunk dictionaries with metadata
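        Example (illustrative; `contract_text` is any long document string):
            chunks = TextProcessor.chunk_text_for_embedding(contract_text, chunk_size = 256, overlap = 32)
            # each chunk dict carries 'text', 'start_sentence', 'end_sentence', 'word_count', 'chunk_id'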
"""
sentences = TextProcessor.extract_sentences(text)
chunks = list()
current_chunk = list()
current_length = 0
start_sentence_idx = 0
for i, sentence in enumerate(sentences):
sentence_words = sentence.split()
sentence_length = len(sentence_words)
            if current_chunk and (current_length + sentence_length) > chunk_size:
# Save current chunk
chunks.append({"text" : " ".join(current_chunk),
"start_sentence" : start_sentence_idx,
"end_sentence" : i - 1,
"word_count" : current_length,
"chunk_id" : len(chunks),
})
                # Start a new chunk, carrying over trailing sentences up to `overlap` words
                overlap_sentences = list()
                overlap_words = 0
                for prev_sentence in reversed(current_chunk):
                    prev_word_count = len(prev_sentence.split())
                    if (overlap_words + prev_word_count) > overlap:
                        break
                    overlap_sentences.insert(0, prev_sentence)
                    overlap_words += prev_word_count
                current_chunk = overlap_sentences + [sentence]
                current_length = sum(len(s.split()) for s in current_chunk)
                start_sentence_idx = max(0, i - len(overlap_sentences))
else:
current_chunk.append(sentence)
current_length += sentence_length
# Add final chunk
if current_chunk:
chunks.append({"text" : " ".join(current_chunk),
"start_sentence" : start_sentence_idx,
"end_sentence" : len(sentences) - 1,
"word_count" : current_length,
"chunk_id" : len(chunks),
})
return chunks
@staticmethod
def text_similarity(text1: str, text2: str) -> float:
"""
Calculate similarity between two texts (0-1 scale)
Arguments:
----------
text1 { str } : First text
text2 { str } : Second text
Returns:
--------
{ float } : Similarity score (0.0 = completely different, 1.0 = identical)
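        Example (illustrative; comparison is case-insensitive):
            >>> TextProcessor.text_similarity("Governing Law", "governing law")
            1.0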
"""
return SequenceMatcher(None, text1.lower(), text2.lower()).ratio()
@staticmethod
def deduplicate_clauses(clauses: List[str], threshold: float = 0.85) -> List[str]:
"""
Remove near-duplicate clauses
Arguments:
----------
clauses { list } : List of clause texts
threshold { float } : Similarity threshold for deduplication (0.0-1.0)
Returns:
--------
{ list } : List of unique clauses
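        Example (illustrative; near-identical wording collapses to one clause):
            >>> TextProcessor.deduplicate_clauses(["Payment is due in 30 days.", "Payment is due in 30 days!", "Confidentiality survives termination."])
            ['Payment is due in 30 days.', 'Confidentiality survives termination.']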
"""
unique = list()
for clause in clauses:
is_duplicate = any(TextProcessor.text_similarity(clause, existing) > threshold for existing in unique)
if not is_duplicate:
unique.append(clause)
return unique
@staticmethod
def detect_language(text: str) -> str:
"""
Detect text language
Arguments:
----------
text { str } : Input text
Returns:
--------
{ str } : ISO 639-1 language code (e.g., 'en', 'es', 'fr')
"""
if not LANGDETECT_AVAILABLE:
# Default to English
return "en"
try:
# Use first 1000 chars for detection
return detect(text[:1000])
except LangDetectException:
return "en"
@staticmethod
def get_text_statistics(text: str) -> Dict[str, Any]:
"""
Get comprehensive text statistics
Returns:
--------
{ dict } : Dictionary with character count, word count, sentence count, etc.
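        Example (illustrative):
            >>> stats = TextProcessor.get_text_statistics("One sentence here. Another one follows.")
            >>> stats["word_count"], stats["sentence_count"]
            (6, 2)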
"""
sentences = TextProcessor.extract_sentences(text)
paragraphs = TextProcessor.split_into_paragraphs(text)
words = text.split()
return {"character_count" : len(text),
"word_count" : len(words),
"sentence_count" : len(sentences),
"paragraph_count" : len(paragraphs),
"avg_words_per_sentence" : len(words) / len(sentences) if sentences else 0,
"avg_chars_per_word" : len(text) / len(words) if words else 0,
"language" : TextProcessor.detect_language(text),
}
@staticmethod
def highlight_keywords(text: str, keywords: List[str], highlight_format: str = "**{}**") -> str:
"""
Highlight keywords in text (for display purposes)
Arguments:
----------
text { str } : Input text
keywords { list } : List of keywords to highlight
highlight_format { str } : Format string with {} placeholder (default: Markdown bold)
Returns:
--------
{ str } : Text with highlighted keywords
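        Example (illustrative; matching is case-insensitive):
            >>> TextProcessor.highlight_keywords("The Employee shall not compete.", ["employee"])
            'The **Employee** shall not compete.'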
"""
for keyword in keywords:
pattern = re.compile(re.escape(keyword), re.IGNORECASE)
text = pattern.sub(lambda m: highlight_format.format(m.group(0)), text)
return text
@staticmethod
def extract_numbered_sections(text: str) -> List[Dict[str, Any]]:
"""
Extract numbered sections/clauses (1.1, 1.2, Article 5, etc.)
Returns:
--------
{ list } : List of section dictionaries with number and text
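        Example (illustrative):
            >>> secs = TextProcessor.extract_numbered_sections("1.1. The Employee agrees to the terms stated herein.")
            >>> secs[0]["reference"], secs[0]["type"]
            ('1.1', 'numbered')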
"""
patterns = [(r'(\d+\.\d+(?:\.\d+)*)\.\s*([^\n]{20,}?)(?=\n\s*\d+\.\d+|\n\n|$)', 'numbered'),
(r'(Article\s+(?:\d+|[IVXLCDM]+))\.\s*([^\n]{20,}?)(?=\nArticle|\n\n|$)', 'article'),
(r'(Section\s+(?:\d+|[IVXLCDM]+))\.\s*([^\n]{20,}?)(?=\nSection|\n\n|$)', 'section'),
(r'(Clause\s+\d+(?:\.\d+)*)\.\s*([^\n]{20,}?)(?=\nClause|\n\n|$)', 'clause'),
]
sections = list()
for pattern, section_type in patterns:
matches = re.finditer(pattern, text, re.IGNORECASE | re.DOTALL)
for match in matches:
sections.append({"reference" : match.group(1).strip(),
"text" : match.group(2).strip(),
"type" : section_type,
"start_pos" : match.start(),
"end_pos" : match.end(),
})
# Sort by position
sections.sort(key = lambda x: x['start_pos'])
return sections
@staticmethod
def clean_legal_text(text: str) -> str:
"""
Clean legal text by removing boilerplate artifacts
Arguments:
----------
text { str } : Input legal text
Returns:
--------
{ str } : Cleaned text
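        Example (illustrative):
            >>> TextProcessor.clean_legal_text("CONFIDENTIAL Terms apply. Page 1 of 3")
            'Terms apply.'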
"""
# Remove "Page X of Y" markers
text = re.sub(r'Page\s+\d+\s+of\s+\d+', '', text, flags = re.IGNORECASE)
# Remove "[Signature Page Follows]" type markers
text = re.sub(r'\[.*?(?:Signature|Initial|Page).*?\]', '', text, flags = re.IGNORECASE)
# Remove excessive underscores (signature lines)
text = re.sub(r'_{3,}', '', text)
# Remove "CONFIDENTIAL" watermarks
text = re.sub(r'\b(CONFIDENTIAL|DRAFT|INTERNAL USE ONLY)\b', '', text, flags = re.IGNORECASE)
# Clean up resulting whitespace
text = re.sub(r'\n{3,}', '\n\n', text)
text = re.sub(r' {2,}', ' ', text)
return text.strip()
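

# Illustrative smoke test (a minimal sketch; the sample text below is hypothetical
# and not part of any real contract):
if __name__ == "__main__":
    processor = TextProcessor(use_spacy = False)
    sample = ("This Agreement is made on January 5, 2024 between Acme Corp Inc. "
              "and the Employee. A fee of $5,000 is payable within 30 days. "
              "See Section 2.1 for termination terms. Contact: legal@example.com")
    print(TextProcessor.extract_legal_entities(sample))
    print(TextProcessor.extract_durations(sample))
    print(TextProcessor.get_text_statistics(sample))
    print(processor.extract_sentences_advanced(sample)[0]["text"])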