|
import os |
|
import zipfile |
|
from io import BytesIO |
|
|
|
import pandas as pd |
|
import streamlit as st |
|
|
|
|
|
|
|
|
|
def find_header_row(df, keyword="Dist_Name"): |
|
for i in range(min(20, len(df))): |
|
row = df.iloc[i].astype(str).str.strip().str.lower() |
|
if any(keyword.lower() in str(cell) for cell in row): |
|
return i |
|
raise ValueError(f"No row with '{keyword}' found.") |
|
|
|
|
|
def read_sheet_fallback(file_bytes, sheet): |
|
file_bytes.seek(0) |
|
return pd.read_excel(file_bytes, sheet_name=sheet, header=None, engine="calamine") |
|
|
|
|
|
def load_clean_df(file_bytes, sheet): |
|
df_raw = read_sheet_fallback(file_bytes, sheet) |
|
header_row = find_header_row(df_raw) |
|
df_raw.columns = df_raw.iloc[header_row] |
|
df = df_raw.drop(index=list(range(header_row + 1))) |
|
df.columns = [str(c).strip().replace("\xa0", " ") for c in df.columns] |
|
df = df.astype(str).apply(lambda col: col.str.strip()) |
|
return df |
|
|
|
|
|
def detect_dist_col(columns): |
|
for col in columns: |
|
if "dist" in col.lower() and "name" in col.lower(): |
|
return col |
|
raise ValueError("Dist_Name column not found.") |
|
|
|
|
|
|
|
|
|
st.title("π Dump Compare Tool") |
|
st.markdown( |
|
":blue[**Upload the old and new dumps, then input the object class (comma-separated) to compare**]" |
|
) |
|
|
|
old_file = st.file_uploader("Upload Old Dump (.xlsb)", type=["xlsb"], key="old") |
|
new_file = st.file_uploader("Upload New Dump (.xlsb)", type=["xlsb"], key="new") |
|
|
|
sheet_list_input = st.text_input( |
|
"Enter object class (comma-separated)", placeholder="e.g. BCF, BTS, CELL" |
|
) |
|
|
|
if st.button("Run Comparison", type="primary", use_container_width=True): |
|
if not all([old_file, new_file, sheet_list_input.strip()]): |
|
st.warning("Please upload both files and provide at least one sheet name.") |
|
else: |
|
sheet_names = [s.strip() for s in sheet_list_input.split(",") if s.strip()] |
|
old_bytes = BytesIO(old_file.read()) |
|
new_bytes = BytesIO(new_file.read()) |
|
|
|
logs = [] |
|
total = 0 |
|
all_results = {} |
|
|
|
for sheet in sheet_names: |
|
try: |
|
df_old = load_clean_df(old_bytes, sheet) |
|
old_bytes.seek(0) |
|
df_new = load_clean_df(new_bytes, sheet) |
|
new_bytes.seek(0) |
|
|
|
dist_col_old = detect_dist_col(df_old.columns) |
|
dist_col_new = detect_dist_col(df_new.columns) |
|
|
|
df_old = df_old[df_old[dist_col_old].notna()].set_index(dist_col_old) |
|
df_new = df_new[df_new[dist_col_new].notna()].set_index(dist_col_new) |
|
|
|
common = df_old.index.intersection(df_new.index) |
|
df_old_common = df_old.loc[common] |
|
df_new_common = df_new.loc[common] |
|
|
|
mask = (df_old_common != df_new_common) & ~( |
|
df_old_common.isna() & df_new_common.isna() |
|
) |
|
|
|
changes = [] |
|
for dist in mask.index: |
|
for param in mask.columns[mask.loc[dist]]: |
|
if param.strip().lower() == "file_name": |
|
continue |
|
changes.append( |
|
{ |
|
"Dist_Name": dist, |
|
"Parameter": param, |
|
os.path.basename(old_file.name): df_old_common.loc[ |
|
dist, param |
|
], |
|
os.path.basename(new_file.name): df_new_common.loc[ |
|
dist, param |
|
], |
|
} |
|
) |
|
|
|
df_changes = pd.DataFrame(changes) |
|
if not df_changes.empty: |
|
all_results[sheet] = df_changes |
|
logs.append(f"{len(df_changes)} changes in '{sheet}'") |
|
total += len(df_changes) |
|
else: |
|
logs.append(f"No changes in '{sheet}'") |
|
|
|
except Exception as e: |
|
logs.append(f"β Error in '{sheet}': {e}") |
|
|
|
st.success(f"β
Comparison completed. Total changes: {total}") |
|
for log in logs: |
|
st.write(log) |
|
|
|
if all_results: |
|
output_buffer = BytesIO() |
|
with zipfile.ZipFile(output_buffer, mode="w") as zf: |
|
for sheet, df in all_results.items(): |
|
file_buffer = BytesIO() |
|
df.to_excel(file_buffer, index=False) |
|
zf.writestr(f"{sheet}_differences.xlsx", file_buffer.getvalue()) |
|
|
|
st.download_button( |
|
"Download Results (.zip)", |
|
data=output_buffer.getvalue(), |
|
file_name="differences.zip", |
|
mime="application/zip", |
|
type="primary", |
|
on_click="ignore", |
|
) |
|
|