# File size: 2,596 Bytes
# 59c91c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import json
from typing import AsyncGenerator

from aworld.core.task import TaskResponse
from aworld.models.model_response import ModelResponse
from aworld.output import Output, MessageOutput
from aworld.runners.callback.decorator import CallbackRegistry
from aworld.runners.handler.base import DefaultHandler
from aworld.core.common import TaskItem, Observation
from aworld.core.context.base import Context
from aworld.core.event.base import Message, Constants, TopicType
from aworld.logs.util import logger

class ToolCallbackHandler(DefaultHandler):
    """Run registered callbacks for the action results of a tool call.

    The handler reacts only to messages whose category is
    ``Constants.TOOL_CALLBACK``. For every action result carried by the
    message's ``Observation`` it looks up a callback in ``CallbackRegistry``
    under the key ``"<tool_name>__<action_name>"`` and invokes it with that
    result.
    """

    def __init__(self, runner):
        """Keep a reference to the runner this handler is attached to.

        Args:
            runner: The runner object; stored unchanged as ``self.runner``.
        """
        self.runner = runner

    async def handle(self, message: Message) -> AsyncGenerator[Message, None]:
        """Dispatch the action results in ``message`` to their callbacks.

        Messages of any other category, an empty payload, a first payload
        item that is not an ``Observation``, and an observation without
        action results are all ignored silently. Individual results that
        lack content, a tool name or an action name, or that have no
        registered callback, are skipped.

        Args:
            message: Incoming event; ``message.payload[0]`` is expected to
                be an ``Observation``.

        Yields:
            A single ``Constants.TASK`` message on the ``TopicType.ERROR``
            topic (with ``stop=True``) if reading the payload or running a
            callback raises. Nothing is yielded on success.
        """
        # Function-scope import: keeps this block self-contained without
        # touching the module's import section.
        import inspect

        if message.category != Constants.TOOL_CALLBACK:
            return
        logger.info(f"-------ToolCallbackHandler start handle message----: {message}")

        # Bound before the ``try`` so the error path can always reference it,
        # even when reading ``message.payload`` is what failed.
        payload = None
        try:
            payload = message.payload
            if not payload or not payload[0]:
                return
            observation = payload[0]
            if not isinstance(observation, Observation):
                return
            if not observation.action_result:
                return
            for res in observation.action_result:
                if not res or not res.content or not res.tool_name or not res.action_name:
                    continue
                callback_func = CallbackRegistry.get(res.tool_name + "__" + res.action_name)
                if not callback_func:
                    continue
                outcome = callback_func(res)
                # A coroutine callback only produces an awaitable when called;
                # await it so its body actually runs.
                if inspect.isawaitable(outcome):
                    await outcome
                logger.info(f"-------ToolCallbackHandler callback_func-res: {res}")
            logger.info(f"-------ToolCallbackHandler end  handle message: {observation}")
        except Exception as e:
            # Top-level boundary for callback dispatch: report the failure as
            # a stopping error message instead of letting it propagate.
            logger.warning(f"ToolCallbackHandler Failed to parse payload: {e}")
            yield Message(
                category=Constants.TASK,
                payload=TaskItem(msg="Failed to parse output.", data=payload, stop=True),
                sender=self.name(),
                session_id=Context.instance().session_id,
                topic=TopicType.ERROR
            )
        # TODO: forward callback output to the task outputs (tagged with the
        #       message sender/receiver) once callbacks produce an output.
        # TODO: 1. Update the current message node status
        #       2. Update the incoming message node status