# leaderboard/refresh.py
import json
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd
import yaml

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(module)s - %(message)s"
)
logger = logging.getLogger(__name__)

# --- Path Definitions ---
SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = SCRIPT_DIR.parent

# --- Default Input/Output Paths ---
DEFAULT_MODELS_FOLDER = PROJECT_ROOT.parent / "llm-leaderboard/models_info"
DEFAULT_RESULTS_FOLDER = PROJECT_ROOT.parent / "llm-leaderboard/results"
OUTPUT_FOLDER = SCRIPT_DIR / "boards_data"
CONFIG_FILE_PATH = SCRIPT_DIR / "leaderboard_config.yaml"
TEMPLATE_FOLDER = SCRIPT_DIR / "template_jsons"

# --- Constants for Subtask Processing ---
NLU_NLG_TASK_KEYS = ["persian_nlu", "persian_nlg"]

ALL_LEADERBOARD_COLUMNS = [
    'Model Name', 'thinking_method', 'model_url', 'parameters_count', 'source_type',
    'Average', 'Persian IFEval', 'Persian MT-Bench', "PerMMLU", "PerCoR",
    "Persian NLU", "Persian NLG"
]


def load_tasks_from_config(config_path: Path) -> Dict[str, str]:
    if not config_path.exists():
        logger.error(f"Configuration file not found: {config_path}. Cannot load tasks.")
        return {}
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config_data = yaml.safe_load(f)
        tasks_from_config = config_data.get('task_display_names', {})
        if not isinstance(tasks_from_config, dict):
            logger.error(f"'task_display_names' in {config_path} is not a dictionary.")
            return {}
        processed_tasks = {k: v for k, v in tasks_from_config.items() if str(k).lower() != 'all'}
        if not processed_tasks:
            logger.warning(f"No tasks in {config_path} under 'task_display_names' (excluding 'all').")
        return processed_tasks
    except Exception as e:
        logger.error(f"Error loading config {config_path}: {e}")
        return {}
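
# Illustrative (assumed) shape of leaderboard_config.yaml, based only on the keys this
# script reads ('task_display_names' and 'model_display_configs'); the real task keys
# and display names may differ:
#
#   task_display_names:
#     all: "All"
#     ifeval: "Persian IFEval"
#     mt_bench: "Persian MT-Bench"
#     persian_nlu: "Persian NLU"
#     persian_nlg: "Persian NLG"
#   model_display_configs:
#     Example-Model-7B:
#       thinking: "enabled"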


class ModelEvaluationProcessor:
    def __init__(
        self,
        models_info_path: Path,
        results_base_path: Path,
        output_path: Path,
        template_jsons_path: Path,
    ) -> None:
        self.models_info_path = models_info_path
        self.results_base_path = results_base_path
        self.output_path = output_path
        self.template_folder = template_jsons_path
        self.output_path.mkdir(parents=True, exist_ok=True)

        self.tasks_config = load_tasks_from_config(CONFIG_FILE_PATH)
        if not self.tasks_config:
            logger.error("Tasks config is empty. Processing might be affected.")

        self.model_display_configs: Dict[str, Dict[str, Any]] = {}
        try:
            with open(CONFIG_FILE_PATH, 'r', encoding='utf-8') as f:
                config_data = yaml.safe_load(f)
            self.model_display_configs = config_data.get('model_display_configs', {})
        except Exception as e:
            logger.error(f"Error loading model_display_configs from {CONFIG_FILE_PATH}: {e}")

        self.main_scores_map = {
            "ifeval": "strict_instruction_accuracy",
            "mt_bench": "score_mean",
            "MMLU": "acc",
            "persian_csr": "acc",
            "persian_nlg": "nlg_score",
            "persian_nlu": "nlu_score",
        }

    def _load_template(self, task_key: str) -> Dict[str, Any]:
        path = self.template_folder / f"{task_key}.json"
        try:
            return json.loads(path.read_text(encoding="utf-8"))
        except FileNotFoundError:
            logger.warning(f"Template file not found for task_key {task_key} at {path}. Using empty template.")
            return {}
        except Exception as e:
            logger.error(f"Cannot load template for task_key {task_key} from {path}: {e}")
            return {}

    def _deep_override(self, base: Any, override: Any) -> Any:
        if isinstance(base, dict) and isinstance(override, dict):
            merged = {}
            for k, v_base in base.items():
                if k in override and override[k] is not None and override[k] != -1:
                    merged[k] = self._deep_override(v_base, override[k])
                else:
                    merged[k] = v_base
            return merged
        elif override is not None and override != -1:
            return override
        else:
            return base
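
    # Illustration of the merge semantics above (hypothetical values): template entries
    # survive wherever the override value is missing, None, or the -1 sentinel.
    #   base     = {"acc": -1, "details": {"f1": -1, "n": 100}}
    #   override = {"acc": 0.73, "details": {"f1": None}}
    #   merged   = {"acc": 0.73, "details": {"f1": -1, "n": 100}}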

    def _load_model_raw_results(self, model_folder_name: str, task_key: str) -> Dict[str, Any]:
        results_filename = f"{model_folder_name}___{task_key}.json"
        results_file_path = self.results_base_path / results_filename
        if results_file_path.exists():
            try:
                with open(results_file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                return data if isinstance(data, dict) else {}
            except json.JSONDecodeError as e:
                logger.error(f"JSONDecodeError for model '{model_folder_name}', task_key '{task_key}' from {results_file_path}: {e}")
            except Exception as e:
                logger.error(f"Error loading results for model '{model_folder_name}', task_key '{task_key}' from {results_file_path}: {e}")
        else:
            logger.warning(f"Results file not found for model '{model_folder_name}', task_key '{task_key}' at {results_file_path}")
        return {}

    def load_and_fill_task_results(self, model_folder_name: str, task_key: str) -> Dict[str, Any]:
        template = self._load_template(task_key)
        raw_results = self._load_model_raw_results(model_folder_name, task_key)
        return self._deep_override(template, raw_results)

    def clean_previous_subtask_files(self) -> None:
        logger.info("Cleaning previous NLU/NLG subtask JSONL files...")
        for task_key_prefix in NLU_NLG_TASK_KEYS:
            for result_file in self.results_base_path.glob(f"*___{task_key_prefix}.json"):
                try:
                    task_data_content = result_file.read_text(encoding="utf-8")
                    if not task_data_content.strip():
                        logger.debug(f"Skipping empty result file for subtask cleaning: {result_file}")
                        continue
                    task_data = json.loads(task_data_content)
                    main_score_for_this_task_prefix = self.main_scores_map.get(task_key_prefix)
                    for subtask_name in task_data:
                        if subtask_name == main_score_for_this_task_prefix:
                            continue
                        if isinstance(task_data.get(subtask_name), dict):
                            subtask_output_path = self.output_path / f"{subtask_name}.jsonl"
                            if subtask_output_path.exists():
                                subtask_output_path.unlink()
                                logger.info(f"Deleted previous subtask file: {subtask_output_path}")
                except json.JSONDecodeError as e:
                    logger.warning(f"Failed to decode JSON for subtask cleaning from {result_file}: {e}")
                except Exception as e:
                    logger.warning(f"Failed to inspect/delete subtask files based on {result_file}: {e}")

    def _process_subtask_data(self, task_results: Dict[str, Any], base_model_info: Dict[str, Any],
                              parent_task_main_score_key: Optional[str], parent_task_key_for_log: str) -> None:
        parent_task_main_score_value = task_results.get(parent_task_main_score_key) if parent_task_main_score_key else None
        for subtask_name, subtask_scores_dict in task_results.items():
            if subtask_name == parent_task_main_score_key:
                continue
            if not isinstance(subtask_scores_dict, dict):
                logger.debug(f"Skipping entry '{subtask_name}' in '{parent_task_key_for_log}': not a dictionary of subtask scores.")
                continue
            row_data = base_model_info.copy()
            row_data.update(subtask_scores_dict)
            if parent_task_main_score_key:
                row_data[parent_task_main_score_key] = parent_task_main_score_value
            subtask_output_file = f"{subtask_name}.jsonl"
            subtask_output_path = self.output_path / subtask_output_file
            try:
                current_entries = []
                if subtask_output_path.exists():
                    existing_df = pd.read_json(subtask_output_path, lines=True)
                    if not existing_df.empty and 'Model Name' in existing_df.columns:
                        current_entries = existing_df[existing_df['Model Name'] != row_data['Model Name']].to_dict(orient='records')
                current_entries.append(row_data)
                updated_df = pd.DataFrame(current_entries)
                updated_df.to_json(subtask_output_path, orient="records", lines=True, force_ascii=False)
                logger.debug(f"Updated subtask file: {subtask_output_path} for model {base_model_info.get('Model Name')}, parent task {parent_task_key_for_log}")
            except Exception as e:
                logger.error(f"Error updating subtask file {subtask_output_path} for parent {parent_task_key_for_log}: {e}")

    def process_nlu_nlg_subtasks(self, model_details: Dict[str, Any], model_folder_name: str, canonical_model_name: str) -> None:
        model_config = self.model_display_configs.get(canonical_model_name, {})
        thinking_method = model_config.get('thinking', 'N/A')
        common_subtask_model_info = {
            "Model Name": canonical_model_name,
            "thinking_method": thinking_method,
            "model_url": model_details.get('model_url', model_details.get('link', model_details.get('homepage', 'https://google.com'))),
            "parameters_count": str(model_details.get('n_parameters', "N/A")),
            "source_type": "Closed-Source"  # Default, will be refined
        }
        parameters_count_raw = model_details.get('n_parameters', None)
        if parameters_count_raw is not None:
            is_open_source_candidate = False
            if isinstance(parameters_count_raw, (int, float)) and parameters_count_raw > 0:
                is_open_source_candidate = True
            elif isinstance(parameters_count_raw, str) and \
                    str(parameters_count_raw).strip().lower() not in ["", "n/a", "unknown", "private", "confidential", "tbd", "null", "closed"]:
                is_open_source_candidate = True
            common_subtask_model_info["source_type"] = "Open-Source" if is_open_source_candidate else "Closed-Source"

        for task_key_for_subtasks in NLU_NLG_TASK_KEYS:
            if task_key_for_subtasks not in self.tasks_config:
                logger.debug(f"Subtask processing for '{task_key_for_subtasks}' skipped: not in tasks_config.")
                continue
            logger.info(f"Processing subtasks for '{task_key_for_subtasks}' for model '{canonical_model_name}'...")
            parent_task_full_results = self.load_and_fill_task_results(model_folder_name, task_key_for_subtasks)
            main_score_key_for_parent_task = self.main_scores_map.get(task_key_for_subtasks)
            if not main_score_key_for_parent_task:
                logger.warning(f"No main score key in main_scores_map for parent task '{task_key_for_subtasks}'.")
            self._process_subtask_data(
                parent_task_full_results,
                common_subtask_model_info,
                main_score_key_for_parent_task,
                task_key_for_subtasks
            )
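
    # Illustrative (assumed) model info JSON, limited to the keys process_models()
    # actually reads; real files under models_info/ may carry additional fields:
    #   {
    #     "name_for_leaderboard": "Example-Model-7B",
    #     "model_hf_id": "example-org/example-model-7b",
    #     "model_url": "https://huggingface.co/example-org/example-model-7b",
    #     "n_parameters": 7000000000
    #   }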

    def process_models(self) -> Dict[str, pd.DataFrame]:
        processed_task_data: Dict[str, List[Dict[str, Any]]] = {task_key: [] for task_key in self.tasks_config.keys()}
        all_models_summary_data: List[Dict[str, Any]] = []

        if not self.models_info_path.exists() or not self.models_info_path.is_dir():
            logger.critical(f"Configured MODELS_FOLDER path does not exist or is not a directory: {self.models_info_path}")
            empty_dfs = {key: pd.DataFrame() for key in self.tasks_config.keys()}
            empty_dfs["all"] = pd.DataFrame()
            return empty_dfs

        model_info_files = list(self.models_info_path.glob("*.json"))
        if not model_info_files:
            logger.warning(f"No model info files (*.json) found in {self.models_info_path}. No models will be processed.")
            empty_dfs = {key: pd.DataFrame() for key in self.tasks_config.keys()}
            empty_dfs["all"] = pd.DataFrame()
            return empty_dfs

        for model_info_file in model_info_files:
            model_folder_name = model_info_file.stem
            if 'o4-mini' in model_folder_name:
                continue
            try:
                with open(model_info_file, 'r', encoding='utf-8') as f:
                    model_details = json.load(f)
                canonical_model_name = model_details.get('name_for_leaderboard', model_details.get('model_hf_id', model_details.get('name', model_folder_name)))
                model_url = model_details.get('model_url', model_details.get('link', model_details.get('homepage', 'https://google.com')))
                if not model_url:
                    model_url = 'https://google.com'
                parameters_count_raw = model_details.get('n_parameters', None)
                parameters_count_display = str(parameters_count_raw) if parameters_count_raw is not None else "N/A"
                source_type = "Closed-Source"
                if parameters_count_raw is not None:
                    is_open_source_candidate = False
                    if isinstance(parameters_count_raw, (int, float)) and parameters_count_raw > 0:
                        is_open_source_candidate = True
                    elif isinstance(parameters_count_raw, str) and \
                            str(parameters_count_raw).strip().lower() not in ["", "n/a", "unknown", "private", "confidential", "tbd", "null", "closed"]:
                        is_open_source_candidate = True
                    source_type = "Open-Source" if is_open_source_candidate else "Closed-Source"
                model_config = self.model_display_configs.get(canonical_model_name, {})
                thinking_method = model_config.get('thinking', 'N/A')
            except Exception as e:
                logger.error(f"Error loading/parsing model info from {model_info_file}: {e}. Skipping '{model_folder_name}'.")
                continue

            logger.info(f"Processing model: {canonical_model_name} (source ID: {model_folder_name})")
            current_model_scores_for_summary: Dict[str, Any] = {
                "Model Name": canonical_model_name,
                "thinking_method": thinking_method,
                "model_url": model_url,
                "parameters_count": parameters_count_display,
                "source_type": source_type
            }

            for task_key, task_display_name in self.tasks_config.items():
                task_specific_results = self.load_and_fill_task_results(model_folder_name, task_key)
                main_score_metric_name = self.main_scores_map.get(task_key)
                task_data_entry_for_specific_jsonl: Dict[str, Any] = {
                    "Model Name": canonical_model_name,
                    "thinking_method": thinking_method,
                    "model_url": model_url,
                    "parameters_count": parameters_count_display,
                    "source_type": source_type
                }
                if isinstance(task_specific_results, dict) and task_specific_results:
                    for metric, value in task_specific_results.items():
                        task_data_entry_for_specific_jsonl[metric] = value
                    if main_score_metric_name and main_score_metric_name in task_specific_results:
                        score_value = task_specific_results[main_score_metric_name]
                        if task_key == "mt_bench" and score_value is not None:
                            try:
                                score_value = float(score_value) / 10.0
                            except (ValueError, TypeError):
                                logger.warning(f"Could not convert mt_bench score '{score_value}' to float for division for model {canonical_model_name}")
                                score_value = pd.NA
                        current_model_scores_for_summary[task_display_name] = score_value
                    elif main_score_metric_name:
                        logger.warning(f"Main score metric '{main_score_metric_name}' for task '{task_key}' (Display: {task_display_name}) not found for model '{canonical_model_name}'. Will be NA.")
                        current_model_scores_for_summary[task_display_name] = pd.NA
                        task_data_entry_for_specific_jsonl[main_score_metric_name] = pd.NA
                else:
                    logger.warning(f"No valid results data for model '{canonical_model_name}', task_key '{task_key}'. Scores will be NA.")
                    if main_score_metric_name:
                        task_data_entry_for_specific_jsonl[main_score_metric_name] = pd.NA
                    current_model_scores_for_summary[task_display_name] = pd.NA

                processed_task_data[task_key].append(task_data_entry_for_specific_jsonl)

            all_models_summary_data.append(current_model_scores_for_summary)
            self.process_nlu_nlg_subtasks(model_details, model_folder_name, canonical_model_name)
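
        # Turn the accumulated per-task rows into DataFrames, then build the combined
        # "all" summary table with a per-model Average column.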
        final_dataframes: Dict[str, pd.DataFrame] = {}
        for task_key, data_list in processed_task_data.items():
            df = pd.DataFrame(data_list) if data_list else pd.DataFrame()
            main_score_col = self.main_scores_map.get(task_key)
            if not df.empty and main_score_col and main_score_col in df.columns:
                try:
                    df[main_score_col] = pd.to_numeric(df[main_score_col], errors='coerce')
                    df = df.sort_values(by=main_score_col, ascending=False, na_position='last')
                except Exception as e:
                    logger.warning(f"Could not sort dataframe for task {task_key} by score {main_score_col}: {e}")
            final_dataframes[task_key] = df
            if df.empty:
                logger.warning(f"No data processed for task '{task_key}'. Resulting DataFrame is empty.")

        if all_models_summary_data:
            all_df = pd.DataFrame(all_models_summary_data)
            score_cols_for_average = []
            for _, task_display_name_for_avg in self.tasks_config.items():
                if task_display_name_for_avg in all_df.columns:
                    numeric_col = pd.to_numeric(all_df[task_display_name_for_avg], errors='coerce')
                    if numeric_col.notna().any():  # Check if there is at least one non-NA numeric value
                        all_df[task_display_name_for_avg] = numeric_col
                        score_cols_for_average.append(task_display_name_for_avg)
                    else:  # All values are NA or non-numeric
                        all_df[task_display_name_for_avg] = pd.NA  # Ensure column is NA if not usable
                        logger.warning(f"Column '{task_display_name_for_avg}' for averaging in 'all' table is not numeric or all NaN. Excluding from average calculation and setting to NA.")

            if score_cols_for_average:
                try:
                    all_df["Average"] = all_df[score_cols_for_average].mean(axis=1, skipna=False)
                    all_df.loc[all_df["Average"].notna(), "Average"] = all_df.loc[all_df["Average"].notna(), "Average"].round(4)
                except Exception as e:
                    logger.error(f"Error calculating 'Average' for 'all' table: {e}. Average column might be NA or incorrect.")
                    all_df["Average"] = pd.NA  # Fallback to NA
            else:
                logger.warning("No valid numeric score columns found to calculate 'Average' for 'all' table.")
                all_df["Average"] = pd.NA  # Assign pd.NA if no columns to average

            if "Average" in all_df.columns:  # Check if 'Average' column exists
                all_df = all_df.sort_values(by="Average", ascending=False, na_position='last')

            existing_cols_in_order = [col for col in ALL_LEADERBOARD_COLUMNS if col in all_df.columns]
            other_cols = [col for col in all_df.columns if col not in existing_cols_in_order]
            all_df = all_df[existing_cols_in_order + other_cols]
            final_dataframes["all"] = all_df
        else:
            final_dataframes["all"] = pd.DataFrame()
            logger.warning("No summary data collected for the 'all' table.")

        return final_dataframes
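
    # Note on the Average computed above: mean(..., skipna=False) means a model that is
    # missing any averaged task score gets Average = NaN instead of an average over the
    # tasks it does have. For example (hypothetical scores over three tasks):
    #   [0.80, 0.60, NaN]  -> Average = NaN
    #   [0.80, 0.60, 0.70] -> Average = 0.70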

    def save_dataframe_as_jsonl(self, df: pd.DataFrame, filename_base: str) -> None:
        if df is None or df.empty:
            logger.warning(f"DataFrame for '{filename_base}.jsonl' is empty or None. Skipping save.")
            return
        output_file_path = self.output_path / f"{filename_base}.jsonl"
        try:
            df.to_json(output_file_path, orient="records", lines=True, force_ascii=False, index=False)
            logger.info(f"Saved data to {output_file_path}")
        except Exception as e:
            logger.error(f"Failed to save DataFrame to {output_file_path}: {e}")

    def run(self) -> None:
        logger.info("Starting data processing pipeline in ModelEvaluationProcessor...")
        self.clean_previous_subtask_files()
        processed_dataframes = self.process_models()
        for task_key_or_name, df in processed_dataframes.items():
            self.save_dataframe_as_jsonl(df, task_key_or_name)
        logger.info("Data processing pipeline completed successfully!")


def main() -> None:
    models_folder_to_use = DEFAULT_MODELS_FOLDER
    results_folder_to_use = DEFAULT_RESULTS_FOLDER
    template_folder_to_use = TEMPLATE_FOLDER

    logger.info(f"Refresh script running from: {SCRIPT_DIR}")
    logger.info(f"CONFIGURED Input 'models_info' Path: {models_folder_to_use}")
    logger.info(f"CONFIGURED Input 'results' Path: {results_folder_to_use}")
    logger.info(f"CONFIGURED Input 'template_jsons' Path: {template_folder_to_use}")
    logger.info(f"Outputting processed data to (inside 'leaderboard' dir): {OUTPUT_FOLDER}")
    logger.info(f"Using configuration file (inside 'leaderboard' dir): {CONFIG_FILE_PATH}")

    if not CONFIG_FILE_PATH.exists():
        logger.critical(f"CRITICAL: Config file not found at {CONFIG_FILE_PATH}. Ensure '{CONFIG_FILE_PATH.name}' exists in '{SCRIPT_DIR}'.")
        return
    if not models_folder_to_use.exists() or not models_folder_to_use.is_dir():
        logger.critical(f"CRITICAL: Input 'models_info' directory not found at {models_folder_to_use} or is not a directory.")
        return
    if not results_folder_to_use.exists() or not results_folder_to_use.is_dir():
        logger.critical(f"CRITICAL: Input 'results' directory not found at {results_folder_to_use} or is not a directory.")
        return
    if not template_folder_to_use.exists() or not template_folder_to_use.is_dir():
        logger.warning(f"WARNING: 'template_jsons' directory not found at {template_folder_to_use}. Template filling might not work as expected.")

    try:
        processor = ModelEvaluationProcessor(
            models_info_path=models_folder_to_use,
            results_base_path=results_folder_to_use,
            output_path=OUTPUT_FOLDER,
            template_jsons_path=template_folder_to_use,
        )
        processor.run()
    except Exception as e:
        logger.error(f"Unhandled exception in main: {e}", exc_info=True)


if __name__ == "__main__":
    main()
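
# Illustrative run (assuming the default folder layout configured above):
#
#   python leaderboard/refresh.py
#
# Given results files named "<model_folder>___<task_key>.json", the script writes JSONL
# tables into boards_data/: an "all.jsonl" summary, one "<task_key>.jsonl" per configured
# task, and one "<subtask>.jsonl" per NLU/NLG subtask.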