|
from transformers import TextDataset,DataCollatorForLanguageModeling,Trainer,TrainingArguments |
|
import torch |
|
import pandas as pd |
|
from tqdm import tqdm |
|
import torch.nn.functional as F |
|
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline |
|
import matplotlib.pyplot as plt |
|
import seaborn as sns |
|
import numpy as np |
|
import os |
|
import sys |
|
from transformers import ( |
|
AutoTokenizer, |
|
AutoModelForCausalLM, |
|
GPT2LMHeadModel, |
|
GPT2Tokenizer, |
|
) |
|
|
|
def load_model_and_tokenizer(model_name: str):
    """Load a causal language model and its tokenizer from the Hugging Face hub.

    Files are cached under ``/tmp/hf_models``. On CUDA the model is loaded in
    float16 with ``device_map="auto"``; otherwise it is loaded in float32 and
    moved to the CPU. If the tokenizer has no pad token, its EOS token is
    reused, and the same is done for ``model.config.pad_token_id``.

    Args:
        model_name: Hub identifier. ``"gpt2"`` and ``"openai-community/gpt2"``
            go through the GPT-2 specific classes, anything else through the
            ``Auto*`` classes.

    Returns:
        A ``(tokenizer, model, device)`` tuple.

    Raises:
        RuntimeError: If loading fails. The original exception is chained as
            ``__cause__``. For ``"openai-community/gpt2"`` one retry with the
            short name ``"gpt2"`` is attempted first.
    """
    cache_dir = "/tmp/hf_models"
    os.makedirs(cache_dir, exist_ok=True)

    # NOTE(review): transformers is already imported when this runs, so these
    # variables may be read too late to affect it; the explicit ``cache_dir``
    # kwarg below is what pins the cache location here -- confirm.
    os.environ['HF_HOME'] = cache_dir
    os.environ['TRANSFORMERS_CACHE'] = cache_dir

    if torch.cuda.is_available():
        device = torch.device("cuda")
        dtype = torch.float16
    else:
        device = torch.device("cpu")
        dtype = torch.float32

    print(f"載入模型: {model_name}")
    print(f"設備: {device}, 精度: {dtype}")
    print(f"快取目錄: {cache_dir}")

    try:
        download_kwargs = {
            'cache_dir': cache_dir,
            'force_download': False,
            'resume_download': True,
            'local_files_only': False,
        }
        model_kwargs = {
            **download_kwargs,
            'torch_dtype': dtype,
            'low_cpu_mem_usage': True,
        }
        if device.type == "cuda":
            # Placement is delegated to the loader, so no explicit .to() later.
            model_kwargs['device_map'] = "auto"

        if model_name in {"gpt2", "openai-community/gpt2"}:
            print("使用 GPT2 專用載入器")
            tokenizer = GPT2Tokenizer.from_pretrained(model_name, **download_kwargs)
            model = GPT2LMHeadModel.from_pretrained(model_name, **model_kwargs)
        else:
            print("使用 Auto 載入器")
            tokenizer = AutoTokenizer.from_pretrained(model_name, **download_kwargs)
            model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs)

        # Models such as GPT-2 ship without a pad token; fall back to EOS.
        if tokenizer.pad_token is None and tokenizer.eos_token is not None:
            tokenizer.pad_token = tokenizer.eos_token

        if hasattr(model.config, 'pad_token_id') and model.config.pad_token_id is None:
            if hasattr(model.config, 'eos_token_id') and model.config.eos_token_id is not None:
                model.config.pad_token_id = model.config.eos_token_id

        if device.type != "cuda":
            model = model.to(device)

        print(f"✓ 成功載入模型 {model_name}")
        return tokenizer, model, device

    except Exception as e:
        # A single handler: the former second ``except Exception`` clause on
        # this ``try`` could never run and has been removed.
        import traceback

        print(f"載入失敗: {str(e)}")
        print(f"完整錯誤: {traceback.format_exc()}")

        if model_name == "openai-community/gpt2":
            print("嘗試使用 'gpt2' 替代...")
            return load_model_and_tokenizer("gpt2")

        raise RuntimeError(f"無法載入模型 '{model_name}': {e}") from e
