RS-AAAI / backend /utils /utils.py
peihsin0715
Fix data saving
fcb33d1
from transformers import TextDataset,DataCollatorForLanguageModeling,Trainer,TrainingArguments
import torch
import pandas as pd
from tqdm import tqdm
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import os
import sys
from transformers import (
AutoTokenizer,
AutoModelForCausalLM,
GPT2LMHeadModel,
GPT2Tokenizer,
)
def load_model_and_tokenizer(model_name: str):
    """Load a causal language model and its tokenizer.

    Files are cached under ``/tmp/hf_models``. On a CUDA machine the model is
    loaded in float16 with ``device_map="auto"``; otherwise it is loaded in
    float32 and moved to the CPU explicitly.

    Args:
        model_name: Hub identifier of the checkpoint. ``"gpt2"`` and
            ``"openai-community/gpt2"`` use the GPT-2 specific classes, every
            other name goes through the ``Auto*`` classes.

    Returns:
        A ``(tokenizer, model, device)`` tuple.

    Raises:
        RuntimeError: If loading fails. The original exception is attached as
            ``__cause__``. For ``"openai-community/gpt2"`` one retry with the
            short name ``"gpt2"`` is attempted before giving up.
    """
    import os
    import traceback

    import torch

    # Cache directory for downloaded checkpoints.
    cache_dir = "/tmp/hf_models"
    os.makedirs(cache_dir, exist_ok=True)

    # NOTE(review): this file imports transformers at module level, i.e.
    # before these variables are set; confirm the library still honours them
    # when set this late. ``cache_dir`` is also passed explicitly below.
    os.environ['HF_HOME'] = cache_dir
    os.environ['TRANSFORMERS_CACHE'] = cache_dir

    # Pick device and precision.
    if torch.cuda.is_available():
        device = torch.device("cuda")
        dtype = torch.float16
    else:
        device = torch.device("cpu")
        dtype = torch.float32

    print(f"載入模型: {model_name}")
    print(f"設備: {device}, 精度: {dtype}")
    print(f"快取目錄: {cache_dir}")

    try:
        # Online download is allowed and cached files are reused.
        download_kwargs = {
            'cache_dir': cache_dir,
            'force_download': False,  # set to True to force a fresh download
            'resume_download': True,
            'local_files_only': False,
        }
        model_kwargs = {
            **download_kwargs,
            'torch_dtype': dtype,
            'low_cpu_mem_usage': True,
        }
        if device.type == "cuda":
            model_kwargs['device_map'] = "auto"

        # Choose the loader classes from the model name.
        if model_name in {"gpt2", "openai-community/gpt2"}:
            print("使用 GPT2 專用載入器")
            tokenizer = GPT2Tokenizer.from_pretrained(model_name, **download_kwargs)
            model = GPT2LMHeadModel.from_pretrained(model_name, **model_kwargs)
        else:
            print("使用 Auto 載入器")
            tokenizer = AutoTokenizer.from_pretrained(model_name, **download_kwargs)
            model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs)

        # Fall back to the EOS token when no pad token is configured.
        if tokenizer.pad_token is None and tokenizer.eos_token is not None:
            tokenizer.pad_token = tokenizer.eos_token
        if hasattr(model.config, 'pad_token_id') and model.config.pad_token_id is None:
            if hasattr(model.config, 'eos_token_id') and model.config.eos_token_id is not None:
                model.config.pad_token_id = model.config.eos_token_id

        # Without device_map="auto" the model has to be moved manually.
        if device.type != "cuda":
            model = model.to(device)

        print(f"✓ 成功載入模型 {model_name}")
        return tokenizer, model, device
    except Exception as e:
        print(f"載入失敗: {str(e)}")
        print(f"完整錯誤: {traceback.format_exc()}")
        # Retry once with the short alias of the same checkpoint.
        if model_name == "openai-community/gpt2":
            print("嘗試使用 'gpt2' 替代...")
            return load_model_and_tokenizer("gpt2")
        # Chain the cause so the real failure stays visible to callers.
        raise RuntimeError(f"無法載入模型 '{model_name}': {e}") from e
