#!/usr/bin/env python3
"""
Script to transform your production database into HuggingFace dataset format.
Follows the same pattern as FutureBench's convert_to_csv.py but simplified.
"""
import os
import sys
import tempfile
from datetime import datetime

import pandas as pd
from huggingface_hub import HfApi

# Add the parent directory to sys.path to allow imports (same as convert_to_csv.py)
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

# Import configuration
from config_db import HF_CONFIG, PROCESSING_CONFIG

# Import FutureBench models and database (same as convert_to_csv.py)
from future_bench.database import get_session
from future_bench.models import EventBase, Prediction
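
# For reference, a minimal sketch of what config_db.py is assumed to provide,
# inferred from how its keys are used below. The repo ids, event types, and
# model names here are hypothetical placeholders, not the real values:
#
#   HF_CONFIG = {
#       "token": os.getenv("HF_TOKEN"),            # None -> fall back to local CSV export
#       "data_repo": "your-org/futurebench-data",      # hypothetical dataset repo id
#       "results_repo": "your-org/futurebench-results",
#   }
#   PROCESSING_CONFIG = {
#       "event_types": ["soccer", "news"],         # hypothetical event-type whitelist
#       "exclude_models": ["baseline-test"],       # hypothetical models to exclude
#   }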


def datetime_to_string(dt):
    """Convert datetime to string or return empty string if None (same as convert_to_csv.py)"""
    return dt.isoformat() if dt else ""
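
# For example (illustrative): datetime_to_string(datetime(2025, 6, 12, 8, 30))
# returns "2025-06-12T08:30:00", while datetime_to_string(None) returns "", so
# missing timestamps become empty CSV cells rather than the string "None".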


def extract_events_and_predictions(session):
    """
    Extract events and predictions from your database.
    Uses the same SQLAlchemy ORM approach as convert_to_csv.py.
    """
    # Get all events (same as convert_to_csv.py)
    events = session.query(EventBase).all()
    if not events:
        print("No events found in the database.")
        return pd.DataFrame()

    # Get all predictions (same as convert_to_csv.py)
    predictions = session.query(Prediction).all()
    if not predictions:
        print("No predictions found in the database.")
        return pd.DataFrame()

    # Create combined view (same logic as convert_to_csv.py)
    combined_data = []
    for event in events:
        if event.result is None:  # Skip unresolved events
            continue
        event_predictions = [p for p in predictions if p.event_id == event.id]
        for pred in event_predictions:
            combined_data.append(
                {
                    "event_id": event.id,
                    "question": event.question,
                    "event_type": event.event_type,
                    "open_to_bet_until": datetime_to_string(event.open_to_bet_until),
                    "result": event.result,
                    "algorithm_name": pred.algorithm_name,
                    "actual_prediction": pred.actual_prediction,
                    "prediction_created_at": datetime_to_string(pred.created_at),
                }
            )
    return pd.DataFrame(combined_data)
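
# Note: the list comprehension above rescans every prediction once per event,
# i.e. O(events x predictions). If the tables grow large, one option (a sketch,
# not what the original does) is to bucket predictions by event_id first:
#
#   from collections import defaultdict
#   preds_by_event = defaultdict(list)
#   for p in predictions:
#       preds_by_event[p.event_id].append(p)
#   # then inside the event loop:
#   event_predictions = preds_by_event[event.id]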


def transform_to_standard_format(df):
    """
    Transform your raw data into the standard format expected by your leaderboard.
    This should match the CSV format your leaderboard already expects.
    """
    # Convert date columns with flexible parsing for microseconds
    # (format="mixed" requires pandas >= 2.0)
    df["open_to_bet_until"] = pd.to_datetime(df["open_to_bet_until"], format="mixed")
    df["prediction_created_at"] = pd.to_datetime(df["prediction_created_at"], format="mixed")

    # Add any additional columns your leaderboard expects
    df["source"] = "your-app"  # Add source identifier

    # Filter to data starting from June 12th
    cutoff_date = datetime(2025, 6, 12)
    df = df[df["prediction_created_at"] >= cutoff_date]
    print(f" Filtered to predictions created from {cutoff_date.strftime('%B %d, %Y')} onwards: {len(df)} records remaining")

    # Filter by event types
    df = df[df["event_type"].isin(PROCESSING_CONFIG["event_types"])]

    # Exclude test models
    df = df[~df["algorithm_name"].isin(PROCESSING_CONFIG["exclude_models"])]

    # Calculate accuracy per model (for summary): a prediction counts as
    # correct when actual_prediction equals the event's result
    accuracy_df = (
        df.groupby(["algorithm_name", "event_type"])
        .agg(
            {
                "actual_prediction": "count",
                "result": lambda x: (df.loc[x.index, "actual_prediction"] == x).sum(),
            }
        )
        .rename(columns={"actual_prediction": "total_predictions", "result": "correct_predictions"})
        .reset_index()
    )
    accuracy_df["accuracy"] = accuracy_df["correct_predictions"] / accuracy_df["total_predictions"]
    return df, accuracy_df
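
# Illustrative example (hypothetical values): if a model "gpt-4o" made 10
# "soccer" predictions and 8 of them equal the event result, results.csv gets:
#   algorithm_name=gpt-4o, event_type=soccer, total_predictions=10,
#   correct_predictions=8, accuracy=0.8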


def upload_to_huggingface(df, accuracy_df, repo_data, repo_results):
    """
    Upload the transformed data to HuggingFace repositories.
    """
    api = HfApi(token=HF_CONFIG["token"])

    # Create temporary directory for files
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Save main dataset
        data_path = os.path.join(tmp_dir, "data.csv")
        df.to_csv(data_path, index=False)

        # Save accuracy summary
        results_path = os.path.join(tmp_dir, "results.csv")
        accuracy_df.to_csv(results_path, index=False)

        # Upload to data repo
        api.upload_file(path_or_fileobj=data_path, path_in_repo="data.csv", repo_id=repo_data, repo_type="dataset")

        # Upload to results repo
        api.upload_file(path_or_fileobj=results_path, path_in_repo="results.csv", repo_id=repo_results, repo_type="dataset")

    print(f"✅ Uploaded data to {repo_data}")
    print(f"✅ Uploaded results to {repo_results}")


def main():
    """Main pipeline function"""
    print("🚀 Starting database to HuggingFace pipeline...")

    # Step 1: Extract from database (same as convert_to_csv.py)
    print("📊 Extracting data from database...")
    session = next(get_session())
    try:
        df = extract_events_and_predictions(session)
        print(f" Found {len(df)} event-prediction pairs")
    finally:
        session.close()

    if len(df) == 0:
        print("❌ No data found in database")
        return

    # Step 2: Transform to standard format
    print("🔄 Transforming data...")
    df, accuracy_df = transform_to_standard_format(df)
    print(f" Processed {len(df)} records")
    print(f" Generated accuracy stats for {len(accuracy_df)} model-task pairs")

    # Step 3: Upload to HuggingFace
    if HF_CONFIG["token"]:
        print("☁️ Uploading to HuggingFace...")
        upload_to_huggingface(df, accuracy_df, HF_CONFIG["data_repo"], HF_CONFIG["results_repo"])
    else:
        print("⚠️ No HF_TOKEN found, saving locally instead...")
        df.to_csv("data_export.csv", index=False)
        accuracy_df.to_csv("results_export.csv", index=False)
        print(" Saved data_export.csv and results_export.csv")

    print("✅ Pipeline completed successfully!")
if __name__ == "__main__":
main()
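
# Typical invocation (an assumption based on the paths above: the file lives in
# process_data/ and the token is read from the environment by config_db.py):
#
#   HF_TOKEN=hf_xxx python process_data/db_to_hf.py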