Spaces:
Running
Running
#!/usr/bin/env python3 | |
""" | |
Script to transform your production database into HuggingFace dataset format. | |
Follows the same pattern as FutureBench's convert_to_csv.py but simplified. | |
""" | |
import os | |
import sys | |
import tempfile | |
from datetime import datetime | |
import pandas as pd | |
from huggingface_hub import HfApi | |
# Add the parent directory to sys.path to allow imports (same as convert_to_csv.py) | |
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) | |
# Import FutureBench models and database (same as convert_to_csv.py) | |
# Import configuration | |
from config_db import HF_CONFIG, PROCESSING_CONFIG | |
from future_bench.database import get_session | |
from future_bench.models import EventBase, Prediction | |
def datetime_to_string(dt):
    """Serialize a datetime to an ISO-8601 string; missing values become ""."""
    if not dt:
        return ""
    return dt.isoformat()
def extract_events_and_predictions(session):
    """
    Extract resolved events joined with their predictions from the database.

    Uses the same SQLAlchemy ORM approach as convert_to_csv.py.

    Args:
        session: An open SQLAlchemy session.

    Returns:
        pandas.DataFrame with one row per (resolved event, prediction) pair,
        or an empty DataFrame if either table has no rows.
    """
    # Get all events (same as convert_to_csv.py)
    events = session.query(EventBase).all()
    if not events:
        print("No events found in the database.")
        return pd.DataFrame()

    # Get all predictions (same as convert_to_csv.py)
    predictions = session.query(Prediction).all()
    if not predictions:
        print("No predictions found in the database.")
        return pd.DataFrame()

    # Index predictions by event_id once (O(E + P)) instead of re-scanning
    # the full prediction list for every event (O(E * P)).
    predictions_by_event = {}
    for pred in predictions:
        predictions_by_event.setdefault(pred.event_id, []).append(pred)

    # Create combined view (same logic as convert_to_csv.py)
    combined_data = []
    for event in events:
        if event.result is None:  # Skip unresolved events
            continue
        for pred in predictions_by_event.get(event.id, []):
            combined_data.append(
                {
                    "event_id": event.id,
                    "question": event.question,
                    "event_type": event.event_type,
                    "open_to_bet_until": datetime_to_string(event.open_to_bet_until),
                    "result": event.result,
                    "algorithm_name": pred.algorithm_name,
                    "actual_prediction": pred.actual_prediction,
                    "prediction_created_at": datetime_to_string(pred.created_at),
                }
            )
    return pd.DataFrame(combined_data)
def transform_to_standard_format(df, cutoff_date=datetime(2025, 6, 12)):
    """
    Transform raw event/prediction rows into the standard leaderboard format.

    This should match the CSV format your leaderboard already expects.

    Args:
        df: DataFrame produced by extract_events_and_predictions().
        cutoff_date: Keep only predictions created on/after this datetime.
            Defaults to June 12, 2025 (start of the tracked period).

    Returns:
        (df, accuracy_df): the filtered per-prediction DataFrame and a
        per-(algorithm, event_type) accuracy summary.
    """
    # Convert date columns with flexible parsing for microseconds
    df["open_to_bet_until"] = pd.to_datetime(df["open_to_bet_until"], format="mixed")
    df["prediction_created_at"] = pd.to_datetime(df["prediction_created_at"], format="mixed")

    # Add any additional columns your leaderboard expects
    df["source"] = "your-app"  # Add source identifier

    # Drop predictions made before the cutoff (NaT timestamps compare False
    # and are therefore dropped too).
    df = df[df["prediction_created_at"] >= cutoff_date]
    print(f" Filtered to predictions created from {cutoff_date.strftime('%B %d, %Y')} onwards: {len(df)} records remaining")

    # Filter by event types
    df = df[df["event_type"].isin(PROCESSING_CONFIG["event_types"])]
    # Exclude test models
    df = df[~df["algorithm_name"].isin(PROCESSING_CONFIG["exclude_models"])]

    # Calculate accuracy per model (for summary). The lambda counts, within
    # each group, the rows whose prediction matches the event result.
    accuracy_df = (
        df.groupby(["algorithm_name", "event_type"])
        .agg(
            {
                "actual_prediction": "count",
                "result": lambda x: (df.loc[x.index, "actual_prediction"] == x).sum(),
            }
        )
        .rename(columns={"actual_prediction": "total_predictions", "result": "correct_predictions"})
        .reset_index()
    )
    accuracy_df["accuracy"] = accuracy_df["correct_predictions"] / accuracy_df["total_predictions"]
    return df, accuracy_df
def upload_to_huggingface(df, accuracy_df, repo_data, repo_results):
    """
    Push the transformed dataset and the accuracy summary to their
    respective HuggingFace dataset repositories.
    """
    api = HfApi(token=HF_CONFIG["token"])

    # Stage both CSVs in a throwaway directory; the context manager
    # removes everything once the uploads finish.
    with tempfile.TemporaryDirectory() as staging_dir:
        dataset_csv = os.path.join(staging_dir, "data.csv")
        df.to_csv(dataset_csv, index=False)

        summary_csv = os.path.join(staging_dir, "results.csv")
        accuracy_df.to_csv(summary_csv, index=False)

        # Ship the main dataset, then the accuracy summary.
        api.upload_file(
            path_or_fileobj=dataset_csv,
            path_in_repo="data.csv",
            repo_id=repo_data,
            repo_type="dataset",
        )
        api.upload_file(
            path_or_fileobj=summary_csv,
            path_in_repo="results.csv",
            repo_id=repo_results,
            repo_type="dataset",
        )

    print(f"โ Uploaded data to {repo_data}")
    print(f"โ Uploaded results to {repo_results}")
def main():
    """Run the full extract -> transform -> upload pipeline."""
    print("๐ Starting database to HuggingFace pipeline...")

    # Step 1: Extract from database (same as convert_to_csv.py)
    print("๐ Extracting data from database...")
    session = next(get_session())
    try:
        df = extract_events_and_predictions(session)
        print(f" Found {len(df)} event-prediction pairs")
    finally:
        session.close()

    # Nothing to do when the database produced no rows.
    if len(df) == 0:
        print("โ No data found in database")
        return

    # Step 2: Transform to standard format
    print("๐ Transforming data...")
    df, accuracy_df = transform_to_standard_format(df)
    print(f" Processed {len(df)} records")
    print(f" Generated accuracy stats for {len(accuracy_df)} model-task pairs")

    # Step 3: Upload to HuggingFace (fall back to local CSVs without a token)
    if not HF_CONFIG["token"]:
        print("โ ๏ธ No HF_TOKEN found, saving locally instead...")
        df.to_csv("data_export.csv", index=False)
        accuracy_df.to_csv("results_export.csv", index=False)
        print(" Saved data_export.csv and results_export.csv")
    else:
        print("โ๏ธ Uploading to HuggingFace...")
        upload_to_huggingface(df, accuracy_df, HF_CONFIG["data_repo"], HF_CONFIG["results_repo"])

    print("โ Pipeline completed successfully!")
# Entry point: run the pipeline only when executed directly, not on import.
if __name__ == "__main__":
    main()