"""Enhanced uncertainty quantification for MLOps pipeline integration.

Provides bootstrap-based uncertainty estimates for model performance
metrics, feature importance rankings, cross-validation results,
individual predictions, and pairwise model comparisons.
"""

import json
import logging
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np

try:
    from .statistical_analysis import (
        MLOpsStatisticalAnalyzer, BootstrapAnalyzer,
        FeatureImportanceAnalyzer, StatisticalResult
    )
    STATISTICAL_ANALYSIS_AVAILABLE = True
except ImportError:
    STATISTICAL_ANALYSIS_AVAILABLE = False
    logging.warning("Statistical analysis components not available")

try:
    from .structured_logger import StructuredLogger, EventType, MLOpsLoggers
    STRUCTURED_LOGGING_AVAILABLE = True
except ImportError:
    STRUCTURED_LOGGING_AVAILABLE = False

@dataclass
class UncertaintyReport:
    """Comprehensive uncertainty quantification report"""
    model_performance_uncertainty: Dict[str, Any]
    feature_importance_uncertainty: Dict[str, Any]
    cross_validation_uncertainty: Dict[str, Any]
    prediction_uncertainty: Dict[str, Any]
    model_comparison_uncertainty: Dict[str, Any]
    recommendations: List[Dict[str, Any]]
    confidence_level: float
    analysis_timestamp: str

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization"""
        return {
            'model_performance_uncertainty': self.model_performance_uncertainty,
            'feature_importance_uncertainty': self.feature_importance_uncertainty,
            'cross_validation_uncertainty': self.cross_validation_uncertainty,
            'prediction_uncertainty': self.prediction_uncertainty,
            'model_comparison_uncertainty': self.model_comparison_uncertainty,
            'recommendations': self.recommendations,
            'confidence_level': self.confidence_level,
            'analysis_timestamp': self.analysis_timestamp
        }

    def save_report(self, file_path: Optional[Path] = None) -> Path:
        """Save uncertainty report to file"""
        if file_path is None:
            file_path = Path("/tmp/logs/uncertainty_report.json")

        file_path.parent.mkdir(parents=True, exist_ok=True)

        with open(file_path, 'w') as f:
            json.dump(self.to_dict(), f, indent=2, default=str)

        return file_path
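
# Usage sketch (illustrative; assumes a populated `report` instance and an
# `artifacts/` directory of your choosing):
#
#     report_path = report.save_report(Path("artifacts/uncertainty_report.json"))
#     payload = json.loads(report_path.read_text())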


class EnhancedUncertaintyQuantifier:
    """Enhanced uncertainty quantification for MLOps pipeline integration"""

    def __init__(self,
                 confidence_level: float = 0.95,
                 n_bootstrap: int = 1000,
                 random_state: int = 42):
        self.confidence_level = confidence_level
        self.n_bootstrap = n_bootstrap
        self.random_state = random_state

        if STATISTICAL_ANALYSIS_AVAILABLE:
            self.statistical_analyzer = MLOpsStatisticalAnalyzer(
                confidence_level, n_bootstrap, random_state
            )
            self.bootstrap_analyzer = BootstrapAnalyzer(n_bootstrap, confidence_level, random_state)
            self.feature_analyzer = FeatureImportanceAnalyzer(n_bootstrap, confidence_level, random_state)
        else:
            raise ImportError("Statistical analysis components required for uncertainty quantification")

        if STRUCTURED_LOGGING_AVAILABLE:
            self.logger = MLOpsLoggers.get_logger('uncertainty_quantification')
        else:
            self.logger = logging.getLogger(__name__)

    def quantify_model_uncertainty(self,
                                   model,
                                   X_train: np.ndarray,
                                   X_test: np.ndarray,
                                   y_train: np.ndarray,
                                   y_test: np.ndarray,
                                   model_name: str = "model") -> Dict[str, Any]:
        """Quantify uncertainty in model performance metrics"""
        from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score

        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        # Fall back to hard predictions when the model cannot emit probabilities.
        y_pred_proba = model.predict_proba(X_test)[:, 1] if hasattr(model, 'predict_proba') else y_pred

        metrics = {
            'accuracy': lambda y_true, y_pred: accuracy_score(y_true, y_pred),
            'f1': lambda y_true, y_pred: f1_score(y_true, y_pred, average='weighted'),
            'precision': lambda y_true, y_pred: precision_score(y_true, y_pred, average='weighted'),
            'recall': lambda y_true, y_pred: recall_score(y_true, y_pred, average='weighted'),
            'roc_auc': lambda y_true, y_pred_proba: roc_auc_score(y_true, y_pred_proba)
        }

        # Bootstrap each metric to obtain a confidence interval around the point estimate.
        uncertainty_results = {}
        for metric_name, metric_func in metrics.items():
            try:
                if metric_name == 'roc_auc':
                    result = self.bootstrap_analyzer.bootstrap_metric(
                        y_test, y_pred_proba, metric_func
                    )
                else:
                    result = self.bootstrap_analyzer.bootstrap_metric(
                        y_test, y_pred, metric_func
                    )

                uncertainty_results[metric_name] = {
                    'point_estimate': result.point_estimate,
                    'confidence_interval': result.confidence_interval,
                    'margin_of_error': result.margin_of_error(),
                    'relative_uncertainty': result.margin_of_error() / result.point_estimate if result.point_estimate > 0 else np.inf,
                    'confidence_level': result.confidence_level,
                    'sample_size': result.sample_size,
                    'metadata': result.metadata
                }
            except Exception as e:
                uncertainty_results[metric_name] = {'error': str(e)}

        # Aggregate relative uncertainties across the metrics that produced finite values.
        valid_uncertainties = [
            r['relative_uncertainty'] for r in uncertainty_results.values()
            if isinstance(r, dict) and 'relative_uncertainty' in r and np.isfinite(r['relative_uncertainty'])
        ]

        overall_assessment = {
            'model_name': model_name,
            'average_relative_uncertainty': float(np.mean(valid_uncertainties)) if valid_uncertainties else np.inf,
            'max_relative_uncertainty': float(np.max(valid_uncertainties)) if valid_uncertainties else np.inf,
            'uncertainty_level': self._classify_uncertainty_level(np.mean(valid_uncertainties)) if valid_uncertainties else 'unknown'
        }

        return {
            'metric_uncertainties': uncertainty_results,
            'overall_assessment': overall_assessment,
            'analysis_metadata': {
                'confidence_level': self.confidence_level,
                'n_bootstrap': self.n_bootstrap,
                'test_size': len(y_test),
                'train_size': len(y_train)
            }
        }
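
    # Usage sketch (names here are illustrative, not part of the module's API):
    #
    #     uq = EnhancedUncertaintyQuantifier(n_bootstrap=500)
    #     perf = uq.quantify_model_uncertainty(clf, X_tr, X_te, y_tr, y_te, "clf")
    #     low, high = perf['metric_uncertainties']['f1']['confidence_interval']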

    def quantify_feature_importance_uncertainty(self,
                                                model,
                                                X: np.ndarray,
                                                y: np.ndarray,
                                                feature_names: Optional[List[str]] = None) -> Dict[str, Any]:
        """Quantify uncertainty in feature importance rankings"""
        try:
            stability_results = self.feature_analyzer.analyze_importance_stability(
                model, X, y, feature_names
            )

            feature_uncertainties = {}
            unstable_features = []

            for feature_name, analysis in stability_results['feature_importance_analysis'].items():
                cv = analysis['metadata']['coefficient_of_variation']

                feature_uncertainties[feature_name] = {
                    'importance_mean': analysis['point_estimate'],
                    'importance_ci': analysis['confidence_interval'],
                    'coefficient_of_variation': cv,
                    'stability_rank': analysis['metadata']['stability_rank'],
                    'uncertainty_level': self._classify_feature_uncertainty(cv)
                }

                # Flag features whose importance varies heavily across bootstrap samples.
                if cv > 0.5:
                    unstable_features.append({
                        'feature': feature_name,
                        'cv': cv,
                        'reason': 'High variance in importance across bootstrap samples'
                    })

            return {
                'feature_importance_uncertainties': feature_uncertainties,
                'stability_ranking': stability_results['stability_ranking'],
                'unstable_features': unstable_features,
                'uncertainty_summary': {
                    'total_features': len(feature_uncertainties),
                    'unstable_features_count': len(unstable_features),
                    'uncertainty_rate': len(unstable_features) / len(feature_uncertainties) if feature_uncertainties else 0
                },
                'analysis_metadata': stability_results['analysis_metadata']
            }
        except Exception as e:
            return {'error': str(e)}

    def quantify_cross_validation_uncertainty(self,
                                              model,
                                              X: np.ndarray,
                                              y: np.ndarray,
                                              cv_folds: int = 5) -> Dict[str, Any]:
        """Quantify uncertainty in cross-validation results"""
        from sklearn.metrics import f1_score, accuracy_score

        try:
            metrics = {
                'accuracy': lambda y_true, y_pred: accuracy_score(y_true, y_pred),
                'f1': lambda y_true, y_pred: f1_score(y_true, y_pred, average='weighted')
            }

            # The CV analyzer manages its own fold construction; `cv_folds` is
            # recorded in the metadata below.
            cv_analysis = self.statistical_analyzer.cv_analyzer.comprehensive_cv_analysis(
                model, X, y, metrics
            )

            cv_uncertainties = {}
            for metric_name, analysis in cv_analysis['metrics_analysis'].items():
                test_scores = analysis['test_scores']

                # Coefficient of variation of the fold scores: std relative to the mean.
                cv_coefficient = test_scores['std'] / test_scores['mean'] if test_scores['mean'] > 0 else np.inf

                cv_uncertainties[metric_name] = {
                    'cv_mean': test_scores['mean'],
                    'cv_std': test_scores['std'],
                    'cv_scores': test_scores['scores'],
                    'coefficient_of_variation': cv_coefficient,
                    'confidence_interval': test_scores['confidence_interval'],
                    'stability_level': self._classify_cv_stability(cv_coefficient),
                    'overfitting_analysis': analysis.get('overfitting_analysis', {}),
                    'statistical_tests': analysis.get('statistical_tests', {})
                }

            return {
                'cv_uncertainties': cv_uncertainties,
                'cv_metadata': {
                    'cv_folds': cv_folds,
                    'sample_size': len(X),
                    'confidence_level': self.confidence_level
                },
                'stability_assessment': self._assess_cv_stability(cv_uncertainties)
            }
        except Exception as e:
            return {'error': str(e)}

    def quantify_prediction_uncertainty(self,
                                        model,
                                        X_new: np.ndarray) -> Dict[str, Any]:
        """Quantify uncertainty in individual predictions from class probabilities.

        Uses the maximum predicted probability (confidence) and the entropy of
        the full probability vector as per-prediction uncertainty measures.
        """
        try:
            if hasattr(model, 'predict_proba'):
                probabilities = model.predict_proba(X_new)
                predictions = model.predict(X_new)

                prediction_uncertainties = []
                for pred, proba in zip(predictions, probabilities):
                    max_proba = np.max(proba)
                    # Shannon entropy; the epsilon guards against log(0).
                    entropy = -np.sum(proba * np.log(proba + 1e-8))

                    uncertainty_info = {
                        'prediction': int(pred),
                        'prediction_probability': float(max_proba),
                        'entropy': float(entropy),
                        'uncertainty_level': self._classify_prediction_uncertainty(max_proba),
                        'all_class_probabilities': proba.tolist()
                    }
                    prediction_uncertainties.append(uncertainty_info)

                avg_entropy = np.mean([p['entropy'] for p in prediction_uncertainties])
                avg_confidence = np.mean([p['prediction_probability'] for p in prediction_uncertainties])
                uncertain_predictions = sum(1 for p in prediction_uncertainties if p['uncertainty_level'] in ['high', 'very_high'])

                return {
                    'individual_predictions': prediction_uncertainties,
                    'uncertainty_summary': {
                        'total_predictions': len(prediction_uncertainties),
                        'uncertain_predictions': uncertain_predictions,
                        'uncertainty_rate': uncertain_predictions / len(prediction_uncertainties),
                        'average_entropy': float(avg_entropy),
                        'average_confidence': float(avg_confidence)
                    }
                }
            else:
                return {
                    'error': 'Model does not support probability predictions - uncertainty quantification limited'
                }
        except Exception as e:
            return {'error': str(e)}
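
    # Sketch: routing low-confidence predictions to human review
    # (`uq`, `fitted_model`, and `X_new` are illustrative names):
    #
    #     result = uq.quantify_prediction_uncertainty(fitted_model, X_new)
    #     flagged = [i for i, p in enumerate(result['individual_predictions'])
    #                if p['uncertainty_level'] in ('high', 'very_high')]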

    def comprehensive_uncertainty_analysis(self,
                                           models: Dict[str, Any],
                                           X_train: np.ndarray,
                                           X_test: np.ndarray,
                                           y_train: np.ndarray,
                                           y_test: np.ndarray,
                                           feature_names: Optional[List[str]] = None) -> UncertaintyReport:
        """Perform comprehensive uncertainty analysis across all components"""
        # Performance uncertainty for every candidate model.
        model_uncertainties = {}
        for model_name, model in models.items():
            model_uncertainties[model_name] = self.quantify_model_uncertainty(
                model, X_train, X_test, y_train, y_test, model_name
            )

        # Run the remaining analyses on the model with the lowest average
        # relative uncertainty.
        best_model_name = min(model_uncertainties.keys(),
                              key=lambda k: model_uncertainties[k]['overall_assessment']['average_relative_uncertainty'])
        best_model = models[best_model_name]

        feature_uncertainty = self.quantify_feature_importance_uncertainty(
            best_model, X_train, y_train, feature_names
        )

        cv_uncertainty = self.quantify_cross_validation_uncertainty(
            best_model, X_train, y_train
        )

        prediction_uncertainty = self.quantify_prediction_uncertainty(
            best_model, X_test
        )

        # Pairwise model comparison only makes sense with at least two models.
        if len(models) > 1:
            comparison_uncertainty = self._quantify_model_comparison_uncertainty(
                models, X_train, y_train
            )
        else:
            comparison_uncertainty = {'single_model': 'No comparison available'}

        recommendations = self._generate_uncertainty_recommendations(
            model_uncertainties, feature_uncertainty, cv_uncertainty, prediction_uncertainty
        )

        return UncertaintyReport(
            model_performance_uncertainty=model_uncertainties,
            feature_importance_uncertainty=feature_uncertainty,
            cross_validation_uncertainty=cv_uncertainty,
            prediction_uncertainty=prediction_uncertainty,
            model_comparison_uncertainty=comparison_uncertainty,
            recommendations=recommendations,
            confidence_level=self.confidence_level,
            analysis_timestamp=datetime.now().isoformat()
        )

    def _quantify_model_comparison_uncertainty(self,
                                               models: Dict[str, Any],
                                               X: np.ndarray,
                                               y: np.ndarray) -> Dict[str, Any]:
        """Quantify uncertainty in model comparisons"""
        try:
            from sklearn.metrics import f1_score, accuracy_score

            metrics = {
                'f1': lambda y_true, y_pred: f1_score(y_true, y_pred, average='weighted'),
                'accuracy': lambda y_true, y_pred: accuracy_score(y_true, y_pred)
            }

            comparison_results = self.statistical_analyzer.comparison_analyzer.comprehensive_model_comparison(
                models, X, y, metrics
            )

            comparison_uncertainties = {}
            for comparison_name, comparison_data in comparison_results.get('pairwise_comparisons', {}).items():
                overall_comp = comparison_data.get('overall_comparison', {})

                comparison_uncertainties[comparison_name] = {
                    'improvement_rate': overall_comp.get('improvement_rate', 0),
                    'significant_improvements': overall_comp.get('significant_improvements', 0),
                    'total_comparisons': overall_comp.get('total_comparisons', 0),
                    'recommendation': overall_comp.get('recommendation', 'No recommendation'),
                    'uncertainty_level': self._classify_comparison_uncertainty(overall_comp.get('improvement_rate', 0))
                }

            ranking = comparison_results.get('model_ranking', {})
            ranking_uncertainty = self._assess_ranking_uncertainty(ranking)

            return {
                'pairwise_comparison_uncertainties': comparison_uncertainties,
                'ranking_uncertainty': ranking_uncertainty,
                'comparison_metadata': comparison_results.get('analysis_metadata', {})
            }
        except Exception as e:
            return {'error': str(e)}

    def _classify_uncertainty_level(self, relative_uncertainty: float) -> str:
        """Classify overall uncertainty level"""
        if relative_uncertainty < 0.05:
            return 'very_low'
        elif relative_uncertainty < 0.1:
            return 'low'
        elif relative_uncertainty < 0.2:
            return 'medium'
        elif relative_uncertainty < 0.5:
            return 'high'
        else:
            return 'very_high'

    def _classify_feature_uncertainty(self, cv: float) -> str:
        """Classify feature importance uncertainty"""
        if cv < 0.2:
            return 'stable'
        elif cv < 0.5:
            return 'moderately_stable'
        elif cv < 1.0:
            return 'unstable'
        else:
            return 'very_unstable'

    def _classify_cv_stability(self, cv_coefficient: float) -> str:
        """Classify cross-validation stability"""
        if cv_coefficient < 0.1:
            return 'very_stable'
        elif cv_coefficient < 0.2:
            return 'stable'
        elif cv_coefficient < 0.3:
            return 'moderately_stable'
        else:
            return 'unstable'

    def _classify_prediction_uncertainty(self, max_probability: float) -> str:
        """Classify individual prediction uncertainty"""
        if max_probability > 0.95:
            return 'very_low'
        elif max_probability > 0.8:
            return 'low'
        elif max_probability > 0.6:
            return 'medium'
        elif max_probability > 0.5:
            return 'high'
        else:
            return 'very_high'

    def _classify_comparison_uncertainty(self, improvement_rate: float) -> str:
        """Classify model comparison uncertainty"""
        if improvement_rate > 0.8:
            return 'very_confident'
        elif improvement_rate > 0.6:
            return 'confident'
        elif improvement_rate > 0.4:
            return 'moderate'
        elif improvement_rate > 0.2:
            return 'uncertain'
        else:
            return 'very_uncertain'

    def _assess_cv_stability(self, cv_uncertainties: Dict[str, Any]) -> Dict[str, Any]:
        """Assess overall cross-validation stability"""
        stability_levels = [info.get('stability_level', 'unknown') for info in cv_uncertainties.values()]
        stable_count = sum(1 for level in stability_levels if level in ['very_stable', 'stable'])
        # Guard against an empty metrics dict to avoid ZeroDivisionError.
        stability_rate = stable_count / len(stability_levels) if stability_levels else 0

        return {
            'stable_metrics': stable_count,
            'total_metrics': len(stability_levels),
            'stability_rate': stability_rate,
            'overall_stability': 'stable' if stability_rate > 0.6 else 'unstable'
        }

    def _assess_ranking_uncertainty(self, ranking: Dict[str, Any]) -> Dict[str, Any]:
        """Assess uncertainty in model ranking"""
        if not ranking or 'ranking' not in ranking:
            return {'uncertainty': 'unknown', 'reason': 'No ranking data available'}

        ranking_data = ranking['ranking']
        if len(ranking_data) < 2:
            return {'uncertainty': 'low', 'reason': 'Only one model'}

        top_model = ranking_data[0]
        significantly_better_count = len(top_model.get('significantly_better_than', []))
        total_other_models = len(ranking_data) - 1

        if significantly_better_count == total_other_models:
            return {
                'uncertainty': 'low',
                'reason': 'Top model significantly better than all others',
                'confidence': 'high'
            }
        elif significantly_better_count > total_other_models / 2:
            return {
                'uncertainty': 'medium',
                'reason': 'Top model significantly better than some others',
                'confidence': 'medium'
            }
        else:
            return {
                'uncertainty': 'high',
                'reason': 'No clear statistical winner among models',
                'confidence': 'low'
            }

    def _generate_uncertainty_recommendations(self,
                                              model_uncertainties: Dict[str, Any],
                                              feature_uncertainty: Dict[str, Any],
                                              cv_uncertainty: Dict[str, Any],
                                              prediction_uncertainty: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate actionable recommendations based on uncertainty analysis"""
        recommendations = []

        # Model performance uncertainty.
        for model_name, uncertainty in model_uncertainties.items():
            overall_assessment = uncertainty.get('overall_assessment', {})
            uncertainty_level = overall_assessment.get('uncertainty_level', 'unknown')

            if uncertainty_level in ['high', 'very_high']:
                recommendations.append({
                    'type': 'model_performance',
                    'priority': 'high',
                    'model': model_name,
                    'issue': f'High performance uncertainty ({uncertainty_level})',
                    'action': 'Collect more training data or consider model regularization',
                    'details': {
                        'avg_relative_uncertainty': overall_assessment.get('average_relative_uncertainty', 0),
                        'max_relative_uncertainty': overall_assessment.get('max_relative_uncertainty', 0)
                    }
                })

        # Feature importance instability.
        unstable_features = feature_uncertainty.get('unstable_features', [])
        if unstable_features:
            recommendations.append({
                'type': 'feature_importance',
                'priority': 'medium',
                'issue': f'{len(unstable_features)} features have unstable importance rankings',
                'action': 'Review feature engineering and consider feature selection',
                'details': {
                    'unstable_features': [f['feature'] for f in unstable_features],
                    'uncertainty_rate': feature_uncertainty.get('uncertainty_summary', {}).get('uncertainty_rate', 0)
                }
            })

        # Cross-validation stability.
        cv_stability = cv_uncertainty.get('stability_assessment', {})
        if cv_stability.get('overall_stability') == 'unstable':
            recommendations.append({
                'type': 'cross_validation',
                'priority': 'medium',
                'issue': 'Unstable cross-validation performance',
                'action': 'Check data quality, consider stratified sampling, or increase CV folds',
                'details': {
                    'stability_rate': cv_stability.get('stability_rate', 0),
                    'stable_metrics': cv_stability.get('stable_metrics', 0),
                    'total_metrics': cv_stability.get('total_metrics', 0)
                }
            })

        # Per-prediction uncertainty.
        pred_summary = prediction_uncertainty.get('uncertainty_summary', {})
        uncertainty_rate = pred_summary.get('uncertainty_rate', 0)
        if uncertainty_rate > 0.2:
            recommendations.append({
                'type': 'prediction_uncertainty',
                'priority': 'high',
                'issue': f'{uncertainty_rate:.1%} of predictions have high uncertainty',
                'action': 'Consider implementing prediction confidence thresholds or human review for uncertain cases',
                'details': {
                    'uncertain_predictions': pred_summary.get('uncertain_predictions', 0),
                    'total_predictions': pred_summary.get('total_predictions', 0),
                    'average_confidence': pred_summary.get('average_confidence', 0)
                }
            })

        return recommendations


def integrate_uncertainty_quantification_with_retrain():
    """Integration function for retrain.py"""

    def enhanced_model_comparison_with_uncertainty(models_dict, X_train, X_test, y_train, y_test):
        """Enhanced model comparison with comprehensive uncertainty quantification"""
        try:
            quantifier = EnhancedUncertaintyQuantifier()

            uncertainty_report = quantifier.comprehensive_uncertainty_analysis(
                models_dict, X_train, X_test, y_train, y_test
            )

            report_path = uncertainty_report.save_report()

            model_uncertainties = uncertainty_report.model_performance_uncertainty

            best_model_name = min(
                model_uncertainties.keys(),
                key=lambda k: model_uncertainties[k]['overall_assessment']['average_relative_uncertainty']
            )

            best_uncertainty = model_uncertainties[best_model_name]['overall_assessment']['average_relative_uncertainty']
            uncertainty_level = model_uncertainties[best_model_name]['overall_assessment']['uncertainty_level']

            # Promote only when uncertainty is acceptable and few issues were flagged.
            promote_candidate = (
                uncertainty_level in ['very_low', 'low', 'medium'] and
                len(uncertainty_report.recommendations) <= 2
            )

            return {
                'recommended_model': best_model_name,
                'uncertainty_level': uncertainty_level,
                'average_uncertainty': best_uncertainty,
                'uncertainty_report': uncertainty_report.to_dict(),
                'report_path': str(report_path),
                'promote_candidate': promote_candidate,
                'recommendations': uncertainty_report.recommendations
            }
        except Exception as e:
            return {'error': f'Uncertainty quantification failed: {str(e)}'}

    return enhanced_model_comparison_with_uncertainty
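
# Usage sketch for retrain.py (`candidate_models` and `promote_model` are
# assumed names, not part of this module):
#
#     compare = integrate_uncertainty_quantification_with_retrain()
#     decision = compare(candidate_models, X_train, X_test, y_train, y_test)
#     if decision.get('promote_candidate'):
#         promote_model(decision['recommended_model'])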


def integrate_uncertainty_quantification_with_train():
    """Integration function for train.py"""

    def enhanced_ensemble_validation_with_uncertainty(individual_models, ensemble_model, X, y):
        """Enhanced ensemble validation with uncertainty quantification"""
        try:
            from sklearn.model_selection import train_test_split

            quantifier = EnhancedUncertaintyQuantifier()

            models_to_analyze = {**individual_models, 'ensemble': ensemble_model}

            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

            uncertainty_report = quantifier.comprehensive_uncertainty_analysis(
                models_to_analyze, X_train, X_test, y_train, y_test
            )

            ensemble_uncertainty = uncertainty_report.model_performance_uncertainty.get('ensemble', {})
            ensemble_uncertainty_level = ensemble_uncertainty.get('overall_assessment', {}).get('uncertainty_level', 'unknown')

            individual_uncertainties = [
                uncertainty_report.model_performance_uncertainty[name]['overall_assessment']['average_relative_uncertainty']
                for name in individual_models.keys()
                if name in uncertainty_report.model_performance_uncertainty
            ]

            ensemble_avg_uncertainty = ensemble_uncertainty.get('overall_assessment', {}).get('average_relative_uncertainty', np.inf)
            best_individual_uncertainty = min(individual_uncertainties) if individual_uncertainties else np.inf

            # Prefer the ensemble only if its uncertainty is acceptable and within
            # 10% of the best individual model's uncertainty.
            use_ensemble = (
                ensemble_uncertainty_level in ['very_low', 'low', 'medium'] and
                ensemble_avg_uncertainty <= best_individual_uncertainty * 1.1
            )

            return {
                'use_ensemble': use_ensemble,
                'ensemble_uncertainty_level': ensemble_uncertainty_level,
                'ensemble_avg_uncertainty': ensemble_avg_uncertainty,
                'best_individual_uncertainty': best_individual_uncertainty,
                'uncertainty_analysis': uncertainty_report.to_dict(),
                'recommendations': uncertainty_report.recommendations
            }
        except Exception as e:
            return {'error': f'Uncertainty quantification failed: {str(e)}'}

    return enhanced_ensemble_validation_with_uncertainty
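
# Usage sketch for train.py (`base_models`, `ensemble`, and `best_base_model`
# are assumed names):
#
#     validate = integrate_uncertainty_quantification_with_train()
#     verdict = validate(base_models, ensemble, X, y)
#     final_model = ensemble if verdict.get('use_ensemble') else best_base_model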


if __name__ == "__main__":
    print("Testing enhanced uncertainty quantification system...")

    # Synthetic binary classification data: the label depends on the first two
    # features plus noise.
    np.random.seed(42)
    X = np.random.randn(300, 15)
    y = (X[:, 0] + X[:, 1] + np.random.randn(300) * 0.2 > 0).astype(int)

    from sklearn.linear_model import LogisticRegression
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split

    models = {
        'logistic_regression': LogisticRegression(random_state=42),
        'random_forest': RandomForestClassifier(n_estimators=50, random_state=42)
    }

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    if STATISTICAL_ANALYSIS_AVAILABLE:
        quantifier = EnhancedUncertaintyQuantifier(n_bootstrap=100)

        print("Running comprehensive uncertainty analysis...")
        uncertainty_report = quantifier.comprehensive_uncertainty_analysis(
            models, X_train, X_test, y_train, y_test
        )

        print(f"Generated {len(uncertainty_report.recommendations)} uncertainty-based recommendations")
        print(f"Overall confidence level: {uncertainty_report.confidence_level}")

        report_path = uncertainty_report.save_report()
        print(f"Uncertainty report saved to: {report_path}")

        print("Enhanced uncertainty quantification system test completed successfully!")
    else:
        print("Statistical analysis components not available - skipping test")