"""
normative_calculator.py - v2

Utility functions for computing z-scores and percentiles for any biomarker
contained in *Table_1_summary_measure.xlsx*.

Author: Lars Masanneck 06-05-2025
"""

from __future__ import annotations

import math
import pathlib
import warnings
from datetime import datetime
from typing import Dict, Iterable, List, Sequence, Union

import pandas as pd
from scipy import stats

###############################################################################
# Public API (re-exported in __all__)
###############################################################################
__all__ = [
    "load_normative_table",
    "compute_normative_position",
    "add_normative_columns",
    "categorize_bmi",
    "compute_skew_corrected_position",
]

###############################################################################
# Constant category mappings
###############################################################################
# BMI categories (WHO definition); intervals are half-open: lower <= bmi < upper
_BMI_BOUNDS: List[tuple[float, float, str]] = [
    (0, 18.5, "Underweight"),
    (18.5, 25, "Healthy"),
    (25, 30, "Overweight"),
    (30, math.inf, "Obesity"),
]

# Stratum key columns that every normative table must provide (kept as str).
_STR_COLS = ("Age", "area", "gender", "Bmi", "Biomarkers")
# Accepted names for the sample-size column (some table versions use "n").
_N_COLS = ("nb_category", "n")
# Summary-statistic columns that are cast to float when present.
_FLOAT_COLS = (
    "min",
    "max",
    "median",
    "q1",
    "q3",
    "iqr",
    "mad",
    "mean",
    "sd",
    "se",
    "ci",
)
# Statistics returned by `_extract_stats`, in output order.
_STAT_KEYS = ("median", "q1", "q3", "iqr", "mad", "mean", "sd", "se", "ci")


###############################################################################
# Helper functions – categories & loading
###############################################################################
def _categorize(value: float, bounds: Sequence[tuple]) -> str:
    """Return category *label* for *value* given (lower, upper, label) tuples.

    Raises
    ------
    ValueError
        If *value* falls in none of the half-open ``[lower, upper)`` intervals
        (this includes NaN, for which every comparison is false).
    """
    for lower, upper, label in bounds:
        if lower <= value < upper:
            return label
    raise ValueError(f"{value} outside defined bounds.")


def categorize_bmi(bmi: Union[str, float]) -> str:
    """Map numeric BMI to the table's BMI category strings.

    A string is assumed to already be a category and is only normalised
    (stripped, first letter capitalised); a number is bucketed with
    ``_BMI_BOUNDS``.
    """
    if isinstance(bmi, str):
        return bmi.strip().capitalize()
    return _categorize(float(bmi), _BMI_BOUNDS)


def _categorize_age(age: Union[str, int], normative_df: pd.DataFrame) -> str:
    """Return an age-group string for a numeric age, or pass through a string.

    Age-group labels are read from ``normative_df["Age"]`` and may be closed
    ranges (``"40-49"``) or open-ended (``"80+"``). Labels that cannot be
    parsed are skipped.

    Raises
    ------
    ValueError
        If the age is not a usable number or matches no age group.
    """
    if isinstance(age, str):
        return age.strip()

    # Brackets are expressed in whole years, so a fractional age such as 49.5
    # belongs to "40-49"; floor it instead of letting it fall between groups.
    try:
        age_years = math.floor(age)
    except (TypeError, ValueError, OverflowError):
        raise ValueError(f"No normative age group found for age {age!r}.") from None

    for raw_label in normative_df["Age"].unique():
        # str() guards against non-string labels (e.g. NaN) in hand-built tables
        grp = str(raw_label).strip()
        if "-" in grp:
            lo, hi = grp.split("-", 1)
            try:
                lo_i, hi_i = int(lo), int(hi)
            except ValueError:
                continue
            if lo_i <= age_years <= hi_i:
                return grp
        elif grp.endswith("+"):
            try:
                lo_i = int(grp[:-1])
            except ValueError:
                continue
            if age_years >= lo_i:
                return grp
    raise ValueError(f"No normative age group found for age {age!r}.")


