# target-poc / call_openai.py
import json
from typing import Dict, List, Optional

import openai  # uses the pre-1.0 ChatCompletion API
import tiktoken

from callback_handler import BaseCallbackHandler


def call_openai(
    messages: List[Dict[str, str]],
    functions: Optional[List[Dict]] = None,
    stream: str = "no",
    model: str = "gpt-3.5-turbo",
    temperature: float = 0,
    callback: Optional[BaseCallbackHandler] = None
) -> Dict:
"""
Call openai with list of messages and optional list of functions. See description at openai website.
Args:
messages: messages passed to openai. list of dictionaries with keys: role=[system, user, assitant, function] + content= message
functions: function list passed to openai
stream: ["no", "sentence", "token"]
model: name of openai model
temperature: of openai model
callback: callback handler class. If streaming, it is mandatory
Returns:
final message
"""
    if stream != "no" and callback is None:
        raise ValueError("callback is mandatory when streaming is enabled")
    current_state = None
    prompt_tokens = token_count(
        messages=messages,
        functions=functions
    )
    if functions is None:
        # Empirical starting offsets so that the per-chunk counter below
        # approximates the billed completion tokens.
        completion_tokens = -2
        response = openai.ChatCompletion.create(
            model=model,
            temperature=temperature,
            stream=True,
            messages=messages,
        )
    else:
        completion_tokens = -1
        response = openai.ChatCompletion.create(
            model=model,
            temperature=temperature,
            stream=True,
            messages=messages,
            functions=functions
        )
    for chunk in response:
        completion_tokens += 1
        # str() on a streamed OpenAIObject yields JSON, so this round-trip
        # converts the current choice into a plain dict.
        data = json.loads(str(chunk["choices"][0]))
        delta = data["delta"]
        finish_reason = data["finish_reason"]
        if finish_reason is not None:
            if finish_reason == "function_call":
                # Function-call completions carry extra formatting tokens;
                # +6 is an empirical correction to the per-chunk count.
                completion_tokens += 6
            # Assemble an OpenAI-style response dict from the accumulated stream.
            final_response = {
                "usage": {
                    "completion_tokens": completion_tokens,
                    "prompt_tokens": prompt_tokens,
                },
                "choices": []
            }
if current_state == "function":
d = {
"finish_reason": "function_call",
"message": {
"content": None,
"function_call": {
"arguments": function_arg,
"name": function_name
},
"role": "assistant"
}
}
final_response["choices"].append(d)
            if current_state == "user":
                # Flush any partial sentence still buffered before finishing.
                if stream == "sentence" and message_stream:
                    callback.on_llm_new_token(token=message_stream)
                d = {
                    "finish_reason": "stop",
                    "message": {
                        "content": message_all,
                        "role": "assistant"
                    }
                }
                final_response["choices"].append(d)
if callback:
callback.on_llm_end(response=final_response)
return final_response
else:
            if current_state is None:
                # The first content-bearing delta decides whether this completion
                # is a function call or a plain assistant message.
                if "function_call" in delta:
                    current_state = "function"
                    function_name = delta["function_call"]["name"]
                    function_arg = ""
                    # if stream != "no":
                    #     s = f" - {function_name}"
                    #     callback.on_llm_new_token(token=s)
                else:
                    current_state = "user"
                    message_stream = ""
                    message_all = ""
            elif current_state == "function":
                function_arg += delta["function_call"]["arguments"]
            elif current_state == "user":
                token = delta.get("content", "")
                message_all += token
                if stream == "token":
                    callback.on_llm_new_token(token=token)
if stream == "sentence":
message_stream += token
if "." in token or "!" in token or "?" in token or "\n" in token:
if message_stream[-1] == "\n":
callback.on_llm_new_token(token=message_stream[:-1])
else:
callback.on_llm_new_token(token=message_stream)
message_stream = ""


def num_tokens_from_messages(messages, model="gpt-3.5-turbo-0613"):
    """Return the number of tokens used by a list of messages."""
try:
encoding = tiktoken.encoding_for_model(model)
except KeyError:
# print("Warning: model not found. Using cl100k_base encoding.")
encoding = tiktoken.get_encoding("cl100k_base")
if model in {
"gpt-3.5-turbo-0613",
"gpt-3.5-turbo-16k-0613",
"gpt-4-0314",
"gpt-4-32k-0314",
"gpt-4-0613",
"gpt-4-32k-0613",
}:
tokens_per_message = 3
tokens_per_name = 1
elif model == "gpt-3.5-turbo-0301":
tokens_per_message = 4 # every message follows <|start|>{role/name}\n{content}<|end|>\n
tokens_per_name = -1 # if there's a name, the role is omitted
elif "gpt-3.5-turbo" in model:
# print("Warning: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613.")
return num_tokens_from_messages(messages, model="gpt-3.5-turbo-0613")
elif "gpt-4" in model:
# print("Warning: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613.")
return num_tokens_from_messages(messages, model="gpt-4-0613")
else:
raise NotImplementedError(
f"""num_tokens_from_messages() is not implemented for model {model}. See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens."""
)
num_tokens = 0
for message in messages:
num_tokens += tokens_per_message
        for key, value in message.items():
            if key == "function_call":
                # function_call values are dicts; count their name/arguments fields.
                num_tokens += tokens_per_name
                for v in value.values():
                    num_tokens += len(encoding.encode(v))
            if value is not None and key != "function_call":
                num_tokens += len(encoding.encode(value))
            if key == "name":
                num_tokens += tokens_per_name
num_tokens += 3 # every reply is primed with <|start|>assistant<|message|>
return num_tokens
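

# NOTE: the token arithmetic below mirrors community estimates of how OpenAI
# serializes function schemas into the prompt; the constants (+2, -3, +3, +16)
# are empirical and may drift as the API's internal formatting changes.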
def num_tokens_from_functions(functions, model="gpt-3.5-turbo-0613"):
    """Return an estimate of the tokens consumed by a list of function definitions."""
try:
encoding = tiktoken.encoding_for_model(model)
except KeyError:
# print("Warning: model not found. Using cl100k_base encoding.")
encoding = tiktoken.get_encoding("cl100k_base")
num_tokens = 0
    for function in functions:
        function_tokens = len(encoding.encode(function["name"]))
        function_tokens += len(encoding.encode(function["description"]))
        if "parameters" in function:
            parameters = function["parameters"]
            if "properties" in parameters:
                for property_key in parameters["properties"]:
                    function_tokens += len(encoding.encode(property_key))
                    v = parameters["properties"][property_key]
                    for field in v:
                        if field == "type":
                            function_tokens += 2
                            function_tokens += len(encoding.encode(v["type"]))
                        elif field == "description":
                            function_tokens += 2
                            function_tokens += len(encoding.encode(v["description"]))
                        elif field == "enum":
                            function_tokens -= 3
                            for o in v["enum"]:
                                function_tokens += 3
                                function_tokens += len(encoding.encode(o))
                        # Other JSON-schema fields are ignored by this estimate.
                function_tokens += 16
        num_tokens += function_tokens
    # Empirical fixed overhead added once per request.
    num_tokens += 16
return num_tokens


def token_count(
    messages: List[Dict[str, str]],
    functions: Optional[List[Dict]] = None,
    model: str = "gpt-3.5-turbo-0613"
) -> int:
    """Estimate the prompt tokens consumed by messages plus optional functions."""
msgs_tokens = num_tokens_from_messages(messages=messages, model=model)
tokens_used = msgs_tokens
if functions is not None:
function_tokens = num_tokens_from_functions(functions=functions, model=model)
tokens_used += function_tokens
return tokens_used
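

if __name__ == "__main__":
    # Minimal usage sketch, not part of the module's public API. Assumes
    # openai.api_key is configured (e.g. via the OPENAI_API_KEY environment
    # variable) and that BaseCallbackHandler exposes the on_llm_new_token /
    # on_llm_end hooks used above. _PrintCallback is a hypothetical handler
    # written here only for illustration.
    class _PrintCallback(BaseCallbackHandler):
        def on_llm_new_token(self, token):
            print(token, end="", flush=True)

        def on_llm_end(self, response):
            print("\nusage:", response["usage"])

    result = call_openai(
        messages=[{"role": "user", "content": "Say hello in one sentence."}],
        stream="token",
        callback=_PrintCallback(),
    )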