# File size: 9,014 Bytes | 9d5b280
import argparse
import json
import os
import re
from pathlib import Path
import pandas as pd
from zeno_client import ZenoClient, ZenoMetric
from lm_eval.utils import (
eval_logger,
get_latest_filename,
get_results_filenames,
get_sample_results_filenames,
)
def parse_args():
    """Parse the command-line options of the Zeno upload script.

    Two options are registered, both mandatory: ``--data_path`` and
    ``--project_name``.

    Returns:
        argparse.Namespace: The parsed options, exposed as the attributes
            ``data_path`` and ``project_name``.
    """
    # (flag, help text) for every required option, in registration order.
    required_options = (
        (
            "--data_path",
            "Where to find the results of the benchmarks that have been run. Uses the name of each subfolder as the model name.",
        ),
        (
            "--project_name",
            "The name of the generated Zeno project.",
        ),
    )

    cli = argparse.ArgumentParser(
        description="Upload your data to the Zeno AI evaluation platform to visualize results. This requires a ZENO_API_KEY in your environment variables. The eleuther harness must be run with log_samples=True and an output_path set for data to be written to disk."
    )
    for flag, help_text in required_options:
        cli.add_argument(flag, required=True, help=help_text)

    return cli.parse_args()
def main():
    """Upload the results of your benchmark tasks to the Zeno AI evaluation platform.

    This script expects your results to live in a data folder where subfolders
    contain results of individual models. For every task that all models have in
    common, one Zeno project is created (from the first model's data) and each
    model is uploaded to it as a system.

    Raises:
        KeyError: If ``ZENO_API_KEY`` is not set in the environment.
        AssertionError: If no model directories are found, or the models share
            no common task.
    """
    args = parse_args()

    client = ZenoClient(os.environ["ZENO_API_KEY"])

    # Get all model subfolders from the parent data folder.
    # The context manager closes the scandir iterator deterministically.
    with os.scandir(Path(args.data_path)) as entries:
        models = [entry.name for entry in entries if entry.is_dir()]

    assert len(models) > 0, "No model directories found in the data_path."

    # Get the tasks from the latest results file of the first model.
    tasks = set(tasks_for_model(models[0], args.data_path))

    # Get task names from the latest results file for each remaining model and
    # narrow `tasks` down to the intersection shared by all models.
    for model in models[1:]:
        old_tasks = tasks.copy()
        task_count = len(tasks)
        model_tasks = set(tasks_for_model(model, args.data_path))
        # In-place update: set.intersection() returns a new set and would leave
        # `tasks` untouched if its result were discarded.
        tasks.intersection_update(model_tasks)

        if task_count != len(tasks):
            eval_logger.warning(
                f"All models must have the same tasks. {model} has tasks: {model_tasks} but have already recorded tasks: {old_tasks}. Taking intersection {tasks}"
            )

    assert len(tasks) > 0, (
        "Must provide at least one task in common amongst models to compare."
    )

    for task in tasks:
        # Upload data for all models
        for model_index, model in enumerate(models):
            # Get latest results and sample results for a model
            model_dir = Path(args.data_path, model)
            model_files = [f.as_posix() for f in model_dir.iterdir() if f.is_file()]
            model_results_filenames = get_results_filenames(model_files)
            model_sample_filenames = get_sample_results_filenames(model_files)
            latest_results = get_latest_filename(
                [Path(f).name for f in model_results_filenames]
            )
            # NOTE(review): `task in f` is a substring match on the file path; a
            # task whose name is contained in another task's name could select
            # the wrong sample file - confirm against the sample filename format.
            latest_sample_results = get_latest_filename(
                [Path(f).name for f in model_sample_filenames if task in f]
            )

            # Parse the results file once and close it right away; both the
            # model args and the task configs come from the same document.
            with open(model_dir / latest_results, encoding="utf-8") as file:
                results = json.load(file)

            # Replace runs of characters matched by the pattern with "__".
            model_args = re.sub(
                r"[\"<>:/\|\\?\*\[\]]+",
                "__",
                results["config"]["model_args"],
            )
            print(model_args)

            # The sample file is read as JSON lines: one record per line.
            with open(
                model_dir / latest_sample_results,
                "r",
                encoding="utf-8",
            ) as file:
                data = [json.loads(line.strip()) for line in file]

            config = results["configs"][task]

            if model_index == 0:  # Only need to assemble data for the first model
                # Only metrics aggregated with "mean" are registered with Zeno.
                metrics = [
                    ZenoMetric(
                        name=metric["metric"],
                        type="mean",
                        columns=[metric["metric"]],
                    )
                    for metric in config["metric_list"]
                    if metric.get("aggregation") == "mean"
                ]
                # One project per task; the task suffix is only added when
                # several tasks are uploaded.
                project = client.create_project(
                    name=args.project_name + (f"_{task}" if len(tasks) > 1 else ""),
                    view="text-classification",
                    metrics=metrics,
                )
                project.upload_dataset(
                    generate_dataset(data, config),
                    id_column="id",
                    data_column="data",
                    label_column="labels",
                )

            project.upload_system(
                generate_system_df(data, config),
                name=model,
                id_column="id",
                output_column="output",
            )
