import numpy as np
import pandas as pd

from queries.process_lte import process_lte_data
from utils.convert_to_excel import save_dataframe
from utils.kpi_analysis_utils import (
    LteCapacity,
    analyze_prb_usage,
    cell_availability_analysis,
    create_dfs_per_kpi,
    create_hourly_date,
    kpi_naming_cleaning,
)

# Final column layout of the per-sector LTE capacity analysis output.
LTE_ANALYSIS_COLUMNS = [
    "code",
    "code_sector",
    "Region",
    "site_config_band",
    "Longitude",
    "Latitude",
    "LNCEL_name_l800",
    "LNCEL_name_l1800",
    "LNCEL_name_l2300",
    "LNCEL_name_l2600",
    "LNCEL_name_l1800s",
    "avg_prb_usage_bh_l800",
    "avg_prb_usage_bh_l1800",
    "avg_prb_usage_bh_l2300",
    "avg_prb_usage_bh_l2600",
    "avg_prb_usage_bh_l1800s",
    "avg_prb_usage_bh_l800_2nd",
    "avg_prb_usage_bh_l1800_2nd",
    "avg_prb_usage_bh_l2300_2nd",
    "avg_prb_usage_bh_l2600_2nd",
    "avg_prb_usage_bh_l1800s_2nd",
    "avg_act_ues_l800",
    "avg_act_ues_l1800",
    "avg_act_ues_l2300",
    "avg_act_ues_l2600",
    "avg_act_ues_l1800s",
    "avg_dl_thp_l800",
    "avg_dl_thp_l1800",
    "avg_dl_thp_l2300",
    "avg_dl_thp_l2600",
    "avg_dl_thp_l1800s",
    "avg_ul_thp_l800",
    "avg_ul_thp_l1800",
    "avg_ul_thp_l2300",
    "avg_ul_thp_l2600",
    "avg_ul_thp_l1800s",
    "num_congested_cells",
    "num_cells",
    "num_cell_with_kpi",
    "num_down_or_no_kpi_cells",
    "prb_diff_between_cells",
    "load_balance_required",
    "congestion_comment",
    "final_comments",
]

# Columns pulled from the LTE dump database.
LTE_DATABASE_COLUMNS = [
    "code",
    "Region",
    "site_config_band",
    "final_name",
    "Longitude",
    "Latitude",
]

# KPI columns kept from the BH report after name cleaning.
KPI_COLUMNS = [
    "date",
    "LNCEL_name",
    "Cell_Avail_excl_BLU",
    "E_UTRAN_Avg_PRB_usage_per_TTI_DL",
    "DL_PRB_Util_p_TTI_Lev_10",
    "Avg_PDCP_cell_thp_UL",
    "Avg_PDCP_cell_thp_DL",
    "Avg_act_UEs_DL",
]

# Per-cell KPI columns merged back onto the cell database.
PRB_COLUMNS = [
    "LNCEL_name",
    "avg_prb_usage_bh",
    "avg_prb_usage_bh_2nd",
    "avg_act_ues",
    "avg_dl_thp",
    "avg_ul_thp",
]

# Per-band column groups shared by the aggregations in lte_analysis_logic().
LNCEL_BAND_COLUMNS = [
    "LNCEL_name_l800",
    "LNCEL_name_l1800",
    "LNCEL_name_l2300",
    "LNCEL_name_l2600",
    "LNCEL_name_l1800s",
]
PRB_BAND_COLUMNS = [
    "avg_prb_usage_bh_l800",
    "avg_prb_usage_bh_l1800",
    "avg_prb_usage_bh_l2300",
    "avg_prb_usage_bh_l2600",
    "avg_prb_usage_bh_l1800s",
]


