# app.py
from flask import Flask, render_template, request, jsonify
from flask_cors import CORS
import base64
import io
import re
import json
import uuid
import time
import asyncio
from geopy.geocoders import Nominatim
from datetime import datetime
# from langdetect import detect
# from deep_translator import GoogleTranslator
from models.logging_config import logger
from models.model_loader import load_model, clear_model_cache
from models.parallel_processor import parallel_processor
from models.performance_optimizer import performance_optimizer, optimize_model_loading, timed_function
from models.image_analysis import analyze_image
from models.pdf_analysis import extract_text_from_pdf, analyze_pdf_content
from models.property_summary import generate_property_summary
from models.fraud_classification import classify_fraud
from models.trust_score import generate_trust_score
from models.suggestions import generate_suggestions
from models.text_quality import assess_text_quality
from models.address_verification import verify_address
from models.cross_validation import perform_cross_validation
from models.location_analysis import analyze_location
from models.price_analysis import analyze_price
from models.legal_analysis import analyze_legal_details
from models.property_specs import verify_property_specs
from models.market_value import analyze_market_value
from models.image_quality import assess_image_quality
from models.property_relation import check_if_property_related
import torch
import numpy as np
import concurrent.futures
from PIL import Image
app = Flask(__name__)
CORS(app) # Enable CORS for frontend
# Initialize geocoder
geocoder = Nominatim(user_agent="indian_property_verifier", timeout=10)
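# Note: the public Nominatim endpoint is rate limited (roughly one request per second),
# which is one reason the /get-location handler below retries with a short sleep between attempts.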
# Pre-load models to avoid loading delays during requests
@timed_function
def preload_models():
"""Pre-load essential models to improve response times."""
try:
logger.info("Pre-loading essential models with performance optimization...")
# Only preload the most essential models to avoid disconnections
essential_models = [
"zero-shot-classification", # For fraud, legal, suggestions
"summarization" # For property summary
]
for model_task in essential_models:
try:
logger.info(f"Pre-loading {model_task} model...")
model = load_model(model_task)
if hasattr(model, 'fallback_used') and model.fallback_used:
logger.info(f"Using fallback for {model_task}: {getattr(model, 'fallback_model', 'unknown')}")
else:
logger.info(f"Successfully pre-loaded {model_task} model")
except Exception as e:
logger.warning(f"Failed to pre-load {model_task}: {str(e)}")
logger.info("Model pre-loading completed with optimization")
except Exception as e:
logger.error(f"Error during model pre-loading: {str(e)}")
# Pre-load models on startup
preload_models()
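# Note: this call runs at import time, so each worker process loads (and caches) the essential
# models once at startup rather than on the first request.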
def make_json_serializable(obj):
try:
if isinstance(obj, (bool, int, float, str, type(None))):
return obj
elif isinstance(obj, (list, tuple)):
return [make_json_serializable(item) for item in obj]
elif isinstance(obj, dict):
return {str(key): make_json_serializable(value) for key, value in obj.items()}
elif torch.is_tensor(obj):
return obj.item() if obj.numel() == 1 else obj.tolist()
elif np.isscalar(obj):
return obj.item() if hasattr(obj, 'item') else float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return str(obj)
except Exception as e:
logger.error(f"Error serializing object: {str(e)}")
return str(obj)
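# Illustrative example of the conversion performed above:
#   make_json_serializable({'t': torch.tensor(0.95), 'a': np.array([1, 2]), 'id': uuid.uuid4()})
#   -> {'t': 0.95, 'a': [1, 2], 'id': '<uuid string>'}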
@app.route('/')
def index():
return render_template('index.html')
@app.route('/get-location', methods=['POST'])
def get_location():
try:
data = request.json or {}
latitude = data.get('latitude')
longitude = data.get('longitude')
if latitude is None or longitude is None:
logger.warning("Missing latitude or longitude")
return jsonify({
'status': 'error',
'message': 'Latitude and longitude are required'
}), 400
# Validate that the coordinates fall within an approximate bounding box for India
try:
lat, lng = float(latitude), float(longitude)
if not (6.5 <= lat <= 37.5 and 68.0 <= lng <= 97.5):
return jsonify({
'status': 'error',
'message': 'Coordinates are outside India'
}), 400
except ValueError:
return jsonify({
'status': 'error',
'message': 'Invalid coordinates format'
}), 400
# Retry geocoding up to 3 times
for attempt in range(3):
try:
location = geocoder.reverse((lat, lng), exactly_one=True)  # use the validated float coordinates
if location:
address_components = location.raw.get('address', {})
# Extract Indian-specific address components, falling back through common alternatives
city = address_components.get('city') or address_components.get('town') or address_components.get('village') or address_components.get('suburb') or ''
state = address_components.get('state') or address_components.get('state_district') or ''
# Postal code must match the Indian 6-digit format
postal_code = address_components.get('postcode', '')
if postal_code and not re.match(r'^\d{6}$', postal_code):
    postal_code = ''
# Road/street name
road = address_components.get('road') or address_components.get('street') or ''
# Area/locality
area = address_components.get('suburb') or address_components.get('neighbourhood') or ''
return jsonify({
'status': 'success',
'address': location.address,
'street': road,
'area': area,
'city': city,
'state': state,
'country': 'India',
'postal_code': postal_code,
'latitude': latitude,
'longitude': longitude,
'formatted_address': f"{road}, {area}, {city}, {state}, India - {postal_code}"
})
logger.warning(f"Geocoding failed on attempt {attempt + 1}")
time.sleep(1) # Wait before retry
except Exception as e:
logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}")
time.sleep(1)
return jsonify({
'status': 'error',
'message': 'Could not determine location after retries'
}), 500
except Exception as e:
logger.error(f"Error in get_location: {str(e)}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
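# Illustrative request (assumes the app is running locally on port 8000 as configured below):
#   curl -X POST http://localhost:8000/get-location \
#        -H "Content-Type: application/json" \
#        -d '{"latitude": 17.385, "longitude": 78.4867}'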
@app.route('/performance', methods=['GET'])
def get_performance_metrics():
"""Get system performance metrics and cache statistics"""
try:
from models.performance_optimizer import get_performance_metrics
metrics = get_performance_metrics()
return jsonify({
'status': 'success',
'metrics': metrics,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
except Exception as e:
logger.error(f"Error getting performance metrics: {str(e)}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
@app.route('/clear-cache', methods=['POST'])
def clear_cache():
"""Clear all cached results"""
try:
performance_optimizer.clear_cache()
return jsonify({
'status': 'success',
'message': 'Cache cleared successfully'
})
except Exception as e:
logger.error(f"Error clearing cache: {str(e)}")
return jsonify({
'status': 'error',
'message': str(e)
}), 500
def calculate_final_verdict(results):
"""
Calculate a comprehensive final verdict based on all analysis results.
This function combines all verification scores, fraud indicators, and quality assessments
to determine if a property listing is legitimate, suspicious, or fraudulent.
"""
try:
# Defensive: ensure results is a dict
if not isinstance(results, dict):
logger.warning(f"Results is not a dict: {type(results)}")
return {
'verdict': 'VERIFICATION REQUIRED',
'confidence': 0.0,
'reasoning': 'Insufficient data for verification',
'risk_level': 'medium',
'overall_score': 50 # Increased from 25
}
# Extract key metrics with defensive programming
fraud_classification = results.get('fraud_classification', {})
trust_score_data = results.get('trust_score', {})
address_verification = results.get('address_verification', {})
cross_validation = results.get('cross_validation', [])
location_analysis = results.get('location_analysis', {})
price_analysis = results.get('price_analysis', {})
legal_analysis = results.get('legal_analysis', {})
specs_verification = results.get('specs_verification', {})
quality_assessment = results.get('quality_assessment', {})
# CRITICAL: Check for fake data patterns in cross validation - Much more lenient
fake_data_detected = False
fraudulent_issues = 0
high_severity_issues = 0
medium_severity_issues = 0
low_severity_issues = 0
if isinstance(cross_validation, list):
for issue in cross_validation:
if isinstance(issue, dict):
status = issue.get('status', '')
severity = issue.get('severity', 'low')
if status == 'fraudulent':
fraudulent_issues += 1
fake_data_detected = True
elif severity == 'high':
high_severity_issues += 1
elif severity == 'medium':
medium_severity_issues += 1
elif severity == 'low':
low_severity_issues += 1
# Calculate fraud risk score from the classification alert level and alert score
fraud_level = fraud_classification.get('alert_level', 'minimal')
fraud_alert_score = fraud_classification.get('alert_score', 0.0)
fraud_score_mapping = {
'critical': 0.8, # Reduced from 1.0
'high': 0.6, # Reduced from 0.8
'medium': 0.4, # Reduced from 0.6
'low': 0.2, # Reduced from 0.4
'minimal': 0.05 # Reduced from 0.1
}
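# Illustrative example: alert_level='high' with alert_score=0.5 gives fraud_score = 0.6 * 0.5 = 0.30.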
fraud_score = fraud_score_mapping.get(fraud_level, 0.05) * fraud_alert_score
# CRITICAL: Much more lenient penalty for fake data
if fake_data_detected:
fraud_score = max(fraud_score, 0.4) # Reduced from 0.8 to 0.4
fraud_level = 'medium' # Changed from 'high' to 'medium'
# Calculate trust score
trust_score = 0.0
if isinstance(trust_score_data, dict):
trust_score = trust_score_data.get('score', 0.0)
# Convert percentage to decimal if needed
if trust_score > 1.0:
trust_score = trust_score / 100.0
elif isinstance(trust_score_data, tuple) and len(trust_score_data) > 0:
trust_score = trust_score_data[0]
# Convert percentage to decimal if needed
if trust_score > 1.0:
trust_score = trust_score / 100.0
else:
trust_score = 0.0
# CRITICAL: Much more lenient penalty for fake data in trust score
if fake_data_detected:
trust_score = max(0.0, trust_score - 0.2) # Reduced penalty from 0.5 to 0.2
# Calculate address verification score
address_score = 0.0
if address_verification and isinstance(address_verification, dict):
verification_score = address_verification.get('verification_score', 0.0)
address_score = float(verification_score) / 100.0 if verification_score > 0 else 0.0
# Calculate location analysis score
location_score = 0.0
if location_analysis and isinstance(location_analysis, dict):
completeness_score = location_analysis.get('completeness_score', 0.0)
location_score = float(completeness_score) / 100.0 if completeness_score > 0 else 0.0
# Calculate price analysis score
price_score = 0.0
if price_analysis and isinstance(price_analysis, dict):
confidence = price_analysis.get('confidence', 0.0)
price_score = float(confidence) if confidence > 0 else 0.0
# Calculate legal analysis score
legal_score = 0.0
if legal_analysis and isinstance(legal_analysis, dict):
confidence = legal_analysis.get('confidence', 0.0)
legal_score = float(confidence) if confidence > 0 else 0.0
# Calculate specs verification score
specs_score = 0.0
if specs_verification and isinstance(specs_verification, dict):
verification_score = specs_verification.get('verification_score', 0.0)
specs_score = float(verification_score) / 100.0 if verification_score > 0 else 0.0
# Calculate quality assessment score
quality_score = 0.0
if quality_assessment and isinstance(quality_assessment, dict):
score = quality_assessment.get('score', 0.0)
quality_score = float(score) / 100.0 if score > 0 else 0.0
# Much more balanced weighted scoring system
weights = {
'fraud': 0.25, # Reduced from 0.35
'trust': 0.30, # Increased from 0.25
'address': 0.15, # Keep address verification
'location': 0.12, # Increased from 0.10
'price': 0.10, # Keep price analysis
'legal': 0.05, # Increased from 0.03
'specs': 0.02, # Increased from 0.01
'quality': 0.01 # Keep quality assessment
}
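# The weights above sum to 1.0, so with each component score in the 0-1 range the weighted
# score also stays in 0-1 before penalties are applied.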
# Calculate weighted score
weighted_score = (
(1.0 - fraud_score) * weights['fraud'] +
trust_score * weights['trust'] +
address_score * weights['address'] +
location_score * weights['location'] +
price_score * weights['price'] +
legal_score * weights['legal'] +
specs_score * weights['specs'] +
quality_score * weights['quality']
)
# Debug logging
logger.info(f"Score components: fraud={fraud_score:.3f}, trust={trust_score:.3f}, address={address_score:.3f}, location={location_score:.3f}, price={price_score:.3f}, legal={legal_score:.3f}, specs={specs_score:.3f}, quality={quality_score:.3f}")
logger.info(f"Weighted score before penalty: {weighted_score:.3f}")
# Much more lenient penalty system
issue_penalty = 0.0
if fraudulent_issues > 0:
issue_penalty += fraudulent_issues * 0.08 # Reduced from 0.15 to 0.08
if high_severity_issues > 0:
issue_penalty += high_severity_issues * 0.05 # Reduced from 0.10 to 0.05
if medium_severity_issues > 0:
issue_penalty += medium_severity_issues * 0.02 # Reduced from 0.05 to 0.02
if low_severity_issues > 0:
issue_penalty += low_severity_issues * 0.01 # Reduced from 0.02 to 0.01
weighted_score = max(0.0, weighted_score - issue_penalty)
logger.info(f"Issue penalty: {issue_penalty:.3f}, Final weighted score: {weighted_score:.3f}")
# CRITICAL: Much more lenient minimum score requirements
if fake_data_detected:
weighted_score = max(0.15, weighted_score) # Increased from 0.05 to 0.15
elif any([trust_score > 0, address_score > 0, location_score > 0, price_score > 0]):
weighted_score = max(0.30, weighted_score) # Increased from 0.15 to 0.30
# Much more lenient verdict determination
if fake_data_detected and fraudulent_issues > 5: # Increased threshold from 2 to 5
verdict = 'HIGH RISK LISTING'
risk_level = 'high'
elif weighted_score >= 0.60 and fraud_score < 0.4 and high_severity_issues == 0: # Reduced from 0.70 to 0.60
verdict = 'VERIFIED REAL ESTATE LISTING'
risk_level = 'low'
elif weighted_score >= 0.40 and fraud_score < 0.5 and high_severity_issues <= 2: # Reduced from 0.50 to 0.40
verdict = 'LIKELY LEGITIMATE'
risk_level = 'low'
elif weighted_score >= 0.25 and fraud_score < 0.7 and high_severity_issues <= 3: # Reduced from 0.30 to 0.25
verdict = 'SUSPICIOUS LISTING'
risk_level = 'medium'
elif fraud_score >= 0.8 or weighted_score < 0.20 or high_severity_issues >= 6: # Much more lenient thresholds
verdict = 'HIGH RISK LISTING'
risk_level = 'high'
elif weighted_score >= 0.20: # Reduced from 0.15
verdict = 'VERIFICATION REQUIRED'
risk_level = 'medium'
else:
verdict = 'INSUFFICIENT DATA'
risk_level = 'medium'
# Generate detailed reasoning
reasoning_parts = []
if fake_data_detected:
reasoning_parts.append("Fake data patterns detected")
if fraudulent_issues > 0:
reasoning_parts.append(f"{fraudulent_issues} fraudulent validation issues")
if fraud_score > 0.4: # Reduced from 0.3
reasoning_parts.append(f"Fraud risk detected (level: {fraud_level})")
if trust_score < 0.4: # Reduced from 0.3
reasoning_parts.append(f"Low trust score ({trust_score:.1%})")
if address_score < 0.6: # Reduced from 0.5
reasoning_parts.append("Address verification issues")
if location_score < 0.6: # Reduced from 0.5
reasoning_parts.append("Location verification issues")
if price_score < 0.6: # Reduced from 0.5
reasoning_parts.append("Price analysis concerns")
if legal_score < 0.6: # Reduced from 0.5
reasoning_parts.append("Legal documentation issues")
if high_severity_issues > 0:
reasoning_parts.append(f"{high_severity_issues} critical validation issues")
if medium_severity_issues > 0:
reasoning_parts.append(f"{medium_severity_issues} moderate validation issues")
if not reasoning_parts:
reasoning_parts.append("All verification checks passed successfully")
reasoning = ". ".join(reasoning_parts)
# Calculate overall score as percentage
overall_score = int(weighted_score * 100)
# Ensure score is between 0 and 100
overall_score = max(0, min(100, overall_score))
# CRITICAL: Much more lenient minimum score for fake data
if fake_data_detected:
overall_score = max(25, min(50, overall_score)) # Increased range from 10-25% to 25-50%
elif overall_score == 0 and any([trust_score > 0, address_score > 0, location_score > 0]):
overall_score = 40 # Increased from 20 to 40
# Final score adjustment based on data quality - Much more lenient
if fake_data_detected or fraudulent_issues > 0:
overall_score = max(25, min(50, overall_score)) # Increased from 10-25% to 25-50%
elif high_severity_issues >= 3:
overall_score = max(30, overall_score) # Increased from 15 to 30
elif high_severity_issues >= 1:
overall_score = max(40, overall_score) # Increased from 20 to 40
else:
overall_score = max(50, overall_score) # Increased from 25 to 50
return {
'verdict': verdict,
'confidence': min(1.0, weighted_score),
'reasoning': reasoning,
'risk_level': risk_level,
'overall_score': overall_score,
'scores': {
'fraud_score': fraud_score,
'trust_score': trust_score,
'address_score': address_score,
'location_score': location_score,
'price_score': price_score,
'legal_score': legal_score,
'specs_score': specs_score,
'quality_score': quality_score,
'weighted_score': weighted_score,
'cross_validation_issues': len(cross_validation) if isinstance(cross_validation, list) else 0,
'high_severity_issues': high_severity_issues,
'medium_severity_issues': medium_severity_issues,
'low_severity_issues': low_severity_issues,
'fraudulent_issues': fraudulent_issues,
'fake_data_detected': fake_data_detected
}
}
except Exception as e:
logger.error(f"Error calculating final verdict: {str(e)}")
return {
'verdict': 'VERIFICATION REQUIRED',
'confidence': 0.0,
'reasoning': f'Error in verdict calculation: {str(e)}',
'risk_level': 'medium',
'overall_score': 50 # Increased from 25
}
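# Illustrative use: calculate_final_verdict(results) returns a dict with 'verdict', 'confidence',
# 'reasoning', 'risk_level', 'overall_score' and a per-component 'scores' breakdown; it is attached
# to the response as results['final_verdict'] in /verify below.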
@app.route('/verify', methods=['POST'])
def verify_property():
try:
start_time = time.time()
if not request.form and not request.files:
logger.warning("No form data or files provided")
return jsonify({
'error': 'No data provided',
'status': 'error'
}), 400
# Extract form data
data = {
'property_name': request.form.get('property_name', '').strip(),
'property_type': request.form.get('property_type', '').strip(),
'status': request.form.get('status', '').strip(),
'description': request.form.get('description', '').strip(),
'address': request.form.get('address', '').strip(),
'city': request.form.get('city', '').strip(),
'state': request.form.get('state', '').strip(),
'country': request.form.get('country', 'India').strip(),
'zip': request.form.get('zip', '').strip(),
'latitude': request.form.get('latitude', '').strip(),
'longitude': request.form.get('longitude', '').strip(),
'bedrooms': request.form.get('bedrooms', '').strip(),
'bathrooms': request.form.get('bathrooms', '').strip(),
'total_rooms': request.form.get('total_rooms', '').strip(),
'year_built': request.form.get('year_built', '').strip(),
'parking': request.form.get('parking', '').strip(),
'sq_ft': request.form.get('sq_ft', '').strip(),
'market_value': request.form.get('market_value', '').strip(),
'amenities': request.form.get('amenities', '').strip(),
'nearby_landmarks': request.form.get('nearby_landmarks', '').strip(),
'legal_details': request.form.get('legal_details', '').strip()
}
# Validate required fields
required_fields = ['property_name', 'property_type', 'address', 'city', 'state']
missing_fields = [field for field in required_fields if not data[field]]
if missing_fields:
logger.warning(f"Missing required fields: {', '.join(missing_fields)}")
return jsonify({
'error': f"Missing required fields: {', '.join(missing_fields)}",
'status': 'error'
}), 400
# Process images in parallel
images = []
image_analysis = []
image_model_used = set()
image_parallel_info = []
if 'images' in request.files:
image_files = []
for img_file in request.files.getlist('images'):
if img_file.filename and img_file.filename.lower().endswith(('.jpg', '.jpeg', '.png')):
image_files.append(img_file)
if image_files:
# Process images in parallel
image_results = parallel_processor.process_images_parallel(image_files)
for result in image_results:
if 'image_data' in result:
images.append(result['image_data'])
image_analysis.append(result['analysis'])
if 'model_used' in result['analysis']:
image_model_used.add(result['analysis']['model_used'])
if 'parallelization_info' in result:
image_parallel_info.append(result['parallelization_info'])
else:
image_analysis.append(result)
if 'model_used' in result:
image_model_used.add(result['model_used'])
if 'parallelization_info' in result:
image_parallel_info.append(result['parallelization_info'])
# Add image count to data for cross-validation
data['image_count'] = len(images)
data['has_images'] = len(images) > 0
# Process PDFs in parallel
pdf_texts = []
pdf_analysis = []
pdf_parallel_info = []
if 'documents' in request.files:
pdf_files = []
for pdf_file in request.files.getlist('documents'):
if pdf_file.filename and pdf_file.filename.lower().endswith('.pdf'):
pdf_files.append(pdf_file)
if pdf_files:
# Process PDFs in parallel
pdf_results = parallel_processor.process_pdfs_parallel(pdf_files)
for result in pdf_results:
if 'filename' in result:
pdf_texts.append({
'filename': result['filename'],
'text': result['text']
})
pdf_analysis.append(result['analysis'])
if 'parallelization_info' in result:
pdf_parallel_info.append(result['parallelization_info'])
else:
pdf_analysis.append(result)
if 'parallelization_info' in result:
pdf_parallel_info.append(result['parallelization_info'])
# Add document count to data for cross-validation
data['document_count'] = len(pdf_texts)
data['has_documents'] = len(pdf_texts) > 0
# Create consolidated text for analysis
consolidated_text = f"""
Property Name: {data['property_name']}
Property Type: {data['property_type']}
Status: {data['status']}
Description: {data['description']}
Location: {data['address']}, {data['city']}, {data['state']}, {data['country']}, {data['zip']}
Coordinates: Lat {data['latitude']}, Long {data['longitude']}
Specifications: {data['bedrooms']} bedrooms, {data['bathrooms']} bathrooms, {data['total_rooms']} total rooms
Year Built: {data['year_built']}
Parking: {data['parking']}
Size: {data['sq_ft']} sq. ft.
Market Value: ₹{data['market_value']}
Amenities: {data['amenities']}
Nearby Landmarks: {data['nearby_landmarks']}
Legal Details: {data['legal_details']}
"""
# Detect if this is a rental property
is_rental = any(keyword in data['status'].lower() for keyword in ['rent', 'lease', 'let', 'hiring'])
if not is_rental:
# Check description for rental keywords
is_rental = any(keyword in data['description'].lower() for keyword in ['rent', 'lease', 'let', 'hiring', 'monthly', 'per month'])
# Add rental detection to data
data['is_rental'] = is_rental
data['property_status'] = 'rental' if is_rental else 'sale'
# Translation is currently disabled (the langdetect/deep_translator imports above are commented
# out), so the description is passed through unchanged.
data['description_translated'] = data['description']
# Run all analyses in parallel using the new parallel processor
analysis_start_time = time.time()
# Flask views are synchronous, so run the async analyses in a dedicated event loop
# that is created and closed per request
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
analysis_results = loop.run_until_complete(
parallel_processor.run_analyses_parallel(data, consolidated_text, image_analysis, pdf_analysis)
)
finally:
loop.close()
analysis_time = time.time() - analysis_start_time
logger.info(f"Analysis completed in {analysis_time:.2f} seconds")
# Ensemble/agentic logic for summary, fraud, and legal analysis would run multiple models
# and combine their outputs; for now, only model_used/fallback info is attached to the results.
# Unpack results
summary = analysis_results.get('summary', "Property summary unavailable.")
fraud_classification = analysis_results.get('fraud', {})
legal_analysis = analysis_results.get('legal', {})
trust_result = analysis_results.get('trust', (0.0, "Trust analysis failed"))
suggestions = analysis_results.get('suggestions', {})
quality_assessment = analysis_results.get('quality', {})
address_verification = analysis_results.get('address', {})
cross_validation = analysis_results.get('cross_validation', [])
location_analysis = analysis_results.get('location', {})
price_analysis = analysis_results.get('price', {})
specs_verification = analysis_results.get('specs', {})
market_analysis = analysis_results.get('market', {})
# Attach model_used/fallback info if present (analysis results may be dicts or objects)
def get_model_info(result):
    if isinstance(result, dict):
        return result.get('model_used') or result.get('fallback_model')
    return getattr(result, 'model_used', None) or getattr(result, 'fallback_model', None)
summary_model_used = get_model_info(summary)
fraud_model_used = get_model_info(fraud_classification)
legal_model_used = get_model_info(legal_analysis)
# Handle trust score result
if isinstance(trust_result, tuple):
trust_score, trust_reasoning = trust_result
else:
trust_score, trust_reasoning = 0.0, "Trust analysis failed"
# Prepare response
document_analysis = {
'pdf_count': len(pdf_texts),
'pdf_texts': pdf_texts,
'pdf_analysis': pdf_analysis,
'pdf_parallelization': pdf_parallel_info
}
# Fix image analysis structure to match frontend expectations
image_results = {
'image_count': len(images),
'image_analysis': image_analysis,
'image_model_used': list(image_model_used),
'image_parallelization': image_parallel_info
}
# Ensure image analysis has proper structure for frontend
if image_analysis:
# Convert image analysis to proper format if needed
formatted_image_analysis = []
for i, analysis in enumerate(image_analysis):
if isinstance(analysis, dict):
# Ensure all required fields are present
formatted_analysis = {
'is_property_related': analysis.get('is_property_related', False),
'predicted_label': analysis.get('predicted_label', 'Unknown'),
'confidence': analysis.get('confidence', 0.0),
'real_estate_confidence': analysis.get('real_estate_confidence', 0.0),
'authenticity_score': analysis.get('authenticity_score', 0.0),
'is_ai_generated': analysis.get('is_ai_generated', False),
'image_quality': analysis.get('image_quality', {
'resolution': 'Unknown',
'quality_score': 0.0,
'total_pixels': 0,
'aspect_ratio': 1.0
}),
'top_predictions': analysis.get('top_predictions', []),
'model_used': analysis.get('model_used', 'static_fallback')
}
formatted_image_analysis.append(formatted_analysis)
else:
# Fallback for non-dict analysis
formatted_image_analysis.append({
'is_property_related': False,
'predicted_label': 'Unknown',
'confidence': 0.0,
'real_estate_confidence': 0.0,
'authenticity_score': 0.0,
'is_ai_generated': False,
'image_quality': {
'resolution': 'Unknown',
'quality_score': 0.0,
'total_pixels': 0,
'aspect_ratio': 1.0
},
'top_predictions': [],
'model_used': 'static_fallback'
})
image_results['image_analysis'] = formatted_image_analysis
# Ensure document analysis has proper structure for frontend
if pdf_analysis:
formatted_pdf_analysis = []
for i, analysis in enumerate(pdf_analysis):
if isinstance(analysis, dict):
# Ensure all required fields are present
formatted_analysis = {
'is_property_related': analysis.get('is_property_related', False),
'confidence': analysis.get('confidence', 0.0),
'document_type': analysis.get('document_type', 'Unknown'),
'document_confidence': analysis.get('document_confidence', 0.0),
'authenticity_assessment': analysis.get('authenticity_assessment', 'Unknown'),
'authenticity_confidence': analysis.get('authenticity_confidence', 0.0),
'summary': analysis.get('summary', 'No summary available'),
'key_info': analysis.get('key_info', {}),
'contains_signatures': analysis.get('contains_signatures', False),
'contains_dates': analysis.get('contains_dates', False),
'verification_score': analysis.get('verification_score', 0.0),
'real_estate_indicators': analysis.get('real_estate_indicators', []),
'legal_terms_found': analysis.get('legal_terms_found', []),
'keyword_analysis': analysis.get('keyword_analysis', {}),
'model_used': analysis.get('model_used', 'static_fallback')
}
formatted_pdf_analysis.append(formatted_analysis)
else:
# Fallback for non-dict analysis
formatted_pdf_analysis.append({
'is_property_related': False,
'confidence': 0.0,
'document_type': 'Unknown',
'document_confidence': 0.0,
'authenticity_assessment': 'Unknown',
'authenticity_confidence': 0.0,
'summary': 'No summary available',
'key_info': {},
'contains_signatures': False,
'contains_dates': False,
'verification_score': 0.0,
'real_estate_indicators': [],
'legal_terms_found': [],
'keyword_analysis': {},
'model_used': 'static_fallback'
})
document_analysis['pdf_analysis'] = formatted_pdf_analysis
report_id = str(uuid.uuid4())
# Create results dictionary
results = {
'report_id': report_id,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'summary': summary,
'summary_model_used': summary_model_used,
'fraud_classification': fraud_classification,
'fraud_model_used': fraud_model_used,
'trust_score': {
'score': trust_score,
'reasoning': trust_reasoning
},
'suggestions': suggestions,
'quality_assessment': quality_assessment,
'address_verification': address_verification,
'cross_validation': cross_validation,
'location_analysis': location_analysis,
'price_analysis': price_analysis,
'legal_analysis': legal_analysis,
'legal_model_used': legal_model_used,
'document_analysis': document_analysis,
'image_analysis': image_results,
'specs_verification': specs_verification,
'market_analysis': market_analysis,
'images': images,
'processing_time': {
'total_time': time.time() - start_time,
'analysis_time': analysis_time
}
}
# Calculate final verdict
final_verdict = calculate_final_verdict(results)
results['final_verdict'] = final_verdict
total_time = time.time() - start_time
logger.info(f"Total verification completed in {total_time:.2f} seconds")
return jsonify(make_json_serializable(results))
except Exception as e:
logger.error(f"Error in verify_property: {str(e)}")
return jsonify({
'error': 'Server error occurred. Please try again later.',
'status': 'error',
'details': str(e)
}), 500
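# Illustrative request (multipart form data; only property_name, property_type, address, city and
# state are required, images/documents are optional):
#   curl -X POST http://localhost:8000/verify \
#        -F "property_name=Sample Apartment" -F "property_type=Apartment" \
#        -F "address=12 MG Road" -F "city=Hyderabad" -F "state=Telangana" \
#        -F "images=@photo1.jpg" -F "documents=@sale_deed.pdf"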
if __name__ == '__main__':
# Run Flask app
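# Note: debug=True and host='0.0.0.0' are development settings; a production deployment would
# typically sit behind a WSGI server such as gunicorn.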
app.run(host='0.0.0.0', port=8000, debug=True, use_reloader=False)