import types
import inspect
from typing import Union, Optional, Any, Type, Sequence, Callable, Iterable
from aworld.trace.base import (
    AttributeValueType,
    NoOpSpan,
    Span,
    Tracer,
    NoOpTracer,
    get_tracer_provider,
    get_tracer_provider_silent,
    log_trace_error
)
from aworld.trace.span_cosumer import SpanConsumer
from aworld.version_gen import __version__
from aworld.trace.auto_trace import AutoTraceModule, install_auto_tracing
from aworld.trace.stack_info import get_user_stack_info
from aworld.trace.constants import (
    ATTRIBUTES_MESSAGE_KEY,
    ATTRIBUTES_MESSAGE_RUN_TYPE_KEY,
    ATTRIBUTES_MESSAGE_TEMPLATE_KEY,
    RunType
)
from aworld.trace.msg_format import (
    chunks_formatter,
    warn_formatting,
    FStringAwaitError,
    KnownFormattingError,
    warn_fstring_await
)
from aworld.trace.function_trace import trace_func
from .opentelemetry.opentelemetry_adapter import configure_otlp_provider
from aworld.logs.util import logger


def trace_configure(provider: str = "otlp",
                    backends: Optional[Sequence[str]] = None,
                    base_url: Optional[str] = None,
                    write_token: Optional[str] = None,
                    span_consumers: Optional[Sequence[SpanConsumer]] = None,
                    **kwargs
                    ) -> None:
    """
    Configure the trace provider.

    If a provider is already registered it is shut down before the new one
    is configured.

    Args:
        provider: The trace provider to use. Only "otlp" is supported.
        backends: The trace backends to use.
        base_url: The base URL of the trace backend.
        write_token: The write token of the trace backend.
        span_consumers: The span consumers to use.
        **kwargs: Additional arguments to pass to the trace provider.
    Returns:
        None
    Raises:
        ValueError: If `provider` is not a known provider name.
    """
    exist_provider = get_tracer_provider_silent()
    if exist_provider:
        logger.info("Trace provider already configured, shutting down...")
        exist_provider.shutdown()

    if provider == "otlp":
        configure_otlp_provider(
            backends=backends,
            base_url=base_url,
            write_token=write_token,
            span_consumers=span_consumers,
            **kwargs)
    else:
        raise ValueError(f"Unknown trace provider: {provider}")


