File size: 2,453 Bytes
0d77048
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import requests
from urllib.parse import urljoin
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.sdk._logs import LoggerProvider as SDKLoggerProvider
from opentelemetry.sdk._logs._internal import SynchronousMultiLogRecordProcessor
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor

from ..log import LoggerProvider
class OTLPLoggerProvider(LoggerProvider):
    """Logger provider that holds an OTLP-exporting SDK logger provider.

    The underlying provider is built once, at construction time, by
    ``_build_otlp_privider`` and stored on ``self._logger_provider``.
    """

    def __init__(
        self,
        backend: str = None,
        base_url: str = None,
        write_token: str = None,
        **kwargs
    ) -> None:
        """Build and store the wrapped provider.

        Args:
            backend: Name of the export backend; forwarded to the builder.
            base_url: Base URL of the backend; forwarded to the builder.
            write_token: Write token for the backend; forwarded to the builder.
            **kwargs: Accepted but not used by this constructor.
        """
        # NOTE(review): the parent initializer is not invoked here — confirm
        # that LoggerProvider needs no setup of its own.
        builder_args = dict(
            backend=backend,
            base_url=base_url,
            write_token=write_token,
        )
        self._logger_provider = _build_otlp_privider(**builder_args)



def _build_otlp_privider(backend: str = None, base_url: str = None, write_token: str = None):
    """Build the otlp provider.
    Args:
        backends: The backends to use.
        write_token: The write token to use.
        **kwargs: Additional keyword arguments to pass to the provider.
    Returns:
        The otlp provider.
    """
    backend = backend or "logfire"
    if backend == "logfire":
        log_exporter = _configure_logfire_exporter(write_token=write_token, base_url=base_url)
    else:
        raise ValueError(f"Unknown backend: {backend}")

    otlp_log_processor = BatchLogRecordProcessor(log_exporter)
    multi_log_processor = SynchronousMultiLogRecordProcessor()
    multi_log_processor.add_log_record_processor(otlp_log_processor)
    logger_provider = SDKLoggerProvider()
    logger_provider.add_log_record_processor(multi_log_processor)
    return logger_provider


def _configure_logfire_exporter(write_token: str, base_url: str = None) -> "OTLPLogExporter":
    """Build an OTLP/HTTP log exporter configured for a Logfire endpoint.

    Args:
        write_token: Value placed verbatim in the ``Authorization`` header
            of the exporter's HTTP session.
        base_url: Base URL of the Logfire instance. A falsy value selects
            ``https://logfire-us.pydantic.dev``.

    Returns:
        An ``OTLPLogExporter`` whose endpoint is ``/v1/logs`` on
        ``base_url``, using gzip compression and a session that carries
        the ``User-Agent`` and ``Authorization`` headers.
    """
    base_url = base_url or "https://logfire-us.pydantic.dev"

    # NOTE(review): write_token is not validated here; confirm whether a
    # missing token should be rejected before the exporter is created.
    headers = {'User-Agent': 'logfire/3.14.0', 'Authorization': write_token}
    session = requests.Session()
    session.headers.update(headers)

    # The leading slash makes urljoin replace any path component already
    # present in base_url, so the endpoint is always <scheme>://<host>/v1/logs.
    return OTLPLogExporter(
        endpoint=urljoin(base_url, '/v1/logs'),
        session=session,
        compression=Compression.Gzip,
    )