# Scraped page header (not part of the module): "Ahmedik95316's picture",
# "Create error_handler.py", commit 187f752.
# utils/error_handler.py
# Production-ready error handling system for MLOps grade enhancement
import functools
import traceback
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Callable, Union, Type
from contextlib import contextmanager
from enum import Enum
import json
# Import structured logger
try:
    from .structured_logger import StructuredLogger, EventType, LogLevel, MLOpsLoggers
    STRUCTURED_LOGGING_AVAILABLE = True
except ImportError:
    STRUCTURED_LOGGING_AVAILABLE = False
    # Fallback to standard logging
    import logging
    # Bind the structured-logging names to placeholders: they appear in
    # annotations that are evaluated when classes below are defined, so
    # leaving them undefined would turn this fallback into a NameError.
    # They are only dereferenced when STRUCTURED_LOGGING_AVAILABLE is True.
    StructuredLogger = EventType = LogLevel = MLOpsLoggers = None
class ErrorSeverity(Enum):
    """Severity scale used to classify errors and decide how to handle them."""

    # Non-critical errors that don't affect core functionality
    LOW = "low"

    # Errors that degrade performance but allow continuation
    MEDIUM = "medium"

    # Critical errors that require immediate attention
    HIGH = "high"

    # System-breaking errors that require emergency response
    CRITICAL = "critical"
class ErrorCategory(Enum):
    """Closed set of error categories used for classification and routing.

    Each member's value is the lower-case form of its name; recovery
    strategies are registered per category.
    """

    # --- Data ---------------------------------------------------------
    DATA_VALIDATION = "data_validation"
    DATA_LOADING = "data_loading"
    DATA_PREPROCESSING = "data_preprocessing"
    DATA_QUALITY = "data_quality"

    # --- Model --------------------------------------------------------
    MODEL_TRAINING = "model_training"
    MODEL_VALIDATION = "model_validation"
    MODEL_LOADING = "model_loading"
    MODEL_PREDICTION = "model_prediction"

    # --- Feature engineering -------------------------------------------
    FEATURE_EXTRACTION = "feature_extraction"
    FEATURE_SELECTION = "feature_selection"

    # --- System ---------------------------------------------------------
    RESOURCE_CONSTRAINT = "resource_constraint"
    CONFIGURATION = "configuration"
    DEPENDENCY = "dependency"
    IO_OPERATION = "io_operation"

    # --- API and service -------------------------------------------------
    API_ERROR = "api_error"
    AUTHENTICATION = "authentication"
    VALIDATION = "validation"

    # --- External services -------------------------------------------------
    EXTERNAL_SERVICE = "external_service"
    NETWORK = "network"

    # --- Unknown / uncategorized -----------------------------------------
    UNKNOWN = "unknown"
