From 694019e81d41fc77c0bdb7df171374f826f34ec3 Mon Sep 17 00:00:00 2001 From: Luca Carrogu Date: Tue, 5 Dec 2023 14:47:19 +0100 Subject: [PATCH] Change update policy for MinCount, MaxCount, Queue and ComputeResource Change update policy for * MinCount and MaxCount, from COMPUTE_FLEET_STOP to RESIZE_UPDATE_STRATEGY_ON_REMOVE * Queue and ComputeResource, from COMPUTE_FLEET_STOP_ON_REMOVE to RESIZE_UPDATE_STRATEGY_ON_REMOVE. RESIZE_UPDATE_STRATEGY_ON_REMOVE is a new update policy that permits updates in one of the following cases: * the compute fleet is stopped * QueueUpdateStrategy is set to TERMINATE * a new Queue is added * a new ComputeResource is added * MaxCount is increased * MinCount is increased AND MaxCount is increased by at least the same amount When setting QueueUpdateStrategy = TERMINATE, only the nodes at the back of the node list will be terminated. Example: * the cluster's initial capacity is `MinCount = 5` and `MaxCount = 10`, the nodes are `st-[1-5]; dy-[1-5]` * when resizing the cluster to `MinCount = 3` and `MaxCount = 5`, the new cluster capacity will be composed of the nodes `st-[1-3]; dy-[1-2]`, which will not be touched during the update * the nodes `st-[4-5]; dy-[3-5]` will be terminated This is made possible by the adoption of Slurm 23.11, ref https://slurm.schedmd.com/news.html Signed-off-by: Luca Carrogu --- CHANGELOG.md | 6 +- cli/src/pcluster/config/update_policy.py | 77 ++- cli/src/pcluster/schemas/cluster_schema.py | 12 +- .../pcluster/config/test_config_patch.py | 18 +- .../pcluster/config/test_update_policy.py | 557 +++++++++++++++--- .../tests/update/test_update.py | 71 ++- .../pcluster.config.update_resize.yaml | 40 ++ ...uster.config.update_with_running_job.yaml} | 0 ...er.config.update_without_running_job.yaml} | 6 +- 9 files changed, 695 insertions(+), 92 deletions(-) create mode 100644 tests/integration-tests/tests/update/test_update/test_queue_parameters_update/pcluster.config.update_resize.yaml rename tests/integration-tests/tests/update/test_update/test_queue_parameters_update/{pcluster.config.update_drain.yaml => pcluster.config.update_with_running_job.yaml} (100%) rename tests/integration-tests/tests/update/test_update/test_queue_parameters_update/{pcluster.update_drain_without_running_job.yaml => pcluster.config.update_without_running_job.yaml} (88%) diff --git a/CHANGELOG.md b/CHANGELOG.md index a983338168..bda51743d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,10 +16,14 @@ CHANGELOG **CHANGES** - Upgrade Slurm to 23.11.1. +- Allow updating `MinCount`, `MaxCount`, `Queue` and `ComputeResource` configuration parameters without the need to + stop the compute fleet. It's now possible to update them by setting `Scheduling/SlurmSettings/QueueUpdateStrategy` + to TERMINATE. ParallelCluster will terminate only the nodes removed during a resize of the cluster capacity + performed through a cluster update. - Add support for Python 3.11, 3.12 in pcluster CLI and aws-parallelcluster-batch-cli. - Upgrade Python to version 3.12 and NodeJS to version 18 in ParallelCluster Lambda Layer. - Build network interfaces using network card index from `NetworkCardIndex` list of EC2 DescribeInstances response, - instead of looping over `MaximumNetworkCards` range. + instead of looping over `MaximumNetworkCards` range. 
3.8.0 ------ diff --git a/cli/src/pcluster/config/update_policy.py b/cli/src/pcluster/config/update_policy.py index 14121ff3e9..fbf1be9a62 100644 --- a/cli/src/pcluster/config/update_policy.py +++ b/cli/src/pcluster/config/update_policy.py @@ -12,7 +12,7 @@ from enum import Enum from pcluster.config.cluster_config import QueueUpdateStrategy -from pcluster.constants import AWSBATCH, DEFAULT_MAX_COUNT, SLURM +from pcluster.constants import AWSBATCH, DEFAULT_MAX_COUNT, DEFAULT_MIN_COUNT, SLURM class UpdatePolicy: @@ -113,6 +113,15 @@ def actions_needed_queue_update_strategy(change, _): return actions +def actions_needed_resize_update_strategy_on_remove(*_): + return ( + "Stop the compute fleet with the pcluster update-compute-fleet command, " + "or set QueueUpdateStrategy to TERMINATE in the configuration used for the 'update-cluster' operation. " + "Be aware that this update will remove nodes from the scheduler and terminates the EC2 instances " + "associated. Jobs running on the removed nodes will terminate" + ) + + def actions_needed_managed_placement_group(change, patch): if is_managed_placement_group_deletion(change, patch): actions = "Stop the compute fleet with the pcluster update-compute-fleet command." @@ -258,6 +267,10 @@ def fail_reason_queue_update_strategy(change, _): return reason +def fail_reason_resize_update_strategy_on_remove(*_): + return "All compute nodes must be stopped or QueueUpdateStrategy must be set to TERMINATE" + + def fail_reason_managed_placement_group(change, patch): if is_managed_placement_group_deletion(change, patch): reason = "All compute nodes must be stopped for a managed placement group deletion" @@ -287,6 +300,16 @@ def is_queue_update_strategy_set(patch): ) +def is_resize_update_strategy_terminate(patch): + # Return true if the update strategy is set to TERMINATE + update_strategy = ( + patch.target_config.get("Scheduling") + .get("SlurmSettings", {}) + .get("QueueUpdateStrategy", QueueUpdateStrategy.COMPUTE_FLEET_STOP.value) + ) + return update_strategy == QueueUpdateStrategy.TERMINATE.value + + def condition_checker_queue_update_strategy(change, patch): result = not patch.cluster.has_running_capacity() # QueueUpdateStrategy can override UpdatePolicy of parameters under SlurmQueues @@ -296,6 +319,48 @@ def condition_checker_queue_update_strategy(change, patch): return result +def condition_checker_resize_update_strategy_on_remove(change, patch): + # Check if fleet is stopped + result = not patch.cluster.has_running_capacity() + + # Check if the change is inside a Queue section + if not result and (is_slurm_queues_change(change) or change.key == "SlurmQueues"): + # Check if QueueUpdateStrategy is TERMINATE + result = result or is_resize_update_strategy_terminate(patch) + + # Queue or ComputeResource can be added + if not result and change.is_list: + result = result or (change.old_value is None and change.new_value is not None) + + # Check if MaxCount is increased + if not result and change.key == "MaxCount": + result = result or (int(change.new_value) if change.new_value is not None else DEFAULT_MAX_COUNT) >= ( + int(change.old_value) if change.old_value is not None else DEFAULT_MAX_COUNT + ) + + # Check if MinCount is increased and MaxCount is increased of at least the same amount + if not result and change.key == "MinCount": + path = change.path + for other_change in patch.changes: + # Check the value of MaxCount next to MinCount. 
+ # MinCount and MaxCount next to each other have the same path + if path == other_change.path and other_change.key == "MaxCount": + other_change_new_value = ( + int(other_change.new_value) if other_change.new_value is not None else DEFAULT_MAX_COUNT + ) + other_change_old_value = ( + int(other_change.old_value) if other_change.old_value is not None else DEFAULT_MAX_COUNT + ) + change_new_value = int(change.new_value) if change.new_value is not None else DEFAULT_MIN_COUNT + change_old_value = int(change.old_value) if change.old_value is not None else DEFAULT_MIN_COUNT + result = result or ( + (other_change_new_value - other_change_old_value) >= (change_new_value - change_old_value) + ) + break + + return result + + def condition_checker_queue_update_strategy_on_remove(change, patch): result = not patch.cluster.has_running_capacity() # Update of list element value is possible if one of the following is verified: @@ -414,6 +479,7 @@ def condition_checker_login_nodes_stop_policy(_, patch): ), "pcluster_stop": lambda change, patch: "Stop the compute fleet with the pcluster update-compute-fleet command", "pcluster_stop_conditional": actions_needed_queue_update_strategy, + "pcluster_resize_conditional": actions_needed_resize_update_strategy_on_remove, "managed_placement_group": actions_needed_managed_placement_group, "shared_storage_update_conditional": actions_needed_shared_storage_update, "managed_fsx": actions_needed_managed_fsx, @@ -472,6 +538,15 @@ def condition_checker_login_nodes_stop_policy(_, patch): condition_checker=condition_checker_queue_update_strategy, ) +# Update supported with fleet stopped or with replacement policy set to TERMINATE +UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE = UpdatePolicy( + name="RESIZE_UPDATE_STRATEGY_ON_REMOVE", + level=5, + fail_reason=fail_reason_resize_update_strategy_on_remove, + action_needed=UpdatePolicy.ACTIONS_NEEDED["pcluster_resize_conditional"], + condition_checker=condition_checker_resize_update_strategy_on_remove, +) + # We must force COMPUTE_FLEET_STOP for the deletion of managed groups, otherwise fall back to QUEUE_UPDATE_STRATEGY UpdatePolicy.MANAGED_PLACEMENT_GROUP = UpdatePolicy( name="MANAGED_PLACEMENT_GROUP", diff --git a/cli/src/pcluster/schemas/cluster_schema.py b/cli/src/pcluster/schemas/cluster_schema.py index e0fb5fd3aa..ec8743bd09 100644 --- a/cli/src/pcluster/schemas/cluster_schema.py +++ b/cli/src/pcluster/schemas/cluster_schema.py @@ -1501,8 +1501,12 @@ class SlurmComputeResourceSchema(_ComputeResourceSchema): many=True, metadata={"update_policy": UpdatePolicy.COMPUTE_FLEET_STOP_ON_REMOVE, "update_key": "InstanceType"}, ) - max_count = fields.Int(validate=validate.Range(min=1), metadata={"update_policy": UpdatePolicy.MAX_COUNT}) - min_count = fields.Int(validate=validate.Range(min=0), metadata={"update_policy": UpdatePolicy.COMPUTE_FLEET_STOP}) + max_count = fields.Int( + validate=validate.Range(min=1), metadata={"update_policy": UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE} + ) + min_count = fields.Int( + validate=validate.Range(min=0), metadata={"update_policy": UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE} + ) spot_price = fields.Float( validate=validate.Range(min=0), metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY} ) @@ -1649,7 +1653,7 @@ class SlurmQueueSchema(_CommonQueueSchema): compute_resources = fields.Nested( SlurmComputeResourceSchema, many=True, - metadata={"update_policy": UpdatePolicy.COMPUTE_FLEET_STOP_ON_REMOVE, "update_key": "Name"}, + metadata={"update_policy": 
UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE, "update_key": "Name"}, ) networking = fields.Nested( SlurmQueueNetworkingSchema, required=True, metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY} @@ -1781,7 +1785,7 @@ class SchedulingSchema(BaseSchema): slurm_queues = fields.Nested( SlurmQueueSchema, many=True, - metadata={"update_policy": UpdatePolicy.COMPUTE_FLEET_STOP_ON_REMOVE, "update_key": "Name"}, + metadata={"update_policy": UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE, "update_key": "Name"}, ) # Awsbatch schema: aws_batch_queues = fields.Nested( diff --git a/cli/tests/pcluster/config/test_config_patch.py b/cli/tests/pcluster/config/test_config_patch.py index 2e42bdb8bc..3ea13f6c2b 100644 --- a/cli/tests/pcluster/config/test_config_patch.py +++ b/cli/tests/pcluster/config/test_config_patch.py @@ -163,7 +163,7 @@ def _sorting_func(change): "MaxCount", 1, 2, - UpdatePolicy.MAX_COUNT, + UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE, False, id="change compute resources max count", ), @@ -332,7 +332,7 @@ def _test_compute_resources(base_conf, target_conf): "ComputeResources", {"Name": "compute-removed", "InstanceType": "c5.9xlarge", "MaxCount": 20}, None, - UpdatePolicy.COMPUTE_FLEET_STOP_ON_REMOVE, + UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE, is_list=True, ), Change( @@ -340,11 +340,11 @@ def _test_compute_resources(base_conf, target_conf): "ComputeResources", None, {"Name": "new-compute", "InstanceType": "c5.large", "MinCount": 1}, - UpdatePolicy.COMPUTE_FLEET_STOP_ON_REMOVE, + UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE, is_list=True, ), ], - UpdatePolicy.COMPUTE_FLEET_STOP_ON_REMOVE, + UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE, ) @@ -381,7 +381,7 @@ def _test_queues(base_conf, target_conf): "ComputeResources": {"Name": "compute-removed", "InstanceType": "c5.9xlarge"}, }, None, - UpdatePolicy.COMPUTE_FLEET_STOP_ON_REMOVE, + UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE, is_list=True, ), Change( @@ -393,11 +393,11 @@ def _test_queues(base_conf, target_conf): "Networking": {"SubnetIds": "subnet-987654321"}, "ComputeResources": {"Name": "new-compute", "InstanceType": "c5.xlarge"}, }, - UpdatePolicy.COMPUTE_FLEET_STOP_ON_REMOVE, + UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE, is_list=True, ), ], - UpdatePolicy.COMPUTE_FLEET_STOP_ON_REMOVE, + UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE, ) @@ -566,7 +566,7 @@ def _test_less_target_sections(base_conf, target_conf): "MinCount", 1, None, - UpdatePolicy.COMPUTE_FLEET_STOP, + UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE, is_list=False, ), Change( @@ -670,7 +670,7 @@ def _test_more_target_sections(base_conf, target_conf): "MinCount", None, 1, - UpdatePolicy.COMPUTE_FLEET_STOP, + UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE, is_list=False, ), Change( diff --git a/cli/tests/pcluster/config/test_update_policy.py b/cli/tests/pcluster/config/test_update_policy.py index 6f7af3424e..d13f904737 100644 --- a/cli/tests/pcluster/config/test_update_policy.py +++ b/cli/tests/pcluster/config/test_update_policy.py @@ -56,6 +56,473 @@ def test_max_count_policy(mocker, is_fleet_stopped, old_max, new_max, expected_r cluster_has_running_capacity_mock.assert_called() +@pytest.mark.parametrize( + "is_fleet_stopped, key, path, old_value, new_value, update_strategy, other_changes, expected_result", + [ + # tests with fleet stopped + pytest.param( + True, + "SlurmQueues", + ["Scheduling"], + None, + { + "Name": "queue-added", + "Networking": {"SubnetIds": "subnet-12345678"}, + "ComputeResources": {"Name": "compute-added", "InstanceType": 
"c5.9xlarge"}, + }, + None, + [], + True, + id="stopped fleet and queue is added", + ), + pytest.param( + True, + "SlurmQueues", + ["Scheduling"], + { + "Name": "queue-to-remove", + "Networking": {"SubnetIds": "subnet-12345678"}, + "ComputeResources": {"Name": "compute-to-remove", "InstanceType": "c5.9xlarge"}, + }, + None, + None, + [], + True, + id="stopped fleet and queue is removed", + ), + pytest.param( + True, + "ComputeResources", + ["Scheduling", "SlurmQueues[queue1]"], + None, + {"Name": "compute-added", "InstanceType": "c5.large", "MinCount": 1}, + None, + [], + True, + id="stopped fleet and compute is added", + ), + pytest.param( + True, + "ComputeResources", + ["Scheduling", "SlurmQueues[queue1]"], + {"Name": "compute-to-remove", "InstanceType": "c5.large", "MinCount": 1}, + None, + None, + [], + True, + id="stopped fleet and compute is removed", + ), + pytest.param( + True, + "MinCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 10, + 0, + None, + [], + True, + id="stopped fleet and min count is decreased", + ), + pytest.param( + True, + "MinCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 0, + 10, + None, + [], + True, + id="stopped fleet and min count is increased", + ), + pytest.param( + True, + "MaxCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 10, + 0, + None, + [], + True, + id="stopped fleet and max count is decreased", + ), + pytest.param( + True, + "MaxCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 0, + 10, + None, + [], + True, + id="stopped fleet and max count is increased", + ), + # tests with fleet running + pytest.param( + False, + "SlurmQueues", + ["Scheduling"], + None, + { + "Name": "queue-added", + "Networking": {"SubnetIds": "subnet-12345678"}, + "ComputeResources": {"Name": "compute-added", "InstanceType": "c5.9xlarge"}, + }, + None, + [], + True, + id="running fleet and queue is added", + ), + pytest.param( + False, + "SlurmQueues", + ["Scheduling"], + { + "Name": "queue-to-remove", + "Networking": {"SubnetIds": "subnet-12345678"}, + "ComputeResources": {"Name": "compute-to-remove", "InstanceType": "c5.9xlarge"}, + }, + None, + None, + [], + False, + id="running fleet and queue is removed wo update strategy", + ), + pytest.param( + False, + "SlurmQueues", + ["Scheduling"], + { + "Name": "queue-to-remove", + "Networking": {"SubnetIds": "subnet-12345678"}, + "ComputeResources": {"Name": "compute-to-remove", "InstanceType": "c5.9xlarge"}, + }, + None, + QueueUpdateStrategy.TERMINATE.value, + [], + True, + id="running fleet and queue is removed with TERMINATE update strategy", + ), + pytest.param( + False, + "SlurmQueues", + ["Scheduling"], + { + "Name": "queue-to-remove", + "Networking": {"SubnetIds": "subnet-12345678"}, + "ComputeResources": {"Name": "compute-to-remove", "InstanceType": "c5.9xlarge"}, + }, + None, + QueueUpdateStrategy.DRAIN.value, + [], + False, + id="running fleet and queue is removed with DRAIN update strategy", + ), + pytest.param( + False, + "ComputeResources", + ["Scheduling", "SlurmQueues[queue1]"], + None, + {"Name": "compute-added", "InstanceType": "c5.large", "MinCount": 1}, + None, + [], + True, + id="running fleet and compute is added", + ), + pytest.param( + False, + "ComputeResources", + ["Scheduling", "SlurmQueues[queue1]"], + {"Name": "compute-to-remove", "InstanceType": "c5.large", "MinCount": 1}, + None, + None, + [], + False, + id="running fleet and compute is removed wo update strategy", + ), + 
pytest.param( + False, + "ComputeResources", + ["Scheduling", "SlurmQueues[queue1]"], + {"Name": "compute-to-remove", "InstanceType": "c5.large", "MinCount": 1}, + None, + QueueUpdateStrategy.TERMINATE.value, + [], + True, + id="running fleet and compute is removed with TERMINATE update strategy", + ), + pytest.param( + False, + "ComputeResources", + ["Scheduling", "SlurmQueues[queue1]"], + {"Name": "compute-to-remove", "InstanceType": "c5.large", "MinCount": 1}, + None, + QueueUpdateStrategy.DRAIN.value, + [], + False, + id="running fleet and compute is removed with DRAIN update strategy", + ), + pytest.param( + False, + "MaxCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 10, + 0, + None, + [], + False, + id="running fleet and max count is decreased wo update strategy", + ), + pytest.param( + False, + "MaxCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 10, + 0, + QueueUpdateStrategy.TERMINATE.value, + [], + True, + id="running fleet and max count is decreased with TERMINATE update strategy", + ), + pytest.param( + False, + "MaxCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 10, + 0, + QueueUpdateStrategy.DRAIN.value, + [], + False, + id="running fleet and max count is decreased with DRAIN update strategy", + ), + pytest.param( + False, + "MaxCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 0, + 10, + None, + [], + True, + id="running fleet and max count is increased", + ), + pytest.param( + False, + "MaxCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + None, + 10, + None, + [], + True, + id="running fleet and max count is increased (initial value was not set)", + ), + pytest.param( + False, + "MinCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 10, + 0, + None, + [], + False, + id="running fleet and min count is decreased wo update strategy", + ), + pytest.param( + False, + "MinCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 10, + 0, + QueueUpdateStrategy.TERMINATE.value, + [], + True, + id="running fleet and min count is decreased with TERMINATE update strategy", + ), + pytest.param( + False, + "MinCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 10, + 0, + QueueUpdateStrategy.DRAIN.value, + [], + False, + id="running fleet and min count is decreased with DRAIN update strategy", + ), + pytest.param( + False, + "MinCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 0, + 10, + None, + [], + False, + id="running fleet and min count is increased wo update strategy", + ), + pytest.param( + False, + "MinCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 0, + 10, + QueueUpdateStrategy.TERMINATE.value, + [], + True, + id="running fleet and min count is increased with TERMINATE update strategy", + ), + pytest.param( + False, + "MinCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 0, + 10, + QueueUpdateStrategy.DRAIN.value, + [], + False, + id="running fleet and min count is increased with DRAIN update strategy", + ), + pytest.param( + False, + "MinCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 0, + 10, + None, + [ + Change( + path=["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + key="MaxCount", + old_value="0", + new_value="1", + update_policy={}, + is_list=False, + ) + ], + False, + id="running fleet and min > max count 
are increased wo update strategy", + ), + pytest.param( + False, + "MinCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 0, + 10, + None, + [ + Change( + path=["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + key="MaxCount", + old_value=None, + new_value="1", + update_policy={}, + is_list=False, + ) + ], + False, + id="running fleet and min > max count are increased wo update strategy (max count old value not set)", + ), + pytest.param( + False, + "MinCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 0, + 10, + None, + [ + Change( + path=["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + key="MaxCount", + old_value="0", + new_value="10", + update_policy={}, + is_list=False, + ) + ], + True, + id="running fleet and min = max count are increased wo update strategy", + ), + pytest.param( + False, + "MinCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + None, + 10, + None, + [ + Change( + path=["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + key="MaxCount", + old_value=None, + new_value="10", + update_policy={}, + is_list=False, + ) + ], + True, + id="running fleet and min = max count are increased wo update strategy " + "(both min and max count old value not set)", + ), + pytest.param( + False, + "MinCount", + ["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + 0, + 10, + None, + [ + Change( + path=["Scheduling", "SlurmQueues[queue1], ComputeResources[compute1]"], + key="MaxCount", + old_value="0", + new_value="20", + update_policy={}, + is_list=False, + ) + ], + True, + id="running fleet and min < max count are increased wo update strategy", + ), + ], +) +def test_condition_checker_resize_update_strategy_on_remove( + mocker, is_fleet_stopped, key, path, old_value, new_value, update_strategy, other_changes, expected_result +): + cluster = dummy_cluster() + cluster_has_running_capacity_mock = mocker.patch.object( + cluster, "has_running_capacity", return_value=not is_fleet_stopped + ) + + patch_mock = mocker.MagicMock() + patch_mock.cluster = cluster + patch_mock.target_config = ( + {"Scheduling": {"SlurmSettings": {"QueueUpdateStrategy": update_strategy}}} + if update_strategy + else {"Scheduling": {"SlurmSettings": {}}} + ) + if [other_changes]: + patch_mock.changes = other_changes + change_mock = mocker.MagicMock() + change_mock.path = path + change_mock.key = key + change_mock.old_value = old_value + change_mock.new_value = new_value + + assert_that(UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE.condition_checker(change_mock, patch_mock)).is_equal_to( + expected_result + ) + assert_that(UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE.fail_reason(change_mock, patch_mock)).is_equal_to( + "All compute nodes must be stopped or QueueUpdateStrategy must be set to TERMINATE" + ) + assert_that(UpdatePolicy.RESIZE_UPDATE_STRATEGY_ON_REMOVE.action_needed(change_mock, patch_mock)).is_equal_to( + "Stop the compute fleet with the pcluster update-compute-fleet command, or set QueueUpdateStrategy to " + "TERMINATE in the configuration used for the 'update-cluster' operation. Be aware that this update will remove " + "nodes from the scheduler and terminates the EC2 instances associated. 
Jobs running on the removed nodes will " + "terminate" + ) + cluster_has_running_capacity_mock.assert_called() + + @pytest.mark.parametrize( "is_fleet_stopped, key, path, old_value, new_value, update_strategy, expected_result", [ @@ -303,95 +770,43 @@ def test_queue_update_strategy_condition_checker( [ pytest.param( True, - "SlurmQueues", - ["Scheduling"], - None, - { - "Name": "queue-added", - "Networking": {"SubnetIds": "subnet-12345678"}, - "ComputeResources": {"Name": "compute-added", "InstanceType": "c5.9xlarge"}, - }, - True, - id="stopped fleet and queue is added", - ), - pytest.param( - True, - "SlurmQueues", - ["Scheduling"], - { - "Name": "queue-removed", - "Networking": {"SubnetIds": "subnet-12345678"}, - "ComputeResources": {"Name": "compute-removed", "InstanceType": "c5.9xlarge"}, - }, - None, - True, - id="stopped fleet and queue is removed", - ), - pytest.param( - True, - "ComputeResources", - ["Scheduling", "SlurmQueues[queue1]"], + "Instances", + ["Scheduling", "SlurmQueues[queue1]", "ComputeResources[compute-resource1]"], None, - {"Name": "compute-added", "InstanceType": "c5.large", "MinCount": 1}, + {"InstanceType": "c5.9xlarge"}, True, - id="stopped fleet and compute is added", + id="stopped fleet and instance type is added", ), pytest.param( True, - "ComputeResources", - ["Scheduling", "SlurmQueues[queue1]"], - {"Name": "compute-removed", "InstanceType": "c5.large", "MinCount": 1}, + "Instances", + ["Scheduling", "SlurmQueues[queue1]", "ComputeResources[compute-resource1]"], + {"InstanceType": "c5.9xlarge"}, None, True, - id="stopped fleet and compute is removed", + id="stopped fleet and instance type is removed", ), pytest.param( False, - "SlurmQueues", - ["Scheduling"], + "Instances", + ["Scheduling", "SlurmQueues[queue1]", "ComputeResources[compute-resource1]"], None, - { - "Name": "queue-added", - "Networking": {"SubnetIds": "subnet-12345678"}, - "ComputeResources": {"Name": "compute-added", "InstanceType": "c5.9xlarge"}, - }, + {"InstanceType": "c5.9xlarge"}, True, - id="running fleet and queue is added", - ), - pytest.param( - False, - "SlurmQueues", - ["Scheduling"], - { - "Name": "queue-removed", - "Networking": {"SubnetIds": "subnet-12345678"}, - "ComputeResources": {"Name": "compute-removed", "InstanceType": "c5.9xlarge"}, - }, - None, - False, - id="running fleet and queue is removed", + id="running fleet and instance type is added", ), pytest.param( False, - "ComputeResources", - ["Scheduling", "SlurmQueues[queue1]"], - None, - {"Name": "compute-added", "InstanceType": "c5.large", "MinCount": 1}, - True, - id="running fleet and compute is added", - ), - pytest.param( - False, - "ComputeResources", - ["Scheduling", "SlurmQueues[queue1]"], - {"Name": "compute-removed", "InstanceType": "c5.large", "MinCount": 1}, + "Instances", + ["Scheduling", "SlurmQueues[queue1]", "ComputeResources[compute-resource1]"], + {"InstanceType": "c5.9xlarge"}, None, False, - id="running fleet and compute is removed", + id="running fleet and instance type is removed", ), ], ) -def test_compute_fleet_stop_on_remove_condition_checker( +def test_condition_checker_compute_fleet_stop_on_remove( mocker, is_fleet_stopped, key, path, old_value, new_value, expected_result ): cluster = dummy_cluster() @@ -456,9 +871,9 @@ def test_queue_update_strategy_fail_reason_and_actions_needed( "SlurmQueues", ["Scheduling"], { - "Name": "queue-removed", + "Name": "queue-to-remove", "Networking": {"SubnetIds": "subnet-12345678"}, - "ComputeResources": {"Name": "compute-removed", "InstanceType": 
"c5.9xlarge"}, + "ComputeResources": {"Name": "compute-to-remove", "InstanceType": "c5.9xlarge"}, }, None, "All compute nodes must be stopped", diff --git a/tests/integration-tests/tests/update/test_update.py b/tests/integration-tests/tests/update/test_update.py index 9d46fc4cc2..1dcb43b314 100644 --- a/tests/integration-tests/tests/update/test_update.py +++ b/tests/integration-tests/tests/update/test_update.py @@ -37,7 +37,12 @@ ) from tests.common.assertions import assert_lines_in_logs, assert_no_msg_in_logs -from tests.common.hit_common import assert_compute_node_states, assert_initial_conditions, wait_for_compute_nodes_states +from tests.common.hit_common import ( + assert_compute_node_states, + assert_initial_conditions, + get_partition_nodes, + wait_for_compute_nodes_states, +) from tests.common.scaling_common import get_batch_ce, get_batch_ce_max_size, get_batch_ce_min_size from tests.common.schedulers_common import SlurmCommands from tests.common.storage.assertions import assert_storage_existence @@ -749,6 +754,16 @@ def test_queue_parameters_update( queue_update_strategy, ) + _test_update_resize( + scheduler_commands, + pcluster_config_reader, + pcluster_ami_id, + pcluster_copy_ami_id, + updated_compute_root_volume_size, + cluster, + queue_update_strategy, + ) + # assert queue drain strategy doesn't trigger protected mode assert_no_msg_in_logs( remote_command_executor, @@ -794,7 +809,7 @@ def _test_update_queue_strategy_without_running_job( ): """Test queue parameter update with drain stragegy without running job in the queue.""" updated_config_file = pcluster_config_reader( - config_file="pcluster.update_drain_without_running_job.yaml", + config_file="pcluster.config.update_without_running_job.yaml", global_custom_ami=pcluster_ami_id, updated_compute_root_volume_size=updated_compute_root_volume_size, queue_update_strategy=queue_update_strategy, @@ -853,7 +868,7 @@ def _test_update_queue_strategy_with_running_job( scheduler_commands.wait_job_running(queue2_job_id) updated_config_file = pcluster_config_reader( - config_file="pcluster.config.update_drain.yaml", + config_file="pcluster.config.update_with_running_job.yaml", global_custom_ami=pcluster_ami_id, custom_ami=pcluster_copy_ami_id, updated_compute_root_volume_size=updated_compute_root_volume_size, @@ -905,6 +920,56 @@ def _test_update_queue_strategy_with_running_job( assert_compute_node_states(scheduler_commands, queue1_nodes, "idle") +def _test_update_resize( + scheduler_commands, + pcluster_config_reader, + pcluster_ami_id, + pcluster_copy_ami_id, + updated_compute_root_volume_size, + cluster, + queue_update_strategy, +): + # Submit job in static node, eventually removed with TERMINATE strategy + queue1_job_id = scheduler_commands.submit_command_and_assert_job_accepted( + submit_command_args={ + "command": "srun sleep 3000", + "nodes": 1, + "partition": "queue1", + } + ) + # Wait for the job to run + scheduler_commands.wait_job_running(queue1_job_id) + + # prepare new cluster config with static nodes removed from all queues + updated_config_file = pcluster_config_reader( + config_file="pcluster.config.update_resize.yaml", + global_custom_ami=pcluster_ami_id, + custom_ami=pcluster_copy_ami_id, + updated_compute_root_volume_size=updated_compute_root_volume_size, + queue_update_strategy=queue_update_strategy, + ) + + if queue_update_strategy == "DRAIN": + """Test update resize doesn't support DRAIN strategy, update will fail.""" + response = cluster.update(str(updated_config_file), raise_on_error=False) + 
assert_that(response["message"]).is_equal_to("Update failure") + assert_that(response.get("updateValidationErrors")[0].get("message")).contains( + "All compute nodes must be stopped or QueueUpdateStrategy must be set" + ) + # assert that job running on static nodes not removed stays running + scheduler_commands.assert_job_state(queue1_job_id, "RUNNING") + elif queue_update_strategy == "TERMINATE": + """Test update resize with TERMINATE strategy, static nodes removed.""" + cluster.update(str(updated_config_file)) + # assert that static nodes are removed + nodes_in_scheduler = scheduler_commands.get_compute_nodes(all_nodes=True) + static_nodes, dynamic_nodes = get_partition_nodes(nodes_in_scheduler) + assert_that(len(static_nodes)).is_equal_to(0) + assert_that(len(dynamic_nodes)).is_equal_to(4) + # assert that job running on static nodes removed with the update is re-queued + scheduler_commands.assert_job_state(queue1_job_id, "PENDING") + + @pytest.fixture def external_shared_storage_stack(request, test_datadir, region, vpc_stack: CfnVpcStack, cfn_stacks_factory): def create_stack(bucket_name, file_cache_path): diff --git a/tests/integration-tests/tests/update/test_update/test_queue_parameters_update/pcluster.config.update_resize.yaml b/tests/integration-tests/tests/update/test_update/test_queue_parameters_update/pcluster.config.update_resize.yaml new file mode 100644 index 0000000000..5dbeb6b272 --- /dev/null +++ b/tests/integration-tests/tests/update/test_update/test_queue_parameters_update/pcluster.config.update_resize.yaml @@ -0,0 +1,40 @@ +Image: + Os: {{ os }} + CustomAmi: {{ global_custom_ami }} +HeadNode: + InstanceType: {{ instance }} + Networking: + SubnetId: {{ public_subnet_id }} + Ssh: + KeyName: {{ key_name }} +Scheduling: + SlurmSettings: + QueueUpdateStrategy: {{ queue_update_strategy }} + Scheduler: slurm + SlurmQueues: + - Name: queue1 + ComputeResources: + - Name: queue1-i1 + Instances: + - InstanceType: c5.xlarge + MinCount: 0 + MaxCount: 2 + Networking: + SubnetIds: + - {{ private_subnet_id }} + ComputeSettings: + LocalStorage: + RootVolume: + Size: {{ updated_compute_root_volume_size }} + - Name: queue2 + ComputeResources: + - Name: queue2-i1 + Instances: + - InstanceType: c5.xlarge + MinCount: 0 + MaxCount: 2 + Networking: + SubnetIds: + - {{ private_subnet_id }} + Image: + CustomAmi: {{ custom_ami }} \ No newline at end of file diff --git a/tests/integration-tests/tests/update/test_update/test_queue_parameters_update/pcluster.config.update_drain.yaml b/tests/integration-tests/tests/update/test_update/test_queue_parameters_update/pcluster.config.update_with_running_job.yaml similarity index 100% rename from tests/integration-tests/tests/update/test_update/test_queue_parameters_update/pcluster.config.update_drain.yaml rename to tests/integration-tests/tests/update/test_update/test_queue_parameters_update/pcluster.config.update_with_running_job.yaml diff --git a/tests/integration-tests/tests/update/test_update/test_queue_parameters_update/pcluster.update_drain_without_running_job.yaml b/tests/integration-tests/tests/update/test_update/test_queue_parameters_update/pcluster.config.update_without_running_job.yaml similarity index 88% rename from tests/integration-tests/tests/update/test_update/test_queue_parameters_update/pcluster.update_drain_without_running_job.yaml rename to tests/integration-tests/tests/update/test_update/test_queue_parameters_update/pcluster.config.update_without_running_job.yaml index 56621f205d..f35f3cabdb 100644 --- 
a/tests/integration-tests/tests/update/test_update/test_queue_parameters_update/pcluster.update_drain_without_running_job.yaml +++ b/tests/integration-tests/tests/update/test_update/test_queue_parameters_update/pcluster.config.update_without_running_job.yaml @@ -23,9 +23,9 @@ Scheduling: SubnetIds: - {{ private_subnet_id }} ComputeSettings: - LocalStorage: - RootVolume: - Size: {{ updated_compute_root_volume_size }} + LocalStorage: + RootVolume: + Size: {{ updated_compute_root_volume_size }} - Name: queue2 ComputeResources: - Name: queue2-i1
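For illustration, a minimal sketch of a target configuration that exercises the resize path described in the commit message; the queue name, compute resource name, instance type and subnet ID are hypothetical placeholders, and the counts mirror the MinCount 5 -> 3, MaxCount 10 -> 5 example above:

Scheduling:
  Scheduler: slurm
  SlurmSettings:
    QueueUpdateStrategy: TERMINATE   # required to shrink capacity while the fleet is running
  SlurmQueues:
    - Name: queue1
      ComputeResources:
        - Name: compute1
          Instances:
            - InstanceType: c5.xlarge
          MinCount: 3   # previously 5
          MaxCount: 5   # previously 10
      Networking:
        SubnetIds:
          - subnet-12345678

Applying such a configuration with `pcluster update-cluster` while the compute fleet is running is expected to leave the nodes `st-[1-3]; dy-[1-2]` untouched and terminate `st-[4-5]; dy-[3-5]`, along with any jobs running on them.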