# src/kg/openai_api.py

import json
import logging
import os
from pathlib import Path

import openai
from dotenv import load_dotenv
from openai.error import (APIError, RateLimitError, ServiceUnavailableError,
                          Timeout, APIConnectionError, InvalidRequestError)
from tenacity import (before_sleep_log, retry, retry_if_exception_type,
                      stop_after_delay, wait_random_exponential,
                      stop_after_attempt)
from tiktoken import Encoding, encoding_for_model

logger = logging.getLogger(__name__)

load_dotenv()

# This value is set by OpenAI for the selected model and cannot be changed.
MAX_MODEL_TOKEN_COUNT = 4096
# This value can be changed.
MAX_RESPONSE_TOKEN_COUNT = 512
RESPONSES_DIRECTORY_PATH = Path('../openai-api-responses-new')


def get_openai_model_encoding(model_id: str) -> Encoding:
    """Get the encoding (tokenizer) for the OpenAI model."""
    return encoding_for_model(model_id)
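
# Example (illustrative; `gpt-3.5-turbo` is an assumed model ID consistent
# with the 4096-token limit above): the encoding turns text into token IDs,
# which is how the token counts below are measured.
#
#     encoding = get_openai_model_encoding('gpt-3.5-turbo')
#     token_count = len(encoding.encode_ordinary('Hello, world!'))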


def get_max_chapter_segment_token_count(prompt: str, model_id: str) -> int:
    """
    Calculate the maximum number of tokens that a chapter segment may
    contain, given the prompt.
    """
    encoding = get_openai_model_encoding(model_id)
    # `encode_ordinary()` ignores special tokens and is slightly faster than
    # `encode()`.
    prompt_token_count = len(encoding.encode_ordinary(prompt))
    # Subtract 8 for the tokens OpenAI adds to the prompt and response (refer
    # to https://platform.openai.com/docs/guides/chat/managing-tokens for
    # details).
    # Subtract 1 for the newline appended to the end of the prompt.
    # This calculation does not have to be exact.
    max_chapter_segment_token_count = (MAX_MODEL_TOKEN_COUNT
                                       - MAX_RESPONSE_TOKEN_COUNT
                                       - prompt_token_count - 8 - 1)
    return max_chapter_segment_token_count
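
# Illustrative arithmetic (the prompt size is assumed): a 200-token prompt
# leaves 4096 - 512 - 200 - 8 - 1 = 3375 tokens for the chapter segment.
#
#     prompt = 'Summarize the following chapter segment.'
#     budget = get_max_chapter_segment_token_count(prompt, 'gpt-3.5-turbo')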


# NOTE: these retry parameters are assumptions. `reraise=True` together with
# `stop_after_attempt(10)` means an `InvalidRequestError` reaches the handler
# in `save_openai_api_response()` only after 10 failed attempts.
@retry(retry=retry_if_exception_type(
           (APIError, RateLimitError, ServiceUnavailableError, Timeout,
            APIConnectionError, InvalidRequestError)),
       wait=wait_random_exponential(max=60),
       stop=(stop_after_delay(1800) | stop_after_attempt(10)),
       before_sleep=before_sleep_log(logger, logging.WARNING),
       reraise=True)
def create_chat_completion(model_id, prompt_messages):
    """Call the OpenAI API, retrying failed requests."""
    logger.info('Calling OpenAI API...')
    return openai.ChatCompletion.create(
        model=model_id, messages=prompt_messages, temperature=0)


def save_openai_api_response(prompt_messages):
    """
    Use a prompt to make a request to the OpenAI API and return the response
    data.
    """
    # The API key and model ID are carried in the first prompt message and
    # removed before the messages are sent to the API.
    openai.api_key = prompt_messages[0].pop('api_key')
    model_id = prompt_messages[0].pop('model_id')
    try:
        response = create_chat_completion(model_id, prompt_messages)
        finish_reason = response.choices[0].finish_reason
        if finish_reason != 'stop':
            logger.error(f'`finish_reason` is `{finish_reason}`.')
        save_data = {'model': response.model,
                     'usage': response.usage,
                     'finish_reason': finish_reason,
                     'prompt_messages': prompt_messages,
                     'response': response.choices[0].message.content}
    except InvalidRequestError:
        logger.error('InvalidRequestError encountered 10 times. Returning an '
                     'empty response.')
        save_data = {'model': None,
                     'usage': None,
                     'finish_reason': 'invalid_request',
                     'prompt_messages': prompt_messages,
                     'response': ' '}
    return save_data
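
# Example usage (illustrative; the content, key, and model ID shown are
# placeholders):
#
#     prompt_messages = [
#         {'role': 'user',
#          'content': 'Summarize the following chapter segment. ...',
#          'api_key': 'sk-...',
#          'model_id': 'gpt-3.5-turbo'}]
#     save_data = save_openai_api_response(prompt_messages)
#     print(save_data['response'])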


def load_response_text(save_path):
    """
    Load the response text from a JSON file containing response data from the
    OpenAI API.
    """
    with open(save_path, 'r') as save_file:
        save_data = json.load(save_file)
    return save_data['response']
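

# Sketch of the writing side that `load_response_text()` reads back (a
# hypothetical helper, not part of the original API): one way to store
# `save_data` under `RESPONSES_DIRECTORY_PATH`. The function name and
# file-naming scheme are assumptions.
def write_response_save_data(save_data, save_name):
    """Write response data from the OpenAI API to a JSON file."""
    RESPONSES_DIRECTORY_PATH.mkdir(parents=True, exist_ok=True)
    save_path = RESPONSES_DIRECTORY_PATH / f'{save_name}.json'
    with open(save_path, 'w') as save_file:
        json.dump(save_data, save_file, indent=4)
    return save_path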