def lte_analysis_logic(
    df: pd.DataFrame,
    prb_usage_threshold: int,
    prb_diff_between_cells_threshold: int,
) -> pd.DataFrame:
    """Derive per-sector congestion verdicts from the per-band KPI columns.

    Args:
        df: One row per site with per-band LNCEL names and BH KPIs
            (output of dfs_per_band_cell).
        prb_usage_threshold: BH PRB usage at/above which a cell counts
            as congested.
        prb_diff_between_cells_threshold: PRB spread between co-sector
            cells above which load balancing is recommended.

    Returns:
        Copy of ``df`` with the analysis columns appended
        (num_congested_cells, congestion_comment, final_comments,
        code_sector, ...), with rows lacking any cell name removed.
    """
    result = df.copy()

    # A cell is congested when its BH PRB usage reaches the threshold.
    # NaN >= threshold is False, so cells without KPI are not counted.
    result["num_congested_cells"] = (
        result[PRB_BAND_COLUMNS] >= prb_usage_threshold
    ).sum(axis=1)

    # Cells on air (non-null LNCEL name) vs. cells that reported KPI.
    result["num_cells"] = result[LNCEL_BAND_COLUMNS].count(axis=1)
    result["num_cell_with_kpi"] = result[PRB_BAND_COLUMNS].count(axis=1)
    result["num_down_or_no_kpi_cells"] = (
        result["num_cells"] - result["num_cell_with_kpi"]
    )

    # Max PRB spread between co-sector cells. DataFrame.max/min skip NaN
    # (unlike the builtin max()/min(), which propagate NaN when any band
    # has no KPI and would wrongly blank the diff for partial sectors).
    prb = result[PRB_BAND_COLUMNS]
    result["prb_diff_between_cells"] = prb.max(axis=1) - prb.min(axis=1)

    # NaN > threshold is False, so sectors without KPI resolve to "No".
    result["load_balance_required"] = np.where(
        result["prb_diff_between_cells"] > prb_diff_between_cells_threshold,
        "Yes",
        "No",
    )

    # Band to add next if the sector needs a new layer.
    result["next_band"] = result["site_config_band"].map(
        LteCapacity.next_band_mapping
    )

    # Congestion comment:
    #   no congested cells, no down cells -> "No Congestion"
    #   no congested cells, some down     -> "No congestion but Down cell"
    #   congested cells and some down     -> "Congestion but Colocated Down Cell"
    #   otherwise                         -> "Need Action"
    conditions = [
        (result["num_congested_cells"] == 0)
        & (result["num_down_or_no_kpi_cells"] == 0),
        (result["num_congested_cells"] == 0)
        & (result["num_down_or_no_kpi_cells"] > 0),
        (result["num_congested_cells"] > 0)
        & (result["num_down_or_no_kpi_cells"] > 0),
    ]
    choices = [
        "No Congestion",
        "No congestion but Down cell",
        "Congestion but Colocated Down Cell",
    ]
    result["congestion_comment"] = np.select(
        conditions, choices, default="Need Action"
    )

    # Actions:
    #   load balance required + "Need Action" -> parameter tuning
    #   no load balance possible + "Need Action" -> add a new layer
    #   otherwise keep the congestion comment as-is
    conditions = [
        (result["load_balance_required"] == "Yes")
        & (result["congestion_comment"] == "Need Action"),
        (result["load_balance_required"] == "No")
        & (result["congestion_comment"] == "Need Action"),
    ]
    choices = [
        "Load Balancing parameter tuning required",
        "Add Layer",
    ]
    result["actions"] = np.select(
        conditions, choices, default=result["congestion_comment"]
    )

    # Final comments: expand "Add Layer" with the concrete next band.
    result["final_comments"] = result.apply(
        lambda row: (
            f"Add {row['next_band']}"
            if row["actions"] == "Add Layer"
            else row["actions"]
        ),
        axis=1,
    )

    # Representative cell name per sector: first non-null band name.
    result["sector"] = (
        result["LNCEL_name_l800"]
        .combine_first(result["LNCEL_name_l1800"])
        .combine_first(result["LNCEL_name_l2300"])
        .combine_first(result["LNCEL_name_l2600"])
        .combine_first(result["LNCEL_name_l1800s"])
    )

    # Drop rows with no cell at all (all band names missing).
    result = result[result["sector"].notna()]

    # Sector id 1/2/3 parsed from the cell-name pattern "_<n>_".
    result["sector_id"] = np.select(
        [
            result["sector"].str.contains("_1_", na=False),
            result["sector"].str.contains("_2_", na=False),
            result["sector"].str.contains("_3_", na=False),
        ],
        [1, 2, 3],
        default=np.nan,
    )

    # code_sector = "<code>_<sector_id>"; sector_id stringifies as a
    # float ("1.0"), so strip the trailing ".0". regex=False is required:
    # with the regex default, "." matches any character and e.g. "10"
    # inside a site code would also be deleted.
    result["code_sector"] = (
        result["code"].astype(str) + "_" + result["sector_id"].astype(str)
    )
    result["code_sector"] = result["code_sector"].str.replace(
        ".0", "", regex=False
    )

    return result


