""" | |
Advanced CPU Optimizer for training on CPU-only systems | |
Optimized for maximum performance on limited hardware | |
""" | |
import os | |
import logging | |
import threading | |
from typing import Dict, Any, Optional, List | |
import torch | |
import torch.nn as nn | |
import torch.optim as optim | |
from torch.utils.data import DataLoader | |
import numpy as np | |
from .memory_manager import AdvancedMemoryManager | |
logger = logging.getLogger(__name__) | |
class CPUOptimizer:
    """
    Advanced CPU optimization for training and inference.
    """

    def __init__(self, memory_manager: AdvancedMemoryManager):
        """
        Initialize the CPU optimizer.

        Args:
            memory_manager: Memory manager instance
        """
        self.memory_manager = memory_manager
        self.cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None
        self.optimizations_applied = []

        # Apply initial optimizations
        self._apply_global_optimizations()

        logger.info(f"CPU Optimizer initialized for {self.cpu_count} cores")
    def _apply_global_optimizations(self):
        """Apply global CPU optimizations."""
        # Set optimal intra-op thread count for PyTorch (capped at 8 for stability)
        optimal_threads = min(self.cpu_count, 8)
        torch.set_num_threads(optimal_threads)
        self.optimizations_applied.append(f"PyTorch threads: {optimal_threads}")

        # Configure inter-op parallelism; PyTorch only allows this before any
        # inter-op parallel work has started, so tolerate a late call
        try:
            torch.set_num_interop_threads(max(1, min(self.cpu_count // 2, 4)))
            self.optimizations_applied.append("Inter-op parallelism configured")
        except RuntimeError as e:
            logger.warning(f"Could not set inter-op threads: {e}")

        # Detect Intel Extension for PyTorch; the model-level optimization
        # itself is applied later in optimize_model()
        try:
            import intel_extension_for_pytorch as ipex  # noqa: F401
            self.optimizations_applied.append("Intel Extension for PyTorch enabled")
        except ImportError:
            logger.warning("Intel Extension for PyTorch not available")

        # Set thread-count environment variables for the BLAS/OpenMP backends.
        # Note: these are usually read when the libraries are first loaded, so
        # setting them here only affects libraries imported afterwards.
        os.environ['OMP_NUM_THREADS'] = str(optimal_threads)
        os.environ['MKL_NUM_THREADS'] = str(optimal_threads)
        os.environ['NUMEXPR_NUM_THREADS'] = str(optimal_threads)
        os.environ['OPENBLAS_NUM_THREADS'] = str(optimal_threads)
        self.optimizations_applied.append("Environment variables optimized")

        # Enable oneDNN (MKL-DNN); MKL itself has no runtime switch, so just
        # record whether this PyTorch build was compiled against it
        torch.backends.mkldnn.enabled = True
        if torch.backends.mkl.is_available():
            self.optimizations_applied.append("MKL and MKLDNN enabled")
        else:
            self.optimizations_applied.append("MKLDNN enabled")

        logger.info(f"Applied optimizations: {', '.join(self.optimizations_applied)}")
    def optimize_model(self, model: nn.Module,
                       use_jit: bool = True,
                       use_channels_last: bool = True) -> nn.Module:
        """
        Optimize a model for CPU inference/training.

        Args:
            model: PyTorch model to optimize
            use_jit: Whether to use TorchScript JIT compilation
            use_channels_last: Whether to use channels-last memory format

        Returns:
            Optimized model
        """
        with self.memory_manager.memory_context("optimize_model"):
            logger.info("Optimizing model for CPU")

            # Move model to CPU
            model = model.cpu()

            # Switch to evaluation mode for optimization, remembering the
            # original mode so it can be restored afterwards
            was_training = model.training
            model.eval()

            try:
                # Apply Intel Extension optimizations if available
                try:
                    import intel_extension_for_pytorch as ipex
                    model = ipex.optimize(model, dtype=torch.float32)
                    logger.info("Applied Intel Extension optimizations")
                except ImportError:
                    pass

                # Apply channels-last memory format for convolutional models
                if use_channels_last and self._has_conv_layers(model):
                    model = model.to(memory_format=torch.channels_last)
                    logger.info("Applied channels-last memory format")

                # Apply TorchScript JIT compilation
                if use_jit:
                    try:
                        # Create a dummy input for tracing
                        dummy_input = self._create_dummy_input(model)
                        if dummy_input is not None:
                            model = torch.jit.trace(model, dummy_input)
                            logger.info("Applied TorchScript JIT compilation")
                    except Exception as e:
                        logger.warning(f"JIT compilation failed: {e}")

                return model
            except Exception as e:
                logger.error(f"Model optimization failed: {e}")
                return model
            finally:
                # Restore training mode if needed (traced ScriptModules also
                # support train()/eval())
                if was_training:
                    model.train()
    def _has_conv_layers(self, model: nn.Module) -> bool:
        """Check whether the model has convolutional layers."""
        for module in model.modules():
            if isinstance(module, (nn.Conv1d, nn.Conv2d, nn.Conv3d)):
                return True
        return False
    def _create_dummy_input(self, model: nn.Module) -> Optional[torch.Tensor]:
        """Create a dummy input for model tracing."""
        try:
            # Try to infer the input shape from the model's parameters
            for name, param in model.named_parameters():
                if 'embedding' in name.lower() and param.dim() == 2:
                    # Text model - create a token-id input
                    vocab_size = param.shape[0]
                    return torch.randint(0, min(vocab_size, 1000), (1, 32))
                elif 'conv' in name.lower() and param.dim() == 4:
                    # Vision model - create an image input
                    channels = param.shape[1]
                    return torch.randn(1, channels, 224, 224)

            # Default fallback: a flat feature vector
            return torch.randn(1, 512)
        except Exception:
            return None
    def optimize_dataloader(self, dataloader: DataLoader) -> DataLoader:
        """
        Optimize a DataLoader for CPU training.

        Args:
            dataloader: Original DataLoader

        Returns:
            Optimized DataLoader
        """
        # Calculate optimal number of workers
        optimal_workers = min(self.cpu_count // 2, 4)

        # The original shuffle flag is not stored on the DataLoader, so infer
        # it from the sampler type
        shuffle = isinstance(dataloader.sampler, RandomSampler)

        # Create a new DataLoader with optimized settings
        loader_kwargs = dict(
            batch_size=dataloader.batch_size,
            shuffle=shuffle,
            drop_last=dataloader.drop_last,
            collate_fn=dataloader.collate_fn,
            num_workers=optimal_workers,
            pin_memory=False,  # pinned memory only speeds up CPU-to-GPU copies
        )
        if optimal_workers > 0:
            # prefetch_factor/persistent_workers are only valid with workers
            loader_kwargs['persistent_workers'] = True
            loader_kwargs['prefetch_factor'] = 2

        optimized_loader = DataLoader(dataloader.dataset, **loader_kwargs)
        logger.info(f"Optimized DataLoader with {optimal_workers} workers")
        return optimized_loader
    def optimize_optimizer(self, optimizer: optim.Optimizer,
                           model: nn.Module) -> optim.Optimizer:
        """
        Optimize optimizer settings for CPU training.

        Args:
            optimizer: PyTorch optimizer
            model: Model being optimized

        Returns:
            Optimized optimizer
        """
        # Ensure a default weight decay is set for regularization (most
        # optimizers already include this key in their param groups)
        for param_group in optimizer.param_groups:
            if 'weight_decay' not in param_group:
                param_group['weight_decay'] = 0.01

        logger.info("Applied optimizer optimizations")
        return optimizer
    def enable_mixed_precision(self) -> bool:
        """
        Enable mixed precision training for CPU (if supported).

        Returns:
            Whether mixed precision is available
        """
        # CPU autocast (bfloat16) is exposed via torch.cpu.amp.autocast in
        # recent PyTorch releases; probe for it instead of assuming it exists
        if hasattr(torch, 'cpu') and hasattr(torch.cpu, 'amp') and hasattr(torch.cpu.amp, 'autocast'):
            logger.info("CPU mixed precision (bfloat16 autocast) available")
            return True

        logger.warning("CPU mixed precision not available")
        return False
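    # Illustrative usage once enable_mixed_precision() reports support
    # (bfloat16 autocast on CPU; "model" and "batch" are placeholders):
    #
    #     with torch.autocast(device_type="cpu", dtype=torch.bfloat16):
    #         output = model(batch)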
    def optimize_batch_size(self, base_batch_size: int,
                            model_size_mb: float) -> int:
        """
        Calculate an optimal batch size based on available memory.

        Args:
            base_batch_size: Base batch size to start from
            model_size_mb: Model size in MB

        Returns:
            Optimized batch size
        """
        memory_info = self.memory_manager.get_memory_info()
        available_memory_mb = memory_info['system_memory_available_gb'] * 1024

        # Reserve memory for the model plus a 2 GB overhead
        usable_memory_mb = available_memory_mb - model_size_mb - 2000

        # Estimate memory per sample (rough approximation: 10% of model size)
        memory_per_sample_mb = model_size_mb * 0.1

        if memory_per_sample_mb > 0:
            max_batch_size = int(usable_memory_mb / memory_per_sample_mb)
            optimal_batch_size = min(base_batch_size, max_batch_size, 32)  # Cap at 32
        else:
            optimal_batch_size = min(base_batch_size, 8)  # Conservative fallback

        optimal_batch_size = max(1, optimal_batch_size)  # At least 1

        logger.info(f"Optimized batch size: {optimal_batch_size} (was {base_batch_size})")
        return optimal_batch_size
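    # Worked example (illustrative numbers): with 8 GB of available memory and
    # a 500 MB model, usable memory is 8192 - 500 - 2000 = 5692 MB; at ~50 MB
    # per sample that allows ~113 samples, so the result is capped at 32.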
    def get_performance_recommendations(self, model: nn.Module) -> List[str]:
        """
        Get performance recommendations for the current setup.

        Args:
            model: Model to analyze

        Returns:
            List of recommendations
        """
        recommendations = []

        # Check model size (assume float32, 4 bytes per parameter)
        param_count = sum(p.numel() for p in model.parameters())
        model_size_mb = param_count * 4 / (1024 ** 2)
        if model_size_mb > 2000:  # > 2 GB
            recommendations.append("Consider using model sharding for large models")
            recommendations.append("Use gradient checkpointing to reduce memory usage")

        # Check CPU utilization
        if self.cpu_count > 8:
            recommendations.append("Consider using distributed training across CPU cores")

        # Check memory pressure
        memory_info = self.memory_manager.get_memory_info()
        if memory_info['system_memory_percent'] > 80:
            recommendations.append("Reduce batch size to lower memory usage")
            recommendations.append("Enable gradient accumulation instead of large batches")

        # Check for optimization opportunities
        if not any('Intel Extension' in opt for opt in self.optimizations_applied):
            recommendations.append("Install Intel Extension for PyTorch for better CPU performance")

        return recommendations
    def benchmark_performance(self, model: nn.Module,
                              input_shape: tuple,
                              num_iterations: int = 100) -> Dict[str, float]:
        """
        Benchmark model inference performance.

        Args:
            model: Model to benchmark
            input_shape: Input tensor shape
            num_iterations: Number of iterations to run

        Returns:
            Performance metrics
        """
        import time

        model.eval()
        dummy_input = torch.randn(*input_shape)

        # Warmup
        with torch.no_grad():
            for _ in range(10):
                _ = model(dummy_input)

        # Benchmark
        start_time = time.time()
        with torch.no_grad():
            for _ in range(num_iterations):
                _ = model(dummy_input)
        end_time = time.time()

        total_time = end_time - start_time
        avg_time_per_inference = total_time / num_iterations
        throughput = 1.0 / avg_time_per_inference if avg_time_per_inference > 0 else float('inf')

        return {
            'total_time_seconds': total_time,
            'avg_time_per_inference_ms': avg_time_per_inference * 1000,
            'throughput_inferences_per_second': throughput,
            'iterations': num_iterations,
        }
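

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module's public API).
# Assumes AdvancedMemoryManager can be constructed with no arguments and that
# it exposes memory_context() and get_memory_info() as used above; the model
# here is a small hypothetical feed-forward network. Because this module uses
# a relative import, run it as a package module, e.g.
# `python -m <package>.cpu_optimizer`.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    memory_manager = AdvancedMemoryManager()  # assumed no-argument constructor
    cpu_optimizer = CPUOptimizer(memory_manager)

    # Small demo model matching the 512-feature fallback dummy input
    demo_model = nn.Sequential(nn.Linear(512, 256), nn.ReLU(), nn.Linear(256, 10))
    demo_model = cpu_optimizer.optimize_model(demo_model, use_jit=True,
                                              use_channels_last=False)

    metrics = cpu_optimizer.benchmark_performance(demo_model,
                                                  input_shape=(1, 512),
                                                  num_iterations=50)
    logger.info(f"Benchmark results: {metrics}")

    for tip in cpu_optimizer.get_performance_recommendations(demo_model):
        logger.info(f"Recommendation: {tip}")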