|
import logging

import numpy as np
import pandas as pd

from utils.kpi_analysis_utils import (
    analyze_lcg_utilization,
    combine_comments,
    create_daily_date,
    create_dfs_per_kpi,
    kpi_naming_cleaning,
)
from utils.utils_vars import get_physical_db
|
|
|
# Maps the combined "lcg_comment, number_of_lcg" string produced by
# combine_comments() to a human-readable final verdict. Keys that are just
# an LCG count ("1" / "2") mean no threshold comment was attached, i.e. no
# congestion was detected on any LCG.
lcg_comments_mapping = {
    "2": "No Congestion",
    "1": "No Congestion",
    # BUG FIX: "upgrage" -> "upgrade" (typo in the user-facing verdict).
    "lcg1 exceeded threshold, lcg2 exceeded threshold, 2": "Need BB SU upgrade",
    "lcg1 exceeded threshold, 2": "Need LCG balancing",
    "lcg1 exceeded threshold, 1": "Need BB SU upgrade",
    "lcg2 exceeded threshold, 2": "Need LCG balancing",
}
|
|
|
|
|
# Minimal set of input columns the raw KPI export must provide for the
# LCG utilization analysis (see load_and_process_lcg_data).
KPI_COLUMNS = ["date", "WBTS_name", "lcg_id", "BB_SU_LCG_MAX_R"]
|
|
|
# Column order for the per-site KPI analysis frame (kpi_df) produced by
# lcg_kpi_analysis, grouped here by what each column describes.
LCG_ANALYSIS_COLUMNS = [
    "WBTS_name",
    # LCG 1 utilization metrics
    "lcg1_utilisation", "avg_lcg1", "max_lcg1",
    "number_of_days_with_lcg1_exceeded", "lcg1_comment",
    # LCG 2 utilization metrics
    "lcg2_utilisation", "avg_lcg2", "max_lcg2",
    "number_of_days_with_lcg2_exceeded", "lcg2_comment",
    # Cross-LCG comparison and final verdicts
    "difference_between_lcgs", "difference_between_lcgs_comment",
    "lcg_comment", "number_of_lcg", "final_comments",
]
|
|
|
|
|
def lcg_kpi_analysis(
    df: pd.DataFrame,
    num_last_days: int,
    num_threshold_days: int,
    lcg_utilization_threshold: float,
    difference_between_lcgs: float,
) -> list[pd.DataFrame]:
    """
    Analyze LCG capacity data.

    Args:
        df: DataFrame containing LCG capacity data; expected to hold the
            KPI_COLUMNS columns (date, WBTS_name, lcg_id, BB_SU_LCG_MAX_R)
        num_last_days: Number of days for analysis
        num_threshold_days: Minimum days above threshold to flag for upgrade
        lcg_utilization_threshold: Utilization threshold percentage for flagging
        difference_between_lcgs: Difference between LCGs for flagging

    Returns:
        A two-element list ``[lcg_analysis_df, kpi_df]``:
        ``lcg_analysis_df`` — per-site summary with code/Region columns and
        Longitude/Latitude merged in from the physical DB;
        ``kpi_df`` — full analysis frame restricted to LCG_ANALYSIS_COLUMNS.
        (The original ``-> pd.DataFrame`` annotation was wrong; the function
        has always returned a list of two frames.)
    """
    # Split the raw data into one frame per LCG id so each LCG's
    # utilization can be analyzed independently.
    lcg1_df = df[df["lcg_id"] == 1]
    lcg2_df = df[df["lcg_id"] == 2]

    # Pivot the combined data to one frame per KPI.
    # NOTE(review): create_dfs_per_kpi appears to return a dict keyed by KPI
    # name, with dates/WBTS names as the pivot axes and MultiIndex columns
    # (implied by the droplevel(level=1, axis=1) call further down) — confirm
    # against utils.kpi_analysis_utils.
    pivoted_kpi_dfs = create_dfs_per_kpi(
        df=df,
        pivot_date_column="date",
        pivot_name_column="WBTS_name",
        kpi_columns_from=2,
    )

    # Same pivot, but restricted to each LCG's rows.
    pivoted_lcg1_df = create_dfs_per_kpi(
        df=lcg1_df,
        pivot_date_column="date",
        pivot_name_column="WBTS_name",
        kpi_columns_from=2,
    )
    pivoted_lcg2_df = create_dfs_per_kpi(
        df=lcg2_df,
        pivot_date_column="date",
        pivot_name_column="WBTS_name",
        kpi_columns_from=2,
    )

    # Keep only the BB_SU_LCG_MAX_R KPI frame from each pivot result.
    BB_SU_LCG_MAX_R_df = pivoted_kpi_dfs["BB_SU_LCG_MAX_R"]

    pivoted_lcg1_df = pivoted_lcg1_df["BB_SU_LCG_MAX_R"]
    pivoted_lcg2_df = pivoted_lcg2_df["BB_SU_LCG_MAX_R"]

    # Relabel so the two LCG frames can coexist after the concat below.
    pivoted_lcg1_df = pivoted_lcg1_df.rename(
        columns={"BB_SU_LCG_MAX_R": "lcg1_utilisation"}
    )
    pivoted_lcg2_df = pivoted_lcg2_df.rename(
        columns={"BB_SU_LCG_MAX_R": "lcg2_utilisation"}
    )

    # Derive per-LCG stats (avg/max/days-exceeded/comment columns, judging by
    # LCG_ANALYSIS_COLUMNS) using the configured thresholds.
    pivoted_lcg1_df = analyze_lcg_utilization(
        df=pivoted_lcg1_df,
        number_of_kpi_days=num_last_days,
        number_of_threshold_days=num_threshold_days,
        kpi_threshold=lcg_utilization_threshold,
        kpi_column_name="lcg1",
    )
    pivoted_lcg2_df = analyze_lcg_utilization(
        df=pivoted_lcg2_df,
        number_of_kpi_days=num_last_days,
        number_of_threshold_days=num_threshold_days,
        kpi_threshold=lcg_utilization_threshold,
        kpi_column_name="lcg2",
    )
    # Side-by-side join on the shared index (WBTS name axis).
    kpi_df = pd.concat(
        [
            BB_SU_LCG_MAX_R_df,
            pivoted_lcg1_df,
            pivoted_lcg2_df,
        ],
        axis=1,
    )

    # Promote the index to a regular column so it survives later selection.
    kpi_df = kpi_df.reset_index()

    # Absolute gap between the two LCGs' average utilization per site.
    kpi_df["difference_between_lcgs"] = kpi_df[["avg_lcg1", "avg_lcg2"]].apply(
        lambda row: max(row) - min(row), axis=1
    )

    # Flag sites whose LCG imbalance exceeds the configured threshold.
    kpi_df["difference_between_lcgs_comment"] = np.where(
        kpi_df["difference_between_lcgs"] > difference_between_lcgs,
        "difference between lcgs exceeded threshold",
        None,
    )

    # Merge the per-LCG comments into a single string column.
    kpi_df = combine_comments(
        kpi_df,
        "lcg1_comment",
        "lcg2_comment",
        new_column="lcg_comment",
    )

    # Drop literal "nan" artifacts introduced by string-concatenating
    # missing comments: first whole-cell "nan", then embedded "nan" tokens
    # (with an optional trailing comma/space).
    kpi_df["lcg_comment"] = kpi_df["lcg_comment"].replace("nan", None)

    kpi_df["lcg_comment"] = (
        kpi_df["lcg_comment"].str.replace(r"\bnan\b,?\s?", "", regex=True).str.strip()
    )

    # 2 if both LCGs reported data, 1 if exactly one did, 0 otherwise.
    kpi_df["number_of_lcg"] = np.where(
        kpi_df["avg_lcg1"].notna() & kpi_df["avg_lcg2"].notna(),
        2,
        np.where(kpi_df["avg_lcg1"].notna() | kpi_df["avg_lcg2"].notna(), 1, 0),
    )

    # Build the "comment, lcg-count" key that lcg_comments_mapping expects.
    kpi_df = combine_comments(
        kpi_df,
        "lcg_comment",
        "number_of_lcg",
        new_column="final_comments",
    )
    # Translate known keys into a verdict; unknown combinations pass through.
    kpi_df["final_comments"] = kpi_df["final_comments"].apply(
        lambda x: lcg_comments_mapping.get(x, x)
    )
    kpi_df = kpi_df[LCG_ANALYSIS_COLUMNS]

    # Narrow copy for the site-level report (the full kpi_df is also returned).
    lcg_analysis_df = kpi_df.copy()

    lcg_analysis_df = lcg_analysis_df[
        [
            "WBTS_name",
            "avg_lcg1",
            "max_lcg1",
            "number_of_days_with_lcg1_exceeded",
            "lcg1_comment",
            "avg_lcg2",
            "max_lcg2",
            "number_of_days_with_lcg2_exceeded",
            "lcg2_comment",
            "difference_between_lcgs",
            "final_comments",
        ]
    ]

    # Flatten the MultiIndex columns left over from the pivot/concat steps.
    lcg_analysis_df = lcg_analysis_df.droplevel(level=1, axis=1)

    # Keep only plausibly-named sites; rows with a short (or missing)
    # WBTS_name are dropped (NaN lengths compare False here).
    lcg_analysis_df = lcg_analysis_df[lcg_analysis_df["WBTS_name"].str.len() >= 5]

    # WBTS_name convention appears to be "<code>_<region>_..." — the numeric
    # site code is the first underscore-separated token. TODO confirm.
    lcg_analysis_df["code"] = lcg_analysis_df["WBTS_name"].str.split("_").str[0]

    # Non-numeric codes become 0 rather than NaN so the column stays int.
    lcg_analysis_df["code"] = (
        pd.to_numeric(lcg_analysis_df["code"], errors="coerce").fillna(0).astype(int)
    )

    # Second token is the region; the [1:2] slice + join yields "" / NaN
    # safely when the token is absent instead of raising.
    lcg_analysis_df["Region"] = (
        lcg_analysis_df["WBTS_name"].str.split("_").str[1:2].str.join("_")
    )
    lcg_analysis_df["Region"] = lcg_analysis_df["Region"].fillna("UNKNOWN")

    # Move code/Region to the front, preserving the order of the rest.
    lcg_analysis_df = lcg_analysis_df[
        ["code", "Region"]
        + [col for col in lcg_analysis_df if col != "code" and col != "Region"]
    ]

    # Enrich with site coordinates from the physical database.
    physical_db: pd.DataFrame = get_physical_db()

    # Code_Sector appears to be "<code>_<sector>"; keep one row per code.
    physical_db["code"] = physical_db["Code_Sector"].str.split("_").str[0]

    physical_db = physical_db.drop_duplicates(subset="code")

    physical_db = physical_db[["code", "Longitude", "Latitude"]]

    # Same numeric coercion as above so the merge keys have matching dtype.
    physical_db["code"] = (
        pd.to_numeric(physical_db["code"], errors="coerce").fillna(0).astype(int)
    )

    # Left merge: sites without a physical-DB match keep NaN coordinates.
    lcg_analysis_df = pd.merge(
        lcg_analysis_df,
        physical_db,
        on="code",
        how="left",
    )

    return [lcg_analysis_df, kpi_df]
