Duibonduil's picture
Upload 11 files
3e56848 verified
from abc import ABC, abstractmethod
from typing import Sequence
from aworld.trace.base import Span
class SpanConsumer(ABC):
"""SpanConsumer is a protocol that represents a consumer for spans.
"""
@abstractmethod
def consume(self, spans: Sequence[Span]) -> None:
"""Consumes a span.
Args:
spans: The span to consume.
"""
_SPAN_CONSUMER_REGISTRY = {}
def register_span_consumer(default_kwargs=None) -> None:
"""Registers a span consumer.
Args:
default_kwargs: A dictionary of default keyword arguments to pass to the span consumer.
"""
default_kwargs = default_kwargs or {}
def decorator(cls):
_SPAN_CONSUMER_REGISTRY[cls.__name__] = (cls, default_kwargs)
return cls
return decorator
def get_span_consumers() -> Sequence[SpanConsumer]:
"""Returns a list of span consumers.
Returns:
A list of span consumers.
"""
return [
cls(**kwargs)
for cls, kwargs in _SPAN_CONSUMER_REGISTRY.values()
]