File size: 14,455 Bytes
877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 9860c76 877e000 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 |
# models/legal_analysis.py
import re
from .model_loader import load_model
from .logging_config import logger
from typing import Dict, Any, List, Tuple
# Candidate labels handed to the zero-shot classifier, grouped by theme.
_LEGAL_CATEGORIES = (
    # Title and Ownership
    "clear title documentation",
    "title verification documents",
    "ownership transfer documents",
    "inheritance documents",
    "gift deed documents",
    "power of attorney documents",
    # Property Registration
    "property registration documents",
    "sale deed documents",
    "conveyance deed documents",
    "development agreement documents",
    "joint development agreement documents",
    # Tax and Financial
    "property tax records",
    "tax clearance certificates",
    "encumbrance certificates",
    "bank loan documents",
    "mortgage documents",
    # Approvals and Permits
    "building permits",
    "construction approvals",
    "occupation certificates",
    "completion certificates",
    "environmental clearances",
    # Land and Usage
    "land use certificates",
    "zoning certificates",
    "layout approvals",
    "master plan compliance",
    "land conversion documents",
    # Compliance and Legal
    "legal compliance certificates",
    "no objection certificates",
    "fire safety certificates",
    "structural stability certificates",
    "water and electricity compliance",
    # Disputes and Litigation
    "property dispute records",
    "litigation history",
    "court orders",
    "settlement agreements",
    "pending legal cases",
)

# Labels whose scores are summed into the completeness score.
_POSITIVE_LEGAL_CATEGORIES = frozenset((
    "clear title documentation", "property registration documents", "sale deed documents",
    "property tax records", "building permits", "occupation certificates",
    "legal compliance certificates", "no objection certificates",
))

# Substrings counted by the keyword scoring used when the model cannot be loaded.
_FALLBACK_LEGAL_TERMS = (
    'title', 'deed', 'registration', 'tax', 'permit', 'approval',
    'certificate', 'compliance', 'legal',
)

# (minimum completeness score, assessment, confidence), checked top-down.
_ASSESSMENT_BANDS = (
    (80, 'excellent', 0.9),
    (60, 'good', 0.7),
    (40, 'adequate', 0.5),
    (20, 'basic', 0.3),
)


def _legal_fallback_result(assessment, confidence, summary, completeness_score,
                           legal_metrics, reasoning):
    """Build the result dict for paths where no classification was performed."""
    return {
        'assessment': assessment,
        'confidence': confidence,
        'summary': summary,
        'completeness_score': completeness_score,
        'potential_issues': False,
        'legal_metrics': legal_metrics,
        'reasoning': reasoning,
        'top_classifications': [],
        'document_verification': {},
        'compliance_status': {},
        'risk_assessment': {},
    }


def _grade_completeness(completeness_score):
    """Map a 0-100 completeness score to an ``(assessment, confidence)`` pair."""
    for floor, assessment, confidence in _ASSESSMENT_BANDS:
        if completeness_score >= floor:
            return assessment, confidence
    return 'basic', 0.2


