Spaces:
Running
Running
| # DEPENDENCIES | |
| import re | |
| import sys | |
| import torch | |
| import numpy as np | |
| from typing import Any | |
| from typing import List | |
| from typing import Dict | |
| from typing import Tuple | |
| from pathlib import Path | |
| from typing import Optional | |
| from collections import defaultdict | |
| from sentence_transformers import util | |
| # Import utilities | |
| sys.path.append(str(Path(__file__).parent.parent)) | |
| from utils.logger import log_info | |
| from utils.logger import log_error | |
| from config.risk_rules import RiskRules | |
| from config.risk_rules import ContractType | |
| from utils.text_processor import TextProcessor | |
| from utils.logger import ContractAnalyzerLogger | |
| from services.data_models import ExtractedClause | |
| from model_manager.model_loader import ModelLoader | |
| from services.data_models import ClauseInterpretation | |
| class ComprehensiveClauseExtractor: | |
| """ | |
| For general clause extraction across all contract types : Extracts and classifies clauses using Legal-BERT + structural patterns | |
| Will be used for: General document analysis, clause discovery, contract understanding | |
| """ | |
| # COMPREHENSIVE CLAUSE CATEGORIES COVERING ALL LEGAL AREAS | |
| CLAUSE_CATEGORIES = {'compensation' : {'keywords' : ['salary', 'wage', 'compensation', 'pay', 'payment', 'bonus', 'commission', 'remuneration', 'fee', 'rate', 'benefits', 'equity', 'stock options', 'incentive'], | |
| 'representative_text' : ("The Employee shall receive an annual base salary of One Hundred Thousand Dollars payable in accordance with the Company's standard payroll practices. Additional compensation may include performance bonuses and stock options."), | |
| 'weight' : 1.0, | |
| }, | |
| 'termination' : {'keywords' : ['termination', 'terminate', 'notice period', 'resignation', 'dismissal', 'severance', 'end of employment', 'cessation', 'notice', 'for cause', 'without cause'], | |
| 'representative_text' : ("Either party may terminate this Agreement upon thirty days written notice. The Company may terminate for cause immediately upon written notice to Employee. Upon termination, Employee shall receive severance compensation."), | |
| 'weight' : 1.2, | |
| }, | |
| 'non_compete' : {'keywords' : ['non-compete', 'non-solicit', 'non-solicitation', 'restrictive covenant', 'competitive', 'competition', 'competing business', 'competitive activities', 'non-competition'], | |
| 'representative_text' : ("Employee agrees not to engage in any competitive business activities for a period of twelve months following termination within a fifty-mile radius. Employee shall not solicit Company clients or employees during this period."), | |
| 'weight' : 1.5, | |
| }, | |
| 'confidentiality' : {'keywords' : ['confidential', 'proprietary', 'trade secret', 'disclosure', 'confidentiality', 'secret', 'private', 'non-disclosure', 'protected information'], | |
| 'representative_text' : ("Employee shall maintain the confidentiality of all proprietary information and trade secrets of the Company. Confidential Information includes business plans, customer lists, and technical data. These obligations survive termination."), | |
| 'weight' : 1.1, | |
| }, | |
| 'indemnification' : {'keywords' : ['indemnify', 'indemnification', 'hold harmless', 'defend', 'liability', 'claims', 'losses', 'damages', 'indemnity'], | |
| 'representative_text' : ("Party A shall indemnify and hold harmless Party B from any claims, losses, or damages arising from Party A's breach or negligence. This indemnification includes reasonable attorneys' fees and costs of defense."), | |
| 'weight' : 1.3, | |
| }, | |
| 'intellectual_property' : {'keywords' : ['intellectual property', 'ip', 'copyright', 'patent', 'trademark', 'work product', 'inventions', 'creation', 'ownership', 'ip rights', 'proprietary rights'], | |
| 'representative_text' : ("All work product and inventions created by Employee during employment shall be the exclusive property of the Company. Employee assigns all intellectual property rights including patents, copyrights, and trade secrets to the Company."), | |
| 'weight' : 1.2, | |
| }, | |
| 'liability' : {'keywords' : ['liable', 'liability', 'damages', 'limitation', 'consequential', 'indirect', 'punitive', 'cap', 'limited liability', 'responsibility'], | |
| 'representative_text' : ("In no event shall either party be liable for indirect, incidental, or consequential damages. Total liability under this Agreement shall not exceed the amounts paid in the twelve months preceding the claim."), | |
| 'weight' : 1.2, | |
| }, | |
| 'warranty' : {'keywords' : ['warranty', 'warrant', 'representation', 'guarantee', 'assurance', 'promise', 'warranties', 'guaranty'], | |
| 'representative_text' : ("Company warrants that the Services will be performed in a professional manner. EXCEPT AS EXPRESSLY PROVIDED, COMPANY DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE."), | |
| 'weight' : 0.9, | |
| }, | |
| 'dispute_resolution' : {'keywords' : ['arbitration', 'mediation', 'dispute', 'jurisdiction', 'governing law', 'venue', 'forum', 'resolution', 'litigation'], | |
| 'representative_text' : ("Any disputes arising under this Agreement shall be resolved through binding arbitration in accordance with the rules of the American Arbitration Association. This Agreement shall be governed by the laws of the State of California."), | |
| 'weight' : 0.9, | |
| }, | |
| 'insurance' : {'keywords' : ['insurance', 'coverage', 'insured', 'policy', 'premium', 'insurer', 'liability insurance'], | |
| 'representative_text' : ("Contractor shall maintain general liability insurance with minimum coverage of one million dollars per occurrence. Proof of insurance shall be provided to Client. Company shall be named as additional insured on all policies."), | |
| 'weight' : 0.8, | |
| }, | |
| 'assignment' : {'keywords' : ['assignment', 'assign', 'transfer', 'successor', 'binding', 'assignee', 'assignor'], | |
| 'representative_text' : ("This Agreement may not be assigned by either party without the prior written consent of the other party. This Agreement shall be binding upon and inure to the benefit of the parties' successors and permitted assigns."), | |
| 'weight' : 0.8, | |
| }, | |
| 'amendment' : {'keywords' : ['amendment', 'modify', 'modification', 'change', 'alteration', 'waiver', 'amend'], | |
| 'representative_text' : ("This Agreement may not be amended or modified except by written instrument signed by both parties. No waiver of any provision shall be effective unless in writing. All modifications must be mutually agreed upon."), | |
| 'weight' : 0.7, | |
| }, | |
| 'force_majeure' : {'keywords' : ['force majeure', 'act of god', 'unforeseeable', 'beyond control', 'natural disaster', 'unforeseen circumstances'], | |
| 'representative_text' : ("Neither party shall be liable for failure to perform due to causes beyond its reasonable control including acts of God, war, strikes, or natural disasters. Performance shall be suspended during the force majeure event."), | |
| 'weight' : 0.7, | |
| }, | |
| 'entire_agreement' : {'keywords' : ['entire agreement', 'integration', 'supersedes', 'prior agreements', 'complete agreement', 'whole agreement'], | |
| 'representative_text' : ("This Agreement constitutes the entire agreement between the parties and supersedes all prior agreements, whether written or oral. No other representations or warranties shall be binding unless incorporated herein."), | |
| 'weight' : 0.6, | |
| }, | |
| 'payment_terms' : {'keywords' : ['payment terms', 'net 30', 'due date', 'invoice', 'billing', 'payment due', 'late payment', 'interest'], | |
| 'representative_text' : ("Payment shall be due within thirty days of invoice date. Late payments shall accrue interest at the rate of 1.5% per month. All payments shall be made in US dollars."), | |
| 'weight' : 0.9, | |
| }, | |
| 'governing_law' : {'keywords' : ['governing law', 'jurisdiction', 'venue', 'applicable law', 'state law', 'federal law'], | |
| 'representative_text' : ("This Agreement shall be governed by and construed in accordance with the laws of the State of Delaware. Any legal action shall be brought in the state or federal courts located in Wilmington, Delaware."), | |
| 'weight' : 0.8, | |
| }, | |
| 'general' : {'keywords' : ['provision', 'term', 'condition', 'obligation', 'requirement', 'clause', 'section'], | |
| 'representative_text' : ("The parties agree to the following terms and conditions governing their relationship. Each party shall perform its obligations in good faith and in accordance with industry standards and applicable law."), | |
| 'weight' : 0.5, | |
| } | |
| } | |
| def __init__(self, model_loader: ModelLoader): | |
| """ | |
| Initialize comprehensive clause extractor | |
| Arguments: | |
| ---------- | |
| model_loader { ModelLoader } : ModelLoader instance for accessing Legal-BERT | |
| """ | |
| self.model_loader = model_loader | |
| # Models (lazy loaded) | |
| self.legal_bert_model = None | |
| self.legal_bert_tokenizer = None | |
| self.embedding_model = None | |
| self.device = None | |
| # Category embeddings (computed from representative texts) | |
| self.category_embeddings = dict() | |
| # Text processor | |
| self.text_processor = TextProcessor(use_spacy = False) | |
| # Logger | |
| self.logger = ContractAnalyzerLogger.get_logger() | |
| # Lazy load | |
| self._lazy_load() | |
| # Risk Rules | |
| self.risk_rules = RiskRules() | |
| def _lazy_load(self): | |
| """ | |
| Lazy load Legal-BERT and embedding models | |
| """ | |
| if self.legal_bert_model is None: | |
| try: | |
| log_info("Loading Legal-BERT for comprehensive clause extraction...") | |
| # Load Legal-BERT (nlpaueb/legal-bert-base-uncased) | |
| self.legal_bert_model, self.legal_bert_tokenizer = self.model_loader.load_legal_bert() | |
| self.device = self.model_loader.device | |
| # Load sentence transformer for embeddings | |
| self.embedding_model = self.model_loader.load_embedding_model() | |
| # Prepare category embeddings using Legal-BERT | |
| self._prepare_category_embeddings() | |
| log_info("Comprehensive clause extractor models loaded successfully") | |
| except Exception as e: | |
| log_error(e, context = {"component": "ComprehensiveClauseExtractor", "operation": "model_loading"}) | |
| raise | |
| def _prepare_category_embeddings(self): | |
| """ | |
| Pre-compute Legal-BERT embeddings for category representative texts | |
| """ | |
| log_info("Computing Legal-BERT embeddings for comprehensive clause categories...") | |
| for category, config in self.CLAUSE_CATEGORIES.items(): | |
| representative_text = config['representative_text'] | |
| # Get Legal-BERT embedding (using [CLS] token) | |
| embedding = self._get_legal_bert_embedding(text = representative_text) | |
| self.category_embeddings[category] = embedding | |
| log_info(f"Prepared Legal-BERT embeddings for {len(self.category_embeddings)} categories") | |
| def _get_legal_bert_embedding(self, text: str) -> np.ndarray: | |
| """ | |
| Get Legal-BERT embedding for text using [CLS] token | |
| """ | |
| # Tokenize | |
| inputs = self.legal_bert_tokenizer(text, | |
| return_tensors = "pt", | |
| padding = True, | |
| truncation = True, | |
| max_length = 512, | |
| ).to(self.device) | |
| # Get embeddings | |
| with torch.no_grad(): | |
| outputs = self.legal_bert_model(**inputs) | |
| # Use [CLS] token embedding (first token) | |
| cls_embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy()[0] | |
| return cls_embedding | |
| def extract_clauses(self, contract_text: str, max_clauses: int = 25) -> List[ExtractedClause]: | |
| """ | |
| Extract and classify clauses from contract using hybrid approach | |
| Process: | |
| 1. Structural extraction (numbered sections) | |
| 2. Semantic chunking (for unstructured text) | |
| 3. Legal-BERT classification | |
| 4. Deduplicate and rank by confidence | |
| Arguments: | |
| ---------- | |
| contract_text { str } : Full contract text | |
| max_clauses { int } : Maximum number of clauses to return | |
| Returns: | |
| -------- | |
| { list } : List of ExtractedClause objects sorted by confidence | |
| """ | |
| log_info("Starting comprehensive clause extraction", | |
| text_length = len(contract_text), | |
| max_clauses = max_clauses, | |
| ) | |
| # Extract using structural patterns | |
| structural_clauses = self._extract_structural_clauses(contract_text) | |
| log_info(f"Extracted {len(structural_clauses)} structural clauses") | |
| # Semantic chunking for unstructured parts | |
| semantic_chunks = self._semantic_chunking(contract_text, structural_clauses) | |
| log_info(f"Created {len(semantic_chunks)} semantic chunks") | |
| # Combine all candidates | |
| all_candidates = structural_clauses + semantic_chunks | |
| log_info(f"Total candidates: {len(all_candidates)}") | |
| # Classify with Legal-BERT | |
| classified_clauses = self._classify_clauses_with_legal_bert(all_candidates) | |
| log_info(f"Classified {len(classified_clauses)} clauses") | |
| # Deduplicate and rank | |
| final_clauses = self._deduplicate_and_rank(classified_clauses, max_clauses) | |
| log_info(f"Final output: {len(final_clauses)} clauses") | |
| return final_clauses | |
| def generate_clause_analysis(self, clause: ExtractedClause, llm_interpretation: ClauseInterpretation = None) -> Dict[str, str]: | |
| """ | |
| Generate analysis and recommendation for a clause | |
| Arguments: | |
| ---------- | |
| clause { ExtractedClause } : ExtractedClause object | |
| llm_interpretation { ClauseInterpretation } : Optional ClauseInterpretation from LLM | |
| Returns: | |
| -------- | |
| { dict } : Dictionary with 'analysis' and 'recommendation' keys | |
| """ | |
| if llm_interpretation: | |
| # Use LLM interpretation if available | |
| analysis = llm_interpretation.plain_english_summary | |
| # Combine key points into analysis | |
| if llm_interpretation.key_points: | |
| analysis += " " + " ".join(llm_interpretation.key_points[:2]) | |
| # Combine potential risks into analysis | |
| if llm_interpretation.potential_risks: | |
| risk_text = " Key risks: " + ", ".join(llm_interpretation.potential_risks[:2]) | |
| analysis += risk_text | |
| # Use suggested improvements as recommendation | |
| if llm_interpretation.suggested_improvements: | |
| recommendation = " ".join(llm_interpretation.suggested_improvements[:2]) | |
| else: | |
| recommendation = "Review this clause with legal counsel for specific recommendations." | |
| else: | |
| # Fallback: Generate analysis from risk indicators and category | |
| risk_indicators = clause.risk_indicators if clause.risk_indicators else [] | |
| risk_score = getattr(clause, 'risk_score', 0) | |
| # Generate specific analysis based on category and risk | |
| analysis = self._generate_fallback_analysis(clause = clause, | |
| risk_indicators = risk_indicators, | |
| risk_score = risk_score, | |
| ) | |
| recommendation = self._generate_fallback_recommendation(clause = clause, | |
| risk_indicators = risk_indicators, | |
| risk_score = risk_score, | |
| ) | |
| return {'analysis' : analysis, | |
| 'recommendation' : recommendation, | |
| } | |
| def _generate_fallback_analysis(self, clause: ExtractedClause, risk_indicators: List[str], risk_score: float) -> str: | |
| """ | |
| Generate fallback analysis when LLM unavailable | |
| """ | |
| category_analyses = {'compensation' : f"This compensation clause {'contains concerning terms' if risk_score > 50 else 'appears standard'} regarding payment obligations and structures. ", | |
| 'termination' : f"This termination clause {'creates significant imbalance' if risk_score > 60 else 'establishes'} the conditions and procedures for ending the agreement. ", | |
| 'non_compete' : f"This restrictive covenant {'is overly broad and' if risk_score > 60 else ''} limits future business activities and employment opportunities. ", | |
| 'confidentiality' : f"This confidentiality provision {'has excessive scope' if risk_score > 50 else 'defines'} the obligations to protect sensitive information. ", | |
| 'indemnification' : f"This indemnification clause {'creates one-sided liability exposure' if risk_score > 60 else 'allocates'} responsibility for claims and losses. ", | |
| 'intellectual_property' : f"This IP clause {'may claim overly broad ownership' if risk_score > 50 else 'addresses'} rights to work product and inventions. ", | |
| 'liability' : f"This liability provision {'lacks adequate caps or limitations' if risk_score > 60 else 'establishes'} the financial exposure for damages. ", | |
| } | |
| analysis = category_analyses.get(clause.category, f"This {clause.category} clause establishes specific rights and obligations. ") | |
| # Add risk-specific details | |
| if risk_indicators: | |
| analysis += f"Specific concerns include: {', '.join(risk_indicators[:3])}. " | |
| if (risk_score > 70): | |
| analysis += "This clause requires immediate attention and likely modification." | |
| elif (risk_score > 50): | |
| analysis += "This clause should be reviewed carefully and potentially negotiated." | |
| else: | |
| analysis += "This clause appears to contain standard provisions for this type of agreement." | |
| return analysis | |
| def _generate_fallback_recommendation(self, clause: ExtractedClause, risk_indicators: List[str], risk_score: float) -> str: | |
| """ | |
| Generate fallback recommendation when LLM unavailable | |
| """ | |
| if (risk_score > 70): | |
| return f"Strongly recommend negotiating substantial changes to this clause. Seek legal counsel to address the identified risks and ensure your interests are protected." | |
| elif (risk_score > 50): | |
| return f"Negotiate modifications to balance the terms more fairly. Consider adding protective language or limiting the scope of obligations." | |
| elif (risk_score > 30): | |
| return f"Review with legal counsel to ensure the terms are clear and acceptable. Minor clarifications may be beneficial." | |
| else: | |
| return f"Standard clause - review for consistency with the overall agreement and your business needs." | |
| def _extract_structural_clauses(self, text: str) -> List[Dict]: | |
| """ | |
| Extract clauses using structural numbering patterns | |
| """ | |
| candidates = list() | |
| # Clean text | |
| text = re.sub(r'\s+', ' ', text) | |
| # Patterns for legal numbering | |
| patterns = [(r'(\d+\.\d+(?:\.\d+)*)\.\s*([^\n]{30,800}?)(?=\d+\.\d+(?:\.\d+)*\.|$)', 'numbered'), | |
| (r'(Article\s+(?:\d+(?:\.\d+)*|[IVXLCDM]+))\.\s*([^\n]{30,800}?)(?=Article\s+(?:\d+|[IVXLCDM]+)|$)', 'article'), | |
| (r'(Section\s+\d+(?:\.\d+)*)\.\s*([^\n]{30,800}?)(?=Section\s+\d+|$)', 'section'), | |
| (r'(Clause\s+\d+(?:\.\d+)*)\.\s*([^\n]{30,800}?)(?=Clause\s+\d+|$)', 'clause'), | |
| (r'\(([a-z]|[ivxlcdm]+)\)\s*([^\n]{30,500}?)(?=\([a-z]|[ivxlcdm]+\)|\n\n|$)', 'subclause'), | |
| ] | |
| for pattern, ref_type in patterns: | |
| matches = re.finditer(pattern, text, re.IGNORECASE | re.DOTALL) | |
| for match in matches: | |
| clause_text = match.group(2).strip() | |
| # Filter out boilerplate/definitions | |
| if not self._is_boilerplate(clause_text): | |
| # Check for meaningful content | |
| if self._has_meaningful_content(clause_text): | |
| candidates.append({'text' : clause_text, | |
| 'reference' : match.group(1).strip(), | |
| 'start' : match.start(), | |
| 'end' : match.end(), | |
| 'type' : 'structural', | |
| 'ref_type' : ref_type, | |
| }) | |
| # Remove overlapping clauses | |
| candidates = self._remove_overlapping(candidates) | |
| return candidates | |
| def _is_boilerplate(self, text: str) -> bool: | |
| """ | |
| Check if text is boilerplate/definitional rather than substantive | |
| """ | |
| boilerplate_indicators = ['shall mean', | |
| 'means and includes', | |
| 'defined as', | |
| 'definition of', | |
| 'hereinafter referred to', | |
| 'for purposes of this', | |
| 'interpretation of', | |
| 'as used in this', | |
| 'the term', | |
| 'shall include', | |
| 'includes but not limited', | |
| ] | |
| text_lower = text.lower() | |
| # Must have at least one strong indicator AND be definition-heavy | |
| has_indicator = any(indicator in text_lower for indicator in boilerplate_indicators) | |
| is_short_definition = len(text.split()) < 50 and '"' in text | |
| return has_indicator or is_short_definition | |
| def _has_meaningful_content(self, text: str) -> bool: | |
| """ | |
| Check if text has meaningful legal content | |
| """ | |
| # Must have minimum length | |
| if (len(text.split()) < 15): | |
| return False | |
| # Check for legal action verbs | |
| action_verbs = ['shall', | |
| 'must', | |
| 'will', | |
| 'may', | |
| 'agrees', | |
| 'undertakes', | |
| 'covenants', | |
| 'warrants', | |
| 'represents', | |
| 'acknowledges', | |
| 'certifies', | |
| 'indemnifies', | |
| 'waives', | |
| 'terminates', | |
| ] | |
| text_lower = text.lower() | |
| has_action = any(verb in text_lower for verb in action_verbs) | |
| # Check for legal subjects | |
| legal_subjects = ['party', | |
| 'parties', | |
| 'employee', | |
| 'employer', | |
| 'company', | |
| 'contractor', | |
| 'consultant', | |
| 'client', | |
| 'vendor', | |
| 'buyer', | |
| 'seller', | |
| 'landlord', | |
| 'tenant', | |
| 'licensor', | |
| 'licensee', | |
| ] | |
| has_subject = any(subj in text_lower for subj in legal_subjects) | |
| return has_action or has_subject | |
| def _remove_overlapping(self, candidates: List[Dict]) -> List[Dict]: | |
| """ | |
| Remove overlapping clause extractions | |
| """ | |
| if not candidates: | |
| return [] | |
| # Sort by start position | |
| candidates.sort(key = lambda x: x['start']) | |
| non_overlapping = [candidates[0]] | |
| for candidate in candidates[1:]: | |
| last = non_overlapping[-1] | |
| # Check if overlaps | |
| if (candidate['start'] >= last['end']): | |
| non_overlapping.append(candidate) | |
| elif (len(candidate['text']) > len(last['text'])): | |
| # Keep longer clause if overlapping | |
| non_overlapping[-1] = candidate | |
| return non_overlapping | |
| def _semantic_chunking(self, text: str, structural_clauses: List[Dict], chunk_size: int = 200) -> List[Dict]: | |
| """ | |
| Chunk unstructured text semantically uses sentence boundaries | |
| """ | |
| # Get covered ranges from structural clauses | |
| covered_ranges = [(c['start'], c['end']) for c in structural_clauses] | |
| # Split into sentences | |
| sentences = self.text_processor.extract_sentences(text) | |
| chunks = list() | |
| current_chunk = list() | |
| current_length = 0 | |
| current_start = 0 | |
| for sentence in sentences: | |
| # Check if sentence is already covered by structural extraction | |
| sentence_start = text.find(sentence, current_start) | |
| if (sentence_start == -1): | |
| continue | |
| if self._is_in_range(sentence_start, covered_ranges): | |
| current_start = sentence_start + len(sentence) | |
| continue | |
| current_chunk.append(sentence) | |
| current_length += len(sentence.split()) | |
| # Create chunk when reaching size limit | |
| if (current_length >= chunk_size): | |
| chunk_text = ' '.join(current_chunk).strip() | |
| if (len(chunk_text) >= 50) and (not self._is_boilerplate(chunk_text)): | |
| if self._has_meaningful_content(chunk_text): | |
| chunks.append({'text' : chunk_text, | |
| 'reference' : f'Semantic-{len(chunks)+1}', | |
| 'start' : sentence_start, | |
| 'end' : sentence_start + len(chunk_text), | |
| 'type' : 'semantic', | |
| 'ref_type' : 'semantic', | |
| }) | |
| current_chunk = list() | |
| current_length = 0 | |
| current_start = sentence_start + len(sentence) | |
| # Add final chunk if exists | |
| if current_chunk: | |
| chunk_text = ' '.join(current_chunk).strip() | |
| if ((len(chunk_text) >= 50) and (not self._is_boilerplate(chunk_text))): | |
| if self._has_meaningful_content(chunk_text): | |
| sentence_start = text.find(current_chunk[0]) | |
| chunks.append({'text' : chunk_text, | |
| 'reference' : f'Semantic-{len(chunks)+1}', | |
| 'start' : sentence_start, | |
| 'end' : sentence_start + len(chunk_text), | |
| 'type' : 'semantic', | |
| 'ref_type' : 'semantic', | |
| }) | |
| return chunks | |
| def _is_in_range(self, position: int, ranges: List[Tuple[int, int]]) -> bool: | |
| """ | |
| Check if position is within any of the ranges | |
| """ | |
| return any(start <= position <= end for start, end in ranges) | |
| def _classify_clauses_with_legal_bert(self, candidates: List[Dict]) -> List[ExtractedClause]: | |
| """ | |
| Classify clauses using Legal-BERT embeddings + keyword matching | |
| """ | |
| classified = list() | |
| for candidate in candidates: | |
| # Get Legal-BERT embedding for clause | |
| clause_embedding = self._get_legal_bert_embedding(text = candidate['text']) | |
| # Classify using hybrid approach | |
| category, confidence, legal_bert_score = self._classify_single_clause(text = candidate['text'], | |
| clause_embedding = clause_embedding, | |
| ) | |
| # Extract risk indicators | |
| risk_indicators = self._extract_risk_indicators(text = candidate['text']) | |
| # Extract sub-clauses if any | |
| subclauses = self._extract_subclauses(text = candidate['text']) | |
| classified.append(ExtractedClause(text = candidate['text'], | |
| reference = candidate['reference'], | |
| category = category, | |
| confidence = confidence, | |
| start_pos = candidate['start'], | |
| end_pos = candidate['end'], | |
| extraction_method = candidate['type'], | |
| risk_indicators = risk_indicators, | |
| embeddings = clause_embedding, | |
| subclauses = subclauses, | |
| legal_bert_score = legal_bert_score, | |
| ) | |
| ) | |
| return classified | |
| def _classify_single_clause(self, text: str, clause_embedding: np.ndarray) -> Tuple[str, float, float]: | |
| """ | |
| Classify single clause using Legal-BERT + keyword matching | |
| """ | |
| text_lower = text.lower() | |
| # Keyword matching | |
| keyword_scores = dict() | |
| for category, config in self.CLAUSE_CATEGORIES.items(): | |
| keywords = config['keywords'] | |
| weight = config['weight'] | |
| keyword_count = sum(1 for kw in keywords if kw in text_lower) | |
| keyword_scores[category] = (keyword_count / len(keywords)) * weight | |
| # Legal-BERT semantic similarity | |
| semantic_scores = dict() | |
| clause_embedding_tensor = torch.tensor(clause_embedding).unsqueeze(0) | |
| for category, category_embedding in self.category_embeddings.items(): | |
| category_embedding_tensor = torch.tensor(category_embedding).unsqueeze(0) | |
| similarity = torch.nn.functional.cosine_similarity(clause_embedding_tensor, category_embedding_tensor).item() | |
| semantic_scores[category] = similarity | |
| # Combine scores (70% semantic, 30% keyword) | |
| combined_scores = dict() | |
| for category in self.CLAUSE_CATEGORIES.keys(): | |
| combined = (semantic_scores.get(category, 0) * 0.70 + keyword_scores.get(category, 0) * 0.30) | |
| combined_scores[category] = combined | |
| # Get best category | |
| best_category = max(combined_scores, key = combined_scores.get) | |
| confidence = combined_scores[best_category] | |
| legal_bert_score = semantic_scores[best_category] | |
| return best_category, confidence, legal_bert_score | |
| def _extract_risk_indicators(self, text: str) -> List[str]: | |
| """ | |
| Extract risk indicator keywords from clause text using RiskRule with the central risk rules | |
| """ | |
| text_lower = text.lower() | |
| risk_indicators = list() | |
| # Check for matches against CRITICAL_KEYWORDS from RiskRules | |
| for keyword in self.risk_rules.CRITICAL_KEYWORDS.keys(): | |
| if keyword in text_lower: | |
| risk_indicators.append(keyword) | |
| # Check for matches against HIGH_RISK_KEYWORDS from RiskRules | |
| for keyword in self.risk_rules.HIGH_RISK_KEYWORDS.keys(): | |
| if keyword in text_lower: | |
| risk_indicators.append(keyword) | |
| # Check for matches against MEDIUM_RISK_KEYWORDS from RiskRules | |
| for keyword in self.risk_rules.MEDIUM_RISK_KEYWORDS.keys(): | |
| if keyword in text_lower: | |
| risk_indicators.append(keyword) | |
| # Check for matches against RISKY_PATTERNS from RiskRules | |
| for pattern, score, description in self.risk_rules.RISKY_PATTERNS: | |
| if re.search(pattern, text_lower): | |
| # Use the description from RiskRules as the indicator | |
| risk_indicators.append(description) | |
| # Remove duplicates while preserving order | |
| seen = set() | |
| unique_indicators = list() | |
| for indicator in risk_indicators: | |
| if indicator not in seen: | |
| seen.add(indicator) | |
| unique_indicators.append(indicator) | |
| return unique_indicators | |
| def _extract_subclauses(self, text: str) -> List[str]: | |
| """ | |
| Extract sub-clauses from main clause (e.g., (a), (b), (i), (ii)) | |
| """ | |
| # Pattern for sub-clauses: (a), (i), etc. | |
| subclause_pattern = r'\(([a-z]|[ivxlcdm]+)\)\s*([^()]{20,200}?)(?=\([a-z]|[ivxlcdm]+\)|$)' | |
| matches = re.findall(subclause_pattern, text, re.IGNORECASE) | |
| subclauses = list() | |
| for ref, subtext in matches: | |
| clean_text = subtext.strip() | |
| if (len(clean_text) >= 20): | |
| subclauses.append(f"({ref}) {clean_text}") | |
| # Max 25 sub-clauses | |
| return subclauses[:25] | |
| def _deduplicate_and_rank(self, clauses: List[ExtractedClause], max_clauses: int) -> List[ExtractedClause]: | |
| """ | |
| Remove duplicates and rank by confidence + legal_bert_score | |
| """ | |
| if not clauses: | |
| return [] | |
| # Sort by combined score (confidence * 0.6 + legal_bert_score * 0.4) | |
| clauses.sort(key = lambda x: (x.confidence * 0.6 + x.legal_bert_score * 0.4), reverse = True) | |
| # Deduplicate by text similarity | |
| unique_clauses = list() | |
| seen_texts = set() | |
| for clause in clauses: | |
| # Simple deduplication by first 100 chars | |
| text_key = clause.text[:100].lower().strip() | |
| # Also check similarity to already added clauses | |
| is_duplicate = False | |
| for existing in unique_clauses: | |
| similarity = self._text_similarity(clause.text, existing.text) | |
| if (similarity > 0.85): | |
| is_duplicate = True | |
| break | |
| if text_key not in seen_texts and not is_duplicate: | |
| unique_clauses.append(clause) | |
| seen_texts.add(text_key) | |
| if (len(unique_clauses) >= max_clauses): | |
| break | |
| return unique_clauses | |
| def _text_similarity(self, text1: str, text2: str) -> float: | |
| """ | |
| Calculate text similarity (simple Jaccard similarity) | |
| """ | |
| words1 = set(text1.lower().split()) | |
| words2 = set(text2.lower().split()) | |
| intersection = len(words1 & words2) | |
| union = len(words1 | words2) | |
| return intersection / union if union > 0 else 0.0 | |
| def get_category_distribution(self, clauses: List[ExtractedClause]) -> Dict[str, int]: | |
| """ | |
| Get distribution of clause categories | |
| """ | |
| distribution = defaultdict(int) | |
| for clause in clauses: | |
| distribution[clause.category] += 1 | |
| log_info("Clause category distribution", distribution=dict(distribution)) | |
| return dict(distribution) | |
| def get_high_risk_clauses(self, clauses: List[ExtractedClause]) -> List[ExtractedClause]: | |
| """ | |
| Get clauses with risk indicators | |
| """ | |
| risky = [c for c in clauses if c.risk_indicators] | |
| risky.sort(key = lambda x: len(x.risk_indicators), reverse = True) | |
| top_25_risky_clauses = risky[:25] | |
| return top_25_risky_clauses | |
| class RiskClauseExtractor: | |
| """ | |
| Risk-Focused Clause Extractor: Specifically for risk analysis using RiskRules framework for contract-type specific risk assessment | |
| This will be used for: Risk analysis, protection gap detection, contract-type specific assessment | |
| """ | |
| def __init__(self, model_loader: ModelLoader, contract_type: ContractType): | |
| """ | |
| Initialize risk-focused clause extractor | |
| Arguments: | |
| ---------- | |
| model_loader { ModelLoader } : ModelLoader instance | |
| contract_type { ContractType } : Contract type for risk rule adjustments | |
| """ | |
| self.model_loader = model_loader | |
| self.contract_type = contract_type | |
| self.risk_rules = RiskRules() | |
| # Models (lazy loaded) | |
| self.legal_bert_model = None | |
| self.legal_bert_tokenizer = None | |
| self.embedding_model = None | |
| self.device = None | |
| # Risk category embeddings | |
| self.risk_category_embeddings = dict() | |
| # Text processor | |
| self.text_processor = TextProcessor(use_spacy = False) | |
| # Logger | |
| self.logger = ContractAnalyzerLogger.get_logger() | |
| # Contract-type specific weights | |
| self.category_weights = self.risk_rules.get_adjusted_weights(contract_type) | |
| # Lazy load | |
| self._lazy_load() | |
| def _lazy_load(self): | |
| """ | |
| Lazy load models for risk analysis | |
| """ | |
| if self.legal_bert_model is None: | |
| try: | |
| log_info("Loading models for risk-focused clause extraction...") | |
| # Load Legal-BERT | |
| self.legal_bert_model, self.legal_bert_tokenizer = self.model_loader.load_legal_bert() | |
| self.device = self.model_loader.device | |
| # Load embedding model | |
| self.embedding_model = self.model_loader.load_embedding_model() | |
| # Prepare risk category embeddings | |
| self._prepare_risk_category_embeddings() | |
| log_info("Risk clause extractor models loaded successfully") | |
| except Exception as e: | |
| log_error(e, context = {"component": "RiskClauseExtractor", "operation": "model_loading"}) | |
| raise | |
| def _prepare_risk_category_embeddings(self): | |
| """ | |
| Prepare embeddings for risk categories using RiskRules framework | |
| """ | |
| log_info("Preparing risk category embeddings...") | |
| # Create representative texts for each risk category | |
| risk_category_texts = {'restrictive_covenants' : "Non-compete non-solicitation restrictive covenants competition limitations duration geographic scope industry restrictions", | |
| 'termination_rights' : "Termination notice period severance for cause without cause immediate termination at-will employment end of agreement", | |
| 'penalties_liability' : "Penalties liquidated damages liability limitations unlimited liability consequential damages indemnification hold harmless", | |
| 'compensation_benefits' : "Compensation salary wages benefits bonus commission equity stock options retirement health insurance paid time off", | |
| 'intellectual_property' : "Intellectual property IP ownership copyright patent trademark trade secrets work product inventions proprietary rights", | |
| 'confidentiality' : "Confidentiality non-disclosure proprietary information trade secrets protection secrecy confidential obligations", | |
| 'liability_indemnity' : "Liability indemnification hold harmless defense costs claims losses damages responsibility accountability", | |
| 'governing_law' : "Governing law jurisdiction venue dispute resolution arbitration mediation legal forum applicable law", | |
| 'payment_terms' : "Payment terms due date invoice billing net 30 late payment interest fees compensation remuneration", | |
| 'warranties' : "Warranties representations guarantees disclaimers merchantability fitness for purpose product quality service standards", | |
| 'dispute_resolution' : "Dispute resolution arbitration mediation litigation legal proceedings costs attorneys fees jurisdiction", | |
| 'assignment_change' : "Assignment transfer change control amendment modification consent approval successor parties", | |
| 'insurance' : "Insurance coverage liability insurance professional indemnity proof of insurance additional insured policy requirements", | |
| 'force_majeure' : "Force majeure act of god unforeseen circumstances beyond control natural disasters performance suspension" | |
| } | |
| for category, text in risk_category_texts.items(): | |
| embedding = self._get_legal_bert_embedding(text) | |
| self.risk_category_embeddings[category] = embedding | |
| log_info(f"Prepared risk embeddings for {len(self.risk_category_embeddings)} categories") | |
| def _get_legal_bert_embedding(self, text: str) -> np.ndarray: | |
| """ | |
| Get Legal-BERT embedding for risk analysis | |
| """ | |
| inputs = self.legal_bert_tokenizer(text, | |
| return_tensors = "pt", | |
| padding = True, | |
| truncation = True, | |
| max_length = 512, | |
| ).to(self.device) | |
| with torch.no_grad(): | |
| outputs = self.legal_bert_model(**inputs) | |
| cls_embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy()[0] | |
| return cls_embedding | |
| def extract_risk_clauses(self, contract_text: str, max_clauses: int = 20) -> List[ExtractedClause]: | |
| """ | |
| Extract clauses specifically for risk analysis using RiskRules framework | |
| Process: | |
| 1. Focus on high-weight categories for this contract type | |
| 2. Use risk patterns from RiskRules | |
| 3. Calculate risk scores based on RiskRules factors | |
| 4. Prioritize clauses with high risk indicators | |
| Arguments: | |
| ---------- | |
| contract_text { str } : Full contract text | |
| max_clauses { int } : Maximum clauses to return | |
| Returns: | |
| -------- | |
| { list } : Risk-focused clauses with calculated risk scores | |
| """ | |
| log_info("Starting risk-focused clause extraction", | |
| contract_type = self.contract_type.value, | |
| max_clauses = max_clauses, | |
| ) | |
| # Use comprehensive extractor as base | |
| comprehensive_extractor = ComprehensiveClauseExtractor(self.model_loader) | |
| all_clauses = comprehensive_extractor.extract_clauses(contract_text = contract_text, | |
| max_clauses = 50, | |
| ) | |
| # Re-classify using risk framework | |
| risk_clauses = self._reclassify_with_risk_framework(clauses = all_clauses) | |
| # Calculate risk scores | |
| risk_clauses = self._calculate_risk_scores(clauses = risk_clauses) | |
| # Prioritize by risk score and contract-type relevance | |
| prioritized = self._prioritize_risk_clauses(clauses = risk_clauses, | |
| max_clauses = max_clauses, | |
| ) | |
| log_info(f"Extracted {len(prioritized)} risk-focused clauses") | |
| return prioritized | |
| def _reclassify_with_risk_framework(self, clauses: List[ExtractedClause]) -> List[ExtractedClause]: | |
| """ | |
| Re-classify clauses using RiskRules categories and weights | |
| """ | |
| risk_classified = list() | |
| for clause in clauses: | |
| # Map to risk categories and calculate relevance | |
| risk_category, risk_confidence = self._classify_with_risk_categories(text = clause.text) | |
| # Update clause with risk classification | |
| clause.category = risk_category | |
| clause.confidence = risk_confidence | |
| risk_classified.append(clause) | |
| return risk_classified | |
| def _classify_with_risk_categories(self, text: str) -> Tuple[str, float]: | |
| """ | |
| Classify text using RiskRules categories with contract-type weights | |
| """ | |
| text_lower = text.lower() | |
| # Keyword matching with risk categories | |
| keyword_scores = dict() | |
| for risk_category in self.risk_rules.CATEGORY_WEIGHTS.keys(): | |
| # Get keywords from risk rules patterns | |
| keywords = self._get_keywords_for_risk_category(risk_category = risk_category) | |
| keyword_count = sum(1 for kw in keywords if kw in text_lower) | |
| base_score = (keyword_count / max(len(keywords), 1)) * 100 | |
| # Apply contract-type specific weight | |
| weight = self.category_weights.get(risk_category, 1.0) | |
| keyword_scores[risk_category] = base_score * weight | |
| # Legal-BERT similarity with risk categories | |
| semantic_scores = dict() | |
| text_embedding = self._get_legal_bert_embedding(text = text) | |
| text_tensor = torch.tensor(text_embedding).unsqueeze(0) | |
| for risk_category, category_embedding in self.risk_category_embeddings.items(): | |
| cat_tensor = torch.tensor(category_embedding).unsqueeze(0) | |
| similarity = torch.nn.functional.cosine_similarity(text_tensor, cat_tensor).item() | |
| semantic_scores[risk_category] = similarity * 100 # Scale to 0-100 | |
| # Combine scores (60% semantic, 40% keyword) | |
| combined_scores = dict() | |
| for risk_category in self.risk_rules.CATEGORY_WEIGHTS.keys(): | |
| combined = (semantic_scores.get(risk_category, 0) * 0.6 + keyword_scores.get(risk_category, 0) * 0.4) | |
| combined_scores[risk_category] = combined | |
| # Get best category | |
| best_category = max(combined_scores, key = combined_scores.get) | |
| # Normalize to 0-1 | |
| confidence = min(combined_scores[best_category] / 100, 1.0) | |
| return best_category, confidence | |
| def _get_keywords_for_risk_category(self, risk_category: str) -> List[str]: | |
| """ | |
| Get relevant keywords for a risk category from RiskRules patterns | |
| """ | |
| # Map risk categories to relevant keywords from RiskRules | |
| keyword_map = {'restrictive_covenants' : ['non-compete', 'non-solicit', 'restrictive', 'covenant', 'competition', 'geographic', 'duration'], | |
| 'termination_rights' : ['termination', 'notice', 'severance', 'dismissal', 'resignation', 'for cause', 'without cause'], | |
| 'penalties_liability' : ['penalty', 'liquidated damages', 'liability', 'indemnification', 'hold harmless', 'damages'], | |
| 'compensation_benefits' : ['compensation', 'salary', 'benefits', 'bonus', 'commission', 'equity', 'insurance'], | |
| 'intellectual_property' : ['intellectual property', 'ip', 'copyright', 'patent', 'trademark', 'inventions'], | |
| 'confidentiality' : ['confidential', 'proprietary', 'trade secret', 'non-disclosure'], | |
| 'liability_indemnity' : ['liability', 'indemnification', 'hold harmless', 'defend', 'claims'], | |
| 'governing_law' : ['governing law', 'jurisdiction', 'venue', 'dispute resolution'], | |
| 'payment_terms' : ['payment', 'due', 'invoice', 'net 30', 'late payment'], | |
| 'warranties' : ['warranty', 'representation', 'guarantee', 'disclaimer'], | |
| 'dispute_resolution' : ['arbitration', 'mediation', 'dispute', 'litigation'], | |
| 'assignment_change' : ['assignment', 'transfer', 'amendment', 'modification'], | |
| 'insurance' : ['insurance', 'coverage', 'policy', 'insured'], | |
| 'force_majeure' : ['force majeure', 'act of god', 'beyond control'], | |
| } | |
| return keyword_map.get(risk_category, []) | |
| def _calculate_risk_scores(self, clauses: List[ExtractedClause]) -> List[ExtractedClause]: | |
| """ | |
| Calculate risk scores for clauses based on RiskRules factors | |
| """ | |
| for clause in clauses: | |
| risk_score = self._calculate_single_clause_risk(clause = clause) | |
| clause.risk_score = risk_score | |
| return clauses | |
| def _calculate_single_clause_risk(self, clause: ExtractedClause) -> float: | |
| """ | |
| Calculate risk score using RiskRules framework | |
| """ | |
| base_score = 0.0 | |
| text_lower = clause.text.lower() | |
| # Base risk from category weight (adjusted for contract type) | |
| category_weight = self.category_weights.get(clause.category, 1.0) | |
| base_score += category_weight | |
| # Add risk from CLAUSE_RISK_FACTORS (red flags) | |
| factor_config = self.risk_rules.CLAUSE_RISK_FACTORS.get(clause.category) | |
| if factor_config: | |
| for red_flag, adjustment in factor_config["red_flags"].items(): | |
| if red_flag in text_lower: | |
| base_score += adjustment | |
| # Add risk from RISKY_PATTERNS (with actual scores) | |
| for pattern, score, description in self.risk_rules.RISKY_PATTERNS: | |
| if re.search(pattern, text_lower): | |
| base_score += score | |
| # Add risk from CRITICAL_KEYWORDS | |
| for keyword, risk_score in self.risk_rules.CRITICAL_KEYWORDS.items(): | |
| if re.search(rf'\b{re.escape(keyword)}\b', text_lower): | |
| base_score += risk_score | |
| # Cap final score at 100 | |
| return min(max(base_score, 0), 100) | |
| def _extract_risk_indicators(self, text: str) -> List[str]: | |
| """ | |
| Extract risk indicators using RiskRules patterns | |
| """ | |
| text_lower = text.lower() | |
| indicators = list() | |
| # Check critical risk patterns | |
| for pattern, score, description in self.risk_rules.RISKY_PATTERNS: | |
| if re.search(pattern, text_lower): | |
| indicators.append(description) | |
| # Check keyword risk indicators | |
| for indicator in self.risk_rules.CRITICAL_KEYWORDS.keys(): | |
| if indicator in text_lower: | |
| indicators.append(indicator) | |
| for indicator in self.risk_rules.HIGH_RISK_KEYWORDS.keys(): | |
| if indicator in text_lower: | |
| indicators.append(indicator) | |
| return indicators | |
| def _check_risk_patterns(self, text: str) -> float: | |
| """ | |
| Check for high-risk patterns from RiskRules | |
| """ | |
| text_lower = text.lower() | |
| pattern_risk = 0.0 | |
| # Check risky patterns | |
| for pattern, score, description in self.risk_rules.RISKY_PATTERNS: | |
| if re.search(pattern, text_lower): | |
| pattern_risk += score | |
| # Cap pattern risk | |
| return min(pattern_risk, 20) | |
| def _prioritize_risk_clauses(self, clauses: List[ExtractedClause], max_clauses: int) -> List[ExtractedClause]: | |
| """ | |
| Prioritize clauses by risk score and contract-type relevance | |
| """ | |
| # Sort by risk score (descending) | |
| clauses.sort(key = lambda x: x.risk_score, reverse = True) | |
| # Take top clauses | |
| return clauses[:max_clauses] | |
| def detect_missing_protections(self, extracted_clauses: List[ExtractedClause]) -> List[Dict]: | |
| """ | |
| Detect missing critical protections based on contract type | |
| """ | |
| missing = list() | |
| checklist = self.risk_rules.PROTECTION_CHECKLIST | |
| for protection, config in checklist.items(): | |
| if not self._has_protection(extracted_clauses, protection, config['categories']): | |
| missing.append({"protection" : protection, | |
| "importance" : config['importance'], | |
| "risk_if_missing" : config['risk_if_missing'], | |
| "categories" : config['categories'], | |
| }) | |
| log_info(f"Detected {len(missing)} missing protections") | |
| return missing | |
| def _has_protection(self, clauses: List[ExtractedClause], protection: str, categories: List[str]) -> bool: | |
| """ | |
| Check if protection exists in extracted clauses | |
| """ | |
| protection_patterns = {'for_cause_definition' : ['for cause', 'cause defined', 'termination for cause'], | |
| 'severance_provision' : ['severance', 'severance pay', 'termination benefits'], | |
| 'mutual_indemnification' : ['mutual indemnification', 'both parties indemnify'], | |
| 'liability_cap' : ['liability cap', 'limited liability', 'maximum liability'], | |
| 'prior_ip_exclusion' : ['prior inventions', 'pre-existing ip', 'prior intellectual property'], | |
| 'confidentiality_duration' : ['confidentiality period', 'duration of confidentiality'], | |
| 'dispute_resolution' : ['dispute resolution', 'arbitration', 'mediation'], | |
| 'change_control_process' : ['change control', 'amendment process', 'modification procedure'], | |
| 'insurance_requirements' : ['insurance requirements', 'maintain insurance'], | |
| 'force_majeure' : ['force majeure', 'act of god'], | |
| } | |
| patterns = protection_patterns.get(protection, []) | |
| for clause in clauses: | |
| if clause.category in categories: | |
| text_lower = clause.text.lower() | |
| if any(pattern in text_lower for pattern in patterns): | |
| return True | |
| return False |