# coding: utf-8
# Copyright (c) 2025 inclusionAI.
import copy
import inspect
import os.path
from typing import Any, Dict, List, Union

from aworld.core.context.base import Context
from aworld.logs.util import logger
from aworld.models.openai_tokenizer import openai_tokenizer
from aworld.models.qwen_tokenizer import qwen_tokenizer
from aworld.utils import import_package
def usage_process(usage: Dict[str, Union[int, Dict[str, int]]] = None, context: Context = None):
    """Attribute a token-usage record to its calling category and add it to the context.

    Walks the live call stack looking for a `call_llm_model*` frame inside `llm.py`;
    the frame directly above it supplies a `self` whose `_name` attribute is used as
    the category key. A snapshot of the flat counters is nested under that key, then
    the whole dict is accumulated into the context's token counters.

    Args:
        usage: Token usage counts (e.g. {"prompt_tokens": ..., "completion_tokens": ...}).
            A fresh dict is used when omitted.
        context: Target context; falls back to the global `Context.instance()`.
    """
    # BUG FIX: the original default `usage={}` was a shared mutable default that this
    # function mutates below (`usage[name] = ...`), leaking entries across calls.
    if usage is None:
        usage = {}
    if not context:
        context = Context.instance()

    stacks = inspect.stack()
    index = 0
    for idx, stack in enumerate(stacks):
        # `index` ends up pointing at the frame *above* the matched llm.py frame.
        index = idx + 1
        file = os.path.basename(stack.filename)
        # supported use `llm.py` utility function only
        if 'call_llm_model' in stack.function and file == 'llm.py':
            break

    if index >= len(stacks):
        # Loop exhausted without a break (or empty stack): no category to attribute to.
        logger.warning("not category usage find to count")
    else:
        instance = stacks[index].frame.f_locals.get('self')
        name = getattr(instance, "_name", "unknown")
        # copy.copy is evaluated before the assignment, so the nested snapshot does
        # not contain itself.
        usage[name] = copy.copy(usage)

    # total usage
    context.add_token(usage)
def num_tokens_from_messages(messages, model="gpt-4o"):
    """Return the number of tokens used by a list of messages."""
    import_package("tiktoken")
    import tiktoken

    lowered = model.lower()
    if lowered == "qwen":
        encoding = qwen_tokenizer
    elif lowered == "openai":
        encoding = openai_tokenizer
    else:
        try:
            encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            logger.warning(f"{model} model not found. Using cl100k_base encoding.")
            encoding = tiktoken.get_encoding("cl100k_base")

    # Fixed framing overhead: 3 tokens per message, +1 when a "name" field is present,
    # and a 3-token tail for the assistant reply priming.
    per_message_overhead = 3
    name_overhead = 1

    total = 0
    for message in messages:
        total += per_message_overhead
        if isinstance(message, str):
            # Plain-string messages are encoded as-is.
            total += len(encoding.encode(message))
            continue
        for key, value in message.items():
            total += len(encoding.encode(str(value)))
            if key == "name":
                total += name_overhead

    return total + 3
