# db_query/apps/dump_compare.py
# Author: DavMelchi
# Commit message: Update dump compare tools - remove tempfile utilization
# Commit: 962e6f5
import os
import zipfile
from io import BytesIO
import pandas as pd
import streamlit as st
# === Functions ===
def find_header_row(df, keyword="Dist_Name", max_rows=20):
    """Return the position of the first row that contains ``keyword``.

    Each of the first ``max_rows`` rows is converted to stripped, lower-cased
    text, and the row matches when any of its cells contains the lower-cased
    keyword as a substring.

    Args:
        df: DataFrame read without a header, so the header is still a data row.
        keyword: Text to look for (case-insensitive substring match).
        max_rows: Maximum number of leading rows to inspect.

    Returns:
        The zero-based positional index of the first matching row.

    Raises:
        ValueError: If none of the inspected rows contains the keyword.
    """
    # Lower the keyword once instead of once per inspected cell.
    needle = keyword.lower()
    for position in range(min(max_rows, len(df))):
        cells = df.iloc[position].astype(str).str.strip().str.lower()
        if any(needle in cell for cell in cells):
            return position
    raise ValueError(f"No row with '{keyword}' found.")
def read_sheet_fallback(file_bytes, sheet):
    """Read one sheet of an in-memory workbook as a header-less DataFrame.

    Args:
        file_bytes: Seekable binary buffer holding the workbook.
        sheet: Name of the sheet to read.

    Returns:
        The sheet contents as returned by ``pd.read_excel`` with
        ``header=None``, i.e. every row (including the header) is data.
    """
    # Rewind first: the same buffer is read once per requested sheet.
    file_bytes.seek(0)
    raw_frame = pd.read_excel(
        file_bytes,
        sheet_name=sheet,
        header=None,
        engine="calamine",
    )
    return raw_frame
def load_clean_df(file_bytes, sheet):
    """Load a sheet and return its data rows with cleaned column names.

    The header row is located with ``find_header_row``; that row and every row
    above it are dropped. Column names are stripped and non-breaking spaces
    are replaced by regular spaces. Non-missing cells are converted to
    stripped text, while missing cells are left as missing values.

    Args:
        file_bytes: Seekable binary buffer holding the workbook.
        sheet: Name of the sheet to load.

    Returns:
        A DataFrame of the rows below the header, with text cells.

    Raises:
        ValueError: Propagated from ``find_header_row`` when no header row is
            found.
    """
    df_raw = read_sheet_fallback(file_bytes, sheet)
    header_row = find_header_row(df_raw)
    header_cells = df_raw.iloc[header_row]

    # Drop the header row itself and anything that sits above it.
    df = df_raw.drop(index=list(range(header_row + 1)))
    df.columns = [str(c).strip().replace("\xa0", " ") for c in header_cells]

    # Stringify only real values. A blanket ``astype(str)`` can turn missing
    # cells into the literal text "nan", which makes ``notna()``/``isna()``
    # checks in the calling code ineffective.
    return df.apply(
        lambda col: col.map(
            lambda cell: cell if pd.isna(cell) else str(cell).strip()
        )
    )
def detect_dist_col(columns):
    """Return the first column name containing both "dist" and "name".

    The match is a case-insensitive substring test on each column name.

    Args:
        columns: Iterable of column names (strings).

    Returns:
        The first matching column name, exactly as given.

    Raises:
        ValueError: If no column name matches.
    """
    required_tokens = ("dist", "name")
    match = next(
        (
            name
            for name in columns
            if all(token in name.lower() for token in required_tokens)
        ),
        None,
    )
    if match is None:
        raise ValueError("Dist_Name column not found.")
    return match
# === Streamlit interface ===
# Page title and a short usage hint.
st.title("📊 Dump Compare Tool")
st.markdown(
    ":blue[**Upload the old and new dumps, then input the object class (comma-separated) to compare**]"
)

# The two workbooks to compare; distinct keys keep the two widgets separate.
old_file = st.file_uploader("Upload Old Dump (.xlsb)", type=["xlsb"], key="old")
new_file = st.file_uploader("Upload New Dump (.xlsb)", type=["xlsb"], key="new")

