Withings_Normalization_App / normalizer_model.py
Lars Masanneck
Proper initial commit
04428af
"""
normative_calculator.py - v2
Utility functions for computing z-scores and percentiles for any biomarker
contained in *Table_1_summary_measure.xlsx*.
Author: Lars Masanneck 06-05-2025
"""
from __future__ import annotations
import math
import pathlib
import warnings
from typing import Dict, Iterable, List, Sequence, Union
import pandas as pd
from scipy import stats
from datetime import datetime
###############################################################################
# Public API (re-exported in __all__)
###############################################################################
# Names exported by a star-import of this module.
__all__ = [
    "load_normative_table",
    "compute_normative_position",
    "add_normative_columns",
    "categorize_bmi",
    "compute_skew_corrected_position",
]
###############################################################################
# Constant category mappings
###############################################################################
# BMI categories (WHO definition)
# Each entry is (lower bound, upper bound, label). `_categorize` treats the
# lower bound as inclusive and the upper bound as exclusive, so neighbouring
# ranges share their boundary value without overlapping.
_BMI_BOUNDS: List[tuple[float, float, str]] = [
    (0, 18.5, "Underweight"),
    (18.5, 25, "Healthy"),
    (25, 30, "Overweight"),
    (30, math.inf, "Obesity"),
]
###############################################################################
# Helper functions – categories & loading
###############################################################################
def _categorize(value: float, bounds: Sequence[tuple]) -> str:
    """Look up the label of the first range in *bounds* that contains *value*.

    Parameters
    ----------
    value : float
        Number to classify.
    bounds : Sequence[tuple]
        ``(lower, upper, label)`` triples; a range matches when
        ``lower <= value < upper``.

    Returns
    -------
    str
        Label of the first matching range.

    Raises
    ------
    ValueError
        If no range contains *value*.
    """
    try:
        return next(
            label for lower, upper, label in bounds if lower <= value < upper
        )
    except StopIteration:
        raise ValueError(f"{value} outside defined bounds.") from None
def categorize_bmi(bmi: Union[str, float]) -> str:
    """Return the BMI category label used by the normative table.

    Parameters
    ----------
    bmi : Union[str, float]
        Either a category name, which is whitespace-stripped and capitalised
        (first letter upper-case, rest lower-case), or a numeric BMI, which is
        classified against ``_BMI_BOUNDS``.

    Returns
    -------
    str
        The category label.

    Raises
    ------
    ValueError
        If a numeric BMI lies outside every range in ``_BMI_BOUNDS``.
    """
    if not isinstance(bmi, str):
        return _categorize(float(bmi), _BMI_BOUNDS)
    return bmi.strip().capitalize()
def _categorize_age(age: Union[str, int], normative_df: pd.DataFrame) -> str:
    """Map an age to the matching age-group label of the normative table.

    Parameters
    ----------
    age : Union[str, int]
        Either an age-group label, which is returned whitespace-stripped
        without further checks, or a numeric age. Fractional ages are floored
        to completed years before matching.
    normative_df : pd.DataFrame
        Normative table; the distinct values of its ``"Age"`` column are the
        candidate groups. Labels of the form ``"<lo>-<hi>"`` (both ends
        inclusive) and ``"<lo>+"`` are understood; anything else is ignored.

    Returns
    -------
    str
        The whitespace-stripped label of the first group containing the age.

    Raises
    ------
    ValueError
        If the age is not a usable number (e.g. ``None`` or NaN) or no group
        contains it.
    """
    if isinstance(age, str):
        return age.strip()

    # Brackets are expressed in whole years ("40-49", "50-59"). Comparing a
    # fractional age such as 49.5 directly would fall into the gap between two
    # brackets, so compare completed years instead.
    try:
        years = math.floor(age)
    except (TypeError, ValueError, OverflowError):
        # None, NaN and infinities cannot be assigned to any bracket.
        raise ValueError(f"No normative age group found for age {age!r}.") from None

    for grp in normative_df["Age"].unique():
        if not isinstance(grp, str):
            # e.g. NaN from an empty cell
            continue
        grp = grp.strip()
        if "-" in grp:
            lo, hi = grp.split("-", 1)
            try:
                lo_i, hi_i = int(lo), int(hi)
            except ValueError:
                continue
            if lo_i <= years <= hi_i:
                return grp
        elif grp.endswith("+"):
            try:
                lo_i = int(grp[:-1])
            except ValueError:
                continue
            if years >= lo_i:
                return grp
    raise ValueError(f"No normative age group found for age {age!r}.")
