tcid / data.py
ror's picture
ror HF Staff
Updated important models list
8e403de
from huggingface_hub import HfFileSystem
import pandas as pd
from utils import logger
from datetime import datetime
import threading
import traceback
import json
import re
# Module-level Hugging Face Hub filesystem handle; get_distant_data() uses it to glob the CI result files.
# NOTE: if caching is an issue, try adding `use_listings_cache=False`
fs = HfFileSystem()
# Model names kept when the joined CI results are filtered (see get_distant_data / get_sample_data).
# Each name must appear only once: get_distant_data emits one warning per listed entry that is missing.
IMPORTANT_MODELS = [
    "auto",
    "bert",  # old but dominant (encoder only)
    "gpt2",  # old (decoder)
    "t5",  # old (encoder-decoder)
    "modernbert",  # (encoder only)
    "vit",  # old (vision)
    "clip",  # old but dominant (vision)
    "detr",  # object detection, segmentation (vision)
    "table_transformer",  # object detection (vision) - maybe just detr?
    "got_ocr2",  # ocr (vision)
    "whisper",  # old but dominant (audio)
    "wav2vec2",  # old (audio)
    "qwen2_audio",  # (audio)
    "speech_t5",  # (audio)
    "csm",  # (audio)
    "llama",  # new and dominant (meta)
    "gemma3",  # new (google)
    "qwen2",  # new (Alibaba)
    "mistral3",  # new (Mistral)
    "qwen2_5_vl",  # new (vision)
    "llava",  # many models from it (vision)
    "smolvlm",  # new (video)
    "internvl",  # new (video)
    "gemma3n",  # new (omnimodal models)
    "qwen2_5_omni",  # new (omnimodal models)
    # "gpt_oss", # new (quite used)
]
# Columns retained from the joined AMD/NVIDIA dataframe: every metric below, once per device,
# in the order "<metric>_amd", "<metric>_nvidia".
KEYS_TO_KEEP = [
    f"{metric}_{device}"
    for metric in (
        "success",
        "skipped",
        "failed_multi_no",
        "failed_single_no",
        "failures",
        "job_link",
    )
    for device in ("amd", "nvidia")
]
def log_dataframe_link(link: str) -> str:
    """
    Log where a report is read from, log a clickable version of that link, and return the report date.

    A link starting with `hf://` is first rewritten as an `https://huggingface.co/` URL. The link is then searched
    for `transformers_daily_ci`, followed by any path, followed by a `YYYY-MM-DD` date. When found, the path in
    between is swapped for `/blob/main` in the logged link and the date is returned. Otherwise an error is logged
    and the placeholder date "9999-99-99" is returned.
    """
    logger.info(f"Reading df located at {link}")
    # Turn the hf:// scheme into a regular http address
    hf_scheme = "hf://"
    if link.startswith(hf_scheme):
        link = "https://huggingface.co/" + link[len(hf_scheme):]
    # Locate the dataset name, whatever sits between it and the date, and the date itself
    found = re.search(r'transformers_daily_ci(.*?)/(\d{4}-\d{2}-\d{2})', link)
    if found is None:
        logger.error("Could not find transformers_daily_ci and.or date in the link")
        return "9999-99-99"
    in_between, report_date = found.groups()
    # Point the logged link at the blob/main view of the dataset
    browsable_link = link.replace("transformers_daily_ci" + in_between, "transformers_daily_ci/blob/main")
    logger.info(f"Link to data source: {browsable_link}")
    return report_date
def infer_latest_update_msg(date_df_amd: str, date_df_nvidia: str) -> str:
    """
    Build the "last updated" message from the AMD and NVIDIA report dates.

    Both dates are expected as `YYYY-MM-DD` strings; a date starting with "9999" is the placeholder returned by
    log_dataframe_link when no date could be found, and is ignored here.

    Returns:
        "last updated MM/DD/YYYY" for the most recent valid date, or "could not find last update time" when no
        valid date is available or the date cannot be split into three parts.
    """
    # Drop placeholder dates so they can never win the max() below (it would yield "99/99/9999")
    valid_dates = [date for date in (date_df_amd, date_df_nvidia) if not date.startswith("9999")]
    if not valid_dates:
        return "could not find last update time"
    # Warn if both reports are dated but disagree (a missing date was already reported when the link was parsed)
    if len(valid_dates) == 2 and date_df_amd != date_df_nvidia:
        logger.warning(f"Different dates found: {date_df_amd} (AMD) vs {date_df_nvidia} (NVIDIA)")
    # ISO dates compare chronologically as plain strings, so max() gives the latest one
    latest_date = max(valid_dates)
    try:
        yyyy, mm, dd = latest_date.split("-")
    except ValueError as e:
        logger.error(f"When trying to infer latest date, got error {e}")
        return "could not find last update time"
    return f"last updated {mm}/{dd}/{yyyy}"
