"""Utilities for loading causal LMs, sampling generations, scoring them with
off-the-shelf classifiers, building counterfactual augmentations, and plotting
score distributions."""

import logging
import os
import sys
import traceback
from typing import Dict, List, Optional

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn.functional as F
from tqdm import tqdm
from transformers import (
    AutoModelForCausalLM,
    AutoModelForSequenceClassification,
    AutoTokenizer,
    DataCollatorForLanguageModeling,
    GPT2LMHeadModel,
    GPT2Tokenizer,
    TextDataset,
    Trainer,
    TrainingArguments,
    pipeline,
)

logger = logging.getLogger(__name__)


def load_model_and_tokenizer(model_name: str):
    """Load a causal language model and its tokenizer.

    Weights are cached under ``/tmp/hf_models``. On CUDA the model is loaded in
    float16 with ``device_map="auto"``; otherwise it is loaded in float32 and
    moved to the CPU explicitly.

    Args:
        model_name: Hub identifier. ``"gpt2"`` and ``"openai-community/gpt2"``
            use the GPT-2 specific classes; anything else uses the Auto classes.

    Returns:
        A ``(tokenizer, model, device)`` tuple.

    Raises:
        RuntimeError: If loading fails (chained to the underlying exception).
            For ``"openai-community/gpt2"`` a retry with ``"gpt2"`` is attempted
            first.
    """
    # Cache directory, also exported through the environment.
    cache_dir = "/tmp/hf_models"
    os.makedirs(cache_dir, exist_ok=True)
    os.environ['HF_HOME'] = cache_dir
    os.environ['TRANSFORMERS_CACHE'] = cache_dir

    # Pick device and precision.
    if torch.cuda.is_available():
        device = torch.device("cuda")
        dtype = torch.float16
    else:
        device = torch.device("cpu")
        dtype = torch.float32

    print(f"載入模型: {model_name}")
    print(f"設備: {device}, 精度: {dtype}")
    print(f"快取目錄: {cache_dir}")

    try:
        # The cache is reused; set 'force_download' to True to force a fresh
        # download.
        download_kwargs = {
            'cache_dir': cache_dir,
            'force_download': False,
            'resume_download': True,
            'local_files_only': False,
        }
        model_kwargs = {
            **download_kwargs,
            'torch_dtype': dtype,
            'low_cpu_mem_usage': True,
        }
        if device.type == "cuda":
            model_kwargs['device_map'] = "auto"

        # Choose the loader classes by model name.
        if model_name in {"gpt2", "openai-community/gpt2"}:
            print("使用 GPT2 專用載入器")
            tokenizer = GPT2Tokenizer.from_pretrained(model_name, **download_kwargs)
            model = GPT2LMHeadModel.from_pretrained(model_name, **model_kwargs)
        else:
            print("使用 Auto 載入器")
            tokenizer = AutoTokenizer.from_pretrained(model_name, **download_kwargs)
            model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs)

        # Fall back to EOS as the pad token when none is defined.
        if tokenizer.pad_token is None and tokenizer.eos_token is not None:
            tokenizer.pad_token = tokenizer.eos_token
        if getattr(model.config, 'pad_token_id', None) is None and hasattr(model.config, 'pad_token_id'):
            eos_id = getattr(model.config, 'eos_token_id', None)
            if eos_id is not None:
                model.config.pad_token_id = eos_id

        # Without device_map="auto" the model has to be moved manually.
        if device.type != "cuda":
            model = model.to(device)

        print(f"✓ 成功載入模型 {model_name}")
        return tokenizer, model, device

    except Exception as e:
        print(f"載入失敗: {e}")
        print(f"完整錯誤: {traceback.format_exc()}")
        # Try the short alias before giving up.
        if model_name == "openai-community/gpt2":
            print("嘗試使用 'gpt2' 替代...")
            return load_model_and_tokenizer("gpt2")
        raise RuntimeError(f"無法載入模型 '{model_name}': {e}") from e


