Spaces:
Running
Running
File size: 39,742 Bytes
bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 522f7a0 1099afe bdedf43 522f7a0 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe 522f7a0 1099afe bdedf43 522f7a0 1099afe bdedf43 522f7a0 1099afe bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 1099afe bdedf43 522f7a0 bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 1099afe 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 1099afe bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 1099afe bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe 522f7a0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 |
# DEPENDENCIES
import re
import sys
from typing import List
from typing import Dict
from typing import Tuple
from pathlib import Path
from typing import Optional
from collections import Counter
# Add parent directory to path for imports
sys.path.append(str(Path(__file__).parent.parent))
from utils.logger import log_info
from utils.logger import log_error
from config.risk_rules import RiskRules
from config.risk_rules import ContractType
from utils.logger import ContractAnalyzerLogger
from services.data_models import ExtractedClause
from services.data_models import UnfavorableTerm
class TermAnalyzer:
"""
Detect unfavorable and one-sided terms in contracts using RiskRules framework and integrated with comprehensive risk analysis system
"""
def __init__(self, contract_type: ContractType = ContractType.GENERAL):
"""
Initialize term analyzer with contract-type specific risk rules
Arguments:
----------
contract_type { ContractType } : Contract type for risk rule adjustments
"""
self.contract_type = contract_type
self.risk_rules = RiskRules()
self.logger = ContractAnalyzerLogger.get_logger()
# Contract-type specific weights
self.category_weights = self.risk_rules.get_adjusted_weights(contract_type)
log_info("TermAnalyzer initialized",
contract_type = contract_type.value,
category_weights = self.category_weights,
)
def _map_to_risk_category(self, clause_category: str) -> str:
"""
Map clause category to risk category for proper risk scoring for ensureing unfavorable terms are correctly attributed to risk categories
for score calculation
"""
# Clause categories → Risk categories
mapping = {"non_compete" : "restrictive_covenants",
"confidentiality" : "restrictive_covenants",
"termination" : "termination_rights",
"indemnification" : "liability_indemnity",
"liability" : "penalties_liability",
"compensation" : "compensation_benefits",
"intellectual_property" : "intellectual_property",
"warranty" : "warranties",
"dispute_resolution" : "dispute_resolution",
"assignment" : "assignment_change",
"amendment" : "assignment_change",
"insurance" : "insurance",
"force_majeure" : "force_majeure",
"general" : "general",
"payment" : "payment_terms",
"governing_law" : "governing_law",
}
risk_category_by_clause_category = mapping.get(clause_category, clause_category)
return risk_category_by_clause_category
@ContractAnalyzerLogger.log_execution_time("analyze_unfavorable_terms")
def analyze_unfavorable_terms(self, contract_text: str, clauses: List[ExtractedClause], contract_type: Optional[ContractType] = None) -> List[UnfavorableTerm]:
"""
Detect all unfavorable terms using RiskRules framework
Arguments:
----------
contract_text { str } : Full contract text
clauses { list } : Extracted clauses
contract_type { ContractType } : Override contract type
Returns:
--------
{ list } : List of UnfavorableTerm objects
"""
# Update contract type if provided
if contract_type:
self.contract_type = contract_type
self.category_weights = self.risk_rules.get_adjusted_weights(contract_type)
log_info("Starting unfavorable terms analysis",
text_length = len(contract_text),
num_clauses = len(clauses),
contract_type = self.contract_type.value,
)
unfavorable_terms = list()
# Clause-level analysis using RiskRules patterns
for clause in clauses:
terms = self._analyze_clause_with_risk_rules(clause = clause)
unfavorable_terms.extend(terms)
# Cross-clause analysis for systemic issues
cross_clause_terms = self._analyze_cross_clause_issues(text = contract_text,
clauses = clauses,
)
unfavorable_terms.extend(cross_clause_terms)
# PHASE 3: Missing protections analysis
missing_protections = self._analyze_missing_protections(clauses = clauses)
unfavorable_terms.extend(missing_protections)
# PHASE 4: Industry benchmark analysis
benchmark_issues = self._analyze_against_benchmarks(clauses = clauses)
unfavorable_terms.extend(benchmark_issues)
# Deduplicate and prioritize by risk
final_terms = self._deduplicate_and_prioritize(terms = unfavorable_terms)
log_info("Unfavorable terms analysis complete",
total_found = len(final_terms),
critical = sum(1 for t in final_terms if (t.severity == "critical")),
high = sum(1 for t in final_terms if (t.severity == "high")))
return final_terms
def _analyze_clause_with_risk_rules(self, clause: ExtractedClause) -> List[UnfavorableTerm]:
"""
Analyze clause using comprehensive RiskRules framework
"""
terms = list()
text_lower = clause.text.lower()
# Map clause category to risk category for consistency
risk_category = self._map_to_risk_category(clause_category = clause.category)
# Risky Patterns Analysis from RiskRules
for pattern, risk_score, description in self.risk_rules.RISKY_PATTERNS:
matches = re.finditer(pattern, text_lower, re.IGNORECASE)
for match in matches:
severity = self._score_to_severity(risk_score)
terms.append(UnfavorableTerm(term = description,
category = risk_category,
severity = severity,
explanation = self._generate_pattern_explanation(description, match.group()),
risk_score = risk_score,
clause_reference = clause.reference,
suggested_fix = self._generate_pattern_fix(description, clause.category),
contract_type = self.contract_type.value,
specific_text = match.group(),
legal_basis = self._get_legal_basis(description),
)
)
# Critical Keyword Analysis from RiskRules
for keyword, risk_score in self.risk_rules.CRITICAL_KEYWORDS.items():
if re.search(rf'\b{re.escape(keyword)}\b', text_lower):
severity = self._score_to_severity(risk_score)
terms.append(UnfavorableTerm(term = f"Critical Risk: {keyword.title()}",
category = risk_category,
severity = severity,
explanation = self._generate_keyword_explanation(keyword, clause.category),
risk_score = risk_score,
clause_reference = clause.reference,
suggested_fix = self._generate_keyword_fix(keyword, clause.category),
contract_type = self.contract_type.value,
specific_text = keyword,
legal_basis = self._get_legal_basis(keyword),
)
)
# High Risk Keyword Analysis
for keyword, risk_score in self.risk_rules.HIGH_RISK_KEYWORDS.items():
if re.search(rf'\b{re.escape(keyword)}\b', text_lower):
severity = self._score_to_severity(risk_score)
terms.append(UnfavorableTerm(term = f"High Risk: {keyword.title()}",
category = risk_category,
severity = severity,
explanation = self._generate_keyword_explanation(keyword, clause.category),
risk_score = risk_score,
clause_reference = clause.reference,
suggested_fix = self._generate_keyword_fix(keyword, clause.category),
contract_type = self.contract_type.value,
specific_text = keyword,
legal_basis = self._get_legal_basis(keyword),
)
)
# Clause-specific Risk Factors From RiskRules.CLAUSE_RISK_FACTORS
clause_risk_analysis = self._analyze_clause_risk_factors(clause)
terms.extend(clause_risk_analysis)
return terms
def _analyze_clause_risk_factors(self, clause: ExtractedClause) -> List[UnfavorableTerm]:
"""
Analyze clause using CLAUSE_RISK_FACTORS from RiskRules
"""
terms = list()
# Map clause categories to risk factors
category_mapping = {'non_compete' : 'restrictive_covenants',
'termination' : 'termination_rights',
'indemnification' : 'liability_indemnity',
'compensation' : 'compensation_benefits',
'intellectual_property' : 'intellectual_property',
'confidentiality' : 'confidentiality',
'liability' : 'penalties_liability',
'warranty' : 'warranties',
'dispute_resolution' : 'dispute_resolution',
'assignment' : 'assignment_change',
'insurance' : 'insurance',
'force_majeure' : 'force_majeure',
}
risk_factors_key = category_mapping.get(clause.category)
if not risk_factors_key or risk_factors_key not in self.risk_rules.CLAUSE_RISK_FACTORS:
return terms
risk_factors = self.risk_rules.CLAUSE_RISK_FACTORS[risk_factors_key]
text_lower = clause.text.lower()
# Map clause category to risk category for consistency
risk_category = self._map_to_risk_category(clause_category = clause.category)
# Check for red flags in this clause
for red_flag, risk_adjustment in risk_factors["red_flags"].items():
if (red_flag in text_lower):
base_risk = risk_factors["base_risk"]
total_risk = base_risk + risk_adjustment
severity = self._score_to_severity(total_risk)
terms.append(UnfavorableTerm(term = f"Risk Factor: {red_flag.replace('_', ' ').title()}",
category = risk_category,
severity = severity,
explanation = f"Base risk {base_risk} + {risk_adjustment} for '{red_flag}'. {self._get_risk_factor_explanation(risk_factors_key, red_flag)}",
risk_score = total_risk,
clause_reference = clause.reference,
suggested_fix = self._get_risk_factor_fix(risk_factors_key, red_flag),
contract_type = self.contract_type.value,
specific_text = red_flag,
legal_basis = self._get_legal_basis(red_flag)
)
)
return terms
def _analyze_cross_clause_issues(self, text: str, clauses: List[ExtractedClause]) -> List[UnfavorableTerm]:
"""
Detect systemic issues spanning multiple clauses
"""
terms = list()
# Notice period imbalance (from your original but enhanced)
notice_imbalance = self._check_notice_imbalance(clauses = clauses)
if notice_imbalance:
# Ensure the category used is a risk category
notice_imbalance.category = self._map_to_risk_category(clause_category = "termination")
terms.append(notice_imbalance)
# Missing reciprocal provisions
missing_reciprocal = self._check_missing_reciprocal(text = text,
clauses = clauses,
)
for item in missing_reciprocal:
# Ensure the category used is a risk category
item.category = self._map_to_risk_category(clause_category = "indemnification")
terms.extend(missing_reciprocal)
# Conflicting clauses
conflicts = self._check_conflicting_clauses(clauses = clauses)
for item in conflicts:
# Ensure the category used is a risk category
item.category = self._map_to_risk_category(clause_category = item.category)
terms.extend(conflicts)
# One-sided discretionary powers
one_sided_powers = self._check_one_sided_discretion(clauses = clauses)
for item in one_sided_powers:
# Ensure the category used is a risk category
item.category = self._map_to_risk_category(clause_category = item.category)
terms.extend(one_sided_powers)
return terms
def _analyze_missing_protections(self, clauses: List[ExtractedClause]) -> List[UnfavorableTerm]:
"""
Analyze missing critical protections using PROTECTION_CHECKLIST
"""
terms = list()
for protection, config in self.risk_rules.PROTECTION_CHECKLIST.items():
if not self._has_protection(clauses, protection, config['categories']):
# For missing protections, map the first associated category to a risk category
# This assumes config['categories'][0] is a clause category like "termination"
risk_category = self._map_to_risk_category(clause_category = config['categories'][0]) if config['categories'] else "general"
terms.append(UnfavorableTerm(term = f"Missing Protection: {protection.replace('_', ' ').title()}",
category = risk_category,
severity = self._score_to_severity(config['risk_if_missing']),
explanation = f"Missing critical protection: {protection}. {self._get_missing_protection_explanation(protection)}",
risk_score = config['risk_if_missing'],
suggested_fix = self._get_missing_protection_fix(protection),
contract_type = self.contract_type.value,
legal_basis = f"Standard protection in {self.contract_type.value} contracts",
)
)
return terms
def _analyze_against_benchmarks(self, clauses: List[ExtractedClause]) -> List[UnfavorableTerm]:
"""
Compare terms against industry benchmarks
"""
terms = list()
for clause in clauses:
benchmark_issues = self._check_benchmark_compliance(clause = clause)
for item in benchmark_issues:
# Ensure the category used is a risk category
item.category = self._map_to_risk_category(clause_category = clause.category)
terms.extend(benchmark_issues)
return terms
def _check_notice_imbalance(self, clauses: List[ExtractedClause]) -> Optional[UnfavorableTerm]:
"""
Enhanced notice period imbalance detection
"""
term_clauses = [c for c in clauses if (c.category == "termination")]
if not term_clauses:
return None
text = " ".join([c.text for c in term_clauses])
# Pattern matching for notice periods
notice_patterns = [r'(\d+)\s*days?\s*notice',
r'notice\s*of\s*(\d+)\s*days',
r'(\d+)\s*days?\s*prior\s*notice',
r'written\s*notice\s*of\s*(\d+)\s*days',
]
all_periods = list()
for pattern in notice_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
all_periods.extend([int(m) for m in matches])
if (len(all_periods) >= 2):
min_period = min(all_periods)
max_period = max(all_periods)
ratio = max_period / min_period
if (ratio >= 2):
severity = "critical" if (ratio >= 3) else "high"
risk_score = 80 if (ratio >= 3) else 60
# Use the risk category mapping for termination
risk_category = self._map_to_risk_category(clause_category = "termination")
return UnfavorableTerm(term = "Imbalanced Notice Periods",
category = risk_category,
severity = severity,
explanation = f"Significant notice period imbalance: {max_period} days vs {min_period} days (ratio: {ratio:.1f}x). Creates unfair burden.",
risk_score = risk_score,
clause_reference = term_clauses[0].reference,
suggested_fix = f"Equalize notice periods to reasonable duration (e.g., 30 days mutual notice).",
contract_type = self.contract_type.value,
benchmark_info = f"Industry standard: Mutual 30-day notice periods",
)
return None
def _check_missing_reciprocal(self, text: str, clauses: List[ExtractedClause]) -> List[UnfavorableTerm]:
"""
Enhanced reciprocal provision analysis
"""
terms = list()
# Check indemnification reciprocity
indem_clauses = [c for c in clauses if (c.category == "indemnification")]
if indem_clauses:
has_one_sided = any(re.search(r'(you|employee|consultant|contractor)\s+shall\s+indemnify', c.text, re.IGNORECASE) for c in indem_clauses)
has_mutual = any("mutual" in c.text.lower() or "both parties" in c.text.lower() or "each party" in c.text.lower() for c in indem_clauses)
if has_one_sided and not has_mutual:
# Use the risk category mapping for indemnification
risk_category = self._map_to_risk_category(clause_category = "indemnification")
terms.append(UnfavorableTerm(term = "Non-Reciprocal Indemnification",
category = risk_category,
severity = "critical",
explanation = "One-sided indemnification creates unlimited liability exposure without reciprocal protection.",
risk_score = 85,
clause_reference = indem_clauses[0].reference,
suggested_fix = "Change to mutual indemnification: 'Each party shall indemnify the other for losses arising from their respective breach or negligence.'",
contract_type = self.contract_type.value,
legal_basis = "Mutuality of obligation principle",
)
)
return terms
def _check_conflicting_clauses(self, clauses: List[ExtractedClause]) -> List[UnfavorableTerm]:
"""
Detect conflicting clauses
"""
terms = list()
# Group clauses by category for conflict analysis
by_category = dict()
for clause in clauses:
# Map the clause category to the risk category for grouping purposes
risk_cat = self._map_to_risk_category(clause_category = clause.category)
if risk_cat not in by_category:
by_category[risk_cat] = []
by_category[risk_cat].append(clause)
# Check for conflicts within each category
for risk_category, category_clauses in by_category.items():
if (len(category_clauses) >= 2):
for i, clause1 in enumerate(category_clauses):
for clause2 in category_clauses[i+1:]:
if (self._are_clauses_conflicting(clause1, clause2)):
terms.append(UnfavorableTerm(term = f"Conflicting {risk_category.title()} Clauses",
category = risk_category,
severity = "high",
explanation = f"Clauses {clause1.reference} and {clause2.reference} contain conflicting terms creating legal ambiguity.",
risk_score = 70,
clause_reference = f"{clause1.reference}, {clause2.reference}",
suggested_fix = "Consolidate into single consistent clause or clarify precedence.",
contract_type = self.contract_type.value,
)
)
return terms
def _check_one_sided_discretion(self, clauses: List[ExtractedClause]) -> List[UnfavorableTerm]:
"""
Check for one-sided discretionary powers
"""
terms = list()
for clause in clauses:
text_lower = clause.text.lower()
# Look for one-sided discretionary language
if re.search(r'(sole|absolute|unfettered|unilateral)\s+(discretion|right|authority)', text_lower):
if not re.search(r'(mutual|both parties|reasonable)\s+(discretion|agreement)', text_lower):
# Use the risk category mapping for the clause's category
risk_category = self._map_to_risk_category(clause_category = clause.category)
terms.append(UnfavorableTerm(term = "One-Sided Discretionary Power",
category = risk_category,
severity = "high",
explanation = "Gives one party unilateral decision-making power without accountability standards.",
risk_score = 75,
clause_reference = clause.reference,
suggested_fix = "Change to 'reasonable discretion' or require 'mutual agreement'.",
contract_type = self.contract_type.value,
legal_basis = "Doctrine of good faith and fair dealing",
)
)
return terms
def _check_benchmark_compliance(self, clause: ExtractedClause) -> List[UnfavorableTerm]:
"""
Check clause against industry benchmarks
"""
terms = list()
# Non-compete duration benchmark
if (clause.category == "non_compete"):
duration_match = re.search(r'(\d+)\s*(month|year)', clause.text.lower())
if duration_match:
duration = int(duration_match.group(1))
unit = duration_match.group(2)
# Convert to months for comparison
total_months = duration * (12 if (unit == "year") else 1)
benchmarks = self.risk_rules.INDUSTRY_BENCHMARKS.get('non_compete_duration', {})
industry_benchmark = benchmarks.get(self.contract_type.value, benchmarks.get('general', {}))
if industry_benchmark:
reasonable = industry_benchmark.get('reasonable', 12)
excessive = industry_benchmark.get('excessive', 24)
if (total_months > excessive):
# Use the risk category mapping for non_compete
risk_category = self._map_to_risk_category(clause_category = clause.category)
terms.append(UnfavorableTerm(term = "Excessive Non-Compete Duration",
category = risk_category,
severity = "critical",
explanation = f"{duration} {unit} non-compete exceeds industry excessive threshold of {excessive} months.",
risk_score = 90,
clause_reference = clause.reference,
suggested_fix = f"Reduce to {reasonable} months maximum.",
contract_type = self.contract_type.value,
benchmark_info = f"Industry standard: {reasonable} months reasonable, {excessive} months excessive",
)
)
return terms
def _has_protection(self, clauses: List[ExtractedClause], protection: str, categories: List[str]) -> bool:
"""
Check if protection exists in clauses
"""
protection_patterns = {'for_cause_definition' : ['for cause', 'cause defined', 'termination for cause', 'just cause'],
'severance_provision' : ['severance', 'severance pay', 'termination benefits', 'separation pay'],
'mutual_indemnification' : ['mutual indemnification', 'both parties indemnify', 'each party shall indemnify'],
'liability_cap' : ['liability cap', 'limited liability', 'maximum liability', 'cap on damages'],
'prior_ip_exclusion' : ['prior inventions', 'pre-existing ip', 'prior intellectual property', 'background ip'],
'confidentiality_duration' : ['confidentiality period', 'duration of confidentiality', 'term of confidentiality'],
'dispute_resolution' : ['dispute resolution', 'arbitration', 'mediation', 'alternative dispute resolution'],
'change_control_process' : ['change control', 'amendment process', 'modification procedure', 'change order'],
'insurance_requirements' : ['insurance requirements', 'maintain insurance', 'proof of insurance'],
'force_majeure' : ['force majeure', 'act of god', 'unforeseeable circumstances'],
}
patterns = protection_patterns.get(protection, [])
relevant_clauses = [c for c in clauses if not categories or c.category in categories]
for clause in relevant_clauses:
text_lower = clause.text.lower()
if any(pattern in text_lower for pattern in patterns):
return True
return False
# HELPER METHODS FOR EXPLANATIONS AND FIXES
def _score_to_severity(self, score: float) -> str:
"""
Convert risk score to severity level
"""
if (score >= 80):
return "critical"
elif (score >= 60):
return "high"
elif (score >= 40):
return "medium"
else:
return "low"
def _generate_pattern_explanation(self, pattern_desc: str, matched_text: str) -> str:
"""
Generate explanation for pattern matches
"""
explanations = {"Long duration restrictive covenant" : f"Overly long restrictive period found: '{matched_text}'. May unreasonably restrict future employment.",
"Overly broad geographic/industry scope" : f"Excessively broad scope: '{matched_text}'. Could prevent working in entire industries or regions.",
"Unequal notice periods" : f"Imbalanced notice requirements: '{matched_text}'. Creates unfair advantage for one party.",
"Unlimited liability exposure" : f"Uncapped liability: '{matched_text}'. Exposes to potentially catastrophic financial risk.",
}
return explanations.get(pattern_desc, f"Risk pattern detected: {pattern_desc}")
def _generate_pattern_fix(self, pattern_desc: str, category: str) -> str:
"""
Generate fix suggestions for patterns
"""
fixes = {"Long duration restrictive covenant" : "Limit to 6-12 months maximum with reasonable geographic scope.",
"Overly broad geographic/industry scope" : "Narrow to specific competitors and reasonable geographic area.",
"Unequal notice periods" : "Equalize notice periods for both parties (e.g., 30 days mutual notice).",
"Unlimited liability exposure" : "Add mutual liability cap (e.g., fees paid in preceding 12 months).",
}
return fixes.get(pattern_desc, "Review and modify to reasonable industry standards.")
def _generate_keyword_explanation(self, keyword: str, category: str) -> str:
"""
Generate explanations for keyword risks
"""
explanations = {"non-compete" : "Restrictive covenant limiting future employment opportunities.",
"unlimited liability" : "No cap on financial exposure - potentially catastrophic risk.",
"sole discretion" : "Unilateral decision-making power without accountability.",
"at-will" : "Termination without cause or protection - high job insecurity."
}
return explanations.get(keyword, f"High-risk term '{keyword}' detected in {category} clause.")
def _generate_keyword_fix(self, keyword: str, category: str) -> str:
"""
Generate fixes for keyword risks
"""
fixes = {"non-compete" : "Limit duration to 12 months maximum and narrow geographic scope.",
"unlimited liability" : "Add mutual liability cap based on contract value.",
"sole discretion" : "Change to 'reasonable discretion' or require 'mutual agreement'.",
"at-will" : "Add 'for cause' definition and reasonable notice period.",
}
return fixes.get(keyword, "Modify to reasonable industry standards.")
def _get_legal_basis(self, issue: str) -> str:
"""
Get legal basis for risk issue
"""
legal_bases = {"non-compete" : "Reasonableness standard for restrictive covenants",
"unlimited liability" : "Unconscionability doctrine",
"sole discretion" : "Doctrine of good faith and fair dealing",
"at-will" : "Employment protection statutes",
"unequal notice" : "Mutuality of obligation principle",
}
return legal_bases.get(issue, "General contract law principles")
def _get_risk_factor_explanation(self, risk_category: str, red_flag: str) -> str:
"""
Get explanation for risk factor red flags
"""
explanations = {"restrictive_covenants": {"entire industry" : "Prohibits working in entire industry, not just direct competitors",
"worldwide" : "Geographic scope is unreasonably broad",
}
}
return explanations.get(risk_category, {}).get(red_flag, "Increases risk exposure")
def _get_risk_factor_fix(self, risk_category: str, red_flag: str) -> str:
"""
Get fix for risk factor issues
"""
fixes = {"restrictive_covenants": {"entire industry" : "Limit to direct competitors only",
"worldwide" : "Narrow to specific geographic regions",
}
}
return fixes.get(risk_category, {}).get(red_flag, "Modify to reasonable standards")
def _get_missing_protection_explanation(self, protection: str) -> str:
"""
Get explanation for missing protections
"""
explanations = {"liability_cap" : "No limit on potential financial damages",
"mutual_indemnification" : "One-sided liability protection",
"prior_ip_exclusion" : "Could claim ownership of your existing work",
}
return explanations.get(protection, "Critical protection missing from contract")
def _get_missing_protection_fix(self, protection: str) -> str:
"""
Get fix for missing protections
"""
fixes = {"liability_cap" : "Add mutual liability cap clause",
"mutual_indemnification" : "Add reciprocal indemnification",
"prior_ip_exclusion" : "Add prior IP exclusion clause",
}
return fixes.get(protection, "Add appropriate protection clause")
def _are_clauses_conflicting(self, clause1: ExtractedClause, clause2: ExtractedClause) -> bool:
"""
Conflict detection between clauses
"""
# Extract key numbers and terms
nums1 = set(re.findall(r'\b\d+\b', clause1.text))
nums2 = set(re.findall(r'\b\d+\b', clause2.text))
# If both have numbers but no overlap, potential conflict
if nums1 and nums2 and not nums1.intersection(nums2):
return True
# Check for contradictory language
contradictions = [("shall", "shall not"),
("must", "may not"),
("required", "prohibited"),
]
for positive, negative in contradictions:
if (positive in clause1.text.lower() and negative in clause2.text.lower()) or (positive in clause2.text.lower() and negative in clause1.text.lower()):
return True
return False
def _deduplicate_and_prioritize(self, terms: List[UnfavorableTerm]) -> List[UnfavorableTerm]:
"""
Remove duplicates and sort by risk score
"""
seen = set()
unique_terms = list()
for term in terms:
# Create unique key based on term, category, and specific text
key = (term.term, term.category, term.specific_text)
if key not in seen:
seen.add(key)
unique_terms.append(term)
# Sort by risk score (descending)
unique_terms.sort(key = lambda t: t.risk_score, reverse = True)
# Return top 25 most critical terms
return unique_terms[:25]
def get_severity_distribution(self, terms: List[UnfavorableTerm]) -> Dict[str, int]:
"""
Get distribution by severity
"""
distribution = {"critical" : 0,
"high" : 0,
"medium" : 0,
"low" : 0,
}
for term in terms:
distribution[term.severity] = distribution.get(term.severity, 0) + 1
log_info("Unfavorable terms severity distribution", **distribution)
return distribution
def get_category_distribution(self, terms: List[UnfavorableTerm]) -> Dict[str, int]:
"""
Get distribution by category
"""
categories = [t.category for t in terms]
distribution = dict(Counter(categories))
log_info("Unfavorable terms category distribution", **distribution)
return distribution
|