"""GSM/LTE radio capacity analysis helpers.

Static dimensioning tables (Erlang-B, half-rate coefficients) plus per-KPI
threshold analyses that turn daily KPI pivots into human-readable capacity
comments (congestion / availability / utilization).
"""

import re

import numpy as np
import pandas as pd


class GsmAnalysis:
    """Static lookup tables used for GSM traffic dimensioning."""

    # Half-rate usage percentage -> traffic capacity multiplier.
    # NOTE(review): 100 -> 1.4 breaks the monotonic progression (99 -> 2.0);
    # looks like a data-entry slip — confirm against the source table.
    hf_rate_coef = {
        10: 1.1,
        20: 1.2,
        40: 1.4,
        60: 1.6,
        70: 1.7,
        80: 1.8,
        99: 2.0,
        100: 1.4,
    }

    # Erlang-B table: number of TCH channels -> offered traffic in Erlangs
    # at the project's engineered blocking probability (GoS).
    erlangB_table = {
        1: 0.0204, 2: 0.2234, 3: 0.6022, 4: 1.092, 5: 1.657, 6: 2.276, 7: 2.935, 8: 3.627, 9: 4.345, 10: 5.084,
        11: 5.841, 12: 6.614, 13: 7.401, 14: 8.2, 15: 9.009, 16: 9.828, 17: 10.66, 18: 11.49, 19: 12.33, 20: 13.18,
        21: 14.04, 22: 14.9, 23: 15.76, 24: 16.63, 25: 17.5, 26: 18.38, 27: 19.26, 28: 20.15, 29: 21.04, 30: 21.93,
        31: 22.83, 32: 23.72, 33: 24.63, 34: 25.53, 35: 26.43, 36: 27.34, 37: 28.25, 38: 29.17, 39: 30.08, 40: 31,
        41: 31.91, 42: 32.84, 43: 33.76, 44: 34.68, 45: 35.61, 46: 36.53, 47: 37.46, 48: 38.39, 49: 39.32, 50: 40.25,
        51: 41.19, 52: 42.12, 53: 43.06, 54: 44, 55: 44.93, 56: 45.88, 57: 46.81, 58: 47.75, 59: 48.7, 60: 49.64,
        61: 50.59, 62: 51.53, 63: 52.48, 64: 53.43, 65: 54.38, 66: 55.32, 67: 56.27, 68: 57.22, 69: 58.18, 70: 59.13,
        71: 60.08, 72: 61.04, 73: 61.99, 74: 62.94, 75: 63.9, 76: 64.86, 77: 65.81, 78: 66.77, 79: 67.73, 80: 68.69,
        81: 69.64, 82: 70.61, 83: 71.57, 84: 72.53, 85: 73.49, 86: 74.45, 87: 75.41, 88: 76.38, 89: 77.34, 90: 78.3,
        91: 79.27, 92: 80.23, 93: 81.2, 94: 82.16, 95: 83.13, 96: 84.09, 97: 85.06, 98: 86.03, 99: 87, 100: 87.97,
        101: 88.94, 102: 89.91, 103: 90.88, 104: 91.85, 105: 92.82, 106: 93.79, 107: 94.76, 108: 95.73, 109: 96.71, 110: 97.68,
        111: 98.65, 112: 99.63, 113: 100.6, 114: 101.57, 115: 102.54, 116: 103.52, 117: 104.49, 118: 105.47, 119: 106.44, 120: 107.42,
        121: 108.4, 122: 109.37, 123: 110.35, 124: 111.32, 125: 112.3, 126: 113.28, 127: 114.25, 128: 115.23, 129: 116.21, 130: 117.19,
        131: 118.17, 132: 119.15, 133: 120.12, 134: 121.1, 135: 122.08, 136: 123.07, 137: 124.04, 138: 125.02, 139: 126.01341, 140: 127.00918,
        141: 127.96752, 142: 128.98152, 143: 129.92152, 144: 130.88534, 145: 131.96461, 146: 132.89897, 147: 133.86373, 148: 134.82569, 149: 135.76295, 150: 136.82988,
        151: 137.79, 152: 138.77, 153: 139.75, 154: 140.74, 155: 141.72, 156: 142.7, 157: 143.69, 158: 144.67, 159: 145.66, 160: 146.64,
        161: 147.63, 162: 148.61, 163: 149.6, 164: 150.58, 165: 151.57, 166: 152.55, 167: 153.54, 168: 154.53, 169: 155.51, 170: 156.5,
        171: 157.48, 172: 158.47, 173: 159.46, 174: 160.44, 175: 161.43, 176: 162.42, 177: 163.41, 178: 164.39, 179: 165.38, 180: 166.37,
        181: 167.36, 182: 168.35, 183: 169.33, 184: 170.32, 185: 171.31, 186: 172.3, 187: 173.29, 188: 174.28, 189: 175.27, 190: 176.26,
        191: 177.25, 192: 178.24, 193: 179.23, 194: 180.22, 195: 181.21, 196: 182.2, 197: 183.19, 198: 184.18, 199: 185.17, 200: 186.16,
    }


