import json
import os
import tempfile
from email.utils import parseaddr
from datetime import datetime
from typing import Dict, List, Tuple, Optional
import gradio as gr
import numpy as np
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from datasets import VerificationMode, load_dataset, Dataset
from huggingface_hub import HfApi, snapshot_download
from collections import defaultdict
import seaborn as sns
from content import (
    CITATION_BUTTON_LABEL,
    CITATION_BUTTON_TEXT,
    INTRODUCTION_TEXT,
    SUBMISSION_TEXT,
    PRE_COLUMN_NAMES,
    POST_COLUMN_NAMES,
    TITLE,
    TYPES,
    model_hyperlink,
)
from evaluator import evaluate
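# evaluate() (from the local evaluator module) is assumed to take
# (target_labels_path, predictions_path, eval_type) and return a flat dict of
# metric name -> value; add_new_eval() below merges that dict into the leaderboard row.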

# Configuration constants
TOKEN = os.environ.get("TOKEN", None)
OWNER = "facebook"

# Dataset repositories
INTERNAL_DATA_DATASET = f"{OWNER}/fairchem_internal"
SUBMISSION_DATASET = f"{OWNER}/fairchem_leaderboard_submissions"
RESULTS_DATASET = f"{OWNER}/fairchem_leaderboard_results"
CONTACT_DATASET = f"{OWNER}/fairchem_leaderboard_contact_info_internal"
LEADERBOARD_PATH = f"{OWNER}/fairchem_leaderboard"

# Initialize HuggingFace API
api = HfApi()

# S2EF (structure-to-energy-and-forces) subsplits for validation and test data
S2EF_SUBSPLITS = [
    "all",
    "biomolecules",
    "electrolytes",
    "metal_complexes",
    "neutral_organics",
]

# Evaluation types that are not S2EF
OTHER_EVAL_TYPES = [
    "Ligand pocket",
    "Ligand strain",
    "Conformers",
    "Protonation",
    "Distance scaling",
    "IE_EA",
    "Spin gap",
]

# All evaluation types for the dropdown
ALL_EVAL_TYPES = ["Validation", "Test"] + OTHER_EVAL_TYPES
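# "Validation" and "Test" are the S2EF energy/force evaluations; the entries in
# OTHER_EVAL_TYPES are the downstream property and geometry evaluations that each
# get their own leaderboard tab below.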


class LeaderboardData:
    """
    Manages leaderboard data loading and processing.
    """

    def __init__(self):
        self._setup_data_paths()
        self._load_contact_info()
        self._eval_results = None
        self._results_dfs = None

    def _setup_data_paths(self):
        """
        Set up target and result file paths.
        """
        target_data_dir = snapshot_download(
            repo_id=INTERNAL_DATA_DATASET,
            repo_type="dataset",
            token=TOKEN,
        )
        self.target_paths = {
            "Validation": f"{target_data_dir}/omol_val_labels.npz",
            "Test": f"{target_data_dir}/omol_test_labels.npz",
"Distance Scaling": f"{target_data_dir}/distance_scaling_labels.json", | |
"Ligand pocket": f"{target_data_dir}/ligand_pocket_labels.json", | |
"Ligand strain": f"{target_data_dir}/ligand_strain_labels.json", | |
"Conformers": f"{target_data_dir}/geom_conformers_labels.json", | |
"Protonation": f"{target_data_dir}/protonation_energies_labels.json", | |
"IE_EA": f"{target_data_dir}/ieea_labels.json", | |
"Distance scaling": f"{target_data_dir}/distance_scaling_labels.json", | |
"Spin gap": f"{target_data_dir}/spingap_labels.json", | |
} | |
        self.result_paths = {
            "Validation": "validation_s2ef.parquet",
            "Test": "test_s2ef.parquet",
            "Ligand pocket": "ligand_pocket.parquet",
            "Ligand strain": "ligand_strain.parquet",
            "Conformers": "geom_conformers.parquet",
            "Protonation": "protonation.parquet",
            "IE_EA": "ie_ea.parquet",
            "Distance scaling": "distance_scaling.parquet",
            "Spin gap": "spin_gap.parquet",
        }
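        # The result parquet files live under "data/" in RESULTS_DATASET; the names
        # above are reused by load_eval_data() and add_new_eval() when reading and
        # writing results.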

    def _load_contact_info(self):
        """
        Load contact information dataset.
        """
        self.contact_infos = load_dataset(
            CONTACT_DATASET,
            token=TOKEN,
            download_mode="force_redownload",
            verification_mode=VerificationMode.NO_CHECKS,
        )

    def load_eval_data(self) -> Tuple[Dict, Dict[str, pd.DataFrame]]:
        """
        Load all evaluation data and return results and dataframes.
        """
        if self._eval_results is not None and self._results_dfs is not None:
            return self._eval_results, self._results_dfs

        # Load S2EF results
        s2ef_results = load_dataset(
            RESULTS_DATASET,
            token=TOKEN,
            download_mode="force_redownload",
            verification_mode=VerificationMode.NO_CHECKS,
            data_files={
                "Validation": os.path.join("data", self.result_paths["Validation"]),
                "Test": os.path.join("data", self.result_paths["Test"]),
            },
        )
        eval_results = dict(s2ef_results)

        # Load other evaluation types
        for eval_type in OTHER_EVAL_TYPES:
            eval_type_data = load_dataset(
                RESULTS_DATASET,
                token=TOKEN,
                download_mode="force_redownload",
                verification_mode=VerificationMode.NO_CHECKS,
                data_files={"data": os.path.join("data", self.result_paths[eval_type])},
            )
            eval_results[eval_type] = eval_type_data["data"]

        # Generate result dataframes
        results_dfs = {}

        # S2EF dataframes
        for split in ["Validation", "Test"]:
            for subsplit in S2EF_SUBSPLITS:
                df_key = f"{split}_{subsplit}"
                results_dfs[df_key] = self._get_s2ef_df_from_results(
                    eval_results, split, subsplit
                )

        # Other evaluation dataframes
        for split in OTHER_EVAL_TYPES:
            results_dfs[split] = self._get_eval_df_from_results(eval_results, split)

        # Cache the results
        self._eval_results = eval_results
        self._results_dfs = results_dfs
        return eval_results, results_dfs
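
    # results_dfs is keyed "Validation_<subsplit>" / "Test_<subsplit>" for the S2EF
    # splits and by the plain eval-type name (e.g. "Ligand pocket") for the rest.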

    def _get_s2ef_df_from_results(
        self, eval_results: Dict, split: str, subsplit: str
    ) -> pd.DataFrame:
        """
        Generate S2EF dataframe from evaluation results.
        """
        local_df = eval_results[split]
        local_df = local_df.map(
            lambda row: {
                "Model": model_hyperlink(
                    row["model_url"], row["paper_url"], row["Model"]
                )
            }
        )
        filtered_columns = (
            PRE_COLUMN_NAMES
            + [f"{subsplit}_energy_mae", f"{subsplit}_forces_mae"]
            + POST_COLUMN_NAMES
        )
        df = pd.DataFrame(local_df)
        avail_columns = list(df.columns)
        missing_columns = list(set(filtered_columns) - set(avail_columns))
        df[missing_columns] = ""
        df = df[filtered_columns]

        # Unit conversion: eV -> meV (and eV/Å -> meV/Å)
        for col in df.columns:
            if "mae" in col.lower():
                df[col] = df[col] * 1000
        df = df.sort_values(by=[f"{subsplit}_energy_mae"], ascending=True)
df[f"{subsplit}_energy_mae"] = df[f"{subsplit}_energy_mae"] | |
df[f"{subsplit}_forces_mae"] = df[f"{subsplit}_forces_mae"] | |
        df = df.rename(
            columns={
                f"{subsplit}_energy_mae": "Energy MAE\n[meV]",
                f"{subsplit}_forces_mae": "Forces MAE\n[meV/Å]",
                "Energy Conserving": "Energy\nConserving",
            }
        )
        return df

    def _get_eval_df_from_results(self, eval_results: Dict, split: str) -> pd.DataFrame:
        """
        Generate evaluation dataframe from results.
        """
        local_df = eval_results[split]
        local_df = local_df.map(
            lambda row: {
                "Model": model_hyperlink(
                    row["model_url"], row["paper_url"], row["Model"]
                )
            }
        )
        eval_columns = LEADERBOARD_COLUMNS[split]
        filtered_columns = PRE_COLUMN_NAMES + eval_columns + POST_COLUMN_NAMES
        df = pd.DataFrame(local_df)

        # Only show results submitted after 09/2025; earlier (v1) rows are kept in
        # the dataset for record keeping
        df = df[df["Submission date"] > "2025-09"]
        avail_columns = list(df.columns)
        missing_columns = list(set(filtered_columns) - set(avail_columns))
        df[missing_columns] = ""
        df = df[filtered_columns]

        # Unit conversion: eV -> meV (and eV/Å -> meV/Å)
        for col in df.columns:
            if "mae" in col.lower():
                df[col] = df[col] * 1000
        df = df.sort_values(by=[eval_columns[0]], ascending=True)
        df = df.rename(columns=COLUMN_MAPPING)
        return df


leaderboard_data = LeaderboardData()

# Column configurations for different evaluation types
LEADERBOARD_COLUMNS = {
    "Ligand pocket": ["interaction_energy_mae", "interaction_forces_mae"],
    "Ligand strain": ["strain_energy_mae", "global_min_rmsd"],
    "Conformers": ["deltaE_mae", "ensemble_rmsd"],
    "Protonation": ["deltaE_mae", "rmsd"],
    "Distance scaling": ["lr_ddE_mae", "lr_ddF_mae", "sr_ddE_mae", "sr_ddF_mae"],
    "IE_EA": ["deltaE_mae", "deltaF_mae"],
    "Spin gap": ["deltaE_mae", "deltaF_mae"],
}

COLUMN_MAPPING = {
    "interaction_energy_mae": "Ixn Energy\nMAE [meV]",
    "interaction_forces_mae": "Ixn Forces\nMAE [meV/Å]",
    "strain_energy_mae": "Strain Energy\nMAE [meV]",
    "deltaE_mae": "\u0394Energy MAE\n[meV]",
    "deltaF_mae": "\u0394Forces MAE\n[meV/Å]",
    "ensemble_rmsd": "RMSD [Å]",
    "global_min_rmsd": "RMSD [Å]",
    "rmsd": "RMSD [Å]",
    "lr_ddE_mae": "\u0394Energy (LR)\n MAE [meV]",
    "lr_ddF_mae": "\u0394Forces (LR)\n MAE [meV/Å]",
    "sr_ddE_mae": "\u0394Energy (SR)\n MAE [meV]",
    "sr_ddF_mae": "\u0394Forces (SR)\n MAE [meV/Å]",
    "Energy Conserving": "Energy\nConserving",
}
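# Example of how these combine: for "Ligand pocket" the displayed metric columns are
# "Ixn Energy\nMAE [meV]" and "Ixn Forces\nMAE [meV/Å]", sandwiched between
# PRE_COLUMN_NAMES and POST_COLUMN_NAMES (both defined in content.py).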


def add_new_eval(
    path_to_file: str,
    eval_type: str,
    organization: str,
    model: str,
    model_url: str,
    paper_url: str,
    energy_conserving: bool,
    mail: str,
    training_set: str,
    additional_info: str,
    profile: gr.OAuthProfile,
) -> str:
    """Add a new evaluation to the leaderboard, yielding status messages for the UI."""
    print(f"Adding new eval of type: {eval_type}")
    try:
        # Validate email address
        _, parsed_mail = parseaddr(mail)
        if "@" not in parsed_mail:
            yield "⚠️ Please provide a valid email address."
            return

        # Check monthly submission limit (5 submissions per month)
        contact_key = eval_type.replace(" ", "_")
        user_submission_dates = sorted(
            row["date"]
            for row in leaderboard_data.contact_infos.get(contact_key, [])
            if row["username"] == profile.username
        )
        current_month = datetime.now().strftime("%Y-%m")
        current_month_submissions = [
            date for date in user_submission_dates if date.startswith(current_month)
        ]
        if len(current_month_submissions) >= 5:
yield f"⚠️ You have reached the monthly submission limit of 5 submissions. Please try again next month." | |
            return

        # Validate file submission
        if path_to_file is None:
            yield "⚠️ Please upload a file."
            return
        if not (path_to_file.endswith(".npz") or path_to_file.endswith(".json")):
            yield "⚠️ Please submit a valid .npz or .json file."
            return
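
        # The expected prediction formats appear to mirror the target labels above:
        # .npz for the S2EF Validation/Test splits and .json for the other
        # evaluations (an assumption based on the target file extensions).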

        # Evaluate the submission
        yield "⚙️ Evaluating your submission...(do not close/refresh this page!)"
        metrics = evaluate(
            leaderboard_data.target_paths[eval_type],
            path_to_file,
            eval_type,
        )
        submission_time = datetime.today().strftime("%Y-%m-%d-%H:%M")

        # Upload submission file
        yield "☁️ Uploading submission file..."
        api.upload_file(
            repo_id=SUBMISSION_DATASET,
            path_or_fileobj=path_to_file,
            path_in_repo=f"{organization}/{model}/submissions/{training_set}/{contact_key}_{submission_time}_{os.path.basename(path_to_file)}",
            repo_type="dataset",
            token=TOKEN,
        )

        # Update leaderboard data
        yield "📋 Updating leaderboard data..."
        eval_results, _ = leaderboard_data.load_eval_data()
        eval_entry = {
            "Model": model,
            "Organization": organization,
            "Submission date": submission_time,
            "Training Set": training_set,
            "Energy Conserving": energy_conserving,
            "model_url": model_url,
            "paper_url": paper_url,
            "Notes": additional_info,
        }
        eval_entry.update(metrics)
        if eval_type not in eval_results:
            eval_results[eval_type] = Dataset.from_dict(
                {k: [v] for k, v in eval_entry.items()}
            )
        else:
            eval_results[eval_type] = eval_results[eval_type].add_item(eval_entry)
        data_file_name = leaderboard_data.result_paths[eval_type]

        # Upload results
        yield "💾 Saving results to database..."
        with tempfile.NamedTemporaryFile(suffix=".parquet") as tmp_file:
            eval_results[eval_type].to_parquet(tmp_file.name)
            api.upload_file(
                repo_id=RESULTS_DATASET,
                path_or_fileobj=tmp_file.name,
                path_in_repo=f"data/{data_file_name}",
                repo_type="dataset",
                token=TOKEN,
            )

        # Save contact information
        contact_info = {
            "model": model,
            "organization": organization,
            "username": profile.username,
            "email": mail,
            "date": submission_time,
        }
        if contact_key not in leaderboard_data.contact_infos:
            leaderboard_data.contact_infos[contact_key] = Dataset.from_dict(
                {k: [v] for k, v in contact_info.items()}
            )
        else:
            leaderboard_data.contact_infos[contact_key] = (
                leaderboard_data.contact_infos[contact_key].add_item(contact_info)
            )
        leaderboard_data.contact_infos.push_to_hub(CONTACT_DATASET, token=TOKEN)

        success_str = (
            f"✅ Model {model} is successfully evaluated and stored in our database.\n"
            "Please wait an hour and refresh the leaderboard to see your results displayed."
        )
        yield success_str
    except Exception as e:
        print(f"Error during submission: {e}")
        yield (
            f"An error occurred, please open a discussion/issue if you continue to have submission issues.\n{e}"
        )


def transform_time(date_str):
    """Trim a submission timestamp to its date, e.g. "2025-09-18-14:32" -> "2025-09-18"."""
    dt = datetime.strptime(date_str, "%Y-%m-%d-%H:%M")
    return dt.strftime("%Y-%m-%d")


def create_dataframe_tab(
    tab_name: str,
    df: pd.DataFrame,
    datatype: Optional[List[str]] = None,
    widths: Optional[List[str]] = None,
) -> gr.Tab:
    """
    Create a tab with a dataframe.
    """
    if datatype is None:
        datatype = TYPES
    if widths is None:
        num_cols = len(df.columns)
        fixed_cols = len(PRE_COLUMN_NAMES) + len(POST_COLUMN_NAMES)
        # Model | Organization | Energy Conserving | Training Set | Metrics | date
        widths = ["10%", "5%", "5%", "5%"] + ["5%"] * (num_cols - fixed_cols) + ["10%"]
    if "Submission date" in df.columns:
        df["Submission date"] = df["Submission date"].apply(transform_time)
    cm = sns.color_palette("viridis_r", as_cmap=True)
    df = df.style.format(precision=2).background_gradient(cmap=cm)
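    # gr.Dataframe can render a pandas Styler, so the two-decimal formatting and the
    # viridis background gradient above should carry through to the displayed table
    # (this relies on Styler support in recent Gradio versions).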
    with gr.Tab(tab_name) as tab:
        gr.Dataframe(
            value=df,
            datatype=datatype,
            interactive=False,
            show_search="filter",
            column_widths=widths,
            show_copy_button=True,
        )
    return tab


def create_s2ef_tabs(split: str, results_dfs: Dict[str, pd.DataFrame]) -> None:
    """
    Create S2EF tabs for a given split (Validation/Test).
    """
    subsplit_names = {
        "all": "All",
        "biomolecules": "Biomolecules",
        "electrolytes": "Electrolytes",
        "metal_complexes": "Metal Complexes",
        "neutral_organics": "Neutral Organics",
    }
    for subsplit, display_name in subsplit_names.items():
        df_key = f"{split}_{subsplit}"
        create_dataframe_tab(display_name, results_dfs[df_key])


def create_evaluation_tabs(results_dfs: Dict[str, pd.DataFrame]) -> None:
    """
    Create evaluation tabs for non-S2EF evaluations, including Overview tab.
    """
    # Create Overview tab first
    overview_df = create_overview_dataframe(results_dfs)
    n_overview_columns = len(overview_df.columns)
    create_dataframe_tab(
        "Overview", overview_df, widths=["15%"] + ["10%"] * (n_overview_columns - 1)
    )

    # Create individual evaluation tabs
    for eval_type in OTHER_EVAL_TYPES:
        display_name = "IE/EA" if eval_type == "IE_EA" else eval_type
        create_dataframe_tab(display_name, results_dfs[eval_type])


def create_overview_dataframe(results_dfs: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """
    Create an overview dataframe combining all models with only the first metric
    from each eval type.
    """
    model_info = {}
    for eval_type, df in results_dfs.items():
        if eval_type.startswith("Validation_") or eval_type.startswith("Test_"):
            continue
        for _, row in df.iterrows():
            model_name = row["Model"]
            dataset = row["Training Set"]
            model_entry = (model_name, dataset)
            model_info[model_entry] = {
                "Model": model_name,
                "Organization": row.get("Organization", ""),
                "Energy Conserving": row.get("Energy\nConserving", ""),
                "Training Set": dataset,
            }

    overview_data = {
        "Model": [],
        "Organization": [],
        "Energy\nConserving": [],
        "Training Set": [],
    }
    metric_columns = {}
    for eval_type in OTHER_EVAL_TYPES:
        if eval_type in results_dfs and eval_type in LEADERBOARD_COLUMNS:
            metric_display_name = COLUMN_MAPPING[LEADERBOARD_COLUMNS[eval_type][0]]
            task_display_name = "IE/EA" if eval_type == "IE_EA" else eval_type
            full_display_name = f"{task_display_name}\n{metric_display_name}"
            overview_data[full_display_name] = []
            metric_columns[full_display_name] = (eval_type, metric_display_name)

    all_model_entries = model_info.keys()
    model_rankings = defaultdict(list)
    for model_entry in sorted(all_model_entries, key=lambda x: (x[0], x[1])):
        model_name, dataset = model_entry
        entry_info = model_info[model_entry]
        overview_data["Model"].append(entry_info["Model"])
        overview_data["Organization"].append(entry_info["Organization"])
        overview_data["Energy\nConserving"].append(entry_info["Energy Conserving"])
        overview_data["Training Set"].append(entry_info["Training Set"])

        # Fill in metrics for each column
        for display_col, (eval_type, source_col) in metric_columns.items():
            if eval_type in results_dfs:
                df = results_dfs[eval_type].reset_index(drop=True)
                # Match both model name and training set
                model_row = df[
                    (df["Model"] == model_name) & (df["Training Set"] == dataset)
                ]
                if not model_row.empty and source_col in model_row.columns:
                    value = model_row.iloc[0][source_col]
                    rank = model_row.index[0]
                else:
                    value = np.nan
                    rank = df.shape[0]
                overview_data[display_col].append(value)
                model_rankings[model_entry].append(rank)
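
    # Each per-eval dataframe is already sorted ascending by its primary metric, so a
    # row's positional index doubles as its rank in that eval; models missing from an
    # eval are penalized with the worst possible rank (len(df)).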
    overview_df = pd.DataFrame(overview_data)

    def get_rank(row):
        model_name = row["Model"]
        dataset = row["Training Set"]
        rank = np.mean(model_rankings[(model_name, dataset)])
        return rank

    overview_df["overall_rank"] = overview_df.apply(get_rank, axis=1)
    overview_df = overview_df.sort_values(by="overall_rank").drop(
        columns=["overall_rank"]
    )
    return overview_df


def create_submission_interface() -> Tuple[gr.components.Component, ...]:
    """
    Create the submission interface components.
    """
    with gr.Accordion("Submit predictions for evaluation"):
        with gr.Row():
            gr.Markdown(SUBMISSION_TEXT, elem_classes="markdown-text")
        with gr.Row():
            with gr.Column():
                model_name_textbox = gr.Textbox(label="Model name")
                energy_conserving = gr.Checkbox(
                    label="Is the model energy conserving? (i.e. F= -dE/dx)"
                )
                model_url = gr.Textbox(label="Model/Checkpoint URL")
                paper_url = gr.Textbox(label="Paper URL")
                dataset = gr.Dropdown(
                    choices=["OMol-102M", "OMol-4M", "UMA-459M", "Other"],
                    label="Training set",
                    interactive=True,
                )
                additional_info = gr.Textbox(
                    label="Additional info (cutoff radius, # of params, etc.)"
                )
                organization = gr.Textbox(label="Organization")
                mail = gr.Textbox(
                    label="Contact email (will be stored privately, & used if there is an issue with your submission)"
                )
            with gr.Column():
                file_output = gr.File()
                with gr.Row():
                    eval_type = gr.Dropdown(
                        choices=ALL_EVAL_TYPES,
                        label="Eval Type",
                        interactive=True,
                    )
                with gr.Column():
                    gr.LoginButton()
                with gr.Column():
                    submit_button = gr.Button("Submit Eval")
                    submission_result = gr.Textbox(label="Status")
    return (
        submit_button,
        file_output,
        eval_type,
        organization,
        model_name_textbox,
        model_url,
        paper_url,
        energy_conserving,
        mail,
        dataset,
        additional_info,
        submission_result,
    )
def create_interface() -> gr.Blocks: | |
""" | |
Create the complete Gradio interface. | |
""" | |
# Load data | |
_, results_dfs = leaderboard_data.load_eval_data() | |
demo = gr.Blocks() | |
with demo: | |
gr.HTML(TITLE) | |
gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text") | |
# Citation section | |
with gr.Row(): | |
with gr.Accordion("📙 Citation", open=False): | |
gr.Markdown(CITATION_BUTTON_LABEL) | |
gr.Markdown(CITATION_BUTTON_TEXT) | |
# Evaluation results | |
gr.Markdown("## Evaluations", elem_classes="markdown-text") | |
with gr.Row(): | |
create_evaluation_tabs(results_dfs) | |
        gr.Markdown(
            "**Overview rankings are based on the average rank across all evaluations",
            elem_classes="markdown-text",
        )

        # S2EF Results tabs
        gr.Markdown("## S2EF", elem_classes="markdown-text")
        with gr.Tab("Test"):
            create_s2ef_tabs("Test", results_dfs)
        with gr.Tab("Validation"):
            create_s2ef_tabs("Validation", results_dfs)

        (
            submit_button,
            file_output,
            eval_type,
            organization,
            model_name_textbox,
            model_url,
            paper_url,
            energy_conserving,
            mail,
            dataset,
            additional_info,
            submission_result,
        ) = create_submission_interface()
        submit_button.click(
            add_new_eval,
            [
                file_output,
                eval_type,
                organization,
                model_name_textbox,
                model_url,
                paper_url,
                energy_conserving,
                mail,
                dataset,
                additional_info,
            ],
            submission_result,
        )
    return demo


def restart_space():
    api.restart_space(repo_id=LEADERBOARD_PATH, token=TOKEN)


def main():
    demo = create_interface()
    scheduler = BackgroundScheduler()
    scheduler.add_job(restart_space, "interval", seconds=3600)
    scheduler.start()
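    # Restarting the Space every hour forces a full reload, which is how newly
    # submitted results in RESULTS_DATASET eventually appear on the leaderboard
    # (hence the "wait an hour and refresh" note in the submission success message).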

    # Launch the demo
    demo.launch(debug=True)


if __name__ == "__main__":
    main()