Spaces:
Sleeping
Sleeping
File size: 1,681 Bytes
b05d426 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# coding: utf-8
# Copyright (c) 2025 inclusionAI.
import asyncio
from typing import Callable, Any
from aworld.core.context.base import Context
from aworld.core.event import eventbus
from aworld.core.event.base import Message, Constants
from aworld.events.manager import EventManager
from aworld.utils.common import sync_exec
def subscribe(key: str, category: str = None):
"""Subscribe the special event to handle.
Examples:
>>> cate = Constants.TOOL or Constants.AGENT; key = "topic"
>>> @subscribe(category=cate, key=key)
>>> def example(message: Message) -> Message | None:
>>> print("do something")
Args:
key: The index key of the handler.
category: Types of subscription events, the value is `agent` or `tool`.
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
if category is None:
sync_exec(eventbus.subscribe, Constants.TOOL, key, func)
sync_exec(eventbus.subscribe, Constants.AGENT, key, func)
else:
sync_exec(eventbus.subscribe, category, key, func)
return func
return decorator
async def _send_message(msg: Message) -> str:
context = msg.context
if not context:
context = Context()
event_mng = context.event_manager
if not event_mng:
event_mng = EventManager(context)
await event_mng.emit_message(msg)
return msg.id
async def send_message(msg: Message) -> asyncio.Task:
"""Utility function of send event.
Args:
msg: The content and meta information to be sent.
"""
task = asyncio.create_task(_send_message(msg), name=msg.id)
return task
|