Source code for locust_telemetry.recorders.json.handlers

"""
JSON telemetry handlers for Locust.
This module provides handler implementations for lifecycle events, system
metrics, request metrics, and structured JSON output. These handlers are
used by the JSON telemetry recorders for both master and worker nodes.
"""

import logging
from typing import Any, Optional

import gevent
import psutil
from locust.runners import WorkerRunner

from locust_telemetry.common import helpers as h
from locust_telemetry.core.events import TelemetryEventsEnum, TelemetryMetricsEnum
from locust_telemetry.core.handlers import (
    BaseLifecycleHandler,
    BaseOutputHandler,
    BaseRequestHandler,
    BaseSystemMetricsHandler,
)
from locust_telemetry.recorders.json.constants import (
    REQUEST_STATS_TYPE_CURRENT,
    REQUEST_STATS_TYPE_ENDPOINT,
    REQUEST_STATS_TYPE_FINAL,
    REQUEST_STATUS_ERROR,
    REQUEST_STATUS_SUCCESS,
    TEST_STOP_BUFFER_FOR_GRAPHS,
)

logger = logging.getLogger(__name__)


[docs] class JsonTelemetryOutputHandler(BaseOutputHandler): """ Output handler for JSON-based telemetry logging. Responsibilities ---------------- - Log lifecycle events and system/request metrics in JSON format. - Enrich logs with run-level context including run ID, testplan, and source (master/worker). """
[docs] def log_telemetry(self, event_type: str, event_name: str, **kwargs: Any) -> None: """ Log a telemetry data as json Parameters ---------- event_type : str The telemetry event type either 'event' or 'metrics'. event_name : str Name of the telemetry event or metrics **kwargs : dict Additional event/metrics metadata. """ payload = {**self.get_context(active=True), **kwargs} logger.info( f"Recording telemetry {event_type}: {event_name}", extra={ "telemetry": { "telemetry_type": event_type, "telemetry_name": event_name, **payload, } }, )
[docs] def record_event( self, tl_type: TelemetryEventsEnum, *args: Any, **kwargs: Any ) -> None: """ Record a telemetry event in JSON format. Parameters ---------- tl_type : TelemetryEvent The telemetry event enum. *args : tuple Additional positional arguments for the event. **kwargs : dict Additional event metadata. """ self.log_telemetry("event", tl_type.value, **kwargs)
[docs] def record_metrics( self, tl_type: TelemetryMetricsEnum, *args: Any, **kwargs: Any ) -> None: """ Record a telemetry metric in JSON format. Parameters ---------- tl_type : TelemetryMetric The telemetry metric enum. *args : tuple Additional positional arguments for the metric. **kwargs : dict Metric-specific attributes such as `value` and `unit`. """ self.log_telemetry("metrics", tl_type.value, **kwargs)
[docs] class JsonTelemetryLifecycleHandler(BaseLifecycleHandler): """ Lifecycle handler for JSON telemetry. This class inherits from `LifecycleHandlerBase` and handles Locust test lifecycle events. For JSON telemetry, lifecycle events are forwarded to the output handler for structured logging. Custom behavior: On test stop, adjusts the stop timestamp to account for a buffer used in JSON graphs, and emits a `SPAWNING_COMPLETE` event. Attributes ---------- output : OutputHandlerBase Output handler responsible for recording telemetry events. env : Environment The Locust environment instance. """
[docs] def on_test_stop(self, *args: Any, **kwargs: Any) -> None: """ Handle the `test_stop` event for JSON telemetry. Adjusts the test stop time by `TEST_STOP_BUFFER_FOR_GRAPHS` seconds to allow for post-test graph updates, then forwards the event to the output handler. Parameters ---------- *args : Any Positional arguments passed by Locust. **kwargs : Any Additional keyword arguments passed by Locust. """ end_time = h.get_utc_time_with_buffer( seconds_buffer=TEST_STOP_BUFFER_FOR_GRAPHS ) self.output.record_event( TelemetryEventsEnum.TEST_STOP, *args, end_time=end_time, **kwargs ) logger.info("[json] Recorded test stop event with adjusted end time.")
[docs] class JsonTelemetrySystemMetricsHandler(BaseSystemMetricsHandler): """ System metrics handler for JSON telemetry. Responsibilities ---------------- - Periodically capture process-level metrics (CPU and memory usage). - Forward metrics to the JSON output handler. - Run metrics collection in a background greenlet for non-blocking execution. """ _system_metrics_gevent: Optional[gevent.Greenlet] = None _process: psutil.Process = psutil.Process()
[docs] def start(self) -> None: """ Start system metrics collection. Spawns a greenlet that periodically collects CPU and memory metrics. """ # Warmup psutil to avoid starting from zero h.warmup_psutil(self._process) self._system_metrics_gevent = gevent.spawn(self._gevent_loop)
[docs] def stop(self) -> None: """ Stop system metrics collection. Terminates the greenlet collecting system metrics. Logs a warning if the collection loop was never started. """ if self._system_metrics_gevent is None: logger.warning("[json] Gevent loop never started") return self._system_metrics_gevent.kill() self._system_metrics_gevent = None
def _gevent_loop(self) -> None: """ Background loop for capturing system metrics. This method runs inside a greenlet and periodically records: - CPU usage (percent) - Memory usage (MiB) The interval between recordings is defined by ``self.env.parsed_options.lt_stats_recorder_interval``. Handles graceful termination on `GreenletExit` and logs any exceptions. """ try: while True: io = psutil.net_io_counters() cpu_usage = self._process.cpu_percent() # Convert bytes to MiB memory_usage = h.convert_bytes_to_mib(self._process.memory_info().rss) self.output.record_metrics( TelemetryMetricsEnum.CPU, value=cpu_usage, unit="percent" ) self.output.record_metrics( TelemetryMetricsEnum.MEMORY, value=memory_usage, unit="MiB" ) self.output.record_metrics( TelemetryMetricsEnum.NETWORK, value=io.bytes_sent, unit="MiB", direction="sent", ) self.output.record_metrics( TelemetryMetricsEnum.NETWORK, value=io.bytes_recv, unit="MiB", direction="recv", ) gevent.sleep(self.env.parsed_options.lt_stats_recorder_interval) except gevent.GreenletExit: logger.info("[json] System metrics collection terminated gracefully") except Exception: logger.exception("[json] System metrics collection loop failed") raise
[docs] class JsonTelemetryRequestHandler(BaseRequestHandler): """ JSON telemetry handler for Locust request events/metrics. This handler periodically collects aggregate request metrics and forwards them to the output handler in a format suitable for JSON logging. It also implements the `RequestHandlerBase` interface for individual request events, though in JSON telemetry no action is needed per-request. Attributes ---------- _request_metrics_gevent : Optional[gevent.Greenlet] Background greenlet for periodically collecting request metrics. """ _request_metrics_gevent: Optional[gevent.Greenlet] = None
[docs] def start(self) -> None: """ Start periodic request metrics collection. Spawns a background greenlet that logs aggregate request statistics at the configured interval. """ # Since this collects stats from master, there is no need to run in worker node if isinstance(self.env.runner, WorkerRunner): return self._request_metrics_gevent = gevent.spawn(self._gevent_loop)
[docs] def stop(self) -> None: """ Stop request metrics collection. Terminates the greenlet collecting request metrics. Logs a warning if the collection loop was never started. """ # Since this collects stats from master, there is no need to run in worker node if isinstance(self.env.runner, WorkerRunner): return if self._request_metrics_gevent is None: logger.warning("[json] Gevent loop never started") return self._request_metrics_gevent.kill() self._request_metrics_gevent = None # Collect final stats self._flush_stats()
def _flush_stats(self): """ Collect and log the final request statistics at the end of a test. Iterates over both successful and error request statistics and records them via the output handler. Percentile fields are normalized using `add_percentiles`. This method is called when stopping the request metrics collection greenlet to ensure all final metrics are emitted. """ # Final requests stats self.output.record_metrics( TelemetryMetricsEnum.REQUEST_STATS, stats_type=REQUEST_STATS_TYPE_FINAL, user_count=self.env.runner.user_count, **h.add_percentiles(self.env.stats.total.to_dict()), ) # Final request success and error stats by endpoint. final_stats_types = { REQUEST_STATUS_ERROR: self.env.stats.errors, REQUEST_STATUS_SUCCESS: self.env.stats.entries, } for status, stats in final_stats_types.items(): for _, stat in stats.items(): self.output.record_metrics( TelemetryMetricsEnum.REQUEST_STATS, stats_type=REQUEST_STATS_TYPE_ENDPOINT, status=status, **h.add_percentiles(stat.to_dict()), ) def _gevent_loop(self) -> None: """ Background loop for periodic request metrics collection. Continuously collects total request statistics and sends them to the output handler until the greenlet is killed. """ try: while True: stats = h.add_percentiles(self.env.stats.total.to_dict()) self.output.record_metrics( TelemetryMetricsEnum.REQUEST_STATS, stats_type=REQUEST_STATS_TYPE_CURRENT, user_count=self.env.runner.user_count, **stats, ) gevent.sleep(self.env.parsed_options.lt_stats_recorder_interval) except gevent.GreenletExit: logger.info("[json] Request stats logger terminated gracefully") except Exception: logger.exception("[json] Request metrics collection loop failed") raise
[docs] def on_request(self, *args: Any, **kwargs: Any) -> None: """ Handler for individual Locust request events. For JSON telemetry, per-request handling is not needed, so this is intentionally left empty. Parameters ---------- *args : Any Positional arguments from Locust request events. **kwargs : Any Keyword arguments from Locust request events. """