def analyze_legal_details(legal_text: str) -> Dict[str, Any]:
    """Analyze the legal details of a property listing.

    The text is classified against a fixed set of legal-document categories
    with the "zero-shot-classification" model and turned into a completeness
    score (0-100), an assessment label and per-area metrics.

    Degraded paths, all of which return the same dict shape:

    * Empty or very short input (fewer than 5 non-blank characters):
      an 'insufficient' result with a score of 5.
    * The model cannot be loaded: a keyword count, 10 points per legal term
      found, capped at 50.
    * The classifier call itself fails: ``simple_legal_analysis`` supplies
      keyword-based scores, which are then processed exactly like classifier
      output.
    * Any other exception: a 'basic' result with a score of 10.

    Args:
        legal_text: Free-form legal description. Non-string values are
            converted with ``str()``.

    Returns:
        Dict with keys 'assessment', 'confidence', 'summary',
        'completeness_score', 'potential_issues', 'legal_metrics',
        'reasoning', 'top_classifications', 'document_verification',
        'compliance_status' and 'risk_assessment'.
    """
    try:
        # Normalise once so every later step works on the same string.
        text = '' if legal_text is None else str(legal_text)

        if not legal_text or len(text.strip()) < 5:
            return _legal_fallback_result(
                assessment='insufficient',
                confidence=0.1,  # Small confidence instead of 0
                summary='No legal details provided',
                completeness_score=5,  # Minimum score instead of 0
                legal_metrics={
                    'text_length': 0,
                    'word_count': 0,
                    'legal_terms_found': 0,
                },
                reasoning='No legal details provided for analysis',
            )

        # Try to load the classifier; fall back to keyword counting.
        try:
            classifier = load_model("zero-shot-classification")
        except Exception as e:
            logger.error(f"Error loading model in legal analysis: {str(e)}")
            lowered = text.lower()
            legal_terms_found = sum(1 for term in _FALLBACK_LEGAL_TERMS if term in lowered)
            return _legal_fallback_result(
                assessment='basic',
                confidence=0.3,  # Basic confidence
                summary=f'Model loading error, using fallback analysis. Found {legal_terms_found} legal terms.',
                # 10 points per legal term, max 50
                completeness_score=min(50, legal_terms_found * 10),
                legal_metrics={
                    'text_length': len(text),
                    'word_count': len(text.split()),
                    'legal_terms_found': legal_terms_found,
                },
                reasoning=f'Model loading error: {str(e)}. Using fallback scoring based on legal terms found.',
            )

        categories = list(_LEGAL_CATEGORIES)

        # Wrap the text in a short prompt; only the first 1000 characters
        # are sent to the classifier below.
        legal_context = (
            "\n"
            "Legal Documentation Analysis:\n"
            f"{text}\n"
            "Please analyze the above legal documentation for:\n"
            "1. Completeness of legal information\n"
            "2. Presence of required documents\n"
            "3. Compliance with regulations\n"
            "4. Potential legal issues\n"
            "5. Risk assessment\n"
        )

        try:
            legal_result = classifier(legal_context[:1000], categories, multi_label=True)
        except Exception as e:
            logger.error(f"Error in legal classification: {str(e)}")
            # Keep going with keyword-based scores instead of returning the
            # raw {"labels", "scores"} dict, which is not this function's
            # result shape.
            legal_result = simple_legal_analysis(text, categories)

        legal_metrics = calculate_legal_metrics(legal_result, categories)

        label_scores = list(zip(legal_result['labels'], legal_result['scores']))

        # Rank explicitly: the keyword fallback may return labels in
        # category order rather than by score. The sort is stable, so
        # already-ranked classifier output is left unchanged.
        ranked = sorted(label_scores, key=lambda pair: pair[1], reverse=True)
        top_classifications = [
            {'classification': label, 'confidence': float(score)}
            for label, score in ranked[:5]
            if score > 0.2  # Lower threshold for legal terms
        ]

        # Completeness: summed scores of the "positive" categories, capped at 100.
        positive_score = sum(
            score for label, score in label_scores
            if label in _POSITIVE_LEGAL_CATEGORIES
        )
        completeness_score = min(100, int(positive_score * 100))

        # Ensure a minimum of 10% for having some legal content.
        if completeness_score < 10 and len(text) > 20:
            completeness_score = 10

        assessment, confidence = _grade_completeness(completeness_score)
        summary = summarize_text(text)

        # NOTE(review): calculate_legal_metrics, as visible in this file, emits
        # per-area averages ('title_and_ownership', ...) and none of
        # 'potential_issues', 'missing_docs', 'risk_level' or 'risk_factors',
        # so those lookups resolve to their defaults. The document and
        # compliance figures fall back to the per-area averages so they are
        # not permanently 0.
        return {
            'assessment': assessment,
            'confidence': confidence,
            'summary': summary,
            'completeness_score': completeness_score,
            'potential_issues': legal_metrics.get('potential_issues', False),
            'legal_metrics': legal_metrics,
            'reasoning': f'Legal analysis completed with {completeness_score}% completeness score.',
            'top_classifications': top_classifications,
            'document_verification': {
                'title_docs': legal_metrics.get(
                    'title_docs', legal_metrics.get('title_and_ownership', 0)),
                'registration_docs': legal_metrics.get(
                    'registration_docs', legal_metrics.get('property_registration', 0)),
                'tax_docs': legal_metrics.get(
                    'tax_docs', legal_metrics.get('tax_and_financial', 0)),
                'approval_docs': legal_metrics.get(
                    'approval_docs', legal_metrics.get('approvals_and_permits', 0)),
            },
            'compliance_status': {
                'overall_compliance': legal_metrics.get(
                    'compliance_score', legal_metrics.get('compliance_and_legal', 0)),
                'missing_documents': legal_metrics.get('missing_docs', []),
            },
            'risk_assessment': {
                'risk_level': legal_metrics.get('risk_level', 'low'),
                'risk_factors': legal_metrics.get('risk_factors', []),
            },
        }
    except Exception as e:
        logger.error(f"Error in legal analysis: {str(e)}")
        # Return a reasonable fallback instead of failing completely.
        return _legal_fallback_result(
            assessment='basic',
            confidence=0.2,
            summary='Legal analysis failed due to technical error',
            completeness_score=10,  # Minimum score instead of 0
            legal_metrics={
                'text_length': len(str(legal_text)) if legal_text else 0,
                'word_count': len(str(legal_text).split()) if legal_text else 0,
                'legal_terms_found': 0,
            },
            reasoning=f'Legal analysis error: {str(e)}. Using fallback scoring.',
        )
