import os
import threading
import time
from datetime import datetime, timedelta, timezone

import gradio as gr
import pandas as pd
import requests
from datasets import Dataset, get_dataset_config_names
from datasets.exceptions import DatasetNotFoundError
from pandas.api.types import is_integer_dtype

from src.datamodel.data import F1Data
from src.display.formatting import styled_error, styled_message
from src.display.utils import ModelType
from src.envs import SUBMISSIONS_REPO, TOKEN
from src.logger import get_logger
from src.validation.validate import is_submission_file_valid, is_valid

logger = get_logger(__name__)

MIN_WAIT_TIME_PER_USER_HRS = 24
RATE_LIMIT_WINDOW_HRS = 24
MAX_SUBMISSIONS_PER_WINDOW = 10

submission_lock = threading.Lock()


def add_new_solutions(
    lbdb: F1Data,
    username: str,
    user_id: str,
    system_name: str,
    org: str,
    submission_path: str,
    is_warmup_dataset: bool,
    ensure_all_present: bool = False,
):
    with submission_lock:
        try:
            submitted_ids = get_dataset_config_names(SUBMISSIONS_REPO, token=TOKEN)
        except (DatasetNotFoundError, FileNotFoundError):
            submitted_ids = []
        if submitted_ids == ["default"]:  # An empty dataset reports a single "default" config.
            submitted_ids = []
        logger.info(f"Found {len(submitted_ids)} submissions")

        # Rate limits:
        # 1. Users must wait MIN_WAIT_TIME_PER_USER_HRS hours between submissions.
        # 2. No more than MAX_SUBMISSIONS_PER_WINDOW submissions overall in any
        #    RATE_LIMIT_WINDOW_HRS-hour window.
        sub_df = pd.DataFrame.from_dict(
            {
                "submission_id": submitted_ids,
                "user_id": map(_submission_id_to_user_id, submitted_ids),
                "timestamp": map(_submission_id_to_timestamp, submitted_ids),
            }
        )

        # Per-user limit
        now = datetime.now(timezone.utc)
        cutoff_user = now - timedelta(hours=MIN_WAIT_TIME_PER_USER_HRS)
        user_last_submission_ts = sub_df[sub_df.user_id == user_id].timestamp.max()
        if pd.notna(user_last_submission_ts) and user_last_submission_ts > cutoff_user:
            remaining_hrs = (user_last_submission_ts - cutoff_user).total_seconds() / 3600
            logger.info(f"{username} must wait {remaining_hrs:.2f} more hours.")
            return styled_error(
                f"You must wait {MIN_WAIT_TIME_PER_USER_HRS} hours between submissions. "
                f"Remaining wait time: {remaining_hrs:.2f} hours"
            )

        # Overall limit. Count only submissions inside the window; len() of the
        # boolean mask would count every submission ever made.
        cutoff_overall = now - timedelta(hours=RATE_LIMIT_WINDOW_HRS)
        recent_submissions = int((sub_df.timestamp > cutoff_overall).sum())
        if recent_submissions >= MAX_SUBMISSIONS_PER_WINDOW:
            logger.info(f"Too many submissions in the last {RATE_LIMIT_WINDOW_HRS} hours: {recent_submissions}.")
            return styled_error("The leaderboard has reached its submission capacity for now. Please try again later.")

        logger.info(f"Adding new submission: {system_name=}, {org=}, and {submission_path=}")

        # Double-check values that were already validated upstream.
        for val in [system_name, org]:
            assert is_valid(val)
        assert is_submission_file_valid(submission_path, is_warmup_dataset=is_warmup_dataset)

        try:
            submission_df = pd.read_json(submission_path, lines=True)
            if ensure_all_present:
                _validate_all_submissions_present(lbdb=lbdb, pd_ds=submission_df)
        except Exception:
            logger.warning("Failed to parse submission DF!", exc_info=True)
            # Use the same message as for external errors to avoid an infoleak.
            return styled_error("An error occurred. Please try again later.")

        submission_id = f"{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}_{username}_{user_id}"

    # Input looks good; create the evaluation entry.
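    # The id encodes timestamp, username, and user id, e.g. (hypothetical values)
    # "20240101_120000_alice_12345"; _submission_id_to_user_id and
    # _submission_id_to_timestamp below parse it back for rate limiting.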
logger.info(f"Adding new submission: {submission_id}") submission_ts = time.time_ns() def add_info(row): return { **row, "system_name": system_name, "organization": org, "submission_id": submission_id, "submission_ts": submission_ts, "evaluation_id": "", # This will be set later when the evaluation is launched in the backend "evaluation_start_ts": "", # This will be set when the evaluation starts } ds = Dataset.from_pandas(submission_df).map(add_info) with submission_lock: ds.push_to_hub( SUBMISSIONS_REPO, submission_id, private=True, ) return styled_message( "Your request has been submitted to the evaluation queue!\n" + "Results may take up to 24 hours to be processed and shown in the leaderboard." ) def fetch_sub_claim(oauth_token: gr.OAuthToken | None) -> dict | None: if oauth_token is None: return None provider = os.getenv("OPENID_PROVIDER_URL") if not provider: return None try: oidc_meta = requests.get(f"{provider}/.well-known/openid-configuration", timeout=5) oidc_meta = oidc_meta.json() userinfo_ep = oidc_meta["userinfo_endpoint"] claims = requests.get(userinfo_ep, headers={"Authorization": f"Bearer {oauth_token.token}"}, timeout=5) logger.info(f"userinfo_endpoint response: status={claims.status_code}\nheaders={dict(claims.headers)}") claims = claims.json() # Typical fields: sub (stable id), preferred_username, name, picture return { "sub": claims.get("sub"), "preferred_username": claims.get("preferred_username"), "name": claims.get("name"), } except Exception as e: logger.warning(f"Failed to fetch user claims: {e}") return None def fetch_user_info(oauth_token: gr.OAuthToken | None) -> dict | None: if oauth_token is None: return None try: headers = {"Authorization": f"Bearer {oauth_token.token}"} logger.info("HEADERS %s", headers) r = requests.get("https://huggingface.co/api/whoami-v2", headers=headers) logger.info("RESP CODE %s", r.status_code) logger.info("RESP content %s", r.text) if r.status_code != 200: return None return r.json() except: logger.exception("Cannot get user info") return None def _validate_all_submissions_present( lbdb: F1Data, pd_ds: pd.DataFrame, ): logger.info(f"Validating DS size {len(pd_ds)} columns {pd_ds.columns} set {set(pd_ds.columns)}") expected_cols = ["problem_id", "solution"] if set(pd_ds.columns) != set(expected_cols): return ValueError(f"Expected attributes: {expected_cols}, Got: {pd_ds.columns.tolist()}") if not is_integer_dtype(pd_ds["problem_id"]): return ValueError("problem_id must be str convertible to int") if any(type(v) is not str for v in pd_ds["solution"]): return ValueError("solution must be of type str") submitted_ids = set(pd_ds.problem_id.astype(str)) if submitted_ids != lbdb.code_problem_ids: missing = lbdb.code_problem_ids - submitted_ids unknown = submitted_ids - lbdb.code_problem_ids raise ValueError(f"Mismatched problem IDs: {len(missing)} missing, {len(unknown)} unknown") if len(pd_ds) > len(lbdb.code_problem_ids): return ValueError("Duplicate problem IDs exist in uploaded file") def _submission_id_to_user_id(submission_id: str) -> str: """ Extracts the user ID from the submission ID: "YYYYMMDD_HHMMSS_username_userid" """ return submission_id.rsplit("_", 1)[-1] def _submission_id_to_timestamp(submission_id: str) -> datetime: """ Extracts the timestamp from the submission ID: "YYYYMMDD_HHMMSS_username_userid" """ ts_str = "_".join(submission_id.split("_", 2)[:2]) return datetime.strptime(ts_str, "%Y%m%d_%H%M%S").replace(tzinfo=timezone.utc)