"""Streamlit page: detect LTE cells with a significant traffic drop.

Workflow implemented below:

1. The user uploads a ``;``-separated CSV with one row per cell and period.
2. DL and UL traffic volumes are summed into ``Total_Traffic``.
3. For each cell, the mean traffic over the last *N* distinct dates is
   compared with the mean over all earlier dates.
4. Cells whose relative drop reaches the chosen threshold are listed,
   offered as an Excel download, plotted over time and shown on a map.
"""

from io import BytesIO

import pandas as pd
import plotly.express as px
import streamlit as st

from utils.utils_vars import get_physical_db

# Column names read from the uploaded CSV.
DATE_COL = "PERIOD_START_TIME"
CELL_COL = "LNCEL name"
DL_COL = "4G/LTE DL Traffic Volume (GBytes)"
UL_COL = "4G/LTE UL Traffic Volume (GBytes)"
REQUIRED_COLUMNS = (DATE_COL, CELL_COL, DL_COL, UL_COL)

# One date is needed for the long-term baseline plus enough "last days" for
# the slider's default of 2 to be a legal value.
MIN_DISTINCT_DATES = 3


def convert_df(df: pd.DataFrame) -> bytes:
    """Serialize *df* to an in-memory Excel workbook and return its bytes."""
    output = BytesIO()
    df.to_excel(output, index=False)
    return output.getvalue()


st.title("LTE Cell Traffic Drop Detection")

doc_col, image_col = st.columns(2)

with doc_col:
    st.write(
        """
        This App allow you to detect cells with significant traffic drop in LTE Network.
        - Upload traffic CSV file
        - Select number of last days for drop analysis
        - Select loss percentage threshold
        """
    )

with image_col:
    st.image("./assets/traffic_drop.png", width=250)

uploaded_file = st.file_uploader("Upload traffic CSV file", type=["csv"])

if uploaded_file:
    df = pd.read_csv(uploaded_file, sep=";")

    # Fail with a readable message instead of a KeyError traceback when the
    # uploaded file does not have the expected layout.
    missing_columns = [col for col in REQUIRED_COLUMNS if col not in df.columns]
    if missing_columns:
        st.error(f"Missing required column(s): {', '.join(missing_columns)}")
        st.stop()

    df[DATE_COL] = pd.to_datetime(df[DATE_COL], format="%m.%d.%Y")
    df = df.sort_values(DATE_COL)
    df["Total_Traffic"] = df[DL_COL] + df[UL_COL]

    unique_dates = sorted(df[DATE_COL].dropna().unique())

    # The slider below uses max = len(unique_dates) - 1 and default = 2, so
    # fewer than three distinct dates would make Streamlit raise (default or
    # minimum outside the allowed range). Stop early with an explanation.
    if len(unique_dates) < MIN_DISTINCT_DATES:
        st.warning(
            f"At least {MIN_DISTINCT_DATES} distinct days of data are required "
            "for drop analysis."
        )
        st.stop()

    last_n_days = st.slider(
        "Select number of last days for drop analysis",
        1,
        min(10, len(unique_dates) - 1),
        2,
    )
    threshold_percent = st.slider("Loss percentage threshold", 10, 100, 50)

    # Split the timeline: the trailing N dates versus everything before them.
    last_days = unique_dates[-last_n_days:]
    long_term_days = unique_dates[:-last_n_days]

    last_df = df[df[DATE_COL].isin(last_days)]
    long_term_df = df[df[DATE_COL].isin(long_term_days)]

    avg_last = last_df.groupby(CELL_COL)["Total_Traffic"].mean()
    avg_long = long_term_df.groupby(CELL_COL)["Total_Traffic"].mean()

    # dropna() keeps only cells that have data in both periods.
    result = pd.DataFrame(
        {"avg_long_term": avg_long, "avg_last_days": avg_last}
    ).dropna()
    result["drop_%"] = (
        (result["avg_long_term"] - result["avg_last_days"])
        / result["avg_long_term"]
        * 100
    )
    # NaN / -inf (zero long-term average) compare False here and are dropped.
    result = result[result["drop_%"] >= threshold_percent]
    result = result.reset_index()

    st.subheader("Cells with Significant Traffic Drop")
    st.dataframe(result)

    if not result.empty:
        st.download_button(
            label="📥 Download affected cells",
            data=convert_df(result),
            file_name="traffic_drop_cells.xlsx",
            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            type="primary",
        )

        @st.fragment
        def trend_plot():
            """Plot the traffic history of one user-selected affected cell.

            Reads ``result``, ``df``, ``avg_long`` and ``last_days`` from the
            enclosing script scope.
            """
            st.subheader("Traffic Trend Plot")

            # The first affected cell is preselected.
            selected_cell = st.selectbox(
                "Select cell to plot",
                result[CELL_COL].unique(),
                index=0,
            )
            if not selected_cell:
                return

            trend_df = df[df[CELL_COL].eq(selected_cell)]
            fig = px.line(
                trend_df,
                x=DATE_COL,
                y="Total_Traffic",
                title="Traffic Trends",
                markers=True,
                height=700,
            )

            # Horizontal dotted line at the cell's long-term average.
            if selected_cell in avg_long:
                fig.add_shape(
                    type="line",
                    x0=trend_df[DATE_COL].min(),
                    x1=trend_df[DATE_COL].max(),
                    y0=avg_long[selected_cell],
                    y1=avg_long[selected_cell],
                    line=dict(color="blue", dash="dot"),
                    name=f"{selected_cell} Long Term Avg",
                )

            # Vertical dashed line where the "last days" window begins.
            # last_days is never empty here because the slider minimum is 1.
            start_date = pd.Timestamp(last_days[0])
            fig.add_shape(
                type="line",
                x0=start_date,
                x1=start_date,
                y0=0,
                y1=trend_df["Total_Traffic"].max(),
                line=dict(color="red", dash="dash"),
                name="Start of Last Days",
            )

            st.plotly_chart(fig, use_container_width=True)

        trend_plot()

        st.subheader("Map of Affected Cells (Bubble Size = Drop %)")

        # Join key: the text before the first "_" on both sides.
        # NOTE(review): presumably this prefix is a site code shared by the
        # cell name and Code_Sector - confirm against the physical DB schema.
        # assign() is used so the frame returned by get_physical_db() is not
        # modified in place.
        physical_db = get_physical_db()
        physical_db = physical_db.assign(
            code=physical_db["Code_Sector"].str.split("_").str[0]
        )
        result_map = result.assign(code=result[CELL_COL].str.split("_").str[0])

        # NOTE(review): if physical_db holds several rows per code, this left
        # merge repeats each affected cell once per matching row - confirm
        # whether that is intended.
        result_map = pd.merge(result_map, physical_db, on="code", how="left")

        # Cells without a match in the physical DB have no coordinates.
        result_map = result_map.dropna(subset=["Latitude", "Longitude"])

        if result_map.empty:
            st.info("No coordinates found for the affected cells.")
        else:
            fig_map = px.scatter_map(
                result_map,
                lat="Latitude",
                lon="Longitude",
                size="drop_%",
                color="drop_%",
                color_continuous_scale="reds",
                hover_name=CELL_COL,
                zoom=6,
                height=600,
                title="Dropped Cells Map",
                map_style="satellite-streets",
            )
            st.plotly_chart(fig_map, use_container_width=True)