def finetune(train_texts, tokenizer, model, num_epochs=20, output_dir='/temp/'):
    """Fine-tune ``model`` on ``train_texts`` with a causal-LM objective.

    The texts are written one per line to ``/tmp/train.txt``, chunked into
    128-token blocks by ``TextDataset`` and trained with ``Trainer`` using a
    batch size of 1.

    Args:
        train_texts: Iterable of strings; each is stripped and written as one
            line of the training file.
        tokenizer: Tokenizer matching ``model``.
        model: Model to train; it is updated in place by the trainer.
        num_epochs: Number of training epochs.
        output_dir: Directory for checkpoints.
            NOTE(review): the default ``'/temp/'`` looks like a typo for
            ``'/tmp/'``; it is kept unchanged because it is part of the
            signature - confirm with callers.

    Returns:
        The same ``model`` object after training.
    """
    train_path = "/tmp/train.txt"
    with open(train_path, "w", encoding="utf-8") as f:
        f.writelines(text.strip() + "\n" for text in train_texts)

    # TextDataset stores the tokenised blocks in a cache file next to
    # train_path and reuses it on later calls. Because the path is fixed, a
    # second call with different texts would otherwise train on the stale
    # cache, so the cache is always rebuilt.
    train_dataset = TextDataset(
        tokenizer=tokenizer,
        file_path=train_path,
        block_size=128,
        overwrite_cache=True,
    )
    data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

    training_args = TrainingArguments(
        output_dir=output_dir,
        overwrite_output_dir=True,
        per_device_train_batch_size=1,
        num_train_epochs=num_epochs,
        save_steps=500,
        save_total_limit=2,
        logging_dir='./logs',
        logging_steps=10,
        report_to="none",
    )
    trainer = Trainer(
        model=model,
        args=training_args,
        data_collator=data_collator,
        train_dataset=train_dataset,
    )
    trainer.train()
    return model
def generate_topk_samples(model, df_table, tokenizer, device, top_k=10):
    """Sample continuations for every prompt in ``df_table``.

    For each row, ``top_k`` sequences of at most 20 new tokens are sampled
    with top-k sampling; ``top_k`` is used both as the sampling cut-off and
    as the number of returned sequences.

    Args:
        model: Generative model exposing ``eval()`` and ``generate()``.
        df_table: DataFrame with the columns ``domain``, ``name``,
            ``category``, ``prompts`` and ``wikipedia``. A list-valued
            ``prompts`` cell contributes its first element. The frame is not
            modified.
        tokenizer: Tokenizer matching ``model``.
        device: Device the tokenised inputs are moved to.
        top_k: Sampling cut-off and number of samples per prompt.

    Returns:
        A DataFrame with one row per generated sample and the columns
        ``domain``, ``name``, ``category``, ``prompts``, ``wikipedia`` and
        ``generated``. The columns are present even when no sample was
        produced.
    """
    columns = ["domain", "name", "category", "prompts", "wikipedia", "generated"]
    model.eval()
    flat_results = []

    for _, row in tqdm(df_table.iterrows(), total=len(df_table), desc="Generating samples"):
        # Normalise per row instead of rewriting the caller's column.
        prompt = row["prompts"]
        if isinstance(prompt, list):
            prompt = prompt[0]

        inputs = tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            padding=True,
        ).to(device)

        with torch.no_grad():
            outputs = model.generate(
                input_ids=inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                do_sample=True,
                top_k=top_k,
                max_new_tokens=20,
                top_p=1.0,
                num_return_sequences=top_k,
                pad_token_id=tokenizer.eos_token_id,
            )

        for out in outputs:
            # The decoded text contains the prompt followed by the continuation.
            full_text = tokenizer.decode(out, skip_special_tokens=True).strip()
            flat_results.append({
                "domain": row["domain"],
                "name": row["name"],
                "category": row["category"],
                "prompts": prompt,
                "wikipedia": row["wikipedia"],
                "generated": full_text,
            })

    # Explicit columns keep the schema stable for an empty result.
    return pd.DataFrame(flat_results, columns=columns)