def dfs_per_band_cell(df: pd.DataFrame) -> pd.DataFrame:
    """Pivot per-cell KPI rows into one row per site sector.

    For each sector (1-3) and band (L800/L1800/L2300/L2600 plus the
    "S"-suffixed second L1800 carrier), cells whose LNCEL_name matches
    the "_<sector>_<band>" pattern are left-merged onto the unique site
    list with band-specific column suffixes, avoiding pandas' automatic
    _x/_y suffixing.

    Args:
        df: Cell database merged with BH KPIs; must contain code,
            Region, site_config_band, Longitude, Latitude, LNCEL_name
            and the PRB/throughput KPI columns.

    Returns:
        Concatenation of the three per-sector frames (one row per
        site and sector group).
    """
    # Base frame: one row per unique site.
    site_df = df[
        ["code", "Region", "site_config_band", "Longitude", "Latitude"]
    ].drop_duplicates()

    # (LNCEL_name substring, column suffix) pairs per sector group,
    # e.g. sector "1": ("_1_L800", "l800"), ..., ("_1S_L1800", "l1800s").
    band_suffixes = [
        ("L800", "l800"),
        ("L1800", "l1800"),
        ("L2300", "l2300"),
        ("L2600", "l2600"),
    ]
    sector_groups_config = {
        sector: [
            (f"_{sector}_{band}", suffix) for band, suffix in band_suffixes
        ]
        + [(f"_{sector}S_L1800", "l1800s")]
        for sector in ("1", "2", "3")
    }

    per_sector_frames = []
    for band_configurations in sector_groups_config.values():
        sector_df = site_df.copy()
        for lncel_name_pattern, column_suffix in band_configurations:
            # na=False: rows with a missing LNCEL_name must not match
            # (and must not poison the boolean mask with NaN).
            band_rows = df[
                df["LNCEL_name"].str.contains(lncel_name_pattern, na=False)
            ]

            # Rename up-front so the merge needs no suffix handling.
            df_to_merge = band_rows[
                [
                    "code",
                    "LNCEL_name",
                    "avg_prb_usage_bh",
                    "avg_prb_usage_bh_2nd",
                    "avg_act_ues",
                    "avg_dl_thp",
                    "avg_ul_thp",
                ]
            ].rename(
                columns={
                    "LNCEL_name": f"LNCEL_name_{column_suffix}",
                    "avg_prb_usage_bh": f"avg_prb_usage_bh_{column_suffix}",
                    "avg_prb_usage_bh_2nd": f"avg_prb_usage_bh_{column_suffix}_2nd",
                    "avg_act_ues": f"avg_act_ues_{column_suffix}",
                    "avg_dl_thp": f"avg_dl_thp_{column_suffix}",
                    "avg_ul_thp": f"avg_ul_thp_{column_suffix}",
                }
            )

            sector_df = pd.merge(
                sector_df, df_to_merge, on="code", how="left"
            )

        per_sector_frames.append(sector_df)

    all_sectors_dfs = pd.concat(
        per_sector_frames, axis=0, ignore_index=True
    )
    # Debug: save_dataframe(all_sectors_dfs, "all_sectors_dfs.csv")
    return all_sectors_dfs


def lte_database_for_capacity(dump_path: str) -> pd.DataFrame:
    """Build the LTE cell database (FDD + TDD) used for the capacity merge.

    Args:
        dump_path: Path to the network dump consumed by process_lte_data().

    Returns:
        DataFrame with one row per cell: code, Region, site_config_band,
        LNCEL_name (renamed from final_name), Longitude, Latitude.
    """
    dfs = process_lte_data(dump_path)
    lte_fdd = dfs[0][LTE_DATABASE_COLUMNS]
    lte_tdd = dfs[1][LTE_DATABASE_COLUMNS]
    lte_db = pd.concat([lte_fdd, lte_tdd], axis=0)
    # Align with the BH report naming so the later merge key matches.
    lte_db = lte_db.rename(columns={"final_name": "LNCEL_name"})
    # Debug: save_dataframe(lte_db, "LTE_Database.csv")
    return lte_db