class GsmCapacity:
    """GSM capacity analysis: KPI reshaping and per-threshold commenting."""

    # Populated by the analysis pipeline elsewhere; kept for compatibility.
    final_results = None
    operational_neighbours_df = None

    # Raw combined comment string -> final human-readable category.
    final_comment_mapping = {
        "Availability and TX issues": "Operational issues with no congestion",
        "Availability issues": "Operational issues with no congestion",
        "TX issues": "Operational issues with no congestion",
        "Operational is OK": "Operational is OK with no congestion",
        "Tch utilization exceeded threshold, Availability and TX issues": "High utilization with Operational issues",
        "Tch utilization exceeded threshold, Availability issues": "High utilization with Operational issues",
        "Tch utilization exceeded threshold, TX issues": "High utilization with Operational issues",
        "Tch utilization exceeded threshold, SDCCH blocking exceeded threshold, Operational is OK": "High Utilization with Congestion without Operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, Operational is OK": "High Utilization with Congestion without Operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Operational is OK": "High Utilization with Congestion without Operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, TX issues": "High Utilization with Congestion without Operational issues",
        "Tch utilization exceeded threshold, SDCCH blocking exceeded threshold, Availability and TX issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, SDCCH blocking exceeded threshold, TX issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, Availability and TX issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, Availability issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Availability and TX issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Availability issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, TX issues": "High utilization with Congestion and operational issues",
        "Down Site": "Down Cell",
        "SDCCH blocking exceeded threshold, Operational is OK": "Congestion without Operational issues",
        "TCH blocking exceeded threshold, Operational is OK": "Congestion without Operational issues",
        "TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Operational is OK": "Congestion without Operational issues",
        "Tch utilization exceeded threshold, Operational is OK": "High utilization without Congestion and Operational issues",
        "SDCCH blocking exceeded threshold, Availability and TX issues": "Congestion with Operational issues",
        "SDCCH blocking exceeded threshold, Availability issues": "Congestion with Operational issues",
        "SDCCH blocking exceeded threshold, TX issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, Availability and TX issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, Availability issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Availability and TX issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Availability issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, TX issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, TX issues": "Congestion with Operational issues",
    }

    @staticmethod
    def combine_comments(df: pd.DataFrame, *columns: str, new_column: str) -> pd.DataFrame:
        """Combine comments from multiple columns into one column.

        Args:
            df: DataFrame containing comment columns.
            *columns: Column names whose comments are joined, in order.
            new_column: Name for the new combined comments column.

        Returns:
            Copy of ``df`` with ``new_column`` holding the ", "-joined,
            non-empty comments of each row.
        """
        result_df = df.copy()
        # Join non-empty comments. NaN must be skipped explicitly:
        # float("nan") is truthy, so a bare `if x` would emit "nan".
        result_df[new_column] = result_df[list(columns)].apply(
            lambda row: ", ".join(str(x) for x in row if pd.notna(x) and x),
            axis=1,
        )
        # Trim leading/trailing commas and whitespace.
        result_df[new_column] = result_df[new_column].str.replace(
            r"^[,\s]+|[,\s]+$", "", regex=True
        )
        # Collapse duplicated commas into a single separator.
        result_df[new_column] = result_df[new_column].str.replace(
            r",\s*,", ", ", regex=True
        )
        return result_df

    @staticmethod
    def summarize_fails_comments(comment):
        """Summarize ``rrc_fail_*`` mentions in a comment.

        Returns e.g. ``"abc, xyz fails"`` for a comment containing
        ``rrc_fail_abc`` and ``rrc_fail_xyz``, or ``""`` when the comment
        is empty/NaN or mentions no ``rrc_fail_*`` field.
        """
        if not comment or pd.isna(comment) or comment.strip() == "":
            return ""
        # Extract every `rrc_fail_xxx` field name.
        matches = re.findall(r"rrc_fail_([a-z_]+)", comment)
        if not matches:
            return ""
        # Deduplicate, sort alphabetically, then append the 'fails' marker.
        unique_sorted = sorted(set(matches))
        return ", ".join(unique_sorted) + " fails"

    @staticmethod
    def kpi_naming_cleaning(df: pd.DataFrame) -> pd.DataFrame:
        """Clean KPI column names into snake_case-ish identifiers.

        Replaces separator characters with ``_``, collapses underscore
        runs, maps ``%`` to ``perc`` and strips trailing underscores.

        Args:
            df: DataFrame with KPI column names to clean.

        Returns:
            Copy of ``df`` with cleaned column names.
        """
        name_df: pd.DataFrame = df.copy()
        # Same character set as before (space / ( ) , - . '), hyphen escaped
        # for clarity instead of relying on the `,-.` range.
        name_df.columns = name_df.columns.str.replace(r"[ /(),\-.']", "_", regex=True)
        # Collapse any run of underscores in one pass (the old two-step
        # literal replace left "__" behind for runs of 7+ underscores).
        name_df.columns = name_df.columns.str.replace(r"_{2,}", "_", regex=True)
        name_df.columns = name_df.columns.str.replace("%", "perc")
        name_df.columns = name_df.columns.str.rstrip("_")
        return name_df

    @staticmethod
    def create_daily_date(df: pd.DataFrame) -> pd.DataFrame:
        """Build a ``date`` column (YYYY-MM-DD) from ``PERIOD_START_TIME``.

        ``PERIOD_START_TIME`` is expected as ``"MM.DD.YYYY"``.

        Args:
            df: DataFrame containing a ``PERIOD_START_TIME`` column.

        Returns:
            Copy of ``df`` with a ``date`` column and the helper/source
            columns removed.
        """
        date_df: pd.DataFrame = df.copy()
        date_df[["mois", "jour", "annee"]] = date_df["PERIOD_START_TIME"].str.split(
            ".", expand=True
        )
        date_df["date"] = date_df["annee"] + "-" + date_df["mois"] + "-" + date_df["jour"]
        # Drop the intermediate split columns and the raw timestamp.
        date_df = date_df.drop(["annee", "mois", "jour", "PERIOD_START_TIME"], axis=1)
        return date_df

    @staticmethod
    def create_hourly_date(df: pd.DataFrame) -> pd.DataFrame:
        """Build ``datetime``/``date`` columns from hourly ``PERIOD_START_TIME``.

        ``PERIOD_START_TIME`` is expected as ``"MM.DD.YYYY HH:MM"``.

        Args:
            df: DataFrame containing a ``PERIOD_START_TIME`` column.

        Returns:
            Copy of ``df`` with ``datetime`` (``YYYY-MM-DD HH:MM``) and
            ``date`` columns, helper/source columns removed.
        """
        # Copy so the caller's DataFrame is not mutated (the original
        # aliased `df` directly, unlike every sibling helper).
        date_df: pd.DataFrame = df.copy()
        date_df[["date_t", "hour"]] = date_df["PERIOD_START_TIME"].str.split(
            " ", expand=True
        )
        date_df[["mois", "jour", "annee"]] = date_df["date_t"].str.split(".", expand=True)
        date_df["datetime"] = (
            date_df["annee"]
            + "-"
            + date_df["mois"]
            + "-"
            + date_df["jour"]
            + " "
            + date_df["hour"]
        )
        date_df["date"] = date_df["annee"] + "-" + date_df["mois"] + "-" + date_df["jour"]
        # Drop the intermediate split columns and the raw timestamp.
        date_df = date_df.drop(
            ["annee", "mois", "jour", "date_t", "PERIOD_START_TIME"], axis=1
        )
        return date_df

    @staticmethod
    def create_dfs_per_kpi(
        df: pd.DataFrame = None,
        pivot_date_column: str = "date",
        pivot_name_column: str = "BTS_name",
        kpi_columns_from: int = None,
    ) -> dict:
        """Pivot each KPI column into its own (cell x date) DataFrame.

        Args:
            df: Long-format DataFrame with one row per (cell, date).
            pivot_date_column: Column used as the pivot's column axis.
            pivot_name_column: Column used as the pivot's index.
            kpi_columns_from: Positional index of the first KPI column;
                every column from there on is treated as a KPI.

        Returns:
            Dict mapping KPI name -> pivoted DataFrame whose columns are a
            (KPI, Date) MultiIndex.
        """
        kpi_columns = df.columns[kpi_columns_from:]
        pivoted_kpi_dfs = {}
        for kpi in kpi_columns:
            temp_df = df[[pivot_date_column, pivot_name_column, kpi]].copy()
            # Drop duplicate (cell, date) rows so pivot() cannot raise.
            temp_df = temp_df.drop_duplicates(
                subset=[pivot_name_column, pivot_date_column], keep="first"
            )
            temp_df = temp_df.reset_index()
            pivot_df = temp_df.pivot(
                index=pivot_name_column, columns=pivot_date_column, values=kpi
            )
            # Prefix the date columns with the KPI name for later concat.
            pivot_df.columns = pd.MultiIndex.from_product([[kpi], pivot_df.columns])
            pivot_df.columns.names = ["KPI", "Date"]
            pivoted_kpi_dfs[kpi] = pivot_df
        return pivoted_kpi_dfs

    @staticmethod
    def cell_availability_analysis(
        df: pd.DataFrame,
        days: int = 7,
        availability_threshold: int = 95,
        analysis_type: str = "daily",
    ) -> pd.DataFrame:
        """Categorize cells by availability over the last ``days`` columns.

        Args:
            df: Pivoted availability DataFrame (one column per day).
            days: Number of trailing day-columns to analyze.
            availability_threshold: Availability percentage below which a
                day counts as an availability violation.
            analysis_type: Label suffix for the produced columns.

        Returns:
            ``df`` copy (NaN filled with 0) plus average, violation-day
            count and an availability comment column.
        """
        result_df: pd.DataFrame = df.copy().fillna(0)
        last_days_df: pd.DataFrame = result_df.iloc[:, -days:]
        result_df[f"Average_cell_availability_{analysis_type.lower()}"] = last_days_df.mean(
            axis=1
        ).round(2)
        # Days at or below the threshold count as violations.
        result_df[
            f"number_of_days_exceeding_availability_threshold_{analysis_type.lower()}"
        ] = last_days_df.apply(
            lambda row: sum(1 for x in row if x <= availability_threshold), axis=1
        )

        def categorize_availability(x: float) -> str:
            # Bucket the average availability into operational categories.
            if x == 0 or pd.isnull(x):
                return "Down Site"
            elif 0 < x <= 70:
                return "critical instability"
            elif 70 < x <= availability_threshold:
                return "instability"
            else:
                return "Availability OK"

        result_df[f"availability_comment_{analysis_type.lower()}"] = result_df[
            f"Average_cell_availability_{analysis_type.lower()}"
        ].apply(categorize_availability)
        return result_df

    @staticmethod
    def analyze_tch_abis_fails(
        df: pd.DataFrame,
        number_of_kpi_days: int,
        analysis_type: str,
        number_of_threshold_days: int,
        tch_abis_fails_threshold: int,
    ) -> pd.DataFrame:
        """Flag cells whose TCH Abis fails exceeded the threshold often enough.

        Adds average/max columns over the last ``number_of_kpi_days``
        day-columns, the count of days at/above
        ``tch_abis_fails_threshold``, and a comment when that count reaches
        ``number_of_threshold_days``.
        """
        result_df: pd.DataFrame = df.copy()
        last_days_df: pd.DataFrame = result_df.iloc[:, -number_of_kpi_days:]
        result_df[f"avg_tch_abis_fail_{analysis_type.lower()}"] = last_days_df.mean(
            axis=1
        ).round(2)
        result_df[f"max_tch_abis_fail_{analysis_type.lower()}"] = last_days_df.max(axis=1)
        result_df[f"number_of_days_with_tch_abis_fail_exceeded_{analysis_type.lower()}"] = (
            last_days_df.apply(
                lambda row: sum(1 for x in row if x >= tch_abis_fails_threshold), axis=1
            )
        )
        # Comment only when enough days violated the threshold.
        result_df[f"tch_abis_fail_{analysis_type.lower()}_comment"] = np.where(
            result_df[f"number_of_days_with_tch_abis_fail_exceeded_{analysis_type.lower()}"]
            >= number_of_threshold_days,
            "tch abis fail exceeded threshold",
            None,
        )
        return result_df

    @staticmethod
    def analyze_tch_call_blocking(
        df: pd.DataFrame,
        number_of_kpi_days: int,
        analysis_type: str,
        number_of_threshold_days: int,
        tch_blocking_threshold: int,
    ) -> pd.DataFrame:
        """Flag cells whose TCH call blocking exceeded the threshold often enough.

        Same shape as :meth:`analyze_tch_abis_fails`, for TCH blocking.
        """
        result_df = df.copy()
        last_days_df: pd.DataFrame = result_df.iloc[:, -number_of_kpi_days:]
        result_df[f"avg_tch_call_blocking_{analysis_type.lower()}"] = last_days_df.mean(
            axis=1
        ).round(2)
        result_df[f"max_tch_call_blocking_{analysis_type.lower()}"] = last_days_df.max(
            axis=1
        )
        result_df[f"number_of_days_with_tch_blocking_exceeded_{analysis_type.lower()}"] = (
            last_days_df.apply(
                lambda row: sum(1 for x in row if x >= tch_blocking_threshold), axis=1
            )
        )
        # Comment only when enough days violated the threshold.
        result_df[f"tch_call_blocking_{analysis_type.lower()}_comment"] = np.where(
            result_df[f"number_of_days_with_tch_blocking_exceeded_{analysis_type.lower()}"]
            >= number_of_threshold_days,
            "TCH blocking exceeded threshold",
            None,
        )
        return result_df

    @staticmethod
    def analyze_sdcch_call_blocking(
        df: pd.DataFrame,
        number_of_kpi_days: int,
        sdcch_blocking_threshold: int,
        analysis_type: str,
        number_of_threshold_days: int,
    ) -> pd.DataFrame:
        """Flag cells whose SDCCH blocking exceeded the threshold often enough.

        Same shape as :meth:`analyze_tch_call_blocking`, for SDCCH blocking.
        """
        result_df = df.copy()
        last_days_df: pd.DataFrame = result_df.iloc[:, -number_of_kpi_days:]
        result_df[f"avg_sdcch_real_blocking_{analysis_type.lower()}"] = last_days_df.mean(
            axis=1
        ).round(2)
        result_df[f"max_sdcch_real_blocking_{analysis_type.lower()}"] = last_days_df.max(
            axis=1
        )
        result_df[
            f"number_of_days_with_sdcch_blocking_exceeded_{analysis_type.lower()}"
        ] = last_days_df.apply(
            lambda row: sum(1 for x in row if x >= sdcch_blocking_threshold), axis=1
        )
        # Comment only when enough days violated the threshold.
        result_df[f"sdcch_real_blocking_{analysis_type.lower()}_comment"] = np.where(
            result_df[
                f"number_of_days_with_sdcch_blocking_exceeded_{analysis_type.lower()}"
            ]
            >= number_of_threshold_days,
            "SDCCH blocking exceeded threshold",
            None,
        )
        return result_df