def load_normative_table(path):
    """Load the normative summary table from a CSV or Excel file.

    Parameters
    ----------
    path : str or path-like
        Location of the table. Files ending in ``.csv`` are read with
        ``pandas.read_csv``; everything else with ``pandas.read_excel``.

    Returns
    -------
    pd.DataFrame
        The table with whitespace-stripped column names. The descriptive
        columns (``Age``, ``area``, ``gender``, ``Bmi``, ``Biomarkers``,
        ``nb_category``) are strings and the statistic columns (``min`` …
        ``ci``) are floats, with unparseable cells set to NaN. Columns from
        either list that are absent from the file are simply skipped.

    Raises
    ------
    FileNotFoundError
        If *path* does not exist.
    """
    path = pathlib.Path(path)
    if not path.exists():
        raise FileNotFoundError(path)

    # columns to keep as strings
    str_cols = ["Age", "area", "gender", "Bmi", "Biomarkers", "nb_category"]
    # columns to cast to floats (recovering numbers from any date-formatted cells)
    float_cols = [
        "min",
        "max",
        "median",
        "q1",
        "q3",
        "iqr",
        "mad",
        "mean",
        "sd",
        "se",
        "ci",
    ]

    def parse_num(x):
        # Excel-formatted dates get parsed into datetime; map back to a float.
        if isinstance(x, datetime):
            # A year in the future (e.g. 3183) is taken as the integer part,
            # with the month as the two-digit fractional part.
            if x.year > datetime.now().year:
                return x.year + x.month / 100
            # Otherwise (e.g. 5.06 parsed as 2025-06-05) the day is the
            # integer part and the month the two-digit fractional part.
            return x.day + x.month / 100
        try:
            return float(x)
        except (TypeError, ValueError):
            # NaN (not pd.NA) keeps the column numeric and float()-compatible.
            return float("nan")

    def read(**kwargs):
        if path.suffix.lower() == ".csv":
            return pd.read_csv(path, **kwargs)
        return pd.read_excel(path, **kwargs)

    def clean(label):
        return label.strip() if isinstance(label, str) else label

    # Converters are keyed by the header text exactly as it appears in the
    # file, so read the header first and match on the stripped name. This
    # tolerates padded headers and tables lacking some of the columns.
    converters = {}
    for raw_label in read(nrows=0).columns:
        name = clean(raw_label)
        if name in str_cols:
            converters[raw_label] = str
        elif name in float_cols:
            converters[raw_label] = parse_num

    df = read(converters=converters)
    df.columns = [clean(label) for label in df.columns]

    # Enforce the final dtypes on whichever of the known columns are present.
    for c in str_cols:
        if c in df.columns:
            df[c] = df[c].astype(str)
    for c in float_cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    return df
###############################################################################
# Core calculations
###############################################################################
def _extract_stats(
    normative_df: pd.DataFrame,
    biomarker: str,
    age_group: str,
    region: str,
    gender: str,
    bmi_category: str,
) -> Dict[str, Union[float, str]]:
    """Return all summary statistics for the requested stratum.

    The stratum is the row of *normative_df* whose ``Biomarkers``, ``Age``,
    ``area``, ``gender`` and ``Bmi`` cells equal the given values. Comparison
    ignores case and surrounding whitespace on both sides.

    Returns
    -------
    Dict[str, Union[float, str]]
        ``median``, ``q1``, ``q3``, ``iqr``, ``mad``, ``mean``, ``sd``, ``se``
        and ``ci`` as floats, plus ``n`` (the sample-size cell) as a string.

    Raises
    ------
    KeyError
        If no row matches.

    Warns
    -----
    UserWarning
        If several rows match; the first one is used.
    """

    def column(name: str) -> pd.Series:
        return normative_df[name].str.strip().str.lower()

    def wanted(text: str) -> str:
        return text.strip().lower()

    mask = (
        (column("Biomarkers") == wanted(biomarker))
        & (column("Age") == wanted(age_group))
        & (column("area") == wanted(region))
        & (column("gender") == wanted(gender))
        & (column("Bmi") == wanted(bmi_category))
    )
    subset = normative_df.loc[mask]
    if subset.empty:
        raise KeyError("No normative stats found for the specified stratum.")
    if len(subset) > 1:
        warnings.warn(
            "Multiple normative rows found; using the first one (check your table)."
        )
    row = subset.iloc[0]

    # Some versions of the table label sample size as "n" instead of "nb_category"
    n_col = "nb_category" if "nb_category" in row else "n"

    result: Dict[str, Union[float, str]] = {
        key: float(row[key])
        for key in ("median", "q1", "q3", "iqr", "mad", "mean", "sd", "se", "ci")
    }
    result["n"] = str(row[n_col])
    return result
