File size: 5,113 Bytes
6b4889b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from aworld.trace.base import Propagator, Carrier, TraceContext
from aworld.trace.baggage import BaggageContext
from aworld.logs.util import logger
from aworld.trace.base import AttributeValueType


class SofaTracerBaggagePropagator(Propagator):
    """
    Sofa tracer baggage propagator.

    ``extract`` copies the Sofa trace headers found on a carrier into the
    baggage context; ``inject`` writes the Sofa entries of the current
    baggage back onto a carrier.
    """

    # Header names looked up on / written to the carrier. For the trace id
    # and rpc id several spellings are accepted on extract; the first entry
    # of each list is the one used on inject.
    _TRACE_ID_HEDER_NAMES = ["SOFA-TraceId", "sofaTraceId"]
    _SPAN_ID_HEDER_NAMES = ["SOFA-RpcId", "sofaRpcId"]
    _PEN_ATTRS_HEDER_NAME = "sofaPenAttrs"
    _SYS_PEN_ATTRS_HEDER_NAME = "sysPenAttrs"

    # Baggage keys under which the extracted header values are stored.
    _TRACE_ID_BAGGAGE_KEY = "attributes.sofa.traceid"
    _SPAN_ID_BAGGAGE_KEY = "attributes.sofa.rpcid"
    _PEN_ATTRS_BAGGAGE_KEY = "attributes.sofa.penattrs"
    _SYS_PEN_ATTRS_BAGGAGE_KEY = "attributes.sofa.syspenattrs"

    def extract(self, carrier: Carrier):
        """
        Extract the Sofa trace headers from carrier into the baggage context.

        The trace id and rpc id are each read from the first header name
        that yields a truthy value. Nothing is stored unless both are
        present; when they are, the pen attrs and sys pen attrs headers are
        stored as well if they are non-empty.

        Args:
            carrier: The carrier to extract trace context from.
        Returns:
            None. The extracted values are written to BaggageContext rather
            than returned.
        """
        # NOTE(review): _get_value is not defined in this class; presumably
        # it is inherited from Propagator - confirm.
        trace_id = None
        span_id = None
        for name in self._TRACE_ID_HEDER_NAMES:
            trace_id = self._get_value(carrier, name)
            if trace_id:
                break
        for name in self._SPAN_ID_HEDER_NAMES:
            span_id = self._get_value(carrier, name)
            if span_id:
                break
        pen_attrs = self._get_value(carrier, self._PEN_ATTRS_HEDER_NAME)
        sys_pen_attrs = self._get_value(
            carrier, self._SYS_PEN_ATTRS_HEDER_NAME)

        logger.info(
            f"extract trace_id: {trace_id}, span_id: {span_id}, pen_attrs: {pen_attrs}, sys_pen_attrs: {sys_pen_attrs}")
        if trace_id and span_id:
            BaggageContext.set_baggage(self._TRACE_ID_BAGGAGE_KEY, trace_id)
            # The stored rpc id is the incoming one with ".1" appended.
            # NOTE(review): presumably this marks the current hop as the
            # first child of the caller's rpc id - confirm against the Sofa
            # rpc id convention.
            span_id = span_id + ".1"
            BaggageContext.set_baggage(self._SPAN_ID_BAGGAGE_KEY, span_id)
            if pen_attrs:
                BaggageContext.set_baggage(
                    self._PEN_ATTRS_BAGGAGE_KEY, pen_attrs)
            if sys_pen_attrs:
                BaggageContext.set_baggage(
                    self._SYS_PEN_ATTRS_BAGGAGE_KEY, sys_pen_attrs)

    def inject(self, trace_context: TraceContext, carrier: Carrier):
        """
        Inject the Sofa entries of the current baggage into carrier.

        The trace id and rpc id headers are written only when both baggage
        values are present. The sys pen attrs value is written to its own
        header. The pen attrs value is parsed and merged with every other
        (non-Sofa) baggage item, and the result is written to the
        sofaPenAttrs header as "key=value" pairs joined by "&".

        Args:
            trace_context: The trace context to inject. Not read by this
                implementation; all data comes from BaggageContext.
            carrier: The carrier to inject trace context to.
        """
        baggage = BaggageContext.get_baggage()
        if not baggage:
            return

        trace_id = baggage.get(self._TRACE_ID_BAGGAGE_KEY)
        span_id = baggage.get(self._SPAN_ID_BAGGAGE_KEY)
        if trace_id and span_id:
            carrier.set(self._TRACE_ID_HEDER_NAMES[0], trace_id)
            carrier.set(self._SPAN_ID_HEDER_NAMES[0], span_id)

        pen_attrs_dict = {}
        for key, value in baggage.items():
            if key == self._TRACE_ID_BAGGAGE_KEY or key == self._SPAN_ID_BAGGAGE_KEY:
                continue
            # The two branches below always `continue`, even for an empty
            # value, so that a reserved Sofa baggage key is never written
            # into sofaPenAttrs under its own name.
            if key == self._PEN_ATTRS_BAGGAGE_KEY:
                if value:
                    pen_attrs_dict.update(self._parse_pen_attrs(value))
                continue
            if key == self._SYS_PEN_ATTRS_BAGGAGE_KEY:
                if value:
                    carrier.set(self._SYS_PEN_ATTRS_HEDER_NAME, value)
                continue

            # other baggage items will be injected to sofaPenAttrs
            pen_attrs_dict[key] = value

        if pen_attrs_dict:
            pen_attrs = "&".join(f"{key}={value}"
                                 for key, value in pen_attrs_dict.items())
            carrier.set(self._PEN_ATTRS_HEDER_NAME, pen_attrs)

    @staticmethod
    def _parse_pen_attrs(raw: str) -> dict[str, str]:
        """
        Parse a "k1=v1&k2=v2" pen attrs string into a dict.

        Empty segments (e.g. from a leading, trailing or doubled "&") are
        skipped, a segment is split on its first "=" only so values may
        contain "=", and a segment without "=" maps to an empty value.

        Args:
            raw: The raw pen attrs string.
        Returns:
            A dict of the parsed key/value pairs; later duplicates win.
        """
        parsed = {}
        for segment in raw.split("&"):
            if not segment:
                continue
            name, _, content = segment.partition("=")
            parsed[name] = content
        return parsed


