# Spaces: Sleeping
# (Status text captured from the hosting page along with this file; it is not part of the program.)
from flask import Flask, render_template, request, redirect, url_for, send_file | |
import os | |
import shutil | |
import pandas as pd | |
from werkzeug.utils import secure_filename | |
from joblib import load, dump | |
import numpy as np | |
from sklearn.preprocessing import LabelEncoder | |
from time import time | |
from huggingface_hub import hf_hub_download | |
import pickle | |
import uuid | |
from pathlib import Path | |
import numpy as np | |
import pandas as pd | |
import seaborn as sns | |
import matplotlib as mpl | |
import matplotlib.pyplot as plt | |
import matplotlib.pylab as pylab | |
from sklearn.preprocessing import OneHotEncoder, LabelEncoder | |
from sklearn.model_selection import train_test_split | |
from sklearn.preprocessing import StandardScaler | |
from sklearn.decomposition import PCA | |
from sklearn.pipeline import Pipeline | |
from sklearn.tree import DecisionTreeRegressor | |
from sklearn.ensemble import RandomForestRegressor | |
from sklearn.linear_model import LinearRegression | |
from xgboost import XGBRegressor | |
from sklearn.neighbors import KNeighborsRegressor | |
from sklearn.model_selection import cross_val_score | |
from sklearn.metrics import mean_squared_error | |
from sklearn import metrics | |
from sklearn.model_selection import train_test_split | |
from sklearn.pipeline import Pipeline | |
from sklearn.preprocessing import PowerTransformer, StandardScaler | |
from sklearn.ensemble import RandomForestRegressor | |
from sklearn.model_selection import train_test_split, cross_val_score, RandomizedSearchCV | |
import lightgbm as lgb | |
from catboost import CatBoostRegressor | |
from sklearn.ensemble import StackingRegressor | |
import json | |
import imblearn | |
# Flask application object used by the rest of this module.
app = Flask(__name__)

# Secret key for session management.
# NOTE(review): a fresh random key is generated every time the process starts,
# so anything signed with it does not survive a restart — confirm this is intended.
app.secret_key = os.urandom(24)

# Working folders for uploads, generated reports and downloaded models.
UPLOAD_FOLDER = "uploads/"
DATA_FOLDER = "data/"
MODEL_FOLDER = "models/"

# Model directory and label-encoder directory.
MODEL_DIR = r'./Model'
LABEL_ENCODER_DIR = r'./Label_encoders'

# Paths of the most recently written output files. They start as None and are
# reassigned by predict(), which includes a unique id to avoid overwriting.
PRED_OUTPUT_FILE = None
CLASS_OUTPUT_FILE = None

# Upload extensions accepted by allowed_file().
ALLOWED_EXTENSIONS = {'csv', 'xlsx'}

# Expose the folders through the app config.
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['DATA_FOLDER'] = DATA_FOLDER
app.config['MODEL_FOLDER'] = MODEL_FOLDER

# Create each folder exactly once; exist_ok makes this safe on restart.
os.makedirs(MODEL_FOLDER, exist_ok=True)
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(DATA_FOLDER, exist_ok=True)
# Classification models downloaded from the Hugging Face Hub at import time.
HF_REPO_ID = "WebashalarForML/Diamond_model_"
HF_MODEL_SUBDIR = "CLASS_DUMMY"


def _load_hub_model(model_file):
    """Download one model file from the Hub, copy it into MODEL_FOLDER and load it.

    Args:
        model_file: File name of the pickled model inside ``HF_MODEL_SUBDIR``.

    Returns:
        The object deserialised by ``joblib.load`` from the copied file.
    """
    src_path = hf_hub_download(
        repo_id=HF_REPO_ID,
        filename=f"{HF_MODEL_SUBDIR}/{model_file}",
        cache_dir=MODEL_FOLDER,
    )
    dst_path = os.path.join(MODEL_FOLDER, model_file)
    shutil.copy(src_path, dst_path)
    # SECURITY: joblib.load unpickles the file, and unpickling can execute
    # arbitrary code. This is only safe while the Hub repository is trusted.
    return load(dst_path)


