|
from io import BytesIO |
|
|
|
import numpy as np |
|
import pandas as pd |
|
import plotly.graph_objs as go |
|
import ruptures as rpt |
|
import streamlit as st |
|
|
|
st.title("KPIsAnomaly Detection") |
|
|
|
uploaded_file = st.file_uploader("Upload KPI file", type=["csv", "xlsx"]) |
|
penalty = st.number_input("Penalty", min_value=1.0, max_value=100.0, value=2.5) |
|
|
|
|
|
@st.cache_data(show_spinner="Analyzing anomalies...") |
|
def detect_anomalies(df: pd.DataFrame, penalty: int): |
|
|
|
df = df.rename( |
|
columns={ |
|
df.columns[0]: "date", |
|
df.columns[1]: "ctrl", |
|
df.columns[2]: "bts", |
|
df.columns[3]: "cell", |
|
df.columns[4]: "DN", |
|
} |
|
) |
|
df["date"] = pd.to_datetime(df["date"], errors="coerce") |
|
df = df.dropna(subset=["date", "cell"]) |
|
|
|
non_kpi_columns = ["date", "cell", "bts", "ctrl", "DN"] |
|
kpi_columns = [col for col in df.columns if col not in non_kpi_columns] |
|
|
|
anomaly_dict = {} |
|
anomaly_data = {} |
|
|
|
def detect_change_points(series, model="rbf", penalty=penalty): |
|
algo = rpt.Pelt(model=model).fit(series) |
|
result = algo.predict(pen=penalty) |
|
return result[:-1] |
|
|
|
def process_kpi_cell(df_cell, kpi, cell): |
|
df_kpi = ( |
|
df_cell[["date", kpi]].dropna().sort_values("date").reset_index(drop=True) |
|
) |
|
if len(df_kpi) < 30: |
|
return None |
|
|
|
series = df_kpi[kpi].values.copy() |
|
|
|
|
|
try: |
|
change_indices = detect_change_points(series) |
|
if not change_indices: |
|
return None |
|
|
|
df_kpi["change_point"] = False |
|
for idx in change_indices: |
|
if 0 <= idx < len(df_kpi): |
|
df_kpi.loc[idx, "change_point"] = True |
|
|
|
df_kpi["cell"] = cell |
|
|
|
change_indices = [0] + change_indices + [len(df_kpi)] |
|
segments = [ |
|
series[change_indices[i] : change_indices[i + 1]] |
|
for i in range(len(change_indices) - 1) |
|
] |
|
|
|
if len(segments) < 2: |
|
return None |
|
|
|
initial_mean = np.mean(segments[0]) |
|
final_mean = np.mean(segments[-1]) |
|
|
|
if abs(final_mean - initial_mean) > 0.1 * abs(initial_mean): |
|
|
|
df_kpi["initial_mean"] = initial_mean |
|
df_kpi["final_mean"] = final_mean |
|
return df_kpi |
|
else: |
|
return None |
|
except Exception as e: |
|
print(f"Error {cell}-{kpi}: {e}") |
|
return None |
|
|
|
for kpi in kpi_columns: |
|
anomalies = [] |
|
for cell, group in df.groupby("cell"): |
|
result = process_kpi_cell(group, kpi, cell) |
|
if result is not None: |
|
anomalies.append(cell) |
|
anomaly_data[(kpi, cell)] = result |
|
if anomalies: |
|
anomaly_dict[kpi] = anomalies |
|
|
|
return anomaly_dict, anomaly_data, kpi_columns |
|
|
|
|
|
if uploaded_file: |
|
|
|
|
|
if uploaded_file.name.endswith(".csv"): |
|
df = pd.read_csv(uploaded_file, delimiter=";") |
|
else: |
|
df = pd.read_excel(uploaded_file, sheet_name=0, skiprows=[1]) |
|
|
|
anomaly_dict, anomaly_data, all_kpis = detect_anomalies(df, penalty) |
|
|
|
if not anomaly_dict: |
|
st.info("No anomalies detected.") |
|
else: |
|
st.success(f"{len(anomaly_dict)} KPI(s) have un-recovered anomalies detected.") |
|
|
|
@st.fragment |
|
def selection_and_plot(): |
|
selected_kpi = st.selectbox("KPI with anomalies", list(anomaly_dict.keys())) |
|
selected_cell = st.selectbox("Affected cell", anomaly_dict[selected_kpi]) |
|
df_plot = anomaly_data[(selected_kpi, selected_cell)] |
|
|
|
fig = go.Figure() |
|
fig.add_trace( |
|
go.Scatter( |
|
x=df_plot["date"], |
|
y=df_plot[selected_kpi], |
|
mode="lines+markers", |
|
name="KPI", |
|
) |
|
) |
|
fig.add_trace( |
|
go.Scatter( |
|
x=df_plot[df_plot["change_point"]]["date"], |
|
y=df_plot[df_plot["change_point"]][selected_kpi], |
|
mode="markers", |
|
name="Change Point", |
|
marker=dict(color="red", size=10), |
|
) |
|
) |
|
fig.add_hline( |
|
y=df_plot["initial_mean"].iloc[0], |
|
line_dash="dot", |
|
line_color="gray", |
|
annotation_text="Initial Mean", |
|
) |
|
fig.add_hline( |
|
y=df_plot["final_mean"].iloc[0], |
|
line_dash="dash", |
|
line_color="black", |
|
annotation_text="Final Mean", |
|
) |
|
fig.update_layout( |
|
title=f"{selected_kpi} - {selected_cell}", |
|
xaxis_title="Date", |
|
yaxis_title=selected_kpi, |
|
) |
|
st.plotly_chart(fig, use_container_width=True) |
|
|
|
@st.fragment |
|
def export_button(): |
|
if st.button("Generate Excel file with anomalies"): |
|
buffer = BytesIO() |
|
with pd.ExcelWriter(buffer, engine="openpyxl") as writer: |
|
for kpi, cells in anomaly_dict.items(): |
|
results = [anomaly_data[(kpi, c)] for c in cells] |
|
df_final = pd.concat(results) |
|
df_final.to_excel(writer, sheet_name=kpi[:31], index=False) |
|
st.download_button( |
|
label="Download Excel file", |
|
data=buffer.getvalue(), |
|
file_name="anomalies_kpi_2G.xlsx", |
|
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", |
|
type="primary", |
|
) |
|
|
|
selection_and_plot() |
|
export_button() |
|
|