File size: 12,496 Bytes
ab4e093
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
"""
Advanced CPU Optimizer for training on CPU-only systems
Optimized for maximum performance on limited hardware
"""

import os
import logging
import threading
from typing import Dict, Any, Optional, List
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import numpy as np
from .memory_manager import AdvancedMemoryManager

logger = logging.getLogger(__name__)

class CPUOptimizer:
    """
    Advanced CPU optimization for training and inference
    """
    
    def __init__(self, memory_manager: AdvancedMemoryManager):
        """
        Initialize CPU optimizer
        
        Args:
            memory_manager: Memory manager instance
        """
        self.memory_manager = memory_manager
        self.cpu_count = os.cpu_count()
        self.optimizations_applied = []
        
        # Apply initial optimizations
        self._apply_global_optimizations()
        
        logger.info(f"CPU Optimizer initialized for {self.cpu_count} cores")
    
    def _apply_global_optimizations(self):
        """Apply global CPU optimizations"""
        
        # Set optimal thread count for PyTorch
        optimal_threads = min(self.cpu_count, 8)  # Cap at 8 for stability
        torch.set_num_threads(optimal_threads)
        self.optimizations_applied.append(f"PyTorch threads: {optimal_threads}")
        
        # Set thread count for inter-op parallelism
        torch.set_num_interop_threads(min(self.cpu_count // 2, 4))
        self.optimizations_applied.append("Inter-op parallelism configured")
        
        # Enable Intel MKL optimizations if available
        try:
            import intel_extension_for_pytorch as ipex
            self.optimizations_applied.append("Intel Extension for PyTorch enabled")
        except ImportError:
            logger.warning("Intel Extension for PyTorch not available")
        
        # Set environment variables for CPU optimization
        os.environ['OMP_NUM_THREADS'] = str(optimal_threads)
        os.environ['MKL_NUM_THREADS'] = str(optimal_threads)
        os.environ['NUMEXPR_NUM_THREADS'] = str(optimal_threads)
        os.environ['OPENBLAS_NUM_THREADS'] = str(optimal_threads)
        self.optimizations_applied.append("Environment variables optimized")
        
        # Enable CPU-specific optimizations
        torch.backends.mkl.enabled = True
        torch.backends.mkldnn.enabled = True
        self.optimizations_applied.append("MKL and MKLDNN enabled")
        
        logger.info(f"Applied optimizations: {', '.join(self.optimizations_applied)}")
    
    def optimize_model(self, model: nn.Module, 
                      use_jit: bool = True,
                      use_channels_last: bool = True) -> nn.Module:
        """
        Optimize model for CPU inference/training
        
        Args:
            model: PyTorch model to optimize
            use_jit: Whether to use TorchScript JIT compilation
            use_channels_last: Whether to use channels-last memory format
            
        Returns:
            Optimized model
        """
        with self.memory_manager.memory_context("optimize_model"):
            logger.info("Optimizing model for CPU")
            
            # Set model to CPU
            model = model.cpu()
            
            # Set to evaluation mode for optimization
            was_training = model.training
            model.eval()
            
            try:
                # Apply Intel Extension optimizations if available
                try:
                    import intel_extension_for_pytorch as ipex
                    model = ipex.optimize(model, dtype=torch.float32)
                    logger.info("Applied Intel Extension optimizations")
                except ImportError:
                    pass
                
                # Apply channels-last memory format for conv models
                if use_channels_last and self._has_conv_layers(model):
                    model = model.to(memory_format=torch.channels_last)
                    logger.info("Applied channels-last memory format")
                
                # Apply TorchScript JIT compilation
                if use_jit:
                    try:
                        # Create dummy input for tracing
                        dummy_input = self._create_dummy_input(model)
                        if dummy_input is not None:
                            model = torch.jit.trace(model, dummy_input)
                            logger.info("Applied TorchScript JIT compilation")
                    except Exception as e:
                        logger.warning(f"JIT compilation failed: {e}")
                
                # Restore training mode if needed
                if was_training:
                    model.train()
                
                return model
                
            except Exception as e:
                logger.error(f"Model optimization failed: {e}")
                return model
    
    def _has_conv_layers(self, model: nn.Module) -> bool:
        """Check if model has convolutional layers"""
        for module in model.modules():
            if isinstance(module, (nn.Conv1d, nn.Conv2d, nn.Conv3d)):
                return True
        return False
    
    def _create_dummy_input(self, model: nn.Module) -> Optional[torch.Tensor]:
        """Create dummy input for model tracing"""
        try:
            # Try to infer input shape from model
            for name, param in model.named_parameters():
                if 'embedding' in name.lower() and param.dim() == 2:
                    # Text model - create token input
                    vocab_size = param.shape[0]
                    return torch.randint(0, min(vocab_size, 1000), (1, 32))
                elif 'conv' in name.lower() and param.dim() == 4:
                    # Vision model - create image input
                    channels = param.shape[1]
                    return torch.randn(1, channels, 224, 224)
            
            # Default fallback
            return torch.randn(1, 512)
            
        except Exception:
            return None
    
    def optimize_dataloader(self, dataloader: DataLoader) -> DataLoader:
        """
        Optimize DataLoader for CPU training
        
        Args:
            dataloader: Original DataLoader
            
        Returns:
            Optimized DataLoader
        """
        # Calculate optimal number of workers
        optimal_workers = min(self.cpu_count // 2, 4)
        
        # Create new DataLoader with optimized settings
        optimized_loader = DataLoader(
            dataloader.dataset,
            batch_size=dataloader.batch_size,
            shuffle=dataloader.drop_last if hasattr(dataloader, 'drop_last') else False,
            num_workers=optimal_workers,
            pin_memory=False,  # Not needed for CPU
            persistent_workers=True if optimal_workers > 0 else False,
            prefetch_factor=2 if optimal_workers > 0 else 2,
        )
        
        logger.info(f"Optimized DataLoader with {optimal_workers} workers")
        return optimized_loader
    
    def optimize_optimizer(self, optimizer: optim.Optimizer, 
                          model: nn.Module) -> optim.Optimizer:
        """
        Optimize optimizer settings for CPU training
        
        Args:
            optimizer: PyTorch optimizer
            model: Model being optimized
            
        Returns:
            Optimized optimizer
        """
        # Apply gradient clipping
        for param_group in optimizer.param_groups:
            if 'weight_decay' not in param_group:
                param_group['weight_decay'] = 0.01
        
        logger.info("Applied optimizer optimizations")
        return optimizer
    
    def enable_mixed_precision(self) -> bool:
        """
        Enable mixed precision training for CPU (if supported)
        
        Returns:
            Whether mixed precision was enabled
        """
        try:
            # Check if CPU supports mixed precision
            if torch.cpu.amp.autocast is not None:
                logger.info("CPU mixed precision available")
                return True
        except AttributeError:
            pass
        
        logger.warning("CPU mixed precision not available")
        return False
    
    def optimize_batch_size(self, base_batch_size: int, 
                           model_size_mb: float) -> int:
        """
        Calculate optimal batch size based on available memory
        
        Args:
            base_batch_size: Base batch size to start from
            model_size_mb: Model size in MB
            
        Returns:
            Optimized batch size
        """
        memory_info = self.memory_manager.get_memory_info()
        available_memory_mb = memory_info['system_memory_available_gb'] * 1024
        
        # Reserve memory for model and overhead
        usable_memory_mb = available_memory_mb - model_size_mb - 2000  # 2GB overhead
        
        # Estimate memory per sample (rough approximation)
        memory_per_sample_mb = model_size_mb * 0.1  # 10% of model size per sample
        
        if memory_per_sample_mb > 0:
            max_batch_size = int(usable_memory_mb / memory_per_sample_mb)
            optimal_batch_size = min(base_batch_size, max_batch_size, 32)  # Cap at 32
        else:
            optimal_batch_size = min(base_batch_size, 8)  # Conservative fallback
        
        optimal_batch_size = max(1, optimal_batch_size)  # At least 1
        
        logger.info(f"Optimized batch size: {optimal_batch_size} (was {base_batch_size})")
        return optimal_batch_size
    
    def get_performance_recommendations(self, model: nn.Module) -> List[str]:
        """
        Get performance recommendations for the current setup
        
        Args:
            model: Model to analyze
            
        Returns:
            List of recommendations
        """
        recommendations = []
        
        # Check model size
        param_count = sum(p.numel() for p in model.parameters())
        model_size_mb = param_count * 4 / (1024**2)  # Assume float32
        
        if model_size_mb > 2000:  # > 2GB
            recommendations.append("Consider using model sharding for large models")
            recommendations.append("Use gradient checkpointing to reduce memory usage")
        
        # Check CPU utilization
        if self.cpu_count > 8:
            recommendations.append("Consider using distributed training across CPU cores")
        
        # Check memory
        memory_info = self.memory_manager.get_memory_info()
        if memory_info['system_memory_percent'] > 80:
            recommendations.append("Reduce batch size to lower memory usage")
            recommendations.append("Enable gradient accumulation instead of large batches")
        
        # Check for optimization opportunities
        if not any('Intel Extension' in opt for opt in self.optimizations_applied):
            recommendations.append("Install Intel Extension for PyTorch for better CPU performance")
        
        return recommendations
    
    def benchmark_performance(self, model: nn.Module, 
                            input_shape: tuple,
                            num_iterations: int = 100) -> Dict[str, float]:
        """
        Benchmark model performance
        
        Args:
            model: Model to benchmark
            input_shape: Input tensor shape
            num_iterations: Number of iterations to run
            
        Returns:
            Performance metrics
        """
        model.eval()
        dummy_input = torch.randn(*input_shape)
        
        # Warmup
        with torch.no_grad():
            for _ in range(10):
                _ = model(dummy_input)
        
        # Benchmark
        import time
        start_time = time.time()
        
        with torch.no_grad():
            for _ in range(num_iterations):
                _ = model(dummy_input)
        
        end_time = time.time()
        
        total_time = end_time - start_time
        avg_time_per_inference = total_time / num_iterations
        throughput = 1.0 / avg_time_per_inference
        
        return {
            'total_time_seconds': total_time,
            'avg_time_per_inference_ms': avg_time_per_inference * 1000,
            'throughput_inferences_per_second': throughput,
            'iterations': num_iterations
        }