diff --git a/github-runner-manager/pyproject.toml b/github-runner-manager/pyproject.toml index c385816e20..241e6030df 100644 --- a/github-runner-manager/pyproject.toml +++ b/github-runner-manager/pyproject.toml @@ -3,7 +3,7 @@ [project] name = "github-runner-manager" -version = "0.12.0" +version = "0.13.0" authors = [ { name = "Canonical IS DevOps", email = "is-devops-team@canonical.com" }, ] diff --git a/github-runner-manager/src/github_runner_manager/configuration/base.py b/github-runner-manager/src/github_runner_manager/configuration/base.py index 44b65235cf..0ce6e18f03 100644 --- a/github-runner-manager/src/github_runner_manager/configuration/base.py +++ b/github-runner-manager/src/github_runner_manager/configuration/base.py @@ -50,6 +50,8 @@ class ApplicationConfiguration(BaseModel): non_reactive_configuration: Configuration for non-reactive mode. reactive_configuration: Configuration for reactive mode. openstack_configuration: Configuration for authorization to a OpenStack host. + planner_url: Base URL of the planner service. + planner_token: Bearer token to authenticate against the planner service. reconcile_interval: Seconds to wait between reconciliation. """ @@ -61,6 +63,8 @@ class ApplicationConfiguration(BaseModel): non_reactive_configuration: "NonReactiveConfiguration" reactive_configuration: "ReactiveConfiguration | None" openstack_configuration: OpenStackConfiguration + planner_url: Optional[AnyHttpUrl] = None + planner_token: Optional[str] = None reconcile_interval: int @staticmethod diff --git a/github-runner-manager/src/github_runner_manager/manager/pressure_reconciler.py b/github-runner-manager/src/github_runner_manager/manager/pressure_reconciler.py new file mode 100644 index 0000000000..8b86fedccc --- /dev/null +++ b/github-runner-manager/src/github_runner_manager/manager/pressure_reconciler.py @@ -0,0 +1,215 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Planner-driven pressure reconciler. 
+ +Creates or deletes runners based on pressure signals from the planner +service. Runs in two independent loops (create/delete) and coordinates +access to the underlying RunnerManager via the provided lock. +""" + +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass +from threading import Event, Lock +from typing import Optional + +from github_runner_manager.errors import MissingServerConfigError +from github_runner_manager.manager.runner_manager import RunnerManager, RunnerMetadata +from github_runner_manager.planner_client import PlannerApiError, PlannerClient + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class PressureReconcilerConfig: + """Configuration for pressure reconciliation. + + Attributes: + flavor_name: Name of the planner flavor to reconcile. + reconcile_interval: Seconds between timer-based delete reconciliations. + min_pressure: Minimum desired runner count (floor) for the flavor. + Also used as fallback when the planner is unavailable. + """ + + flavor_name: str + reconcile_interval: int = 5 * 60 + min_pressure: int = 0 + + +class PressureReconciler: # pylint: disable=too-few-public-methods + """Continuously reconciles runner count against planner pressure. + + This reconciler keeps the total number of runners near the desired level + indicated by the planner's pressure for a given flavor. It operates in two + threads: + - create loop: scales up when desired exceeds current total + - delete loop: scales down when current exceeds desired + + Concurrency with any other reconcile loop is protected by a shared lock. + + The delete loop uses the last pressure seen by the create loop rather than + fetching a fresh value, so it may act on a stale reading if pressure changed + between stream events. 
This is an accepted trade-off: the window is bounded + by the stream update frequency, and any over-deletion is self-correcting + because the create loop will scale back up on the next pressure event. + + Attributes: + _manager: Runner manager used to list, create, and clean up runners. + _planner: Client used to stream pressure updates. + _config: Reconciler configuration. + _lock: Shared lock to serialize operations with other reconcile loops. + _stop: Event used to signal streaming loops to stop gracefully. + _last_pressure: Last pressure value seen in the create stream. + """ + + def __init__( + self, + manager: RunnerManager, + planner_client: PlannerClient, + config: PressureReconcilerConfig, + lock: Lock, + ) -> None: + """Initialize reconciler state and dependencies. + + Args: + manager: Runner manager interface for creating, cleaning up, + and listing runners. + planner_client: Client used to stream pressure updates. + config: Reconciler configuration. + lock: Shared lock to serialize operations with other reconcile loops. 
+ """ + self._manager = manager + self._planner = planner_client + self._config = config + self._lock = lock + + self._stop = Event() + self._last_pressure: Optional[int] = None + + def start_create_loop(self) -> None: + """Continuously create runners to satisfy planner pressure.""" + while not self._stop.is_set(): + try: + for update in self._planner.stream_pressure(self._config.flavor_name): + if self._stop.is_set(): + return + self._handle_create_runners(update.pressure) + except PlannerApiError: + fallback = max(self._last_pressure or 0, self._config.min_pressure) + logger.exception( + "Error in pressure stream loop, falling back to %s runners.", + fallback, + ) + self._handle_create_runners(fallback) + time.sleep(5) + + def start_delete_loop(self) -> None: + """Continuously delete runners using last seen pressure on a timer.""" + logger.debug("Delete loop: starting, interval=%ss", self._config.reconcile_interval) + while not self._stop.wait(self._config.reconcile_interval): + logger.debug("Delete loop: woke up, _last_pressure=%s", self._last_pressure) + if self._last_pressure is None: + logger.debug("Delete loop: no pressure seen yet, skipping.") + continue + self._handle_timer_reconcile(self._last_pressure) + + def stop(self) -> None: + """Signal the reconciler loops to stop gracefully.""" + self._stop.set() + + def _handle_create_runners(self, pressure: int) -> None: + """Create runners when desired exceeds current total. + + Args: + pressure: Current pressure value used to compute desired total. 
+ """ + desired_total = self._desired_total_from_pressure(pressure) + logger.debug( + "Create loop: pressure=%s, desired=%s, updating _last_pressure", + pressure, + desired_total, + ) + self._last_pressure = pressure + with self._lock: + current_total = len(self._manager.get_runners()) + to_create = max(desired_total - current_total, 0) + if to_create <= 0: + logger.info( + "Create loop: nothing to do (desired=%s current=%s)", + desired_total, + current_total, + ) + return + logger.info( + "Create loop: creating %s runners (desired=%s current=%s)", + to_create, + desired_total, + current_total, + ) + try: + self._manager.create_runners(num=to_create, metadata=RunnerMetadata()) + except MissingServerConfigError: + logger.exception( + "Unable to create runners due to missing server configuration (image/flavor)." + ) + + def _handle_timer_reconcile(self, pressure: int) -> None: + """Clean up stale runners, then converge toward the desired count. + + Scales down (deletes) when current exceeds desired, and scales up + (creates) when current falls below desired after cleanup. + + Args: + pressure: Current pressure value used to compute desired total. 
+ """ + desired_total = self._desired_total_from_pressure(pressure) + with self._lock: + self._manager.cleanup() + current_total = len(self._manager.get_runners()) + if current_total > desired_total: + to_delete = current_total - desired_total + logger.info( + "Timer: scaling down %s runners (desired=%s current=%s)", + to_delete, + desired_total, + current_total, + ) + self._manager.delete_runners(num=to_delete) + elif current_total < desired_total: + to_create = desired_total - current_total + logger.info( + "Timer: scaling up %s runners (desired=%s current=%s)", + to_create, + desired_total, + current_total, + ) + try: + self._manager.create_runners(num=to_create, metadata=RunnerMetadata()) + except MissingServerConfigError: + logger.exception( + "Unable to create runners due to missing server configuration" + " (image/flavor)." + ) + else: + logger.info( + "Timer: no changes needed (desired=%s current=%s)", + desired_total, + current_total, + ) + + def _desired_total_from_pressure(self, pressure: int) -> int: + """Compute desired runner total from planner pressure. + + Ensures non-negative totals and respects the configured `min_pressure` + floor. + + Args: + pressure: Current pressure value from planner. + + Returns: + The desired total number of runners. + """ + return max(pressure, self._config.min_pressure, 0) diff --git a/github-runner-manager/src/github_runner_manager/manager/runner_manager.py b/github-runner-manager/src/github_runner_manager/manager/runner_manager.py index bb5ed421e1..152b74292e 100644 --- a/github-runner-manager/src/github_runner_manager/manager/runner_manager.py +++ b/github-runner-manager/src/github_runner_manager/manager/runner_manager.py @@ -264,10 +264,15 @@ def get_runners(self) -> tuple[RunnerInstance, ...]: ) def delete_runners(self, num: int) -> IssuedMetricEventsStats: - """Delete runners. + """Delete up to `num` runners, preferring idle ones over busy. + + Runners are selected in order: deletable → idle → busy. 
Busy runners + are only targeted when there are not enough idle runners to satisfy + `num`, and the GitHub API will reject deletion of runners actively + executing a job, so the actual number deleted may be less than `num`. Args: - num: The number of runner to delete. + num: The maximum number of runners to delete. Returns: Stats on metrics events issued during the deletion of runners. diff --git a/github-runner-manager/src/github_runner_manager/planner_client.py b/github-runner-manager/src/github_runner_manager/planner_client.py new file mode 100644 index 0000000000..02f315ef80 --- /dev/null +++ b/github-runner-manager/src/github_runner_manager/planner_client.py @@ -0,0 +1,117 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Client to interact with the planner service.""" + +import json +import logging +from dataclasses import dataclass +from typing import Iterable +from urllib.parse import urljoin + +import requests +import requests.adapters +import urllib3 +from pydantic import AnyHttpUrl, BaseModel + +logger = logging.getLogger(__name__) + + +@dataclass +class PressureInfo: + """Pressure information for a flavor returned by the planner service. + + Attributes: + pressure: Desired total runner count for the flavor. + """ + + pressure: int + + +class PlannerConfiguration(BaseModel): + """Configuration inputs for the PlannerClient. + + Attributes: + base_url: Base URL of the planner service. + token: Bearer token used to authenticate against the planner service. + timeout: Default timeout in seconds for HTTP requests. + """ + + base_url: AnyHttpUrl + token: str + timeout: int = 5 * 60 + + +class PlannerApiError(Exception): + """Represents an error while interacting with the planner service.""" + + +class PlannerClient: # pylint: disable=too-few-public-methods + """An HTTP client for the planner service.""" + + def __init__(self, config: PlannerConfiguration) -> None: + """Initialize client with planner configuration. 
+ + Args: + config: Planner service configuration containing base URL, + authentication token, and default request timeout. + """ + self._session = self._create_session() + self._config = config + + def stream_pressure(self, name: str) -> Iterable[PressureInfo]: + """Stream pressure updates for the given flavor. + + Args: + name: Flavor name. + + Yields: + Parsed pressure updates. + + Raises: + PlannerApiError: On HTTP or stream errors. + """ + url = urljoin(str(self._config.base_url), f"/api/v1/flavors/{name}/pressure?stream=true") + try: + with self._session.get( + url, + headers={"Authorization": f"Bearer {self._config.token}"}, + timeout=self._config.timeout, + stream=True, + ) as response: + response.raise_for_status() + for line in response.iter_lines(decode_unicode=True): + if not line: + continue + try: + data = json.loads(line) + if not isinstance(data, dict) or name not in data: + logger.debug("Skipping non-pressure stream line: %s", line) + continue + yield PressureInfo(pressure=int(data[name])) + except json.JSONDecodeError: + logger.warning("Skipping malformed stream line: %s", line) + continue + except requests.RequestException as exc: + logger.exception("Error while streaming pressure for flavor '%s' from planner.", name) + raise PlannerApiError from exc + + @staticmethod + def _create_session() -> requests.Session: + """Create a requests session with HTTP retries configured. + + Returns: + A configured `requests.Session` instance. 
+ """ + adapter = requests.adapters.HTTPAdapter( + max_retries=urllib3.Retry( + total=3, + backoff_factor=0.3, + status_forcelist=[500, 502, 503, 504], + ) + ) + + session = requests.Session() + session.mount("http://", adapter) + session.mount("https://", adapter) + return session diff --git a/github-runner-manager/tests/unit/manager/__init__.py b/github-runner-manager/tests/unit/manager/__init__.py new file mode 100644 index 0000000000..c6a4ad808e --- /dev/null +++ b/github-runner-manager/tests/unit/manager/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. diff --git a/github-runner-manager/tests/unit/manager/test_pressure_reconciler.py b/github-runner-manager/tests/unit/manager/test_pressure_reconciler.py new file mode 100644 index 0000000000..446196f467 --- /dev/null +++ b/github-runner-manager/tests/unit/manager/test_pressure_reconciler.py @@ -0,0 +1,175 @@ +"""Unit tests for PressureReconciler.""" + +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +from threading import Lock +from types import SimpleNamespace + +import pytest + +from github_runner_manager.manager.pressure_reconciler import ( + PressureReconciler, + PressureReconcilerConfig, +) +from github_runner_manager.planner_client import PlannerApiError + + +class _FakeManager: + """Lightweight runner manager stub for testing the reconciler.""" + + def __init__(self, runners_count: int = 0): + """Initialize with an optional number of pre-existing runners.""" + self._runners = [object() for _ in range(runners_count)] + self.created_args: list[int] = [] + self.cleanup_called = 0 + + def get_runners(self) -> list[object]: + """Return the current list of runners.""" + return list(self._runners) + + def create_runners(self, num: int, metadata: object): # noqa: ARG002 + """Record the creation request and extend the internal runner list.""" + self.created_args.append(num) + if num > 0: + self._runners.extend(object() for _ in range(num)) + + def cleanup(self): + """Increment the cleanup counter.""" + self.cleanup_called += 1 + + +class _FakePlanner: + """Planner client stub supplying pressure data for tests.""" + + def __init__( + self, + stream_updates: list[int] | None = None, + stream_raises: bool = False, + ): + """Initialize with configurable stream behavior.""" + self._stream_updates = stream_updates or [] + self._stream_raises = stream_raises + + def stream_pressure(self, name: str): # noqa: ARG002 + """Yield pressure updates or raise PlannerApiError based on configuration. + + Yields: + Namespace objects with a pressure attribute. + """ + if self._stream_raises: + raise PlannerApiError + for p in self._stream_updates: + yield SimpleNamespace(pressure=p) + + +def test_min_pressure_used_as_fallback_when_stream_errors(monkeypatch: pytest.MonkeyPatch): + """ + arrange: A reconciler whose planner stream raises PlannerApiError and no prior pressure. + act: Call start_create_loop. + assert: min_pressure is used as fallback to create runners. 
+ """ + mgr = _FakeManager() + planner = _FakePlanner(stream_raises=True) + cfg = PressureReconcilerConfig(flavor_name="small", min_pressure=2) + reconciler = PressureReconciler(mgr, planner, cfg, lock=Lock()) + + def _stop_after_backoff(_seconds: int): + """Stop the reconciler after the backoff sleep is triggered.""" + reconciler.stop() + + monkeypatch.setattr( + "github_runner_manager.manager.pressure_reconciler.time.sleep", _stop_after_backoff + ) + reconciler.start_create_loop() + + assert 2 in mgr.created_args + + +def test_fallback_preserves_last_pressure_when_higher(monkeypatch: pytest.MonkeyPatch): + """ + arrange: A reconciler with last_pressure=10 and min_pressure=2 whose stream errors. + act: Call start_create_loop. + assert: The higher last_pressure is used as fallback instead of min_pressure. + """ + mgr = _FakeManager() + planner = _FakePlanner(stream_raises=True) + cfg = PressureReconcilerConfig(flavor_name="small", min_pressure=2) + reconciler = PressureReconciler(mgr, planner, cfg, lock=Lock()) + reconciler._last_pressure = 10 + + def _stop_after_backoff(_seconds: int): + """Stop the reconciler after the backoff sleep is triggered.""" + reconciler.stop() + + monkeypatch.setattr( + "github_runner_manager.manager.pressure_reconciler.time.sleep", _stop_after_backoff + ) + reconciler.start_create_loop() + + assert 10 in mgr.created_args + + +def test_delete_loop_uses_cached_pressure(monkeypatch: pytest.MonkeyPatch): + """ + arrange: A reconciler with a cached last_pressure value. + act: Call start_delete_loop. + assert: Cleanup runs and runners are created based on the cached pressure. 
+ """ + mgr = _FakeManager() + planner = _FakePlanner() + cfg = PressureReconcilerConfig(flavor_name="small", reconcile_interval=60) + reconciler = PressureReconciler(mgr, planner, cfg, lock=Lock()) + reconciler._last_pressure = 3 + wait_calls = {"count": 0} + + def _wait(_interval: int) -> bool: + """Return False once to enter the loop, then True to exit.""" + wait_calls["count"] += 1 + return wait_calls["count"] > 1 + + monkeypatch.setattr(reconciler._stop, "wait", _wait) + reconciler.start_delete_loop() + + assert mgr.cleanup_called == 1 + assert mgr.created_args == [3] + + +def test_delete_loop_skips_when_no_cached_pressure(monkeypatch: pytest.MonkeyPatch): + """ + arrange: A reconciler with no cached pressure (None). + act: Call start_delete_loop. + assert: No cleanup or creation occurs. + """ + mgr = _FakeManager() + planner = _FakePlanner() + cfg = PressureReconcilerConfig(flavor_name="small", reconcile_interval=60) + reconciler = PressureReconciler(mgr, planner, cfg, lock=Lock()) + wait_calls = {"count": 0} + + def _wait(_interval: int) -> bool: + """Return True (stop signal) after the second call.""" + wait_calls["count"] += 1 + return wait_calls["count"] > 1 + + monkeypatch.setattr(reconciler._stop, "wait", _wait) + reconciler.start_delete_loop() + + assert mgr.cleanup_called == 0 + + +def test_handle_timer_reconcile_uses_desired_total_not_raw_pressure(): + """ + arrange: A reconciler with 4 runners and min_pressure=5. + act: Call _handle_timer_reconcile with pressure 0. + assert: Cleanup runs and 1 runner is created to reach the min_pressure floor. 
+ """ + mgr = _FakeManager(runners_count=4) + planner = _FakePlanner() + cfg = PressureReconcilerConfig(flavor_name="small", min_pressure=5) + reconciler = PressureReconciler(mgr, planner, cfg, lock=Lock()) + + reconciler._handle_timer_reconcile(0) + + assert mgr.cleanup_called == 1 + assert mgr.created_args == [1] diff --git a/github-runner-manager/tests/unit/test_config.py b/github-runner-manager/tests/unit/test_config.py index 460a53b7ae..f92f6e1551 100644 --- a/github-runner-manager/tests/unit/test_config.py +++ b/github-runner-manager/tests/unit/test_config.py @@ -93,6 +93,8 @@ username: test_username network: test_network vm_prefix: test_unit +planner_token: planner-testing-token +planner_url: http://planner.example.com reconcile_interval: 10 """ @@ -175,6 +177,8 @@ def app_config_fixture() -> ApplicationConfiguration: region_name="test_region", ), ), + planner_token="planner-testing-token", + planner_url="http://planner.example.com", reconcile_interval=10, ) @@ -200,3 +204,15 @@ def test_load_configuration_from_yaml(app_config: ApplicationConfiguration): yaml_config = yaml.safe_load(StringIO(SAMPLE_YAML_CONFIGURATION)) loaded_app_config = ApplicationConfiguration.validate(yaml_config) assert loaded_app_config == app_config + + +def test_configuration_allows_empty_planner_fields(): + """Planner URL/token are optional for non-planner mode.""" + config = yaml.safe_load(StringIO(SAMPLE_YAML_CONFIGURATION)) + config["planner_url"] = None + config["planner_token"] = None + + loaded = ApplicationConfiguration.validate(config) + + assert loaded.planner_url is None + assert loaded.planner_token is None diff --git a/github-runner-manager/tests/unit/test_planner_client.py b/github-runner-manager/tests/unit/test_planner_client.py new file mode 100644 index 0000000000..a0c4dcddc7 --- /dev/null +++ b/github-runner-manager/tests/unit/test_planner_client.py @@ -0,0 +1,148 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""Unit tests for PlannerClient.""" + +import json +from types import SimpleNamespace + +import requests + +from github_runner_manager.planner_client import ( + PlannerClient, + PlannerConfiguration, +) + + +class _FakeResponse: + """Minimal Response-like object used to stub `requests.Response`.""" + + def __init__(self, status_code: int = 200, lines: list[str] | None = None) -> None: + """Minimal Response-like object used to stub `requests.Response`. + + Args: + status_code: HTTP status code to emulate. + lines: Lines yielded by `iter_lines()` for streaming tests. + """ + self.status_code = status_code + self._lines = lines or [] + self._closed = False + + def raise_for_status(self) -> None: + """Raise an HTTPError if status is 4xx/5xx. + + Raises: + HTTPError: When `status_code` indicates an error. + """ + if self.status_code >= 400: + raise requests.HTTPError("HTTP error") + + def iter_lines(self, decode_unicode: bool = True): + """Yield configured NDJSON lines for streaming tests. + + Args: + decode_unicode: Included for compatibility; not used in stub. + + Yields: + str: Individual NDJSON lines. + """ + for line in self._lines: + yield line + + def close(self) -> None: + """Mark the response as closed (context manager support).""" + self._closed = True + + def __enter__(self): + """Enter the context manager and return self.""" + return self + + def __exit__(self, exc_type, exc, tb): + """Exit the context manager, closing the response. + + Args: + exc_type: Exception type, if any. + exc: Exception instance, if any. + tb: Traceback, if any. + + Returns: + bool: False to propagate any exception. 
+ """ + self.close() + return False + + +class _FakeSession: + """Minimal Session-like object used to stub `requests.Session`.""" + + def __init__(self) -> None: + """Initialize the fake session.""" + self.last_get: SimpleNamespace | None = None + + def mount(self, *_args, **_kwargs): + """Compatibility no-op for adapter mounting.""" + return None + + def get(self, url: str, headers: dict[str, str], timeout: int, stream: bool = False): + """Record GET call parameters and return a default fake response. + + Args: + url: The request URL. + headers: Request headers. + timeout: Request timeout in seconds. + stream: Whether the response should be streamed. + + Returns: + _FakeResponse: Default 200 response; tests monkeypatch this. + """ + self.last_get = SimpleNamespace(url=url, headers=headers, timeout=timeout, stream=stream) + # Default successful response; individual tests can monkeypatch this method + return _FakeResponse(status_code=200) + + +def _fake_get_stream_response(lines: list[str], status_code: int = 200): + """Build a stub `Session.get` returning stream lines (NDJSON). + + Args: + lines: List of NDJSON lines to yield. + status_code: HTTP status code for the response (default: 200). + + Returns: + Callable: A function compatible with `Session.get` signature. + """ + + def _fake_get(url, headers, timeout, stream=False): + """Return a fake streaming response yielding predefined lines. + + Args: + url: Request URL (ignored by stub). + headers: Request headers (ignored by stub). + timeout: Request timeout in seconds (ignored by stub). + stream: Whether streaming is requested (ignored by stub). + + Returns: + _FakeResponse: Response configured with the provided NDJSON lines. + """ + return _FakeResponse(status_code=status_code, lines=lines) + + return _fake_get + + +def test_stream_pressure_success(monkeypatch): + """ + arrange: Fake session streams NDJSON lines with a blank heartbeat. + act: Iterate over stream_pressure('small'). 
+ assert: Yields expected pressure updates, skipping blank heartbeat lines. + """ + cfg = PlannerConfiguration(base_url="http://localhost:8080", token="t") + client = PlannerClient(cfg) + + fake_session = _FakeSession() + monkeypatch.setattr(client, "_session", fake_session) + + lines = [json.dumps({"small": 2}), "", json.dumps({"small": 5})] + monkeypatch.setattr(fake_session, "get", _fake_get_stream_response(lines)) + + updates = list(client.stream_pressure("small")) + assert updates[0].pressure == 2 + assert updates[1].pressure == 5 diff --git a/github-runner-manager/tests/unit/test_runner_scaler.py b/github-runner-manager/tests/unit/test_runner_scaler.py index f13a277674..6abd5df990 100644 --- a/github-runner-manager/tests/unit/test_runner_scaler.py +++ b/github-runner-manager/tests/unit/test_runner_scaler.py @@ -186,6 +186,8 @@ def application_configuration_fixture() -> ApplicationConfiguration: region_name="region", ), ), + planner_token="planner-testing-token", + planner_url="http://planner.example.com", reconcile_interval=10, )