from datetime import timedelta

import pandas as pd

from .about import Tasks
from .display_utils import format_percentage, make_clickable_model


def clean_model_name(model_name: str) -> str:
    """Clean up model names for better display"""
    if model_name.startswith("smolagents-tavily-web-visit-"):
        return "Agent Baseline " + model_name.removeprefix("smolagents-tavily-web-visit-")
    if model_name.startswith("language-model-"):
        return "Language Model " + model_name.removeprefix("language-model-")
    return model_name


def get_available_weeks(predictions_df):
    """Get list of available weeks from the data"""
    if predictions_df is None or predictions_df.empty:
        return []

    # Get unique dates and convert to weeks
    dates = predictions_df["open_to_bet_until"].dt.date.unique()
    weeks = {}
    for date in dates:
        # Get the Monday of the week for this date
        monday = date - timedelta(days=date.weekday())
        week_end = monday + timedelta(days=6)
        week_key = f"{monday} to {week_end}"
        week_range = (monday, week_end)
        weeks[week_key] = week_range

    # Sort by date
    sorted_weeks = sorted(weeks.items(), key=lambda x: x[1][0])
    return [("All Time", None)] + sorted_weeks


def filter_data_by_week(predictions_df, week_range):
    """Filter predictions data by week range"""
    if predictions_df is None or predictions_df.empty or week_range is None:
        return predictions_df

    start_date, end_date = week_range
    # Filter data where open_to_bet_until falls within the week
    filtered_df = predictions_df[
        (predictions_df["open_to_bet_until"].dt.date >= start_date)
        & (predictions_df["open_to_bet_until"].dt.date <= end_date)
    ]
    return filtered_df


def create_leaderboard_df(predictions_df, week_filter=None):
    """
    Create leaderboard DataFrame from predictions CSV data

    Much simpler than Future-Bench's complex JSON parsing
    """
    if predictions_df is None or predictions_df.empty:
        return pd.DataFrame()

    # Apply week filter if specified
    if week_filter is not None:
        predictions_df = filter_data_by_week(predictions_df, week_filter)
        if predictions_df.empty:
            return pd.DataFrame()

    # Calculate accuracy by algorithm and event type
    results = []

    # Group by algorithm to calculate metrics
    for algorithm in predictions_df["algorithm_name"].unique():
        algo_data = predictions_df[predictions_df["algorithm_name"] == algorithm]

        # Filter out rows where result is null (unresolved events)
        resolved_data = algo_data[algo_data["result"].notna()]
        if len(resolved_data) == 0:
            continue

        # Calculate accuracy for each event type
        cleaned_algorithm = clean_model_name(algorithm)
        algo_scores = {
            "Model": make_clickable_model(cleaned_algorithm),
            "Events": len(resolved_data),
            "Correct Predictions": 0,
        }
        task_scores = []

        for task in Tasks:
            task_data = resolved_data[resolved_data["event_type"] == task.value.benchmark]
            if len(task_data) > 0:
                # Calculate accuracy for this task
                # Handle different prediction formats
                correct = 0
                total = len(task_data)
                for _, row in task_data.iterrows():
                    prediction = row["actual_prediction"]
                    actual = row["result"]
                    # Simple string comparison for now
                    # Could be enhanced for more complex prediction formats
                    if str(prediction).lower().strip() == str(actual).lower().strip():
                        correct += 1

                accuracy = (correct / total) * 100 if total > 0 else 0
                algo_scores[task.value.col_name] = accuracy
                task_scores.append(accuracy)

                # Add to total correct predictions
                algo_scores["Correct Predictions"] += correct
            else:
                algo_scores[task.value.col_name] = None

        # Calculate average accuracy across tasks where model made predictions
        if task_scores:
            algo_scores["Average"] = sum(task_scores) / len(task_scores)
        else:
            algo_scores["Average"] = 0

        results.append(algo_scores)

    # Create DataFrame
    df = pd.DataFrame(results)

    # Sort by average score (descending)
    if "Average" in df.columns:
        df = df.sort_values("Average", ascending=False)

    # Reset index to ensure proper row indexing
    df = df.reset_index(drop=True)

    # Add rank column with medals for top 3 and numbers for rest
    ranks = []
    for i in range(len(df)):
        if i == 0:
            ranks.append("🥇")
        elif i == 1:
            ranks.append("🥈")
        elif i == 2:
            ranks.append("🥉")
        else:
            ranks.append(f"#{i + 1}")

    # Insert rank column at the beginning
    df.insert(0, "Rank", ranks)

    # Format percentage columns
    for task in Tasks:
        if task.value.col_name in df.columns:
            df[task.value.col_name] = df[task.value.col_name].apply(format_percentage)
    if "Average" in df.columns:
        df["Average"] = df["Average"].apply(format_percentage)

    return df


def get_leaderboard_summary(df):
    """Get summary statistics for the leaderboard"""
    if df is None or df.empty:
        return {"total_models": 0, "total_predictions": 0, "avg_accuracy": 0}

    total_models = len(df)
    total_predictions = df["Events"].sum() if "Events" in df.columns else 0

    # Calculate average accuracy across all models
    avg_accuracy = 0
    if "Average" in df.columns:
        # Extract numeric values from percentage strings
        numeric_scores = []
        for score in df["Average"]:
            if score != "N/A":
                try:
                    numeric_scores.append(float(score.replace("%", "")))
                except Exception:
                    pass
        if numeric_scores:
            avg_accuracy = sum(numeric_scores) / len(numeric_scores)

    return {"total_models": total_models, "total_predictions": total_predictions, "avg_accuracy": avg_accuracy}


def filter_leaderboard(df, min_predictions=0):
    """Filter leaderboard by minimum number of predictions"""
    if df is None or df.empty:
        return df
    if "Events" in df.columns:
        return df[df["Events"] >= min_predictions]
    return df
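

# --- Usage sketch (illustration only, not part of the original module) ---
# A minimal example of how these helpers might be wired together. The CSV path
# "predictions.csv" and the parsing of "open_to_bet_until" as datetimes are
# assumptions; the real app loads this data elsewhere. Because of the relative
# imports above, this block only runs when the module is executed as part of
# its package (e.g. `python -m <package>.<module>`).
if __name__ == "__main__":
    predictions = pd.read_csv("predictions.csv", parse_dates=["open_to_bet_until"])

    # All-time leaderboard plus the list of selectable week ranges
    weeks = get_available_weeks(predictions)
    leaderboard = create_leaderboard_df(predictions)
    print(get_leaderboard_summary(leaderboard))

    # Leaderboard restricted to the most recent week, if any weekly ranges exist
    if len(weeks) > 1:
        _, latest_range = weeks[-1]
        print(create_leaderboard_df(predictions, week_filter=latest_range))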