class TraceManager:
    """
    TraceManager is a class that provides a way to trace the execution of a function.
    """

    def __init__(self, tracer_name: Optional[str] = None) -> None:
        """
        Args:
            tracer_name: Name used when requesting a tracer from the provider.
                Defaults to "aworld".
        """
        self._tracer_name = tracer_name or "aworld"
        self._version = __version__

    def _create_auto_span(self,
                          name: str,
                          attributes: Optional[dict[str, AttributeValueType]] = None
                          ) -> Span:
        """
        Create a auto trace span with the given name and attributes.
        """
        return self._create_context_span(name, attributes)

    def _create_context_span(self,
                             name: str,
                             attributes: Optional[dict[str, AttributeValueType]] = None
                             ) -> Span:
        """
        Build a `ContextSpan` bound to this manager's tracer.

        If obtaining the tracer fails for any reason, the span is bound to a
        `NoOpTracer` instead, so the caller still receives a usable span.
        """
        try:
            tracer = get_tracer_provider().get_tracer(
                name=self._tracer_name, version=self._version)
            return ContextSpan(span_name=name, tracer=tracer, attributes=attributes)
        except Exception:
            return ContextSpan(span_name=name, tracer=NoOpTracer(), attributes=attributes)

    def get_current_span(self) -> Span:
        """
        Get the current span.

        Returns a `NoOpSpan` if the provider lookup raises.
        """
        try:
            return get_tracer_provider().get_current_span()
        except Exception:
            return NoOpSpan()

    def new_manager(self, tracer_name_suffix: Optional[str] = None) -> "TraceManager":
        """
        Create a new TraceManager with the given tracer name suffix.

        The new tracer name is "<current name>.<suffix>"; without a suffix the
        current tracer name is reused.
        """
        tracer_name = self._tracer_name if not tracer_name_suffix else f"{self._tracer_name}.{tracer_name_suffix}"
        return TraceManager(tracer_name=tracer_name)

    def auto_tracing(self,
                     modules: Union[Sequence[str], Callable[[AutoTraceModule], bool]],
                     min_duration: float) -> None:
        """
        Automatically trace the execution of a function.
        Args:
            modules: A list of module names or a callable that takes a `AutoTraceModule` and returns a boolean.
            min_duration: The minimum duration of a function to be traced.
        Returns:
            None
        """
        install_auto_tracing(self, modules, min_duration)

    def span(self,
             msg_template: str = "",
             attributes: Optional[dict[str, AttributeValueType]] = None,
             *,
             span_name: Optional[str] = None,
             run_type: RunType = RunType.OTHER) -> "ContextSpan":
        """
        Create a span whose message is rendered from `msg_template`.

        Args:
            msg_template: The message template to format into the span message.
            attributes: Attributes to attach to the span. They take precedence
                over the stack-info attributes on key collisions.
            span_name: The span name. Defaults to the (possibly rewritten)
                message template.
            run_type: The run type recorded on the span.
        Returns:
            A `ContextSpan`. If anything fails while preparing the span, the
            error is reported through `log_trace_error` and a span bound to a
            `NoOpTracer` is returned instead.
        """
        try:
            attributes = attributes or {}
            # Retrieve stack information of user code and add it to the attributes
            stack_info = get_user_stack_info()
            merged_attributes = {**stack_info, **attributes}

            # Only hand over the caller's frame when the template contains
            # braces. NOTE: this must stay inline in `span`; moving it into a
            # helper would make `f_back` point at `span` rather than the caller.
            if any(c in msg_template for c in ('{', '}')):
                fstring_frame = inspect.currentframe().f_back
            else:
                fstring_frame = None

            log_message, extra_attrs, msg_template = format_span_msg(
                msg_template,
                merged_attributes,
                fstring_frame=fstring_frame,
            )
            merged_attributes[ATTRIBUTES_MESSAGE_KEY] = log_message
            merged_attributes.update(extra_attrs)
            merged_attributes[ATTRIBUTES_MESSAGE_TEMPLATE_KEY] = msg_template
            merged_attributes[ATTRIBUTES_MESSAGE_RUN_TYPE_KEY] = run_type.value

            span_name = span_name or msg_template

            return self._create_context_span(span_name, merged_attributes)
        except Exception:
            log_trace_error()
            # The failure may have happened before `span_name` was defaulted,
            # so fall back to the template here rather than passing None.
            return ContextSpan(span_name=span_name or msg_template,
                               tracer=NoOpTracer(),
                               attributes=attributes)

    def func_span(self,
                  msg_template: Union[str, Callable] = None,
                  *,
                  attributes: Optional[dict[str, AttributeValueType]] = None,
                  span_name: Optional[str] = None,
                  extract_args: Union[bool, Iterable[str]] = False,
                  **kwargs) -> Callable:
        """
        A decorator that traces the execution of a function.
        Args:
            msg_template: The message template to use. When the decorator is
                applied without parentheses this is the decorated function itself.
            attributes: The attributes to add to the span. The passed dict is
                not modified.
            span_name: The name of the span.
            extract_args: Whether to extract arguments from the function call.
            **kwargs: Additional attributes to add to the span.
        Returns:
            A decorator that traces the execution of a function.
        """
        if callable(msg_template):
            # Bare usage without parentheses:
            # @trace_func
            # def foo():
            return self.func_span()(msg_template)

        # Merge into a fresh dict so the caller's `attributes` is never mutated.
        merged_attributes = {**(attributes or {}), **kwargs}
        return trace_func(self, msg_template, merged_attributes, span_name, extract_args)