# ----------------------------------------------
# Code classification models for real data.
# ----------------------------------------------
blk_change = _load_hub_model("DT_best__2_class_blk(M)_change.pkl")    # black code change
wht_change = _load_hub_model("DT_best__2_class_wht(M)_change.pkl")    # white code change
pav_change = _load_hub_model("DT_best__2_class_pav(M)_change.pkl")    # pav code change
open_change = _load_hub_model("DT_best__2_class_open(M)_change.pkl")  # open code change

# ----------------------------------------------
# Parameter classification models for real data.
# ----------------------------------------------
shape_change = _load_hub_model("DT_best__2_class_shp_change.pkl")     # shape change
col_change = _load_hub_model("DT_best__2_class_col_change.pkl")       # color change
qua_change = _load_hub_model("DT_best__2_class_qua_change.pkl")       # quality change
cut_change = _load_hub_model("DT_best__2_class_cut_change.pkl")       # cut change

print("================================")
# Names of the label encoders to read from disk: one per categorical input
# column plus one per "Change_*" target.
encoder_list = [
    'EngShp', 'EngQua', 'EngCol', 'EngCut', 'EngPol', 'EngSym', 'EngFlo',
    'EngNts', 'EngMikly', 'EngBlk', 'EngWht', 'EngOpen', 'EngPav',
    'Change_cts_value_v2', 'Change_shape_value_v2', 'Change_quality_value_v2', 'Change_color_value_v2',
    'Change_cut_value_v2', 'Change_Blk_Eng_to_Mkbl_value_2', 'Change_Wht_Eng_to_Mkbl_value_2',
    'Change_Open_Eng_to_Mkbl_value_2', 'Change_Pav_Eng_to_Mkbl_value_2', 'Change_Blk_Eng_to_Grd_value_2',
    'Change_Wht_Eng_to_Grd_value_2', 'Change_Open_Eng_to_Grd_value_2', 'Change_Pav_Eng_to_Grd_value_2',
    'Change_Blk_Eng_to_ByGrd_value_2', 'Change_Wht_Eng_to_ByGrd_value_2', 'Change_Open_Eng_to_ByGrd_value_2',
    'Change_Pav_Eng_to_ByGrd_value_2', 'Change_Blk_Eng_to_Gia_value_2', 'Change_Wht_Eng_to_Gia_value_2',
    'Change_Open_Eng_to_Gia_value_2', 'Change_Pav_Eng_to_Gia_value_2',
]