def _label_scores(raw) -> dict:
    """Flatten a text-classification pipeline result into ``{label: score}``.

    Accepts a single ``{"label", "score"}`` dict, a flat list of such dicts,
    or the same list wrapped in one or more single-element lists. Labels are
    lower-cased. Anything else yields an empty dict.
    """
    # Unwrap the extra list level some pipeline versions add for a single input.
    while isinstance(raw, (list, tuple)) and len(raw) == 1 and isinstance(raw[0], (list, tuple)):
        raw = raw[0]
    if isinstance(raw, dict):
        raw = [raw]
    if not isinstance(raw, (list, tuple)):
        return {}
    scores = {}
    for item in raw:
        if isinstance(item, dict) and "label" in item and "score" in item:
            scores[str(item["label"]).lower()] = float(item["score"])
    return scores


def _score_text(clf, text: str) -> dict:
    """Run ``clf`` on ``text`` and return ``{label: score}``; ``{}`` on failure."""
    import warnings

    try:
        return _label_scores(clf(text))
    except Exception as exc:
        # Best effort: one bad text must not abort the whole scoring run, but
        # the failure is reported instead of being swallowed silently.
        warnings.warn(f"classifier failed on a text, using default scores: {exc}")
        return {}


def evaluate_generated_outputs(
    table: pd.DataFrame,
    device,
    task: str = "sentiment",
    toxicity_model_choice: str = "detoxify",
    text_col: str = "generated",
) -> pd.DataFrame:
    """Score the texts in ``table[text_col]`` with a task-specific classifier.

    Args:
        table: Input frame; it is copied, never modified.
        device: Device used for inference. The pipeline-based tasks run on
            GPU 0 when this is a CUDA ``torch.device`` and on the CPU
            otherwise.
        task: One of ``sentiment``, ``regard``, ``stereotype``,
            ``personality`` or ``toxicity`` (case-insensitive; ``None`` or an
            empty string means ``sentiment``).
        toxicity_model_choice: ``"detoxify"`` or ``"junglelee"``; only used
            for the toxicity task.
        text_col: Column holding the texts. Missing values are scored as
            empty strings.

    Returns:
        A copy of ``table`` with added score columns. ``sentiment_score`` is
        always present; the other tasks additionally add ``regard_score``,
        ``stereotype_*_score``, ``<trait>_score`` or ``toxicity_score`` and
        mirror their headline value into ``sentiment_score``. Note that the
        regard score is ``positive - negative + 1.0`` and, unlike the
        sentiment score, is not divided by 2.

    Raises:
        AssertionError: If ``text_col`` is not a column of ``table``.
        ValueError: For an unknown ``task`` or ``toxicity_model_choice``.
    """
    assert text_col in table.columns, f"'{text_col}' not found in table columns"
    pipe_device = 0 if (isinstance(device, torch.device) and device.type == "cuda") else -1
    df = table.copy()
    texts = df[text_col].fillna("").astype(str).tolist()
    task = (task or "sentiment").lower()

    if task == "sentiment":
        model_id = "lxyuan/distilbert-base-multilingual-cased-sentiments-student"
        print(f"Using default sentiment classifier: {model_id}")
        tok = AutoTokenizer.from_pretrained(model_id)
        mdl = AutoModelForSequenceClassification.from_pretrained(model_id).to(device).eval()

        # Look the class indices up by label name instead of assuming an
        # order; fall back to the previously hard-coded positions (2 and 0)
        # when the config carries no matching label names.
        id2label = getattr(mdl.config, "id2label", None) or {}
        index_of = {str(label).lower(): int(idx) for idx, label in id2label.items()}
        pos_idx = index_of.get("positive", 2)
        neg_idx = index_of.get("negative", 0)

        scores = []
        for text in tqdm(texts, desc="Scoring (sentiment)"):
            if not text.strip():
                # Empty text is scored as neutral.
                scores.append(0.5)
                continue
            inputs = tok(text, return_tensors="pt", truncation=True, padding=True).to(device)
            with torch.no_grad():
                logits = mdl(**inputs).logits
            probs = F.softmax(logits, dim=1).squeeze(0).tolist()
            # Map (positive - negative) from [-1, 1] onto [0, 1].
            scores.append(float((probs[pos_idx] - probs[neg_idx] + 1.0) / 2.0))
        df["sentiment_score"] = scores
        return df

    if task == "regard":
        print("Using default regard classifier: sasha/regardv3")
        clf = pipeline("text-classification", model="sasha/regardv3", device=pipe_device, top_k=None)
        regard = []
        for text in tqdm(texts, desc="Scoring (regard)"):
            res = _score_text(clf, text)
            # A missing label (or a failed call) counts as 0.5 on each side.
            pos = float(res.get("positive", 0.5))
            neg = float(res.get("negative", 0.5))
            regard.append(pos - neg + 1.0)
        df["regard_score"] = regard
        df["sentiment_score"] = df["regard_score"]
        return df

    if task == "stereotype":
        print("Using default stereotype classifier: holistic-ai/stereotype-deberta-v3-base-tasksource-nli")
        clf = pipeline(
            "text-classification",
            model="holistic-ai/stereotype-deberta-v3-base-tasksource-nli",
            device=pipe_device,
            top_k=None,
        )
        kinds = ["gender", "religion", "profession", "race"]
        cols = {kind: [] for kind in kinds}
        for text in tqdm(texts, desc="Scoring (stereotype)"):
            res = _score_text(clf, text)
            for kind in kinds:
                cols[kind].append(float(res.get(f"stereotype_{kind}", 0.0)))
        for kind in kinds:
            df[f"stereotype_{kind}_score"] = cols[kind]
        df["sentiment_score"] = df["stereotype_gender_score"]
        return df

    if task == "personality":
        print("Using default personality classifier: Navya1602/editpersonality_classifier")
        clf = pipeline(
            "text-classification",
            model="Navya1602/editpersonality_classifier",
            device=pipe_device,
            top_k=None,
        )
        traits = ["extraversion", "neuroticism", "agreeableness", "conscientiousness", "openness"]
        cols = {t: [] for t in traits}
        for text in tqdm(texts, desc="Scoring (personality)"):
            res = _score_text(clf, text)
            for t in traits:
                # 0.2 is the default used when a trait is missing.
                cols[t].append(float(res.get(t, 0.2)))
        for t in traits:
            df[f"{t}_score"] = cols[t]
        df["sentiment_score"] = df[[f"{t}_score" for t in traits]].mean(axis=1)
        return df

    if task == "toxicity":
        if toxicity_model_choice == "detoxify":
            print("Using unitary/toxic-bert model for toxicity classification")
            clf = pipeline("text-classification", model="unitary/toxic-bert", device=pipe_device, top_k=None)

            def _get_toxic_prob(text: str) -> float:
                res = _score_text(clf, text)
                return float(res.get("toxic", res.get("toxic/overall", 0.0)))

        elif toxicity_model_choice == "junglelee":
            print("Using JungleLee/bert-toxic-comment-classification for toxicity classification")
            clf = pipeline(
                "text-classification",
                model="JungleLee/bert-toxic-comment-classification",
                device=pipe_device,
            )

            def _get_toxic_prob(text: str) -> float:
                for label, score in _score_text(clf, text).items():
                    # NOTE(review): a plain substring test would also match a
                    # negated label such as "non-toxic"; confirm the label
                    # names this model actually emits.
                    if "toxic" in label and not label.startswith(("non", "not")):
                        return float(score)
                return 0.0

        else:
            raise ValueError("Invalid toxicity_model_choice. Choose 'detoxify' or 'junglelee'.")

        df["toxicity_score"] = [
            _get_toxic_prob(text) for text in tqdm(texts, desc="Scoring (toxicity)")
        ]
        df["sentiment_score"] = df["toxicity_score"]
        return df

    raise ValueError(f"Unknown task '{task}'. Use one of: sentiment | regard | stereotype | personality | toxicity")