def _parse_num(x: object) -> float:
    """Convert one table cell to float, undoing Excel's date auto-formatting.

    Cells that Excel turned into dates arrive as ``datetime`` objects and are
    mapped back heuristically; anything that cannot be read as a number
    becomes NaN.
    """
    if isinstance(x, datetime):
        # if year is in the future (e.g. 3183 -> original was 3183.xx),
        # treat year as integer part and month as two-digit fractional
        if x.year > datetime.now().year:
            return x.year + x.month / 100
        # otherwise (small numbers like 5.06 -> parsed as 2025-06-05),
        # use day as integer and month as two-digit fractional
        return x.day + x.month / 100
    try:
        return float(x)
    except (TypeError, ValueError, OverflowError):
        # NaN (not pd.NA) keeps the column float-typed and float()-convertible
        return float("nan")


def load_normative_table(path: Union[str, pathlib.Path]) -> pd.DataFrame:
    """Load the normative summary table from a CSV or Excel file.

    Column names are stripped of surrounding whitespace. The stratum key
    columns (and the sample-size column, ``nb_category`` or ``n``) are returned
    as strings; the summary-statistic columns that are present are returned as
    floats, with unparseable cells as NaN.

    Parameters
    ----------
    path
        Location of the table. A ``.csv`` suffix is read with
        ``pandas.read_csv``, anything else with ``pandas.read_excel``.

    Raises
    ------
    FileNotFoundError
        If *path* does not exist.
    KeyError
        If one of the stratum key columns is missing from the table.
    """
    path = pathlib.Path(path)
    if not path.exists():
        raise FileNotFoundError(path)

    # Converters see the raw cell, which is what lets _parse_num recover
    # numbers from date-formatted Excel cells.
    converters = {col: str for col in (*_STR_COLS, *_N_COLS)}
    converters.update({col: _parse_num for col in _FLOAT_COLS})

    if path.suffix.lower() == ".csv":
        df = pd.read_csv(path, converters=converters)
    else:
        df = pd.read_excel(path, converters=converters)

    # Normalise headers BEFORE looking columns up by name: converters are keyed
    # on exact names, so a header like " mean " would otherwise be missed.
    df.columns = df.columns.astype(str).str.strip()

    missing = [col for col in _STR_COLS if col not in df.columns]
    if missing:
        raise KeyError(f"Normative table is missing required column(s): {missing}")

    # Re-apply the conversions; they are idempotent and catch any column whose
    # header only matched after stripping.
    for col in (*_STR_COLS, *_N_COLS):
        if col in df.columns:
            df[col] = df[col].astype(str)
    for col in _FLOAT_COLS:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col].map(_parse_num), errors="coerce")
    return df


###############################################################################
# Core calculus
###############################################################################
def _extract_stats(
    normative_df: pd.DataFrame,
    biomarker: str,
    age_group: str,
    region: str,
    gender: str,
    bmi_category: str,
) -> Dict[str, Union[float, str]]:
    """Return all summary statistics for the requested stratum.

    Matching on the five key columns ignores case and surrounding whitespace.
    If several rows match, the first is used and a warning is issued.

    Raises
    ------
    KeyError
        If no row matches, or the table has no sample-size column.
    """
    wanted = {
        "Biomarkers": biomarker,
        "Age": age_group,
        "area": region,
        "gender": gender,
        "Bmi": bmi_category,
    }
    mask = None
    for col, target in wanted.items():
        table_keys = normative_df[col].astype(str).str.strip().str.lower()
        hit = (table_keys == str(target).strip().lower()).to_numpy()
        mask = hit if mask is None else mask & hit

    subset = normative_df.loc[mask]
    if subset.empty:
        raise KeyError("No normative stats found for the specified stratum.")
    if len(subset) > 1:
        warnings.warn(
            "Multiple normative rows found; using the first one (check your table)."
        )
    row = subset.iloc[0]

    # Some versions of the table label sample size as "n" instead of "nb_category"
    n_col = next((col for col in _N_COLS if col in row.index), None)
    if n_col is None:
        raise KeyError(
            "Normative table has no sample-size column ('nb_category' or 'n')."
        )

    result: Dict[str, Union[float, str]] = {
        key: float(row[key]) for key in _STAT_KEYS
    }
    result["n"] = str(row[n_col])
    return result


def z_score(value: float, mean: float, sd: float) -> float:
    """Compute z-score; returns NaN if SD is 0."""
    if sd == 0:
        return float("nan")
    return (value - mean) / sd


def percentile_from_z(z: float) -> float:
    """Convert z-score to percentile (0-100) under a standard normal."""
    return float(stats.norm.cdf(z) * 100)


