Spaces:
Sleeping
Sleeping
import time | |
import threading | |
from typing import Sequence, Optional, Dict, List | |
from prometheus_client import Counter as PCounter, Gauge as PGauge, Histogram as PHistogram, CollectorRegistry | |
from prometheus_client import start_http_server, REGISTRY | |
from aworld.metrics.metric import( | |
MetricProvider, | |
Counter, | |
UpDownCounter, | |
MetricExporter, | |
Gauge, | |
Histogram, | |
set_metric_provider | |
) | |
class PrometheusMetricProvider(MetricProvider): | |
""" | |
PrometheusMetricProvider is a subclass of MetricProvider, representing a metric provider for Prometheus. | |
""" | |
def __init__(self, exporter: MetricExporter): | |
""" | |
Initialize the PrometheusMetricProvider. | |
Args: | |
port: The port to use for the Prometheus server. | |
""" | |
super().__init__() | |
self.exporter = exporter | |
def shutdown(self) -> None: | |
""" | |
Shutdown the PrometheusMetricProvider. | |
""" | |
self.exporter.shutdown() | |
def create_counter(self, name: str, description: str, unit: str, | |
labelnames: Optional[Sequence[str]] = None) -> Counter: | |
""" | |
Create a counter metric. | |
Args: | |
name: The name of the metric. | |
description: The description of the metric. | |
unit: The unit of the metric. | |
Returns: | |
The counter metric. | |
""" | |
return PrometheusCounter(name, description, unit, self, labelnames) | |
def create_un_down_counter(self, name: str, description: str, unit: str, | |
labelnames: Optional[Sequence[str]] = None) -> UpDownCounter: | |
""" | |
Create an up-down counter metric. | |
Args: | |
name: The name of the metric. | |
description: The description of the metric. | |
unit: The unit of the metric. | |
Returns: | |
The up-down counter metric. | |
""" | |
return PrometheusUpDownCounter(name, description, unit, self, labelnames) | |
def create_gauge(self, name: str, description: str, unit: str, labelnames: Optional[Sequence[str]] = None) -> Gauge: | |
""" | |
Create a gauge metric. | |
Args: | |
name: The name of the metric. | |
description: The description of the metric. | |
unit: The unit of the metric. | |
Returns: | |
The gauge metric. | |
""" | |
return PrometheusGauge(name, description, unit, self, labelnames) | |
def create_histogram(self, | |
name: str, | |
description: str, | |
unit: str, | |
buckets: Optional[Sequence[float]] = None, | |
labelnames: Optional[Sequence[str]] = None) -> Histogram: | |
""" | |
Create a histogram metric. | |
Args: | |
name: The name of the metric. | |
description: The description of the metric. | |
unit: The unit of the metric. | |
buckets: The buckets of the histogram. | |
Returns: | |
The histogram metric. | |
""" | |
return PrometheusHistogram(name, description, unit, self, buckets, labelnames) | |
class PrometheusCounter(Counter): | |
""" | |
PrometheusCounter is a subclass of Counter, representing a counter metric for Prometheus. | |
""" | |
def __init__(self, | |
name: str, | |
description: str, | |
unit: str, | |
provider: MetricProvider, | |
labelnames: Optional[Sequence[str]] = None): | |
""" | |
Initialize the PrometheusCounter. | |
Args: | |
name: The name of the metric. | |
description: The description of the metric. | |
unit: The unit of the metric. | |
provider: The provider of the metric. | |
""" | |
labelnames = labelnames or [] | |
super().__init__(name, description, unit, provider, labelnames) | |
self._counter = PCounter(name=name, documentation=description, labelnames=labelnames, unit=unit) | |
def add(self, value: int, labels: dict = None) -> None: | |
""" | |
Add a value to the counter. | |
Args: | |
value: The value to add to the counter. | |
labels: The labels to associate with the value. | |
""" | |
if labels: | |
self._counter.labels(**labels).inc(value) | |
else: | |
self._counter.inc(value) | |
class PrometheusUpDownCounter(UpDownCounter): | |
""" | |
PrometheusUpDownCounter is a subclass of UpDownCounter, representing an up-down counter metric for Prometheus. | |
""" | |
def __init__(self, | |
name: str, | |
description: str, | |
unit: str, | |
provider: MetricProvider, | |
labelnames: Optional[Sequence[str]] = None): | |
""" | |
Initialize the PrometheusUpDownCounter. | |
Args: | |
name: The name of the metric. | |
description: The description of the metric. | |
unit: The unit of the metric. | |
provider: The provider of the metric. | |
""" | |
labelnames = labelnames or [] | |
super().__init__(name, description, unit, provider, labelnames) | |
self._gauge = PGauge(name=name, documentation=description, labelnames=labelnames, unit=unit) | |
def inc(self, value: int, labels: dict = None) -> None: | |
""" | |
Add a value to the counter. | |
Args: | |
value: The value to add to the counter. | |
labels: The labels to associate with the value. | |
""" | |
if labels: | |
self._gauge.labels(**labels).inc(value) | |
else: | |
self._gauge.inc(value) | |
def dec(self, value: int, labels: dict = None) -> None: | |
""" | |
Subtract a value from the counter. | |
Args: | |
value: The value to subtract from the counter. | |
labels: The labels to associate with the value. | |
""" | |
if labels: | |
self._gauge.labels(**labels).dec(value) | |
else: | |
self._gauge.dec(value) | |
class PrometheusGauge(Gauge): | |
""" | |
PrometheusGauge is a subclass of Gauge, representing a gauge metric for Prometheus. | |
""" | |
def __init__(self, | |
name: str, | |
description: str, | |
unit: str, | |
provider: MetricProvider, | |
labelnames: Optional[Sequence[str]] = None): | |
""" | |
Initialize the PrometheusGauge. | |
Args: | |
name: The name of the metric. | |
description: The description of the metric. | |
unit: The unit of the metric. | |
provider: The provider of the metric. | |
""" | |
labelnames = labelnames or [] | |
super().__init__(name, description, unit, provider, labelnames) | |
self._gauge = PGauge(name=name, documentation=description, labelnames=labelnames, unit=unit) | |
def set(self, value: int, labels: dict = None) -> None: | |
""" | |
Set the value of the gauge. | |
Args: | |
value: The value to set the gauge to. | |
labels: The labels to associate with the value. | |
""" | |
if labels: | |
self._gauge.labels(**labels).set(value) | |
else: | |
self._gauge.set(value) | |
def inc(self, value: int, labels: dict = None) -> None: | |
""" | |
Add a value to the gauge. | |
Args: | |
value: The value to add to the gauge. | |
labels: The labels to associate with the value. | |
""" | |
if labels: | |
self._gauge.labels(**labels).inc(value) | |
else: | |
self._gauge.inc(value) | |
def dec(self, value: int, labels: dict = None) -> None: | |
""" | |
Subtract a value from the gauge. | |
Args: | |
value: The value to subtract from the gauge. | |
labels: The labels to associate with the value. | |
""" | |
if labels: | |
self._gauge.labels(**labels).dec(value) | |
else: | |
self._gauge.dec(value) | |
class PrometheusHistogram(Histogram): | |
""" | |
PrometheusHistogram is a subclass of Histogram, representing a histogram metric for Prometheus. | |
""" | |
def __init__(self, | |
name: str, | |
description: str, | |
unit: str, | |
provider: MetricProvider, | |
buckets: Sequence[float] = None, | |
labelnames: Optional[Sequence[str]] = None): | |
""" | |
Initialize the PrometheusHistogram. | |
Args: | |
name: The name of the metric. | |
description: The description of the metric. | |
unit: The unit of the metric. | |
provider: The provider of the metric. | |
""" | |
labelnames = labelnames or [] | |
super().__init__(name, description, unit, provider, buckets, labelnames) | |
if buckets: | |
self._histogram = PHistogram(name=name, documentation=description, labelnames=labelnames, unit=unit, | |
buckets=buckets) | |
else: | |
self._histogram = PHistogram(name=name, documentation=description, labelnames=labelnames, unit=unit) | |
def record(self, value: int, labels: dict = None) -> None: | |
""" | |
Record a value in the histogram. | |
Args: | |
value: The value to record in the histogram. | |
labels: The labels to associate with the value. | |
""" | |
if labels: | |
self._histogram.labels(**labels).observe(value) | |
else: | |
self._histogram.observe(value) | |
class PrometheusMetricExporter(MetricExporter): | |
""" | |
PrometheusMetricExporter is a class for exporting metrics to Prometheus. | |
""" | |
def __init__(self, port: int = 8000): | |
""" | |
Initialize the PrometheusMetricExporter. | |
Args: | |
port: The port to use for the Prometheus server. | |
""" | |
self.port = port | |
server, server_thread = start_http_server(self.port) | |
self.server = server | |
self.server_thread = server_thread | |
def shutdown(self) -> None: | |
""" | |
Shutdown the PrometheusMetricExporter. | |
""" | |
self.server.shutdown() | |
self.server_thread.join() | |
class PrometheusConsoleMetricExporter(MetricExporter): | |
"""Implementation of :class:`MetricExporter` that prints metrics to the | |
console. | |
This class can be used for diagnostic purposes. It prints the exported | |
metrics to the console STDOUT. | |
""" | |
def __init__(self, out_interval_secs: float = 1.0): | |
"""Initialize the console exporter.""" | |
self._should_shutdown = False | |
self.out_interval_secs = out_interval_secs | |
self.metrics_thread = threading.Thread(target=self._output_metrics_to_console) | |
self.metrics_thread.daemon = True | |
self.metrics_thread.start() | |
def generate_latest(self, registry: CollectorRegistry = REGISTRY) -> bytes: | |
"""Returns the metrics from the registry in latest text format as a string.""" | |
def sample_line(line): | |
if line.labels: | |
labelstr = '{{{0}}}'.format(','.join( | |
['{}="{}"'.format( | |
k, v.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')) | |
for k, v in sorted(line.labels.items())])) | |
else: | |
labelstr = '' | |
timestamp = '' | |
if line.timestamp is not None: | |
# Convert to milliseconds. | |
timestamp = f' {int(float(line.timestamp) * 1000):d}' | |
return f'{line.name}{labelstr} {line.value}{timestamp}\n' | |
output = [] | |
for metric in registry.collect(): | |
try: | |
om_samples: Dict[str, List[str]] = {} | |
for s in metric.samples: | |
for suffix in ['_gsum', '_gcount']: | |
if s.name == metric.name + suffix: | |
# OpenMetrics specific sample, put in a gauge at the end. | |
om_samples.setdefault(suffix, []).append(sample_line(s)) | |
break | |
else: | |
output.append(sample_line(s)) | |
except Exception as exception: | |
exception.args = (exception.args or ('',)) + (metric,) | |
raise | |
for suffix, lines in sorted(om_samples.items()): | |
output.extend(lines) | |
return ''.join(output).encode('utf-8') | |
def _output_metrics_to_console(self): | |
while not self._should_shutdown: | |
metrics_text = self.generate_latest(REGISTRY) | |
print(metrics_text.decode('utf-8')) | |
time.sleep(self.out_interval_secs) | |
def shutdown(self) -> None: | |
""" | |
Shutdown the PrometheusConsoleMetricExporter. | |
""" | |
self._should_shutdown = True | |
def configure_prometheus_provider(backend: str, | |
base_url: str = None, | |
write_token: str = None, | |
**kwargs | |
): | |
""" | |
Initialize the prometheus metric provider. | |
Args: | |
backend: The backend of the metric provider. | |
base_url: The base url of the metric provider. | |
write_token: The write token of the metric provider. | |
""" | |
if backend == "console": | |
exporter = PrometheusConsoleMetricExporter(out_interval_secs=2) | |
set_metric_provider(PrometheusMetricProvider(exporter)) | |
elif backend == "prometheus": | |
exporter = PrometheusMetricExporter() | |
set_metric_provider(PrometheusMetricProvider(exporter)) | |