contract-guard-ai / services /clause_extractor.py
satyakimitra's picture
Final Repor Updated
522f7a0
# DEPENDENCIES
import re
import sys
import torch
import numpy as np
from typing import Any
from typing import List
from typing import Dict
from typing import Tuple
from pathlib import Path
from typing import Optional
from collections import defaultdict
from sentence_transformers import util
# Import utilities
sys.path.append(str(Path(__file__).parent.parent))
from utils.logger import log_info
from utils.logger import log_error
from config.risk_rules import RiskRules
from config.risk_rules import ContractType
from utils.text_processor import TextProcessor
from utils.logger import ContractAnalyzerLogger
from services.data_models import ExtractedClause
from model_manager.model_loader import ModelLoader
from services.data_models import ClauseInterpretation
class ComprehensiveClauseExtractor:
"""
For general clause extraction across all contract types : Extracts and classifies clauses using Legal-BERT + structural patterns
Will be used for: General document analysis, clause discovery, contract understanding
"""
# COMPREHENSIVE CLAUSE CATEGORIES COVERING ALL LEGAL AREAS
CLAUSE_CATEGORIES = {'compensation' : {'keywords' : ['salary', 'wage', 'compensation', 'pay', 'payment', 'bonus', 'commission', 'remuneration', 'fee', 'rate', 'benefits', 'equity', 'stock options', 'incentive'],
'representative_text' : ("The Employee shall receive an annual base salary of One Hundred Thousand Dollars payable in accordance with the Company's standard payroll practices. Additional compensation may include performance bonuses and stock options."),
'weight' : 1.0,
},
'termination' : {'keywords' : ['termination', 'terminate', 'notice period', 'resignation', 'dismissal', 'severance', 'end of employment', 'cessation', 'notice', 'for cause', 'without cause'],
'representative_text' : ("Either party may terminate this Agreement upon thirty days written notice. The Company may terminate for cause immediately upon written notice to Employee. Upon termination, Employee shall receive severance compensation."),
'weight' : 1.2,
},
'non_compete' : {'keywords' : ['non-compete', 'non-solicit', 'non-solicitation', 'restrictive covenant', 'competitive', 'competition', 'competing business', 'competitive activities', 'non-competition'],
'representative_text' : ("Employee agrees not to engage in any competitive business activities for a period of twelve months following termination within a fifty-mile radius. Employee shall not solicit Company clients or employees during this period."),
'weight' : 1.5,
},
'confidentiality' : {'keywords' : ['confidential', 'proprietary', 'trade secret', 'disclosure', 'confidentiality', 'secret', 'private', 'non-disclosure', 'protected information'],
'representative_text' : ("Employee shall maintain the confidentiality of all proprietary information and trade secrets of the Company. Confidential Information includes business plans, customer lists, and technical data. These obligations survive termination."),
'weight' : 1.1,
},
'indemnification' : {'keywords' : ['indemnify', 'indemnification', 'hold harmless', 'defend', 'liability', 'claims', 'losses', 'damages', 'indemnity'],
'representative_text' : ("Party A shall indemnify and hold harmless Party B from any claims, losses, or damages arising from Party A's breach or negligence. This indemnification includes reasonable attorneys' fees and costs of defense."),
'weight' : 1.3,
},
'intellectual_property' : {'keywords' : ['intellectual property', 'ip', 'copyright', 'patent', 'trademark', 'work product', 'inventions', 'creation', 'ownership', 'ip rights', 'proprietary rights'],
'representative_text' : ("All work product and inventions created by Employee during employment shall be the exclusive property of the Company. Employee assigns all intellectual property rights including patents, copyrights, and trade secrets to the Company."),
'weight' : 1.2,
},
'liability' : {'keywords' : ['liable', 'liability', 'damages', 'limitation', 'consequential', 'indirect', 'punitive', 'cap', 'limited liability', 'responsibility'],
'representative_text' : ("In no event shall either party be liable for indirect, incidental, or consequential damages. Total liability under this Agreement shall not exceed the amounts paid in the twelve months preceding the claim."),
'weight' : 1.2,
},
'warranty' : {'keywords' : ['warranty', 'warrant', 'representation', 'guarantee', 'assurance', 'promise', 'warranties', 'guaranty'],
'representative_text' : ("Company warrants that the Services will be performed in a professional manner. EXCEPT AS EXPRESSLY PROVIDED, COMPANY DISCLAIMS ALL WARRANTIES, EXPRESS OR IMPLIED, INCLUDING WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE."),
'weight' : 0.9,
},
'dispute_resolution' : {'keywords' : ['arbitration', 'mediation', 'dispute', 'jurisdiction', 'governing law', 'venue', 'forum', 'resolution', 'litigation'],
'representative_text' : ("Any disputes arising under this Agreement shall be resolved through binding arbitration in accordance with the rules of the American Arbitration Association. This Agreement shall be governed by the laws of the State of California."),
'weight' : 0.9,
},
'insurance' : {'keywords' : ['insurance', 'coverage', 'insured', 'policy', 'premium', 'insurer', 'liability insurance'],
'representative_text' : ("Contractor shall maintain general liability insurance with minimum coverage of one million dollars per occurrence. Proof of insurance shall be provided to Client. Company shall be named as additional insured on all policies."),
'weight' : 0.8,
},
'assignment' : {'keywords' : ['assignment', 'assign', 'transfer', 'successor', 'binding', 'assignee', 'assignor'],
'representative_text' : ("This Agreement may not be assigned by either party without the prior written consent of the other party. This Agreement shall be binding upon and inure to the benefit of the parties' successors and permitted assigns."),
'weight' : 0.8,
},
'amendment' : {'keywords' : ['amendment', 'modify', 'modification', 'change', 'alteration', 'waiver', 'amend'],
'representative_text' : ("This Agreement may not be amended or modified except by written instrument signed by both parties. No waiver of any provision shall be effective unless in writing. All modifications must be mutually agreed upon."),
'weight' : 0.7,
},
'force_majeure' : {'keywords' : ['force majeure', 'act of god', 'unforeseeable', 'beyond control', 'natural disaster', 'unforeseen circumstances'],
'representative_text' : ("Neither party shall be liable for failure to perform due to causes beyond its reasonable control including acts of God, war, strikes, or natural disasters. Performance shall be suspended during the force majeure event."),
'weight' : 0.7,
},
'entire_agreement' : {'keywords' : ['entire agreement', 'integration', 'supersedes', 'prior agreements', 'complete agreement', 'whole agreement'],
'representative_text' : ("This Agreement constitutes the entire agreement between the parties and supersedes all prior agreements, whether written or oral. No other representations or warranties shall be binding unless incorporated herein."),
'weight' : 0.6,
},
'payment_terms' : {'keywords' : ['payment terms', 'net 30', 'due date', 'invoice', 'billing', 'payment due', 'late payment', 'interest'],
'representative_text' : ("Payment shall be due within thirty days of invoice date. Late payments shall accrue interest at the rate of 1.5% per month. All payments shall be made in US dollars."),
'weight' : 0.9,
},
'governing_law' : {'keywords' : ['governing law', 'jurisdiction', 'venue', 'applicable law', 'state law', 'federal law'],
'representative_text' : ("This Agreement shall be governed by and construed in accordance with the laws of the State of Delaware. Any legal action shall be brought in the state or federal courts located in Wilmington, Delaware."),
'weight' : 0.8,
},
'general' : {'keywords' : ['provision', 'term', 'condition', 'obligation', 'requirement', 'clause', 'section'],
'representative_text' : ("The parties agree to the following terms and conditions governing their relationship. Each party shall perform its obligations in good faith and in accordance with industry standards and applicable law."),
'weight' : 0.5,
}
}
def __init__(self, model_loader: ModelLoader):
"""
Initialize comprehensive clause extractor
Arguments:
----------
model_loader { ModelLoader } : ModelLoader instance for accessing Legal-BERT
"""
self.model_loader = model_loader
# Models (lazy loaded)
self.legal_bert_model = None
self.legal_bert_tokenizer = None
self.embedding_model = None
self.device = None
# Category embeddings (computed from representative texts)
self.category_embeddings = dict()
# Text processor
self.text_processor = TextProcessor(use_spacy = False)
# Logger
self.logger = ContractAnalyzerLogger.get_logger()
# Lazy load
self._lazy_load()
# Risk Rules
self.risk_rules = RiskRules()
def _lazy_load(self):
"""
Lazy load Legal-BERT and embedding models
"""
if self.legal_bert_model is None:
try:
log_info("Loading Legal-BERT for comprehensive clause extraction...")
# Load Legal-BERT (nlpaueb/legal-bert-base-uncased)
self.legal_bert_model, self.legal_bert_tokenizer = self.model_loader.load_legal_bert()
self.device = self.model_loader.device
# Load sentence transformer for embeddings
self.embedding_model = self.model_loader.load_embedding_model()
# Prepare category embeddings using Legal-BERT
self._prepare_category_embeddings()
log_info("Comprehensive clause extractor models loaded successfully")
except Exception as e:
log_error(e, context = {"component": "ComprehensiveClauseExtractor", "operation": "model_loading"})
raise
def _prepare_category_embeddings(self):
"""
Pre-compute Legal-BERT embeddings for category representative texts
"""
log_info("Computing Legal-BERT embeddings for comprehensive clause categories...")
for category, config in self.CLAUSE_CATEGORIES.items():
representative_text = config['representative_text']
# Get Legal-BERT embedding (using [CLS] token)
embedding = self._get_legal_bert_embedding(text = representative_text)
self.category_embeddings[category] = embedding
log_info(f"Prepared Legal-BERT embeddings for {len(self.category_embeddings)} categories")
def _get_legal_bert_embedding(self, text: str) -> np.ndarray:
"""
Get Legal-BERT embedding for text using [CLS] token
"""
# Tokenize
inputs = self.legal_bert_tokenizer(text,
return_tensors = "pt",
padding = True,
truncation = True,
max_length = 512,
).to(self.device)
# Get embeddings
with torch.no_grad():
outputs = self.legal_bert_model(**inputs)
# Use [CLS] token embedding (first token)
cls_embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy()[0]
return cls_embedding
@ContractAnalyzerLogger.log_execution_time("extract_clauses")
def extract_clauses(self, contract_text: str, max_clauses: int = 25) -> List[ExtractedClause]:
"""
Extract and classify clauses from contract using hybrid approach
Process:
1. Structural extraction (numbered sections)
2. Semantic chunking (for unstructured text)
3. Legal-BERT classification
4. Deduplicate and rank by confidence
Arguments:
----------
contract_text { str } : Full contract text
max_clauses { int } : Maximum number of clauses to return
Returns:
--------
{ list } : List of ExtractedClause objects sorted by confidence
"""
log_info("Starting comprehensive clause extraction",
text_length = len(contract_text),
max_clauses = max_clauses,
)
# Extract using structural patterns
structural_clauses = self._extract_structural_clauses(contract_text)
log_info(f"Extracted {len(structural_clauses)} structural clauses")
# Semantic chunking for unstructured parts
semantic_chunks = self._semantic_chunking(contract_text, structural_clauses)
log_info(f"Created {len(semantic_chunks)} semantic chunks")
# Combine all candidates
all_candidates = structural_clauses + semantic_chunks
log_info(f"Total candidates: {len(all_candidates)}")
# Classify with Legal-BERT
classified_clauses = self._classify_clauses_with_legal_bert(all_candidates)
log_info(f"Classified {len(classified_clauses)} clauses")
# Deduplicate and rank
final_clauses = self._deduplicate_and_rank(classified_clauses, max_clauses)
log_info(f"Final output: {len(final_clauses)} clauses")
return final_clauses
def generate_clause_analysis(self, clause: ExtractedClause, llm_interpretation: ClauseInterpretation = None) -> Dict[str, str]:
"""
Generate analysis and recommendation for a clause
Arguments:
----------
clause { ExtractedClause } : ExtractedClause object
llm_interpretation { ClauseInterpretation } : Optional ClauseInterpretation from LLM
Returns:
--------
{ dict } : Dictionary with 'analysis' and 'recommendation' keys
"""
if llm_interpretation:
# Use LLM interpretation if available
analysis = llm_interpretation.plain_english_summary
# Combine key points into analysis
if llm_interpretation.key_points:
analysis += " " + " ".join(llm_interpretation.key_points[:2])
# Combine potential risks into analysis
if llm_interpretation.potential_risks:
risk_text = " Key risks: " + ", ".join(llm_interpretation.potential_risks[:2])
analysis += risk_text
# Use suggested improvements as recommendation
if llm_interpretation.suggested_improvements:
recommendation = " ".join(llm_interpretation.suggested_improvements[:2])
else:
recommendation = "Review this clause with legal counsel for specific recommendations."
else:
# Fallback: Generate analysis from risk indicators and category
risk_indicators = clause.risk_indicators if clause.risk_indicators else []
risk_score = getattr(clause, 'risk_score', 0)
# Generate specific analysis based on category and risk
analysis = self._generate_fallback_analysis(clause = clause,
risk_indicators = risk_indicators,
risk_score = risk_score,
)
recommendation = self._generate_fallback_recommendation(clause = clause,
risk_indicators = risk_indicators,
risk_score = risk_score,
)
return {'analysis' : analysis,
'recommendation' : recommendation,
}
def _generate_fallback_analysis(self, clause: ExtractedClause, risk_indicators: List[str], risk_score: float) -> str:
"""
Generate fallback analysis when LLM unavailable
"""
category_analyses = {'compensation' : f"This compensation clause {'contains concerning terms' if risk_score > 50 else 'appears standard'} regarding payment obligations and structures. ",
'termination' : f"This termination clause {'creates significant imbalance' if risk_score > 60 else 'establishes'} the conditions and procedures for ending the agreement. ",
'non_compete' : f"This restrictive covenant {'is overly broad and' if risk_score > 60 else ''} limits future business activities and employment opportunities. ",
'confidentiality' : f"This confidentiality provision {'has excessive scope' if risk_score > 50 else 'defines'} the obligations to protect sensitive information. ",
'indemnification' : f"This indemnification clause {'creates one-sided liability exposure' if risk_score > 60 else 'allocates'} responsibility for claims and losses. ",
'intellectual_property' : f"This IP clause {'may claim overly broad ownership' if risk_score > 50 else 'addresses'} rights to work product and inventions. ",
'liability' : f"This liability provision {'lacks adequate caps or limitations' if risk_score > 60 else 'establishes'} the financial exposure for damages. ",
}
analysis = category_analyses.get(clause.category, f"This {clause.category} clause establishes specific rights and obligations. ")
# Add risk-specific details
if risk_indicators:
analysis += f"Specific concerns include: {', '.join(risk_indicators[:3])}. "
if (risk_score > 70):
analysis += "This clause requires immediate attention and likely modification."
elif (risk_score > 50):
analysis += "This clause should be reviewed carefully and potentially negotiated."
else:
analysis += "This clause appears to contain standard provisions for this type of agreement."
return analysis
def _generate_fallback_recommendation(self, clause: ExtractedClause, risk_indicators: List[str], risk_score: float) -> str:
"""
Generate fallback recommendation when LLM unavailable
"""
if (risk_score > 70):
return f"Strongly recommend negotiating substantial changes to this clause. Seek legal counsel to address the identified risks and ensure your interests are protected."
elif (risk_score > 50):
return f"Negotiate modifications to balance the terms more fairly. Consider adding protective language or limiting the scope of obligations."
elif (risk_score > 30):
return f"Review with legal counsel to ensure the terms are clear and acceptable. Minor clarifications may be beneficial."
else:
return f"Standard clause - review for consistency with the overall agreement and your business needs."
def _extract_structural_clauses(self, text: str) -> List[Dict]:
"""
Extract clauses using structural numbering patterns
"""
candidates = list()
# Clean text
text = re.sub(r'\s+', ' ', text)
# Patterns for legal numbering
patterns = [(r'(\d+\.\d+(?:\.\d+)*)\.\s*([^\n]{30,800}?)(?=\d+\.\d+(?:\.\d+)*\.|$)', 'numbered'),
(r'(Article\s+(?:\d+(?:\.\d+)*|[IVXLCDM]+))\.\s*([^\n]{30,800}?)(?=Article\s+(?:\d+|[IVXLCDM]+)|$)', 'article'),
(r'(Section\s+\d+(?:\.\d+)*)\.\s*([^\n]{30,800}?)(?=Section\s+\d+|$)', 'section'),
(r'(Clause\s+\d+(?:\.\d+)*)\.\s*([^\n]{30,800}?)(?=Clause\s+\d+|$)', 'clause'),
(r'\(([a-z]|[ivxlcdm]+)\)\s*([^\n]{30,500}?)(?=\([a-z]|[ivxlcdm]+\)|\n\n|$)', 'subclause'),
]
for pattern, ref_type in patterns:
matches = re.finditer(pattern, text, re.IGNORECASE | re.DOTALL)
for match in matches:
clause_text = match.group(2).strip()
# Filter out boilerplate/definitions
if not self._is_boilerplate(clause_text):
# Check for meaningful content
if self._has_meaningful_content(clause_text):
candidates.append({'text' : clause_text,
'reference' : match.group(1).strip(),
'start' : match.start(),
'end' : match.end(),
'type' : 'structural',
'ref_type' : ref_type,
})
# Remove overlapping clauses
candidates = self._remove_overlapping(candidates)
return candidates
def _is_boilerplate(self, text: str) -> bool:
"""
Check if text is boilerplate/definitional rather than substantive
"""
boilerplate_indicators = ['shall mean',
'means and includes',
'defined as',
'definition of',
'hereinafter referred to',
'for purposes of this',
'interpretation of',
'as used in this',
'the term',
'shall include',
'includes but not limited',
]
text_lower = text.lower()
# Must have at least one strong indicator AND be definition-heavy
has_indicator = any(indicator in text_lower for indicator in boilerplate_indicators)
is_short_definition = len(text.split()) < 50 and '"' in text
return has_indicator or is_short_definition
def _has_meaningful_content(self, text: str) -> bool:
"""
Check if text has meaningful legal content
"""
# Must have minimum length
if (len(text.split()) < 15):
return False
# Check for legal action verbs
action_verbs = ['shall',
'must',
'will',
'may',
'agrees',
'undertakes',
'covenants',
'warrants',
'represents',
'acknowledges',
'certifies',
'indemnifies',
'waives',
'terminates',
]
text_lower = text.lower()
has_action = any(verb in text_lower for verb in action_verbs)
# Check for legal subjects
legal_subjects = ['party',
'parties',
'employee',
'employer',
'company',
'contractor',
'consultant',
'client',
'vendor',
'buyer',
'seller',
'landlord',
'tenant',
'licensor',
'licensee',
]
has_subject = any(subj in text_lower for subj in legal_subjects)
return has_action or has_subject
def _remove_overlapping(self, candidates: List[Dict]) -> List[Dict]:
"""
Remove overlapping clause extractions
"""
if not candidates:
return []
# Sort by start position
candidates.sort(key = lambda x: x['start'])
non_overlapping = [candidates[0]]
for candidate in candidates[1:]:
last = non_overlapping[-1]
# Check if overlaps
if (candidate['start'] >= last['end']):
non_overlapping.append(candidate)
elif (len(candidate['text']) > len(last['text'])):
# Keep longer clause if overlapping
non_overlapping[-1] = candidate
return non_overlapping
def _semantic_chunking(self, text: str, structural_clauses: List[Dict], chunk_size: int = 200) -> List[Dict]:
"""
Chunk unstructured text semantically uses sentence boundaries
"""
# Get covered ranges from structural clauses
covered_ranges = [(c['start'], c['end']) for c in structural_clauses]
# Split into sentences
sentences = self.text_processor.extract_sentences(text)
chunks = list()
current_chunk = list()
current_length = 0
current_start = 0
for sentence in sentences:
# Check if sentence is already covered by structural extraction
sentence_start = text.find(sentence, current_start)
if (sentence_start == -1):
continue
if self._is_in_range(sentence_start, covered_ranges):
current_start = sentence_start + len(sentence)
continue
current_chunk.append(sentence)
current_length += len(sentence.split())
# Create chunk when reaching size limit
if (current_length >= chunk_size):
chunk_text = ' '.join(current_chunk).strip()
if (len(chunk_text) >= 50) and (not self._is_boilerplate(chunk_text)):
if self._has_meaningful_content(chunk_text):
chunks.append({'text' : chunk_text,
'reference' : f'Semantic-{len(chunks)+1}',
'start' : sentence_start,
'end' : sentence_start + len(chunk_text),
'type' : 'semantic',
'ref_type' : 'semantic',
})
current_chunk = list()
current_length = 0
current_start = sentence_start + len(sentence)
# Add final chunk if exists
if current_chunk:
chunk_text = ' '.join(current_chunk).strip()
if ((len(chunk_text) >= 50) and (not self._is_boilerplate(chunk_text))):
if self._has_meaningful_content(chunk_text):
sentence_start = text.find(current_chunk[0])
chunks.append({'text' : chunk_text,
'reference' : f'Semantic-{len(chunks)+1}',
'start' : sentence_start,
'end' : sentence_start + len(chunk_text),
'type' : 'semantic',
'ref_type' : 'semantic',
})
return chunks
def _is_in_range(self, position: int, ranges: List[Tuple[int, int]]) -> bool:
"""
Check if position is within any of the ranges
"""
return any(start <= position <= end for start, end in ranges)
def _classify_clauses_with_legal_bert(self, candidates: List[Dict]) -> List[ExtractedClause]:
"""
Classify clauses using Legal-BERT embeddings + keyword matching
"""
classified = list()
for candidate in candidates:
# Get Legal-BERT embedding for clause
clause_embedding = self._get_legal_bert_embedding(text = candidate['text'])
# Classify using hybrid approach
category, confidence, legal_bert_score = self._classify_single_clause(text = candidate['text'],
clause_embedding = clause_embedding,
)
# Extract risk indicators
risk_indicators = self._extract_risk_indicators(text = candidate['text'])
# Extract sub-clauses if any
subclauses = self._extract_subclauses(text = candidate['text'])
classified.append(ExtractedClause(text = candidate['text'],
reference = candidate['reference'],
category = category,
confidence = confidence,
start_pos = candidate['start'],
end_pos = candidate['end'],
extraction_method = candidate['type'],
risk_indicators = risk_indicators,
embeddings = clause_embedding,
subclauses = subclauses,
legal_bert_score = legal_bert_score,
)
)
return classified
def _classify_single_clause(self, text: str, clause_embedding: np.ndarray) -> Tuple[str, float, float]:
"""
Classify single clause using Legal-BERT + keyword matching
"""
text_lower = text.lower()
# Keyword matching
keyword_scores = dict()
for category, config in self.CLAUSE_CATEGORIES.items():
keywords = config['keywords']
weight = config['weight']
keyword_count = sum(1 for kw in keywords if kw in text_lower)
keyword_scores[category] = (keyword_count / len(keywords)) * weight
# Legal-BERT semantic similarity
semantic_scores = dict()
clause_embedding_tensor = torch.tensor(clause_embedding).unsqueeze(0)
for category, category_embedding in self.category_embeddings.items():
category_embedding_tensor = torch.tensor(category_embedding).unsqueeze(0)
similarity = torch.nn.functional.cosine_similarity(clause_embedding_tensor, category_embedding_tensor).item()
semantic_scores[category] = similarity
# Combine scores (70% semantic, 30% keyword)
combined_scores = dict()
for category in self.CLAUSE_CATEGORIES.keys():
combined = (semantic_scores.get(category, 0) * 0.70 + keyword_scores.get(category, 0) * 0.30)
combined_scores[category] = combined
# Get best category
best_category = max(combined_scores, key = combined_scores.get)
confidence = combined_scores[best_category]
legal_bert_score = semantic_scores[best_category]
return best_category, confidence, legal_bert_score
def _extract_risk_indicators(self, text: str) -> List[str]:
"""
Extract risk indicator keywords from clause text using RiskRule with the central risk rules
"""
text_lower = text.lower()
risk_indicators = list()
# Check for matches against CRITICAL_KEYWORDS from RiskRules
for keyword in self.risk_rules.CRITICAL_KEYWORDS.keys():
if keyword in text_lower:
risk_indicators.append(keyword)
# Check for matches against HIGH_RISK_KEYWORDS from RiskRules
for keyword in self.risk_rules.HIGH_RISK_KEYWORDS.keys():
if keyword in text_lower:
risk_indicators.append(keyword)
# Check for matches against MEDIUM_RISK_KEYWORDS from RiskRules
for keyword in self.risk_rules.MEDIUM_RISK_KEYWORDS.keys():
if keyword in text_lower:
risk_indicators.append(keyword)
# Check for matches against RISKY_PATTERNS from RiskRules
for pattern, score, description in self.risk_rules.RISKY_PATTERNS:
if re.search(pattern, text_lower):
# Use the description from RiskRules as the indicator
risk_indicators.append(description)
# Remove duplicates while preserving order
seen = set()
unique_indicators = list()
for indicator in risk_indicators:
if indicator not in seen:
seen.add(indicator)
unique_indicators.append(indicator)
return unique_indicators
def _extract_subclauses(self, text: str) -> List[str]:
"""
Extract sub-clauses from main clause (e.g., (a), (b), (i), (ii))
"""
# Pattern for sub-clauses: (a), (i), etc.
subclause_pattern = r'\(([a-z]|[ivxlcdm]+)\)\s*([^()]{20,200}?)(?=\([a-z]|[ivxlcdm]+\)|$)'
matches = re.findall(subclause_pattern, text, re.IGNORECASE)
subclauses = list()
for ref, subtext in matches:
clean_text = subtext.strip()
if (len(clean_text) >= 20):
subclauses.append(f"({ref}) {clean_text}")
# Max 25 sub-clauses
return subclauses[:25]
def _deduplicate_and_rank(self, clauses: List[ExtractedClause], max_clauses: int) -> List[ExtractedClause]:
"""
Remove duplicates and rank by confidence + legal_bert_score
"""
if not clauses:
return []
# Sort by combined score (confidence * 0.6 + legal_bert_score * 0.4)
clauses.sort(key = lambda x: (x.confidence * 0.6 + x.legal_bert_score * 0.4), reverse = True)
# Deduplicate by text similarity
unique_clauses = list()
seen_texts = set()
for clause in clauses:
# Simple deduplication by first 100 chars
text_key = clause.text[:100].lower().strip()
# Also check similarity to already added clauses
is_duplicate = False
for existing in unique_clauses:
similarity = self._text_similarity(clause.text, existing.text)
if (similarity > 0.85):
is_duplicate = True
break
if text_key not in seen_texts and not is_duplicate:
unique_clauses.append(clause)
seen_texts.add(text_key)
if (len(unique_clauses) >= max_clauses):
break
return unique_clauses
def _text_similarity(self, text1: str, text2: str) -> float:
"""
Calculate text similarity (simple Jaccard similarity)
"""
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
intersection = len(words1 & words2)
union = len(words1 | words2)
return intersection / union if union > 0 else 0.0
def get_category_distribution(self, clauses: List[ExtractedClause]) -> Dict[str, int]:
"""
Get distribution of clause categories
"""
distribution = defaultdict(int)
for clause in clauses:
distribution[clause.category] += 1
log_info("Clause category distribution", distribution=dict(distribution))
return dict(distribution)
def get_high_risk_clauses(self, clauses: List[ExtractedClause]) -> List[ExtractedClause]:
"""
Get clauses with risk indicators
"""
risky = [c for c in clauses if c.risk_indicators]
risky.sort(key = lambda x: len(x.risk_indicators), reverse = True)
top_25_risky_clauses = risky[:25]
return top_25_risky_clauses
class RiskClauseExtractor:
"""
Risk-Focused Clause Extractor: Specifically for risk analysis using RiskRules framework for contract-type specific risk assessment
This will be used for: Risk analysis, protection gap detection, contract-type specific assessment
"""
def __init__(self, model_loader: ModelLoader, contract_type: ContractType):
"""
Initialize risk-focused clause extractor
Arguments:
----------
model_loader { ModelLoader } : ModelLoader instance
contract_type { ContractType } : Contract type for risk rule adjustments
"""
self.model_loader = model_loader
self.contract_type = contract_type
self.risk_rules = RiskRules()
# Models (lazy loaded)
self.legal_bert_model = None
self.legal_bert_tokenizer = None
self.embedding_model = None
self.device = None
# Risk category embeddings
self.risk_category_embeddings = dict()
# Text processor
self.text_processor = TextProcessor(use_spacy = False)
# Logger
self.logger = ContractAnalyzerLogger.get_logger()
# Contract-type specific weights
self.category_weights = self.risk_rules.get_adjusted_weights(contract_type)
# Lazy load
self._lazy_load()
def _lazy_load(self):
"""
Lazy load models for risk analysis
"""
if self.legal_bert_model is None:
try:
log_info("Loading models for risk-focused clause extraction...")
# Load Legal-BERT
self.legal_bert_model, self.legal_bert_tokenizer = self.model_loader.load_legal_bert()
self.device = self.model_loader.device
# Load embedding model
self.embedding_model = self.model_loader.load_embedding_model()
# Prepare risk category embeddings
self._prepare_risk_category_embeddings()
log_info("Risk clause extractor models loaded successfully")
except Exception as e:
log_error(e, context = {"component": "RiskClauseExtractor", "operation": "model_loading"})
raise
def _prepare_risk_category_embeddings(self):
"""
Prepare embeddings for risk categories using RiskRules framework
"""
log_info("Preparing risk category embeddings...")
# Create representative texts for each risk category
risk_category_texts = {'restrictive_covenants' : "Non-compete non-solicitation restrictive covenants competition limitations duration geographic scope industry restrictions",
'termination_rights' : "Termination notice period severance for cause without cause immediate termination at-will employment end of agreement",
'penalties_liability' : "Penalties liquidated damages liability limitations unlimited liability consequential damages indemnification hold harmless",
'compensation_benefits' : "Compensation salary wages benefits bonus commission equity stock options retirement health insurance paid time off",
'intellectual_property' : "Intellectual property IP ownership copyright patent trademark trade secrets work product inventions proprietary rights",
'confidentiality' : "Confidentiality non-disclosure proprietary information trade secrets protection secrecy confidential obligations",
'liability_indemnity' : "Liability indemnification hold harmless defense costs claims losses damages responsibility accountability",
'governing_law' : "Governing law jurisdiction venue dispute resolution arbitration mediation legal forum applicable law",
'payment_terms' : "Payment terms due date invoice billing net 30 late payment interest fees compensation remuneration",
'warranties' : "Warranties representations guarantees disclaimers merchantability fitness for purpose product quality service standards",
'dispute_resolution' : "Dispute resolution arbitration mediation litigation legal proceedings costs attorneys fees jurisdiction",
'assignment_change' : "Assignment transfer change control amendment modification consent approval successor parties",
'insurance' : "Insurance coverage liability insurance professional indemnity proof of insurance additional insured policy requirements",
'force_majeure' : "Force majeure act of god unforeseen circumstances beyond control natural disasters performance suspension"
}
for category, text in risk_category_texts.items():
embedding = self._get_legal_bert_embedding(text)
self.risk_category_embeddings[category] = embedding
log_info(f"Prepared risk embeddings for {len(self.risk_category_embeddings)} categories")
def _get_legal_bert_embedding(self, text: str) -> np.ndarray:
"""
Get Legal-BERT embedding for risk analysis
"""
inputs = self.legal_bert_tokenizer(text,
return_tensors = "pt",
padding = True,
truncation = True,
max_length = 512,
).to(self.device)
with torch.no_grad():
outputs = self.legal_bert_model(**inputs)
cls_embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy()[0]
return cls_embedding
@ContractAnalyzerLogger.log_execution_time("extract_risk_clauses")
def extract_risk_clauses(self, contract_text: str, max_clauses: int = 20) -> List[ExtractedClause]:
"""
Extract clauses specifically for risk analysis using RiskRules framework
Process:
1. Focus on high-weight categories for this contract type
2. Use risk patterns from RiskRules
3. Calculate risk scores based on RiskRules factors
4. Prioritize clauses with high risk indicators
Arguments:
----------
contract_text { str } : Full contract text
max_clauses { int } : Maximum clauses to return
Returns:
--------
{ list } : Risk-focused clauses with calculated risk scores
"""
log_info("Starting risk-focused clause extraction",
contract_type = self.contract_type.value,
max_clauses = max_clauses,
)
# Use comprehensive extractor as base
comprehensive_extractor = ComprehensiveClauseExtractor(self.model_loader)
all_clauses = comprehensive_extractor.extract_clauses(contract_text = contract_text,
max_clauses = 50,
)
# Re-classify using risk framework
risk_clauses = self._reclassify_with_risk_framework(clauses = all_clauses)
# Calculate risk scores
risk_clauses = self._calculate_risk_scores(clauses = risk_clauses)
# Prioritize by risk score and contract-type relevance
prioritized = self._prioritize_risk_clauses(clauses = risk_clauses,
max_clauses = max_clauses,
)
log_info(f"Extracted {len(prioritized)} risk-focused clauses")
return prioritized
def _reclassify_with_risk_framework(self, clauses: List[ExtractedClause]) -> List[ExtractedClause]:
"""
Re-classify clauses using RiskRules categories and weights
"""
risk_classified = list()
for clause in clauses:
# Map to risk categories and calculate relevance
risk_category, risk_confidence = self._classify_with_risk_categories(text = clause.text)
# Update clause with risk classification
clause.category = risk_category
clause.confidence = risk_confidence
risk_classified.append(clause)
return risk_classified
def _classify_with_risk_categories(self, text: str) -> Tuple[str, float]:
"""
Classify text using RiskRules categories with contract-type weights
"""
text_lower = text.lower()
# Keyword matching with risk categories
keyword_scores = dict()
for risk_category in self.risk_rules.CATEGORY_WEIGHTS.keys():
# Get keywords from risk rules patterns
keywords = self._get_keywords_for_risk_category(risk_category = risk_category)
keyword_count = sum(1 for kw in keywords if kw in text_lower)
base_score = (keyword_count / max(len(keywords), 1)) * 100
# Apply contract-type specific weight
weight = self.category_weights.get(risk_category, 1.0)
keyword_scores[risk_category] = base_score * weight
# Legal-BERT similarity with risk categories
semantic_scores = dict()
text_embedding = self._get_legal_bert_embedding(text = text)
text_tensor = torch.tensor(text_embedding).unsqueeze(0)
for risk_category, category_embedding in self.risk_category_embeddings.items():
cat_tensor = torch.tensor(category_embedding).unsqueeze(0)
similarity = torch.nn.functional.cosine_similarity(text_tensor, cat_tensor).item()
semantic_scores[risk_category] = similarity * 100 # Scale to 0-100
# Combine scores (60% semantic, 40% keyword)
combined_scores = dict()
for risk_category in self.risk_rules.CATEGORY_WEIGHTS.keys():
combined = (semantic_scores.get(risk_category, 0) * 0.6 + keyword_scores.get(risk_category, 0) * 0.4)
combined_scores[risk_category] = combined
# Get best category
best_category = max(combined_scores, key = combined_scores.get)
# Normalize to 0-1
confidence = min(combined_scores[best_category] / 100, 1.0)
return best_category, confidence
def _get_keywords_for_risk_category(self, risk_category: str) -> List[str]:
"""
Get relevant keywords for a risk category from RiskRules patterns
"""
# Map risk categories to relevant keywords from RiskRules
keyword_map = {'restrictive_covenants' : ['non-compete', 'non-solicit', 'restrictive', 'covenant', 'competition', 'geographic', 'duration'],
'termination_rights' : ['termination', 'notice', 'severance', 'dismissal', 'resignation', 'for cause', 'without cause'],
'penalties_liability' : ['penalty', 'liquidated damages', 'liability', 'indemnification', 'hold harmless', 'damages'],
'compensation_benefits' : ['compensation', 'salary', 'benefits', 'bonus', 'commission', 'equity', 'insurance'],
'intellectual_property' : ['intellectual property', 'ip', 'copyright', 'patent', 'trademark', 'inventions'],
'confidentiality' : ['confidential', 'proprietary', 'trade secret', 'non-disclosure'],
'liability_indemnity' : ['liability', 'indemnification', 'hold harmless', 'defend', 'claims'],
'governing_law' : ['governing law', 'jurisdiction', 'venue', 'dispute resolution'],
'payment_terms' : ['payment', 'due', 'invoice', 'net 30', 'late payment'],
'warranties' : ['warranty', 'representation', 'guarantee', 'disclaimer'],
'dispute_resolution' : ['arbitration', 'mediation', 'dispute', 'litigation'],
'assignment_change' : ['assignment', 'transfer', 'amendment', 'modification'],
'insurance' : ['insurance', 'coverage', 'policy', 'insured'],
'force_majeure' : ['force majeure', 'act of god', 'beyond control'],
}
return keyword_map.get(risk_category, [])
def _calculate_risk_scores(self, clauses: List[ExtractedClause]) -> List[ExtractedClause]:
"""
Calculate risk scores for clauses based on RiskRules factors
"""
for clause in clauses:
risk_score = self._calculate_single_clause_risk(clause = clause)
clause.risk_score = risk_score
return clauses
def _calculate_single_clause_risk(self, clause: ExtractedClause) -> float:
"""
Calculate risk score using RiskRules framework
"""
base_score = 0.0
text_lower = clause.text.lower()
# Base risk from category weight (adjusted for contract type)
category_weight = self.category_weights.get(clause.category, 1.0)
base_score += category_weight
# Add risk from CLAUSE_RISK_FACTORS (red flags)
factor_config = self.risk_rules.CLAUSE_RISK_FACTORS.get(clause.category)
if factor_config:
for red_flag, adjustment in factor_config["red_flags"].items():
if red_flag in text_lower:
base_score += adjustment
# Add risk from RISKY_PATTERNS (with actual scores)
for pattern, score, description in self.risk_rules.RISKY_PATTERNS:
if re.search(pattern, text_lower):
base_score += score
# Add risk from CRITICAL_KEYWORDS
for keyword, risk_score in self.risk_rules.CRITICAL_KEYWORDS.items():
if re.search(rf'\b{re.escape(keyword)}\b', text_lower):
base_score += risk_score
# Cap final score at 100
return min(max(base_score, 0), 100)
def _extract_risk_indicators(self, text: str) -> List[str]:
"""
Extract risk indicators using RiskRules patterns
"""
text_lower = text.lower()
indicators = list()
# Check critical risk patterns
for pattern, score, description in self.risk_rules.RISKY_PATTERNS:
if re.search(pattern, text_lower):
indicators.append(description)
# Check keyword risk indicators
for indicator in self.risk_rules.CRITICAL_KEYWORDS.keys():
if indicator in text_lower:
indicators.append(indicator)
for indicator in self.risk_rules.HIGH_RISK_KEYWORDS.keys():
if indicator in text_lower:
indicators.append(indicator)
return indicators
def _check_risk_patterns(self, text: str) -> float:
"""
Check for high-risk patterns from RiskRules
"""
text_lower = text.lower()
pattern_risk = 0.0
# Check risky patterns
for pattern, score, description in self.risk_rules.RISKY_PATTERNS:
if re.search(pattern, text_lower):
pattern_risk += score
# Cap pattern risk
return min(pattern_risk, 20)
def _prioritize_risk_clauses(self, clauses: List[ExtractedClause], max_clauses: int) -> List[ExtractedClause]:
"""
Prioritize clauses by risk score and contract-type relevance
"""
# Sort by risk score (descending)
clauses.sort(key = lambda x: x.risk_score, reverse = True)
# Take top clauses
return clauses[:max_clauses]
def detect_missing_protections(self, extracted_clauses: List[ExtractedClause]) -> List[Dict]:
"""
Detect missing critical protections based on contract type
"""
missing = list()
checklist = self.risk_rules.PROTECTION_CHECKLIST
for protection, config in checklist.items():
if not self._has_protection(extracted_clauses, protection, config['categories']):
missing.append({"protection" : protection,
"importance" : config['importance'],
"risk_if_missing" : config['risk_if_missing'],
"categories" : config['categories'],
})
log_info(f"Detected {len(missing)} missing protections")
return missing
def _has_protection(self, clauses: List[ExtractedClause], protection: str, categories: List[str]) -> bool:
"""
Check if protection exists in extracted clauses
"""
protection_patterns = {'for_cause_definition' : ['for cause', 'cause defined', 'termination for cause'],
'severance_provision' : ['severance', 'severance pay', 'termination benefits'],
'mutual_indemnification' : ['mutual indemnification', 'both parties indemnify'],
'liability_cap' : ['liability cap', 'limited liability', 'maximum liability'],
'prior_ip_exclusion' : ['prior inventions', 'pre-existing ip', 'prior intellectual property'],
'confidentiality_duration' : ['confidentiality period', 'duration of confidentiality'],
'dispute_resolution' : ['dispute resolution', 'arbitration', 'mediation'],
'change_control_process' : ['change control', 'amendment process', 'modification procedure'],
'insurance_requirements' : ['insurance requirements', 'maintain insurance'],
'force_majeure' : ['force majeure', 'act of god'],
}
patterns = protection_patterns.get(protection, [])
for clause in clauses:
if clause.category in categories:
text_lower = clause.text.lower()
if any(pattern in text_lower for pattern in patterns):
return True
return False