def finetune(train_texts, tokenizer, model, num_epochs=20, output_dir='/temp/', block_size=128):
    """Fine-tune ``model`` on ``train_texts`` with a causal-LM objective.

    The texts are written one per line to ``/tmp/train.txt`` and read back
    through ``TextDataset``.

    Args:
        train_texts: Iterable of strings; each is stripped before writing.
        tokenizer: Tokenizer passed to the dataset and the data collator.
        model: Model handed to ``Trainer``.
        num_epochs: Number of training epochs.
        output_dir: Checkpoint directory passed to ``TrainingArguments``.
            NOTE(review): the default '/temp/' looks like a typo for '/tmp/';
            left unchanged because callers may depend on it -- confirm.
        block_size: Token block size used by ``TextDataset``.

    Returns:
        The same ``model`` object after ``trainer.train()``.
    """
    train_path = "/tmp/train.txt"
    with open(train_path, "w", encoding="utf-8") as f:
        f.writelines(text.strip() + "\n" for text in train_texts)

    train_dataset = TextDataset(tokenizer=tokenizer, file_path=train_path, block_size=block_size)
    data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)
    training_args = TrainingArguments(
        output_dir=output_dir,
        overwrite_output_dir=True,
        per_device_train_batch_size=1,
        num_train_epochs=num_epochs,
        save_steps=500,
        save_total_limit=2,
        logging_dir='./logs',
        logging_steps=10,
        report_to="none",
    )
    trainer = Trainer(
        model=model,
        args=training_args,
        data_collator=data_collator,
        train_dataset=train_dataset,
    )
    trainer.train()
    return model


def generate_topk_samples(model, df_table, tokenizer, device, top_k=10):
    """Sample ``top_k`` continuations for every prompt in ``df_table``.

    ``df_table`` must provide the columns ``prompts``, ``domain``, ``name``,
    ``category`` and ``wikipedia``. When a ``prompts`` cell is a list, its first
    element is used. The input frame is not modified.

    Args:
        model: Generation model; switched to eval mode.
        df_table: Source DataFrame.
        tokenizer: Tokenizer used for encoding and decoding.
        device: Device the encoded inputs are moved to.
        top_k: Used both as the top-k sampling cutoff and as the number of
            sequences returned per prompt.

    Returns:
        DataFrame with one row per generated sequence; the decoded text is in
        the ``generated`` column.
    """
    model.eval()
    # Computed on the side so the caller's DataFrame is left untouched.
    prompts = df_table["prompts"].apply(lambda x: x[0] if isinstance(x, list) else x)

    flat_results = []
    rows = zip(df_table.iterrows(), prompts)
    for (_, row), prompt in tqdm(rows, total=len(df_table), desc="Generating samples"):
        inputs = tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            padding=True,
        ).to(device)
        with torch.no_grad():
            outputs = model.generate(
                input_ids=inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                do_sample=True,
                top_k=top_k,
                max_new_tokens=20,
                top_p=1.0,
                num_return_sequences=top_k,
                pad_token_id=tokenizer.eos_token_id,
            )
        for out in outputs:
            full_text = tokenizer.decode(out, skip_special_tokens=True).strip()
            flat_results.append({
                "domain": row["domain"],
                "name": row["name"],
                "category": row["category"],
                "prompts": prompt,
                "wikipedia": row["wikipedia"],
                "generated": full_text,
            })
    return pd.DataFrame(flat_results)


def _label_scores(out) -> Dict[str, float]:
    """Flatten a text-classification pipeline result into ``{label: score}``.

    Accepts a single dict, a list of dicts, or a list that wraps such a list
    (the result shape may differ between transformers versions). Labels are
    lower-cased. Returns an empty dict for anything unrecognised.
    """
    while isinstance(out, list) and out and isinstance(out[0], list):
        out = out[0]
    if isinstance(out, dict):
        out = [out]
    if not isinstance(out, list):
        return {}
    return {
        str(item["label"]).lower(): float(item.get("score", 0.0))
        for item in out
        if isinstance(item, dict) and "label" in item
    }


def _classify(clf, text: str, fallback: Dict[str, float]) -> Dict[str, float]:
    """Run ``clf`` on ``text``; return ``fallback`` if it fails or yields nothing.

    Scoring is deliberately best-effort (one bad text must not abort a whole
    run), but failures are logged instead of being silently dropped.
    """
    try:
        scores = _label_scores(clf(text))
    except Exception:
        logger.warning("Classifier failed on a text; using fallback scores", exc_info=True)
        return dict(fallback)
    return scores or dict(fallback)


