File size: 3,862 Bytes
bc5e560
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# coding: utf-8
# Copyright (c) 2025 inclusionAI.
import abc
from typing import AsyncGenerator

from aworld.core.agent.base import is_agent
from aworld.core.common import ActionModel, TaskItem
from aworld.core.event.base import Message, Constants, TopicType
from aworld.core.tool.base import AsyncTool, Tool, ToolFactory
from aworld.logs.util import logger
from aworld.runners.handler.base import DefaultHandler


class ToolHandler(DefaultHandler):
    """Base class for handlers that process tool-related messages.

    An instance keeps references to the tool instances and the tool
    configurations owned by the runner it was created for.
    """
    # NOTE(review): ``__metaclass__`` is the Python 2 spelling and is not
    # interpreted as a metaclass declaration by Python 3; kept unchanged so
    # class creation behaves exactly as before.
    __metaclass__ = abc.ABCMeta

    @classmethod
    def name(cls):
        """Return the identifier under which this handler is known."""
        return "_tool_handler"

    def __init__(self, runner: 'TaskEventRunner'):
        """Bind the handler to the tools and tool configs of ``runner``.

        Args:
            runner: The runner whose ``tools`` and ``tools_conf`` attributes
                are referenced (not copied) by this handler.
        """
        self.tools = runner.tools
        self.tools_conf = runner.tools_conf


class DefaultToolHandler(ToolHandler):
    """Default handler that routes the actions of a tool message to tools."""

    def _error_message(self, message: Message, msg: str, headers: dict) -> Message:
        """Build the point-to-point error message that stops the task.

        Args:
            message: The incoming message being handled; its payload and
                session id are copied into the error message.
            msg: Human readable description of the problem.
            headers: Headers to attach to the error message.

        Returns:
            A ``Constants.TASK`` message on the ``TopicType.ERROR`` topic.
        """
        return Message(
            category=Constants.TASK,
            payload=TaskItem(msg=msg, data=message.payload, stop=True),
            sender=self.name(),
            session_id=message.session_id,
            topic=TopicType.ERROR,
            headers=headers
        )

    async def handle(self, message: Message) -> AsyncGenerator[Message, None]:
        """Validate the actions in ``message`` and forward them to their tools.

        Messages whose category is not ``Constants.TOOL`` are ignored.
        Tools that are not yet known to the handler are created through
        ``ToolFactory``, reset, and stored in ``self.tools``.

        Args:
            message: Incoming message; its payload is expected to be a list
                of ``ActionModel`` objects.

        Yields:
            * An error task message (``TopicType.ERROR``) when the payload is
              empty or contains an item that is not an ``ActionModel``;
              nothing else is yielded in that case.
            * A task message on ``TopicType.SUBSCRIBE_TOOL`` carrying the
              tools created during this call, if any.
            * One ``Constants.TOOL`` message per tool name, addressed to that
              tool and carrying all actions targeting it.
        """
        if message.category != Constants.TOOL:
            return

        headers = {"context": message.context}
        # data is List[ActionModel]
        data = message.payload
        if not data:
            # error message, p2p
            yield self._error_message(message, "no data to process.", headers)
            return

        if not all(isinstance(action, ActionModel) for action in data):
            # error message, p2p
            yield self._error_message(message, "action not a ActionModel.", headers)
            return

        new_tools = {}
        tool_mapping = {}
        # Directly use or use tools after creation.
        for act in data:
            if is_agent(act):
                logger.warning(f"somethings wrong, {act} is an agent.")
                continue

            tool_name = act.tool_name
            if not self.tools or tool_name not in self.tools:
                if self.tools is None:
                    # Without a registry the assignment below would raise a
                    # TypeError; start a fresh one. The created tools are
                    # still announced through the SUBSCRIBE_TOOL message.
                    self.tools = {}
                # dynamic only use default config in module.
                conf = self.tools_conf.get(tool_name) if self.tools_conf is not None else None
                tool = ToolFactory(tool_name, conf=conf, asyn=conf.use_async if conf else False)
                tool.event_driven = True
                if isinstance(tool, Tool):
                    tool.reset()
                elif isinstance(tool, AsyncTool):
                    await tool.reset()
                self.tools[tool_name] = tool
                new_tools[tool_name] = tool
            # Group the actions by target tool, preserving arrival order.
            tool_mapping.setdefault(tool_name, []).append(act)

        if new_tools:
            yield Message(
                category=Constants.TASK,
                payload=TaskItem(data=new_tools),
                sender=self.name(),
                session_id=message.session_id,
                topic=TopicType.SUBSCRIBE_TOOL,
                headers=headers
            )

        for tool_name, actions in tool_mapping.items():
            tool = self.tools[tool_name]
            if not isinstance(tool, (Tool, AsyncTool)):
                logger.warning(f"Unsupported tool type: {tool}")
                continue

            # send to the tool; the sender is the agent of the first action
            yield Message(
                category=Constants.TOOL,
                payload=actions,
                sender=actions[0].agent_name if actions else '',
                session_id=message.session_id,
                receiver=tool_name,
                headers=headers
            )