class SofaSpanHelper:
    """
    Helper for copying Sofa baggage entries onto span attributes.
    """

    @staticmethod
    def set_sofa_context_to_attr(span_attributes: dict[str, AttributeValueType]):
        """
        Copy the Sofa entries of the current baggage into span attributes.

        The trace id and rpc id are copied together, and only when both are
        present. The pen attrs and sys pen attrs are each copied when
        non-empty. Attributes are stored under the same keys used in the
        baggage. Does nothing when there is no baggage.

        Args:
            span_attributes: The span attributes to set sofa context to;
                updated in place.
        """
        current = BaggageContext.get_baggage()
        if not current:
            return

        keys = SofaTracerBaggagePropagator

        # Trace id and rpc id only make sense as a pair.
        sofa_trace_id = current.get(keys._TRACE_ID_BAGGAGE_KEY)
        sofa_rpc_id = current.get(keys._SPAN_ID_BAGGAGE_KEY)
        if sofa_trace_id and sofa_rpc_id:
            span_attributes.update({
                keys._TRACE_ID_BAGGAGE_KEY: sofa_trace_id,
                keys._SPAN_ID_BAGGAGE_KEY: sofa_rpc_id,
            })

        # The remaining entries are independent of each other.
        for optional_key in (keys._PEN_ATTRS_BAGGAGE_KEY,
                             keys._SYS_PEN_ATTRS_BAGGAGE_KEY):
            optional_value = current.get(optional_key)
            if optional_value:
                span_attributes.update({optional_key: optional_value})