# models/cross_validation.py
"""Rule-based cross-validation of property listing data.

Every public ``analyze_*`` function takes the flat listing dictionary (or a
part of it) and returns plain dict/list structures.  ``perform_cross_validation``
combines the individual checks into one flat list of findings.
"""

import math
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Union

from .logging_config import logger
from .model_loader import load_model

# Findings grouped by section name, as built by perform_cross_validation.
Sections = Dict[str, List[Dict[str, Any]]]

# Patterns are applied to lower-cased text, so only lower-case alternatives
# are listed.
_ROOM_PATTERNS = {
    'bedroom': re.compile(r'(\d+)\s*(?:bedroom|bed|bhk)'),
    'bathroom': re.compile(r'(\d+)\s*(?:bathroom|bath|washroom)'),
    'room': re.compile(r'(\d+)\s*room'),
}

_PROPERTY_TYPE_SYNONYMS = {
    'apartment': ['apartment', 'flat', 'unit', 'condo'],
    'house': ['house', 'home', 'villa', 'bungalow', 'townhouse'],
    'plot': ['plot', 'land', 'site'],
    'commercial': ['commercial', 'office', 'shop', 'retail'],
}

_SUSPICIOUS_PHRASES = (
    'urgent sale', 'quick sale', 'no documents needed', 'cash only',
    'below market', 'distress sale', 'owner abroad', 'inheritance',
)

# Common city -> state pairs used for the city/state consistency check.
_CITY_STATE_PAIRS = {
    'hyderabad': 'telangana',
    'mumbai': 'maharashtra',
    'delhi': 'delhi',
    'bangalore': 'karnataka',
    'chennai': 'tamil nadu',
    'kolkata': 'west bengal',
    'pune': 'maharashtra',
    'ahmedabad': 'gujarat',
    'jaipur': 'rajasthan',
    'lucknow': 'uttar pradesh',
}

# India's approximate bounding box.
_INDIA_BOUNDS = {'lat_min': 6.0, 'lat_max': 38.0, 'lng_min': 67.0, 'lng_max': 98.0}

_IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')
_SUPPORTED_FILE_EXTENSIONS = ('.pdf',) + _IMAGE_EXTENSIONS


def _strip_currency(value: str) -> str:
    """Remove the rupee sign, thousands separators and outer whitespace."""
    return value.replace('₹', '').replace(',', '').strip()


def _clean_text(value: Any) -> str:
    """Return *value* as a stripped string; ``None`` becomes ``''``.

    Listing fields may arrive as numbers (e.g. an integer postal code), so
    calling ``.strip()`` directly on the raw value is not safe.
    """
    return '' if value is None else str(value).strip()


def safe_int_convert(value: Any) -> int:
    """Safely convert a value to integer.

    Strings may contain the rupee sign and commas.  Anything that cannot be
    converted (including NaN and infinity) yields ``0``.
    """
    try:
        if isinstance(value, str):
            value = _strip_currency(value)
        return int(float(value)) if value else 0
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int(float('inf')) and float() of a huge int.
        return 0


def safe_float_convert(value: Any) -> float:
    """Safely convert a value to float.

    Strings may contain the rupee sign and commas.  Anything that cannot be
    converted yields ``0.0``.  NaN and infinity also yield ``0.0``: every
    comparison with NaN is False, so a NaN would silently pass all range
    checks built on top of this helper.
    """
    try:
        if isinstance(value, str):
            value = _strip_currency(value)
        result = float(value) if value else 0.0
    except (ValueError, TypeError, OverflowError):
        return 0.0
    return result if math.isfinite(result) else 0.0


def extract_numbers_from_text(text: str) -> List[int]:
    """Extract all standalone runs of digits from *text* as integers."""
    if not text:
        return []
    return [int(num) for num in re.findall(r'\b\d+\b', text)]


def find_room_mentions(text: str) -> Dict[str, List[int]]:
    """Find mentions of rooms, bedrooms, bathrooms in text.

    Returns a dict with any of the keys ``'bedroom'``, ``'bathroom'`` and
    ``'room'`` mapped to the counts found (e.g. ``"3 BHK"`` -> ``{'bedroom': [3]}``).
    Keys without a match are omitted.
    """
    if not text:
        return {}

    lowered = text.lower()
    results: Dict[str, List[int]] = {}
    for key, pattern in _ROOM_PATTERNS.items():
        matches = pattern.findall(lowered)
        if matches:
            results[key] = [int(match) for match in matches]
    return results


