import json
import os
import re
import time
from collections import defaultdict
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path

from datasets import load_dataset
from datasets.utils.metadata import MetadataConfigs
from huggingface_hub import (
    DatasetCard,
    DatasetCardData,
    HfApi,
    hf_hub_url,
)
from huggingface_hub.utils import build_hf_headers, get_session, hf_raise_for_status

from lm_eval.utils import (
    eval_logger,
    get_file_datetime,
    get_file_task_name,
    get_results_filenames,
    get_sample_results_filenames,
    handle_non_serializable,
    hash_string,
    sanitize_list,
    sanitize_model_name,
    sanitize_task_name,
)


@dataclass(init=False)
class GeneralConfigTracker:
    """
    Tracker for the evaluation parameters.

    Attributes:
        model_source (str): Source of the model (e.g. Hugging Face, GGUF, etc.)
        model_name (str): Name of the model.
        model_name_sanitized (str): Sanitized model name for directory creation.
        start_time (float): Start time of the experiment. Logged at class init.
        end_time (float): End time of the experiment. Logged when calling [`GeneralConfigTracker.log_end_time`].
        total_evaluation_time_seconds (str): Inferred total evaluation time in seconds (from the start and end times).
    """

    model_source: str = None
    model_name: str = None
    model_name_sanitized: str = None
    system_instruction: str = None
    system_instruction_sha: str = None
    fewshot_as_multiturn: bool = None
    chat_template: str = None
    chat_template_sha: str = None
    start_time: float = None
    end_time: float = None
    total_evaluation_time_seconds: str = None

    def __init__(self) -> None:
        """Starts the evaluation timer."""
        self.start_time = time.perf_counter()

    @staticmethod
    def _get_model_name(model_args: str) -> str:
        """Extracts the model name from the model arguments."""

        def extract_model_name(model_args: str, key: str) -> str:
            """Extracts the model name from the model arguments using a key."""
            args_after_key = model_args.split(key)[1]
            return args_after_key.split(",")[0]

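        # prefix order matters: adapter keys ("peft=", "delta=") are checked before
        # the base-model keys, so e.g. "pretrained=base,peft=adapter" yields "adapter".
        # Illustrative example (hypothetical arguments):
        #     _get_model_name("pretrained=EleutherAI/pythia-160m,dtype=float32")
        #     returns "EleutherAI/pythia-160m"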
        prefixes = ["peft=", "delta=", "pretrained=", "model=", "path=", "engine="]
        for prefix in prefixes:
            if prefix in model_args:
                return extract_model_name(model_args, prefix)
        return ""

    def log_experiment_args(
        self,
        model_source: str,
        model_args: str,
        system_instruction: str,
        chat_template: str,
        fewshot_as_multiturn: bool,
    ) -> None:
        """Logs the model parameters of the experiment."""
        self.model_source = model_source
        self.model_name = GeneralConfigTracker._get_model_name(model_args)
        self.model_name_sanitized = sanitize_model_name(self.model_name)
        self.system_instruction = system_instruction
        self.system_instruction_sha = (
            hash_string(system_instruction) if system_instruction else None
        )
        self.chat_template = chat_template
        self.chat_template_sha = hash_string(chat_template) if chat_template else None
        self.fewshot_as_multiturn = fewshot_as_multiturn

    def log_end_time(self) -> None:
        """Logs the end time of the evaluation and calculates the total evaluation time."""
        self.end_time = time.perf_counter()
        self.total_evaluation_time_seconds = str(self.end_time - self.start_time)


class EvaluationTracker:
    """
    Keeps track of and saves relevant information from the evaluation process.
    Compiles the data from trackers and writes it to files, which can be published to the Hugging Face Hub if requested.
    """

    def __init__(
        self,
        output_path: str = None,
        hub_results_org: str = "",
        hub_repo_name: str = "",
        details_repo_name: str = "",
        results_repo_name: str = "",
        push_results_to_hub: bool = False,
        push_samples_to_hub: bool = False,
        public_repo: bool = False,
        token: str = "",
        leaderboard_url: str = "",
        point_of_contact: str = "",
        gated: bool = False,
    ) -> None:
        """
        Creates all the necessary loggers for evaluation tracking.

        Args:
            output_path (str): Path to save the results. If not provided, the results won't be saved.
            hub_results_org (str): The Hugging Face organization to push the results to. If not provided, the results will be pushed to the owner of the Hugging Face token.
            hub_repo_name (str): The name of the Hugging Face repository to push the results to. If not provided, the results will be pushed to `lm-eval-results`.
            details_repo_name (str): The name of the Hugging Face repository to push the details to. If not provided, the details will be pushed to `lm-eval-results`.
            results_repo_name (str): The name of the Hugging Face repository to push the results to. If not provided, the results will be pushed to the details repository.
            push_results_to_hub (bool): Whether to push the results to the Hugging Face hub.
            push_samples_to_hub (bool): Whether to push the samples to the Hugging Face hub.
            public_repo (bool): Whether to push the results to a public or private repository.
            token (str): Token to use when pushing to the Hugging Face hub. This token should have write access to `hub_results_org`.
            leaderboard_url (str): URL to the leaderboard on the Hugging Face hub on the dataset card.
            point_of_contact (str): Contact information on the Hugging Face hub dataset card.
            gated (bool): Whether to gate the repository.
        """
        self.general_config_tracker = GeneralConfigTracker()

        self.output_path = output_path
        self.push_results_to_hub = push_results_to_hub
        self.push_samples_to_hub = push_samples_to_hub
        self.public_repo = public_repo
        self.leaderboard_url = leaderboard_url
        self.point_of_contact = point_of_contact
        self.api = HfApi(token=token) if token else None
        self.gated_repo = gated

        if not self.api and (push_results_to_hub or push_samples_to_hub):
            raise ValueError(
                "Hugging Face token is not defined, but 'push_results_to_hub' or 'push_samples_to_hub' is set to True. "
                "Please provide a valid Hugging Face token by setting the HF_TOKEN environment variable."
            )

        if (
            self.api
            and hub_results_org == ""
            and (push_results_to_hub or push_samples_to_hub)
        ):
            hub_results_org = self.api.whoami()["name"]
            eval_logger.warning(
                f"hub_results_org was not specified. Results will be pushed to '{hub_results_org}'."
            )

        if hub_repo_name == "":
            details_repo_name = (
                details_repo_name if details_repo_name != "" else "lm-eval-results"
            )
            results_repo_name = (
                results_repo_name if results_repo_name != "" else details_repo_name
            )
        else:
            details_repo_name = hub_repo_name
            results_repo_name = hub_repo_name
            eval_logger.warning(
                "hub_repo_name was specified. Both details and results will be pushed to the same repository. Using hub_repo_name is no longer recommended; details_repo_name and results_repo_name should be used instead."
            )

        self.details_repo = f"{hub_results_org}/{details_repo_name}"
        self.details_repo_private = f"{hub_results_org}/{details_repo_name}-private"
        self.results_repo = f"{hub_results_org}/{results_repo_name}"
        self.results_repo_private = f"{hub_results_org}/{results_repo_name}-private"

    def save_results_aggregated(
        self,
        results: dict,
        samples: dict,
    ) -> None:
        """
        Saves the aggregated results to the output path and pushes them to the Hugging Face hub if requested.

        Args:
            results (dict): The aggregated results to save.
            samples (dict): The per-sample results, used to compute a reproducibility hash per task.
        """
        self.general_config_tracker.log_end_time()

        if self.output_path:
            try:
                eval_logger.info("Saving results aggregated")

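                # cumulative hash per task, computed from each sample's doc,
                # prompt, and target hashes; lets reruns be checked for consistency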
                task_hashes = {}
                if samples:
                    for task_name, task_samples in samples.items():
                        sample_hashes = [
                            s["doc_hash"] + s["prompt_hash"] + s["target_hash"]
                            for s in task_samples
                        ]
                        task_hashes[task_name] = hash_string("".join(sample_hashes))

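                # fold the task hashes and the general run config into the results dict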
                results.update({"task_hashes": task_hashes})
                results.update(asdict(self.general_config_tracker))
                dumped = json.dumps(
                    results,
                    indent=2,
                    default=handle_non_serializable,
                    ensure_ascii=False,
                )

                path = Path(self.output_path if self.output_path else Path.cwd())
                path = path.joinpath(self.general_config_tracker.model_name_sanitized)
                path.mkdir(parents=True, exist_ok=True)

                self.date_id = datetime.now().isoformat().replace(":", "-")
                file_results_aggregated = path.joinpath(f"results_{self.date_id}.json")
                file_results_aggregated.open("w", encoding="utf-8").write(dumped)

                if self.api and self.push_results_to_hub:
                    repo_id = (
                        self.results_repo
                        if self.public_repo
                        else self.results_repo_private
                    )
                    self.api.create_repo(
                        repo_id=repo_id,
                        repo_type="dataset",
                        private=not self.public_repo,
                        exist_ok=True,
                    )
                    self.api.upload_file(
                        repo_id=repo_id,
                        path_or_fileobj=str(
                            path.joinpath(f"results_{self.date_id}.json")
                        ),
                        path_in_repo=os.path.join(
                            self.general_config_tracker.model_name,
                            f"results_{self.date_id}.json",
                        ),
                        repo_type="dataset",
                        commit_message=f"Adding aggregated results for {self.general_config_tracker.model_name}",
                    )
                    eval_logger.info(
                        "Successfully pushed aggregated results to the Hugging Face Hub. "
                        f"You can find them at: {repo_id}"
                    )

            except Exception as e:
                eval_logger.warning("Could not save results aggregated")
                eval_logger.info(repr(e))
        else:
            eval_logger.info(
                "Output path not provided, skipping saving results aggregated"
            )

    def save_results_samples(
        self,
        task_name: str,
        samples: list,
    ) -> None:
        """
        Saves the per-sample results to the output path and pushes them to the Hugging Face hub if requested.

        Args:
            task_name (str): The task name to save the samples for.
            samples (list): The per-sample results to save.
        """
        if self.output_path:
            try:
                eval_logger.info(f"Saving per-sample results for: {task_name}")

                path = Path(self.output_path if self.output_path else Path.cwd())
                path = path.joinpath(self.general_config_tracker.model_name_sanitized)
                path.mkdir(parents=True, exist_ok=True)

                file_results_samples = path.joinpath(
                    f"samples_{task_name}_{self.date_id}.jsonl"
                )

                for sample in samples:
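                    # arguments and resps must be sanitized first, otherwise the
                    # resulting JSONL cannot be reloaded with the datasets library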
                    arguments = {}
                    for i, arg in enumerate(sample["arguments"]):
                        arguments[f"gen_args_{i}"] = {}
                        for j, tmp in enumerate(arg):
                            arguments[f"gen_args_{i}"][f"arg_{j}"] = tmp

                    sample["resps"] = sanitize_list(sample["resps"])
                    sample["filtered_resps"] = sanitize_list(sample["filtered_resps"])
                    sample["arguments"] = arguments
                    sample["target"] = str(sample["target"])

                    sample_dump = (
                        json.dumps(
                            sample,
                            default=handle_non_serializable,
                            ensure_ascii=False,
                        )
                        + "\n"
                    )

                    with open(file_results_samples, "a", encoding="utf-8") as f:
                        f.write(sample_dump)

                if self.api and self.push_samples_to_hub:
                    repo_id = (
                        self.details_repo
                        if self.public_repo
                        else self.details_repo_private
                    )
                    self.api.create_repo(
                        repo_id=repo_id,
                        repo_type="dataset",
                        private=not self.public_repo,
                        exist_ok=True,
                    )
                    try:
                        if self.gated_repo:
                            headers = build_hf_headers()
                            r = get_session().put(
                                url=f"https://huggingface.co/api/datasets/{repo_id}/settings",
                                headers=headers,
                                json={"gated": "auto"},
                            )
                            hf_raise_for_status(r)
                    except Exception as e:
                        eval_logger.warning("Could not gate the repository")
                        eval_logger.info(repr(e))
                    self.api.upload_folder(
                        repo_id=repo_id,
                        folder_path=str(path),
                        path_in_repo=self.general_config_tracker.model_name_sanitized,
                        repo_type="dataset",
                        commit_message=f"Adding samples results for {task_name} to {self.general_config_tracker.model_name}",
                    )
                    eval_logger.info(
                        f"Successfully pushed sample results for task: {task_name} to the Hugging Face Hub. "
                        f"You can find them at: {repo_id}"
                    )

            except Exception as e:
                eval_logger.warning("Could not save sample results")
                eval_logger.info(repr(e))
        else:
            eval_logger.info("Output path not provided, skipping saving sample results")

    def recreate_metadata_card(self) -> None:
        """
        Creates a metadata card for the evaluation results dataset and pushes it to the Hugging Face hub.
        """
        eval_logger.info("Recreating metadata card")
        repo_id = self.details_repo if self.public_repo else self.details_repo_private

        files_in_repo = self.api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        results_files = get_results_filenames(files_in_repo)
        sample_files = get_sample_results_filenames(files_in_repo)

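        # latest evaluation datetime per config, keyed as
        # "<model_name>__<task_name_sanitized>" for sample files and
        # "<model_name>__results" for aggregated results, e.g.
        #     "org__model__gsm8k": "2024-01-01T12-00-00.000000"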
        latest_task_results_datetime = defaultdict(lambda: datetime.min.isoformat())

        for file_path in sample_files:
            file_path = Path(file_path)
            filename = file_path.name
            model_name = file_path.parent
            task_name = get_file_task_name(filename)
            results_datetime = get_file_datetime(filename)
            task_name_sanitized = sanitize_task_name(task_name)

            samples_key = f"{model_name}__{task_name_sanitized}"
            results_key = f"{model_name}__results"
            latest_datetime = max(
                latest_task_results_datetime[samples_key],
                results_datetime,
            )
            latest_task_results_datetime[samples_key] = latest_datetime
            latest_task_results_datetime[results_key] = max(
                latest_task_results_datetime[results_key],
                latest_datetime,
            )

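        # metadata configs that will be written into the dataset card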
        card_metadata = MetadataConfigs()

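        # register the aggregated results files in the card metadata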
        for file_path in results_files:
            file_path = Path(file_path)
            results_filename = file_path.name
            model_name = file_path.parent
            eval_date = get_file_datetime(results_filename)
            eval_date_sanitized = re.sub(r"[^\w\.]", "_", eval_date)
            results_filename = Path("**") / Path(results_filename).name
            config_name = f"{model_name}__results"
            sanitized_last_eval_date_results = re.sub(
                r"[^\w\.]", "_", latest_task_results_datetime[config_name]
            )

            if eval_date_sanitized == sanitized_last_eval_date_results:
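                # record a split named after the run timestamp plus a "latest"
                # split that always points at the most recent results file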
                current_results = card_metadata.get(config_name, {"data_files": []})
                current_results["data_files"].append(
                    {"split": eval_date_sanitized, "path": [str(results_filename)]}
                )
                card_metadata[config_name] = current_results
                card_metadata[config_name]["data_files"].append(
                    {"split": "latest", "path": [str(results_filename)]}
                )

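        # register the per-task sample files in the card metadata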
        for file_path in sample_files:
            file_path = Path(file_path)
            filename = file_path.name
            model_name = file_path.parent
            task_name = get_file_task_name(filename)
            eval_date = get_file_datetime(filename)
            task_name_sanitized = sanitize_task_name(task_name)
            eval_date_sanitized = re.sub(r"[^\w\.]", "_", eval_date)
            results_filename = Path("**") / Path(filename).name
            config_name = f"{model_name}__{task_name_sanitized}"
            sanitized_last_eval_date_results = re.sub(
                r"[^\w\.]", "_", latest_task_results_datetime[config_name]
            )
            if eval_date_sanitized == sanitized_last_eval_date_results:
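                # as above: one timestamped split plus a "latest" split per task config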
                current_details_for_task = card_metadata.get(
                    config_name, {"data_files": []}
                )
                current_details_for_task["data_files"].append(
                    {"split": eval_date_sanitized, "path": [str(results_filename)]}
                )
                card_metadata[config_name] = current_details_for_task
                card_metadata[config_name]["data_files"].append(
                    {"split": "latest", "path": [str(results_filename)]}
                )

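        # load the most recent aggregated results so they can be shown on the card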
        latest_datetime = max(latest_task_results_datetime.values())
        latest_model_name = max(
            latest_task_results_datetime, key=lambda k: latest_task_results_datetime[k]
        )
        last_results_file = [
            f for f in results_files if latest_datetime.replace(":", "-") in f
        ][0]
        last_results_file_path = hf_hub_url(
            repo_id=repo_id, filename=last_results_file, repo_type="dataset"
        )
        latest_results_file = load_dataset(
            "json", data_files=last_results_file_path, split="train"
        )
        results_dict = latest_results_file["results"][0]
        new_dictionary = {"all": results_dict}
        new_dictionary.update(results_dict)
        results_string = json.dumps(new_dictionary, indent=4)

        dataset_summary = (
            "Dataset automatically created during the evaluation run of model "
        )
        if self.general_config_tracker.model_source == "hf":
            dataset_summary += f"[{self.general_config_tracker.model_name}](https://huggingface.co/{self.general_config_tracker.model_name})\n"
        else:
            dataset_summary += f"{self.general_config_tracker.model_name}\n"
        dataset_summary += (
            f"The dataset is composed of {len(card_metadata) - 1} configuration(s), each one corresponding to one of the evaluated tasks.\n\n"
            f"The dataset has been created from {len(results_files)} run(s). Each run can be found as a specific split in each "
            'configuration, the split being named using the timestamp of the run. The "latest" split always points to the latest results.\n\n'
            'An additional configuration "results" stores all the aggregated results of the run.\n\n'
            "To load the details from a run, you can for instance do the following:\n"
        )
        if self.general_config_tracker.model_source == "hf":
            dataset_summary += (
                "```python\nfrom datasets import load_dataset\n"
                f'data = load_dataset(\n\t"{repo_id}",\n\tname="{latest_model_name}",\n\tsplit="latest"\n)\n```\n\n'
            )
        dataset_summary += (
            "## Latest results\n\n"
            f"These are the [latest results from run {latest_datetime}]({last_results_file_path.replace('/resolve/', '/blob/')}) "
            "(note that there might be results for other tasks in the repos if successive evals didn't cover the same tasks. "
            'You can find each in the results and the "latest" split for each eval):\n\n'
            f"```python\n{results_string}\n```"
        )
        card_data = DatasetCardData(
            dataset_summary=dataset_summary,
            repo_url=f"https://huggingface.co/{self.general_config_tracker.model_name}",
            pretty_name=f"Evaluation run of {self.general_config_tracker.model_name}",
            leaderboard_url=self.leaderboard_url,
            point_of_contact=self.point_of_contact,
        )
        card_metadata.to_dataset_card_data(card_data)
        card = DatasetCard.from_template(
            card_data,
            pretty_name=card_data.pretty_name,
        )
        card.push_to_hub(repo_id, repo_type="dataset")