#!/usr/bin/env python3 """ Script to transform your production database into HuggingFace dataset format. Follows the same pattern as FutureBench's convert_to_csv.py but simplified. """ import os import sys import tempfile from datetime import datetime import pandas as pd from huggingface_hub import HfApi # Add the parent directory to sys.path to allow imports (same as convert_to_csv.py) sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) # Import FutureBench models and database (same as convert_to_csv.py) # Import configuration from config_db import HF_CONFIG, PROCESSING_CONFIG from future_bench.database import get_session from future_bench.models import EventBase, Prediction def datetime_to_string(dt): """Convert datetime to string or return empty string if None (same as convert_to_csv.py)""" return dt.isoformat() if dt else "" def extract_events_and_predictions(session): """ Extract events and predictions from your database. Uses the same SQLAlchemy ORM approach as convert_to_csv.py. """ # Get all events (same as convert_to_csv.py) events = session.query(EventBase).all() if not events: print("No events found in the database.") return pd.DataFrame() # Get all predictions (same as convert_to_csv.py) predictions = session.query(Prediction).all() if not predictions: print("No predictions found in the database.") return pd.DataFrame() # Create combined view (same logic as convert_to_csv.py) combined_data = [] for event in events: if event.result is None: # Skip unresolved events continue event_predictions = [p for p in predictions if p.event_id == event.id] for pred in event_predictions: combined_data.append( { "event_id": event.id, "question": event.question, "event_type": event.event_type, "open_to_bet_until": datetime_to_string(event.open_to_bet_until), "result": event.result, "algorithm_name": pred.algorithm_name, "actual_prediction": pred.actual_prediction, "prediction_created_at": datetime_to_string(pred.created_at), } ) df = pd.DataFrame(combined_data) return df def transform_to_standard_format(df): """ Transform your raw data into the standard format expected by your leaderboard. This should match the CSV format your leaderboard already expects. """ # Convert date columns with flexible parsing for microseconds df["open_to_bet_until"] = pd.to_datetime(df["open_to_bet_until"], format="mixed") df["prediction_created_at"] = pd.to_datetime(df["prediction_created_at"], format="mixed") # Add any additional columns your leaderboard expects df["source"] = "your-app" # Add source identifier # Filter to data starting from June 12th cutoff_date = datetime(2025, 6, 12) df = df[df["prediction_created_at"] >= cutoff_date] print(f" Filtered to predictions created from {cutoff_date.strftime('%B %d, %Y')} onwards: {len(df)} records remaining") # Filter by event types df = df[df["event_type"].isin(PROCESSING_CONFIG["event_types"])] # Exclude test models df = df[~df["algorithm_name"].isin(PROCESSING_CONFIG["exclude_models"])] # Calculate accuracy per model (for summary) accuracy_df = df.groupby(["algorithm_name", "event_type"]).agg({"actual_prediction": "count", "result": lambda x: (df.loc[x.index, "actual_prediction"] == x).sum()}).rename(columns={"actual_prediction": "total_predictions", "result": "correct_predictions"}).reset_index() accuracy_df["accuracy"] = accuracy_df["correct_predictions"] / accuracy_df["total_predictions"] return df, accuracy_df def upload_to_huggingface(df, accuracy_df, repo_data, repo_results): """ Upload the transformed data to HuggingFace repositories. """ api = HfApi(token=HF_CONFIG["token"]) # Create temporary directory for files with tempfile.TemporaryDirectory() as tmp_dir: # Save main dataset data_path = os.path.join(tmp_dir, "data.csv") df.to_csv(data_path, index=False) # Save accuracy summary results_path = os.path.join(tmp_dir, "results.csv") accuracy_df.to_csv(results_path, index=False) # Upload to data repo api.upload_file(path_or_fileobj=data_path, path_in_repo="data.csv", repo_id=repo_data, repo_type="dataset") # Upload to results repo api.upload_file(path_or_fileobj=results_path, path_in_repo="results.csv", repo_id=repo_results, repo_type="dataset") print(f"✅ Uploaded data to {repo_data}") print(f"✅ Uploaded results to {repo_results}") def main(): """Main pipeline function""" print("🚀 Starting database to HuggingFace pipeline...") # Step 1: Extract from database (same as convert_to_csv.py) print("📊 Extracting data from database...") session = next(get_session()) try: df = extract_events_and_predictions(session) print(f" Found {len(df)} event-prediction pairs") finally: session.close() if len(df) == 0: print("❌ No data found in database") return # Step 2: Transform to standard format print("🔄 Transforming data...") df, accuracy_df = transform_to_standard_format(df) print(f" Processed {len(df)} records") print(f" Generated accuracy stats for {len(accuracy_df)} model-task pairs") # Step 3: Upload to HuggingFace if HF_CONFIG["token"]: print("☁️ Uploading to HuggingFace...") upload_to_huggingface(df, accuracy_df, HF_CONFIG["data_repo"], HF_CONFIG["results_repo"]) else: print("⚠️ No HF_TOKEN found, saving locally instead...") df.to_csv("data_export.csv", index=False) accuracy_df.to_csv("results_export.csv", index=False) print(" Saved data_export.csv and results_export.csv") print("✅ Pipeline completed successfully!") if __name__ == "__main__": main()