Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion github-runner-manager/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

[project]
name = "github-runner-manager"
version = "0.12.0"
version = "0.13.0"
authors = [
{ name = "Canonical IS DevOps", email = "is-devops-team@canonical.com" },
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class ApplicationConfiguration(BaseModel):
non_reactive_configuration: Configuration for non-reactive mode.
reactive_configuration: Configuration for reactive mode.
openstack_configuration: Configuration for authorization to a OpenStack host.
planner_url: Base URL of the planner service.
planner_token: Bearer token to authenticate against the planner service.
reconcile_interval: Seconds to wait between reconciliation.
"""

Expand All @@ -61,6 +63,8 @@ class ApplicationConfiguration(BaseModel):
non_reactive_configuration: "NonReactiveConfiguration"
reactive_configuration: "ReactiveConfiguration | None"
openstack_configuration: OpenStackConfiguration
planner_url: Optional[AnyHttpUrl] = None
planner_token: Optional[str] = None
reconcile_interval: int

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
# Copyright 2026 Canonical Ltd.
# See LICENSE file for licensing details.

"""Planner-driven pressure reconciler.

Creates or deletes runners based on pressure signals from the planner
service. Runs in two independent loops (create/delete) and coordinates
access to the underlying RunnerManager via the provided lock.
"""

from __future__ import annotations

import logging
import time
from dataclasses import dataclass
from threading import Event, Lock
from typing import Optional

from github_runner_manager.errors import MissingServerConfigError
from github_runner_manager.manager.runner_manager import RunnerManager, RunnerMetadata
from github_runner_manager.planner_client import PlannerApiError, PlannerClient

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class PressureReconcilerConfig:
"""Configuration for pressure reconciliation.

Attributes:
flavor_name: Name of the planner flavor to reconcile.
reconcile_interval: Seconds between timer-based delete reconciliations.
min_pressure: Minimum desired runner count (floor) for the flavor.
Also used as fallback when the planner is unavailable.
"""

flavor_name: str
reconcile_interval: int = 5 * 60
min_pressure: int = 0


class PressureReconciler: # pylint: disable=too-few-public-methods
"""Continuously reconciles runner count against planner pressure.

This reconciler keeps the total number of runners near the desired level
indicated by the planner's pressure for a given flavor. It operates in two
threads:
- create loop: scales up when desired exceeds current total
- delete loop: scales down when current exceeds desired

Concurrency with any other reconcile loop is protected by a shared lock.

The delete loop uses the last pressure seen by the create loop rather than
fetching a fresh value, so it may act on a stale reading if pressure changed
between stream events. This is an accepted trade-off: the window is bounded
by the stream update frequency, and any over-deletion is self-correcting
because the create loop will scale back up on the next pressure event.

Attributes:
_manager: Runner manager used to list, create, and clean up runners.
_planner: Client used to stream pressure updates.
_config: Reconciler configuration.
_lock: Shared lock to serialize operations with other reconcile loops.
_stop: Event used to signal streaming loops to stop gracefully.
_last_pressure: Last pressure value seen in the create stream.
"""

def __init__(
self,
manager: RunnerManager,
planner_client: PlannerClient,
config: PressureReconcilerConfig,
lock: Lock,
) -> None:
"""Initialize reconciler state and dependencies.

Args:
manager: Runner manager interface for creating, cleaning up,
and listing runners.
planner_client: Client used to stream pressure updates.
config: Reconciler configuration.
lock: Shared lock to serialize operations with other reconcile loops.
"""
self._manager = manager
self._planner = planner_client
self._config = config
self._lock = lock

self._stop = Event()
self._last_pressure: Optional[int] = None

def start_create_loop(self) -> None:
"""Continuously create runners to satisfy planner pressure."""
while not self._stop.is_set():
try:
for update in self._planner.stream_pressure(self._config.flavor_name):
if self._stop.is_set():
return
self._handle_create_runners(update.pressure)
except PlannerApiError:
fallback = max(self._last_pressure or 0, self._config.min_pressure)
logger.exception(
"Error in pressure stream loop, falling back to %s runners.",
fallback,
)
self._handle_create_runners(fallback)
time.sleep(5)

def start_delete_loop(self) -> None:
"""Continuously delete runners using last seen pressure on a timer."""
logger.debug("Delete loop: starting, interval=%ss", self._config.reconcile_interval)
while not self._stop.wait(self._config.reconcile_interval):
logger.debug("Delete loop: woke up, _last_pressure=%s", self._last_pressure)
if self._last_pressure is None:
logger.debug("Delete loop: no pressure seen yet, skipping.")
continue
self._handle_timer_reconcile(self._last_pressure)

def stop(self) -> None:
"""Signal the reconciler loops to stop gracefully."""
self._stop.set()

def _handle_create_runners(self, pressure: int) -> None:
"""Create runners when desired exceeds current total.

Args:
pressure: Current pressure value used to compute desired total.
"""
desired_total = self._desired_total_from_pressure(pressure)
logger.debug(
"Create loop: pressure=%s, desired=%s, updating _last_pressure",
pressure,
desired_total,
)
self._last_pressure = pressure
with self._lock:
current_total = len(self._manager.get_runners())
to_create = max(desired_total - current_total, 0)
if to_create <= 0:
logger.info(
"Create loop: nothing to do (desired=%s current=%s)",
desired_total,
current_total,
)
return
logger.info(
"Create loop: creating %s runners (desired=%s current=%s)",
to_create,
desired_total,
current_total,
)
try:
self._manager.create_runners(num=to_create, metadata=RunnerMetadata())
except MissingServerConfigError:
logger.exception(
"Unable to create runners due to missing server configuration (image/flavor)."
)

def _handle_timer_reconcile(self, pressure: int) -> None:
"""Clean up stale runners, then converge toward the desired count.

Scales down (deletes) when current exceeds desired, and scales up
(creates) when current falls below desired after cleanup.

Args:
pressure: Current pressure value used to compute desired total.
"""
desired_total = self._desired_total_from_pressure(pressure)
with self._lock:
self._manager.cleanup()
current_total = len(self._manager.get_runners())
if current_total > desired_total:
to_delete = current_total - desired_total
logger.info(
"Timer: scaling down %s runners (desired=%s current=%s)",
to_delete,
desired_total,
current_total,
)
self._manager.delete_runners(num=to_delete)
elif current_total < desired_total:
to_create = desired_total - current_total
logger.info(
"Timer: scaling up %s runners (desired=%s current=%s)",
to_create,
desired_total,
current_total,
)
try:
self._manager.create_runners(num=to_create, metadata=RunnerMetadata())
except MissingServerConfigError:
logger.exception(
"Unable to create runners due to missing server configuration"
" (image/flavor)."
)
else:
logger.info(
"Timer: no changes needed (desired=%s current=%s)",
desired_total,
current_total,
)

def _desired_total_from_pressure(self, pressure: int) -> int:
"""Compute desired runner total from planner pressure.

Ensures non-negative totals and respects the configured `min_pressure`
floor.

Args:
pressure: Current pressure value from planner.

Returns:
The desired total number of runners.
"""
return max(pressure, self._config.min_pressure, 0)
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,15 @@ def get_runners(self) -> tuple[RunnerInstance, ...]:
)

def delete_runners(self, num: int) -> IssuedMetricEventsStats:
"""Delete runners.
"""Delete up to `num` runners, preferring idle ones over busy.

Runners are selected in order: deletable → idle → busy. Busy runners
are only targeted when there are not enough idle runners to satisfy
`num`, and the GitHub API will reject deletion of runners actively
executing a job, so the actual number deleted may be less than `num`.

Args:
num: The number of runner to delete.
num: The maximum number of runners to delete.

Returns:
Stats on metrics events issued during the deletion of runners.
Expand Down
117 changes: 117 additions & 0 deletions github-runner-manager/src/github_runner_manager/planner_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Copyright 2026 Canonical Ltd.
# See LICENSE file for licensing details.

"""Client to interact with the planner service."""

import json
import logging
from dataclasses import dataclass
from typing import Iterable
from urllib.parse import urljoin

import requests
import requests.adapters
import urllib3
from pydantic import AnyHttpUrl, BaseModel

logger = logging.getLogger(__name__)


@dataclass
class PressureInfo:
"""Pressure information for a flavor returned by the planner service.

Attributes:
pressure: Desired total runner count for the flavor.
"""

pressure: int


class PlannerConfiguration(BaseModel):
"""Configuration inputs for the PlannerClient.

Attributes:
base_url: Base URL of the planner service.
token: Bearer token used to authenticate against the planner service.
timeout: Default timeout in seconds for HTTP requests.
"""

base_url: AnyHttpUrl
token: str
timeout: int = 5 * 60


class PlannerApiError(Exception):
"""Represents an error while interacting with the planner service."""


class PlannerClient: # pylint: disable=too-few-public-methods
"""An HTTP client for the planner service."""

def __init__(self, config: PlannerConfiguration) -> None:
"""Initialize client with planner configuration.

Args:
config: Planner service configuration containing base URL,
authentication token, and default request timeout.
"""
self._session = self._create_session()
self._config = config

def stream_pressure(self, name: str) -> Iterable[PressureInfo]:
"""Stream pressure updates for the given flavor.

Args:
name: Flavor name.

Yields:
Parsed pressure updates.

Raises:
PlannerApiError: On HTTP or stream errors.
"""
url = urljoin(str(self._config.base_url), f"/api/v1/flavors/{name}/pressure?stream=true")
try:
with self._session.get(
url,
headers={"Authorization": f"Bearer {self._config.token}"},
timeout=self._config.timeout,
stream=True,
) as response:
response.raise_for_status()
for line in response.iter_lines(decode_unicode=True):
if not line:
continue
try:
data = json.loads(line)
if not isinstance(data, dict) or name not in data:
logger.debug("Skipping non-pressure stream line: %s", line)
continue
yield PressureInfo(pressure=int(data[name]))
except json.JSONDecodeError:
logger.warning("Skipping malformed stream line: %s", line)
continue
except requests.RequestException as exc:
logger.exception("Error while streaming pressure for flavor '%s' from planner.", name)
raise PlannerApiError from exc

@staticmethod
def _create_session() -> requests.Session:
"""Create a requests session with retries and no env proxies.

Returns:
A configured `requests.Session` instance.
"""
adapter = requests.adapters.HTTPAdapter(
max_retries=urllib3.Retry(
total=3,
backoff_factor=0.3,
status_forcelist=[500, 502, 503, 504],
)
)

session = requests.Session()
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
2 changes: 2 additions & 0 deletions github-runner-manager/tests/unit/manager/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright 2026 Canonical Ltd.
# See LICENSE file for licensing details.
Loading
Loading