# Comma-separated list of sheet names (one sheet per object class).
sheet_list_input = st.text_input(
    "Enter object class (comma-separated)", placeholder="e.g. BCF, BTS, CELL"
)
def _collect_changes(df_old, df_new, old_label, new_label):
    """Return one row per differing cell between two Dist_Name-indexed frames.

    Only index values and columns present in both frames are compared, and any
    column whose stripped, lower-cased name is "file_name" is ignored. A cell
    counts as changed when the two values differ and are not both missing.

    Args:
        df_old: Old dump sheet, indexed by its Dist_Name column.
        df_new: New dump sheet, indexed by its Dist_Name column.
        old_label: Output column name for the old values.
        new_label: Output column name for the new values.

    Returns:
        A DataFrame with columns "Dist_Name", "Parameter", ``old_label`` and
        ``new_label``; it is empty when nothing changed.
    """
    common_rows = df_old.index.intersection(df_new.index)
    shared_cols = df_old.columns.intersection(df_new.columns)
    compared_cols = [c for c in shared_cols if c.strip().lower() != "file_name"]

    # Both frames are sliced with the same labels so that `!=` sees
    # identically-labelled operands.
    old_common = df_old.loc[common_rows, compared_cols]
    new_common = df_new.loc[common_rows, compared_cols]
    mask = (old_common != new_common) & ~(old_common.isna() & new_common.isna())

    # Positions of the changed cells in row-major order (row by row, then
    # column by column), extracted in one pass instead of per-cell lookups.
    row_pos, col_pos = mask.to_numpy().nonzero()
    return pd.DataFrame(
        {
            "Dist_Name": mask.index.to_numpy()[row_pos],
            "Parameter": mask.columns.to_numpy()[col_pos],
            old_label: old_common.to_numpy()[row_pos, col_pos],
            new_label: new_common.to_numpy()[row_pos, col_pos],
        }
    )


if st.button("Run Comparison", type="primary", use_container_width=True):
    # Split the comma-separated input, dropping blanks and repeated names so a
    # sheet is never compared (and counted) twice.
    sheet_names = list(
        dict.fromkeys(s.strip() for s in sheet_list_input.split(",") if s.strip())
    )

    if not (old_file and new_file and sheet_names):
        st.warning("Please upload both files and provide at least one sheet name.")
    else:
        # getvalue() returns the whole upload regardless of the current
        # stream position.
        old_bytes = BytesIO(old_file.getvalue())
        new_bytes = BytesIO(new_file.getvalue())

        # The value columns are named after the uploaded files; disambiguate
        # when both uploads share a name, otherwise one column would
        # overwrite the other.
        old_label = os.path.basename(old_file.name)
        new_label = os.path.basename(new_file.name)
        if old_label == new_label:
            old_label, new_label = f"{old_label} (old)", f"{new_label} (new)"

        logs = []
        total = 0
        all_results = {}

        for sheet in sheet_names:
            try:
                # load_clean_df rewinds the buffer itself before reading.
                df_old = load_clean_df(old_bytes, sheet)
                df_new = load_clean_df(new_bytes, sheet)

                dist_col_old = detect_dist_col(df_old.columns)
                dist_col_new = detect_dist_col(df_new.columns)
                df_old = df_old[df_old[dist_col_old].notna()].set_index(dist_col_old)
                df_new = df_new[df_new[dist_col_new].notna()].set_index(dist_col_new)

                # Columns present in only one dump cannot be compared; report
                # them instead of failing the whole sheet.
                unmatched = df_old.columns.symmetric_difference(df_new.columns)
                if len(unmatched) > 0:
                    logs.append(
                        f"Columns present in only one dump were skipped in "
                        f"'{sheet}': {', '.join(map(str, unmatched))}"
                    )

                df_changes = _collect_changes(df_old, df_new, old_label, new_label)
                if not df_changes.empty:
                    all_results[sheet] = df_changes
                    logs.append(f"{len(df_changes)} changes in '{sheet}'")
                    total += len(df_changes)
                else:
                    logs.append(f"No changes in '{sheet}'")
            except Exception as e:
                # Best effort per sheet: one bad sheet must not abort the run.
                logs.append(f"❌ Error in '{sheet}': {e}")

        st.success(f"✅ Comparison completed. Total changes: {total}")
        for log in logs:
            st.write(log)

        if all_results:
            output_buffer = BytesIO()
            with zipfile.ZipFile(output_buffer, mode="w") as zf:
                for sheet, df in all_results.items():
                    file_buffer = BytesIO()
                    df.to_excel(file_buffer, index=False)
                    zf.writestr(f"{sheet}_differences.xlsx", file_buffer.getvalue())

            # The archive is closed (and therefore complete) before its bytes
            # are handed to the download button.
            st.download_button(
                "Download Results (.zip)",
                data=output_buffer.getvalue(),
                file_name="differences.zip",
                mime="application/zip",
                type="primary",
                on_click="ignore",
            )