diff --git a/.github/workflows/integration_test.yaml b/.github/workflows/integration_test.yaml index 3787ed4196..836f6ba4dd 100644 --- a/.github/workflows/integration_test.yaml +++ b/.github/workflows/integration_test.yaml @@ -20,7 +20,7 @@ jobs: juju-channel: 3.6/stable provider: lxd test-tox-env: integration-juju3.6 - modules: '["test_multi_unit_same_machine", "test_charm_metrics_failure", "test_charm_metrics_success", "test_charm_fork_path_change", "test_charm_no_runner", "test_charm_upgrade", "test_reactive"]' + modules: '["test_multi_unit_same_machine", "test_charm_fork_path_change", "test_charm_no_runner", "test_charm_upgrade"]' # INTEGRATION_TOKEN, INTEGRATION_TOKEN_ALT, OS_* are passed through INTEGRATION_TEST_SECRET_ENV_VALUE_ # mapping. See CONTRIBUTING.md for more details. extra-arguments: | diff --git a/.github/workflows/test_github_runner_manager.yaml b/.github/workflows/test_github_runner_manager.yaml index bb8598f1d7..cdbecc563f 100644 --- a/.github/workflows/test_github_runner_manager.yaml +++ b/.github/workflows/test_github_runner_manager.yaml @@ -20,6 +20,7 @@ jobs: matrix: test-module: - test_debug_ssh + - test_metrics - test_planner_runner steps: - name: Checkout code diff --git a/github-runner-manager/src/github_runner_manager/manager/runner_manager.py b/github-runner-manager/src/github_runner_manager/manager/runner_manager.py index cc6e50d06b..dd32c02a6b 100644 --- a/github-runner-manager/src/github_runner_manager/manager/runner_manager.py +++ b/github-runner-manager/src/github_runner_manager/manager/runner_manager.py @@ -331,6 +331,15 @@ def _delete_runners_core( platform_runner_ids_to_delete = list( platform_runner_ids_to_cleanup | platform_runner_ids_to_scaledown ) + # Extract metrics BEFORE deleting runners from GitHub. Removing a runner + # causes the runner process to exit, which triggers VM shutdown — after + # that SSH is no longer available for metric extraction. 
+ vm_ids_to_cleanup = list( + _get_vms_to_cleanup(vms=vms, runner_ids=platform_runner_ids_to_delete) + ) + logger.info("Extracting metrics from VMs: %s", vm_ids_to_cleanup) + extracted_metrics = list(self._cloud.extract_metrics(instance_ids=vm_ids_to_cleanup)) + logger.info("Deleting platform runners: %s", platform_runner_ids_to_delete) deleted_runner_ids = self._delete_runners(runner_ids=platform_runner_ids_to_delete) logger.info("Deleted runners: %s", deleted_runner_ids) @@ -346,11 +355,9 @@ def _delete_runners_core( runner_ids=runner_ids_for_vm_cleanup, ) ) - logger.info("Extracting metrics from VMs: %s", vm_ids_to_cleanup) - extracted_metrics = list(self._cloud.extract_metrics(instance_ids=vm_ids_to_cleanup)) logger.info("Deleting VMs: %s", vm_ids_to_cleanup) deleted_vms = self._delete_vms(vm_ids=vm_ids_to_cleanup) - logger.info("deleted VMs: %s", deleted_vms) + logger.info("Deleted VMs: %s", deleted_vms) return deleted_vms, extracted_metrics @@ -382,6 +389,15 @@ def flush_runners( platform_runner_ids_to_delete = list( platform_runner_ids_to_cleanup | platform_runner_ids_to_flush ) + # Extract metrics BEFORE deleting runners from GitHub. Removing a runner + # causes the runner process to exit, which triggers VM shutdown — after + # that SSH is no longer available for metric extraction. 
+ all_candidate_vm_ids = list( + _get_vms_to_cleanup(vms=vms, runner_ids=platform_runner_ids_to_delete) + ) + logger.info("Extracting metrics from VMs: %s", all_candidate_vm_ids) + extracted_metrics = list(self._cloud.extract_metrics(instance_ids=all_candidate_vm_ids)) + logger.info("Deleting platform runners: %s", platform_runner_ids_to_delete) deleted_runner_ids = self._delete_runners(runner_ids=platform_runner_ids_to_delete) logger.info("Deleted runners: %s", deleted_runner_ids) @@ -400,8 +416,6 @@ def flush_runners( ), ) ) - logger.info("Extracting metrics from VMs: %s", vm_ids_to_cleanup) - extracted_metrics = self._cloud.extract_metrics(instance_ids=vm_ids_to_cleanup) logger.info("Deleting VMs: %s", vm_ids_to_cleanup) deleted_vms = self._delete_vms(vm_ids=vm_ids_to_cleanup) logger.info("Deleted VMs: %s", deleted_vms) diff --git a/github-runner-manager/src/github_runner_manager/metrics/events.py b/github-runner-manager/src/github_runner_manager/metrics/events.py index 17a9cf5f3b..48b295b616 100644 --- a/github-runner-manager/src/github_runner_manager/metrics/events.py +++ b/github-runner-manager/src/github_runner_manager/metrics/events.py @@ -13,8 +13,7 @@ from github_runner_manager.errors import IssueMetricEventError from github_runner_manager.manager.vm_manager import CodeInformation -METRICS_LOG_PATH = Path(os.getenv("METRICS_LOG_PATH", "/var/log/github-runner-metrics.log")) - +_DEFAULT_METRICS_LOG_PATH = "/var/log/github-runner-metrics.log" logger = logging.getLogger(__name__) @@ -156,8 +155,18 @@ def issue_event(event: Event) -> None: Raises: IssueMetricEventError: If the event cannot be logged. 
""" + metrics_log_path = get_metrics_log_path() try: - with METRICS_LOG_PATH.open(mode="a", encoding="utf-8") as metrics_file: + with metrics_log_path.open(mode="a", encoding="utf-8") as metrics_file: metrics_file.write(f"{event.json(exclude_none=True)}\n") except OSError as exc: - raise IssueMetricEventError(f"Cannot write to {METRICS_LOG_PATH}") from exc + raise IssueMetricEventError(f"Cannot write to {metrics_log_path}") from exc + + +def get_metrics_log_path() -> Path: + """Get the metrics log path, reading the env var at call time rather than import time. + + Returns: + The metrics log file path. + """ + return Path(os.getenv("METRICS_LOG_PATH", _DEFAULT_METRICS_LOG_PATH)) diff --git a/github-runner-manager/tests/integration/conftest.py b/github-runner-manager/tests/integration/conftest.py index ffb8ca643e..541e2dd0e8 100644 --- a/github-runner-manager/tests/integration/conftest.py +++ b/github-runner-manager/tests/integration/conftest.py @@ -4,7 +4,6 @@ """Fixtures for github-runner-manager integration tests.""" import logging -import subprocess import time from pathlib import Path from typing import Generator @@ -15,7 +14,6 @@ from github.Auth import Token from github.Branch import Branch from github.Repository import Repository -from openstack.compute.v2.server import Server as OpenstackServer from .factories import GitHubConfig, OpenStackConfig, ProxyConfig, TestConfig from .planner_stub import PlannerStub, PlannerStubConfig @@ -23,80 +21,6 @@ logger = logging.getLogger(__name__) -def wait_for_runner( - openstack_connection: openstack.connection.Connection, - test_config: TestConfig, - timeout: int = 300, - interval: int = 5, -) -> tuple[OpenstackServer, str] | tuple[None, None]: - """Wait for an OpenStack runner to be created and return it with its IP. - - Args: - openstack_connection: OpenStack connection object. - test_config: Test configuration with VM prefix. - timeout: Maximum time to wait in seconds. - interval: Time between checks in seconds. 
- - Returns: - Tuple of (runner, ip) if found, or (None, None) if not found within timeout. - """ - start_time = time.time() - while time.time() - start_time < timeout: - servers = [ - server - for server in openstack_connection.list_servers() - if server.name.startswith(test_config.vm_prefix) - ] - if servers: - runner = servers[0] - logger.info("Found runner: %s", runner.name) - - ip = None - for network_addresses in runner.addresses.values(): - for address in network_addresses: - ip = address["addr"] - break - if ip: - break - - if ip: - return runner, ip - - time.sleep(interval) - - return None, None - - -def wait_for_no_runners( - openstack_connection: openstack.connection.Connection, - test_config: TestConfig, - timeout: int = 900, - interval: int = 15, -) -> bool: - """Wait until no VMs with the test prefix exist on OpenStack. - - Args: - openstack_connection: OpenStack connection object. - test_config: Test configuration with VM prefix. - timeout: Maximum time to wait in seconds. - interval: Time between checks in seconds. - - Returns: - True when no matching VMs exist; False if timeout is reached first. - """ - start = time.time() - while time.time() - start < timeout: - servers = [ - s - for s in openstack_connection.list_servers() - if s.name.startswith(test_config.vm_prefix) - ] - if not servers: - return True - time.sleep(interval) - return False - - @pytest.fixture(scope="module") def test_config(pytestconfig: pytest.Config) -> TestConfig: """Create a unique test configuration for parallel test execution. @@ -345,7 +269,7 @@ def github_repository(github_config: GitHubConfig) -> Repository: def github_branch( github_repository: Repository, test_config: TestConfig ) -> Generator[Branch, None, None]: - """Create a new branch for testing, from latest commit in current branch. + """Create a new branch for testing, from the repository's default branch. Args: github_repository: GitHub repository object. 
@@ -355,16 +279,9 @@ def github_branch( """ test_branch = f"test-{test_config.test_id}" - sha_result = subprocess.run( - ["/usr/bin/git", "rev-parse", "HEAD"], - capture_output=True, - text=True, - check=True, - ) - current_commit_sha = sha_result.stdout.strip() - + default_branch = github_repository.get_branch(github_repository.default_branch) branch_ref = github_repository.create_git_ref( - ref=f"refs/heads/{test_branch}", sha=current_commit_sha + ref=f"refs/heads/{test_branch}", sha=default_branch.commit.sha ) # Wait for branch to be available, GitHub is eventually consistent @@ -376,7 +293,11 @@ def github_branch( while time.time() - start_time < timeout: try: branch = github_repository.get_branch(test_branch) - logger.info("Created test branch: %s at SHA: %s", test_branch, current_commit_sha) + logger.info( + "Created test branch: %s at SHA: %s", + test_branch, + default_branch.commit.sha, + ) break except Exception as e: elapsed = time.time() - start_time diff --git a/github-runner-manager/tests/integration/factories.py b/github-runner-manager/tests/integration/factories.py index a7fe8db629..07228bcddf 100644 --- a/github-runner-manager/tests/integration/factories.py +++ b/github-runner-manager/tests/integration/factories.py @@ -141,6 +141,8 @@ def create_default_config( test_config: TestConfig | None = None, planner_url: str | None = None, planner_token: str | None = None, + reconcile_interval: int = 60, + base_virtual_machines: int = 1, ) -> dict[str, Any]: """Create a default test configuration dictionary. @@ -154,6 +156,8 @@ def create_default_config( Defaults to new unique values. planner_url: Planner service URL. Omitted from config when not provided. planner_token: Planner service token. Omitted from config when not provided. + reconcile_interval: Minutes between delete-loop reconciliation ticks. + base_virtual_machines: Floor for non-reactive desired runners. Returns: Configuration dictionary for the application. 
@@ -235,7 +239,7 @@ def create_default_config( "labels": ["noble", "x64"], }, "flavor": {"name": openstack_config.flavor or "small", "labels": ["small"]}, - "base_virtual_machines": 1, + "base_virtual_machines": base_virtual_machines, "max_total_virtual_machines": 0, } ] @@ -256,5 +260,5 @@ def create_default_config( }, **({"planner_url": planner_url} if planner_url else {}), **({"planner_token": planner_token} if planner_token else {}), - "reconcile_interval": 60, + "reconcile_interval": reconcile_interval, } diff --git a/github-runner-manager/tests/integration/metrics_helpers.py b/github-runner-manager/tests/integration/metrics_helpers.py new file mode 100644 index 0000000000..fcb956baf1 --- /dev/null +++ b/github-runner-manager/tests/integration/metrics_helpers.py @@ -0,0 +1,123 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Helpers for app-level integration metrics assertions.""" + +import json +import time +from pathlib import Path +from typing import Any + +from github.Repository import Repository + +from github_runner_manager.manager.vm_manager import PostJobStatus +from github_runner_manager.types_.github import JobConclusion + +TEST_WORKFLOW_NAMES = [ + "Workflow Dispatch Tests", + "Workflow Dispatch Crash Tests", + "Workflow Dispatch Failure Tests 2a34f8b1-41e4-4bcb-9bbf-7a74e6c482f7", +] + + +def clear_metrics_log(metrics_log_path: Path) -> None: + """Delete metrics log file to reset test state.""" + metrics_log_path.unlink(missing_ok=True) + + +def get_metrics_events(metrics_log_path: Path) -> list[dict[str, Any]]: + """Return metrics events from the log file.""" + if not metrics_log_path.exists(): + return [] + lines = metrics_log_path.read_text(encoding="utf-8").splitlines() + return [json.loads(line) for line in lines if line.strip()] + + +def wait_for_events( + metrics_log_path: Path, + expected_events: set[str], + timeout: int = 10 * 60, + interval: int = 10, +) -> list[dict[str, Any]]: + """Wait until all 
expected event names are present in the metrics log.""" + deadline = time.time() + timeout + while time.time() < deadline: + events = get_metrics_events(metrics_log_path) + emitted = {event.get("event") for event in events} + if expected_events <= emitted: + return events + time.sleep(interval) + raise TimeoutError(f"Timed out waiting for metrics events: {sorted(expected_events)}") + + +def assert_events_after_reconciliation( + events: list[dict[str, Any]], + flavor: str, + github_repository: Repository, + post_job_status: PostJobStatus, +) -> None: + """Assert runner-start/stop/reconciliation metrics for a completed test flow.""" + emitted = {event.get("event") for event in events} + assert { + "runner_start", + "runner_stop", + "reconciliation", + } <= emitted, "Not all metrics events were logged" + + for metric in events: + if metric.get("event") == "runner_start": + assert metric.get("flavor") == flavor + assert metric.get("workflow") in TEST_WORKFLOW_NAMES + assert metric.get("repo") == github_repository.full_name + assert metric.get("github_event") == "workflow_dispatch" + _assert_non_negative_number(metric, "idle") + _assert_non_negative_number(metric, "queue_duration") + + if metric.get("event") == "runner_stop": + assert metric.get("flavor") == flavor + assert metric.get("workflow") in TEST_WORKFLOW_NAMES + assert metric.get("repo") == github_repository.full_name + assert metric.get("github_event") == "workflow_dispatch" + assert metric.get("status") == post_job_status + if post_job_status == PostJobStatus.ABNORMAL: + assert metric.get("status_info", {}).get("code", 0) != 0 + assert metric.get("job_conclusion") in [None, JobConclusion.CANCELLED] + else: + assert "status_info" not in metric + assert metric.get("job_conclusion") == JobConclusion.SUCCESS + _assert_non_negative_number(metric, "job_duration") + + if metric.get("event") == "reconciliation": + assert metric.get("flavor") == flavor + _assert_non_negative_number(metric, "duration") + assert 
metric.get("crashed_runners") == 0 + _assert_non_negative_number(metric, "idle_runners") + _assert_non_negative_number(metric, "active_runners") + _assert_non_negative_number(metric, "expected_runners") + + +def wait_for_runner_to_be_marked_offline( + github_repository: Repository, + runner_name: str, + timeout: int = 30 * 60, + interval: int = 60, +) -> None: + """Wait for a runner to become offline or disappear from GitHub.""" + deadline = time.time() + timeout + while time.time() < deadline: + for runner in github_repository.get_self_hosted_runners(): + if runner.name == runner_name: + if runner.status == "online": + time.sleep(interval) + break + else: + return + raise TimeoutError(f"Timeout while waiting for runner {runner_name} to be marked offline") + + +def _assert_non_negative_number(metric: dict[str, Any], key: str) -> None: + """Assert event key exists and contains a non-negative numeric value.""" + assert key in metric, f"Missing metric field: {key}" + value = metric[key] + assert isinstance(value, (int, float)), f"Metric field {key} is not numeric: {value!r}" + assert value >= 0, f"Metric field {key} is negative: {value!r}" diff --git a/github-runner-manager/tests/integration/openstack_helpers.py b/github-runner-manager/tests/integration/openstack_helpers.py new file mode 100644 index 0000000000..6643813e93 --- /dev/null +++ b/github-runner-manager/tests/integration/openstack_helpers.py @@ -0,0 +1,121 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""OpenStack helper functions shared by integration tests.""" + +import logging +import socket +import time +from pathlib import Path + +import openstack +from github.Repository import Repository +from openstack.compute.v2.server import Server as OpenstackServer + +from .factories import TestConfig + +logger = logging.getLogger(__name__) + + +def wait_for_runner( + openstack_connection: openstack.connection.Connection, + test_config: TestConfig, + timeout: int = 300, + interval: int = 5, +) -> tuple[OpenstackServer, str] | tuple[None, None]: + """Wait for an OpenStack runner to be created and return it with its IP.""" + start_time = time.time() + while time.time() - start_time < timeout: + servers = [ + server + for server in openstack_connection.list_servers() + if server.name.startswith(test_config.vm_prefix) + ] + if servers: + runner = servers[0] + logger.info("Found runner: %s", runner.name) + + ip = None + for network_addresses in runner.addresses.values(): + for address in network_addresses: + ip = address["addr"] + break + if ip: + break + + if ip: + return runner, ip + + time.sleep(interval) + + return None, None + + +def wait_for_no_runners( + openstack_connection: openstack.connection.Connection, + test_config: TestConfig, + timeout: int = 900, + interval: int = 15, +) -> bool: + """Wait until no VMs with the test prefix exist on OpenStack.""" + start = time.time() + while time.time() - start < timeout: + servers = [ + server + for server in openstack_connection.list_servers() + if server.name.startswith(test_config.vm_prefix) + ] + if not servers: + return True + time.sleep(interval) + return False + + +def wait_for_ssh( + runner_ip: str, + port: int = 22, + timeout: int = 120, + interval: int = 2, + connect_timeout: int = 5, +) -> bool: + """Wait for SSH port to become available on the runner.""" + start_time = time.time() + while time.time() - start_time < timeout: + try: + with socket.create_connection((runner_ip, port), timeout=connect_timeout): 
+ logger.info("SSH port %d is now available on %s", port, runner_ip) + return True + except (socket.timeout, socket.error, OSError): + time.sleep(interval) + + logger.error("SSH port %d never became available on %s", port, runner_ip) + return False + + +def wait_for_runner_online( + github_repository: Repository, + runner_name: str, + timeout: int = 10 * 60, + interval: int = 30, +) -> None: + """Wait for a runner to register as online on GitHub. + + The runner VM may exist in OpenStack but cloud-init and runner registration + take additional time. This ensures the runner is fully initialized before + the test proceeds. + """ + deadline = time.time() + timeout + while time.time() < deadline: + for runner in github_repository.get_self_hosted_runners(): + if runner.name == runner_name and runner.status == "online": + logger.info("Runner %s is online on GitHub", runner_name) + return + time.sleep(interval) + raise TimeoutError(f"Timeout waiting for runner {runner_name} to register online on GitHub") + + +def resolve_runner_ssh_key_path( + runner: OpenstackServer, +) -> Path: + """Resolve the local SSH private key path for an OpenStack runner.""" + return Path.home() / ".ssh" / f"{runner.name}.key" diff --git a/github-runner-manager/tests/integration/planner_stub.py b/github-runner-manager/tests/integration/planner_stub.py index 013a0c9db9..ccccd6f712 100644 --- a/github-runner-manager/tests/integration/planner_stub.py +++ b/github-runner-manager/tests/integration/planner_stub.py @@ -13,6 +13,7 @@ import json import logging +import socket import threading import time from dataclasses import dataclass @@ -30,7 +31,6 @@ class PlannerStubConfig: """Configuration for the planner stub server.""" host: str = "127.0.0.1" - port: int = 8081 token: str = "stub-token" flavor_name: str = "small" initial_pressure: int = 1 @@ -94,13 +94,13 @@ def __init__(self, config: PlannerStubConfig | None = None) -> None: """Initialize the planner stub manager. 
Args: - config: Optional configuration for host, port, token, flavor name, - and initial pressure. If not provided, defaults from - `PlannerStubConfig` are used. + config: Optional configuration for host, token, flavor name, and + initial pressure. If not provided, defaults from + ``PlannerStubConfig`` are used. """ self._config = config or PlannerStubConfig() self._thread: threading.Thread | None = None - self._port = self._config.port + self._port = 0 @property def base_url(self) -> str: @@ -114,6 +114,10 @@ def token(self) -> str: def start(self) -> None: """Start the planner stub server in a daemon thread.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind((self._config.host, 0)) + self._port = int(sock.getsockname()[1]) + state = {"pressure": self._config.initial_pressure} app = _make_app(self._config, state) self._thread = threading.Thread( diff --git a/github-runner-manager/tests/integration/test_debug_ssh.py b/github-runner-manager/tests/integration/test_debug_ssh.py index 5851d8cfd5..07fcea3e37 100644 --- a/github-runner-manager/tests/integration/test_debug_ssh.py +++ b/github-runner-manager/tests/integration/test_debug_ssh.py @@ -4,9 +4,7 @@ """Integration tests for tmate ssh connection.""" import logging -import socket import subprocess -import time from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path @@ -24,7 +22,6 @@ from openstack.compute.v2.server import Server as OpenstackServer from .application import RunningApplication -from .conftest import wait_for_runner from .factories import ( GitHubConfig, OpenStackConfig, @@ -39,6 +36,7 @@ get_workflow_dispatch_run, wait_for_workflow_completion, ) +from .openstack_helpers import resolve_runner_ssh_key_path, wait_for_runner, wait_for_ssh from .planner_stub import PlannerStub logger = logging.getLogger(__name__) @@ -79,31 +77,6 @@ class TmateServer: ed25519_fingerprint: str -def wait_for_ssh(runner_ip: str, port: int = 22, timeout: 
int = 120, interval: int = 2) -> bool: - """Wait for SSH port to become available on the runner. - - Args: - runner_ip: IP address of the runner. - port: SSH port to check (default: 22). - timeout: Maximum time to wait in seconds. - interval: Time between connection attempts in seconds. - - Returns: - True if SSH is available, False if timeout reached. - """ - start_time = time.time() - while time.time() - start_time < timeout: - try: - with socket.create_connection((runner_ip, port), timeout=5): - logger.info("SSH port %d is now available on %s", port, runner_ip) - return True - except (socket.timeout, socket.error, OSError): - time.sleep(interval) - - logger.error("SSH port %d never became available on %s", port, runner_ip) - return False - - def setup_reverse_ssh_tunnel( runner: OpenstackServer, runner_ip: str, @@ -119,8 +92,7 @@ def setup_reverse_ssh_tunnel( Returns: True if tunnel and DNAT were successfully established, False otherwise. """ - key_name = runner.name - key_path = Path.home() / ".ssh" / f"{key_name}.key" + key_path = resolve_runner_ssh_key_path(runner) logger.info("Waiting for SSH on runner %s at %s...", runner.name, runner_ip) if not wait_for_ssh(runner_ip): @@ -434,9 +406,9 @@ def test_tmate_ssh_connection( ): """Test that a tmate SSH connection can be established via the runner manager. - Arrange: Application configured with tmate SSH server connection details. - Act: Dispatch workflow that connects to the tmate SSH server. - Assert: Workflow completes successfully and logs contain server connection details. + arrange: application is configured with tmate SSH server connection details. + act: dispatch workflow that connects to the tmate SSH server. + assert: workflow completes successfully and logs contain server connection details. Args: test_config: Test-specific configuration for unique identification. 
diff --git a/github-runner-manager/tests/integration/test_metrics.py b/github-runner-manager/tests/integration/test_metrics.py new file mode 100644 index 0000000000..edbd2864e3 --- /dev/null +++ b/github-runner-manager/tests/integration/test_metrics.py @@ -0,0 +1,303 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Application-level integration tests for runner metrics events.""" + +import subprocess +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterator + +import openstack +import pytest +import yaml +from github.Branch import Branch +from github.Repository import Repository +from github.WorkflowRun import WorkflowRun +from openstack.compute.v2.server import Server as OpenstackServer + +from github_runner_manager.manager.vm_manager import PostJobStatus + +from .application import RunningApplication +from .factories import ( + GitHubConfig, + OpenStackConfig, + ProxyConfig, + TestConfig, + create_default_config, +) +from .github_helpers import ( + dispatch_workflow, + get_workflow_dispatch_run, + wait_for_workflow_completion, +) +from .metrics_helpers import ( + assert_events_after_reconciliation, + wait_for_events, + wait_for_runner_to_be_marked_offline, +) +from .openstack_helpers import ( + resolve_runner_ssh_key_path, + wait_for_no_runners, + wait_for_runner, + wait_for_runner_online, + wait_for_ssh, +) +from .planner_stub import PlannerStub, PlannerStubConfig + +DISPATCH_TEST_WORKFLOW_FILENAME = "workflow_dispatch_test.yaml" +DISPATCH_CRASH_TEST_WORKFLOW_FILENAME = "workflow_dispatch_crash_test.yaml" +RUNNER_CRASH_WAIT_SECONDS = 10 +SSH_CONNECT_TIMEOUT_SECONDS = 10 +SSH_READY_TIMEOUT_SECONDS = 3 * 60 +SSH_RETRY_INTERVAL_SECONDS = 5 + + +@pytest.fixture +def metrics_planner_stub(test_config: TestConfig) -> Iterator[PlannerStub]: + """Start a planner stub compatible with this module's flavor name.""" + stub = PlannerStub(PlannerStubConfig(initial_pressure=1, 
flavor_name=test_config.runner_name)) + stub.start() + try: + yield stub + finally: + stub.stop() + + +@pytest.fixture +def planner_app_with_metrics( + tmp_test_dir: Path, + github_config: GitHubConfig, + openstack_config: OpenStackConfig, + openstack_connection: openstack.connection.Connection, + test_config: TestConfig, + proxy_config: ProxyConfig | None, + metrics_planner_stub: PlannerStub, +) -> Iterator[tuple[RunningApplication, PlannerStub, Path]]: + """Start app + planner stub for metrics tests with metrics log persisted to disk.""" + run_id = time.time_ns() + metrics_planner_stub.set_pressure(1) + + config = create_default_config( + github_config=github_config, + openstack_config=openstack_config, + proxy_config=proxy_config, + test_config=test_config, + planner_url=metrics_planner_stub.base_url, + planner_token=metrics_planner_stub.token, + reconcile_interval=1, + base_virtual_machines=0, + ) + + config_path = tmp_test_dir / f"metrics-config-{run_id}.yaml" + config_path.write_text(yaml.dump(config), encoding="utf-8") + metrics_log_path = tmp_test_dir / f"github-runner-metrics-{run_id}.log" + log_file_path = test_config.debug_log_dir / f"metrics-app-{test_config.test_id}-{run_id}.log" + app = RunningApplication.create( + config_file_path=config_path, + metrics_log_path=metrics_log_path, + log_file_path=log_file_path, + ) + + try: + yield app, metrics_planner_stub, metrics_log_path + finally: + metrics_planner_stub.set_pressure(0) + wait_for_no_runners(openstack_connection, test_config, timeout=15 * 60) + app.stop() + + +def test_runner_installed_metric( + planner_app_with_metrics: tuple[RunningApplication, PlannerStub, Path], + openstack_connection: openstack.connection.Connection, + test_config: TestConfig, + github_repository: Repository, +) -> None: + """ + arrange: planner-driven app is running with pressure=1. + act: + 1. wait for runner creation and GitHub registration. + 2. set pressure to 0 and wait for cleanup. + 3. read metrics events. 
+ assert: `runner_installed` event is present with expected flavor and duration. + """ + _, stub, metrics_log_path = planner_app_with_metrics + + runner, _ = wait_for_runner(openstack_connection, test_config, timeout=10 * 60) + assert runner is not None, "Runner did not appear within timeout" + wait_for_runner_online(github_repository, runner.name) + + stub.set_pressure(0) + cleaned = wait_for_no_runners(openstack_connection, test_config, timeout=15 * 60) + assert cleaned, "Runner was not cleaned up after setting pressure to 0" + + events = wait_for_events(metrics_log_path, {"runner_installed"}, timeout=5 * 60) + runner_installed_events = [ + event for event in events if event.get("event") == "runner_installed" + ] + assert runner_installed_events, "runner_installed event has not been logged" + for metric in runner_installed_events: + assert metric.get("flavor") == test_config.runner_name + duration = metric.get("duration") + assert isinstance(duration, (int, float)) + assert duration >= 0 + + +def test_metrics_after_workflow_completion( + planner_app_with_metrics: tuple[RunningApplication, PlannerStub, Path], + openstack_connection: openstack.connection.Connection, + test_config: TestConfig, + github_repository: Repository, + github_branch: Branch, +) -> None: + """ + arrange: planner-driven app is running with one runner. + act: + 1. dispatch success workflow and wait for completion. + 2. scale pressure down to 0 and wait for cleanup. + 3. read metrics events. + assert: runner_start, runner_stop and reconciliation metrics are logged as normal. 
+ """ + _, stub, metrics_log_path = planner_app_with_metrics + + runner, _ = wait_for_runner(openstack_connection, test_config, timeout=10 * 60) + assert runner is not None, "Runner did not appear within timeout" + wait_for_runner_online(github_repository, runner.name) + + dispatch_time = datetime.now(timezone.utc) + workflow = dispatch_workflow( + repository=github_repository, + workflow_filename=DISPATCH_TEST_WORKFLOW_FILENAME, + ref=github_branch, + inputs={"runner": test_config.labels[0]}, + ) + workflow_run = get_workflow_dispatch_run( + workflow=workflow, ref=github_branch, dispatch_time=dispatch_time + ) + assert _wait_for_workflow_status( + workflow_run, "in_progress", acceptable_terminal_statuses=("completed",) + ), "Workflow never started running" + assert wait_for_workflow_completion( + workflow_run, timeout=20 * 60 + ), "Workflow did not complete or timed out." + assert workflow_run.conclusion == "success" + + stub.set_pressure(0) + cleaned = wait_for_no_runners(openstack_connection, test_config, timeout=15 * 60) + assert cleaned, "Runner was not cleaned up after setting pressure to 0" + + events = wait_for_events(metrics_log_path, {"runner_start", "runner_stop", "reconciliation"}) + assert_events_after_reconciliation( + events=events, + flavor=test_config.runner_name, + github_repository=github_repository, + post_job_status=PostJobStatus.NORMAL, + ) + + +def test_metrics_for_abnormal_termination( + planner_app_with_metrics: tuple[RunningApplication, PlannerStub, Path], + openstack_connection: openstack.connection.Connection, + test_config: TestConfig, + github_repository: Repository, + github_branch: Branch, +) -> None: + """ + arrange: planner-driven app is running with one runner. + act: + 1. dispatch crash workflow and wait for it to start. + 2. terminate run.sh in the runner VM and cancel the workflow. + 3. scale pressure down to 0 and wait for cleanup. + 4. read metrics events. 
+ assert: runner_stop and reconciliation metrics reflect abnormal termination. + """ + _, stub, metrics_log_path = planner_app_with_metrics + + runner, runner_ip = wait_for_runner(openstack_connection, test_config, timeout=10 * 60) + assert runner is not None and runner_ip, "Runner did not appear within timeout" + wait_for_runner_online(github_repository, runner.name) + + dispatch_time = datetime.now(timezone.utc) + workflow = dispatch_workflow( + repository=github_repository, + workflow_filename=DISPATCH_CRASH_TEST_WORKFLOW_FILENAME, + ref=github_branch, + inputs={"runner": test_config.labels[0]}, + ) + workflow_run = get_workflow_dispatch_run( + workflow=workflow, ref=github_branch, dispatch_time=dispatch_time + ) + assert _wait_for_workflow_status(workflow_run, "in_progress"), "Workflow never started running" + + # Let the runner fully enter job execution before terminating run.sh. + time.sleep(RUNNER_CRASH_WAIT_SECONDS) + _kill_run_script(runner, runner_ip) + workflow_run.cancel() + wait_for_runner_to_be_marked_offline(github_repository, runner.name, timeout=20 * 60) + + stub.set_pressure(0) + cleaned = wait_for_no_runners(openstack_connection, test_config, timeout=15 * 60) + assert cleaned, "Runner was not cleaned up after setting pressure to 0" + + events = wait_for_events(metrics_log_path, {"runner_start", "runner_stop", "reconciliation"}) + assert_events_after_reconciliation( + events=events, + flavor=test_config.runner_name, + github_repository=github_repository, + post_job_status=PostJobStatus.ABNORMAL, + ) + + +def _wait_for_workflow_status( + workflow_run: WorkflowRun, + status: str, + acceptable_terminal_statuses: tuple[str, ...] 
= (), + timeout: int = 15 * 60, + interval: int = 10, +) -> bool: + """Wait for a workflow run to reach the target status.""" + deadline = time.time() + timeout + while time.time() < deadline: + workflow_run.update() + if workflow_run.status == status or workflow_run.status in acceptable_terminal_statuses: + return True + time.sleep(interval) + return False + + +def _kill_run_script(runner: OpenstackServer, runner_ip: str) -> None: + """Kill actions-runner run.sh inside a runner VM.""" + assert wait_for_ssh( + runner_ip, + timeout=SSH_READY_TIMEOUT_SECONDS, + interval=SSH_RETRY_INTERVAL_SECONDS, + connect_timeout=SSH_CONNECT_TIMEOUT_SECONDS, + ), f"SSH did not become reachable on runner {runner.name}" + key_path = resolve_runner_ssh_key_path(runner) + command = [ + "/usr/bin/ssh", + "-i", + str(key_path), + "-o", + "BatchMode=yes", + "-o", + "ConnectTimeout=10", + "-o", + "StrictHostKeyChecking=no", + "-o", + "UserKnownHostsFile=/dev/null", + f"ubuntu@{runner_ip}", + "pkill -9 run.sh", + ] + result = subprocess.run( + command, + check=False, + capture_output=True, + text=True, + timeout=SSH_CONNECT_TIMEOUT_SECONDS + 5, + ) + assert result.returncode == 0, ( + f"Failed to kill run.sh (exit code {result.returncode}). 
" + f"stdout={result.stdout!r} stderr={result.stderr!r}" + ) diff --git a/github-runner-manager/tests/integration/test_planner_runner.py b/github-runner-manager/tests/integration/test_planner_runner.py index 678ffa26b2..af26ff2fa9 100644 --- a/github-runner-manager/tests/integration/test_planner_runner.py +++ b/github-runner-manager/tests/integration/test_planner_runner.py @@ -15,7 +15,6 @@ import yaml from .application import RunningApplication -from .conftest import wait_for_no_runners, wait_for_runner from .factories import ( GitHubConfig, OpenStackConfig, @@ -23,6 +22,7 @@ TestConfig, create_default_config, ) +from .openstack_helpers import wait_for_no_runners, wait_for_runner from .planner_stub import PlannerStub, PlannerStubConfig logger = logging.getLogger(__name__) @@ -50,13 +50,9 @@ def planner_app( test_config=test_config, planner_url=stub.base_url, planner_token=stub.token, + reconcile_interval=5, + base_virtual_machines=0, ) - # Fire the reconcile loop every 5 minutes so scale-down is visible within the - # 15-minute test timeout. The default factory value (60) waits 60 minutes. - config["reconcile_interval"] = 5 - # Set base_virtual_machines to 0 so min_pressure doesn't override planner pressure. - # Without this, min_pressure=1 acts as a floor and prevents scaling down to zero. - config["non_reactive_configuration"]["combinations"][0]["base_virtual_machines"] = 0 config_path = tmp_test_dir / "config.yaml" config_path.write_text(yaml.dump(config), encoding="utf-8") log_file_path = test_config.debug_log_dir / f"app-{test_config.test_id}.log" @@ -79,7 +75,7 @@ def test_planner_pressure_spawns_and_cleans_runner( 1. wait for a runner VM to appear on OpenStack. 2. set planner pressure to 0. 3. wait for the runner VM to disappear from OpenStack. - Assert: runner lifecycle is driven entirely by planner pressure. + assert: runner lifecycle is driven entirely by planner pressure. 
""" app, stub = planner_app diff --git a/github-runner-manager/tests/unit/metrics/test_events.py b/github-runner-manager/tests/unit/metrics/test_events.py index 6f84875776..3ab2d83383 100644 --- a/github-runner-manager/tests/unit/metrics/test_events.py +++ b/github-runner-manager/tests/unit/metrics/test_events.py @@ -13,9 +13,7 @@ @pytest.fixture(autouse=True, name="patch_metrics_path") def patch_metrics_path_fixture(monkeypatch: pytest.MonkeyPatch, tmp_path: Path): """Patch the hardcoded metrics log path.""" - monkeypatch.setattr( - "github_runner_manager.metrics.events.METRICS_LOG_PATH", Path(tmp_path / "metrics.log") - ) + monkeypatch.setenv("METRICS_LOG_PATH", str(tmp_path / "metrics.log")) def test_issue_events_logs_events(tmp_path: Path): @@ -28,7 +26,7 @@ def test_issue_events_logs_events(tmp_path: Path): events.issue_event(event) - assert json.loads(events.METRICS_LOG_PATH.read_text()) == { + assert json.loads(events.get_metrics_log_path().read_text()) == { "event": "runner_installed", "timestamp": 123, "flavor": "small", @@ -55,7 +53,7 @@ def test_issue_events_exclude_none_values(tmp_path: Path): events.issue_event(event) - assert json.loads(events.METRICS_LOG_PATH.read_text()) == { + assert json.loads(events.get_metrics_log_path().read_text()) == { "event": "runner_stop", "timestamp": 123, "flavor": "small", diff --git a/src/charm.py b/src/charm.py index 257587be32..427884ccef 100755 --- a/src/charm.py +++ b/src/charm.py @@ -26,7 +26,7 @@ from charms.grafana_agent.v0.cos_agent import COSAgentProvider from charms.operator_libs_linux.v1 import systemd from github_runner_manager import constants -from github_runner_manager.metrics.events import METRICS_LOG_PATH +from github_runner_manager.metrics.events import get_metrics_log_path from github_runner_manager.platform.platform_provider import TokenError from github_runner_manager.utilities import set_env_var from ops.charm import ( @@ -636,9 +636,9 @@ def _setup_runner_manager_user() -> None: # For charm 
upgrade, previous revision root owns the metric logs, this is changed to runner # manager. - if METRICS_LOG_PATH.exists(): + if (metrics_log_path := get_metrics_log_path()).exists(): shutil.chown( - METRICS_LOG_PATH, + metrics_log_path, user=constants.RUNNER_MANAGER_USER, group=constants.RUNNER_MANAGER_GROUP, ) diff --git a/src/logrotate.py b/src/logrotate.py index a5bb3e6b42..bc7cc53c1d 100644 --- a/src/logrotate.py +++ b/src/logrotate.py @@ -7,7 +7,7 @@ from pathlib import Path from charms.operator_libs_linux.v1 import systemd -from github_runner_manager.metrics.events import METRICS_LOG_PATH +from github_runner_manager.metrics.events import get_metrics_log_path from github_runner_manager.reactive.process_manager import REACTIVE_RUNNER_LOG_DIR from pydantic import BaseModel @@ -62,7 +62,7 @@ class LogrotateConfig(BaseModel): # metrics to Loki twice, which may happen if there is a corrupt log scrape configuration. METRICS_LOGROTATE_CONFIG = LogrotateConfig( name="github-runner-metrics", - log_path_glob_pattern=str(METRICS_LOG_PATH), + log_path_glob_pattern=str(get_metrics_log_path()), rotate=0, create=True, ) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index f17172b6e0..134b389ed0 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -45,11 +45,9 @@ USE_APROXY_CONFIG_NAME, ) from tests.integration.helpers.common import ( - MONGODB_APP_NAME, deploy_github_runner_charm, get_github_runner_manager_service_log, get_github_runner_metrics_log, - get_github_runner_reactive_log, wait_for, wait_for_runner_ready, ) @@ -655,8 +653,6 @@ async def app_openstack_runner_fixture( try: app_log = await get_github_runner_manager_service_log(unit=application.units[0]) logging.info("Application log: \n%s", app_log) - reactive_log = await get_github_runner_reactive_log(unit=application.units[0]) - logging.info("Reactive log: \n%s", reactive_log) metrics_log = await get_github_runner_metrics_log(unit=application.units[0]) 
logging.info("Metrics log: \n%s", metrics_log) except AssertionError: @@ -874,45 +870,6 @@ def get_branch(): branch_ref.delete() -@pytest_asyncio.fixture(scope="module", name="app_for_metric") -async def app_for_metric_fixture( - basic_app: Application, -) -> AsyncIterator[Application]: - yield basic_app - - -@pytest_asyncio.fixture(scope="module", name="mongodb") -async def mongodb_fixture(model: Model, existing_app_suffix: str | None) -> Application: - """Deploy MongoDB.""" - if not existing_app_suffix: - mongodb = await model.deploy( - MONGODB_APP_NAME, - channel="6/edge", - # 2025-11-26: Set deployment type to virtual-machine due to bug with snapd. See: - # https://github.com/canonical/snapd/pull/16131 - constraints={"virt-type": "virtual-machine"}, - ) - else: - mongodb = model.applications["mongodb"] - return mongodb - - -@pytest_asyncio.fixture(scope="module", name="app_for_reactive") -async def app_for_reactive_fixture( - model: Model, - mongodb: Application, - app_openstack_runner: Application, - existing_app_suffix: Optional[str], -) -> Application: - """Application for testing reactive.""" - if not existing_app_suffix: - await model.relate(f"{app_openstack_runner.name}:mongodb", f"{mongodb.name}:database") - - await model.wait_for_idle(apps=[app_openstack_runner.name, mongodb.name], status=ACTIVE) - - return app_openstack_runner - - @pytest_asyncio.fixture(scope="module", name="basic_app") async def basic_app_fixture(request: pytest.FixtureRequest) -> Application: """Setup the charm with the basic configuration.""" diff --git a/tests/integration/helpers/charm_metrics.py b/tests/integration/helpers/charm_metrics.py deleted file mode 100644 index 0ac47b1dcf..0000000000 --- a/tests/integration/helpers/charm_metrics.py +++ /dev/null @@ -1,232 +0,0 @@ -# Copyright 2026 Canonical Ltd. -# See LICENSE file for licensing details. 
- -"""Utilities for charm metrics integration tests.""" - -import datetime -import json -import logging -from time import sleep - -import github -from github.Branch import Branch -from github.GithubException import GithubException -from github.Repository import Repository -from github.Workflow import Workflow -from github.WorkflowJob import WorkflowJob -from github_runner_manager.manager.vm_manager import PostJobStatus -from github_runner_manager.metrics.events import METRICS_LOG_PATH -from github_runner_manager.types_.github import JobConclusion -from juju.application import Application -from juju.unit import Unit - -from tests.integration.helpers.common import get_file_content, run_in_unit, wait_for -from tests.integration.helpers.openstack import OpenStackInstanceHelper - -logger = logging.getLogger(__name__) - -TEST_WORKFLOW_NAMES = [ - "Workflow Dispatch Tests", - "Workflow Dispatch Crash Tests", - "Workflow Dispatch Failure Tests 2a34f8b1-41e4-4bcb-9bbf-7a74e6c482f7", -] - - -async def wait_for_workflow_to_start( - unit: Unit, - workflow: Workflow, - instance_helper: OpenStackInstanceHelper, - branch: Branch | None = None, - started_time: float | None = None, - timeout: int = 20 * 60, -): - """Wait for the workflow to start. - - Args: - unit: The unit which contains the runner. - workflow: The workflow to wait for. - instance_helper: The instance helper to get the runner name. - branch: The branch where the workflow belongs to. - started_time: The time in seconds since epoch the job was started. - timeout: Timeout in seconds to wait for the workflow to start. - - Raises: - TimeoutError: If the workflow didn't start for specified time period. - """ - runner_name = await instance_helper.get_runner_name(unit) - created_at = ( - None - if not started_time - # convert to integer since GH API takes up to seconds. 
- else f">={datetime.datetime.fromtimestamp(int(started_time), tz=datetime.timezone.utc).isoformat()}" - ) - - def is_runner_log(): - """Return whether a log for given runner exists. - - Returns: - Whether the log exists. - """ - for run in workflow.get_runs( - branch=branch or github.GithubObject.NotSet, - created=created_at or github.GithubObject.NotSet, - ): - jobs = run.jobs() - if not jobs: - return False - try: - job: WorkflowJob = jobs[0] - if runner_name == job.runner_name: - return True - except GithubException as exc: - logger.warning("Github error, %s", exc) - return False - - try: - await wait_for(is_runner_log, timeout=timeout, check_interval=30) - except TimeoutError as exc: - raise TimeoutError("Timeout while waiting for the workflow to start") from exc - - -async def clear_metrics_log(unit: Unit) -> None: - """Clear the metrics log on the unit. - - Args: - unit: The unit to clear the metrics log on. - """ - retcode, _, stderr = await run_in_unit( - unit=unit, - command=f"if [ -f {METRICS_LOG_PATH} ]; then rm {METRICS_LOG_PATH}; fi", - ) - assert retcode == 0, f"Failed to clear metrics log, {stderr}" - - -async def get_metrics_log(unit: Unit) -> str: - """Retrieve the metrics log from the unit. - - Args: - unit: The unit to retrieve the metrics log from. - - Returns: - The metrics log. - """ - return await get_file_content(unit=unit, filepath=METRICS_LOG_PATH) - - -async def cancel_workflow_run( - unit: Unit, - workflow: Workflow, - instance_helper: OpenStackInstanceHelper, - branch: Branch | None = None, -): - """Cancel the workflow run. - - Args: - unit: The unit which contains the runner. - workflow: The workflow to cancel the workflow run for. - instance_helper: The instance helper to get the runner name. - branch: The branch where the workflow belongs to. 
- """ - runner_name = await instance_helper.get_runner_name(unit) - - for run in workflow.get_runs(branch=branch or github.GithubObject.NotSet): - jobs = run.jobs() - if not jobs: - continue - try: - job: WorkflowJob = jobs[0] - except GithubException as exc: - if exc.status == 410: - logger.warning("Transient github error, %s", exc) - continue - logger.warning("Github error, %s", exc) - if runner_name == job.runner_name: - run.cancel() - - -async def assert_events_after_reconciliation( - app: Application, - github_repository: Repository, - post_job_status: PostJobStatus, - reactive_mode: bool = False, -): - """Assert that the RunnerStart, RunnerStop and Reconciliation metric is logged. - - Args: - app: The charm to assert the events for. - github_repository: The github repository to assert the events for. - post_job_status: The expected post job status of the reconciliation event. - reactive_mode: Whether the charm manages reactive runners, - this changes the expected events. - """ - unit = app.units[0] - - metrics_log = await get_metrics_log(unit=unit) - log_lines = list(map(lambda line: json.loads(line), metrics_log.splitlines())) - events = set(map(lambda line: line.get("event"), log_lines)) - assert { - "runner_start", - "runner_stop", - "reconciliation", - } <= events, "Not all events were logged" - for metric_log in log_lines: - if metric_log.get("event") == "runner_start": - assert metric_log.get("flavor") == app.name - assert metric_log.get("workflow") in TEST_WORKFLOW_NAMES - assert metric_log.get("repo") == github_repository.full_name - assert metric_log.get("github_event") == "workflow_dispatch" - assert metric_log.get("idle") >= 0 - assert metric_log.get("queue_duration") >= 0 - if metric_log.get("event") == "runner_stop": - assert metric_log.get("flavor") == app.name - assert metric_log.get("workflow") in TEST_WORKFLOW_NAMES - assert metric_log.get("repo") == github_repository.full_name - assert metric_log.get("github_event") == "workflow_dispatch" - 
assert metric_log.get("status") == post_job_status - if post_job_status == PostJobStatus.ABNORMAL: - assert metric_log.get("status_info", {}).get("code", 0) != 0 - # Either the job conclusion is not yet set or it is set to cancelled. - assert metric_log.get("job_conclusion") in [ - None, - JobConclusion.CANCELLED, - ] - else: - assert "status_info" not in metric_log - assert metric_log.get("job_conclusion") == JobConclusion.SUCCESS - assert metric_log.get("job_duration") >= 0 - if metric_log.get("event") == "reconciliation": - assert metric_log.get("flavor") == app.name - assert metric_log.get("duration") >= 0 - assert metric_log.get("crashed_runners") == 0 - assert metric_log.get("idle_runners") >= 0 - assert metric_log.get("active_runners") >= 0 - if not reactive_mode: - assert metric_log.get("expected_runners") >= 0 - else: - assert metric_log.get("expected_runners") == 0 - - -async def wait_for_runner_to_be_marked_offline( - forked_github_repository: Repository, runner_name: str -): - """Wait for the runner to be marked offline or to be non-existent. - - Args: - forked_github_repository: The github repository to wait for the runner - to be marked offline. - runner_name: The runner name to wait for. 
- """ - for _ in range(30): - for runner in forked_github_repository.get_self_hosted_runners(): - if runner.name == runner_name: - logging.info("Runner %s status: %s", runner.name, runner.status) - if runner.status == "online": - logging.info( - "Runner still marked as online, waiting for it to be marked offline" - ) - sleep(60) - break - else: - break - else: - assert False, "Timeout while waiting for runner to be marked offline" diff --git a/tests/integration/helpers/common.py b/tests/integration/helpers/common.py index 77536accc2..5ff52ca98c 100644 --- a/tests/integration/helpers/common.py +++ b/tests/integration/helpers/common.py @@ -20,8 +20,7 @@ from github.Workflow import Workflow from github.WorkflowJob import WorkflowJob from github.WorkflowRun import WorkflowRun -from github_runner_manager.metrics.events import METRICS_LOG_PATH -from github_runner_manager.reactive.process_manager import REACTIVE_RUNNER_LOG_DIR +from github_runner_manager.metrics.events import get_metrics_log_path from juju.action import Action from juju.application import Application from juju.model import Model @@ -45,7 +44,6 @@ DISPATCH_E2E_TEST_RUN_WORKFLOW_FILENAME = "e2e_test_run.yaml" DISPATCH_E2E_TEST_RUN_OPENSTACK_WORKFLOW_FILENAME = "e2e_test_run_openstack.yaml" -MONGODB_APP_NAME = "mongodb" # 2025-11-26: Set deployment type to virtual-machine due to bug with snapd. See: # https://github.com/canonical/snapd/pull/16131 DEFAULT_RUNNER_CONSTRAINTS = {"root-disk": 20 * 1024, "virt-type": "virtual-machine"} @@ -87,50 +85,6 @@ async def run_in_unit( return code, stdout, stderr -async def get_reconcile_id(unit: Unit) -> str: - """Get reconcile UUID of the unit. - - This is to distinguish whether reconcile has happened in a unit. - - Args: - unit: The unit. - - Returns: - The UUID. 
- """ - _, stdout, _ = await run_in_unit( - unit, - "cat /home/runner-manager/reconcile.id", - assert_on_failure=True, - assert_msg="Unable to get reconcile ID", - ) - logger.info("Current reconcile ID: %s", stdout) - assert stdout is not None, "Got empty reconcile ID, this should be impossible" - return stdout - - -async def wait_for_reconcile(app: Application) -> None: - """Wait until a reconcile has happened. - - Uses the first unit found in the application. - - Args: - app: The GitHub Runner Charm application. - """ - # Wait the application is actively reconciling. Avoid waiting for image, etc. - await app.model.wait_for_idle(apps=[app.name], status=ACTIVE) - - unit = app.units[0] - base_id = await get_reconcile_id(unit) - for _ in range(10): - logger.info("Waiting for reconcile ID: %s", base_id) - await sleep(60) - current_id = await get_reconcile_id(unit) - logger.info("Current reconcile ID: %s, expected reconcile ID: %s", current_id, base_id) - if base_id != current_id: - return - - async def wait_for_runner_ready(app: Application) -> None: """Wait until a runner is ready. @@ -139,8 +93,6 @@ async def wait_for_runner_ready(app: Application) -> None: Args: app: The GitHub Runner Charm application. """ - await wait_for_reconcile(app) - # Wait for 10 minutes for the runner to come online. for _ in range(20): action = await app.units[0].run_action("check-runners") @@ -507,27 +459,6 @@ async def get_github_runner_manager_service_log(unit: Unit) -> str: return stdout -async def get_github_runner_reactive_log(unit: Unit) -> str: - """Get the logs of github-runner-manager reactive processes. - - Args: - unit: The unit to get the logs from. - - Returns: - Reactive process logs. 
- """ - log_file_path = REACTIVE_RUNNER_LOG_DIR / "*.log" - _, stdout, stderr = await run_in_unit( - unit, - f"cat {log_file_path}", - timeout=60, - assert_on_failure=False, - assert_msg="Failed to get the GitHub runner manager reactive logs", - ) - - return stdout or stderr or "Empty reactive log" - - async def get_github_runner_metrics_log(unit: Unit) -> str: """Get the github-runner-manager metric logs. @@ -537,7 +468,7 @@ async def get_github_runner_metrics_log(unit: Unit) -> str: Returns: Runner metrics logs. """ - log_file_path = METRICS_LOG_PATH + log_file_path = get_metrics_log_path() _, stdout, stderr = await run_in_unit( unit, f"cat {log_file_path}", diff --git a/tests/integration/requirements.txt b/tests/integration/requirements.txt index b6f5659dea..d5a120a1ce 100644 --- a/tests/integration/requirements.txt +++ b/tests/integration/requirements.txt @@ -1,6 +1,4 @@ GitPython>3,<4 jubilant==1.7.* -kombu==5.* pygithub -pymongo==4.* tenacity==9.1.4 diff --git a/tests/integration/test_charm_metrics_failure.py b/tests/integration/test_charm_metrics_failure.py deleted file mode 100644 index 36005ad8af..0000000000 --- a/tests/integration/test_charm_metrics_failure.py +++ /dev/null @@ -1,109 +0,0 @@ -# Copyright 2026 Canonical Ltd. -# See LICENSE file for licensing details. 
- -"""Integration tests for metrics/logs assuming Github workflow failures or a runner crash.""" - -import time -from asyncio import sleep -from typing import AsyncIterator - -import jubilant -import pytest -import pytest_asyncio -from github.Branch import Branch -from github.Repository import Repository -from github_runner_manager.manager.vm_manager import PostJobStatus -from juju.application import Application - -from charm_state import BASE_VIRTUAL_MACHINES_CONFIG_NAME, PATH_CONFIG_NAME -from tests.integration.helpers.charm_metrics import ( - assert_events_after_reconciliation, - cancel_workflow_run, - clear_metrics_log, - wait_for_runner_to_be_marked_offline, - wait_for_workflow_to_start, -) -from tests.integration.helpers.common import ( - DISPATCH_CRASH_TEST_WORKFLOW_FILENAME, - wait_for_reconcile, -) -from tests.integration.helpers.openstack import OpenStackInstanceHelper - - -@pytest_asyncio.fixture(scope="function", name="app") -async def app_fixture( - juju: jubilant.Juju, app_for_metric: Application -) -> AsyncIterator[Application]: - """Setup and teardown the charm after each test. - - Clear the metrics log before each test. - """ - unit = app_for_metric.units[0] - await clear_metrics_log(unit) - juju.config(app_for_metric.name, values={BASE_VIRTUAL_MACHINES_CONFIG_NAME: "0"}) - await wait_for_reconcile(app=app_for_metric) - - yield app_for_metric - - -@pytest.mark.openstack -@pytest.mark.asyncio -@pytest.mark.abort_on_fail -async def test_charm_issues_metrics_for_abnormal_termination( - juju: jubilant.Juju, - app: Application, - github_repository: Repository, - test_github_branch: Branch, - instance_helper: OpenStackInstanceHelper, -): - """ - arrange: A properly integrated charm with a runner registered on the fork repo. - act: Dispatch a test workflow and afterwards kill run.sh. After that, reconcile. - assert: The RunnerStart, RunnerStop and Reconciliation metric is logged. - The Reconciliation metric has the post job status set to Abnormal. 
- """ - juju.config(app.name, values={PATH_CONFIG_NAME: github_repository.full_name}) - juju.config(app.name, values={BASE_VIRTUAL_MACHINES_CONFIG_NAME: "1"}) - await instance_helper.ensure_charm_has_runner(app) - - unit = app.units[0] - - workflow = github_repository.get_workflow( - id_or_file_name=DISPATCH_CRASH_TEST_WORKFLOW_FILENAME - ) - dispatch_time = time.time() - assert workflow.create_dispatch(test_github_branch, {"runner": app.name}) - - await wait_for_workflow_to_start( - unit, - workflow, - branch=test_github_branch, - started_time=dispatch_time, - instance_helper=instance_helper, - ) - - # Wait a bit to ensure pre-job script has been executed. - await sleep(10) - - # Make the runner terminate abnormally by killing run.sh - runner_name = await instance_helper.get_runner_name(unit) - kill_run_sh_cmd = "pkill -9 run.sh" - ret_code, _, stderr = await instance_helper.run_in_instance(unit, kill_run_sh_cmd) - assert ret_code == 0, f"Failed to kill run.sh with code {ret_code}: {stderr}" - - # Cancel workflow and wait that the runner is marked offline - # to avoid errors during reconciliation. - await cancel_workflow_run( - unit, workflow, branch=test_github_branch, instance_helper=instance_helper - ) - await wait_for_runner_to_be_marked_offline(github_repository, runner_name) - - # Set the number of virtual machines to 0 to speedup reconciliation - await app.set_config({BASE_VIRTUAL_MACHINES_CONFIG_NAME: "0"}) - await wait_for_reconcile(app=app) - - await assert_events_after_reconciliation( - app=app, - github_repository=github_repository, - post_job_status=PostJobStatus.ABNORMAL, - ) diff --git a/tests/integration/test_charm_metrics_success.py b/tests/integration/test_charm_metrics_success.py deleted file mode 100644 index 98a2efa908..0000000000 --- a/tests/integration/test_charm_metrics_success.py +++ /dev/null @@ -1,107 +0,0 @@ -# Copyright 2026 Canonical Ltd. -# See LICENSE file for licensing details. 
- -"""Integration tests for metrics/logs assuming no Github workflow failures.""" - -import json -from typing import AsyncIterator - -import jubilant -import pytest -import pytest_asyncio -from github.Branch import Branch -from github.Repository import Repository -from github_runner_manager.manager.vm_manager import PostJobStatus -from juju.application import Application - -from charm_state import BASE_VIRTUAL_MACHINES_CONFIG_NAME -from tests.integration.helpers.charm_metrics import ( - assert_events_after_reconciliation, - clear_metrics_log, - get_metrics_log, -) -from tests.integration.helpers.common import ( - DISPATCH_TEST_WORKFLOW_FILENAME, - dispatch_workflow, - wait_for_reconcile, -) -from tests.integration.helpers.openstack import OpenStackInstanceHelper - - -@pytest_asyncio.fixture(scope="function", name="app") -async def app_fixture(app_for_metric: Application) -> AsyncIterator[Application]: - """Setup and teardown the charm after each test. - - Clear the metrics log before each test. - """ - unit = app_for_metric.units[0] - await clear_metrics_log(unit) - - yield app_for_metric - - -@pytest.mark.openstack -@pytest.mark.asyncio -@pytest.mark.abort_on_fail -async def test_charm_issues_runner_installed_metric( - juju: jubilant.Juju, app: Application, instance_helper: OpenStackInstanceHelper -): - """ - arrange: A working charm deployment. - act: Config the charm to contain one runner. - assert: The RunnerInstalled metric is logged. 
- """ - await instance_helper.ensure_charm_has_runner(app) - - # Set the number of virtual machines to 0 to speedup reconciliation - juju.config(app.name, values={BASE_VIRTUAL_MACHINES_CONFIG_NAME: "0"}) - await wait_for_reconcile(app=app) - - metrics_log = await get_metrics_log(app.units[0]) - log_lines = list(map(lambda line: json.loads(line), metrics_log.splitlines())) - events = set(map(lambda line: line.get("event"), log_lines)) - assert "runner_installed" in events, "runner_installed event has not been logged" - - for metric_log in log_lines: - if metric_log.get("event") == "runner_installed": - assert metric_log.get("flavor") == app.name - assert metric_log.get("event") == "runner_installed" - assert metric_log.get("duration") >= 0 - - -@pytest.mark.openstack -@pytest.mark.asyncio -@pytest.mark.abort_on_fail -async def test_charm_issues_metrics_after_reconciliation( - juju: jubilant.Juju, - app: Application, - github_repository: Repository, - test_github_branch: Branch, - instance_helper: OpenStackInstanceHelper, -): - """ - arrange: A properly integrated charm with a runner registered on the fork repo. - act: Dispatch a workflow on a branch for the runner to run. After completion, reconcile. - assert: The RunnerStart, RunnerStop and Reconciliation metric is logged. - The Reconciliation metric has the post job status set to normal. 
- """ - await instance_helper.ensure_charm_has_runner(app) - - # Clear metrics log to make reconciliation event more predictable - unit = app.units[0] - await clear_metrics_log(unit) - await dispatch_workflow( - app=app, - branch=test_github_branch, - github_repository=github_repository, - conclusion="success", - workflow_id_or_name=DISPATCH_TEST_WORKFLOW_FILENAME, - ) - - # Set the number of virtual machines to 0 to speedup reconciliation - juju.config(app.name, values={BASE_VIRTUAL_MACHINES_CONFIG_NAME: "0"}) - await wait_for_reconcile(app=app) - - await assert_events_after_reconciliation( - app=app, github_repository=github_repository, post_job_status=PostJobStatus.NORMAL - ) diff --git a/tests/integration/test_charm_no_runner.py b/tests/integration/test_charm_no_runner.py index 87932c318f..28060748cb 100644 --- a/tests/integration/test_charm_no_runner.py +++ b/tests/integration/test_charm_no_runner.py @@ -8,25 +8,10 @@ import jubilant import pytest -from github_runner_manager.reconcile_service import ( - RECONCILE_SERVICE_START_MSG, - RECONCILE_START_MSG, -) from juju.application import Application from juju.model import Model from ops import ActiveStatus -from charm_state import BASE_VIRTUAL_MACHINES_CONFIG_NAME -from manager_service import GITHUB_RUNNER_MANAGER_SERVICE_NAME -from tests.integration.helpers.common import ( - get_github_runner_manager_service_log, - run_in_unit, - wait_for, - wait_for_reconcile, - wait_for_runner_ready, -) -from tests.integration.helpers.openstack import OpenStackInstanceHelper - logger = logging.getLogger(__name__) pytestmark = pytest.mark.openstack @@ -51,96 +36,6 @@ async def test_check_runners_no_runners(app_no_runner: Application) -> None: assert action.results["runners"] == "()" -@pytest.mark.asyncio -@pytest.mark.abort_on_fail -async def test_reconcile_runners( - app_no_runner: Application, - instance_helper: OpenStackInstanceHelper, -) -> None: - """ - arrange: A working application with no runners. - act: - 1. a. 
Set virtual-machines config to 1. - 2. a. Set virtual-machines config to 0. - assert: - 1. One runner should exist. - 2. No runner should exist. - - The two test is combine to maintain no runners in the application after the - test. - """ - # Rename since the app will have a runner. - app = app_no_runner - - unit = app.units[0] - - # 1. - await app.set_config({BASE_VIRTUAL_MACHINES_CONFIG_NAME: "1"}) - - await wait_for_runner_ready(app=app) - - async def _runners_number(number) -> bool: - """Check if there is the expected number of runners.""" - return len(await instance_helper.get_runner_names(unit)) == number - - await wait_for(lambda: _runners_number(1), timeout=10 * 60, check_interval=10) - - # 2. - await app.set_config({BASE_VIRTUAL_MACHINES_CONFIG_NAME: "0"}) - - await wait_for_reconcile(app=app) - - await wait_for(lambda: _runners_number(0), timeout=10 * 60, check_interval=10) - - -@pytest.mark.asyncio -@pytest.mark.abort_on_fail -async def test_manager_service_started( - app_no_runner: Application, -) -> None: - """ - arrange: A working application with no runners. - act: - 1. Check the github runner manager service. - 2. Force a logrotate - assert: - 1. The service should be running, and logs generated. - 2. New lines of log should be found, the initialize logs should not be found. - """ - app = app_no_runner - unit = app.units[0] - - # 1. - await run_in_unit( - unit, - f"sudo systemctl status {GITHUB_RUNNER_MANAGER_SERVICE_NAME}" - f"@{unit.name.replace('/', '-')}.service", - timeout=60, - assert_on_failure=True, - assert_msg="GitHub runner manager service not healthy", - ) - - log = await get_github_runner_manager_service_log(unit) - assert RECONCILE_SERVICE_START_MSG in log - - # 2. - return_code, _, _ = await run_in_unit( - unit, - "sudo logrotate -f /etc/logrotate.d/github-runner-manager", - timeout=60, - assert_on_failure=True, - assert_msg="Failed to force rotate of logs", - ) - assert return_code == 0 - - # Wait for more log lines. 
- await wait_for_reconcile(app) - - log = await get_github_runner_manager_service_log(unit) - assert RECONCILE_SERVICE_START_MSG not in log - assert RECONCILE_START_MSG in log - - @pytest.mark.asyncio @pytest.mark.abort_on_fail async def test_planner_integration( diff --git a/tests/integration/test_charm_runner.py b/tests/integration/test_charm_runner.py index 5ec9bf1da2..93ebf94bf3 100644 --- a/tests/integration/test_charm_runner.py +++ b/tests/integration/test_charm_runner.py @@ -19,7 +19,6 @@ dispatch_workflow, get_job_logs, wait_for, - wait_for_reconcile, wait_for_runner_ready, ) from tests.integration.helpers.openstack import OpenStackInstanceHelper @@ -36,7 +35,19 @@ async def app_fixture( yield basic_app await basic_app.set_config({BASE_VIRTUAL_MACHINES_CONFIG_NAME: "0"}) - await wait_for_reconcile(basic_app) + + async def _no_runners() -> bool: + """Check that no runners are active.""" + action: Action = await basic_app.units[0].run_action("check-runners") + await action.wait() + return ( + action.status == "completed" + and action.results["online"] == "0" + and action.results["offline"] == "0" + and action.results["unknown"] == "0" + ) + + await wait_for(_no_runners, timeout=10 * 60, check_interval=10) @pytest.mark.openstack diff --git a/tests/integration/test_prometheus_metrics.py b/tests/integration/test_prometheus_metrics.py index 5f6e0b4396..db54795ad2 100644 --- a/tests/integration/test_prometheus_metrics.py +++ b/tests/integration/test_prometheus_metrics.py @@ -20,7 +20,7 @@ from tests.integration.helpers.common import ( DISPATCH_TEST_WORKFLOW_FILENAME, dispatch_workflow, - wait_for_reconcile, + wait_for, ) from tests.integration.helpers.openstack import OpenStackInstanceHelper @@ -153,7 +153,6 @@ async def test_prometheus_metrics( ) await instance_helper.ensure_charm_has_runner(openstack_app_cos_agent) - await wait_for_reconcile(app=openstack_app_cos_agent) await dispatch_workflow( app=openstack_app_cos_agent, branch=test_github_branch, @@ -163,7 
+162,19 @@ async def test_prometheus_metrics( ) # Set the number of virtual machines to 0 to speedup reconciliation await openstack_app_cos_agent.set_config({BASE_VIRTUAL_MACHINES_CONFIG_NAME: "0"}) - await wait_for_reconcile(app=openstack_app_cos_agent) + + async def _no_runners() -> bool: + """Check that no runners are active.""" + action = await openstack_app_cos_agent.units[0].run_action("check-runners") + await action.wait() + return ( + action.status == "completed" + and action.results["online"] == "0" + and action.results["offline"] == "0" + and action.results["unknown"] == "0" + ) + + await wait_for(_no_runners, timeout=10 * 60, check_interval=10) prometheus_ip = prometheus_app.address _patiently_wait_for_prometheus_metrics( diff --git a/tests/integration/test_reactive.py b/tests/integration/test_reactive.py deleted file mode 100644 index 56efcb3e8c..0000000000 --- a/tests/integration/test_reactive.py +++ /dev/null @@ -1,305 +0,0 @@ -# Copyright 2026 Canonical Ltd. -# See LICENSE file for licensing details. - -"""Testing reactive mode. 
This is only supported for the OpenStack cloud.""" - -import json -from asyncio import sleep -from typing import AsyncIterator - -import pytest -import pytest_asyncio -from github.Branch import Branch -from github.Repository import Repository -from github_runner_manager.manager.vm_manager import PostJobStatus -from github_runner_manager.reactive.consumer import WAIT_TIME_IN_SEC, JobDetails -from juju.application import Application -from pytest_operator.plugin import OpsTest - -from charm_state import BASE_VIRTUAL_MACHINES_CONFIG_NAME, MAX_TOTAL_VIRTUAL_MACHINES_CONFIG_NAME -from tests.integration.helpers.charm_metrics import ( - assert_events_after_reconciliation, - clear_metrics_log, - get_metrics_log, -) -from tests.integration.helpers.common import ( - DISPATCH_CRASH_TEST_WORKFLOW_FILENAME, - DISPATCH_TEST_WORKFLOW_FILENAME, - dispatch_workflow, - wait_for, - wait_for_completion, - wait_for_reconcile, - wait_for_status, -) -from tests.integration.utils_reactive import ( - add_to_queue, - assert_queue_has_size, - assert_queue_is_empty, - clear_queue, - create_job_details, - get_mongodb_uri, - get_queue_size, -) - -pytestmark = pytest.mark.openstack - - -@pytest_asyncio.fixture(name="app") -async def app_fixture( - ops_test: OpsTest, app_for_reactive: Application -) -> AsyncIterator[Application]: - """Setup the reactive charm with 1 virtual machine and tear down afterwards.""" - mongodb_uri = await get_mongodb_uri(ops_test, app_for_reactive) - clear_queue(mongodb_uri, app_for_reactive.name) - assert_queue_is_empty(mongodb_uri, app_for_reactive.name) - - await app_for_reactive.set_config( - { - BASE_VIRTUAL_MACHINES_CONFIG_NAME: "0", - MAX_TOTAL_VIRTUAL_MACHINES_CONFIG_NAME: "1", - } - ) - await wait_for_reconcile(app_for_reactive) - await clear_metrics_log(app_for_reactive.units[0]) - - yield app_for_reactive - - # Call reconcile to enable cleanup of any runner spawned - await app_for_reactive.set_config({MAX_TOTAL_VIRTUAL_MACHINES_CONFIG_NAME: "0"}) - await 
wait_for_reconcile(app_for_reactive) - - -@pytest.mark.abort_on_fail -async def test_reactive_mode_spawns_runner( - ops_test: OpsTest, - app: Application, - github_repository: Repository, - test_github_branch: Branch, -): - """ - arrange: Place a message in the queue and dispatch a workflow. - act: Call reconcile. - assert: A runner is spawned to process the job and the message is removed from the queue. - The metrics are logged. - """ - mongodb_uri = await get_mongodb_uri(ops_test, app) - - run = await dispatch_workflow( - app=app, - branch=test_github_branch, - github_repository=github_repository, - conclusion="success", - workflow_id_or_name=DISPATCH_TEST_WORKFLOW_FILENAME, - wait=False, - ) - labels = {app.name, "x64"} # The architecture label should be ignored in the - # label validation in the reactive consumer. - job = create_job_details(run=run, labels=labels) - add_to_queue( - json.dumps(json.loads(job.json()) | {"ignored_noise": "foobar"}), - mongodb_uri, - app.name, - ) - - # This reconcile call is to check that we are not killing machines that are under - # construction in a subsequent reconciliation. - - # There may be a race condition between getting the token and spawning the machine. - await sleep(10) - await wait_for_reconcile(app) - - try: - await wait_for_completion(run, conclusion="success") - except TimeoutError: - assert False, ( - "Job did not complete successfully, check the reactive log using tmate," - " it might be due to infrastructure issues" - ) - - assert_queue_is_empty(mongodb_uri, app.name) - - async def _runner_installed_in_metrics_log() -> bool: - """Check if the runner_installed event is logged in the metrics log. - - Returns: - True if the runner_installed event is logged, False otherwise. 
- """ - # trigger reconcile which extracts metrics - await wait_for_reconcile(app) - metrics_log = await get_metrics_log(app.units[0]) - log_lines = list(map(lambda line: json.loads(line), metrics_log.splitlines())) - events = set(map(lambda line: line.get("event"), log_lines)) - return "runner_installed" in events - - try: - await wait_for(_runner_installed_in_metrics_log, check_interval=30, timeout=60 * 10) - except TimeoutError: - assert False, "runner_installed event has not been logged" - - await _assert_metrics_are_logged(app, github_repository) - - -@pytest.mark.abort_on_fail -async def test_reactive_mode_with_not_found_job( - ops_test: OpsTest, - app: Application, -): - """ - arrange: Place a message in the queue with a non-existent job url. - act: Call reconcile. - assert: The message is removed from the queue. - """ - mongodb_uri = await get_mongodb_uri(ops_test, app) - - labels = {app.name, "x64"} # The architecture label should be ignored in the - # label validation in the reactive consumer. - job = JobDetails( - labels=labels, - url="https://github.com/canonical/github-runner-operator/actions/runs/mock-run/job/mock-job", - ) - add_to_queue( - json.dumps(json.loads(job.json()) | {"ignored_noise": "foobar"}), - mongodb_uri, - app.name, - ) - - # There may be a race condition between getting the token and spawning the machine. - await sleep(10) - await wait_for_reconcile(app) - - assert_queue_is_empty(mongodb_uri, app.name) - - -@pytest.mark.abort_on_fail -async def test_reactive_mode_does_not_consume_jobs_with_unsupported_labels( - ops_test: OpsTest, - app: Application, - github_repository: Repository, - test_github_branch: Branch, -): - """ - arrange: Place a message with an unsupported label in the queue and dispatch a workflow. - act: Call reconcile. - assert: No runner is spawned and the message is not requeued. 
- """ - mongodb_uri = await get_mongodb_uri(ops_test, app) - run = await dispatch_workflow( - app=app, - branch=test_github_branch, - github_repository=github_repository, - conclusion="success", # this is ignored currently if wait=False kwarg is used - workflow_id_or_name=DISPATCH_TEST_WORKFLOW_FILENAME, - wait=False, - ) - job = create_job_details(run=run, labels={"not supported label"}) - add_to_queue( - job.json(), - mongodb_uri, - app.name, - ) - - # wait for queue being empty, there could be a race condition where it takes some - # time for the job message to be consumed and the queue to be empty - try: - await wait_for(lambda: get_queue_size(mongodb_uri, app.name) == 0) - run.update() - assert run.status == "queued" - finally: - run.cancel() # cancel the run to avoid a queued run in GitHub actions page - - -@pytest.mark.abort_on_fail -async def test_reactive_mode_scale_down( - ops_test: OpsTest, - app: Application, - github_repository: Repository, - test_github_branch: Branch, -): - """ - arrange: Scale down the number of virtual machines to 2 and spawn a job. - act: - 1. Scale down the number of virtual machines to 0 and call reconcile. - 2. Spawn a job. - assert: - 1. The job fails. - 2. The job is queued and there is a message in the queue. 
- """ - mongodb_uri = await get_mongodb_uri(ops_test, app) - - await app.set_config({MAX_TOTAL_VIRTUAL_MACHINES_CONFIG_NAME: "2"}) - await wait_for_reconcile(app) - - run = await dispatch_workflow( - app=app, - branch=test_github_branch, - github_repository=github_repository, - conclusion="success", # this is ignored currently if wait=False kwarg is used - workflow_id_or_name=DISPATCH_CRASH_TEST_WORKFLOW_FILENAME, - wait=False, - ) - job = create_job_details(run=run, labels={app.name}) - add_to_queue( - job.json(), - mongodb_uri, - app.name, - ) - - await wait_for_status(run, "in_progress") - # Sleep for enough time due to race condition where the reactive process would be sleeping due - # to job_picked_up check but the application would kill the process on scale down. - await sleep(WAIT_TIME_IN_SEC * 1.5) - - # 1. Scale down the number of virtual machines to 0 and call reconcile. - await app.set_config({MAX_TOTAL_VIRTUAL_MACHINES_CONFIG_NAME: "0"}) - await wait_for_reconcile(app) - - # we assume that the runner got deleted while running the job, so we expect a failed job - await wait_for_completion(run, conclusion="failure") - await wait_for_reconcile(app) - assert_queue_is_empty(mongodb_uri, app.name) - - # 2. Spawn a job. - run = await dispatch_workflow( - app=app, - branch=test_github_branch, - github_repository=github_repository, - conclusion="success", # this is ignored currently if wait=False kwarg is used - workflow_id_or_name=DISPATCH_CRASH_TEST_WORKFLOW_FILENAME, - wait=False, - ) - job = create_job_details(run=run, labels={app.name}) - add_to_queue( - job.json(), - mongodb_uri, - app.name, - ) - - await wait_for_reconcile(app) - - run.update() - assert run.status == "queued" - run.cancel() - - assert_queue_has_size(mongodb_uri, app.name, 1) - - -async def _assert_metrics_are_logged(app: Application, github_repository: Repository): - """Assert that all runner metrics are logged. 
- - Args: - app: The juju application, used to extract the metrics log and flavor name. - github_repository: The GitHub repository to be included in the metrics. - """ - metrics_log = await get_metrics_log(app.units[0]) - log_lines = list(map(lambda line: json.loads(line), metrics_log.splitlines())) - for metric_log in log_lines: - if metric_log.get("event") == "runner_installed": - assert metric_log.get("flavor") == app.name - assert metric_log.get("event") == "runner_installed" - assert metric_log.get("duration") >= 0 - await assert_events_after_reconciliation( - app=app, - github_repository=github_repository, - post_job_status=PostJobStatus.NORMAL, - reactive_mode=True, - ) diff --git a/tests/integration/utils_reactive.py b/tests/integration/utils_reactive.py deleted file mode 100644 index fa09c6b3a4..0000000000 --- a/tests/integration/utils_reactive.py +++ /dev/null @@ -1,208 +0,0 @@ -# Copyright 2026 Canonical Ltd. -# See LICENSE file for licensing details. - -"""Utilities for reactive mode.""" - -import json -import logging - -from github.WorkflowRun import WorkflowRun -from github_runner_manager.reactive.consumer import JobDetails -from juju.application import Application -from juju.model import Model -from juju.unit import Unit -from kombu import Connection -from pytest_operator.plugin import OpsTest - -logger = logging.getLogger(__name__) - - -async def get_mongodb_uri(ops_test: OpsTest, app: Application) -> str: - """Get the mongodb uri. - - Args: - ops_test: The ops_test plugin. - app: The juju application containing the unit. - - Returns: - The mongodb uri. 
- - """ - mongodb_uri = await get_mongodb_uri_from_integration_data(ops_test, app.units[0]) - if not mongodb_uri: - mongodb_uri = await get_mongodb_uri_from_secrets(ops_test, app.units[0], app.model) - assert mongodb_uri, "mongodb uri not found in integration data or secret" - return mongodb_uri - - -async def get_mongodb_uri_from_integration_data(ops_test: OpsTest, unit: Unit) -> str | None: - """Get the mongodb uri from the relation data. - - Args: - ops_test: The ops_test plugin. - unit: The juju unit containing the relation data. - - Returns: - The mongodb uri or None if not found. - """ - mongodb_uri = None - _, unit_data, _ = await ops_test.juju("show-unit", unit.name, "--format", "json") - unit_data = json.loads(unit_data) - - for rel_info in unit_data[unit.name]["relation-info"]: - if rel_info["endpoint"] == "mongodb": - try: - mongodb_uri = rel_info["application-data"]["uris"] - break - except KeyError: - pass - - return mongodb_uri - - -async def get_mongodb_uri_from_secrets(ops_test, unit: Unit, model: Model) -> str | None: - """Get the mongodb uri from the secrets. - - Args: - ops_test: The ops_test plugin. - unit: The juju unit containing the secret. - model: The juju model containing the unit. - - Returns: - The mongodb uri or None if not found. 
- """ - mongodb_uri = None - - juju_secrets = await model.list_secrets() - - # Juju < 3.6 returns a dictionary instead of a list - if not isinstance(juju_secrets, list): - juju_secrets = juju_secrets["results"] - - mongodb_secret_id = None - _, unit_data, _ = await ops_test.juju("show-unit", unit.name, "--format", "json") - unit_data = json.loads(unit_data) - for rel_info in unit_data[unit.name]["relation-info"]: - if rel_info["endpoint"] == "mongodb": - try: - mongodb_secret_id = rel_info["application-data"]["secret-user"] - break - except KeyError: - return None - - _, show_secret, _ = await ops_test.juju( - "show-secret", mongodb_secret_id, "--reveal", "--format", "json" - ) - show_secret = json.loads(show_secret) - for value in show_secret.values(): - mongodb_uri = value["content"]["Data"]["uris"] - break - return mongodb_uri - - -def create_job_details(run: WorkflowRun, labels: set[str]) -> JobDetails: - """Create a JobDetails object. - - Args: - run: The workflow run containing the job. Used to retrieve the job url. We assyne - the run only contains one job. - labels: The labels for the job. - - Returns: - The job details. - """ - jobs = list(run.jobs()) - assert len(jobs) == 1, "Expected 1 job to be created" - job = jobs[0] - job_url = job.url - job = JobDetails( - labels=labels, - url=job_url, - ) - return job - - -def add_to_queue(msg: str, mongodb_uri: str, queue_name: str) -> None: - """Add a message to a queue. - - Args: - msg: The message to add to the queue. - mongodb_uri: The mongodb uri. - queue_name: The name of the queue to add the message to. - """ - with Connection(mongodb_uri) as conn: - with conn.SimpleQueue(queue_name) as simple_queue: - simple_queue.put(msg, retry=True) - - -def clear_queue(mongodb_uri: str, queue_name: str) -> None: - """Clear the queue. - - Args: - mongodb_uri: The mongodb uri. - queue_name: The name of the queue to clear. 
- """ - with Connection(mongodb_uri) as conn: - with conn.SimpleQueue(queue_name) as simple_queue: - simple_queue.clear() - - -def assert_queue_is_empty(mongodb_uri: str, queue_name: str): - """Assert that the queue is empty. - - Args: - mongodb_uri: The mongodb uri. - queue_name: The name of the queue to check. - """ - assert_queue_has_size(mongodb_uri, queue_name, 0) - - -def assert_queue_has_size(mongodb_uri: str, queue_name: str, size: int): - """Assert that the queue is empty. - - Args: - mongodb_uri: The mongodb uri. - queue_name: The name of the queue to check. - size: The expected size of the queue. - - Raises: - AssertionError: if an unexpected queue size was encountered. - """ - queue_size = get_queue_size(mongodb_uri, queue_name) - try: - assert queue_size == size, f"Queue {queue_name} expected: {size}, received: {queue_size}" - except AssertionError: - _log_queue_contents(mongodb_uri=mongodb_uri, queue_name=queue_name) - raise - - -def get_queue_size(mongodb_uri: str, queue_name: str) -> int: - """Get the size of the queue. - - Args: - mongodb_uri: The mongodb uri. - queue_name: The name of the queue to check. - - Returns: - The size of the queue. - """ - with Connection(mongodb_uri) as conn: - with conn.SimpleQueue(queue_name) as simple_queue: - return simple_queue.qsize() - - -def _log_queue_contents(mongodb_uri: str, queue_name: str) -> None: - """Print contents of the queue for debugging. - - Args: - mongodb_uri: The mongodb uri. - queue_name: The name of the queue to check. 
- """ - with Connection(mongodb_uri) as conn: - with conn.SimpleQueue(queue_name) as simple_queue: - while simple_queue.qsize(): - queue_item = simple_queue.get(timeout=10) - logger.info( - "Queue item payload: %s, headers: %s", queue_item.payload, queue_item.headers - ) - queue_item.ack() diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 0402693531..c0f18a5d0c 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -643,7 +643,7 @@ def test_metric_log_ownership_for_upgrade( harness.charm._manager_client = manager_client_mock mock_manager_service = MagicMock() monkeypatch.setattr("charm.manager_service", mock_manager_service) - monkeypatch.setattr("charm.METRICS_LOG_PATH", mock_metric_log_path) + monkeypatch.setattr("charm.get_metrics_log_path", lambda: mock_metric_log_path) monkeypatch.setattr("charm.shutil", shutil_mock := MagicMock()) monkeypatch.setattr("charm.execute_command", MagicMock(return_value=(0, "Mock_stdout")))