# Each encoder is stored as <LABEL_ENCODER_DIR>/label_encoder_<name>.joblib.
enc_path = Path(LABEL_ENCODER_DIR)
loaded_label_encoder = {
    name: load(enc_path / f"label_encoder_{name}.joblib")
    for name in encoder_list
}
# ----------------------------------------- | |
# Utility: Allowed File Check | |
# ----------------------------------------- | |
def allowed_file(filename):
    """Return True when ``filename`` has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    A name without any dot is rejected.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
# ----------------------------------------- | |
# Routes | |
# ----------------------------------------- | |
def index():
    """Render the ``index.html`` template and return the result."""
    # NOTE(review): no @app.route decorator is visible on this function in the
    # text under review — confirm the route registration in the real file.
    landing_page = render_template('index.html')
    return landing_page
def predict():
    """Accept an uploaded CSV/Excel file, run the models and store the reports.

    The uploaded file is saved under ``app.config['UPLOAD_FOLDER']``, read into
    a DataFrame and passed to ``process_dataframe``. Both returned frames are
    written as CSV files under ``app.config['DATA_FOLDER']`` and their paths are
    recorded in the module globals ``PRED_OUTPUT_FILE`` / ``CLASS_OUTPUT_FILE``.

    Returns:
        A redirect to ``report_view`` (prediction report, page 1) on success,
        or a redirect to ``index`` when the upload is missing, has a
        disallowed extension, cannot be read, or yields an empty result.
    """
    # NOTE(review): no @app.route decorator is visible on this function in the
    # text under review — confirm the route registration in the real file.
    # NOTE(review): the output paths live in module globals, so concurrent
    # uploads overwrite each other's "current report" — confirm acceptable.
    global PRED_OUTPUT_FILE, CLASS_OUTPUT_FILE

    if 'file' not in request.files:
        print('No file part', 'error')
        return redirect(url_for('index'))

    file = request.files['file']
    if file.filename == '':
        print('No selected file', 'error')
        return redirect(url_for('index'))

    if not (file and allowed_file(file.filename)):
        print('Invalid file type. Only CSV and Excel files are allowed.', 'error')
        return redirect(url_for('index'))

    # Take the extension from the original name and lower-case it, the same way
    # allowed_file() does, so "DATA.CSV" is parsed as CSV rather than Excel.
    extension = file.filename.rsplit('.', 1)[1].lower()

    filename = secure_filename(file.filename)
    if not filename.lower().endswith(f'.{extension}'):
        # secure_filename() keeps ASCII only; for a name such as "файл.csv" it
        # can drop the stem and the dot. Fall back to a generated name so the
        # save path is never a bare directory or extension-less.
        filename = f'upload_{uuid.uuid4().hex[:8]}.{extension}'
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(filepath)

    # Convert file to DataFrame.
    try:
        if extension == 'csv':
            df = pd.read_csv(filepath)
        else:
            df = pd.read_excel(filepath)
    except Exception as e:
        print(f'Error reading file: {e}', 'error')
        return redirect(url_for('index'))

    # Process the DataFrame and generate predictions and classification analysis.
    df_pred, dx_class = process_dataframe(df)
    if df_pred.empty:
        print("Processed prediction DataFrame is empty. Check the input file and processing logic.", "error")
        return redirect(url_for('index'))

    # Save output files tagged with the current date and a unique id.
    current_date = pd.Timestamp.now().strftime("%Y-%m-%d")
    unique_id = uuid.uuid4().hex[:8]
    data_folder = app.config['DATA_FOLDER']
    PRED_OUTPUT_FILE = os.path.join(data_folder, f'prediction_output_{current_date}_{unique_id}.csv')
    CLASS_OUTPUT_FILE = os.path.join(data_folder, f'classification_output_{current_date}_{unique_id}.csv')
    df_pred.to_csv(PRED_OUTPUT_FILE, index=False)
    dx_class.to_csv(CLASS_OUTPUT_FILE, index=False)

    # Redirect to report view; default to prediction report, page 1.
    return redirect(url_for('report_view', report_type='pred', page=1))
# Label written for an encoded value that is outside an encoder's known classes.
_UNKNOWN_LABEL = "Unknown"


def _encode_column(values, encoder):
    """Map raw category values to the encoder's integer codes; unseen values become -1."""
    # A label's code is its position in ``classes_``; building the lookup once
    # avoids calling encoder.transform() for every single row.
    code_by_label = {label: code for code, label in enumerate(encoder.classes_)}
    return values.map(lambda value: code_by_label.get(value, -1))


def _decode_column(codes, encoder):
    """Map integer codes back to labels; codes outside the encoder's range become _UNKNOWN_LABEL."""
    classes = np.asarray(encoder.classes_, dtype=object)
    # Series.astype(int) raises ValueError on NaN, which the caller reports.
    int_codes = codes.astype(int).to_numpy()
    in_range = (int_codes >= 0) & (int_codes < len(classes))
    labels = np.full(int_codes.shape, _UNKNOWN_LABEL, dtype=object)
    labels[in_range] = classes[int_codes[in_range]]
    return labels


