# db_query/apps/kpi_analysis/gsm_lac_load.py
from typing import List
import pandas as pd
import plotly.express as px
import streamlit as st
from queries.process_gsm import combined_gsm_database
from utils.kpi_analysis_utils import create_hourly_date, kpi_naming_cleaning

# Constants
GSM_COLUMNS = [
"ID_BTS",
"BSC",
"code",
"Region",
"locationAreaIdLAC",
"Longitude",
"Latitude",
]
TRX_COLUMNS = [
"ID_BTS",
"number_trx_per_cell",
"number_tch_per_cell",
"number_sd_per_cell",
"number_bcch_per_cell",
"number_ccch_per_cell",
"number_cbc_per_cell",
"number_total_channels_per_cell",
"number_signals_per_cell",
]
KPI_COLUMNS = [
"BSC_name",
"BCF_name",
"BTS_name",
"Paging_messages_on_air_interface",
"DELETE_PAGING_COMMAND_c003038",
"datetime",
"date",
"hour",
]
# Maximum paging messages used as the 100% utilization reference.
MAX_PAGING_CAPACITY = 640800


def get_gsm_databases(dump_path: str) -> pd.DataFrame:
"""
Process GSM database dump and return combined DataFrame with BTS and TRX data.
Args:
dump_path: Path to the GSM dump file
Returns:
pd.DataFrame: Combined DataFrame with BTS and TRX information
"""
dfs = combined_gsm_database(dump_path)
bts_df: pd.DataFrame = dfs[0]
trx_df: pd.DataFrame = dfs[2]
# Clean GSM df
bts_df = bts_df[GSM_COLUMNS]
trx_df = trx_df[TRX_COLUMNS]
trx_df = trx_df.drop_duplicates(subset=["ID_BTS"])
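    # Left-join keeps every BTS row even when no matching TRX data exists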
gsm_df = pd.merge(bts_df, trx_df, on="ID_BTS", how="left")
# Create BSC_Lac column
gsm_df["BSC_Lac"] = (
gsm_df["BSC"].astype(str) + "_" + gsm_df["locationAreaIdLAC"].astype(str)
)
# Calculate number of TRX per LAC
gsm_df["number_trx_per_lac"] = gsm_df.groupby("BSC_Lac")[
"number_trx_per_cell"
].transform("sum")
return gsm_df
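
# Illustrative usage (the dump path below is hypothetical):
#     gsm_df = get_gsm_databases("dumps/gsm_dump.xlsb")
#     gsm_df[["BSC_Lac", "number_trx_per_lac"]].drop_duplicates().head()
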
def analyze_lac_load_kpi(hourly_report_path: str) -> pd.DataFrame:
"""
Process hourly KPI report and prepare it for LAC load analysis.
Args:
hourly_report_path: Path to the hourly KPI report CSV file
Returns:
pd.DataFrame: Processed DataFrame with KPI data
"""
df = pd.read_csv(hourly_report_path, delimiter=";")
df = kpi_naming_cleaning(df)
df = create_hourly_date(df)
df = df[KPI_COLUMNS]
    # Keep BTS names long enough to carry a numeric code prefix, then extract it
    df = df[df["BTS_name"].str.len() >= 5].copy()
df["code"] = df["BTS_name"].str.split("_").str[0]
df["code"] = pd.to_numeric(df["code"], errors="coerce").fillna(0).astype(int)
return df
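
# Illustrative usage (the report path below is hypothetical):
#     kpi_df = analyze_lac_load_kpi("reports/hourly_kpis.csv")
#     kpi_df[["BTS_name", "code", "datetime"]].head()
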
def analyze_lac_load(dump_path: str, hourly_report_path: str) -> List[pd.DataFrame]:
"""
Analyze LAC load from GSM dump and hourly KPI report.
Args:
dump_path: Path to the GSM dump file
hourly_report_path: Path to the hourly KPI report CSV file
Returns:
List containing two DataFrames: [lac_load_df, max_paging_df]
"""
gsm_df = get_gsm_databases(dump_path)
lac_load_df = analyze_lac_load_kpi(hourly_report_path)
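    # Join the dump data with the hourly KPIs on the numeric site code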
lac_load_df = pd.merge(gsm_df, lac_load_df, on="code", how="left")
    # Aggregate to one row per LAC per hour, taking the max paging KPIs
lac_load_df = (
lac_load_df.groupby(
[
"datetime",
"date",
"hour",
"BSC_name",
"BSC_Lac",
"number_trx_per_lac",
]
)
.agg(
{
"Paging_messages_on_air_interface": "max",
"DELETE_PAGING_COMMAND_c003038": "max",
}
)
.reset_index()
)
    # Peak paging messages per LAC: sort descending, keep the first row per BSC_Lac
max_paging_messages = lac_load_df.sort_values(
by=["BSC_Lac", "Paging_messages_on_air_interface"], ascending=False
).drop_duplicates(subset=["BSC_Lac"], keep="first")[
[
"BSC_name",
"BSC_Lac",
"number_trx_per_lac",
"Paging_messages_on_air_interface",
]
]
    # Peak delete-paging commands per LAC, using the same sort-and-dedupe approach
max_delete_paging = lac_load_df.sort_values(
by=["BSC_Lac", "DELETE_PAGING_COMMAND_c003038"], ascending=False
).drop_duplicates(subset=["BSC_Lac"], keep="first")[
["BSC_name", "BSC_Lac", "DELETE_PAGING_COMMAND_c003038"]
]
# Merge results
max_paging_df = pd.merge(
max_paging_messages,
max_delete_paging,
on=["BSC_name", "BSC_Lac"],
how="left",
)
    # Calculate utilization (%) against the paging capacity reference
    max_paging_df["Utilization"] = (
        (max_paging_df["Paging_messages_on_air_interface"] / MAX_PAGING_CAPACITY) * 100
    ).round(2)
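    # e.g. a peak of 320400 paging messages -> 320400 / 640800 * 100 = 50.0%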
return [lac_load_df, max_paging_df]
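
# Illustrative usage (both paths are hypothetical):
#     lac_load_df, max_paging_df = analyze_lac_load(
#         "dumps/gsm_dump.xlsb", "reports/hourly_kpis.csv"
#     )
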
def display_ui() -> None:
"""Display the Streamlit user interface."""
st.title(" 📊 GSM LAC Load Analysis")
doc_col, image_col = st.columns(2)
with doc_col:
        st.write(
            """
            The report should be run with a minimum of 7 days of data.
            - GSM dump file in xlsb format
            - Hourly KPI report in CSV format
            """
        )
with image_col:
st.image("./assets/gsm_lac_load.png", width=250)


@st.fragment
def display_filtered_lac_load(lac_load_df: pd.DataFrame) -> None:
"""
Display filtered LAC load data with interactive charts.
Args:
lac_load_df: DataFrame containing LAC load data
"""
st.write("### Filtered LAC Load by BSC and BSC_Lac")
bsc_col, bsc_lac_col = st.columns(2)
with bsc_col:
selected_bsc = st.multiselect(
"Select BSC",
lac_load_df["BSC_name"].unique(),
key="selected_bsc",
default=[lac_load_df["BSC_name"].unique()[0]],
)
with bsc_lac_col:
selected_bsc_lac = st.multiselect(
"Select BSC_Lac",
lac_load_df[lac_load_df["BSC_name"].isin(selected_bsc)]["BSC_Lac"].unique(),
key="selected_bsc_lac",
default=lac_load_df[lac_load_df["BSC_name"].isin(selected_bsc)][
"BSC_Lac"
].unique(),
)
filtered_lac_load_df = lac_load_df[
lac_load_df["BSC_name"].isin(selected_bsc)
& lac_load_df["BSC_Lac"].isin(selected_bsc_lac)
]
# Display charts
chart1, chart2 = st.columns(2)
with chart1:
st.write("### Paging Messages on Air Interface")
fig1 = px.line(
filtered_lac_load_df,
x="datetime",
y="Paging_messages_on_air_interface",
color="BSC_Lac",
title="Max Paging Messages on Air Interface Per BSC_Lac",
)
fig1.update_layout(
xaxis_title="<b>Datetime</b>",
yaxis_title="<b>Paging Messages on Air Interface</b>",
)
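        # Dashed red line marks the 256000 paging-load reference threshold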
fig1.add_hline(y=256000, line_color="red", line_dash="dash", line_width=2)
st.plotly_chart(fig1)
with chart2:
st.write("### Delete Paging Commands")
fig2 = px.line(
filtered_lac_load_df,
x="datetime",
y="DELETE_PAGING_COMMAND_c003038",
color="BSC_Lac",
title="Max Delete Paging Commands Per BSC_Lac",
)
fig2.update_layout(
xaxis_title="<b>Datetime</b>",
yaxis_title="<b>Delete Paging Commands</b>",
)
st.plotly_chart(fig2)
st.write("### Filtered LAC Load Data")
st.dataframe(filtered_lac_load_df)


def main() -> None:
"""Main function to run the Streamlit app."""
display_ui()
# File uploaders
file1, file2 = st.columns(2)
with file1:
uploaded_dump = st.file_uploader("Upload Dump file in xlsb format", type="xlsb")
with file2:
uploaded_hourly_report = st.file_uploader(
"Upload Hourly Report in CSV format", type="csv"
)
if uploaded_dump is not None and uploaded_hourly_report is not None:
if st.button("Analyze Data", type="primary"):
with st.spinner("Analyzing data..."):
dfs = analyze_lac_load(
dump_path=uploaded_dump,
hourly_report_path=uploaded_hourly_report,
)
lac_load_df = dfs[0]
max_paging_df = dfs[1]
if lac_load_df is not None and "lac_load_df" not in st.session_state:
st.session_state.lac_load_df = lac_load_df
st.write("### LAC Load and Utilization with Max Paging 640800")
st.dataframe(max_paging_df)
display_filtered_lac_load(lac_load_df)


if __name__ == "__main__":
main()
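
# The app can be launched with the standard Streamlit CLI, e.g.:
#     streamlit run db_query/apps/kpi_analysis/gsm_lac_load.py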