|
|
|
def finetune(train_texts, tokenizer, model, num_epochs=20, output_dir='/temp/'):
    """Fine-tune ``model`` as a causal LM on ``train_texts`` and return it.

    The texts are written one per line to ``/tmp/train.txt``, wrapped in a
    ``TextDataset`` with 128-token blocks, and trained with ``Trainer`` using
    a batch size of 1.

    Args:
        train_texts: Iterable of strings; each is stripped before writing.
        tokenizer: Tokenizer used to build the dataset and the collator.
        model: Model to train; it is passed to ``Trainer`` and returned.
        num_epochs: Number of training epochs.
        output_dir: Checkpoint directory handed to ``TrainingArguments``.
            NOTE(review): the default ``'/temp/'`` looks like a typo for
            ``'/tmp/'``; left unchanged so existing callers are unaffected.

    Returns:
        The same ``model`` object after ``trainer.train()`` has run.
    """
    train_path = "/tmp/train.txt"

    with open(train_path, "w", encoding="utf-8") as f:
        f.writelines(f"{text.strip()}\n" for text in train_texts)

    # TextDataset stores tokenized features beside the input file and reuses
    # them on later runs. The path is fixed, so without overwrite_cache=True a
    # second call with different texts would train on the first call's data.
    train_dataset = TextDataset(
        tokenizer=tokenizer,
        file_path=train_path,
        block_size=128,
        overwrite_cache=True,
    )
    data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

    training_args = TrainingArguments(
        output_dir=output_dir,
        overwrite_output_dir=True,
        per_device_train_batch_size=1,
        num_train_epochs=num_epochs,
        save_steps=500,
        save_total_limit=2,
        logging_dir='./logs',
        logging_steps=10,
        report_to="none",
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        data_collator=data_collator,
        train_dataset=train_dataset,
    )
    trainer.train()

    return model
|
|
|
def generate_topk_samples(model, df_table, tokenizer, device, top_k=10):
    """Sample ``top_k`` continuations for every prompt in ``df_table``.

    Args:
        model: Generative model exposing ``eval()`` and ``generate()``.
        df_table: DataFrame with the columns ``prompts``, ``domain``,
            ``name``, ``category`` and ``wikipedia``. A ``prompts`` cell may
            be a string or a list, in which case its first element is used.
            The frame is read only; it is not modified.
        tokenizer: Tokenizer used to encode prompts and decode outputs.
        device: Device the encoded inputs are moved to.
        top_k: Used both as the top-k sampling cutoff and as the number of
            sequences returned per prompt.

    Returns:
        A DataFrame with one row per generated sequence, holding the source
        row's metadata, the prompt, and the decoded text in ``generated``.
    """
    model.eval()
    flat_results = []

    for _, row in tqdm(df_table.iterrows(), total=len(df_table), desc="Generating samples"):
        # Normalize list-valued prompts locally instead of rewriting the
        # caller's column in place.
        prompt = row["prompts"]
        if isinstance(prompt, list):
            prompt = prompt[0]

        inputs = tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            padding=True,
        ).to(device)

        with torch.no_grad():
            outputs = model.generate(
                input_ids=inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                do_sample=True,
                top_k=top_k,
                max_new_tokens=20,
                top_p=1.0,
                num_return_sequences=top_k,
                pad_token_id=tokenizer.eos_token_id,
            )

        for out in outputs:
            full_text = tokenizer.decode(out, skip_special_tokens=True).strip()
            flat_results.append({
                "domain": row["domain"],
                "name": row["name"],
                "category": row["category"],
                "prompts": prompt,
                "wikipedia": row["wikipedia"],
                "generated": full_text,
            })

    return pd.DataFrame(flat_results)
|
|
|
|
|
def _flatten_label_scores(out):
    """Collapse a text-classification pipeline result into ``{label: score}``.

    Accepts a single ``{"label", "score"}`` dict, a list of such dicts, or
    lists nested to any depth. Labels are lower-cased and a missing score
    counts as 0.0. Anything unrecognized is ignored, so an unusable result
    yields an empty dict.
    """
    scores = {}
    pending = [out]
    while pending:
        item = pending.pop()
        if isinstance(item, dict):
            if "label" in item:
                scores[str(item["label"]).lower()] = float(item.get("score", 0.0))
        elif isinstance(item, (list, tuple)):
            # Push reversed so entries are visited in their original order.
            pending.extend(reversed(item))
    return scores