class LteCapacity:
    """LTE capacity analysis: PRB/LCG utilization and fail-KPI commenting."""

    # Populated by the analysis pipeline elsewhere; kept for compatibility.
    final_results = None

    # Current band combination -> recommended next band / expansion action.
    next_band_mapping = {
        "L1800": "L800",
        "L800": "L1800",
        "L1800/L800": "L2600",
        "L1800/L2300/L800": "L2600",
        "L2300/L800": "L2600",
        "L1800/L2600/L800": "New site/Dual Beam",
        "L1800/L2300/L2600/L800": "New site/Dual Beam",
        "L2300": "FDD H// colocated site",
    }

    @staticmethod
    def analyze_prb_usage(
        df: pd.DataFrame,
        number_of_kpi_days: int,
        prb_usage_threshold: int,
        analysis_type: str,
        number_of_threshold_days: int,
        suffix: str = "",
    ) -> pd.DataFrame:
        """Flag cells whose PRB usage exceeded the threshold often enough.

        Args:
            df: Pivoted PRB-usage DataFrame (one column per day).
            number_of_kpi_days: Number of trailing day-columns to analyze.
            prb_usage_threshold: PRB usage at/above which a day counts.
            analysis_type: Label suffix for the produced columns.
            number_of_threshold_days: Minimum violating days to comment.
            suffix: Extra column-name suffix (e.g. per carrier).

        Returns:
            ``df`` copy with average/max, violating-day count and comment.
        """
        result_df = df.copy()
        last_days_df: pd.DataFrame = result_df.iloc[:, -number_of_kpi_days:]
        result_df[f"avg_prb_usage_{analysis_type.lower()}{suffix}"] = last_days_df.mean(
            axis=1
        ).round(2)
        result_df[f"max_prb_usage_{analysis_type.lower()}{suffix}"] = last_days_df.max(
            axis=1
        )
        result_df[
            f"number_of_days_with_prb_usage_exceeded_{analysis_type.lower()}{suffix}"
        ] = last_days_df.apply(
            lambda row: sum(1 for x in row if x >= prb_usage_threshold), axis=1
        )
        # Comment only when enough days violated the threshold.
        result_df[f"prb_usage_{analysis_type.lower()}{suffix}_comment"] = np.where(
            result_df[
                f"number_of_days_with_prb_usage_exceeded_{analysis_type.lower()}{suffix}"
            ]
            >= number_of_threshold_days,
            "PRB usage exceeded threshold",
            None,
        )
        return result_df

    @staticmethod
    def analyze_fails_kpi(
        df: pd.DataFrame,
        number_of_kpi_days: int,
        number_of_threshold_days: int,
        kpi_threshold: int,
        kpi_column_name: str,
    ) -> pd.DataFrame:
        """Generic threshold analysis for a fail-type KPI.

        Adds ``avg_<kpi>``/``max_<kpi>`` over the last
        ``number_of_kpi_days`` day-columns, the count of days at/above
        ``kpi_threshold``, and ``<kpi>_comment`` when that count reaches
        ``number_of_threshold_days``.
        """
        result_df: pd.DataFrame = df.copy()
        last_days_df: pd.DataFrame = result_df.iloc[:, -number_of_kpi_days:]
        result_df[f"avg_{kpi_column_name}"] = last_days_df.mean(axis=1).round(2)
        result_df[f"max_{kpi_column_name}"] = last_days_df.max(axis=1)
        result_df[f"number_of_days_with_{kpi_column_name}_exceeded"] = last_days_df.apply(
            lambda row: sum(1 for x in row if x >= kpi_threshold), axis=1
        )
        # Comment only when enough days violated the threshold.
        result_df[f"{kpi_column_name}_comment"] = np.where(
            result_df[f"number_of_days_with_{kpi_column_name}_exceeded"]
            >= number_of_threshold_days,
            f"{kpi_column_name} exceeded threshold",
            None,
        )
        return result_df

    @staticmethod
    def analyze_lcg_utilization(
        df: pd.DataFrame,
        number_of_kpi_days: int,
        number_of_threshold_days: int,
        kpi_threshold: int,
        kpi_column_name: str,
    ) -> pd.DataFrame:
        """Threshold analysis for LCG utilization.

        The computation was byte-identical to :meth:`analyze_fails_kpi`,
        so it delegates to keep the two analyses consistent.
        """
        return LteCapacity.analyze_fails_kpi(
            df,
            number_of_kpi_days,
            number_of_threshold_days,
            kpi_threshold,
            kpi_column_name,
        )