| """ | |
| LoRA Trainer Module | |
| Implements Low-Rank Adaptation (LoRA) fine-tuning using HuggingFace PEFT library. | |
| Supports 4-bit/8-bit quantization for efficient training on consumer hardware. | |
| """ | |
| import os | |
| import json | |
| from pathlib import Path | |
| from dataclasses import dataclass, field | |
| from typing import Optional, List, Dict, Any | |
| import torch | |
| from transformers import ( | |
| AutoModelForCausalLM, | |
| AutoTokenizer, | |
| TrainingArguments, | |
| Trainer, | |
| DataCollatorForLanguageModeling | |
| ) | |
| from peft import ( | |
| LoraConfig, | |
| get_peft_model, | |
| prepare_model_for_kbit_training, | |
| PeftModel | |
| ) | |
| from datasets import Dataset | |


@dataclass
class LoRAConfig:
    """LoRA configuration parameters."""
    r: int = 8                # LoRA rank
    lora_alpha: int = 16      # LoRA alpha (scaling factor)
    target_modules: List[str] = field(default_factory=lambda: ["q_proj", "v_proj", "k_proj", "o_proj"])
    lora_dropout: float = 0.05
    bias: str = "none"
    task_type: str = "CAUSAL_LM"
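

# Illustrative sketch (not part of the original module): how the dataclass above
# is typically instantiated. PEFT scales each adapter update by lora_alpha / r,
# so raising the rank is usually paired with raising lora_alpha; the values
# below are example choices, not recommendations from this module.
#
#   config = LoRAConfig(
#       r=16,                               # higher rank = more trainable parameters
#       lora_alpha=32,                      # keeps the alpha/r scaling factor at 2.0
#       target_modules=["q_proj", "v_proj"],
#       lora_dropout=0.1,
#   )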


class LoRATrainer:
    """
    LoRA Trainer for parameter-efficient fine-tuning of large language models.

    Features:
    - 4-bit/8-bit quantization support
    - Automatic dataset tokenization with chat templates
    - HuggingFace Trainer integration
    - Checkpoint management
    - Adapter merging for deployment

    Example:
        >>> config = LoRAConfig(r=8, lora_alpha=16)
        >>> trainer = LoRATrainer("Qwen/Qwen2.5-7B-Instruct", config)
        >>> trainer.load_model(use_4bit=True)
        >>> trainer.prepare_dataset(training_data)
        >>> trainer.train(num_epochs=3)
        >>> trainer.save_model("./output")
    """

    def __init__(
        self,
        model_name: str,
        lora_config: LoRAConfig,
        output_dir: str = "./models/output"
    ):
        """
        Initialize LoRA Trainer.

        Args:
            model_name: HuggingFace model path or name
            lora_config: LoRA configuration
            output_dir: Directory for saving checkpoints and final model
        """
        self.model_name = model_name
        self.lora_config = lora_config
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.model = None
        self.tokenizer = None
        self.train_dataset = None
        self.eval_dataset = None
        self.trainer = None

    def load_model(
        self,
        use_4bit: bool = True,
        use_8bit: bool = False,
        device_map: str = "auto",
        max_memory: Optional[Dict] = None
    ) -> None:
        """
        Load model with LoRA adapters and optional quantization.

        Args:
            use_4bit: Use 4-bit quantization (bitsandbytes)
            use_8bit: Use 8-bit quantization (alternative to 4-bit)
            device_map: Device mapping strategy
            max_memory: Maximum memory allocation per device
        """
        print(f"Loading model: {self.model_name}")

        # Load tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            trust_remote_code=True,
            padding_side="right"
        )
        # Set pad token if not present
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        # Quantization config
        quantization_config = None
        if use_4bit:
            from transformers import BitsAndBytesConfig
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4"
            )
        elif use_8bit:
            from transformers import BitsAndBytesConfig
            quantization_config = BitsAndBytesConfig(load_in_8bit=True)

        # Load base model
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            quantization_config=quantization_config,
            device_map=device_map,
            max_memory=max_memory,
            trust_remote_code=True,
            torch_dtype=torch.float16 if not (use_4bit or use_8bit) else None
        )

        # Prepare for k-bit training if quantized
        if use_4bit or use_8bit:
            self.model = prepare_model_for_kbit_training(self.model)

        # Configure LoRA
        peft_config = LoraConfig(
            r=self.lora_config.r,
            lora_alpha=self.lora_config.lora_alpha,
            target_modules=self.lora_config.target_modules,
            lora_dropout=self.lora_config.lora_dropout,
            bias=self.lora_config.bias,
            task_type=self.lora_config.task_type
        )

        # Apply LoRA adapters
        self.model = get_peft_model(self.model, peft_config)

        # Print trainable parameters
        self.model.print_trainable_parameters()
        print(f"✓ Model loaded with LoRA (rank={self.lora_config.r})")

    def prepare_dataset(
        self,
        data: List[Dict],
        validation_split: float = 0.1,
        max_length: int = 2048,
        test_data: Optional[List[Dict]] = None
    ) -> None:
        """
        Tokenize and prepare dataset for training.

        Args:
            data: Training data in format [{"instruction": "...", "input": "...", "output": "..."}]
            validation_split: Fraction of data to use for validation
            max_length: Maximum sequence length
            test_data: Optional separate test dataset
        """
        print(f"Preparing dataset: {len(data)} examples")

        def format_prompt(example):
            """Format example using chat template."""
            # Build conversation
            messages = []
            # System message (optional, can be customized)
            messages.append({
                "role": "system",
                "content": "You are a helpful AI assistant."
            })
            # User message
            user_content = example.get("instruction", "")
            if example.get("input"):
                user_content += f"\n\n{example['input']}"
            messages.append({
                "role": "user",
                "content": user_content
            })
            # Assistant response
            messages.append({
                "role": "assistant",
                "content": example.get("output", "")
            })
            # Apply chat template
            try:
                formatted = self.tokenizer.apply_chat_template(
                    messages,
                    tokenize=False,
                    add_generation_prompt=False
                )
            except Exception:
                # Fallback if chat template not available
                formatted = f"{user_content}\n\n{example.get('output', '')}"
            return {"text": formatted}

        # Format all examples
        formatted_data = [format_prompt(ex) for ex in data]

        # Split train/val
        if test_data is None:
            split_idx = int(len(formatted_data) * (1 - validation_split))
            train_data = formatted_data[:split_idx]
            val_data = formatted_data[split_idx:]
        else:
            train_data = formatted_data
            val_data = [format_prompt(ex) for ex in test_data]

        # Create datasets
        self.train_dataset = Dataset.from_list(train_data)
        self.eval_dataset = Dataset.from_list(val_data) if val_data else None

        # Tokenization function
        def tokenize_function(examples):
            tokenized = self.tokenizer(
                examples["text"],
                truncation=True,
                max_length=max_length,
                padding="max_length",
                return_tensors=None
            )
            tokenized["labels"] = tokenized["input_ids"].copy()
            return tokenized

        # Tokenize
        self.train_dataset = self.train_dataset.map(
            tokenize_function,
            batched=True,
            remove_columns=self.train_dataset.column_names
        )
        if self.eval_dataset:
            self.eval_dataset = self.eval_dataset.map(
                tokenize_function,
                batched=True,
                remove_columns=self.eval_dataset.column_names
            )
        print(f"✓ Dataset prepared: {len(self.train_dataset)} train, {len(self.eval_dataset) if self.eval_dataset else 0} val")

    def train(
        self,
        num_epochs: int = 3,
        learning_rate: float = 2e-4,
        per_device_train_batch_size: int = 4,
        per_device_eval_batch_size: int = 4,
        gradient_accumulation_steps: int = 4,
        warmup_steps: int = 100,
        logging_steps: int = 10,
        save_steps: int = 500,
        eval_steps: int = 500,
        fp16: bool = True,
        optim: str = "paged_adamw_8bit"
    ) -> None:
        """
        Train the model with LoRA.

        Args:
            num_epochs: Number of training epochs
            learning_rate: Learning rate
            per_device_train_batch_size: Batch size per device for training
            per_device_eval_batch_size: Batch size per device for evaluation
            gradient_accumulation_steps: Gradient accumulation steps
            warmup_steps: Learning rate warmup steps
            logging_steps: Log every N steps
            save_steps: Save checkpoint every N steps
            eval_steps: Evaluate every N steps
            fp16: Use mixed precision training
            optim: Optimizer type
        """
        if self.model is None:
            raise ValueError("Model not loaded. Call load_model() first.")
        if self.train_dataset is None:
            raise ValueError("Dataset not prepared. Call prepare_dataset() first.")

        print(f"Starting training: {num_epochs} epochs")

        # Training arguments
        training_args = TrainingArguments(
            output_dir=str(self.output_dir),
            num_train_epochs=num_epochs,
            per_device_train_batch_size=per_device_train_batch_size,
            per_device_eval_batch_size=per_device_eval_batch_size,
            gradient_accumulation_steps=gradient_accumulation_steps,
            learning_rate=learning_rate,
            warmup_steps=warmup_steps,
            logging_steps=logging_steps,
            save_steps=save_steps,
            eval_steps=eval_steps if self.eval_dataset else None,
            evaluation_strategy="steps" if self.eval_dataset else "no",
            save_strategy="steps",
            fp16=fp16,
            optim=optim,
            load_best_model_at_end=True if self.eval_dataset else False,
            save_total_limit=3,
            report_to=[]  # Disable wandb/tensorboard by default
        )

        # Data collator
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm=False
        )

        # Initialize trainer
        self.trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=self.train_dataset,
            eval_dataset=self.eval_dataset,
            data_collator=data_collator
        )

        # Train
        self.trainer.train()
        print("✓ Training complete!")

    def save_model(self, save_path: Optional[str] = None) -> None:
        """
        Save LoRA adapter weights.

        Args:
            save_path: Path to save adapters (uses output_dir if None)
        """
        if save_path is None:
            save_path = str(self.output_dir / "final_model")
        else:
            save_path = str(Path(save_path))
        Path(save_path).mkdir(parents=True, exist_ok=True)

        # Save adapter
        self.model.save_pretrained(save_path)
        self.tokenizer.save_pretrained(save_path)

        # Save config
        config_path = Path(save_path) / "lora_config.json"
        with open(config_path, 'w') as f:
            json.dump({
                "r": self.lora_config.r,
                "lora_alpha": self.lora_config.lora_alpha,
                "target_modules": self.lora_config.target_modules,
                "lora_dropout": self.lora_config.lora_dropout
            }, f, indent=2)
        print(f"✓ Model saved to: {save_path}")

    def load_adapter(self, adapter_path: str) -> None:
        """
        Load pre-trained LoRA adapter.

        Args:
            adapter_path: Path to adapter weights
        """
        if self.model is None:
            raise ValueError("Base model not loaded. Call load_model() first.")

        print(f"Loading adapter from: {adapter_path}")
        self.model = PeftModel.from_pretrained(
            self.model,
            adapter_path,
            is_trainable=True
        )
        print("✓ Adapter loaded")

    def merge_and_save(self, save_path: str) -> None:
        """
        Merge LoRA weights with base model and save full model.

        Args:
            save_path: Path to save merged model
        """
        print("Merging LoRA weights with base model...")

        # Merge
        merged_model = self.model.merge_and_unload()

        # Save
        Path(save_path).mkdir(parents=True, exist_ok=True)
        merged_model.save_pretrained(save_path)
        self.tokenizer.save_pretrained(save_path)
        print(f"✓ Merged model saved to: {save_path}")

    def evaluate_on_test_set(
        self,
        test_data: List[Dict],
        max_samples: int = 50,
        max_new_tokens: int = 256
    ) -> Dict[str, Any]:
        """
        Evaluate model on test set.

        Args:
            test_data: Test examples
            max_samples: Maximum number of samples to evaluate
            max_new_tokens: Maximum tokens to generate

        Returns:
            Evaluation results dictionary
        """
        import time

        print(f"Evaluating on {min(len(test_data), max_samples)} test examples...")
        results = {
            "num_examples": min(len(test_data), max_samples),
            "responses": [],
            "avg_response_length": 0,
            "total_time": 0,
            "throughput": 0
        }

        self.model.eval()
        start_time = time.time()

        for i, example in enumerate(test_data[:max_samples]):
            # Format prompt
            user_content = example.get("instruction", "")
            if example.get("input"):
                user_content += f"\n\n{example['input']}"
            messages = [{"role": "user", "content": user_content}]
            try:
                prompt = self.tokenizer.apply_chat_template(
                    messages,
                    tokenize=False,
                    add_generation_prompt=True
                )
            except Exception:
                prompt = user_content

            # Tokenize
            inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)

            # Generate
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_new_tokens,
                    temperature=0.7,
                    do_sample=True,
                    top_p=0.9
                )

            # Decode
            response = self.tokenizer.decode(outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True)
            results["responses"].append({
                "input": user_content,
                "expected": example.get("output", ""),
                "generated": response
            })

        # Calculate metrics
        results["total_time"] = time.time() - start_time
        results["avg_response_length"] = sum(len(r["generated"]) for r in results["responses"]) / len(results["responses"])
        results["throughput"] = len(results["responses"]) / results["total_time"]

        print(f"✓ Evaluation complete: {results['throughput']:.2f} examples/sec")
        return results
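

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). It mirrors the class
# docstring example; the data file path "data/train.json" is an assumption, and
# the JSON is expected to hold a list of {"instruction", "input", "output"}
# records as described in prepare_dataset().
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    with open("data/train.json") as f:
        training_data = json.load(f)

    config = LoRAConfig(r=8, lora_alpha=16)
    trainer = LoRATrainer("Qwen/Qwen2.5-7B-Instruct", config, output_dir="./models/output")

    trainer.load_model(use_4bit=True)           # 4-bit NF4 quantization for consumer GPUs
    trainer.prepare_dataset(training_data, validation_split=0.1)
    trainer.train(num_epochs=3, learning_rate=2e-4)

    trainer.save_model()                        # saves the adapter to <output_dir>/final_model
    trainer.merge_and_save("./models/merged")   # optional: full merged model for deployment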