# db_query/process_kpi/process_lcg_capacity.py
# History: commit 4d0848d (DavMelchi) -- "Add LCG analysis Part1"
import logging

import numpy as np
import pandas as pd

from utils.kpi_analysis_utils import (
    analyze_lcg_utilization,
    combine_comments,
    create_daily_date,
    create_dfs_per_kpi,
    kpi_naming_cleaning,
)
from utils.utils_vars import get_physical_db
# Maps the combined "final_comments" lookup key -- the per-LCG threshold
# comments joined with the number of available LCGs (built by
# combine_comments in lcg_kpi_analysis) -- to a human-readable capacity
# verdict. Keys not present in this mapping pass through unchanged.
# FIX: corrected the misspelled verdict "upgrage" -> "upgrade".
lcg_comments_mapping = {
    "2": "No Congestion",
    "1": "No Congestion",
    "lcg1 exceeded threshold, lcg2 exceeded threshold, 2": "Need BB SU upgrade",
    "lcg1 exceeded threshold, 2": "Need LCG balancing",
    "lcg1 exceeded threshold, 1": "Need BB SU upgrade",
    "lcg2 exceeded threshold, 2": "Need LCG balancing",
}
# Columns required from the raw KPI export; validated and selected in
# load_and_process_lcg_data before the analysis runs.
KPI_COLUMNS = ["date", "WBTS_name", "lcg_id", "BB_SU_LCG_MAX_R"]

# Per-LCG metric column name templates ({n} is the LCG number).
_PER_LCG_COLUMNS = [
    "lcg{n}_utilisation",
    "avg_lcg{n}",
    "max_lcg{n}",
    "number_of_days_with_lcg{n}_exceeded",
    "lcg{n}_comment",
]