def process_dataframe(df):
    """Encode the input rows, run the eight change models and decode the results.

    Args:
        df: Input DataFrame. It must contain the columns listed in
            ``required_columns`` below.

    Returns:
        A tuple ``(df_pred_main, df_pred_main)`` — the same DataFrame object
        twice — holding the input columns plus one ``Change_*`` column per
        model, with categorical values decoded back to labels and missing or
        blank cells shown as ``"-"``. Two empty DataFrames are returned when
        anything fails (missing columns, non-numeric data, model errors).
    """
    try:
        # Columns fed to the code models (blk/wht/pav/open) ...
        required_columns = ['EngCts', 'EngShp', 'EngQua', 'EngCol', 'EngCut', 'EngPol',
                            'EngSym', 'EngFlo', 'EngNts', 'EngMikly', 'EngBlk', 'EngWht', 'EngOpen',
                            'EngPav', 'EngAmt']
        # ... and to the parameter models (shape/color/cut/quality).
        required_columns_2 = ['EngCts', 'EngShp', 'EngQua', 'EngCol', 'EngCut', 'EngPol',
                              'EngSym', 'EngFlo', 'EngNts', 'EngMikly', 'EngAmt']
        shared_categorical = ['EngShp', 'EngQua', 'EngCol', 'EngCut', 'EngPol',
                              'EngSym', 'EngFlo', 'EngNts', 'EngMikly']
        code_columns = ['EngBlk', 'EngWht', 'EngOpen', 'EngPav']

        df_pred = df[required_columns].copy()
        df_pred[code_columns] = df_pred[code_columns].fillna("NA")
        # NOTE(review): this also turns a missing EngCts/EngAmt into "NA", which
        # makes astype(float) below fail and the whole file be rejected —
        # confirm that rejecting such files is intended.
        df_class = df[required_columns_2].fillna("NA").copy()

        # Encode categorical columns; values unknown to an encoder become -1.
        for col in shared_categorical + code_columns:
            df_pred[col] = _encode_column(df_pred[col], loaded_label_encoder[col])

        # The classification frame reuses the encoded prediction columns.
        for col in shared_categorical:
            df_class[col] = df_pred[col]

        df_pred = df_pred.astype(float)
        df_class = df_class.astype(float)

        # ------------------------------------
        # Prediction Report Section
        # ------------------------------------
        # (output column, model, feature frame) in the order columns are added.
        prediction_plan = [
            ('Change_Blk_Eng_to_Mkbl_value_2', blk_change, df_pred),
            ('Change_Wht_Eng_to_Mkbl_value_2', wht_change, df_pred),
            ('Change_Pav_Eng_to_Mkbl_value_2', pav_change, df_pred),
            ('Change_Open_Eng_to_Mkbl_value_2', open_change, df_pred),
            ('Change_shape_value_v2', shape_change, df_class),
            ('Change_color_value_v2', col_change, df_class),
            ('Change_cut_value_v2', cut_change, df_class),
            ('Change_quality_value_v2', qua_change, df_class),
        ]
        df_pred_0 = df_pred.copy()
        try:
            for target, model, features in prediction_plan:
                # Assign the raw array so rows are matched by position, not by
                # index label (the input frame may not have a 0..n-1 index).
                df_pred_0[target] = np.ravel(model.predict(features))
                print(df_pred_0.columns)
        except ValueError as e:
            print(f'pred model error----->: {e}', 'error')
            return pd.DataFrame(), pd.DataFrame()

        # Decode every encoded column back to its label.
        df_pred_main = df_pred_0.copy()
        decode_columns = shared_categorical + code_columns + [
            'Change_shape_value_v2', 'Change_quality_value_v2',
            'Change_color_value_v2', 'Change_cut_value_v2',
            'Change_Blk_Eng_to_Mkbl_value_2',
            'Change_Wht_Eng_to_Mkbl_value_2',
            'Change_Open_Eng_to_Mkbl_value_2',
            'Change_Pav_Eng_to_Mkbl_value_2',
        ]
        for col in decode_columns:
            try:
                # Decoded value by value, so a single unseen category (-1) no
                # longer leaves the whole column as numeric codes.
                df_pred_main[col] = _decode_column(df_pred_main[col], loaded_label_encoder[col])
            except ValueError as e:
                print(f'inverse transform fails value in column {col}: {e}', 'error')

        for col in code_columns:
            print(col, df_pred_main[col].unique())

        # Show missing codes and the "NA" placeholder as "-".
        for col in code_columns:
            df_pred_main[col] = df_pred_main[col].fillna("-")
            df_pred_main[col] = df_pred_main[col].replace("NA", "-", regex=True)

        # Final step to replace NaN or empty values with "-".
        df_pred_main = df_pred_main.fillna("-")
        df_pred_main = df_pred_main.replace(r'^\s*$', "-", regex=True)
        return df_pred_main, df_pred_main
    except Exception as e:
        print(f'Error processing file: {e}', 'error')
        return pd.DataFrame(), pd.DataFrame()