def read_one_dataframe(json_path: str, device_label: str) -> tuple[pd.DataFrame, str]:
    """
    Read one CI report into a dataframe and add per-device failure counts.

    The JSON at `json_path` is read with `orient="index"` and the index is named "model_name". Two columns,
    `failed_multi_no_<device_label>` and `failed_single_no_<device_label>`, are added: each holds the length of the
    "multi" / "single" entry of the row's "failures" cell, or 0 when that entry is absent.

    Returns:
        The dataframe and the report date extracted from `json_path` by log_dataframe_link.
    """
    def failure_counter(kind: str):
        # presumably each "failures" cell is a dict mapping "multi"/"single" to a list of failures
        return lambda failures: len(failures[kind]) if kind in failures else 0

    report_date = log_dataframe_link(json_path)
    results = pd.read_json(json_path, orient="index")
    results.index.name = "model_name"
    for kind in ("multi", "single"):
        results[f"failed_{kind}_no_{device_label}"] = results["failures"].apply(failure_counter(kind))
    return results, report_date
def get_first_working_df(file_list: list[str]) -> str:
    """
    Return the first file of `file_list` whose sibling `job_links.json` holds at least one truthy value.

    For each file, `job_links.json` is read from the same directory through the `hf://` protocol. A file is skipped
    (with a warning) when the job links cannot be read or when none of them is truthy.

    Raises:
        RuntimeError: if no file in the list qualifies.
    """
    for candidate in file_list:
        job_links = f"{candidate.rsplit('/', 1)[0]}/job_links.json"
        try:
            links = pd.read_json("hf://" + job_links, typ="series")
            usable = any(links.values)
        except Exception as e:
            logger.error(f"Could not read job links from {job_links}: {e}")
            usable = False
        if not usable:
            logger.warning(f"Skipping {candidate} as it has no working job links.")
            continue
        return candidate
    raise RuntimeError("Could not find any working dataframe in the provided list.")
