Spaces:
Sleeping
Sleeping
import sys | |
import os | |
import traceback | |
import time | |
import datetime | |
import requests | |
from threading import Lock | |
from typing import Any, Iterator, Sequence, Optional, TYPE_CHECKING | |
from contextvars import Token | |
from urllib.parse import urljoin | |
import opentelemetry.context as otlp_context_api | |
from opentelemetry.trace import ( | |
SpanKind, | |
set_span_in_context, | |
get_current_span as get_current_otlp_span, | |
NonRecordingSpan, | |
SpanContext, | |
TraceFlags | |
) | |
from opentelemetry.trace.status import StatusCode | |
from opentelemetry.sdk.trace import ( | |
ReadableSpan, | |
SynchronousMultiSpanProcessor, | |
Tracer as SDKTracer, | |
Span as SDKSpan, | |
TracerProvider as SDKTracerProvider | |
) | |
from opentelemetry.context import Context as OTLPContext | |
from opentelemetry.semconv.trace import SpanAttributes | |
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor | |
from aworld.trace.base import ( | |
AttributeValueType, | |
NoOpTracer, | |
SpanType, | |
TraceProvider, | |
Tracer, | |
Span, | |
TraceContext, | |
set_tracer_provider | |
) | |
from aworld.trace.span_cosumer import SpanConsumer | |
from aworld.trace.propagator import get_global_trace_context | |
from aworld.trace.baggage.sofa_tracer import SofaSpanHelper | |
from aworld.logs.util import logger | |
from aworld.utils.common import get_local_ip | |
from .memory_storage import InMemorySpanExporter, InMemoryStorage | |
from ..constants import ATTRIBUTES_MESSAGE_KEY | |
from .export import FileSpanExporter, NoOpSpanExporter, SpanConsumerExporter | |
from ..server import set_trace_server | |
class OTLPTraceProvider(TraceProvider):
    """A TraceProvider that wraps an existing `SDKTracerProvider`.

    Tracers are created by the wrapped SDK provider and returned as
    `OTLPTracer` instances, except for instrumentation scopes listed in
    ``suppressed_scopes``, which receive a `NoOpTracer`. `shutdown` and
    `force_flush` are forwarded to the wrapped provider.

    Args:
        provider: The internal provider to wrap.
        suppressed_scopes: Instrumentation scope names for which tracing is
            disabled (a `NoOpTracer` is returned for them).
    """

    def __init__(self, provider: SDKTracerProvider, suppressed_scopes: Optional[set[str]] = None):
        self._provider: SDKTracerProvider = provider
        self._suppressed_scopes: set[str] = set()
        if suppressed_scopes:
            self._suppressed_scopes.update(suppressed_scopes)
        # Serializes tracer creation, shutdown and flushing.
        self._lock: Lock = Lock()

    def get_tracer(
        self,
        name: str,
        version: Optional[str] = None
    ):
        """Return a tracer for the instrumentation scope ``name``.

        Returns:
            A `NoOpTracer` when ``name`` is suppressed, otherwise an
            `OTLPTracer` wrapping the SDK tracer for ``name``/``version``.
        """
        with self._lock:
            if name in self._suppressed_scopes:
                return NoOpTracer()
            tracer = self._provider.get_tracer(instrumenting_module_name=name,
                                               instrumenting_library_version=version)
            return OTLPTracer(tracer)

    def shutdown(self) -> None:
        """Shut down the wrapped provider if it is an `SDKTracerProvider`."""
        with self._lock:
            if isinstance(self._provider, SDKTracerProvider):
                self._provider.shutdown()

    def force_flush(self, timeout: Optional[float] = None) -> bool:
        """Flush spans buffered by the wrapped SDK provider.

        Args:
            timeout: Forwarded unchanged to ``SDKTracerProvider.force_flush``
                (whose parameter is ``timeout_millis``). When ``None``, the
                SDK's own default timeout is used.

        Returns:
            The SDK's result, or ``False`` if the wrapped provider is not an
            `SDKTracerProvider`.
        """
        with self._lock:
            if not isinstance(self._provider, SDKTracerProvider):
                return False
            if timeout is None:
                # The SDK expects a numeric timeout_millis; forwarding None
                # fails, so let it apply its own default instead.
                return self._provider.force_flush()
            return self._provider.force_flush(timeout)

    def get_current_span(self) -> Optional["Span"]:
        """Wrap the currently active OpenTelemetry span.

        The wrapper is created with ``is_new_span=False``, so the span is not
        attached to the context again.
        """
        otlp_span = get_current_otlp_span()
        return OTLPSpan(otlp_span, is_new_span=False)