def z_score(value: float, mean: float, sd: float) -> float:
    """Standardise *value* against *mean* and *sd*.

    Returns ``(value - mean) / sd``, or NaN when *sd* equals zero so that the
    division is never attempted.
    """
    if sd != 0:
        return (value - mean) / sd
    return float("nan")
def percentile_from_z(z: float) -> float:
    """Return the standard-normal percentile (0-100) corresponding to *z*."""
    cumulative = stats.norm.cdf(z)
    return float(cumulative * 100)
def compute_normative_position(
    *,
    value: float,
    biomarker: str,
    age_group: Union[str, int],
    region: str,
    gender: str,
    bmi: Union[str, float],
    normative_df: pd.DataFrame,
) -> Dict[str, Union[float, str]]:
    """
    Locate one measurement within the normative distribution of its stratum.

    Parameters
    ----------
    value : float
        Raw measurement of the biomarker.
    biomarker : str
        Biomarker name; must match an entry of the "Biomarkers" column of
        `normative_df`.
    age_group : Union[str, int]
        An age-group label as found in `normative_df["Age"]` (e.g. "40-49"),
        or a numeric age that is mapped onto the matching label.
    region : str
        Region as found in `normative_df["area"]` (case-insensitive).
    gender : str
        Gender as found in `normative_df["gender"]` (case-insensitive).
    bmi : Union[str, float]
        A BMI category name (e.g. "Healthy"), or a numeric BMI that is
        bucketed into the WHO categories.
    normative_df : pd.DataFrame
        Normative summary table as returned by `load_normative_table`.

    Returns
    -------
    Dict[str, Union[float, str]]
        Keys, in this order:
        - "z_score" (float): z-score of `value` against the stratum mean/SD,
        - "percentile" (float): normal-theory percentile (0–100) of that z-score,
        - "mean", "sd" (float): normative mean and standard deviation,
        - "n" (str): sample-size entry of the normative table,
        - "median", "q1", "q3", "iqr", "mad", "se", "ci" (float): the
          remaining summary statistics of the stratum.

    Raises
    ------
    KeyError
        If `normative_df` has no row for the requested stratum.
    ValueError
        If a numeric `age_group` or `bmi` cannot be mapped to a category.
    """
    # Resolve numeric demographics to the labels used in the table, then
    # fetch the summary statistics of that stratum.
    stratum = _extract_stats(
        normative_df=normative_df,
        biomarker=biomarker,
        age_group=_categorize_age(age_group, normative_df),
        region=region,
        gender=gender,
        bmi_category=categorize_bmi(bmi),
    )

    z = z_score(value, stratum["mean"], stratum["sd"])
    result: Dict[str, Union[float, str]] = {
        "z_score": z,
        "percentile": percentile_from_z(z),
    }
    # Pass the stratum statistics through unchanged.
    for key in ("mean", "sd", "n", "median", "q1", "q3", "iqr", "mad", "se", "ci"):
        result[key] = stratum[key]
    return result
###############################################################################
# Batch processing helper
###############################################################################
def _compute_for_row(
    row: pd.Series,
    biomarker: str,
    normative_df: pd.DataFrame,
    age_col: str,
    region_col: str,
    gender_col: str,
    bmi_col: str,
    value_col: str,
):
    """Compute z-score and percentile of *biomarker* for one participant row.

    The demographic and value fields are read from *row* via the given column
    names and handed to `compute_normative_position`.

    Returns
    -------
    pd.Series
        Two entries labelled ``"<biomarker>_z"`` and ``"<biomarker>_pct"``.
        If anything goes wrong, the error is reported as a warning and both
        entries are NaN.
    """
    labels = [f"{biomarker}_z", f"{biomarker}_pct"]
    try:
        position = compute_normative_position(
            value=row[value_col],
            biomarker=biomarker,
            age_group=row[age_col],
            region=row[region_col],
            gender=row[gender_col],
            bmi=row[bmi_col],
            normative_df=normative_df,
        )
        return pd.Series(
            [position["z_score"], position["percentile"]], index=labels
        )
    except Exception as exc:  # pragma: no cover
        # Best effort: one bad row must not abort a whole batch.
        warnings.warn(str(exc))
    return pd.Series([float("nan"), float("nan")], index=labels)
