contract-guard-ai / services /term_analyzer.py
satyakimitra's picture
Final Repor Updated
522f7a0
# DEPENDENCIES
import re
import sys
from typing import List
from typing import Dict
from typing import Tuple
from pathlib import Path
from typing import Optional
from collections import Counter
# Add parent directory to path for imports
sys.path.append(str(Path(__file__).parent.parent))
from utils.logger import log_info
from utils.logger import log_error
from config.risk_rules import RiskRules
from config.risk_rules import ContractType
from utils.logger import ContractAnalyzerLogger
from services.data_models import ExtractedClause
from services.data_models import UnfavorableTerm
class TermAnalyzer:
"""
Detect unfavorable and one-sided terms in contracts using RiskRules framework and integrated with comprehensive risk analysis system
"""
def __init__(self, contract_type: ContractType = ContractType.GENERAL):
    """
    Set up the analyzer for a given contract type

    Arguments:
    ----------
        contract_type { ContractType } : Contract type handed to RiskRules.get_adjusted_weights to obtain the category weights
    """
    rules = RiskRules()
    logger = ContractAnalyzerLogger.get_logger()

    # Category weights as adjusted by RiskRules for the selected contract type
    weights = rules.get_adjusted_weights(contract_type)

    self.contract_type = contract_type
    self.risk_rules = rules
    self.logger = logger
    self.category_weights = weights

    log_info("TermAnalyzer initialized",
             contract_type=contract_type.value,
             category_weights=weights,
             )
def _map_to_risk_category(self, clause_category: str) -> str:
"""
Map clause category to risk category for proper risk scoring for ensureing unfavorable terms are correctly attributed to risk categories
for score calculation
"""
# Clause categories → Risk categories
mapping = {"non_compete" : "restrictive_covenants",
"confidentiality" : "restrictive_covenants",
"termination" : "termination_rights",
"indemnification" : "liability_indemnity",
"liability" : "penalties_liability",
"compensation" : "compensation_benefits",
"intellectual_property" : "intellectual_property",
"warranty" : "warranties",
"dispute_resolution" : "dispute_resolution",
"assignment" : "assignment_change",
"amendment" : "assignment_change",
"insurance" : "insurance",
"force_majeure" : "force_majeure",
"general" : "general",
"payment" : "payment_terms",
"governing_law" : "governing_law",
}
risk_category_by_clause_category = mapping.get(clause_category, clause_category)
return risk_category_by_clause_category
@ContractAnalyzerLogger.log_execution_time("analyze_unfavorable_terms")
def analyze_unfavorable_terms(self, contract_text: str, clauses: List[ExtractedClause], contract_type: Optional[ContractType] = None) -> List[UnfavorableTerm]:
    """
    Run every unfavorable-term check and return the merged, de-duplicated result

    Arguments:
    ----------
        contract_text { str }          : Full contract text

        clauses       { list }         : Extracted clauses

        contract_type { ContractType } : Optional override; when given it replaces the analyzer's contract type and
                                         category weights (the change persists on the instance)

    Returns:
    --------
        { list } : UnfavorableTerm objects as returned by _deduplicate_and_prioritize
    """
    if contract_type:
        self.contract_type = contract_type
        self.category_weights = self.risk_rules.get_adjusted_weights(contract_type)

    log_info("Starting unfavorable terms analysis",
             text_length=len(contract_text),
             num_clauses=len(clauses),
             contract_type=self.contract_type.value,
             )

    # Step 1: per-clause checks driven by RiskRules
    findings = [finding
                for clause in clauses
                for finding in self._analyze_clause_with_risk_rules(clause=clause)]

    # Step 2: issues that only show up when clauses are compared with each other
    findings += self._analyze_cross_clause_issues(text=contract_text, clauses=clauses)

    # Step 3: protections that are absent from the contract
    findings += self._analyze_missing_protections(clauses=clauses)

    # Step 4: comparison against industry benchmarks
    findings += self._analyze_against_benchmarks(clauses=clauses)

    final_terms = self._deduplicate_and_prioritize(terms=findings)

    def count_with_severity(level):
        return sum(1 for item in final_terms if (item.severity == level))

    log_info("Unfavorable terms analysis complete",
             total_found=len(final_terms),
             critical=count_with_severity("critical"),
             high=count_with_severity("high"))

    return final_terms
