# db_query/utils/kpi_analysis_utils.py
import re
import numpy as np
import pandas as pd


class GsmAnalysis:
    """Static lookup tables used by the GSM capacity analysis."""

    # Half-rate penetration (%) -> effective TCH capacity coefficient
    hf_rate_coef = {10: 1.1, 20: 1.2, 40: 1.4, 60: 1.6, 70: 1.7, 80: 1.8, 99: 2.0, 100: 1.4}

    # Erlang B table: number of traffic channels -> offered traffic (Erlangs)
    # at a 2% blocking probability
    erlangB_table = {
        1: 0.0204, 2: 0.2234, 3: 0.6022, 4: 1.092, 5: 1.657, 6: 2.276, 7: 2.935, 8: 3.627,
        9: 4.345, 10: 5.084, 11: 5.841, 12: 6.614, 13: 7.401, 14: 8.2, 15: 9.009, 16: 9.828,
        17: 10.66, 18: 11.49, 19: 12.33, 20: 13.18, 21: 14.04, 22: 14.9, 23: 15.76, 24: 16.63,
        25: 17.5, 26: 18.38, 27: 19.26, 28: 20.15, 29: 21.04, 30: 21.93, 31: 22.83, 32: 23.72,
        33: 24.63, 34: 25.53, 35: 26.43, 36: 27.34, 37: 28.25, 38: 29.17, 39: 30.08, 40: 31,
        41: 31.91, 42: 32.84, 43: 33.76, 44: 34.68, 45: 35.61, 46: 36.53, 47: 37.46, 48: 38.39,
        49: 39.32, 50: 40.25, 51: 41.19, 52: 42.12, 53: 43.06, 54: 44, 55: 44.93, 56: 45.88,
        57: 46.81, 58: 47.75, 59: 48.7, 60: 49.64, 61: 50.59, 62: 51.53, 63: 52.48, 64: 53.43,
        65: 54.38, 66: 55.32, 67: 56.27, 68: 57.22, 69: 58.18, 70: 59.13, 71: 60.08, 72: 61.04,
        73: 61.99, 74: 62.94, 75: 63.9, 76: 64.86, 77: 65.81, 78: 66.77, 79: 67.73, 80: 68.69,
        81: 69.64, 82: 70.61, 83: 71.57, 84: 72.53, 85: 73.49, 86: 74.45, 87: 75.41, 88: 76.38,
        89: 77.34, 90: 78.3, 91: 79.27, 92: 80.23, 93: 81.2, 94: 82.16, 95: 83.13, 96: 84.09,
        97: 85.06, 98: 86.03, 99: 87, 100: 87.97, 101: 88.94, 102: 89.91, 103: 90.88, 104: 91.85,
        105: 92.82, 106: 93.79, 107: 94.76, 108: 95.73, 109: 96.71, 110: 97.68, 111: 98.65,
        112: 99.63, 113: 100.6, 114: 101.57, 115: 102.54, 116: 103.52, 117: 104.49, 118: 105.47,
        119: 106.44, 120: 107.42, 121: 108.4, 122: 109.37, 123: 110.35, 124: 111.32, 125: 112.3,
        126: 113.28, 127: 114.25, 128: 115.23, 129: 116.21, 130: 117.19, 131: 118.17, 132: 119.15,
        133: 120.12, 134: 121.1, 135: 122.08, 136: 123.07, 137: 124.04, 138: 125.02,
        139: 126.01341, 140: 127.00918, 141: 127.96752, 142: 128.98152, 143: 129.92152,
        144: 130.88534, 145: 131.96461, 146: 132.89897, 147: 133.86373, 148: 134.82569,
        149: 135.76295, 150: 136.82988, 151: 137.79, 152: 138.77, 153: 139.75, 154: 140.74,
        155: 141.72, 156: 142.7, 157: 143.69, 158: 144.67, 159: 145.66, 160: 146.64, 161: 147.63,
        162: 148.61, 163: 149.6, 164: 150.58, 165: 151.57, 166: 152.55, 167: 153.54, 168: 154.53,
        169: 155.51, 170: 156.5, 171: 157.48, 172: 158.47, 173: 159.46, 174: 160.44, 175: 161.43,
        176: 162.42, 177: 163.41, 178: 164.39, 179: 165.38, 180: 166.37, 181: 167.36, 182: 168.35,
        183: 169.33, 184: 170.32, 185: 171.31, 186: 172.3, 187: 173.29, 188: 174.28, 189: 175.27,
        190: 176.26, 191: 177.25, 192: 178.24, 193: 179.23, 194: 180.22, 195: 181.21, 196: 182.2,
        197: 183.19, 198: 184.18, 199: 185.17, 200: 186.16,
    }
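

# Illustrative sketch (not part of the original module): one plausible way to
# combine the two tables above, looking up the offered traffic for a cell and
# scaling it by the half-rate coefficient. The helper name and the scaling rule
# are assumptions, not the repository's own API.
def _example_erlang_capacity(num_tch: int, hr_penetration: int = 0) -> float:
    """Offered traffic (Erlangs) at 2% blocking, optionally scaled for half-rate."""
    base = GsmAnalysis.erlangB_table[num_tch]
    coef = GsmAnalysis.hf_rate_coef.get(hr_penetration, 1.0)
    return round(base * coef, 2)

# _example_erlang_capacity(14)      -> 8.2
# _example_erlang_capacity(14, 40)  -> 11.48  (8.2 * 1.4)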


class GsmCapacity:
    """GSM capacity analysis: availability, blocking and utilization checks."""

    final_results = None
    operational_neighbours_df = None

    # Maps the combined per-check comment string to a final summary category
    final_comment_mapping = {
        "Availability and TX issues": "Operational issues with no congestion",
        "Availability issues": "Operational issues with no congestion",
        "TX issues": "Operational issues with no congestion",
        "Operational is OK": "Operational is OK with no congestion",
        "Tch utilization exceeded threshold, Availability and TX issues": "High utilization with Operational issues",
        "Tch utilization exceeded threshold, Availability issues": "High utilization with Operational issues",
        "Tch utilization exceeded threshold, TX issues": "High utilization with Operational issues",
        "Tch utilization exceeded threshold, SDCCH blocking exceeded threshold, Operational is OK": "High Utilization with Congestion without Operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, Operational is OK": "High Utilization with Congestion without Operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Operational is OK": "High Utilization with Congestion without Operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, TX issues": "High Utilization with Congestion without Operational issues",
        "Tch utilization exceeded threshold, SDCCH blocking exceeded threshold, Availability and TX issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, SDCCH blocking exceeded threshold, TX issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, Availability and TX issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, Availability issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Availability and TX issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Availability issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, TX issues": "High utilization with Congestion and operational issues",
        "Down Site": "Down Cell",
        "SDCCH blocking exceeded threshold, Operational is OK": "Congestion without Operational issues",
        "TCH blocking exceeded threshold, Operational is OK": "Congestion without Operational issues",
        "TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Operational is OK": "Congestion without Operational issues",
        "Tch utilization exceeded threshold, Operational is OK": "High utilization without Congestion and Operational issues",
        "SDCCH blocking exceeded threshold, Availability and TX issues": "Congestion with Operational issues",
        "SDCCH blocking exceeded threshold, Availability issues": "Congestion with Operational issues",
        "SDCCH blocking exceeded threshold, TX issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, Availability and TX issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, Availability issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Availability and TX issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Availability issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, TX issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, TX issues": "Congestion with Operational issues",
    }

    @staticmethod
    def combine_comments(df: pd.DataFrame, *columns: str, new_column: str) -> pd.DataFrame:
        """
        Combine comments from multiple columns into one column.

        Args:
            df: DataFrame containing comment columns
            *columns: Variable number of column names containing comments
            new_column: Name for the new combined comments column

        Returns:
            DataFrame with a new column containing combined comments
        """
        result_df = df.copy()
        # Join non-empty, non-NaN comments (NaN is truthy, so it must be
        # filtered explicitly to avoid literal "nan" in the output)
        result_df[new_column] = result_df[list(columns)].apply(
            lambda row: ", ".join(str(x) for x in row if pd.notna(x) and x),
            axis=1,
        )
        # Trim leading/trailing commas and whitespace
        result_df[new_column] = result_df[new_column].str.replace(
            r"^[,\s]+|[,\s]+$", "", regex=True
        )
        # Collapse consecutive commas into a single separator
        result_df[new_column] = result_df[new_column].str.replace(
            r",\s*,", ", ", regex=True
        )
        return result_df
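
    # Example (illustrative column names, not fixed by this module):
    #
    #   df = GsmCapacity.combine_comments(
    #       df, "availability_comment_daily", "tch_call_blocking_daily_comment",
    #       new_column="combined_comment",
    #   )
    #
    # yields e.g. "instability, TCH blocking exceeded threshold" in one column.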

    @staticmethod
    def summarize_fails_comments(comment):
        """Condense a comment string into a sorted list of `rrc_fail_*` causes."""
        if not comment or pd.isna(comment) or comment.strip() == "":
            return ""
        # Extract every `rrc_fail_xxx` cause name
        matches = re.findall(r"rrc_fail_([a-z_]+)", comment)
        if not matches:
            return ""
        # Deduplicate and sort alphabetically
        unique_sorted = sorted(set(matches))
        # Combine and append the word 'fails'
        return ", ".join(unique_sorted) + " fails"

    @staticmethod
    def kpi_naming_cleaning(df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean KPI column names by replacing special characters and standardizing format.

        Args:
            df: DataFrame with KPI column names to clean

        Returns:
            DataFrame with cleaned column names
        """
        name_df: pd.DataFrame = df.copy()
        # Replace spaces and punctuation with underscores, then collapse repeats
        name_df.columns = name_df.columns.str.replace("[ /(),-.']", "_", regex=True)
        name_df.columns = name_df.columns.str.replace("___", "_")
        name_df.columns = name_df.columns.str.replace("__", "_")
        name_df.columns = name_df.columns.str.replace("%", "perc")
        name_df.columns = name_df.columns.str.rstrip("_")
        return name_df
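
    # Example: a column named "TCH call blocking (%)" becomes
    # "TCH_call_blocking_perc".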

    @staticmethod
    def create_daily_date(df: pd.DataFrame) -> pd.DataFrame:
        """
        Create a daily date column from PERIOD_START_TIME and drop unnecessary columns.

        Args:
            df: DataFrame containing a PERIOD_START_TIME column ("month.day.year")

        Returns:
            DataFrame with a new `date` column and the helper columns removed
        """
        date_df: pd.DataFrame = df.copy()
        # PERIOD_START_TIME is "month.day.year"; rebuild it as "year-month-day"
        date_df[["mois", "jour", "annee"]] = date_df["PERIOD_START_TIME"].str.split(
            ".", expand=True
        )
        date_df["date"] = date_df["annee"] + "-" + date_df["mois"] + "-" + date_df["jour"]
        # Remove the intermediate columns
        date_df = date_df.drop(["annee", "mois", "jour", "PERIOD_START_TIME"], axis=1)
        return date_df
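
    # Example: PERIOD_START_TIME "07.15.2024" -> date "2024-07-15"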

    @staticmethod
    def create_hourly_date(df: pd.DataFrame) -> pd.DataFrame:
        """Create `datetime` and `date` columns from an hourly PERIOD_START_TIME."""
        date_df: pd.DataFrame = df.copy()
        # PERIOD_START_TIME is "month.day.year hour"; split date and hour apart
        date_df[["date_t", "hour"]] = date_df["PERIOD_START_TIME"].str.split(
            " ", expand=True
        )
        date_df[["mois", "jour", "annee"]] = date_df["date_t"].str.split(".", expand=True)
        date_df["datetime"] = (
            date_df["annee"]
            + "-"
            + date_df["mois"]
            + "-"
            + date_df["jour"]
            + " "
            + date_df["hour"]
        )
        date_df["date"] = date_df["annee"] + "-" + date_df["mois"] + "-" + date_df["jour"]
        # Remove the intermediate columns
        date_df = date_df.drop(
            ["annee", "mois", "jour", "date_t", "PERIOD_START_TIME"], axis=1
        )
        return date_df
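
    # Example: PERIOD_START_TIME "07.15.2024 13:00" ->
    #   datetime "2024-07-15 13:00", date "2024-07-15"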

    @staticmethod
    def create_dfs_per_kpi(
        df: pd.DataFrame = None,
        pivot_date_column: str = "date",
        pivot_name_column: str = "BTS_name",
        kpi_columns_from: int = None,
    ) -> dict:
        """
        Create one pivoted DataFrame (cell x date) per KPI column.

        Args:
            df: DataFrame containing KPI data
            pivot_date_column: Column to use as the pivot's columns (dates)
            pivot_name_column: Column to use as the pivot's index (cell names)
            kpi_columns_from: Index of the first KPI column; all later columns are KPIs

        Returns:
            Dict mapping each KPI name to its pivoted DataFrame
        """
        kpi_columns = df.columns[kpi_columns_from:]
        pivoted_kpi_dfs = {}
        # Loop through each KPI and create a pivoted DataFrame
        for kpi in kpi_columns:
            temp_df = df[[pivot_date_column, pivot_name_column, kpi]].copy()
            # Remove duplicates so the pivot has one value per (cell, date)
            temp_df = temp_df.drop_duplicates(
                subset=[pivot_name_column, pivot_date_column], keep="first"
            )
            temp_df = temp_df.reset_index()
            # Pivot: rows = cells, columns = dates, values = the KPI
            pivot_df = temp_df.pivot(
                index=pivot_name_column, columns=pivot_date_column, values=kpi
            )
            pivot_df.columns = pd.MultiIndex.from_product([[kpi], pivot_df.columns])
            pivot_df.columns.names = ["KPI", "Date"]
            # Store in dictionary with KPI name as key
            pivoted_kpi_dfs[kpi] = pivot_df
        return pivoted_kpi_dfs
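
    # Example (illustrative): with columns [date, BTS_name, kpi_a, kpi_b],
    #
    #   dfs = GsmCapacity.create_dfs_per_kpi(df, kpi_columns_from=2)
    #   dfs["kpi_a"]  # one row per BTS_name, one column per date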

    @staticmethod
    def cell_availability_analysis(
        df: pd.DataFrame,
        days: int = 7,
        availability_threshold: int = 95,
        analysis_type: str = "daily",
    ) -> pd.DataFrame:
        """
        Analyze cell availability and categorize sites based on availability metrics.

        Args:
            df: DataFrame containing cell availability data (one column per day)
            days: Number of trailing days to analyze
            availability_threshold: Availability (%) at or below which a day counts as degraded
            analysis_type: Label (e.g. "daily") appended to result column names

        Returns:
            DataFrame with availability analysis and site status comments
        """
        result_df: pd.DataFrame = df.copy().fillna(0)
        last_days_df: pd.DataFrame = result_df.iloc[:, -days:]
        result_df[f"Average_cell_availability_{analysis_type.lower()}"] = last_days_df.mean(
            axis=1
        ).round(2)
        # Count the number of days at or below the threshold
        result_df[
            f"number_of_days_exceeding_availability_threshold_{analysis_type.lower()}"
        ] = last_days_df.apply(
            lambda row: sum(1 for x in row if x <= availability_threshold), axis=1
        )

        # Categorize sites based on average availability
        def categorize_availability(x: float) -> str:
            if x == 0 or pd.isnull(x):
                return "Down Site"
            elif 0 < x <= 70:
                return "critical instability"
            elif 70 < x <= availability_threshold:
                return "instability"
            else:
                return "Availability OK"

        result_df[f"availability_comment_{analysis_type.lower()}"] = result_df[
            f"Average_cell_availability_{analysis_type.lower()}"
        ].apply(categorize_availability)
        return result_df
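
    # Example: a cell averaging 82% availability over the window is tagged
    # "instability"; one at 0% across the whole window is tagged "Down Site".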

    @staticmethod
    def analyze_tch_abis_fails(
        df: pd.DataFrame,
        number_of_kpi_days: int,
        analysis_type: str,
        number_of_threshold_days: int,
        tch_abis_fails_threshold: int,
    ) -> pd.DataFrame:
        """Compute average/max TCH Abis fails and flag cells exceeding the threshold."""
        result_df: pd.DataFrame = df.copy()
        last_days_df: pd.DataFrame = result_df.iloc[:, -number_of_kpi_days:]
        # last_days_df = last_days_df.fillna(0)
        result_df[f"avg_tch_abis_fail_{analysis_type.lower()}"] = last_days_df.mean(
            axis=1
        ).round(2)
        result_df[f"max_tch_abis_fail_{analysis_type.lower()}"] = last_days_df.max(axis=1)
        # Count the number of days at or above the threshold
        result_df[f"number_of_days_with_tch_abis_fail_exceeded_{analysis_type.lower()}"] = (
            last_days_df.apply(
                lambda row: sum(1 for x in row if x >= tch_abis_fails_threshold), axis=1
            )
        )
        # Comment the cell when enough days exceeded the threshold, otherwise None
        result_df[f"tch_abis_fail_{analysis_type.lower()}_comment"] = np.where(
            result_df[f"number_of_days_with_tch_abis_fail_exceeded_{analysis_type.lower()}"]
            >= number_of_threshold_days,
            "tch abis fail exceeded threshold",
            None,
        )
        return result_df

    @staticmethod
    def analyze_tch_call_blocking(
        df: pd.DataFrame,
        number_of_kpi_days: int,
        analysis_type: str,
        number_of_threshold_days: int,
        tch_blocking_threshold: int,
    ) -> pd.DataFrame:
        """Compute average/max TCH call blocking and flag cells exceeding the threshold."""
        result_df = df.copy()
        last_days_df: pd.DataFrame = result_df.iloc[:, -number_of_kpi_days:]
        # last_days_df = last_days_df.fillna(0)
        result_df[f"avg_tch_call_blocking_{analysis_type.lower()}"] = last_days_df.mean(
            axis=1
        ).round(2)
        result_df[f"max_tch_call_blocking_{analysis_type.lower()}"] = last_days_df.max(
            axis=1
        )
        # Count the number of days at or above the threshold
        result_df[f"number_of_days_with_tch_blocking_exceeded_{analysis_type.lower()}"] = (
            last_days_df.apply(
                lambda row: sum(1 for x in row if x >= tch_blocking_threshold), axis=1
            )
        )
        # Comment the cell when enough days exceeded the threshold, otherwise None
        result_df[f"tch_call_blocking_{analysis_type.lower()}_comment"] = np.where(
            result_df[f"number_of_days_with_tch_blocking_exceeded_{analysis_type.lower()}"]
            >= number_of_threshold_days,
            "TCH blocking exceeded threshold",
            None,
        )
        return result_df

    @staticmethod
    def analyze_sdcch_call_blocking(
        df: pd.DataFrame,
        number_of_kpi_days: int,
        sdcch_blocking_threshold: int,
        analysis_type: str,
        number_of_threshold_days: int,
    ) -> pd.DataFrame:
        """Compute average/max SDCCH blocking and flag cells exceeding the threshold."""
        result_df = df.copy()
        last_days_df: pd.DataFrame = result_df.iloc[:, -number_of_kpi_days:]
        # last_days_df = last_days_df.fillna(0)
        result_df[f"avg_sdcch_real_blocking_{analysis_type.lower()}"] = last_days_df.mean(
            axis=1
        ).round(2)
        result_df[f"max_sdcch_real_blocking_{analysis_type.lower()}"] = last_days_df.max(
            axis=1
        )
        # Count the number of days at or above the threshold
        result_df[
            f"number_of_days_with_sdcch_blocking_exceeded_{analysis_type.lower()}"
        ] = last_days_df.apply(
            lambda row: sum(1 for x in row if x >= sdcch_blocking_threshold), axis=1
        )
        # Comment the cell when enough days exceeded the threshold, otherwise None
        result_df[f"sdcch_real_blocking_{analysis_type.lower()}_comment"] = np.where(
            result_df[
                f"number_of_days_with_sdcch_blocking_exceeded_{analysis_type.lower()}"
            ]
            >= number_of_threshold_days,
            "SDCCH blocking exceeded threshold",
            None,
        )
        return result_df
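
    # Example (illustrative thresholds): flag cells where SDCCH blocking was
    # >= 2% on at least 3 of the last 7 days. The analyze_tch_* helpers above
    # follow the same pattern:
    #
    #   out = GsmCapacity.analyze_sdcch_call_blocking(
    #       pivoted_sdcch_df,
    #       number_of_kpi_days=7,
    #       sdcch_blocking_threshold=2,
    #       analysis_type="daily",
    #       number_of_threshold_days=3,
    #   )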


class LteCapacity:
    """LTE capacity analysis: PRB usage, fail KPIs and LCG utilization checks."""

    final_results = None

    # Maps a cell's current band combination to the suggested next step
    next_band_mapping = {
        "L1800": "L800",
        "L800": "L1800",
        "L1800/L800": "L2600",
        "L1800/L2300/L800": "L2600",
        "L2300/L800": "L2600",
        "L1800/L2600/L800": "New site/Dual Beam",
        "L1800/L2300/L2600/L800": "New site/Dual Beam",
        "L2300": "FDD H// colocated site",
    }
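
    # Example: next_band_mapping.get("L1800/L800") -> "L2600"; combinations not
    # listed (e.g. "L2600" alone) return None with .get().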

    @staticmethod
    def analyze_prb_usage(
        df: pd.DataFrame,
        number_of_kpi_days: int,
        prb_usage_threshold: int,
        analysis_type: str,
        number_of_threshold_days: int,
        suffix: str = "",
    ) -> pd.DataFrame:
        """Compute average/max PRB usage and flag cells exceeding the threshold."""
        result_df = df.copy()
        last_days_df: pd.DataFrame = result_df.iloc[:, -number_of_kpi_days:]
        # last_days_df = last_days_df.fillna(0)
        result_df[f"avg_prb_usage_{analysis_type.lower()}{suffix}"] = last_days_df.mean(
            axis=1
        ).round(2)
        result_df[f"max_prb_usage_{analysis_type.lower()}{suffix}"] = last_days_df.max(
            axis=1
        )
        # Count the number of days at or above the threshold
        result_df[
            f"number_of_days_with_prb_usage_exceeded_{analysis_type.lower()}{suffix}"
        ] = last_days_df.apply(
            lambda row: sum(1 for x in row if x >= prb_usage_threshold), axis=1
        )
        # Comment the cell when enough days exceeded the threshold, otherwise None
        result_df[f"prb_usage_{analysis_type.lower()}{suffix}_comment"] = np.where(
            result_df[
                f"number_of_days_with_prb_usage_exceeded_{analysis_type.lower()}{suffix}"
            ]
            >= number_of_threshold_days,
            "PRB usage exceeded threshold",
            None,
        )
        return result_df
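
    # Example (illustrative thresholds): flag cells with downlink PRB usage
    # >= 80% on at least 3 of the last 7 days; `suffix` keeps per-band or
    # per-link results apart:
    #
    #   out = LteCapacity.analyze_prb_usage(
    #       prb_dl_df, number_of_kpi_days=7, prb_usage_threshold=80,
    #       analysis_type="daily", number_of_threshold_days=3, suffix="_dl",
    #   )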

    @staticmethod
    def analyze_fails_kpi(
        df: pd.DataFrame,
        number_of_kpi_days: int,
        number_of_threshold_days: int,
        kpi_threshold: int,
        kpi_column_name: str,
    ) -> pd.DataFrame:
        """Generic fail-KPI check: average/max plus a threshold-exceeded comment."""
        result_df: pd.DataFrame = df.copy()
        last_days_df: pd.DataFrame = result_df.iloc[:, -number_of_kpi_days:]
        # last_days_df = last_days_df.fillna(0)
        result_df[f"avg_{kpi_column_name}"] = last_days_df.mean(axis=1).round(2)
        result_df[f"max_{kpi_column_name}"] = last_days_df.max(axis=1)
        # Count the number of days at or above the threshold
        result_df[f"number_of_days_with_{kpi_column_name}_exceeded"] = last_days_df.apply(
            lambda row: sum(1 for x in row if x >= kpi_threshold), axis=1
        )
        # Comment the cell when enough days exceeded the threshold, otherwise None
        result_df[f"{kpi_column_name}_comment"] = np.where(
            result_df[f"number_of_days_with_{kpi_column_name}_exceeded"]
            >= number_of_threshold_days,
            f"{kpi_column_name} exceeded threshold",
            None,
        )
        return result_df

    @staticmethod
    def analyze_lcg_utilization(
        df: pd.DataFrame,
        number_of_kpi_days: int,
        number_of_threshold_days: int,
        kpi_threshold: int,
        kpi_column_name: str,
    ) -> pd.DataFrame:
        """LCG utilization check; same logic as analyze_fails_kpi, applied per LCG."""
        result_df: pd.DataFrame = df.copy()
        last_days_df: pd.DataFrame = result_df.iloc[:, -number_of_kpi_days:]
        # last_days_df = last_days_df.fillna(0)
        result_df[f"avg_{kpi_column_name}"] = last_days_df.mean(axis=1).round(2)
        result_df[f"max_{kpi_column_name}"] = last_days_df.max(axis=1)
        # Count the number of days at or above the threshold
        result_df[f"number_of_days_with_{kpi_column_name}_exceeded"] = last_days_df.apply(
            lambda row: sum(1 for x in row if x >= kpi_threshold), axis=1
        )
        # Comment the LCG when enough days exceeded the threshold, otherwise None
        result_df[f"{kpi_column_name}_comment"] = np.where(
            result_df[f"number_of_days_with_{kpi_column_name}_exceeded"]
            >= number_of_threshold_days,
            f"{kpi_column_name} exceeded threshold",
            None,
        )
        return result_df
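
    # Example (illustrative names and thresholds): flag LCGs whose utilization
    # was >= 70% on at least 3 of the last 7 days; `lcg_utilization` is an
    # assumed KPI column label, not one defined by this module:
    #
    #   out = LteCapacity.analyze_lcg_utilization(
    #       lcg_util_df, number_of_kpi_days=7, number_of_threshold_days=3,
    #       kpi_threshold=70, kpi_column_name="lcg_utilization",
    #   )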