class ContextSpan(Span):
    """A lazily started span that can be used as a (sync or async) context manager.

    The underlying span is created by `tracer.start_span` when the context
    manager is entered, not at construction time. Entering returns this
    `ContextSpan`; exiting records the in-flight exception (if any) and calls
    `end` on the underlying span. Until the span has been started, the
    delegating methods are no-ops.

    Args:
        span_name: The name passed to `tracer.start_span`.
        tracer: The tracer used to start the underlying span.
        attributes: The attributes passed to `tracer.start_span`.
    """

    def __init__(self,
                 span_name: str,
                 tracer: Tracer,
                 attributes: Optional[dict[str, AttributeValueType]] = None) -> None:
        self._span_name = span_name
        self._tracer = tracer
        self._attributes = attributes
        # Underlying span; stays None until `_start` runs.
        self._span: Optional[Span] = None
        self._coro_context = None

    def _start(self) -> None:
        """Start the underlying span once; later calls do nothing."""
        if self._span is not None:
            return

        self._span = self._tracer.start_span(
            name=self._span_name,
            attributes=self._attributes,
        )

    def __enter__(self) -> "Span":
        self._start()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        traceback: Optional[Any],
    ) -> None:
        """Ends context manager and calls `end` on the `Span`."""
        self._handle_exit(exc_type, exc_val, traceback)

    async def __aenter__(self) -> "Span":
        self._start()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        traceback: Optional[Any],
    ) -> None:
        """Async counterpart of `__exit__`; same handling."""
        self._handle_exit(exc_type, exc_val, traceback)

    def _handle_exit(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        traceback: Optional[Any],
    ) -> None:
        """Record the exception that left the `with` body, then end the span.

        A `ValueError` raised while recording is logged as a warning. The span
        is ended in every case where it was started.
        """
        try:
            if self._span and self._span.is_recording() and isinstance(exc_val, BaseException):
                self._span.record_exception(exc_val, escaped=True)
        except ValueError as e:
            logger.warning(f"Failed to record_exception: {e}")
        finally:
            if self._span:
                self._span.end()

    def end(self, end_time: Optional[int] = None) -> None:
        """End the underlying span, if it has been started."""
        if self._span:
            self._span.end(end_time)

    def set_attribute(self, key: str, value: AttributeValueType) -> None:
        """Set one attribute on the underlying span, if it has been started."""
        if self._span:
            self._span.set_attribute(key, value)

    def set_attributes(self, attributes: dict[str, AttributeValueType]) -> None:
        """Set several attributes on the underlying span, if it has been started."""
        if self._span:
            self._span.set_attributes(attributes)

    def is_recording(self) -> bool:
        """Return the underlying span's recording state; False if not started."""
        if self._span:
            return self._span.is_recording()
        return False

    def record_exception(
        self,
        exception: BaseException,
        attributes: Optional[dict[str, Any]] = None,
        timestamp: Optional[int] = None,
        escaped: bool = False,
    ) -> None:
        """Forward the exception to the underlying span, if it has been started."""
        if self._span:
            self._span.record_exception(
                exception, attributes, timestamp, escaped)

    def get_trace_id(self) -> Optional[str]:
        """Return the underlying span's trace id, or None if not started."""
        if self._span:
            return self._span.get_trace_id()
        return None

    def get_span_id(self) -> Optional[str]:
        """Return the underlying span's span id, or None if not started."""
        if self._span:
            return self._span.get_span_id()
        return None


def format_span_msg(
    format_string: str,
    kwargs: dict[str, Any],
    fstring_frame: Optional[types.FrameType] = None,
) -> tuple[str, dict[str, Any], str]:
    """ Returns
    1. The formatted message.
    2. A dictionary of extra attributes to add to the span/log.
        These can come from evaluating values in f-strings.
    3. The final message template, which may differ from `format_string` if it was an f-string.

    Formatting never raises: known formatting errors and f-string `await`
    errors are turned into warnings, any other exception is reported through
    `log_trace_error`, and in all of those cases the original `format_string`
    is returned as both message and template with no extra attributes.
    """
    try:
        chunks, extra_attrs, new_template = chunks_formatter.chunks(
            format_string,
            kwargs,
            fstring_frame=fstring_frame
        )
        return ''.join(chunk['v'] for chunk in chunks), extra_attrs, new_template
    except KnownFormattingError as e:
        warn_formatting(str(e) or str(e.__cause__))
    except FStringAwaitError as e:
        warn_fstring_await(str(e))
    except Exception:
        log_trace_error()

    # Formatting failed, so just use the original format string as the message.
    return format_string, {}, format_string