"""
OpenTelemetry handlers for Locust.
This module provides handler implementations for lifecycle events, system
metrics, request metrics, and output in the context of OpenTelemetry.
These handlers are used by OTEL recorders for both master and worker
nodes to collect and export telemetry data to an OTLP endpoint.
"""
import logging
from typing import Any, List, Sequence
import psutil
from locust.env import Environment
from opentelemetry.metrics import Observation
from locust_telemetry.common import helpers as h
from locust_telemetry.core.events import TelemetryEventsEnum, TelemetryMetricsEnum
from locust_telemetry.core.handlers import (
BaseLifecycleHandler,
BaseOutputHandler,
BaseRequestHandler,
BaseSystemMetricsHandler,
)
from locust_telemetry.recorders.otel.exceptions import OtelMetricNotRegisteredError
logger = logging.getLogger(__name__)
[docs]
class OtelOutputHandler(BaseOutputHandler):
"""
OpenTelemetry output handler.
Responsible for recording lifecycle events and metrics using registered
OTEL instruments.
"""
[docs]
def __init__(self, env: Environment):
"""
Initialize the output handler.
Parameters
----------
env : Environment
Locust Environment instance containing the OpenTelemetry meter
and instrument registry.
"""
super().__init__(env)
[docs]
def record_event(
self, tl_type: TelemetryEventsEnum, *args: Any, **kwargs: Any
) -> None:
"""
Record a lifecycle event as a counter with event type as attribute.
Parameters
----------
tl_type : TelemetryEventsEnum
The lifecycle event being recorded (e.g., test.start, test.stop).
*args : Any
Additional unused arguments.
**kwargs : Any
Attributes to attach to the recorded metric.
"""
context = self.get_context(active=True)
instrument: h.InstrumentType = self.env.otel_registry.get(
TelemetryEventsEnum.TEST
)
if not instrument:
logger.error(
"[otel] Event metric not registered: %s", TelemetryEventsEnum.TEST.value
)
raise OtelMetricNotRegisteredError(
f"{self.__class__.__name__}: Metric not "
f"registered: {TelemetryEventsEnum.TEST.value}"
)
instrument.add(1, attributes={"event": tl_type.value, **context, **kwargs})
logger.debug("[otel] Recorded event: %s", tl_type.value)
[docs]
def record_metrics(
self, tl_type: TelemetryMetricsEnum, *args: Any, **kwargs: Any
) -> None:
"""
Record request or custom metrics.
Parameters
----------
tl_type : TelemetryMetricsEnum
The metric being recorded (e.g., request_success).
*args : Any
Positional arguments containing metric values (e.g., response time).
**kwargs : Any
Attributes to attach to the recorded metric.
"""
context = self.get_context(active=True)
instrument: h.InstrumentType = self.env.otel_registry.get(tl_type)
if not instrument:
logger.error("[otel] Metric not registered: %s", tl_type.value)
raise OtelMetricNotRegisteredError(
f"{self.__class__.__name__}: Metric not registered: {tl_type.value}"
)
instrument.record(
args[0], attributes={"metric": tl_type.value, **context, **kwargs}
)
[docs]
class OtelLifecycleHandler(BaseLifecycleHandler):
"""
OpenTelemetry lifecycle handler.
Extends the base lifecycle handler to register OTEL instruments for
test events and user counts.
"""
[docs]
def __init__(self, output: OtelOutputHandler, env: Environment):
"""
Initialize the lifecycle handler and register instruments.
Parameters
----------
output : OtelOutputHandler
The OTEL output handler for recording metrics.
env : Environment
Locust Environment instance.
"""
super().__init__(output, env)
self.output: OtelOutputHandler = output
self.env.otel_registry.extend(self.instruments)
@property
def instruments(self) -> Sequence[h.InstrumentSpec]:
"""All the instruments that is required by this handler"""
return (
h.InstrumentSpec(
metric=TelemetryEventsEnum.TEST,
unit="1",
factory=h.create_otel_counter,
),
h.InstrumentSpec(
metric=TelemetryMetricsEnum.USER,
unit="1",
factory=h.create_otel_observable_gauge,
callbacks=[self._user_count_callback],
),
)
def _user_count_callback(self, options=None) -> List[Observation]:
"""
Observable callback for current active user count.
Returns
-------
List[Observation]
Single observation containing the active user count.
"""
return [Observation(self.env.runner.user_count, self.output.get_context())]
[docs]
class OtelSystemMetricsHandler(BaseSystemMetricsHandler):
"""
OpenTelemetry handler for system-level metrics.
Collects CPU usage, memory usage, and network I/O using psutil and
reports them via Observable Gauges.
"""
_process: psutil.Process = psutil.Process()
[docs]
def __init__(self, output: OtelOutputHandler, env: Environment):
"""
Initialize the system metrics handler.
Parameters
----------
output : OtelOutputHandler
Reference to the output handler for recording metrics.
env : Environment
Locust Environment instance.
"""
super().__init__(output, env)
h.warmup_psutil(self._process)
self.output: OtelOutputHandler = output
self.env.otel_registry.extend(self.instruments)
@property
def instruments(self) -> Sequence[h.InstrumentSpec]:
"""All the instruments that is required by this handler"""
return (
h.InstrumentSpec(
metric=TelemetryMetricsEnum.NETWORK,
unit="By",
factory=h.create_otel_observable_gauge,
callbacks=[self._network_usage_callback],
),
h.InstrumentSpec(
metric=TelemetryMetricsEnum.MEMORY,
unit="By",
factory=h.create_otel_observable_gauge,
callbacks=[self._memory_usage_callback],
),
h.InstrumentSpec(
metric=TelemetryMetricsEnum.CPU,
unit="%",
factory=h.create_otel_observable_gauge,
callbacks=[self._cpu_usage_callback],
),
)
def _network_usage_callback(self, options=None) -> List[Observation]:
"""
Callback for network I/O statistics.
Returns
-------
List[Observation]
Observations for bytes sent and received.
"""
io = psutil.net_io_counters()
ctx = self.output.get_context()
return [
Observation(io.bytes_sent, {**ctx, "direction": "sent"}),
Observation(io.bytes_recv, {**ctx, "direction": "recv"}),
]
def _memory_usage_callback(self, options=None) -> List[Observation]:
"""
Callback for process memory usage.
Returns
-------
List[Observation]
Observation for memory usage in MiB.
"""
memory_mib = h.convert_bytes_to_mib(self._process.memory_info().rss)
return [Observation(memory_mib, self.output.get_context())]
def _cpu_usage_callback(self, options=None) -> List[Observation]:
"""
Callback for process CPU usage.
Returns
-------
List[Observation]
Observation for CPU utilization percentage.
"""
return [Observation(self._process.cpu_percent(), self.output.get_context())]
[docs]
def start(self) -> None:
"""
Start request metrics collection (no-op, provided for interface compliance).
"""
[docs]
def stop(self) -> None:
"""
Stop request metrics collection (no-op, provided for interface compliance).
"""
[docs]
class OtelRequestHandler(BaseRequestHandler):
"""
OpenTelemetry handler for request-level metrics.
Registers histograms for request success and failure durations,
and records metrics for each request event.
"""
[docs]
def __init__(self, output: OtelOutputHandler, env: Environment):
"""
Initialize the request metrics handler.
Parameters
----------
output : OtelOutputHandler
Reference to the output handler for recording metrics.
env : Environment
Locust Environment instance.
"""
super().__init__(output, env)
self.output: OtelOutputHandler = output
self.env.otel_registry.extend(self.instruments)
@property
def instruments(self) -> Sequence[h.InstrumentSpec]:
"""All the instruments that is required by this handler"""
return (
h.InstrumentSpec(
metric=TelemetryMetricsEnum.REQUEST_SUCCESS,
unit="ms",
factory=h.create_otel_histogram,
),
h.InstrumentSpec(
metric=TelemetryMetricsEnum.REQUEST_ERROR,
unit="ms",
factory=h.create_otel_histogram,
),
)
[docs]
def on_request(self, *args: Any, **kwargs: Any) -> None:
"""
Handle a request event and record request duration.
Parameters
----------
*args : Any
Additional unused arguments.
**kwargs : Any
Keyword arguments containing request metadata,
such as response_time, name, and exception.
"""
response = kwargs.get("response")
is_error = bool(kwargs.get("exception"))
metric = (
TelemetryMetricsEnum.REQUEST_ERROR
if is_error
else TelemetryMetricsEnum.REQUEST_SUCCESS
)
self.output.record_metrics(
metric,
kwargs.get("response_time"),
endpoint=kwargs.get("name"),
status_code=response.status_code if response else 500,
)
[docs]
def start(self) -> None:
"""
Start request metrics collection (no-op, provided for interface compliance).
"""
[docs]
def stop(self) -> None:
"""
Stop request metrics collection (no-op, provided for interface compliance).
"""