def _score_sentiment(df: pd.DataFrame, texts: List[str], device) -> pd.DataFrame:
    """Add ``sentiment_score`` = (P(positive) - P(negative) + 1) / 2."""
    model_id = "lxyuan/distilbert-base-multilingual-cased-sentiments-student"
    print(f"Using default sentiment classifier: {model_id}")
    tok = AutoTokenizer.from_pretrained(model_id)
    mdl = AutoModelForSequenceClassification.from_pretrained(model_id).to(device).eval()

    # Resolve class indices from the model config rather than assuming a fixed
    # label order; fall back to the previously hard-coded indices (2 and 0)
    # when the config does not name the labels "positive"/"negative".
    id2label = getattr(mdl.config, "id2label", None) or {}
    label_to_idx = {str(label).lower(): int(idx) for idx, label in id2label.items()}
    pos_idx = label_to_idx.get("positive", 2)
    neg_idx = label_to_idx.get("negative", 0)

    scores = []
    for text in tqdm(texts, desc="Scoring (sentiment)"):
        if not text.strip():
            scores.append(0.5)  # neutral midpoint for empty text
            continue
        inputs = tok(text, return_tensors="pt", truncation=True, padding=True).to(device)
        with torch.no_grad():
            logits = mdl(**inputs).logits
        probs = F.softmax(logits, dim=1).squeeze(0).tolist()
        scores.append(float((probs[pos_idx] - probs[neg_idx] + 1.0) / 2.0))
    df["sentiment_score"] = scores
    return df


def _score_regard(df: pd.DataFrame, texts: List[str], pipe_device: int) -> pd.DataFrame:
    """Add ``regard_score`` = positive - negative + 1, aliased to ``sentiment_score``."""
    print("Using default regard classifier: sasha/regardv3")
    clf = pipeline("text-classification", model="sasha/regardv3", device=pipe_device, top_k=None)
    fallback = {"positive": 0.5, "negative": 0.5}
    values = []
    for text in tqdm(texts, desc="Scoring (regard)"):
        res = _classify(clf, text, fallback)
        pos = float(res.get("positive", 0.5))
        neg = float(res.get("negative", 0.5))
        # NOTE(review): this yields values in [0, 2], unlike the sentiment
        # task's [0, 1]; kept as-is because downstream targets may rely on it.
        values.append(pos - neg + 1.0)
    df["regard_score"] = values
    df["sentiment_score"] = df["regard_score"]
    return df


def _score_stereotype(df: pd.DataFrame, texts: List[str], pipe_device: int) -> pd.DataFrame:
    """Add four ``stereotype_*_score`` columns; ``sentiment_score`` mirrors gender."""
    model_id = "holistic-ai/stereotype-deberta-v3-base-tasksource-nli"
    print(f"Using default stereotype classifier: {model_id}")
    clf = pipeline("text-classification", model=model_id, device=pipe_device, top_k=None)
    kinds = ["gender", "religion", "profession", "race"]
    fallback = {f"stereotype_{k}": 0.0 for k in kinds}
    cols = {k: [] for k in kinds}
    for text in tqdm(texts, desc="Scoring (stereotype)"):
        d = _classify(clf, text, fallback)
        for k in kinds:
            cols[k].append(float(d.get(f"stereotype_{k}", 0.0)))
    for k in kinds:
        df[f"stereotype_{k}_score"] = cols[k]
    df["sentiment_score"] = df["stereotype_gender_score"]
    return df


def _score_personality(df: pd.DataFrame, texts: List[str], pipe_device: int) -> pd.DataFrame:
    """Add one ``<trait>_score`` column per trait; ``sentiment_score`` is their mean."""
    model_id = "Navya1602/editpersonality_classifier"
    print(f"Using default personality classifier: {model_id}")
    clf = pipeline("text-classification", model=model_id, device=pipe_device, top_k=None)
    traits = ["extraversion", "neuroticism", "agreeableness", "conscientiousness", "openness"]
    fallback = {t: 0.2 for t in traits}
    cols = {t: [] for t in traits}
    for text in tqdm(texts, desc="Scoring (personality)"):
        d = _classify(clf, text, fallback)
        for t in traits:
            cols[t].append(float(d.get(t, 0.2)))
    for t in traits:
        df[f"{t}_score"] = cols[t]
    df["sentiment_score"] = df[[f"{t}_score" for t in traits]].mean(axis=1)
    return df