def compute_normative_position(
    *,
    value: float,
    biomarker: str,
    age_group: Union[str, int],
    region: str,
    gender: str,
    bmi: Union[str, float],
    normative_df: pd.DataFrame,
) -> Dict[str, Union[float, str]]:
    """
    Compute where a single measurement falls relative to a normative distribution.

    Parameters
    ----------
    value : float
        Raw measurement for the specified biomarker.
    biomarker : str
        Name of the biomarker (must match a value in the "Biomarkers" column
        of `normative_df`).
    age_group : Union[str, int]
        Either:
        - A string age-group label (e.g. "40-49") matching `normative_df["Age"]`, or
        - A numeric age, which will be mapped into the correct age-group bracket.
    region : str
        Region name matching `normative_df["area"]` (case-insensitive).
    gender : str
        Gender label matching `normative_df["gender"]` (case-insensitive).
    bmi : Union[str, float]
        Either:
        - A string BMI category (e.g. "Healthy"), or
        - A numeric BMI value, which will be bucketed into WHO categories.
    normative_df : pd.DataFrame
        Table of normative summary statistics as returned by
        `load_normative_table`.

    Returns
    -------
    Dict[str, Union[float, str]]
        A dictionary containing:
        - "z_score" (float): the computed z-score,
        - "percentile" (float): the percentile (0–100),
        - "mean" (float): the normative mean,
        - "sd" (float): the normative standard deviation,
        - "n" (str): the sample-size category string from the normative table.
        - "median" (float): the normative median,
        - "q1" (float): the first quartile,
        - "q3" (float): the third quartile,
        - "iqr" (float): the interquartile range,
        - "mad" (float): the median absolute deviation,
        - "se" (float): the standard error,
        - "ci" (float): the confidence interval.

    Raises
    ------
    KeyError
        If no matching stratum is found in `normative_df`.
    ValueError
        If a numeric `age_group` cannot be mapped to any age bracket, or a
        numeric `bmi` lies outside the defined BMI bounds.
    """
    # allow numeric age inputs by mapping them to the correct "Age" group
    age_group_str = _categorize_age(age_group, normative_df)
    bmi_cat = categorize_bmi(bmi)

    stats_d = _extract_stats(
        normative_df=normative_df,
        biomarker=biomarker,
        age_group=age_group_str,
        region=region,
        gender=gender,
        bmi_category=bmi_cat,
    )

    z = z_score(value, stats_d["mean"], stats_d["sd"])
    pct = percentile_from_z(z)

    return {
        "z_score": z,
        "percentile": pct,
        "mean": stats_d["mean"],
        "sd": stats_d["sd"],
        "n": stats_d["n"],
        "median": stats_d["median"],
        "q1": stats_d["q1"],
        "q3": stats_d["q3"],
        "iqr": stats_d["iqr"],
        "mad": stats_d["mad"],
        "se": stats_d["se"],
        "ci": stats_d["ci"],
    }


###############################################################################
# Batch processing helper
###############################################################################
def _compute_for_row(
    row: pd.Series,
    biomarker: str,
    normative_df: pd.DataFrame,
    age_col: str,
    region_col: str,
    gender_col: str,
    bmi_col: str,
    value_col: str,
) -> pd.Series:
    """Return the z-score and percentile for one participant row.

    The result is a two-element Series indexed ``<biomarker>_z`` and
    ``<biomarker>_pct``. Batch processing is best-effort: any failure for a row
    is reported as a warning and yields NaN for both values.
    """
    labels = [f"{biomarker}_z", f"{biomarker}_pct"]
    try:
        res = compute_normative_position(
            value=row[value_col],
            biomarker=biomarker,
            age_group=row[age_col],
            region=row[region_col],
            gender=row[gender_col],
            bmi=row[bmi_col],
            normative_df=normative_df,
        )
        return pd.Series([res["z_score"], res["percentile"]], index=labels)
    except Exception as exc:  # pragma: no cover
        # Deliberately broad: one bad row must not abort the whole batch.
        warnings.warn(str(exc))
        return pd.Series([float("nan"), float("nan")], index=labels)


