# models/pdf_analysis.py
"""Text extraction and rule-based real-estate analysis for PDF documents.

Everything in this module is heuristic: keyword lookups and regular
expressions drive classification, key-information extraction and scoring.
The only model dependency is the optional summarizer obtained through
``load_model("summarization")``; when it is unavailable an extractive
fallback is used instead.
"""

import re
from functools import lru_cache

import fitz  # PyMuPDF

from .logging_config import logger
from .model_loader import load_model

# Documents whose stripped text is shorter than this are not analysed.
_MIN_DOCUMENT_LENGTH = 10

# Share of the keyword vocabulary that must be present for confidence 1.0.
_FULL_CONFIDENCE_COVERAGE = 0.3

_REAL_ESTATE_KEYWORDS = {
    'property_terms': [
        'property', 'house', 'apartment', 'flat', 'villa', 'land',
        'real estate', 'residential', 'commercial', 'industrial', 'plot',
        'acre', 'square feet', 'sq ft', 'sqft', 'bedroom', 'bathroom',
        'kitchen', 'living room', 'dining room', 'garage', 'parking',
        'garden', 'balcony', 'terrace',
    ],
    'legal_terms': [
        'title', 'deed', 'ownership', 'mortgage', 'loan', 'lease', 'rent',
        'agreement', 'contract', 'sale', 'purchase', 'transfer',
        'registration', 'encumbrance', 'lien', 'easement', 'zoning',
        'permit', 'license', 'tax', 'assessment', 'valuation', 'appraisal',
        'survey', 'boundary',
    ],
    'financial_terms': [
        'price', 'value', 'cost', 'amount', 'payment', 'installment',
        'down payment', 'interest', 'rate', 'principal', 'balance',
        'insurance', 'premium', 'deposit', 'advance', 'rental', 'security',
    ],
    'location_terms': [
        'address', 'location', 'street', 'road', 'avenue', 'lane', 'city',
        'state', 'country', 'postal', 'zip', 'pincode', 'neighborhood',
        'area', 'district', 'zone', 'sector', 'block',
    ],
}

# Order matters: on equal confidence the earlier entry wins.
_DOCUMENT_PATTERNS = {
    'Property Title Deed': {
        'keywords': ['title', 'deed', 'ownership', 'property', 'owner'],
        'confidence': 0.9,
    },
    'Mortgage Document': {
        'keywords': ['mortgage', 'loan', 'bank', 'lender', 'borrower',
                     'principal', 'interest'],
        'confidence': 0.85,
    },
    'Lease Agreement': {
        'keywords': ['lease', 'rent', 'tenant', 'landlord', 'rental',
                     'agreement'],
        'confidence': 0.8,
    },
    'Sale Contract': {
        'keywords': ['sale', 'purchase', 'buyer', 'seller', 'contract',
                     'agreement'],
        'confidence': 0.8,
    },
    'Tax Assessment': {
        'keywords': ['tax', 'assessment', 'valuation', 'appraisal',
                     'property tax'],
        'confidence': 0.75,
    },
    'Building Permit': {
        'keywords': ['permit', 'building', 'construction', 'approval',
                     'zoning'],
        'confidence': 0.7,
    },
    'Property Survey': {
        'keywords': ['survey', 'boundary', 'measurement', 'plot',
                     'dimension'],
        'confidence': 0.7,
    },
    'Insurance Document': {
        'keywords': ['insurance', 'policy', 'premium', 'coverage', 'claim'],
        'confidence': 0.65,
    },
}

_STREET_ADDRESS_PATTERN = re.compile(
    r'\b\d+\s+[A-Za-z\s]+'
    r'(?:Street|St|Road|Rd|Avenue|Ave|Lane|Ln|Drive|Dr|Boulevard|Blvd)\b',
    re.IGNORECASE,
)
_ADDRESS_PATTERNS = (
    _STREET_ADDRESS_PATTERN,
    re.compile(r'\b[A-Za-z\s]+,\s*[A-Za-z\s]+,\s*[A-Z]{2}\s*\d{5}\b',
               re.IGNORECASE),
)

