from io import BytesIO

import numpy as np
import pandas as pd
import plotly.graph_objs as go
import ruptures as rpt
import streamlit as st

# Names given to the first five columns of the uploaded table, in file order.
# Every remaining column is treated as a KPI.
_ID_COLUMNS = ["date", "ctrl", "bts", "cell", "DN"]

# A (cell, KPI) series shorter than this is not analysed.
_MIN_POINTS = 30

# A series is reported when the mean of its last segment differs from the mean
# of its first segment by more than this fraction of the first-segment mean.
_RELATIVE_SHIFT_THRESHOLD = 0.1

# Worksheet-title restrictions: maximum length and forbidden characters.
_MAX_SHEET_NAME_LEN = 31
_INVALID_SHEET_CHARS = frozenset("[]:*?/\\")

st.title("KPIsAnomaly Detection")

uploaded_file = st.file_uploader("Upload KPI file", type=["csv", "xlsx"])
penalty = st.number_input("Penalty", min_value=1.0, max_value=100.0, value=2.5)


def _make_sheet_name(raw_name, used_names):
    """Return a valid worksheet title for *raw_name* that is not in *used_names*.

    Forbidden characters are replaced with ``_``, the result is truncated to
    the maximum title length, and a numeric suffix is appended when the
    truncated title was already handed out. *used_names* is a set of
    lower-cased titles and is updated in place.
    """
    cleaned = "".join(
        "_" if ch in _INVALID_SHEET_CHARS else ch for ch in str(raw_name)
    )
    cleaned = cleaned[:_MAX_SHEET_NAME_LEN].strip("'") or "KPI"

    candidate = cleaned
    counter = 1
    # Compare lower-cased so titles differing only by case are also kept apart.
    while candidate.lower() in used_names:
        suffix = f"_{counter}"
        candidate = cleaned[: _MAX_SHEET_NAME_LEN - len(suffix)] + suffix
        counter += 1

    used_names.add(candidate.lower())
    return candidate


@st.cache_data(show_spinner="Analyzing anomalies...")
def detect_anomalies(df: pd.DataFrame, penalty: float):
    """Find (KPI, cell) series whose level shifted and did not return.

    The first five columns of *df* are renamed to ``date``, ``ctrl``, ``bts``,
    ``cell`` and ``DN``; every other column is treated as a KPI. For each KPI
    and each cell, change points are detected with ``ruptures.Pelt`` and the
    series is reported when the mean of the last segment differs from the mean
    of the first segment by more than 10 % of the first-segment mean.

    Args:
        df: Raw table with at least five columns laid out as described above.
        penalty: Penalty value passed to ``Pelt.predict``.

    Returns:
        A tuple ``(anomaly_dict, anomaly_data, kpi_columns)`` where
        ``anomaly_dict`` maps each KPI to the list of reported cells,
        ``anomaly_data`` maps ``(kpi, cell)`` to the full per-cell history
        (with ``change_point``, ``cell``, ``initial_mean`` and ``final_mean``
        columns added), and ``kpi_columns`` lists every KPI column examined.

    Raises:
        IndexError: If *df* has fewer than five columns.
    """
    # Cleaning
    df = df.rename(
        columns={df.columns[pos]: name for pos, name in enumerate(_ID_COLUMNS)}
    )
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df = df.dropna(subset=["date", "cell"])

    kpi_columns = [col for col in df.columns if col not in _ID_COLUMNS]

    anomaly_dict = {}
    anomaly_data = {}

    def detect_change_points(series, model="rbf", penalty=penalty):
        """Return the PELT breakpoints of *series*, without the end marker."""
        algo = rpt.Pelt(model=model).fit(series)
        result = algo.predict(pen=penalty)
        # The last element returned by predict() is dropped, as before.
        return result[:-1]

    def process_kpi_cell(df_cell, kpi, cell):
        """Return the annotated history of one cell/KPI if it shifted, else None."""
        df_kpi = (
            df_cell[["date", kpi]].dropna().sort_values("date").reset_index(drop=True)
        )
        if len(df_kpi) < _MIN_POINTS:
            return None

        series = df_kpi[kpi].values.copy()
        try:
            change_indices = detect_change_points(series)
            if not change_indices:
                return None

            # The index is 0..n-1 after reset_index, so positions and labels
            # coincide and out-of-range breakpoints simply match nothing.
            df_kpi["change_point"] = df_kpi.index.isin(change_indices)
            df_kpi["cell"] = cell

            # Only the first and the last segment matter for the comparison.
            initial_mean = np.mean(series[: change_indices[0]])
            final_mean = np.mean(series[change_indices[-1] :])

            shift = abs(final_mean - initial_mean)
            if shift > _RELATIVE_SHIFT_THRESHOLD * abs(initial_mean):
                # Attach full history, not just final segment
                df_kpi["initial_mean"] = initial_mean
                df_kpi["final_mean"] = final_mean
                return df_kpi
            return None
        except Exception as e:
            # Best effort: one failing series (e.g. a non-numeric column)
            # must not abort the analysis of all the others.
            print(f"Error {cell}-{kpi}: {e}")
            return None

    # The per-cell split does not depend on the KPI, so compute it only once.
    cell_groups = list(df.groupby("cell"))

    for kpi in kpi_columns:
        anomalies = []
        for cell, group in cell_groups:
            result = process_kpi_cell(group, kpi, cell)
            if result is not None:
                anomalies.append(cell)
                anomaly_data[(kpi, cell)] = result
        if anomalies:
            anomaly_dict[kpi] = anomalies

    return anomaly_dict, anomaly_data, kpi_columns