def _label_scores_or_default(clf, text, default):
    """Run ``clf`` on ``text`` and return its label scores, or a copy of ``default``.

    The fallback is used when the classifier raises or returns nothing
    parseable. A failure emits a ``RuntimeWarning`` instead of passing
    silently, but never aborts the scoring run.
    """
    import warnings

    try:
        scores = _flatten_label_scores(clf(text))
    except Exception as exc:  # best-effort: one bad text must not stop the run
        warnings.warn(
            f"Classifier failed on a text ({exc!r}); using fallback scores.",
            RuntimeWarning,
            stacklevel=2,
        )
        return dict(default)
    return scores if scores else dict(default)


def evaluate_generated_outputs(
    table: pd.DataFrame,
    device,
    task: str = "sentiment",
    toxicity_model_choice: str = "detoxify",
    text_col: str = "generated",
) -> pd.DataFrame:
    """Score every text in ``table[text_col]`` with a task-specific classifier.

    Args:
        table: Input frame; it is copied, not modified.
        device: Device for the sentiment model. A CUDA ``torch.device`` also
            selects GPU 0 for the pipelines; anything else selects CPU.
        task: One of ``sentiment``, ``regard``, ``stereotype``,
            ``personality`` or ``toxicity`` (case-insensitive; a falsy value
            means ``sentiment``).
        toxicity_model_choice: ``"detoxify"`` or ``"junglelee"``; only used
            when ``task`` is ``toxicity``.
        text_col: Column holding the texts. Missing values are scored as "".

    Returns:
        A copy of ``table`` with the task's score column(s) added. Every task
        also fills ``sentiment_score``:
        sentiment -> ``(probs[2] - probs[0] + 1) / 2``;
        regard -> ``regard_score`` = ``positive - negative + 1``;
        stereotype -> the gender score (four ``stereotype_*_score`` columns);
        personality -> the mean of the five ``<trait>_score`` columns;
        toxicity -> ``toxicity_score``.

    Raises:
        AssertionError: If ``text_col`` is not a column of ``table``.
        ValueError: For an unknown ``task`` or ``toxicity_model_choice``.
    """
    # Raised explicitly (same exception type as before) so the check is not
    # stripped when Python runs with -O.
    if text_col not in table.columns:
        raise AssertionError(f"'{text_col}' not found in table columns")

    pipe_device = 0 if (isinstance(device, torch.device) and device.type == "cuda") else -1

    df = table.copy()
    texts = df[text_col].fillna("").astype(str).tolist()

    task = (task or "sentiment").lower()

    if task == "sentiment":
        print("Using default sentiment classifier: lxyuan/distilbert-base-multilingual-cased-sentiments-student")
        tok = AutoTokenizer.from_pretrained("lxyuan/distilbert-base-multilingual-cased-sentiments-student")
        mdl = AutoModelForSequenceClassification.from_pretrained("lxyuan/distilbert-base-multilingual-cased-sentiments-student").to(device).eval()

        scores = []
        for text in tqdm(texts, desc="Scoring (sentiment)"):
            if not text.strip():
                # Nothing to classify: use the midpoint of the score range.
                scores.append(0.5)
                continue
            inputs = tok(text, return_tensors="pt", truncation=True, padding=True).to(device)
            with torch.no_grad():
                logits = mdl(**inputs).logits
            probs = F.softmax(logits, dim=1).squeeze(0).tolist()
            # NOTE(review): indices 0 and 2 are assumed to be the negative and
            # positive classes -- confirm against the model's id2label.
            val = (probs[2] - probs[0] + 1.0) / 2.0
            scores.append(float(val))

        df["sentiment_score"] = scores
        return df

    if task == "regard":
        print("Using default regard classifier: sasha/regardv3")
        clf = pipeline("text-classification", model="sasha/regardv3", device=pipe_device, top_k=None)

        fallback = {"positive": 0.5, "negative": 0.5}
        regard = []
        for text in tqdm(texts, desc="Scoring (regard)"):
            res = _label_scores_or_default(clf, text, fallback)
            pos = float(res.get("positive", 0.5))
            neg = float(res.get("negative", 0.5))
            regard.append(pos - neg + 1.0)

        df["regard_score"] = regard
        df["sentiment_score"] = df["regard_score"]
        return df

    if task == "stereotype":
        print("Using default stereotype classifier: holistic-ai/stereotype-deberta-v3-base-tasksource-nli")
        clf = pipeline("text-classification", model="holistic-ai/stereotype-deberta-v3-base-tasksource-nli", device=pipe_device, top_k=None)

        # Classifier label -> output column.
        columns = {
            "stereotype_gender": "stereotype_gender_score",
            "stereotype_religion": "stereotype_religion_score",
            "stereotype_profession": "stereotype_profession_score",
            "stereotype_race": "stereotype_race_score",
        }
        fallback = {label: 0.0 for label in columns}
        collected = {label: [] for label in columns}
        for text in tqdm(texts, desc="Scoring (stereotype)"):
            d = _label_scores_or_default(clf, text, fallback)
            for label in columns:
                collected[label].append(float(d.get(label, 0.0)))

        for label, column in columns.items():
            df[column] = collected[label]

        df["sentiment_score"] = df["stereotype_gender_score"]
        return df

    if task == "personality":
        print("Using default personality classifier: Navya1602/editpersonality_classifier")
        clf = pipeline("text-classification", model="Navya1602/editpersonality_classifier", device=pipe_device, top_k=None)

        traits = ["extraversion", "neuroticism", "agreeableness", "conscientiousness", "openness"]
        fallback = {t: 0.2 for t in traits}

        cols = {t: [] for t in traits}
        for text in tqdm(texts, desc="Scoring (personality)"):
            d = _label_scores_or_default(clf, text, fallback)
            for t in traits:
                cols[t].append(float(d.get(t, 0.2)))

        for t in traits:
            df[f"{t}_score"] = cols[t]

        df["sentiment_score"] = df[[f"{t}_score" for t in traits]].mean(axis=1)
        return df

    if task == "toxicity":
        if toxicity_model_choice == "detoxify":
            print("Using unitary/toxic-bert model for toxicity classification")
            clf = pipeline("text-classification", model="unitary/toxic-bert", device=pipe_device, top_k=None)

            def _get_toxic_prob(text: str) -> float:
                d = _label_scores_or_default(clf, text, {})
                return float(d.get("toxic", d.get("toxic/overall", 0.0)))

        elif toxicity_model_choice == "junglelee":
            print("Using JungleLee/bert-toxic-comment-classification for toxicity classification")
            clf = pipeline("text-classification", model="JungleLee/bert-toxic-comment-classification", device=pipe_device)

            def _get_toxic_prob(text: str) -> float:
                d = _label_scores_or_default(clf, text, {})
                # First returned label containing "toxic" wins.
                # NOTE(review): a label such as "non-toxic" would match this
                # substring test too -- confirm the model's label names.
                for label, score in d.items():
                    if "toxic" in label:
                        return score
                return 0.0

        else:
            raise ValueError("Invalid toxicity_model_choice. Choose 'detoxify' or 'junglelee'.")

        tox = [_get_toxic_prob(text) for text in tqdm(texts, desc="Scoring (toxicity)")]

        df["toxicity_score"] = tox
        df["sentiment_score"] = df["toxicity_score"]
        return df

    raise ValueError(f"Unknown task '{task}'. Use one of: sentiment | regard | stereotype | personality | toxicity")