def get_distant_data() -> tuple[pd.DataFrame, str]:
    """
    Fetch the AMD and NVIDIA CI reports from the Hub and join them into one dataframe.

    The AMD report is the first file, in reverse-sorted path order, that get_first_working_df accepts. The NVIDIA
    report is the first file in reverse-sorted path order. The joined dataframe is restricted to KEYS_TO_KEEP, the
    "models_" prefix is removed from the index, and only rows whose name is in IMPORTANT_MODELS (case-insensitive)
    are kept. A warning is logged for each important model that is absent.

    Returns:
        The filtered dataframe and the "last updated" message built from the two report dates.

    Raises:
        RuntimeError: if no usable AMD report or no NVIDIA report is found.
    """
    # Retrieve AMD dataframe
    amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json"
    files_amd = sorted(fs.glob(amd_src, refresh=True), reverse=True)
    file_amd = get_first_working_df(files_amd)
    df_amd, date_df_amd = read_one_dataframe(f"hf://{file_amd}", "amd")
    # Retrieve NVIDIA dataframe, which pattern should be:
    # hf://datasets/hf-internal-testing/transformers_daily_ci/raw/main/YYYY-MM-DD/ci_results_run_models_gpu/model_results.json
    nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/*/ci_results_run_models_gpu/model_results.json"
    files_nvidia = sorted(fs.glob(nvidia_src, refresh=True), reverse=True)
    if not files_nvidia:
        raise RuntimeError("Could not find any NVIDIA dataframe matching the expected pattern.")
    # removeprefix drops the exact repo prefix; lstrip would strip any leading characters found in that string
    nvidia_prefix = "datasets/hf-internal-testing/transformers_daily_ci/"
    nvidia_path = files_nvidia[0].removeprefix(nvidia_prefix)
    nvidia_path = "https://huggingface.co/datasets/hf-internal-testing/transformers_daily_ci/raw/main/" + nvidia_path
    df_nvidia, date_df_nvidia = read_one_dataframe(nvidia_path, "nvidia")
    # Infer and format the latest df date
    latest_update_msg = infer_latest_update_msg(date_df_amd, date_df_nvidia)
    # Join both dataframes
    joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer")
    joined = joined[KEYS_TO_KEEP]
    joined.index = joined.index.str.replace("^models_", "", regex=True)
    # Filter out all but important models
    important_models_lower = [model.lower() for model in IMPORTANT_MODELS]
    filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)]
    # Warn once for each missing important model, using the same case-insensitive comparison as the filter
    present_models = set(filtered_joined.index.str.lower())
    for model in dict.fromkeys(important_models_lower):
        if model not in present_models:
            logger.warning(f"Model {model} was missing from index.")
    return filtered_joined, latest_update_msg
def get_sample_data() -> tuple[pd.DataFrame, str]:
    """
    Build the dataframe from the local sample reports `sample_amd.json` and `sample_nvidia.json`.

    The two reports are joined, restricted to KEYS_TO_KEEP, stripped of the "models_" index prefix and filtered
    down to IMPORTANT_MODELS (case-insensitive). Every remaining model name is prefixed with "sample_".

    Returns:
        The filtered dataframe and the message "sample data was loaded".
    """
    # The report dates are not used for sample data
    amd_frame, _ = read_one_dataframe("sample_amd.json", "amd")
    nvidia_frame, _ = read_one_dataframe("sample_nvidia.json", "nvidia")
    # Outer join on the model name, then keep only the columns of interest
    merged = amd_frame.join(nvidia_frame, rsuffix="_nvidia", lsuffix="_amd", how="outer")[KEYS_TO_KEEP]
    merged.index = merged.index.str.replace("^models_", "", regex=True)
    # Keep only the important models
    wanted_names = [name.lower() for name in IMPORTANT_MODELS]
    is_important = merged.index.str.lower().isin(wanted_names)
    sample = merged[is_important]
    # Mark every model name as coming from sample data
    sample.index = "sample_" + sample.index
    return sample, "sample data was loaded"
def safe_extract(row: pd.Series, key: str) -> int:
    """
    Return `row[key]` converted to int, or 0 when the key is absent or its value is missing.

    Args:
        row: one dataframe row (anything exposing `.get(key, default)` works).
        key: the column to read.
    """
    # Single lookup; `pd.notna` is False for both NaN and None
    value = row.get(key, 0)
    return int(value) if pd.notna(value) else 0
def extract_model_data(row: pd.Series) -> tuple[dict[str, int], dict[str, int], int, int, int, int]:
    """
    Turn one dataframe row into per-device statistics.

    Every count is read with safe_extract, so absent or missing values count as 0.

    Returns:
        A tuple `(amd_stats, nvidia_stats, failed_multi_amd, failed_single_amd, failed_multi_nvidia,
        failed_single_nvidia)` where each stats dict has the keys 'passed', 'failed' (multi + single failures),
        'skipped' and 'error' (always 0).
    """
    def device_summary(device: str) -> tuple[dict[str, int], int, int]:
        # Read the four counters of one device and assemble its stats dict
        multi = safe_extract(row, f"failed_multi_no_{device}")
        single = safe_extract(row, f"failed_single_no_{device}")
        stats = {
            'passed': safe_extract(row, f"success_{device}"),
            'failed': multi + single,
            'skipped': safe_extract(row, f"skipped_{device}"),
            'error': 0,  # Not available in this dataset
        }
        return stats, multi, single

    amd_stats, failed_multi_amd, failed_single_amd = device_summary("amd")
    nvidia_stats, failed_multi_nvidia, failed_single_nvidia = device_summary("nvidia")
    return amd_stats, nvidia_stats, failed_multi_amd, failed_single_amd, failed_multi_nvidia, failed_single_nvidia
class CIResults:
    """Holds the current CI results dataframe and keeps it fresh with periodic reloads."""

    def __init__(self):
        # Filtered results, indexed by model name (empty until load_data() is called)
        self.df = pd.DataFrame()
        # Model names present in `self.df`
        self.available_models = []
        # Message describing when (or from where) the data was last loaded
        self.latest_update_msg = ""

    def load_data(self) -> None:
        """
        Load data from the data source.

        The distant data is tried first; on any failure the traceback is logged and the sample data is loaded
        instead. An exception raised while loading the sample data is not caught here.
        """
        # Try loading the distant data, and fall back on sample data for local tinkering
        try:
            logger.info("Loading distant data...")
            new_df, latest_update_msg = get_distant_data()
        except Exception:
            error_msg = [
                "Loading data failed:",
                "-" * 120,
                traceback.format_exc(),
                "-" * 120,
                "Falling back on sample data."
            ]
            logger.error("\n".join(error_msg))
            new_df, latest_update_msg = get_sample_data()
        # Update attributes
        self.latest_update_msg = latest_update_msg
        self.df = new_df
        self.available_models = new_df.index.tolist()
        # Log what was loaded
        logger.info(f"Data loaded successfully: {len(self.available_models)} models")
        logger.info(f"Models: {self.available_models[:5]}{'...' if len(self.available_models) > 5 else ''}")
        logger.info(f"Latest update message: {self.latest_update_msg}")
        # Log a preview of the df: first three models, long values truncated to 10 characters
        msg = {}
        for model in self.available_models[:3]:
            msg[model] = {}
            for col in self.df.columns:
                value = self.df.loc[model, col]
                # Anything that is not a plain int is stringified so json.dumps can serialize it
                if not isinstance(value, int):
                    value = str(value)
                    if len(value) > 10:
                        value = value[:10] + "..."
                msg[model][col] = value
        logger.info(json.dumps(msg, indent=4))

    def schedule_data_reload(self):
        """
        Schedule a data reload every 15 minutes.

        Each reload runs on a daemon `threading.Timer` and arms the next timer when it finishes, whether or not
        the reload succeeded, so one failed reload does not stop the following ones.
        """
        # 15 minutes (900 seconds) between reloads
        interval_seconds = 900.0

        def start_timer():
            timer = threading.Timer(interval_seconds, reload_data)
            timer.daemon = True  # Dies when main thread dies
            timer.start()

        def reload_data():
            try:
                self.load_data()
            except Exception:
                # load_data() can still raise when the sample-data fallback fails as well
                logger.error(f"Data reload failed:\n{traceback.format_exc()}")
            finally:
                # Always arm the next reload, otherwise a single failure would end auto-reloading
                start_timer()
                logger.info("Next data reload scheduled in 15 minutes")

        # Start the first reload timer
        start_timer()
        logger.info("Data auto-reload scheduled every 15 minutes")