class MLOpsError(Exception):
    """Root exception for MLOps failures, carrying classification and context.

    Besides the message, every instance records its category, severity,
    originating component, free-form metadata, an optional remediation
    suggestion, the wrapped original exception (if any) and the ISO-8601
    timestamp taken when the error object was created.
    """

    def __init__(self,
                 message: str,
                 category: ErrorCategory = ErrorCategory.UNKNOWN,
                 severity: ErrorSeverity = ErrorSeverity.MEDIUM,
                 component: str = None,
                 metadata: Dict[str, Any] = None,
                 suggestion: str = None,
                 original_error: Exception = None):
        super().__init__(message)
        self.message = message
        self.category = category
        self.severity = severity
        self.component = component
        # A missing (or empty) metadata argument becomes a fresh dict.
        self.metadata = metadata if metadata else {}
        self.suggestion = suggestion
        self.original_error = original_error
        self.timestamp = datetime.now().isoformat()

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of the error for logging/serialization.

        The 'original_error' entry is always present; its 'type' and
        'message' are None when no original exception was wrapped.
        """
        wrapped = self.original_error
        if wrapped:
            wrapped_info = {
                'type': type(wrapped).__name__,
                'message': str(wrapped),
            }
        else:
            wrapped_info = {'type': None, 'message': None}

        payload = {
            'message': self.message,
            'category': self.category.value,
            'severity': self.severity.value,
            'component': self.component,
            'metadata': self.metadata,
            'suggestion': self.suggestion,
            'timestamp': self.timestamp,
        }
        payload['original_error'] = wrapped_info
        return payload
# Specific error types for different scenarios
class DataValidationError(MLOpsError):
    """Error in data validation.

    The category is fixed to DATA_VALIDATION. Severity defaults to HIGH but
    can be overridden by passing ``severity=...``; all other keyword
    arguments are forwarded to MLOpsError unchanged.
    """

    def __init__(self, message: str, **kwargs):
        # setdefault (instead of a hard-coded keyword) lets callers supply
        # their own severity without a "multiple values for keyword
        # argument" TypeError.
        kwargs.setdefault('severity', ErrorSeverity.HIGH)
        super().__init__(message, category=ErrorCategory.DATA_VALIDATION, **kwargs)
class ModelTrainingError(MLOpsError):
    """Error during model training.

    The category is fixed to MODEL_TRAINING. Severity defaults to HIGH but
    can be overridden by passing ``severity=...``; all other keyword
    arguments are forwarded to MLOpsError unchanged.
    """

    def __init__(self, message: str, **kwargs):
        # setdefault (instead of a hard-coded keyword) lets callers supply
        # their own severity without a "multiple values for keyword
        # argument" TypeError.
        kwargs.setdefault('severity', ErrorSeverity.HIGH)
        super().__init__(message, category=ErrorCategory.MODEL_TRAINING, **kwargs)
class ResourceConstraintError(MLOpsError):
    """Error due to resource constraints (CPU/Memory).

    The category is fixed to RESOURCE_CONSTRAINT. Severity defaults to
    MEDIUM but can be overridden by passing ``severity=...``; all other
    keyword arguments are forwarded to MLOpsError unchanged.
    """

    def __init__(self, message: str, **kwargs):
        # setdefault (instead of a hard-coded keyword) lets callers supply
        # their own severity without a "multiple values for keyword
        # argument" TypeError.
        kwargs.setdefault('severity', ErrorSeverity.MEDIUM)
        super().__init__(message, category=ErrorCategory.RESOURCE_CONSTRAINT, **kwargs)
class ConfigurationError(MLOpsError):
    """Error in configuration or setup.

    The category is fixed to CONFIGURATION. Severity defaults to HIGH but
    can be overridden by passing ``severity=...``; all other keyword
    arguments are forwarded to MLOpsError unchanged.
    """

    def __init__(self, message: str, **kwargs):
        # setdefault (instead of a hard-coded keyword) lets callers supply
        # their own severity without a "multiple values for keyword
        # argument" TypeError.
        kwargs.setdefault('severity', ErrorSeverity.HIGH)
        super().__init__(message, category=ErrorCategory.CONFIGURATION, **kwargs)
class FeatureEngineeringError(MLOpsError):
    """Error in feature engineering process.

    The category is fixed to FEATURE_EXTRACTION. Severity defaults to
    MEDIUM but can be overridden by passing ``severity=...``; all other
    keyword arguments are forwarded to MLOpsError unchanged.
    """

    def __init__(self, message: str, **kwargs):
        # setdefault (instead of a hard-coded keyword) lets callers supply
        # their own severity without a "multiple values for keyword
        # argument" TypeError.
        kwargs.setdefault('severity', ErrorSeverity.MEDIUM)
        super().__init__(message, category=ErrorCategory.FEATURE_EXTRACTION, **kwargs)
class ErrorHandler:
    """Centralized error handling with logging, recovery, and monitoring.

    One handler is bound to a component name. It normalises arbitrary
    exceptions into MLOpsError, counts occurrences per
    "<category>:<exception type>" key, logs them, and optionally runs a
    recovery strategy registered for the error's category.

    Attributes:
        component: Name of the component this handler reports for.
        error_count: Mapping of "<category>:<type name>" to occurrences.
        recovery_strategies: Mapping of ErrorCategory to recovery callable.
        logger: Structured logger when available, otherwise a standard
            ``logging`` logger (or whatever logger the caller supplied).
    """

    # Annotations naming StructuredLogger/LogLevel are written as strings so
    # that defining this class does not fail when the structured logger
    # could not be imported and those names do not exist.
    def __init__(self, component: str, logger: Optional["StructuredLogger"] = None):
        """Bind the handler to ``component`` and pick a logger.

        Args:
            component: Component name used in log records.
            logger: Logger to use. When omitted, the structured logger for
                the component is used if available, else ``logging``.
        """
        self.component = component
        self.error_count = {}  # Track error frequency
        self.recovery_strategies = {}  # Store recovery functions
        # Setup logger
        if STRUCTURED_LOGGING_AVAILABLE and logger is None:
            self.logger = MLOpsLoggers.get_logger(component)
        elif logger:
            self.logger = logger
        else:
            # Fallback to standard logging
            import logging
            self.logger = logging.getLogger(component)

    def register_recovery_strategy(self,
                                   error_category: ErrorCategory,
                                   recovery_func: Callable):
        """Register (or replace) the recovery strategy for a category.

        ``recovery_func`` is called as ``recovery_func(mlops_error, context)``
        and is expected to return a dict with a 'success' key.
        """
        self.recovery_strategies[error_category] = recovery_func

    def handle_error(self,
                     error: Exception,
                     context: Dict[str, Any] = None,
                     category: ErrorCategory = None,
                     severity: ErrorSeverity = None,
                     suggestion: str = None,
                     attempt_recovery: bool = True) -> Dict[str, Any]:
        """Central error handling method.

        Args:
            error: The exception to handle. MLOpsError instances are used
                as-is; anything else is wrapped in a new MLOpsError.
            context: Extra information stored as metadata on a wrapped
                error, included in the log record and passed to the
                recovery strategy.
            category: Category for a wrapped error; auto-classified when
                omitted. Ignored if ``error`` is already an MLOpsError.
            severity: Severity for a wrapped error; auto-determined when
                omitted. Ignored if ``error`` is already an MLOpsError.
            suggestion: Remediation hint for a wrapped error.
            attempt_recovery: Whether to run a registered recovery strategy.

        Returns:
            Dict with the keys 'error' (``to_dict()`` of the MLOpsError),
            'recovery_attempted', 'recovery_successful', 'recovery_result'
            and 'error_count'.
        """
        # Convert to MLOpsError if not already
        if isinstance(error, MLOpsError):
            mlops_error = error
        else:
            mlops_error = MLOpsError(
                message=str(error),
                category=category or self._classify_error(error),
                severity=severity or self._determine_severity(error),
                component=self.component,
                metadata=context or {},
                suggestion=suggestion,
                original_error=error
            )

        # Track error frequency
        error_key = f"{mlops_error.category.value}:{type(error).__name__}"
        occurrences = self.error_count.get(error_key, 0) + 1
        self.error_count[error_key] = occurrences

        # Log error. The count is passed explicitly so the log record shows
        # the same number that is returned to the caller.
        self._log_error(mlops_error, context, error_count=occurrences)

        # Attempt recovery if enabled
        recovery_result = None
        recovery_attempted = False
        if attempt_recovery and mlops_error.category in self.recovery_strategies:
            # Set before calling the strategy: a strategy that raises still
            # counts as an attempt (it is simply an unsuccessful one).
            recovery_attempted = True
            try:
                recovery_result = self.recovery_strategies[mlops_error.category](mlops_error, context)
                self._log_recovery_attempt(mlops_error, recovery_result)
            except Exception as recovery_error:
                self._log_recovery_failure(mlops_error, recovery_error)

        # Only a dict result can report success; anything else (None, or an
        # unexpected return type) is treated as not successful.
        if isinstance(recovery_result, dict):
            recovery_successful = recovery_result.get('success', False)
        else:
            recovery_successful = False

        return {
            'error': mlops_error.to_dict(),
            'recovery_attempted': recovery_attempted,
            'recovery_successful': recovery_successful,
            'recovery_result': recovery_result,
            'error_count': occurrences
        }

    def _classify_error(self, error: Exception) -> ErrorCategory:
        """Automatically classify an error from its type and message.

        Checks run in a fixed order and the first match wins: data, model,
        resource, network, IO, configuration, feature engineering, API.
        Message matching is by lower-cased substring, so it is heuristic.
        """
        error_type = type(error).__name__.lower()
        error_message = str(error).lower()

        def mentions(*keywords: str) -> bool:
            return any(keyword in error_message for keyword in keywords)

        # Data-related errors
        if mentions('data', 'dataframe', 'csv', 'dataset'):
            if mentions('validation', 'invalid', 'format'):
                return ErrorCategory.DATA_VALIDATION
            if mentions('load', 'read', 'file'):
                return ErrorCategory.DATA_LOADING
            return ErrorCategory.DATA_PREPROCESSING

        # Model-related errors
        if mentions('model', 'training', 'fit', 'predict'):
            if mentions('training', 'fit'):
                return ErrorCategory.MODEL_TRAINING
            if mentions('predict'):
                return ErrorCategory.MODEL_PREDICTION
            return ErrorCategory.MODEL_VALIDATION

        # Resource constraints. The type check catches exceptions whose
        # message carries no keyword (e.g. a bare MemoryError()).
        if isinstance(error, (MemoryError, TimeoutError)) or mentions('memory', 'cpu', 'resource', 'timeout'):
            return ErrorCategory.RESOURCE_CONSTRAINT

        # Network errors (ConnectionError is an OSError, so test it first)
        if isinstance(error, ConnectionError):
            return ErrorCategory.NETWORK

        # IO errors. On Python 3 IOError is an alias of OSError, so the
        # type name is never "IOError"; isinstance is the reliable test.
        if isinstance(error, OSError) or 'ioerror' in error_type or mentions('file', 'path', 'directory'):
            return ErrorCategory.IO_OPERATION

        # Configuration errors
        if mentions('config', 'parameter', 'argument'):
            return ErrorCategory.CONFIGURATION

        # Feature engineering
        if mentions('feature', 'transform', 'vectoriz'):
            return ErrorCategory.FEATURE_EXTRACTION

        # API errors
        if mentions('api', 'request', 'response', 'http'):
            return ErrorCategory.API_ERROR

        return ErrorCategory.UNKNOWN

    def _determine_severity(self, error: Exception) -> ErrorSeverity:
        """Determine error severity from the exception type and message.

        Returns CRITICAL for SystemExit/KeyboardInterrupt/MemoryError (by
        type name), HIGH when the message contains 'training failed',
        'model not found' or 'critical', and MEDIUM for everything else.
        """
        error_type = type(error).__name__.lower()
        error_message = str(error).lower()

        # Critical system errors
        if error_type in ('systemexit', 'keyboardinterrupt', 'memoryerror'):
            return ErrorSeverity.CRITICAL

        # High severity - prevents core functionality
        if any(keyword in error_message for keyword in ('training failed', 'model not found', 'critical')):
            return ErrorSeverity.HIGH

        # Messages mentioning 'warning', 'timeout' or 'resource' are medium,
        # and medium is also the default for unknown errors.
        return ErrorSeverity.MEDIUM

    def _log_error(self, error: MLOpsError, context: Dict[str, Any],
                   error_count: Optional[int] = None):
        """Log the error, with structured metadata when available.

        Args:
            error: The normalised error to log.
            context: Caller-supplied context (may be None).
            error_count: Occurrence count to report. When omitted it is
                looked up from ``self.error_count`` using the wrapped
                exception's type name.
        """
        if STRUCTURED_LOGGING_AVAILABLE:
            if error_count is None:
                error_count = self.error_count.get(
                    f"{error.category.value}:{type(error.original_error).__name__}", 1)
            log_level = self._get_log_level_for_severity(error.severity)
            self.logger.log(
                level=log_level,
                # NOTE(review): every category is logged under the
                # MODEL_TRAINING_ERROR event type; a per-category event
                # would need EventType members not visible from this file.
                event_type=EventType.MODEL_TRAINING_ERROR,
                message=f"Error in {self.component}: {error.message}",
                component=self.component,
                metadata={
                    'error_category': error.category.value,
                    'error_severity': error.severity.value,
                    'error_metadata': error.metadata,
                    'context': context or {},
                    'suggestion': error.suggestion,
                    'error_count': error_count
                },
                tags=[error.category.value, error.severity.value, 'error_handling']
            )
        else:
            # Fallback logging
            self.logger.error(f"Error in {self.component}: {error.message}")

    def _get_log_level_for_severity(self, severity: ErrorSeverity) -> "LogLevel":
        """Map error severity to a structured-logger log level.

        Only meaningful when structured logging is available. Unknown
        severities map to LogLevel.ERROR.
        """
        severity_to_log_level = {
            ErrorSeverity.LOW: LogLevel.WARNING,
            ErrorSeverity.MEDIUM: LogLevel.ERROR,
            ErrorSeverity.HIGH: LogLevel.ERROR,
            ErrorSeverity.CRITICAL: LogLevel.CRITICAL
        }
        return severity_to_log_level.get(severity, LogLevel.ERROR)

    def _log_recovery_attempt(self, error: MLOpsError, recovery_result: Dict[str, Any]):
        """Log the outcome reported by a recovery strategy."""
        # A strategy that returns something other than a dict cannot report
        # success, so it is logged as failed rather than crashing here.
        success = isinstance(recovery_result, dict) and recovery_result.get('success', False)
        outcome = 'succeeded' if success else 'failed'
        if STRUCTURED_LOGGING_AVAILABLE:
            event_type = EventType.MODEL_TRAINING_COMPLETE if success else EventType.MODEL_TRAINING_ERROR
            self.logger.info(
                event_type,
                f"Recovery {outcome} for {error.category.value} error",
                component=self.component,
                metadata={
                    'original_error': error.message,
                    'recovery_result': recovery_result,
                    'error_category': error.category.value
                },
                tags=['error_recovery', 'automated_recovery']
            )
        else:
            # Fallback logging, so recovery outcomes are not silently lost
            self.logger.info(f"Recovery {outcome} for {error.category.value} error")

    def _log_recovery_failure(self, error: MLOpsError, recovery_error: Exception):
        """Log an exception raised while running a recovery strategy."""
        if STRUCTURED_LOGGING_AVAILABLE:
            self.logger.error(
                EventType.MODEL_TRAINING_ERROR,
                f"Recovery failed for {error.category.value} error: {str(recovery_error)}",
                component=self.component,
                metadata={
                    'original_error': error.message,
                    'recovery_error': str(recovery_error),
                    'error_category': error.category.value
                },
                tags=['error_recovery', 'recovery_failure']
            )
        else:
            # Fallback logging, so a crashing strategy is not swallowed
            self.logger.error(
                f"Recovery failed for {error.category.value} error: {str(recovery_error)}")
# Decorator for automatic error handling
def handle_errors(component: str = None,
                  category: ErrorCategory = None,
                  severity: ErrorSeverity = None,
                  attempt_recovery: bool = True,
                  reraise: bool = True):
    """Decorator for automatic error handling.

    Any ``Exception`` raised by the wrapped function is passed to an
    ErrorHandler together with the function name and argument counts.

    Args:
        component: Component name for the handler; defaults to the wrapped
            function's module name.
        category: Category forwarded to ``ErrorHandler.handle_error``.
        severity: Severity forwarded to ``ErrorHandler.handle_error``.
        attempt_recovery: Whether the handler may run a recovery strategy.
        reraise: When True (default) the original exception is re-raised
            after handling; when False the handler's result dict is
            returned in place of the function's return value.

    Returns:
        A decorator preserving the wrapped function's metadata.
    """
    def decorator(func):
        comp_name = component or func.__module__
        # One handler per decorated function, created on the first failure
        # and reused afterwards: successful calls pay nothing, and the
        # handler's error_count accumulates across calls instead of
        # restarting at 1 every time.
        handler_slot = []

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if not handler_slot:
                    handler_slot.append(ErrorHandler(comp_name))
                # Handle the error
                result = handler_slot[0].handle_error(
                    error=e,
                    context={
                        'function': func.__name__,
                        'args_count': len(args),
                        'kwargs_count': len(kwargs)
                    },
                    category=category,
                    severity=severity,
                    attempt_recovery=attempt_recovery
                )
                # Re-raise if specified, otherwise return error result
                if reraise:
                    raise
                return result
        return wrapper
    return decorator
# Context manager for error handling
@contextmanager
def error_handling_context(component: str,
                           operation: str,
                           category: ErrorCategory = None,
                           severity: ErrorSeverity = None,
                           metadata: Dict[str, Any] = None):
    """Run a ``with`` body under an ErrorHandler for ``component``.

    The handler is yielded to the body. If the body raises an Exception,
    it is reported through ``handle_error`` with a context made of the
    operation name plus any ``metadata`` entries, and then re-raised.
    """
    handler = ErrorHandler(component)
    try:
        yield handler
    except Exception as exc:
        extra = metadata or {}
        handler.handle_error(
            error=exc,
            context={'operation': operation, **extra},
            category=category,
            severity=severity,
        )
        # The context manager never suppresses the exception.
        raise
# Recovery strategies for common scenarios
class RecoveryStrategies:
    """Common recovery strategies for different error categories.

    Each strategy takes ``(error, context)`` and returns a dict with at
    least a 'success' key. The strategies only *recommend* actions (or
    locate a fallback file); they do not perform the recovery. ``context``
    may be None, which is what ErrorHandler.handle_error passes when the
    caller supplied no context.
    """

    @staticmethod
    def data_loading_recovery(error: "MLOpsError", context: Dict[str, Any]) -> Dict[str, Any]:
        """Look for a fallback data file.

        When ``context`` has a 'file_path', the candidates are that path
        with its suffix replaced by '.backup.csv', followed by two fixed
        files under /tmp/data. The first existing candidate is reported.

        Returns:
            On success: 'success', 'recovery_method' and 'fallback_path'.
            Otherwise 'success': False with a 'reason' (nothing found) or
            an 'error' (an exception occurred while searching).
        """
        try:
            # Missing context is the same as a context without a file path.
            context = context or {}
            # Try alternative data sources or fallback datasets
            if 'file_path' in context:
                # Try backup locations
                backup_paths = (
                    Path(context['file_path']).with_suffix('.backup.csv'),
                    Path('/tmp/data/fallback_dataset.csv'),
                    Path('/tmp/data/combined_dataset.csv'),
                )
                found = next((p for p in backup_paths if p.exists()), None)
                if found is not None:
                    return {
                        'success': True,
                        'recovery_method': 'fallback_data_source',
                        'fallback_path': str(found)
                    }
            return {'success': False, 'reason': 'No fallback data sources available'}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    @staticmethod
    def model_training_recovery(error: "MLOpsError", context: Dict[str, Any]) -> Dict[str, Any]:
        """Suggest recovery methods based on keywords in the error message.

        Returns:
            'recovery_methods' (possibly empty, in a fixed order),
            'success' (True when at least one method applies) and a
            'suggestion'; or 'success': False with an 'error' string if
            inspecting the error failed.
        """
        try:
            message = str(error.message).lower()
            # (keyword in message, recommended method), in reporting order
            keyword_to_method = (
                ('resource', 'reduce_model_complexity'),           # Reduce model complexity
                ('lightgbm', 'fallback_to_logistic_regression'),   # Fallback to simpler model
                ('memory', 'reduce_dataset_size'),                 # Reduce dataset size for memory issues
            )
            recovery_methods = [method for keyword, method in keyword_to_method
                                if keyword in message]
            return {
                'success': len(recovery_methods) > 0,
                'recovery_methods': recovery_methods,
                'suggestion': 'Apply suggested recovery methods and retry training'
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}

    @staticmethod
    def feature_engineering_recovery(error: "MLOpsError", context: Dict[str, Any]) -> Dict[str, Any]:
        """Suggest falling back to standard features.

        Applies only when the error message mentions 'enhanced'
        (case-insensitive); otherwise reports that no method applies.
        """
        try:
            # Fallback to standard TF-IDF if enhanced features fail
            if 'enhanced' in str(error.message).lower():
                return {
                    'success': True,
                    'recovery_method': 'fallback_to_standard_features',
                    'suggestion': 'Switch to standard TF-IDF features and continue training'
                }
            return {'success': False, 'reason': 'No applicable recovery method'}
        except Exception as e:
            return {'success': False, 'error': str(e)}
# CPU constraint specific error handling for HuggingFace Spaces
class CPUConstraintHandler:
    """Specialized handler for CPU constraint issues in HuggingFace Spaces.

    Wraps an ErrorHandler that has a CPU-oriented recovery strategy
    registered for RESOURCE_CONSTRAINT errors, and offers a helper that
    runs an operation under a time limit.
    """

    def __init__(self, component: str):
        """Create the underlying ErrorHandler and register the strategy."""
        self.component = component
        self.error_handler = ErrorHandler(component)
        # Register CPU-specific recovery strategies
        self.error_handler.register_recovery_strategy(
            ErrorCategory.RESOURCE_CONSTRAINT,
            self._cpu_recovery_strategy
        )

    def _cpu_recovery_strategy(self, error: "MLOpsError", context: Dict[str, Any]) -> Dict[str, Any]:
        """Recommend CPU/memory-saving actions for a resource error.

        Keyword matching on the error message is case-insensitive.
        ``context`` may be None; its 'operation' entry, when it mentions
        'training', adds the training-specific actions.

        Returns:
            Dict with 'recovery_actions', 'success' (True when at least one
            action applies), 'cpu_optimizations' and 'environment'; or
            'success': False with an 'error' string on unexpected failure.
        """
        try:
            context = context or {}
            message = str(error.message).lower()
            operation = str(context.get('operation', '')).lower()
            recovery_actions = []
            # Reduce parallel processing
            if 'n_jobs' in message or 'parallel' in message:
                recovery_actions.append('force_single_threading')
            # Reduce model complexity for CPU efficiency
            if 'training' in operation:
                recovery_actions.extend([
                    'reduce_cv_folds',
                    'simplify_hyperparameter_grid',
                    'disable_ensemble_if_slow'
                ])
            # Memory optimization for CPU-bound systems
            if 'memory' in message:
                recovery_actions.extend([
                    'reduce_feature_dimensions',
                    'batch_processing',
                    'garbage_collection'
                ])
            return {
                'success': len(recovery_actions) > 0,
                'recovery_actions': recovery_actions,
                'cpu_optimizations': True,
                'environment': 'huggingface_spaces'
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def monitor_and_handle_cpu_issues(self,
                                      operation_func: Callable,
                                      *args,
                                      timeout_seconds: int = 300,
                                      **kwargs) -> Any:
        """Run ``operation_func(*args, **kwargs)`` under a time limit.

        The limit is enforced with ``signal.alarm`` (a wall-clock timer),
        which is only possible on platforms that provide SIGALRM and only
        from the main thread. Elsewhere the operation runs without a
        limit instead of failing before it starts.

        Args:
            operation_func: Callable to execute.
            *args: Positional arguments for ``operation_func``.
            timeout_seconds: Limit in whole seconds.
            **kwargs: Keyword arguments for ``operation_func``.

        Returns:
            Whatever ``operation_func`` returns.

        Raises:
            ResourceConstraintError: If the alarm fires before completion.
            Exception: Any exception from the operation is reported to the
                error handler as a HIGH-severity RESOURCE_CONSTRAINT error
                and then re-raised.
        """
        import signal
        import threading
        import time

        # functools.partial objects and callable instances have no __name__.
        operation_name = getattr(operation_func, '__name__', repr(operation_func))
        start_time = time.time()

        def timeout_handler(signum, frame):
            raise ResourceConstraintError(
                f"Operation {operation_name} exceeded CPU time limit ({timeout_seconds}s)",
                component=self.component,
                metadata={
                    'timeout_seconds': timeout_seconds,
                    'operation': operation_name,
                    'environment': 'cpu_constrained'
                },
                suggestion="Reduce model complexity or dataset size for CPU-constrained environment"
            )

        # signal.signal() raises ValueError outside the main thread, and
        # SIGALRM does not exist on Windows.
        alarm_supported = (
            hasattr(signal, 'SIGALRM')
            and threading.current_thread() is threading.main_thread()
        )
        previous_handler = None
        if alarm_supported:
            # Keep the handler that was installed before so it can be restored.
            previous_handler = signal.signal(signal.SIGALRM, timeout_handler)

        try:
            if alarm_supported:
                # Set timeout signal
                signal.alarm(timeout_seconds)
            result = operation_func(*args, **kwargs)
            execution_time = time.time() - start_time
            # Log performance if slow
            if execution_time > timeout_seconds * 0.8:  # 80% of timeout
                if STRUCTURED_LOGGING_AVAILABLE:
                    logger = MLOpsLoggers.get_monitoring_logger()
                    logger.log_cpu_constraint_warning(
                        component=self.component,
                        operation=operation_name,
                        resource_usage={
                            'execution_time_seconds': execution_time,
                            'timeout_threshold': timeout_seconds,
                            'cpu_efficiency': 'low'
                        }
                    )
            return result
        except Exception as e:
            execution_time = time.time() - start_time
            # Handle error with CPU constraint context
            self.error_handler.handle_error(
                error=e,
                context={
                    'operation': operation_name,
                    'execution_time': execution_time,
                    'timeout_limit': timeout_seconds,
                    'environment': 'cpu_constrained'
                },
                category=ErrorCategory.RESOURCE_CONSTRAINT,
                severity=ErrorSeverity.HIGH
            )
            raise
        finally:
            if alarm_supported:
                # Clear timeout
                signal.alarm(0)
                # signal.signal() returns None when the earlier handler was
                # not installed from Python; there is nothing to restore.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)
# Integration utilities for existing codebase
def setup_error_handling() -> Dict[str, ErrorHandler]:
    """Build one ErrorHandler per MLOps component, keyed by component name.

    Every handler gets the same three recovery strategies registered: data
    loading, model training and feature extraction.
    """
    component_names = (
        'model_trainer',
        'model_retrainer',
        'data_processor',
        'feature_engineer',
        'api_server',
        'monitoring',
    )
    # Strategies shared by all components, in registration order.
    shared_strategies = (
        (ErrorCategory.DATA_LOADING, RecoveryStrategies.data_loading_recovery),
        (ErrorCategory.MODEL_TRAINING, RecoveryStrategies.model_training_recovery),
        (ErrorCategory.FEATURE_EXTRACTION, RecoveryStrategies.feature_engineering_recovery),
    )

    registry = {}
    for name in component_names:
        component_handler = ErrorHandler(name)
        for error_category, strategy in shared_strategies:
            component_handler.register_recovery_strategy(error_category, strategy)
        registry[name] = component_handler
    return registry
def get_error_handler(component: str) -> ErrorHandler:
    """Return a newly constructed ErrorHandler for ``component``.

    A fresh handler is built on every call; nothing is cached or shared.
    """
    handler = ErrorHandler(component)
    return handler
# Example integration functions
def integrate_with_retrain_py():
    """Example wiring for retrain.py: a 'model_retrainer' handler.

    The returned ErrorHandler has a MODEL_TRAINING recovery strategy that
    always reports success and recommends falling back to individual
    models.
    """
    def _fallback_to_individual_models(error, context):
        # Unconditional recommendation; neither argument is inspected.
        return {
            'success': True,
            'recovery_method': 'fallback_to_individual_models',
            'suggestion': 'Disable ensemble and use best individual model'
        }

    retrain_handler = ErrorHandler('model_retrainer')
    retrain_handler.register_recovery_strategy(
        ErrorCategory.MODEL_TRAINING,
        _fallback_to_individual_models,
    )
    return retrain_handler
def integrate_with_train_py():
    """Example wiring for train.py.

    Returns a 2-tuple: a plain ErrorHandler and a CPUConstraintHandler,
    both created for the 'model_trainer' component.
    """
    component_name = 'model_trainer'
    # General-purpose handler first, then the CPU-constraint specialist.
    training_handler = ErrorHandler(component_name)
    cpu_constraint_handler = CPUConstraintHandler(component_name)
    return training_handler, cpu_constraint_handler
# Error reporting and analytics
class ErrorReporter:
    """Collect and report error analytics for MLOps monitoring.

    Errors are aggregated per "<category>:<severity>" key. Each entry
    tracks the occurrence count, first/last seen timestamps and how many
    recovery attempts were made and succeeded.
    """

    def __init__(self, report_file: Path = None):
        """Create an empty reporter.

        Args:
            report_file: Destination for ``save_report``; a ``str`` or
                ``Path``. Defaults to /tmp/logs/error_report.json.
        """
        # Coerce to Path so a plain string path works in save_report too.
        self.report_file = Path(report_file) if report_file else Path("/tmp/logs/error_report.json")
        self.error_stats = {}

    def record_error(self, error_info: Dict[str, Any]):
        """Record one handled error.

        Args:
            error_info: A result dict shaped like the return value of
                ``ErrorHandler.handle_error``. Reads 'error' -> 'category'
                / 'severity' (defaulting to 'unknown' / 'medium'),
                'recovery_attempted' and 'recovery_successful'.
        """
        # 'error' may be absent or explicitly None; treat both as empty.
        error_details = error_info.get('error') or {}
        category = error_details.get('category', 'unknown')
        severity = error_details.get('severity', 'medium')
        key = f"{category}:{severity}"

        # One timestamp per call, so first_seen == last_seen on first sight.
        now = datetime.now().isoformat()
        stats = self.error_stats.setdefault(key, {
            'count': 0,
            'first_seen': now,
            'last_seen': now,
            'recovery_attempts': 0,
            'recovery_successes': 0
        })
        stats['count'] += 1
        stats['last_seen'] = now

        # A success is only counted for an error whose recovery was attempted.
        if error_info.get('recovery_attempted', False):
            stats['recovery_attempts'] += 1
            if error_info.get('recovery_successful', False):
                stats['recovery_successes'] += 1

    def generate_report(self) -> Dict[str, Any]:
        """Generate error analytics report.

        Returns:
            Dict with 'report_timestamp', 'summary' (totals and the
            recovery rate in percent, 0 when nothing was attempted),
            'error_breakdown' (a snapshot copy of the per-key statistics)
            and 'recommendations'.
        """
        all_stats = list(self.error_stats.values())
        total_errors = sum(stats['count'] for stats in all_stats)
        total_recovery_attempts = sum(stats['recovery_attempts'] for stats in all_stats)
        total_recovery_successes = sum(stats['recovery_successes'] for stats in all_stats)
        recovery_rate = (
            (total_recovery_successes / total_recovery_attempts * 100)
            if total_recovery_attempts > 0 else 0
        )
        return {
            'report_timestamp': datetime.now().isoformat(),
            'summary': {
                'total_errors': total_errors,
                'unique_error_types': len(self.error_stats),
                'recovery_attempts': total_recovery_attempts,
                'recovery_successes': total_recovery_successes,
                'recovery_rate_percent': recovery_rate
            },
            # Copy the entries: a report is a snapshot and must not change
            # when more errors are recorded afterwards.
            'error_breakdown': {key: dict(stats) for key, stats in self.error_stats.items()},
            'recommendations': self._generate_recommendations()
        }

    def _generate_recommendations(self) -> list:
        """Generate recommendations based on error patterns.

        Emits up to three entries: keys seen more than 5 times, keys whose
        recovery success ratio is below 0.5, and the presence of any
        'resource_constraint' key.
        """
        recommendations = []

        # High frequency errors
        high_freq_errors = [k for k, v in self.error_stats.items() if v['count'] > 5]
        if high_freq_errors:
            recommendations.append({
                'type': 'high_frequency_errors',
                'message': f'Address frequently occurring errors: {", ".join(high_freq_errors)}',
                'priority': 'high'
            })

        # Low recovery rates
        low_recovery_errors = [
            k for k, v in self.error_stats.items()
            if v['recovery_attempts'] > 0
            and (v['recovery_successes'] / v['recovery_attempts']) < 0.5
        ]
        if low_recovery_errors:
            recommendations.append({
                'type': 'low_recovery_rate',
                'message': 'Improve recovery strategies for poorly recovering error types',
                'priority': 'medium',
                'affected_errors': low_recovery_errors
            })

        # Resource constraint patterns
        if any('resource_constraint' in k for k in self.error_stats):
            recommendations.append({
                'type': 'resource_optimization',
                'message': 'Consider CPU/memory optimizations for resource constraint errors',
                'priority': 'high',
                'suggestion': 'Review HuggingFace Spaces constraints and optimize accordingly'
            })

        return recommendations

    def save_report(self):
        """Generate the report, write it as JSON to ``report_file``, return it.

        Missing parent directories are created. The file is written as
        UTF-8 regardless of the platform's default encoding.
        """
        report = self.generate_report()
        target = Path(self.report_file)
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2)
        return report
# Global error reporter instance
_global_error_reporter = None


def get_global_error_reporter() -> ErrorReporter:
    """Return the module-wide ErrorReporter, creating it on first use.

    The instance is stored in the module global ``_global_error_reporter``
    and the same object is returned by every later call.
    """
    global _global_error_reporter
    reporter = _global_error_reporter
    if reporter is None:
        reporter = ErrorReporter()
        _global_error_reporter = reporter
    return reporter
if __name__ == "__main__":
    # Manual smoke test: exercises the handler, the decorator, the CPU
    # constraint helper and the reporter, printing each outcome.
    print("Testing error handling system...")

    # --- Basic error handling -----------------------------------------
    demo_handler = ErrorHandler('test_component')
    try:
        raise ValueError("Test error for demonstration")
    except Exception as demo_error:
        handled = demo_handler.handle_error(
            error=demo_error,
            context={'test': True},
            category=ErrorCategory.DATA_VALIDATION,
            severity=ErrorSeverity.MEDIUM,
            suggestion="This is a test error for demonstration purposes",
        )
        print("Error handling result:", handled)

    # --- Decorator ------------------------------------------------------
    @handle_errors(component='test_decorator', category=ErrorCategory.MODEL_TRAINING)
    def test_function_with_error():
        raise ModelTrainingError("Test model training error")

    try:
        test_function_with_error()
    except Exception as decorated_error:
        print("Decorator handled error:", type(decorated_error).__name__)

    # --- CPU constraint handler -------------------------------------------
    cpu_handler = CPUConstraintHandler('test_cpu')

    def slow_operation():
        import time
        time.sleep(0.1)  # Simulate work
        return "completed"

    try:
        cpu_outcome = cpu_handler.monitor_and_handle_cpu_issues(slow_operation, timeout_seconds=1)
        print("CPU monitoring result:", cpu_outcome)
    except Exception as cpu_error:
        print("CPU constraint error:", str(cpu_error))

    # --- Error reporting ----------------------------------------------------
    reporter = get_global_error_reporter()
    # Record one synthetic error whose recovery was attempted but failed.
    reporter.record_error({
        'error': {
            'category': 'model_training',
            'severity': 'high',
            'message': 'Test error for reporting'
        },
        'recovery_attempted': True,
        'recovery_successful': False
    })
    print("Error report:", json.dumps(reporter.generate_report(), indent=2))

    print("Error handling system test completed successfully!")