# Final column order of the combined analysis frame built by lcg_kpi_analysis:
# site name, LCG1 metrics, LCG2 metrics, then the cross-LCG summary columns.
LCG_ANALYSIS_COLUMNS = (
    ["WBTS_name"]
    + [tpl.format(n=1) for tpl in _PER_LCG_COLUMNS]
    + [tpl.format(n=2) for tpl in _PER_LCG_COLUMNS]
    + [
        "difference_between_lcgs",
        "difference_between_lcgs_comment",
        "lcg_comment",
        "number_of_lcg",
        "final_comments",
    ]
)
def lcg_kpi_analysis(
    df,
    num_last_days,
    num_threshold_days,
    lcg_utilization_threshold,
    difference_between_lcgs,
) -> list[pd.DataFrame]:
    """
    Analyze LCG capacity data.

    Args:
        df: DataFrame containing LCG capacity data (the KPI_COLUMNS layout:
            date, WBTS_name, lcg_id, BB_SU_LCG_MAX_R)
        num_last_days: Number of days for analysis
        num_threshold_days: Minimum days above threshold to flag for upgrade
        lcg_utilization_threshold: Utilization threshold percentage for flagging
        difference_between_lcgs: Difference between LCGs for flagging

    Returns:
        A two-element list ``[lcg_analysis_df, kpi_df]``:
        - ``lcg_analysis_df``: per-site summary with code/Region columns and
          Longitude/Latitude merged in from the physical database
        - ``kpi_df``: combined per-site KPI frame restricted to
          LCG_ANALYSIS_COLUMNS
    """
    # Split rows by logical cell group so each LCG is pivoted and analyzed
    # separately.
    lcg1_df = df[df["lcg_id"] == 1]
    lcg2_df = df[df["lcg_id"] == 2]
    # create_dfs_per_kpi returns a per-KPI collection indexed by KPI name
    # (see the "BB_SU_LCG_MAX_R" lookups below); it pivots date x WBTS_name.
    pivoted_kpi_dfs = create_dfs_per_kpi(
        df=df,
        pivot_date_column="date",
        pivot_name_column="WBTS_name",
        kpi_columns_from=2,
    )
    pivoted_lcg1_df = create_dfs_per_kpi(
        df=lcg1_df,
        pivot_date_column="date",
        pivot_name_column="WBTS_name",
        kpi_columns_from=2,
    )
    pivoted_lcg2_df = create_dfs_per_kpi(
        df=lcg2_df,
        pivot_date_column="date",
        pivot_name_column="WBTS_name",
        kpi_columns_from=2,
    )
    # BB_SU_LCG_MAX_R to have all site with LCG 1 and/ or LCG 2
    BB_SU_LCG_MAX_R_df = pivoted_kpi_dfs["BB_SU_LCG_MAX_R"]
    pivoted_lcg1_df = pivoted_lcg1_df["BB_SU_LCG_MAX_R"]
    pivoted_lcg2_df = pivoted_lcg2_df["BB_SU_LCG_MAX_R"]
    # rename column so the two per-LCG frames do not collide after concat
    pivoted_lcg1_df = pivoted_lcg1_df.rename(
        columns={"BB_SU_LCG_MAX_R": "lcg1_utilisation"}
    )
    pivoted_lcg2_df = pivoted_lcg2_df.rename(
        columns={"BB_SU_LCG_MAX_R": "lcg2_utilisation"}
    )
    # analyze lcg utilization for each site per number_of_kpi_days and number_of_threshold_days
    # (the helper produces the avg_/max_/number_of_days_with_*_exceeded/
    # *_comment columns consumed below -- see LCG_ANALYSIS_COLUMNS)
    pivoted_lcg1_df = analyze_lcg_utilization(
        df=pivoted_lcg1_df,
        number_of_kpi_days=num_last_days,
        number_of_threshold_days=num_threshold_days,
        kpi_threshold=lcg_utilization_threshold,
        kpi_column_name="lcg1",
    )
    pivoted_lcg2_df = analyze_lcg_utilization(
        df=pivoted_lcg2_df,
        number_of_kpi_days=num_last_days,
        number_of_threshold_days=num_threshold_days,
        kpi_threshold=lcg_utilization_threshold,
        kpi_column_name="lcg2",
    )
    # Align the three frames side by side on their shared (site) index.
    kpi_df = pd.concat(
        [
            BB_SU_LCG_MAX_R_df,
            pivoted_lcg1_df,
            pivoted_lcg2_df,
        ],
        axis=1,
    )
    kpi_df = kpi_df.reset_index()
    # Number of available lcgs
    # kpi_df = pd.merge(kpi_df, available_lcgs_df, on="WBTS_name", how="left")
    # calculate difference between lcg1 and lcg2 (absolute gap between the
    # per-LCG average utilizations)
    kpi_df["difference_between_lcgs"] = kpi_df[["avg_lcg1", "avg_lcg2"]].apply(
        lambda row: max(row) - min(row), axis=1
    )
    # flag if difference between lcg1 and lcg2 is above threshold
    kpi_df["difference_between_lcgs_comment"] = np.where(
        kpi_df["difference_between_lcgs"] > difference_between_lcgs,
        "difference between lcgs exceeded threshold",
        None,
    )
    # Combine comments
    kpi_df = combine_comments(
        kpi_df,
        "lcg1_comment",
        "lcg2_comment",
        # "difference_between_lcgs_comment",
        new_column="lcg_comment",
    )
    # Replace if "lcg_comment" contains "nan" and ", nan" and "nan, " with None
    kpi_df["lcg_comment"] = kpi_df["lcg_comment"].replace("nan", None)
    # Remove "nan" from comma-separated strings
    kpi_df["lcg_comment"] = (
        kpi_df["lcg_comment"].str.replace(r"\bnan\b,?\s?", "", regex=True).str.strip()
    )
    # 2 if both LCGs reported data, 1 if exactly one did, else 0.
    kpi_df["number_of_lcg"] = np.where(
        kpi_df["avg_lcg1"].notna() & kpi_df["avg_lcg2"].notna(),
        2,
        np.where(kpi_df["avg_lcg1"].notna() | kpi_df["avg_lcg2"].notna(), 1, 0),
    )
    # Combine comments with the LCG count; this builds the lookup key format
    # used by lcg_comments_mapping (e.g. "lcg1 exceeded threshold, 2").
    kpi_df = combine_comments(
        kpi_df,
        "lcg_comment",
        "number_of_lcg",
        new_column="final_comments",
    )
    # Map known comment combinations to readable verdicts; unknown keys are
    # kept unchanged (.get(x, x)).
    kpi_df["final_comments"] = kpi_df["final_comments"].apply(
        lambda x: lcg_comments_mapping.get(x, x)
    )
    kpi_df = kpi_df[LCG_ANALYSIS_COLUMNS]
    lcg_analysis_df = kpi_df.copy()
    # Narrow to the report columns (drops the per-day utilisation series).
    lcg_analysis_df = lcg_analysis_df[
        [
            "WBTS_name",
            "avg_lcg1",
            "max_lcg1",
            "number_of_days_with_lcg1_exceeded",
            "lcg1_comment",
            "avg_lcg2",
            "max_lcg2",
            "number_of_days_with_lcg2_exceeded",
            "lcg2_comment",
            "difference_between_lcgs",
            "final_comments",
        ]
    ]
    # NOTE(review): droplevel implies the columns are a MultiIndex here,
    # presumably produced by the pivot helper -- confirm level 1 is redundant.
    lcg_analysis_df = lcg_analysis_df.droplevel(level=1, axis=1)
    # Remove row if code less than 5 characters
    lcg_analysis_df = lcg_analysis_df[lcg_analysis_df["WBTS_name"].str.len() >= 5]
    # Add code: the numeric site code is the "_"-prefix of WBTS_name
    lcg_analysis_df["code"] = lcg_analysis_df["WBTS_name"].str.split("_").str[0]
    lcg_analysis_df["code"] = (
        pd.to_numeric(lcg_analysis_df["code"], errors="coerce").fillna(0).astype(int)
    )
    # Region is the second "_"-separated token of WBTS_name.
    lcg_analysis_df["Region"] = (
        lcg_analysis_df["WBTS_name"].str.split("_").str[1:2].str.join("_")
    )
    lcg_analysis_df["Region"] = lcg_analysis_df["Region"].fillna("UNKNOWN")
    # move code to the first column
    lcg_analysis_df = lcg_analysis_df[
        ["code", "Region"]
        + [col for col in lcg_analysis_df if col != "code" and col != "Region"]
    ]
    # Load physical database
    physical_db: pd.DataFrame = get_physical_db()
    # Convert code_sector to code
    physical_db["code"] = physical_db["Code_Sector"].str.split("_").str[0]
    # remove duplicates
    physical_db = physical_db.drop_duplicates(subset="code")
    # keep only code and longitude and latitude
    physical_db = physical_db[["code", "Longitude", "Latitude"]]
    physical_db["code"] = (
        pd.to_numeric(physical_db["code"], errors="coerce").fillna(0).astype(int)
    )
    # Attach site coordinates by numeric code.
    lcg_analysis_df = pd.merge(
        lcg_analysis_df,
        physical_db,
        on="code",
        how="left",
    )
    return [lcg_analysis_df, kpi_df]