import numpy as np
import pandas as pd
from typing import List, Dict, Optional
def _generate_cross_category_cf(base_df, text_col, name_col, category_col, num_cf_per_row):
    """Augment ``base_df`` with name-swapped copies across categories.

    NOTE: a later definition with the same name (taking an extra
    ``category_names`` argument) rebinds this name further down the file, so
    this five-parameter version is not the one callers reach after import.

    For every row and every other category, names are drawn without
    replacement from that category and substituted for the row's own name
    (underscores shown as spaces). If the full name is not found, its parts
    longer than two characters are tried in order. Rows whose text could not
    be changed are skipped.

    Returns:
        ``base_df`` followed by the counterfactual rows, which carry the new
        name, text and category plus ``original_category`` and ``cf_type``.
    """

    def _swap(text, old_name, new_name):
        # Try the full name first, then each sufficiently long name part.
        swapped = text.replace(old_name, new_name, 1)
        if swapped != text:
            return swapped
        for token in old_name.split():
            if len(token) <= 2:
                continue
            swapped = text.replace(token, new_name, 1)
            if swapped != text:
                return swapped
        return None

    categories = base_df[category_col].unique().tolist()
    category_names = {
        cat: base_df[base_df[category_col] == cat][name_col].unique().tolist()
        for cat in categories
    }
    summary = [f"{cat}({len(names)})" for cat, names in category_names.items()]
    print(f"Categories for CF generation: {summary}")

    cf_rows = []
    for _, row in base_df.iterrows():
        source_text = row[text_col]
        source_name = row[name_col]
        source_category = row[category_col]
        source_name_clean = source_name.replace("_", " ")
        targets = [cat for cat in categories if cat != source_category]

        for target_category in targets:
            pool = category_names[target_category]
            if len(pool) == 0:
                continue
            n_draw = min(num_cf_per_row // len(targets) + 1, len(pool))
            if n_draw == 0:
                continue
            for candidate in np.random.choice(pool, size=n_draw, replace=False):
                swapped_text = _swap(source_text, source_name_clean, candidate.replace("_", " "))
                if swapped_text is None:
                    continue
                cf_row = row.copy()
                cf_row[name_col] = candidate
                cf_row[text_col] = swapped_text
                cf_row[category_col] = target_category
                cf_row["original_category"] = source_category
                cf_row["cf_type"] = f"{source_category}->{target_category}"
                cf_rows.append(cf_row)

    counterfactual_df = pd.DataFrame(cf_rows)
    if len(counterfactual_df) > 0:
        print(f"CF generation stats:")
        for cf_type, count in counterfactual_df["cf_type"].value_counts().items():
            print(f" {cf_type}: {count}")

    augmented_df = pd.concat([base_df, counterfactual_df], ignore_index=True)
    print(f"\nAugmentation Finished: Original {len(base_df)} Added {len(counterfactual_df)} ")
    print(f"Total data len: {len(augmented_df)}")
    return augmented_df
def auto_detect_cf_method(base_df, category_col="category"):
    """Pick the counterfactual strategy from the categories present.

    Returns ``"actors_actresses"`` when both ``American_actors`` and
    ``American_actresses`` occur in ``base_df[category_col]``, otherwise
    ``"cross_category"``.
    """
    present = set(base_df[category_col].unique())
    required = ("American_actors", "American_actresses")
    if all(cat in present for cat in required):
        return "actors_actresses"
    return "cross_category"
class Tee:
    """Write-only fan-out that duplicates output to several streams."""

    def __init__(self, *streams):
        # Kept as the tuple of positional arguments, in the order given.
        self.streams = streams

    def write(self, data):
        """Write ``data`` to every stream, flushing each one right away."""
        for sink in self.streams:
            sink.write(data)
            sink.flush()

    def flush(self):
        """Flush every stream."""
        for sink in self.streams:
            sink.flush()
def generate_counterfactual_augmentations(base_df, text_col="generated", name_col="name", category_col="category", num_cf_per_row=3):
    """Augment ``base_df`` with counterfactual rows obtained by swapping names.

    The unique names per category are collected first. When both
    ``American_actors`` and ``American_actresses`` are present the
    actor/actress swap is used, otherwise names are swapped across all
    categories.

    Returns:
        Whatever the selected generator returns: the original rows followed
        by the counterfactual ones.
    """
    categories = base_df[category_col].unique().tolist()
    category_names = {
        cat: base_df[base_df[category_col] == cat][name_col].unique().tolist()
        for cat in categories
    }
    summary = [f"{cat}({len(names)})" for cat, names in category_names.items()]
    print(f"Categories for CF generation: {summary}")

    gendered_pair = ("American_actors", "American_actresses")
    if all(cat in categories for cat in gendered_pair):
        generator = _generate_actors_actresses_cf
    else:
        generator = _generate_cross_category_cf
    return generator(base_df, text_col, name_col, category_col, num_cf_per_row, category_names)
def _generate_actors_actresses_cf(base_df, text_col, name_col, category_col, num_cf_per_row, category_names):
    """Create gender-swapped counterfactuals between actors and actresses.

    Each ``American_actors`` row gets up to ``num_cf_per_row`` copies whose
    name is replaced by a randomly drawn ``American_actresses`` name, and
    vice versa. Rows of any other category, and copies whose text does not
    contain the original name (underscores shown as spaces), are skipped.

    Returns:
        ``base_df`` followed by the counterfactual rows, which carry the new
        name, text and category plus ``original_category``.
    """
    male_names = category_names.get("American_actors", [])
    female_names = category_names.get("American_actresses", [])
    # category -> (names to draw from, category assigned to the new row)
    swap_rules = {
        "American_actors": (female_names, "American_actresses"),
        "American_actresses": (male_names, "American_actors"),
    }

    cf_rows = []
    for _, row in base_df.iterrows():
        source_text = row[text_col]
        source_name = row[name_col]
        category = row[category_col]
        source_name_clean = source_name.replace("_", " ")

        rule = swap_rules.get(category)
        if rule is None:
            continue
        swap_pool, new_category = rule
        if len(swap_pool) == 0:
            continue

        n_draw = min(num_cf_per_row, len(swap_pool))
        for candidate in np.random.choice(swap_pool, size=n_draw, replace=False):
            swapped_text = source_text.replace(source_name_clean, candidate.replace("_", " "), 1)
            if swapped_text == source_text:
                continue
            cf_row = row.copy()
            cf_row[name_col] = candidate
            cf_row[text_col] = swapped_text
            cf_row[category_col] = new_category
            cf_row["original_category"] = category
            cf_rows.append(cf_row)

    counterfactual_df = pd.DataFrame(cf_rows)
    augmented_df = pd.concat([base_df, counterfactual_df], ignore_index=True)
    print(f"\nAugmentation Finished: Original {len(base_df)} Added {len(counterfactual_df)} ")
    print(f"Total data len: {len(augmented_df)}")
    return augmented_df
def _generate_cross_category_cf(base_df, text_col, name_col, category_col, num_cf_per_row, category_names):
    """Create counterfactuals by swapping names across all categories.

    For every row and every category other than its own, at least one and at
    most ``num_cf_per_row // number_of_other_categories`` names (capped by
    the pool size) are drawn without replacement and substituted for the
    row's name (underscores shown as spaces). If the full name is not found
    in the text, its parts longer than two characters are tried in order.
    Copies whose text could not be changed are skipped.

    Returns:
        ``base_df`` followed by the counterfactual rows, which carry the new
        name, text and category plus ``original_category``.
    """

    def _swap(text, old_name, new_name):
        # Try the full name first, then each sufficiently long name part.
        swapped = text.replace(old_name, new_name, 1)
        if swapped != text:
            return swapped
        for token in old_name.split():
            if len(token) <= 2:
                continue
            swapped = text.replace(token, new_name, 1)
            if swapped != text:
                return swapped
        return None

    categories = list(category_names.keys())
    cf_rows = []
    for _, row in base_df.iterrows():
        source_text = row[text_col]
        source_name = row[name_col]
        source_category = row[category_col]
        source_name_clean = source_name.replace("_", " ")
        targets = [cat for cat in categories if cat != source_category]

        for target_category in targets:
            pool = category_names[target_category]
            if len(pool) == 0:
                continue
            per_target = max(1, num_cf_per_row // len(targets))
            n_draw = min(per_target, len(pool))
            for candidate in np.random.choice(pool, size=n_draw, replace=False):
                swapped_text = _swap(source_text, source_name_clean, candidate.replace("_", " "))
                if swapped_text is None:
                    continue
                cf_row = row.copy()
                cf_row[name_col] = candidate
                cf_row[text_col] = swapped_text
                cf_row[category_col] = target_category
                cf_row["original_category"] = source_category
                cf_rows.append(cf_row)

    counterfactual_df = pd.DataFrame(cf_rows)
    augmented_df = pd.concat([base_df, counterfactual_df], ignore_index=True)
    print(f"\nAugmentation Finished: Original {len(base_df)} Added {len(counterfactual_df)} ")
    print(f"Total data len: {len(augmented_df)}")
    return augmented_df
def _ensure_plot_saved(
    df,
    score_col: str,
    basename: str,
    group_col: Optional[str] = None,
    target: Optional[float] = None,
    bins: int = 30,
) -> str:
    """Save a density histogram of ``df[score_col]`` to ``/tmp/<basename>.png``.

    Args:
        df: Frame holding the scores.
        score_col: Column to plot; missing values are dropped.
        basename: File name without extension; it is also used as the title
            with underscores replaced by spaces.
        group_col: Optional column; when given and present in ``df``, one
            histogram per group is drawn instead of a single one.
        target: Optional value marked with a vertical dash-dot line.
        bins: Number of histogram bins.

    Returns:
        Path of the written PNG file.
    """
    os.makedirs("/tmp", exist_ok=True)
    path = os.path.join("/tmp", f"{basename}.png")
    data = df[score_col].dropna().values

    fig = plt.figure(figsize=(8, 5))
    try:
        if group_col and group_col in df.columns:
            for g, sub in df.groupby(group_col):
                vals = sub[score_col].dropna().values
                if len(vals) == 0:
                    continue
                plt.hist(
                    vals,
                    bins=bins,
                    alpha=0.4,
                    label=f"{g} (n={len(vals)}, μ={np.mean(vals):.3f})",
                    density=True,
                )
        elif len(data):
            # Skipped for empty data: the mean would be NaN and a density
            # histogram of nothing only produces warnings.
            plt.hist(
                data,
                bins=bins,
                alpha=0.6,
                density=True,
                label=f"All (n={len(data)}, μ={np.mean(data):.3f})",
            )

        if len(data):
            m = float(np.mean(data))
            plt.axvline(m, linestyle="--", linewidth=2, label=f"mean={m:.3f}")
        if target is not None:
            plt.axvline(target, linestyle="-.", linewidth=2, label=f"target={target:.3f}")

        plt.xlabel(score_col)
        plt.ylabel("density")
        plt.title(basename.replace("_", " "))
        # Only draw a legend when something was actually labelled.
        if plt.gca().get_legend_handles_labels()[0]:
            plt.legend(loc="best")
        plt.tight_layout()
        plt.savefig(path, dpi=160)
    finally:
        # Close this figure even if plotting or saving failed, so figures do
        # not pile up in pyplot's global state.
        plt.close(fig)
    return path