Source code for locust_telemetry.core.recorder

import logging
import os
import socket
from abc import ABC
from typing import Any, Type

from locust.env import Environment

from locust_telemetry.core.handlers import (
    BaseLifecycleHandler,
    BaseOutputHandler,
    BaseRequestHandler,
    BaseSystemMetricsHandler,
)

logger = logging.getLogger(__name__)


class BaseRecorder(ABC):
    """
    Common foundation for telemetry recorders.

    Responsibilities
    ----------------
    - Keep a reference to the Locust environment.
    - Instantiate and hold the output, lifecycle, system and request
      handlers.
    - Offer base ``test_start`` / ``test_stop`` handlers that attach and
      detach the CPU warning listener; subclasses decide which events to
      subscribe to.
    """

    def __init__(
        self,
        env: Environment,
        output_handler_cls: Type[BaseOutputHandler],
        lifecycle_handler_cls: Type[BaseLifecycleHandler],
        system_handler_cls: Type[BaseSystemMetricsHandler],
        requests_handler_cls: Type[BaseRequestHandler],
    ) -> None:
        """
        Capture process metadata and build the handler instances.

        Parameters
        ----------
        env : Environment
            Locust environment this recorder is bound to.
        output_handler_cls : Type[BaseOutputHandler]
            Class called as ``output_handler_cls(env)``.
        lifecycle_handler_cls : Type[BaseLifecycleHandler]
            Class called as ``lifecycle_handler_cls(output, env)``.
        system_handler_cls : Type[BaseSystemMetricsHandler]
            Class called as ``system_handler_cls(output, env)``.
        requests_handler_cls : Type[BaseRequestHandler]
            Class called as ``requests_handler_cls(output, env)``.
        """
        self.env: Environment = env

        # Identity of the current process: user (from $USER), host and pid.
        self._username: str = os.getenv("USER", "unknown")
        self._hostname: str = socket.gethostname()
        self._pid: int = os.getpid()

        # The output handler is created first because every other handler
        # receives it as its first constructor argument.
        output = output_handler_cls(env)
        self.output = output
        self.lifecycle = lifecycle_handler_cls(output, env)
        self.system = system_handler_cls(output, env)
        self.requests = requests_handler_cls(output, env)

    def on_cpu_warning(self, *args: Any, **kwargs: Any) -> None:
        """
        React to a CPU usage warning event.

        The ``cpu_usage`` keyword (``None`` when absent) is passed on to the
        lifecycle handler as ``value`` with the unit ``"percent"``.

        Parameters
        ----------
        *args : Any
            Positional arguments supplied by the event; ignored.
        **kwargs : Any
            Keyword arguments supplied by the event; only ``cpu_usage`` is
            read.
        """
        cpu_usage = kwargs.get("cpu_usage")
        self.lifecycle.on_cpu_warning(value=cpu_usage, unit="percent")

    def on_test_start(self, *args: Any, **kwargs: Any) -> None:
        """
        Base ``test_start`` handler: attach the CPU warning listener.

        Subclasses may extend this to start additional handlers.
        """
        cpu_warning_hook = self.env.events.cpu_warning
        cpu_warning_hook.add_listener(self.on_cpu_warning)

    def on_test_stop(self, *args: Any, **kwargs: Any) -> None:
        """
        Base ``test_stop`` handler: detach the CPU warning listener.

        Subclasses may extend this to stop additional handlers.
        """
        cpu_warning_hook = self.env.events.cpu_warning
        cpu_warning_hook.remove_listener(self.on_cpu_warning)
class MasterNodeRecorder(BaseRecorder):
    """
    Telemetry recorder for the Locust master node.

    Subscribes to the ``test_start``, ``test_stop`` and
    ``spawning_complete`` events and drives the lifecycle, request and
    system handlers from them.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Build the handlers and subscribe to the master's events.

        Parameters
        ----------
        *args : Any
            Positional arguments forwarded unchanged to ``BaseRecorder``.
        **kwargs : Any
            Keyword arguments forwarded unchanged to ``BaseRecorder``.
        """
        super().__init__(*args, **kwargs)

        # Register master-only event listeners
        events = self.env.events
        events.test_start.add_listener(self.on_test_start)
        events.test_stop.add_listener(self.on_test_stop)
        events.spawning_complete.add_listener(self.on_spawning_complete)

    def on_test_start(self, *args: Any, **kwargs: Any) -> None:
        """
        Handle the `test_start` event.

        Attaches the CPU warning listener (via the base class), notifies the
        lifecycle handler, then starts request and system metrics
        collection.
        """
        super().on_test_start(*args, **kwargs)
        self.lifecycle.on_test_start()
        self.requests.start()
        self.system.start()

    def on_test_stop(self, *args: Any, **kwargs: Any) -> None:
        """
        Handle the `test_stop` event.

        Detaches the CPU warning listener (via the base class), notifies the
        lifecycle handler, then stops request and system metrics
        collection.
        """
        super().on_test_stop(*args, **kwargs)
        self.lifecycle.on_test_stop()
        self.requests.stop()
        self.system.stop()

    def on_spawning_complete(self, user_count: int, **kwargs: Any) -> None:
        """
        Handle the `spawning_complete` event.

        Parameters
        ----------
        user_count : int
            Number of users that have been spawned.
        **kwargs : Any
            Any further keyword arguments supplied by the event. They are
            accepted and ignored so the listener keeps working if the event
            gains new arguments.
        """
        self.lifecycle.on_spawning_complete(user_count=user_count)
class WorkerNodeRecorder(BaseRecorder):
    """
    Telemetry recorder for the Locust worker node.

    Focuses on system metrics, request metrics and CPU warnings, delegating
    lifecycle event logging to the master.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Build the handlers and subscribe to the worker's events.

        Parameters
        ----------
        *args : Any
            Positional arguments forwarded unchanged to ``BaseRecorder``.
        **kwargs : Any
            Keyword arguments forwarded unchanged to ``BaseRecorder``.
        """
        super().__init__(*args, **kwargs)

        # Register worker event listeners
        events = self.env.events
        events.test_start.add_listener(self.on_test_start)
        events.test_stop.add_listener(self.on_test_stop)

    def on_test_start(self, *args: Any, **kwargs: Any) -> None:
        """
        Handle the `test_start` event.

        Registers the CPU warning listener (via the base class) and starts
        system and request metrics collection.

        Note
        ----
        Lifecycle events are handled by the master, so no call to
        `self.lifecycle.on_test_start`.
        """
        # The base implementation is what attaches `on_cpu_warning`; without
        # this call CPU warnings on the worker would never be recorded.
        super().on_test_start(*args, **kwargs)
        self.system.start()
        self.requests.start()

    def on_test_stop(self, *args: Any, **kwargs: Any) -> None:
        """
        Handle the `test_stop` event.

        Unregisters the CPU warning listener (via the base class) and stops
        system and request metrics collection.

        Note
        ----
        Lifecycle events are handled by the master, so no call to
        `self.lifecycle.on_test_stop`.
        """
        # Mirror of on_test_start: detach the listener added there.
        super().on_test_stop(*args, **kwargs)
        self.system.stop()
        self.requests.stop()