import os import base64 from typing import Optional, Literal from opentelemetry.sdk.trace import TracerProvider from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry import trace from openinference.instrumentation.smolagents import SmolagentsInstrumentor def setup_tracing( service_name: str = "agent-service", enabled: bool = True, provider: Literal["langfuse", "phoenix"] = "langfuse", ) -> Optional[TracerProvider]: """ Configure and set up OpenTelemetry tracing with Langfuse or Phoenix integration. Args: service_name: Name of the service for trace identification enabled: Whether tracing should be active provider: Which tracing provider to use - "langfuse" or "phoenix" Returns: Configured TracerProvider instance or None if disabled """ if not enabled: return None # Setup Phoenix if provider == "phoenix": try: from phoenix.otel import register # Create Phoenix tracer provider trace_provider = register( project_name=service_name, endpoint="http://127.0.0.1:6006/v1/traces", auto_instrument=True, ) print(f"✅ Phoenix tracing enabled for {service_name}") # Instrument smolagents with Phoenix provider SmolagentsInstrumentor().instrument(tracer_provider=trace_provider) return trace_provider except ImportError: print("⚠️ Phoenix not installed. Run: pip install 'smolagents[telemetry]'") return None # Setup Langfuse (default) else: # Configure Langfuse host langfuse_host = os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com") # Create auth token for Langfuse langfuse_public_key = os.environ.get("LANGFUSE_PUBLIC_KEY") langfuse_secret_key = os.environ.get("LANGFUSE_SECRET_KEY") if not langfuse_public_key or not langfuse_secret_key: print("⚠️ Langfuse keys not found in environment") return None # Configure Langfuse langfuse_auth = base64.b64encode( f"{langfuse_public_key}:{langfuse_secret_key}".encode() ).decode() # Set environment variables for OTLP exporter os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = f"{langfuse_host}/api/public/otel" os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = ( f"Authorization=Basic {langfuse_auth}" ) # Create a new TracerProvider for Langfuse langfuse_provider = TracerProvider() # Add Langfuse exporter langfuse_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter())) # Set as the global default tracer provider trace.set_tracer_provider(langfuse_provider) print(f"✅ Langfuse tracing enabled for {service_name}") # Instrument smolagents SmolagentsInstrumentor().instrument(tracer_provider=langfuse_provider) return langfuse_provider