# ---------------------------------------------------- | |
# Report View Route with Pagination & Toggle | |
# ---------------------------------------------------- | |
def report_view():
    """Render one page (15 rows) of the latest prediction or classification report.

    Query parameters:
        report_type: ``'pred'`` (default) reads ``PRED_OUTPUT_FILE``; any other
            value reads ``CLASS_OUTPUT_FILE``.
        page: 1-based page number; non-numeric or non-positive values fall
            back to page 1.

    Returns:
        The rendered ``output.html`` template, or a redirect to ``index`` when
        no report file is available yet.
    """
    # NOTE(review): no @app.route decorator is visible on this function in the
    # text under review — confirm the route registration in the real file.
    report_type = request.args.get('report_type', 'pred')
    try:
        page = int(request.args.get('page', 1))
    except ValueError:
        page = 1
    # A page below 1 would produce a negative slice start.
    page = max(page, 1)
    per_page = 15

    # The globals are None until a prediction has been run in this process.
    output_file = PRED_OUTPUT_FILE if report_type == 'pred' else CLASS_OUTPUT_FILE
    if not output_file or not os.path.exists(output_file):
        print('No report available. Run a prediction first.', 'error')
        return redirect(url_for('index'))
    df = pd.read_csv(output_file)

    # Page slice.
    start = (page - 1) * per_page
    end = start + per_page
    df_page = df.iloc[start:end].copy()

    def add_colored_arrow(row):
        """Format Makable_Predicted with a green up / red down arrow based on Makable_Diff."""
        try:
            pred = float(row['Makable_Predicted'])
            diff = float(row['Makable_Diff'])
        except (KeyError, TypeError, ValueError):
            # Missing or non-numeric data: leave the cell as it is.
            return row.get('Makable_Predicted', '')
        arrow = '↑' if diff > 0 else '↓'
        color = 'green' if diff > 0 else 'red'
        return f"{pred:.3f} <span style='color:{color};'>{arrow}</span>"

    # Decorate only when the column exists and the page has rows. Otherwise a
    # blank column would be added, and a row-wise apply over an empty frame
    # does not yield a single column that can be assigned.
    if 'Makable_Predicted' in df_page.columns and not df_page.empty:
        df_page['Makable_Predicted'] = df_page.apply(add_colored_arrow, axis=1)

    # Render to HTML. escape=False lets the <span> tags through.
    # SECURITY: it also disables escaping for every other cell, so the CSV
    # content must never contain untrusted markup.
    table_html = df_page.to_html(
        classes="report-table",
        index=False,
        escape=False
    )

    has_prev = page > 1
    has_next = end < len(df)
    return render_template(
        "output.html",
        report_type=report_type,
        page=page,
        has_prev=has_prev,
        has_next=has_next,
        table_html=table_html
    )
# ------------------------------ | |
# Download Routes | |
# ------------------------------ | |
def download_pred():
    """Send the latest prediction CSV as an attachment.

    Returns:
        The file response, or a redirect to ``index`` when no prediction
        output exists yet (``PRED_OUTPUT_FILE`` is unset or the file is gone).
    """
    # NOTE(review): no @app.route decorator is visible on this function in the
    # text under review — confirm the route registration in the real file.
    if not PRED_OUTPUT_FILE or not os.path.exists(PRED_OUTPUT_FILE):
        print('No prediction output available for download.', 'error')
        return redirect(url_for('index'))
    # The CSV was written relative to the working directory; pass an absolute
    # path so the lookup does not depend on how send_file resolves relative paths.
    return send_file(os.path.abspath(PRED_OUTPUT_FILE), as_attachment=True)
def download_class():
    """Send the latest classification CSV as an attachment.

    Returns:
        The file response, or a redirect to ``index`` when no classification
        output exists yet (``CLASS_OUTPUT_FILE`` is unset or the file is gone).
    """
    # NOTE(review): no @app.route decorator is visible on this function in the
    # text under review — confirm the route registration in the real file.
    if not CLASS_OUTPUT_FILE or not os.path.exists(CLASS_OUTPUT_FILE):
        print('No classification output available for download.', 'error')
        return redirect(url_for('index'))
    # The CSV was written relative to the working directory; pass an absolute
    # path so the lookup does not depend on how send_file resolves relative paths.
    return send_file(os.path.abspath(CLASS_OUTPUT_FILE), as_attachment=True)
if __name__ == "__main__":
    # Debug mode enables the interactive Werkzeug debugger, which lets anyone
    # who can reach the server run code on it. Keep it off unless explicitly
    # requested with FLASK_DEBUG=1.
    app.run(debug=os.environ.get("FLASK_DEBUG", "0") == "1")