class OTLPTracer(Tracer):
    """A Tracer that creates spans through a wrapped OpenTelemetry SDK tracer.

    Every span it creates is returned wrapped in an `OTLPSpan`.

    Args:
        tracer: The internal tracer to wrap.
    """

    def __init__(self, tracer: SDKTracer):
        self._tracer = tracer

    def start_span(
        self,
        name: str,
        span_type: SpanType = SpanType.INTERNAL,
        attributes: dict[str, AttributeValueType] = None,
        start_time: Optional[int] = None,
        record_exception: bool = True,
        set_status_on_exception: bool = True,
        trace_context: Optional[TraceContext] = None
    ) -> "Span":
        """Start a new span.

        The returned `OTLPSpan` attaches itself to the current context when it
        is created and detaches again in its `end()`.

        Args:
            name: Span name; also the default value of the
                ``ATTRIBUTES_MESSAGE_KEY`` attribute.
            span_type: Mapped to an OpenTelemetry `SpanKind`.
            attributes: Initial attributes; invalid values are dropped.
            start_time: Start timestamp in nanoseconds; defaults to now.
            record_exception: Forwarded to the SDK tracer.
            set_status_on_exception: Forwarded to the SDK tracer.
            trace_context: Explicit remote parent. When omitted, the globally
                propagated trace context is consumed (and cleared) if present.
        """
        otel_context = self._resolve_parent_context(trace_context)
        start_time = start_time or time.time_ns()
        attributes = self._prepare_attributes(name, attributes)
        span_kind = self._convert_to_span_kind(span_type) if span_type else SpanKind.INTERNAL
        span = self._tracer.start_span(name=name,
                                       kind=span_kind,
                                       context=otel_context,
                                       attributes=attributes,
                                       start_time=start_time,
                                       record_exception=record_exception,
                                       set_status_on_exception=set_status_on_exception)
        return OTLPSpan(span)

    def start_as_current_span(
        self,
        name: str,
        span_type: SpanType = SpanType.INTERNAL,
        attributes: dict[str, AttributeValueType] = None,
        start_time: Optional[int] = None,
        record_exception: bool = True,
        set_status_on_exception: bool = True,
        end_on_exit: bool = True,
        trace_context: Optional[TraceContext] = None
    ) -> Iterator["Span"]:
        """Return a context manager that starts a span and makes it current.

        On exit, the span's context is detached and any exception is handled by
        the SDK (according to ``record_exception`` / ``set_status_on_exception``).
        When ``end_on_exit`` is true and the span is still recording, it is
        ended through `OTLPSpan.end()`. That call also drops the span from the
        open-span bookkeeping and sets an OK status if none was set.

        Arguments have the same meaning as in `start_span`. Start time,
        attributes and parent context are resolved when this method is called,
        not when the context manager is entered.
        """
        start_time = start_time or time.time_ns()
        attributes = self._prepare_attributes(name, attributes)
        span_kind = self._convert_to_span_kind(span_type) if span_type else SpanKind.INTERNAL
        otel_context = self._resolve_parent_context(trace_context)

        class _OTLPSpanContextManager:
            def __init__(self, tracer: SDKTracer):
                self._span_cm = None
                self._otlp_span = None
                self._tracer = tracer

            def __enter__(self):
                # The wrapper ends the span itself in __exit__ (via OTLPSpan.end),
                # so the SDK context manager is told not to.
                self._span_cm = self._tracer.start_as_current_span(
                    name=name,
                    kind=span_kind,
                    context=otel_context,
                    attributes=attributes,
                    start_time=start_time,
                    record_exception=record_exception,
                    set_status_on_exception=set_status_on_exception,
                    end_on_exit=False
                )
                inner_span = self._span_cm.__enter__()
                self._otlp_span = OTLPSpan(inner_span)
                return self._otlp_span

            def __exit__(self, exc_type, exc_val, exc_tb):
                otlp_span = self._otlp_span
                # OTLPSpan attached its own context token after the SDK attached
                # one; release ours first so tokens are detached in LIFO order.
                otlp_span._detach()
                try:
                    # The SDK restores the parent context and records any exception.
                    return self._span_cm.__exit__(exc_type, exc_val, exc_tb)
                finally:
                    # Skip spans already ended inside the block or never recording.
                    if end_on_exit and otlp_span.is_recording():
                        otlp_span.end()

        return _OTLPSpanContextManager(self._tracer)

    def _prepare_attributes(self, name: str,
                            attributes: Optional[dict[str, AttributeValueType]]) -> dict[str, Any]:
        """Copy ``attributes``, add default/sofa attributes and drop invalid values."""
        prepared = {**(attributes or {})}
        prepared.setdefault(ATTRIBUTES_MESSAGE_KEY, name)
        SofaSpanHelper.set_sofa_context_to_attr(prepared)
        return {k: v for k, v in prepared.items() if is_valid_attribute_value(k, v)}

    def _resolve_parent_context(self, trace_context: Optional[TraceContext]) -> Optional[OTLPContext]:
        """Return an OTel context for an explicit or globally propagated parent, or None."""
        # The global context is only consumed when no explicit one is given.
        trace_context = trace_context or get_global_trace_context().get_and_clear()
        if not trace_context:
            return None
        return self._get_otel_context_from_trace_context(trace_context)

    def _convert_to_span_kind(self, span_type: SpanType) -> SpanKind:
        """Map an aworld `SpanType` to an OpenTelemetry `SpanKind` (INTERNAL if unknown)."""
        kind_by_type = (
            (SpanType.INTERNAL, SpanKind.INTERNAL),
            (SpanType.CLIENT, SpanKind.CLIENT),
            (SpanType.SERVER, SpanKind.SERVER),
            (SpanType.PRODUCER, SpanKind.PRODUCER),
            (SpanType.CONSUMER, SpanKind.CONSUMER),
        )
        for candidate, kind in kind_by_type:
            if span_type == candidate:
                return kind
        return SpanKind.INTERNAL

    def _get_otel_context_from_trace_context(self, trace_context: TraceContext) -> OTLPContext:
        """Build an OTel context whose current span is the remote parent in ``trace_context``.

        ``trace_id``, ``span_id`` and ``trace_flags`` are parsed as hex strings.
        """
        if trace_context.trace_flags:
            trace_flags = TraceFlags(int(trace_context.trace_flags, 16))
        else:
            # SpanContext needs a TraceFlags instance: samplers read
            # `trace_flags.sampled` on the parent, which fails on None.
            # Without propagated flags, the parent is treated as sampled so the
            # continued trace is recorded. NOTE(review): confirm this default.
            trace_flags = TraceFlags(TraceFlags.SAMPLED)
        otel_context = otlp_context_api.Context()
        return set_span_in_context(
            NonRecordingSpan(
                SpanContext(
                    trace_id=int(trace_context.trace_id, 16),
                    span_id=int(trace_context.span_id, 16),
                    is_remote=True,
                    trace_flags=trace_flags
                )
            ),
            otel_context,
        )