def _score_toxicity(
    df: pd.DataFrame, texts: List[str], pipe_device: int, toxicity_model_choice: str
) -> pd.DataFrame:
    """Add ``toxicity_score`` (aliased to ``sentiment_score``)."""
    if toxicity_model_choice == "detoxify":
        print("Using unitary/toxic-bert model for toxicity classification")
        clf = pipeline("text-classification", model="unitary/toxic-bert", device=pipe_device, top_k=None)

        def _toxic_prob(scores: Dict[str, float]) -> float:
            return float(scores.get("toxic", scores.get("toxic/overall", 0.0)))

    elif toxicity_model_choice == "junglelee":
        print("Using JungleLee/bert-toxic-comment-classification for toxicity classification")
        clf = pipeline(
            "text-classification",
            model="JungleLee/bert-toxic-comment-classification",
            device=pipe_device,
        )

        def _toxic_prob(scores: Dict[str, float]) -> float:
            # NOTE(review): a substring match would also hit a label such as
            # "non-toxic"; verify against the model's actual label names.
            for label, score in scores.items():
                if "toxic" in label:
                    return float(score)
            return 0.0

    else:
        raise ValueError("Invalid toxicity_model_choice. Choose 'detoxify' or 'junglelee'.")

    tox = [_toxic_prob(_classify(clf, text, {})) for text in tqdm(texts, desc="Scoring (toxicity)")]
    df["toxicity_score"] = tox
    df["sentiment_score"] = df["toxicity_score"]
    return df


def evaluate_generated_outputs(
    table: pd.DataFrame,
    device,
    task: str = "sentiment",
    toxicity_model_choice: str = "detoxify",
    text_col: str = "generated",
) -> pd.DataFrame:
    """Score the texts in ``table[text_col]`` with a task-specific classifier.

    Every task writes its own score column(s) and also a ``sentiment_score``
    column on a copy of ``table``; the input frame is not modified.

    Args:
        table: DataFrame holding the texts.
        device: ``torch.device`` (or device string) used for the sentiment
            model; pipelines run on GPU 0 when it is a CUDA device, else CPU.
        task: One of ``sentiment``, ``regard``, ``stereotype``, ``personality``,
            ``toxicity`` (case-insensitive; falsy means ``sentiment``).
        toxicity_model_choice: ``"detoxify"`` or ``"junglelee"``; only used for
            the toxicity task.
        text_col: Name of the column containing the texts.

    Returns:
        A copy of ``table`` with the score columns added.

    Raises:
        AssertionError: If ``text_col`` is not a column of ``table``.
        ValueError: For an unknown ``task`` or ``toxicity_model_choice``.
    """
    assert text_col in table.columns, f"'{text_col}' not found in table columns"

    # Accept both torch.device objects and plain strings such as "cuda".
    try:
        device_type = torch.device(device).type
    except (TypeError, RuntimeError):
        device_type = "cpu"
    pipe_device = 0 if device_type == "cuda" else -1

    df = table.copy()
    texts = df[text_col].fillna("").astype(str).tolist()
    task = (task or "sentiment").lower()

    if task == "sentiment":
        return _score_sentiment(df, texts, device)
    if task == "regard":
        return _score_regard(df, texts, pipe_device)
    if task == "stereotype":
        return _score_stereotype(df, texts, pipe_device)
    if task == "personality":
        return _score_personality(df, texts, pipe_device)
    if task == "toxicity":
        return _score_toxicity(df, texts, pipe_device, toxicity_model_choice)
    raise ValueError(
        f"Unknown task '{task}'. Use one of: sentiment | regard | stereotype | personality | toxicity"
    )


def auto_detect_cf_method(base_df, category_col="category"):
    """Return ``"actors_actresses"`` when both ``American_actors`` and
    ``American_actresses`` occur in ``category_col``, else ``"cross_category"``."""
    categories = set(base_df[category_col].unique())
    if {"American_actors", "American_actresses"}.issubset(categories):
        return "actors_actresses"
    return "cross_category"


