diff --git a/src/dstack/_internal/core/compatibility/runs.py b/src/dstack/_internal/core/compatibility/runs.py index f1b7dc522..90ef9d98a 100644 --- a/src/dstack/_internal/core/compatibility/runs.py +++ b/src/dstack/_internal/core/compatibility/runs.py @@ -1,7 +1,9 @@ from typing import Optional from dstack._internal.core.models.common import IncludeExcludeDictType, IncludeExcludeSetType +from dstack._internal.core.models.configurations import ServiceConfiguration from dstack._internal.core.models.runs import ( + DEFAULT_PROBE_UNTIL_READY, DEFAULT_REPLICA_GROUP_NAME, ApplyRunPlanInput, JobSpec, @@ -60,12 +62,12 @@ def get_run_spec_excludes(run_spec: RunSpec) -> IncludeExcludeDictType: configuration_excludes: IncludeExcludeDictType = {} profile_excludes: IncludeExcludeSetType = set() - # Add excludes like this: - # - # if run_spec.configuration.tags is None: - # configuration_excludes["tags"] = True - # if run_spec.profile is not None and run_spec.profile.tags is None: - # profile_excludes.add("tags") + if isinstance(run_spec.configuration, ServiceConfiguration): + if run_spec.configuration.probes: + probe_excludes: IncludeExcludeDictType = {} + configuration_excludes["probes"] = {"__all__": probe_excludes} + if all(p.until_ready is None for p in run_spec.configuration.probes): + probe_excludes["until_ready"] = True if configuration_excludes: spec_excludes["configuration"] = configuration_excludes @@ -83,4 +85,10 @@ def get_job_spec_excludes(job_specs: list[JobSpec]) -> IncludeExcludeDictType: spec_excludes: IncludeExcludeDictType = {} if all(s.replica_group == DEFAULT_REPLICA_GROUP_NAME for s in job_specs): spec_excludes["replica_group"] = True + + probe_excludes: IncludeExcludeDictType = {} + spec_excludes["probes"] = {"__all__": probe_excludes} + if all(all(p.until_ready == DEFAULT_PROBE_UNTIL_READY for p in s.probes) for s in job_specs): + probe_excludes["until_ready"] = True + return spec_excludes diff --git a/src/dstack/_internal/core/models/configurations.py b/src/dstack/_internal/core/models/configurations.py index 3b2c7812b..d0c77443e 100644 --- a/src/dstack/_internal/core/models/configurations.py +++ b/src/dstack/_internal/core/models/configurations.py @@ -54,6 +54,7 @@ DEFAULT_PROBE_INTERVAL = 15 DEFAULT_PROBE_READY_AFTER = 1 DEFAULT_PROBE_METHOD = "get" +DEFAULT_PROBE_UNTIL_READY = False MAX_PROBE_URL_LEN = 2048 DEFAULT_REPLICA_GROUP_NAME = "0" @@ -372,6 +373,16 @@ class ProbeConfig(generate_dual_core_model(ProbeConfigConfig)): ), ), ] = None + until_ready: Annotated[ + Optional[bool], + Field( + description=( + "If `true`, the probe will stop being executed as soon as it reaches the" + " `ready_after` threshold of successful executions." + f" Defaults to `{str(DEFAULT_PROBE_UNTIL_READY).lower()}`" + ), + ), + ] = None @validator("timeout", pre=True) def parse_timeout(cls, v: Optional[Union[int, str]]) -> Optional[int]: diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index 27c6f430c..817dd4c49 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -17,6 +17,7 @@ ) from dstack._internal.core.models.configurations import ( DEFAULT_PROBE_METHOD, + DEFAULT_PROBE_UNTIL_READY, DEFAULT_REPLICA_GROUP_NAME, LEGACY_REPO_DIR, AnyRunConfiguration, @@ -247,6 +248,7 @@ class ProbeSpec(CoreModel): timeout: int interval: int ready_after: int + until_ready: bool = DEFAULT_PROBE_UNTIL_READY class JobSpec(CoreModel): diff --git a/src/dstack/_internal/server/background/tasks/process_probes.py b/src/dstack/_internal/server/background/tasks/process_probes.py index bc1dc0943..4f712ff4c 100644 --- a/src/dstack/_internal/server/background/tasks/process_probes.py +++ b/src/dstack/_internal/server/background/tasks/process_probes.py @@ -73,12 +73,15 @@ async def process_probes(): else: job_spec: JobSpec = JobSpec.__response__.parse_raw(probe.job.job_spec_data) probe_spec = job_spec.probes[probe.probe_num] - # Schedule the next probe execution in case this execution is interrupted - probe.due = get_current_datetime() + _get_probe_async_processing_timeout( - probe_spec - ) - # Execute the probe asynchronously outside of the DB session - PROBES_SCHEDULER.add_job(partial(_process_probe_async, probe, probe_spec)) + if probe_spec.until_ready and probe.success_streak >= probe_spec.ready_after: + probe.active = False + else: + # Schedule the next probe execution in case this execution is interrupted + probe.due = get_current_datetime() + _get_probe_async_processing_timeout( + probe_spec + ) + # Execute the probe asynchronously outside of the DB session + PROBES_SCHEDULER.add_job(partial(_process_probe_async, probe, probe_spec)) await session.commit() finally: probe_lockset.difference_update(probe_ids) diff --git a/src/dstack/_internal/server/services/jobs/configurators/base.py b/src/dstack/_internal/server/services/jobs/configurators/base.py index df6738a77..c55add681 100644 --- a/src/dstack/_internal/server/services/jobs/configurators/base.py +++ b/src/dstack/_internal/server/services/jobs/configurators/base.py @@ -15,6 +15,7 @@ DEFAULT_PROBE_METHOD, DEFAULT_PROBE_READY_AFTER, DEFAULT_PROBE_TIMEOUT, + DEFAULT_PROBE_UNTIL_READY, DEFAULT_PROBE_URL, DEFAULT_REPLICA_GROUP_NAME, LEGACY_REPO_DIR, @@ -444,6 +445,7 @@ def _probe_config_to_spec(c: ProbeConfig) -> ProbeSpec: method=c.method if c.method is not None else DEFAULT_PROBE_METHOD, headers=c.headers, body=c.body, + until_ready=c.until_ready if c.until_ready is not None else DEFAULT_PROBE_UNTIL_READY, ) diff --git a/src/tests/_internal/server/background/tasks/test_process_probes.py b/src/tests/_internal/server/background/tasks/test_process_probes.py index ebc3c15b9..928709dd7 100644 --- a/src/tests/_internal/server/background/tasks/test_process_probes.py +++ b/src/tests/_internal/server/background/tasks/test_process_probes.py @@ -163,6 +163,67 @@ async def test_schedules_probe_execution(self, test_db, session: AsyncSession) - + PROCESSING_OVERHEAD_TIMEOUT ) + async def test_deactivates_probe_when_until_ready_and_ready_after_reached( + self, test_db, session: AsyncSession + ) -> None: + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo( + session=session, + project_id=project.id, + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_spec=get_run_spec( + run_name="test", + repo_id=repo.name, + configuration=ServiceConfiguration( + port=80, + image="nginx", + probes=[ + ProbeConfig( + type="http", url="/until_ready", until_ready=True, ready_after=3 + ), + ProbeConfig(type="http", url="/regular", until_ready=False, ready_after=3), + ], + ), + ), + ) + instance = await create_instance( + session=session, + project=project, + status=InstanceStatus.BUSY, + ) + job = await create_job( + session=session, + run=run, + status=JobStatus.RUNNING, + job_provisioning_data=get_job_provisioning_data(), + instance=instance, + instance_assigned=True, + ) + + probe_until_ready = await create_probe(session, job, probe_num=0, success_streak=3) + probe_regular = await create_probe(session, job, probe_num=1, success_streak=3) + + with patch( + "dstack._internal.server.background.tasks.process_probes.PROBES_SCHEDULER" + ) as scheduler_mock: + await process_probes() + + await session.refresh(probe_until_ready) + await session.refresh(probe_regular) + + assert not probe_until_ready.active + assert probe_until_ready.success_streak == 3 + + assert probe_regular.active + assert probe_regular.success_streak == 3 + assert scheduler_mock.add_job.call_count == 1 # only the regular probe was scheduled + # TODO: test probe success and failure # (skipping for now - a bit difficult to test and most of the logic will be mocked)