import json
from typing import Dict, List, Optional

import openai
import tiktoken

from callback_handler import BaseCallbackHandler
def call_openai(
    messages: List[Dict[str, str]],
    functions: Optional[List[Dict]] = None,
    stream: str = "no",
    model: str = "gpt-3.5-turbo",
    temperature: float = 0,
    callback: Optional[BaseCallbackHandler] = None,
) -> Dict:
    """
    Call the OpenAI chat completion API with a list of messages and an
    optional list of function definitions (see the OpenAI API reference).

    Args:
        messages: messages passed to OpenAI; a list of dicts with the keys
            "role" (one of "system", "user", "assistant", "function") and
            "content" (the message text).
        functions: function definitions passed to OpenAI.
        stream: streaming granularity; one of "no", "sentence", "token".
        model: name of the OpenAI model.
        temperature: sampling temperature of the model.
        callback: callback handler instance; mandatory when streaming.

    Returns:
        The final response dict with token usage and choices.
    """
    current_state = None
    prompt_tokens = token_count(
        messages=messages,
        functions=functions,
    )
    # completion_tokens starts at a negative offset so that the per-chunk
    # counter below approximates the completion usage the non-streaming API
    # would report (the first and last chunks carry no content tokens).
    if functions is None:
        completion_tokens = -2
        # The API is always called with stream=True so tokens can be counted
        # per chunk; the "stream" argument only controls callback granularity.
        response = openai.ChatCompletion.create(
            model=model,
            temperature=temperature,
            stream=True,
            messages=messages,
        )
    else:
        completion_tokens = -1
        response = openai.ChatCompletion.create(
            model=model,
            temperature=temperature,
            stream=True,
            messages=messages,
            functions=functions,
        )
    for chunk in response:
        completion_tokens += 1
        choice = chunk["choices"][0]
        delta = choice["delta"]
        finish_reason = choice["finish_reason"]
        if finish_reason is not None:
            if finish_reason == "function_call":
                # Empirical correction: function-call completions consume a
                # few extra tokens that never arrive as content chunks.
                completion_tokens += 6
final_response = {
"usage": {
"completion_tokens": completion_tokens,
"prompt_tokens": prompt_tokens,
},
"choices": []
}
if current_state == "function":
d = {
"finish_reason": "function_call",
"message": {
"content": None,
"function_call": {
"arguments": function_arg,
"name": function_name
},
"role": "assistant"
}
}
final_response["choices"].append(d)
if current_state == "user":
d = {
"finish_reason": "stop",
"message": {
"content": message_all,
"role": "assistant"
}
}
final_response["choices"].append(d)
if callback:
callback.on_llm_end(response=final_response)
return final_response
        else:
            # The first content chunk decides the state: function call
            # versus normal assistant message.
            if current_state is None:
                if "function_call" in delta:
                    current_state = "function"
                    function_name = delta["function_call"]["name"]
                    function_arg = ""
                    # if stream != "no":
                    #     s = f" - {function_name}"
                    #     callback.on_llm_new_token(token=s)
                else:
                    current_state = "user"
                    message_stream = ""
                    message_all = ""
            elif current_state == "function":
                # Accumulate the streamed function-call arguments.
                function_arg += delta["function_call"]["arguments"]
            elif current_state == "user":
                token = delta["content"]
                message_all += token
                if stream == "token":
                    callback.on_llm_new_token(token=token)
                if stream == "sentence":
                    message_stream += token
                    # Flush the buffer at sentence boundaries.
                    if "." in token or "!" in token or "?" in token or "\n" in token:
                        if message_stream[-1] == "\n":
                            callback.on_llm_new_token(token=message_stream[:-1])
                        else:
                            callback.on_llm_new_token(token=message_stream)
                        message_stream = ""
def num_tokens_from_messages(messages, model="gpt-3.5-turbo-0613"):
"""Return the number of tokens used by a list of messages."""
try:
encoding = tiktoken.encoding_for_model(model)
except KeyError:
# print("Warning: model not found. Using cl100k_base encoding.")
encoding = tiktoken.get_encoding("cl100k_base")
if model in {
"gpt-3.5-turbo-0613",
"gpt-3.5-turbo-16k-0613",
"gpt-4-0314",
"gpt-4-32k-0314",
"gpt-4-0613",
"gpt-4-32k-0613",
}:
tokens_per_message = 3
tokens_per_name = 1
elif model == "gpt-3.5-turbo-0301":
tokens_per_message = 4 # every message follows <|start|>{role/name}\n{content}<|end|>\n
tokens_per_name = -1 # if there's a name, the role is omitted
elif "gpt-3.5-turbo" in model:
# print("Warning: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613.")
return num_tokens_from_messages(messages, model="gpt-3.5-turbo-0613")
elif "gpt-4" in model:
# print("Warning: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613.")
return num_tokens_from_messages(messages, model="gpt-4-0613")
else:
raise NotImplementedError(
f"""num_tokens_from_messages() is not implemented for model {model}. See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens."""
)
    num_tokens = 0
    for message in messages:
        num_tokens += tokens_per_message
        for key, value in message.items():
            if key == "function_call":
                # A function_call value is a dict with "name" and "arguments".
                num_tokens += tokens_per_name
                for v in value.values():
                    num_tokens += len(encoding.encode(v))
            if value is not None and key != "function_call":
                num_tokens += len(encoding.encode(value))
            if key == "name":
                num_tokens += tokens_per_name
num_tokens += 3 # every reply is primed with <|start|>assistant<|message|>
return num_tokens
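
# Example (hypothetical values, not from the original file): counting the
# prompt tokens for a short exchange.
#
#     msgs = [
#         {"role": "system", "content": "You are a helpful assistant."},
#         {"role": "user", "content": "What is the capital of France?"},
#     ]
#     n = num_tokens_from_messages(msgs, model="gpt-3.5-turbo-0613")
#     # n should match the prompt_tokens the API reports for msgs.
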
def num_tokens_from_functions(functions, model="gpt-3.5-turbo-0613"):
"""Return the number of tokens used by a list of functions."""
try:
encoding = tiktoken.encoding_for_model(model)
except KeyError:
# print("Warning: model not found. Using cl100k_base encoding.")
encoding = tiktoken.get_encoding("cl100k_base")
num_tokens = 0
    for function in functions:
        function_tokens = len(encoding.encode(function["name"]))
        function_tokens += len(encoding.encode(function["description"]))
        if "parameters" in function:
            parameters = function["parameters"]
            if "properties" in parameters:
                for prop_name in parameters["properties"]:
                    function_tokens += len(encoding.encode(prop_name))
                    prop = parameters["properties"][prop_name]
                    for field in prop:
                        if field == "type":
                            function_tokens += 2
                            function_tokens += len(encoding.encode(prop["type"]))
                        elif field == "description":
                            function_tokens += 2
                            function_tokens += len(encoding.encode(prop["description"]))
                        elif field == "enum":
                            function_tokens -= 3
                            for o in prop["enum"]:
                                function_tokens += 3
                                function_tokens += len(encoding.encode(o))
                        else:
                            # Unsupported field; its token cost is not modeled.
                            pass
                function_tokens += 16
        num_tokens += function_tokens
    num_tokens += 16
    return num_tokens
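
# Example (hypothetical schema): estimating the token overhead of a single
# function definition.
#
#     fns = [{
#         "name": "get_weather",
#         "description": "Get the current weather for a city.",
#         "parameters": {
#             "type": "object",
#             "properties": {
#                 "city": {"type": "string", "description": "City name"},
#                 "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
#             },
#         },
#     }]
#     n = num_tokens_from_functions(fns)
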
def token_count(
    messages: List[Dict[str, str]],
    functions: Optional[List[Dict]] = None,
    model: str = "gpt-3.5-turbo-0613",
) -> int:
    """Return the total prompt token count for messages plus optional functions."""
    msgs_tokens = num_tokens_from_messages(messages=messages, model=model)
    tokens_used = msgs_tokens
    if functions is not None:
        function_tokens = num_tokens_from_functions(functions=functions, model=model)
        tokens_used += function_tokens
    return tokens_used
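

if __name__ == "__main__":
    # Quick self-check of the token accounting (no API call is made).
    # The message list and function schema below are illustrative examples.
    demo_messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What's the weather in Paris?"},
    ]
    demo_functions = [{
        "name": "get_weather",
        "description": "Get the current weather for a city.",
        "parameters": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "City name"},
            },
        },
    }]
    print("messages only:", token_count(messages=demo_messages))
    print("with functions:", token_count(messages=demo_messages, functions=demo_functions))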