# app.py
#
# Flask backend for an Indian real-estate listing verifier.  Accepts property
# form data, images, and PDF documents; runs a battery of ML/heuristic analyses
# in parallel; and returns a JSON verification report with a final verdict.
from flask import Flask, render_template, request, jsonify
from flask_cors import CORS
import base64
import io
import re
import json
import uuid
import time
import asyncio
from geopy.geocoders import Nominatim
from datetime import datetime
# from langdetect import detect
# from deep_translator import GoogleTranslator
from models.logging_config import logger
from models.model_loader import load_model, clear_model_cache
from models.parallel_processor import parallel_processor
from models.performance_optimizer import performance_optimizer, optimize_model_loading, timed_function
from models.image_analysis import analyze_image
from models.pdf_analysis import extract_text_from_pdf, analyze_pdf_content
from models.property_summary import generate_property_summary
from models.fraud_classification import classify_fraud
from models.trust_score import generate_trust_score
from models.suggestions import generate_suggestions
from models.text_quality import assess_text_quality
from models.address_verification import verify_address
from models.cross_validation import perform_cross_validation
from models.location_analysis import analyze_location
from models.price_analysis import analyze_price
from models.legal_analysis import analyze_legal_details
from models.property_specs import verify_property_specs
from models.market_value import analyze_market_value
from models.image_quality import assess_image_quality
from models.property_relation import check_if_property_related
import torch
import numpy as np
import concurrent.futures
from PIL import Image

app = Flask(__name__)
CORS(app)  # Enable CORS for frontend

# Initialize geocoder (reverse geocoding for /get-location)
geocoder = Nominatim(user_agent="indian_property_verifier", timeout=10)


# Pre-load models to avoid loading delays during requests
@timed_function
def preload_models():
    """Pre-load essential models to improve response times.

    Best-effort: failures are logged and swallowed so a missing/slow model
    never prevents the server from starting.
    """
    try:
        logger.info("Pre-loading essential models with performance optimization...")
        # Only preload the most essential models to avoid disconnections
        essential_models = [
            "zero-shot-classification",  # For fraud, legal, suggestions
            "summarization",             # For property summary
        ]
        for model_task in essential_models:
            try:
                logger.info(f"Pre-loading {model_task} model...")
                model = load_model(model_task)
                # load_model may return a fallback wrapper; surface which one.
                if hasattr(model, 'fallback_used') and model.fallback_used:
                    logger.info(f"Using fallback for {model_task}: {getattr(model, 'fallback_model', 'unknown')}")
                else:
                    logger.info(f"Successfully pre-loaded {model_task} model")
            except Exception as e:
                logger.warning(f"Failed to pre-load {model_task}: {str(e)}")
        logger.info("Model pre-loading completed with optimization")
    except Exception as e:
        logger.error(f"Error during model pre-loading: {str(e)}")


# Pre-load models on startup
preload_models()


def make_json_serializable(obj):
    """Recursively convert *obj* into JSON-serializable builtins.

    Handles torch tensors (scalar -> item, otherwise list), numpy scalars and
    arrays, and falls back to ``str(obj)`` for anything unknown, so the
    response can always be jsonify'd.
    """
    try:
        if isinstance(obj, (bool, int, float, str, type(None))):
            return obj
        elif isinstance(obj, (list, tuple)):
            return [make_json_serializable(item) for item in obj]
        elif isinstance(obj, dict):
            # Keys are coerced to str because JSON object keys must be strings.
            return {str(key): make_json_serializable(value) for key, value in obj.items()}
        elif torch.is_tensor(obj):
            return obj.item() if obj.numel() == 1 else obj.tolist()
        elif np.isscalar(obj):
            return obj.item() if hasattr(obj, 'item') else float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        else:
            return str(obj)
    except Exception as e:
        logger.error(f"Error serializing object: {str(e)}")
        return str(obj)


@app.route('/')
def index():
    """Serve the single-page frontend."""
    return render_template('index.html')