|
|
|
|
|
def load_and_process_lcg_data(
    uploaded_file,
    num_last_days: int,
    num_threshold_days: int,
    lcg_utilization_threshold: float,
    difference_between_lcgs: float,
) -> list[pd.DataFrame]:
    """Load and process data for LCG capacity analysis.

    Args:
        uploaded_file: Path or file-like object holding semicolon-delimited CSV.
        num_last_days: Number of days for analysis.
        num_threshold_days: Minimum days above threshold to flag for upgrade.
        lcg_utilization_threshold: Utilization threshold percentage for flagging.
        difference_between_lcgs: Difference between LCGs for flagging.

    Returns:
        The ``[lcg_analysis_df, kpi_df]`` pair produced by lcg_kpi_analysis.

    Raises:
        ValueError: If the uploaded file is empty or required columns are missing.
        Exception: Any parsing/analysis error is logged and re-raised unchanged.
    """
    try:
        df = pd.read_csv(uploaded_file, delimiter=";")
        if df.empty:
            raise ValueError("Uploaded file is empty")

        df = kpi_naming_cleaning(df)
        df = create_daily_date(df)

        # Validate the schema up front so the failure message names exactly
        # which required columns are absent.
        missing_cols = [col for col in KPI_COLUMNS if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Missing required columns: {', '.join(missing_cols)}")

        df = df[KPI_COLUMNS]

        return lcg_kpi_analysis(
            df,
            num_last_days,
            num_threshold_days,
            lcg_utilization_threshold,
            difference_between_lcgs,
        )

    except Exception:
        # BUG FIX: the original handler called st.error(...) but streamlit
        # was never imported, so every failure raised a NameError that masked
        # the real exception. Log via the stdlib instead and re-raise.
        logging.getLogger(__name__).exception("Error processing LCG data")
        raise
|
|