def truncate_tokens_from_messages(messages: List[Dict[str, Any]], max_tokens: int, keep_both_sides: bool = False, model: str = "gpt-4o"):
    """Truncate `messages` so their estimated token count fits within `max_tokens`.

    Args:
        messages: Chat messages (dicts, or plain strings) to truncate.
        max_tokens: Token budget the returned messages must fit within.
        keep_both_sides: When True, drop messages from the middle so both the head and
            tail of the conversation are preserved; otherwise drop the oldest first.
        model: Tokenizer selector ("qwen", "openai", or a tiktoken model name).

    Returns:
        A (possibly shorter) list of messages; the input list is not modified.
    """
    import_package("tiktoken")
    import tiktoken
    if model.lower() == "qwen":
        return qwen_tokenizer.truncate(messages, max_tokens, keep_both_sides)
    elif model.lower() == "openai":
        return openai_tokenizer.truncate(messages, max_tokens, keep_both_sides)

    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        logger.warning(f"{model} model not found. Using cl100k_base encoding.")
        encoding = tiktoken.get_encoding("cl100k_base")

    # BUG FIX: tiktoken `Encoding` objects have no `truncate` method, so the original
    # `encoding.truncate(...)` raised AttributeError. Implement message-level
    # truncation here, weighting messages the same way `num_tokens_from_messages`
    # does (content tokens only, without the fixed framing constants).
    def _tokens(message) -> int:
        if isinstance(message, str):
            return len(encoding.encode(message))
        return sum(len(encoding.encode(str(value))) for value in message.values())

    truncated = list(messages)
    while truncated and sum(_tokens(m) for m in truncated) > max_tokens:
        if keep_both_sides and len(truncated) > 1:
            # Drop from the middle so the earliest and latest messages survive.
            truncated.pop(len(truncated) // 2)
        else:
            # Drop the oldest message first.
            truncated.pop(0)
    return truncated
def agent_desc_transform(agent_dict: Dict[str, Any],
                         agents: List[str] = None,
                         provider: str = 'openai',
                         strategy: str = 'min') -> List[Dict[str, Any]]:
    """Default implement transform framework standard protocol to openai protocol of agent description.

    Args:
        agent_dict: Dict of descriptions of agents that are registered in the agent factory.
        agents: Description of special agents to use.
        provider: Different descriptions formats need to be processed based on the provider.
        strategy: The value is `min` or `max`, when no special agents are provided, `min` indicates no content returned,
                  `max` means get all agents' descriptions.

    Returns:
        OpenAI function-calling tool schemas, one entry per agent ability.
    """
    agent_as_tools = []
    if not agents and strategy == 'min':
        return agent_as_tools

    if provider and 'openai' in provider:
        for agent_name, agent_info in agent_dict.items():
            if agents and agent_name not in agents:
                # BUG FIX: the original message told users to set `tools`, but the
                # filter here is driven by the `agents` parameter.
                logger.debug(f"{agent_name} can not supported in {agents}, you can set `agents` params to support it.")
                continue

            for action in agent_info["abilities"]:
                # Build parameter properties
                properties = {}
                required = []
                for param_name, param_info in action["params"].items():
                    properties[param_name] = {
                        "description": param_info["desc"],
                        # JSON Schema spells it "string", not Python's "str"
                        "type": param_info["type"] if param_info["type"] != "str" else "string"
                    }
                    if param_info.get("required", False):
                        required.append(param_name)

                openai_function_schema = {
                    # `<agent>__<ability>` keeps the pairing recoverable at dispatch time
                    "name": f'{agent_name}__{action["name"]}',
                    "description": action["desc"],
                    "parameters": {
                        "type": "object",
                        "properties": properties,
                        "required": required
                    }
                }

                agent_as_tools.append({
                    "type": "function",
                    "function": openai_function_schema
                })
    return agent_as_tools
def tool_desc_transform(tool_dict: Dict[str, Any],
                        tools: List[str] = None,
                        black_tool_actions: Dict[str, List[str]] = None,
                        provider: str = 'openai',
                        strategy: str = 'min') -> List[Dict[str, Any]]:
    """Default implement transform framework standard protocol to openai protocol of tool description.

    Args:
        tool_dict: Dict of descriptions of tools that are registered in the agent factory.
        tools: Description of special tools to use.
        black_tool_actions: Per-tool lists of action names to exclude from the result.
        provider: Different descriptions formats need to be processed based on the provider.
        strategy: The value is `min` or `max`, when no special tools are provided, `min` indicates no content returned,
                  `max` means get all tools' descriptions.

    Returns:
        OpenAI function-calling tool schemas, one entry per (tool, action) pair.
    """
    openai_tools = []
    if not tools and strategy == 'min':
        return openai_tools

    # NOTE(fix): default changed from the mutable `{}` to None; behavior is identical
    # because None was already normalized to an empty dict here.
    if black_tool_actions is None:
        black_tool_actions = {}

    if provider and 'openai' in provider:
        for tool_name, tool_info in tool_dict.items():
            if tools and tool_name not in tools:
                logger.debug(f"{tool_name} can not supported in {tools}, you can set `tools` params to support it.")
                continue

            black_actions = black_tool_actions.get(tool_name, [])
            for action in tool_info["actions"]:
                if action['name'] in black_actions:
                    # Explicitly black-listed action: skip it.
                    continue

                # Build parameter properties
                properties = {}
                required = []
                for param_name, param_info in action["params"].items():
                    properties[param_name] = {
                        "description": param_info["desc"],
                        # JSON Schema spells it "string", not Python's "str"
                        "type": param_info["type"] if param_info["type"] != "str" else "string"
                    }
                    if param_info.get("required", False):
                        required.append(param_name)

                openai_function_schema = {
                    # `<tool>__<action>` keeps the pairing recoverable at dispatch time
                    "name": f'{tool_name}__{action["name"]}',
                    "description": action["desc"],
                    "parameters": {
                        "type": "object",
                        "properties": properties,
                        "required": required
                    }
                }

                openai_tools.append({
                    "type": "function",
                    "function": openai_function_schema
                })
    return openai_tools