class Tee:
    """File-like object that duplicates every write to several streams."""

    def __init__(self, *streams):
        self.streams = streams

    def write(self, data):
        """Write ``data`` to each stream and flush it immediately."""
        for stream in self.streams:
            stream.write(data)
            stream.flush()

    def flush(self):
        """Flush every underlying stream."""
        for stream in self.streams:
            stream.flush()


def _names_by_category(base_df, name_col, category_col) -> Dict[str, list]:
    """Map each category to the list of unique names that occur in it."""
    return {
        cat: base_df.loc[base_df[category_col] == cat, name_col].unique().tolist()
        for cat in base_df[category_col].unique().tolist()
    }


def _swap_name(text, old_name, new_name, allow_partial: bool = False) -> Optional[str]:
    """Replace the first occurrence of ``old_name`` in ``text`` by ``new_name``.

    Underscores in both names are treated as spaces. With ``allow_partial``,
    when the full name is absent, the first name part longer than two
    characters that occurs in ``text`` is replaced instead.

    Returns:
        The rewritten text, or ``None`` when nothing could be replaced
        (including non-string text or names, e.g. NaN).
    """
    if not isinstance(text, str) or not isinstance(old_name, str):
        return None
    old_clean = old_name.replace("_", " ")
    new_clean = str(new_name).replace("_", " ")

    new_text = text.replace(old_clean, new_clean, 1)
    if new_text != text:
        return new_text
    if allow_partial:
        for part in old_clean.split():
            if len(part) > 2:
                new_text = text.replace(part, new_clean, 1)
                if new_text != text:
                    return new_text
    return None


def _finalize_augmentation(base_df, cf_rows):
    """Append the counterfactual rows to ``base_df`` and print a summary."""
    counterfactual_df = pd.DataFrame(cf_rows)
    if cf_rows:
        augmented_df = pd.concat([base_df, counterfactual_df], ignore_index=True)
    else:
        # Nothing to add: skip concatenating an empty frame, still reset the index.
        augmented_df = base_df.reset_index(drop=True)
    print(f"\nAugmentation Finished: Original {len(base_df)} Added {len(counterfactual_df)} ")
    print(f"Total data len: {len(augmented_df)}")
    return augmented_df


def generate_counterfactual_augmentations(base_df, text_col="generated", name_col="name",
                                          category_col="category", num_cf_per_row=3):
    """Augment ``base_df`` with rows whose name is swapped for one from another category.

    When both ``American_actors`` and ``American_actresses`` are present, names
    are swapped between those two categories only; otherwise each row is
    swapped against every other category.

    Args:
        base_df: Source DataFrame.
        text_col: Column holding the text in which the name is replaced.
        name_col: Column holding the name (underscores are treated as spaces).
        category_col: Column holding the category.
        num_cf_per_row: Budget of counterfactuals per row.

    Returns:
        ``base_df`` followed by the counterfactual rows, with a fresh index.
        Counterfactual rows carry an extra ``original_category`` column.
    """
    category_names = _names_by_category(base_df, name_col, category_col)
    print(f"Categories for CF generation: {[f'{cat}({len(names)})' for cat, names in category_names.items()]}")

    if auto_detect_cf_method(base_df, category_col) == "actors_actresses":
        return _generate_actors_actresses_cf(base_df, text_col, name_col, category_col,
                                             num_cf_per_row, category_names)
    return _generate_cross_category_cf(base_df, text_col, name_col, category_col,
                                       num_cf_per_row, category_names)


def _generate_actors_actresses_cf(base_df, text_col, name_col, category_col, num_cf_per_row, category_names):
    """Swap names between ``American_actors`` and ``American_actresses`` rows.

    Rows of any other category are skipped. Up to ``num_cf_per_row`` names are
    sampled without replacement from the opposite pool; a counterfactual is
    only kept when the full original name was found in the text.
    """
    male_names = category_names.get("American_actors", [])
    female_names = category_names.get("American_actresses", [])
    swaps = {
        "American_actors": (female_names, "American_actresses"),
        "American_actresses": (male_names, "American_actors"),
    }

    cf_rows = []
    for _, row in base_df.iterrows():
        category = row[category_col]
        if category not in swaps:
            continue
        swap_pool, new_category = swaps[category]
        if len(swap_pool) == 0:
            continue
        sampled_names = np.random.choice(
            swap_pool, size=min(num_cf_per_row, len(swap_pool)), replace=False
        )
        for new_name in sampled_names:
            new_text = _swap_name(row[text_col], row[name_col], new_name)
            if new_text is None:
                continue
            new_row = row.copy()
            new_row[name_col] = new_name
            new_row[text_col] = new_text
            new_row[category_col] = new_category
            new_row["original_category"] = category
            cf_rows.append(new_row)

    return _finalize_augmentation(base_df, cf_rows)


