# DEPENDENCIES
import re
from difflib import SequenceMatcher
from typing import Any, Dict, List

# Advanced NLP (optional but recommended)
try:
    import spacy
    SPACY_AVAILABLE = True
except ImportError:
    SPACY_AVAILABLE = False
    print("[TextProcessor] spaCy not available. Install with: pip install spacy && python -m spacy download en_core_web_sm")

# Language detection
try:
    from langdetect import detect, LangDetectException
    LANGDETECT_AVAILABLE = True
except ImportError:
    LANGDETECT_AVAILABLE = False


class TextProcessor:
    """
    Text processing and normalization utilities
    """
    def __init__(self, use_spacy: bool = True):
        """
        Initialize text processor
        Arguments:
        ----------
        use_spacy { bool } : Whether to use spaCy for advanced NLP (if available)
        """
        self.nlp = None
        if use_spacy and SPACY_AVAILABLE:
            try:
                self.nlp = spacy.load("en_core_web_sm")
                print("[TextProcessor] spaCy model loaded successfully")
            except OSError:
                print("[TextProcessor] spaCy model not found. Run: python -m spacy download en_core_web_sm")
                self.nlp = None

    @staticmethod
    def normalize_text(text: str, lowercase: bool = True, remove_special_chars: bool = False) -> str:
        """
        Normalize text for analysis
        Arguments:
        ----------
        text { str } : Input text
        lowercase { bool } : Convert to lowercase
        remove_special_chars { bool } : Remove special characters
        Returns:
        --------
        { str } : Normalized text
        """
        if lowercase:
            text = text.lower()
        # Collapse runs of whitespace into single spaces
        text = re.sub(r'\s+', ' ', text)
        if remove_special_chars:
            # Keep alphanumeric characters and basic punctuation
            text = re.sub(r'[^\w\s.,;:!?()\-\'\"&@#$%]', '', text)
        return text.strip()
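
    # Example (hypothetical input); with the defaults, whitespace is collapsed
    # and the text is lowercased:
    #   TextProcessor.normalize_text("  Hello   WORLD!  ")  ->  "hello world!"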

    @staticmethod
    def split_into_paragraphs(text: str, min_length: int = 20) -> List[str]:
        """
        Split text into paragraphs
        Arguments:
        ----------
        text { str } : Input text
        min_length { int } : Minimum paragraph length in characters
        Returns:
        --------
        { list } : List of paragraphs
        """
        # Split on blank lines (a newline, optional whitespace, another newline)
        paragraphs = re.split(r'\n\s*\n', text)
        # Filter short and empty paragraphs
        return [p.strip() for p in paragraphs if len(p.strip()) >= min_length]

    @staticmethod
    def extract_sentences(text: str, min_length: int = 10) -> List[str]:
        """
        Extract sentences from text (basic method)
        Arguments:
        ----------
        text { str } : Input text
        min_length { int } : Minimum sentence length in characters
        Returns:
        --------
        { list } : List of sentences
        """
        # Naive sentence splitting on .!? (breaks on abbreviations such as
        # "No." or "Inc."; use extract_sentences_advanced for better results)
        sentences = re.split(r'[.!?]+', text)
        # Clean and filter
        sentences = [s.strip() for s in sentences if len(s.strip()) >= min_length]
        return sentences

    def extract_sentences_advanced(self, text: str) -> List[Dict[str, Any]]:
        """
        Extract sentences with NER and metadata using spaCy
        Arguments:
        ----------
        text { str } : Input text
        Returns:
        --------
        { list } : List of sentence dictionaries with entities and metadata
        """
        if not self.nlp:
            # Fallback to basic extraction; tokens are approximated by a
            # whitespace split so the shape matches the spaCy path
            basic_sentences = self.extract_sentences(text)
            return [{"text": s, "entities": [], "start_char": 0, "end_char": 0, "tokens": s.split()} for s in basic_sentences]
        # Limit to 100K chars for performance
        doc = self.nlp(text[:100000])
        sentences = list()
        for sent in doc.sents:
            sentences.append({"text": sent.text.strip(),
                              "entities": [(ent.text, ent.label_) for ent in sent.ents],
                              "start_char": sent.start_char,
                              "end_char": sent.end_char,
                              "tokens": [token.text for token in sent],
                              })
        return sentences

    @staticmethod
    def extract_legal_entities(text: str) -> Dict[str, List[str]]:
        """
        Extract legal-specific entities (parties, dates, amounts, references)
        Arguments:
        ----------
        text { str } : Input text
        Returns:
        --------
        { dict } : Dictionary of extracted entities by type
        """
        entities = {"parties": [],
                    "dates": [],
                    "amounts": [],
                    "addresses": [],
                    "references": [],
                    "emails": [],
                    "phone_numbers": [],
                    }
        # Party names (PARTY A, "the Employee", Company Name Inc.)
        party_patterns = [r'(?:PARTY|Party)\s+[A-Z]',
                          r'"the\s+\w+"',
                          r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\s+(?:Inc|LLC|Corp|Ltd|Limited|Company)\.?',
                          r'(?:the\s+)?(Employer|Employee|Consultant|Contractor|Client|Vendor|Supplier|Landlord|Tenant|Buyer|Seller)',
                          ]
        for pattern in party_patterns:
            matches = re.findall(pattern, text)
            entities["parties"].extend(matches)
        # Dates (various formats)
        date_patterns = [r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b',
                         r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}\b',
                         r'\b\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4}\b',
                         ]
        for pattern in date_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            entities["dates"].extend(matches)
        # Legal references (Section 5.2, Clause 11.1, Article III)
        ref_patterns = [r'(?:Section|Clause|Article|Paragraph|Exhibit|Schedule|Appendix)\s+(?:\d+(?:\.\d+)*|[IVXLCDM]+)']
        for pattern in ref_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            entities["references"].extend(matches)
        # Monetary amounts
        entities["amounts"] = TextProcessor.extract_monetary_amounts(text)
        # Email addresses ([A-Za-z] in the TLD, not [A-Z|a-z]: a literal '|'
        # inside a character class would also match pipe characters)
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        entities["emails"] = re.findall(email_pattern, text)
        # Phone numbers (US format); findall returns the three capture groups
        phone_pattern = r'\b(?:\+?1[-.\s]?)?\(?([0-9]{3})\)?[-.\s]?([0-9]{3})[-.\s]?([0-9]{4})\b'
        phone_matches = re.findall(phone_pattern, text)
        entities["phone_numbers"] = ['-'.join(match) for match in phone_matches]
        # Deduplicate
        for key in entities:
            entities[key] = list(set(entities[key]))
        return entities
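
    # Example (hypothetical input):
    #   TextProcessor.extract_legal_entities("The Employee owes $5,000 under Section 3.1.")
    #   -> {"parties": ["Employee"], "dates": [], "amounts": ["$5,000"],
    #       "addresses": [], "references": ["Section 3.1"], "emails": [],
    #       "phone_numbers": []}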

    @staticmethod
    def count_words(text: str) -> int:
        """
        Count whitespace-separated words in text
        """
        return len(text.split())

    @staticmethod
    def extract_numbers(text: str) -> List[str]:
        """
        Extract all digit sequences from text
        """
        return re.findall(r'\d+', text)

    @staticmethod
    def extract_monetary_amounts(text: str) -> List[str]:
        """
        Extract monetary amounts from text
        Returns:
        --------
        { list } : List of monetary amounts (e.g., ['$1,000', '$2,500.00'])
        """
        # Match patterns like $1,000 or $1000.00 or USD 1,000
        patterns = [r'\$[\d,]+(?:\.\d{2})?',
                    r'USD\s*[\d,]+(?:\.\d{2})?',
                    r'EUR\s*[\d,]+(?:\.\d{2})?',
                    r'GBP\s*[\d,]+(?:\.\d{2})?',
                    ]
        amounts = list()
        for pattern in patterns:
            amounts.extend(re.findall(pattern, text, re.IGNORECASE))
        return amounts

    @staticmethod
    def extract_durations(text: str) -> List[Dict[str, str]]:
        """
        Extract time durations (e.g., "6 months", "2 years")
        Returns:
        --------
        { list } : List of duration dictionaries with 'amount' and 'unit'
        """
        pattern = r'(\d+)\s*(day|week|month|year)s?'
        matches = re.findall(pattern, text, re.IGNORECASE)
        return [{"amount": m[0], "unit": m[1].lower()} for m in matches]

    @staticmethod
    def extract_percentages(text: str) -> List[str]:
        """
        Extract percentages from text
        """
        return re.findall(r'\d+(?:\.\d+)?%', text)
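
    # Examples (hypothetical inputs):
    #   TextProcessor.extract_monetary_amounts("a fee of $1,500.00 (USD 1500)")
    #   -> ['$1,500.00', 'USD 1500']
    #   TextProcessor.extract_durations("within 30 days, renewable for 2 years")
    #   -> [{'amount': '30', 'unit': 'day'}, {'amount': '2', 'unit': 'year'}]
    #   TextProcessor.extract_percentages("interest of 4.5% per annum")
    #   -> ['4.5%']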

    @staticmethod
    def chunk_text_for_embedding(text: str, chunk_size: int = 512, overlap: int = 50) -> List[Dict[str, Any]]:
        """
        Chunk text with overlap for embedding models (preserves sentence boundaries)
        Arguments:
        ----------
        text { str } : Input text
        chunk_size { int } : Maximum chunk size in words
        overlap { int } : Approximate number of words to overlap between chunks
        Returns:
        --------
        { list } : List of chunk dictionaries with metadata
        """
        sentences = TextProcessor.extract_sentences(text)
        chunks = list()
        current_chunk = list()
        current_length = 0
        start_sentence_idx = 0
        for i, sentence in enumerate(sentences):
            sentence_length = len(sentence.split())
            if ((current_length + sentence_length) > chunk_size) and current_chunk:
                # Save current chunk
                chunks.append({"text": " ".join(current_chunk),
                               "start_sentence": start_sentence_idx,
                               "end_sentence": i - 1,
                               "word_count": current_length,
                               "chunk_id": len(chunks),
                               })
                # Start new chunk, carrying up to `overlap` trailing words of
                # context (whole sentences only, so boundaries are preserved)
                overlap_sentences = []
                overlap_words = 0
                for prev in reversed(current_chunk):
                    prev_words = len(prev.split())
                    if overlap_words + prev_words > overlap:
                        break
                    overlap_sentences.insert(0, prev)
                    overlap_words += prev_words
                current_chunk = overlap_sentences + [sentence]
                current_length = overlap_words + sentence_length
                start_sentence_idx = max(0, i - len(overlap_sentences))
            else:
                current_chunk.append(sentence)
                current_length += sentence_length
        # Add final chunk
        if current_chunk:
            chunks.append({"text": " ".join(current_chunk),
                           "start_sentence": start_sentence_idx,
                           "end_sentence": len(sentences) - 1,
                           "word_count": current_length,
                           "chunk_id": len(chunks),
                           })
        return chunks
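
    # Each returned chunk is a dict of the form (values hypothetical):
    #   {"text": "...", "start_sentence": 0, "end_sentence": 11,
    #    "word_count": 498, "chunk_id": 0}
    # Consecutive chunks share up to `overlap` words of trailing context.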

    @staticmethod
    def text_similarity(text1: str, text2: str) -> float:
        """
        Calculate similarity between two texts (0-1 scale)
        Arguments:
        ----------
        text1 { str } : First text
        text2 { str } : Second text
        Returns:
        --------
        { float } : Similarity score (0.0 = completely different, 1.0 = identical)
        """
        return SequenceMatcher(None, text1.lower(), text2.lower()).ratio()

    @staticmethod
    def deduplicate_clauses(clauses: List[str], threshold: float = 0.85) -> List[str]:
        """
        Remove near-duplicate clauses
        Arguments:
        ----------
        clauses { list } : List of clause texts
        threshold { float } : Similarity threshold for deduplication (0.0-1.0)
        Returns:
        --------
        { list } : List of unique clauses
        """
        unique = list()
        for clause in clauses:
            is_duplicate = any(TextProcessor.text_similarity(clause, existing) > threshold for existing in unique)
            if not is_duplicate:
                unique.append(clause)
        return unique
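
    # Example (hypothetical input): the second clause differs only by a trailing
    # period, so its similarity ratio (~0.98) exceeds the 0.85 threshold:
    #   TextProcessor.deduplicate_clauses(["Payment is due within 30 days.",
    #                                      "Payment is due within 30 days"])
    #   -> ["Payment is due within 30 days."]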

    @staticmethod
    def detect_language(text: str) -> str:
        """
        Detect text language
        Arguments:
        ----------
        text { str } : Input text
        Returns:
        --------
        { str } : ISO 639-1 language code (e.g., 'en', 'es', 'fr')
        """
        if not LANGDETECT_AVAILABLE:
            # Default to English
            return "en"
        try:
            # Use first 1000 chars for detection
            return detect(text[:1000])
        except LangDetectException:
            return "en"

    @staticmethod
    def get_text_statistics(text: str) -> Dict[str, Any]:
        """
        Get comprehensive text statistics
        Returns:
        --------
        { dict } : Dictionary with character count, word count, sentence count, etc.
        """
        sentences = TextProcessor.extract_sentences(text)
        paragraphs = TextProcessor.split_into_paragraphs(text)
        words = text.split()
        return {"character_count": len(text),
                "word_count": len(words),
                "sentence_count": len(sentences),
                "paragraph_count": len(paragraphs),
                "avg_words_per_sentence": len(words) / len(sentences) if sentences else 0,
                # Average over the words themselves, excluding whitespace
                "avg_chars_per_word": sum(len(w) for w in words) / len(words) if words else 0,
                "language": TextProcessor.detect_language(text),
                }
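
    # Example return value (numbers hypothetical):
    #   {"character_count": 1204, "word_count": 213, "sentence_count": 11,
    #    "paragraph_count": 4, "avg_words_per_sentence": 19.4,
    #    "avg_chars_per_word": 4.8, "language": "en"}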

    @staticmethod
    def highlight_keywords(text: str, keywords: List[str], highlight_format: str = "**{}**") -> str:
        """
        Highlight keywords in text (for display purposes)
        Arguments:
        ----------
        text { str } : Input text
        keywords { list } : List of keywords to highlight
        highlight_format { str } : Format string with {} placeholder (default: Markdown bold)
        Returns:
        --------
        { str } : Text with highlighted keywords
        """
        for keyword in keywords:
            pattern = re.compile(re.escape(keyword), re.IGNORECASE)
            text = pattern.sub(lambda m: highlight_format.format(m.group(0)), text)
        return text

    @staticmethod
    def extract_numbered_sections(text: str) -> List[Dict[str, Any]]:
        """
        Extract numbered sections/clauses (1.1, 1.2, Article 5, etc.)
        Returns:
        --------
        { list } : List of section dictionaries with number and text
        """
        patterns = [(r'(\d+\.\d+(?:\.\d+)*)\.\s*([^\n]{20,}?)(?=\n\s*\d+\.\d+|\n\n|$)', 'numbered'),
                    (r'(Article\s+(?:\d+|[IVXLCDM]+))\.\s*([^\n]{20,}?)(?=\nArticle|\n\n|$)', 'article'),
                    (r'(Section\s+(?:\d+|[IVXLCDM]+))\.\s*([^\n]{20,}?)(?=\nSection|\n\n|$)', 'section'),
                    (r'(Clause\s+\d+(?:\.\d+)*)\.\s*([^\n]{20,}?)(?=\nClause|\n\n|$)', 'clause'),
                    ]
        sections = list()
        for pattern, section_type in patterns:
            matches = re.finditer(pattern, text, re.IGNORECASE | re.DOTALL)
            for match in matches:
                sections.append({"reference": match.group(1).strip(),
                                 "text": match.group(2).strip(),
                                 "type": section_type,
                                 "start_pos": match.start(),
                                 "end_pos": match.end(),
                                 })
        # Sort by position
        sections.sort(key = lambda x: x['start_pos'])
        return sections

    @staticmethod
    def clean_legal_text(text: str) -> str:
        """
        Clean legal text by removing boilerplate artifacts
        Arguments:
        ----------
        text { str } : Input legal text
        Returns:
        --------
        { str } : Cleaned text
        """
        # Remove "Page X of Y" markers
        text = re.sub(r'Page\s+\d+\s+of\s+\d+', '', text, flags = re.IGNORECASE)
        # Remove "[Signature Page Follows]" type markers
        text = re.sub(r'\[.*?(?:Signature|Initial|Page).*?\]', '', text, flags = re.IGNORECASE)
        # Remove excessive underscores (signature lines)
        text = re.sub(r'_{3,}', '', text)
        # Remove "CONFIDENTIAL" / "DRAFT" watermarks
        text = re.sub(r'\b(CONFIDENTIAL|DRAFT|INTERNAL USE ONLY)\b', '', text, flags = re.IGNORECASE)
        # Clean up resulting whitespace
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r' {2,}', ' ', text)
        return text.strip()
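

# Minimal smoke test of the regex-based extractors; the sample contract text
# below is invented for illustration
if __name__ == "__main__":
    sample = ("Section 1.1. The Employee shall be paid $5,000 per month.\n\n"
              "Section 2.1. Either party may terminate this Agreement with 30 days notice.")
    print(TextProcessor.extract_legal_entities(sample))
    print(TextProcessor.extract_durations(sample))
    print(TextProcessor.get_text_statistics(sample))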