Sound / embedding_transform_eval.py
Wendy-Fly's picture
add LLM (gemini/gpt) prediction columns to comparison
bddfa08 verified
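#!/usr/bin/env python3
"""Reuse the embeddings cached by batch_top100_match.py and try several
embedding-space transforms to see which one improves the score the most.

Methods:
    raw           - no transform (baseline: cosine sim -> top-K vote)
    severity_axis - single-direction projection: direction = mean of severe
                    ruler items - mean of non-severe ruler items
    ridge         - ridge regression: ruler_emb -> ruler_score; fit (w, b)
                    and predict with w·emb + b
    lasso         - L1 regression
    lda           - Fisher LDA: binarise the ruler (rank < 106 vs >= 106)
                    and find the projection direction
    pca128_ridge  - PCA down to 128 dimensions first, then ridge
    knn_score     - kNN regression (prediction = mean score of the 100
                    nearest ruler items; essentially equivalent to
                    batch_top100)

Requirements:
    - cache_emb/csv_*.npy + cache_emb/ruler_*.npy (cached automatically by
      an earlier run of batch_top100_match.py)
    - label csv (the `label` column of golden_set.csv is used as ground truth)
    - ruler_items.json (provides score / rank)

Usage:
    python3 embedding_transform_eval.py
    python3 embedding_transform_eval.py --pca-dim 256 --boundary-rank 106
"""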
import argparse
import json
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.linear_model import Ridge, Lasso
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
# Default values for the command-line options: embedding cache directory,
# label CSV path, ruler metadata JSON path, the label string treated as
# positive, and the rank below which a ruler item is treated as severe.
DEFAULTS = {
    "cache_dir": "cache_emb",
    "csv": "/mnt/bn/tns-algo-ue-my/biaowu/aipf_dm_metric/example/yss_ruler_eval/data/aipf_golden_set.csv",
    "ruler": "/mnt/bn/tns-algo-ue-my/biaowu/aipf_dm_metric/ranking_moderation/data/dm/youth_sexual_and_physical_abuse_aigt_v009/ranking_bucket/ruler_items.json",
    "pos_label": "Y",
    "boundary_rank": 106,
}
def load_npy_pair(cache_dir, n_csv, n_ruler, max_length=4096):
    """Load the cached CSV and ruler embedding arrays from ``cache_dir``.

    Files are located by name pattern (the naming rule of
    batch_top100_match.py, per the original note):
    ``csv_*_n{n_csv}_L{max_length}.npy`` and
    ``ruler_*_n{n_ruler}_L{max_length}.npy``.

    Args:
        cache_dir: Directory that holds the ``.npy`` cache files.
        n_csv: Row count encoded in the CSV cache file name.
        n_ruler: Row count encoded in the ruler cache file name.
        max_length: Max-length value encoded in both file names.

    Returns:
        Tuple ``(csv_emb, ruler_emb)`` as returned by ``np.load``.

    Raises:
        FileNotFoundError: If either pattern matches no file.
    """
    cd = Path(cache_dir)
    # glob() yields matches in arbitrary order; sort so the same file is
    # picked on every run when more than one cache matches the pattern.
    csvs = sorted(cd.glob(f"csv_*_n{n_csv}_L{max_length}.npy"))
    rulers = sorted(cd.glob(f"ruler_*_n{n_ruler}_L{max_length}.npy"))
    if not csvs or not rulers:
        raise FileNotFoundError(
            f"找不到缓存。期望 {cd}/csv_*_n{n_csv}_L{max_length}.npy 和 ruler_*_n{n_ruler}_L{max_length}.npy"
        )
    return np.load(csvs[0]), np.load(rulers[0])
def load_ruler_meta(path):
    """Read rank and score of every ruler item from a JSON file.

    The JSON root may be a list of items, or a dict that holds the list
    under ``"items"``, ``"ruler_items"`` or ``"data"`` (first non-empty one
    wins). Each item must provide ``"rank"`` and ``"score"``.

    Args:
        path: Path of the ruler JSON file.

    Returns:
        Tuple ``(ranks, scores)``: an integer array and a float array, in
        item order. Both are empty when no item list is found.

    Raises:
        KeyError: If an item lacks ``"rank"`` or ``"score"``.
    """
    # Decode as UTF-8 explicitly instead of relying on the locale default.
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    if isinstance(data, list):
        items = data
    else:
        items = data.get("items") or data.get("ruler_items") or data.get("data") or []
    # Explicit dtypes keep the result types stable even for an empty list.
    ranks = np.array([int(it["rank"]) for it in items], dtype=int)
    scores = np.array([float(it["score"]) for it in items], dtype=float)
    return ranks, scores
def metrics(preds, gts):
    """Compute binary classification metrics from 0/1 predictions and labels.

    Args:
        preds: Array of predicted labels (1 = positive, 0 = negative).
        gts: Array of ground-truth labels, same length as ``preds``.

    Returns:
        Tuple ``(tp, fp, tn, fn, precision, recall, f1, accuracy)``. Any
        ratio whose denominator is zero is reported as ``0.0``.
    """
    tp = int(((preds == 1) & (gts == 1)).sum())
    fp = int(((preds == 1) & (gts == 0)).sum())
    tn = int(((preds == 0) & (gts == 0)).sum())
    fn = int(((preds == 0) & (gts == 1)).sum())
    p = tp / (tp + fp) if tp + fp else 0.0
    r = tp / (tp + fn) if tp + fn else 0.0
    f = 2 * p * r / (p + r) if p + r else 0.0
    # Guard the empty case like the other ratios instead of dividing by zero.
    n = len(preds)
    a = (tp + tn) / n if n else 0.0
    return tp, fp, tn, fn, p, r, f, a
def best_threshold_f1(scores, gts):
    """Sweep every distinct score as a threshold and return the best F1.

    A sample is predicted positive when ``score >= threshold``. Among
    thresholds with equal F1 the smallest one is returned.

    The sweep sorts the scores once and derives the counts for every
    threshold from cumulative sums, instead of rescanning both arrays for
    each candidate.

    Args:
        scores: 1-D array of real-valued scores (higher = more positive).
        gts: 1-D array of ground-truth labels (1 = positive, 0 = negative),
            same length as ``scores``.

    Returns:
        Tuple ``(f1, thr, p, r)``. For empty input the sentinel
        ``(-1.0, None, None, None)`` is returned. NaN scores are never used
        as thresholds; NaN-scored samples always count as predicted
        negative. If every score is NaN the result is
        ``(0.0, nan, 0.0, 0.0)``.
    """
    scores = np.asarray(scores, dtype=float)
    gts = np.asarray(gts)
    if scores.size == 0:
        return (-1.0, None, None, None)

    is_pos = gts == 1
    is_neg = gts == 0
    valid = ~np.isnan(scores)
    if not valid.any():
        # No usable threshold: everything is predicted negative.
        return (0.0, float("nan"), 0.0, 0.0)

    order = np.argsort(scores[valid], kind="stable")
    sorted_scores = scores[valid][order]
    pos_sorted = is_pos[valid][order]
    neg_sorted = is_neg[valid][order]

    # first[k] = number of valid samples scoring strictly below thresholds[k].
    thresholds, first = np.unique(sorted_scores, return_index=True)
    pos_cum = np.concatenate(([0], np.cumsum(pos_sorted)))
    neg_cum = np.concatenate(([0], np.cumsum(neg_sorted)))
    tp = pos_cum[-1] - pos_cum[first]
    fp = neg_cum[-1] - neg_cum[first]
    # Positives with a NaN score can never be predicted positive, so they
    # are counted through the global positive total.
    fn = int(is_pos.sum()) - tp

    pred_pos = tp + fp
    actual_pos = tp + fn
    zeros = np.zeros(thresholds.shape, dtype=float)
    precision = np.divide(tp, pred_pos, out=zeros.copy(), where=pred_pos > 0)
    recall = np.divide(tp, actual_pos, out=zeros.copy(), where=actual_pos > 0)
    pr_sum = precision + recall
    f1 = np.divide(2 * precision * recall, pr_sum, out=zeros.copy(), where=pr_sum > 0)

    # argmax returns the first maximum, i.e. the smallest best threshold.
    best = int(np.argmax(f1))
    return (
        float(f1[best]),
        float(thresholds[best]),
        float(precision[best]),
        float(recall[best]),
    )
def fit_severity_axis(emb, ruler_score, ruler_rank, boundary_rank):
    """Return the unit direction from the non-severe mean to the severe mean.

    Rows with ``ruler_rank < boundary_rank`` form the severe group, the
    remaining rows the non-severe group. The direction is the difference of
    the two group means, scaled to (approximately) unit length.

    Args:
        emb: 2-D array with one embedding per row.
        ruler_score: Unused; kept so the signature stays unchanged.
        ruler_rank: 1-D array of ranks, one per row of ``emb``.
        boundary_rank: Ranks strictly below this value count as severe.

    Returns:
        1-D direction vector with the same width as ``emb``.

    Raises:
        ValueError: If either group is empty. The mean of an empty group
            would be NaN and silently poison every projected score.
    """
    severe_mask = np.asarray(ruler_rank) < boundary_rank
    n_severe = int(severe_mask.sum())
    n_notsev = int(severe_mask.size - n_severe)
    if n_severe == 0 or n_notsev == 0:
        raise ValueError(
            f"boundary_rank={boundary_rank} leaves a severity group empty "
            f"(severe={n_severe}, non-severe={n_notsev})"
        )
    severe = emb[severe_mask].mean(axis=0)
    notsev = emb[~severe_mask].mean(axis=0)
    direction = severe - notsev
    # The epsilon avoids division by zero when both group means coincide.
    direction = direction / (np.linalg.norm(direction) + 1e-12)
    return direction
def project(emb, direction):
    """Return the matrix product of ``emb`` and ``direction``."""
    projected = np.matmul(emb, direction)
    return projected
def main():
    """Score every transform against the labelled CSV and report the results.

    Steps:
      1. Read the ground-truth labels from the ``label`` column of the CSV
         and the rank/score metadata of the ruler items.
      2. Load the cached CSV and ruler embeddings.
      3. Build one score vector per method (raw top-K cosine, severity
         axis, ridge, lasso, LDA, PCA + ridge, kNN mean, plus any LLM
         score/position columns present in the CSV).
      4. Print best-F1 / threshold / precision / recall / AUC per method.
      5. Write every per-sample score to the ``--output-jsonl`` file.
    """
    # Imported once here; the original re-executed this import inside the
    # scoring loop.
    from sklearn.metrics import roc_auc_score

    parser = argparse.ArgumentParser()
    parser.add_argument("--cache-dir", default=DEFAULTS["cache_dir"])
    parser.add_argument("--csv", default=DEFAULTS["csv"])
    parser.add_argument("--ruler", default=DEFAULTS["ruler"])
    parser.add_argument("--positive-label", default=DEFAULTS["pos_label"])
    parser.add_argument("--boundary-rank", type=int, default=DEFAULTS["boundary_rank"])
    parser.add_argument("--pca-dim", type=int, default=128)
    parser.add_argument("--max-length", type=int, default=4096)
    parser.add_argument("--output-jsonl", default="transform_eval.jsonl")
    args = parser.parse_args()

    print("[1] load labels and ruler meta")
    df = pd.read_csv(args.csv, keep_default_na=False)
    # Case-insensitive comparison against the positive label -> 0/1 vector.
    gts = (
        df["label"]
        .astype(str).str.upper().eq(args.positive_label.upper()).astype(int).values
    )
    ruler_rank, ruler_score = load_ruler_meta(args.ruler)
    n_csv = len(gts)
    n_ruler = len(ruler_rank)
    print(f" csv={n_csv}, ruler={n_ruler}, pos rate={gts.mean():.2%}")

    print("[2] load embeddings from cache")
    csv_emb, ruler_emb = load_npy_pair(args.cache_dir, n_csv, n_ruler, args.max_length)
    print(f" csv_emb={csv_emb.shape}, ruler_emb={ruler_emb.shape}")
    # Per the original note the embeddings were already L2-normalised by the
    # earlier script, so the dot product below is used as cosine similarity.

    methods = {}

    # --- raw cosine top-K weighted ---
    # Clamp K so argpartition does not fail when the ruler has fewer than
    # 100 items (the method names keep their "100" labels for output
    # compatibility).
    K = min(100, n_ruler)
    sims = csv_emb @ ruler_emb.T
    top_idx = np.argpartition(-sims, K - 1, axis=1)[:, :K]
    row = np.arange(n_csv)[:, None]
    top_sims = sims[row, top_idx]
    top_scores = ruler_score[top_idx]
    # Similarity-weighted mean of the neighbours' ruler scores.
    raw_weighted = (top_sims * top_scores).sum(axis=1) / np.maximum(top_sims.sum(axis=1), 1e-12)
    methods["raw cosine + top100 weighted score"] = raw_weighted

    # --- severity axis projection ---
    direction = fit_severity_axis(ruler_emb, ruler_score, ruler_rank, args.boundary_rank)
    methods["severity_axis projection (1D)"] = project(csv_emb, direction)

    # --- ridge regression: emb -> score ---
    rid = Ridge(alpha=1.0).fit(ruler_emb, ruler_score)
    methods["ridge: emb -> score"] = rid.predict(csv_emb)

    # --- lasso ---
    las = Lasso(alpha=0.001, max_iter=5000).fit(ruler_emb, ruler_score)
    methods["lasso: emb -> score"] = las.predict(csv_emb)

    # --- LDA: severe/notsevere ---
    y_bin = (ruler_rank < args.boundary_rank).astype(int)
    lda = LDA().fit(ruler_emb, y_bin)
    methods["LDA: severe vs not"] = lda.decision_function(csv_emb)

    # --- PCA -> ridge ---
    # Cap the component count by the sample count and embedding width.
    pca = PCA(n_components=min(args.pca_dim, n_ruler - 1, ruler_emb.shape[1])).fit(ruler_emb)
    rid_p = Ridge(alpha=1.0).fit(pca.transform(ruler_emb), ruler_score)
    methods[f"PCA{pca.n_components_} + ridge"] = rid_p.predict(pca.transform(csv_emb))

    # --- knn average top-K ruler score ---
    methods["kNN-100 mean(ruler_score)"] = top_scores.mean(axis=1)

    # --- LLM columns (only when the CSV carries the AIPF position/score) ---
    llm_cols = [
        ("score_gemini_2.5_flash", None),      # already a score: larger = more severe
        ("position_gemini_2.5_flash", "neg"),  # smaller position = more severe, so negate
        ("score_gpt_4.1", None),
        ("position_gpt_4.1", "neg"),
    ]
    for col, mode in llm_cols:
        if col not in df.columns:
            continue
        raw = pd.to_numeric(df[col], errors="coerce").values
        # Fill NaN with the column median so the threshold sweep is not
        # disturbed; skip the column when it has no numeric value at all.
        med = np.nanmedian(raw)
        if np.isnan(med):
            continue
        raw = np.where(np.isnan(raw), med, raw)
        if mode == "neg":
            methods[f"LLM: {col} (-position)"] = -raw
        else:
            methods[f"LLM: {col}"] = raw

    # ---- report ----
    print(f"\n{'method':<40}{'best F1':>10}{'thr':>10}{'P':>9}{'R':>9}{'AUC?':>10}")
    print("-" * 88)
    for name, scores in methods.items():
        f1, thr, prec, recall = best_threshold_f1(scores, gts)
        try:
            auc = roc_auc_score(gts, scores)
        except ValueError:
            # AUC is undefined (e.g. a single class in the labels): show NaN.
            auc = float("nan")
        print(f"{name:<40}{f1:>10.4f}{thr:>10.4f}{prec:>9.4f}{recall:>9.4f}{auc:>10.4f}")

    # Write one JSON line per sample with the score of every method.
    print(f"\n[write] {args.output_jsonl}")
    with open(args.output_jsonl, "w", encoding="utf-8") as f:
        for i, gt in enumerate(gts):
            record = {"i": i, "ground_truth": int(gt)}
            record.update((name, float(scores[i])) for name, scores in methods.items())
            f.write(json.dumps(record) + "\n")

    print("\n说明:")
    print("- AUC 反映分布可分性,跟阈值无关。AUC 高 = 这个变换的输出能更好把正/负分开。")
    print("- best F1 是扫阈值找到的上限,是这个变换的理论最佳。")


# Run the evaluation only when executed as a script, not on import.
if __name__ == "__main__":
    main()