Duibonduil's picture
Upload 11 files
3e56848 verified
import os
from pydantic import BaseModel
from typing import Sequence, Optional
from aworld.trace.span_cosumer import SpanConsumer
from logging import Logger
from aworld.logs.util import trace_logger
from aworld.trace.context_manager import trace_configure
from aworld.metrics.context_manager import MetricContext
from aworld.logs.log import set_log_provider, instrument_logging
from aworld.trace.instrumentation.uni_llmmodel import LLMModelInstrumentor
from aworld.trace.instrumentation.eventbus import EventBusInstrumentor
class ObservabilityConfig(BaseModel):
'''
Observability configuration
'''
class Config:
arbitrary_types_allowed = True
trace_provider: Optional[str] = "otlp"
trace_backends: Optional[Sequence[str]] = ["memory"]
trace_base_url: Optional[str] = None
trace_write_token: Optional[str] = None
trace_span_consumers: Optional[Sequence[SpanConsumer]] = None
# whether to start the trace service
trace_server_enabled: Optional[bool] = False
trace_server_port: Optional[int] = 7079
metrics_provider: Optional[str] = None
metrics_backend: Optional[str] = None
metrics_base_url: Optional[str] = None
metrics_write_token: Optional[str] = None
# whether to instrument system metrics
metrics_system_enabled: Optional[bool] = False
logs_provider: Optional[str] = None
logs_backend: Optional[str] = None
logs_base_url: Optional[str] = None
logs_write_token: Optional[str] = None
# The loggers that need to record the log as a span
logs_trace_instrumented_loggers: Sequence[Logger] = [trace_logger]
def configure(config: ObservabilityConfig = None):
if config is None:
config = ObservabilityConfig()
_trace_configure(config)
_metrics_configure(config)
_log_configure(config)
LLMModelInstrumentor().instrument()
EventBusInstrumentor().instrument()
def _trace_configure(config: ObservabilityConfig):
if not config.trace_base_url and config.trace_provider == "otlp":
if "logfire" in config.trace_backends:
config.trace_base_url = os.getenv("LOGFIRE_WRITE_TOKEN")
elif os.getenv("OTLP_TRACES_ENDPOINT"):
config.trace_base_url = os.getenv("OTLP_TRACES_ENDPOINT")
config.trace_backends.append("other_otlp")
trace_configure(
provider=config.trace_provider,
backends=config.trace_backends,
base_url=config.trace_base_url,
write_token=config.trace_write_token,
span_consumers=config.trace_span_consumers,
server_enabled=config.trace_server_enabled,
server_port=config.trace_server_port
)
def _metrics_configure(config: ObservabilityConfig):
if config.metrics_provider and config.metrics_backend:
MetricContext.configure(
provider=config.metrics_provider,
backend=config.metrics_backend,
base_url=config.metrics_base_url,
write_token=config.metrics_write_token,
metrics_system_enabled=config.metrics_system_enabled
)
def _log_configure(config: ObservabilityConfig):
if config.logs_provider and config.logs_backend:
if config.logs_backend == "logfire" and not config.logs_write_token:
config.logs_write_token = os.getenv("LOGFIRE_WRITE_TOKEN")
set_log_provider(provider=config.logs_provider,
backend=config.logs_backend,
base_url=config.logs_base_url,
write_token=config.logs_write_token)
if config.logs_trace_instrumented_loggers:
for logger in config.logs_trace_instrumented_loggers:
instrument_logging(logger)