# models/legal_analysis.py import re from .model_loader import load_model from .logging_config import logger from typing import Dict, Any, List, Tuple def analyze_legal_details(legal_text: str) -> Dict[str, Any]: """Analyze legal details of a property with comprehensive validation.""" try: if not legal_text or len(str(legal_text).strip()) < 5: return { 'assessment': 'insufficient', 'confidence': 0.1, # Small confidence instead of 0 'summary': 'No legal details provided', 'completeness_score': 5, # Minimum score instead of 0 'potential_issues': False, 'legal_metrics': { 'text_length': 0, 'word_count': 0, 'legal_terms_found': 0 }, 'reasoning': 'No legal details provided for analysis', 'top_classifications': [], 'document_verification': {}, 'compliance_status': {}, 'risk_assessment': {} } # Try to load the classifier with fallback try: classifier = load_model("zero-shot-classification") except Exception as e: logger.error(f"Error loading model in legal analysis: {str(e)}") # Provide fallback scoring based on text content legal_text_str = str(legal_text) legal_terms = ['title', 'deed', 'registration', 'tax', 'permit', 'approval', 'certificate', 'compliance', 'legal'] legal_terms_found = sum(1 for term in legal_terms if term in legal_text_str.lower()) fallback_score = min(50, legal_terms_found * 10) # 10 points per legal term, max 50 return { 'assessment': 'basic', 'confidence': 0.3, # Basic confidence 'summary': f'Model loading error, using fallback analysis. Found {legal_terms_found} legal terms.', 'completeness_score': fallback_score, 'potential_issues': False, 'legal_metrics': { 'text_length': len(legal_text_str), 'word_count': len(legal_text_str.split()), 'legal_terms_found': legal_terms_found }, 'reasoning': f'Model loading error: {str(e)}. Using fallback scoring based on legal terms found.', 'top_classifications': [], 'document_verification': {}, 'compliance_status': {}, 'risk_assessment': {} } # Enhanced legal categories with more specific indicators categories = [ # Title and Ownership "clear title documentation", "title verification documents", "ownership transfer documents", "inheritance documents", "gift deed documents", "power of attorney documents", # Property Registration "property registration documents", "sale deed documents", "conveyance deed documents", "development agreement documents", "joint development agreement documents", # Tax and Financial "property tax records", "tax clearance certificates", "encumbrance certificates", "bank loan documents", "mortgage documents", # Approvals and Permits "building permits", "construction approvals", "occupation certificates", "completion certificates", "environmental clearances", # Land and Usage "land use certificates", "zoning certificates", "layout approvals", "master plan compliance", "land conversion documents", # Compliance and Legal "legal compliance certificates", "no objection certificates", "fire safety certificates", "structural stability certificates", "water and electricity compliance", # Disputes and Litigation "property dispute records", "litigation history", "court orders", "settlement agreements", "pending legal cases" ] # Create a more detailed context for analysis legal_context = f""" Legal Documentation Analysis: {legal_text} Please analyze the above legal documentation for: 1. Completeness of legal information 2. Presence of required documents 3. Compliance with regulations 4. Potential legal issues 5. Risk assessment """ # Analyze with the classifier try: legal_result = classifier(legal_context[:1000], categories, multi_label=True) except Exception as e: logger.error(f"Error in legal classification: {str(e)}") # Fallback to simple analysis return simple_legal_analysis(legal_text, categories) # Calculate legal metrics legal_metrics = calculate_legal_metrics(legal_result, categories) # Get top classifications top_classifications = [] for label, score in zip(legal_result['labels'][:5], legal_result['scores'][:5]): if score > 0.2: # Lower threshold for legal terms top_classifications.append({ 'classification': label, 'confidence': float(score) }) # Calculate completeness score positive_categories = [ "clear title documentation", "property registration documents", "sale deed documents", "property tax records", "building permits", "occupation certificates", "legal compliance certificates", "no objection certificates" ] positive_score = sum(score for label, score in zip(legal_result['labels'], legal_result['scores']) if label in positive_categories) completeness_score = min(100, int(positive_score * 100)) # Ensure minimum score for any legal content if completeness_score < 10 and len(legal_text) > 20: completeness_score = 10 # Minimum 10% for having some legal content # Determine assessment if completeness_score >= 80: assessment = 'excellent' confidence = 0.9 elif completeness_score >= 60: assessment = 'good' confidence = 0.7 elif completeness_score >= 40: assessment = 'adequate' confidence = 0.5 elif completeness_score >= 20: assessment = 'basic' confidence = 0.3 else: assessment = 'basic' confidence = 0.2 # Generate summary summary = summarize_text(legal_text) return { 'assessment': assessment, 'confidence': confidence, 'summary': summary, 'completeness_score': completeness_score, 'potential_issues': legal_metrics.get('potential_issues', False), 'legal_metrics': legal_metrics, 'reasoning': f'Legal analysis completed with {completeness_score}% completeness score.', 'top_classifications': top_classifications, 'document_verification': { 'title_docs': legal_metrics.get('title_docs', 0), 'registration_docs': legal_metrics.get('registration_docs', 0), 'tax_docs': legal_metrics.get('tax_docs', 0), 'approval_docs': legal_metrics.get('approval_docs', 0) }, 'compliance_status': { 'overall_compliance': legal_metrics.get('compliance_score', 0), 'missing_documents': legal_metrics.get('missing_docs', []) }, 'risk_assessment': { 'risk_level': legal_metrics.get('risk_level', 'low'), 'risk_factors': legal_metrics.get('risk_factors', []) } } except Exception as e: logger.error(f"Error in legal analysis: {str(e)}") # Return reasonable fallback instead of complete failure return { 'assessment': 'basic', 'confidence': 0.2, 'summary': 'Legal analysis failed due to technical error', 'completeness_score': 10, # Minimum score instead of 0 'potential_issues': False, 'legal_metrics': { 'text_length': len(str(legal_text)) if legal_text else 0, 'word_count': len(str(legal_text).split()) if legal_text else 0, 'legal_terms_found': 0 }, 'reasoning': f'Legal analysis error: {str(e)}. Using fallback scoring.', 'top_classifications': [], 'document_verification': {}, 'compliance_status': {}, 'risk_assessment': {} } def calculate_legal_metrics(legal_result, categories): """Calculate legal metrics from classification results.""" try: if not isinstance(legal_result, dict) or 'scores' not in legal_result: # Return default metrics for fallback return { 'title_and_ownership': 0.5, 'property_registration': 0.5, 'tax_and_financial': 0.5, 'approvals_and_permits': 0.5, 'land_and_usage': 0.5, 'compliance_and_legal': 0.5, 'disputes_and_litigation': 0.1 } scores = legal_result.get('scores', []) labels = legal_result.get('labels', []) # Create a mapping of labels to scores label_scores = dict(zip(labels, scores)) return { 'title_and_ownership': sum(label_scores.get(label, 0) for label in ['clear title documentation', 'title verification documents', 'ownership transfer documents', 'inheritance documents']) / 4, 'property_registration': sum(label_scores.get(label, 0) for label in ['property registration documents', 'sale deed documents', 'conveyance deed documents', 'development agreement documents']) / 4, 'tax_and_financial': sum(label_scores.get(label, 0) for label in ['property tax records', 'tax clearance certificates', 'encumbrance certificates', 'bank loan documents']) / 4, 'approvals_and_permits': sum(label_scores.get(label, 0) for label in ['building permits', 'construction approvals', 'occupation certificates', 'completion certificates']) / 4, 'land_and_usage': sum(label_scores.get(label, 0) for label in ['land use certificates', 'zoning certificates', 'layout approvals', 'master plan compliance']) / 4, 'compliance_and_legal': sum(label_scores.get(label, 0) for label in ['legal compliance certificates', 'no objection certificates', 'fire safety certificates', 'structural stability certificates']) / 4, 'disputes_and_litigation': sum(label_scores.get(label, 0) for label in ['property dispute records', 'litigation history', 'court orders', 'pending legal cases']) / 4 } except Exception as e: logger.error(f"Error calculating legal metrics: {str(e)}") return { 'title_and_ownership': 0.5, 'property_registration': 0.5, 'tax_and_financial': 0.5, 'approvals_and_permits': 0.5, 'land_and_usage': 0.5, 'compliance_and_legal': 0.5, 'disputes_and_litigation': 0.1 } def simple_legal_analysis(legal_text, categories): """Simple keyword-based legal analysis fallback.""" text_lower = legal_text.lower() # Define keywords for each category category_keywords = { "clear title documentation": ["title", "clear", "documentation", "ownership"], "property registration documents": ["registration", "property", "documents", "registered"], "property tax records": ["tax", "property", "records", "assessment"], "building permits": ["permit", "building", "construction", "approval"], "legal compliance certificates": ["compliance", "legal", "certificate", "approved"], "property dispute records": ["dispute", "litigation", "court", "case"], "legitimate listing": ["real", "genuine", "authentic", "verified"] } scores = [] for category in categories: keywords = category_keywords.get(category, [category.split()[0]]) # Use first word as fallback score = sum(1 for keyword in keywords if keyword in text_lower) / len(keywords) if keywords else 0.1 scores.append(min(1.0, score)) return { "labels": categories, "scores": scores } def summarize_text(text): """Generate summary using model or fallback.""" try: summarizer = load_model("summarization") if hasattr(summarizer, 'task_type') and summarizer.task_type == "summarization": # Using fallback summarizer result = summarizer(text) return result[0]['summary_text'] if result else text[:200] + "..." else: # Using actual model result = summarizer(text, max_length=130, min_length=30, do_sample=False) return result[0]['summary_text'] except Exception as e: logger.warning(f"Model generation failed, using static summary: {str(e)}") # Simple extractive summarization sentences = text.split('.') if len(sentences) > 3: return '. '.join(sentences[:2]) + '.' else: return text[:200] + '...' if len(text) > 200 else text