Spaces:
Running
Running
File size: 28,229 Bytes
bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 1099afe bdedf43 522f7a0 1099afe bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 1099afe bdedf43 1099afe bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 1099afe bdedf43 1099afe 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 1099afe 522f7a0 bdedf43 1099afe bdedf43 522f7a0 bdedf43 522f7a0 1099afe bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 1099afe 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 1099afe bdedf43 522f7a0 1099afe 522f7a0 1099afe 522f7a0 bdedf43 522f7a0 1099afe 522f7a0 1099afe 522f7a0 bdedf43 1099afe bdedf43 522f7a0 bdedf43 522f7a0 1099afe 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 bdedf43 522f7a0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 |
# DEPENDENCIES
import re
import sys
from typing import List
from typing import Dict
from typing import Tuple
from pathlib import Path
from typing import Optional
# Add parent directory to path for imports
sys.path.append(str(Path(__file__).parent.parent))
from utils.logger import log_info
from utils.logger import log_error
from config.risk_rules import RiskRules
from config.risk_rules import ContractType
from utils.logger import ContractAnalyzerLogger
from services.data_models import ExtractedClause
from services.data_models import MissingProtection
class ProtectionChecker:
"""
Check for missing critical protections in contracts using RiskRules framework
"""
def __init__(self, contract_type: ContractType = ContractType.GENERAL):
"""
Initialize protection checker with contract-type specific analysis
Arguments:
----------
contract_type { ContractType } : Contract type for protection prioritization
"""
self.contract_type = contract_type
self.rules = RiskRules()
self.logger = ContractAnalyzerLogger.get_logger()
# Contract-type specific protection priorities
self.protection_priorities = self._get_contract_type_priorities()
log_info("ProtectionChecker initialized",
contract_type = self.contract_type.value,
protection_count = len(self.rules.PROTECTION_CHECKLIST),
)
def _get_contract_type_priorities(self) -> Dict[str, List[str]]:
"""
Get protection priorities by contract type
"""
priorities = {ContractType.EMPLOYMENT.value : ['for_cause_definition', 'severance_provision', 'prior_ip_exclusion', 'confidentiality_duration'],
ContractType.SOFTWARE.value : ['liability_cap', 'prior_ip_exclusion', 'mutual_indemnification', 'dispute_resolution'],
ContractType.CONSULTING.value : ['liability_cap', 'mutual_indemnification', 'payment_terms', 'change_control_process'],
ContractType.NDA.value : ['confidentiality_duration', 'prior_ip_exclusion', 'dispute_resolution'],
ContractType.LEASE.value : ['dispute_resolution', 'change_control_process', 'insurance_requirements'],
ContractType.PURCHASE.value : ['liability_cap', 'warranty_protection', 'dispute_resolution'],
ContractType.GENERAL.value : ['liability_cap', 'mutual_indemnification', 'dispute_resolution'],
}
return priorities.get(self.contract_type.value, [])
@ContractAnalyzerLogger.log_execution_time("check_missing_protections")
def check_missing_protections(self, contract_text: str, clauses: List[ExtractedClause], contract_type: Optional[ContractType] = None) -> List[MissingProtection]:
"""
Identify all missing protections using comprehensive RiskRules framework
Arguments:
----------
contract_text { str } : Full contract text
clauses { list } : Extracted clauses
contract_type { ContractType } : Override contract type
Returns:
--------
{ list } : List of MissingProtection objects
"""
# Update contract type if provided
if contract_type:
self.contract_type = contract_type
self.protection_priorities = self._get_contract_type_priorities()
log_info("Starting missing protections analysis",
text_length = len(contract_text),
num_clauses = len(clauses),
contract_type = self.contract_type.value,
)
missing = list()
text_lower = contract_text.lower()
# Check each protection in RiskRules PROTECTION_CHECKLIST
for protection_id, config in self.rules.PROTECTION_CHECKLIST.items():
is_present, found_in_clauses = self._check_protection_comprehensive(protection_id = protection_id,
text_lower = text_lower,
clauses = clauses,
)
if not is_present:
missing_protection = self._create_missing_protection(protection_id = protection_id,
config = config,
found_in_clauses = found_in_clauses,
)
missing.append(missing_protection)
# Prioritize by contract type and risk score
final_missing = self._prioritize_missing_protections(missing_protections = missing)
log_info("Missing protections analysis complete",
total_missing = len(final_missing),
critical = sum(1 for p in final_missing if (p.importance == "critical")),
high = sum(1 for p in final_missing if (p.importance == "high")),
)
return final_missing
def _check_protection_comprehensive(self, protection_id: str, text_lower: str, clauses: List[ExtractedClause]) -> Tuple[bool, List[str]]:
"""
Comprehensive protection detection using multiple methods
Returns:
--------
{ tuple } : (is_present, list of clause references where protection was found)
"""
found_in_clauses = list()
# Enhanced protection patterns with regex for better matching
protection_patterns = self._get_protection_patterns(protection_id = protection_id)
# Check in full text with regex patterns
for pattern in protection_patterns:
if re.search(pattern, text_lower, re.IGNORECASE):
return True, found_in_clauses
# Check in relevant clauses with context awareness
relevant_categories = self.rules.PROTECTION_CHECKLIST[protection_id]["categories"]
relevant_clauses = [c for c in clauses if c.category in relevant_categories]
for clause in relevant_clauses:
clause_text_lower = clause.text.lower()
for pattern in protection_patterns:
if re.search(pattern, clause_text_lower, re.IGNORECASE):
found_in_clauses.append(clause.reference)
return True, found_in_clauses
# Additional semantic checks for complex protections
if self._check_protection_semantic(protection_id=protection_id, text_lower=text_lower, clauses=clauses):
return True, found_in_clauses
return False, found_in_clauses
def _get_protection_patterns(self, protection_id: str) -> List[str]:
"""
Get comprehensive regex patterns for each protection
"""
patterns = {"for_cause_definition" : [r'for\s+cause\s+means', r'cause\s+defined\s+as', r'grounds?\s+for\s+termination', r'termination\s+for\s+cause', r'just\s+cause\s+definition',],
"severance_provision" : [r'severance\s+(pay|compensation|benefits)', r'separation\s+(pay|package|compensation)', r'termination\s+(pay|benefits)', r'upon\s+termination.*pay', r'severance.*equal\s+to',],
"mutual_indemnification" : [r'mutual\s+indemnification', r'each\s+party\s+shall\s+indemnify', r'both\s+parties\s+indemnify', r'reciprocal\s+indemnification', r'indemnification.*mutual',],
"liability_cap" : [r'liability.*cap', r'maximum\s+liability', r'limited\s+to.*\$?\d+', r'not\s+exceed.*\$?\d+', r'liability\s+shall\s+not\s+exceed', r'cap.*liability',],
"prior_ip_exclusion" : [r'prior\s+intellectual\s+property', r'existing\s+ip', r'background\s+ip', r'pre-existing', r'prior\s+inventions', r'personal\s+projects',],
"confidentiality_duration" : [r'confidentiality.*period\s+of', r'for\s+\d+\s+years\s+from', r'confidentiality.*expire', r'confidentiality.*term', r'duration.*confidentiality',],
"dispute_resolution" : [r'arbitration', r'mediation', r'dispute\s+resolution', r'resolution\s+of\s+disputes', r'alternative\s+dispute', r'adr',],
"change_control_process" : [r'change\s+order', r'change\s+request', r'amendment.*writing', r'modification.*writing', r'written\s+consent', r'change\s+control',],
"insurance_requirements" : [r'insurance\s+requirements', r'maintain\s+insurance', r'proof\s+of\s+insurance', r'coverage.*\$?\d+', r'liability\s+insurance',],
"force_majeure" : [ r'force\s+majeure', r'act\s+of\s+god', r'unforeseeable', r'beyond\s+control', r'natural\s+disaster',],
}
return patterns.get(protection_id, [rf'\b{protection_id}\b'])
def _check_protection_semantic(self, protection_id: str, text_lower: str, clauses: List[ExtractedClause]) -> bool:
"""
Semantic checks for complex protections that need context understanding
"""
if (protection_id == "mutual_indemnification"):
# Check if there's any indemnification that's not mutual
has_indemnification = bool(re.search(r'indemnif', text_lower))
has_mutual_language = bool(re.search(r'mutual|each party|both parties', text_lower))
return has_indemnification and has_mutual_language
elif (protection_id == "liability_cap"):
# Check if there's liability language but no cap
has_liability = bool(re.search(r'liability|liable', text_lower))
has_cap = bool(re.search(r'cap|limit|maximum|not exceed', text_lower))
return has_liability and has_cap
elif (protection_id == "prior_ip_exclusion"):
# Check if there's IP assignment but no exclusion
has_ip_assignment = bool(re.search(r'intellectual property|work product|inventions', text_lower))
has_exclusion = bool(re.search(r'prior|existing|background|exclude', text_lower))
return has_ip_assignment and has_exclusion
return False
def _create_missing_protection(self, protection_id: str, config: Dict, found_in_clauses: List[str]) -> MissingProtection:
"""
Create comprehensive MissingProtection object
"""
# Use centralized map for display name
protection_name = self.rules.get_protection_display_name(protection_id)
return MissingProtection(protection_id = protection_id,
protection = protection_name,
importance = config["importance"],
risk_score = config["risk_if_missing"],
explanation = self._get_comprehensive_explanation(protection_id = protection_id),
recommendation = self._get_detailed_recommendation(protection_id = protection_id),
categories = config["categories"],
contract_type = self.contract_type.value,
suggested_language = self._get_suggested_language(protection_id = protection_id),
legal_basis = self._get_legal_basis(protection_id = protection_id),
affected_clauses = found_in_clauses,
)
def _get_comprehensive_explanation(self, protection_id: str) -> str:
"""
Get detailed explanation for why this protection matters
"""
explanations = {"for_cause_definition" : ("Without a clear 'for cause' definition, termination grounds remain ambiguous and subject to interpretation abuse. "
"This creates significant job insecurity and potential for arbitrary termination without proper recourse."
),
"severance_provision" : ("Missing severance provision means zero financial protection if terminated without cause. "
"Industry standards provide 2-3 months salary to support transition and mitigate sudden income loss."
),
"mutual_indemnification" : ("One-sided indemnification creates asymmetric liability exposure. Mutual protection ensures both parties share "
"responsibility for their respective breaches, negligence, or misconduct."
),
"liability_cap" : ("Unlimited liability exposes you to catastrophic financial risk beyond reasonable business expectations. "
"Standard practice caps liability at fees paid or a reasonable multiple of contract value."
),
"prior_ip_exclusion" : ("Without prior IP exclusion, your existing intellectual property and personal projects could be claimed by the other party. "
"This protection preserves ownership of work created before and outside this engagement."
),
"confidentiality_duration" : ("Indefinite confidentiality obligations unreasonably restrict future business activities indefinitely. "
"Industry standards limit confidentiality to 3-5 years post-termination for most information."
),
"dispute_resolution" : ("Without formal dispute resolution, conflicts escalate directly to costly litigation. Mediation and arbitration "
"provide efficient, cost-effective alternatives with specialized expertise."
),
"change_control_process" : ("Lack of change control enables scope creep and verbal modifications that create ambiguity. Formal processes "
"ensure all changes are documented, approved, and properly scoped."
),
"insurance_requirements" : ("Missing insurance requirements leave you exposed to uncovered liabilities. "
"Proper coverage transfers risk and provides financial protection for both parties."
),
"force_majeure" : ("Without force majeure protection, you remain liable for performance during unforeseeable events beyond control. "
"This clause provides reasonable relief during extraordinary circumstances."
),
}
return explanations.get(protection_id, "This protection is critical for balanced risk allocation and legal fairness.")
def _get_detailed_recommendation(self, protection_id: str) -> str:
"""
Get detailed recommendation for adding this protection
"""
recommendations = {"for_cause_definition" : ("Add clear 'For Cause' definition including: gross negligence, willful misconduct, material breach after "
"30-day cure period, conviction of felony, or fraud. Require written notice specifying grounds."
),
"severance_provision" : ("Include severance equal to 2-3 months base salary for termination without cause, payable within 30 days. "
"Add pro-rated bonus calculation and continuation of benefits during severance period."
),
"mutual_indemnification" : ("Replace one-sided language with: 'Each party shall indemnify, defend, and hold harmless the other party "
"from claims arising from their respective breach, negligence, or willful misconduct.'"
),
"liability_cap" : ("Add: 'Total liability of either party under this Agreement shall not exceed the greater of (a) fees paid "
"in the 12 months preceding the claim, or (b) $[reasonable amount]. Exclude liability for indirect damages.'"
),
"prior_ip_exclusion" : ("Include: 'Work Product excludes Employee's prior intellectual property, existing inventions, personal projects "
"unrelated to Company business, and open source contributions. Attach prior IP list as Exhibit A.'"
),
"confidentiality_duration" : ("Specify: 'Confidentiality obligations shall survive termination for 3-5 years. Trade secrets protected "
"indefinitely but must be specifically identified. Publicly available information excluded.'"
),
"dispute_resolution" : ("Add: 'Disputes shall first be subject to 30-day good faith mediation. If unresolved, binding arbitration "
"under [rules] in [neutral location]. Each party bears own costs, arbitrator may award fees to prevailing party.'"
),
"change_control_process" : ("Include: 'All amendments require written change orders signed by both parties. Change orders must specify "
"scope, timeline, cost, and acceptance criteria. Verbal agreements are not binding.'"
),
"insurance_requirements" : ("Specify: 'Contractor shall maintain general liability insurance of $1M per occurrence, professional liability "
"insurance of $2M, and workers' compensation. Provide certificates of insurance before commencement.'"
),
"force_majeure" : ("Add: 'Neither party liable for failure to perform due to causes beyond reasonable control including acts of God, "
"war, strikes, or natural disasters. Performance suspended during event, resume when practicable.'"
),
}
return recommendations.get(protection_id, "Negotiate to include this standard protection for balanced risk allocation.")
def _get_suggested_language(self, protection_id: str) -> str:
"""
Get actual suggested clause language
"""
language_library = {"for_cause_definition" : ("\"For Cause\" means: (a) gross negligence or willful misconduct; (b) material breach of this Agreement after 30-day written notice and cure period; (c) conviction of a felony; or (d) fraud, dishonesty, or embezzlement."),
"severance_provision" : ("Upon termination without cause, Company shall pay Employee severance equal to three months of base salary, payable within 30 days of termination. Employee shall also receive pro-rated annual bonus and continuation of health benefits during severance period."),
"mutual_indemnification" : ("Each party shall indemnify, defend, and hold harmless the other party from and against any and all claims, damages, losses, and expenses arising from the indemnifying party's breach of this Agreement, negligence, or willful misconduct."),
"liability_cap" : ("Notwithstanding anything to the contrary, the total liability of either party under this Agreement shall not exceed the greater of (a) the fees paid by Customer to Provider in the twelve months preceding the claim, or (b) $500,000. Neither party shall be liable for any indirect, special, incidental, or consequential damages."),
"prior_ip_exclusion" : ("Work Product excludes any intellectual property, inventions, or creative works developed by Employee prior to this Agreement or developed outside the scope of employment without using Company resources. Employee has listed prior IP in Exhibit A. Background IP remains the property of its respective owner."),
"confidentiality_duration" : ("The obligations of confidentiality shall survive termination of this Agreement for a period of five years. Trade secrets shall be protected indefinitely. Confidential Information shall not include information that is or becomes publicly available through no fault of Receiving Party."),
"dispute_resolution" : ("Any dispute arising under this Agreement shall first be submitted to mediation with a mutually acceptable mediator. If mediation fails after 30 days, either party may initiate binding arbitration under the rules of the American Arbitration Association. The prevailing party in any dispute shall be entitled to recover reasonable attorneys' fees and costs."),
"change_control_process" : ("No amendment, modification, or waiver of any provision of this Agreement shall be effective unless in writing and signed by both parties. All change requests must be submitted in writing as Change Orders, specifying the changes, associated costs, timeline impacts, and acceptance criteria."),
"insurance_requirements" : ("Contractor shall maintain at its own expense: (a) Commercial General Liability insurance with limits of $1,000,000 per occurrence; (b) Professional Liability insurance with limits of $2,000,000 per claim; and (c) Workers' Compensation insurance as required by law. Certificates of insurance shall be provided to Client upon request."),
"force_majeure" : ("Neither party shall be liable for any failure or delay in performance under this Agreement due to causes beyond its reasonable control, including acts of God, war, terrorism, labor disputes, or governmental actions. The affected party shall notify the other party promptly and resume performance as soon as practicable."),
}
return language_library.get(protection_id, "Standard protection clause appropriate for this contract type.")
def _get_legal_basis(self, protection_id: str) -> str:
"""
Get legal basis for why this protection is important
"""
legal_bases = {"for_cause_definition" : "Employment protection statutes and doctrine of good faith and fair dealing",
"severance_provision" : "Industry standards and reasonable notice requirements",
"mutual_indemnification" : "Principle of mutuality and unconscionability doctrine",
"liability_cap" : "Commercial reasonableness and risk allocation principles",
"prior_ip_exclusion" : "Intellectual property rights and prior ownership protection",
"confidentiality_duration" : "Reasonableness standard for restrictive covenants",
"dispute_resolution" : "Efficient dispute resolution and access to justice",
"change_control_process" : "Contract formation and modification requirements",
"insurance_requirements" : "Risk management and liability transfer principles",
"force_majeure" : "Impossibility of performance and commercial impracticability",
}
return legal_bases.get(protection_id, "Standard contractual protection for balanced risk allocation")
def _prioritize_missing_protections(self, missing_protections: List[MissingProtection]) -> List[MissingProtection]:
"""
Prioritize missing protections by contract type and risk score
"""
if not missing_protections:
return []
# Sort by risk score (descending)
missing_protections.sort(key = lambda p: p.risk_score, reverse = True)
# Boost priority for contract-type specific critical protections
for protection in missing_protections:
# Use the protection_id for the check
if protection.protection_id in self.protection_priorities:
# Boost for contract relevance
protection.risk_score += 10
# Re-sort with boosted scores
missing_protections.sort(key = lambda p: p.risk_score, reverse = True)
# Return top 15 most critical missing protections
top_missing_protections = missing_protections[:15]
return top_missing_protections
def get_critical_missing(self, protections: List[MissingProtection]) -> List[MissingProtection]:
"""
Filter to only critical missing protections
"""
critical = [p for p in protections if (p.importance == "critical")]
log_info(f"Found {len(critical)} critical missing protections")
return critical
def get_by_category(self, protections: List[MissingProtection], category: str) -> List[MissingProtection]:
"""
Filter protections by category
"""
filtered = [p for p in protections if category in p.categories]
log_info(f"Found {len(filtered)} missing protections in category '{category}'")
return filtered
def get_importance_distribution(self, protections: List[MissingProtection]) -> Dict[str, int]:
"""
Get distribution by importance level
"""
distribution = {"critical" : 0,
"high" : 0,
"medium" : 0,
"low" : 0,
}
for protection in protections:
distribution[protection.importance] = distribution.get(protection.importance, 0) + 1
log_info("Missing protections importance distribution", **distribution)
return distribution
def get_risk_score_summary(self, protections: List[MissingProtection]) -> Dict[str, float]:
"""
Get risk score summary statistics
"""
if not protections:
return {"total_risk" : 0,
"average_risk" : 0,
"max_risk" : 0,
}
scores = [p.risk_score for p in protections]
total_risk = sum(scores)
average_risk = total_risk / len(scores)
max_risk = max(scores)
summary = {"total_risk" : round(total_risk, 2),
"average_risk" : round(average_risk, 2),
"max_risk" : round(max_risk, 2),
}
log_info("Missing protections risk score summary", **summary)
return summary
|