def load_and_process_lcg_data(
    uploaded_file,
    num_last_days,
    num_threshold_days,
    lcg_utilization_threshold,
    difference_between_lcgs,
) -> list[pd.DataFrame]:
    """Load a semicolon-delimited KPI export and run the LCG capacity analysis.

    Args:
        uploaded_file: Path or file-like object accepted by ``pd.read_csv``.
        num_last_days: Number of days for analysis.
        num_threshold_days: Minimum days above threshold to flag for upgrade.
        lcg_utilization_threshold: Utilization threshold percentage for flagging.
        difference_between_lcgs: Difference between LCGs for flagging.

    Returns:
        The ``[lcg_analysis_df, kpi_df]`` list produced by ``lcg_kpi_analysis``.

    Raises:
        ValueError: If the uploaded file is empty or required KPI_COLUMNS
            are missing after cleaning.
    """
    try:
        # Load data (KPI exports use ";" as the field separator)
        df = pd.read_csv(uploaded_file, delimiter=";")
        if df.empty:
            raise ValueError("Uploaded file is empty")
        df = kpi_naming_cleaning(df)
        df = create_daily_date(df)
        # Validate required columns before selecting them
        missing_cols = [col for col in KPI_COLUMNS if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Missing required columns: {', '.join(missing_cols)}")
        df = df[KPI_COLUMNS]
        # Process the data
        return lcg_kpi_analysis(
            df,
            num_last_days,
            num_threshold_days,
            lcg_utilization_threshold,
            difference_between_lcgs,
        )
    except Exception:
        # BUG FIX: the original called st.error(), but streamlit is never
        # imported in this module, so the error path itself raised NameError
        # and masked the real exception. Log with traceback and re-raise the
        # original exception unchanged for callers to handle.
        logging.getLogger(__name__).exception("Error processing LCG data")
        raise