def _generate_cross_category_cf(base_df, text_col, name_col, category_col, num_cf_per_row,
                                category_names=None):
    """Swap each row's name for names sampled from every other category.

    This replaces two same-named definitions, of which only the later
    (six-parameter) one was ever in effect; its behaviour is kept.
    ``category_names`` is optional so the earlier five-argument call shape
    also works.

    For each other category, ``max(1, num_cf_per_row // n_other)`` names
    (capped by the pool size) are sampled without replacement. If the full
    original name is not in the text, a single name part is replaced instead.
    """
    if category_names is None:
        category_names = _names_by_category(base_df, name_col, category_col)
    categories = list(category_names.keys())

    cf_rows = []
    for _, row in base_df.iterrows():
        original_category = row[category_col]
        other_categories = [cat for cat in categories if cat != original_category]
        for target_category in other_categories:
            target_names = category_names[target_category]
            if len(target_names) == 0:
                continue
            num_to_sample = min(max(1, num_cf_per_row // len(other_categories)), len(target_names))
            sampled_names = np.random.choice(target_names, size=num_to_sample, replace=False)
            for new_name in sampled_names:
                new_text = _swap_name(row[text_col], row[name_col], new_name, allow_partial=True)
                if new_text is None:
                    continue
                new_row = row.copy()
                new_row[name_col] = new_name
                new_row[text_col] = new_text
                new_row[category_col] = target_category
                new_row["original_category"] = original_category
                cf_rows.append(new_row)

    return _finalize_augmentation(base_df, cf_rows)


def _ensure_plot_saved(
    df,
    score_col: str,
    basename: str,
    group_col: Optional[str] = None,
    target: Optional[float] = None,
    bins: int = 30,
    out_dir: str = "/tmp",
) -> str:
    """Save a density histogram of ``df[score_col]`` as ``<out_dir>/<basename>.png``.

    Args:
        df: DataFrame holding the scores.
        score_col: Column to plot (NaNs are dropped).
        basename: File stem; also used, with underscores as spaces, as title.
        group_col: If given and present in ``df``, one histogram per group.
        target: Optional x position of a vertical reference line.
        bins: Number of histogram bins.
        out_dir: Output directory (created if missing).

    Returns:
        Path of the written PNG file.
    """
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, f"{basename}.png")

    fig = plt.figure(figsize=(8, 5))
    try:
        data = df[score_col].dropna().values

        if group_col and group_col in df.columns:
            for g, sub in df.groupby(group_col):
                vals = sub[score_col].dropna().values
                if len(vals) == 0:
                    continue
                plt.hist(vals, bins=bins, alpha=0.4, density=True,
                         label=f"{g} (n={len(vals)}, μ={np.mean(vals):.3f})")
        elif len(data):
            # Skipped for empty data: the mean and the density are undefined.
            plt.hist(data, bins=bins, alpha=0.6, density=True,
                     label=f"All (n={len(data)}, μ={np.mean(data):.3f})")

        if len(data):
            m = float(np.mean(data))
            plt.axvline(m, linestyle="--", linewidth=2, label=f"mean={m:.3f}")
        if target is not None:
            plt.axvline(target, linestyle="-.", linewidth=2, label=f"target={target:.3f}")

        plt.xlabel(score_col)
        plt.ylabel("density")
        plt.title(basename.replace("_", " "))
        # Only draw a legend when something was actually plotted.
        handles, _ = plt.gca().get_legend_handles_labels()
        if handles:
            plt.legend(loc="best")
        plt.tight_layout()
        plt.savefig(path, dpi=160)
    finally:
        # Always release the figure, even if plotting or saving failed.
        plt.close(fig)
    return path