def calculate_legal_metrics(legal_result, categories):
    """Condense classifier output into one averaged score per legal area.

    Args:
        legal_result: Dict with parallel 'labels' and 'scores' sequences.
            Anything that is not a dict, or lacks 'scores', yields the
            neutral defaults.
        categories: Accepted for interface compatibility; not used here.

    Returns:
        Dict with the keys 'title_and_ownership', 'property_registration',
        'tax_and_financial', 'approvals_and_permits', 'land_and_usage',
        'compliance_and_legal' and 'disputes_and_litigation'. Each value is
        the mean score of that area's four labels, with labels absent from
        ``legal_result`` counting as 0. If the result is unusable or an
        error occurs, every area is 0.5 except disputes, which is 0.1.
    """
    neutral_metrics = {
        'title_and_ownership': 0.5,
        'property_registration': 0.5,
        'tax_and_financial': 0.5,
        'approvals_and_permits': 0.5,
        'land_and_usage': 0.5,
        'compliance_and_legal': 0.5,
        'disputes_and_litigation': 0.1,
    }
    # Four labels per area; the order fixes both the output key order and
    # the summation order.
    area_labels = (
        ('title_and_ownership', (
            'clear title documentation', 'title verification documents',
            'ownership transfer documents', 'inheritance documents')),
        ('property_registration', (
            'property registration documents', 'sale deed documents',
            'conveyance deed documents', 'development agreement documents')),
        ('tax_and_financial', (
            'property tax records', 'tax clearance certificates',
            'encumbrance certificates', 'bank loan documents')),
        ('approvals_and_permits', (
            'building permits', 'construction approvals',
            'occupation certificates', 'completion certificates')),
        ('land_and_usage', (
            'land use certificates', 'zoning certificates',
            'layout approvals', 'master plan compliance')),
        ('compliance_and_legal', (
            'legal compliance certificates', 'no objection certificates',
            'fire safety certificates', 'structural stability certificates')),
        ('disputes_and_litigation', (
            'property dispute records', 'litigation history',
            'court orders', 'pending legal cases')),
    )
    try:
        usable = isinstance(legal_result, dict) and 'scores' in legal_result
        if not usable:
            return neutral_metrics

        score_by_label = dict(zip(legal_result.get('labels', []),
                                  legal_result.get('scores', [])))

        averaged = {}
        for area, names in area_labels:
            total = sum(score_by_label.get(name, 0) for name in names)
            averaged[area] = total / len(names)
        return averaged
    except Exception as e:
        logger.error(f"Error calculating legal metrics: {str(e)}")
        return neutral_metrics