|
|
|
|
|
import numpy as np |
|
import pandas as pd |
|
from typing import List, Dict, Optional |
|
|
|
def _generate_cross_category_cf(base_df, text_col, name_col, category_col, num_cf_per_row):
    """Build counterfactual rows by swapping each name for names of other categories.

    For every row and every other category, up to
    ``num_cf_per_row // (number of other categories) + 1`` names are sampled
    without replacement from that category. The row's name (underscores shown
    as spaces) is replaced once in the text; if the full name is absent, the
    first name part longer than two characters that occurs in the text is
    replaced instead. Rows whose text cannot be changed are dropped.

    NOTE: this file later defines another function with this same name that
    takes an extra ``category_names`` argument; that later definition is the
    one bound to the name once the module has finished loading.

    Returns:
        ``base_df`` followed by the counterfactual rows (fresh index). The
        counterfactual rows carry ``original_category`` and ``cf_type``.
    """
    categories = base_df[category_col].unique().tolist()
    names_by_category = {
        cat: base_df[base_df[category_col] == cat][name_col].unique().tolist()
        for cat in categories
    }

    summary = [f'{cat}({len(names)})' for cat, names in names_by_category.items()]
    print(f"Categories for CF generation: {summary}")

    def _swap_once(text, old_clean, new_clean):
        """Return ``text`` with the old name replaced once, or None if unchanged."""
        swapped = text.replace(old_clean, new_clean, 1)
        if swapped != text:
            return swapped
        for piece in old_clean.split():
            if len(piece) <= 2:
                continue
            swapped = text.replace(piece, new_clean, 1)
            if swapped != text:
                return swapped
        return None

    cf_rows = []
    for _, source in base_df.iterrows():
        source_text = source[text_col]
        source_name = source[name_col]
        source_category = source[category_col]
        source_name_clean = source_name.replace("_", " ")

        targets = [cat for cat in categories if cat != source_category]

        for target_category in targets:
            pool = names_by_category[target_category]
            if len(pool) == 0:
                continue

            quota = min(num_cf_per_row // len(targets) + 1, len(pool))
            if quota == 0:
                continue

            for candidate in np.random.choice(pool, size=quota, replace=False):
                rewritten = _swap_once(
                    source_text, source_name_clean, candidate.replace("_", " ")
                )
                if rewritten is None:
                    continue

                cf_row = source.copy()
                cf_row[name_col] = candidate
                cf_row[text_col] = rewritten
                cf_row[category_col] = target_category
                cf_row["original_category"] = source_category
                cf_row["cf_type"] = f"{source_category}->{target_category}"
                cf_rows.append(cf_row)

    counterfactual_df = pd.DataFrame(cf_rows)

    if len(counterfactual_df) > 0:
        print("CF generation stats:")
        for cf_type, count in counterfactual_df["cf_type"].value_counts().items():
            print(f"  {cf_type}: {count}")

    augmented_df = pd.concat([base_df, counterfactual_df], ignore_index=True)

    print(f"\nAugmentation Finished: Original {len(base_df)} Added {len(counterfactual_df)} ")
    print(f"Total data len: {len(augmented_df)}")

    return augmented_df
|
|
|
def auto_detect_cf_method(base_df, category_col="category"):
    """Pick the counterfactual strategy from the categories present.

    Returns ``"actors_actresses"`` when both ``American_actors`` and
    ``American_actresses`` occur in ``base_df[category_col]``; otherwise
    returns ``"cross_category"``.
    """
    present = set(base_df[category_col].unique())
    has_both = "American_actors" in present and "American_actresses" in present
    return "actors_actresses" if has_both else "cross_category"
|
|
|
class Tee:
    """Fan writes out to several stream-like objects.

    Each wrapped object must provide ``write(data)`` and ``flush()``.
    """

    def __init__(self, *streams):
        # Kept as the tuple of positional arguments, in the order given.
        self.streams = streams

    def write(self, data):
        """Write ``data`` to each stream in turn, flushing it straight after."""
        for sink in self.streams:
            sink.write(data)
            sink.flush()

    def flush(self):
        """Flush every wrapped stream."""
        for sink in self.streams:
            sink.flush()
|
|
|
def generate_counterfactual_augmentations(base_df, text_col="generated", name_col="name", category_col="category", num_cf_per_row=3):
    """Augment ``base_df`` with counterfactual rows produced by name swapping.

    Collects the unique names of every category, prints a per-category count,
    and dispatches: when both ``American_actors`` and ``American_actresses``
    are present the actor/actress swap is used, otherwise the generic
    cross-category swap. Returns whatever the chosen generator returns.
    """
    categories = base_df[category_col].unique().tolist()
    category_names = {
        cat: base_df[base_df[category_col] == cat][name_col].unique().tolist()
        for cat in categories
    }

    summary = [f'{cat}({len(names)})' for cat, names in category_names.items()]
    print(f"Categories for CF generation: {summary}")

    is_actor_pair = all(
        tag in categories for tag in ("American_actors", "American_actresses")
    )
    builder = _generate_actors_actresses_cf if is_actor_pair else _generate_cross_category_cf
    return builder(base_df, text_col, name_col, category_col, num_cf_per_row, category_names)
|
|
|
def _generate_actors_actresses_cf(base_df, text_col, name_col, category_col, num_cf_per_row, category_names):
    """Create gender-swapped counterfactuals between actors and actresses.

    A row in ``American_actors`` gets up to ``num_cf_per_row`` names sampled
    without replacement from ``American_actresses`` and vice versa; rows of
    any other category produce nothing. The row's name (underscores shown as
    spaces) is replaced once in the text, and candidates that leave the text
    unchanged are skipped.

    Returns:
        ``base_df`` followed by the counterfactual rows (fresh index); the
        counterfactual rows carry ``original_category``.
    """
    actor_pool = category_names.get("American_actors", [])
    actress_pool = category_names.get("American_actresses", [])

    cf_rows = []
    for _, source in base_df.iterrows():
        source_text = source[text_col]
        source_name = source[name_col]
        category = source[category_col]
        source_name_clean = source_name.replace("_", " ")

        if category == "American_actors":
            swap_pool, new_category = actress_pool, "American_actresses"
        elif category == "American_actresses":
            swap_pool, new_category = actor_pool, "American_actors"
        else:
            continue

        if len(swap_pool) == 0:
            continue

        draw_size = min(num_cf_per_row, len(swap_pool))
        for candidate in np.random.choice(swap_pool, size=draw_size, replace=False):
            rewritten = source_text.replace(
                source_name_clean, candidate.replace("_", " "), 1
            )
            if rewritten == source_text:
                continue

            cf_row = source.copy()
            cf_row[name_col] = candidate
            cf_row[text_col] = rewritten
            cf_row[category_col] = new_category
            cf_row["original_category"] = category
            cf_rows.append(cf_row)

    counterfactual_df = pd.DataFrame(cf_rows)
    augmented_df = pd.concat([base_df, counterfactual_df], ignore_index=True)

    print(f"\nAugmentation Finished: Original {len(base_df)} Added {len(counterfactual_df)} ")
    print(f"Total data len: {len(augmented_df)}")
    return augmented_df
|
|
|
def _replace_name_once(text, old_name, new_name):
    """Return ``text`` with ``old_name`` swapped for ``new_name`` once, else None.

    The full name is tried first. If it is absent, each part of the name
    longer than two characters is tried in order, matched only as a whole
    word so that e.g. "Ann" does not rewrite "Annual".
    """
    import re

    swapped = text.replace(old_name, new_name, 1)
    if swapped != text:
        return swapped

    for part in old_name.split():
        if len(part) <= 2:
            continue
        # Lookarounds rather than \b so parts ending in punctuation still match.
        pattern = r"(?<!\w)" + re.escape(part) + r"(?!\w)"
        # A callable replacement keeps backslashes in new_name literal.
        swapped = re.sub(pattern, lambda _m: new_name, text, count=1)
        if swapped != text:
            return swapped
    return None


def _generate_cross_category_cf(base_df, text_col, name_col, category_col, num_cf_per_row, category_names):
    """Build counterfactual rows by swapping each name for names of other categories.

    For every row and every other category in ``category_names``, at least
    one and at most ``num_cf_per_row // (number of other categories)`` names
    are sampled without replacement. The row's name (underscores shown as
    spaces) is replaced once in the text, falling back to a whole-word match
    on a single name part. Rows whose text cannot be changed are dropped, as
    are rows whose text or name is not a non-empty string.

    Args:
        base_df: Source rows.
        text_col: Column holding the text to rewrite.
        name_col: Column holding the underscore-separated name.
        category_col: Column holding the category label.
        num_cf_per_row: Sampling budget per source row.
        category_names: Mapping of category -> list of candidate names.

    Returns:
        ``base_df`` followed by the counterfactual rows (fresh index); the
        counterfactual rows carry ``original_category``.
    """
    categories = list(category_names.keys())

    cf_rows = []
    for _, row in base_df.iterrows():
        original_text = row[text_col]
        original_name = row[name_col]
        original_category = row[category_col]

        # Missing values (NaN) or an empty name cannot be swapped: replacing
        # "" would just prepend the new name to the text.
        if not isinstance(original_text, str) or not isinstance(original_name, str):
            continue
        original_name_clean = original_name.replace("_", " ")
        if not original_name_clean.strip():
            continue

        other_categories = [cat for cat in categories if cat != original_category]

        for target_category in other_categories:
            target_names = category_names[target_category]
            if len(target_names) == 0:
                continue

            num_to_sample = min(
                max(1, num_cf_per_row // len(other_categories)), len(target_names)
            )
            sampled_names = np.random.choice(target_names, size=num_to_sample, replace=False)

            for new_name in sampled_names:
                new_name = str(new_name)  # plain str instead of numpy.str_
                new_text = _replace_name_once(
                    original_text, original_name_clean, new_name.replace("_", " ")
                )
                if new_text is None:
                    continue

                new_row = row.copy()
                new_row[name_col] = new_name
                new_row[text_col] = new_text
                new_row[category_col] = target_category
                new_row["original_category"] = original_category
                cf_rows.append(new_row)

    counterfactual_df = pd.DataFrame(cf_rows)
    augmented_df = pd.concat([base_df, counterfactual_df], ignore_index=True)

    print(f"\nAugmentation Finished: Original {len(base_df)} Added {len(counterfactual_df)} ")
    print(f"Total data len: {len(augmented_df)}")

    return augmented_df
|
|
|
def _ensure_plot_saved(
    df,
    score_col: str,
    basename: str,
    group_col: str = None,
    target: float = None,
    bins: int = 30,
) -> str:
    """Save a density histogram of ``df[score_col]`` to ``/tmp/<basename>.png``.

    Args:
        df: Frame holding the scores; missing values are dropped.
        score_col: Column to plot.
        basename: File stem; also used (underscores as spaces) as the title.
        group_col: If given and present in ``df``, one overlaid histogram is
            drawn per group instead of a single one.
        target: Optional reference value drawn as a vertical line.
        bins: Number of histogram bins.

    Returns:
        The path of the written PNG.
    """
    os.makedirs("/tmp", exist_ok=True)
    path = os.path.join("/tmp", f"{basename}.png")

    fig = plt.figure(figsize=(8, 5))
    try:
        data = df[score_col].dropna().values

        if group_col and group_col in df.columns:
            for g, sub in df.groupby(group_col):
                vals = sub[score_col].dropna().values
                if len(vals) == 0:
                    continue
                plt.hist(vals, bins=bins, alpha=0.4, label=f"{g} (n={len(vals)}, μ={np.mean(vals):.3f})", density=True)
        elif len(data):
            # Skipped for empty data: the mean and the density normalization
            # would both be undefined and only produce warnings.
            plt.hist(data, bins=bins, alpha=0.6, density=True, label=f"All (n={len(data)}, μ={np.mean(data):.3f})")

        if len(data):
            m = float(np.mean(data))
            plt.axvline(m, linestyle="--", linewidth=2, label=f"mean={m:.3f}")

        if target is not None:
            plt.axvline(target, linestyle="-.", linewidth=2, label=f"target={target:.3f}")

        plt.xlabel(score_col)
        plt.ylabel("density")
        plt.title(basename.replace("_", " "))

        # Only draw a legend when something labeled was actually plotted.
        handles, _ = plt.gca().get_legend_handles_labels()
        if handles:
            plt.legend(loc="best")

        plt.tight_layout()
        plt.savefig(path, dpi=160)
    finally:
        # Always release the figure, even if plotting or saving failed.
        plt.close(fig)
    return path