class OTLPSpan(Span, ReadableSpan):
    """A Span represents a single operation within a trace.

    Wraps an OpenTelemetry span. Attributes not defined on the wrapper are
    looked up on the wrapped span.

    Args:
        span: The wrapped OpenTelemetry span.
        is_new_span: When true (the default), attach the span to the current
            context and register it as open. Pass False to wrap a span that is
            already active without re-attaching it.
    """

    def __init__(self, span: SDKSpan, is_new_span=True):
        self._span = span
        # Token from context attach(); used by _detach() to restore the prior context.
        self._token: Optional[Token[OTLPContext]] = None
        if is_new_span:
            self._attach()
            self._add_to_open_spans()

    if not TYPE_CHECKING:  # pragma: no branch
        def __getattr__(self, name: str) -> Any:
            # Only called for attributes missing on the wrapper: delegate them.
            return getattr(self._span, name)

    def end(self, end_time: Optional[int] = None) -> None:
        """End the span and detach its context.

        A recording span whose status is still unset gets status OK.

        Args:
            end_time: End timestamp in nanoseconds; defaults to now.
        """
        self._remove_from_open_spans()
        end_time = end_time or time.time_ns()
        # Non-recording spans (e.g. a NonRecordingSpan) expose no status to
        # inspect, so only recording spans receive the default OK status.
        if self._span.is_recording():
            status = getattr(self._span, "status", None)
            if status is None or status.status_code == StatusCode.UNSET:
                self._span.set_status(
                    status=StatusCode.OK,
                    description="",
                )
        self._span.end(end_time=end_time)
        self._detach()

    def set_attribute(self, key: str, value: Any) -> None:
        """Set one attribute, silently skipping values rejected by is_valid_attribute_value."""
        if not is_valid_attribute_value(key, value):
            return
        self._span.set_attribute(key=key, value=value)

    def set_attributes(self, attributes: dict[str, Any]) -> None:
        """Set several attributes, dropping values rejected by is_valid_attribute_value."""
        attributes = {k: v for k, v in attributes.items() if is_valid_attribute_value(k, v)}
        self._span.set_attributes(attributes=attributes)

    def is_recording(self) -> bool:
        """Return whether the wrapped span is recording."""
        return self._span.is_recording()

    def record_exception(
        self,
        exception: BaseException,
        attributes: dict[str, Any] = None,
        timestamp: Optional[int] = None,
        escaped: bool = False,
    ) -> None:
        """Record ``exception`` on the span and set the span status to ERROR.

        The exception type, message, stacktrace and ``escaped`` flag are also
        stored as span attributes, in addition to the recorded exception event.

        Args:
            exception: The exception to record.
            attributes: Extra event attributes (copied, not mutated).
            timestamp: Event timestamp in nanoseconds; defaults to now.
            escaped: Whether the exception escaped the span's scope.
        """
        timestamp = timestamp or time.time_ns()
        attributes = {**(attributes or {})}
        stacktrace = ''.join(traceback.format_exception(
            type(exception), exception, exception.__traceback__))
        self._span.set_attributes({
            SpanAttributes.EXCEPTION_STACKTRACE: stacktrace,
            SpanAttributes.EXCEPTION_TYPE: type(exception).__name__,
            SpanAttributes.EXCEPTION_MESSAGE: str(exception),
            SpanAttributes.EXCEPTION_ESCAPED: escaped
        })
        # Pass the stacktrace explicitly when the exception is not the one
        # currently being handled.
        if exception is not sys.exc_info()[1]:
            attributes[SpanAttributes.EXCEPTION_STACKTRACE] = stacktrace
        self._span.record_exception(exception=exception,
                                    attributes=attributes,
                                    timestamp=timestamp,
                                    escaped=escaped)
        self._span.set_status(
            status=StatusCode.ERROR,
            description=str(exception),
        )

    def get_trace_id(self) -> Optional[str]:
        """Get the trace ID of the span.

        Returns:
            The trace ID as 32 lowercase hex characters, or None when the span
            has no span context or is not recording.
        """
        if not self._span or not self._span.get_span_context() or not self.is_recording():
            return None
        return f"{self._span.get_span_context().trace_id:032x}"

    def get_span_id(self) -> Optional[str]:
        """Get the span ID of the span.

        Returns:
            The span ID as 16 lowercase hex characters, or None when the span
            has no span context or is not recording.
        """
        if not self._span or not self._span.get_span_context() or not self.is_recording():
            return None
        return f"{self._span.get_span_context().span_id:016x}"

    def _attach(self):
        """Make the wrapped span current; no-op if already attached."""
        if self._token is not None:
            return
        self._token = otlp_context_api.attach(set_span_in_context(self._span))

    def _detach(self):
        """Restore the context saved by _attach(); no-op if not attached."""
        if self._token is None:
            return
        try:
            otlp_context_api.detach(self._token)
        except ValueError as e:
            logger.warning(f"Failed to detach context: {e}")
        finally:
            self._token = None
