#!/usr/bin/env python3
"""Real LLM compression and energy test - Phase 4"""
import time
import json
import os

import torch


def get_model_size(model):
    """Calculate actual model size in memory (MB) from parameters and buffers.

    Note: dynamically quantized nn.Linear modules keep their INT8 weights in
    packed params that are not exposed via parameters()/buffers(), so the
    figure reported for quantized models is approximate.
    """
    param_size = 0
    for param in model.parameters():
        param_size += param.nelement() * param.element_size()
    buffer_size = 0
    for buffer in model.buffers():
        buffer_size += buffer.nelement() * buffer.element_size()
    return (param_size + buffer_size) / 1024 / 1024  # MB


def measure_inference_speed(model, tokenizer, prompts, device='cpu'):
    """Measure actual inference speed in generated tokens per second."""
    model.eval()
    total_tokens = 0
    start_time = time.time()
    with torch.no_grad():
        for prompt in prompts:
            inputs = tokenizer(prompt, return_tensors='pt', padding=True).to(device)
            outputs = model.generate(
                **inputs,
                max_new_tokens=20,
                do_sample=False,
                pad_token_id=tokenizer.pad_token_id
            )
            # Count only newly generated tokens, not the prompt tokens.
            total_tokens += outputs.shape[1] - inputs['input_ids'].shape[1]
    inference_time = time.time() - start_time
    return {
        'total_tokens': total_tokens,
        'time_seconds': inference_time,
        'tokens_per_second': total_tokens / inference_time
    }


def run_real_compression_test():
    """Run actual model compression test."""
    print("="*70)
    print(" "*20 + "REAL LLM COMPRESSION TEST")
    print("="*70)

    # Use a small model that will actually download
    from transformers import AutoTokenizer, AutoModelForCausalLM

    model_name = "distilgpt2"  # 82M params, ~320MB in FP32
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    print(f"\nšŸ“„ Loading {model_name} model...")
    print(f"Device: {device}")

    # Test prompts
    test_prompts = [
        "The future of artificial intelligence is",
        "Quantum computers will revolutionize",
        "Energy efficiency in computing means",
        "Machine learning algorithms can",
        "The next breakthrough in technology"
    ]

    results = {}

    # 1. Baseline FP32 model
    print("\nšŸ”µ Testing FP32 baseline model...")
    model_fp32 = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float32
    ).to(device)
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token  # GPT-2 has no pad token by default

    fp32_size = get_model_size(model_fp32)
    fp32_speed = measure_inference_speed(model_fp32, tokenizer, test_prompts, device)

    results['fp32'] = {
        'size_mb': fp32_size,
        'dtype': 'float32',
        **fp32_speed
    }

    del model_fp32
    if device == 'cuda':
        torch.cuda.empty_cache()

    # 2. FP16 model (halves weight storage; speedups show up mainly on GPU,
    #    and FP16 inference on CPU may be slow or unsupported in older PyTorch)
    print("\n🟢 Testing FP16 model...")
    model_fp16 = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float16
    ).to(device)

    fp16_size = get_model_size(model_fp16)
    fp16_speed = measure_inference_speed(model_fp16, tokenizer, test_prompts, device)

    results['fp16'] = {
        'size_mb': fp16_size,
        'dtype': 'float16',
        **fp16_speed
    }

    del model_fp16
    if device == 'cuda':
        torch.cuda.empty_cache()
    # 3. INT8 model (post-training dynamic quantization, CPU only).
    #    Note: quantize_dynamic only replaces nn.Linear modules; GPT-2-style
    #    models implement most projections as Conv1D, so coverage here is
    #    partial and the INT8 numbers should be treated as approximate.
    print("\n🟔 Testing INT8 quantized model...")
    model_int8 = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float32
    )

    # Dynamic quantization of Linear layers to INT8
    model_int8 = torch.quantization.quantize_dynamic(
        model_int8,
        {torch.nn.Linear},
        dtype=torch.qint8
    )

    int8_size = get_model_size(model_int8)
    # Dynamic quantization runs on CPU, so this is not a like-for-like speed
    # comparison when the FP32/FP16 runs used a GPU.
    int8_speed = measure_inference_speed(model_int8, tokenizer, test_prompts, 'cpu')

    results['int8'] = {
        'size_mb': int8_size,
        'dtype': 'int8',
        **int8_speed
    }

    # Calculate improvements
    results['compression_ratios'] = {
        'fp32_to_fp16': results['fp32']['size_mb'] / results['fp16']['size_mb'],
        'fp32_to_int8': results['fp32']['size_mb'] / results['int8']['size_mb'],
        'fp16_to_int8': results['fp16']['size_mb'] / results['int8']['size_mb']
    }

    results['speedup_ratios'] = {
        'fp16_vs_fp32': results['fp16']['tokens_per_second'] / results['fp32']['tokens_per_second'],
        'int8_vs_fp32': results['int8']['tokens_per_second'] / results['fp32']['tokens_per_second']
    }

    # Energy estimation (crude proxy, not a power measurement)
    # Simplified: energy āˆ inference time Ɨ model size
    baseline_energy = results['fp32']['time_seconds'] * results['fp32']['size_mb']
    fp16_energy = results['fp16']['time_seconds'] * results['fp16']['size_mb']
    int8_energy = results['int8']['time_seconds'] * results['int8']['size_mb']

    results['energy_estimates'] = {
        'fp32_relative': 1.0,
        'fp16_relative': fp16_energy / baseline_energy,
        'int8_relative': int8_energy / baseline_energy,
        'fp16_reduction_percent': (1 - fp16_energy / baseline_energy) * 100,
        'int8_reduction_percent': (1 - int8_energy / baseline_energy) * 100
    }

    # Check acceptance criteria (passing either criterion is sufficient)
    results['acceptance_criteria'] = {
        'compression_4x': max(results['compression_ratios'].values()) >= 4.0,
        'energy_reduction_40': max(
            results['energy_estimates']['fp16_reduction_percent'],
            results['energy_estimates']['int8_reduction_percent']
        ) >= 40.0,
        'criteria_met': False
    }
    results['acceptance_criteria']['criteria_met'] = (
        results['acceptance_criteria']['compression_4x'] or
        results['acceptance_criteria']['energy_reduction_40']
    )

    return results


if __name__ == "__main__":
    print("\nšŸ”¬ Starting REAL LLM Compression Test...")

    # Run the test
    results = run_real_compression_test()

    # Display results
    print("\n" + "="*70)
    print(" "*25 + "RESULTS")
    print("="*70)

    print("\nšŸ“Š Model Sizes:")
    for dtype in ['fp32', 'fp16', 'int8']:
        if dtype in results:
            print(f"  {dtype:5}: {results[dtype]['size_mb']:>8.1f} MB")

    print("\n⚔ Inference Speed:")
    for dtype in ['fp32', 'fp16', 'int8']:
        if dtype in results:
            print(f"  {dtype:5}: {results[dtype]['tokens_per_second']:>8.1f} tokens/sec")

    print("\nšŸ“‰ Compression Ratios:")
    for key, value in results['compression_ratios'].items():
        print(f"  {key}: {value:.2f}x")

    print("\nšŸ”‹ Energy Reduction Estimates:")
    print(f"  FP16: {results['energy_estimates']['fp16_reduction_percent']:.1f}%")
    print(f"  INT8: {results['energy_estimates']['int8_reduction_percent']:.1f}%")

    print("\nāœ… Acceptance Criteria:")
    print(f"  4x Compression: {'PASS' if results['acceptance_criteria']['compression_4x'] else 'FAIL'}")
    print(f"  40% Energy Reduction: {'PASS' if results['acceptance_criteria']['energy_reduction_40'] else 'FAIL'}")

    # Save results
    os.makedirs("phase4_outputs", exist_ok=True)
    with open("phase4_outputs/real_llm_results.json", "w") as f:
        json.dump(results, f, indent=2)

    print("\nšŸ’¾ Results saved to phase4_outputs/real_llm_results.json")
    print("="*70)
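

# --- Optional sketch (not wired into the test above) -------------------------
# get_model_size() cannot see the packed INT8 weights of dynamically quantized
# nn.Linear modules, so a serialized-size check is a useful cross-check on the
# compression ratios. The helper below is a hypothetical addition, not part of
# the original script: it saves the state_dict to an in-memory buffer and
# reports the byte count, which does include the packed quantized weights.
def get_serialized_size_mb(model):
    """Approximate serialized model size in MB via torch.save to a buffer."""
    import io
    buffer = io.BytesIO()
    torch.save(model.state_dict(), buffer)
    return buffer.getbuffer().nbytes / 1024 / 1024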