Spaces:
Sleeping
Sleeping
File size: 687 Bytes
bc5e560 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# coding: utf-8
# Copyright (c) 2025 inclusionAI.
import abc
from typing import TypeVar, Generic, AsyncGenerator
from aworld.core.event.base import Message
# Type parameters of ``Handler``: ``IN`` annotates the data passed to
# ``handle``; ``OUT`` is the item type of the async generator that ``handle``
# is annotated to return.
IN = TypeVar("IN")
OUT = TypeVar("OUT")


class Handler(Generic[IN, OUT]):
    """Generic base class for handlers mapping ``IN`` data to ``OUT`` items.

    Subclasses are expected to provide their own ``handle`` implementation.
    """

    # NOTE(review): a ``__metaclass__`` class attribute is the Python 2
    # spelling and is not consulted by Python 3, so the ``abstractmethod``
    # marker below is presumably not enforced at instantiation time. Kept
    # as-is to preserve behavior; confirm that no subclass relies on being
    # instantiable without ``handle`` before moving to
    # ``metaclass=abc.ABCMeta``.
    __metaclass__ = abc.ABCMeta

    @classmethod
    def name(cls):
        """Return the handler name, i.e. the ``__name__`` of the class."""
        return cls.__name__

    @abc.abstractmethod
    async def handle(self, data: IN) -> AsyncGenerator[OUT, None]:
        """Process ``data`` into the expected result.

        The base implementation contains nothing but this docstring.

        Args:
            data: Data generated while running the task.
        """
class DefaultHandler(
    Handler[
        Message,
        AsyncGenerator[Message, None],
    ]
):
    """Default handler: ``Handler`` specialized for ``Message`` input.

    Declares no members of its own; everything is inherited from ``Handler``.

    NOTE(review): ``OUT`` is bound here to ``AsyncGenerator[Message, None]``,
    while ``Handler.handle`` is already annotated as returning
    ``AsyncGenerator[OUT, None]``. Confirm whether ``OUT`` was meant to be
    ``Message``.
    """
|