import copy import json import os import re import sys import argparse import fire import torch sys.path.append(os.path.join(os.getcwd(), "peft/src/")) from peft import PeftModel from tqdm import tqdm from transformers import GenerationConfig, LlamaForCausalLM, LlamaTokenizer, AutoModelForCausalLM, AutoTokenizer if torch.cuda.is_available(): device = "cuda" else: device = "cpu" try: if torch.backends.mps.is_available(): device = "mps" except: # noqa: E722 pass def main( load_8bit: bool = False, base_model: str = "", lora_weights: str = "tloen/alpaca-lora-7b", share_gradio: bool = False, ): args = parse_args() def evaluate( instruction, input=None, temperature=0.1, top_p=0.75, top_k=40, num_beams=4, max_new_tokens=256, **kwargs, ): prompt = generate_prompt(instruction, input) inputs = tokenizer(prompt, return_tensors="pt") input_ids = inputs["input_ids"].to(device) generation_config = GenerationConfig( temperature=temperature, top_p=top_p, top_k=top_k, num_beams=num_beams, **kwargs, ) with torch.no_grad(): generation_output = model.generate( input_ids=input_ids, generation_config=generation_config, return_dict_in_generate=True, output_scores=True, max_new_tokens=max_new_tokens, use_cache=False, ) s = generation_output.sequences[0] output = tokenizer.decode(s) return output.split("### Response:")[1].strip() """ # testing code for readme for instruction in [ "Tell me about alpacas.", "Tell me about the president of Mexico in 2019.", "Tell me about the king of France in 2019.", "List all Canadian provinces in alphabetical order.", "Write a Python program that prints the first 10 Fibonacci numbers.", "Write a program that prints the numbers from 1 to 100. But for multiples of three print 'Fizz' instead of the number and for the multiples of five print 'Buzz'. For numbers which are multiples of both three and five print 'FizzBuzz'.", # noqa: E501 "Tell me five words that rhyme with 'shock'.", "Translate the sentence 'I have no mouth but I must scream' into Spanish.", "Count up from 1 to 500.", ]: print("Instruction:", instruction) print("Response:", evaluate(instruction)) print() """ save_file = f'experiment/{args.model}-{args.adapter}-{args.dataset}.json' create_dir('experiment/') dataset = load_data(args) tokenizer, model = load_model(args) total = len(dataset) correct = 0 miss = 0.001 output_data = [] pbar = tqdm(total=total) for idx, data in enumerate(dataset): instruction = data.get('instruction') outputs = evaluate(instruction) label = data.get('answer') flag = False if args.dataset.lower() in ['aqua']: predict = extract_answer_letter(args, outputs) if label == predict: correct += 1 flag = True else: if isinstance(label, str): label = float(label) predict = extract_answer_number(args, outputs) if abs(label - predict) <= miss: correct += 1 flag = True new_data = copy.deepcopy(data) new_data['output_pred'] = outputs new_data['pred'] = predict new_data['flag'] = flag output_data.append(new_data) print(' ') print('---------------') print(outputs) print('prediction:', predict) print('label:', label) print('---------------') print(f'\rtest:{idx + 1}/{total} | accuracy {correct} {correct / (idx + 1)}') with open(save_file, 'w+') as f: json.dump(output_data, f, indent=4) pbar.update(1) pbar.close() print('\n') print('test finished') def create_dir(dir_path): if not os.path.exists(dir_path): os.mkdir(dir_path) return def generate_prompt(instruction, input=None): if input: return f"""Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request. ### Instruction: {instruction} ### Input: {input} ### Response: """ # noqa: E501 else: return f"""Below is an instruction that describes a task. Write a response that appropriately completes the request. ### Instruction: {instruction} ### Response: """ # noqa: E501 def load_data(args) -> list: """ read data from dataset file Args: args: Returns: """ file_path = f'dataset/{args.dataset}/test.json' if not os.path.exists(file_path): raise FileNotFoundError(f"can not find dataset file : {file_path}") json_data = json.load(open(file_path, 'r')) return json_data def parse_args(): parser = argparse.ArgumentParser() parser.add_argument('--dataset', choices=['AddSub', 'MultiArith', 'SingleEq', 'gsm8k', 'AQuA', 'SVAMP'], required=True) parser.add_argument('--model', choices=['LLaMA-7B', 'BLOOM-7B', 'GPT-j-6B'], required=True) parser.add_argument('--adapter', choices=['LoRA', 'AdapterP', 'AdapterH', 'Parallel', 'Prefix'], required=True) parser.add_argument('--base_model', required=True) parser.add_argument('--lora_weights', required=True) parser.add_argument('--load_8bit', action='store_true', default=False) return parser.parse_args() def load_model(args) -> tuple: """ load tuned model Args: args: Returns: tuple(tokenizer, model) """ base_model = args.base_model if not base_model: raise ValueError(f'can not find base model name by the value: {args.model}') lora_weights = args.lora_weights if not lora_weights: raise ValueError(f'can not find lora weight, the value is: {lora_weights}') load_8bit = args.load_8bit if args.model == 'LLaMA-7B': tokenizer = LlamaTokenizer.from_pretrained(base_model) else: tokenizer = AutoTokenizer.from_pretrained(base_model) if device == "cuda": model = AutoModelForCausalLM.from_pretrained( base_model, load_in_8bit=load_8bit, torch_dtype=torch.float16, device_map="auto", trust_remote_code=True, ) # fix zwq model = PeftModel.from_pretrained( model, lora_weights, torch_dtype=torch.float16, device_map={"":0} ) elif device == "mps": model = AutoModelForCausalLM.from_pretrained( base_model, device_map={"": device}, torch_dtype=torch.float16, ) model = PeftModel.from_pretrained( model, lora_weights, device_map={"": device}, torch_dtype=torch.float16, ) else: model = AutoModelForCausalLM.from_pretrained( base_model, device_map={"": device}, low_cpu_mem_usage=True ) model = PeftModel.from_pretrained( model, lora_weights, device_map={"": device}, ) # unwind broken decapoda-research config model.config.pad_token_id = tokenizer.pad_token_id = 0 # unk model.config.bos_token_id = 1 model.config.eos_token_id = 2 if not load_8bit: model.half() # seems to fix bugs for some users. model.eval() if torch.__version__ >= "2" and sys.platform != "win32": model = torch.compile(model) return tokenizer, model def load_instruction(args) -> str: instruction = '' if not instruction: raise ValueError('instruct not initialized') return instruction def extract_answer_number(args, sentence: str) -> float: dataset = args.dataset.lower() if dataset in ["multiarith", "addsub", "singleeq", "gsm8k", "svamp"]: sentence = sentence.replace(',', '') pred = [s for s in re.findall(r'-?\d+\.?\d*', sentence)] if not pred: return float('inf') pred_answer = float(pred[-1]) else: raise NotImplementedError(' not support dataset: {}'.format(dataset)) if isinstance(pred_answer, str): try: pred_answer = float(pred_answer) except ValueError as e: pred_answer = float('inf') return pred_answer def extract_answer_letter(args, sentence: str) -> str: sentence_ = sentence.strip() pred_answers = re.findall(r'A|B|C|D|E', sentence_) if pred_answers: if not pred_answers: return '' return pred_answers[0] else: return '' if __name__ == "__main__": fire.Fire(main)