""" Fine-tuning script for Iain Morris style article generation Uses QLoRA for efficient training """ import os import json import torch from transformers import ( AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForLanguageModeling, BitsAndBytesConfig ) from peft import ( LoraConfig, get_peft_model, TaskType, prepare_model_for_kbit_training ) from datasets import Dataset, load_from_disk import logging from typing import Dict, List # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class IainMorrisFineTuner: def __init__(self, model_name: str = "microsoft/DialoGPT-medium"): """ Initialize the fine-tuner Args: model_name: Base model to fine-tune """ # Use Zephyr-7B-Beta - excellent for instruction following, no auth required self.model_name = "HuggingFaceH4/zephyr-7b-beta" # Configure device for Apple Silicon M3 if torch.backends.mps.is_available(): self.device = torch.device("mps") self.use_mps = True self.use_cuda = False logger.info("Using Apple Silicon MPS acceleration") elif torch.cuda.is_available(): self.device = torch.device("cuda") self.use_mps = False self.use_cuda = True logger.info("Using CUDA acceleration") else: self.device = torch.device("cpu") self.use_mps = False self.use_cuda = False logger.info("Using CPU") logger.info(f"Using device: {self.device}") # Skip quantization for MPS - not supported yet if self.use_cuda: self.bnb_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_use_double_quant=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16 ) else: self.bnb_config = None if self.use_mps: logger.info("Quantization not supported on MPS. Using full precision with memory optimization.") else: logger.info("Quantization not available on CPU. 
Using full precision.") # LoRA configuration optimized for M3 lora_rank = 16 if (self.use_mps or self.use_cuda) else 8 # Full rank for M3/CUDA self.lora_config = LoraConfig( r=lora_rank, # Rank lora_alpha=32, # Alpha parameter for LoRA scaling target_modules=[ "q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj", "lm_head", ], bias="none", lora_dropout=0.05, task_type=TaskType.CAUSAL_LM, ) def load_model_and_tokenizer(self): """Load the base model and tokenizer""" logger.info(f"Loading model: {self.model_name}") # Load tokenizer self.tokenizer = AutoTokenizer.from_pretrained( self.model_name, trust_remote_code=True, padding_side="left" ) # Add pad token if it doesn't exist if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token # Load model with M3-optimized settings model_kwargs = { "trust_remote_code": True, "low_cpu_mem_usage": True, } if self.use_cuda: # CUDA settings with quantization model_kwargs.update({ "quantization_config": self.bnb_config, "device_map": "auto", "torch_dtype": torch.bfloat16 }) elif self.use_mps: # MPS (Apple Silicon) optimized settings model_kwargs.update({ "torch_dtype": torch.float16, # float16 works well on MPS "device_map": None, # Let us handle device placement manually }) else: # CPU settings model_kwargs.update({ "torch_dtype": torch.float32, "device_map": None, }) self.model = AutoModelForCausalLM.from_pretrained( self.model_name, **model_kwargs ) # Move model to device if not using device_map if not self.use_cuda: self.model = self.model.to(self.device) # Prepare model for training if self.use_cuda: self.model = prepare_model_for_kbit_training(self.model) else: # For MPS/CPU training, just ensure model is in training mode self.model.train() # Add LoRA adapters self.model = get_peft_model(self.model, self.lora_config) # Print trainable parameters self.model.print_trainable_parameters() logger.info("Model and tokenizer loaded successfully") def format_chat_template(self, example: Dict) -> str: """ Format example using chat template Args: example: Training example with messages Returns: Formatted text """ messages = example['messages'] # Use the tokenizer's chat template if available if hasattr(self.tokenizer, 'apply_chat_template'): try: return self.tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=False ) except: pass # Fallback formatting formatted = "" for message in messages: role = message['role'] content = message['content'] if role == 'system': formatted += f"<|system|>\n{content}\n" elif role == 'user': formatted += f"<|user|>\n{content}\n" elif role == 'assistant': formatted += f"<|assistant|>\n{content}\n" return formatted def tokenize_function(self, examples: Dict) -> Dict: """ Tokenize examples for training Args: examples: Batch of examples Returns: Tokenized examples """ # Format each example texts = [] for i in range(len(examples['messages'])): example = {'messages': examples['messages'][i]} formatted_text = self.format_chat_template(example) texts.append(formatted_text) # Tokenize tokenized = self.tokenizer( texts, truncation=True, padding=False, max_length=2048, return_overflowing_tokens=False, ) # Set labels for causal language modeling tokenized["labels"] = tokenized["input_ids"].copy() return tokenized def load_datasets(self, data_dir: str = "data"): """ Load training and validation datasets Args: data_dir: Directory containing the datasets """ logger.info("Loading datasets...") try: # Try to load HF datasets first self.train_dataset = 
load_from_disk(f"{data_dir}/train_hf_dataset") self.val_dataset = load_from_disk(f"{data_dir}/val_hf_dataset") except: # Fallback to JSON files - prioritize enhanced dataset try: # Try enhanced dataset first (includes non-telecom examples) with open(f"{data_dir}/enhanced_train_dataset.json", 'r') as f: train_data = json.load(f) logger.info("Using enhanced training dataset with non-telecom examples") except FileNotFoundError: try: # Fall back to improved dataset (updated system prompts) with open(f"{data_dir}/improved_train_dataset.json", 'r') as f: train_data = json.load(f) logger.info("Using improved training dataset with updated system prompts") except FileNotFoundError: # Final fallback to original dataset with open(f"{data_dir}/train_dataset.json", 'r') as f: train_data = json.load(f) logger.info("Using original training dataset") # Load validation dataset (use improved if available) try: with open(f"{data_dir}/improved_val_dataset.json", 'r') as f: val_data = json.load(f) logger.info("Using improved validation dataset") except FileNotFoundError: with open(f"{data_dir}/val_dataset.json", 'r') as f: val_data = json.load(f) logger.info("Using original validation dataset") self.train_dataset = Dataset.from_list(train_data) self.val_dataset = Dataset.from_list(val_data) logger.info(f"Loaded {len(self.train_dataset)} training examples") logger.info(f"Loaded {len(self.val_dataset)} validation examples") # Tokenize datasets logger.info("Tokenizing datasets...") self.train_dataset = self.train_dataset.map( self.tokenize_function, batched=True, remove_columns=self.train_dataset.column_names ) self.val_dataset = self.val_dataset.map( self.tokenize_function, batched=True, remove_columns=self.val_dataset.column_names ) logger.info("Datasets tokenized successfully") def setup_training_args(self, output_dir: str = "models/iain-morris-model-enhanced"): """ Setup training arguments optimized for M3 Args: output_dir: Directory to save the model """ # Base training arguments - improved based on training guide recommendations training_kwargs = { "output_dir": output_dir, "num_train_epochs": 4 if self.use_mps else 4, # Increased epochs for better style learning "per_device_train_batch_size": 1, "per_device_eval_batch_size": 1, "gradient_accumulation_steps": 8 if self.use_mps else 4, # More accumulation for MPS "save_steps": 50, "logging_steps": 10, "learning_rate": 5e-5 if self.use_mps else 5e-5, # Lower LR as recommended (5e-5) "weight_decay": 0.001, "max_grad_norm": 0.3, "max_steps": -1, "warmup_ratio": 0.03, "group_by_length": True, "lr_scheduler_type": "constant", "report_to": "none", # Disable reporting to avoid tensorboard dependency "eval_strategy": "steps", "eval_steps": 50, "save_total_limit": 3, # Keep more checkpoints for better model selection "load_best_model_at_end": True, "metric_for_best_model": "eval_loss", "greater_is_better": False, "dataloader_pin_memory": False, } # Device-specific optimizations if self.use_cuda: training_kwargs.update({ "optim": "paged_adamw_32bit", "fp16": False, "bf16": True, }) elif self.use_mps: training_kwargs.update({ "optim": "adamw_torch", # Standard optimizer for MPS "fp16": False, # fp16 not supported on MPS in this version "bf16": False, # bf16 not supported on MPS "dataloader_num_workers": 0, # Avoid multiprocessing issues on MPS }) else: training_kwargs.update({ "optim": "adamw_torch", "fp16": False, "bf16": False, "dataloader_num_workers": 0, }) self.training_args = TrainingArguments(**training_kwargs) logger.info(f"Training configured for {self.device} 
    def train(self):
        """Train the model"""
        logger.info("Starting training...")

        # Data collator
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm=False,
        )

        # Initialize trainer
        trainer = Trainer(
            model=self.model,
            args=self.training_args,
            train_dataset=self.train_dataset,
            eval_dataset=self.val_dataset,
            tokenizer=self.tokenizer,
            data_collator=data_collator,
        )

        # Train
        trainer.train()

        # Save the final model
        trainer.save_model()
        self.tokenizer.save_pretrained(self.training_args.output_dir)

        logger.info(f"Training completed. Model saved to {self.training_args.output_dir}")

    def save_lora_adapters(self, output_dir: str = "models/lora_adapters"):
        """
        Save only the LoRA adapters

        Args:
            output_dir: Directory to save adapters
        """
        os.makedirs(output_dir, exist_ok=True)
        self.model.save_pretrained(output_dir)
        self.tokenizer.save_pretrained(output_dir)
        logger.info(f"LoRA adapters saved to {output_dir}")

    def run_full_pipeline(self, data_dir: str = "data"):
        """
        Run the complete fine-tuning pipeline

        Args:
            data_dir: Directory containing training data
        """
        try:
            # Load model and tokenizer
            self.load_model_and_tokenizer()

            # Load datasets
            self.load_datasets(data_dir)

            # Setup training arguments
            self.setup_training_args()

            # Train
            self.train()

            # Save LoRA adapters separately
            self.save_lora_adapters()

            logger.info("Fine-tuning pipeline completed successfully!")

        except Exception as e:
            logger.error(f"Error in fine-tuning pipeline: {e}")
            raise


def main():
    """
    Main function to run fine-tuning
    """
    # Check for available accelerators
    if torch.cuda.is_available():
        logger.info(f"CUDA available. GPU: {torch.cuda.get_device_name()}")
        logger.info(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    elif torch.backends.mps.is_available():
        logger.info("Apple Silicon MPS available.")
    else:
        logger.warning("No GPU acceleration available. Training will be slow on CPU.")

    # Initialize fine-tuner
    fine_tuner = IainMorrisFineTuner()

    # Run the pipeline
    fine_tuner.run_full_pipeline()


if __name__ == "__main__":
    main()
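

# --- Optional usage sketch (assumption, not part of the training pipeline) ---
# The helper below shows one way the saved LoRA adapters could be reloaded for
# inference with peft's PeftModel. The base model name and adapter path mirror
# the defaults used above; the function itself is a hypothetical convenience
# and is never called by this script.
def load_finetuned_for_inference(adapter_dir: str = "models/lora_adapters",
                                 base_model: str = "HuggingFaceH4/zephyr-7b-beta"):
    """Load the base model plus the trained LoRA adapters for generation."""
    from peft import PeftModel  # local import to keep the training path unchanged

    # Tokenizer was saved alongside the adapters by save_lora_adapters()
    tokenizer = AutoTokenizer.from_pretrained(adapter_dir)
    model = AutoModelForCausalLM.from_pretrained(
        base_model,
        torch_dtype=torch.float16,
        low_cpu_mem_usage=True,
    )
    # Attach the LoRA weights on top of the base model
    model = PeftModel.from_pretrained(model, adapter_dir)
    model.eval()
    return model, tokenizer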