from huggingface_hub import HfFileSystem import pandas as pd from utils import logger from datetime import datetime, timedelta import threading import traceback import json import re from typing import List, Tuple, Optional # NOTE: if caching is an issue, try adding `use_listings_cache=False` fs = HfFileSystem() IMPORTANT_MODELS = [ "auto", "bert", # old but dominant (encoder only) "gpt2", # old (decoder) "t5", # old (encoder-decoder) "modernbert", # (encoder only) "vit", # old (vision) - fixed comma "clip", # old but dominant (vision) "detr", # objection detection, segmentation (vision) "table-transformer", # objection detection (visioin) - maybe just detr? "got_ocr2", # ocr (vision) "whisper", # old but dominant (audio) "wav2vec2", # old (audio) "llama", # new and dominant (meta) "gemma3", # new (google) "qwen2", # new (Alibaba) "mistral3", # new (Mistral) - added missing comma "qwen2_5_vl", # new (vision) "llava", # many models from it (vision) "smolvlm", # new (video) "internvl", # new (video) "gemma3n", # new (omnimodal models) "qwen2_5_omni", # new (omnimodal models) ] KEYS_TO_KEEP = [ "success_amd", "success_nvidia", "skipped_amd", "skipped_nvidia", "failed_multi_no_amd", "failed_multi_no_nvidia", "failed_single_no_amd", "failed_single_no_nvidia", "failures_amd", "failures_nvidia", "job_link_amd", "job_link_nvidia", ] def log_dataframe_link(link: str) -> str: """ Adds the link to the dataset in the logs, modifies it to get a clockable link and then returns the date of the report. """ if link.startswith("sample_"): return "9999-99-99" logger.info(f"Reading df located at {link}") # Make sure the links starts with an http adress if link.startswith("hf://"): link = "https://huggingface.co/" + link.removeprefix("hf://") # Pattern to match transformers_daily_ci followed by any path, then a date (YYYY-MM-DD format) pattern = r'transformers_daily_ci(.*?)/(\d{4}-\d{2}-\d{2})' match = re.search(pattern, link) # Failure case: if not match: logger.error("Could not find transformers_daily_ci and.or date in the link") return "9999-99-99" # Replace the path between with blob/main path_between = match.group(1) link = link.replace("transformers_daily_ci" + path_between, "transformers_daily_ci/blob/main") logger.info(f"Link to data source: {link}") # Return the date return match.group(2) def infer_latest_update_msg(date_df_amd: str, date_df_nvidia: str) -> str: # Early return if one of the dates is invalid if date_df_amd.startswith("9999") and date_df_nvidia.startswith("9999"): return "could not find last update time" # Warn if dates are not the same if date_df_amd != date_df_nvidia: logger.warning(f"Different dates found: {date_df_amd} (AMD) vs {date_df_nvidia} (NVIDIA)") # Take the latest date and format it try: latest_date = max(date_df_amd, date_df_nvidia) yyyy, mm, dd = latest_date.split("-") return f"last updated {mm}/{dd}/{yyyy}" except Exception as e: logger.error(f"When trying to infer latest date, got error {e}") return "could not find last update time" def read_one_dataframe(json_path: str, device_label: str) -> tuple[pd.DataFrame, str]: df_upload_date = log_dataframe_link(json_path) df = pd.read_json(json_path, orient="index") df.index.name = "model_name" df[f"failed_multi_no_{device_label}"] = df["failures"].apply(lambda x: len(x["multi"]) if "multi" in x else 0) df[f"failed_single_no_{device_label}"] = df["failures"].apply(lambda x: len(x["single"]) if "single" in x else 0) return df, df_upload_date def get_available_dates() -> List[str]: """Get list of available dates from both AMD and NVIDIA datasets.""" try: # Get AMD dates - the path structure is: YYYY-MM-DD/runs/{run_id}/ci_results_run_models_gpu/model_results.json amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json" files_amd = sorted(fs.glob(amd_src, refresh=True), reverse=True) logger.info(f"Found {len(files_amd)} AMD files") # Get NVIDIA dates - structure is: YYYY-MM-DD/ci_results_run_models_gpu/model_results.json nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/*/ci_results_run_models_gpu/model_results.json" files_nvidia = sorted(fs.glob(nvidia_src, refresh=True), reverse=True) logger.info(f"Found {len(files_nvidia)} NVIDIA files") # Extract dates from file paths amd_dates = set() for file_path in files_amd: # Pattern to match the date in the AMD path: YYYY-MM-DD/runs/{run_id}/ci_results_run_models_gpu/model_results.json pattern = r'transformers_daily_ci/(\d{4}-\d{2}-\d{2})/runs/[^/]+/ci_results_run_models_gpu/model_results\.json' match = re.search(pattern, file_path) if match: amd_dates.add(match.group(1)) else: # Log unmatched paths for debugging logger.debug(f"AMD file path didn't match pattern: {file_path}") # Log a few example AMD file paths for debugging if files_amd: logger.info(f"Example AMD file paths: {files_amd[:3]}") nvidia_dates = set() for file_path in files_nvidia: # Pattern to match the date in the NVIDIA path: YYYY-MM-DD/ci_results_run_models_gpu/model_results.json pattern = r'transformers_daily_ci/(\d{4}-\d{2}-\d{2})/ci_results_run_models_gpu/model_results\.json' match = re.search(pattern, file_path) if match: nvidia_dates.add(match.group(1)) logger.info(f"AMD dates: {sorted(amd_dates, reverse=True)[:5]}...") # Show first 5 logger.info(f"NVIDIA dates: {sorted(nvidia_dates, reverse=True)[:5]}...") # Show first 5 # Return intersection of both datasets (dates where both have data) common_dates = sorted(amd_dates.intersection(nvidia_dates), reverse=True) logger.info(f"Common dates: {len(common_dates)} dates where both AMD and NVIDIA have data") if common_dates: return common_dates[:30] # Limit to last 30 days for performance else: # If no real dates available, generate fake dates for the last 7 days logger.warning("No real dates available, generating fake dates for demo purposes") fake_dates = [] today = datetime.now() for i in range(7): date = today - timedelta(days=i) fake_dates.append(date.strftime("%Y-%m-%d")) return fake_dates except Exception as e: logger.error(f"Error getting available dates: {e}") # Generate fake dates when there's an error logger.info("Generating fake dates due to error") fake_dates = [] today = datetime.now() for i in range(7): date = today - timedelta(days=i) fake_dates.append(date.strftime("%Y-%m-%d")) return fake_dates def get_data_for_date(target_date: str) -> tuple[pd.DataFrame, str]: """Get data for a specific date.""" try: # For AMD, we need to find the specific run file for the date # AMD structure: YYYY-MM-DD/runs/{run_id}/ci_results_run_models_gpu/model_results.json amd_src = f"hf://datasets/optimum-amd/transformers_daily_ci/{target_date}/runs/*/ci_results_run_models_gpu/model_results.json" amd_files = fs.glob(amd_src, refresh=True) if not amd_files: raise FileNotFoundError(f"No AMD data found for date {target_date}") # Use the first (most recent) run for the date amd_file = amd_files[0] # Ensure the AMD file path has the hf:// prefix if not amd_file.startswith("hf://"): amd_file = f"hf://{amd_file}" # NVIDIA structure: YYYY-MM-DD/ci_results_run_models_gpu/model_results.json nvidia_src = f"hf://datasets/hf-internal-testing/transformers_daily_ci/{target_date}/ci_results_run_models_gpu/model_results.json" # Read dataframes - try each platform independently df_amd = pd.DataFrame() df_nvidia = pd.DataFrame() try: df_amd, _ = read_one_dataframe(amd_file, "amd") logger.info(f"Successfully loaded AMD data for {target_date}") except Exception as e: logger.warning(f"Failed to load AMD data for {target_date}: {e}") try: df_nvidia, _ = read_one_dataframe(nvidia_src, "nvidia") logger.info(f"Successfully loaded NVIDIA data for {target_date}") except Exception as e: logger.warning(f"Failed to load NVIDIA data for {target_date}: {e}") # If both failed, return empty dataframe if df_amd.empty and df_nvidia.empty: logger.warning(f"No data available for either platform on {target_date}") return pd.DataFrame(), target_date # Join both dataframes (outer join to include data from either platform) if not df_amd.empty and not df_nvidia.empty: joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer") elif not df_amd.empty: joined = df_amd.copy() else: joined = df_nvidia.copy() joined = joined[KEYS_TO_KEEP] joined.index = joined.index.str.replace("^models_", "", regex=True) # Filter out all but important models important_models_lower = [model.lower() for model in IMPORTANT_MODELS] filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)] return filtered_joined, target_date except Exception as e: logger.error(f"Error getting data for date {target_date}: {e}") # Return empty dataframe instead of sample data for historical functionality return pd.DataFrame(), target_date def get_historical_data(start_date: str, end_date: str, sample_data = False) -> pd.DataFrame: """Get historical data for a date range.""" if sample_data: return get_fake_historical_data(start_date, end_date) try: start_dt = datetime.strptime(start_date, "%Y-%m-%d") end_dt = datetime.strptime(end_date, "%Y-%m-%d") historical_data = [] current_dt = start_dt while current_dt <= end_dt: date_str = current_dt.strftime("%Y-%m-%d") try: df, _ = get_data_for_date(date_str) # Only add non-empty dataframes if not df.empty: df['date'] = date_str historical_data.append(df) logger.info(f"Loaded data for {date_str}") else: logger.warning(f"No data available for {date_str}") except Exception as e: logger.warning(f"Could not load data for {date_str}: {e}") current_dt += timedelta(days=1) # Combine all dataframes combined_df = pd.concat(historical_data, ignore_index=False) return combined_df except Exception as e: logger.error(f"Error getting historical data: {e}") # Fall back to fake data when there's an error logger.info("Falling back to fake historical data due to error") return get_fake_historical_data(start_date, end_date) def get_distant_data() -> tuple[pd.DataFrame, str]: # Retrieve AMD dataframe amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json" files_amd = sorted(fs.glob(amd_src, refresh=True), reverse=True) df_amd, date_df_amd = read_one_dataframe(f"hf://{files_amd[0]}", "amd") # Retrieve NVIDIA dataframe, which pattern should be: # hf://datasets/hf-internal-testing`/transformers_daily_ci/raw/main/YYYY-MM-DD/ci_results_run_models_gpu/model_results.json nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/*/ci_results_run_models_gpu/model_results.json" files_nvidia = sorted(fs.glob(nvidia_src, refresh=True), reverse=True) # NOTE: should this be removeprefix instead of lstrip? nvidia_path = files_nvidia[0].lstrip('datasets/hf-internal-testing/transformers_daily_ci/') nvidia_path = "https://huggingface.co/datasets/hf-internal-testing/transformers_daily_ci/raw/main/" + nvidia_path df_nvidia, date_df_nvidia = read_one_dataframe(nvidia_path, "nvidia") # Infer and format the latest df date latest_update_msg = infer_latest_update_msg(date_df_amd, date_df_nvidia) # Join both dataframes joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer") joined = joined[KEYS_TO_KEEP] joined.index = joined.index.str.replace("^models_", "", regex=True) # Fitler out all but important models important_models_lower = [model.lower() for model in IMPORTANT_MODELS] filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)] # Warn for ach missing important models for model in IMPORTANT_MODELS: if model not in filtered_joined.index: print(f"[WARNING] Model {model} was missing from index.") return filtered_joined, latest_update_msg def get_sample_data() -> tuple[pd.DataFrame, str]: # Retrieve sample dataframes df_amd, _ = read_one_dataframe("sample_amd.json", "amd") df_nvidia, _ = read_one_dataframe("sample_nvidia.json", "nvidia") # Join both dataframes joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer") joined = joined[KEYS_TO_KEEP] joined.index = joined.index.str.replace("^models_", "", regex=True) # Fitler out all but important models important_models_lower = [model.lower() for model in IMPORTANT_MODELS] filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)] # Prefix all model names with "sample_" filtered_joined.index = "sample_" + filtered_joined.index return filtered_joined, "sample data was loaded" def get_fake_historical_data(start_date: str, end_date: str) -> pd.DataFrame: """Generate fake historical data for a date range when real data loading fails.""" try: start_dt = datetime.strptime(start_date, "%Y-%m-%d") end_dt = datetime.strptime(end_date, "%Y-%m-%d") # Generate fake data for each date in the range historical_data = [] current_dt = start_dt # Get base sample data to use as template sample_df, _ = get_sample_data() while current_dt <= end_dt: date_str = current_dt.strftime("%Y-%m-%d") # Create a copy of sample data for this date with some random variations date_df = sample_df.copy() date_df['date'] = date_str # Add some random variation to make it look more realistic import random for idx in date_df.index: # Vary the success/failure counts slightly (±20%) for col in ['success_amd', 'success_nvidia', 'skipped_amd', 'skipped_nvidia']: if col in date_df.columns: original_val = date_df.loc[idx, col] if pd.notna(original_val) and original_val > 0: variation = random.uniform(0.8, 1.2) date_df.loc[idx, col] = max(0, int(original_val * variation)) # Vary failure counts more dramatically to show trends for col in ['failed_multi_no_amd', 'failed_multi_no_nvidia', 'failed_single_no_amd', 'failed_single_no_nvidia']: if col in date_df.columns: original_val = date_df.loc[idx, col] if pd.notna(original_val): # Sometimes have more failures, sometimes fewer variation = random.uniform(0.5, 2.0) date_df.loc[idx, col] = max(0, int(original_val * variation)) historical_data.append(date_df) current_dt += timedelta(days=1) if not historical_data: logger.warning("No fake historical data generated") return pd.DataFrame() # Combine all dataframes combined_df = pd.concat(historical_data, ignore_index=False) logger.info(f"Generated fake historical data: {len(combined_df)} records from {start_date} to {end_date}") return combined_df except Exception as e: logger.error(f"Error generating fake historical data: {e}") return pd.DataFrame() def safe_extract(row: pd.DataFrame, key: str) -> int: return int(row.get(key, 0)) if pd.notna(row.get(key, 0)) else 0 def extract_model_data(row: pd.Series) -> tuple[dict[str, int], dict[str, int], int, int, int, int]: """Extract and process model data from DataFrame row.""" # Handle missing values and get counts directly from dataframe success_nvidia = safe_extract(row, "success_nvidia") success_amd = safe_extract(row, "success_amd") skipped_nvidia = safe_extract(row, "skipped_nvidia") skipped_amd = safe_extract(row, "skipped_amd") failed_multi_amd = safe_extract(row, 'failed_multi_no_amd') failed_multi_nvidia = safe_extract(row, 'failed_multi_no_nvidia') failed_single_amd = safe_extract(row, 'failed_single_no_amd') failed_single_nvidia = safe_extract(row, 'failed_single_no_nvidia') # Calculate total failures total_failed_amd = failed_multi_amd + failed_single_amd total_failed_nvidia = failed_multi_nvidia + failed_single_nvidia # Create stats dictionaries directly from dataframe values amd_stats = { 'passed': success_amd, 'failed': total_failed_amd, 'skipped': skipped_amd, 'error': 0 # Not available in this dataset } nvidia_stats = { 'passed': success_nvidia, 'failed': total_failed_nvidia, 'skipped': skipped_nvidia, 'error': 0 # Not available in this dataset } return amd_stats, nvidia_stats, failed_multi_amd, failed_single_amd, failed_multi_nvidia, failed_single_nvidia class CIResults: def __init__(self): self.df = pd.DataFrame() self.available_models = [] self.latest_update_msg = "" self.available_dates = [] self.historical_df = pd.DataFrame() self.all_historical_data = pd.DataFrame() # Store all historical data at startup self.sample_data = False def load_data(self) -> None: """Load data from the data source.""" # Try loading the distant data, and fall back on sample data for local tinkering try: logger.info("Loading distant data...") new_df, latest_update_msg = get_distant_data() self.latest_update_msg = latest_update_msg self.available_dates = get_available_dates() logger.info(f"Available dates: {len(self.available_dates)} dates") if self.available_dates: logger.info(f"Date range: {self.available_dates[-1]} to {self.available_dates[0]}") else: logger.warning("No available dates found") self.available_dates = [] except Exception as e: error_msg = [ "Loading data failed:", "-" * 120, traceback.format_exc(), "-" * 120, "Falling back on sample data." ] logger.error("\n".join(error_msg)) self.sample_data = True new_df, latest_update_msg = get_sample_data() self.latest_update_msg = latest_update_msg self.available_dates = None # Update attributes self.df = new_df self.available_models = new_df.index.tolist() # Load all historical data at startup self.load_all_historical_data() # Log and return distant load status logger.info(f"Data loaded successfully: {len(self.available_models)} models") logger.info(f"Models: {self.available_models[:5]}{'...' if len(self.available_models) > 5 else ''}") logger.info(f"Latest update message: {self.latest_update_msg}") # Log a preview of the df msg = {} for model in self.available_models[:3]: msg[model] = {} for col in self.df.columns: value = self.df.loc[model, col] if not isinstance(value, int): value = str(value) if len(value) > 10: value = value[:10] + "..." msg[model][col] = value logger.info(json.dumps(msg, indent=4)) def load_all_historical_data(self) -> None: """Load all available historical data at startup.""" try: if not self.available_dates: # Generate fake dates when no real dates are available fake_dates = [] today = datetime.now() for i in range(7): date = today - timedelta(days=i) fake_dates.append(date.strftime("%Y-%m-%d")) self.available_dates = fake_dates logger.info(f"No available dates found, generated {len(self.available_dates)} sample dates.") logger.info(f"Loading all historical data for {len(self.available_dates)} dates...") start_date = self.available_dates[-1] # Oldest date end_date = self.available_dates[0] # Newest date self.all_historical_data = get_historical_data(start_date, end_date, self.sample_data) logger.info(f"All historical data loaded: {len(self.all_historical_data)} records") except Exception as e: logger.error(f"Error loading all historical data: {e}") self.all_historical_data = pd.DataFrame() def load_historical_data(self, start_date: str, end_date: str) -> None: """Load historical data for a date range from pre-loaded data.""" try: logger.info(f"Filtering historical data from {start_date} to {end_date}") if self.all_historical_data.empty: logger.warning("No pre-loaded historical data available") self.historical_df = pd.DataFrame() return # Filter the pre-loaded data by date range start_dt = datetime.strptime(start_date, "%Y-%m-%d") end_dt = datetime.strptime(end_date, "%Y-%m-%d") # Filter data within the date range filtered_data = [] for date_str in self.all_historical_data['date'].unique(): date_dt = datetime.strptime(date_str, "%Y-%m-%d") if start_dt <= date_dt <= end_dt: date_data = self.all_historical_data[self.all_historical_data['date'] == date_str] filtered_data.append(date_data) if filtered_data: self.historical_df = pd.concat(filtered_data, ignore_index=False) logger.info(f"Historical data filtered: {len(self.historical_df)} records for {start_date} to {end_date}") else: self.historical_df = pd.DataFrame() logger.warning(f"No historical data found for date range {start_date} to {end_date}") except Exception as e: logger.error(f"Error filtering historical data: {e}") self.historical_df = pd.DataFrame() def schedule_data_reload(self): """Schedule the next data reload.""" def reload_data(): self.load_data() # Schedule the next reload in 15 minutes (900 seconds) timer = threading.Timer(900.0, reload_data) timer.daemon = True # Dies when main thread dies timer.start() logger.info("Next data reload scheduled in 15 minutes") # Start the first reload timer timer = threading.Timer(900.0, reload_data) timer.daemon = True timer.start() logger.info("Data auto-reload scheduled every 15 minutes")