@app.route('/get-location', methods=['POST'])
def get_location():
    """Reverse-geocode a lat/long pair into an Indian address.

    Expects JSON ``{"latitude": ..., "longitude": ...}``; rejects coordinates
    outside India's bounding box and retries Nominatim up to 3 times.
    """
    try:
        data = request.json or {}
        latitude = data.get('latitude')
        longitude = data.get('longitude')
        if not latitude or not longitude:
            logger.warning("Missing latitude or longitude")
            return jsonify({
                'status': 'error',
                'message': 'Latitude and longitude are required'
            }), 400
        # Validate coordinates are within India (approximate bounding box)
        try:
            lat, lng = float(latitude), float(longitude)
            if not (6.5 <= lat <= 37.5 and 68.0 <= lng <= 97.5):
                return jsonify({
                    'status': 'error',
                    'message': 'Coordinates are outside India'
                }), 400
        except ValueError:
            return jsonify({
                'status': 'error',
                'message': 'Invalid coordinates format'
            }), 400
        # Retry geocoding up to 3 times
        for attempt in range(3):
            try:
                location = geocoder.reverse((latitude, longitude), exactly_one=True)
                if location:
                    address_components = location.raw.get('address', {})
                    # Extract Indian-specific address components; Nominatim
                    # uses different keys depending on settlement type.
                    city = address_components.get('city', '')
                    if not city:
                        city = address_components.get('town', '')
                    if not city:
                        city = address_components.get('village', '')
                    if not city:
                        city = address_components.get('suburb', '')
                    state = address_components.get('state', '')
                    if not state:
                        state = address_components.get('state_district', '')
                    # Get postal code and validate Indian format (6 digits)
                    postal_code = address_components.get('postcode', '')
                    if postal_code and not re.match(r'^\d{6}$', postal_code):
                        postal_code = ''
                    # Get road/street name
                    road = address_components.get('road', '')
                    if not road:
                        road = address_components.get('street', '')
                    # Get area/locality
                    area = address_components.get('suburb', '')
                    if not area:
                        area = address_components.get('neighbourhood', '')
                    return jsonify({
                        'status': 'success',
                        'address': location.address,
                        'street': road,
                        'area': area,
                        'city': city,
                        'state': state,
                        'country': 'India',
                        'postal_code': postal_code,
                        'latitude': latitude,
                        'longitude': longitude,
                        'formatted_address': f"{road}, {area}, {city}, {state}, India - {postal_code}"
                    })
                logger.warning(f"Geocoding failed on attempt {attempt + 1}")
                time.sleep(1)  # Wait before retry
            except Exception as e:
                logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}")
                time.sleep(1)
        return jsonify({
            'status': 'error',
            'message': 'Could not determine location after retries'
        }), 500
    except Exception as e:
        logger.error(f"Error in get_location: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500


@app.route('/performance', methods=['GET'])
def get_performance_metrics():
    """Get system performance metrics and cache statistics"""
    try:
        # Imported lazily to avoid a module-level name clash with this view.
        from models.performance_optimizer import get_performance_metrics
        metrics = get_performance_metrics()
        return jsonify({
            'status': 'success',
            'metrics': metrics,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })
    except Exception as e:
        logger.error(f"Error getting performance metrics: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500


@app.route('/clear-cache', methods=['POST'])
def clear_cache():
    """Clear all cached results"""
    try:
        performance_optimizer.clear_cache()
        return jsonify({
            'status': 'success',
            'message': 'Cache cleared successfully'
        })
    except Exception as e:
        logger.error(f"Error clearing cache: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500


def calculate_final_verdict(results):
    """
    Calculate a comprehensive final verdict based on all analysis results.
    This function combines all verification scores, fraud indicators, and
    quality assessments to determine if a property listing is legitimate,
    suspicious, or fraudulent.

    Returns a dict with ``verdict``, ``confidence``, ``reasoning``,
    ``risk_level``, ``overall_score`` (0-100) and a ``scores`` breakdown.
    Never raises: any internal error yields a 'VERIFICATION REQUIRED' verdict.
    """
    try:
        # Defensive: ensure results is a dict
        if not isinstance(results, dict):
            logger.warning(f"Results is not a dict: {type(results)}")
            return {
                'verdict': 'VERIFICATION REQUIRED',
                'confidence': 0.0,
                'reasoning': 'Insufficient data for verification',
                'risk_level': 'medium',
                'overall_score': 50  # Increased from 25
            }

        # Extract key metrics with defensive programming
        fraud_classification = results.get('fraud_classification', {})
        trust_score_data = results.get('trust_score', {})
        address_verification = results.get('address_verification', {})
        cross_validation = results.get('cross_validation', [])
        location_analysis = results.get('location_analysis', {})
        price_analysis = results.get('price_analysis', {})
        legal_analysis = results.get('legal_analysis', {})
        specs_verification = results.get('specs_verification', {})
        quality_assessment = results.get('quality_assessment', {})

        # CRITICAL: Check for fake data patterns in cross validation - Much more lenient
        fake_data_detected = False
        fraudulent_issues = 0
        high_severity_issues = 0
        medium_severity_issues = 0
        low_severity_issues = 0
        if isinstance(cross_validation, list):
            for issue in cross_validation:
                if isinstance(issue, dict):
                    status = issue.get('status', '')
                    severity = issue.get('severity', 'low')
                    if status == 'fraudulent':
                        fraudulent_issues += 1
                        fake_data_detected = True
                    elif severity == 'high':
                        high_severity_issues += 1
                    elif severity == 'medium':
                        medium_severity_issues += 1
                    elif severity == 'low':
                        low_severity_issues += 1

        # Calculate fraud risk score - Much more lenient
        fraud_score = 0.0
        fraud_level = fraud_classification.get('alert_level', 'minimal')
        fraud_alert_score = fraud_classification.get('alert_score', 0.0)
        fraud_score_mapping = {
            'critical': 0.8,  # Reduced from 1.0
            'high': 0.6,      # Reduced from 0.8
            'medium': 0.4,    # Reduced from 0.6
            'low': 0.2,       # Reduced from 0.4
            'minimal': 0.05   # Reduced from 0.1
        }
        fraud_score = fraud_score_mapping.get(fraud_level, 0.05) * fraud_alert_score

        # CRITICAL: Much more lenient penalty for fake data
        if fake_data_detected:
            fraud_score = max(fraud_score, 0.4)  # Reduced from 0.8 to 0.4
            fraud_level = 'medium'  # Changed from 'high' to 'medium'

        # Calculate trust score (upstream may return a dict or a (score, reason) tuple)
        trust_score = 0.0
        if isinstance(trust_score_data, dict):
            trust_score = trust_score_data.get('score', 0.0)
            # Convert percentage to decimal if needed
            if trust_score > 1.0:
                trust_score = trust_score / 100.0
        elif isinstance(trust_score_data, tuple) and len(trust_score_data) > 0:
            trust_score = trust_score_data[0]
            # Convert percentage to decimal if needed
            if trust_score > 1.0:
                trust_score = trust_score / 100.0
        else:
            trust_score = 0.0

        # CRITICAL: Much more lenient penalty for fake data in trust score
        if fake_data_detected:
            trust_score = max(0.0, trust_score - 0.2)  # Reduced penalty from 0.5 to 0.2

        # Calculate address verification score
        address_score = 0.0
        if address_verification and isinstance(address_verification, dict):
            verification_score = address_verification.get('verification_score', 0.0)
            address_score = float(verification_score) / 100.0 if verification_score > 0 else 0.0

        # Calculate location analysis score
        location_score = 0.0
        if location_analysis and isinstance(location_analysis, dict):
            completeness_score = location_analysis.get('completeness_score', 0.0)
            location_score = float(completeness_score) / 100.0 if completeness_score > 0 else 0.0

        # Calculate price analysis score
        price_score = 0.0
        if price_analysis and isinstance(price_analysis, dict):
            confidence = price_analysis.get('confidence', 0.0)
            price_score = float(confidence) if confidence > 0 else 0.0

        # Calculate legal analysis score
        legal_score = 0.0
        if legal_analysis and isinstance(legal_analysis, dict):
            confidence = legal_analysis.get('confidence', 0.0)
            legal_score = float(confidence) if confidence > 0 else 0.0

        # Calculate specs verification score
        specs_score = 0.0
        if specs_verification and isinstance(specs_verification, dict):
            verification_score = specs_verification.get('verification_score', 0.0)
            specs_score = float(verification_score) / 100.0 if verification_score > 0 else 0.0

        # Calculate quality assessment score
        quality_score = 0.0
        if quality_assessment and isinstance(quality_assessment, dict):
            score = quality_assessment.get('score', 0.0)
            quality_score = float(score) / 100.0 if score > 0 else 0.0

        # Much more balanced weighted scoring system (weights sum to 1.0)
        weights = {
            'fraud': 0.25,     # Reduced from 0.35
            'trust': 0.30,     # Increased from 0.25
            'address': 0.15,   # Keep address verification
            'location': 0.12,  # Increased from 0.10
            'price': 0.10,     # Keep price analysis
            'legal': 0.05,     # Increased from 0.03
            'specs': 0.02,     # Increased from 0.01
            'quality': 0.01    # Keep quality assessment
        }

        # Calculate weighted score; fraud contributes inversely (less fraud = more score)
        weighted_score = (
            (1.0 - fraud_score) * weights['fraud'] +
            trust_score * weights['trust'] +
            address_score * weights['address'] +
            location_score * weights['location'] +
            price_score * weights['price'] +
            legal_score * weights['legal'] +
            specs_score * weights['specs'] +
            quality_score * weights['quality']
        )

        # Debug logging
        logger.info(f"Score components: fraud={fraud_score:.3f}, trust={trust_score:.3f}, address={address_score:.3f}, location={location_score:.3f}, price={price_score:.3f}, legal={legal_score:.3f}, specs={specs_score:.3f}, quality={quality_score:.3f}")
        logger.info(f"Weighted score before penalty: {weighted_score:.3f}")

        # Much more lenient penalty system
        issue_penalty = 0.0
        if fraudulent_issues > 0:
            issue_penalty += fraudulent_issues * 0.08  # Reduced from 0.15 to 0.08
        if high_severity_issues > 0:
            issue_penalty += high_severity_issues * 0.05  # Reduced from 0.10 to 0.05
        if medium_severity_issues > 0:
            issue_penalty += medium_severity_issues * 0.02  # Reduced from 0.05 to 0.02
        if low_severity_issues > 0:
            issue_penalty += low_severity_issues * 0.01  # Reduced from 0.02 to 0.01
        weighted_score = max(0.0, weighted_score - issue_penalty)
        logger.info(f"Issue penalty: {issue_penalty:.3f}, Final weighted score: {weighted_score:.3f}")

        # CRITICAL: Much more lenient minimum score requirements
        if fake_data_detected:
            weighted_score = max(0.15, weighted_score)  # Increased from 0.05 to 0.15
        elif any([trust_score > 0, address_score > 0, location_score > 0, price_score > 0]):
            weighted_score = max(0.30, weighted_score)  # Increased from 0.15 to 0.30

        # Much more lenient verdict determination
        if fake_data_detected and fraudulent_issues > 5:  # Increased threshold from 2 to 5
            verdict = 'HIGH RISK LISTING'
            risk_level = 'high'
        elif weighted_score >= 0.60 and fraud_score < 0.4 and high_severity_issues == 0:  # Reduced from 0.70 to 0.60
            verdict = 'VERIFIED REAL ESTATE LISTING'
            risk_level = 'low'
        elif weighted_score >= 0.40 and fraud_score < 0.5 and high_severity_issues <= 2:  # Reduced from 0.50 to 0.40
            verdict = 'LIKELY LEGITIMATE'
            risk_level = 'low'
        elif weighted_score >= 0.25 and fraud_score < 0.7 and high_severity_issues <= 3:  # Reduced from 0.30 to 0.25
            verdict = 'SUSPICIOUS LISTING'
            risk_level = 'medium'
        elif fraud_score >= 0.8 or weighted_score < 0.20 or high_severity_issues >= 6:  # Much more lenient thresholds
            verdict = 'HIGH RISK LISTING'
            risk_level = 'high'
        elif weighted_score >= 0.20:  # Reduced from 0.15
            verdict = 'VERIFICATION REQUIRED'
            risk_level = 'medium'
        else:
            verdict = 'INSUFFICIENT DATA'
            risk_level = 'medium'

        # Generate detailed reasoning
        reasoning_parts = []
        if fake_data_detected:
            reasoning_parts.append("Fake data patterns detected")
        if fraudulent_issues > 0:
            reasoning_parts.append(f"{fraudulent_issues} fraudulent validation issues")
        if fraud_score > 0.4:  # Reduced from 0.3
            reasoning_parts.append(f"Fraud risk detected (level: {fraud_level})")
        if trust_score < 0.4:  # Reduced from 0.3
            reasoning_parts.append(f"Low trust score ({trust_score:.1%})")
        if address_score < 0.6:  # Reduced from 0.5
            reasoning_parts.append("Address verification issues")
        if location_score < 0.6:  # Reduced from 0.5
            reasoning_parts.append("Location verification issues")
        if price_score < 0.6:  # Reduced from 0.5
            reasoning_parts.append("Price analysis concerns")
        if legal_score < 0.6:  # Reduced from 0.5
            reasoning_parts.append("Legal documentation issues")
        if high_severity_issues > 0:
            reasoning_parts.append(f"{high_severity_issues} critical validation issues")
        if medium_severity_issues > 0:
            reasoning_parts.append(f"{medium_severity_issues} moderate validation issues")
        if not reasoning_parts:
            reasoning_parts.append("All verification checks passed successfully")
        reasoning = ". ".join(reasoning_parts)

        # Calculate overall score as percentage
        overall_score = int(weighted_score * 100)
        # Ensure score is between 0 and 100
        overall_score = max(0, min(100, overall_score))

        # CRITICAL: Much more lenient minimum score for fake data
        if fake_data_detected:
            overall_score = max(25, min(50, overall_score))  # Increased range from 10-25% to 25-50%
        elif overall_score == 0 and any([trust_score > 0, address_score > 0, location_score > 0]):
            overall_score = 40  # Increased from 20 to 40

        # Final score adjustment based on data quality - Much more lenient
        if fake_data_detected or fraudulent_issues > 0:
            overall_score = max(25, min(50, overall_score))  # Increased from 10-25% to 25-50%
        elif high_severity_issues >= 3:
            overall_score = max(30, overall_score)  # Increased from 15 to 30
        elif high_severity_issues >= 1:
            overall_score = max(40, overall_score)  # Increased from 20 to 40
        else:
            overall_score = max(50, overall_score)  # Increased from 25 to 50

        return {
            'verdict': verdict,
            'confidence': min(1.0, weighted_score),
            'reasoning': reasoning,
            'risk_level': risk_level,
            'overall_score': overall_score,
            'scores': {
                'fraud_score': fraud_score,
                'trust_score': trust_score,
                'address_score': address_score,
                'location_score': location_score,
                'price_score': price_score,
                'legal_score': legal_score,
                'specs_score': specs_score,
                'quality_score': quality_score,
                'weighted_score': weighted_score,
                'cross_validation_issues': len(cross_validation) if isinstance(cross_validation, list) else 0,
                'high_severity_issues': high_severity_issues,
                'medium_severity_issues': medium_severity_issues,
                'low_severity_issues': low_severity_issues,
                'fraudulent_issues': fraudulent_issues,
                'fake_data_detected': fake_data_detected
            }
        }
    except Exception as e:
        logger.error(f"Error calculating final verdict: {str(e)}")
        return {
            'verdict': 'VERIFICATION REQUIRED',
            'confidence': 0.0,
            'reasoning': f'Error in verdict calculation: {str(e)}',
            'risk_level': 'medium',
            'overall_score': 50  # Increased from 25
        }


@app.route('/verify', methods=['POST'])
def verify_property():
    """Run the full verification pipeline on a submitted property listing.

    Reads multipart form fields plus optional 'images' (jpg/jpeg/png) and
    'documents' (pdf) uploads, runs all analyses in parallel, and returns a
    JSON report including a final verdict from calculate_final_verdict().
    """
    try:
        start_time = time.time()
        if not request.form and not request.files:
            logger.warning("No form data or files provided")
            return jsonify({
                'error': 'No data provided',
                'status': 'error'
            }), 400

        # Extract form data
        data = {
            'property_name': request.form.get('property_name', '').strip(),
            'property_type': request.form.get('property_type', '').strip(),
            'status': request.form.get('status', '').strip(),
            'description': request.form.get('description', '').strip(),
            'address': request.form.get('address', '').strip(),
            'city': request.form.get('city', '').strip(),
            'state': request.form.get('state', '').strip(),
            'country': request.form.get('country', 'India').strip(),
            'zip': request.form.get('zip', '').strip(),
            'latitude': request.form.get('latitude', '').strip(),
            'longitude': request.form.get('longitude', '').strip(),
            'bedrooms': request.form.get('bedrooms', '').strip(),
            'bathrooms': request.form.get('bathrooms', '').strip(),
            'total_rooms': request.form.get('total_rooms', '').strip(),
            'year_built': request.form.get('year_built', '').strip(),
            'parking': request.form.get('parking', '').strip(),
            'sq_ft': request.form.get('sq_ft', '').strip(),
            'market_value': request.form.get('market_value', '').strip(),
            'amenities': request.form.get('amenities', '').strip(),
            'nearby_landmarks': request.form.get('nearby_landmarks', '').strip(),
            'legal_details': request.form.get('legal_details', '').strip()
        }

        # Validate required fields
        required_fields = ['property_name', 'property_type', 'address', 'city', 'state']
        missing_fields = [field for field in required_fields if not data[field]]
        if missing_fields:
            logger.warning(f"Missing required fields: {', '.join(missing_fields)}")
            return jsonify({
                'error': f"Missing required fields: {', '.join(missing_fields)}",
                'status': 'error'
            }), 400

        # Process images in parallel
        images = []
        image_analysis = []
        image_model_used = set()
        image_parallel_info = []
        if 'images' in request.files:
            image_files = []
            for img_file in request.files.getlist('images'):
                if img_file.filename and img_file.filename.lower().endswith(('.jpg', '.jpeg', '.png')):
                    image_files.append(img_file)
            if image_files:
                # Process images in parallel
                image_results = parallel_processor.process_images_parallel(image_files)
                for result in image_results:
                    if 'image_data' in result:
                        images.append(result['image_data'])
                        image_analysis.append(result['analysis'])
                        if 'model_used' in result['analysis']:
                            image_model_used.add(result['analysis']['model_used'])
                        if 'parallelization_info' in result:
                            image_parallel_info.append(result['parallelization_info'])
                    else:
                        # Error/partial result: keep it as the analysis entry.
                        image_analysis.append(result)
                        if 'model_used' in result:
                            image_model_used.add(result['model_used'])
                        if 'parallelization_info' in result:
                            image_parallel_info.append(result['parallelization_info'])

        # Add image count to data for cross-validation
        data['image_count'] = len(images)
        data['has_images'] = len(images) > 0

        # Process PDFs in parallel
        pdf_texts = []
        pdf_analysis = []
        pdf_parallel_info = []
        if 'documents' in request.files:
            pdf_files = []
            for pdf_file in request.files.getlist('documents'):
                if pdf_file.filename and pdf_file.filename.lower().endswith('.pdf'):
                    pdf_files.append(pdf_file)
            if pdf_files:
                # Process PDFs in parallel
                pdf_results = parallel_processor.process_pdfs_parallel(pdf_files)
                for result in pdf_results:
                    if 'filename' in result:
                        pdf_texts.append({
                            'filename': result['filename'],
                            'text': result['text']
                        })
                        pdf_analysis.append(result['analysis'])
                        if 'parallelization_info' in result:
                            pdf_parallel_info.append(result['parallelization_info'])
                    else:
                        pdf_analysis.append(result)
                        if 'parallelization_info' in result:
                            pdf_parallel_info.append(result['parallelization_info'])

        # Add document count to data for cross-validation
        data['document_count'] = len(pdf_texts)
        data['has_documents'] = len(pdf_texts) > 0

        # Create consolidated text for analysis
        consolidated_text = f"""
        Property Name: {data['property_name']}
        Property Type: {data['property_type']}
        Status: {data['status']}
        Description: {data['description']}
        Location: {data['address']}, {data['city']}, {data['state']}, {data['country']}, {data['zip']}
        Coordinates: Lat {data['latitude']}, Long {data['longitude']}
        Specifications: {data['bedrooms']} bedrooms, {data['bathrooms']} bathrooms, {data['total_rooms']} total rooms
        Year Built: {data['year_built']}
        Parking: {data['parking']}
        Size: {data['sq_ft']} sq. ft.
        Market Value: ₹{data['market_value']}
        Amenities: {data['amenities']}
        Nearby Landmarks: {data['nearby_landmarks']}
        Legal Details: {data['legal_details']}
        """

        # Detect if this is a rental property
        is_rental = any(keyword in data['status'].lower() for keyword in ['rent', 'lease', 'let', 'hiring'])
        if not is_rental:
            # Check description for rental keywords
            is_rental = any(keyword in data['description'].lower() for keyword in ['rent', 'lease', 'let', 'hiring', 'monthly', 'per month'])

        # Add rental detection to data
        data['is_rental'] = is_rental
        data['property_status'] = 'rental' if is_rental else 'sale'

        # Description translation is currently disabled (langdetect/deep_translator
        # imports are commented out above), so the text passes through unchanged.
        try:
            data['description_translated'] = data['description']
        except Exception as e:
            logger.error(f"Error in language detection/translation: {str(e)}")
            data['description_translated'] = data['description']

        # Run all analyses in parallel using the new parallel processor
        analysis_start_time = time.time()
        # Create new event loop for async operations (one per request thread)
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            analysis_results = loop.run_until_complete(
                parallel_processor.run_analyses_parallel(data, consolidated_text, image_analysis, pdf_analysis)
            )
        finally:
            loop.close()
        analysis_time = time.time() - analysis_start_time
        logger.info(f"Analysis completed in {analysis_time:.2f} seconds")

        # Ensemble/agentic logic for summary, fraud, and legal analysis
        # (run multiple models and combine results if possible)
        # For demonstration, just add model_used/fallback info to the results

        # Unpack results
        summary = analysis_results.get('summary', "Property summary unavailable.")
        fraud_classification = analysis_results.get('fraud', {})
        legal_analysis = analysis_results.get('legal', {})
        trust_result = analysis_results.get('trust', (0.0, "Trust analysis failed"))
        suggestions = analysis_results.get('suggestions', {})
        quality_assessment = analysis_results.get('quality', {})
        address_verification = analysis_results.get('address', {})
        cross_validation = analysis_results.get('cross_validation', [])
        location_analysis = analysis_results.get('location', {})
        price_analysis = analysis_results.get('price', {})
        specs_verification = analysis_results.get('specs', {})
        market_analysis = analysis_results.get('market', {})

        # Add model_used/fallback info if present
        if hasattr(summary, 'model_used'):
            summary_model_used = summary.model_used
        else:
            summary_model_used = getattr(summary, 'fallback_model', None)
        if hasattr(fraud_classification, 'model_used'):
            fraud_model_used = fraud_classification.model_used
        else:
            fraud_model_used = getattr(fraud_classification, 'fallback_model', None)
        if hasattr(legal_analysis, 'model_used'):
            legal_model_used = legal_analysis.model_used
        else:
            legal_model_used = getattr(legal_analysis, 'fallback_model', None)

        # Handle trust score result
        if isinstance(trust_result, tuple):
            trust_score, trust_reasoning = trust_result
        else:
            trust_score, trust_reasoning = 0.0, "Trust analysis failed"

        # Prepare response
        document_analysis = {
            'pdf_count': len(pdf_texts),
            'pdf_texts': pdf_texts,
            'pdf_analysis': pdf_analysis,
            'pdf_parallelization': pdf_parallel_info
        }

        # Fix image analysis structure to match frontend expectations
        image_results = {
            'image_count': len(images),
            'image_analysis': image_analysis,
            'image_model_used': list(image_model_used),
            'image_parallelization': image_parallel_info
        }

        # Ensure image analysis has proper structure for frontend
        if image_analysis:
            # Convert image analysis to proper format if needed
            formatted_image_analysis = []
            for i, analysis in enumerate(image_analysis):
                if isinstance(analysis, dict):
                    # Ensure all required fields are present
                    formatted_analysis = {
                        'is_property_related': analysis.get('is_property_related', False),
                        'predicted_label': analysis.get('predicted_label', 'Unknown'),
                        'confidence': analysis.get('confidence', 0.0),
                        'real_estate_confidence': analysis.get('real_estate_confidence', 0.0),
                        'authenticity_score': analysis.get('authenticity_score', 0.0),
                        'is_ai_generated': analysis.get('is_ai_generated', False),
                        'image_quality': analysis.get('image_quality', {
                            'resolution': 'Unknown',
                            'quality_score': 0.0,
                            'total_pixels': 0,
                            'aspect_ratio': 1.0
                        }),
                        'top_predictions': analysis.get('top_predictions', []),
                        'model_used': analysis.get('model_used', 'static_fallback')
                    }
                    formatted_image_analysis.append(formatted_analysis)
                else:
                    # Fallback for non-dict analysis
                    formatted_image_analysis.append({
                        'is_property_related': False,
                        'predicted_label': 'Unknown',
                        'confidence': 0.0,
                        'real_estate_confidence': 0.0,
                        'authenticity_score': 0.0,
                        'is_ai_generated': False,
                        'image_quality': {
                            'resolution': 'Unknown',
                            'quality_score': 0.0,
                            'total_pixels': 0,
                            'aspect_ratio': 1.0
                        },
                        'top_predictions': [],
                        'model_used': 'static_fallback'
                    })
            image_results['image_analysis'] = formatted_image_analysis

        # Ensure document analysis has proper structure for frontend
        if pdf_analysis:
            formatted_pdf_analysis = []
            for i, analysis in enumerate(pdf_analysis):
                if isinstance(analysis, dict):
                    # Ensure all required fields are present
                    formatted_analysis = {
                        'is_property_related': analysis.get('is_property_related', False),
                        'confidence': analysis.get('confidence', 0.0),
                        'document_type': analysis.get('document_type', 'Unknown'),
                        'document_confidence': analysis.get('document_confidence', 0.0),
                        'authenticity_assessment': analysis.get('authenticity_assessment', 'Unknown'),
                        'authenticity_confidence': analysis.get('authenticity_confidence', 0.0),
                        'summary': analysis.get('summary', 'No summary available'),
                        'key_info': analysis.get('key_info', {}),
                        'contains_signatures': analysis.get('contains_signatures', False),
                        'contains_dates': analysis.get('contains_dates', False),
                        'verification_score': analysis.get('verification_score', 0.0),
                        'real_estate_indicators': analysis.get('real_estate_indicators', []),
                        'legal_terms_found': analysis.get('legal_terms_found', []),
                        'keyword_analysis': analysis.get('keyword_analysis', {}),
                        'model_used': analysis.get('model_used', 'static_fallback')
                    }
                    formatted_pdf_analysis.append(formatted_analysis)
                else:
                    # Fallback for non-dict analysis
                    formatted_pdf_analysis.append({
                        'is_property_related': False,
                        'confidence': 0.0,
                        'document_type': 'Unknown',
                        'document_confidence': 0.0,
                        'authenticity_assessment': 'Unknown',
                        'authenticity_confidence': 0.0,
                        'summary': 'No summary available',
                        'key_info': {},
                        'contains_signatures': False,
                        'contains_dates': False,
                        'verification_score': 0.0,
                        'real_estate_indicators': [],
                        'legal_terms_found': [],
                        'keyword_analysis': {},
                        'model_used': 'static_fallback'
                    })
            document_analysis['pdf_analysis'] = formatted_pdf_analysis

        report_id = str(uuid.uuid4())

        # Create results dictionary
        results = {
            'report_id': report_id,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'summary': summary,
            'summary_model_used': summary_model_used,
            'fraud_classification': fraud_classification,
            'fraud_model_used': fraud_model_used,
            'trust_score': {
                'score': trust_score,
                'reasoning': trust_reasoning
            },
            'suggestions': suggestions,
            'quality_assessment': quality_assessment,
            'address_verification': address_verification,
            'cross_validation': cross_validation,
            'location_analysis': location_analysis,
            'price_analysis': price_analysis,
            'legal_analysis': legal_analysis,
            'legal_model_used': legal_model_used,
            'document_analysis': document_analysis,
            'image_analysis': image_results,
            'specs_verification': specs_verification,
            'market_analysis': market_analysis,
            'images': images,
            'processing_time': {
                'total_time': time.time() - start_time,
                'analysis_time': analysis_time
            }
        }

        # Calculate final verdict
        final_verdict = calculate_final_verdict(results)
        results['final_verdict'] = final_verdict

        total_time = time.time() - start_time
        logger.info(f"Total verification completed in {total_time:.2f} seconds")
        return jsonify(make_json_serializable(results))
    except Exception as e:
        logger.error(f"Error in verify_property: {str(e)}")
        return jsonify({
            'error': 'Server error occurred. Please try again later.',
            'status': 'error',
            'details': str(e)
        }), 500


if __name__ == '__main__':
    # Run Flask app
    # NOTE(review): debug=True on 0.0.0.0 exposes the Werkzeug debugger to the
    # network — disable debug before deploying to production.
    app.run(host='0.0.0.0', port=8000, debug=True, use_reloader=False)