Duibonduil's picture
Upload 11 files
7a3c0ee verified
import wrapt
from typing import Any, Collection
from aworld.trace.instrumentation import Instrumentor
from aworld.trace.base import Tracer, get_tracer_provider_silent, TraceContext
from aworld.trace.propagator import get_global_trace_propagator, get_global_trace_context
from aworld.trace.propagator.carrier import DictCarrier
from aworld.logs.util import logger
def _emit_message_class_wrapper(tracer: Tracer):
async def awrapper(wrapped, instance, args, kwargs):
from aworld.core.event.base import Message
try:
event = args[0] if len(args) > 0 else kwargs.get("event")
propagator = get_global_trace_propagator()
trace_provider = get_tracer_provider_silent()
if trace_provider and propagator and event and isinstance(event, Message):
if not event.headers:
event.headers = {}
current_span = trace_provider.get_current_span()
if current_span:
trace_context = TraceContext(
trace_id=current_span.get_trace_id(), span_id=current_span.get_span_id())
propagator.inject(trace_context=trace_context,
carrier=DictCarrier(event.headers))
logger.info(
f"EventManager emit_message trace propagate, event.headers={event.headers}")
except Exception as e:
logger.error(
f"EventManager emit_message trace propagate exception: {e}")
return await wrapped(*args, **kwargs)
return awrapper
def _emit_message_instance_wrapper(tracer: Tracer):
@wrapt.decorator
async def awrapper(wrapped, instance, args, kwargs):
wrapper = _emit_message_class_wrapper(tracer)
return await wrapper(wrapped, instance, args, kwargs)
return awrapper
def _consume_class_wrapper(tracer: Tracer):
async def awrapper(wrapped, instance, args, kwargs):
from aworld.core.event.base import Message
event = await wrapped(*args, **kwargs)
try:
propagator = get_global_trace_propagator()
if propagator and event and isinstance(event, Message) and event.headers:
trace_context = propagator.extract(DictCarrier(event.headers))
logger.info(
f"extract trace_context from event: {trace_context}")
if trace_context:
get_global_trace_context().set(trace_context)
except Exception as e:
logger.error(
f"EventManager consume trace propagate exception: {e}")
return event
return awrapper
def _consume_instance_wrapper(tracer: Tracer):
@wrapt.decorator
async def awrapper(wrapped, instance, args, kwargs):
wrapper = _consume_class_wrapper(tracer)
return await wrapper(wrapped, instance, args, kwargs)
return awrapper
class EventBusInstrumentor(Instrumentor):
def instrumentation_dependencies(self) -> Collection[str]:
return ()
def _uninstrument(self, **kwargs: Any):
pass
def _instrument(self, **kwargs: Any):
tracer_provider = get_tracer_provider_silent()
if not tracer_provider:
return
tracer = tracer_provider.get_tracer(
"aworld.trace.instrumentation.eventbus")
wrapt.wrap_function_wrapper(
"aworld.events.manager",
"EventManager.emit_message",
_emit_message_class_wrapper(tracer=tracer)
)
wrapt.wrap_function_wrapper(
"aworld.events.manager",
"EventManager.consume",
_consume_class_wrapper(tracer=tracer)
)
def wrap_event_manager(manager: 'aworld.events.manager.EventManager'):
tracer_provider = get_tracer_provider_silent()
if not tracer_provider:
return manager
tracer = tracer_provider.get_tracer(
"aworld.trace.instrumentation.eventbus")
emit_wrapper = _emit_message_instance_wrapper(tracer)
consume_wrapper = _consume_instance_wrapper(tracer)
manager.emit_message = emit_wrapper(manager.emit_message)
manager.consume = consume_wrapper(manager.consume)
return manager