def simple_legal_analysis(legal_text, categories):
    """Keyword-based stand-in for the zero-shot legal classifier.

    Each category is scored as the fraction of its keywords that occur as
    substrings of the lower-cased text. Categories without a keyword list
    use their own first word as the single keyword. A category that yields
    no keyword at all (an empty string) scores 0.1.

    Args:
        legal_text: Text to scan. ``None`` is treated as empty and other
            non-string values are converted with ``str()``.
        categories: Iterable of category labels to score.

    Returns:
        ``{"labels": [...], "scores": [...]}`` as two new parallel lists,
        ordered by descending score with ties keeping their input order.
        The caller slices the first entries as the "top" classifications,
        so the best matches must come first.
    """
    text_lower = "" if legal_text is None else str(legal_text).lower()

    # Keywords for the categories that have a hand-written list.
    category_keywords = {
        "clear title documentation": ["title", "clear", "documentation", "ownership"],
        "property registration documents": ["registration", "property", "documents", "registered"],
        "property tax records": ["tax", "property", "records", "assessment"],
        "building permits": ["permit", "building", "construction", "approval"],
        "legal compliance certificates": ["compliance", "legal", "certificate", "approved"],
        "property dispute records": ["dispute", "litigation", "court", "case"],
        "legitimate listing": ["real", "genuine", "authentic", "verified"],
    }

    scored = []
    for category in categories:
        keywords = category_keywords.get(category)
        if keywords is None:
            # First word as fallback. Slicing instead of indexing keeps an
            # empty category from raising IndexError.
            keywords = category.lower().split()[:1]
        if keywords:
            hits = sum(1 for keyword in keywords if keyword in text_lower)
            score = min(1.0, hits / len(keywords))
        else:
            score = 0.1
        scored.append((category, score))

    # Stable sort: equal scores keep their input order.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return {
        "labels": [label for label, _ in scored],
        "scores": [score for _, score in scored],
    }
def summarize_text(text):
    """Summarise *text* with the "summarization" model, or extractively on failure.

    Args:
        text: Text to summarise. ``None`` and blank input return an empty
            string without loading the model. Other non-string values are
            converted with ``str()``.

    Returns:
        The model's 'summary_text' when the model call succeeds. If it
        fails, the first two sentences are returned when the text has more
        than three non-empty '.'-separated sentences. Otherwise the text
        itself is returned, cut to 200 characters plus '...' when longer.
    """
    if text is None:
        return ""
    text = str(text)
    if not text.strip():
        # Nothing to summarise; skip the model call.
        return ""

    try:
        summarizer = load_model("summarization")
        if hasattr(summarizer, 'task_type') and summarizer.task_type == "summarization":
            # Object exposing `task_type`: called with the text only
            # (the original comment calls this the fallback summarizer).
            result = summarizer(text)
            if result:
                return result[0]['summary_text']
            # Add the ellipsis only when something was actually cut off.
            return text[:200] + "..." if len(text) > 200 else text
        # Using actual model
        result = summarizer(text, max_length=130, min_length=30, do_sample=False)
        return result[0]['summary_text']
    except Exception as e:
        logger.warning(f"Model generation failed, using static summary: {str(e)}")
        # Simple extractive summarization. Fragments are stripped and empty
        # ones dropped, so the joined result has single spaces and a
        # trailing '.' does not count as an extra sentence.
        sentences = [part.strip() for part in text.split('.') if part.strip()]
        if len(sentences) > 3:
            return '. '.join(sentences[:2]) + '.'
        return text[:200] + '...' if len(text) > 200 else text
|