Spaces:
Sleeping
Sleeping
from aworld.trace.base import Propagator, Carrier, TraceContext | |
from aworld.trace.baggage import BaggageContext | |
from aworld.logs.util import logger | |
from aworld.trace.base import AttributeValueType | |
class SofaTracerBaggagePropagator(Propagator): | |
""" | |
Sofa tracer baggage propagator. | |
""" | |
_TRACE_ID_HEDER_NAMES = ["SOFA-TraceId", "sofaTraceId"] | |
_SPAN_ID_HEDER_NAMES = ["SOFA-RpcId", "sofaRpcId"] | |
_PEN_ATTRS_HEDER_NAME = "sofaPenAttrs" | |
_SYS_PEN_ATTRS_HEDER_NAME = "sysPenAttrs" | |
_TRACE_ID_BAGGAGE_KEY = "attributes.sofa.traceid" | |
_SPAN_ID_BAGGAGE_KEY = "attributes.sofa.rpcid" | |
_PEN_ATTRS_BAGGAGE_KEY = "attributes.sofa.penattrs" | |
_SYS_PEN_ATTRS_BAGGAGE_KEY = "attributes.sofa.syspenattrs" | |
def extract(self, carrier: Carrier): | |
""" | |
Extract trace context from carrier. | |
Args: | |
carrier: The carrier to extract trace context from. | |
Returns: | |
A dict of trace context. | |
""" | |
trace_id = None | |
span_id = None | |
for name in self._TRACE_ID_HEDER_NAMES: | |
trace_id = self._get_value(carrier, name) | |
if trace_id: | |
break | |
for name in self._SPAN_ID_HEDER_NAMES: | |
span_id = self._get_value(carrier, name) | |
if span_id: | |
break | |
pen_attrs = self._get_value(carrier, self._PEN_ATTRS_HEDER_NAME) | |
sys_pen_attrs = self._get_value( | |
carrier, self._SYS_PEN_ATTRS_HEDER_NAME) | |
logger.info( | |
f"extract trace_id: {trace_id}, span_id: {span_id}, pen_attrs: {pen_attrs}, sys_pen_attrs: {sys_pen_attrs}") | |
if trace_id and span_id: | |
BaggageContext.set_baggage(self._TRACE_ID_BAGGAGE_KEY, trace_id) | |
span_id = span_id + ".1" | |
BaggageContext.set_baggage(self._SPAN_ID_BAGGAGE_KEY, span_id) | |
if pen_attrs: | |
BaggageContext.set_baggage( | |
self._PEN_ATTRS_BAGGAGE_KEY, pen_attrs) | |
if sys_pen_attrs: | |
BaggageContext.set_baggage( | |
self._SYS_PEN_ATTRS_BAGGAGE_KEY, sys_pen_attrs) | |
def inject(self, trace_context: TraceContext, carrier: Carrier): | |
""" | |
Inject trace context to carrier. | |
Args: | |
trace_context: The trace context to inject. | |
carrier: The carrier to inject trace context to. | |
""" | |
baggage = BaggageContext.get_baggage() | |
if baggage: | |
trace_id = baggage.get(self._TRACE_ID_BAGGAGE_KEY) | |
span_id = baggage.get(self._SPAN_ID_BAGGAGE_KEY) | |
if trace_id and span_id: | |
carrier.set(self._TRACE_ID_HEDER_NAMES[0], trace_id) | |
carrier.set(self._SPAN_ID_HEDER_NAMES[0], span_id) | |
pen_attrs_dict = {} | |
for key, value in baggage.items(): | |
if key == self._TRACE_ID_BAGGAGE_KEY or key == self._SPAN_ID_BAGGAGE_KEY: | |
continue | |
if key == self._PEN_ATTRS_BAGGAGE_KEY and value: | |
pen_attrs_dict.update(dict(item.split("=") | |
for item in value.split("&"))) | |
continue | |
if key == self._SYS_PEN_ATTRS_BAGGAGE_KEY and value: | |
carrier.set(self._SYS_PEN_ATTRS_HEDER_NAME, value) | |
continue | |
# other baggage items will be injected to sofaPenAttrs | |
pen_attrs_dict.update({key: value}) | |
if pen_attrs_dict: | |
pen_attrs = "&".join(f"{key}={value}" | |
for key, value in pen_attrs_dict.items()) | |
carrier.set(self._PEN_ATTRS_HEDER_NAME, pen_attrs) | |
class SofaSpanHelper: | |
""" | |
Sofa span helper. | |
""" | |
def set_sofa_context_to_attr(span_attributes: dict[str, AttributeValueType]): | |
""" | |
Set sofa context to span attributes. | |
Args: | |
span_attributes: The span attributes to set sofa context to. | |
""" | |
baggage = BaggageContext.get_baggage() | |
if baggage: | |
trace_id = baggage.get( | |
SofaTracerBaggagePropagator._TRACE_ID_BAGGAGE_KEY) | |
span_id = baggage.get( | |
SofaTracerBaggagePropagator._SPAN_ID_BAGGAGE_KEY) | |
if trace_id and span_id: | |
span_attributes.update({ | |
SofaTracerBaggagePropagator._TRACE_ID_BAGGAGE_KEY: trace_id, | |
SofaTracerBaggagePropagator._SPAN_ID_BAGGAGE_KEY: span_id | |
}) | |
pen_attrs = baggage.get( | |
SofaTracerBaggagePropagator._PEN_ATTRS_BAGGAGE_KEY) | |
if pen_attrs: | |
span_attributes.update({ | |
SofaTracerBaggagePropagator._PEN_ATTRS_BAGGAGE_KEY: pen_attrs | |
}) | |
sys_pen_attrs = baggage.get( | |
SofaTracerBaggagePropagator._SYS_PEN_ATTRS_BAGGAGE_KEY) | |
if sys_pen_attrs: | |
span_attributes.update({ | |
SofaTracerBaggagePropagator._SYS_PEN_ATTRS_BAGGAGE_KEY: sys_pen_attrs | |
}) | |