Source code for locust_telemetry.recorders.otel.otel

import logging
from typing import Sequence

from locust.env import Environment
from opentelemetry import metrics
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource

from locust_telemetry import config
from locust_telemetry.common import helpers as h
from locust_telemetry.core.events import TelemetryEventsEnum, TelemetryMetricsEnum
from locust_telemetry.recorders.otel.exceptions import OtelMetricAlreadyRegisteredError

logger = logging.getLogger(__name__)


[docs] class InstrumentRegistry: """ Registry for managing OpenTelemetry metric instruments. This class provides a structured way to register and retrieve metric instruments, ensuring uniqueness and consistency across the application. """
[docs] def __init__(self, meter: metrics.Meter): """ Initialize the instrument registry. Parameters ---------- meter : metrics.Meter The OpenTelemetry meter used to create instruments. """ self._registry: dict[ TelemetryEventsEnum | TelemetryMetricsEnum, h.InstrumentType ] = {} self.meter = meter
[docs] def extend(self, items: Sequence[h.InstrumentSpec]) -> None: """ Register multiple metric instruments in the registry. Parameters ---------- items : Sequence[InstrumentSpec] A list of instrument specifications to register. Raises ------ ValueError If a metric is already registered. """ for spec in items: if spec.metric in self._registry: raise OtelMetricAlreadyRegisteredError( f"[otel] Metric '{spec.metric.value}' already registered." ) instrument = spec.factory( meter=self.meter, name=spec.metric.value, description=spec.metric.value, unit=spec.unit, callbacks=spec.callbacks or [], ) self._registry[spec.metric] = instrument logger.debug(f"[otel] Registered metric: {spec.metric.value}")
[docs] def get( self, key: TelemetryEventsEnum | TelemetryMetricsEnum ) -> h.InstrumentType | None: """ Retrieve a registered instrument by its metric identifier. Parameters ---------- key : TelemetryEventsEnum | TelemetryMetricsEnum The metric identifier. Returns ------- h.InstrumentType The registered instrument, or None if not found. """ return self._registry.get(key)
[docs] def configure_otel(environment: Environment) -> None: """ Configure and initialize OpenTelemetry metrics for a Locust environment. This function: - Creates an OTLP exporter (gRPC). - Sets up a periodic metrics reader with the configured export interval. - Configures a MeterProvider with the given resource attributes. - Registers the meter provider globally. - Instantiates and attaches an InstrumentRegistry to the environment. Parameters ---------- environment : Any Locust environment object containing parsed options for OTEL configuration. Returns ------- Any The modified environment object with `otel_registry` attached. """ # Define resource metadata for the service resource = Resource.create( { "service.name": config.TELEMETRY_SERVICE_NAME, "service.instance.id": h.get_source_id(environment), } ) # Create the OTLP exporter (gRPC) exporter = OTLPMetricExporter( endpoint=environment.parsed_options.lt_otel_exporter_otlp_endpoint, insecure=environment.parsed_options.lt_otel_exporter_otlp_insecure, timeout=config.OTEL_EXPORTER_TIMEOUT, ) # Create a periodic exporting metric reader reader = PeriodicExportingMetricReader( exporter, export_interval_millis=( environment.parsed_options.lt_stats_recorder_interval * 1000 ), ) # Set up the meter provider with the reader provider = MeterProvider(resource=resource, metric_readers=[reader]) metrics.set_meter_provider(provider) # Attach an instrument registry to the environment environment.otel_registry = InstrumentRegistry( provider.get_meter(config.TELEMETRY_OTEL_METRICS_METER) ) logger.info("[otel] OpenTelemetry metrics configured successfully")