feat: planner client and pressure reconciler with unit tests #734
Merged
22 commits
cba58bc feat: planner client and pressure reconciler with unit tests (cbartz)
fe5a2b8 bump application version to 0.13.0 (cbartz)
aac6c22 revert aproxy_address fix (moved to fix/aproxy-requirement branch) (cbartz)
bddeb63 change arrange act format in test doc strings (cbartz)
8c8ad7b address copilot review comment (cbartz)
f8a354f address copilot review comment (cbartz)
f87143d address copilot review comment (cbartz)
59be5ad address review comment (cbartz)
15048df use configured min_pressure (cbartz)
e47d828 remove dead code (cbartz)
1b16606 remove early exit on error optimization (cbartz)
3bf8b3d _handle_create -> _handle_create_runners (cbartz)
6c9f0b3 use ints instead of floats for pressure (cbartz)
0ca6d17 fix stream pressure client (cbartz)
10421c7 change pressure to ints in unit test (cbartz)
f691783 remove ruff noqa (cbartz)
569f1d1 use arrange/act/assert docstrings (cbartz)
a79638d use min_pressure instead of fallback runners (cbartz)
23462de use last pressure for fallback (cbartz)
73c5d3b add MissingServerConfigError (cbartz)
93a05a0 rework unit test (cbartz)
2cfc4d0 lint (cbartz)
215 changes: 215 additions & 0 deletions
github-runner-manager/src/github_runner_manager/manager/pressure_reconciler.py
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,215 @@ | ||
| # Copyright 2026 Canonical Ltd. | ||
| # See LICENSE file for licensing details. | ||
|
|
||
| """Planner-driven pressure reconciler. | ||
|
|
||
| Creates or deletes runners based on pressure signals from the planner | ||
| service. Runs in two independent loops (create/delete) and coordinates | ||
| access to the underlying RunnerManager via the provided lock. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| import time | ||
| from dataclasses import dataclass | ||
| from threading import Event, Lock | ||
| from typing import Optional | ||
|
|
||
| from github_runner_manager.errors import MissingServerConfigError | ||
| from github_runner_manager.manager.runner_manager import RunnerManager, RunnerMetadata | ||
| from github_runner_manager.planner_client import PlannerApiError, PlannerClient | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class PressureReconcilerConfig: | ||
| """Configuration for pressure reconciliation. | ||
|
|
||
| Attributes: | ||
| flavor_name: Name of the planner flavor to reconcile. | ||
| reconcile_interval: Seconds between timer-based delete reconciliations. | ||
| min_pressure: Minimum desired runner count (floor) for the flavor. | ||
| Also used as fallback when the planner is unavailable. | ||
| """ | ||
|
|
||
| flavor_name: str | ||
| reconcile_interval: int = 5 * 60 | ||
| min_pressure: int = 0 | ||
|
|
||
|
|
||
| class PressureReconciler: # pylint: disable=too-few-public-methods | ||
| """Continuously reconciles runner count against planner pressure. | ||
|
|
||
| This reconciler keeps the total number of runners near the desired level | ||
| indicated by the planner's pressure for a given flavor. It operates in two | ||
| threads: | ||
| - create loop: scales up when desired exceeds current total | ||
| - delete loop: scales down when current exceeds desired | ||
|
|
||
| Concurrency with any other reconcile loop is protected by a shared lock. | ||
|
|
||
| The delete loop uses the last pressure seen by the create loop rather than | ||
| fetching a fresh value, so it may act on a stale reading if pressure changed | ||
| between stream events. This is an accepted trade-off: the window is bounded | ||
| by the stream update frequency, and any over-deletion is self-correcting | ||
| because the create loop will scale back up on the next pressure event. | ||
|
|
||
| Attributes: | ||
| _manager: Runner manager used to list, create, and clean up runners. | ||
| _planner: Client used to stream pressure updates. | ||
| _config: Reconciler configuration. | ||
| _lock: Shared lock to serialize operations with other reconcile loops. | ||
| _stop: Event used to signal streaming loops to stop gracefully. | ||
| _last_pressure: Last pressure value seen in the create stream. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| manager: RunnerManager, | ||
| planner_client: PlannerClient, | ||
| config: PressureReconcilerConfig, | ||
| lock: Lock, | ||
| ) -> None: | ||
| """Initialize reconciler state and dependencies. | ||
|
|
||
| Args: | ||
| manager: Runner manager interface for creating, cleaning up, | ||
| and listing runners. | ||
| planner_client: Client used to stream pressure updates. | ||
| config: Reconciler configuration. | ||
| lock: Shared lock to serialize operations with other reconcile loops. | ||
| """ | ||
| self._manager = manager | ||
| self._planner = planner_client | ||
| self._config = config | ||
| self._lock = lock | ||
|
|
||
| self._stop = Event() | ||
| self._last_pressure: Optional[int] = None | ||
|
|
||
| def start_create_loop(self) -> None: | ||
| """Continuously create runners to satisfy planner pressure.""" | ||
| while not self._stop.is_set(): | ||
| try: | ||
| for update in self._planner.stream_pressure(self._config.flavor_name): | ||
| if self._stop.is_set(): | ||
| return | ||
| self._handle_create_runners(update.pressure) | ||
| except PlannerApiError: | ||
| fallback = max(self._last_pressure or 0, self._config.min_pressure) | ||
| logger.exception( | ||
| "Error in pressure stream loop, falling back to %s runners.", | ||
| fallback, | ||
| ) | ||
| self._handle_create_runners(fallback) | ||
| time.sleep(5) | ||
|
|
||
| def start_delete_loop(self) -> None: | ||
| """Continuously delete runners using last seen pressure on a timer.""" | ||
| logger.debug("Delete loop: starting, interval=%ss", self._config.reconcile_interval) | ||
| while not self._stop.wait(self._config.reconcile_interval): | ||
| logger.debug("Delete loop: woke up, _last_pressure=%s", self._last_pressure) | ||
| if self._last_pressure is None: | ||
| logger.debug("Delete loop: no pressure seen yet, skipping.") | ||
| continue | ||
| self._handle_timer_reconcile(self._last_pressure) | ||
|
||
|
|
||
| def stop(self) -> None: | ||
| """Signal the reconciler loops to stop gracefully.""" | ||
| self._stop.set() | ||
|
|
||
| def _handle_create_runners(self, pressure: int) -> None: | ||
| """Create runners when desired exceeds current total. | ||
|
|
||
| Args: | ||
| pressure: Current pressure value used to compute desired total. | ||
| """ | ||
| desired_total = self._desired_total_from_pressure(pressure) | ||
| logger.debug( | ||
| "Create loop: pressure=%s, desired=%s, updating _last_pressure", | ||
| pressure, | ||
| desired_total, | ||
| ) | ||
| self._last_pressure = pressure | ||
| with self._lock: | ||
| current_total = len(self._manager.get_runners()) | ||
| to_create = max(desired_total - current_total, 0) | ||
| if to_create <= 0: | ||
| logger.info( | ||
| "Create loop: nothing to do (desired=%s current=%s)", | ||
| desired_total, | ||
| current_total, | ||
| ) | ||
| return | ||
| logger.info( | ||
| "Create loop: creating %s runners (desired=%s current=%s)", | ||
| to_create, | ||
| desired_total, | ||
| current_total, | ||
| ) | ||
| try: | ||
| self._manager.create_runners(num=to_create, metadata=RunnerMetadata()) | ||
| except MissingServerConfigError: | ||
| logger.exception( | ||
| "Unable to create runners due to missing server configuration (image/flavor)." | ||
| ) | ||
|
|
||
| def _handle_timer_reconcile(self, pressure: int) -> None: | ||
| """Clean up stale runners, then converge toward the desired count. | ||
|
|
||
| Scales down (deletes) when current exceeds desired, and scales up | ||
| (creates) when current falls below desired after cleanup. | ||
|
||
|
|
||
| Args: | ||
| pressure: Current pressure value used to compute desired total. | ||
| """ | ||
| desired_total = self._desired_total_from_pressure(pressure) | ||
| with self._lock: | ||
| self._manager.cleanup() | ||
| current_total = len(self._manager.get_runners()) | ||
| if current_total > desired_total: | ||
| to_delete = current_total - desired_total | ||
| logger.info( | ||
| "Timer: scaling down %s runners (desired=%s current=%s)", | ||
| to_delete, | ||
| desired_total, | ||
| current_total, | ||
| ) | ||
| self._manager.delete_runners(num=to_delete) | ||
| elif current_total < desired_total: | ||
| to_create = desired_total - current_total | ||
| logger.info( | ||
| "Timer: scaling up %s runners (desired=%s current=%s)", | ||
| to_create, | ||
| desired_total, | ||
| current_total, | ||
| ) | ||
| try: | ||
| self._manager.create_runners(num=to_create, metadata=RunnerMetadata()) | ||
| except MissingServerConfigError: | ||
| logger.exception( | ||
| "Unable to create runners due to missing server configuration" | ||
| " (image/flavor)." | ||
| ) | ||
| else: | ||
| logger.info( | ||
| "Timer: no changes needed (desired=%s current=%s)", | ||
| desired_total, | ||
| current_total, | ||
| ) | ||
|
|
||
| def _desired_total_from_pressure(self, pressure: int) -> int: | ||
| """Compute desired runner total from planner pressure. | ||
|
|
||
| Ensures non-negative totals and respects the configured `min_pressure` | ||
| floor. | ||
|
|
||
| Args: | ||
| pressure: Current pressure value from planner. | ||
|
|
||
| Returns: | ||
| The desired total number of runners. | ||
| """ | ||
| return max(pressure, self._config.min_pressure, 0) | ||
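A minimal sketch of the scaling arithmetic used by `_desired_total_from_pressure`, the `PlannerApiError` fallback, and `_handle_timer_reconcile`, pulled out as pure functions so the numbers are easy to check. The function names `desired_total`, `fallback_pressure`, and `plan_action` are hypothetical and do not appear in the PR; only the `max(...)` expressions and the comparison order are taken from the diff.

```python
from typing import Optional, Tuple


def desired_total(pressure: int, min_pressure: int) -> int:
    """Floor the planner pressure at min_pressure and at zero."""
    return max(pressure, min_pressure, 0)


def fallback_pressure(last_pressure: Optional[int], min_pressure: int) -> int:
    """Pressure to use when the stream fails: last seen value, floored at min_pressure."""
    return max(last_pressure or 0, min_pressure)


def plan_action(pressure: int, min_pressure: int, current_total: int) -> Tuple[str, int]:
    """Return the timer-style decision as (action, count)."""
    desired = desired_total(pressure, min_pressure)
    if current_total > desired:
        return ("delete", current_total - desired)
    if current_total < desired:
        return ("create", desired - current_total)
    return ("noop", 0)


if __name__ == "__main__":
    # Five runners exist but only three are wanted: scale down by two.
    print(plan_action(pressure=3, min_pressure=1, current_total=5))
    # A negative pressure is floored at min_pressure, so one runner is created.
    print(plan_action(pressure=-2, min_pressure=1, current_total=0))
    # No pressure seen yet and the stream failed: fall back to min_pressure.
    print(fallback_pressure(last_pressure=None, min_pressure=2))
```

The create path in the diff only ever acts on the "create" branch, while the timer path handles all three, which is why the sketch models the timer decision.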
117 changes: 117 additions & 0 deletions
github-runner-manager/src/github_runner_manager/planner_client.py
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,117 @@ | ||
| # Copyright 2026 Canonical Ltd. | ||
| # See LICENSE file for licensing details. | ||
|
|
||
| """Client to interact with the planner service.""" | ||
|
|
||
| import json | ||
| import logging | ||
| from dataclasses import dataclass | ||
| from typing import Iterable | ||
| from urllib.parse import urljoin | ||
|
|
||
| import requests | ||
| import requests.adapters | ||
| import urllib3 | ||
| from pydantic import AnyHttpUrl, BaseModel | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| @dataclass | ||
| class PressureInfo: | ||
| """Pressure information for a flavor returned by the planner service. | ||
|
|
||
| Attributes: | ||
| pressure: Desired total runner count for the flavor. | ||
|
||
| """ | ||
|
|
||
| pressure: int | ||
|
|
||
|
|
||
| class PlannerConfiguration(BaseModel): | ||
| """Configuration inputs for the PlannerClient. | ||
|
|
||
| Attributes: | ||
| base_url: Base URL of the planner service. | ||
| token: Bearer token used to authenticate against the planner service. | ||
| timeout: Default timeout in seconds for HTTP requests. | ||
| """ | ||
|
|
||
| base_url: AnyHttpUrl | ||
| token: str | ||
| timeout: int = 5 * 60 | ||
|
|
||
|
|
||
| class PlannerApiError(Exception): | ||
| """Represents an error while interacting with the planner service.""" | ||
|
|
||
|
|
||
| class PlannerClient: # pylint: disable=too-few-public-methods | ||
| """An HTTP client for the planner service.""" | ||
|
|
||
| def __init__(self, config: PlannerConfiguration) -> None: | ||
| """Initialize client with planner configuration. | ||
|
|
||
| Args: | ||
| config: Planner service configuration containing base URL, | ||
| authentication token, and default request timeout. | ||
| """ | ||
| self._session = self._create_session() | ||
| self._config = config | ||
|
|
||
| def stream_pressure(self, name: str) -> Iterable[PressureInfo]: | ||
| """Stream pressure updates for the given flavor. | ||
|
|
||
| Args: | ||
| name: Flavor name. | ||
|
|
||
| Yields: | ||
| Parsed pressure updates. | ||
|
|
||
| Raises: | ||
| PlannerApiError: On HTTP or stream errors. | ||
| """ | ||
| url = urljoin(str(self._config.base_url), f"/api/v1/flavors/{name}/pressure?stream=true") | ||
| try: | ||
| with self._session.get( | ||
| url, | ||
| headers={"Authorization": f"Bearer {self._config.token}"}, | ||
| timeout=self._config.timeout, | ||
| stream=True, | ||
| ) as response: | ||
| response.raise_for_status() | ||
| for line in response.iter_lines(decode_unicode=True): | ||
| if not line: | ||
| continue | ||
| try: | ||
| data = json.loads(line) | ||
| if not isinstance(data, dict) or name not in data: | ||
| logger.debug("Skipping non-pressure stream line: %s", line) | ||
| continue | ||
|
||
| yield PressureInfo(pressure=int(data[name])) | ||
| except json.JSONDecodeError: | ||
| logger.warning("Skipping malformed stream line: %s", line) | ||
| continue | ||
| except requests.RequestException as exc: | ||
| logger.exception("Error while streaming pressure for flavor '%s' from planner.", name) | ||
| raise PlannerApiError from exc | ||
|
|
||
| @staticmethod | ||
| def _create_session() -> requests.Session: | ||
| """Create a requests session with retries on transient server errors. | ||
|
|
||
| Returns: | ||
| A configured `requests.Session` instance. | ||
| """ | ||
| adapter = requests.adapters.HTTPAdapter( | ||
| max_retries=urllib3.Retry( | ||
| total=3, | ||
| backoff_factor=0.3, | ||
| status_forcelist=[500, 502, 503, 504], | ||
| ) | ||
| ) | ||
|
|
||
| session = requests.Session() | ||
| session.mount("http://", adapter) | ||
| session.mount("https://", adapter) | ||
| return session | ||
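A minimal sketch of the per-line handling in `stream_pressure`, without the HTTP layer, assuming each stream line is a JSON object keyed by flavor name (as the `data[name]` lookup in the diff suggests). The helpers `parse_pressure_line` and `iter_pressures` and the flavor name `"small"` are hypothetical. The guard around `int(...)` is an addition of this sketch, not something shown in the diff.

```python
import json
from typing import Iterable, Iterator, Optional


def parse_pressure_line(line: str, name: str) -> Optional[int]:
    """Return the pressure for `name` from one stream line, or None to skip it."""
    if not line:
        return None  # keep-alive / empty line
    try:
        data = json.loads(line)
    except json.JSONDecodeError:
        return None  # malformed JSON
    if not isinstance(data, dict) or name not in data:
        return None  # valid JSON, but not a pressure update for this flavor
    try:
        return int(data[name])
    except (TypeError, ValueError):
        # Sketch-only guard: a non-numeric value is treated as a skipped line.
        return None


def iter_pressures(lines: Iterable[str], name: str) -> Iterator[int]:
    """Yield parsed pressures, silently skipping lines that do not apply."""
    for line in lines:
        pressure = parse_pressure_line(line, name)
        if pressure is not None:
            yield pressure


if __name__ == "__main__":
    sample = ["", '{"small": 3}', "not json", '{"other": 1}', '{"small": "x"}']
    print(list(iter_pressures(sample, "small")))
```

Keeping the parsing separate from the request loop makes it possible to unit test the skip rules without mocking `requests`.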
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| # Copyright 2026 Canonical Ltd. | ||
| # See LICENSE file for licensing details. |