import os import openai from openai import OpenAI, AzureOpenAI import time import json import pickle import logging import configparser from pydantic import BaseModel from typing import Union import textwrap # to dedent strings import tiktoken from tinytroupe import utils from tinytroupe.control import transactional logger = logging.getLogger("tinytroupe") # We'll use various configuration elements below config = utils.read_config_file() ########################################################################### # Default parameter values ########################################################################### default = {} default["model"] = config["OpenAI"].get("MODEL", "gpt-4o") default["max_tokens"] = int(config["OpenAI"].get("MAX_TOKENS", "1024")) default["temperature"] = float(config["OpenAI"].get("TEMPERATURE", "1.0")) default["top_p"] = int(config["OpenAI"].get("TOP_P", "0")) default["frequency_penalty"] = float(config["OpenAI"].get("FREQ_PENALTY", "0.0")) default["presence_penalty"] = float( config["OpenAI"].get("PRESENCE_PENALTY", "0.0")) default["timeout"] = float(config["OpenAI"].get("TIMEOUT", "30.0")) default["max_attempts"] = float(config["OpenAI"].get("MAX_ATTEMPTS", "0.0")) default["waiting_time"] = float(config["OpenAI"].get("WAITING_TIME", "1")) default["exponential_backoff_factor"] = float(config["OpenAI"].get("EXPONENTIAL_BACKOFF_FACTOR", "5")) default["embedding_model"] = config["OpenAI"].get("EMBEDDING_MODEL", "text-embedding-3-small") default["cache_api_calls"] = config["OpenAI"].getboolean("CACHE_API_CALLS", False) default["cache_file_name"] = config["OpenAI"].get("CACHE_FILE_NAME", "openai_api_cache.pickle") ########################################################################### # Model calling helpers ########################################################################### class LLMRequest: """ A class that represents an LLM model call. It contains the input messages, the model configuration, and the model output. """ def __init__(self, system_template_name:str=None, system_prompt:str=None, user_template_name:str=None, user_prompt:str=None, output_type=None, **model_params): """ Initializes an LLMCall instance with the specified system and user templates, or the system and user prompts. If a template is specified, the corresponding prompt must be None, and vice versa. """ if (system_template_name is not None and system_prompt is not None) or \ (user_template_name is not None and user_prompt is not None) or\ (system_template_name is None and system_prompt is None) or \ (user_template_name is None and user_prompt is None): raise ValueError("Either the template or the prompt must be specified, but not both.") self.system_template_name = system_template_name self.user_template_name = user_template_name self.system_prompt = textwrap.dedent(system_prompt) # remove identation self.user_prompt = textwrap.dedent(user_prompt) # remove identation self.output_type = output_type self.model_params = model_params self.model_output = None self.messages = [] # will be set after the call self.response_raw = None self.response_json = None self.response_value = None self.response_justification = None self.response_confidence = None def __call__(self, *args, **kwds): return self.call(*args, **kwds) def call(self, **rendering_configs): """ Calls the LLM model with the specified rendering configurations. Args: rendering_configs: The rendering configurations (template variables) to use when composing the initial messages. Returns: The content of the model response. """ if self.system_template_name is not None and self.user_template_name is not None: self.messages = utils.compose_initial_LLM_messages_with_templates(self.system_template_name, self.user_template_name, rendering_configs) else: self.messages = [{"role": "system", "content": self.system_prompt}, {"role": "user", "content": self.user_prompt}] # # Setup typing for the output # if self.output_type is not None: # specify the structured output self.model_params["response_format"] = LLMScalarWithJustificationResponse self.messages.append({"role": "user", "content": "In your response, you **MUST** provide a value, along with a justification and your confidence level that the value and justification are correct (0.0 means no confidence, 1.0 means complete confidence)."+ "Furtheremore, your response **MUST** be a JSON object with the following structure: {\"value\": value, \"justification\": justification, \"confidence\": confidence}."}) # specify the value type if self.output_type == bool: self.messages.append(self._request_bool_llm_message()) elif self.output_type == int: self.messages.append(self._request_integer_llm_message()) elif self.output_type == float: self.messages.append(self._request_float_llm_message()) elif self.output_type == list and all(isinstance(option, str) for option in self.output_type): self.messages.append(self._request_enumerable_llm_message(self.output_type)) elif self.output_type == str: pass else: raise ValueError(f"Unsupported output type: {self.output_type}") # # call the LLM model # self.model_output = client().send_message(self.messages, **self.model_params) if 'content' in self.model_output: self.response_raw = self.response_value = self.model_output['content'] # further, if an output type is specified, we need to coerce the result to that type if self.output_type is not None: self.response_json = utils.extract_json(self.response_raw) self.response_value = self.response_json["value"] self.response_justification = self.response_json["justification"] self.response_confidence = self.response_json["confidence"] if self.output_type == bool: self.response_value = self._coerce_to_bool(self.response_value) elif self.output_type == int: self.response_value = self._coerce_to_integer(self.response_value) elif self.output_type == float: self.response_value = self._coerce_to_float(self.response_value) elif self.output_type == list and all(isinstance(option, str) for option in self.output_type): self.response_value = self._coerce_to_enumerable(self.response_value, self.output_type) elif self.output_type == str: pass else: raise ValueError(f"Unsupported output type: {self.output_type}") return self.response_value else: logger.error(f"Model output does not contain 'content' key: {self.model_output}") return None def _coerce_to_bool(self, llm_output): """ Coerces the LLM output to a boolean value. This method looks for the string "True", "False", "Yes", "No", "Positive", "Negative" in the LLM output, such that - case is neutralized; - the first occurrence of the string is considered, the rest is ignored. For example, " Yes, that is true" will be considered "Yes"; - if no such string is found, the method raises an error. So it is important that the prompts actually requests a boolean value. Args: llm_output (str, bool): The LLM output to coerce. Returns: The boolean value of the LLM output. """ # if the LLM output is already a boolean, we return it if isinstance(llm_output, bool): return llm_output # let's extract the first occurrence of the string "True", "False", "Yes", "No", "Positive", "Negative" in the LLM output. # using a regular expression import re match = re.search(r'\b(?:True|False|Yes|No|Positive|Negative)\b', llm_output, re.IGNORECASE) if match: first_match = match.group(0).lower() if first_match in ["true", "yes", "positive"]: return True elif first_match in ["false", "no", "negative"]: return False raise ValueError("The LLM output does not contain a recognizable boolean value.") def _request_bool_llm_message(self): return {"role": "user", "content": "The `value` field you generate **must** be either 'True' or 'False'. This is critical for later processing. If you don't know the correct answer, just output 'False'."} def _coerce_to_integer(self, llm_output:str): """ Coerces the LLM output to an integer value. This method looks for the first occurrence of an integer in the LLM output, such that - the first occurrence of the integer is considered, the rest is ignored. For example, "There are 3 cats" will be considered 3; - if no integer is found, the method raises an error. So it is important that the prompts actually requests an integer value. Args: llm_output (str, int): The LLM output to coerce. Returns: The integer value of the LLM output. """ # if the LLM output is already an integer, we return it if isinstance(llm_output, int): return llm_output # let's extract the first occurrence of an integer in the LLM output. # using a regular expression import re match = re.search(r'\b\d+\b', llm_output) if match: return int(match.group(0)) raise ValueError("The LLM output does not contain a recognizable integer value.") def _request_integer_llm_message(self): return {"role": "user", "content": "The `value` field you generate **must** be an integer number (e.g., '1'). This is critical for later processing.."} def _coerce_to_float(self, llm_output:str): """ Coerces the LLM output to a float value. This method looks for the first occurrence of a float in the LLM output, such that - the first occurrence of the float is considered, the rest is ignored. For example, "The price is $3.50" will be considered 3.50; - if no float is found, the method raises an error. So it is important that the prompts actually requests a float value. Args: llm_output (str, float): The LLM output to coerce. Returns: The float value of the LLM output. """ # if the LLM output is already a float, we return it if isinstance(llm_output, float): return llm_output # let's extract the first occurrence of a float in the LLM output. # using a regular expression import re match = re.search(r'\b\d+\.\d+\b', llm_output) if match: return float(match.group(0)) raise ValueError("The LLM output does not contain a recognizable float value.") def _request_float_llm_message(self): return {"role": "user", "content": "The `value` field you generate **must** be a float number (e.g., '980.16'). This is critical for later processing."} def _coerce_to_enumerable(self, llm_output:str, options:list): """ Coerces the LLM output to one of the specified options. This method looks for the first occurrence of one of the specified options in the LLM output, such that - the first occurrence of the option is considered, the rest is ignored. For example, "I prefer cats" will be considered "cats"; - if no option is found, the method raises an error. So it is important that the prompts actually requests one of the specified options. Args: llm_output (str): The LLM output to coerce. options (list): The list of options to consider. Returns: The option value of the LLM output. """ # let's extract the first occurrence of one of the specified options in the LLM output. # using a regular expression import re match = re.search(r'\b(?:' + '|'.join(options) + r')\b', llm_output, re.IGNORECASE) if match: return match.group(0) raise ValueError("The LLM output does not contain a recognizable option value.") def _request_enumerable_llm_message(self, options:list): options_list_as_string = ', '.join([f"'{o}'" for o in options]) return {"role": "user", "content": f"The `value` field you generate **must** be exactly one of the following strings: {options_list_as_string}. This is critical for later processing."} def __repr__(self): return f"LLMRequest(messages={self.messages}, model_params={self.model_params}, model_output={self.model_output})" # # Data structures to enforce output format during LLM API call. # class LLMScalarWithJustificationResponse(BaseModel): """ LLMTypedResponse represents a typed response from an LLM (Language Learning Model). Attributes: value (str, int, float, list): The value of the response. justification (str): The justification or explanation for the response. """ value: Union[str, int, float, bool] justification: str confidence: float ########################################################################### # Client class ########################################################################### class OpenAIClient: """ A utility class for interacting with the OpenAI API. """ def __init__(self, cache_api_calls=default["cache_api_calls"], cache_file_name=default["cache_file_name"]) -> None: logger.debug("Initializing OpenAIClient") # should we cache api calls and reuse them? self.set_api_cache(cache_api_calls, cache_file_name) def set_api_cache(self, cache_api_calls, cache_file_name=default["cache_file_name"]): """ Enables or disables the caching of API calls. Args: cache_file_name (str): The name of the file to use for caching API calls. """ self.cache_api_calls = cache_api_calls self.cache_file_name = cache_file_name if self.cache_api_calls: # load the cache, if any self.api_cache = self._load_cache() def _setup_from_config(self): """ Sets up the OpenAI API configurations for this client. """ self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) def send_message(self, current_messages, model=default["model"], temperature=default["temperature"], max_tokens=default["max_tokens"], top_p=default["top_p"], frequency_penalty=default["frequency_penalty"], presence_penalty=default["presence_penalty"], stop=[], timeout=default["timeout"], max_attempts=default["max_attempts"], waiting_time=default["waiting_time"], exponential_backoff_factor=default["exponential_backoff_factor"], n = 1, response_format=None, echo=False): """ Sends a message to the OpenAI API and returns the response. Args: current_messages (list): A list of dictionaries representing the conversation history. model (str): The ID of the model to use for generating the response. temperature (float): Controls the "creativity" of the response. Higher values result in more diverse responses. max_tokens (int): The maximum number of tokens (words or punctuation marks) to generate in the response. top_p (float): Controls the "quality" of the response. Higher values result in more coherent responses. frequency_penalty (float): Controls the "repetition" of the response. Higher values result in less repetition. presence_penalty (float): Controls the "diversity" of the response. Higher values result in more diverse responses. stop (str): A string that, if encountered in the generated response, will cause the generation to stop. max_attempts (int): The maximum number of attempts to make before giving up on generating a response. timeout (int): The maximum number of seconds to wait for a response from the API. waiting_time (int): The number of seconds to wait between requests. exponential_backoff_factor (int): The factor by which to increase the waiting time between requests. n (int): The number of completions to generate. response_format: The format of the response, if any. Returns: A dictionary representing the generated response. """ def aux_exponential_backoff(): nonlocal waiting_time # in case waiting time was initially set to 0 if waiting_time <= 0: waiting_time = 2 logger.info(f"Request failed. Waiting {waiting_time} seconds between requests...") time.sleep(waiting_time) # exponential backoff waiting_time = waiting_time * exponential_backoff_factor # setup the OpenAI configurations for this client. self._setup_from_config() # We need to adapt the parameters to the API type, so we create a dictionary with them first chat_api_params = { "model": model, "messages": current_messages, "temperature": temperature, "max_tokens":max_tokens, "top_p": top_p, "frequency_penalty": frequency_penalty, "presence_penalty": presence_penalty, "stop": stop, "timeout": timeout, "stream": False, "n": n, } if response_format is not None: chat_api_params["response_format"] = response_format i = 0 while i < max_attempts: try: i += 1 try: logger.debug(f"Sending messages to OpenAI API. Token count={self._count_tokens(current_messages, model)}.") except NotImplementedError: logger.debug(f"Token count not implemented for model {model}.") start_time = time.monotonic() logger.debug(f"Calling model with client class {self.__class__.__name__}.") ############################################################### # call the model, either from the cache or from the API ############################################################### cache_key = str((model, chat_api_params)) # need string to be hashable if self.cache_api_calls and (cache_key in self.api_cache): response = self.api_cache[cache_key] else: if waiting_time > 0: logger.info(f"Waiting {waiting_time} seconds before next API request (to avoid throttling)...") time.sleep(waiting_time) response = self._raw_model_call(model, chat_api_params) if self.cache_api_calls: self.api_cache[cache_key] = response self._save_cache() logger.debug(f"Got response from API: {response}") end_time = time.monotonic() logger.debug( f"Got response in {end_time - start_time:.2f} seconds after {i} attempts.") return utils.sanitize_dict(self._raw_model_response_extractor(response)) except InvalidRequestError as e: logger.error(f"[{i}] Invalid request error, won't retry: {e}") # there's no point in retrying if the request is invalid # so we return None right away return None except openai.BadRequestError as e: logger.error(f"[{i}] Invalid request error, won't retry: {e}") # there's no point in retrying if the request is invalid # so we return None right away return None except openai.RateLimitError: logger.warning( f"[{i}] Rate limit error, waiting a bit and trying again.") aux_exponential_backoff() except NonTerminalError as e: logger.error(f"[{i}] Non-terminal error: {e}") aux_exponential_backoff() except Exception as e: logger.error(f"[{i}] Error: {e}") logger.error(f"Failed to get response after {max_attempts} attempts.") return None def _raw_model_call(self, model, chat_api_params): """ Calls the OpenAI API with the given parameters. Subclasses should override this method to implement their own API calls. """ if "response_format" in chat_api_params: # to enforce the response format via pydantic, we need to use a different method del chat_api_params["stream"] return self.client.beta.chat.completions.parse( **chat_api_params ) else: return self.client.chat.completions.create( **chat_api_params ) def _raw_model_response_extractor(self, response): """ Extracts the response from the API response. Subclasses should override this method to implement their own response extraction. """ return response.choices[0].message.to_dict() def _count_tokens(self, messages: list, model: str): """ Count the number of OpenAI tokens in a list of messages using tiktoken. Adapted from https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb Args: messages (list): A list of dictionaries representing the conversation history. model (str): The name of the model to use for encoding the string. """ try: try: encoding = tiktoken.encoding_for_model(model) except KeyError: logger.debug("Token count: model not found. Using cl100k_base encoding.") encoding = tiktoken.get_encoding("cl100k_base") if model in { "gpt-3.5-turbo-0613", "gpt-3.5-turbo-16k-0613", "gpt-4-0314", "gpt-4-32k-0314", "gpt-4-0613", "gpt-4-32k-0613", }: tokens_per_message = 3 tokens_per_name = 1 elif model == "gpt-3.5-turbo-0301": tokens_per_message = 4 # every message follows <|start|>{role/name}\n{content}<|end|>\n tokens_per_name = -1 # if there's a name, the role is omitted elif "gpt-3.5-turbo" in model: logger.debug("Token count: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613.") return self._count_tokens(messages, model="gpt-3.5-turbo-0613") elif ("gpt-4" in model) or ("ppo" in model): logger.debug("Token count: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613.") return self._count_tokens(messages, model="gpt-4-0613") else: raise NotImplementedError( f"""num_tokens_from_messages() is not implemented for model {model}. See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens.""" ) num_tokens = 0 for message in messages: num_tokens += tokens_per_message for key, value in message.items(): num_tokens += len(encoding.encode(value)) if key == "name": num_tokens += tokens_per_name num_tokens += 3 # every reply is primed with <|start|>assistant<|message|> return num_tokens except Exception as e: logger.error(f"Error counting tokens: {e}") return None def _save_cache(self): """ Saves the API cache to disk. We use pickle to do that because some obj are not JSON serializable. """ # use pickle to save the cache pickle.dump(self.api_cache, open(self.cache_file_name, "wb")) def _load_cache(self): """ Loads the API cache from disk. """ # unpickle return pickle.load(open(self.cache_file_name, "rb")) if os.path.exists(self.cache_file_name) else {} def get_embedding(self, text, model=default["embedding_model"]): """ Gets the embedding of the given text using the specified model. Args: text (str): The text to embed. model (str): The name of the model to use for embedding the text. Returns: The embedding of the text. """ response = self._raw_embedding_model_call(text, model) return self._raw_embedding_model_response_extractor(response) def _raw_embedding_model_call(self, text, model): """ Calls the OpenAI API to get the embedding of the given text. Subclasses should override this method to implement their own API calls. """ return self.client.embeddings.create( input=[text], model=model ) def _raw_embedding_model_response_extractor(self, response): """ Extracts the embedding from the API response. Subclasses should override this method to implement their own response extraction. """ return response.data[0].embedding class AzureClient(OpenAIClient): def __init__(self, cache_api_calls=default["cache_api_calls"], cache_file_name=default["cache_file_name"]) -> None: logger.debug("Initializing AzureClient") super().__init__(cache_api_calls, cache_file_name) def _setup_from_config(self): """ Sets up the Azure OpenAI Service API configurations for this client, including the API endpoint and key. """ self.client = AzureOpenAI(azure_endpoint= os.getenv("AZURE_OPENAI_ENDPOINT"), api_version = config["OpenAI"]["AZURE_API_VERSION"], api_key = os.getenv("AZURE_OPENAI_KEY")) ########################################################################### # Exceptions ########################################################################### class InvalidRequestError(Exception): """ Exception raised when the request to the OpenAI API is invalid. """ pass class NonTerminalError(Exception): """ Exception raised when an unspecified error occurs but we know we can retry. """ pass ########################################################################### # Clients registry # # We can have potentially different clients, so we need a place to # register them and retrieve them when needed. # # We support both OpenAI and Azure OpenAI Service API by default. # Thus, we need to set the API parameters based on the choice of the user. # This is done within specialized classes. # # It is also possible to register custom clients, to access internal or # otherwise non-conventional API endpoints. ########################################################################### _api_type_to_client = {} _api_type_override = None def register_client(api_type, client): """ Registers a client for the given API type. Args: api_type (str): The API type for which we want to register the client. client: The client to register. """ _api_type_to_client[api_type] = client def _get_client_for_api_type(api_type): """ Returns the client for the given API type. Args: api_type (str): The API type for which we want to get the client. """ try: return _api_type_to_client[api_type] except KeyError: raise ValueError(f"API type {api_type} is not supported. Please check the 'config.ini' file.") def client(): """ Returns the client for the configured API type. """ api_type = config["OpenAI"]["API_TYPE"] if _api_type_override is None else _api_type_override logger.debug(f"Using API type {api_type}.") return _get_client_for_api_type(api_type) # TODO simplify the custom configuration methods below def force_api_type(api_type): """ Forces the use of the given API type, thus overriding any other configuration. Args: api_type (str): The API type to use. """ global _api_type_override _api_type_override = api_type def force_api_cache(cache_api_calls, cache_file_name=default["cache_file_name"]): """ Forces the use of the given API cache configuration, thus overriding any other configuration. Args: cache_api_calls (bool): Whether to cache API calls. cache_file_name (str): The name of the file to use for caching API calls. """ # set the cache parameters on all clients for client in _api_type_to_client.values(): client.set_api_cache(cache_api_calls, cache_file_name) # default client register_client("openai", OpenAIClient()) register_client("azure", AzureClient())