Spaces:
Sleeping
Sleeping
import json | |
from typing import AsyncGenerator | |
from aworld.core.task import TaskResponse | |
from aworld.models.model_response import ModelResponse | |
from aworld.output import Output, MessageOutput | |
from aworld.runners.callback.decorator import CallbackRegistry | |
from aworld.runners.handler.base import DefaultHandler | |
from aworld.core.common import TaskItem, Observation | |
from aworld.core.context.base import Context | |
from aworld.core.event.base import Message, Constants, TopicType | |
from aworld.logs.util import logger | |
class ToolCallbackHandler(DefaultHandler):
    """Run registered callbacks for the action results of a tool observation.

    Only messages whose category is ``Constants.TOOL_CALLBACK`` are handled.
    For each action result the callback is looked up in ``CallbackRegistry``
    under the key ``"<tool_name>__<action_name>"``.
    """

    def __init__(self, runner):
        """Keep a reference to the runner.

        Args:
            runner: Object exposing ``task.outputs``, which is read each time
                a tool-callback message is handled.
        """
        self.runner = runner

    async def handle(self, message: Message) -> AsyncGenerator[Message, None]:
        """Invoke the registered callback for every usable action result.

        The handler returns without yielding when the message is not a tool
        callback, when the payload is empty, when its first element is not an
        ``Observation``, or when the observation has no action results.
        Individual results lacking content, tool name or action name, and
        results with no registered callback, are skipped.

        Args:
            message: Incoming event; ``message.payload[0]`` is read as the
                observation to process.

        Yields:
            A single ``Message`` with category ``Constants.TASK`` and topic
            ``TopicType.ERROR`` (carrying a ``TaskItem`` with ``stop=True``)
            if any exception is raised while processing the payload or while
            running a callback.
        """
        # Function-scope import keeps this block self-contained.
        import inspect

        if message.category != Constants.TOOL_CALLBACK:
            return

        logger.info(f"-------ToolCallbackHandler start handle message----: {message}")
        outputs = self.runner.task.outputs
        # Placeholder: nothing assigns ``output`` yet, so the ``finally``
        # branch below is currently inert.
        output = None
        # Bound before ``try`` so the ``except`` branch can always reference
        # it, even if the failure happens before the payload is read.
        payload = None
        try:
            payload = message.payload
            if not payload or not payload[0]:
                return
            observation = payload[0]
            if not isinstance(observation, Observation):
                return
            if not observation.action_result:
                return

            for res in observation.action_result:
                if not res or not res.content or not res.tool_name or not res.action_name:
                    continue
                callback_func = CallbackRegistry.get(res.tool_name + "__" + res.action_name)
                if not callback_func:
                    continue
                outcome = callback_func(res)
                # A callback declared with ``async def`` returns a coroutine
                # that would otherwise never run; await it so it takes effect.
                if inspect.iscoroutine(outcome):
                    await outcome
                logger.info(f"-------ToolCallbackHandler callback_func-res: {res}")

            logger.info(f"-------ToolCallbackHandler end handle message: {observation}")
        except Exception as e:
            # TODO: distinguish payload problems from callback failures.
            logger.warning(f"ToolCallbackHandler Failed to parse payload: {e}")
            yield Message(
                category=Constants.TASK,
                payload=TaskItem(msg="Failed to parse output.", data=payload, stop=True),
                sender=self.name(),
                session_id=Context.instance().session_id,
                topic=TopicType.ERROR
            )
        finally:
            # TODO: produce an ``output`` for the handled message.
            if output:
                if not output.metadata:
                    output.metadata = {}
                output.metadata['sender'] = message.sender
                output.metadata['receiver'] = message.receiver
                await outputs.add_output(output)
        # TODO:
        # 1. Update the current message node status
        # 2. Update the incoming message node status