# db_query/process_kpi/process_wbts_capacity.py
import pandas as pd
from utils.kpi_analysis_utils import (
cell_availability_analysis,
combine_comments,
create_daily_date,
create_dfs_per_kpi,
kpi_naming_cleaning,
)
class WbtsCapacity:
final_results: pd.DataFrame = None
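# final_results is not assigned anywhere in this module; presumably the caller stores the
# output of load_data() here, though that usage is an assumption based on the name only.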
def check_deviation(row: pd.Series, max_diff: float = 3.0, type: str = "") -> str:
"""
Check if any value in the row deviates more than max_diff from the most common value.
Args:
row: Series of values to check for deviation
max_diff: Maximum allowed difference from the most common value
type: Type identifier for the deviation message
Returns:
A message indicating deviation if found, otherwise an empty string
"""
numeric_row = row.astype(float) # Ensure numeric
mode_series = numeric_row.mode()
# Safe fallback in case mode is empty
most_common = mode_series.iloc[0] if not mode_series.empty else numeric_row.iloc[0]
diffs = abs(numeric_row - most_common)
if (diffs > max_diff).any():
return f"{type} Deviation > {max_diff} detected"
else:
return ""
def max_used_bb_subunits_analysis(
df: pd.DataFrame,
days: int = 7,
threshold: int = 80,
number_of_threshold_days: int = 3,
) -> pd.DataFrame:
"""
Analyze maximum used baseband subunits and identify sites needing upgrades.
Args:
df: DataFrame containing baseband utilization data
days: Number of days to analyze
threshold: Utilization threshold percentage for flagging
number_of_threshold_days: Minimum days above threshold to flag for upgrade
Returns:
DataFrame with analysis results and upgrade recommendations
"""
result_df = df.copy()
last_days_df = result_df.iloc[:, -days:]
last_days_df = last_days_df.fillna(0)
result_df["Average_used_bb_ratio"] = last_days_df.mean(axis=1).round(2)
# Count the number of days above threshold
result_df["bb_number_of_days_exceeding_threshold"] = last_days_df.apply(
lambda row: sum(1 for x in row if x >= threshold), axis=1
)
# Initialize comment column
result_df["Average_used_bb_ratio_comment"] = ""
# Apply condition for upgrade recommendation
result_df.loc[
(result_df["bb_number_of_days_exceeding_threshold"] >= number_of_threshold_days)
& (result_df["Average_used_bb_ratio"] >= threshold),
"Average_used_bb_ratio_comment",
] = "need BB upgrade"
return result_df
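# Illustrative example for max_used_bb_subunits_analysis above: with the defaults
# (days=7, threshold=80, number_of_threshold_days=3), a WBTS whose last seven daily
# Max_BB_SUs_Util_ratio values are [60, 85, 90, 82, 70, 88, 75] has four days at or
# above 80 but an average of 78.57, so it is not flagged; both the day count and the
# average must reach the threshold before "need BB upgrade" is written.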
def max_used_ce_analysis(
df: pd.DataFrame,
days: int = 7,
threshold: int = 80,
number_of_threshold_days: int = 3,
) -> pd.DataFrame:
"""
Analyze maximum used channel elements and identify sites needing upgrades.
Args:
df: DataFrame containing channel element utilization data
days: Number of days to analyze
threshold: Utilization threshold percentage for flagging
number_of_threshold_days: Minimum days above threshold to flag for upgrade
Returns:
DataFrame with analysis results and upgrade recommendations
"""
result_df = df.copy().fillna(0)
last_days_df = result_df.iloc[:, -days:]
result_df["Average_used_ce_ratio"] = last_days_df.mean(axis=1).round(2)
# Count the number of days above threshold
result_df["ce_number_of_days_exceeding_threshold"] = last_days_df.apply(
lambda row: sum(1 for x in row if x >= threshold), axis=1
)
# Initialize comment column
result_df["Average_used_ce_ratio_comment"] = ""
# Apply condition for upgrade recommendation
result_df.loc[
(result_df["ce_number_of_days_exceeding_threshold"] >= number_of_threshold_days)
& (result_df["Average_used_ce_ratio"] >= threshold),
"Average_used_ce_ratio_comment",
] = "need CE upgrade"
return result_df
def num_bb_subunits_analysis(df: pd.DataFrame, days: int = 3) -> pd.DataFrame:
"""
Analyze baseband subunit count for deviations.
Args:
df: DataFrame containing baseband subunit count data
days: Number of days to analyze
Returns:
DataFrame with deviation analysis comments
"""
result_df = df.copy()
last_days_df = result_df.iloc[:, -days:]
result_df["num_bb_subunits_comment"] = last_days_df.apply(
lambda row: check_deviation(row, type="bb"), axis=1
)
return result_df
def avail_ce_analysis(df: pd.DataFrame, days: int = 7) -> pd.DataFrame:
"""
Analyze available channel elements for deviations.
Args:
df: DataFrame containing available channel element data
days: Number of days to analyze
Returns:
DataFrame with deviation analysis comments
"""
result_df = df.copy()
last_days_df = result_df.iloc[:, -days:]
result_df["avail_ce_comment"] = last_days_df.apply(
lambda row: check_deviation(row, max_diff=96, type="ce"), axis=1
)
return result_df
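# Illustrative example for avail_ce_analysis above: with max_diff=96, a row of
# [384, 384, 384, 480, 480, 480, 480] is not flagged (the mode is 480 and no value
# differs from it by more than 96), while a row of six 480s and one 288 yields
# "ce Deviation > 96 detected". The value 96 presumably corresponds to one CE
# licence/capacity step, but that rationale is an assumption, not stated in the code.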
def bb_comments_analysis(df: pd.DataFrame) -> pd.DataFrame:
"""
Combine baseband related comments into a single column.
Args:
df: DataFrame containing baseband comment columns
Returns:
DataFrame with combined baseband comments
"""
return combine_comments(
df,
"num_bb_subunits_comment",
"Average_used_bb_ratio_comment",
"availability_comment_daily",
new_column="bb_comments",
)
def ce_comments_analysis(df: pd.DataFrame) -> pd.DataFrame:
"""
Combine channel element related comments into a single column.
Args:
df: DataFrame containing channel element comment columns
Returns:
DataFrame with combined channel element comments
"""
return combine_comments(
df,
"avail_ce_comment",
"Average_used_ce_ratio_comment",
"availability_comment_daily",
new_column="ce_comments",
)
def wbts_kpi_analysis(
df: pd.DataFrame,
num_days: int = 7,
threshold: int = 80,
number_of_threshold_days: int = 3,
) -> pd.DataFrame:
"""
Create pivoted DataFrames for each KPI and perform analysis.
Args:
df: DataFrame containing KPI data
num_days: Number of days to analyze
threshold: Utilization threshold percentage for flagging
number_of_threshold_days: Minimum days above threshold to flag for upgrade
Returns:
DataFrame with combined analysis results
"""
# kpi_columns = df.columns[5:]
pivoted_kpi_dfs = create_dfs_per_kpi(
df=df, pivot_date_column="date", pivot_name_column="DN", kpi_columns_from=5
)
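# Each entry of pivoted_kpi_dfs is expected to be one KPI pivoted per WBTS: rows indexed
# by DN, one column per day (inferred from the .iloc[:, -days:] slicing in the analysis
# helpers above, which treats the right-most columns as the most recent days).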
# Extract individual KPI DataFrames
wbts_name_df = pivoted_kpi_dfs["WBTS_name"].iloc[:, 0]
licensed_ce_df = pivoted_kpi_dfs["LICENSED_R99CE_WBTS_M5008C48"]
max_used_ce_dl_df = pivoted_kpi_dfs["MAX_USED_CE_R99_DL_M5008C12"]
max_used_ce_ul_df = pivoted_kpi_dfs["MAX_USED_CE_R99_UL_M5008C15"]
max_avail_ce_df = pivoted_kpi_dfs["MAX_AVAIL_R99_CE_M5006C0"]
max_used_bb_subunits_df = pivoted_kpi_dfs["MAX_USED_BB_SUBUNITS_M5008C38"]
num_bb_subunits_df = pivoted_kpi_dfs["NUM_BB_SUBUNITS_M5008C39"]
max_bb_sus_util_ratio_df = pivoted_kpi_dfs["Max_BB_SUs_Util_ratio"]
cell_availability_df = pivoted_kpi_dfs[
"Cell_Availability_excluding_blocked_by_user_state_BLU"
]
total_cs_traffic_df = pivoted_kpi_dfs["Total_CS_traffic_Erl"]
total_data_traffic_df = pivoted_kpi_dfs["Total_Data_Traffic"]
max_used_ce_ratio_flexi_df = pivoted_kpi_dfs["Max_Used_CE_s_ratio_Flexi_R2"]
# Perform analysis on each KPI DataFrame
max_bb_sus_util_ratio_df = max_used_bb_subunits_analysis(
max_bb_sus_util_ratio_df, num_days, threshold, number_of_threshold_days
)
cell_availability_df = cell_availability_analysis(cell_availability_df, num_days)
max_used_ce_ratio_flexi_df = max_used_ce_analysis(
max_used_ce_ratio_flexi_df, num_days, threshold, number_of_threshold_days
)
num_bb_subunits_df = num_bb_subunits_analysis(num_bb_subunits_df, num_days)
licensed_ce_df = avail_ce_analysis(licensed_ce_df, num_days)
# Concatenate all DataFrames
result_df = pd.concat(
[
wbts_name_df,
licensed_ce_df,
max_used_ce_dl_df,
max_used_ce_ul_df,
max_avail_ce_df,
max_used_bb_subunits_df,
num_bb_subunits_df,
max_bb_sus_util_ratio_df,
cell_availability_df,
total_cs_traffic_df,
total_data_traffic_df,
max_used_ce_ratio_flexi_df,
],
axis=1,
)
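# The axis=1 concat relies on all per-KPI frames sharing the same pivot index (the WBTS
# DN), so the result is one wide row per WBTS with a block of daily columns per KPI plus
# the analysis/comment columns added above. The shared index is an assumption based on
# create_dfs_per_kpi being given pivot_name_column="DN".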
# Add combined comments analysis
result_df = bb_comments_analysis(result_df)
result_df = ce_comments_analysis(result_df)
return result_df
def load_data(
filepath: str,
num_days: int,
threshold: int,
number_of_threshold_days: int,
) -> pd.DataFrame:
"""
Load data from CSV file and perform preprocessing and analysis.
Args:
filepath: Path to CSV file or uploaded file object
num_days: Number of days to analyze
threshold: Utilization threshold percentage for flagging
number_of_threshold_days: Minimum days above threshold to flag for upgrade
Returns:
DataFrame with processed and analyzed data
"""
df = pd.read_csv(filepath, delimiter=";")
# Preprocess data
df = create_daily_date(df)
df = kpi_naming_cleaning(df)
# Reorder columns for better organization
df = df[["date"] + [col for col in df.columns if col not in ["date"]]]
df = df[[col for col in df.columns if col != "WBTS_name"] + ["WBTS_name"]]
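# After the two reorderings, "date" is the first column and "WBTS_name" the last.
# wbts_kpi_analysis then pivots everything from column index 5 onward
# (kpi_columns_from=5), so the first five columns are assumed to be identifier fields
# such as the date and the DN.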
# Perform KPI analysis
df = wbts_kpi_analysis(df, num_days, threshold, number_of_threshold_days)
# for col, col_index in zip(df.columns, df.columns.get_indexer(df.columns)):
# print(f"Column: {col}, Index: {col_index}")
return df
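# Minimal usage sketch (illustrative; the file name and parameter values are
# assumptions, not taken from this module):
#
#     result = load_data(
#         "wbts_capacity_report.csv",
#         num_days=7,
#         threshold=80,
#         number_of_threshold_days=3,
#     )
#     WbtsCapacity.final_results = result
#     result.to_excel("wbts_capacity_analysis.xlsx")
#
# The input CSV is expected to be semicolon-delimited, matching the read_csv call above.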