""" UFC Fight Prediction Pipeline Copyright (C) 2025 Alvaro This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . """ import csv import os from datetime import datetime from collections import OrderedDict import json import joblib from ..config import FIGHTS_CSV_PATH, MODEL_RESULTS_PATH, MODELS_DIR, LAST_EVENT_JSON_PATH from .models import BaseModel from sklearn.model_selection import KFold import mlflow import mlflow.sklearn class PredictionPipeline: """ Orchestrates the model training, evaluation, and reporting pipeline. """ def __init__(self, models, use_existing_models=True, force_retrain=False): if not all(isinstance(m, BaseModel) for m in models): raise TypeError("All models must be instances of BaseModel.") self.models = models self.train_fights = [] self.test_fights = [] self.results = {} self.use_existing_models = use_existing_models self.force_retrain = force_retrain def _get_last_trained_event(self): """Get the last event that models were trained on.""" if not os.path.exists(LAST_EVENT_JSON_PATH): return None try: with open(LAST_EVENT_JSON_PATH, 'r', encoding='utf-8') as f: last_event_data = json.load(f) if isinstance(last_event_data, list) and len(last_event_data) > 0: return last_event_data[0].get('name'), last_event_data[0].get('date') return None, None except (json.JSONDecodeError, FileNotFoundError): return None, None def _save_last_trained_event(self, event_name, event_date): """Save the last event that models were trained on.""" last_event_data = [{ "name": event_name, "date": event_date, "training_timestamp": datetime.now().isoformat() }] try: with open(LAST_EVENT_JSON_PATH, 'w', encoding='utf-8') as f: json.dump(last_event_data, f, indent=4) except Exception as e: print(f"Warning: Could not save last trained event: {e}") def _has_new_data_since_last_training(self): """Check if there's new fight data since the last training.""" last_event_name, last_event_date = self._get_last_trained_event() if not last_event_name or not last_event_date: return True # No previous training record, consider as new data if not os.path.exists(FIGHTS_CSV_PATH): return False with open(FIGHTS_CSV_PATH, 'r', encoding='utf-8') as f: fights = list(csv.DictReader(f)) if not fights: return False # Sort fights by date to get the latest event fights.sort(key=lambda x: datetime.strptime(x['event_date'], '%B %d, %Y')) latest_fight = fights[-1] latest_event_name = latest_fight['event_name'] latest_event_date = latest_fight['event_date'] # Check if we have new events since last training if latest_event_name != last_event_name: print(f"New data detected: Latest event '{latest_event_name}' differs from last trained event '{last_event_name}'") return True return False def _model_exists(self, model): """Check if a saved model file exists and can be loaded successfully.""" model_name = model.__class__.__name__ file_name = f"{model_name}.joblib" save_path = os.path.join(MODELS_DIR, file_name) if not os.path.exists(save_path): return False # Verify the model can actually be loaded try: joblib.load(save_path) return True except Exception as e: print(f"Warning: Model file {file_name} exists but cannot be loaded ({e}). Will retrain.") return False def _load_existing_model(self, model_class): """Load an existing model from disk.""" model_name = model_class.__name__ file_name = f"{model_name}.joblib" load_path = os.path.join(MODELS_DIR, file_name) try: loaded_model = joblib.load(load_path) print(f"Loaded existing model: {model_name}") return loaded_model except Exception as e: print(f"Error loading model {model_name}: {e}") return None def _should_retrain_models(self): """Determine if models should be retrained.""" if self.force_retrain: print("Force retrain flag is set. Retraining all models.") return True if not self.use_existing_models: print("Use existing models flag is disabled. Retraining all models.") return True # Check if any model files are missing missing_models = [m for m in self.models if not self._model_exists(m)] if missing_models: missing_names = [m.__class__.__name__ for m in missing_models] print(f"Missing model files for: {missing_names}. Retraining all models.") return True # Check if there's new data since last training if self._has_new_data_since_last_training(): return True print("No new data detected and all model files exist. Using existing models.") return False def _load_and_split_data(self, num_test_events: int = 1) -> None: """Loads and splits the data into chronological training and testing sets.""" print("\n--- Loading and Splitting Data ---") if not os.path.exists(FIGHTS_CSV_PATH): raise FileNotFoundError(f"Fights data not found at '{FIGHTS_CSV_PATH}'.") fights = self._load_fights() all_events = list(OrderedDict.fromkeys(f['event_name'] for f in fights)) if len(all_events) < num_test_events: print(f"Warning: Fewer than {num_test_events} events found. Adjusting test set size.") num_test_events = len(all_events) test_event_names = all_events[-num_test_events:] self.train_fights = [f for f in fights if f['event_name'] not in test_event_names] self.test_fights = [f for f in fights if f['event_name'] in test_event_names] print(f"Data loaded. {len(self.train_fights)} training fights, {len(self.test_fights)} testing fights.") print(f"Testing on the last {num_test_events} event(s): {', '.join(test_event_names)}") def _load_fights(self) -> list: """Helper method to load and sort fights from CSV.""" with open(FIGHTS_CSV_PATH, 'r', encoding='utf-8') as f: fights = list(csv.DictReader(f)) fights.sort(key=lambda x: datetime.strptime(x['event_date'], '%B %d, %Y')) return fights def run(self, detailed_report: bool = True) -> None: """Executes the full pipeline: load, train, evaluate, report and save models.""" self._load_and_split_data() eval_fights = [f for f in self.test_fights if f['winner'] not in ["Draw", "NC", ""]] if not eval_fights: print("No fights with definitive outcomes in the test set. Aborting.") return should_retrain = self._should_retrain_models() for i, model in enumerate(self.models): model_name = model.__class__.__name__ print(f"\n--- Evaluating Model: {model_name} ---") if should_retrain: print(f"Training {model_name}...") model.train(self.train_fights) else: # Try to load existing model, fall back to training if loading fails loaded_model = self._load_existing_model(model.__class__) if loaded_model is not None: # Replace the model instance with the loaded one self.models[i] = loaded_model model = loaded_model else: print(f"Failed to load {model_name}, training new model...") model.train(self.train_fights) correct_predictions = 0 predictions = [] for fight in eval_fights: f1_name, f2_name = fight['fighter_1'], fight['fighter_2'] actual_winner = fight['winner'] event_name = fight.get('event_name', 'Unknown Event') prediction_result = model.predict(fight) predicted_winner = prediction_result.get('winner') probability = prediction_result.get('probability') is_correct = (predicted_winner == actual_winner) if is_correct: correct_predictions += 1 predictions.append({ 'fight': f"{f1_name} vs. {f2_name}", 'event': event_name, 'predicted_winner': predicted_winner, 'probability': f"{probability:.1%}" if probability is not None else "N/A", 'actual_winner': actual_winner, 'is_correct': is_correct }) accuracy = (correct_predictions / len(eval_fights)) * 100 model_status = "retrained" if should_retrain else "loaded from disk" self.results[model_name] = { 'accuracy': accuracy, 'predictions': predictions, 'total_fights': len(eval_fights), 'model_status': model_status } if detailed_report: self._report_detailed_results() else: self._report_summary() # Only train and save models if retraining was performed if should_retrain: self._train_and_save_models() def run_kfold_cv(self, k: int = 3, holdout_events: int = 1): """Performs k-fold cross-validation where each fold is a set of events. Within each fold, we keep the last *holdout_events* for testing.""" fights = self._load_fights() # Build an ordered list of unique events event_list = list(OrderedDict.fromkeys(f['event_name'] for f in fights)) # Initialize KFold splitter on events kf = KFold(n_splits=k, shuffle=True, random_state=42) all_fold_metrics = [] for fold_idx, (train_event_idx, test_event_idx) in enumerate(kf.split(event_list), start=1): train_events = [event_list[i] for i in train_event_idx] # Collect fights that belong to the training events fold_fights = [f for f in fights if f['event_name'] in train_events] # Inside this fold, reserve the last `holdout_events` events for testing fold_events_ordered = list(OrderedDict.fromkeys(f['event_name'] for f in fold_fights)) test_events = fold_events_ordered[-holdout_events:] train_set = [f for f in fold_fights if f['event_name'] not in test_events] test_set = [f for f in fold_fights if f['event_name'] in test_events] # Start an MLflow run for the current fold mlflow.set_experiment("UFC_KFold_CV") with mlflow.start_run(run_name=f"fold_{fold_idx}"): # Log meta information about the fold mlflow.log_param("fold", fold_idx) mlflow.log_param("train_events", len(train_events)) mlflow.log_param("test_events", holdout_events) fold_results = {} for model in self.models: model_name = model.__class__.__name__ # Train and evaluate model.train(train_set) correct = 0 total_fights = 0 for fight in test_set: if fight['winner'] not in ["Draw", "NC", ""]: prediction = model.predict(fight) if prediction.get('winner') == fight['winner']: correct += 1 total_fights += 1 acc = correct / total_fights if total_fights > 0 else 0.0 fold_results[model_name] = acc # Log metrics and register model to appear in MLflow Models tab mlflow.log_metric(f"accuracy_{model_name}", acc) mlflow.log_metric(f"total_fights_{model_name}", total_fights) # Register the model with MLflow to appear in Models tab mlflow.sklearn.log_model( model, f"model_{model_name}", registered_model_name=f"{model_name}_UFC_Model" ) all_fold_metrics.append(fold_results) return all_fold_metrics def update_models_if_new_data(self): """ Checks for new data and retrains/saves all models on the full dataset if needed. This does not run any evaluation. """ print("\n--- Checking for Model Updates ---") # Check if any model files are missing or invalid missing_models = [m for m in self.models if not self._model_exists(m)] has_new_data = self._has_new_data_since_last_training() if missing_models: missing_names = [m.__class__.__name__ for m in missing_models] print(f"Missing or invalid model files found for: {missing_names}.") self._train_and_save_models() elif has_new_data: print("New data detected, retraining all models...") self._train_and_save_models() else: print("No new data detected. Models are already up-to-date.") def _train_and_save_models(self): """Trains all models on the full dataset and saves them.""" print("\n\n--- Training and Saving All Models on Full Dataset ---") if not os.path.exists(FIGHTS_CSV_PATH): print(f"Error: Fights data not found at '{FIGHTS_CSV_PATH}'. Cannot save models.") return with open(FIGHTS_CSV_PATH, 'r', encoding='utf-8') as f: all_fights = list(csv.DictReader(f)) print(f"Training models on all {len(all_fights)} available fights...") if not os.path.exists(MODELS_DIR): os.makedirs(MODELS_DIR) print(f"Created directory: {MODELS_DIR}") # Get the latest event info for tracking if all_fights: all_fights.sort(key=lambda x: datetime.strptime(x['event_date'], '%B %d, %Y')) latest_fight = all_fights[-1] latest_event_name = latest_fight['event_name'] latest_event_date = latest_fight['event_date'] for model in self.models: model_name = model.__class__.__name__ print(f"\n--- Training: {model_name} ---") model.train(all_fights) # Sanitize and save the model file_name = f"{model_name}.joblib" save_path = os.path.join(MODELS_DIR, file_name) joblib.dump(model, save_path) print(f"Model saved successfully to {save_path}") # Save the last trained event info if all_fights: self._save_last_trained_event(latest_event_name, latest_event_date) print(f"Updated last trained event: {latest_event_name} ({latest_event_date})") def _report_summary(self): """Prints a concise summary of model performance.""" print("\n\n--- Prediction Pipeline Summary ---") print(f"{'Model':<25} | {'Accuracy':<10} | {'Fights Evaluated':<20} | {'Status':<15}") print("-" * 80) for model_name, result in self.results.items(): status = result.get('model_status', 'unknown') print(f"{model_name:<25} | {result['accuracy']:<9.2f}% | {result['total_fights']:<20} | {status:<15}") print("-" * 80) def _save_report_to_json(self, file_path=MODEL_RESULTS_PATH): """Saves the detailed prediction results to a JSON file.""" print(f"\nSaving detailed report to {file_path}...") try: # Create a report structure that is clean and JSON-friendly report = {} for model_name, result in self.results.items(): # Group predictions by event for a more organized report predictions_by_event = {} for p in result['predictions']: event_name = p.pop('event') # Extract event and remove it from the sub-dictionary if event_name not in predictions_by_event: predictions_by_event[event_name] = [] predictions_by_event[event_name].append(p) report[model_name] = { "overall_accuracy": f"{result['accuracy']:.2f}%", "total_fights_evaluated": result['total_fights'], "model_status": result.get('model_status', 'unknown'), "predictions_by_event": predictions_by_event } with open(file_path, 'w', encoding='utf-8') as f: json.dump(report, f, indent=4) print("Report saved successfully.") except (IOError, TypeError) as e: print(f"Error saving report to JSON file: {e}") def _report_detailed_results(self): """Prints a summary and saves the detailed report to a file.""" print("\n\n--- Prediction Pipeline Finished: Detailed Report ---") # A summary is printed to the console for convenience. self._report_summary() # The detailed report is now saved to a JSON file. self._save_report_to_json()