def _append_max_avg(df: pd.DataFrame, kpi: str) -> pd.DataFrame:
    """Append max_<kpi> and avg_<kpi> columns to a pivoted per-date frame.

    Both statistics are computed over the original per-date columns
    BEFORE either result column is assigned, so the freshly added max
    column cannot leak into the average (the previous code appended the
    max first and then averaged across all columns, biasing avg upward).
    Mutates and returns ``df``.
    """
    max_values = df.max(axis=1)
    avg_values = df.mean(axis=1)
    df[f"max_{kpi}"] = max_values
    df[f"avg_{kpi}"] = avg_values
    return df


def lte_bh_dfs_per_kpi(
    dump_path: str,
    df: pd.DataFrame,
    number_of_kpi_days: int = 7,
    availability_threshold: int = 95,
    prb_usage_threshold: int = 80,
    prb_diff_between_cells_threshold: int = 20,
    number_of_threshold_days: int = 3,
    main_prb_to_use: str = "",
) -> "list[pd.DataFrame]":
    """Run the per-KPI BH analysis and the per-sector capacity analysis.

    Args:
        dump_path: Path to the network dump for the cell database.
        df: Cleaned BH report (KPI_COLUMNS layout).
        number_of_kpi_days: Number of trailing days used per KPI.
        availability_threshold: Minimum required cell availability.
        prb_usage_threshold: PRB usage considered congested.
        prb_diff_between_cells_threshold: PRB spread triggering load balance.
        number_of_threshold_days: Days the threshold must be breached.
        main_prb_to_use: Which PRB counter is the primary one; the other
            gets the "_2nd" column suffix.

    Returns:
        ``[bh_kpi_df, lte_analysis_df]`` — the merged per-cell BH KPI
        frame and the renamed per-sector analysis frame.
    """
    pivoted_kpi_dfs = create_dfs_per_kpi(
        df=df,
        pivot_date_column="date",
        pivot_name_column="LNCEL_name",
        kpi_columns_from=2,
    )

    cell_availability_df = cell_availability_analysis(
        df=pivoted_kpi_dfs["Cell_Avail_excl_BLU"],
        days=number_of_kpi_days,
        availability_threshold=availability_threshold,
    )

    prb_usage_df = analyze_prb_usage(
        df=pivoted_kpi_dfs["E_UTRAN_Avg_PRB_usage_per_TTI_DL"],
        number_of_kpi_days=number_of_kpi_days,
        prb_usage_threshold=prb_usage_threshold,
        analysis_type="BH",
        number_of_threshold_days=number_of_threshold_days,
        suffix=""
        if main_prb_to_use == "E-UTRAN Avg PRB usage per TTI DL"
        else "_2nd",
    )

    prb_lev10_usage_df = analyze_prb_usage(
        df=pivoted_kpi_dfs["DL_PRB_Util_p_TTI_Lev_10"],
        number_of_kpi_days=number_of_kpi_days,
        prb_usage_threshold=prb_usage_threshold,
        analysis_type="BH",
        number_of_threshold_days=number_of_threshold_days,
        suffix="" if main_prb_to_use == "DL PRB Util p TTI Lev_10" else "_2nd",
    )

    # Max/avg per cell across the pivoted per-date columns.
    act_ues_df = _append_max_avg(pivoted_kpi_dfs["Avg_act_UEs_DL"], "act_ues")
    dl_thp_df = _append_max_avg(
        pivoted_kpi_dfs["Avg_PDCP_cell_thp_DL"], "dl_thp"
    )
    ul_thp_df = _append_max_avg(
        pivoted_kpi_dfs["Avg_PDCP_cell_thp_UL"], "ul_thp"
    )

    bh_kpi_df = pd.concat(
        [
            cell_availability_df,
            prb_lev10_usage_df,
            prb_usage_df,
            act_ues_df,
            dl_thp_df,
            ul_thp_df,
        ],
        axis=1,
    )
    bh_kpi_df = bh_kpi_df.reset_index()

    prb_df = bh_kpi_df[PRB_COLUMNS]
    # Drop rows whose LNCEL_name is empty or a junk stub.
    prb_df = prb_df[prb_df["LNCEL_name"].str.len() > 3]
    # Drop the second column level (date) left over from the pivot.
    prb_df = prb_df.droplevel(level=1, axis=1)

    lte_db = lte_database_for_capacity(dump_path)
    db_and_prb = pd.merge(lte_db, prb_df, on="LNCEL_name", how="left")
    # Drop rows whose LNCEL_name is empty or a junk stub.
    db_and_prb = db_and_prb[db_and_prb["LNCEL_name"].str.len() > 3]

    lte_analysis_df = dfs_per_band_cell(db_and_prb)
    lte_analysis_df = lte_analysis_logic(
        lte_analysis_df,
        prb_usage_threshold,
        prb_diff_between_cells_threshold,
    )
    lte_analysis_df = lte_analysis_df[LTE_ANALYSIS_COLUMNS]

    # Shorten the verbose KPI prefixes for the final report.
    lte_analysis_df = lte_analysis_df.rename(
        columns={
            "LNCEL_name_l800": "name_l800",
            "LNCEL_name_l1800": "name_l1800",
            "LNCEL_name_l2300": "name_l2300",
            "LNCEL_name_l2600": "name_l2600",
            "LNCEL_name_l1800s": "name_l1800s",
            "avg_prb_usage_bh_l800": "prb_l800",
            "avg_prb_usage_bh_l1800": "prb_l1800",
            "avg_prb_usage_bh_l2300": "prb_l2300",
            "avg_prb_usage_bh_l2600": "prb_l2600",
            "avg_prb_usage_bh_l1800s": "prb_l1800s",
            "avg_prb_usage_bh_l800_2nd": "prb_l800_2nd",
            "avg_prb_usage_bh_l1800_2nd": "prb_l1800_2nd",
            "avg_prb_usage_bh_l2300_2nd": "prb_l2300_2nd",
            "avg_prb_usage_bh_l2600_2nd": "prb_l2600_2nd",
            "avg_prb_usage_bh_l1800s_2nd": "prb_l1800s_2nd",
            "avg_act_ues_l800": "act_ues_l800",
            "avg_act_ues_l1800": "act_ues_l1800",
            "avg_act_ues_l2300": "act_ues_l2300",
            "avg_act_ues_l2600": "act_ues_l2600",
            "avg_act_ues_l1800s": "act_ues_l1800s",
            "avg_dl_thp_l800": "dl_thp_l800",
            "avg_dl_thp_l1800": "dl_thp_l1800",
            "avg_dl_thp_l2300": "dl_thp_l2300",
            "avg_dl_thp_l2600": "dl_thp_l2600",
            "avg_dl_thp_l1800s": "dl_thp_l1800s",
            "avg_ul_thp_l800": "ul_thp_l800",
            "avg_ul_thp_l1800": "ul_thp_l1800",
            "avg_ul_thp_l2300": "ul_thp_l2300",
            "avg_ul_thp_l2600": "ul_thp_l2600",
            "avg_ul_thp_l1800s": "ul_thp_l1800s",
        }
    )

    return [bh_kpi_df, lte_analysis_df]


