import json
from typing import Dict, List, Optional

import openai
import tiktoken

from callback_handler import BaseCallbackHandler
def call_openai(
    messages: List[Dict[str, str]],
    functions: Optional[List[Dict]] = None,
    stream: str = "no",
    model: str = "gpt-3.5-turbo",
    temperature: float = 0,
    callback: Optional[BaseCallbackHandler] = None,
) -> Dict:
""" | |
Call openai with list of messages and optional list of functions. See description at openai website. | |
Args: | |
messages: messages passed to openai. list of dictionaries with keys: role=[system, user, assitant, function] + content= message | |
functions: function list passed to openai | |
stream: ["no", "sentence", "token"] | |
model: name of openai model | |
temperature: of openai model | |
callback: callback handler class. If streaming, it is mandatory | |
Returns: | |
final message | |
""" | |
    current_state = None
    prompt_tokens = token_count(
        messages=messages,
        functions=functions
    )
    # The API is always called with stream=True so completion tokens can be
    # counted per chunk; the `stream` argument only controls callback
    # granularity. The -2/-1 starting offsets compensate for chunks that
    # carry no completion tokens (the initial role chunk and the final
    # finish chunk), so the per-chunk counter approximates real usage.
    if functions is None:
        completion_tokens = -2
        response = openai.ChatCompletion.create(
            model=model,
            temperature=temperature,
            stream=True,
            messages=messages,
        )
    else:
        completion_tokens = -1
        response = openai.ChatCompletion.create(
            model=model,
            temperature=temperature,
            stream=True,
            messages=messages,
            functions=functions
        )
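    # The streamed chunks drive a small state machine: the first delta sets
    # current_state to "function" (a function call) or "user" (a plain
    # assistant message); later deltas accumulate arguments or content
    # until a finish_reason arrives.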
    for chunk in response:
        completion_tokens += 1
        # Round-trip through str()/json.loads() to turn the streamed
        # OpenAIObject choice into a plain dict.
        data = json.loads(str(chunk["choices"][0]))
        delta = data["delta"]
        finish_reason = data["finish_reason"]
        if finish_reason is not None:
            if finish_reason == "function_call":
                # Fixed overhead a function call adds to the completion count.
                completion_tokens += 6
            final_response = {
                "usage": {
                    "completion_tokens": completion_tokens,
                    "prompt_tokens": prompt_tokens,
                },
                "choices": []
            }
            if current_state == "function":
                d = {
                    "finish_reason": "function_call",
                    "message": {
                        "content": None,
                        "function_call": {
                            "arguments": function_arg,
                            "name": function_name
                        },
                        "role": "assistant"
                    }
                }
                final_response["choices"].append(d)
            elif current_state == "user":
                d = {
                    "finish_reason": "stop",
                    "message": {
                        "content": message_all,
                        "role": "assistant"
                    }
                }
                final_response["choices"].append(d)
            if callback:
                callback.on_llm_end(response=final_response)
            return final_response
        else:
            if current_state is None:
                if 'function_call' in delta:
                    current_state = "function"
                    function_name = delta["function_call"]["name"]
                    function_arg = ""
                    # if stream != "no":
                    #     s = f" - {function_name}"
                    #     callback.on_llm_new_token(token=s)
                else:
                    current_state = "user"
                    message_stream = ""
                    message_all = ""
            elif current_state == "function":
                function_arg += delta['function_call']['arguments']
            elif current_state == "user":
                token = delta["content"]
                message_all += token
                if stream == "token":
                    callback.on_llm_new_token(token=token)
                if stream == "sentence":
                    message_stream += token
                    # Flush the buffer at sentence boundaries or newlines.
                    if any(c in token for c in ".!?\n"):
                        if message_stream.endswith("\n"):
                            callback.on_llm_new_token(token=message_stream[:-1])
                        else:
                            callback.on_llm_new_token(token=message_stream)
                        message_stream = ""
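

# The token-counting helpers below follow the heuristic approach from the
# OpenAI cookbook examples (see the chatml.md link in the error message
# further down); the fixed per-message and per-name offsets are
# model-specific estimates rather than exact counts.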
def num_tokens_from_messages(messages, model="gpt-3.5-turbo-0613"):
    """Return the number of tokens used by a list of messages."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Model not found; fall back to the cl100k_base encoding.
        encoding = tiktoken.get_encoding("cl100k_base")
    if model in {
        "gpt-3.5-turbo-0613",
        "gpt-3.5-turbo-16k-0613",
        "gpt-4-0314",
        "gpt-4-32k-0314",
        "gpt-4-0613",
        "gpt-4-32k-0613",
    }:
        tokens_per_message = 3
        tokens_per_name = 1
    elif model == "gpt-3.5-turbo-0301":
        tokens_per_message = 4  # every message follows <|start|>{role/name}\n{content}<|end|>\n
        tokens_per_name = -1  # if there's a name, the role is omitted
    elif "gpt-3.5-turbo" in model:
        # gpt-3.5-turbo may update over time; assume gpt-3.5-turbo-0613.
        return num_tokens_from_messages(messages, model="gpt-3.5-turbo-0613")
    elif "gpt-4" in model:
        # gpt-4 may update over time; assume gpt-4-0613.
        return num_tokens_from_messages(messages, model="gpt-4-0613")
    else:
        raise NotImplementedError(
            f"num_tokens_from_messages() is not implemented for model {model}. "
            "See https://github.com/openai/openai-python/blob/main/chatml.md "
            "for information on how messages are converted to tokens."
        )
    num_tokens = 0
    for message in messages:
        num_tokens += tokens_per_message
        for key, value in message.items():
            if key == "function_call":
                # function_call values are dicts; count each field's value.
                num_tokens += tokens_per_name
                for v in value.values():
                    num_tokens += len(encoding.encode(v))
            if value is not None and key != "function_call":
                num_tokens += len(encoding.encode(value))
            if key == "name":
                num_tokens += tokens_per_name
    num_tokens += 3  # every reply is primed with <|start|>assistant<|message|>
    return num_tokens
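

# For example, under the accounting above a single user message
# {"role": "user", "content": "Hi"} with gpt-3.5-turbo-0613 counts as
# tokens_per_message (3) + encode("user") + encode("Hi") + reply priming (3).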
def num_tokens_from_functions(functions, model="gpt-3.5-turbo-0613"):
    """Return the number of tokens used by a list of function definitions."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Model not found; fall back to the cl100k_base encoding.
        encoding = tiktoken.get_encoding("cl100k_base")
    num_tokens = 0
    for function in functions:
        function_tokens = len(encoding.encode(function['name']))
        function_tokens += len(encoding.encode(function['description']))
        if 'parameters' in function:
            parameters = function['parameters']
            if 'properties' in parameters:
                for properties_key in parameters['properties']:
                    function_tokens += len(encoding.encode(properties_key))
                    v = parameters['properties'][properties_key]
                    for field in v:
                        # The fixed offsets below are empirical estimates of
                        # the schema's serialization overhead.
                        if field == 'type':
                            function_tokens += 2
                            function_tokens += len(encoding.encode(v['type']))
                        elif field == 'description':
                            function_tokens += 2
                            function_tokens += len(encoding.encode(v['description']))
                        elif field == 'enum':
                            function_tokens -= 3
                            for o in v['enum']:
                                function_tokens += 3
                                function_tokens += len(encoding.encode(o))
                        else:
                            pass  # unsupported field; not counted
                function_tokens += 16
        num_tokens += function_tokens
    num_tokens += 16
    return num_tokens
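

# The function definitions are expected in the standard OpenAI
# function-calling schema, e.g. (values here are illustrative):
# {
#     "name": "get_weather",
#     "description": "Get the current weather for a city.",
#     "parameters": {
#         "type": "object",
#         "properties": {
#             "city": {"type": "string", "description": "City name"},
#             "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
#         },
#         "required": ["city"],
#     },
# }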
def token_count(
    messages: List[Dict[str, str]],
    functions: Optional[List[Dict]] = None,
    model: str = "gpt-3.5-turbo-0613"
) -> int:
    """Return the total prompt token count for the messages plus any functions."""
    msgs_tokens = num_tokens_from_messages(messages=messages, model=model)
    tokens_used = msgs_tokens
    if functions is not None:
        function_tokens = num_tokens_from_functions(functions=functions, model=model)
        tokens_used += function_tokens
    return tokens_used
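

# Minimal usage sketch (not part of the original module): a hypothetical
# handler implementing the two callback methods call_openai invokes
# (on_llm_new_token and on_llm_end). Running this requires a valid OpenAI
# API key configured for the openai package.
if __name__ == "__main__":
    class PrintCallbackHandler(BaseCallbackHandler):
        def on_llm_new_token(self, token: str) -> None:
            print(token)

        def on_llm_end(self, response: Dict) -> None:
            print(response["usage"])

    reply = call_openai(
        messages=[{"role": "user", "content": "Say hello in one sentence."}],
        stream="sentence",
        callback=PrintCallbackHandler(),
    )
    print(reply)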