AttnTrace / src /models /Llama.py
SecureLLMSys's picture
update
ee19553
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from .Model import Model
import os
import signal
from functools import lru_cache
import spaces
def handle_timeout(sig, frame):
raise TimeoutError('took too long')
signal.signal(signal.SIGALRM, handle_timeout)
class Llama(Model):
def __init__(self, config, device="cuda:0"):
super().__init__(config)
self.device = device
self.max_output_tokens = int(config["params"]["max_output_tokens"])
api_pos = int(config["api_key_info"]["api_key_use"])
self.hf_token = config["api_key_info"]["api_keys"][api_pos] or os.getenv("HF_TOKEN")
self.tokenizer = AutoTokenizer.from_pretrained(self.name, use_auth_token=self.hf_token)
self.model = None # Delayed init
self.terminators = [
self.tokenizer.eos_token_id,
self.tokenizer.convert_tokens_to_ids("<|eot_id|>")
]
def _load_model_if_needed(self):
if self.model is None:
model = AutoModelForCausalLM.from_pretrained(
self.name,
torch_dtype=torch.bfloat16,
token=self.hf_token,
device_map="auto", # or omit entirely to default to CPU
)
self.model = model
return self.model
@spaces.GPU
def query(self, msg, max_tokens=128000):
model = self._load_model_if_needed().to("cuda")
messages = self.messages
messages[1]["content"] = msg
input_ids = self.tokenizer.apply_chat_template(
messages,
add_generation_prompt=True,
return_tensors="pt",
truncation=True
).to(model.device)
attention_mask = torch.ones(input_ids.shape, device=model.device)
try:
signal.alarm(60)
output_tokens = model.generate(
input_ids,
max_length=max_tokens,
attention_mask=attention_mask,
eos_token_id=self.terminators,
top_k=50,
do_sample=False
)
signal.alarm(0)
except TimeoutError:
print("time out")
return "time out"
return self.tokenizer.decode(output_tokens[0][input_ids.shape[-1]:], skip_special_tokens=True)
def get_prompt_length(self, msg):
model = self._load_model_if_needed()
messages = self.messages
messages[1]["content"] = msg
input_ids = self.tokenizer.apply_chat_template(
messages,
add_generation_prompt=True,
return_tensors="pt",
padding=True,
truncation=True
).to(model.device)
return len(input_ids[0])
def cut_context(self, msg, max_length):
tokens = self.tokenizer.encode(msg, add_special_tokens=True)
truncated_tokens = tokens[:max_length]
truncated_text = self.tokenizer.decode(truncated_tokens, skip_special_tokens=True)
return truncated_text