Spaces:
Sleeping
Sleeping
# coding: utf-8 | |
# Copyright (c) 2025 inclusionAI. | |
import asyncio | |
from typing import Callable, Any | |
from aworld.core.context.base import Context | |
from aworld.core.event import eventbus | |
from aworld.core.event.base import Message, Constants | |
from aworld.events.manager import EventManager | |
from aworld.utils.common import sync_exec | |
def subscribe(key: str, category: str = None): | |
"""Subscribe the special event to handle. | |
Examples: | |
>>> cate = Constants.TOOL or Constants.AGENT; key = "topic" | |
>>> @subscribe(category=cate, key=key) | |
>>> def example(message: Message) -> Message | None: | |
>>> print("do something") | |
Args: | |
key: The index key of the handler. | |
category: Types of subscription events, the value is `agent` or `tool`. | |
""" | |
def decorator(func: Callable[..., Any]) -> Callable[..., Any]: | |
if category is None: | |
sync_exec(eventbus.subscribe, Constants.TOOL, key, func) | |
sync_exec(eventbus.subscribe, Constants.AGENT, key, func) | |
else: | |
sync_exec(eventbus.subscribe, category, key, func) | |
return func | |
return decorator | |
async def _send_message(msg: Message) -> str: | |
context = msg.context | |
if not context: | |
context = Context() | |
event_mng = context.event_manager | |
if not event_mng: | |
event_mng = EventManager(context) | |
await event_mng.emit_message(msg) | |
return msg.id | |
async def send_message(msg: Message) -> asyncio.Task: | |
"""Utility function of send event. | |
Args: | |
msg: The content and meta information to be sent. | |
""" | |
task = asyncio.create_task(_send_message(msg), name=msg.id) | |
return task | |