mshuaibi's picture
clarify all set
7c5be67
import json
import os
import tempfile
from email.utils import parseaddr
from datetime import datetime
from typing import Dict, List, Tuple, Optional
import gradio as gr
import numpy as np
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from datasets import VerificationMode, load_dataset, Dataset
from huggingface_hub import HfApi, snapshot_download
from collections import defaultdict
import seaborn as sns
from content import (
CITATION_BUTTON_LABEL,
CITATION_BUTTON_TEXT,
INTRODUCTION_TEXT,
SUBMISSION_TEXT,
PRE_COLUMN_NAMES,
POST_COLUMN_NAMES,
TITLE,
TYPES,
model_hyperlink,
)
from evaluator import evaluate
# Configuration constants
TOKEN = os.environ.get("TOKEN", None)
OWNER = "facebook"
# Dataset repositories
INTERNAL_DATA_DATASET = f"{OWNER}/fairchem_internal"
SUBMISSION_DATASET = f"{OWNER}/fairchem_leaderboard_submissions"
RESULTS_DATASET = f"{OWNER}/fairchem_leaderboard_results"
CONTACT_DATASET = f"{OWNER}/fairchem_leaderboard_contact_info_internal"
LEADERBOARD_PATH = f"{OWNER}/fairchem_leaderboard"
# Initialize HuggingFace API
api = HfApi()
# S2EF subsplits for validation and test data
S2EF_SUBSPLITS = [
"all",
"biomolecules",
"electrolytes",
"metal_complexes",
"neutral_organics",
]
# Evaluation types that are not S2EF
OTHER_EVAL_TYPES = [
"Ligand pocket",
"Ligand strain",
"Conformers",
"Protonation",
"Distance scaling",
"IE_EA",
"Spin gap",
]
# All evaluation types for the dropdown
ALL_EVAL_TYPES = ["Validation", "Test"] + OTHER_EVAL_TYPES
class LeaderboardData:
"""
Manages leaderboard data loading and processing.
"""
def __init__(self):
self._setup_data_paths()
self._load_contact_info()
self._eval_results = None
self._results_dfs = None
def _setup_data_paths(self):
"""
Setup target and result file paths.
"""
target_data_dir = snapshot_download(
repo_id=INTERNAL_DATA_DATASET,
repo_type="dataset",
token=TOKEN,
)
self.target_paths = {
"Validation": f"{target_data_dir}/omol_val_labels.npz",
"Test": f"{target_data_dir}/omol_test_labels.npz",
"Distance Scaling": f"{target_data_dir}/distance_scaling_labels.json",
"Ligand pocket": f"{target_data_dir}/ligand_pocket_labels.json",
"Ligand strain": f"{target_data_dir}/ligand_strain_labels.json",
"Conformers": f"{target_data_dir}/geom_conformers_labels.json",
"Protonation": f"{target_data_dir}/protonation_energies_labels.json",
"IE_EA": f"{target_data_dir}/ieea_labels.json",
"Distance scaling": f"{target_data_dir}/distance_scaling_labels.json",
"Spin gap": f"{target_data_dir}/spingap_labels.json",
}
self.result_paths = {
"Validation": "validation_s2ef.parquet",
"Test": "test_s2ef.parquet",
"Ligand pocket": "ligand_pocket.parquet",
"Ligand strain": "ligand_strain.parquet",
"Conformers": "geom_conformers.parquet",
"Protonation": "protonation.parquet",
"IE_EA": "ie_ea.parquet",
"Distance scaling": "distance_scaling.parquet",
"Spin gap": "spin_gap.parquet",
}
def _load_contact_info(self):
"""
Load contact information dataset.
"""
self.contact_infos = load_dataset(
CONTACT_DATASET,
token=TOKEN,
download_mode="force_redownload",
verification_mode=VerificationMode.NO_CHECKS,
)
def load_eval_data(self) -> Tuple[Dict, Dict[str, pd.DataFrame]]:
"""
Load all evaluation data and return results and dataframes.
"""
if self._eval_results is not None and self._results_dfs is not None:
return self._eval_results, self._results_dfs
# Load S2EF results
s2ef_results = load_dataset(
RESULTS_DATASET,
token=TOKEN,
download_mode="force_redownload",
verification_mode=VerificationMode.NO_CHECKS,
data_files={
"Validation": os.path.join("data", self.result_paths["Validation"]),
"Test": os.path.join("data", self.result_paths["Test"]),
},
)
eval_results = dict(s2ef_results)
# Load other evaluation types
for eval_type in OTHER_EVAL_TYPES:
eval_type_data = load_dataset(
RESULTS_DATASET,
token=TOKEN,
download_mode="force_redownload",
verification_mode=VerificationMode.NO_CHECKS,
data_files={"data": os.path.join("data", self.result_paths[eval_type])},
)
eval_results[eval_type] = eval_type_data["data"]
# Generate result dataframes
results_dfs = {}
# S2EF dataframes
for split in ["Validation", "Test"]:
for subsplit in S2EF_SUBSPLITS:
df_key = f"{split}_{subsplit}"
results_dfs[df_key] = self._get_s2ef_df_from_results(
eval_results, split, subsplit
)
# Other evaluation dataframes
for split in OTHER_EVAL_TYPES:
results_dfs[split] = self._get_eval_df_from_results(eval_results, split)
# Cache the results
self._eval_results = eval_results
self._results_dfs = results_dfs
return eval_results, results_dfs
def _get_s2ef_df_from_results(
self, eval_results: Dict, split: str, subsplit: str
) -> pd.DataFrame:
"""
Generate S2EF dataframe from evaluation results.
"""
local_df = eval_results[split]
local_df = local_df.map(
lambda row: {
"Model": model_hyperlink(
row["model_url"], row["paper_url"], row["Model"]
)
}
)
filtered_columns = (
PRE_COLUMN_NAMES
+ [f"{subsplit}_energy_mae", f"{subsplit}_forces_mae"]
+ POST_COLUMN_NAMES
)
df = pd.DataFrame(local_df)
avail_columns = list(df.columns)
missing_columns = list(set(filtered_columns) - set(avail_columns))
df[missing_columns] = ""
df = df[filtered_columns]
# Unit conversion
for col in df.columns:
if "mae" in col.lower():
df[col] = df[col] * 1000
df = df.sort_values(by=[f"{subsplit}_energy_mae"], ascending=True)
df[f"{subsplit}_energy_mae"] = df[f"{subsplit}_energy_mae"]
df[f"{subsplit}_forces_mae"] = df[f"{subsplit}_forces_mae"]
df = df.rename(
columns={
f"{subsplit}_energy_mae": "Energy MAE\n[meV]",
f"{subsplit}_forces_mae": "Forces MAE\n[meV/Å]",
"Energy Conserving": "Energy\nConserving",
}
)
return df
def _get_eval_df_from_results(self, eval_results: Dict, split: str) -> pd.DataFrame:
"""
Generate evaluation dataframe from results.
"""
local_df = eval_results[split]
local_df = local_df.map(
lambda row: {
"Model": model_hyperlink(
row["model_url"], row["paper_url"], row["Model"]
)
}
)
eval_columns = LEADERBOARD_COLUMNS[split]
filtered_columns = PRE_COLUMN_NAMES + eval_columns + POST_COLUMN_NAMES
df = pd.DataFrame(local_df)
# Filter to only show results after 09/2025, keep v1 for record keeping
df = df[df["Submission date"] > "2025-09"]
avail_columns = list(df.columns)
missing_columns = list(set(filtered_columns) - set(avail_columns))
df[missing_columns] = ""
df = df[filtered_columns]
# Unit conversion
for col in df.columns:
if "mae" in col.lower():
df[col] = df[col] * 1000
df = df.sort_values(by=[eval_columns[0]], ascending=True)
df = df.rename(columns=COLUMN_MAPPING)
return df
leaderboard_data = LeaderboardData()
# Column configurations for different evaluation types
LEADERBOARD_COLUMNS = {
"Ligand pocket": ["interaction_energy_mae", "interaction_forces_mae"],
"Ligand strain": ["strain_energy_mae", "global_min_rmsd"],
"Conformers": ["deltaE_mae", "ensemble_rmsd"],
"Protonation": ["deltaE_mae", "rmsd"],
"Distance scaling": ["lr_ddE_mae", "lr_ddF_mae", "sr_ddE_mae", "sr_ddF_mae"],
"IE_EA": ["deltaE_mae", "deltaF_mae"],
"Spin gap": ["deltaE_mae", "deltaF_mae"],
}
COLUMN_MAPPING = {
"interaction_energy_mae": "Ixn Energy\nMAE [meV]",
"interaction_forces_mae": "Ixn Forces\nMAE [meV/Å]",
"strain_energy_mae": "Strain Energy\nMAE [meV]",
"deltaE_mae": "\u0394Energy MAE\n[meV]",
"deltaF_mae": "\u0394Forces MAE\n[meV/Å]",
"ensemble_rmsd": "RMSD [Å]",
"global_min_rmsd": "RMSD [Å]",
"rmsd": "RMSD [Å]",
"lr_ddE_mae": "\u0394Energy (LR)\n MAE [meV]",
"lr_ddF_mae": "\u0394Forces (LR)\n MAE [meV/Å]",
"sr_ddE_mae": "\u0394Energy (SR)\n MAE [meV]",
"sr_ddF_mae": "\u0394Forces (SR)\n MAE [meV/Å]",
"Energy Conserving": "Energy\nConserving",
}
def add_new_eval(
path_to_file: str,
eval_type: str,
organization: str,
model: str,
model_url: str,
paper_url: str,
energy_conserving: bool,
mail: str,
training_set: str,
additional_info: str,
profile: gr.OAuthProfile,
) -> str:
"""Add a new evaluation to the leaderboard."""
print(f"Adding new eval of type: {eval_type}")
try:
# Validate email address
_, parsed_mail = parseaddr(mail)
if "@" not in parsed_mail:
yield "⚠️ Please provide a valid email address."
return
# Check monthly submission limit (5 submissions per month)
contact_key = eval_type.replace(" ", "_")
user_submission_dates = sorted(
row["date"]
for row in leaderboard_data.contact_infos.get(contact_key, [])
if row["username"] == profile.username
)
current_month = datetime.now().strftime("%Y-%m")
current_month_submissions = [
date for date in user_submission_dates if date.startswith(current_month)
]
if len(current_month_submissions) >= 5:
yield f"⚠️ You have reached the monthly submission limit of 5 submissions. Please try again next month."
return
# Validate file submission
if path_to_file is None:
yield "⚠️ Please upload a file."
return
if not (path_to_file.endswith(".npz") or path_to_file.endswith(".json")):
yield "⚠️ Please submit a valid npz or json file"
return
# Evaluate the submission
yield "⚙️ Evaluating your submission...(do not close/refresh this page!)"
metrics = evaluate(
leaderboard_data.target_paths[eval_type],
path_to_file,
eval_type,
)
submission_time = datetime.today().strftime("%Y-%m-%d-%H:%M")
# Upload submission file
yield "☁️ Uploading submission file..."
api.upload_file(
repo_id=SUBMISSION_DATASET,
path_or_fileobj=path_to_file,
path_in_repo=f"{organization}/{model}/submissions/{training_set}/{contact_key}_{submission_time}_{os.path.basename(path_to_file)}",
repo_type="dataset",
token=TOKEN,
)
# Update leaderboard data
yield "📋 Updating leaderboard data..."
eval_results, _ = leaderboard_data.load_eval_data()
eval_entry = {
"Model": model,
"Organization": organization,
"Submission date": submission_time,
"Training Set": training_set,
"Energy Conserving": energy_conserving,
"model_url": model_url,
"paper_url": paper_url,
"Notes": additional_info,
}
eval_entry.update(metrics)
if eval_type not in eval_results:
eval_results[eval_type] = Dataset.from_dict(
{k: [v] for k, v in eval_entry.items()}
)
else:
eval_results[eval_type] = eval_results[eval_type].add_item(eval_entry)
data_file_name = leaderboard_data.result_paths[eval_type]
# Upload results
yield "💾 Saving results to database..."
with tempfile.NamedTemporaryFile(suffix=".parquet") as tmp_file:
eval_results[eval_type].to_parquet(tmp_file.name)
api.upload_file(
repo_id=RESULTS_DATASET,
path_or_fileobj=tmp_file.name,
path_in_repo=f"data/{data_file_name}",
repo_type="dataset",
token=TOKEN,
)
# Save contact information
contact_info = {
"model": model,
"organization": organization,
"username": profile.username,
"email": mail,
"date": submission_time,
}
if contact_key not in leaderboard_data.contact_infos:
leaderboard_data.contact_infos[contact_key] = Dataset.from_dict(
{k: [v] for k, v in contact_info.items()}
)
else:
leaderboard_data.contact_infos[contact_key] = (
leaderboard_data.contact_infos[contact_key].add_item(contact_info)
)
leaderboard_data.contact_infos.push_to_hub(CONTACT_DATASET, token=TOKEN)
success_str = f"✅ Model {model} is successfully evaluated and stored in our database.\nPlease wait an hour and refresh the leaderboard to see your results displayed."
yield success_str
except Exception as e:
print(f"Error during submission: {e}")
yield (
f"An error occurred, please open a discussion/issue if you continue to have submission issues.\n{e}"
)
def transform_time(date_str):
dt = datetime.strptime(date_str, "%Y-%m-%d-%H:%M")
return dt.strftime("%Y-%m-%d")
def create_dataframe_tab(
tab_name: str,
df: pd.DataFrame,
datatype: List[str] = None,
widths: List[str] = None,
) -> gr.Tab:
"""
Create a tab with a dataframe.
"""
if datatype is None:
datatype = TYPES
if widths is None:
num_cols = len(df.columns)
fixed_cols = len(PRE_COLUMN_NAMES) + len(POST_COLUMN_NAMES)
# Model | Organization |Energy Conserving | Training Set | Metrics | date
widths = ["10%", "5%", "5%", "5%"] + ["5%"] * (num_cols - fixed_cols) + ["10%"]
if "Submission date" in df.columns:
df["Submission date"] = df["Submission date"].apply(transform_time)
cm = sns.color_palette("viridis_r", as_cmap=True)
df = df.style.format(precision=2).background_gradient(cmap=cm)
with gr.Tab(tab_name) as tab:
gr.Dataframe(
value=df,
datatype=datatype,
interactive=False,
show_search="filter",
column_widths=widths,
show_copy_button=True,
)
return tab
def create_s2ef_tabs(split: str, results_dfs: Dict[str, pd.DataFrame]) -> None:
"""
Create S2EF tabs for a given split (Validation/Test).
"""
subsplit_names = {
"all": "All",
"biomolecules": "Biomolecules",
"electrolytes": "Electrolytes",
"metal_complexes": "Metal Complexes",
"neutral_organics": "Neutral Organics",
}
for subsplit, display_name in subsplit_names.items():
df_key = f"{split}_{subsplit}"
create_dataframe_tab(display_name, results_dfs[df_key])
def create_evaluation_tabs(results_dfs: Dict[str, pd.DataFrame]) -> None:
"""
Create evaluation tabs for non-S2EF evaluations, including Overview tab.
"""
# Create Overview tab first
overview_df = create_overview_dataframe(results_dfs)
n_overview_columns = len(overview_df.columns)
create_dataframe_tab(
"Overview", overview_df, widths=["15%"] + ["10%"] * (n_overview_columns - 1)
)
# Create individual evaluation tabs
for eval_type in OTHER_EVAL_TYPES:
display_name = "IE/EA" if eval_type == "IE_EA" else eval_type
create_dataframe_tab(display_name, results_dfs[eval_type])
def create_overview_dataframe(results_dfs: Dict[str, pd.DataFrame]) -> pd.DataFrame:
"""
Create an overview dataframe combining all models with only the first metric from each eval type.
"""
model_info = {}
for eval_type, df in results_dfs.items():
if eval_type.startswith("Validation_") or eval_type.startswith("Test_"):
continue
for _, row in df.iterrows():
model_name = row["Model"]
dataset = row["Training Set"]
model_entry = (model_name, dataset)
model_info[model_entry] = {
"Model": model_name,
"Organization": row.get("Organization", ""),
"Energy Conserving": row.get("Energy\nConserving", ""),
"Training Set": dataset,
}
overview_data = {
"Model": [],
"Organization": [],
"Energy\nConserving": [],
"Training Set": [],
}
metric_columns = {}
for eval_type in OTHER_EVAL_TYPES:
if eval_type in results_dfs and eval_type in LEADERBOARD_COLUMNS:
metric_display_name = COLUMN_MAPPING[LEADERBOARD_COLUMNS[eval_type][0]]
task_display_name = "IE/EA" if eval_type == "IE_EA" else eval_type
full_display_name = f"{task_display_name}\n{metric_display_name}"
overview_data[full_display_name] = []
metric_columns[full_display_name] = (eval_type, metric_display_name)
all_model_entries = model_info.keys()
model_rankings = defaultdict(list)
for model_entry in sorted(all_model_entries, key=lambda x: (x[0], x[1])):
model_name, dataset = model_entry
entry_info = model_info[model_entry]
overview_data["Model"].append(entry_info["Model"])
overview_data["Organization"].append(entry_info["Organization"])
overview_data["Energy\nConserving"].append(entry_info["Energy Conserving"])
overview_data["Training Set"].append(entry_info["Training Set"])
# Fill in metrics for each column
for display_col, (eval_type, source_col) in metric_columns.items():
if eval_type in results_dfs:
df = results_dfs[eval_type].reset_index(drop=True)
# Match both model name and training set
model_row = df[
(df["Model"] == model_name) & (df["Training Set"] == dataset)
]
if not model_row.empty and source_col in model_row.columns:
value = model_row.iloc[0][source_col]
rank = model_row.index[0]
else:
value = np.nan
rank = df.shape[0]
overview_data[display_col].append(value)
model_rankings[model_entry].append(rank)
overview_df = pd.DataFrame(overview_data)
def get_rank(row):
model_name = row["Model"]
dataset = row["Training Set"]
rank = np.mean(model_rankings[(model_name, dataset)])
return rank
overview_df["overall_rank"] = overview_df.apply(get_rank, axis=1)
overview_df = overview_df.sort_values(by="overall_rank").drop(
columns=["overall_rank"]
)
return overview_df
def create_submission_interface() -> Tuple[gr.components.Component, ...]:
"""
Create the submission interface components.
"""
with gr.Accordion("Submit predictions for evaluation"):
with gr.Row():
gr.Markdown(SUBMISSION_TEXT, elem_classes="markdown-text")
with gr.Row():
with gr.Column():
model_name_textbox = gr.Textbox(label="Model name")
energy_conserving = gr.Checkbox(
label="Is the model energy conserving? (i.e. F= -dE/dx)"
)
model_url = gr.Textbox(label="Model/Checkpoint URL")
paper_url = gr.Textbox(label="Paper URL")
dataset = gr.Dropdown(
choices=["OMol-102M", "OMol-4M", "UMA-459M", "Other"],
label="Training set",
interactive=True,
)
additional_info = gr.Textbox(
label="Additional info (cutoff radius, # of params, etc.)"
)
organization = gr.Textbox(label="Organization")
mail = gr.Textbox(
label="Contact email (will be stored privately, & used if there is an issue with your submission)"
)
with gr.Column():
file_output = gr.File()
with gr.Row():
eval_type = gr.Dropdown(
choices=ALL_EVAL_TYPES,
label="Eval Type",
interactive=True,
)
with gr.Column():
gr.LoginButton()
with gr.Column():
submit_button = gr.Button("Submit Eval")
submission_result = gr.Textbox(label="Status")
return (
submit_button,
file_output,
eval_type,
organization,
model_name_textbox,
model_url,
paper_url,
energy_conserving,
mail,
dataset,
additional_info,
submission_result,
)
def create_interface() -> gr.Blocks:
"""
Create the complete Gradio interface.
"""
# Load data
_, results_dfs = leaderboard_data.load_eval_data()
demo = gr.Blocks()
with demo:
gr.HTML(TITLE)
gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")
# Citation section
with gr.Row():
with gr.Accordion("📙 Citation", open=False):
gr.Markdown(CITATION_BUTTON_LABEL)
gr.Markdown(CITATION_BUTTON_TEXT)
# Evaluation results
gr.Markdown("## Evaluations", elem_classes="markdown-text")
with gr.Row():
create_evaluation_tabs(results_dfs)
gr.Markdown(
"**Overview rankings based on average rank across all evaluations",
elem_classes="markdown-text",
)
# S2EF Results tabs
gr.Markdown("## S2EF", elem_classes="markdown-text")
with gr.Tab("Test"):
create_s2ef_tabs("Test", results_dfs)
with gr.Tab("Validation"):
create_s2ef_tabs("Validation", results_dfs)
(
submit_button,
file_output,
eval_type,
organization,
model_name_textbox,
model_url,
paper_url,
energy_conserving,
mail,
dataset,
additional_info,
submission_result,
) = create_submission_interface()
submit_button.click(
add_new_eval,
[
file_output,
eval_type,
organization,
model_name_textbox,
model_url,
paper_url,
energy_conserving,
mail,
dataset,
additional_info,
],
submission_result,
)
return demo
def restart_space():
api.restart_space(repo_id=LEADERBOARD_PATH, token=TOKEN)
def main():
demo = create_interface()
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=3600)
scheduler.start()
# Launch the demo
demo.launch(debug=True)
if __name__ == "__main__":
main()