import json
import os
import tempfile
from collections import defaultdict
from datetime import datetime
from email.utils import parseaddr
from typing import Dict, Iterator, List, Optional, Tuple

import gradio as gr
import numpy as np
import pandas as pd
import seaborn as sns
from apscheduler.schedulers.background import BackgroundScheduler
from datasets import Dataset, VerificationMode, load_dataset
from huggingface_hub import HfApi, snapshot_download

from content import (
    CITATION_BUTTON_LABEL,
    CITATION_BUTTON_TEXT,
    INTRODUCTION_TEXT,
    SUBMISSION_TEXT,
    PRE_COLUMN_NAMES,
    POST_COLUMN_NAMES,
    TITLE,
    TYPES,
    model_hyperlink,
)
from evaluator import evaluate

# Configuration constants
TOKEN = os.environ.get("TOKEN", None)
OWNER = "facebook"

# Dataset repositories
INTERNAL_DATA_DATASET = f"{OWNER}/fairchem_internal"
SUBMISSION_DATASET = f"{OWNER}/fairchem_leaderboard_submissions"
RESULTS_DATASET = f"{OWNER}/fairchem_leaderboard_results"
CONTACT_DATASET = f"{OWNER}/fairchem_leaderboard_contact_info_internal"
LEADERBOARD_PATH = f"{OWNER}/fairchem_leaderboard"

# Maximum number of submissions one user may make per eval type in a
# calendar month (enforced in add_new_eval).
MONTHLY_SUBMISSION_LIMIT = 5

# Initialize HuggingFace API
api = HfApi()

# S2EF subsplits for validation and test data
S2EF_SUBSPLITS = [
    "all",
    "biomolecules",
    "electrolytes",
    "metal_complexes",
    "neutral_organics",
]

# Evaluation types that are not S2EF
OTHER_EVAL_TYPES = [
    "Ligand pocket",
    "Ligand strain",
    "Conformers",
    "Protonation",
    "Distance scaling",
    "IE_EA",
    "Spin gap",
]

# All evaluation types for the dropdown
ALL_EVAL_TYPES = ["Validation", "Test"] + OTHER_EVAL_TYPES

# Column configurations for different evaluation types. The first entry of
# each list is the metric the table is sorted by and the one shown in the
# Overview tab.
LEADERBOARD_COLUMNS = {
    "Ligand pocket": ["interaction_energy_mae", "interaction_forces_mae"],
    "Ligand strain": ["strain_energy_mae", "global_min_rmsd"],
    "Conformers": ["deltaE_mae", "ensemble_rmsd"],
    "Protonation": ["deltaE_mae", "rmsd"],
    "Distance scaling": ["lr_ddE_mae", "lr_ddF_mae", "sr_ddE_mae", "sr_ddF_mae"],
    "IE_EA": ["deltaE_mae", "deltaF_mae"],
    "Spin gap": ["deltaE_mae", "deltaF_mae"],
}

# Raw metric column name -> header shown in the leaderboard tables.
COLUMN_MAPPING = {
    "interaction_energy_mae": "Ixn Energy\nMAE [meV]",
    "interaction_forces_mae": "Ixn Forces\nMAE [meV/Å]",
    "strain_energy_mae": "Strain Energy\nMAE [meV]",
    "deltaE_mae": "\u0394Energy MAE\n[meV]",
    "deltaF_mae": "\u0394Forces MAE\n[meV/Å]",
    "ensemble_rmsd": "RMSD [Å]",
    "global_min_rmsd": "RMSD [Å]",
    "rmsd": "RMSD [Å]",
    "lr_ddE_mae": "\u0394Energy (LR)\n MAE [meV]",
    "lr_ddF_mae": "\u0394Forces (LR)\n MAE [meV/Å]",
    "sr_ddE_mae": "\u0394Energy (SR)\n MAE [meV]",
    "sr_ddF_mae": "\u0394Forces (SR)\n MAE [meV/Å]",
    "Energy Conserving": "Energy\nConserving",
}


