Spaces:
Runtime error
Runtime error
""" | |
Testing utilities. | |
""" | |
import os | |
import sys | |
from time import sleep | |
sys.path.insert(0, '../../tinytroupe/') | |
sys.path.insert(0, '../../') | |
sys.path.insert(0, '..') | |
import tinytroupe.openai_utils as openai_utils | |
from tinytroupe.agent import TinyPerson | |
from tinytroupe.environment import TinyWorld, TinySocialNetwork | |
import pytest | |
import importlib | |
import conftest | |
################################################## | |
# global constants | |
################################################## | |
CACHE_FILE_NAME = "tests_cache.pickle" | |
EXPORT_BASE_FOLDER = os.path.join(os.path.dirname(__file__), "outputs/exports") | |
TEMP_SIMULATION_CACHE_FILE_NAME = os.path.join(os.path.dirname(__file__), "simulation_test_case.cache.json") | |
################################################## | |
# Caching, in order to save on API usage | |
################################################## | |
if conftest.refresh_cache: | |
# DELETE the cache file tests_cache.pickle | |
os.remove(CACHE_FILE_NAME) | |
if conftest.use_cache: | |
openai_utils.force_api_cache(True, CACHE_FILE_NAME) | |
else: | |
openai_utils.force_api_cache(False, CACHE_FILE_NAME) | |
################################################## | |
# File management | |
################################################## | |
def remove_file_if_exists(file_path): | |
""" | |
Removes the file at the given path if it exists. | |
""" | |
if os.path.exists(file_path): | |
os.remove(file_path) | |
# remove temporary files | |
remove_file_if_exists(TEMP_SIMULATION_CACHE_FILE_NAME) | |
################################################## | |
# Simulation checks utilities | |
################################################## | |
def contains_action_type(actions, action_type): | |
""" | |
Checks if the given list of actions contains an action of the given type. | |
""" | |
for action in actions: | |
if action["action"]["type"] == action_type: | |
return True | |
return False | |
def contains_action_content(actions:list, action_content: str): | |
""" | |
Checks if the given list of actions contains an action with the given content. | |
""" | |
for action in actions: | |
# checks whether the desired content is contained in the action content | |
if action_content.lower() in action["action"]["content"].lower(): | |
return True | |
return False | |
def contains_stimulus_type(stimuli, stimulus_type): | |
""" | |
Checks if the given list of stimuli contains a stimulus of the given type. | |
""" | |
for stimulus in stimuli: | |
if stimulus["type"] == stimulus_type: | |
return True | |
return False | |
def contains_stimulus_content(stimuli, stimulus_content): | |
""" | |
Checks if the given list of stimuli contains a stimulus with the given content. | |
""" | |
for stimulus in stimuli: | |
# checks whether the desired content is contained in the stimulus content | |
if stimulus_content.lower() in stimulus["content"].lower(): | |
return True | |
return False | |
def terminates_with_action_type(actions, action_type): | |
""" | |
Checks if the given list of actions terminates with an action of the given type. | |
""" | |
if len(actions) == 0: | |
return False | |
return actions[-1]["action"]["type"] == action_type | |
def proposition_holds(proposition: str) -> bool: | |
""" | |
Checks if the given proposition is true according to an LLM call. | |
This can be used to check for text properties that are hard to | |
verify mechanically, such as "the text contains some ideas for a product". | |
""" | |
system_prompt = f""" | |
Check whether the following proposition is true or false. If it is | |
true, write "true", otherwise write "false". Don't write anything else! | |
""" | |
user_prompt = f""" | |
Proposition: {proposition} | |
""" | |
messages = [{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": user_prompt}] | |
# call the LLM | |
next_message = openai_utils.client().send_message(messages) | |
# check the result | |
cleaned_message = only_alphanumeric(next_message["content"]) | |
if cleaned_message.lower().startswith("true"): | |
return True | |
elif cleaned_message.lower().startswith("false"): | |
return False | |
else: | |
raise Exception(f"LLM returned unexpected result: {cleaned_message}") | |
def only_alphanumeric(string: str): | |
""" | |
Returns a string containing only alphanumeric characters. | |
""" | |
return ''.join(c for c in string if c.isalnum()) | |
def create_test_system_user_message(user_prompt, system_prompt="You are a helpful AI assistant."): | |
""" | |
Creates a list containing one system message and one user message. | |
""" | |
messages = [{"role": "system", "content": system_prompt}] | |
if user_prompt is not None: | |
messages.append({"role": "user", "content": user_prompt}) | |
return messages | |
def agents_personas_are_equal(agent1, agent2, ignore_name=False): | |
""" | |
Checks if the configurations of two agents are equal. | |
""" | |
ignore_keys = [] | |
if ignore_name: | |
ignore_keys.append("name") | |
for key in agent1._persona.keys(): | |
if key in ignore_keys: | |
continue | |
if agent1._persona[key] != agent2._persona[key]: | |
return False | |
return True | |
def agent_first_name(agent): | |
""" | |
Returns the first name of the agent. | |
""" | |
return agent.name.split()[0] | |
############################################################################################################ | |
# I/O utilities | |
############################################################################################################ | |
def get_relative_to_test_path(path_suffix): | |
""" | |
Returns the path to the test file with the given suffix. | |
""" | |
return os.path.join(os.path.dirname(__file__), path_suffix) | |
############################################################################################################ | |
# Fixtures | |
############################################################################################################ | |
def focus_group_world(): | |
import tinytroupe.examples as examples | |
world = TinyWorld("Focus group", [examples.create_lisa_the_data_scientist(), examples.create_oscar_the_architect(), examples.create_marcos_the_physician()]) | |
return world | |
def setup(): | |
TinyPerson.clear_agents() | |
TinyWorld.clear_environments() | |
yield |