Spaces:
Sleeping
Sleeping
File size: 1,042 Bytes
3e56848 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
from abc import ABC, abstractmethod
from typing import Callable, Sequence

from aworld.trace.base import Span
class SpanConsumer(ABC):
    """Abstract base class for objects that consume spans.

    Concrete subclasses must implement :meth:`consume`.
    """

    @abstractmethod
    def consume(self, spans: Sequence[Span]) -> None:
        """Consume a batch of spans.

        Args:
            spans: The spans to consume.
        """
# Registry of span consumer classes, keyed by class ``__name__``. Each value
# is a ``(class, default_kwargs)`` tuple written by register_span_consumer().
_SPAN_CONSUMER_REGISTRY = {}
def register_span_consumer(default_kwargs=None) -> Callable[[type], type]:
    """Create a class decorator that registers a span consumer.

    The decorated class is stored in ``_SPAN_CONSUMER_REGISTRY`` under its
    ``__name__`` together with its default keyword arguments. Registering
    another class with the same name replaces the earlier entry.

    Args:
        default_kwargs: Optional dictionary of keyword arguments to pass to
            the span consumer when it is instantiated. ``None`` or an empty
            dictionary means no arguments.

    Returns:
        A decorator that records the class in the registry and returns the
        class unchanged.
    """
    # Snapshot the kwargs so that later mutation of the caller's dictionary
    # cannot silently change the registered defaults.
    registered_kwargs = dict(default_kwargs) if default_kwargs else {}

    def decorator(cls):
        _SPAN_CONSUMER_REGISTRY[cls.__name__] = (cls, registered_kwargs)
        return cls

    return decorator
def get_span_consumers() -> Sequence[SpanConsumer]:
    """Instantiate every registered span consumer.

    Returns:
        A new list holding one instance per registry entry, each built by
        calling the registered class with its default keyword arguments.
    """
    consumers = []
    for consumer_cls, init_kwargs in _SPAN_CONSUMER_REGISTRY.values():
        consumers.append(consumer_cls(**init_kwargs))
    return consumers
|