class LeaderboardData:
    """
    Manages leaderboard data loading and processing.

    Constructing an instance downloads the label snapshot and loads the
    contact-info dataset. Evaluation results are loaded on the first call to
    ``load_eval_data`` and cached on the instance afterwards.
    """

    def __init__(self):
        self._setup_data_paths()
        self._load_contact_info()
        # Filled in by load_eval_data() on first use.
        self._eval_results = None
        self._results_dfs = None

    def _setup_data_paths(self):
        """
        Setup target and result file paths.

        ``target_paths`` maps an eval type to its label file inside the
        downloaded snapshot; ``result_paths`` maps an eval type to the
        parquet file name used in the results dataset.
        """
        target_data_dir = snapshot_download(
            repo_id=INTERNAL_DATA_DATASET,
            repo_type="dataset",
            token=TOKEN,
        )

        self.target_paths = {
            "Validation": f"{target_data_dir}/omol_val_labels.npz",
            "Test": f"{target_data_dir}/omol_test_labels.npz",
            # Same file as "Distance scaling" below under a different
            # capitalisation; kept so lookups with either spelling work.
            "Distance Scaling": f"{target_data_dir}/distance_scaling_labels.json",
            "Ligand pocket": f"{target_data_dir}/ligand_pocket_labels.json",
            "Ligand strain": f"{target_data_dir}/ligand_strain_labels.json",
            "Conformers": f"{target_data_dir}/geom_conformers_labels.json",
            "Protonation": f"{target_data_dir}/protonation_energies_labels.json",
            "IE_EA": f"{target_data_dir}/ieea_labels.json",
            "Distance scaling": f"{target_data_dir}/distance_scaling_labels.json",
            "Spin gap": f"{target_data_dir}/spingap_labels.json",
        }

        self.result_paths = {
            "Validation": "validation_s2ef.parquet",
            "Test": "test_s2ef.parquet",
            "Ligand pocket": "ligand_pocket.parquet",
            "Ligand strain": "ligand_strain.parquet",
            "Conformers": "geom_conformers.parquet",
            "Protonation": "protonation.parquet",
            "IE_EA": "ie_ea.parquet",
            "Distance scaling": "distance_scaling.parquet",
            "Spin gap": "spin_gap.parquet",
        }

    def _load_contact_info(self):
        """
        Load contact information dataset.
        """
        self.contact_infos = load_dataset(
            CONTACT_DATASET,
            token=TOKEN,
            download_mode="force_redownload",
            verification_mode=VerificationMode.NO_CHECKS,
        )

    def load_eval_data(self) -> Tuple[Dict, Dict[str, pd.DataFrame]]:
        """
        Load all evaluation data and return results and dataframes.

        Returns:
            A pair ``(eval_results, results_dfs)``. ``eval_results`` maps an
            eval type to its loaded dataset. ``results_dfs`` maps
            ``"<split>_<subsplit>"`` (S2EF) or the eval type name (other
            evaluations) to a display-ready DataFrame. Both are cached, so
            later calls return the same objects.
        """
        if self._eval_results is not None and self._results_dfs is not None:
            return self._eval_results, self._results_dfs

        # Load S2EF results
        s2ef_results = load_dataset(
            RESULTS_DATASET,
            token=TOKEN,
            download_mode="force_redownload",
            verification_mode=VerificationMode.NO_CHECKS,
            data_files={
                "Validation": os.path.join("data", self.result_paths["Validation"]),
                "Test": os.path.join("data", self.result_paths["Test"]),
            },
        )
        eval_results = dict(s2ef_results)

        # Load other evaluation types
        for eval_type in OTHER_EVAL_TYPES:
            eval_type_data = load_dataset(
                RESULTS_DATASET,
                token=TOKEN,
                download_mode="force_redownload",
                verification_mode=VerificationMode.NO_CHECKS,
                data_files={
                    "data": os.path.join("data", self.result_paths[eval_type])
                },
            )
            eval_results[eval_type] = eval_type_data["data"]

        # Generate result dataframes
        results_dfs = {}

        # S2EF dataframes
        for split in ["Validation", "Test"]:
            for subsplit in S2EF_SUBSPLITS:
                df_key = f"{split}_{subsplit}"
                results_dfs[df_key] = self._get_s2ef_df_from_results(
                    eval_results, split, subsplit
                )

        # Other evaluation dataframes
        for split in OTHER_EVAL_TYPES:
            results_dfs[split] = self._get_eval_df_from_results(eval_results, split)

        # Cache the results
        self._eval_results = eval_results
        self._results_dfs = results_dfs

        return eval_results, results_dfs

    @staticmethod
    def _with_model_links(results):
        """Return ``results`` with "Model" replaced by ``model_hyperlink`` output."""
        return results.map(
            lambda row: {
                "Model": model_hyperlink(
                    row["model_url"], row["paper_url"], row["Model"]
                )
            }
        )

    @staticmethod
    def _select_display_columns(df: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
        """
        Return a new frame holding exactly ``columns``, with MAE columns scaled.

        Columns absent from ``df`` are created and filled with empty strings.
        Every column whose name contains "mae" (case-insensitive) is
        multiplied by 1000; the display headers label these values in meV,
        so the stored values are presumably in eV -- TODO confirm.
        """
        # Work on copies so the caller's frame (possibly a filtered slice) is
        # never written to.
        df = df.copy()
        for column in columns:
            if column not in df.columns:
                df[column] = ""
        df = df[columns].copy()

        # Unit conversion
        for column in df.columns:
            if "mae" in column.lower():
                df[column] = df[column] * 1000
        return df

    def _get_s2ef_df_from_results(
        self, eval_results: Dict, split: str, subsplit: str
    ) -> pd.DataFrame:
        """
        Generate S2EF dataframe from evaluation results.

        Args:
            eval_results: Mapping from eval type to loaded dataset.
            split: Key into ``eval_results`` ("Validation" or "Test").
            subsplit: Prefix of the ``<subsplit>_energy_mae`` and
                ``<subsplit>_forces_mae`` columns to show.

        Returns:
            DataFrame sorted by ascending energy MAE, with display headers.
        """
        energy_col = f"{subsplit}_energy_mae"
        forces_col = f"{subsplit}_forces_mae"

        df = pd.DataFrame(self._with_model_links(eval_results[split]))
        df = self._select_display_columns(
            df, PRE_COLUMN_NAMES + [energy_col, forces_col] + POST_COLUMN_NAMES
        )
        df = df.sort_values(by=[energy_col], ascending=True)
        return df.rename(
            columns={
                energy_col: "Energy MAE\n[meV]",
                forces_col: "Forces MAE\n[meV/Å]",
                "Energy Conserving": "Energy\nConserving",
            }
        )

    def _get_eval_df_from_results(self, eval_results: Dict, split: str) -> pd.DataFrame:
        """
        Generate evaluation dataframe from results.

        Args:
            eval_results: Mapping from eval type to loaded dataset.
            split: Eval type name; must be a key of ``LEADERBOARD_COLUMNS``.

        Returns:
            DataFrame sorted ascending by the first metric listed for
            ``split``, with headers renamed through ``COLUMN_MAPPING``.
        """
        eval_columns = LEADERBOARD_COLUMNS[split]

        df = pd.DataFrame(self._with_model_links(eval_results[split]))
        # Only show submissions dated 2025-09 or later (plain string
        # comparison on the date text); older rows stay in the stored
        # dataset for record keeping.
        df = df[df["Submission date"] > "2025-09"]
        df = self._select_display_columns(
            df, PRE_COLUMN_NAMES + eval_columns + POST_COLUMN_NAMES
        )
        df = df.sort_values(by=[eval_columns[0]], ascending=True)
        return df.rename(columns=COLUMN_MAPPING)


leaderboard_data = LeaderboardData()


def add_new_eval(
    path_to_file: str,
    eval_type: str,
    organization: str,
    model: str,
    model_url: str,
    paper_url: str,
    energy_conserving: bool,
    mail: str,
    training_set: str,
    additional_info: str,
    profile: gr.OAuthProfile,
) -> Iterator[str]:
    """
    Add a new evaluation to the leaderboard.

    This is a generator: each yielded string is a status message for the
    status textbox it is wired to. Validation failures yield one warning
    and stop; any exception is caught, logged and reported as a final
    message instead of propagating.

    Args:
        path_to_file: Path of the uploaded prediction file (.npz or .json).
        eval_type: One of the eval type names in ``ALL_EVAL_TYPES``.
        organization: Submitting organization; part of the upload path.
        model: Model name; part of the upload path.
        model_url: Model/checkpoint URL stored with the result.
        paper_url: Paper URL stored with the result.
        energy_conserving: Stored as the "Energy Conserving" flag.
        mail: Contact email; must contain "@" after parsing.
        training_set: Training set label stored with the result.
        additional_info: Free-form notes stored with the result.
        profile: Logged-in user's OAuth profile. It is not in the click
            inputs list; Gradio supplies it from the parameter annotation.

    Yields:
        Progress, warning, success or error messages.
    """
    print(f"Adding new eval of type: {eval_type}")
    try:
        # Validate email address
        _, parsed_mail = parseaddr(mail)
        if "@" not in parsed_mail:
            yield "⚠️ Please provide a valid email address."
            return

        # An unselected dropdown arrives as None; reject it (and any unknown
        # type) up front instead of failing later with an opaque error.
        if eval_type not in leaderboard_data.result_paths:
            yield "⚠️ Please select an evaluation type."
            return

        # Check monthly submission limit
        contact_key = eval_type.replace(" ", "_")
        user_submission_dates = sorted(
            row["date"]
            for row in leaderboard_data.contact_infos.get(contact_key, [])
            if row["username"] == profile.username
        )
        current_month = datetime.now().strftime("%Y-%m")
        current_month_submissions = [
            date for date in user_submission_dates if date.startswith(current_month)
        ]
        if len(current_month_submissions) >= MONTHLY_SUBMISSION_LIMIT:
            yield (
                "⚠️ You have reached the monthly submission limit of "
                f"{MONTHLY_SUBMISSION_LIMIT} submissions. "
                "Please try again next month."
            )
            return

        # Validate file submission
        if path_to_file is None:
            yield "⚠️ Please upload a file."
            return
        if not path_to_file.endswith((".npz", ".json")):
            yield "⚠️ Please submit a valid npz or json file"
            return

        # Evaluate the submission
        yield "⚙️ Evaluating your submission...(do not close/refresh this page!)"
        metrics = evaluate(
            leaderboard_data.target_paths[eval_type],
            path_to_file,
            eval_type,
        )
        submission_time = datetime.today().strftime("%Y-%m-%d-%H:%M")

        # Upload submission file
        yield "☁️ Uploading submission file..."
        api.upload_file(
            repo_id=SUBMISSION_DATASET,
            path_or_fileobj=path_to_file,
            path_in_repo=f"{organization}/{model}/submissions/{training_set}/{contact_key}_{submission_time}_{os.path.basename(path_to_file)}",
            repo_type="dataset",
            token=TOKEN,
        )

        # Update leaderboard data
        yield "📋 Updating leaderboard data..."
        eval_results, _ = leaderboard_data.load_eval_data()
        eval_entry = {
            "Model": model,
            "Organization": organization,
            "Submission date": submission_time,
            "Training Set": training_set,
            "Energy Conserving": energy_conserving,
            "model_url": model_url,
            "paper_url": paper_url,
            "Notes": additional_info,
        }
        eval_entry.update(metrics)

        if eval_type not in eval_results:
            eval_results[eval_type] = Dataset.from_dict(
                {k: [v] for k, v in eval_entry.items()}
            )
        else:
            eval_results[eval_type] = eval_results[eval_type].add_item(eval_entry)

        data_file_name = leaderboard_data.result_paths[eval_type]

        # Upload results
        yield "💾 Saving results to database..."
        with tempfile.NamedTemporaryFile(suffix=".parquet") as tmp_file:
            eval_results[eval_type].to_parquet(tmp_file.name)
            api.upload_file(
                repo_id=RESULTS_DATASET,
                path_or_fileobj=tmp_file.name,
                path_in_repo=f"data/{data_file_name}",
                repo_type="dataset",
                token=TOKEN,
            )

        # Save contact information
        contact_info = {
            "model": model,
            "organization": organization,
            "username": profile.username,
            "email": mail,
            "date": submission_time,
        }
        if contact_key not in leaderboard_data.contact_infos:
            leaderboard_data.contact_infos[contact_key] = Dataset.from_dict(
                {k: [v] for k, v in contact_info.items()}
            )
        else:
            leaderboard_data.contact_infos[contact_key] = (
                leaderboard_data.contact_infos[contact_key].add_item(contact_info)
            )

        leaderboard_data.contact_infos.push_to_hub(CONTACT_DATASET, token=TOKEN)

        success_str = f"✅ Model {model} is successfully evaluated and stored in our database.\nPlease wait an hour and refresh the leaderboard to see your results displayed."
        yield success_str

    except Exception as e:
        # Top-level boundary of the UI callback: report the failure to the
        # user instead of letting the handler crash.
        print(f"Error during submission: {e}")
        yield (
            f"An error occurred, please open a discussion/issue if you continue to have submission issues.\n{e}"
        )


def transform_time(date_str: str) -> str:
    """Convert a "%Y-%m-%d-%H:%M" timestamp string to its "%Y-%m-%d" date."""
    dt = datetime.strptime(date_str, "%Y-%m-%d-%H:%M")
    return dt.strftime("%Y-%m-%d")


def create_dataframe_tab(
    tab_name: str,
    df: pd.DataFrame,
    datatype: Optional[List[str]] = None,
    widths: Optional[List[str]] = None,
) -> gr.Tab:
    """
    Create a tab with a dataframe.

    Args:
        tab_name: Label of the tab.
        df: Table to display; it is not modified.
        datatype: Per-column Gradio datatypes; defaults to ``TYPES``.
        widths: Per-column widths; by default derived from the number of
            metric columns.

    Returns:
        The created ``gr.Tab``.
    """
    if datatype is None:
        datatype = TYPES

    if widths is None:
        num_cols = len(df.columns)
        fixed_cols = len(PRE_COLUMN_NAMES) + len(POST_COLUMN_NAMES)
        # Model | Organization |Energy Conserving | Training Set | Metrics | date
        widths = ["10%", "5%", "5%", "5%"] + ["5%"] * (num_cols - fixed_cols) + ["10%"]

    if "Submission date" in df.columns:
        # Copy first: the incoming frame is the cached leaderboard table, and
        # shortening its dates in place would make a second render fail in
        # transform_time on the already-shortened strings.
        df = df.copy()
        df["Submission date"] = df["Submission date"].apply(transform_time)

    cm = sns.color_palette("viridis_r", as_cmap=True)
    styled = df.style.format(precision=2).background_gradient(cmap=cm)

    with gr.Tab(tab_name) as tab:
        gr.Dataframe(
            value=styled,
            datatype=datatype,
            interactive=False,
            show_search="filter",
            column_widths=widths,
            show_copy_button=True,
        )
    return tab


def create_s2ef_tabs(split: str, results_dfs: Dict[str, pd.DataFrame]) -> None:
    """
    Create S2EF tabs for a given split (Validation/Test).

    One tab is created per subsplit, reading ``results_dfs["<split>_<subsplit>"]``.
    """
    subsplit_names = {
        "all": "All",
        "biomolecules": "Biomolecules",
        "electrolytes": "Electrolytes",
        "metal_complexes": "Metal Complexes",
        "neutral_organics": "Neutral Organics",
    }

    for subsplit, display_name in subsplit_names.items():
        df_key = f"{split}_{subsplit}"
        create_dataframe_tab(display_name, results_dfs[df_key])


def create_evaluation_tabs(results_dfs: Dict[str, pd.DataFrame]) -> None:
    """
    Create evaluation tabs for non-S2EF evaluations, including Overview tab.
    """
    # Create Overview tab first
    overview_df = create_overview_dataframe(results_dfs)
    n_overview_columns = len(overview_df.columns)
    create_dataframe_tab(
        "Overview", overview_df, widths=["15%"] + ["10%"] * (n_overview_columns - 1)
    )

    # Create individual evaluation tabs
    for eval_type in OTHER_EVAL_TYPES:
        display_name = "IE/EA" if eval_type == "IE_EA" else eval_type
        create_dataframe_tab(display_name, results_dfs[eval_type])


def create_overview_dataframe(results_dfs: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """
    Create an overview dataframe combining all models with only the first
    metric from each eval type.

    A row is one (model, training set) pair found in any non-S2EF table.
    Rows are ordered by the mean of the pair's row positions across the
    per-eval tables; a pair missing from a table is given that table's
    length as its position.
    """
    # Collect every (model, training set) pair from the non-S2EF tables.
    model_info = {}
    for eval_type, df in results_dfs.items():
        if eval_type.startswith(("Validation_", "Test_")):
            continue

        for _, row in df.iterrows():
            model_name = row["Model"]
            dataset = row["Training Set"]
            model_info[(model_name, dataset)] = {
                "Model": model_name,
                "Organization": row.get("Organization", ""),
                "Energy Conserving": row.get("Energy\nConserving", ""),
                "Training Set": dataset,
            }

    overview_data = {
        "Model": [],
        "Organization": [],
        "Energy\nConserving": [],
        "Training Set": [],
    }

    # Overview column header -> (eval type, column to read in that table)
    metric_columns = {}
    for eval_type in OTHER_EVAL_TYPES:
        if eval_type in results_dfs and eval_type in LEADERBOARD_COLUMNS:
            metric_display_name = COLUMN_MAPPING[LEADERBOARD_COLUMNS[eval_type][0]]
            task_display_name = "IE/EA" if eval_type == "IE_EA" else eval_type
            full_display_name = f"{task_display_name}\n{metric_display_name}"
            overview_data[full_display_name] = []
            metric_columns[full_display_name] = (eval_type, metric_display_name)

    # After reset_index the index label equals the row position, which is
    # used as the rank. Built once here rather than once per model.
    ranked_dfs = {
        eval_type: results_dfs[eval_type].reset_index(drop=True)
        for eval_type, _ in metric_columns.values()
    }

    overall_ranks = []
    for model_entry in sorted(model_info):
        model_name, dataset = model_entry
        entry_info = model_info[model_entry]

        overview_data["Model"].append(entry_info["Model"])
        overview_data["Organization"].append(entry_info["Organization"])
        overview_data["Energy\nConserving"].append(entry_info["Energy Conserving"])
        overview_data["Training Set"].append(entry_info["Training Set"])

        # Fill in metrics for each column
        ranks = []
        for display_col, (eval_type, source_col) in metric_columns.items():
            df = ranked_dfs[eval_type]
            # Match both model name and training set
            model_row = df[(df["Model"] == model_name) & (df["Training Set"] == dataset)]
            if not model_row.empty and source_col in model_row.columns:
                value = model_row.iloc[0][source_col]
                rank = model_row.index[0]
            else:
                value = np.nan
                rank = df.shape[0]
            overview_data[display_col].append(value)
            ranks.append(rank)

        overall_ranks.append(np.mean(ranks) if ranks else np.nan)

    overview_df = pd.DataFrame(overview_data)
    # overall_ranks was appended in the same order as the rows above.
    overview_df["overall_rank"] = overall_ranks
    overview_df = overview_df.sort_values(by="overall_rank").drop(
        columns=["overall_rank"]
    )
    return overview_df


def create_submission_interface() -> Tuple[gr.components.Component, ...]:
    """
    Create the submission interface components.

    Returns:
        The submit button, the input components in the order expected by
        ``add_new_eval``'s call site in ``create_interface``, and the status
        textbox.
    """
    with gr.Accordion("Submit predictions for evaluation"):
        with gr.Row():
            gr.Markdown(SUBMISSION_TEXT, elem_classes="markdown-text")
        with gr.Row():
            with gr.Column():
                model_name_textbox = gr.Textbox(label="Model name")
                energy_conserving = gr.Checkbox(
                    label="Is the model energy conserving? (i.e. F= -dE/dx)"
                )
                model_url = gr.Textbox(label="Model/Checkpoint URL")
                paper_url = gr.Textbox(label="Paper URL")
                dataset = gr.Dropdown(
                    choices=["OMol-102M", "OMol-4M", "UMA-459M", "Other"],
                    label="Training set",
                    interactive=True,
                )
                additional_info = gr.Textbox(
                    label="Additional info (cutoff radius, # of params, etc.)"
                )
                organization = gr.Textbox(label="Organization")
                mail = gr.Textbox(
                    label="Contact email (will be stored privately, & used if there is an issue with your submission)"
                )
            with gr.Column():
                file_output = gr.File()
                with gr.Row():
                    eval_type = gr.Dropdown(
                        choices=ALL_EVAL_TYPES,
                        label="Eval Type",
                        interactive=True,
                    )
                with gr.Column():
                    gr.LoginButton()
                with gr.Column():
                    submit_button = gr.Button("Submit Eval")

                submission_result = gr.Textbox(label="Status")

    return (
        submit_button,
        file_output,
        eval_type,
        organization,
        model_name_textbox,
        model_url,
        paper_url,
        energy_conserving,
        mail,
        dataset,
        additional_info,
        submission_result,
    )


def create_interface() -> gr.Blocks:
    """
    Create the complete Gradio interface.

    Builds the title, citation, evaluation tabs, S2EF tabs and the
    submission form, and wires the submit button to ``add_new_eval``.
    """
    # Load data
    _, results_dfs = leaderboard_data.load_eval_data()

    demo = gr.Blocks()
    with demo:
        gr.HTML(TITLE)
        gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")

        # Citation section
        with gr.Row():
            with gr.Accordion("📙 Citation", open=False):
                gr.Markdown(CITATION_BUTTON_LABEL)
                gr.Markdown(CITATION_BUTTON_TEXT)

        # Evaluation results
        gr.Markdown("## Evaluations", elem_classes="markdown-text")
        with gr.Row():
            create_evaluation_tabs(results_dfs)

        gr.Markdown(
            "**Overview rankings based on average rank across all evaluations",
            elem_classes="markdown-text",
        )

        # S2EF Results tabs
        gr.Markdown("## S2EF", elem_classes="markdown-text")
        with gr.Tab("Test"):
            create_s2ef_tabs("Test", results_dfs)

        with gr.Tab("Validation"):
            create_s2ef_tabs("Validation", results_dfs)

        (
            submit_button,
            file_output,
            eval_type,
            organization,
            model_name_textbox,
            model_url,
            paper_url,
            energy_conserving,
            mail,
            dataset,
            additional_info,
            submission_result,
        ) = create_submission_interface()

        # add_new_eval's trailing `profile` parameter is deliberately absent
        # from the inputs list below.
        submit_button.click(
            add_new_eval,
            [
                file_output,
                eval_type,
                organization,
                model_name_textbox,
                model_url,
                paper_url,
                energy_conserving,
                mail,
                dataset,
                additional_info,
            ],
            submission_result,
        )

    return demo


def restart_space():
    """Restart the leaderboard Space through the HuggingFace API."""
    api.restart_space(repo_id=LEADERBOARD_PATH, token=TOKEN)


def main():
    """Build the interface, schedule an hourly Space restart, and launch."""
    demo = create_interface()

    scheduler = BackgroundScheduler()
    scheduler.add_job(restart_space, "interval", seconds=3600)
    scheduler.start()

    # Launch the demo
    demo.launch(debug=True)


if __name__ == "__main__":
    main()