# Street-type words say nothing about *which* address it is, so they are
# ignored when two addresses are compared.
_GENERIC_ADDRESS_TOKENS = frozenset({
    'street', 'st', 'road', 'rd', 'avenue', 'ave', 'lane', 'ln', 'drive',
    'dr', 'boulevard', 'blvd',
})

# Shared by extract_document_key_info() and detect_dates().
_DATE_PATTERNS = (
    re.compile(r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b', re.IGNORECASE),
    re.compile(r'\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b', re.IGNORECASE),
    re.compile(
        r'\b(?:January|February|March|April|May|June|July|August|September'
        r'|October|November|December)\s+\d{1,2},?\s+\d{4}\b',
        re.IGNORECASE,
    ),
)

# ``\d+(?:,\d{2,3})*`` accepts unformatted numbers (500000), western
# grouping (500,000) and lakh/crore grouping (5,00,000).
_NUMBER = r'\d+(?:,\d{2,3})*(?:\.\d{2})?'
_AMOUNT_PATTERNS = (
    re.compile(r'\$' + _NUMBER, re.IGNORECASE),
    re.compile(r'₹' + _NUMBER, re.IGNORECASE),
    re.compile(r'\brs\.?\s*' + _NUMBER, re.IGNORECASE),
    # The trailing \b keeps "rs" from matching the first letters of an
    # unrelated word such as "rooms".
    re.compile(r'\b' + _NUMBER + r'\s*(?:dollars?|rupees?|rs)\b',
               re.IGNORECASE),
)

_PHONE_PATTERN = re.compile(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b')

_SQUARE_FEET_PATTERN = re.compile(
    r'\b(\d+(?:,\d{3})*(?:\.\d+)?)\s*(?:square\s*feet|sq\s*ft|sqft)\b',
    re.IGNORECASE,
)
_PROPERTY_DETAIL_PATTERNS = {
    'bedrooms': re.compile(r'\b(\d+)\s*(?:bedroom|bed|br)\b', re.IGNORECASE),
    'bathrooms': re.compile(r'\b(\d+)\s*(?:bathroom|bath|ba)\b',
                            re.IGNORECASE),
    'square_feet': _SQUARE_FEET_PATTERN,
    'acres': re.compile(r'\b(\d+(?:\.\d+)?)\s*acres?\b', re.IGNORECASE),
}

# Two consecutive capitalised words; deliberately case-sensitive.
_NAME_PATTERN = re.compile(r'\b[A-Z][a-z]+\s+[A-Z][a-z]+\b')

# Whole-word signature vocabulary (with common inflections). The letter
# look-arounds stop "sign" from firing on "design" or "significant".
_SIGNATURE_PATTERN = re.compile(
    r'(?<![a-z])(?:'
    r'sign(?:s|ed|er|ers|ing|ature|atures|atory|atories)?'
    r'|witness(?:es|ed)?'
    r'|notar(?:y|ies|ized)'
    r'|attorneys?|lawyers?|agents?'
    r')(?![a-z])'
)


@lru_cache(maxsize=256)
def _term_pattern(term):
    """Compile a pattern matching *term* only where a word starts."""
    return re.compile(r'(?<![a-z])' + re.escape(term))


def _contains_term(text_lower, term):
    """Return True if *term* occurs in lower-cased text at a word start.

    A plain ``term in text`` test also fires in the middle of unrelated
    words ("rent" in "current", "state" in "estate"). Requiring that no
    letter precedes the term removes those hits while still accepting
    inflected forms ("rental", "loans") and digit-prefixed units
    ("1200sqft").
    """
    return _term_pattern(term).search(text_lower) is not None


def _fallback_analysis(summary, error=None):
    """Build the neutral result returned when analysis cannot be performed."""
    result = {
        'is_property_related': False,
        'confidence': 0.0,
        'summary': summary,
        'key_info': {},
        'verification_score': 0.0,
        'document_type': 'Unknown',
        'document_confidence': 0.0,
        'authenticity_assessment': 'Unknown',
        'authenticity_confidence': 0.0,
        'contains_signatures': False,
        'contains_dates': False,
        'real_estate_indicators': [],
        'legal_terms_found': [],
        'keyword_analysis': {},
        'model_used': 'static_fallback',
    }
    if error is not None:
        result['error'] = error
    return result


def _parse_whole_number(value):
    """Convert ``1200``, ``"1,200"`` or ``"1200.5"`` to an int, else None."""
    try:
        return int(float(str(value).replace(',', '').strip()))
    except (TypeError, ValueError, OverflowError):
        return None


def _address_tokens(address):
    """Return the distinguishing lower-case tokens of an address string."""
    tokens = re.findall(r'[a-z0-9]+', str(address).lower())
    return {token for token in tokens if token not in _GENERIC_ADDRESS_TOKENS}


def extract_text_from_pdf(pdf_file):
    """
    Extract the text of every page of a PDF.

    Args:
        pdf_file: Binary file-like object; its ``read()`` result is handed
            to PyMuPDF as an in-memory stream.

    Returns:
        str: Concatenated page text with surrounding whitespace stripped,
        or ``""`` if the PDF cannot be read (the error is logged).
    """
    try:
        doc = fitz.open(stream=pdf_file.read(), filetype="pdf")
        try:
            return "".join(page.get_text() for page in doc).strip()
        finally:
            # Release the document even when a page fails to extract.
            doc.close()
    except Exception as e:
        logger.error(f"Error extracting text from PDF: {str(e)}")
        return ""


def analyze_pdf_content(document_text, property_data):
    """
    Analyze PDF text for real estate verification.

    Args:
        document_text: Extracted text from PDF
        property_data: Property information. Not used by this function at
            present; kept so existing callers keep working.

    Returns:
        dict: Analysis results. On empty/too-short input or on an internal
        error a neutral fallback result is returned (with an ``'error'``
        key in the latter case); this function does not raise.
    """
    try:
        if not document_text or len(document_text.strip()) < _MIN_DOCUMENT_LENGTH:
            return _fallback_analysis('Document too short or empty')

        text_lower = document_text.lower()

        # Which vocabulary terms occur, per category.
        found_keywords = {
            category: [kw for kw in keywords if _contains_term(text_lower, kw)]
            for category, keywords in _REAL_ESTATE_KEYWORDS.items()
        }
        keyword_counts = {
            category: len(matches)
            for category, matches in found_keywords.items()
        }

        # Confidence saturates once 30% of the vocabulary is present.
        total_keywords = sum(len(kws) for kws in _REAL_ESTATE_KEYWORDS.values())
        total_matches = sum(keyword_counts.values())
        confidence = min(
            1.0, total_matches / (total_keywords * _FULL_CONFIDENCE_COVERAGE)
        )

        document_type, document_confidence = classify_document_type(
            text_lower, found_keywords
        )
        summary = generate_document_summary(document_text, document_type)
        key_info = extract_document_key_info(document_text)
        contains_signatures = detect_signatures(text_lower)
        contains_dates = detect_dates(document_text)

        authenticity_assessment, authenticity_confidence = assess_document_authenticity(
            document_text, contains_signatures, contains_dates, key_info
        )
        verification_score = calculate_verification_score(
            confidence, document_confidence, authenticity_confidence,
            contains_signatures, contains_dates, key_info
        )

        # NOTE(review): a single classifier keyword is enough to make the
        # type non-'Unknown', so this test is lenient by design; confirm
        # that is the intended policy.
        is_property_related = confidence > 0.2 or document_type != 'Unknown'

        # Up to three matched terms from each category, in category order.
        real_estate_indicators = []
        for matches in found_keywords.values():
            real_estate_indicators.extend(matches[:3])

        return {
            'is_property_related': is_property_related,
            'confidence': confidence,
            'summary': summary,
            'key_info': key_info,
            'verification_score': verification_score,
            'document_type': document_type,
            'document_confidence': document_confidence,
            'authenticity_assessment': authenticity_assessment,
            'authenticity_confidence': authenticity_confidence,
            'contains_signatures': contains_signatures,
            'contains_dates': contains_dates,
            'real_estate_indicators': real_estate_indicators,
            'legal_terms_found': found_keywords.get('legal_terms', []),
            'keyword_analysis': keyword_counts,
            'model_used': 'static_fallback',
        }

    except Exception as e:
        logger.error(f"Error in PDF content analysis: {str(e)}")
        return _fallback_analysis(f'Analysis error: {str(e)}', error=str(e))


def classify_document_type(text_lower, found_keywords):
    """
    Pick the most likely document type from keyword evidence.

    Each candidate type scores ``base_confidence * matched / total`` over
    its own keyword list; the highest score wins and ties go to the type
    listed first.

    Args:
        text_lower: Lower-cased document text.
        found_keywords: Accepted for interface compatibility; not used.

    Returns:
        tuple: ``(document_type, confidence)``; ``('Unknown', 0.0)`` when
        no keyword of any type is present.
    """
    best_match = 'Unknown'
    best_confidence = 0.0

    for doc_type, pattern in _DOCUMENT_PATTERNS.items():
        keywords = pattern['keywords']
        hits = sum(1 for keyword in keywords if _contains_term(text_lower, keyword))
        if not hits:
            continue

        score = pattern['confidence'] * (hits / len(keywords))
        if score > best_confidence:
            best_match = doc_type
            best_confidence = score

    return best_match, best_confidence


def _summarize_with_model(document_text):
    """Return the summarization model's output, or '' if it is unusable."""
    try:
        summarizer = load_model("summarization")
        # The model is only trusted when it explicitly reports that it is
        # not a fallback stub.
        if hasattr(summarizer, 'fallback_used') and not summarizer.fallback_used:
            result = summarizer(document_text[:1000], max_length=150, min_length=50)
            if isinstance(result, list) and len(result) > 0:
                return result[0].get('summary_text', '') or ''
    except Exception as e:
        logger.warning(f"Summarization model failed: {str(e)}")
    return ''


def generate_document_summary(document_text, document_type):
    """
    Summarize the document, preferring the model and falling back to
    sentence extraction.

    Args:
        document_text: Full document text.
        document_type: Type label from ``classify_document_type``; its
            words steer which sentences the extractive fallback picks.

    Returns:
        str: The model summary when one is produced; otherwise up to two
        sentences mentioning the document type (or the first three
        sentences), capped at 300 characters. A fixed message is returned
        when there is too little text or summarization fails.
    """
    try:
        model_summary = _summarize_with_model(document_text)
        # An empty model answer is not a summary; use the fallback instead.
        if isinstance(model_summary, str) and model_summary.strip():
            return model_summary

        # Extractive fallback: keep only sentences with some substance.
        sentences = [
            part.strip() for part in document_text.split('.')
            if len(part.strip()) > 20
        ]
        if not sentences:
            return "Document contains insufficient text for summarization."

        key_sentences = []
        if document_type != 'Unknown':
            type_keywords = document_type.lower().split()
            for sentence in sentences:
                sentence_lower = sentence.lower()
                if any(keyword in sentence_lower for keyword in type_keywords):
                    key_sentences.append(sentence)
                    if len(key_sentences) >= 2:
                        break

        if not key_sentences:
            key_sentences = sentences[:3]

        summary = '. '.join(key_sentences) + '.'
        if len(summary) > 300:
            summary = summary[:297] + '...'
        return summary

    except Exception as e:
        logger.error(f"Error generating summary: {str(e)}")
        return "Summary generation failed."


def extract_document_key_info(document_text):
    """
    Extract key information from document text with regular expressions.

    Args:
        document_text: Full document text.

    Returns:
        dict: Only the keys for which something was found, among
        ``addresses`` (max 3), ``dates`` (max 5), ``amounts`` (max 5),
        ``phone_numbers`` (max 3), ``bedrooms``, ``bathrooms``,
        ``square_feet``, ``acres`` (first match each, as strings) and
        ``names`` (max 5). If extraction fails part-way, whatever was
        collected so far is returned and a warning is logged.
    """
    key_info = {}

    try:
        # Addresses: the first pattern that yields anything wins.
        for pattern in _ADDRESS_PATTERNS:
            addresses = pattern.findall(document_text)
            if addresses:
                key_info['addresses'] = addresses[:3]
                break

        dates = []
        for pattern in _DATE_PATTERNS:
            dates.extend(pattern.findall(document_text))
        if dates:
            key_info['dates'] = dates[:5]

        amounts = []
        for pattern in _AMOUNT_PATTERNS:
            amounts.extend(pattern.findall(document_text))
        if amounts:
            key_info['amounts'] = amounts[:5]

        phones = _PHONE_PATTERN.findall(document_text)
        if phones:
            key_info['phone_numbers'] = phones[:3]

        for key, pattern in _PROPERTY_DETAIL_PATTERNS.items():
            match = pattern.search(document_text)
            if match:
                key_info[key] = match.group(1)

        names = _NAME_PATTERN.findall(document_text)
        if names:
            key_info['names'] = names[:5]

    except Exception as e:
        logger.warning(f"Error extracting key info: {str(e)}")

    return key_info


def detect_signatures(text_lower):
    """
    Report whether the text mentions signing, witnessing or legal agents.

    Args:
        text_lower: Lower-cased document text (matching is case-sensitive).

    Returns:
        bool: True if a signature-related word occurs as a whole word.
    """
    return _SIGNATURE_PATTERN.search(text_lower) is not None


def detect_dates(document_text):
    """
    Report whether the text contains a date.

    Recognises ``d/m/y``-style and ``y/m/d``-style numeric dates (``/`` or
    ``-`` separated) and ``Month D, YYYY``.

    Args:
        document_text: Full document text.

    Returns:
        bool: True if any date pattern matches.
    """
    return any(pattern.search(document_text) for pattern in _DATE_PATTERNS)


def assess_document_authenticity(document_text, has_signatures, has_dates, key_info):
    """
    Score how complete the document looks and map that to a label.

    Args:
        document_text: Full document text (only its length is used).
        has_signatures: Result of ``detect_signatures``.
        has_dates: Result of ``detect_dates``.
        key_info: Result of ``extract_document_key_info``.

    Returns:
        tuple: ``(assessment, score)`` where score is the sum of the
        weights of the features present (at most 1.0) and assessment is
        'Authentic' (>= 0.7), 'Likely Authentic' (>= 0.4), 'Suspicious'
        (>= 0.2) or 'Potentially Fake'.
    """
    weighted_features = (
        (has_signatures, 0.3),
        (has_dates, 0.2),
        (key_info.get('addresses'), 0.2),
        (key_info.get('amounts'), 0.1),
        (key_info.get('names'), 0.1),
        (len(document_text) > 500, 0.1),
    )

    # Accumulate in the listed order so the float result is reproducible.
    authenticity_score = 0.0
    for present, weight in weighted_features:
        if present:
            authenticity_score += weight

    if authenticity_score >= 0.7:
        assessment = 'Authentic'
    elif authenticity_score >= 0.4:
        assessment = 'Likely Authentic'
    elif authenticity_score >= 0.2:
        assessment = 'Suspicious'
    else:
        assessment = 'Potentially Fake'

    return assessment, authenticity_score


def calculate_verification_score(confidence, document_confidence, authenticity_confidence,
                                 has_signatures, has_dates, key_info):
    """
    Combine the individual signals into one score on a 0-100 scale.

    Weights: keyword confidence 0.3, document-type confidence 0.2,
    authenticity 0.2, signatures 0.1, dates 0.1, addresses 0.05,
    amounts 0.05.

    Args:
        confidence: Keyword confidence in [0, 1].
        document_confidence: Document-type confidence in [0, 1].
        authenticity_confidence: Authenticity score in [0, 1].
        has_signatures: Whether signature wording was found.
        has_dates: Whether a date was found.
        key_info: Result of ``extract_document_key_info``.

    Returns:
        float: Weighted score multiplied by 100 and capped at 100.0.
    """
    score = confidence * 0.3
    score += document_confidence * 0.2
    score += authenticity_confidence * 0.2

    if has_signatures:
        score += 0.1
    if has_dates:
        score += 0.1
    if key_info.get('addresses'):
        score += 0.05
    if key_info.get('amounts'):
        score += 0.05

    return min(100.0, score * 100)


def check_document_consistency(document_text, property_data):
    """
    Check document consistency with property data.

    Starts from a neutral score of 0.5 and adds 0.2 for a matching
    address, 0.1 for a matching property type and 0.1 for a matching size
    (within 100 sq ft). Each failed comparison adds an entry to ``issues``.

    Args:
        document_text: Full document text.
        property_data: Mapping that may contain ``'address'``,
            ``'property_type'`` and ``'sq_ft'``. Falsy means "nothing to
            compare against".

    Returns:
        dict: ``is_consistent`` (score > 0.6), ``confidence`` (score capped
        at 1.0), ``issues`` and ``model_used``. Never raises; unexpected
        errors produce an inconsistent result describing the error.
    """
    try:
        if not property_data:
            return {
                'is_consistent': True,
                'confidence': 0.5,
                'issues': [],
                'model_used': 'static_fallback'
            }

        consistency_score = 0.5  # Base score
        issues = []

        # Address: at least one distinguishing token (house number, street
        # name, ...) must be shared with an address found in the document.
        if property_data.get('address'):
            expected_tokens = _address_tokens(property_data['address'])
            doc_addresses = _STREET_ADDRESS_PATTERN.findall(document_text)
            if any(expected_tokens & _address_tokens(addr) for addr in doc_addresses):
                consistency_score += 0.2
            else:
                issues.append("Address mismatch between document and property data")

        # Property type: plain case-insensitive containment.
        if property_data.get('property_type'):
            property_type = str(property_data['property_type']).lower()
            if property_type in document_text.lower():
                consistency_score += 0.1
            else:
                issues.append("Property type mismatch")

        # Size: compared only when both sides yield a usable number.
        if property_data.get('sq_ft'):
            expected_size = _parse_whole_number(property_data['sq_ft'])
            size_match = _SQUARE_FEET_PATTERN.search(document_text)
            if expected_size is None:
                logger.warning(
                    f"Unparseable property size: {property_data['sq_ft']!r}"
                )
            elif size_match:
                doc_size = _parse_whole_number(size_match.group(1))
                if doc_size is not None and abs(doc_size - expected_size) < 100:
                    consistency_score += 0.1
                else:
                    issues.append("Property size mismatch")

        return {
            'is_consistent': consistency_score > 0.6,
            'confidence': min(1.0, consistency_score),
            'issues': issues,
            'model_used': 'static_fallback'
        }

    except Exception as e:
        logger.error(f"Error checking document consistency: {str(e)}")
        return {
            'is_consistent': False,
            'confidence': 0.0,
            'issues': [f"Consistency check error: {str(e)}"],
            'model_used': 'static_fallback'
        }