def add_normative_columns(
    df: pd.DataFrame,
    *,
    biomarkers: Iterable[str],
    normative_df: pd.DataFrame,
    age_col: str = "Age",
    region_col: str = "area",
    gender_col: str = "gender",
    bmi_col: str = "Bmi",
    value_cols: dict[str, str] | None = None,
    output_prefixes: dict[str, str] | None = None,
) -> pd.DataFrame:
    """
    Append z-score and percentile columns for multiple biomarkers, with optional
    custom prefixes for the output column names.

    Parameters
    ----------
    df : pd.DataFrame
        Participant-level data, must include demographic columns and raw
        biomarker values.
    biomarkers : Iterable[str]
        Biomarker names to process (any iterable, including a generator).
    normative_df : pd.DataFrame
        Normative summary table as loaded by `load_normative_table`.
    age_col : str, default "Age"
        Column in `df` containing age-group labels or numeric ages.
    region_col : str, default "area"
        Column in `df` matching the "area" field in `normative_df`.
    gender_col : str, default "gender"
        Column in `df` matching the "gender" field in `normative_df`.
    bmi_col : str, default "Bmi"
        Column in `df` containing BMI values or categories.
    value_cols : dict[str, str], optional
        Mapping from biomarker name to the column in `df` that holds its raw
        numeric value. Biomarkers without an entry use a column named after
        the biomarker itself.
    output_prefixes : dict[str, str], optional
        Mapping from biomarker name to the prefix to use for the output
        columns. Biomarkers without an entry use the biomarker name itself.

    Returns
    -------
    pd.DataFrame
        A copy of `df` with two new columns for each biomarker:
        `<prefix>_z` and `<prefix>_pct`. Rows that cannot be scored get NaN.
    """
    # Materialise once: `biomarkers` may be a one-shot iterator.
    biomarkers = list(biomarkers)
    value_cols = value_cols or {}
    output_prefixes = output_prefixes or {}
    out = df.copy()

    for bm in biomarkers:
        prefix = output_prefixes.get(bm, bm)
        z_col, pct_col = f"{prefix}_z", f"{prefix}_pct"

        if df.empty:
            # Row-wise apply on an empty frame does not yield the two result
            # columns, so create them explicitly.
            out[z_col] = pd.Series(index=out.index, dtype="float64")
            out[pct_col] = pd.Series(index=out.index, dtype="float64")
            continue

        scored = df.apply(
            _compute_for_row,
            axis=1,
            biomarker=bm,
            normative_df=normative_df,
            age_col=age_col,
            region_col=region_col,
            gender_col=gender_col,
            bmi_col=bmi_col,
            value_col=value_cols.get(bm, bm),
        )
        # Assign by explicit name and by position so neither the custom prefix
        # nor a non-unique index can misalign the results.
        out[z_col] = scored[f"{bm}_z"].to_numpy()
        out[pct_col] = scored[f"{bm}_pct"].to_numpy()
    return out


# Skew-corrected z-score calculation
def compute_skew_corrected_position(
    value: float, mean: float, sd: float, median: float
) -> dict[str, float]:
    """Compute skew-corrected z-score and percentile using Pearson Type III distribution.

    Skewness is estimated from the summary statistics with Pearson's second
    (median) skewness coefficient, ``3 * (mean - median) / sd``, and used as
    the shape parameter of a Pearson Type III distribution located at *mean*
    with scale *sd*. The percentile of *value* under that distribution is then
    mapped back to a standard-normal z-score.

    Returns
    -------
    dict[str, float]
        ``"z_skew_corrected"`` and ``"percentile_skew_corrected"`` (0-100).
        Both are NaN when *sd* is not a positive number. The z-score is
        +/-inf when the percentile is exactly 100 or 0.
    """
    nan = float("nan")
    # A zero, negative or NaN spread defines no distribution.
    if not sd > 0:
        return {"z_skew_corrected": nan, "percentile_skew_corrected": nan}

    # Pearson's second skewness coefficient (median-based)
    skewness = 3 * (mean - median) / sd
    # Build Pearson Type III distribution (gamma-based)
    dist = stats.pearson3(skewness, loc=mean, scale=sd)
    # Compute percentile under skewed model
    p = float(dist.cdf(value))
    # Back-transform to standard normal z-score
    z_corr = float(stats.norm.ppf(p))
    return {"z_skew_corrected": z_corr, "percentile_skew_corrected": p * 100}