import os, math, random

import pandas as pd
import torch

from transformers import (AutoTokenizer, AutoModelForCausalLM, DataCollatorForLanguageModeling,
                          Trainer, TrainingArguments)

# PEFT is optional; without it the script falls back to full-parameter fine-tuning.
try:
    from peft import LoraConfig, get_peft_model, TaskType
    PEFT_AVAILABLE = True
except Exception:
    PEFT_AVAILABLE = False


def build_text_column(df: pd.DataFrame) -> pd.Series:
    """Pick or assemble the training text column from the CSV (case-insensitive)."""
    cols = [c.lower() for c in df.columns]
    lower_map = {c.lower(): c for c in df.columns}
    if 'text' in cols:
        return df[lower_map['text']].astype(str)
    if 'prompt' in cols and 'generated' in cols:
        pcol, rcol = lower_map['prompt'], lower_map['generated']
        return df.apply(lambda r: f"### Instruction:\n{r[pcol]}\n\n### Response:\n{r[rcol]}\n", axis=1)
    if 'generated' in cols:
        return df[lower_map['generated']].astype(str)
    raise ValueError("CSV has no usable column: provide 'text', or 'prompt' + 'generated', or 'generated'.")


def finetune_gpt2_from_csv(
    csv_path: str,
    base_model: str = "gpt2",
    output_dir: str = "/tmp/ft_gpt2_out",
    train_split: float = 0.9,
    epochs: int = 3,
    lr: float = 5e-5,
    batch_size: int = 2,
    use_lora: bool = False,
    lora_r: int = 8,
    lora_alpha: int = 16,
    lora_dropout: float = 0.05,
    seed: int = 42,
    max_length: int = 512,
) -> dict:
    os.makedirs(output_dir, exist_ok=True)
    random.seed(seed); torch.manual_seed(seed)

    df = pd.read_csv(csv_path)
    texts = build_text_column(df).fillna("").tolist()

    tokenizer = AutoTokenizer.from_pretrained(base_model)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    model = AutoModelForCausalLM.from_pretrained(base_model)

    if use_lora:
        if not PEFT_AVAILABLE:
            print("PEFT is not installed; falling back to full-parameter fine-tuning.")
        else:
            lconf = LoraConfig(
                r=lora_r, lora_alpha=lora_alpha, lora_dropout=lora_dropout,
                task_type=TaskType.CAUSAL_LM,
                # GPT-2 attention layers are named c_attn / c_proj; q_attn exists
                # only in variants with cross-attention enabled.
                target_modules=["c_attn", "c_proj", "q_attn"],
            )
            model = get_peft_model(model, lconf)

    def tokenize(example_texts):
        return tokenizer(example_texts, truncation=True, max_length=max_length)

    # Simple ordered split; if the validation slice comes out empty (very small
    # CSV), fall back to the first ~10% of the data for evaluation.
    split_idx = int(len(texts) * train_split)
    train_texts = texts[:split_idx]
    val_texts = texts[split_idx:] or texts[: max(1, len(texts) // 10)]

    train_enc = tokenize(train_texts)
    val_enc = tokenize(val_texts)

    # mlm=False gives standard causal-LM labels (inputs shifted by one).
    collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

    class SimpleDS(torch.utils.data.Dataset):
        def __init__(self, enc): self.enc = enc
        def __len__(self): return len(self.enc["input_ids"])
        def __getitem__(self, idx):
            return {k: torch.tensor(v[idx]) for k, v in self.enc.items()}

    train_ds, val_ds = SimpleDS(train_enc), SimpleDS(val_enc)

    # Prefer bf16 when the GPU supports it; TrainingArguments rejects enabling
    # fp16 and bf16 at the same time.
    use_bf16 = torch.cuda.is_available() and getattr(torch.cuda, "is_bf16_supported", lambda: False)()
    args = TrainingArguments(
        output_dir=output_dir,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        num_train_epochs=epochs,
        learning_rate=lr,
        warmup_ratio=0.03,
        weight_decay=0.01,
        logging_steps=20,
        eval_strategy="steps",
        eval_steps=100,
        save_strategy="steps",
        save_steps=100,
        save_total_limit=2,
        fp16=torch.cuda.is_available() and not use_bf16,
        bf16=use_bf16,
        report_to=[],
    )

    trainer = Trainer(
        model=model,
        args=args,
        train_dataset=train_ds,
        eval_dataset=val_ds,
        data_collator=collator,
    )

    trainer.train()
    trainer.save_model(output_dir)
    tokenizer.save_pretrained(output_dir)

    # Evaluate once and reuse the metrics rather than calling evaluate() twice.
    eval_metrics = trainer.evaluate()
    return {
        "output_dir": output_dir,
        "train_size": len(train_ds),
        "eval_size": len(val_ds),
        "perplexity": math.exp(eval_metrics["eval_loss"]) if "eval_loss" in eval_metrics else None,
    }
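

# Illustrative usage sketch, not part of the utility itself: the CSV path and
# the hyperparameters below are hypothetical placeholders, shown only to
# demonstrate how the function might be called on a small prompt/generated dataset.
if __name__ == "__main__":
    stats = finetune_gpt2_from_csv(
        csv_path="data/generated_samples.csv",  # hypothetical CSV with prompt + generated columns
        use_lora=True,   # needs the optional peft package; otherwise full fine-tuning is used
        epochs=1,
        batch_size=2,
    )
    print(stats)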