def process_lte_bh_report(
    dump_path: str,
    bh_report_path: str,
    num_last_days: int,
    num_threshold_days: int,
    availability_threshold: float,
    prb_usage_threshold: float,
    prb_diff_between_cells_threshold: float,
    main_prb_to_use: str,
) -> "list[pd.DataFrame]":
    """Process the LTE Busy Hour report and perform capacity analysis.

    Args:
        dump_path: Path to the network dump for the cell database.
        bh_report_path: Path to the BH report CSV file (";"-delimited).
        num_last_days: Number of last days for analysis.
        num_threshold_days: Number of days for threshold calculation.
        availability_threshold: Minimum required availability.
        prb_usage_threshold: Maximum allowed PRB usage.
        prb_diff_between_cells_threshold: Maximum allowed PRB usage
            difference between cells.
        main_prb_to_use: Primary PRB counter name (see lte_bh_dfs_per_kpi).

    Returns:
        ``[bh_kpi_df, lte_analysis_df]`` as produced by lte_bh_dfs_per_kpi.
    """
    # Reset any result cached by a previous run.
    LteCapacity.final_results = None

    # Read and normalize the BH report.
    df = pd.read_csv(bh_report_path, delimiter=";")
    df = kpi_naming_cleaning(df)
    df = create_hourly_date(df)
    df = df[KPI_COLUMNS]

    pivoted_kpi_dfs = lte_bh_dfs_per_kpi(
        dump_path=dump_path,
        df=df,
        number_of_kpi_days=num_last_days,
        availability_threshold=availability_threshold,
        prb_usage_threshold=prb_usage_threshold,
        prb_diff_between_cells_threshold=prb_diff_between_cells_threshold,
        number_of_threshold_days=num_threshold_days,
        main_prb_to_use=main_prb_to_use,
    )
    return pivoted_kpi_dfs