def configure_otlp_provider(
    backends: Sequence[str] = None,
    base_url: str = None,
    write_token: str = None,
    span_consumers: Optional[Sequence[SpanConsumer]] = None,
    **kwargs
) -> None:
    """Configure the OTLP provider and install it as the global tracer provider.

    A span processor feeding ``span_consumers`` is always installed; one more
    processor is added for each entry in ``backends``.

    Args:
        backends: Backends to export to: ``"logfire"``, ``"console"``,
            ``"file"`` or ``"memory"``. Any other value exports through a
            generic OTLP HTTP exporter. Defaults to ``["logfire"]``.
        base_url: Base URL / endpoint for the logfire and generic OTLP exporters.
        write_token: Logfire write token.
        span_consumers: Consumers handed to `SpanConsumerExporter`.
        **kwargs: Backend options:
            ``file_path`` (file backend);
            ``storage``, ``server_enabled``, ``server_port`` (memory backend;
            ``server_enabled`` may be a bool or a string such as ``"true"``, and
            falls back to ``$START_TRACE_SERVER``, then ``"true"``).
            All kwargs are also forwarded to the generic OTLP exporter setup.
    """
    from aworld.metrics.opentelemetry.opentelemetry_adapter import build_otel_resource
    backends = backends or ["logfire"]
    processor = SynchronousMultiSpanProcessor()
    processor.add_span_processor(BatchSpanProcessor(
        SpanConsumerExporter(span_consumers)))
    for backend in backends:
        if backend == "logfire":
            # The logfire exporter takes no extra options; forwarding unrelated
            # kwargs (file_path, storage, ...) used to raise TypeError.
            span_exporter = _configure_logfire_exporter(
                write_token=write_token, base_url=base_url)
            processor.add_span_processor(BatchSpanProcessor(span_exporter))
        elif backend == "console":
            from opentelemetry.sdk.trace.export import ConsoleSpanExporter
            processor.add_span_processor(
                BatchSpanProcessor(ConsoleSpanExporter()))
        elif backend == "file":
            timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            file_path = kwargs.get("file_path", f"traces_{timestamp}.json")
            processor.add_span_processor(
                BatchSpanProcessor(FileSpanExporter(file_path)))
        elif backend == "memory":
            logger.info("Using in-memory storage for traces.")
            storage = kwargs.get("storage")
            if storage is None:
                # Build the default only when the caller did not supply one.
                storage = InMemoryStorage()
            processor.add_span_processor(
                SimpleSpanProcessor(InMemorySpanExporter(storage=storage)))
            server_enabled = kwargs.get("server_enabled")
            if server_enabled is None or server_enabled == "":
                server_enabled = os.getenv("START_TRACE_SERVER") or "true"
            # str() accepts bools as well as strings; an explicit False no
            # longer falls through to the "true" default.
            start_server = str(server_enabled).lower() == "true"
            server_port = kwargs.get("server_port") or 7079
            if start_server:
                logger.info(f"Starting trace server on port {server_port}.")
            else:
                logger.info("Trace server is not started.")
            set_trace_server(storage=storage, port=int(
                server_port), start_server=start_server)
        else:
            span_exporter = _configure_otlp_exporter(
                base_url=base_url, **kwargs)
            processor.add_span_processor(BatchSpanProcessor(span_exporter))
    set_tracer_provider(OTLPTraceProvider(SDKTracerProvider(active_span_processor=processor,
                                                            resource=build_otel_resource())))
