|
|
|
|
|
|
|
import functools |
|
import traceback |
|
import sys |
|
from datetime import datetime |
|
from pathlib import Path |
|
from typing import Dict, Any, Optional, Callable, Union, Type |
|
from contextlib import contextmanager |
|
from enum import Enum |
|
import json |
|
|
|
|
|
try: |
|
from .structured_logger import StructuredLogger, EventType, LogLevel, MLOpsLoggers |
|
STRUCTURED_LOGGING_AVAILABLE = True |
|
except ImportError: |
|
STRUCTURED_LOGGING_AVAILABLE = False |
|
|
|
import logging |
|
|
|
|
|
class ErrorSeverity(Enum): |
|
"""Error severity levels for classification and handling""" |
|
LOW = "low" |
|
MEDIUM = "medium" |
|
HIGH = "high" |
|
CRITICAL = "critical" |
|
|
|
|
|
class ErrorCategory(Enum): |
|
"""Error categories for better classification and handling""" |
|
|
|
DATA_VALIDATION = "data_validation" |
|
DATA_LOADING = "data_loading" |
|
DATA_PREPROCESSING = "data_preprocessing" |
|
DATA_QUALITY = "data_quality" |
|
|
|
|
|
MODEL_TRAINING = "model_training" |
|
MODEL_VALIDATION = "model_validation" |
|
MODEL_LOADING = "model_loading" |
|
MODEL_PREDICTION = "model_prediction" |
|
|
|
|
|
FEATURE_EXTRACTION = "feature_extraction" |
|
FEATURE_SELECTION = "feature_selection" |
|
|
|
|
|
RESOURCE_CONSTRAINT = "resource_constraint" |
|
CONFIGURATION = "configuration" |
|
DEPENDENCY = "dependency" |
|
IO_OPERATION = "io_operation" |
|
|
|
|
|
API_ERROR = "api_error" |
|
AUTHENTICATION = "authentication" |
|
VALIDATION = "validation" |
|
|
|
|
|
EXTERNAL_SERVICE = "external_service" |
|
NETWORK = "network" |
|
|
|
|
|
UNKNOWN = "unknown" |
|
|
|
|
|
class MLOpsError(Exception): |
|
"""Base exception class for MLOps-related errors""" |
|
|
|
    def __init__(self,
                 message: str,
                 category: ErrorCategory = ErrorCategory.UNKNOWN,
                 severity: ErrorSeverity = ErrorSeverity.MEDIUM,
                 component: Optional[str] = None,
                 metadata: Optional[Dict[str, Any]] = None,
                 suggestion: Optional[str] = None,
                 original_error: Optional[Exception] = None):
|
|
|
self.message = message |
|
self.category = category |
|
self.severity = severity |
|
self.component = component |
|
self.metadata = metadata or {} |
|
self.suggestion = suggestion |
|
self.original_error = original_error |
|
self.timestamp = datetime.now().isoformat() |
|
|
|
super().__init__(self.message) |
|
|
|
def to_dict(self) -> Dict[str, Any]: |
|
"""Convert error to dictionary for logging/serialization""" |
|
return { |
|
'message': self.message, |
|
'category': self.category.value, |
|
'severity': self.severity.value, |
|
'component': self.component, |
|
'metadata': self.metadata, |
|
'suggestion': self.suggestion, |
|
'timestamp': self.timestamp, |
|
'original_error': { |
|
'type': type(self.original_error).__name__ if self.original_error else None, |
|
'message': str(self.original_error) if self.original_error else None |
|
} |
|
} |
|
|
|
|
|
|
|
class DataValidationError(MLOpsError): |
|
"""Error in data validation""" |
|
def __init__(self, message: str, **kwargs): |
|
super().__init__(message, category=ErrorCategory.DATA_VALIDATION, |
|
severity=ErrorSeverity.HIGH, **kwargs) |
|
|
|
|
|
class ModelTrainingError(MLOpsError): |
|
"""Error during model training""" |
|
def __init__(self, message: str, **kwargs): |
|
super().__init__(message, category=ErrorCategory.MODEL_TRAINING, |
|
severity=ErrorSeverity.HIGH, **kwargs) |
|
|
|
|
|
class ResourceConstraintError(MLOpsError): |
|
"""Error due to resource constraints (CPU/Memory)""" |
|
def __init__(self, message: str, **kwargs): |
|
super().__init__(message, category=ErrorCategory.RESOURCE_CONSTRAINT, |
|
severity=ErrorSeverity.MEDIUM, **kwargs) |
|
|
|
|
|
class ConfigurationError(MLOpsError): |
|
"""Error in configuration or setup""" |
|
def __init__(self, message: str, **kwargs): |
|
super().__init__(message, category=ErrorCategory.CONFIGURATION, |
|
severity=ErrorSeverity.HIGH, **kwargs) |
|
|
|
|
|
class FeatureEngineeringError(MLOpsError): |
|
"""Error in feature engineering process""" |
|
def __init__(self, message: str, **kwargs): |
|
super().__init__(message, category=ErrorCategory.FEATURE_EXTRACTION, |
|
severity=ErrorSeverity.MEDIUM, **kwargs) |
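

# Illustrative sketch (not part of the pipeline): raising one of the typed
# exceptions above with component/metadata/suggestion, then serializing it via
# to_dict() for structured logging. The row count and names are examples only.
def _example_raise_typed_error() -> Dict[str, Any]:
    try:
        raise DataValidationError(
            "Found rows with missing labels in the training set",
            component="data_processor",
            metadata={"missing_label_rows": 12},
            suggestion="Drop or re-label the affected rows before training"
        )
    except MLOpsError as err:
        # category, severity, and timestamp are filled in automatically
        return err.to_dict()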
|
|
|
|
|
class ErrorHandler: |
|
"""Centralized error handling with logging, recovery, and monitoring""" |
|
|
|
    def __init__(self, component: str, logger: Optional["StructuredLogger"] = None):
|
self.component = component |
|
self.error_count = {} |
|
self.recovery_strategies = {} |
|
|
|
|
|
if STRUCTURED_LOGGING_AVAILABLE and logger is None: |
|
self.logger = MLOpsLoggers.get_logger(component) |
|
elif logger: |
|
self.logger = logger |
|
        else:
            # Fall back to the standard library logger (imported at module level)
            self.logger = logging.getLogger(component)
|
|
|
def register_recovery_strategy(self, |
|
error_category: ErrorCategory, |
|
recovery_func: Callable): |
|
"""Register recovery strategy for specific error category""" |
|
self.recovery_strategies[error_category] = recovery_func |
|
|
|
    def handle_error(self,
                     error: Exception,
                     context: Optional[Dict[str, Any]] = None,
                     category: Optional[ErrorCategory] = None,
                     severity: Optional[ErrorSeverity] = None,
                     suggestion: Optional[str] = None,
                     attempt_recovery: bool = True) -> Dict[str, Any]:
|
""" |
|
Central error handling method |
|
|
|
Returns: |
|
Dict with error details and recovery status |
|
""" |
|
|
|
|
|
if not isinstance(error, MLOpsError): |
|
mlops_error = MLOpsError( |
|
message=str(error), |
|
category=category or self._classify_error(error), |
|
severity=severity or self._determine_severity(error), |
|
component=self.component, |
|
metadata=context or {}, |
|
suggestion=suggestion, |
|
original_error=error |
|
) |
|
else: |
|
mlops_error = error |
|
|
|
|
|
error_key = f"{mlops_error.category.value}:{type(error).__name__}" |
|
self.error_count[error_key] = self.error_count.get(error_key, 0) + 1 |
|
|
|
|
|
self._log_error(mlops_error, context) |
|
|
|
|
|
        recovery_result = None
        recovery_attempted = False
        if attempt_recovery and mlops_error.category in self.recovery_strategies:
            recovery_attempted = True
            try:
                recovery_result = self.recovery_strategies[mlops_error.category](mlops_error, context)
                self._log_recovery_attempt(mlops_error, recovery_result)
            except Exception as recovery_error:
                # A failed recovery must not mask the original error
                self._log_recovery_failure(mlops_error, recovery_error)
|
|
|
return { |
|
'error': mlops_error.to_dict(), |
|
            'recovery_attempted': recovery_attempted,
|
'recovery_successful': recovery_result is not None and recovery_result.get('success', False), |
|
'recovery_result': recovery_result, |
|
'error_count': self.error_count.get(error_key, 1) |
|
} |
|
|
|
def _classify_error(self, error: Exception) -> ErrorCategory: |
|
"""Automatically classify error based on type and message""" |
|
error_type = type(error).__name__.lower() |
|
error_message = str(error).lower() |
|
|
|
|
|
if any(keyword in error_message for keyword in ['data', 'dataframe', 'csv', 'dataset']): |
|
if any(keyword in error_message for keyword in ['validation', 'invalid', 'format']): |
|
return ErrorCategory.DATA_VALIDATION |
|
elif any(keyword in error_message for keyword in ['load', 'read', 'file']): |
|
return ErrorCategory.DATA_LOADING |
|
else: |
|
return ErrorCategory.DATA_PREPROCESSING |
|
|
|
|
|
if any(keyword in error_message for keyword in ['model', 'training', 'fit', 'predict']): |
|
if 'training' in error_message or 'fit' in error_message: |
|
return ErrorCategory.MODEL_TRAINING |
|
elif 'predict' in error_message: |
|
return ErrorCategory.MODEL_PREDICTION |
|
else: |
|
return ErrorCategory.MODEL_VALIDATION |
|
|
|
|
|
if any(keyword in error_message for keyword in ['memory', 'cpu', 'resource', 'timeout']): |
|
return ErrorCategory.RESOURCE_CONSTRAINT |
|
|
|
|
|
        if 'oserror' in error_type or 'ioerror' in error_type or any(keyword in error_message for keyword in ['file', 'path', 'directory']):
|
return ErrorCategory.IO_OPERATION |
|
|
|
|
|
if any(keyword in error_message for keyword in ['config', 'parameter', 'argument']): |
|
return ErrorCategory.CONFIGURATION |
|
|
|
|
|
if any(keyword in error_message for keyword in ['feature', 'transform', 'vectoriz']): |
|
return ErrorCategory.FEATURE_EXTRACTION |
|
|
|
|
|
if any(keyword in error_message for keyword in ['api', 'request', 'response', 'http']): |
|
return ErrorCategory.API_ERROR |
|
|
|
return ErrorCategory.UNKNOWN |
|
|
|
def _determine_severity(self, error: Exception) -> ErrorSeverity: |
|
"""Determine error severity based on error type and context""" |
|
error_type = type(error).__name__.lower() |
|
error_message = str(error).lower() |
|
|
|
|
|
if error_type in ['systemexit', 'keyboardinterrupt', 'memoryerror']: |
|
return ErrorSeverity.CRITICAL |
|
|
|
|
|
if any(keyword in error_message for keyword in ['training failed', 'model not found', 'critical']): |
|
return ErrorSeverity.HIGH |
|
|
|
|
|
if any(keyword in error_message for keyword in ['warning', 'timeout', 'resource']): |
|
return ErrorSeverity.MEDIUM |
|
|
|
|
|
return ErrorSeverity.MEDIUM |
|
|
|
    def _log_error(self, error: MLOpsError, context: Optional[Dict[str, Any]]):
|
"""Log error with structured logging""" |
|
if STRUCTURED_LOGGING_AVAILABLE: |
|
log_level = self._get_log_level_for_severity(error.severity) |
|
|
|
self.logger.log( |
|
level=log_level, |
|
event_type=EventType.MODEL_TRAINING_ERROR, |
|
message=f"Error in {self.component}: {error.message}", |
|
component=self.component, |
|
metadata={ |
|
'error_category': error.category.value, |
|
'error_severity': error.severity.value, |
|
'error_metadata': error.metadata, |
|
'context': context or {}, |
|
'suggestion': error.suggestion, |
|
                    'error_count': self.error_count.get(
                        f"{error.category.value}:"
                        f"{type(error.original_error).__name__ if error.original_error else type(error).__name__}",
                        1
                    )
|
}, |
|
tags=[error.category.value, error.severity.value, 'error_handling'] |
|
) |
|
else: |
|
|
|
self.logger.error(f"Error in {self.component}: {error.message}") |
|
|
|
    def _get_log_level_for_severity(self, severity: ErrorSeverity) -> "LogLevel":
|
"""Map error severity to log level""" |
|
severity_to_log_level = { |
|
ErrorSeverity.LOW: LogLevel.WARNING, |
|
ErrorSeverity.MEDIUM: LogLevel.ERROR, |
|
ErrorSeverity.HIGH: LogLevel.ERROR, |
|
ErrorSeverity.CRITICAL: LogLevel.CRITICAL |
|
} |
|
return severity_to_log_level.get(severity, LogLevel.ERROR) |
|
|
|
def _log_recovery_attempt(self, error: MLOpsError, recovery_result: Dict[str, Any]): |
|
"""Log recovery attempt results""" |
|
if STRUCTURED_LOGGING_AVAILABLE: |
|
success = recovery_result.get('success', False) |
|
event_type = EventType.MODEL_TRAINING_COMPLETE if success else EventType.MODEL_TRAINING_ERROR |
|
|
|
self.logger.info( |
|
event_type, |
|
f"Recovery {'succeeded' if success else 'failed'} for {error.category.value} error", |
|
component=self.component, |
|
metadata={ |
|
'original_error': error.message, |
|
'recovery_result': recovery_result, |
|
'error_category': error.category.value |
|
}, |
|
tags=['error_recovery', 'automated_recovery'] |
|
) |
|
|
|
def _log_recovery_failure(self, error: MLOpsError, recovery_error: Exception): |
|
"""Log recovery failure""" |
|
if STRUCTURED_LOGGING_AVAILABLE: |
|
self.logger.error( |
|
EventType.MODEL_TRAINING_ERROR, |
|
f"Recovery failed for {error.category.value} error: {str(recovery_error)}", |
|
component=self.component, |
|
metadata={ |
|
'original_error': error.message, |
|
'recovery_error': str(recovery_error), |
|
'error_category': error.category.value |
|
}, |
|
tags=['error_recovery', 'recovery_failure'] |
|
) |
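

# Illustrative sketch: a component-scoped ErrorHandler with a toy recovery
# strategy registered for data-loading failures. The component name, lambda
# strategy, and file path below are hypothetical examples.
def _example_error_handler_usage() -> Dict[str, Any]:
    handler = ErrorHandler("data_processor")
    handler.register_recovery_strategy(
        ErrorCategory.DATA_LOADING,
        lambda err, ctx: {"success": True, "recovery_method": "use_cached_dataset"}
    )
    try:
        raise FileNotFoundError("training data not found")
    except Exception as exc:
        # Returns the serialized error plus recovery status and error count
        return handler.handle_error(
            exc,
            context={"file_path": "/tmp/data/training.csv"},
            category=ErrorCategory.DATA_LOADING
        )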
|
|
|
|
|
|
|
def handle_errors(component: Optional[str] = None,
                  category: Optional[ErrorCategory] = None,
                  severity: Optional[ErrorSeverity] = None,
                  attempt_recovery: bool = True,
                  reraise: bool = True):
|
"""Decorator for automatic error handling""" |
|
def decorator(func): |
|
@functools.wraps(func) |
|
def wrapper(*args, **kwargs): |
|
comp_name = component or func.__module__ |
|
error_handler = ErrorHandler(comp_name) |
|
|
|
try: |
|
return func(*args, **kwargs) |
|
except Exception as e: |
|
|
|
result = error_handler.handle_error( |
|
error=e, |
|
context={ |
|
'function': func.__name__, |
|
'args_count': len(args), |
|
'kwargs_count': len(kwargs) |
|
}, |
|
category=category, |
|
severity=severity, |
|
attempt_recovery=attempt_recovery |
|
) |
|
|
|
|
|
if reraise: |
|
raise |
|
else: |
|
return result |
|
|
|
return wrapper |
|
return decorator |
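

# Illustrative sketch: wrapping a pipeline step with @handle_errors so any
# exception is classified, logged, and (by default) re-raised. The function
# and component names are hypothetical.
@handle_errors(component="data_processor", category=ErrorCategory.DATA_LOADING)
def _example_load_dataset(path: str):
    raise FileNotFoundError(f"No dataset at {path}")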
|
|
|
|
|
|
|
@contextmanager |
|
def error_handling_context(component: str,
                           operation: str,
                           category: Optional[ErrorCategory] = None,
                           severity: Optional[ErrorSeverity] = None,
                           metadata: Optional[Dict[str, Any]] = None):
|
"""Context manager for handling errors within a specific operation""" |
|
error_handler = ErrorHandler(component) |
|
|
|
try: |
|
yield error_handler |
|
except Exception as e: |
|
        error_handler.handle_error(
|
error=e, |
|
context={ |
|
'operation': operation, |
|
**(metadata or {}) |
|
}, |
|
category=category, |
|
severity=severity |
|
) |
|
|
|
raise |
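

# Illustrative sketch: guarding a single operation with error_handling_context.
# The exception is logged with the given category/metadata and then re-raised.
# The operation name and max_features value are examples only.
def _example_context_manager_usage():
    with error_handling_context(
        component="feature_engineer",
        operation="tfidf_vectorization",
        category=ErrorCategory.FEATURE_EXTRACTION,
        metadata={"max_features": 5000}
    ):
        raise ValueError("empty vocabulary; documents may contain only stop words")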
|
|
|
|
|
|
|
class RecoveryStrategies: |
|
"""Common recovery strategies for different error categories""" |
|
|
|
@staticmethod |
|
def data_loading_recovery(error: MLOpsError, context: Dict[str, Any]) -> Dict[str, Any]: |
|
"""Recovery strategy for data loading errors""" |
|
try: |
|
|
|
if 'file_path' in context: |
|
|
|
backup_paths = [ |
|
Path(context['file_path']).with_suffix('.backup.csv'), |
|
Path('/tmp/data/fallback_dataset.csv'), |
|
Path('/tmp/data/combined_dataset.csv') |
|
] |
|
|
|
for backup_path in backup_paths: |
|
if backup_path.exists(): |
|
return { |
|
'success': True, |
|
'recovery_method': 'fallback_data_source', |
|
'fallback_path': str(backup_path) |
|
} |
|
|
|
return {'success': False, 'reason': 'No fallback data sources available'} |
|
|
|
except Exception as e: |
|
return {'success': False, 'error': str(e)} |
|
|
|
@staticmethod |
|
def model_training_recovery(error: MLOpsError, context: Dict[str, Any]) -> Dict[str, Any]: |
|
"""Recovery strategy for model training errors""" |
|
try: |
|
|
|
recovery_methods = [] |
|
|
|
|
|
if 'resource' in str(error.message).lower(): |
|
recovery_methods.append('reduce_model_complexity') |
|
|
|
|
|
if 'lightgbm' in str(error.message).lower(): |
|
recovery_methods.append('fallback_to_logistic_regression') |
|
|
|
|
|
if 'memory' in str(error.message).lower(): |
|
recovery_methods.append('reduce_dataset_size') |
|
|
|
return { |
|
'success': len(recovery_methods) > 0, |
|
'recovery_methods': recovery_methods, |
|
'suggestion': 'Apply suggested recovery methods and retry training' |
|
} |
|
|
|
except Exception as e: |
|
return {'success': False, 'error': str(e)} |
|
|
|
@staticmethod |
|
def feature_engineering_recovery(error: MLOpsError, context: Dict[str, Any]) -> Dict[str, Any]: |
|
"""Recovery strategy for feature engineering errors""" |
|
try: |
|
|
|
if 'enhanced' in str(error.message).lower(): |
|
return { |
|
'success': True, |
|
'recovery_method': 'fallback_to_standard_features', |
|
'suggestion': 'Switch to standard TF-IDF features and continue training' |
|
} |
|
|
|
return {'success': False, 'reason': 'No applicable recovery method'} |
|
|
|
except Exception as e: |
|
return {'success': False, 'error': str(e)} |
|
|
|
|
|
|
|
class CPUConstraintHandler: |
|
"""Specialized handler for CPU constraint issues in HuggingFace Spaces""" |
|
|
|
def __init__(self, component: str): |
|
self.component = component |
|
self.error_handler = ErrorHandler(component) |
|
|
|
|
|
self.error_handler.register_recovery_strategy( |
|
ErrorCategory.RESOURCE_CONSTRAINT, |
|
self._cpu_recovery_strategy |
|
) |
|
|
|
def _cpu_recovery_strategy(self, error: MLOpsError, context: Dict[str, Any]) -> Dict[str, Any]: |
|
"""Recovery strategy specifically for CPU constraints""" |
|
try: |
|
recovery_actions = [] |
|
|
|
|
|
if 'n_jobs' in str(error.message) or 'parallel' in str(error.message): |
|
recovery_actions.append('force_single_threading') |
|
|
|
|
|
if 'training' in context.get('operation', '').lower(): |
|
recovery_actions.extend([ |
|
'reduce_cv_folds', |
|
'simplify_hyperparameter_grid', |
|
'disable_ensemble_if_slow' |
|
]) |
|
|
|
|
|
if 'memory' in str(error.message).lower(): |
|
recovery_actions.extend([ |
|
'reduce_feature_dimensions', |
|
'batch_processing', |
|
'garbage_collection' |
|
]) |
|
|
|
return { |
|
'success': len(recovery_actions) > 0, |
|
'recovery_actions': recovery_actions, |
|
'cpu_optimizations': True, |
|
'environment': 'huggingface_spaces' |
|
} |
|
|
|
except Exception as e: |
|
return {'success': False, 'error': str(e)} |
|
|
|
def monitor_and_handle_cpu_issues(self, |
|
operation_func: Callable, |
|
*args, |
|
timeout_seconds: int = 300, |
|
**kwargs) -> Any: |
|
"""Monitor operation for CPU issues and handle automatically""" |
|
import time |
|
import signal |
|
|
|
start_time = time.time() |
|
|
|
def timeout_handler(signum, frame): |
|
raise ResourceConstraintError( |
|
f"Operation {operation_func.__name__} exceeded CPU time limit ({timeout_seconds}s)", |
|
component=self.component, |
|
metadata={ |
|
'timeout_seconds': timeout_seconds, |
|
'operation': operation_func.__name__, |
|
'environment': 'cpu_constrained' |
|
}, |
|
suggestion="Reduce model complexity or dataset size for CPU-constrained environment" |
|
) |
|
|
|
|
|
signal.signal(signal.SIGALRM, timeout_handler) |
|
signal.alarm(timeout_seconds) |
|
|
|
try: |
|
result = operation_func(*args, **kwargs) |
|
execution_time = time.time() - start_time |
|
|
|
|
|
if execution_time > timeout_seconds * 0.8: |
|
if STRUCTURED_LOGGING_AVAILABLE: |
|
logger = MLOpsLoggers.get_monitoring_logger() |
|
logger.log_cpu_constraint_warning( |
|
component=self.component, |
|
operation=operation_func.__name__, |
|
resource_usage={ |
|
'execution_time_seconds': execution_time, |
|
'timeout_threshold': timeout_seconds, |
|
'cpu_efficiency': 'low' |
|
} |
|
) |
|
|
|
return result |
|
|
|
except Exception as e: |
|
execution_time = time.time() - start_time |
|
|
|
|
|
self.error_handler.handle_error( |
|
error=e, |
|
context={ |
|
'operation': operation_func.__name__, |
|
'execution_time': execution_time, |
|
'timeout_limit': timeout_seconds, |
|
'environment': 'cpu_constrained' |
|
}, |
|
category=ErrorCategory.RESOURCE_CONSTRAINT, |
|
severity=ErrorSeverity.HIGH |
|
) |
|
raise |
|
|
|
finally: |
|
|
|
signal.alarm(0) |
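

# Illustrative sketch: guarding a long-running training callable with the CPU
# constraint handler. The 120-second budget is an example; like the handler
# itself, this relies on SIGALRM and therefore runs only on Unix main threads.
def _example_cpu_guarded_training(train_fn: Callable) -> Any:
    guard = CPUConstraintHandler("model_trainer")
    return guard.monitor_and_handle_cpu_issues(train_fn, timeout_seconds=120)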
|
|
|
|
|
|
|
def setup_error_handling() -> Dict[str, ErrorHandler]: |
|
"""Setup error handlers for all MLOps components""" |
|
handlers = {} |
|
|
|
components = [ |
|
'model_trainer', |
|
'model_retrainer', |
|
'data_processor', |
|
'feature_engineer', |
|
'api_server', |
|
'monitoring' |
|
] |
|
|
|
for component in components: |
|
handler = ErrorHandler(component) |
|
|
|
|
|
handler.register_recovery_strategy( |
|
ErrorCategory.DATA_LOADING, |
|
RecoveryStrategies.data_loading_recovery |
|
) |
|
handler.register_recovery_strategy( |
|
ErrorCategory.MODEL_TRAINING, |
|
RecoveryStrategies.model_training_recovery |
|
) |
|
handler.register_recovery_strategy( |
|
ErrorCategory.FEATURE_EXTRACTION, |
|
RecoveryStrategies.feature_engineering_recovery |
|
) |
|
|
|
handlers[component] = handler |
|
|
|
return handlers |
|
|
|
|
|
def get_error_handler(component: str) -> ErrorHandler: |
|
"""Get error handler for specific component""" |
|
return ErrorHandler(component) |
|
|
|
|
|
|
|
def integrate_with_retrain_py(): |
|
"""Example integration with retrain.py for robust error handling""" |
|
|
|
|
|
error_handler = ErrorHandler('model_retrainer') |
|
|
|
|
|
error_handler.register_recovery_strategy( |
|
ErrorCategory.MODEL_TRAINING, |
|
lambda error, context: { |
|
'success': True, |
|
'recovery_method': 'fallback_to_individual_models', |
|
'suggestion': 'Disable ensemble and use best individual model' |
|
} |
|
) |
|
|
|
return error_handler |
|
|
|
|
|
def integrate_with_train_py(): |
|
"""Example integration with train.py for comprehensive error handling""" |
|
|
|
|
|
error_handler = ErrorHandler('model_trainer') |
|
|
|
|
|
cpu_handler = CPUConstraintHandler('model_trainer') |
|
|
|
return error_handler, cpu_handler |
|
|
|
|
|
|
|
class ErrorReporter: |
|
"""Collect and report error analytics for MLOps monitoring""" |
|
|
|
    def __init__(self, report_file: Optional[Path] = None):
|
self.report_file = report_file or Path("/tmp/logs/error_report.json") |
|
self.error_stats = {} |
|
|
|
def record_error(self, error_info: Dict[str, Any]): |
|
"""Record error for analytics""" |
|
category = error_info.get('error', {}).get('category', 'unknown') |
|
severity = error_info.get('error', {}).get('severity', 'medium') |
|
|
|
key = f"{category}:{severity}" |
|
|
|
if key not in self.error_stats: |
|
self.error_stats[key] = { |
|
'count': 0, |
|
'first_seen': datetime.now().isoformat(), |
|
'last_seen': datetime.now().isoformat(), |
|
'recovery_attempts': 0, |
|
'recovery_successes': 0 |
|
} |
|
|
|
stats = self.error_stats[key] |
|
stats['count'] += 1 |
|
stats['last_seen'] = datetime.now().isoformat() |
|
|
|
if error_info.get('recovery_attempted', False): |
|
stats['recovery_attempts'] += 1 |
|
if error_info.get('recovery_successful', False): |
|
stats['recovery_successes'] += 1 |
|
|
|
def generate_report(self) -> Dict[str, Any]: |
|
"""Generate error analytics report""" |
|
total_errors = sum(stats['count'] for stats in self.error_stats.values()) |
|
total_recovery_attempts = sum(stats['recovery_attempts'] for stats in self.error_stats.values()) |
|
total_recovery_successes = sum(stats['recovery_successes'] for stats in self.error_stats.values()) |
|
|
|
recovery_rate = (total_recovery_successes / total_recovery_attempts * 100) if total_recovery_attempts > 0 else 0 |
|
|
|
return { |
|
'report_timestamp': datetime.now().isoformat(), |
|
'summary': { |
|
'total_errors': total_errors, |
|
'unique_error_types': len(self.error_stats), |
|
'recovery_attempts': total_recovery_attempts, |
|
'recovery_successes': total_recovery_successes, |
|
'recovery_rate_percent': recovery_rate |
|
}, |
|
'error_breakdown': self.error_stats, |
|
'recommendations': self._generate_recommendations() |
|
} |
|
|
|
def _generate_recommendations(self) -> list: |
|
"""Generate recommendations based on error patterns""" |
|
recommendations = [] |
|
|
|
|
|
high_freq_errors = {k: v for k, v in self.error_stats.items() if v['count'] > 5} |
|
if high_freq_errors: |
|
recommendations.append({ |
|
'type': 'high_frequency_errors', |
|
'message': f'Address frequently occurring errors: {", ".join(high_freq_errors.keys())}', |
|
'priority': 'high' |
|
}) |
|
|
|
|
|
low_recovery_errors = { |
|
k: v for k, v in self.error_stats.items() |
|
if v['recovery_attempts'] > 0 and (v['recovery_successes'] / v['recovery_attempts']) < 0.5 |
|
} |
|
if low_recovery_errors: |
|
recommendations.append({ |
|
'type': 'low_recovery_rate', |
|
'message': 'Improve recovery strategies for poorly recovering error types', |
|
'priority': 'medium', |
|
'affected_errors': list(low_recovery_errors.keys()) |
|
}) |
|
|
|
|
|
resource_errors = {k: v for k, v in self.error_stats.items() if 'resource_constraint' in k} |
|
if resource_errors: |
|
recommendations.append({ |
|
'type': 'resource_optimization', |
|
'message': 'Consider CPU/memory optimizations for resource constraint errors', |
|
'priority': 'high', |
|
'suggestion': 'Review HuggingFace Spaces constraints and optimize accordingly' |
|
}) |
|
|
|
return recommendations |
|
|
|
def save_report(self): |
|
"""Save error report to file""" |
|
report = self.generate_report() |
|
|
|
self.report_file.parent.mkdir(parents=True, exist_ok=True) |
|
|
|
with open(self.report_file, 'w') as f: |
|
json.dump(report, f, indent=2) |
|
|
|
return report |
|
|
|
|
|
|
|
_global_error_reporter = None |
|
|
|
def get_global_error_reporter() -> ErrorReporter: |
|
"""Get global error reporter instance""" |
|
global _global_error_reporter |
|
if _global_error_reporter is None: |
|
_global_error_reporter = ErrorReporter() |
|
return _global_error_reporter |
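

# Illustrative sketch: feeding handle_error() results into the global
# ErrorReporter so recurring failures show up in the analytics report.
# The component name and error below are examples only.
def _example_report_errors() -> Dict[str, Any]:
    handler = get_error_handler("model_trainer")
    reporter = get_global_error_reporter()
    try:
        raise MemoryError("ensemble training exceeded available memory")
    except Exception as exc:
        result = handler.handle_error(exc, category=ErrorCategory.RESOURCE_CONSTRAINT)
        reporter.record_error(result)
    return reporter.generate_report()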
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
print("Testing error handling system...") |
|
|
|
|
|
error_handler = ErrorHandler('test_component') |
|
|
|
try: |
|
raise ValueError("Test error for demonstration") |
|
except Exception as e: |
|
result = error_handler.handle_error( |
|
error=e, |
|
context={'test': True}, |
|
category=ErrorCategory.DATA_VALIDATION, |
|
severity=ErrorSeverity.MEDIUM, |
|
suggestion="This is a test error for demonstration purposes" |
|
) |
|
print("Error handling result:", result) |
|
|
|
|
|
@handle_errors(component='test_decorator', category=ErrorCategory.MODEL_TRAINING) |
|
def test_function_with_error(): |
|
raise ModelTrainingError("Test model training error") |
|
|
|
try: |
|
test_function_with_error() |
|
except Exception as e: |
|
print("Decorator handled error:", type(e).__name__) |
|
|
|
|
|
cpu_handler = CPUConstraintHandler('test_cpu') |
|
|
|
def slow_operation(): |
|
import time |
|
time.sleep(0.1) |
|
return "completed" |
|
|
|
try: |
|
result = cpu_handler.monitor_and_handle_cpu_issues(slow_operation, timeout_seconds=1) |
|
print("CPU monitoring result:", result) |
|
except Exception as e: |
|
print("CPU constraint error:", str(e)) |
|
|
|
|
|
reporter = get_global_error_reporter() |
|
|
|
|
|
test_error_info = { |
|
'error': { |
|
'category': 'model_training', |
|
'severity': 'high', |
|
'message': 'Test error for reporting' |
|
}, |
|
'recovery_attempted': True, |
|
'recovery_successful': False |
|
} |
|
|
|
reporter.record_error(test_error_info) |
|
report = reporter.generate_report() |
|
print("Error report:", json.dumps(report, indent=2)) |
|
|
|
print("Error handling system test completed successfully!") |