|
|
""" |
|
|
normative_calculator.py - v2 |
|
|
|
|
|
Utility functions for computing z-scores and percentiles for any biomarker |
|
|
contained in *Table_1_summary_measure.xlsx*. |
|
|
|
|
|
|
|
|
|
|
|
Author: Lars Masanneck 06-05-2025 |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import math |
|
|
import pathlib |
|
|
import warnings |
|
|
from typing import Dict, Iterable, List, Sequence, Union |
|
|
|
|
|
import pandas as pd |
|
|
from scipy import stats |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Names exported by ``from <module> import *``.
__all__ = [
    "load_normative_table",
    "compute_normative_position",
    "add_normative_columns",
    "categorize_bmi",
    "compute_skew_corrected_position",
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# BMI brackets as (lower, upper, label) tuples. `categorize_bmi` hands this
# table to `_categorize`, which tests ``lower <= value < upper``, so each
# bracket includes its lower edge and excludes its upper edge.
_BMI_BOUNDS: List[tuple[float, float, str]] = [
    (0, 18.5, "Underweight"),
    (18.5, 25, "Healthy"),
    (25, 30, "Overweight"),
    (30, math.inf, "Obesity"),  # open-ended top bracket
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _categorize(value: float, bounds: Sequence[tuple]) -> str:
    """Return the label of the first bracket in *bounds* that contains *value*.

    Parameters
    ----------
    value : float
        Number to classify.
    bounds : Sequence[tuple]
        ``(lower, upper, label)`` triples, scanned in order. A bracket matches
        when ``lower <= value < upper``.

    Returns
    -------
    str
        Label of the first matching bracket.

    Raises
    ------
    ValueError
        If no bracket contains *value*.
    """
    not_found = object()
    hit = next(
        (name for low, high, name in bounds if low <= value < high),
        not_found,
    )
    if hit is not_found:
        raise ValueError(f"{value} outside defined bounds.")
    return hit
|
|
|
|
|
|
|
|
def categorize_bmi(bmi: Union[str, float]) -> str:
    """Translate a BMI value into a BMI category label.

    Parameters
    ----------
    bmi : Union[str, float]
        Either an existing category string, which is only whitespace-stripped
        and capitalised (first letter upper-case, the rest lower-case), or a
        numeric BMI, which is looked up in ``_BMI_BOUNDS``.

    Returns
    -------
    str
        The category label.

    Raises
    ------
    ValueError
        If a numeric BMI falls outside every bracket in ``_BMI_BOUNDS``.
    """
    if not isinstance(bmi, str):
        return _categorize(float(bmi), _BMI_BOUNDS)
    cleaned = bmi.strip()
    return cleaned.capitalize()
|
|
|
|
|
|
|
|
def _categorize_age(age: Union[str, int], normative_df: pd.DataFrame) -> str:
    """Map an age to one of the age-group labels found in ``normative_df["Age"]``.

    Parameters
    ----------
    age : Union[str, int]
        A string is treated as an age-group label already and is returned
        whitespace-stripped without validation. A number is matched against
        the table's labels. Fractional ages are floored to whole years first,
        so 29.5 falls in "18-29", not in the gap before "30-39".
    normative_df : pd.DataFrame
        Table whose "Age" column holds labels of the form ``"<lo>-<hi>"``
        (inclusive on both ends) or ``"<lo>+"`` (open-ended).

    Returns
    -------
    str
        The first matching label, stripped of surrounding whitespace.

    Raises
    ------
    ValueError
        If no label in the table covers *age*.
    """
    if isinstance(age, str):
        return age.strip()

    # Brackets are integer years; compare on completed years. Values that
    # cannot be floored (NaN, inf, unsupported types) are compared as given,
    # so they behave exactly as they would without this step.
    try:
        whole_years = math.floor(age)
    except (TypeError, ValueError, OverflowError):
        whole_years = age

    for label in normative_df["Age"].unique():
        # Missing cells surface here as NaN/None; they cannot name a bracket.
        if not isinstance(label, str):
            continue
        label = label.strip()
        if "-" in label:
            low_txt, high_txt = label.split("-", 1)
            try:
                low, high = int(low_txt), int(high_txt)
            except ValueError:
                continue  # not a numeric range; ignore this label
            if low <= whole_years <= high:
                return label
        elif label.endswith("+"):
            try:
                low = int(label[:-1])
            except ValueError:
                continue
            if whole_years >= low:
                return label
    raise ValueError(f"No normative age group found for age {age!r}.")
|
|
|
|
|
|
|
|
def load_normative_table(path):
    """Load the normative summary table from a CSV or Excel file.

    Parameters
    ----------
    path : str or path-like
        Location of the table. Files ending in ``.csv`` (case-insensitive) are
        read with ``pandas.read_csv``; anything else goes to
        ``pandas.read_excel``.

    Returns
    -------
    pd.DataFrame
        The table with whitespace-stripped column names. The stratum columns
        ("Age", "area", "gender", "Bmi", "Biomarkers", "nb_category") are cast
        to ``str`` where present. The statistic columns are parsed to floats,
        with ``pd.NA`` for cells that cannot be parsed.

    Raises
    ------
    FileNotFoundError
        If *path* does not exist.
    """
    path = pathlib.Path(path)
    if not path.exists():
        raise FileNotFoundError(path)

    str_cols = ["Age", "area", "gender", "Bmi", "Biomarkers", "nb_category"]
    float_cols = [
        "min",
        "max",
        "median",
        "q1",
        "q3",
        "iqr",
        "mad",
        "mean",
        "sd",
        "se",
        "ci",
    ]

    def parse_num(x):
        """Coerce one cell to float; unparseable cells become ``pd.NA``."""
        if isinstance(x, datetime):
            # NOTE(review): presumably undoes spreadsheet auto-conversion of
            # decimals into dates -- confirm against the source workbook.
            # A year beyond the current one is read as "<year>.<month>",
            # anything else as "<day>.<month>".
            if x.year > datetime.now().year:
                return x.year + x.month / 100
            return x.day + x.month / 100
        try:
            return float(x)
        except (TypeError, ValueError, OverflowError):
            return pd.NA

    converters = {col: str for col in str_cols}
    converters.update({col: parse_num for col in float_cols})

    if path.suffix.lower() == ".csv":
        df = pd.read_csv(path, converters=converters)
    else:
        df = pd.read_excel(path, converters=converters)

    # Normalise header whitespace BEFORE any lookup by name; otherwise a
    # padded header such as " sd" makes the lookups below fail.
    raw_headers = list(df.columns)
    df.columns = df.columns.str.strip()

    # Converters are keyed by exact header text, so a padded header was not
    # parsed at read time. Parse those statistic columns now.
    for before, after in zip(raw_headers, df.columns):
        if before != after and after in float_cols:
            df[after] = df[after].map(parse_num)

    # Cast only the stratum columns that exist: `_extract_stats` tolerates a
    # table without "nb_category", so loading must not fail on its absence.
    for c in str_cols:
        if c in df.columns:
            df[c] = df[c].astype(str)

    return df
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_stats(
    normative_df: pd.DataFrame,
    biomarker: str,
    age_group: str,
    region: str,
    gender: str,
    bmi_category: str,
) -> Dict[str, Union[float, str]]:
    """Return all summary statistics for the requested stratum.

    The stratum is the row whose "Biomarkers", "Age", "area", "gender" and
    "Bmi" cells equal the given values, compared case-insensitively.

    Returns
    -------
    Dict[str, Union[float, str]]
        Keys "median", "q1", "q3", "iqr", "mad", "mean", "sd", "se", "ci"
        (floats; NaN where the table cell is missing) and "n" (the
        "nb_category" cell, or the "n" cell if "nb_category" is absent, as a
        string).

    Raises
    ------
    KeyError
        If no row matches the stratum.

    Warns
    -----
    UserWarning
        If more than one row matches; the first is used.
    """
    mask = (
        (normative_df["Biomarkers"].str.lower() == biomarker.lower())
        & (normative_df["Age"].str.lower() == age_group.lower())
        & (normative_df["area"].str.lower() == region.lower())
        & (normative_df["gender"].str.lower() == gender.lower())
        & (normative_df["Bmi"].str.lower() == bmi_category.lower())
    )
    subset = normative_df.loc[mask]
    if subset.empty:
        raise KeyError("No normative stats found for the specified stratum.")
    if len(subset) > 1:
        warnings.warn(
            "Multiple normative rows found; using the first one (check your table)."
        )
    row = subset.iloc[0]

    def _as_float(cell) -> float:
        # `load_normative_table` stores pd.NA for unparseable cells and
        # float(pd.NA) raises TypeError; report missing statistics as NaN so
        # one blank cell does not abort the whole lookup.
        return float("nan") if pd.isna(cell) else float(cell)

    n_col = "nb_category" if "nb_category" in row else "n"

    result: Dict[str, Union[float, str]] = {
        key: _as_float(row[key])
        for key in ("median", "q1", "q3", "iqr", "mad", "mean", "sd", "se", "ci")
    }
    result["n"] = str(row[n_col])
    return result
|
|
|
|
|
|
|
|
def z_score(value: float, mean: float, sd: float) -> float:
    """Standardise *value* against a distribution with the given mean and SD.

    Returns
    -------
    float
        ``(value - mean) / sd``, or NaN when *sd* equals zero (the score is
        undefined for a degenerate distribution).
    """
    if sd != 0:
        deviation = value - mean
        return deviation / sd
    return float("nan")
|
|
|
|
|
|
|
|
def percentile_from_z(z: float) -> float:
    """Return the standard-normal percentile (0-100) corresponding to *z*.

    The percentile is the standard normal CDF evaluated at *z*, scaled to a
    0-100 range and returned as a plain Python float.
    """
    cumulative = stats.norm.cdf(z)
    return float(cumulative * 100)
|
|
|
|
|
|
|
|
def compute_normative_position(
    *,
    value: float,
    biomarker: str,
    age_group: Union[str, int],
    region: str,
    gender: str,
    bmi: Union[str, float],
    normative_df: pd.DataFrame,
) -> Dict[str, Union[float, str]]:
    """
    Locate one measurement within the normative distribution of its stratum.

    Parameters
    ----------
    value : float
        Raw measurement for the specified biomarker.
    biomarker : str
        Biomarker name; must match an entry of the "Biomarkers" column of
        `normative_df`.
    age_group : Union[str, int]
        Either an age-group label (e.g. "40-49") matching
        `normative_df["Age"]`, or a numeric age that is mapped to the
        matching age-group bracket.
    region : str
        Region name matching `normative_df["area"]` (case-insensitive).
    gender : str
        Gender label matching `normative_df["gender"]` (case-insensitive).
    bmi : Union[str, float]
        Either a BMI category string (e.g. "Healthy"), or a numeric BMI that
        is bucketed by `categorize_bmi`.
    normative_df : pd.DataFrame
        Normative summary statistics as returned by `load_normative_table`.

    Returns
    -------
    Dict[str, Union[float, str]]
        A dictionary with, in this order:
        - "z_score" (float): the computed z-score,
        - "percentile" (float): the percentile (0-100),
        - "mean" (float): the normative mean,
        - "sd" (float): the normative standard deviation,
        - "n" (str): the sample-size category string from the table,
        - "median" (float): the normative median,
        - "q1" (float): the first quartile,
        - "q3" (float): the third quartile,
        - "iqr" (float): the interquartile range,
        - "mad" (float): the median absolute deviation,
        - "se" (float): the standard error,
        - "ci" (float): the confidence interval.

    Raises
    ------
    KeyError
        If no matching stratum is found in `normative_df`.
    ValueError
        If a numeric `age_group` cannot be mapped to any age bracket.
    """
    # Resolve the stratum keys (age first, then BMI) and fetch its statistics.
    stratum = _extract_stats(
        normative_df=normative_df,
        biomarker=biomarker,
        age_group=_categorize_age(age_group, normative_df),
        region=region,
        gender=gender,
        bmi_category=categorize_bmi(bmi),
    )

    z = z_score(value, stratum["mean"], stratum["sd"])
    position: Dict[str, Union[float, str]] = {
        "z_score": z,
        "percentile": percentile_from_z(z),
    }
    # Pass the normative statistics through unchanged, in a fixed key order.
    for key in ("mean", "sd", "n", "median", "q1", "q3", "iqr", "mad", "se", "ci"):
        position[key] = stratum[key]
    return position
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _compute_for_row(
    row: pd.Series,
    biomarker: str,
    normative_df: pd.DataFrame,
    age_col: str,
    region_col: str,
    gender_col: str,
    bmi_col: str,
    value_col: str,
):
    """Compute the z-score and percentile of *biomarker* for one participant row.

    The ``*_col`` arguments name the fields of *row* that hold the age, region,
    gender, BMI and raw measurement. This is best-effort: any exception raised
    while computing the position is turned into a warning carrying the
    exception text, and both results become NaN.

    Returns
    -------
    pd.Series
        Two values indexed ``"<biomarker>_z"`` and ``"<biomarker>_pct"``.
    """
    labels = [f"{biomarker}_z", f"{biomarker}_pct"]
    try:
        position = compute_normative_position(
            value=row[value_col],
            biomarker=biomarker,
            age_group=row[age_col],
            region=row[region_col],
            gender=row[gender_col],
            bmi=row[bmi_col],
            normative_df=normative_df,
        )
        scores = [position["z_score"], position["percentile"]]
    except Exception as exc:
        # Deliberately broad: one bad row must not abort a whole-frame apply.
        warnings.warn(str(exc))
        scores = [float("nan"), float("nan")]
    return pd.Series(scores, index=labels)
|
|
|
|
|
|
|
|
def add_normative_columns(
    df: pd.DataFrame,
    *,
    biomarkers: Iterable[str],
    normative_df: pd.DataFrame,
    age_col: str = "Age",
    region_col: str = "area",
    gender_col: str = "gender",
    bmi_col: str = "Bmi",
    value_cols: dict[str, str] | None = None,
    output_prefixes: dict[str, str] | None = None,
) -> pd.DataFrame:
    """
    Append z-score and percentile columns for multiple biomarkers, with optional
    custom prefixes for the output column names.

    Parameters
    ----------
    df : pd.DataFrame
        Participant-level data; must include the demographic columns and the
        raw biomarker values.
    biomarkers : Iterable[str]
        Biomarker names to process. Any iterable works, including one-shot
        iterators such as generators.
    normative_df : pd.DataFrame
        Normative summary table as loaded by `load_normative_table`.
    age_col : str, default "Age"
        Column in `df` containing age-group labels or numeric ages.
    region_col : str, default "area"
        Column in `df` matching the "area" field in `normative_df`.
    gender_col : str, default "gender"
        Column in `df` matching the "gender" field in `normative_df`.
    bmi_col : str, default "Bmi"
        Column in `df` containing BMI values or categories.
    value_cols : dict[str, str], optional
        Mapping from biomarker name to the column in `df` holding its raw
        value. Biomarkers missing from the mapping use a column named after
        the biomarker itself.
    output_prefixes : dict[str, str], optional
        Mapping from biomarker name to the prefix of its output columns.
        Biomarkers missing from the mapping use the biomarker name.

    Returns
    -------
    pd.DataFrame
        A copy of `df` with two new columns for each biomarker:
        `<prefix>_z` and `<prefix>_pct`. Rows whose position cannot be
        computed hold NaN (see `_compute_for_row`).
    """
    # Materialise once: a generator would otherwise be exhausted before the
    # loop below runs, and no columns would be added.
    biomarkers = list(biomarkers)
    value_cols = value_cols or {}
    output_prefixes = output_prefixes or {}
    out = df.copy()

    for bm in biomarkers:
        prefix = output_prefixes.get(bm, bm)
        z_name, pct_name = f"{prefix}_z", f"{prefix}_pct"

        if df.empty:
            # apply(axis=1) on an empty frame is not guaranteed to produce the
            # two score columns, so add empty/NaN float columns directly.
            out[z_name] = pd.Series(index=out.index, dtype=float)
            out[pct_name] = pd.Series(index=out.index, dtype=float)
            continue

        out[[z_name, pct_name]] = df.apply(
            _compute_for_row,
            axis=1,
            biomarker=bm,
            normative_df=normative_df,
            age_col=age_col,
            region_col=region_col,
            gender_col=gender_col,
            bmi_col=bmi_col,
            # Identity fallback keeps partial mappings usable.
            value_col=value_cols.get(bm, bm),
        )

    return out
|
|
|
|
|
|
|
|
|
|
|
def compute_skew_corrected_position(
    value: float, mean: float, sd: float, median: float
) -> dict[str, float]:
    """Compute a skew-corrected z-score and percentile via a Pearson Type III fit.

    Skewness is estimated from the summary statistics as
    ``3 * (mean - median) / sd`` (NaN when *sd* is zero). A
    ``scipy.stats.pearson3`` distribution with that skew, ``loc=mean`` and
    ``scale=sd`` gives the cumulative probability of *value*, which is mapped
    back to a z-score through the standard normal quantile function.

    Returns
    -------
    dict[str, float]
        "z_skew_corrected": normal quantile of the cumulative probability;
        "percentile_skew_corrected": that probability scaled to 0-100.
    """
    skewness = float("nan") if sd == 0 else 3 * (mean - median) / sd

    fitted = stats.pearson3(skewness, loc=mean, scale=sd)
    p = fitted.cdf(value)

    return {
        "z_skew_corrected": stats.norm.ppf(p),
        "percentile_skew_corrected": float(p * 100),
    }
|
|
|