if uploaded_file:
    # Detect if file is csv or excel. The extension is compared
    # case-insensitively so that "REPORT.CSV" is not sent to the Excel reader.
    if uploaded_file.name.lower().endswith(".csv"):
        df = pd.read_csv(uploaded_file, delimiter=";")
    else:
        df = pd.read_excel(uploaded_file, sheet_name=0, skiprows=[1])

    # Fail with a readable message instead of an IndexError deep in the analysis.
    if df.shape[1] < len(_ID_COLUMNS):
        st.error(
            f"The uploaded file must contain at least {len(_ID_COLUMNS)} columns "
            f"(date, ctrl, bts, cell, DN) followed by KPI columns; "
            f"found {df.shape[1]}."
        )
        st.stop()

    anomaly_dict, anomaly_data, all_kpis = detect_anomalies(df, penalty)

    if not anomaly_dict:
        st.info("No anomalies detected.")
    else:
        st.success(f"{len(anomaly_dict)} KPI(s) have un-recovered anomalies detected.")

        @st.fragment
        def selection_and_plot():
            """Let the user pick a KPI and a cell, then plot that series."""
            selected_kpi = st.selectbox("KPI with anomalies", list(anomaly_dict))
            selected_cell = st.selectbox("Affected cell", anomaly_dict[selected_kpi])

            df_plot = anomaly_data[(selected_kpi, selected_cell)]
            change_rows = df_plot[df_plot["change_point"]]

            fig = go.Figure()
            fig.add_trace(
                go.Scatter(
                    x=df_plot["date"],
                    y=df_plot[selected_kpi],
                    mode="lines+markers",
                    name="KPI",
                )
            )
            fig.add_trace(
                go.Scatter(
                    x=change_rows["date"],
                    y=change_rows[selected_kpi],
                    mode="markers",
                    name="Change Point",
                    marker=dict(color="red", size=10),
                )
            )
            # The means are stored as constant columns; any row holds the value.
            fig.add_hline(
                y=df_plot["initial_mean"].iloc[0],
                line_dash="dot",
                line_color="gray",
                annotation_text="Initial Mean",
            )
            fig.add_hline(
                y=df_plot["final_mean"].iloc[0],
                line_dash="dash",
                line_color="black",
                annotation_text="Final Mean",
            )
            fig.update_layout(
                title=f"{selected_kpi} - {selected_cell}",
                xaxis_title="Date",
                yaxis_title=selected_kpi,
            )
            st.plotly_chart(fig, use_container_width=True)

        @st.fragment
        def export_button():
            """Build an Excel workbook with one sheet per KPI and offer it for download."""
            if st.button("Generate Excel file with anomalies"):
                buffer = BytesIO()
                used_sheet_names = set()
                with pd.ExcelWriter(buffer, engine="openpyxl") as writer:
                    for kpi, cells in anomaly_dict.items():
                        results = [anomaly_data[(kpi, c)] for c in cells]
                        df_final = pd.concat(results)
                        # KPI labels may be too long, contain characters that
                        # are not allowed in sheet titles, or collide once
                        # truncated; normalise them before writing.
                        sheet_name = _make_sheet_name(kpi, used_sheet_names)
                        df_final.to_excel(writer, sheet_name=sheet_name, index=False)

                st.download_button(
                    label="Download Excel file",
                    data=buffer.getvalue(),
                    file_name="anomalies_kpi_2G.xlsx",
                    mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                    type="primary",
                )

        selection_and_plot()
        export_button()