import collections
import fnmatch
import functools
import hashlib
import importlib.util
import inspect
import json
import logging
import os
import re
from dataclasses import asdict, is_dataclass
from itertools import islice
from typing import Any, Callable, Generator, List, Tuple
import numpy as np
import yaml
from jinja2 import BaseLoader, Environment, StrictUndefined
logging.basicConfig(
format="%(asctime)s,%(msecs)03d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s",
datefmt="%Y-%m-%d:%H:%M:%S",
level=logging.INFO,
)
eval_logger = logging.getLogger("lm-eval")
SPACING = " " * 47
HIGHER_IS_BETTER_SYMBOLS = {
True: "↑",
False: "↓",
}
def hash_string(string: str) -> str:
return hashlib.sha256(string.encode("utf-8")).hexdigest()
def escaped_split(text, sep_char, maxsplit=-1):
"""Split text into a list on occurrences of the given separation
character `sep_char`. The separation character may be escaped by a
backslash to avoid splitting at that location.
The separation character must be a string of size 1.
If `maxsplit` is given, at most `maxsplit` splits are done (thus,
the list will have at most `maxsplit + 1` elements). If `maxsplit`
is not specified or less than 0, then there is no limit on the
number of splits (all possible splits are made).
"""
assert len(sep_char) == 1, (
"separation string must be a single character for escaped splitting"
)
    if maxsplit == 0:
        return [text]
    maxsplit = max(0, maxsplit)
    return re.split(r"(?<!\\)" + re.escape(sep_char), text, maxsplit=maxsplit)
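# A quick illustration of escaped splitting (the input below is illustrative):
# a backslash before the separator suppresses the split at that position.
#   >>> escaped_split(r"arg1=a\,b,arg2=c", ",")
#   ['arg1=a\\,b', 'arg2=c']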
def handle_arg_string(arg):
if arg.lower() == "true":
return True
elif arg.lower() == "false":
return False
elif arg.isnumeric():
return int(arg)
try:
return float(arg)
except ValueError:
return arg
def handle_non_serializable(o):
if isinstance(o, np.int64) or isinstance(o, np.int32):
return int(o)
elif isinstance(o, set):
return list(o)
else:
return str(o)
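# Intended as the `default=` hook for `json.dumps` so numpy integers and sets
# don't raise TypeError (values below are illustrative):
#   >>> json.dumps({"n": np.int64(3), "s": {"a"}}, default=handle_non_serializable)
#   '{"n": 3, "s": ["a"]}'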
def sanitize_list(sub):
"""
    Takes a possibly nested list and recursively converts all inner components to strings
"""
if isinstance(sub, list):
return [sanitize_list(item) for item in sub]
if isinstance(sub, tuple):
return tuple(sanitize_list(item) for item in sub)
else:
return str(sub)
def simple_parse_args_string(args_string):
"""
    Parses something like
    arg1=val1,arg2=val2
    into a dictionary
"""
args_string = args_string.strip()
if not args_string:
return {}
arg_list = [arg for arg in args_string.split(",") if arg]
args_dict = {
kv[0]: handle_arg_string("=".join(kv[1:]))
for kv in [arg.split("=") for arg in arg_list]
}
return args_dict
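# Example of the comma-separated key=value format this parser expects
# (argument names below are illustrative, not required by the harness):
#   >>> simple_parse_args_string("pretrained=gpt2,temperature=0.7,do_sample=True")
#   {'pretrained': 'gpt2', 'temperature': 0.7, 'do_sample': True}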
def join_iters(iters):
    for it in iters:
        yield from it
def group(arr, fn):
res = collections.defaultdict(list)
for ob in arr:
res[fn(ob)].append(ob)
return list(res.values())
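# Groups elements by the value of `fn`, preserving first-seen key order
# (toy example: grouping integers by parity):
#   >>> group([1, 2, 3, 4], lambda x: x % 2)
#   [[1, 3], [2, 4]]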
# Returns a list containing all values of the source_list that
# match at least one of the patterns
def pattern_match(patterns, source_list):
if isinstance(patterns, str):
patterns = [patterns]
task_names = set()
for pattern in patterns:
for matching in fnmatch.filter(source_list, pattern):
task_names.add(matching)
return sorted(list(task_names))
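# Resolves shell-style wildcards against the list of known task names
# (task names below are illustrative):
#   >>> pattern_match(["mmlu_*"], ["mmlu_anatomy", "mmlu_law", "hellaswag"])
#   ['mmlu_anatomy', 'mmlu_law']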
def softmax(x):
"""Compute softmax values for each sets of scores in x."""
e_x = np.exp(x - np.max(x))
return e_x / e_x.sum()
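# Subtracting the max before exponentiating keeps the computation numerically
# stable without changing the result, e.g.:
#   >>> softmax(np.array([1.0, 2.0, 3.0]))
#   array([0.09003057, 0.24472847, 0.66524096])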
def general_detokenize(string):
string = string.replace(" n't", "n't")
string = string.replace(" )", ")")
string = string.replace("( ", "(")
string = string.replace('" ', '"')
string = string.replace(' "', '"')
string = re.sub(r" (['.,])", r"\1", string)
return string
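# Undoes common PTB-style tokenization artifacts (spaces before punctuation,
# split contractions); a small illustrative example:
#   >>> general_detokenize("I do n't like it .")
#   "I don't like it."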
def get_file_task_name(filename: str) -> str:
"""
    Given a sample-results filename, extracts and returns the task name.
"""
return filename[filename.find("_") + 1 : filename.rfind("_")]
def get_file_datetime(filename: str) -> str:
"""
    Given a results or sample-results filename, extracts and returns the datetime.
"""
return filename[filename.rfind("_") + 1 :].replace(".jsonl", "")
def sanitize_model_name(model_name: str) -> str:
"""
Given the model name, returns a sanitized version of it.
"""
return re.sub(r"[\"<>:/\|\\?\*\[\]]+", "__", model_name)
def sanitize_task_name(task_name: str) -> str:
"""
Given the task name, returns a sanitized version of it.
"""
return re.sub(r"\W", "_", task_name)
def get_latest_filename(filenames: List[str]) -> str:
"""
Given a list of filenames, returns the filename with the latest datetime.
"""
return max(filenames, key=lambda f: get_file_datetime(f))
def get_results_filenames(filenames: List[str]) -> List[str]:
"""
Extracts filenames that correspond to aggregated results.
"""
return [f for f in filenames if "/results_" in f and ".json" in f]
def get_sample_results_filenames(filenames: List[str]) -> List[str]:
"""
Extracts filenames that correspond to sample results.
"""
return [f for f in filenames if "/samples_" in f and ".json" in f]
def get_rolling_token_windows(
token_list: List[int], prefix_token: int, max_seq_len: int, context_len: int
) -> Generator[Tuple[List[int], List[int]], None, None]:
"""
    Splits `token_list` into rolling prediction windows. `context_len` allows for a rolling
    window context, so that each prediction window can condition on some prior context.
:param token_list: list
List of tokens to be PREDICTED
:param max_seq_len: int
max_seq_len of model (or max_seq_len we want to use)
:param context_len: int
Amount of desired token context for prediction. Needs to be at least 1.
:param prefix_token: token
Dummy token like <eos> so the first token has something to condition on
:return: generator
Generator of tuples
(input_tokens, pred_tokens)
Note: Score only the last len(pred_tokens) logits of the LM
"""
assert 1 <= context_len <= max_seq_len
if not token_list:
return
# +1 offset, going from input->preds
pred_len = max_seq_len - context_len + 1
predicted = 0
# Special handling for first window: predict all tokens
first_seq_len = min(max_seq_len, len(token_list))
yield [prefix_token] + token_list[: first_seq_len - 1], token_list[:first_seq_len]
predicted += first_seq_len
while predicted < len(token_list):
window_pred_len = min(len(token_list) - predicted, pred_len)
window_end = predicted + window_pred_len
yield (
token_list[window_end - max_seq_len - 1 : window_end - 1],
token_list[window_end - window_pred_len : window_end],
)
predicted += window_pred_len
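# Toy walk-through (illustrative numbers): predicting 10 tokens with
# max_seq_len=4 and context_len=1 yields three windows; after the first window,
# each later window reuses the tail of the previous one as context.
#   >>> list(get_rolling_token_windows(list(range(10)), prefix_token=-1,
#   ...                                max_seq_len=4, context_len=1))
#   [([-1, 0, 1, 2], [0, 1, 2, 3]),
#    ([3, 4, 5, 6], [4, 5, 6, 7]),
#    ([5, 6, 7, 8], [8, 9])]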
def make_disjoint_window(
pair: Tuple[List[int], List[int]],
) -> Tuple[List[int], List[int]]:
"""Takes output from get_rolling_token_windows and makes the context not overlap with the continuation"""
a, b = pair
return a[: len(a) - (len(b) - 1)], b
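# Trims the context so it no longer overlaps the continuation, e.g. applied to
# the last window from the toy example above:
#   >>> make_disjoint_window(([5, 6, 7, 8], [8, 9]))
#   ([5, 6, 7], [8, 9])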
class EnhancedJSONEncoder(json.JSONEncoder):
"""
    Provides proper JSON encoding for the loggers' and trackers' JSON dumps.
    Notably handles the JSON encoding of dataclasses.
"""
def default(self, o):
if is_dataclass(o):
return asdict(o)
return super().default(o)
class Reorderer:
def __init__(self, arr: List[Any], fn: Callable) -> None:
"""Reorder an array according to some function
Args:
arr (List[Any]): The initial array
fn (Callable[[Any], Any]): A function to determine the priority of elements
"""
self.size = len(arr)
arr = list(enumerate(arr))
arr = group(arr, lambda x: fn(x[1]))
# arr = [([y[0] for y in x], x[0][1]) for x in arr]
        # TODO: overhaul Reorderer. It previously grouped requests by content, but we don't want this
arr = [([y[0]], x[0][1]) for x in arr for y in x]
arr.sort(key=lambda x: fn(x[1]))
self.arr = arr
def get_reordered(self):
"""Gets the reordered array
Returns:
List[Any]: The reordered array
"""
return [x[1] for x in self.arr]
def get_original(self, newarr):
"""Restores the original order of a new array based on the old array's order
Args:
newarr (List[Any]): The array to be restored
Returns:
List[Any]: The array restored to the original order
"""
res = [None] * self.size
cov = [False] * self.size
for (inds, _), v in zip(self.arr, newarr):
for ind in inds:
res[ind] = v
cov[ind] = True
assert all(cov)
return res
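# Sketch of the round trip on toy data: sort strings by length for processing,
# then map the per-item outputs back to the caller's original order.
#   >>> ro = Reorderer(["bb", "a", "ccc"], len)
#   >>> ro.get_reordered()
#   ['a', 'bb', 'ccc']
#   >>> ro.get_original(["A", "BB", "CCC"])  # outputs aligned with get_reordered()
#   ['BB', 'A', 'CCC']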
def make_table(result_dict, column: str = "results", sort_results: bool = False):
"""Generate table of results."""
from pytablewriter import LatexTableWriter, MarkdownTableWriter
if column == "results":
column_name = "Tasks"
elif column == "groups":
column_name = "Groups"
all_headers = [
column_name,
"Version",
"Filter",
"n-shot",
"Metric",
"",
"Value",
"",
"Stderr",
]
md_writer = MarkdownTableWriter()
latex_writer = LatexTableWriter()
md_writer.headers = all_headers
latex_writer.headers = all_headers
values = []
keys = result_dict[column].keys()
if sort_results:
# sort entries alphabetically by task or group name.
        # NOTE: sort_results defaults to False because row order matters for
        # multi-level table printing (e.g. MMLU); sorting would break that grouping.
keys = sorted(keys)
for k in keys:
dic = result_dict[column][k]
version = result_dict["versions"].get(k, " N/A")
n = str(result_dict.get("n-shot", " ").get(k, " "))
higher_is_better = result_dict.get("higher_is_better", {}).get(k, {})
if "alias" in dic:
k = dic.pop("alias")
metric_items = dic.items()
metric_items = sorted(metric_items)
        for mf, v in metric_items:
m, _, f = mf.partition(",")
if m.endswith("_stderr"):
continue
hib = HIGHER_IS_BETTER_SYMBOLS.get(higher_is_better.get(m), "")
v = "%.4f" % v if isinstance(v, float) else v
if m + "_stderr" + "," + f in dic:
se = dic[m + "_stderr" + "," + f]
se = " N/A" if se == "N/A" else "%.4f" % se
values.append([k, version, f, n, m, hib, v, "±", se])
else:
values.append([k, version, f, n, m, hib, v, "", ""])
k = ""
version = ""
md_writer.value_matrix = values
latex_writer.value_matrix = values
# todo: make latex table look good
# print(latex_writer.dumps())
return md_writer.dumps()
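# Minimal sketch of the result_dict shape make_table expects (values below are
# illustrative). Metrics are keyed as "<metric>,<filter>", with an optional
# matching "<metric>_stderr,<filter>" entry:
#   >>> result_dict = {
#   ...     "results": {"hellaswag": {"alias": "hellaswag",
#   ...                               "acc,none": 0.57,
#   ...                               "acc_stderr,none": 0.005}},
#   ...     "versions": {"hellaswag": 1},
#   ...     "n-shot": {"hellaswag": 0},
#   ...     "higher_is_better": {"hellaswag": {"acc": True}},
#   ... }
#   >>> print(make_table(result_dict))  # renders the rows above as a Markdown table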
def positional_deprecated(fn):
"""
A decorator to nudge users into passing only keyword args (`kwargs`) to the
wrapped function, `fn`.
"""
@functools.wraps(fn)
def _wrapper(*args, **kwargs):
        if len(args) != (1 if inspect.ismethod(fn) else 0):
print(
f"WARNING: using {fn.__name__} with positional arguments is "
"deprecated and will be disallowed in a future version of "
"lm-evaluation-harness!"
)
return fn(*args, **kwargs)
return _wrapper
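# Usage sketch (hypothetical function): positional calls still go through, but
# the deprecation warning is printed first.
#   >>> @positional_deprecated
#   ... def run_eval(model=None, tasks=None):
#   ...     return model, tasks
#   >>> run_eval(model="gpt2")  # keyword call: silent
#   >>> run_eval("gpt2")        # positional call: prints the WARNING, then runs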
def ignore_constructor(loader, node):
return node
def import_function(loader, node):
function_name = loader.construct_scalar(node)
yaml_path = os.path.dirname(loader.name)
*module_name, function_name = function_name.split(".")
if isinstance(module_name, list):
module_name = ".".join(module_name)
module_path = os.path.normpath(os.path.join(yaml_path, "{}.py".format(module_name)))
spec = importlib.util.spec_from_file_location(module_name, module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
function = getattr(module, function_name)
return function
def load_yaml_config(yaml_path=None, yaml_config=None, yaml_dir=None, mode="full"):
if mode == "simple":
constructor_fn = ignore_constructor
elif mode == "full":
constructor_fn = import_function
# Add the import_function constructor to the YAML loader
yaml.add_constructor("!function", constructor_fn)
if yaml_config is None:
with open(yaml_path, "rb") as file:
yaml_config = yaml.full_load(file)
if yaml_dir is None:
yaml_dir = os.path.dirname(yaml_path)
assert yaml_dir is not None
if "include" in yaml_config:
include_path = yaml_config["include"]
del yaml_config["include"]
if isinstance(include_path, str):
include_path = [include_path]
# Load from the last one first
include_path.reverse()
final_yaml_config = {}
for path in include_path:
# Assumes that path is a full path.
# If not found, assume the included yaml
# is in the same dir as the original yaml
if not os.path.isfile(path):
path = os.path.join(yaml_dir, path)
try:
included_yaml_config = load_yaml_config(yaml_path=path, mode=mode)
final_yaml_config.update(included_yaml_config)
            except Exception as ex:
                # Re-raise: a failed include is a configuration error the caller should see
                raise ex
final_yaml_config.update(yaml_config)
return final_yaml_config
return yaml_config
def regex_replace(string, pattern, repl, count: int = 0):
"""Implements the `re.sub` function as a custom Jinja filter."""
return re.sub(pattern, repl, string, count=count)
env = Environment(
    loader=BaseLoader(), undefined=StrictUndefined, keep_trailing_newline=True
)
env.filters["regex_replace"] = regex_replace
def apply_template(template: str, doc: dict) -> str:
rtemplate = env.from_string(template)
return rtemplate.render(**doc)
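# Renders a Jinja2 prompt template against a document dict; the custom
# `regex_replace` filter is available inside templates (toy example):
#   >>> apply_template("Q: {{question}} A:", {"question": "2+2?"})
#   'Q: 2+2? A:'
#   >>> apply_template("{{title | regex_replace(' +', ' ')}}", {"title": "too   many  spaces"})
#   'too many spaces'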
def create_iterator(raw_iterator, *, rank=0, world_size=1, limit=None):
"""
Method for creating a (potentially) sliced and limited
iterator from a raw document iterator. Used for splitting data
among ranks in multigpu setting or only pulling a sample of documents
"""
return islice(raw_iterator, rank, limit, world_size)
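# Each rank takes every world_size-th document starting at its own offset, so
# ranks partition the stream without materializing it (toy example):
#   >>> list(create_iterator(iter(range(10)), rank=1, world_size=4))
#   [1, 5, 9]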
def weighted_f1_score(items):
from sklearn.metrics import f1_score
unzipped_list = list(zip(*items))
golds = unzipped_list[0]
preds = unzipped_list[1]
fscore = f1_score(golds, preds, average="weighted")
return fscore
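# Expects (gold, pred) pairs and returns the support-weighted F1 across classes
# (toy labels below are illustrative):
#   >>> round(weighted_f1_score([(0, 0), (1, 1), (1, 0)]), 3)
#   0.667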