def _configure_logfire_exporter(write_token: str, base_url: str = None, **kwargs):
    """Create an OTLP/HTTP span exporter that sends traces to Logfire.

    Args:
        write_token: Logfire write token, sent as the ``Authorization`` header.
        base_url: Logfire base URL; defaults to ``https://logfire-us.pydantic.dev``.
        **kwargs: Accepted for call-site compatibility; currently ignored.

    Returns:
        An ``OTLPSpanExporter`` that posts gzip-compressed spans to the
        ``/v1/traces`` path of ``base_url``. The path is absolute, so it
        replaces any path already present in ``base_url``.
    """
    from opentelemetry.exporter.otlp.proto.http import Compression
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
    base_url = base_url or "https://logfire-us.pydantic.dev"
    headers = {'User-Agent': 'logfire/3.14.0', 'Authorization': write_token}
    session = requests.Session()
    session.headers.update(headers)
    return OTLPSpanExporter(
        endpoint=urljoin(base_url, '/v1/traces'),
        session=session,
        compression=Compression.Gzip,
    )
def _configure_otlp_exporter(base_url: str = None, **kwargs):
    """Create a generic OTLP/HTTP span exporter.

    Args:
        base_url: Full traces endpoint URL, used as-is (no path is appended).
            Falls back to ``$OTLP_TRACES_ENDPOINT``. If neither is set,
            ``endpoint=None`` is passed and the exporter applies its own default.
        **kwargs: Accepted for call-site compatibility; currently ignored.

    Returns:
        An ``OTLPSpanExporter`` that sends gzip-compressed spans.
    """
    import requests
    from opentelemetry.exporter.otlp.proto.http import Compression
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
    endpoint = base_url or os.getenv("OTLP_TRACES_ENDPOINT")
    session = requests.Session()
    return OTLPSpanExporter(
        endpoint=endpoint,
        session=session,
        compression=Compression.Gzip,
    )
def is_valid_attribute_value(k, v):
    """Return whether ``v`` may be stored as a span attribute value.

    Accepted values are a ``str``, ``bool``, ``int`` or ``float``, or a sequence
    whose items are all of those types. Falsy values such as ``0``, ``False``,
    ``""`` and ``[]`` are accepted. Rejected values (including ``None``) are
    logged as a warning.

    Args:
        k: Attribute key, used only in the warning message.
        v: Candidate attribute value.

    Returns:
        True if ``v`` is acceptable, False otherwise.
    """
    # NOTE: the former `if not v: valid = False` branch was dead (immediately
    # overwritten), and evaluating `not v` raised on values with an ambiguous
    # truth value, so it has been removed.
    scalar_types = (str, bool, int, float)
    valid = isinstance(v, scalar_types) or (
        isinstance(v, Sequence) and all(isinstance(item, scalar_types) for item in v))
    if not valid:
        logger.warning(f"value of attribute[{k}] is invalid: {v}")
    return valid