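"""Flask application for an AI-assisted real estate sales assistant.

Combines a scikit-learn price-prediction pipeline trained on Pune housing
data with Gemini-generated market insights, exposed through JSON endpoints.
"""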
from flask import Flask, render_template, request, jsonify
import google.generativeai as genai
import pandas as pd
import numpy as np
import joblib
import os
import json
from datetime import datetime
import re
from typing import Dict, List, Any, Optional
import logging
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_absolute_error, r2_score
import sklearn  # Imported to report the installed version

app = Flask(__name__)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configure Gemini API
# IMPORTANT: Load the API key from the environment; never hardcode credentials.
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')
if not GEMINI_API_KEY:
    raise RuntimeError('GEMINI_API_KEY environment variable is not set')
genai.configure(api_key=GEMINI_API_KEY)

# Using gemini-2.0-flash for its large context window and low latency
model = genai.GenerativeModel('gemini-2.0-flash')


class EnhancedRealEstateSalesAssistant:
    def __init__(self):
        self.pipeline = None  # Initialize pipeline to None
        self.training_feature_names = []  # Feature names captured at training time
        self.model_version = 'N/A'
        self.load_model_and_data()
        self.conversation_history = []
        self.client_preferences = {}
        self.last_search_results = None
        self.context_memory = []

    def _clean_gemini_text(self, text: str) -> str:
        """Remove common Markdown formatting (asterisks, hashes, bullets) from Gemini's output."""
        if not isinstance(text, str):
            return text  # Return as-is if not a string
        # Remove bold markers (**)
        cleaned_text = re.sub(r'\*\*', '', text)
        # Remove italic markers (*)
        cleaned_text = re.sub(r'\*', '', cleaned_text)
        # Remove header markers (#) at the start of a line, plus any following whitespace
        cleaned_text = re.sub(r'^\s*#+\s*', '', cleaned_text, flags=re.MULTILINE)
        # Remove list bullets like '- ' or '+ ' at the start of a line
        # (the '*' removal above already catches '*' bullets)
        cleaned_text = re.sub(r'^\s*[-+]\s+', '', cleaned_text, flags=re.MULTILINE)
        # Collapse runs of blank lines into a single paragraph break
        cleaned_text = re.sub(r'\n\s*\n', '\n\n', cleaned_text)
        # Trim leading/trailing whitespace from each line
        cleaned_text = '\n'.join([line.strip() for line in cleaned_text.splitlines()])
        # Final strip for the whole block
        return cleaned_text.strip()
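
    # Example (illustrative): _clean_gemini_text("**Great** options:\n* Baner\n* Wakad")
    # returns "Great options:\nBaner\nWakad".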

    def load_model_and_data(self):
        """Load the trained model and dataset with version compatibility."""
        try:
            # Load the dataset first
            self.df = pd.read_csv('Pune_House_Data.csv')
            # Clean and process data
            self.df = self.clean_dataset()
            # Extract unique values for filtering
            self.locations = sorted(self.df['site_location'].unique().tolist())
            self.area_types = sorted(self.df['area_type'].unique().tolist())
            self.availability_types = sorted(self.df['availability'].unique().tolist())
            self.bhk_options = sorted(self.df['bhk'].unique().tolist())
            # Try to load the model with a version compatibility check
            model_loaded = self.load_model_with_version_check()
            if not model_loaded:
                logger.warning("Model loading failed; predictions will use the fallback method.")
                self.pipeline = None  # Ensure pipeline is None if loading failed
            logger.info("✅ Model and data loading process completed.")
            logger.info(f"Dataset shape: {self.df.shape}")
            logger.info(f"Available locations: {len(self.locations)}")
        except Exception as e:
            logger.error(f"❌ Critical error loading model/data: {e}")
            # Create fallback data
            self.df = self.create_fallback_data()
            self.locations = ['Baner', 'Hadapsar', 'Kothrud', 'Viman Nagar', 'Wakad', 'Hinjewadi']
            self.area_types = ['Built-up Area', 'Super built-up Area', 'Plot Area', 'Carpet Area']
            self.availability_types = ['Ready To Move', 'Not Ready', 'Under Construction']
            self.bhk_options = [1, 2, 3, 4, 5, 6]
            self.pipeline = None  # Ensure pipeline is None if data loading failed

    def load_model_with_version_check(self):
        """Load the model with a version compatibility check, retraining if necessary."""
        try:
            # First try to load the version-compatible model
            try:
                model_data = joblib.load('house_price_prediction_v2.pkl')
                self.pipeline = model_data['pipeline']
                self.training_feature_names = model_data.get('feature_names', [])
                self.model_version = model_data.get('version', '2.0')
                logger.info(f"✅ Loaded model version {self.model_version} with {len(self.training_feature_names)} features.")
                return True
            except FileNotFoundError:
                logger.info("New model version (house_price_prediction_v2.pkl) not found; attempting to retrain.")
                return self.retrain_model_with_current_version()
            except Exception as v2_load_error:
                logger.error(f"Failed to load new model version (v2): {v2_load_error}. Attempting retraining.")
                return self.retrain_model_with_current_version()
        except Exception as e:
            logger.error(f"Error in model loading process: {e}. Retraining will be attempted as a last resort.")
            return self.retrain_model_with_current_version()  # Final attempt if all other loading fails

    def retrain_model_with_current_version(self):
        """Retrain the model with the current scikit-learn version."""
        try:
            logger.info(f"🔄 Retraining model with scikit-learn version {sklearn.__version__}")
            # Prepare features (this also sets self.training_feature_names)
            X, y = self.create_training_data_with_proper_features()
            if X is None or y is None:
                logger.error("Failed to create training data; cannot retrain model.")
                self.pipeline = None
                return False
            # Split data
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.2, random_state=42
            )
            # Create pipeline
            pipeline = Pipeline([
                ('scaler', StandardScaler()),
                ('model', RandomForestRegressor(
                    n_estimators=100,
                    random_state=42,
                    n_jobs=-1  # Use all available cores
                ))
            ])
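            # Note: scaling is not required for tree ensembles such as
            # RandomForestRegressor; the StandardScaler step is harmless here and
            # keeps the pipeline uniform if a scale-sensitive regressor is
            # swapped in later.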
            # Train model
            logger.info("Training model...")
            pipeline.fit(X_train, y_train)
            # Evaluate on the held-out test split
            y_pred = pipeline.predict(X_test)
            mae = mean_absolute_error(y_test, y_pred)
            r2 = r2_score(y_test, y_pred)
            logger.info("✅ Model trained successfully:")
            logger.info(f"   MAE: {mae:.2f} lakhs")
            logger.info(f"   R² Score: {r2:.3f}")
            # Save the new model
            self.pipeline = pipeline
            self.model_version = '2.0'  # Keep the reported version in sync with the saved artifact
            # self.training_feature_names is already set by create_training_data_with_proper_features
            model_data = {
                'pipeline': self.pipeline,
                'feature_names': self.training_feature_names,
                'version': '2.0',
                'sklearn_version': sklearn.__version__,
                'training_info': {
                    'mae': mae,
                    'r2_score': r2,
                    'training_samples': len(X_train),
                    'test_samples': len(X_test),
                    'feature_count': len(self.training_feature_names)
                },
                'created_at': datetime.now().isoformat()
            }
            # Save with version info
            joblib.dump(model_data, 'house_price_prediction_v2.pkl')
            logger.info("✅ Model saved as house_price_prediction_v2.pkl")
            return True
        except Exception as e:
            logger.error(f"Error retraining model: {e}")
            self.pipeline = None
            return False

    def create_training_data_with_proper_features(self):
        """Create training data with consistent feature engineering for prediction."""
        try:
            # Prepare features for training
            df_features = self.df.copy()
            # Select relevant columns for training
            feature_columns = ['site_location', 'area_type', 'availability', 'bhk', 'bath', 'balcony', 'total_sqft']
            # Check that all required columns exist
            missing_columns = [col for col in feature_columns + ['price'] if col not in df_features.columns]
            if missing_columns:
                logger.error(f"Missing essential columns for training: {missing_columns}")
                return None, None
            df_features = df_features[feature_columns + ['price']]
            # Clean data (coerce numeric columns, fill missing values where appropriate)
            df_features['bhk'] = pd.to_numeric(df_features['bhk'], errors='coerce')
            df_features['bath'] = pd.to_numeric(df_features['bath'], errors='coerce').fillna(df_features['bath'].median())
            df_features['balcony'] = pd.to_numeric(df_features['balcony'], errors='coerce').fillna(df_features['balcony'].median())
            df_features['total_sqft'] = pd.to_numeric(df_features['total_sqft'], errors='coerce')
            df_features['price'] = pd.to_numeric(df_features['price'], errors='coerce')
            # Remove rows with critical missing values for training
            df_features = df_features.dropna(subset=['price', 'total_sqft', 'bhk', 'site_location'])
            # Remove outliers before one-hot encoding for more robust training
            df_features = self.remove_outliers(df_features)
            # One-hot encode categorical variables
            categorical_cols = ['site_location', 'area_type', 'availability']
            df_encoded = pd.get_dummies(df_features, columns=categorical_cols, prefix=categorical_cols, dummy_na=False)
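            # After encoding, feature columns look like 'site_location_Baner',
            # 'area_type_Carpet Area', 'availability_Ready To Move', etc.
            # (prefix + '_' + category value).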
            # Separate features and target
            X = df_encoded.drop('price', axis=1)
            y = df_encoded['price']
            # Store feature names for later use in prediction
            self.training_feature_names = X.columns.tolist()
            logger.info(f"Training data prepared: {X.shape[0]} samples, {X.shape[1]} features")
            return X, y
        except Exception as e:
            logger.error(f"Error creating training data: {e}")
            return None, None

    def remove_outliers(self, df):
        """Remove outliers from the dataset based on price and area."""
        try:
            # Compute price_per_sqft for outlier removal only; it is dropped afterwards
            df['price_per_sqft'] = (df['price'] * 100000) / df['total_sqft']
            # Remove price outliers (beyond 3 standard deviations from the mean)
            price_mean = df['price'].mean()
            price_std = df['price'].std()
            df = df[abs(df['price'] - price_mean) <= 3 * price_std]
            # Remove area outliers (beyond 3 standard deviations from the mean)
            area_mean = df['total_sqft'].mean()
            area_std = df['total_sqft'].std()
            df = df[abs(df['total_sqft'] - area_mean) <= 3 * area_std]
            # Remove properties with an unrealistic price per sqft (<1000 or >20000 INR/sqft for Pune)
            df = df[(df['price_per_sqft'] >= 1000) & (df['price_per_sqft'] <= 20000)]
            # Drop the temporary price_per_sqft column
            df = df.drop(columns=['price_per_sqft'])
            logger.info(f"Dataset after outlier removal: {len(df)} rows")
            return df
        except Exception as e:
            logger.error(f"Error removing outliers: {e}")
            return df  # Return the original df if an error occurs

    def predict_single_price(self, location, bhk, bath, balcony, sqft, area_type, availability):
        """Predict the price of a single property with the ML model, falling back to a heuristic."""
        try:
            # Check whether the ML pipeline is loaded
            if not hasattr(self, 'pipeline') or self.pipeline is None or not self.training_feature_names:
                logger.warning("ML pipeline or training features not available. Using fallback prediction.")
                return self.predict_single_price_fallback(location, bhk, bath, balcony, sqft, area_type, availability)
            try:
                # Create a feature dictionary initialized with 0s for all expected features
                feature_dict = {feature: 0 for feature in self.training_feature_names}
                # Set numerical features
                numerical_mapping = {
                    'bhk': bhk,
                    'bath': bath,
                    'balcony': balcony,
                    'total_sqft': sqft
                }
                for feature, value in numerical_mapping.items():
                    if feature in feature_dict:
                        feature_dict[feature] = value
                    else:
                        logger.warning(f"Numerical feature '{feature}' not found in training features. Skipping.")
                # Set one-hot encoded categorical features: try the exact
                # column first, then a case-insensitive fuzzy match.
                for prefix, value in (('site_location', location),
                                      ('area_type', area_type),
                                      ('availability', availability)):
                    exact_key = f"{prefix}_{value}"
                    if exact_key in feature_dict:
                        feature_dict[exact_key] = 1
                        continue
                    for feature in self.training_feature_names:
                        if feature.startswith(f"{prefix}_") and str(value).lower() in feature.lower():
                            feature_dict[feature] = 1
                            break
                    else:
                        logger.warning(
                            f"{prefix} '{value}' not found in training features; "
                            "its one-hot column stays 0 and will not influence the ML prediction."
                        )
                # Build a single-row DataFrame and enforce the training column order
                input_df = pd.DataFrame([feature_dict])[self.training_feature_names]
                predicted_price = self.pipeline.predict(input_df)[0]
                return round(predicted_price, 2)
            except Exception as ml_prediction_error:
                logger.warning(f"ML prediction failed for input {(location, bhk, sqft)}: {ml_prediction_error}. Falling back to heuristic model.")
                return self.predict_single_price_fallback(location, bhk, bath, balcony, sqft, area_type, availability)
        except Exception as general_error:
            logger.error(f"General error in predict_single_price wrapper: {general_error}. Falling back to heuristic.")
            return self.predict_single_price_fallback(location, bhk, bath, balcony, sqft, area_type, availability)

    def predict_single_price_fallback(self, location, bhk, bath, balcony, sqft, area_type, availability):
        """Heuristic fallback prediction used when the ML model is unavailable or fails."""
        try:
            # Base price per sqft for different areas in Pune (extensive map for a better fallback)
            location_price_map = {
                'Koregaon Park': 12000, 'Kalyani Nagar': 11000, 'Boat Club Road': 10500,
                'Aundh': 9500, 'Baner': 8500, 'Balewadi': 8000, 'Kothrud': 8500,
                'Viman Nagar': 7500, 'Wakad': 7000, 'Hinjewadi': 7500, 'Pune': 7000,
                'Hadapsar': 6500, 'Kharadi': 7200, 'Yerawada': 7000, 'Lohegaon': 6000,
                'Wagholi': 5500, 'Mundhwa': 7000, 'Undri': 6500, 'Kondhwa': 6000,
                'Katraj': 5800, 'Dhankawadi': 6000, 'Warje': 6500, 'Karve Nagar': 7500,
                'Bavdhan': 8000, 'Pashan': 7500, 'Sus': 7200, 'Pimpri': 6000,
                'Chinchwad': 6200, 'Akurdi': 5800, 'Nigdi': 6500, 'Bhosari': 5500,
                'Chakan': 4800, 'Talegaon': 4500, 'Alandi': 4200, 'Dehu': 4000,
                'Lonavala': 5000, 'Kamshet': 4500, 'Sinhagad Road': 6800,
                'Balaji Nagar': 5800, 'Parvati': 5500, 'Gultekdi': 5200, 'Wanowrie': 6200,
                'Shivajinagar': 9000, 'Deccan': 8500, 'Camp': 7500, 'Koregaon': 8000,
                'Sadashiv Peth': 8200, 'Shukrawar Peth': 7800, 'Kasba Peth': 7500,
                'Narayan Peth': 7200, 'Rasta Peth': 7000, 'Ganj Peth': 6800,
                'Magarpatta': 7500, 'Fursungi': 5000, 'Handewadi': 4800,
                'Mahatma Phule Peth': 6200, 'Bhavani Peth': 6000, 'Bibwewadi': 7000
            }
            # Get the base price per sqft, trying fuzzy matching if no exact match is found
            base_price_per_sqft = location_price_map.get(location, 6500)  # Default if no match
            if location not in location_price_map:
                for loc_key, price in location_price_map.items():
                    if location.lower() in loc_key.lower() or loc_key.lower() in location.lower():
                        base_price_per_sqft = price
                        break
            # Area type multiplier
            area_type_multiplier = {
                'Super built-up Area': 1.0,
                'Built-up Area': 0.92,   # Typically priced slightly below super built-up
                'Plot Area': 0.85,       # Plot area may translate to a lower built-up value
                'Carpet Area': 1.08      # Carpet area is typically more expensive per sqft
            }.get(area_type, 1.0)
            # Availability multiplier (under-construction/not-ready units are typically cheaper)
            availability_multiplier = {
                'Ready To Move': 1.0,
                'Under Construction': 0.88,
                'Not Ready': 0.82
            }.get(availability, 0.95)
            # BHK-based adjustment (larger units tend to have a slightly lower price/sqft)
            bhk_multiplier = {
                1: 1.15,  # Premium for 1 BHK
                2: 1.0,   # Base
                3: 0.95,  # Slight economy
                4: 0.92,
                5: 0.90,
                6: 0.88
            }.get(bhk, 1.0)
            # Bathroom and balcony multipliers: linear adjustment around a
            # 2-bath / 1-balcony baseline (+3% per extra bath, +2% per extra
            # balcony), clamped so they never drop too far below 1.0
            bath_multiplier = max(0.9, 1.0 + (bath - 2) * 0.03)
            balcony_multiplier = max(0.95, 1.0 + (balcony - 1) * 0.02)
            # Calculate the final estimated price
            estimated_price_raw = (sqft * base_price_per_sqft *
                                   area_type_multiplier * availability_multiplier *
                                   bhk_multiplier * bath_multiplier * balcony_multiplier)
            estimated_price_lakhs = estimated_price_raw / 100000
            return round(estimated_price_lakhs, 2)
        except Exception as e:
            logger.error(f"Error in fallback prediction: {e}")
            return None  # Indicate failure
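
    # Worked example of the heuristic: a 1000 sq ft, 2 BHK, 2-bath, 1-balcony
    # 'Super built-up Area' flat in Baner that is 'Ready To Move' uses only the
    # base rate: 1000 * 8500 * 1.0^5 = 8,500,000 INR -> 85.0 lakhs.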

    def clean_dataset(self):
        """Clean and process the dataset."""
        df = self.df.copy()
        # Derive 'bhk' from the 'size' column (e.g., '2 BHK' -> 2) if it is not already present
        if 'size' in df.columns and 'bhk' not in df.columns:
            df['bhk'] = df['size'].str.extract(r'(\d+)').astype(float)
        elif 'bhk' not in df.columns:
            df['bhk'] = np.nan  # Ensure the bhk column exists even if 'size' is missing
        # Clean the total_sqft column
        df['total_sqft'] = pd.to_numeric(df['total_sqft'], errors='coerce')
        # Clean the price column
        df['price'] = pd.to_numeric(df['price'], errors='coerce')
        # Fill missing values for numerical columns (median for robustness)
        df['bath'] = pd.to_numeric(df['bath'], errors='coerce').fillna(df['bath'].median())
        df['balcony'] = pd.to_numeric(df['balcony'], errors='coerce').fillna(df['balcony'].median())
        # Fill missing values for categorical columns
        df['society'] = df['society'].fillna('Not Specified')
        df['area_type'] = df['area_type'].fillna('Super built-up Area')  # Common default
        df['availability'] = df['availability'].fillna('Ready To Move')  # Common default
        df['site_location'] = df['site_location'].fillna('Pune')  # General default
        # Remove rows with critical missing values after the initial cleaning
        df = df.dropna(subset=['price', 'total_sqft', 'bhk', 'site_location'])
        # Cast count-like columns to integers
        df['bhk'] = df['bhk'].astype(int)
        df['bath'] = df['bath'].astype(int)
        df['balcony'] = df['balcony'].astype(int)
        logger.info(f"Dataset cleaned. Original rows: {self.df.shape[0]}, cleaned rows: {df.shape[0]}")
        return df

    def create_fallback_data(self):
        """Create a small synthetic dataset if CSV loading fails."""
        logger.warning("Creating fallback dataset due to CSV loading failure.")
        fallback_data = {
            'area_type': ['Super built-up Area', 'Built-up Area', 'Carpet Area'] * 10,
            'availability': ['Ready To Move', 'Under Construction', 'Ready To Move'] * 10,
            'size': ['2 BHK', '3 BHK', '4 BHK'] * 10,  # Keep 'size' so the cleaning logic still applies
            'society': ['Sample Society'] * 30,
            'total_sqft': [1000, 1200, 1500] * 10,
            'bath': [2, 3, 4] * 10,
            'balcony': [1, 2, 3] * 10,
            'price': [50, 75, 100] * 10,
            'site_location': ['Baner', 'Hadapsar', 'Kothrud'] * 10,
        }
        df_fallback = pd.DataFrame(fallback_data)
        # Derive 'bhk' from 'size', mirroring clean_dataset
        if 'size' in df_fallback.columns:
            df_fallback['bhk'] = df_fallback['size'].str.extract(r'(\d+)').astype(float)
        else:
            df_fallback['bhk'] = [2, 3, 4] * 10  # Direct bhk if 'size' is not used
        # Add price_per_sqft, which is used in analysis/outlier removal
        df_fallback['price_per_sqft'] = (df_fallback['price'] * 100000) / df_fallback['total_sqft']
        return df_fallback

    def filter_properties(self, filters: Dict[str, Any]) -> pd.DataFrame:
        """Filter properties based on user criteria."""
        filtered_df = self.df.copy()
        # Apply filters
        if filters.get('location'):
            # Case-insensitive partial match for location
            search_location = filters['location'].lower()
            filtered_df = filtered_df[filtered_df['site_location'].str.lower().str.contains(search_location, na=False)]
        if filters.get('bhk') is not None:  # Check for None explicitly since bhk can be 0
            filtered_df = filtered_df[filtered_df['bhk'] == filters['bhk']]
        if filters.get('min_price'):
            filtered_df = filtered_df[filtered_df['price'] >= filters['min_price']]
        if filters.get('max_price'):
            filtered_df = filtered_df[filtered_df['price'] <= filters['max_price']]
        if filters.get('min_area'):
            filtered_df = filtered_df[filtered_df['total_sqft'] >= filters['min_area']]
        if filters.get('max_area'):
            filtered_df = filtered_df[filtered_df['total_sqft'] <= filters['max_area']]
        if filters.get('area_type'):
            search_area_type = filters['area_type'].lower()
            filtered_df = filtered_df[filtered_df['area_type'].str.lower().str.contains(search_area_type, na=False)]
        if filters.get('availability'):
            search_availability = filters['availability'].lower()
            filtered_df = filtered_df[filtered_df['availability'].str.lower().str.contains(search_availability, na=False)]
        return filtered_df.head(20)  # Limit results for display
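
    # Example (illustrative) filters dict accepted by filter_properties:
    #   {'location': 'Baner', 'bhk': 2, 'max_price': 90, 'min_area': 800}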

    def get_price_range_analysis(self, filtered_df: pd.DataFrame) -> Dict[str, Any]:
        """Analyze the price range and provide summary statistics."""
        if filtered_df.empty:
            return {
                "total_properties": 0,
                "price_range": {"min": 0.0, "max": 0.0, "avg": 0.0, "median": 0.0},
                "area_range": {"min": 0.0, "max": 0.0, "avg": 0.0},
                "price_per_sqft": {"min": 0.0, "max": 0.0, "avg": 0.0},
                "location_distribution": {},
                "area_type_distribution": {},
                "availability_distribution": {}
            }
        # Recompute price_per_sqft here because the column is dropped after outlier removal
        price_per_sqft = (filtered_df['price'] * 100000) / filtered_df['total_sqft']
        analysis = {
            "total_properties": len(filtered_df),
            "price_range": {
                "min": float(filtered_df['price'].min()),
                "max": float(filtered_df['price'].max()),
                "avg": float(filtered_df['price'].mean()),
                "median": float(filtered_df['price'].median())
            },
            "area_range": {
                "min": float(filtered_df['total_sqft'].min()),
                "max": float(filtered_df['total_sqft'].max()),
                "avg": float(filtered_df['total_sqft'].mean())
            },
            "price_per_sqft": {
                "min": float(price_per_sqft.min()),
                "max": float(price_per_sqft.max()),
                "avg": float(price_per_sqft.mean())
            },
            "location_distribution": filtered_df['site_location'].value_counts().to_dict(),
            "area_type_distribution": filtered_df['area_type'].value_counts().to_dict(),
            "availability_distribution": filtered_df['availability'].value_counts().to_dict()
        }
        return analysis

    def predict_prices_for_filtered_results(self, filtered_df: pd.DataFrame) -> pd.DataFrame:
        """Predict prices for all filtered properties and append them to the DataFrame."""
        if filtered_df.empty:
            return filtered_df
        filtered_df = filtered_df.copy()  # Avoid mutating a slice of the source DataFrame
        predictions = []
        for index, row in filtered_df.iterrows():
            predicted_price = self.predict_single_price(
                location=row['site_location'],
                bhk=row['bhk'],
                bath=row['bath'],
                balcony=row['balcony'],
                sqft=row['total_sqft'],
                area_type=row['area_type'],
                availability=row['availability']
            )
            # Use the original price if prediction fails (returns None)
            predictions.append(predicted_price if predicted_price is not None else row['price'])
        filtered_df['predicted_price'] = predictions
        filtered_df['price_difference'] = filtered_df['predicted_price'] - filtered_df['price']
        # Avoid division by zero if the original price is 0
        filtered_df['price_difference_pct'] = (
            filtered_df['price_difference'] / filtered_df['price'].replace(0, np.nan) * 100
        ).fillna(0)  # Fill NaN with 0 if the original price was 0 or prediction failed
        return filtered_df
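
    # Interpretation: a positive price_difference_pct means the model values the
    # property above its listed price, which may flag an underpriced listing.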

    def generate_deep_insights(self, filtered_df: pd.DataFrame, analysis: Dict[str, Any]) -> str:
        """Generate deep market insights using Gemini AI."""
        try:
            # Prepare a property sample for the prompt; check emptiness explicitly,
            # since to_string() is never falsy even for an empty DataFrame
            if not filtered_df.empty:
                sample = filtered_df[['site_location', 'bhk', 'total_sqft', 'price', 'predicted_price', 'price_difference_pct']].head().to_string(index=False)
                properties_sample_str = f"Top 5 Properties Sample:\n{sample}\n"
            else:
                properties_sample_str = "No specific property samples available to list."
            context = f"""
You are a highly experienced real estate market expert specializing in Pune properties.
Your task is to analyze the provided property data and offer comprehensive, actionable insights for a potential property buyer.

Here's a summary of the current search results:
- Total Properties Found: {analysis['total_properties']}
- Price Range: ₹{analysis['price_range']['min']:.2f}L to ₹{analysis['price_range']['max']:.2f}L
- Average Price: ₹{analysis['price_range']['avg']:.2f}L
- Median Price: ₹{analysis['price_range']['median']:.2f}L
- Average Area: {analysis['area_range']['avg']:.0f} sq ft
- Average Price per sq ft: ₹{analysis['price_per_sqft']['avg']:.0f}

Location Distribution: {json.dumps(analysis['location_distribution'], indent=2)}
Area Type Distribution: {json.dumps(analysis['area_type_distribution'], indent=2)}
Availability Distribution: {json.dumps(analysis['availability_distribution'], indent=2)}

{properties_sample_str}

Please provide detailed market insights including:
1. **Current Market Trends**: What are the prevailing trends in Pune's real estate market based on this data? (e.g., buyer's/seller's market, growth areas, demand patterns).
2. **Price Competitiveness**: How do the prices of these properties compare to the overall Pune market? Are they underpriced, overpriced, or fair value? Highlight any anomalies based on the 'price_difference_pct' if available.
3. **Best Value Propositions**: Identify which property types (BHK, area, location) offer the best value for money right now based on average price per sqft and price ranges.
4. **Location-Specific Insights**: For the top 2-3 locations, provide specific observations on amenities, connectivity, future development potential, and why they might be good or bad for investment or living.
5. **Investment Recommendations**: Based on the data, what is your general investment advice for someone looking into Pune real estate?
6. **Future Market Outlook**: Briefly discuss the short-to-medium term outlook (next 1-2 years) for the Pune real estate market.

Ensure your response is conversational, professional, and directly actionable for a property buyer.
"""
            response = model.generate_content(context)
            return self._clean_gemini_text(response.text)  # Strip Markdown before returning
        except Exception as e:
            logger.error(f"Error generating insights using Gemini: {e}")
            return "I'm analyzing the market data to provide you with detailed insights. Based on the current search results, I can see various opportunities in your preferred areas. Please bear with me while I gather more information."

    def get_nearby_properties_gemini(self, location: str, requirements: Dict[str, Any]) -> str:
        """Get nearby-property and recent market information using Gemini."""
        try:
            # Render the requirements in a human-readable form for the prompt
            label_templates = {
                'min_price': 'Minimum Budget: {} Lakhs. ',
                'max_price': 'Maximum Budget: {} Lakhs. ',
                'min_area': 'Minimum Area: {} sq ft. ',
                'max_area': 'Maximum Area: {} sq ft. ',
                'bhk': 'BHK: {}. ',
                'area_type': 'Area Type: {}. ',
                'availability': 'Availability: {}. ',
                'location': 'Preferred Location: {}. ',
            }
            req_str = ''.join(label_templates[k].format(v) for k, v in requirements.items() if k in label_templates)
            if not req_str:
                req_str = "No specific requirements provided beyond location."
            context = f"""
You are a knowledgeable real estate advisor with up-to-date market information for Pune.
A client is interested in properties near **{location}**.
Their current general requirements are: {req_str.strip()}

Please provide a concise but informative overview focusing on:
1. **Recent Property Launches**: Mention any significant residential projects launched in {location} or its immediate vicinity in the last 6-12 months.
2. **Upcoming Projects**: Are there any major upcoming residential or commercial developments (e.g., IT parks, malls, new infrastructure) that could impact property values in {location} or adjacent areas?
3. **Infrastructure Developments**: Discuss any planned or ongoing infrastructure improvements (roads, metro, civic amenities) that might affect property values and lifestyle in {location}.
4. **Market Trends in {location}**: What are the current buying/rental trends specific to {location}? Is it a high-demand area, or is supply outpacing demand?
5. **Comparative Analysis**: Briefly compare {location} with one or two similar, nearby localities in terms of property values, amenities, and lifestyle.
6. **Investment Potential**: What is the general investment outlook and potential rental yield for properties in {location}?

Focus on specific details relevant to the Pune real estate market in 2024-2025.
"""
            response = model.generate_content(context)
            return self._clean_gemini_text(response.text)  # Strip Markdown before returning
        except Exception as e:
            logger.error(f"Error getting nearby properties from Gemini: {e}")
            return f"I'm gathering information about recent properties and market trends near {location}. This area has shown consistent growth in property values, and I'll get you more specific details shortly."

    def update_context_memory(self, user_query: str, results: Dict[str, Any]):
        """Update context memory to maintain conversation continuity."""
        self.context_memory.append({
            'timestamp': datetime.now().isoformat(),
            'user_query': user_query,
            'results_summary': {
                'total_properties': results.get('total_properties', 0),
                'price_range': results.get('price_range', {}),
                'locations': list(results.get('location_distribution', {}).keys())
            }
        })
        # Keep only the last 5 interactions
        if len(self.context_memory) > 5:
            self.context_memory.pop(0)

    def extract_requirements(self, user_message: str) -> Dict[str, Any]:
        """Extract property requirements from a user message."""
        requirements = {}
        message_lower = user_message.lower()
        # Extract BHK
        bhk_match = re.search(r'(\d+)\s*bhk', message_lower)
        if bhk_match:
            try:
                requirements['bhk'] = int(bhk_match.group(1))
            except ValueError:
                pass  # Ignore if not a valid number
        # Extract budget range (flexible with "lakh", "crore", "million");
        # the \s* after the keyword also allows plain "budget 50" phrasing
        budget_match = re.search(
            r'(?:budget|price|cost)\s*(?:.*?(?:of|between|from)\s*)?'
            r'(\d+(?:\.\d+)?)\s*(?:lakhs?|crores?|million)?(?:(?:\s*to|\s*-\s*)\s*(\d+(?:\.\d+)?)\s*(?:lakhs?|crores?|million)?)?',
            message_lower
        )
        if budget_match:
            try:
                min_amount = float(budget_match.group(1))
                max_amount = float(budget_match.group(2)) if budget_match.group(2) else None
                # Detect the unit from the full matched string and convert to lakhs
                if 'crore' in budget_match.group(0):
                    requirements['min_price'] = min_amount * 100
                    if max_amount:
                        requirements['max_price'] = max_amount * 100
                elif 'million' in budget_match.group(0):
                    requirements['min_price'] = min_amount * 10
                    if max_amount:
                        requirements['max_price'] = max_amount * 10
                else:  # Default to lakhs
                    requirements['min_price'] = min_amount
                    if max_amount:
                        requirements['max_price'] = max_amount
            except ValueError:
                pass
        # Extract location
        found_location = False
        for location in self.locations:
            if location.lower() in message_lower:
                requirements['location'] = location
                found_location = True
                break
        # Broader location matching if no specific location is found
        if not found_location:
            general_locations = ['pune', 'pcmc']  # PCMC = Pimpri-Chinchwad Municipal Corporation
            for gen_loc in general_locations:
                if gen_loc in message_lower:
                    requirements['location'] = gen_loc.capitalize()  # Or a default location like 'Pune'
                    break
        # Extract area requirements
        area_match = re.search(r'(\d+(?:\.\d+)?)\s*(?:to|-)?\s*(\d+(?:\.\d+)?)?\s*(?:sq\s*ft|sqft|square\s*feet|sft)',
                               message_lower)
        if area_match:
            try:
                requirements['min_area'] = float(area_match.group(1))
                if area_match.group(2):
                    requirements['max_area'] = float(area_match.group(2))
            except ValueError:
                pass
        # Extract area type
        for area_type in self.area_types:
            if area_type.lower() in message_lower:
                requirements['area_type'] = area_type
                break
        # Extract availability
        for availability in self.availability_types:
            if availability.lower() in message_lower:
                requirements['availability'] = availability
                break
        return requirements
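
    # Example (illustrative, assuming 'Baner' appears in self.locations):
    #   extract_requirements("Looking for a 2 BHK in Baner with a budget of 50 to 80 lakhs")
    #   -> {'bhk': 2, 'min_price': 50.0, 'max_price': 80.0, 'location': 'Baner'}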

    def handle_query(self, user_message: str) -> Dict[str, Any]:
        """Main entry point for handling user queries with context awareness."""
        try:
            # Extract requirements from the current message
            requirements = self.extract_requirements(user_message)
            # Merge with previous context if this query builds on the last one
            if self.is_follow_up_query(user_message):
                requirements = self.merge_with_context(requirements)
            # Update client preferences (store all extracted preferences)
            self.client_preferences.update(requirements)
            # Filter properties based on the requirements
            filtered_df = self.filter_properties(requirements)
            # If no properties are found, provide suggestions
            if filtered_df.empty:
                return self.handle_no_results(requirements)
            # Predict prices for the filtered results
            filtered_df = self.predict_prices_for_filtered_results(filtered_df)
            # Get the price range analysis
            analysis = self.get_price_range_analysis(filtered_df)
            # Generate deep insights
            insights = self.generate_deep_insights(filtered_df, analysis)
            # Get nearby-properties information
            nearby_info = ""
            if requirements.get('location'):
                nearby_info = self.get_nearby_properties_gemini(requirements['location'], requirements)
            # Prepare the response
            response_data = {
                'filtered_properties': filtered_df.to_dict('records'),
                'analysis': analysis,
                'insights': insights,
                'nearby_properties': nearby_info,
                'requirements': requirements,
                'context_aware': self.is_follow_up_query(user_message)
            }
            # Update context memory
            self.update_context_memory(user_message, analysis)
            self.last_search_results = response_data  # Store the full response
            return response_data
        except Exception as e:
            logger.error(f"Error handling query: {e}")
            return {'error': f'An error occurred while processing your query: {str(e)}'}

    def is_follow_up_query(self, user_message: str) -> bool:
        """Check whether this query builds on previous results."""
        follow_up_indicators = ['also', 'and', 'additionally', 'what about', 'show me more', 'similar', 'nearby',
                                'around', 'close to', 'next', 'other', 'different', 'change', 'update']
        return any(indicator in user_message.lower() for indicator in follow_up_indicators) and len(
            self.context_memory) > 0
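
    # Example: "show me more options nearby" is treated as a follow-up once at
    # least one earlier query is stored in context_memory.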

    def merge_with_context(self, new_requirements: Dict[str, Any]) -> Dict[str, Any]:
        """Merge new requirements with context from previous queries."""
        # Start from the stored client preferences, then overwrite with anything
        # from the current message. dict.update() keeps previously established
        # values (e.g., the last known location) when the new query omits them,
        # and replaces them (e.g., a new budget) when it supplies them.
        merged = self.client_preferences.copy()
        merged.update(new_requirements)
        return merged

    def handle_no_results(self, requirements: Dict[str, Any]) -> Dict[str, Any]:
        """Handle cases where no properties match the criteria."""
        suggestions = ["Try relaxing some of your criteria."]
        if requirements.get('max_price'):
            suggestions.append(f"Consider increasing your budget above ₹{requirements['max_price']:.2f}L.")
            # Suggest a slightly higher range
            suggestions.append(f"Perhaps a range of ₹{requirements['max_price']:.2f}L to ₹{(requirements['max_price'] * 1.2):.2f}L?")
        if requirements.get('location'):
            # Find nearby popular locations in the dataset, excluding the one searched
            nearby_locs = [loc for loc in self.locations if loc.lower() != requirements['location'].lower()]
            if nearby_locs:
                # Rank by listing count and pick the top 3
                top_locations = self.df['site_location'].value_counts().index.tolist()
                suggested_nearby = [loc for loc in top_locations if loc.lower() != requirements['location'].lower()][:3]
                if suggested_nearby:
                    suggestions.append(f"Consider nearby popular areas like: {', '.join(suggested_nearby)}.")
        if requirements.get('bhk') is not None:
            # Suggest +/- 1 BHK if not already at a practical minimum/maximum
            if requirements['bhk'] > 1:
                suggestions.append(f"Consider looking for {requirements['bhk'] - 1} BHK options.")
            if requirements['bhk'] < 6:  # 6 is a practical upper limit for most searches
                suggestions.append(f"Consider {requirements['bhk'] + 1} BHK options.")
        if requirements.get('max_area'):
            suggestions.append(f"You might find more options if you consider properties up to ~{(requirements['max_area'] * 1.1):.0f} sq ft.")
        # Default message if no specific suggestions were generated
        if len(suggestions) == 1 and "relaxing" in suggestions[0]:
            final_suggestion_text = "No properties found matching your exact criteria. Please try broadening your search or clarifying your preferences."
        else:
            final_suggestion_text = f"No properties found matching your exact criteria. Here are some suggestions: {'; '.join(suggestions)}."
        return {
            'filtered_properties': [],
            'analysis': self.get_price_range_analysis(pd.DataFrame()),  # Empty analysis
            'insights': final_suggestion_text,
            'nearby_properties': '',
            'requirements': requirements,
            'suggestions': suggestions
        }


# Initialize the enhanced sales assistant (must come after the class definition)
sales_assistant = EnhancedRealEstateSalesAssistant()

# NOTE: The original snippet omitted the @app.route decorators; the paths and
# HTTP methods below are inferred from the handler names and bodies.


@app.route('/')
def index():
    return render_template('index2.html')


@app.route('/chat', methods=['POST'])
def chat():
    """Enhanced chat endpoint with comprehensive property search."""
    try:
        data = request.get_json()
        user_message = data.get('message', '').strip()
        if not user_message:
            return jsonify({'error': 'Please provide your query'}), 400
        # Get a comprehensive response from the sales assistant
        response_data = sales_assistant.handle_query(user_message)
        # Ensure the payload always includes the necessary keys, even on error
        if 'error' in response_data:
            return jsonify({
                'response': response_data,
                'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'conversation_context': len(sales_assistant.context_memory)
            }), 500  # Return 500 for internal errors
        else:
            return jsonify({
                'response': response_data,
                'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'conversation_context': len(sales_assistant.context_memory)
            })
    except Exception as e:
        logger.error(f"Chat endpoint error: {e}")
        return jsonify({'error': f'An unexpected error occurred in chat endpoint: {str(e)}'}), 500
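
# Example request (assuming the app runs locally on port 5000 and the chat
# route above is mounted at /chat):
#   curl -X POST http://localhost:5000/chat \
#        -H 'Content-Type: application/json' \
#        -d '{"message": "2 BHK in Baner with a budget of 50 to 80 lakhs"}'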


# Flask routes must be defined at the module level, not inside a class
@app.route('/model_status')
def model_status():
    """Get the current model status and version info."""
    try:
        status = {
            'model_loaded': hasattr(sales_assistant, 'pipeline') and sales_assistant.pipeline is not None,
            'model_version': getattr(sales_assistant, 'model_version', 'unknown'),
            'feature_count': len(getattr(sales_assistant, 'training_feature_names', [])),
            'prediction_method': 'ML' if hasattr(sales_assistant, 'pipeline') and sales_assistant.pipeline else 'Fallback',
            'sklearn_version': sklearn.__version__
        }
        return jsonify(status)
    except Exception as e:
        logger.error(f"Error in model_status endpoint: {e}")
        return jsonify({'error': str(e)}), 500


@app.route('/retrain_model', methods=['POST'])
def retrain_model():
    """Retrain the model with the current scikit-learn version."""
    try:
        logger.info("Starting model retraining via endpoint...")
        success = sales_assistant.retrain_model_with_current_version()
        if success:
            return jsonify({
                'message': 'Model retrained successfully',
                'version': sales_assistant.model_version,
                'feature_count': len(sales_assistant.training_feature_names),
                'sample_features': sales_assistant.training_feature_names[:10] if sales_assistant.training_feature_names else []
            })
        else:
            return jsonify({'error': 'Failed to retrain model. Check logs for details.'}), 500
    except Exception as e:
        logger.error(f"Retrain endpoint error: {e}")
        return jsonify({'error': f'An unexpected error occurred during retraining: {str(e)}'}), 500


@app.route('/filter_properties', methods=['POST'])
def filter_properties_endpoint():  # Named to avoid shadowing the class method
    """Dedicated endpoint for property filtering."""
    try:
        filters = request.get_json()
        filtered_df = sales_assistant.filter_properties(filters)
        filtered_df = sales_assistant.predict_prices_for_filtered_results(filtered_df)
        analysis = sales_assistant.get_price_range_analysis(filtered_df)
        return jsonify({
            'properties': filtered_df.to_dict('records'),
            'analysis': analysis,
            'total_count': len(filtered_df)
        })
    except Exception as e:
        logger.error(f"Filter endpoint error: {e}")
        return jsonify({'error': f'Filtering error: {str(e)}'}), 500


@app.route('/insights', methods=['POST'])
def get_insights():
    """Get AI-powered insights for the current search results."""
    try:
        data = request.get_json()
        location_from_request = data.get('location')
        requirements_from_request = data.get('requirements', {})
        if sales_assistant.last_search_results:
            filtered_df = pd.DataFrame(sales_assistant.last_search_results['filtered_properties'])
            analysis = sales_assistant.last_search_results['analysis']
            insights = sales_assistant.generate_deep_insights(filtered_df, analysis)
            nearby_info = ""
            # Use the location from the request if provided, otherwise from the last search results
            effective_location = location_from_request or sales_assistant.last_search_results['requirements'].get('location')
            if effective_location:
                nearby_info = sales_assistant.get_nearby_properties_gemini(effective_location, requirements_from_request or sales_assistant.last_search_results['requirements'])
            return jsonify({
                'insights': insights,
                'nearby_properties': nearby_info
            })
        else:
            return jsonify({'error': 'No previous search results are available. Please perform a search first.'}), 400
    except Exception as e:
        logger.error(f"Insights endpoint error: {e}")
        return jsonify({'error': f'Insights error: {str(e)}'}), 500


@app.route('/market_data')
def get_market_data():
    """Get comprehensive market data."""
    try:
        # Ensure the dataset is loaded before reading from it
        if not hasattr(sales_assistant, 'df') or sales_assistant.df.empty:
            return jsonify({'error': 'Market data not available; dataset not loaded.'}), 500
        market_data = {
            'locations': sales_assistant.locations,
            'area_types': sales_assistant.area_types,
            'availability_types': sales_assistant.availability_types,
            'bhk_options': sales_assistant.bhk_options,
            'price_statistics': {
                'min_price': float(sales_assistant.df['price'].min()),
                'max_price': float(sales_assistant.df['price'].max()),
                'avg_price': float(sales_assistant.df['price'].mean()),
                'median_price': float(sales_assistant.df['price'].median())
            },
            'area_statistics': {
                'min_area': float(sales_assistant.df['total_sqft'].min()),
                'max_area': float(sales_assistant.df['total_sqft'].max()),
                'avg_area': float(sales_assistant.df['total_sqft'].mean())
            }
        }
        return jsonify(market_data)
    except Exception as e:
        logger.error(f"Market data endpoint error: {e}")
        return jsonify({'error': f'Market data error: {str(e)}'}), 500


@app.route('/preferences')
def get_client_preferences():
    """Get current client preferences and conversation history."""
    return jsonify({
        'preferences': sales_assistant.client_preferences,
        'conversation_history': sales_assistant.context_memory,
        'last_search_available': sales_assistant.last_search_results is not None
    })


@app.route('/reset_session', methods=['POST'])
def reset_session():
    """Reset the entire session, including preferences and context."""
    try:
        sales_assistant.client_preferences = {}
        sales_assistant.context_memory = []
        sales_assistant.last_search_results = None
        return jsonify({'message': 'Session reset successfully'})
    except Exception as e:
        logger.error(f"Reset session error: {e}")
        return jsonify({'error': f'Failed to reset session: {str(e)}'}), 500


@app.route('/health')
def health():
    """Health check endpoint."""
    try:
        return jsonify({
            'status': 'healthy',
            'model_loaded': hasattr(sales_assistant, 'pipeline') and sales_assistant.pipeline is not None,
            'model_version': sales_assistant.model_version,
            'data_loaded': hasattr(sales_assistant, 'df') and not sales_assistant.df.empty,
            'dataset_size': len(sales_assistant.df) if hasattr(sales_assistant, 'df') else 0,
            'locations_available': len(sales_assistant.locations),
            'context_memory_size': len(sales_assistant.context_memory),
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Health check error: {e}")
        return jsonify({'status': 'unhealthy', 'error': str(e)}), 500
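
# For production, serve the app with a WSGI server rather than the Flask dev
# server, e.g. (assuming this file is saved as app.py):
#   gunicorn app:app -b 0.0.0.0:5000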

if __name__ == '__main__':
    # Explicitly set the host for deployment environments; debug=True is useful
    # in development but should be False in production.
    app.run(debug=True, host='0.0.0.0', port=5000)