from flask import Flask, render_template, request, jsonify import google.generativeai as genai import pandas as pd import numpy as np import joblib import os import json from datetime import datetime import re from typing import Dict, List, Any, Optional import logging from sklearn.model_selection import train_test_split from sklearn.ensemble import RandomForestRegressor from sklearn.preprocessing import StandardScaler from sklearn.pipeline import Pipeline from sklearn.metrics import mean_absolute_error, r2_score import sklearn # Import sklearn to get version app = Flask(__name__) # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Configure Gemini API GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', 'AIzaSyAU8NFxxscrLcPcYqp5bnnyBPTVH0v1aZY') # IMPORTANT: Replace with a secure way to manage API keys genai.configure(api_key=GEMINI_API_KEY) # Using gemini-1.5-flash for potentially larger context window and better performance model = genai.GenerativeModel('gemini-2.0-flash') class EnhancedRealEstateSalesAssistant: def __init__(self): self.pipeline = None # Initialize pipeline to None self.training_feature_names = [] # To store feature names from training self.model_version = 'N/A' self.load_model_and_data() self.conversation_history = [] self.client_preferences = {} self.last_search_results = None self.context_memory = [] def _clean_gemini_text(self, text: str) -> str: """Removes common Markdown formatting characters (like asterisks, hashes) from Gemini's output.""" if not isinstance(text, str): return text # Return as is if not a string # Remove bold/italic markers (**) cleaned_text = re.sub(r'\*\*', '', text) # Remove italic markers (*) cleaned_text = re.sub(r'\*', '', cleaned_text) # Remove header markers (#) at the start of a line, and any following whitespace cleaned_text = re.sub(r'^\s*#+\s*', '', cleaned_text, flags=re.MULTILINE) # Remove typical list bullet points like '- ' or '+ ' or '* ' at the start of a line # (Note: '*' removal above might catch list items, this is for redundancy/clarity) cleaned_text = re.sub(r'^\s*[-+]\s+', '', cleaned_text, flags=re.MULTILINE) # Replace multiple newlines with a single one (or two, depending on desired paragraph spacing) cleaned_text = re.sub(r'\n\s*\n', '\n\n', cleaned_text) # Remove excessive leading/trailing whitespace from each line cleaned_text = '\n'.join([line.strip() for line in cleaned_text.splitlines()]) # Final strip for the whole block cleaned_text = cleaned_text.strip() return cleaned_text def load_model_and_data(self): """Load the trained model and dataset with version compatibility""" try: # Load the dataset first self.df = pd.read_csv('Pune_House_Data.csv') # Clean and process data self.df = self.clean_dataset() # Extract unique values for filtering self.locations = sorted(self.df['site_location'].unique().tolist()) self.area_types = sorted(self.df['area_type'].unique().tolist()) self.availability_types = sorted(self.df['availability'].unique().tolist()) self.bhk_options = sorted(self.df['bhk'].unique().tolist()) # Try to load model with version compatibility model_loaded = self.load_model_with_version_check() if not model_loaded: logger.warning("Model loading failed, prediction will use fallback method.") self.pipeline = None # Ensure pipeline is None if loading failed logger.info("✅ Model and data loading process completed.") logger.info(f"Dataset shape: {self.df.shape}") logger.info(f"Available locations: {len(self.locations)}") except Exception as e: logger.error(f"❌ Critical error loading model/data: {e}") # Create fallback data self.df = self.create_fallback_data() self.locations = ['Baner', 'Hadapsar', 'Kothrud', 'Viman Nagar', 'Wakad', 'Hinjewadi'] self.area_types = ['Built-up Area', 'Super built-up Area', 'Plot Area', 'Carpet Area'] self.availability_types = ['Ready To Move', 'Not Ready', 'Under Construction'] self.bhk_options = [1, 2, 3, 4, 5, 6] self.pipeline = None # Ensure pipeline is None if data loading failed def load_model_with_version_check(self): """Load model with version compatibility check, or retrain if necessary.""" try: # First try to load the new version-compatible model try: model_data = joblib.load('house_price_prediction_v2.pkl') self.pipeline = model_data['pipeline'] self.training_feature_names = model_data.get('feature_names', []) self.model_version = model_data.get('version', '2.0') logger.info(f"✅ Loaded model version {self.model_version} with {len(self.training_feature_names)} features.") return True except FileNotFoundError: logger.info("New model version (house_price_prediction_v2.pkl) not found, attempting to retrain.") return self.retrain_model_with_current_version() except Exception as v2_load_error: logger.error(f"Failed to load new model version (v2): {v2_load_error}. Attempting retraining.") return self.retrain_model_with_current_version() except Exception as e: logger.error(f"Error in model loading process: {e}. Retraining will be attempted as a last resort.") return self.retrain_model_with_current_version() # Final attempt to retrain if other loading fails def retrain_model_with_current_version(self): """Retrain the model with current scikit-learn version""" try: logger.info(f"🔄 Retraining model with scikit-learn version {sklearn.__version__}") # Prepare features (this method also sets self.training_feature_names) X, y = self.create_training_data_with_proper_features() if X is None or y is None: logger.error("Failed to create training data, cannot retrain model.") self.pipeline = None return False # Split data X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=42 ) # Create pipeline pipeline = Pipeline([ ('scaler', StandardScaler()), ('model', RandomForestRegressor( n_estimators=100, random_state=42, n_jobs=-1 # Use all available cores )) ]) # Train model logger.info("Training model...") pipeline.fit(X_train, y_train) # Test model y_pred = pipeline.predict(X_test) mae = mean_absolute_error(y_test, y_pred) r2 = r2_score(y_test, y_pred) logger.info(f"✅ Model trained successfully:") logger.info(f" MAE: {mae:.2f} lakhs") logger.info(f" R² Score: {r2:.3f}") # Save the new model self.pipeline = pipeline # self.training_feature_names is already set by create_training_data_with_proper_features model_data = { 'pipeline': self.pipeline, 'feature_names': self.training_feature_names, 'version': '2.0', 'sklearn_version': sklearn.__version__, 'training_info': { 'mae': mae, 'r2_score': r2, 'training_samples': len(X_train), 'test_samples': len(X_test), 'feature_count': len(self.training_feature_names) }, 'created_at': datetime.now().isoformat() } # Save with version info joblib.dump(model_data, 'house_price_prediction_v2.pkl') logger.info("✅ Model saved as house_price_prediction_v2.pkl") return True except Exception as e: logger.error(f"Error retraining model: {e}") self.pipeline = None return False def create_training_data_with_proper_features(self): """Create training data with proper feature engineering for consistent predictions""" try: # Prepare features for training df_features = self.df.copy() # Select relevant columns for training feature_columns = ['site_location', 'area_type', 'availability', 'bhk', 'bath', 'balcony', 'total_sqft'] # Check if all required columns exist missing_columns = [col for col in feature_columns + ['price'] if col not in df_features.columns] if missing_columns: logger.error(f"Missing essential columns for training: {missing_columns}") return None, None df_features = df_features[feature_columns + ['price']] # Clean data (ensure numeric columns are correct, fillna where appropriate) df_features['bhk'] = pd.to_numeric(df_features['bhk'], errors='coerce') df_features['bath'] = pd.to_numeric(df_features['bath'], errors='coerce').fillna(df_features['bath'].median()) df_features['balcony'] = pd.to_numeric(df_features['balcony'], errors='coerce').fillna(df_features['balcony'].median()) df_features['total_sqft'] = pd.to_numeric(df_features['total_sqft'], errors='coerce') df_features['price'] = pd.to_numeric(df_features['price'], errors='coerce') # Remove rows with critical missing values for training df_features = df_features.dropna(subset=['price', 'total_sqft', 'bhk', 'site_location']) # Remove outliers before one-hot encoding for more robust training df_features = self.remove_outliers(df_features) # One-hot encode categorical variables categorical_cols = ['site_location', 'area_type', 'availability'] df_encoded = pd.get_dummies(df_features, columns=categorical_cols, prefix=categorical_cols, dummy_na=False) # Separate features and target X = df_encoded.drop('price', axis=1) y = df_encoded['price'] # Store feature names for later use in prediction self.training_feature_names = X.columns.tolist() logger.info(f"Training data prepared: {X.shape[0]} samples, {X.shape[1]} features") return X, y except Exception as e: logger.error(f"Error creating training data: {e}") return None, None def remove_outliers(self, df): """Remove outliers from the dataset based on price and area.""" try: # Calculate price_per_sqft for outlier removal, then drop it if not needed as a feature df['price_per_sqft'] = (df['price'] * 100000) / df['total_sqft'] # Remove price outliers (e.g., beyond 3 standard deviations from mean) price_mean = df['price'].mean() price_std = df['price'].std() df = df[abs(df['price'] - price_mean) <= 3 * price_std] # Remove area outliers (e.g., beyond 3 standard deviations from mean) area_mean = df['total_sqft'].mean() area_std = df['total_sqft'].std() df = df[abs(df['total_sqft'] - area_mean) <= 3 * area_std] # Remove properties with unrealistic price per sqft (e.g., <1000 or >20000 INR/sqft for Pune) df = df[(df['price_per_sqft'] >= 1000) & (df['price_per_sqft'] <= 20000)] # Drop temporary price_per_sqft column df = df.drop(columns=['price_per_sqft']) logger.info(f"Dataset after outlier removal: {len(df)} rows") return df except Exception as e: logger.error(f"Error removing outliers: {e}") return df # Return original df if error occurs def predict_single_price(self, location, bhk, bath, balcony, sqft, area_type, availability): """Predict price for a single property using ML model, with fallback to heuristic.""" try: # Check if ML pipeline is loaded if not hasattr(self, 'pipeline') or self.pipeline is None or not self.training_feature_names: logger.warning("ML pipeline or training features not available. Using fallback prediction.") return self.predict_single_price_fallback(location, bhk, bath, balcony, sqft, area_type, availability) try: # Create a feature dictionary initialized with 0s for all expected features feature_dict = {feature: 0 for feature in self.training_feature_names} # Set numerical features numerical_mapping = { 'bhk': bhk, 'bath': bath, 'balcony': balcony, 'total_sqft': sqft } for feature, value in numerical_mapping.items(): if feature in feature_dict: feature_dict[feature] = value else: logger.warning(f"Numerical feature '{feature}' not found in training features. Skipping.") # Set categorical features (one-hot encoded) # Location location_key = f"site_location_{location}" if location_key in feature_dict: feature_dict[location_key] = 1 else: found_loc = False for feature in self.training_feature_names: if feature.startswith('site_location_') and location.lower() in feature.lower(): feature_dict[feature] = 1 found_loc = True break if not found_loc: logger.warning(f"Location '{location}' not found as exact or fuzzy match in training features. This location will not contribute to the ML prediction specific to its one-hot encoding.") # Area Type area_type_key = f"area_type_{area_type}" if area_type_key in feature_dict: feature_dict[area_type_key] = 1 else: found_area_type = False for feature in self.training_feature_names: if feature.startswith('area_type_') and area_type.lower() in feature.lower(): feature_dict[feature] = 1 found_area_type = True break if not found_area_type: logger.warning(f"Area type '{area_type}' not found in training features. This area type will not contribute to the ML prediction specific to its one-hot encoding.") # Availability availability_key = f"availability_{availability}" if availability_key in feature_dict: feature_dict[availability_key] = 1 else: found_availability = False for feature in self.training_feature_names: if feature.startswith('availability_') and availability.lower() in feature.lower(): feature_dict[feature] = 1 found_availability = True break if not found_availability: logger.warning(f"Availability '{availability}' not found in training features. This availability will not contribute to the ML prediction specific to its one-hot encoding.") # Create DataFrame from the feature dictionary and ensure column order input_df = pd.DataFrame([feature_dict])[self.training_feature_names] predicted_price = self.pipeline.predict(input_df)[0] return round(predicted_price, 2) except Exception as ml_prediction_error: logger.warning(f"ML prediction failed for input {location, bhk, sqft}: {ml_prediction_error}. Falling back to heuristic model.") return self.predict_single_price_fallback(location, bhk, bath, balcony, sqft, area_type, availability) except Exception as general_error: logger.error(f"General error in predict_single_price wrapper: {general_error}. Falling back to heuristic.") return self.predict_single_price_fallback(location, bhk, bath, balcony, sqft, area_type, availability) def predict_single_price_fallback(self, location, bhk, bath, balcony, sqft, area_type, availability): """Enhanced fallback prediction method when ML model is unavailable or fails.""" try: # Base price per sqft for different areas in Pune (extensive map for better fallback) location_price_map = { 'Koregaon Park': 12000, 'Kalyani Nagar': 11000, 'Boat Club Road': 10500, 'Aundh': 9500, 'Baner': 8500, 'Balewadi': 8000, 'Kothrud': 8500, 'Viman Nagar': 7500, 'Wakad': 7000, 'Hinjewadi': 7500, 'Pune': 7000, 'Hadapsar': 6500, 'Kharadi': 7200, 'Yerawada': 7000, 'Lohegaon': 6000, 'Wagholi': 5500, 'Mundhwa': 7000, 'Undri': 6500, 'Kondhwa': 6000, 'Katraj': 5800, 'Dhankawadi': 6000, 'Warje': 6500, 'Karve Nagar': 7500, 'Bavdhan': 8000, 'Pashan': 7500, 'Sus': 7200, 'Pimpri': 6000, 'Chinchwad': 6200, 'Akurdi': 5800, 'Nigdi': 6500, 'Bhosari': 5500, 'Chakan': 4800, 'Talegaon': 4500, 'Alandi': 4200, 'Dehu': 4000, 'Lonavala': 5000, 'Kamshet': 4500, 'Sinhagad Road': 6800, 'Balaji Nagar': 5800, 'Parvati': 5500, 'Gultekdi': 5200, 'Wanowrie': 6200, 'Shivajinagar': 9000, 'Deccan': 8500, 'Camp': 7500, 'Koregaon': 8000, 'Sadashiv Peth': 8200, 'Shukrawar Peth': 7800, 'Kasba Peth': 7500, 'Narayan Peth': 7200, 'Rasta Peth': 7000, 'Ganj Peth': 6800, 'Magarpatta': 7500, 'Fursungi': 5000, 'Handewadi': 4800, 'Mahatma Phule Peth': 6200, 'Bhavani Peth': 6000, 'Bibwewadi': 7000 } # Get base price per sqft, try fuzzy matching if exact match not found base_price_per_sqft = location_price_map.get(location, 6500) # Default if no match if location not in location_price_map: for loc_key, price in location_price_map.items(): if location.lower() in loc_key.lower() or loc_key.lower() in location.lower(): base_price_per_sqft = price break # Area type multiplier area_type_multiplier = { 'Super built-up Area': 1.0, 'Built-up Area': 0.92, # Typically slightly less than super built-up for same listed price 'Plot Area': 0.85, # Plot area might translate to lower built-up 'Carpet Area': 1.08 # Carpet area is typically more expensive per sqft }.get(area_type, 1.0) # Availability multiplier (Under Construction/Not Ready typically cheaper) availability_multiplier = { 'Ready To Move': 1.0, 'Under Construction': 0.88, 'Not Ready': 0.82 }.get(availability, 0.95) # BHK-based pricing adjustments (larger BHKs might have slightly lower price/sqft due to bulk) bhk_multiplier = { 1: 1.15, # Premium for 1BHK 2: 1.0, # Base 3: 0.95, # Slight economy 4: 0.92, 5: 0.90, 6: 0.88 }.get(bhk, 1.0) # Bathroom and Balcony multipliers (simple linear adjustment) bath_multiplier = 1.0 + (bath - 2) * 0.03 # Each extra bath adds 3% to price balcony_multiplier = 1.0 + (balcony - 1) * 0.02 # Each extra balcony adds 2% to price # Ensure multipliers don't go negative or too low bath_multiplier = max(0.9, bath_multiplier) balcony_multiplier = max(0.95, balcony_multiplier) # Calculate final estimated price estimated_price_raw = (sqft * base_price_per_sqft * area_type_multiplier * availability_multiplier * bhk_multiplier * bath_multiplier * balcony_multiplier) estimated_price_lakhs = estimated_price_raw / 100000 return round(estimated_price_lakhs, 2) except Exception as e: logger.error(f"Error in fallback prediction: {e}") return None # Indicate failure def clean_dataset(self): """Clean and process the dataset""" df = self.df.copy() # Rename 'size' to 'bhk' based on previous logic (if 'size' exists) if 'size' in df.columns and 'bhk' not in df.columns: df['bhk'] = df['size'].str.extract(r'(\d+)').astype(float) elif 'bhk' not in df.columns: df['bhk'] = np.nan # Ensure bhk column exists even if 'size' is missing # Clean total_sqft column df['total_sqft'] = pd.to_numeric(df['total_sqft'], errors='coerce') # Clean price column df['price'] = pd.to_numeric(df['price'], errors='coerce') # Fill missing values for numerical columns (using median for robustness) df['bath'] = pd.to_numeric(df['bath'], errors='coerce').fillna(df['bath'].median()) df['balcony'] = pd.to_numeric(df['balcony'], errors='coerce').fillna(df['balcony'].median()) # Fill missing values for categorical columns df['society'] = df['society'].fillna('Not Specified') df['area_type'] = df['area_type'].fillna('Super built-up Area') # Common default df['availability'] = df['availability'].fillna('Ready To Move') # Common default df['site_location'] = df['site_location'].fillna('Pune') # General default # Remove rows with critical missing values after initial cleaning df = df.dropna(subset=['price', 'total_sqft', 'bhk', 'site_location']) # Ensure 'bhk' is an integer (or float if decimals are possible) df['bhk'] = df['bhk'].astype(int) df['bath'] = df['bath'].astype(int) df['balcony'] = df['balcony'].astype(int) logger.info(f"Dataset cleaned. Original rows: {self.df.shape[0]}, Cleaned rows: {df.shape[0]}") return df def create_fallback_data(self): """Create fallback data if CSV loading fails""" logger.warning("Creating fallback dataset due to CSV loading failure.") fallback_data = { 'area_type': ['Super built-up Area', 'Built-up Area', 'Carpet Area'] * 10, 'availability': ['Ready To Move', 'Under Construction', 'Ready To Move'] * 10, 'size': ['2 BHK', '3 BHK', '4 BHK'] * 10, # Keep 'size' for cleaning process if needed 'society': ['Sample Society'] * 30, 'total_sqft': [1000, 1200, 1500] * 10, 'bath': [2, 3, 4] * 10, 'balcony': [1, 2, 3] * 10, 'price': [50, 75, 100] * 10, 'site_location': ['Baner', 'Hadapsar', 'Kothrud'] * 10, } df_fallback = pd.DataFrame(fallback_data) # Ensure 'bhk' is generated if 'size' is used, and other derived columns if 'size' in df_fallback.columns: df_fallback['bhk'] = df_fallback['size'].str.extract(r'(\d+)').astype(float) else: df_fallback['bhk'] = [2, 3, 4] * 10 # Direct bhk if 'size' is not used # Add a placeholder for price_per_sqft as it's used in analysis/outlier removal df_fallback['price_per_sqft'] = (df_fallback['price'] * 100000) / df_fallback['total_sqft'] return df_fallback def filter_properties(self, filters: Dict[str, Any]) -> pd.DataFrame: """Filter properties based on user criteria""" filtered_df = self.df.copy() # Apply filters if filters.get('location'): # Use a more robust search for location, allowing partial or case-insensitive matches search_location = filters['location'].lower() filtered_df = filtered_df[filtered_df['site_location'].str.lower().str.contains(search_location, na=False)] if filters.get('bhk') is not None: # Check for None explicitly as bhk can be 0 filtered_df = filtered_df[filtered_df['bhk'] == filters['bhk']] if filters.get('min_price'): filtered_df = filtered_df[filtered_df['price'] >= filters['min_price']] if filters.get('max_price'): filtered_df = filtered_df[filtered_df['price'] <= filters['max_price']] if filters.get('min_area'): filtered_df = filtered_df[filtered_df['total_sqft'] >= filters['min_area']] if filters.get('max_area'): filtered_df = filtered_df[filtered_df['total_sqft'] <= filters['max_area']] if filters.get('area_type'): search_area_type = filters['area_type'].lower() filtered_df = filtered_df[filtered_df['area_type'].str.lower().str.contains(search_area_type, na=False)] if filters.get('availability'): search_availability = filters['availability'].lower() filtered_df = filtered_df[filtered_df['availability'].str.lower().str.contains(search_availability, na=False)] return filtered_df.head(20) # Limit results for display def get_price_range_analysis(self, filtered_df: pd.DataFrame) -> Dict[str, Any]: """Analyze price range and provide insights""" if filtered_df.empty: return { "total_properties": 0, "price_range": {"min": 0.0, "max": 0.0, "avg": 0.0, "median": 0.0}, "area_range": {"min": 0.0, "max": 0.0, "avg": 0.0}, "price_per_sqft": {"min": 0.0, "max": 0.0, "avg": 0.0}, "location_distribution": {}, "area_type_distribution": {}, "availability_distribution": {} } analysis = { "total_properties": len(filtered_df), "price_range": { "min": float(filtered_df['price'].min()), "max": float(filtered_df['price'].max()), "avg": float(filtered_df['price'].mean()), "median": float(filtered_df['price'].median()) }, "area_range": { "min": float(filtered_df['total_sqft'].min()), "max": float(filtered_df['total_sqft'].max()), "avg": float(filtered_df['total_sqft'].mean()) }, # Recalculate price_per_sqft for analysis as it might be dropped after outlier removal "price_per_sqft": { "min": float(((filtered_df['price'] * 100000) / filtered_df['total_sqft']).min()), "max": float(((filtered_df['price'] * 100000) / filtered_df['total_sqft']).max()), "avg": float(((filtered_df['price'] * 100000) / filtered_df['total_sqft']).mean()) }, "location_distribution": filtered_df['site_location'].value_counts().to_dict(), "area_type_distribution": filtered_df['area_type'].value_counts().to_dict(), "availability_distribution": filtered_df['availability'].value_counts().to_dict() } return analysis def predict_prices_for_filtered_results(self, filtered_df: pd.DataFrame) -> pd.DataFrame: """Predict prices for all filtered properties and add to DataFrame.""" if filtered_df.empty: return filtered_df predictions = [] for index, row in filtered_df.iterrows(): predicted_price = self.predict_single_price( location=row['site_location'], bhk=row['bhk'], bath=row['bath'], balcony=row['balcony'], sqft=row['total_sqft'], area_type=row['area_type'], availability=row['availability'] ) # Use original price if prediction fails (returns None) predictions.append(predicted_price if predicted_price is not None else row['price']) filtered_df['predicted_price'] = predictions filtered_df['price_difference'] = filtered_df['predicted_price'] - filtered_df['price'] # Avoid division by zero if original price is 0 filtered_df['price_difference_pct'] = ( filtered_df['price_difference'] / filtered_df['price'].replace(0, np.nan) * 100 ).fillna(0) # Fill NaN with 0 if original price was 0 or prediction failed return filtered_df def generate_deep_insights(self, filtered_df: pd.DataFrame, analysis: Dict[str, Any]) -> str: """Generate deep insights using Gemini AI""" try: # Prepare context for Gemini properties_sample_str = filtered_df[['site_location', 'bhk', 'total_sqft', 'price', 'predicted_price', 'price_difference_pct']].head().to_string(index=False) if properties_sample_str: properties_sample_str = f"Top 5 Properties Sample:\n{properties_sample_str}\n" else: properties_sample_str = "No specific property samples available to list." context = f""" You are a highly experienced real estate market expert specializing in Pune properties. Your task is to analyze the provided property data and offer comprehensive, actionable insights for a potential property buyer. Here's a summary of the current search results: - Total Properties Found: {analysis['total_properties']} - Price Range: ₹{analysis['price_range']['min']:.2f}L to ₹{analysis['price_range']['max']:.2f}L - Average Price: ₹{analysis['price_range']['avg']:.2f}L - Median Price: ₹{analysis['price_range']['median']:.2f}L - Average Area: {analysis['area_range']['avg']:.0f} sq ft - Average Price per sq ft: ₹{analysis['price_per_sqft']['avg']:.0f} Location Distribution: {json.dumps(analysis['location_distribution'], indent=2)} Area Type Distribution: {json.dumps(analysis['area_type_distribution'], indent=2)} Availability Distribution: {json.dumps(analysis['availability_distribution'], indent=2)} {properties_sample_str} Please provide detailed market insights including: 1. **Current Market Trends**: What are the prevailing trends in Pune's real estate market based on this data? (e.g., buyer's/seller's market, growth areas, demand patterns). 2. **Price Competitiveness**: How do the prices of these properties compare to the overall Pune market? Are they underpriced, overpriced, or fair value? Highlight any anomalies based on the 'price_difference_pct' if available. 3. **Best Value Propositions**: Identify what types of properties (BHK, area, location) offer the best value for money right now based on average price per sqft and price ranges. 4. **Location-Specific Insights**: For the top 2-3 locations, provide specific observations on amenities, connectivity, future development potential, and why they might be good/bad for investment or living. 5. **Investment Recommendations**: Based on the data, what would be your general investment advice for someone looking into Pune real estate? 6. **Future Market Outlook**: Briefly discuss the short-to-medium term outlook (next 1-2 years) for the Pune real estate market. Ensure your response is conversational, professional, and directly actionable for a property buyer. """ response = model.generate_content(context) return self._clean_gemini_text(response.text) # Apply cleaning here except Exception as e: logger.error(f"Error generating insights using Gemini: {e}") return "I'm analyzing the market data to provide you with detailed insights. Based on the current search results, I can see various opportunities in your preferred areas. Please bear with me while I gather more information." def get_nearby_properties_gemini(self, location: str, requirements: Dict[str, Any]) -> str: """Get nearby properties and recent market data using Gemini""" try: # Refine requirements to be more human-readable for Gemini req_str = "" for k, v in requirements.items(): if k == 'min_price': req_str += f"Minimum Budget: {v} Lakhs. " elif k == 'max_price': req_str += f"Maximum Budget: {v} Lakhs. " elif k == 'min_area': req_str += f"Minimum Area: {v} sq ft. " elif k == 'max_area': req_str += f"Maximum Area: {v} sq ft. " elif k == 'bhk': req_str += f"BHK: {v}. " elif k == 'area_type': req_str += f"Area Type: {v}. " elif k == 'availability': req_str += f"Availability: {v}. " elif k == 'location': req_str += f"Preferred Location: {v}. " if not req_str: req_str = "No specific requirements provided beyond location." context = f""" You are a knowledgeable real estate advisor with up-to-date market information for Pune. A client is interested in properties near **{location}**. Their current general requirements are: {req_str.strip()} Please provide a concise but informative overview focusing on: 1. **Recent Property Launches**: Mention any significant residential projects launched in {location} or very close vicinity in the last 6-12 months. 2. **Upcoming Projects**: Are there any major upcoming residential or commercial developments (e.g., IT parks, malls, new infrastructure) that could impact property values in {location} or adjacent areas? 3. **Infrastructure Developments**: Discuss any planned or ongoing infrastructure improvements (roads, metro, civic amenities) that might affect property values and lifestyle in {location}. 4. **Market Trends in {location}**: What are the current buying/rental trends specific to {location}? Is it a high-demand area, or is supply outpacing demand? 5. **Comparative Analysis**: Briefly compare {location} with one or two similar, nearby localities in terms of property values, amenities, and lifestyle. 6. **Investment Potential**: What is the general investment outlook and potential rental yield for properties in {location}? Focus on specific details relevant to the Pune real estate market in 2024-2025. """ response = model.generate_content(context) return self._clean_gemini_text(response.text) # Apply cleaning here except Exception as e: logger.error(f"Error getting nearby properties from Gemini: {e}") return f"I'm gathering information about recent properties and market trends near {location}. This area has shown consistent growth in property values, and I'll get you more specific details shortly." def update_context_memory(self, user_query: str, results: Dict[str, Any]): """Update context memory to maintain conversation continuity""" self.context_memory.append({ 'timestamp': datetime.now().isoformat(), 'user_query': user_query, 'results_summary': { 'total_properties': results.get('total_properties', 0), 'price_range': results.get('price_range', {}), 'locations': list(results.get('location_distribution', {}).keys()) } }) # Keep only last 5 interactions if len(self.context_memory) > 5: self.context_memory.pop(0) def extract_requirements(self, user_message: str) -> Dict[str, Any]: """Extract property requirements from user message""" requirements = {} message_lower = user_message.lower() # Extract BHK bhk_match = re.search(r'(\d+)\s*bhk', message_lower) if bhk_match: try: requirements['bhk'] = int(bhk_match.group(1)) except ValueError: pass # Ignore if not a valid number # Extract budget range (flexible with "lakh", "crore", "million") budget_match = re.search( r'(?:budget|price|cost)(?:.*?(?:of|between|from)\s*)?' r'(\d+(?:\.\d+)?)\s*(?:lakhs?|crores?|million)?(?:(?:\s*to|\s*-\s*)\s*(\d+(?:\.\d+)?)\s*(?:lakhs?|crores?|million)?)?', message_lower ) if budget_match: try: min_amount = float(budget_match.group(1)) max_amount = float(budget_match.group(2)) if budget_match.group(2) else None # Detect units for both min and max if provided if 'crore' in budget_match.group(0): # Check original matched string for "crore" requirements['min_price'] = min_amount * 100 if max_amount: requirements['max_price'] = max_amount * 100 elif 'million' in budget_match.group(0): requirements['min_price'] = min_amount * 10 if max_amount: requirements['max_price'] = max_amount * 10 else: # Default to lakhs requirements['min_price'] = min_amount if max_amount: requirements['max_price'] = max_amount except ValueError: pass # Extract location found_location = False for location in self.locations: if location.lower() in message_lower: requirements['location'] = location found_location = True break # Broader location matching if specific location not found if not found_location: general_locations = ['pune', 'pcmp'] # Add more general terms if needed for gen_loc in general_locations: if gen_loc in message_lower: requirements['location'] = gen_loc.capitalize() # Or a default location like 'Pune' break # Extract area requirements area_match = re.search(r'(\d+(?:\.\d+)?)\s*(?:to|-)?\s*(\d+(?:\.\d+)?)?\s*(?:sq\s*ft|sqft|square\s*feet|sft)', message_lower) if area_match: try: requirements['min_area'] = float(area_match.group(1)) if area_match.group(2): requirements['max_area'] = float(area_match.group(2)) except ValueError: pass # Extract area type for area_type in self.area_types: if area_type.lower() in message_lower: requirements['area_type'] = area_type break # Extract availability for availability in self.availability_types: if availability.lower() in message_lower: requirements['availability'] = availability break return requirements def handle_query(self, user_message: str) -> Dict[str, Any]: """Main function to handle user queries with context awareness""" try: # Extract requirements from current message requirements = self.extract_requirements(user_message) # Merge with previous context if building on last query if self.is_follow_up_query(user_message): requirements = self.merge_with_context(requirements) # Update client preferences (store all extracted preferences) self.client_preferences.update(requirements) # Filter properties based on requirements filtered_df = self.filter_properties(requirements) # If no properties found, provide suggestions if filtered_df.empty: return self.handle_no_results(requirements) # Predict prices for filtered results filtered_df = self.predict_prices_for_filtered_results(filtered_df) # Get price range analysis analysis = self.get_price_range_analysis(filtered_df) # Generate deep insights insights = self.generate_deep_insights(filtered_df, analysis) # Get nearby properties information nearby_info = "" if requirements.get('location'): nearby_info = self.get_nearby_properties_gemini(requirements['location'], requirements) # Prepare response response_data = { 'filtered_properties': filtered_df.to_dict('records'), 'analysis': analysis, 'insights': insights, 'nearby_properties': nearby_info, 'requirements': requirements, 'context_aware': self.is_follow_up_query(user_message) } # Update context memory self.update_context_memory(user_message, analysis) self.last_search_results = response_data # Store the full response return response_data except Exception as e: logger.error(f"Error handling query: {e}") return {'error': f'An error occurred while processing your query: {str(e)}'} def is_follow_up_query(self, user_message: str) -> bool: """Check if this is a follow-up query building on previous results""" follow_up_indicators = ['also', 'and', 'additionally', 'what about', 'show me more', 'similar', 'nearby', 'around', 'close to', 'next', 'other', 'different', 'change', 'update'] return any(indicator in user_message.lower() for indicator in follow_up_indicators) and len( self.context_memory) > 0 def merge_with_context(self, new_requirements: Dict[str, Any]) -> Dict[str, Any]: """Merge new requirements with context from previous queries""" # Start with the overall client preferences merged = self.client_preferences.copy() # Overwrite with any new requirements from the current message merged.update(new_requirements) # Apply default or previously established values if not specified in current query # Example: If a new query doesn't specify location, use the last known location if 'location' not in new_requirements and 'location' in self.client_preferences: merged['location'] = self.client_preferences['location'] # You can add more sophisticated merging logic here if needed # e.g., if a new budget is provided, replace the old one entirely. # The current update() method already handles this for direct key conflicts. return merged def handle_no_results(self, requirements: Dict[str, Any]) -> Dict[str, Any]: """Handle cases where no properties match the criteria""" suggestions = ["Try relaxing some of your criteria."] if requirements.get('max_price'): suggestions.append(f"Consider increasing your budget above ₹{requirements['max_price']:.2f}L.") # Suggest a slightly higher range suggestions.append(f"Perhaps a range of ₹{requirements['max_price']:.2f}L to ₹{(requirements['max_price'] * 1.2):.2f}L?") if requirements.get('location'): # Find nearby popular locations in the dataset, excluding the one searched nearby_locs = [loc for loc in self.locations if loc.lower() != requirements['location'].lower()] if nearby_locs: # Sort by count if possible, or just pick top 3 top_locations = self.df['site_location'].value_counts().index.tolist() suggested_nearby = [loc for loc in top_locations if loc.lower() != requirements['location'].lower()][:3] if suggested_nearby: suggestions.append(f"Consider nearby popular areas like: {', '.join(suggested_nearby)}.") if requirements.get('bhk') is not None: # Suggest +/- 1 BHK if not already at min/max reasonable bhk if requirements['bhk'] > 1: suggestions.append(f"Consider looking for {requirements['bhk'] - 1} BHK options.") if requirements['bhk'] < 6: # Assuming 6 is a practical upper limit for most searches suggestions.append(f"Consider {requirements['bhk'] + 1} BHK options.") if requirements.get('max_area'): suggestions.append(f"You might find more options if you consider properties up to ~{(requirements['max_area'] * 1.1):.0f} sq ft.") # Default message if specific suggestions are not generated if len(suggestions) == 1 and "relaxing" in suggestions[0]: final_suggestion_text = "No properties found matching your exact criteria. Please try broadening your search or clarifying your preferences." else: final_suggestion_text = f"No properties found matching your exact criteria. Here are some suggestions: {'; '.join(suggestions)}." return { 'filtered_properties': [], 'analysis': self.get_price_range_analysis(pd.DataFrame()), # Empty analysis 'insights': final_suggestion_text, 'nearby_properties': '', 'requirements': requirements, 'suggestions': suggestions } # Initialize the enhanced sales assistant (MUST be after class definition) sales_assistant = EnhancedRealEstateSalesAssistant() @app.route('/') def index(): return render_template('index2.html') @app.route('/chat', methods=['POST']) def chat(): """Enhanced chat endpoint with comprehensive property search""" try: data = request.get_json() user_message = data.get('message', '').strip() if not user_message: return jsonify({'error': 'Please provide your query'}), 400 # Get comprehensive response from sales assistant response_data = sales_assistant.handle_query(user_message) # Ensure response_data always includes necessary keys even on error if 'error' in response_data: return jsonify({ 'response': response_data, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'conversation_context': len(sales_assistant.context_memory) }), 500 # Return 500 for internal errors else: return jsonify({ 'response': response_data, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'conversation_context': len(sales_assistant.context_memory) }) except Exception as e: logger.error(f"Chat endpoint error: {e}") return jsonify({'error': f'An unexpected error occurred in chat endpoint: {str(e)}'}), 500 # Flask routes must be defined at the module level, not inside a class @app.route('/model_status') def model_status(): """Get current model status and version info""" try: status = { 'model_loaded': hasattr(sales_assistant, 'pipeline') and sales_assistant.pipeline is not None, 'model_version': getattr(sales_assistant, 'model_version', 'unknown'), 'feature_count': len(getattr(sales_assistant, 'training_feature_names', [])), 'prediction_method': 'ML' if hasattr(sales_assistant, 'pipeline') and sales_assistant.pipeline else 'Fallback', 'sklearn_version': sklearn.__version__ } return jsonify(status) except Exception as e: logger.error(f"Error in model_status endpoint: {e}") return jsonify({'error': str(e)}), 500 @app.route('/retrain_model', methods=['POST']) def retrain_model(): """Retrain the model with current version""" try: logger.info("Starting model retraining via endpoint...") success = sales_assistant.retrain_model_with_current_version() if success: return jsonify({ 'message': 'Model retrained successfully', 'version': sales_assistant.model_version, 'feature_count': len(sales_assistant.training_feature_names), 'sample_features': sales_assistant.training_feature_names[:10] if sales_assistant.training_feature_names else [] }) else: return jsonify({'error': 'Failed to retrain model. Check logs for details.'}), 500 except Exception as e: logger.error(f"Retrain endpoint error: {e}") return jsonify({'error': f'An unexpected error occurred during retraining: {str(e)}'}), 500 @app.route('/filter_properties', methods=['POST']) def filter_properties_endpoint(): # Renamed to avoid conflict with class method """Dedicated endpoint for property filtering""" try: filters = request.get_json() filtered_df = sales_assistant.filter_properties(filters) filtered_df = sales_assistant.predict_prices_for_filtered_results(filtered_df) analysis = sales_assistant.get_price_range_analysis(filtered_df) return jsonify({ 'properties': filtered_df.to_dict('records'), 'analysis': analysis, 'total_count': len(filtered_df) }) except Exception as e: logger.error(f"Filter endpoint error: {e}") return jsonify({'error': f'Filtering error: {str(e)}'}), 500 @app.route('/get_insights', methods=['POST']) def get_insights(): """Get AI-powered insights for current search results""" try: data = request.get_json() location_from_request = data.get('location') requirements_from_request = data.get('requirements', {}) if sales_assistant.last_search_results: filtered_df = pd.DataFrame(sales_assistant.last_search_results['filtered_properties']) analysis = sales_assistant.last_search_results['analysis'] insights = sales_assistant.generate_deep_insights(filtered_df, analysis) nearby_info = "" # Use location from request if provided, otherwise from last search results effective_location = location_from_request or sales_assistant.last_search_results['requirements'].get('location') if effective_location: nearby_info = sales_assistant.get_nearby_properties_gemini(effective_location, requirements_from_request or sales_assistant.last_search_results['requirements']) return jsonify({ 'insights': insights, 'nearby_properties': nearby_info }) else: return jsonify({'error': 'No previous search results available for insights. Please perform a search first.'}), 400 except Exception as e: logger.error(f"Insights endpoint error: {e}") return jsonify({'error': f'Insights error: {str(e)}'}), 500 @app.route('/get_market_data') def get_market_data(): """Get comprehensive market data""" try: # Ensure df is loaded before accessing its properties if not hasattr(sales_assistant, 'df') or sales_assistant.df.empty: return jsonify({'error': 'Market data not available, dataset not loaded.'}), 500 market_data = { 'locations': sales_assistant.locations, 'area_types': sales_assistant.area_types, 'availability_types': sales_assistant.availability_types, 'bhk_options': sales_assistant.bhk_options, 'price_statistics': { 'min_price': float(sales_assistant.df['price'].min()), 'max_price': float(sales_assistant.df['price'].max()), 'avg_price': float(sales_assistant.df['price'].mean()), 'median_price': float(sales_assistant.df['price'].median()) }, 'area_statistics': { 'min_area': float(sales_assistant.df['total_sqft'].min()), 'max_area': float(sales_assistant.df['total_sqft'].max()), 'avg_area': float(sales_assistant.df['total_sqft'].mean()) } } return jsonify(market_data) except Exception as e: logger.error(f"Market data endpoint error: {e}") return jsonify({'error': f'Market data error: {str(e)}'}), 500 @app.route('/client_preferences') def get_client_preferences(): """Get current client preferences and conversation history""" return jsonify({ 'preferences': sales_assistant.client_preferences, 'conversation_history': sales_assistant.context_memory, 'last_search_available': sales_assistant.last_search_results is not None }) @app.route('/reset_session', methods=['POST']) def reset_session(): """Reset entire session including preferences and context""" try: sales_assistant.client_preferences = {} sales_assistant.context_memory = [] sales_assistant.last_search_results = None return jsonify({'message': 'Session reset successfully'}) except Exception as e: logger.error(f"Reset session error: {e}") return jsonify({'error': f'Failed to reset session: {str(e)}'}), 500 @app.route('/health') def health(): """Health check endpoint""" try: return jsonify({ 'status': 'healthy', 'model_loaded': hasattr(sales_assistant, 'pipeline') and sales_assistant.pipeline is not None, 'model_version': sales_assistant.model_version, 'data_loaded': hasattr(sales_assistant, 'df') and not sales_assistant.df.empty, 'dataset_size': len(sales_assistant.df) if hasattr(sales_assistant, 'df') else 0, 'locations_available': len(sales_assistant.locations), 'context_memory_size': len(sales_assistant.context_memory), 'timestamp': datetime.now().isoformat() }) except Exception as e: logger.error(f"Health check error: {e}") return jsonify({'status': 'unhealthy', 'error': str(e)}), 500 if __name__ == '__main__': # It's better to explicitly set the host for deployment environments. # For development, debug=True is useful but should be False in production. app.run(debug=True, host='0.0.0.0', port=5000)