Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/sagemaker/hyperpod/training/hyperpod_pytorch_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ def _process_replica_resources(cls, data):
spec = template.get('spec', {})
node_selector = spec.get('nodeSelector', {})
instance_type = node_selector.get(INSTANCE_TYPE_LABEL) if node_selector else None
if not instance_type:
return None

containers = spec.get('containers', [])

Expand All @@ -117,6 +115,16 @@ def _process_replica_resources(cls, data):
requests = resources.get('requests', {})
limits = resources.get('limits', {})

# Check if accelerators are specified without instance_type
accel_keys = ['accelerators', NVIDIA_RESOURCE_KEY, NEURON_RESOURCE_KEY]
has_accelerators = any(requests.get(k) and str(requests.get(k)) != "0" for k in accel_keys)
has_accelerators_limit = any(limits.get(k) and str(limits.get(k)) != "0" for k in accel_keys)
if not instance_type and (has_accelerators or has_accelerators_limit):
raise ValueError("--instance-type is required when specifying accelerator resources")

if not instance_type:
return None

accelerators = None
if requests.get('accelerators'):
accelerators = int(requests.get('accelerators'))
Expand Down
3 changes: 0 additions & 3 deletions src/sagemaker/hyperpod/training/quota_allocation_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,6 @@ def _get_limits(instance_type: str, vcpu_limit: Optional[float], memory_in_gib_l
if accelerators_limit is not None:
if type_of_accelerator is not None:
result[type_of_accelerator] = accelerators_limit
else:
# user specified accelerator limit but the instance type wasn't found, set limit to 0 as a precaution
result["nvidia.com/gpu"] = 0
if accelerator_partition_limit is not None:
result[f"nvidia.com/{accelerator_partition_type}"] = accelerator_partition_limit
if memory_in_gib_limit is not None:
Expand Down
12 changes: 6 additions & 6 deletions test/unit_tests/cli/test_quota_allocation_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,18 +193,18 @@ def test_get_limits_trainium_instance(self):

def test_get_limits_cpu_only_instance(self):
result = _get_limits("ml.c5.large", 2.0, 8.0, 1, None, None)
# CPU-only instance should set accelerator limit to 0 as precaution
assert result == {"cpu": "2.0", "memory": "8.0Gi", "nvidia.com/gpu": 0}
# CPU-only instance has no accelerator type, so accelerator limit is dropped
assert result == {"cpu": "2.0", "memory": "8.0Gi"}

def test_get_limits_invalid_instance_type(self):
result = _get_limits("invalid-instance", 4.0, 16.0, 2, None, None)
# Invalid instance type should set accelerator limit to 0 as precaution
assert result == {"cpu": "4.0", "memory": "16.0Gi", "nvidia.com/gpu": 0}
# Invalid instance type has no accelerator type, so accelerator limit is dropped
assert result == {"cpu": "4.0", "memory": "16.0Gi"}

def test_get_limits_cpu_instance_r7i(self):
result = _get_limits("ml.r7i.48xlarge", 16.0, 64.0, 2, None, None)
# CPU-only instance (ml.r7i.48xlarge) should set accelerator limit to 0 as precaution
assert result == {"cpu": "16.0", "memory": "64.0Gi", "nvidia.com/gpu": 0}
# CPU-only instance has no accelerator type, so accelerator limit is dropped
assert result == {"cpu": "16.0", "memory": "64.0Gi"}

def test_is_valid_no_instance_type_with_resources(self):
valid, message = _is_valid(4.0, 16.0, None, None, None, None)
Expand Down
209 changes: 77 additions & 132 deletions test/unit_tests/training/test_pytorch_job_template_model.py
Original file line number Diff line number Diff line change
@@ -1,153 +1,98 @@
import unittest
from hyperpod_pytorch_job_template.v1_1.model import PyTorchJobConfig
from hyperpod_pytorch_job_template.v1_0.model import PyTorchJobConfig as PyTorchJobConfigV1_0
from sagemaker.hyperpod.training.hyperpod_pytorch_job import HyperPodPytorchJob


class TestPyTorchJobConfigEFA(unittest.TestCase):
"""Test EFA resource allocation in PyTorchJobConfig"""

# def test_single_node_no_efa(self):
# """Test that single-node jobs don't get EFA resources"""
# config = PyTorchJobConfig(
# job_name="test-single-node",
# image="pytorch:latest",
# node_count=1,
# accelerators=2,
# instance_type="ml.p4d.24xlarge"
# )

# job = config.to_domain()
# container = job.replicaSpecs[0].template.spec.containers[0]

# # Should not have EFA resources
# self.assertNotIn("vpc.amazonaws.com/efa", container.resources.requests)
# self.assertNotIn("vpc.amazonaws.com/efa", container.resources.limits)

# # Should have GPU resources
# self.assertEqual(container.resources.requests["nvidia.com/gpu"], "2")

# def test_multi_node_with_efa(self):
# """Test that multi-node jobs automatically get EFA resources"""
# config = PyTorchJobConfig(
# job_name="test-multi-node",
# image="pytorch:latest",
# node_count=4,
# accelerators=8,
# instance_type="ml.p4d.24xlarge"
# )

# job = config.to_domain()
# container = job.replicaSpecs[0].template.spec.containers[0]

# # Should have EFA resources
# self.assertEqual(container.resources.requests["vpc.amazonaws.com/efa"], "1")
# self.assertEqual(container.resources.limits["vpc.amazonaws.com/efa"], "1")

# # Should also have GPU resources
# self.assertEqual(container.resources.requests["nvidia.com/gpu"], "8")

def test_instance_without_efa_support_no_efa(self):
"""Test that instances without EFA support don't get EFA (ml.g5.xlarge doesn't support EFA)"""
from sagemaker.hyperpod.training.hyperpod_pytorch_job import HyperPodPytorchJob
class TestResourceAllocation(unittest.TestCase):
"""Test resource allocation through the full pipeline (to_domain + allocate_quotas_if_applicable).

config = PyTorchJobConfig(
job_name="test-no-efa-support",
image="pytorch:latest",
accelerators=1,
instance_type="ml.g5.xlarge"
)
These tests verify that config fields are correctly resolved into k8s resource keys
(e.g. 'accelerators' -> 'nvidia.com/gpu') and that EFA is auto-populated from instance constants.
"""

def _resolve(self, **kwargs):
"""Helper: create config, convert to domain, and run quota allocation."""
config = PyTorchJobConfig(image="pytorch:latest", **kwargs)
job = config.to_domain()
# Call allocate_quotas_if_applicable to convert generic keys to actual resource keys
job_with_resources = HyperPodPytorchJob.allocate_quotas_if_applicable(job)
container = job_with_resources.replicaSpecs[0].template.spec.containers[0]
return HyperPodPytorchJob.allocate_quotas_if_applicable(job)

# Should not have EFA resources (instance doesn't support it)
self.assertNotIn("vpc.amazonaws.com/efa", container.resources.requests)
self.assertNotIn("vpc.amazonaws.com/efa", container.resources.limits)
def _get_resources(self, job):
container = job.replicaSpecs[0].template.spec.containers[0]
return container.resources.requests, container.resources.limits

# Should have GPU resources
self.assertIn("nvidia.com/gpu", container.resources.requests)
def test_single_node_gpu_job(self):
"""Single-node GPU job gets resolved accelerator key and no EFA (g5.xlarge has 0 EFA)."""
job = self._resolve(job_name="test-single", accelerators=1, instance_type="ml.g5.xlarge")
requests, limits = self._get_resources(job)

def test_accelerators_with_efa_support_gets_default_efa(self):
"""Test that specifying accelerators on EFA-capable instance gets EFA from constants"""
from sagemaker.hyperpod.training.hyperpod_pytorch_job import HyperPodPytorchJob

config = PyTorchJobConfig(
job_name="test-accelerators-default-efa",
image="pytorch:latest",
accelerators=4,
instance_type="ml.p4d.24xlarge"
)
self.assertIn("nvidia.com/gpu", requests)
self.assertNotIn("vpc.amazonaws.com/efa", requests)

job = config.to_domain()
# Call allocate_quotas_if_applicable to convert generic keys to actual resource keys
job_with_resources = HyperPodPytorchJob.allocate_quotas_if_applicable(job)
container = job_with_resources.replicaSpecs[0].template.spec.containers[0]
def test_multi_node_gpu_job_with_efa(self):
"""Multi-node GPU job on EFA-capable instance gets auto-populated EFA from constants."""
job = self._resolve(job_name="test-multi", accelerators=8, instance_type="ml.p4d.24xlarge")
requests, limits = self._get_resources(job)

# Should have EFA from constants
self.assertIn("vpc.amazonaws.com/efa", container.resources.requests)
self.assertIn("vpc.amazonaws.com/efa", container.resources.limits)
self.assertEqual(int(container.resources.requests["vpc.amazonaws.com/efa"]), 4)
self.assertEqual(int(requests["nvidia.com/gpu"]), 8)
self.assertIn("vpc.amazonaws.com/efa", requests)
self.assertEqual(int(requests["vpc.amazonaws.com/efa"]), 4) # p4d has 4 EFA interfaces

def test_user_specified_efa_overrides_default(self):
"""Test that user-specified EFA value overrides the default from constants"""
from sagemaker.hyperpod.training.hyperpod_pytorch_job import HyperPodPytorchJob
"""User-specified EFA value takes precedence over the instance default."""
job = self._resolve(job_name="test-efa", accelerators=4, efa_interfaces=2, instance_type="ml.p4d.24xlarge")
requests, limits = self._get_resources(job)

self.assertEqual(int(requests["vpc.amazonaws.com/efa"]), 2)
self.assertEqual(int(limits["vpc.amazonaws.com/efa"]), 2)

def test_instance_without_efa_gets_no_efa(self):
"""Instance that doesn't support EFA (g5.xlarge) gets no EFA resources."""
job = self._resolve(job_name="test-no-efa", accelerators=1, instance_type="ml.g5.xlarge")
requests, limits = self._get_resources(job)

self.assertNotIn("vpc.amazonaws.com/efa", requests)
self.assertNotIn("vpc.amazonaws.com/efa", limits)
self.assertIn("nvidia.com/gpu", requests)

def test_all_resource_types_together(self):
"""Accelerators, CPU, memory, and EFA all resolve correctly in a single job."""
job = self._resolve(
job_name="test-all", accelerators=4, vcpu=16.0, memory=64.0,
instance_type="ml.p4d.24xlarge",
)
requests, limits = self._get_resources(job)

config = PyTorchJobConfig(
job_name="test-custom-efa",
image="pytorch:latest",
accelerators=4,
efa_interfaces=2,
instance_type="ml.p4d.24xlarge"
self.assertIn("nvidia.com/gpu", requests)
self.assertIn("cpu", requests)
self.assertIn("memory", requests)
self.assertIn("vpc.amazonaws.com/efa", requests)

def test_accelerators_without_instance_type_rejected(self):
"""Specifying accelerators without instance_type raises a clear error."""
from sagemaker.hyperpod.training.config.hyperpod_pytorch_job_unified_config import (
Containers, ReplicaSpec, Resources, Spec, Template,
)

job = config.to_domain()
# Call allocate_quotas_if_applicable to convert generic keys to actual resource keys
job_with_resources = HyperPodPytorchJob.allocate_quotas_if_applicable(job)
container = job_with_resources.replicaSpecs[0].template.spec.containers[0]

# Should use user-specified EFA value
self.assertEqual(int(container.resources.requests["vpc.amazonaws.com/efa"]), 2)
self.assertEqual(int(container.resources.limits["vpc.amazonaws.com/efa"]), 2)

# def test_multi_node_with_memory_and_cpu(self):
# """Test EFA with other resource types"""
# config = PyTorchJobConfig(
# job_name="test-multi-resources",
# image="pytorch:latest",
# node_count=2,
# accelerators=4,
# vcpu=16.0,
# memory=64.0,
# instance_type="ml.p4d.24xlarge"
# )

# job = config.to_domain()
# container = job.replicaSpecs[0].template.spec.containers[0]

# # Should have all resources including EFA
# self.assertEqual(container.resources.requests["vpc.amazonaws.com/efa"], "1")
# self.assertEqual(container.resources.requests["nvidia.com/gpu"], "4")
# self.assertEqual(container.resources.requests["cpu"], "16.0")
# self.assertEqual(container.resources.requests["memory"], "64.0Gi")

# def test_accelerators_without_instance_type(self):
# """Test that accelerators work without instance_type (fixes the main issue)"""
# config = PyTorchJobConfig(
# job_name="test-no-instance-type",
# image="pytorch:latest",
# accelerators=4
# # No instance_type specified
# )

# job = config.to_domain()
# container = job.replicaSpecs[0].template.spec.containers[0]

# # Should respect accelerators value even without instance_type
# self.assertEqual(container.resources.requests["nvidia.com/gpu"], "4")
# # Limits should default to "0" since accelerators_limit not specified
# self.assertEqual(container.resources.limits["nvidia.com/gpu"], "0")
job = HyperPodPytorchJob(
metadata={"name": "test-no-instance-type", "namespace": "default"},
replica_specs=[ReplicaSpec(
name="pod",
template=Template(
spec=Spec(containers=[Containers(
name="test",
image="pytorch:latest",
resources=Resources(
requests={"nvidia.com/gpu": "4"},
limits={"nvidia.com/gpu": "4"},
),
)])
),
)],
)

with self.assertRaises(ValueError, msg="--instance-type is required when specifying accelerator resources"):
HyperPodPytorchJob.allocate_quotas_if_applicable(job)


class TestDeepHealthCheckNodeSelector(unittest.TestCase):
Expand Down
Loading