def tasks_for_model(model: str, data_path: str) -> list:
    """Get the tasks for a specific model.

    The task names are the keys of the ``"configs"`` mapping in the model's
    latest results file.

    Args:
        model (str): The name of the model (its subfolder name inside ``data_path``).
        data_path (str): The path to the data.

    Returns:
        list: A list of tasks for the model.
    """
    # get latest model results for a given name
    model_dir = Path(data_path, model)
    model_files = [f.as_posix() for f in model_dir.iterdir() if f.is_file()]
    model_results_filenames = get_results_filenames(model_files)
    latest_results = get_latest_filename(model_results_filenames)
    # Use a context manager so the file handle is closed once parsing is done.
    with open(latest_results, encoding="utf-8") as file:
        configs = json.load(file)["configs"]
    return list(configs)
def generate_dataset(
    data,
    config,
):
    """Generate a Zeno dataset from evaluation data.

    Args:
        data: The data to generate a dataset for. A list of sample records;
            each is read for ``doc_id``, ``target``, ``arguments`` and, when
            the task config has a ``filter_list``, ``filter``.
        config: The configuration of the task. ``output_type`` is required,
            ``filter_list`` is optional.

    Returns:
        pd.Dataframe: A dataframe that is ready to be uploaded to Zeno, with
            the columns ``id``, ``doc_id``, ``data``, ``input_len``, ``labels``
            and ``output_type``.
    """
    # With filters configured, each sample is identified by "<doc_id>.<filter>".
    if config.get("filter_list"):
        ids = [f"{x['doc_id']}.{x['filter']}" for x in data]
    else:
        ids = [x["doc_id"] for x in data]

    labels = [x["target"] for x in data]
    # Unknown output types fall back to an empty text per sample.
    instance = [""] * len(ids)

    output_type = config["output_type"]
    if output_type == "loglikelihood":
        instance = [x["arguments"]["gen_args_0"]["arg_0"] for x in data]
        labels = [x["arguments"]["gen_args_0"]["arg_1"] for x in data]
    elif output_type == "multiple_choice":
        # `arguments` is a dict keyed by "gen_args_<n>"; iterate its values so
        # each bullet shows that request's `arg_1` (presumably the answer
        # choice text - confirm against the sample log format). Iterating the
        # dict directly would yield the key strings instead.
        instance = [
            x["arguments"]["gen_args_0"]["arg_0"]
            + "\n\n"
            + "\n".join(f"- {choice['arg_1']}" for choice in x["arguments"].values())
            for x in data
        ]
    elif output_type in ("loglikelihood_rolling", "generate_until"):
        instance = [x["arguments"]["gen_args_0"]["arg_0"] for x in data]

    return pd.DataFrame(
        {
            "id": ids,
            "doc_id": [x["doc_id"] for x in data],
            "data": instance,
            "input_len": [len(x) for x in instance],
            "labels": labels,
            "output_type": output_type,
        }
    )
def generate_system_df(data, config):
    """Build the per-model ("system") dataframe that is uploaded to Zeno.

    Args:
        data: List of sample records. Each is read for ``doc_id``, for
            ``filtered_resps`` when the output type is a known one, for
            ``filter`` when the config has a ``filter_list``, and for one key
            per configured metric.
        config: The configuration of the task (``output_type`` and
            ``metric_list`` are required, ``filter_list`` is optional).

    Returns:
        pd.Dataframe: A dataframe that is ready to be uploaded to Zeno as a system.
    """
    uses_filters = bool(config.get("filter_list"))

    # Row identifier: the bare doc id, or "<doc_id>.<filter>" with filters.
    if uses_filters:
        row_ids = [f"{row['doc_id']}.{row['filter']}" for row in data]
    else:
        row_ids = [row["doc_id"] for row in data]

    frame = {"id": row_ids, "doc_id": [row["doc_id"] for row in data]}
    if uses_filters:
        frame["filter"] = [row["filter"] for row in data]

    # Reserve the "output" slot now so it keeps its column position even when
    # a branch below replaces its contents.
    frame["output"] = ["" for _ in row_ids]

    kind = config["output_type"]
    if kind == "loglikelihood":
        verdicts = []
        for row in data:
            if row["filtered_resps"][0][1] is True:
                verdicts.append("correct")
            else:
                verdicts.append("incorrect")
        frame["output"] = verdicts
    elif kind == "multiple_choice":
        joined = []
        answer_counts = []
        for row in data:
            responses = row["filtered_resps"]
            joined.append(", ".join(str(resp[0]) for resp in responses))
            answer_counts.append(len(responses))
        frame["output"] = joined
        frame["num_answers"] = answer_counts
    elif kind == "loglikelihood_rolling":
        frame["output"] = [str(row["filtered_resps"][0]) for row in data]
    elif kind == "generate_until":
        texts = [str(row["filtered_resps"][0]) for row in data]
        frame["output"] = texts
        frame["output_length"] = [len(text) for text in texts]

    # One column per configured metric, holding each sample's value.
    for spec in config["metric_list"]:
        metric_name = spec["metric"]
        frame[metric_name] = [row[metric_name] for row in data]

    return pd.DataFrame(frame)
# Script entry point: run the upload only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|