import json
import os

import pandas as pd

from src.about import Tasks
from src.display.formatting import has_no_nan_values, make_clickable_model
from src.display.utils import AutoEvalColumn, EvalQueueColumn, ModelType, Precision, WeightType
from src.leaderboard.read_evals import get_raw_eval_results


def _parse_param_billions(raw_param) -> float:
    """Parse a parameter-count cell from the CSV into a number of billions.

    Handles the two formats this module knows about:

    * ``"2.8B activated (16B total)"`` -> ``2.8`` (text before the first "B")
    * ``"9B"`` -> ``9.0``

    Anything else (no "B" in the cell, or a non-numeric remainder) yields 0,
    so a single unexpected cell does not abort loading of the whole CSV.
    """
    param_str = str(raw_param)
    if "B" not in param_str:
        return 0

    if "activated" in param_str:
        # Keep only the activated parameter count, i.e. what precedes the first "B".
        numeric_part = param_str.split("B")[0]
    else:
        numeric_part = param_str.replace("B", "")

    try:
        return float(numeric_part)
    except ValueError:
        # Unrecognised format: fall back to the same "unknown" value as above.
        return 0


def load_csv_results():
    """Load results from the ``main-results.csv`` file.

    Returns:
        A list of dicts, one per CSV row, keyed by leaderboard column names.
        The list is empty when ``main-results.csv`` does not exist in the
        current working directory.
    """
    csv_path = "main-results.csv"
    if not os.path.exists(csv_path):
        return []

    df = pd.read_csv(csv_path)
    results = []

    for _, row in df.iterrows():
        param_value = _parse_param_billions(row["Param"])

        # Convert CSV data to the format expected by the leaderboard
        data_dict = {
            AutoEvalColumn.model.name: make_clickable_model(row["Model"]),
            AutoEvalColumn.average.name: row["ACC"],  # Using ACC as the average score
            AutoEvalColumn.params.name: param_value,
            AutoEvalColumn.license.name: "Open Source" if row["Open Source?"] == "Yes" else "Proprietary",
            AutoEvalColumn.model_type.name: ModelType.FT.value.name,  # Default to fine-tuned
            AutoEvalColumn.precision.name: Precision.float16.value.name,  # Default precision
            AutoEvalColumn.weight_type.name: WeightType.Original.value.name,
            AutoEvalColumn.architecture.name: "Unknown",
            AutoEvalColumn.still_on_hub.name: True,
            AutoEvalColumn.revision.name: "",
            AutoEvalColumn.likes.name: 0,
            AutoEvalColumn.model_type_symbol.name: ModelType.FT.value.symbol,
        }

        # Add task-specific scores (required by the leaderboard).
        # NOTE(review): scores are keyed by the enum member name (task.name);
        # confirm this matches the column names passed as benchmark_cols.
        for task in Tasks:
            data_dict[task.name] = row["ACC"]  # Use the same ACC score for all tasks

        results.append(data_dict)

    return results


def get_leaderboard_df(results_path: str, requests_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame:
    """Create a dataframe from all the individual experiment results.

    Results are read through ``get_raw_eval_results``; when that yields
    nothing, the CSV fallback (``load_csv_results``) is used instead.

    Args:
        results_path: Passed through to ``get_raw_eval_results``.
        requests_path: Passed through to ``get_raw_eval_results``.
        cols: Columns to keep; those absent from the data are ignored.
        benchmark_cols: Columns handed to ``has_no_nan_values`` to build the
            final row filter.

    Returns:
        The dataframe sorted by the average column (descending), rounded to
        two decimals, or an empty dataframe with ``cols`` as columns when no
        data was found at all.
    """
    raw_data = get_raw_eval_results(results_path, requests_path)
    all_data_json = [v.to_dict() for v in raw_data]

    # If no JSON data found, try loading from CSV
    if not all_data_json:
        all_data_json = load_csv_results()

    if not all_data_json:
        # Return empty dataframe if no data found
        return pd.DataFrame(columns=cols)

    df = pd.DataFrame.from_records(all_data_json)

    # Only include columns that exist in the dataframe
    existing_cols = [col for col in cols if col in df.columns]

    # Sort before selecting columns so the average column is still available.
    df = df.sort_values(by=[AutoEvalColumn.average.name], ascending=False)
    df = df[existing_cols].round(decimals=2)

    # filter out if any of the benchmarks have not been produced
    df = df[has_no_nan_values(df, benchmark_cols)]
    return df


def _read_queue_entry(file_path: str) -> dict:
    """Load one evaluation-request JSON file and add the display columns."""
    with open(file_path, encoding="utf-8") as fp:
        data = json.load(fp)

    data[EvalQueueColumn.model.name] = make_clickable_model(data["model"])
    data[EvalQueueColumn.revision.name] = data.get("revision", "main")
    return data


def get_evaluation_queue_df(save_path: str, cols: list) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Create the dataframes for the evaluation request queues.

    Request files are read from ``save_path`` itself (entries whose name
    contains ".json") and from its immediate sub-folders. Hidden entries
    (leading ".") and entries whose name contains ".md" are skipped.

    Args:
        save_path: Directory holding the evaluation request files.
        cols: Columns of the returned dataframes.

    Returns:
        A ``(finished, running, pending)`` tuple of dataframes, split on each
        request's ``"status"`` value.
    """
    entries = [entry for entry in os.listdir(save_path) if not entry.startswith(".")]
    all_evals = []

    for entry in entries:
        entry_path = os.path.join(save_path, entry)
        if ".json" in entry:
            all_evals.append(_read_queue_entry(entry_path))
        elif ".md" not in entry and os.path.isdir(entry_path):
            # this is a folder
            for sub_entry in os.listdir(entry_path):
                file_path = os.path.join(entry_path, sub_entry)
                # Test the full path: a bare file name would be resolved
                # against the current working directory, not this folder.
                if sub_entry.startswith(".") or not os.path.isfile(file_path):
                    continue
                all_evals.append(_read_queue_entry(file_path))

    pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]]
    running_list = [e for e in all_evals if e["status"] == "RUNNING"]
    finished_list = [e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"]

    df_pending = pd.DataFrame.from_records(pending_list, columns=cols)
    df_running = pd.DataFrame.from_records(running_list, columns=cols)
    df_finished = pd.DataFrame.from_records(finished_list, columns=cols)
    return df_finished[cols], df_running[cols], df_pending[cols]