Spaces:
Sleeping
Sleeping
import wrapt | |
from typing import Any, Collection | |
from aworld.trace.instrumentation import Instrumentor | |
from aworld.trace.base import Tracer, get_tracer_provider_silent, TraceContext | |
from aworld.trace.propagator import get_global_trace_propagator, get_global_trace_context | |
from aworld.trace.propagator.carrier import DictCarrier | |
from aworld.logs.util import logger | |
def _emit_message_class_wrapper(tracer: Tracer): | |
async def awrapper(wrapped, instance, args, kwargs): | |
from aworld.core.event.base import Message | |
try: | |
event = args[0] if len(args) > 0 else kwargs.get("event") | |
propagator = get_global_trace_propagator() | |
trace_provider = get_tracer_provider_silent() | |
if trace_provider and propagator and event and isinstance(event, Message): | |
if not event.headers: | |
event.headers = {} | |
current_span = trace_provider.get_current_span() | |
if current_span: | |
trace_context = TraceContext( | |
trace_id=current_span.get_trace_id(), span_id=current_span.get_span_id()) | |
propagator.inject(trace_context=trace_context, | |
carrier=DictCarrier(event.headers)) | |
logger.info( | |
f"EventManager emit_message trace propagate, event.headers={event.headers}") | |
except Exception as e: | |
logger.error( | |
f"EventManager emit_message trace propagate exception: {e}") | |
return await wrapped(*args, **kwargs) | |
return awrapper | |
def _emit_message_instance_wrapper(tracer: Tracer): | |
async def awrapper(wrapped, instance, args, kwargs): | |
wrapper = _emit_message_class_wrapper(tracer) | |
return await wrapper(wrapped, instance, args, kwargs) | |
return awrapper | |
def _consume_class_wrapper(tracer: Tracer): | |
async def awrapper(wrapped, instance, args, kwargs): | |
from aworld.core.event.base import Message | |
event = await wrapped(*args, **kwargs) | |
try: | |
propagator = get_global_trace_propagator() | |
if propagator and event and isinstance(event, Message) and event.headers: | |
trace_context = propagator.extract(DictCarrier(event.headers)) | |
logger.info( | |
f"extract trace_context from event: {trace_context}") | |
if trace_context: | |
get_global_trace_context().set(trace_context) | |
except Exception as e: | |
logger.error( | |
f"EventManager consume trace propagate exception: {e}") | |
return event | |
return awrapper | |
def _consume_instance_wrapper(tracer: Tracer): | |
async def awrapper(wrapped, instance, args, kwargs): | |
wrapper = _consume_class_wrapper(tracer) | |
return await wrapper(wrapped, instance, args, kwargs) | |
return awrapper | |
class EventBusInstrumentor(Instrumentor): | |
def instrumentation_dependencies(self) -> Collection[str]: | |
return () | |
def _uninstrument(self, **kwargs: Any): | |
pass | |
def _instrument(self, **kwargs: Any): | |
tracer_provider = get_tracer_provider_silent() | |
if not tracer_provider: | |
return | |
tracer = tracer_provider.get_tracer( | |
"aworld.trace.instrumentation.eventbus") | |
wrapt.wrap_function_wrapper( | |
"aworld.events.manager", | |
"EventManager.emit_message", | |
_emit_message_class_wrapper(tracer=tracer) | |
) | |
wrapt.wrap_function_wrapper( | |
"aworld.events.manager", | |
"EventManager.consume", | |
_consume_class_wrapper(tracer=tracer) | |
) | |
def wrap_event_manager(manager: 'aworld.events.manager.EventManager'): | |
tracer_provider = get_tracer_provider_silent() | |
if not tracer_provider: | |
return manager | |
tracer = tracer_provider.get_tracer( | |
"aworld.trace.instrumentation.eventbus") | |
emit_wrapper = _emit_message_instance_wrapper(tracer) | |
consume_wrapper = _consume_instance_wrapper(tracer) | |
manager.emit_message = emit_wrapper(manager.emit_message) | |
manager.consume = consume_wrapper(manager.consume) | |
return manager | |