def add_normative_columns(
    df: pd.DataFrame,
    *,
    biomarkers: Iterable[str],
    normative_df: pd.DataFrame,
    age_col: str = "Age",
    region_col: str = "area",
    gender_col: str = "gender",
    bmi_col: str = "Bmi",
    value_cols: dict[str, str] | None = None,
    output_prefixes: dict[str, str] | None = None,
) -> pd.DataFrame:
    """
    Append z-score and percentile columns for multiple biomarkers, with optional
    custom prefixes for the output column names.

    Parameters
    ----------
    df : pd.DataFrame
        Participant-level data, must include demographic columns and raw biomarker
        values.
    biomarkers : Iterable[str]
        Biomarker names to process. Any iterable is accepted, including
        one-shot generators; a single string is treated as one biomarker name.
    normative_df : pd.DataFrame
        Normative summary table as loaded by `load_normative_table`.
    age_col : str, default "Age"
        Column in `df` containing age-group labels or numeric ages.
    region_col : str, default "area"
        Column in `df` matching the "area" field in `normative_df`.
    gender_col : str, default "gender"
        Column in `df` matching the "gender" field in `normative_df`.
    bmi_col : str, default "Bmi"
        Column in `df` containing BMI values or categories.
    value_cols : dict[str, str], optional
        Mapping from biomarker name to the column in `df` that holds its raw
        numeric value. Biomarkers without an entry use the column named like
        the biomarker.
    output_prefixes : dict[str, str], optional
        Mapping from biomarker name to the prefix of its output columns.
        Biomarkers without an entry use the biomarker name itself.

    Returns
    -------
    pd.DataFrame
        A copy of `df` with two new columns for each biomarker:
        `<prefix>_z` and `<prefix>_pct`. Rows that cannot be scored get NaN.
    """
    # Materialise once: the iterable is walked by the loop below and a
    # generator can only be consumed a single time. A bare string would
    # otherwise be iterated character by character.
    if isinstance(biomarkers, str):
        biomarkers = [biomarkers]
    else:
        biomarkers = list(biomarkers)

    value_cols = value_cols or {}
    output_prefixes = output_prefixes or {}

    out = df.copy()
    for bm in biomarkers:
        prefix = output_prefixes.get(bm, bm)
        z_col, pct_col = f"{prefix}_z", f"{prefix}_pct"

        if df.empty:
            # Row-wise apply on an empty frame does not yield the two result
            # columns, so create them directly.
            out[z_col] = pd.Series(index=out.index, dtype=float)
            out[pct_col] = pd.Series(index=out.index, dtype=float)
            continue

        out[[z_col, pct_col]] = df.apply(
            _compute_for_row,
            axis=1,
            biomarker=bm,
            normative_df=normative_df,
            age_col=age_col,
            region_col=region_col,
            gender_col=gender_col,
            bmi_col=bmi_col,
            value_col=value_cols.get(bm, bm),
        )
    return out
# Skew-corrected z-score and percentile
def compute_skew_corrected_position(
    value: float, mean: float, sd: float, median: float
) -> dict[str, float]:
    """Compute a skew-corrected z-score and percentile via a Pearson Type III model.

    Skewness is estimated from the summary statistics with Pearson's second
    (median) skewness coefficient, ``3 * (mean - median) / sd``, and used as
    the shape parameter of ``scipy.stats.pearson3`` located at *mean* with
    scale *sd*. The percentile of *value* under that model is then mapped
    back to a standard-normal z-score.

    Parameters
    ----------
    value : float
        Measurement to position.
    mean, sd, median : float
        Summary statistics of the reference distribution.

    Returns
    -------
    dict[str, float]
        ``"z_skew_corrected"`` and ``"percentile_skew_corrected"`` (0-100),
        both plain floats. Both are NaN when *sd* is not a positive number.
        The z-score is ``-inf``/``inf`` when the model assigns *value* a
        cumulative probability of exactly 0 or 1.
    """
    # No distribution can be built without a positive spread (this also
    # covers a NaN sd, for which the comparison is False).
    if not sd > 0:
        nan = float("nan")
        return {"z_skew_corrected": nan, "percentile_skew_corrected": nan}

    # Pearson's second skewness coefficient (median-based).
    skewness = 3 * (mean - median) / sd

    # Pearson Type III distribution (gamma-based) with the estimated skew.
    dist = stats.pearson3(skewness, loc=mean, scale=sd)

    # Cumulative probability of the value under the skewed model ...
    p = float(dist.cdf(value))
    # ... expressed as the equivalent standard-normal z-score.
    z_corr = float(stats.norm.ppf(p))

    return {"z_skew_corrected": z_corr, "percentile_skew_corrected": p * 100}