"""Shared functions for the Trackio UI.""" import os import gradio as gr import huggingface_hub as hf try: import trackio.utils as utils from trackio.sqlite_storage import SQLiteStorage from trackio.ui.helpers.run_selection import RunSelection except ImportError: import utils from sqlite_storage import SQLiteStorage from ui.helpers.run_selection import RunSelection CONFIG_COLUMN_MAPPINGS = { "_Username": "Username", "_Created": "Created", "_Group": "Group", } CONFIG_COLUMN_MAPPINGS_REVERSE = {v: k for k, v in CONFIG_COLUMN_MAPPINGS.items()} HfApi = hf.HfApi() def get_project_info() -> str | None: dataset_id = os.environ.get("TRACKIO_DATASET_ID") space_id = utils.get_space() if utils.persistent_storage_enabled(): return "✨ Persistent Storage is enabled, logs are stored directly in this Space." if dataset_id: sync_status = utils.get_sync_status(SQLiteStorage.get_scheduler()) upgrade_message = f"New changes are synced every 5 min To avoid losing data between syncs, click here to open this Space's settings and add Persistent Storage. Make sure data is synced prior to enabling." if sync_status is not None: info = f"↻ Backed up {sync_status} min ago to {dataset_id} | {upgrade_message}" else: info = f"↻ Not backed up yet to {dataset_id} | {upgrade_message}" return info return None def get_projects(request: gr.Request): projects = SQLiteStorage.get_projects() if project := request.query_params.get("project"): interactive = False else: interactive = True if selected_project := request.query_params.get("selected_project"): project = selected_project else: project = projects[0] if projects else None return gr.Dropdown( label="Project", choices=projects, value=project, allow_custom_value=True, interactive=interactive, info=get_project_info(), ) def update_navbar_value(project_dd, request: gr.Request): write_token = None if hasattr(request, "query_params") and request.query_params: write_token = request.query_params.get("write_token") metrics_url = f"?selected_project={project_dd}" runs_url = f"runs?selected_project={project_dd}" if write_token: metrics_url += f"&write_token={write_token}" runs_url += f"&write_token={write_token}" return gr.Navbar( value=[ ("Metrics", metrics_url), ("Runs", runs_url), ] ) def check_hf_token_has_write_access(hf_token: str | None) -> None: """ Checks to see if the provided hf_token is valid and has write access to the Space that Trackio is running in. If the hf_token is valid or if Trackio is not running on a Space, this function does nothing. Otherwise, it raises a PermissionError. """ if os.getenv("SYSTEM") == "spaces": # if we are running in Spaces # check auth token passed in if hf_token is None: raise PermissionError( "Expected a HF_TOKEN to be provided when logging to a Space" ) who = HfApi.whoami(hf_token) owner_name = os.getenv("SPACE_AUTHOR_NAME") repo_name = os.getenv("SPACE_REPO_NAME") # make sure the token user is either the author of the space, # or is a member of an org that is the author. orgs = [o["name"] for o in who["orgs"]] if owner_name != who["name"] and owner_name not in orgs: raise PermissionError( "Expected the provided hf_token to be the user owner of the space, or be a member of the org owner of the space" ) # reject fine-grained tokens without specific repo access access_token = who["auth"]["accessToken"] if access_token["role"] == "fineGrained": matched = False for item in access_token["fineGrained"]["scoped"]: if ( item["entity"]["type"] == "space" and item["entity"]["name"] == f"{owner_name}/{repo_name}" and "repo.write" in item["permissions"] ): matched = True break if ( ( item["entity"]["type"] == "user" or item["entity"]["type"] == "org" ) and item["entity"]["name"] == owner_name and "repo.write" in item["permissions"] ): matched = True break if not matched: raise PermissionError( "Expected the provided hf_token with fine grained permissions to provide write access to the space" ) # reject read-only tokens elif access_token["role"] != "write": raise PermissionError( "Expected the provided hf_token to provide write permissions" ) def check_oauth_token_has_write_access(oauth_token: str | None) -> None: """ Checks to see if the oauth token provided via Gradio's OAuth is valid and has write access to the Space that Trackio is running in. If the oauth token is valid or if Trackio is not running on a Space, this function does nothing. Otherwise, it raises a PermissionError. """ if not os.getenv("SYSTEM") == "spaces": return if oauth_token is None: raise PermissionError( "Expected an oauth to be provided when logging to a Space" ) who = HfApi.whoami(oauth_token) user_name = who["name"] owner_name = os.getenv("SPACE_AUTHOR_NAME") if user_name == owner_name: return # check if user is a member of an org that owns the space with write permissions for org in who["orgs"]: if org["name"] == owner_name and org["roleInOrg"] == "write": return raise PermissionError( "Expected the oauth token to be the user owner of the space, or be a member of the org owner of the space" ) def get_group_by_fields(project: str): configs = SQLiteStorage.get_all_run_configs(project) if project else {} keys = set() for config in configs.values(): keys.update(config.keys()) keys.discard("_Created") keys = [CONFIG_COLUMN_MAPPINGS.get(key, key) for key in keys] choices = [None] + sorted(keys) return gr.Dropdown( choices=choices, value=None, interactive=True, ) def group_runs_by_config( project: str, config_key: str, filter_text: str | None = None ) -> dict[str, list[str]]: if not project or not config_key: return {} display_key = config_key config_key = CONFIG_COLUMN_MAPPINGS_REVERSE.get(config_key, config_key) configs = SQLiteStorage.get_all_run_configs(project) groups: dict[str, list[str]] = {} for run_name, config in configs.items(): if filter_text and filter_text not in run_name: continue group_name = config.get(config_key, "None") label = f"{display_key}: {group_name}" groups.setdefault(label, []).append(run_name) for label in groups: groups[label].sort() sorted_groups = dict(sorted(groups.items(), key=lambda kv: kv[0].lower())) return sorted_groups def run_checkbox_update(selection: RunSelection, **kwargs) -> gr.CheckboxGroup: return gr.CheckboxGroup( choices=selection.choices, value=selection.selected, **kwargs, ) def handle_run_checkbox_change( selected_runs: list[str] | None, selection: RunSelection ) -> RunSelection: selection.select(selected_runs or []) return selection def handle_group_checkbox_change( group_selected: list[str] | None, selection: RunSelection, group_runs: list[str] | None, ): subset, _ = selection.replace_group(group_runs or [], group_selected or []) return ( selection, gr.CheckboxGroup(value=subset), run_checkbox_update(selection), ) def handle_group_toggle( select_all: bool, selection: RunSelection, group_runs: list[str] | None, ): target = list(group_runs or []) if select_all else [] subset, _ = selection.replace_group(group_runs or [], target) return ( selection, gr.CheckboxGroup(value=subset), run_checkbox_update(selection), )