def _analyze_clause_with_risk_rules(self, clause: ExtractedClause) -> List[UnfavorableTerm]:
"""
Analyze clause using comprehensive RiskRules framework
"""
terms = list()
text_lower = clause.text.lower()
# Map clause category to risk category for consistency
risk_category = self._map_to_risk_category(clause_category = clause.category)
# Risky Patterns Analysis from RiskRules
for pattern, risk_score, description in self.risk_rules.RISKY_PATTERNS:
matches = re.finditer(pattern, text_lower, re.IGNORECASE)
for match in matches:
severity = self._score_to_severity(risk_score)
terms.append(UnfavorableTerm(term = description,
category = risk_category,
severity = severity,
explanation = self._generate_pattern_explanation(description, match.group()),
risk_score = risk_score,
clause_reference = clause.reference,
suggested_fix = self._generate_pattern_fix(description, clause.category),
contract_type = self.contract_type.value,
specific_text = match.group(),
legal_basis = self._get_legal_basis(description),
)
)
# Critical Keyword Analysis from RiskRules
for keyword, risk_score in self.risk_rules.CRITICAL_KEYWORDS.items():
if re.search(rf'\b{re.escape(keyword)}\b', text_lower):
severity = self._score_to_severity(risk_score)
terms.append(UnfavorableTerm(term = f"Critical Risk: {keyword.title()}",
category = risk_category,
severity = severity,
explanation = self._generate_keyword_explanation(keyword, clause.category),
risk_score = risk_score,
clause_reference = clause.reference,
suggested_fix = self._generate_keyword_fix(keyword, clause.category),
contract_type = self.contract_type.value,
specific_text = keyword,
legal_basis = self._get_legal_basis(keyword),
)
)
# High Risk Keyword Analysis
for keyword, risk_score in self.risk_rules.HIGH_RISK_KEYWORDS.items():
if re.search(rf'\b{re.escape(keyword)}\b', text_lower):
severity = self._score_to_severity(risk_score)
terms.append(UnfavorableTerm(term = f"High Risk: {keyword.title()}",
category = risk_category,
severity = severity,
explanation = self._generate_keyword_explanation(keyword, clause.category),
risk_score = risk_score,
clause_reference = clause.reference,
suggested_fix = self._generate_keyword_fix(keyword, clause.category),
contract_type = self.contract_type.value,
specific_text = keyword,
legal_basis = self._get_legal_basis(keyword),
)
)
# Clause-specific Risk Factors From RiskRules.CLAUSE_RISK_FACTORS
clause_risk_analysis = self._analyze_clause_risk_factors(clause)
terms.extend(clause_risk_analysis)
return terms
def _analyze_clause_risk_factors(self, clause: ExtractedClause) -> List[UnfavorableTerm]:
"""
Analyze clause using CLAUSE_RISK_FACTORS from RiskRules
"""
terms = list()
# Map clause categories to risk factors
category_mapping = {'non_compete' : 'restrictive_covenants',
'termination' : 'termination_rights',
'indemnification' : 'liability_indemnity',
'compensation' : 'compensation_benefits',
'intellectual_property' : 'intellectual_property',
'confidentiality' : 'confidentiality',
'liability' : 'penalties_liability',
'warranty' : 'warranties',
'dispute_resolution' : 'dispute_resolution',
'assignment' : 'assignment_change',
'insurance' : 'insurance',
'force_majeure' : 'force_majeure',
}
risk_factors_key = category_mapping.get(clause.category)
if not risk_factors_key or risk_factors_key not in self.risk_rules.CLAUSE_RISK_FACTORS:
return terms
risk_factors = self.risk_rules.CLAUSE_RISK_FACTORS[risk_factors_key]
text_lower = clause.text.lower()
# Map clause category to risk category for consistency
risk_category = self._map_to_risk_category(clause_category = clause.category)
# Check for red flags in this clause
for red_flag, risk_adjustment in risk_factors["red_flags"].items():
if (red_flag in text_lower):
base_risk = risk_factors["base_risk"]
total_risk = base_risk + risk_adjustment
severity = self._score_to_severity(total_risk)
terms.append(UnfavorableTerm(term = f"Risk Factor: {red_flag.replace('_', ' ').title()}",
category = risk_category,
severity = severity,
explanation = f"Base risk {base_risk} + {risk_adjustment} for '{red_flag}'. {self._get_risk_factor_explanation(risk_factors_key, red_flag)}",
risk_score = total_risk,
clause_reference = clause.reference,
suggested_fix = self._get_risk_factor_fix(risk_factors_key, red_flag),
contract_type = self.contract_type.value,
specific_text = red_flag,
legal_basis = self._get_legal_basis(red_flag)
)
)
return terms
def _analyze_cross_clause_issues(self, text: str, clauses: List[ExtractedClause]) -> List[UnfavorableTerm]:
"""
Detect systemic issues spanning multiple clauses
"""
terms = list()
# Notice period imbalance (from your original but enhanced)
notice_imbalance = self._check_notice_imbalance(clauses = clauses)
if notice_imbalance:
# Ensure the category used is a risk category
notice_imbalance.category = self._map_to_risk_category(clause_category = "termination")
terms.append(notice_imbalance)
# Missing reciprocal provisions
missing_reciprocal = self._check_missing_reciprocal(text = text,
clauses = clauses,
)
for item in missing_reciprocal:
# Ensure the category used is a risk category
item.category = self._map_to_risk_category(clause_category = "indemnification")
terms.extend(missing_reciprocal)
# Conflicting clauses
conflicts = self._check_conflicting_clauses(clauses = clauses)
for item in conflicts:
# Ensure the category used is a risk category
item.category = self._map_to_risk_category(clause_category = item.category)
terms.extend(conflicts)
# One-sided discretionary powers
one_sided_powers = self._check_one_sided_discretion(clauses = clauses)
for item in one_sided_powers:
# Ensure the category used is a risk category
item.category = self._map_to_risk_category(clause_category = item.category)
terms.extend(one_sided_powers)
return terms
def _analyze_missing_protections(self, clauses: List[ExtractedClause]) -> List[UnfavorableTerm]:
"""
Analyze missing critical protections using PROTECTION_CHECKLIST
"""
terms = list()
for protection, config in self.risk_rules.PROTECTION_CHECKLIST.items():
if not self._has_protection(clauses, protection, config['categories']):
# For missing protections, map the first associated category to a risk category
# This assumes config['categories'][0] is a clause category like "termination"
risk_category = self._map_to_risk_category(clause_category = config['categories'][0]) if config['categories'] else "general"
terms.append(UnfavorableTerm(term = f"Missing Protection: {protection.replace('_', ' ').title()}",
category = risk_category,
severity = self._score_to_severity(config['risk_if_missing']),
explanation = f"Missing critical protection: {protection}. {self._get_missing_protection_explanation(protection)}",
risk_score = config['risk_if_missing'],
suggested_fix = self._get_missing_protection_fix(protection),
contract_type = self.contract_type.value,
legal_basis = f"Standard protection in {self.contract_type.value} contracts",
)
)
return terms
def _analyze_against_benchmarks(self, clauses: List[ExtractedClause]) -> List[UnfavorableTerm]:
"""
Compare terms against industry benchmarks
"""
terms = list()
for clause in clauses:
benchmark_issues = self._check_benchmark_compliance(clause = clause)
for item in benchmark_issues:
# Ensure the category used is a risk category
item.category = self._map_to_risk_category(clause_category = clause.category)
terms.extend(benchmark_issues)
return terms
def _check_notice_imbalance(self, clauses: List[ExtractedClause]) -> Optional[UnfavorableTerm]:
"""
Enhanced notice period imbalance detection
"""
term_clauses = [c for c in clauses if (c.category == "termination")]
if not term_clauses:
return None
text = " ".join([c.text for c in term_clauses])
# Pattern matching for notice periods
notice_patterns = [r'(\d+)\s*days?\s*notice',
r'notice\s*of\s*(\d+)\s*days',
r'(\d+)\s*days?\s*prior\s*notice',
r'written\s*notice\s*of\s*(\d+)\s*days',
]
all_periods = list()
for pattern in notice_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
all_periods.extend([int(m) for m in matches])
if (len(all_periods) >= 2):
min_period = min(all_periods)
max_period = max(all_periods)
ratio = max_period / min_period
if (ratio >= 2):
severity = "critical" if (ratio >= 3) else "high"
risk_score = 80 if (ratio >= 3) else 60
# Use the risk category mapping for termination
risk_category = self._map_to_risk_category(clause_category = "termination")
return UnfavorableTerm(term = "Imbalanced Notice Periods",
category = risk_category,
severity = severity,
explanation = f"Significant notice period imbalance: {max_period} days vs {min_period} days (ratio: {ratio:.1f}x). Creates unfair burden.",
risk_score = risk_score,
clause_reference = term_clauses[0].reference,
suggested_fix = f"Equalize notice periods to reasonable duration (e.g., 30 days mutual notice).",
contract_type = self.contract_type.value,
benchmark_info = f"Industry standard: Mutual 30-day notice periods",
)
return None
def _check_missing_reciprocal(self, text: str, clauses: List[ExtractedClause]) -> List[UnfavorableTerm]:
"""
Enhanced reciprocal provision analysis
"""
terms = list()
# Check indemnification reciprocity
indem_clauses = [c for c in clauses if (c.category == "indemnification")]
if indem_clauses:
has_one_sided = any(re.search(r'(you|employee|consultant|contractor)\s+shall\s+indemnify', c.text, re.IGNORECASE) for c in indem_clauses)
has_mutual = any("mutual" in c.text.lower() or "both parties" in c.text.lower() or "each party" in c.text.lower() for c in indem_clauses)
if has_one_sided and not has_mutual:
# Use the risk category mapping for indemnification
risk_category = self._map_to_risk_category(clause_category = "indemnification")
terms.append(UnfavorableTerm(term = "Non-Reciprocal Indemnification",
category = risk_category,
severity = "critical",
explanation = "One-sided indemnification creates unlimited liability exposure without reciprocal protection.",
risk_score = 85,
clause_reference = indem_clauses[0].reference,
suggested_fix = "Change to mutual indemnification: 'Each party shall indemnify the other for losses arising from their respective breach or negligence.'",
contract_type = self.contract_type.value,
legal_basis = "Mutuality of obligation principle",
)
)
return terms
def _check_conflicting_clauses(self, clauses: List[ExtractedClause]) -> List[UnfavorableTerm]:
"""
Detect conflicting clauses
"""
terms = list()
# Group clauses by category for conflict analysis
by_category = dict()
for clause in clauses:
# Map the clause category to the risk category for grouping purposes
risk_cat = self._map_to_risk_category(clause_category = clause.category)
if risk_cat not in by_category:
by_category[risk_cat] = []
by_category[risk_cat].append(clause)
# Check for conflicts within each category
for risk_category, category_clauses in by_category.items():
if (len(category_clauses) >= 2):
for i, clause1 in enumerate(category_clauses):
for clause2 in category_clauses[i+1:]:
if (self._are_clauses_conflicting(clause1, clause2)):
terms.append(UnfavorableTerm(term = f"Conflicting {risk_category.title()} Clauses",
category = risk_category,
severity = "high",
explanation = f"Clauses {clause1.reference} and {clause2.reference} contain conflicting terms creating legal ambiguity.",
risk_score = 70,
clause_reference = f"{clause1.reference}, {clause2.reference}",
suggested_fix = "Consolidate into single consistent clause or clarify precedence.",
contract_type = self.contract_type.value,
)
)
return terms
def _check_one_sided_discretion(self, clauses: List[ExtractedClause]) -> List[UnfavorableTerm]:
"""
Check for one-sided discretionary powers
"""
terms = list()
for clause in clauses:
text_lower = clause.text.lower()
# Look for one-sided discretionary language
if re.search(r'(sole|absolute|unfettered|unilateral)\s+(discretion|right|authority)', text_lower):
if not re.search(r'(mutual|both parties|reasonable)\s+(discretion|agreement)', text_lower):
# Use the risk category mapping for the clause's category
risk_category = self._map_to_risk_category(clause_category = clause.category)
terms.append(UnfavorableTerm(term = "One-Sided Discretionary Power",
category = risk_category,
severity = "high",
explanation = "Gives one party unilateral decision-making power without accountability standards.",
risk_score = 75,
clause_reference = clause.reference,
suggested_fix = "Change to 'reasonable discretion' or require 'mutual agreement'.",
contract_type = self.contract_type.value,
legal_basis = "Doctrine of good faith and fair dealing",
)
)
return terms
def _check_benchmark_compliance(self, clause: ExtractedClause) -> List[UnfavorableTerm]:
"""
Check clause against industry benchmarks
"""
terms = list()
# Non-compete duration benchmark
if (clause.category == "non_compete"):
duration_match = re.search(r'(\d+)\s*(month|year)', clause.text.lower())
if duration_match:
duration = int(duration_match.group(1))
unit = duration_match.group(2)
# Convert to months for comparison
total_months = duration * (12 if (unit == "year") else 1)
benchmarks = self.risk_rules.INDUSTRY_BENCHMARKS.get('non_compete_duration', {})
industry_benchmark = benchmarks.get(self.contract_type.value, benchmarks.get('general', {}))
if industry_benchmark:
reasonable = industry_benchmark.get('reasonable', 12)
excessive = industry_benchmark.get('excessive', 24)
if (total_months > excessive):
# Use the risk category mapping for non_compete
risk_category = self._map_to_risk_category(clause_category = clause.category)
terms.append(UnfavorableTerm(term = "Excessive Non-Compete Duration",
category = risk_category,
severity = "critical",
explanation = f"{duration} {unit} non-compete exceeds industry excessive threshold of {excessive} months.",
risk_score = 90,
clause_reference = clause.reference,
suggested_fix = f"Reduce to {reasonable} months maximum.",
contract_type = self.contract_type.value,
benchmark_info = f"Industry standard: {reasonable} months reasonable, {excessive} months excessive",
)
)
return terms
def _has_protection(self, clauses: List[ExtractedClause], protection: str, categories: List[str]) -> bool:
"""
Check if protection exists in clauses
"""
protection_patterns = {'for_cause_definition' : ['for cause', 'cause defined', 'termination for cause', 'just cause'],
'severance_provision' : ['severance', 'severance pay', 'termination benefits', 'separation pay'],
'mutual_indemnification' : ['mutual indemnification', 'both parties indemnify', 'each party shall indemnify'],
'liability_cap' : ['liability cap', 'limited liability', 'maximum liability', 'cap on damages'],
'prior_ip_exclusion' : ['prior inventions', 'pre-existing ip', 'prior intellectual property', 'background ip'],
'confidentiality_duration' : ['confidentiality period', 'duration of confidentiality', 'term of confidentiality'],
'dispute_resolution' : ['dispute resolution', 'arbitration', 'mediation', 'alternative dispute resolution'],
'change_control_process' : ['change control', 'amendment process', 'modification procedure', 'change order'],
'insurance_requirements' : ['insurance requirements', 'maintain insurance', 'proof of insurance'],
'force_majeure' : ['force majeure', 'act of god', 'unforeseeable circumstances'],
}
patterns = protection_patterns.get(protection, [])
relevant_clauses = [c for c in clauses if not categories or c.category in categories]
for clause in relevant_clauses:
text_lower = clause.text.lower()
if any(pattern in text_lower for pattern in patterns):
return True
return False
# HELPER METHODS FOR EXPLANATIONS AND FIXES
def _score_to_severity(self, score: float) -> str:
"""
Convert risk score to severity level
"""
if (score >= 80):
return "critical"
elif (score >= 60):
return "high"
elif (score >= 40):
return "medium"
else:
return "low"
def _generate_pattern_explanation(self, pattern_desc: str, matched_text: str) -> str:
"""
Generate explanation for pattern matches
"""
explanations = {"Long duration restrictive covenant" : f"Overly long restrictive period found: '{matched_text}'. May unreasonably restrict future employment.",
"Overly broad geographic/industry scope" : f"Excessively broad scope: '{matched_text}'. Could prevent working in entire industries or regions.",
"Unequal notice periods" : f"Imbalanced notice requirements: '{matched_text}'. Creates unfair advantage for one party.",
"Unlimited liability exposure" : f"Uncapped liability: '{matched_text}'. Exposes to potentially catastrophic financial risk.",
}
return explanations.get(pattern_desc, f"Risk pattern detected: {pattern_desc}")
def _generate_pattern_fix(self, pattern_desc: str, category: str) -> str:
"""
Generate fix suggestions for patterns
"""
fixes = {"Long duration restrictive covenant" : "Limit to 6-12 months maximum with reasonable geographic scope.",
"Overly broad geographic/industry scope" : "Narrow to specific competitors and reasonable geographic area.",
"Unequal notice periods" : "Equalize notice periods for both parties (e.g., 30 days mutual notice).",
"Unlimited liability exposure" : "Add mutual liability cap (e.g., fees paid in preceding 12 months).",
}
return fixes.get(pattern_desc, "Review and modify to reasonable industry standards.")
def _generate_keyword_explanation(self, keyword: str, category: str) -> str:
"""
Generate explanations for keyword risks
"""
explanations = {"non-compete" : "Restrictive covenant limiting future employment opportunities.",
"unlimited liability" : "No cap on financial exposure - potentially catastrophic risk.",
"sole discretion" : "Unilateral decision-making power without accountability.",
"at-will" : "Termination without cause or protection - high job insecurity."
}
return explanations.get(keyword, f"High-risk term '{keyword}' detected in {category} clause.")
def _generate_keyword_fix(self, keyword: str, category: str) -> str:
"""
Generate fixes for keyword risks
"""
fixes = {"non-compete" : "Limit duration to 12 months maximum and narrow geographic scope.",
"unlimited liability" : "Add mutual liability cap based on contract value.",
"sole discretion" : "Change to 'reasonable discretion' or require 'mutual agreement'.",
"at-will" : "Add 'for cause' definition and reasonable notice period.",
}
return fixes.get(keyword, "Modify to reasonable industry standards.")
def _get_legal_basis(self, issue: str) -> str:
"""
Get legal basis for risk issue
"""
legal_bases = {"non-compete" : "Reasonableness standard for restrictive covenants",
"unlimited liability" : "Unconscionability doctrine",
"sole discretion" : "Doctrine of good faith and fair dealing",
"at-will" : "Employment protection statutes",
"unequal notice" : "Mutuality of obligation principle",
}
return legal_bases.get(issue, "General contract law principles")
def _get_risk_factor_explanation(self, risk_category: str, red_flag: str) -> str:
"""
Get explanation for risk factor red flags
"""
explanations = {"restrictive_covenants": {"entire industry" : "Prohibits working in entire industry, not just direct competitors",
"worldwide" : "Geographic scope is unreasonably broad",
}
}
return explanations.get(risk_category, {}).get(red_flag, "Increases risk exposure")
def _get_risk_factor_fix(self, risk_category: str, red_flag: str) -> str:
"""
Get fix for risk factor issues
"""
fixes = {"restrictive_covenants": {"entire industry" : "Limit to direct competitors only",
"worldwide" : "Narrow to specific geographic regions",
}
}
return fixes.get(risk_category, {}).get(red_flag, "Modify to reasonable standards")
def _get_missing_protection_explanation(self, protection: str) -> str:
"""
Get explanation for missing protections
"""
explanations = {"liability_cap" : "No limit on potential financial damages",
"mutual_indemnification" : "One-sided liability protection",
"prior_ip_exclusion" : "Could claim ownership of your existing work",
}
return explanations.get(protection, "Critical protection missing from contract")
def _get_missing_protection_fix(self, protection: str) -> str:
"""
Get fix for missing protections
"""
fixes = {"liability_cap" : "Add mutual liability cap clause",
"mutual_indemnification" : "Add reciprocal indemnification",
"prior_ip_exclusion" : "Add prior IP exclusion clause",
}
return fixes.get(protection, "Add appropriate protection clause")
def _are_clauses_conflicting(self, clause1: ExtractedClause, clause2: ExtractedClause) -> bool:
"""
Conflict detection between clauses
"""
# Extract key numbers and terms
nums1 = set(re.findall(r'\b\d+\b', clause1.text))
nums2 = set(re.findall(r'\b\d+\b', clause2.text))
# If both have numbers but no overlap, potential conflict
if nums1 and nums2 and not nums1.intersection(nums2):
return True
# Check for contradictory language
contradictions = [("shall", "shall not"),
("must", "may not"),
("required", "prohibited"),
]
for positive, negative in contradictions:
if (positive in clause1.text.lower() and negative in clause2.text.lower()) or (positive in clause2.text.lower() and negative in clause1.text.lower()):
return True
return False
def _deduplicate_and_prioritize(self, terms: List[UnfavorableTerm]) -> List[UnfavorableTerm]:
"""
Remove duplicates and sort by risk score
"""
seen = set()
unique_terms = list()
for term in terms:
# Create unique key based on term, category, and specific text
key = (term.term, term.category, term.specific_text)
if key not in seen:
seen.add(key)
unique_terms.append(term)
# Sort by risk score (descending)
unique_terms.sort(key = lambda t: t.risk_score, reverse = True)
# Return top 25 most critical terms
return unique_terms[:25]
def get_severity_distribution(self, terms: List[UnfavorableTerm]) -> Dict[str, int]:
    """
    Count findings per severity level and log the result

    Arguments:
    ----------
        terms { list } : Findings to tally

    Returns:
    --------
        { dict } : Severity -> count; "critical", "high", "medium" and "low" are always present,
                   any other severity encountered is added as it appears
    """
    # Seed the four standard levels so they are reported even when their count is zero
    tally = dict.fromkeys(("critical", "high", "medium", "low"), 0)

    for finding in terms:
        level = finding.severity
        tally[level] = tally.get(level, 0) + 1

    log_info("Unfavorable terms severity distribution", **tally)

    return tally
def get_category_distribution(self, terms: List[UnfavorableTerm]) -> Dict[str, int]:
    """
    Count findings per category and log the result

    Arguments:
    ----------
        terms { list } : Findings to tally

    Returns:
    --------
        { dict } : Category -> count, keyed in order of first appearance
    """
    tally = dict()

    for finding in terms:
        tally[finding.category] = tally.get(finding.category, 0) + 1

    # The categories themselves become the keyword names of the log call
    log_info("Unfavorable terms category distribution", **tally)

    return tally