def analyze_property_description(description: str, property_data: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze property description for consistency with other data.

    Args:
        description: Free-text listing description.
        property_data: Dict read for ``'bedrooms'``, ``'bathrooms'`` and
            ``'property_type'``.

    Returns:
        Dict with ``'room_mentions'``, ``'property_type_mentions'``,
        ``'amenity_mentions'``, ``'inconsistencies'`` and
        ``'suspicious_patterns'``.  The two ``*_mentions`` lists besides
        ``room_mentions`` are always empty in this implementation.
    """
    analysis: Dict[str, Any] = {
        'room_mentions': find_room_mentions(description),
        'property_type_mentions': [],
        'amenity_mentions': [],
        'inconsistencies': [],
        'suspicious_patterns': [],
    }
    if not description:
        return analysis

    description_lower = description.lower()
    room_mentions = analysis['room_mentions']

    # Room counts: a difference of one is tolerated.
    if 'bedroom' in room_mentions:
        stated_bedrooms = safe_int_convert(property_data.get('bedrooms', 0))
        mentioned_bedrooms = max(room_mentions['bedroom'])
        if abs(stated_bedrooms - mentioned_bedrooms) > 1:
            analysis['inconsistencies'].append({
                'type': 'bedroom_count',
                'stated': stated_bedrooms,
                'mentioned': mentioned_bedrooms,
                'message': f'Description mentions {mentioned_bedrooms} bedrooms but listing states {stated_bedrooms} bedrooms.'
            })

    if 'bathroom' in room_mentions:
        stated_bathrooms = safe_float_convert(property_data.get('bathrooms', 0))
        mentioned_bathrooms = max(room_mentions['bathroom'])
        if abs(stated_bathrooms - mentioned_bathrooms) > 1.0:
            analysis['inconsistencies'].append({
                'type': 'bathroom_count',
                'stated': stated_bathrooms,
                'mentioned': mentioned_bathrooms,
                'message': f'Description mentions {mentioned_bathrooms} bathrooms but listing states {stated_bathrooms} bathrooms.'
            })

    # Property type: the stated type is matched to a category, then any
    # synonym of that category found in the description counts as a mention.
    property_type = str(property_data.get('property_type') or '').lower()
    if property_type:
        type_found = False
        for category, synonyms in _PROPERTY_TYPE_SYNONYMS.items():
            belongs = property_type in category or any(s in property_type for s in synonyms)
            if belongs and any(s in description_lower for s in synonyms):
                type_found = True
                break

        # Only flag when the description is long enough to expect a mention.
        if not type_found and len(description) > 100:
            analysis['inconsistencies'].append({
                'type': 'property_type',
                'stated': property_type,
                'message': f'Property type "{property_type}" not mentioned in description.'
            })

    for keyword in _SUSPICIOUS_PHRASES:
        if keyword in description_lower:
            analysis['suspicious_patterns'].append({
                'pattern': keyword,
                'message': f'Description contains potentially suspicious phrase: "{keyword}"'
            })

    return analysis


def analyze_location_consistency(data: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze location data for consistency and validity.

    Reads ``'city'``, ``'state'``, ``'zip'`` (falling back to
    ``'postal_code'``), ``'latitude'`` and ``'longitude'`` from *data*.

    Returns:
        Dict with ``'inconsistencies'`` and ``'suspicious_patterns'`` lists
        (the latter is always empty in this implementation).
    """
    analysis: Dict[str, Any] = {
        'inconsistencies': [],
        'suspicious_patterns': [],
    }
    inconsistencies = analysis['inconsistencies']

    # City/state consistency against a small table of well-known cities.
    city = _clean_text(data.get('city')).lower()
    state = _clean_text(data.get('state')).lower()
    if city and state:
        expected_state = _CITY_STATE_PAIRS.get(city)
        if expected_state is not None and expected_state != state:
            inconsistencies.append({
                'type': 'city_state_mismatch',
                'city': city,
                'state': state,
                'message': f'City {city} is typically in {expected_state}, not {state}'
            })

    # Zip code format.
    zip_code = _clean_text(data.get('zip') or data.get('postal_code'))
    if zip_code and not re.match(r'^\d{6}$', zip_code):
        inconsistencies.append({
            'type': 'invalid_zip',
            'zip': zip_code,
            'message': 'Invalid zip code format. Should be 6 digits.'
        })

    # Coordinates: checked only when at least one of them was supplied, so a
    # listing without coordinates is not reported as lying outside India.
    raw_lat = _clean_text(data.get('latitude'))
    raw_lng = _clean_text(data.get('longitude'))
    if raw_lat or raw_lng:
        try:
            lat = float(raw_lat)
            lng = float(raw_lng)
        except ValueError:
            inconsistencies.append({
                'type': 'invalid_coordinates',
                'message': 'Invalid coordinate format.'
            })
        else:
            inside_india = (
                _INDIA_BOUNDS['lat_min'] <= lat <= _INDIA_BOUNDS['lat_max']
                and _INDIA_BOUNDS['lng_min'] <= lng <= _INDIA_BOUNDS['lng_max']
            )
            if not inside_india:
                inconsistencies.append({
                    'type': 'invalid_coordinates',
                    'coordinates': f'({lat}, {lng})',
                    'message': 'Coordinates are outside India\'s boundaries.'
                })

    return analysis


def analyze_property_specifications(data: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze property specifications for consistency and reasonableness.

    Reads ``'bedrooms'``, ``'bathrooms'``, ``'total_rooms'``, ``'sq_ft'``,
    ``'year_built'`` and ``'market_value'`` from *data*.  A check is skipped
    when the value it depends on is absent or zero.

    Returns:
        Dict with ``'inconsistencies'`` and ``'suspicious_values'`` lists.
    """
    analysis: Dict[str, Any] = {
        'inconsistencies': [],
        'suspicious_values': [],
    }

    bedrooms = safe_int_convert(data.get('bedrooms', 0))
    bathrooms = safe_float_convert(data.get('bathrooms', 0))
    total_rooms = safe_int_convert(data.get('total_rooms', 0))

    # Room count consistency, only when total_rooms was actually supplied.
    rooms_needed = bedrooms + int(bathrooms)
    if _clean_text(data.get('total_rooms')) and total_rooms < rooms_needed:
        analysis['inconsistencies'].append({
            'type': 'room_count_mismatch',
            'total_rooms': total_rooms,
            'bedrooms': bedrooms,
            'bathrooms': bathrooms,
            'message': f'Total rooms ({total_rooms}) is less than sum of bedrooms and bathrooms ({rooms_needed})'
        })

    # Square footage per bedroom; undefined without bedrooms, so skipped then.
    sq_ft = safe_float_convert(data.get('sq_ft', 0))
    if sq_ft > 0 and bedrooms > 0:
        sq_ft_per_bedroom = sq_ft / bedrooms
        if sq_ft_per_bedroom < 200:
            analysis['suspicious_values'].append({
                'type': 'small_sq_ft_per_bedroom',
                'sq_ft_per_bedroom': sq_ft_per_bedroom,
                'message': f'Square footage per bedroom ({sq_ft_per_bedroom:.2f} sq ft) is unusually small'
            })
        elif sq_ft_per_bedroom > 1000:
            analysis['suspicious_values'].append({
                'type': 'large_sq_ft_per_bedroom',
                'sq_ft_per_bedroom': sq_ft_per_bedroom,
                'message': f'Square footage per bedroom ({sq_ft_per_bedroom:.2f} sq ft) is unusually large'
            })

    # Year built.
    year_built = safe_int_convert(data.get('year_built', 0))
    if year_built > 0:
        property_age = datetime.now().year - year_built
        if property_age < 0:
            analysis['inconsistencies'].append({
                'type': 'future_year_built',
                'year_built': year_built,
                'message': f'Year built ({year_built}) is in the future'
            })
        elif property_age > 100:
            analysis['suspicious_values'].append({
                'type': 'very_old_property',
                'age': property_age,
                'message': f'Property is unusually old ({property_age} years)'
            })

    # Price per square foot (INR).
    market_value = safe_float_convert(data.get('market_value', 0))
    if market_value > 0 and sq_ft > 0:
        price_per_sqft = market_value / sq_ft
        if price_per_sqft < 1000:
            analysis['suspicious_values'].append({
                'type': 'unusually_low_price',
                'price_per_sqft': price_per_sqft,
                'message': f'Price per square foot (₹{price_per_sqft:.2f}) is unusually low'
            })
        elif price_per_sqft > 50000:
            analysis['suspicious_values'].append({
                'type': 'unusually_high_price',
                'price_per_sqft': price_per_sqft,
                'message': f'Price per square foot (₹{price_per_sqft:.2f}) is unusually high'
            })

    return analysis


def _unverified_document(summary: str, error: str) -> Dict[str, Any]:
    """Build the result returned when a document could not be verified."""
    return {
        'type': 'unknown',
        'confidence': 0.0,
        'authenticity': 'could not verify',
        'authenticity_confidence': 0.0,
        'summary': summary,
        'has_signatures': False,
        'has_dates': False,
        'error': error
    }


def analyze_document(document_path: str) -> Dict[str, Any]:
    """Analyze a single document for authenticity and content.

    NOTE: placeholder implementation.  Only the path and its extension are
    inspected; the file is never opened, and every ``.pdf`` path is reported
    as ``'verified'`` with fixed confidence values.

    Returns:
        Dict with ``'type'``, ``'confidence'``, ``'authenticity'``,
        ``'authenticity_confidence'``, ``'summary'``, ``'has_signatures'``,
        ``'has_dates'`` and ``'error'`` (``None`` on success).
    """
    try:
        if not document_path or not isinstance(document_path, str):
            return _unverified_document('Invalid document path', 'Invalid document path')

        _, ext = os.path.splitext(document_path)
        if ext.lower() != '.pdf':
            return _unverified_document('Invalid document format', 'Only PDF documents are supported')

        # Basic document analysis
        # In a real implementation, you would use a PDF analysis library here
        return {
            'type': 'property_document',
            'confidence': 0.8,
            'authenticity': 'verified',
            'authenticity_confidence': 0.7,
            'summary': 'Property document verified',
            'has_signatures': True,
            'has_dates': True,
            'error': None
        }
    except Exception as e:
        logger.error(f"Error analyzing document: {str(e)}")
        return _unverified_document('Error analyzing document', str(e))


def _rejected_image(description: str, error: str) -> Dict[str, Any]:
    """Build the result returned when an image is not accepted."""
    return {
        'is_property_image': False,
        'confidence': 0.0,
        'description': description,
        'error': error
    }


def analyze_image(image_path: str) -> Dict[str, Any]:
    """Analyze a single image for property-related content.

    NOTE: placeholder implementation.  Only the path and its extension are
    inspected; the file is never opened, and every ``.jpg``/``.jpeg``/``.png``
    path is reported as a property image with a fixed confidence.

    Returns:
        Dict with ``'is_property_image'``, ``'confidence'``,
        ``'description'`` and ``'error'`` (``None`` on success).
    """
    try:
        if not image_path or not isinstance(image_path, str):
            return _rejected_image('Invalid image path', 'Invalid image path')

        _, ext = os.path.splitext(image_path)
        if ext.lower() not in _IMAGE_EXTENSIONS:
            return _rejected_image('Invalid image format', 'Only JPG and PNG images are supported')

        # Basic image analysis
        # In a real implementation, you would use an image analysis library here
        return {
            'is_property_image': True,
            'confidence': 0.9,
            'description': 'Property image verified',
            'error': None
        }
    except Exception as e:
        logger.error(f"Error analyzing image: {str(e)}")
        return _rejected_image('Error analyzing image', str(e))


def _clean_file_paths(files: Any) -> List[str]:
    """Normalise a path or list of paths: drop '×' markers and empty entries.

    Entries are filtered *after* cleaning, so a path consisting only of the
    '×' marker is dropped instead of being counted as an unanalysed file.
    """
    if not files:
        return []
    if isinstance(files, str):
        files = [files]
    cleaned = (f.replace('×', '').strip() for f in files if isinstance(f, str))
    return [path for path in cleaned if path]


def analyze_documents_and_images(data: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze all documents and images in the property data.

    Reads ``data['documents']`` and ``data['images']`` (a path string or a
    list of path strings).

    Returns:
        Dict with the per-file results (``'documents'``, ``'images'``), the
        totals, the verified counts, and the verification scores as
        percentages of verified files (0.0 when there are no files).
    """
    analysis: Dict[str, Any] = {
        'documents': [],
        'images': [],
        'document_verification_score': 0.0,
        'image_verification_score': 0.0,
        'total_documents': 0,
        'total_images': 0,
        'verified_documents': 0,
        'verified_images': 0
    }

    documents = _clean_file_paths(data.get('documents', []))
    analysis['total_documents'] = len(documents)
    for doc in documents:
        doc_analysis = analyze_document(doc)
        analysis['documents'].append(doc_analysis)
        if doc_analysis['authenticity'] == 'verified':
            analysis['verified_documents'] += 1

    images = _clean_file_paths(data.get('images', []))
    analysis['total_images'] = len(images)
    for img in images:
        img_analysis = analyze_image(img)
        analysis['images'].append(img_analysis)
        if img_analysis['is_property_image']:
            analysis['verified_images'] += 1

    if analysis['total_documents'] > 0:
        analysis['document_verification_score'] = (analysis['verified_documents'] / analysis['total_documents']) * 100
    if analysis['total_images'] > 0:
        analysis['image_verification_score'] = (analysis['verified_images'] / analysis['total_images']) * 100

    return analysis


def _finding(check: str, status: str, message: str, details: Any,
             severity: str, recommendation: str) -> Dict[str, Any]:
    """Build one finding dict in the shape returned by perform_cross_validation."""
    return {
        'check': check,
        'status': status,
        'message': message,
        'details': details,
        'severity': severity,
        'recommendation': recommendation
    }


def _has_supported_files(files: Any) -> bool:
    """Return True if any entry looks like a real PDF/JPG/PNG file name."""
    if not files:
        return False
    if isinstance(files, str):
        files = [files]
    return any(
        isinstance(f, str)
        and bool(f.strip())
        and not f.endswith('×')
        and f.lower().endswith(_SUPPORTED_FILE_EXTENSIONS)
        for f in files
    )


def _check_fake_data_patterns(data: Dict[str, Any], sections: Sections,
                              fake_indicators: List[str]) -> None:
    """Flag obviously fake data; append one label per hit to *fake_indicators*."""
    property_name = _clean_text(data.get('property_name'))
    if property_name.isdigit() and len(property_name) <= 2:
        fake_indicators.append("Property name is just a number")
        sections['basic_info'].append(_finding(
            'property_name', 'fraudulent',
            'Property name is just a number (highly suspicious).',
            f'Property name: {property_name}',
            'high', 'Provide a real property name'))

    market_value = safe_float_convert(data.get('market_value', 0))
    if market_value <= 5:
        fake_indicators.append("Suspiciously low market value")
        sections['pricing'].append(_finding(
            'market_value', 'fraudulent',
            'Market value is suspiciously low.',
            f'Market value: ₹{market_value:,.0f}',
            'high', 'Provide realistic market value'))

    square_feet = safe_float_convert(data.get('sq_ft', 0))
    if square_feet <= 5:
        fake_indicators.append("Unrealistic property size")
        sections['specifications'].append(_finding(
            'square_feet', 'fraudulent',
            'Property size is unrealistically small.',
            f'Square feet: {square_feet}',
            'high', 'Provide realistic property size'))

    # The same digit string in at least five numeric fields (e.g. all "1").
    numeric_fields = ('bedrooms', 'bathrooms', 'total_rooms', 'parking',
                      'year_built', 'market_value', 'sq_ft')
    numeric_values = [v for v in (str(data.get(name, '')) for name in numeric_fields) if v.isdigit()]
    if len(numeric_values) >= 5:
        unique_values = set(numeric_values)
        if len(unique_values) <= 1:
            fake_indicators.append("Multiple fields have same suspicious values")
            sections['basic_info'].append(_finding(
                'repeated_values', 'fraudulent',
                'Multiple fields contain the same suspicious values.',
                f'Repeated values: {unique_values}',
                'high', 'Provide realistic and varied property details'))


def _check_basic_info(data: Dict[str, Any], sections: Sections,
                      fake_data_detected: bool) -> None:
    """Validate property name, type and status."""
    severity = 'high' if fake_data_detected else 'medium'

    property_name = _clean_text(data.get('property_name'))
    if len(property_name) < 3:
        sections['basic_info'].append(_finding(
            'property_name', 'missing',
            'Property name is required.',
            'Please provide a valid property name.',
            severity, 'Provide a valid property name (not just numbers)'))

    property_type = _clean_text(data.get('property_type'))
    if property_type.lower() in ('unknown', 'none', 'null', ''):
        sections['basic_info'].append(_finding(
            'property_type', 'suspicious',
            'Property type is unclear or missing.',
            f'Property type: {property_type}',
            severity, 'Specify clear property type (apartment, house, villa, etc.)'))

    if not _clean_text(data.get('status')):
        sections['basic_info'].append(_finding(
            'status', 'missing',
            'Property status is required.',
            'Please specify if property is for sale or rent.',
            severity, 'Specify property status (for sale, for rent, etc.)'))


def _check_location(data: Dict[str, Any], sections: Sections) -> None:
    """Validate address, city, state and postal code."""
    if not _clean_text(data.get('address')):
        sections['location'].append(_finding(
            'address', 'missing',
            'Property address is required.',
            'Please provide the complete property address.',
            'high', 'Provide complete property address'))

    if not _clean_text(data.get('city')):
        sections['location'].append(_finding(
            'city', 'missing',
            'City is required.',
            'Please specify the city.',
            'high', 'Specify the city'))

    if not _clean_text(data.get('state')):
        sections['location'].append(_finding(
            'state', 'missing',
            'State is required.',
            'Please specify the state.',
            'high', 'Specify the state'))

    postal_code = _clean_text(data.get('postal_code'))
    if postal_code and (not postal_code.isdigit() or len(postal_code) < 5):
        sections['location'].append(_finding(
            'postal_code', 'invalid',
            'Invalid postal code format.',
            f'Postal code: {postal_code}',
            'low', 'Provide a valid postal code'))


def _check_specifications(data: Dict[str, Any], sections: Sections) -> None:
    """Validate bedroom/bathroom counts and year built."""
    bedrooms = safe_int_convert(data.get('bedrooms', 0))
    bathrooms = safe_float_convert(data.get('bathrooms', 0))
    year_built = safe_int_convert(data.get('year_built', 0))
    square_feet = safe_float_convert(data.get('sq_ft', 0))
    findings = sections['specifications']

    if bedrooms < 0 or bedrooms > 50:
        findings.append(_finding(
            'bedrooms', 'fraudulent' if bedrooms < 0 else 'suspicious',
            'Unrealistic number of bedrooms.',
            f'Bedrooms: {bedrooms}',
            'high' if bedrooms < 0 else 'medium',
            'Provide realistic bedroom count'))
    elif bedrooms == 0 and square_feet > 200:
        findings.append(_finding(
            'bedrooms', 'suspicious',
            'No bedrooms specified for a large property.',
            f'Bedrooms: {bedrooms}, Square feet: {square_feet}',
            'medium', 'Specify bedroom count for this property size'))

    if bathrooms < 0 or bathrooms > 30:
        findings.append(_finding(
            'bathrooms', 'fraudulent' if bathrooms < 0 else 'suspicious',
            'Unrealistic number of bathrooms.',
            f'Bathrooms: {bathrooms}',
            'high' if bathrooms < 0 else 'medium',
            'Provide realistic bathroom count'))
    elif bathrooms == 0 and square_feet > 100:
        findings.append(_finding(
            'bathrooms', 'suspicious',
            'No bathrooms specified for this property size.',
            f'Bathrooms: {bathrooms}, Square feet: {square_feet}',
            'medium', 'Specify bathroom count for this property size'))

    # A missing year converts to 0 and is therefore reported here as well.
    current_year = datetime.now().year
    if year_built > current_year + 5 or year_built < 1800:
        findings.append(_finding(
            'year_built', 'suspicious',
            'Unrealistic year built.',
            f'Year built: {year_built}',
            'medium', 'Provide realistic year built'))


def _check_pricing(data: Dict[str, Any], sections: Sections) -> None:
    """Validate market value and price per square foot."""
    market_value = safe_float_convert(data.get('market_value', 0))
    if market_value <= 0:
        sections['pricing'].append(_finding(
            'market_value', 'missing',
            'Market value is required.',
            'Please provide the property market value.',
            'high', 'Provide property market value'))
        return

    square_feet = safe_float_convert(data.get('sq_ft', 0))
    if square_feet <= 0:
        # No area, so no price per sq ft; reporting a "low price" of 0 here
        # would be a false positive.
        return
    price_per_sqft = market_value / square_feet

    # Thresholds depend on whether the value is a monthly rent or a sale price.
    if data.get('is_rental', False):
        low, high, subject = 5, 100, 'rental rate'
        details = f'Rental rate: ₹{price_per_sqft:.2f}/sq ft/month'
    else:
        low, high, subject = 1000, 50000, 'purchase price'
        details = f'Price per sq ft: ₹{price_per_sqft:.2f}'

    if price_per_sqft < low:
        qualifier = 'low'
    elif price_per_sqft > high:
        qualifier = 'high'
    else:
        return

    sections['pricing'].append(_finding(
        'market_value', 'suspicious',
        f'Unusually {qualifier} {subject}.',
        details,
        'medium', f'Verify {subject} is accurate'))


def _check_description(data: Dict[str, Any], sections: Sections,
                       fake_indicators: List[str]) -> None:
    """Validate the description and cross-check it against the stated data."""
    findings = sections['description']
    description = _clean_text(data.get('description'))

    if not description:
        findings.append(_finding(
            'description', 'missing',
            'Property description is required.',
            'Please provide a detailed property description.',
            'high' if fake_indicators else 'medium',
            'Add more detailed property description'))
        return

    if description.isdigit() and len(description) <= 2:
        fake_indicators.append("Description is just a number")
        findings.append(_finding(
            'description', 'fraudulent',
            'Description is just a number (highly suspicious).',
            f'Description: {description}',
            'high', 'Provide a real property description'))
        return

    if len(description) < 30:
        findings.append(_finding(
            'description', 'insufficient',
            'Property description is too short.',
            f'Description length: {len(description)} characters',
            'medium', 'Provide detailed property description'))
        return

    property_data = {
        'bedrooms': safe_int_convert(data.get('bedrooms', 0)),
        'bathrooms': safe_float_convert(data.get('bathrooms', 0)),
        'property_type': _clean_text(data.get('property_type')),
    }
    description_analysis = analyze_property_description(description, property_data)

    for inconsistency in description_analysis['inconsistencies']:
        findings.append(_finding(
            f"desc_{inconsistency['type']}", 'inconsistent',
            inconsistency['message'],
            f"Stated: {inconsistency.get('stated', 'N/A')}, Mentioned: {inconsistency.get('mentioned', 'N/A')}",
            'low', 'Review and update property description for consistency'))

    for pattern in description_analysis['suspicious_patterns']:
        findings.append(_finding(
            'desc_suspicious_pattern', 'suspicious',
            pattern['message'],
            pattern['pattern'],
            'medium', 'Review description for suspicious language'))


def _check_media(data: Dict[str, Any], media_analysis: Dict[str, Any],
                 sections: Sections) -> None:
    """Turn the document/image analysis into findings.

    NOTE(review): image findings are appended to the 'documents' section, as
    the code has always done, so the 'images' section stays empty.  This looks
    unintended, but consumers may rely on it - confirm before changing.
    """
    findings = sections['documents']

    # Documents
    has_documents = bool(data.get('has_documents', False)) or safe_int_convert(data.get('document_count', 0)) > 0
    if media_analysis['total_documents'] == 0 and not has_documents:
        if _has_supported_files(data.get('documents', [])):
            findings.append(_finding(
                'document_analysis', 'error',
                'Could not analyze provided documents.',
                'Please ensure documents are in PDF format and are accessible.',
                'medium', 'Please check document format and try again.'))
        else:
            findings.append(_finding(
                'documents_validation', 'missing',
                'Property documents are recommended.',
                'Please upload relevant property documents in PDF format.',
                'medium', 'Upload property documents in PDF format.'))
    else:
        for doc in media_analysis['documents']:
            if doc.get('error'):
                findings.append(_finding(
                    'document_analysis', 'error',
                    f'Error analyzing document: {doc["error"]}',
                    doc['summary'],
                    'medium', 'Please ensure the document is a valid PDF file.'))
            elif doc['authenticity'] != 'verified':
                findings.append(_finding(
                    'document_verification', 'unverified',
                    'Document authenticity could not be verified.',
                    doc['summary'],
                    'low', 'Please provide clear, legible documents.'))

    # Images
    has_images = bool(data.get('has_images', False)) or safe_int_convert(data.get('image_count', 0)) > 0
    if has_images and media_analysis['total_images'] == 0:
        # Uploaded elsewhere and not visible to the path-based analysis.
        findings.append(_finding(
            'images_validation', 'valid',
            'Property images uploaded successfully.',
            f'{data.get("image_count", 0)} images were uploaded and processed.',
            'low', 'Images are being analyzed.'))
    elif media_analysis['total_images'] == 0:
        if _has_supported_files(data.get('images', [])):
            findings.append(_finding(
                'image_analysis', 'error',
                'Could not analyze provided images.',
                'Please ensure images are in JPG or PNG format and are accessible.',
                'medium', 'Please check image format and try again.'))
        else:
            findings.append(_finding(
                'images_validation', 'missing',
                'Property images are recommended.',
                'Please upload at least one image of the property.',
                'medium', 'Upload property images in JPG or PNG format.'))
    else:
        for img in media_analysis['images']:
            if img.get('error'):
                findings.append(_finding(
                    'image_analysis', 'error',
                    f'Error analyzing image: {img["error"]}',
                    img['description'],
                    'medium', 'Please ensure the image is in JPG or PNG format.'))
            elif not img['is_property_image']:
                findings.append(_finding(
                    'image_verification', 'unverified',
                    'Image may not be property-related.',
                    img['description'],
                    'low', 'Please provide clear property images.'))

    if media_analysis['total_documents'] > 0 or media_analysis['total_images'] > 0:
        findings.append(_finding(
            'media_verification_scores', 'valid',
            'Media verification completed.',
            f'Documents: {media_analysis["total_documents"]}, Images: {media_analysis["total_images"]}',
            'low', 'Media verification successful.'))


def _build_summary(sections: Sections, media_analysis: Dict[str, Any],
                   fake_indicators: List[str]) -> Dict[str, Any]:
    """Aggregate all findings into the single summary finding."""
    severity_counts = {'high': 0, 'medium': 0, 'low': 0}
    status_counts = {
        'valid': 0, 'invalid': 0, 'suspicious': 0, 'inconsistent': 0,
        'missing': 0, 'error': 0, 'unverified': 0, 'fraudulent': 0
    }
    for checks in sections.values():
        for check in checks:
            if check['severity'] in severity_counts:
                severity_counts[check['severity']] += 1
            if check['status'] in status_counts:
                status_counts[check['status']] += 1

    high_severity_issues = severity_counts['high']
    if fake_indicators or status_counts['fraudulent'] > 0 or high_severity_issues > 3:
        fraud_risk_level = 'high'
    elif high_severity_issues > 1:
        fraud_risk_level = 'medium'
    else:
        fraud_risk_level = 'low'

    summary = {
        'total_checks': sum(len(checks) for checks in sections.values()),
        'categories': {section: len(checks) for section, checks in sections.items()},
        'severity_counts': severity_counts,
        'status_counts': status_counts,
        'fraud_risk_level': fraud_risk_level,
        'media_verification': {
            'document_score': media_analysis['document_verification_score'],
            'image_score': media_analysis['image_verification_score']
        },
        'fake_data_detected': bool(fake_indicators),
        'fake_indicators': fake_indicators
    }
    return _finding(
        'summary_analysis', 'valid',
        'Property Analysis Summary',
        summary,
        'low',
        f'Fraud Risk Level: {fraud_risk_level.upper()}. Review all findings and address high severity issues first.')


def perform_cross_validation(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Perform comprehensive cross-validation of property data.

    Args:
        data: Flat listing dict.  Field values may be strings or numbers;
            missing fields are treated as empty.

    Returns:
        A flat list of findings.  Each finding has the keys ``'check'``,
        ``'status'``, ``'message'``, ``'details'``, ``'severity'``,
        ``'recommendation'`` and ``'section'``.  The last entry is the
        summary, whose ``'details'`` is a dict of counts and the fraud risk
        level.  On an unexpected error a single ``'cross_validation_error'``
        finding (without ``'section'``) is returned instead of raising.
    """
    try:
        sections: Sections = {
            'basic_info': [],
            'location': [],
            'specifications': [],
            'documents': [],
            'images': [],
            'pricing': [],
            'description': []
        }
        # One label per detected fake-data pattern; non-empty means "fake
        # data detected".
        fake_indicators: List[str] = []

        # The fake-data checks run first so later checks can raise severity.
        _check_fake_data_patterns(data, sections, fake_indicators)
        _check_basic_info(data, sections, bool(fake_indicators))
        _check_location(data, sections)
        _check_specifications(data, sections)
        _check_pricing(data, sections)
        _check_description(data, sections, fake_indicators)

        media_analysis = analyze_documents_and_images(data)
        _check_media(data, media_analysis, sections)

        # The summary counts only the findings collected above.
        sections['summary'] = [_build_summary(sections, media_analysis, fake_indicators)]

        # Flatten all sections into a single list
        all_checks = []
        for section_name, checks in sections.items():
            for check in checks:
                check['section'] = section_name
                all_checks.append(check)
        return all_checks

    except Exception as e:
        logger.error(f"Error in cross validation: {str(e)}")
        return [_finding(
            'cross_validation_error', 'error',
            f'Cross validation failed: {str(e)}',
            'An error occurred during cross validation.',
            'medium', 'Please try again or contact support.')]