From 5de49a11fb06eddc8682de03e09d7dc6d6ab3961 Mon Sep 17 00:00:00 2001 From: vadasambar Date: Fri, 5 May 2023 10:05:52 +0530 Subject: [PATCH] feat: support `--scale-down-delay-after-*` per nodegroup Signed-off-by: vadasambar feat: update scale down status after every scale up - move scaledown delay status to cluster state/registry - enable scale down if `ScaleDownDelayTypeLocal` is enabled - add new funcs on cluster state to get and update scale down delay status - use timestamp instead of booleans to track scale down delay status Signed-off-by: vadasambar refactor: use existing fields on clusterstate - uses `scaleUpRequests`, `scaleDownRequests` and `scaleUpFailures` instead of `ScaleUpDelayStatus` - changed the above existing fields a little to make them more convenient for use - moved initializing scale down delay processor to static autoscaler (because clusterstate is not available in main.go) Signed-off-by: vadasambar refactor: remove note saying only `scale-down-after-add` is supported - because we are supporting all the flags Signed-off-by: vadasambar fix: evaluate `scaleDownInCooldown` the old way only if `ScaleDownDelayTypeLocal` is set to `false` Signed-off-by: vadasambar refactor: remove line saying `--scale-down-delay-type-local` is only supported for `--scale-down-delay-after-add` - because it is not true anymore - we are supporting all `--scale-down-delay-after-*` flags per nodegroup Signed-off-by: vadasambar test: fix clusterstate tests failing Signed-off-by: vadasambar refactor: move back initializing processors logic to from static autoscaler to main - we don't want to initialize processors in static autoscaler because anyone implementing an alternative to static_autoscaler has to initialize the processors - and initializing specific processors is making static autoscaler aware of an implementation detail which might not be the best practice Signed-off-by: vadasambar refactor: revert changes related to `clusterstate` - since I am going with observer pattern Signed-off-by: vadasambar feat: add observer interface for state of scaling - to implement observer pattern for tracking state of scale up/downs (as opposed to using clusterstate to do the same) - refactor `ScaleDownCandidatesDelayProcessor` to use fields from the new observer Signed-off-by: vadasambar refactor: remove params passed to `clearScaleUpFailures` - not needed anymore Signed-off-by: vadasambar refactor: revert clusterstate tests - approach has changed - I am not making any changes in clusterstate now Signed-off-by: vadasambar refactor: add accidentally deleted lines for clusterstate test Signed-off-by: vadasambar feat: implement `Add` fn for scale state observer - to easily add new observers - re-word comments - remove redundant params from `NewDefaultScaleDownCandidatesProcessor` Signed-off-by: vadasambar fix: CI complaining because no comments on fn definitions Signed-off-by: vadasambar feat: initialize parent `ScaleDownCandidatesProcessor` - instead of `ScaleDownCandidatesSortingProcessor` and `ScaleDownCandidatesDelayProcessor` separately Signed-off-by: vadasambar refactor: add scale state notifier to list of default processors - initialize processors for `NewDefaultScaleDownCandidatesProcessor` outside and pass them to the fn - this allows more flexibility Signed-off-by: vadasambar refactor: add observer interface - create a separate observer directory - implement `RegisterScaleUp` function in the clusterstate - TODO: resolve syntax errors Signed-off-by: vadasambar feat: use `scaleStateNotifier` in place 
of `clusterstate` - delete leftover `scale_stateA_observer.go` (new one is already present in `observers` directory) - register `clustertstate` with `scaleStateNotifier` - use `Register` instead of `Add` function in `scaleStateNotifier` - fix `go build` - wip: fixing tests Signed-off-by: vadasambar test: fix syntax errors - add utils package `pointers` for converting `time` to pointer (without having to initialize a new variable) Signed-off-by: vadasambar feat: wip track scale down failures along with scale up failures - I was tracking scale up failures but not scale down failures - fix copyright year 2017 -> 2023 for the new `pointers` package Signed-off-by: vadasambar feat: register failed scale down with scale state notifier - wip writing tests for `scale_down_candidates_delay_processor` - fix CI lint errors - remove test file for `scale_down_candidates_processor` (there is not much to test as of now) Signed-off-by: vadasambar test: wip tests for `ScaleDownCandidatesDelayProcessor` Signed-off-by: vadasambar test: add unit tests for `ScaleDownCandidatesDelayProcessor` Signed-off-by: vadasambar refactor: don't track scale up failures in `ScaleDownCandidatesDelayProcessor` - not needed Signed-off-by: vadasambar test: better doc comments for `TestGetScaleDownCandidates` Signed-off-by: vadasambar refactor: don't ignore error in `NGChangeObserver` - return it instead and let the caller decide what to do with it Signed-off-by: vadasambar refactor: change pointers to values in `NGChangeObserver` interface - easier to work with - remove `expectedAddTime` param from `RegisterScaleUp` (not needed for now) - add tests for clusterstate's `RegisterScaleUp` Signed-off-by: vadasambar refactor: conditions in `GetScaleDownCandidates` - set scale down in cool down if the number of scale down candidates is 0 Signed-off-by: vadasambar test: use `ng1` instead of `ng2` in existing test Signed-off-by: vadasambar feat: wip static autoscaler tests Signed-off-by: vadasambar refactor: assign directly instead of using `sdProcessor` variable - variable is not needed Signed-off-by: vadasambar test: first working test for static autoscaler Signed-off-by: vadasambar test: continue working on static autoscaler tests Signed-off-by: vadasambar test: wip second static autoscaler test Signed-off-by: vadasambar refactor: remove `Println` used for debugging Signed-off-by: vadasambar test: add static_autoscaler tests for scale down delay per nodegroup flags Signed-off-by: vadasambar chore: rebase off the latest `master` - change scale state observer interface's `RegisterFailedScaleup` to reflect latest changes around clusterstate's `RegisterFailedScaleup` in `master` Signed-off-by: vadasambar test: fix clusterstate test failing Signed-off-by: vadasambar test: fix failing orchestrator test Signed-off-by: vadasambar refactor: rename `defaultScaleDownCandidatesProcessor` -> `combinedScaleDownCandidatesProcessor` - describes the processor better Signed-off-by: vadasambar refactor: replace `NGChangeObserver` -> `NodeGroupChangeObserver` - makes it easier to understand for someone not familiar with the codebase Signed-off-by: vadasambar docs: reword code comment `after` -> `for which` Signed-off-by: vadasambar refactor: don't return error from `RegisterScaleDown` - not needed as of now (no implementer function returns a non-nil error for this function) Signed-off-by: vadasambar refactor: address review comments around ng change observer interface - change dir structure of nodegroup change observer package - stop returning errors 
wherever it is not needed in the nodegroup change observer interface - rename `NGChangeObserver` -> `NodeGroupChangeObserver` interface (makes it easier to understand) Signed-off-by: vadasambar refactor: make nodegroupchange observer thread-safe Signed-off-by: vadasambar docs: add TODO to consider using multiple mutexes in nodegroupchange observer Signed-off-by: vadasambar refactor: use `time.Now()` directly instead of assigning a variable to it Signed-off-by: vadasambar refactor: share code for checking if there was a recent scale-up/down/failure Signed-off-by: vadasambar test: convert `ScaleDownCandidatesDelayProcessor` into table tests Signed-off-by: vadasambar refactor: change scale state notifier's `Register()` -> `RegisterForNotifications()` - makes it easier to understand what the function does Signed-off-by: vadasambar test: replace scale state notifier `Register` -> `RegisterForNotifications` in test - to fix syntax errors since it is already renamed in the actual code Signed-off-by: vadasambar refactor: remove `clusterStateRegistry` from `delete_in_batch` tests - not needed anymore since we have `scaleStateNotifier` Signed-off-by: vadasambar refactor: address PR review comments Signed-off-by: vadasambar fix: add empty `RegisterFailedScaleDown` for clusterstate - fix syntax error in static autoscaler test Signed-off-by: vadasambar --- .../clusterstate/clusterstate.go | 27 ++- .../clusterstate/clusterstate_test.go | 41 ++-- .../config/autoscaling_options.go | 3 + cluster-autoscaler/config/const.go | 7 +- .../core/scaledown/actuation/actuator.go | 10 +- .../core/scaledown/actuation/actuator_test.go | 14 +- .../scaledown/actuation/delete_in_batch.go | 32 +-- .../actuation/delete_in_batch_test.go | 13 +- .../core/scaleup/orchestrator/executor.go | 22 +- .../core/scaleup/orchestrator/orchestrator.go | 2 +- .../scaleup/orchestrator/orchestrator_test.go | 1 + cluster-autoscaler/core/static_autoscaler.go | 22 +- .../core/static_autoscaler_test.go | 217 ++++++++++++++++++ cluster-autoscaler/core/test/common.go | 2 + cluster-autoscaler/main.go | 20 +- .../nodegroupchange/scale_state_observer.go | 100 ++++++++ cluster-autoscaler/processors/processors.go | 8 + .../scale_down_candidates_delay_processor.go | 124 ++++++++++ ...le_down_candidates_delay_processor_test.go | 151 ++++++++++++ .../scale_down_candidates_processor.go | 72 ++++++ 20 files changed, 798 insertions(+), 90 deletions(-) create mode 100644 cluster-autoscaler/observers/nodegroupchange/scale_state_observer.go create mode 100644 cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor.go create mode 100644 cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor_test.go create mode 100644 cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_processor.go diff --git a/cluster-autoscaler/clusterstate/clusterstate.go b/cluster-autoscaler/clusterstate/clusterstate.go index 67deb6595c4c..b8ba1d7e1b0f 100644 --- a/cluster-autoscaler/clusterstate/clusterstate.go +++ b/cluster-autoscaler/clusterstate/clusterstate.go @@ -188,13 +188,8 @@ func (csr *ClusterStateRegistry) Stop() { close(csr.interrupt) } -// RegisterOrUpdateScaleUp registers scale-up for give node group or changes requested node increase -// count. -// If delta is positive then number of new nodes requested is increased; Time and expectedAddTime -// are reset. -// If delta is negative the number of new nodes requested is decreased; Time and expectedAddTime are -// left intact. 
-func (csr *ClusterStateRegistry) RegisterOrUpdateScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) { +// RegisterScaleUp registers scale-up for give node group +func (csr *ClusterStateRegistry) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) { csr.Lock() defer csr.Unlock() csr.registerOrUpdateScaleUpNoLock(nodeGroup, delta, currentTime) @@ -246,7 +241,14 @@ func (csr *ClusterStateRegistry) registerOrUpdateScaleUpNoLock(nodeGroup cloudpr } // RegisterScaleDown registers node scale down. -func (csr *ClusterStateRegistry) RegisterScaleDown(request *ScaleDownRequest) { +func (csr *ClusterStateRegistry) RegisterScaleDown(nodeGroup cloudprovider.NodeGroup, + nodeName string, currentTime time.Time, expectedDeleteTime time.Time) { + request := &ScaleDownRequest{ + NodeGroup: nodeGroup, + NodeName: nodeName, + Time: currentTime, + ExpectedDeleteTime: expectedDeleteTime, + } csr.Lock() defer csr.Unlock() csr.scaleDownRequests = append(csr.scaleDownRequests, request) @@ -310,16 +312,21 @@ func (csr *ClusterStateRegistry) backoffNodeGroup(nodeGroup cloudprovider.NodeGr // RegisterFailedScaleUp should be called after getting error from cloudprovider // when trying to scale-up node group. It will mark this group as not safe to autoscale // for some time. -func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, errorMessage, gpuResourceName, gpuType string, currentTime time.Time) { +func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason string, errorMessage, gpuResourceName, gpuType string, currentTime time.Time) { csr.Lock() defer csr.Unlock() - csr.registerFailedScaleUpNoLock(nodeGroup, reason, cloudprovider.InstanceErrorInfo{ + csr.registerFailedScaleUpNoLock(nodeGroup, metrics.FailedScaleUpReason(reason), cloudprovider.InstanceErrorInfo{ ErrorClass: cloudprovider.OtherErrorClass, ErrorCode: string(reason), ErrorMessage: errorMessage, }, gpuResourceName, gpuType, currentTime) } +// RegisterFailedScaleDown records failed scale-down for a nodegroup. +// We don't need to implement this function for cluster state registry +func (csr *ClusterStateRegistry) RegisterFailedScaleDown(_ cloudprovider.NodeGroup, _ string, _ time.Time) { +} + func (csr *ClusterStateRegistry) registerFailedScaleUpNoLock(nodeGroup cloudprovider.NodeGroup, reason metrics.FailedScaleUpReason, errorInfo cloudprovider.InstanceErrorInfo, gpuResourceName, gpuType string, currentTime time.Time) { csr.scaleUpFailures[nodeGroup.Id()] = append(csr.scaleUpFailures[nodeGroup.Id()], ScaleUpFailure{NodeGroup: nodeGroup, Reason: reason, Time: currentTime}) metrics.RegisterFailedScaleUp(reason, gpuResourceName, gpuType) diff --git a/cluster-autoscaler/clusterstate/clusterstate_test.go b/cluster-autoscaler/clusterstate/clusterstate_test.go index a57a6a7aceaa..52f2952d6807 100644 --- a/cluster-autoscaler/clusterstate/clusterstate_test.go +++ b/cluster-autoscaler/clusterstate/clusterstate_test.go @@ -31,6 +31,7 @@ import ( testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/api" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" + "k8s.io/autoscaler/cluster-autoscaler/utils/taints" . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" "k8s.io/client-go/kubernetes/fake" @@ -75,7 +76,7 @@ func TestOKWithScaleUp(t *testing.T) { MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: time.Minute})) - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now()) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) @@ -122,7 +123,7 @@ func TestEmptyOK(t *testing.T) { assert.False(t, clusterstate.HasNodeGroupStartedScaleUp("ng1")) provider.AddNodeGroup("ng1", 0, 10, 3) - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 3, now.Add(-3*time.Second)) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 3, now.Add(-3*time.Second)) // clusterstate.scaleUpRequests["ng1"].Time = now.Add(-3 * time.Second) // clusterstate.scaleUpRequests["ng1"].ExpectedAddTime = now.Add(1 * time.Minute) err = clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now) @@ -161,7 +162,7 @@ func TestHasNodeGroupStartedScaleUp(t *testing.T) { assert.False(t, clusterstate.HasNodeGroupStartedScaleUp("ng1")) provider.AddNodeGroup("ng1", 0, 5, tc.initialSize+tc.delta) - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), tc.delta, now.Add(-3*time.Second)) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), tc.delta, now.Add(-3*time.Second)) err = clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsNodeGroupScalingUp("ng1")) @@ -450,7 +451,7 @@ func TestExpiredScaleUp(t *testing.T) { MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 2 * time.Minute})) - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute)) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute)) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) @@ -476,13 +477,7 @@ func TestRegisterScaleDown(t *testing.T) { OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute})) now := time.Now() - - clusterstate.RegisterScaleDown(&ScaleDownRequest{ - NodeGroup: provider.GetNodeGroup("ng1"), - NodeName: "ng1-1", - ExpectedDeleteTime: now.Add(time.Minute), - Time: now, - }) + clusterstate.RegisterScaleDown(provider.GetNodeGroup("ng1"), "ng1-1", now.Add(time.Minute), now) assert.Equal(t, 1, len(clusterstate.scaleDownRequests)) clusterstate.updateScaleRequests(now.Add(5 * time.Minute)) assert.Equal(t, 0, len(clusterstate.scaleDownRequests)) @@ -794,7 +789,7 @@ func TestScaleUpBackoff(t *testing.T) { }, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 120 * time.Second})) // After failed scale-up, node group should be still healthy, but should backoff from scale-ups - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-180*time.Second)) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 1, 
now.Add(-180*time.Second)) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now) assert.NoError(t, err) assert.True(t, clusterstate.IsClusterHealthy()) @@ -826,7 +821,7 @@ func TestScaleUpBackoff(t *testing.T) { assert.Equal(t, NodeGroupScalingSafety{SafeToScale: true, Healthy: true}, clusterstate.NodeGroupScaleUpSafety(ng1, now)) // Another failed scale up should cause longer backoff - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-121*time.Second)) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-121*time.Second)) err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now) assert.NoError(t, err) @@ -860,7 +855,7 @@ func TestScaleUpBackoff(t *testing.T) { }, clusterstate.NodeGroupScaleUpSafety(ng1, now)) // The backoff should be cleared after a successful scale-up - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 1, now) ng1_4 := BuildTestNode("ng1-4", 1000, 1000) SetNodeReadyState(ng1_4, true, now.Add(-1*time.Minute)) provider.AddNode("ng1", ng1_4) @@ -935,6 +930,7 @@ func TestUpdateScaleUp(t *testing.T) { provider := testprovider.NewTestCloudProvider(nil, nil) provider.AddNodeGroup("ng1", 1, 10, 5) + provider.AddNodeGroup("ng2", 1, 10, 5) fakeClient := &fake.Clientset{} fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap") clusterstate := NewClusterStateRegistry( @@ -948,29 +944,30 @@ func TestUpdateScaleUp(t *testing.T) { nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 10 * time.Second}), ) - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 100, now) + // Test cases for `RegisterScaleUp` + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 100, now) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 100) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second)) // expect no change of times on negative delta - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -20, later) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -20, later) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 80) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, now) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, now.Add(10*time.Second)) // update times on positive delta - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 30, later) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), 30, later) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Increase, 110) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].Time, later) assert.Equal(t, clusterstate.scaleUpRequests["ng1"].ExpectedAddTime, later.Add(10*time.Second)) // if we get below 0 scalup is deleted - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -200, now) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -200, now) assert.Nil(t, clusterstate.scaleUpRequests["ng1"]) // If new scalup is registered with negative delta nothing should happen - clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), -200, now) + clusterstate.RegisterScaleUp(provider.GetNodeGroup("ng1"), -200, now) assert.Nil(t, clusterstate.scaleUpRequests["ng1"]) } @@ -986,9 +983,9 @@ func 
TestScaleUpFailures(t *testing.T) { fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap") clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute})) - clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), metrics.Timeout, "", "", "", now) - clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng2"), metrics.Timeout, "", "", "", now) - clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), metrics.APIError, "", "", "", now.Add(time.Minute)) + clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), string(metrics.Timeout), "", "", "", now) + clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng2"), string(metrics.Timeout), "", "", "", now) + clusterstate.RegisterFailedScaleUp(provider.GetNodeGroup("ng1"), string(metrics.APIError), "", "", "", now.Add(time.Minute)) failures := clusterstate.GetScaleUpFailures() assert.Equal(t, map[string][]ScaleUpFailure{ diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index c2324cd3f7af..b335c05dcb28 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -158,6 +158,9 @@ type AutoscalingOptions struct { ScaleDownDelayAfterDelete time.Duration // ScaleDownDelayAfterFailure sets the duration before the next scale down attempt if scale down results in an error ScaleDownDelayAfterFailure time.Duration + // ScaleDownDelayTypeLocal sets if the --scale-down-delay-after-* flags should be applied locally per nodegroup + // or globally across all nodegroups + ScaleDownDelayTypeLocal bool // ScaleDownNonEmptyCandidatesCount is the maximum number of non empty nodes // considered at once as candidates for scale down. 
ScaleDownNonEmptyCandidatesCount int diff --git a/cluster-autoscaler/config/const.go b/cluster-autoscaler/config/const.go index 1025ac9059d7..d599def6fd32 100644 --- a/cluster-autoscaler/config/const.go +++ b/cluster-autoscaler/config/const.go @@ -40,7 +40,8 @@ const ( DefaultMaxNodeProvisionTimeKey = "maxnodeprovisiontime" // DefaultIgnoreDaemonSetsUtilizationKey identifies IgnoreDaemonSetsUtilization autoscaling option DefaultIgnoreDaemonSetsUtilizationKey = "ignoredaemonsetsutilization" - // DefaultScaleDownUnneededTime identifies ScaleDownUnneededTime autoscaling option + + // DefaultScaleDownUnneededTime is the default time duration for which CA waits before deleting an unneeded node DefaultScaleDownUnneededTime = 10 * time.Minute // DefaultScaleDownUnreadyTime identifies ScaleDownUnreadyTime autoscaling option DefaultScaleDownUnreadyTime = 20 * time.Minute @@ -48,4 +49,8 @@ const ( DefaultScaleDownUtilizationThreshold = 0.5 // DefaultScaleDownGpuUtilizationThreshold identifies ScaleDownGpuUtilizationThreshold autoscaling option DefaultScaleDownGpuUtilizationThreshold = 0.5 + // DefaultScaleDownDelayAfterFailure is the default value for ScaleDownDelayAfterFailure autoscaling option + DefaultScaleDownDelayAfterFailure = 3 * time.Minute + // DefaultScanInterval is the default scan interval for CA + DefaultScanInterval = 10 * time.Second ) diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index b80a0a7856e3..482853260600 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -22,7 +22,6 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" - "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets" @@ -31,6 +30,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" @@ -45,7 +45,6 @@ import ( // Actuator is responsible for draining and deleting nodes. type Actuator struct { ctx *context.AutoscalingContext - clusterState *clusterstate.ClusterStateRegistry nodeDeletionTracker *deletiontracker.NodeDeletionTracker nodeDeletionScheduler *GroupDeletionScheduler deleteOptions options.NodeDeleteOptions @@ -66,8 +65,8 @@ type actuatorNodeGroupConfigGetter interface { } // NewActuator returns a new instance of Actuator. 
-func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator { - ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval) +func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator { + ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval) legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec) var evictor Evictor if len(ctx.DrainPriorityConfig) > 0 { @@ -77,7 +76,6 @@ func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterState } return &Actuator{ ctx: ctx, - clusterState: csr, nodeDeletionTracker: ndt, nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, evictor), budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx), @@ -102,7 +100,7 @@ func (a *Actuator) ClearResultsNotNewerThan(t time.Time) { func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) { a.nodeDeletionScheduler.ReportMetrics() deletionStartTime := time.Now() - defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Now().Sub(deletionStartTime)) }() + defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Since(deletionStartTime)) }() results, ts := a.nodeDeletionTracker.DeletionResults() scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: results, NodeDeleteResultsAsOf: ts} diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go index 5c35c0369841..3f468ceb4a24 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go @@ -40,6 +40,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" . "k8s.io/autoscaler/cluster-autoscaler/core/test" + "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -1186,13 +1187,16 @@ func TestStartDeletion(t *testing.T) { wantScaleDownStatus.ScaledDownNodes = append(wantScaleDownStatus.ScaledDownNodes, statusScaledDownNode) } + scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList() + scaleStateNotifier.Register(csr) + // Create Actuator, run StartDeletion, and verify the error. 
ndt := deletiontracker.NewNodeDeletionTracker(0) - ndb := NewNodeDeletionBatcher(&ctx, csr, ndt, 0*time.Second) + ndb := NewNodeDeletionBatcher(&ctx, scaleStateNotifier, ndt, 0*time.Second) legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec) evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom, shutdownGracePeriodByPodPriority: legacyFlagDrainConfig, fullDsEviction: false} actuator := Actuator{ - ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt, + ctx: &ctx, nodeDeletionTracker: ndt, nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor), budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx), configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults), @@ -1424,12 +1428,14 @@ func TestStartDeletionInBatchBasic(t *testing.T) { t.Fatalf("Couldn't set up autoscaling context: %v", err) } csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute})) + scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList() + scaleStateNotifier.Register(csr) ndt := deletiontracker.NewNodeDeletionTracker(0) - ndb := NewNodeDeletionBatcher(&ctx, csr, ndt, deleteInterval) + ndb := NewNodeDeletionBatcher(&ctx, scaleStateNotifier, ndt, deleteInterval) legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec) evictor := Evictor{EvictionRetryTime: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom, shutdownGracePeriodByPodPriority: legacyFlagDrainConfig} actuator := Actuator{ - ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt, + ctx: &ctx, nodeDeletionTracker: ndt, nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor), budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx), } diff --git a/cluster-autoscaler/core/scaledown/actuation/delete_in_batch.go b/cluster-autoscaler/core/scaledown/actuation/delete_in_batch.go index ad63736ade6b..48f406cca78a 100644 --- a/cluster-autoscaler/core/scaledown/actuation/delete_in_batch.go +++ b/cluster-autoscaler/core/scaledown/actuation/delete_in_batch.go @@ -25,6 +25,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" "k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" @@ -32,7 +33,6 @@ import ( apiv1 "k8s.io/api/core/v1" - "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" ) @@ -48,7 +48,7 @@ const ( type NodeDeletionBatcher struct { sync.Mutex ctx *context.AutoscalingContext - clusterState *clusterstate.ClusterStateRegistry + scaleStateNotifier nodegroupchange.NodeGroupChangeObserver nodeDeletionTracker *deletiontracker.NodeDeletionTracker deletionsPerNodeGroup map[string][]*apiv1.Node deleteInterval time.Duration @@ -56,14 +56,14 @@ type NodeDeletionBatcher struct { } // NewNodeDeletionBatcher return new NodeBatchDeleter -func NewNodeDeletionBatcher(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, nodeDeletionTracker *deletiontracker.NodeDeletionTracker, deleteInterval 
time.Duration) *NodeDeletionBatcher { +func NewNodeDeletionBatcher(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, nodeDeletionTracker *deletiontracker.NodeDeletionTracker, deleteInterval time.Duration) *NodeDeletionBatcher { return &NodeDeletionBatcher{ ctx: ctx, - clusterState: csr, nodeDeletionTracker: nodeDeletionTracker, deletionsPerNodeGroup: make(map[string][]*apiv1.Node), deleteInterval: deleteInterval, drainedNodeDeletions: make(map[string]bool), + scaleStateNotifier: scaleStateNotifier, } } @@ -85,13 +85,13 @@ func (d *NodeDeletionBatcher) AddNodes(nodes []*apiv1.Node, nodeGroup cloudprovi } func (d *NodeDeletionBatcher) deleteNodesAndRegisterStatus(nodes []*apiv1.Node, drain bool) { - nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, nodes) + nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, d.scaleStateNotifier, nodes) for _, node := range nodes { if err != nil { result := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err} CleanUpAndRecordFailedScaleDownEvent(d.ctx, node, nodeGroup.Id(), drain, d.nodeDeletionTracker, "", result) } else { - RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.clusterState, node, nodeGroup, drain, d.nodeDeletionTracker) + RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.scaleStateNotifier, node, nodeGroup, drain, d.nodeDeletionTracker) } } } @@ -129,14 +129,14 @@ func (d *NodeDeletionBatcher) remove(nodeGroupId string) error { go func(nodes []*apiv1.Node, drainedNodeDeletions map[string]bool) { var result status.NodeDeleteResult - nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, nodes) + nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, d.scaleStateNotifier, nodes) for _, node := range nodes { drain := drainedNodeDeletions[node.Name] if err != nil { result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err} CleanUpAndRecordFailedScaleDownEvent(d.ctx, node, nodeGroupId, drain, d.nodeDeletionTracker, "", result) } else { - RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.clusterState, node, nodeGroup, drain, d.nodeDeletionTracker) + RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.scaleStateNotifier, node, nodeGroup, drain, d.nodeDeletionTracker) } } }(nodes, drainedNodeDeletions) @@ -145,12 +145,15 @@ func (d *NodeDeletionBatcher) remove(nodeGroupId string) error { // deleteNodeFromCloudProvider removes the given nodes from cloud provider. No extra pre-deletion actions are executed on // the Kubernetes side. 
-func deleteNodesFromCloudProvider(ctx *context.AutoscalingContext, nodes []*apiv1.Node) (cloudprovider.NodeGroup, error) { +func deleteNodesFromCloudProvider(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, nodes []*apiv1.Node) (cloudprovider.NodeGroup, error) { nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(nodes[0]) if err != nil { return nodeGroup, errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", nodes[0].Name, err) } if err := nodeGroup.DeleteNodes(nodes); err != nil { + scaleStateNotifier.RegisterFailedScaleDown(nodeGroup, + string(errors.CloudProviderError), + time.Now()) return nodeGroup, errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete nodes from group %s: %v", nodeGroup.Id(), err) } return nodeGroup, nil @@ -193,14 +196,11 @@ func CleanUpAndRecordFailedScaleDownEvent(ctx *context.AutoscalingContext, node } // RegisterAndRecordSuccessfulScaleDownEvent register scale down and record successful scale down event. -func RegisterAndRecordSuccessfulScaleDownEvent(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool, nodeDeletionTracker *deletiontracker.NodeDeletionTracker) { +func RegisterAndRecordSuccessfulScaleDownEvent(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool, nodeDeletionTracker *deletiontracker.NodeDeletionTracker) { ctx.Recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "nodes removed by cluster autoscaler") - csr.RegisterScaleDown(&clusterstate.ScaleDownRequest{ - NodeGroup: nodeGroup, - NodeName: node.Name, - Time: time.Now(), - ExpectedDeleteTime: time.Now().Add(MaxCloudProviderNodeDeletionTime), - }) + currentTime := time.Now() + expectedDeleteTime := time.Now().Add(MaxCloudProviderNodeDeletionTime) + scaleStateNotifier.RegisterScaleDown(nodeGroup, node.Name, currentTime, expectedDeleteTime) gpuConfig := ctx.CloudProvider.GetNodeGpuConfig(node) metricResourceName, metricGpuType := gpu.GetGpuInfoForMetrics(gpuConfig, ctx.CloudProvider.GetAvailableGPUTypes(), node, nodeGroup) metrics.RegisterScaleDown(1, metricResourceName, metricGpuType, nodeScaleDownReason(node, drain)) diff --git a/cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go b/cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go index 8b0ab49e8a59..6e1461d4ddd7 100644 --- a/cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go @@ -24,16 +24,13 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" - "k8s.io/autoscaler/cluster-autoscaler/clusterstate" - clusterstate_utils "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" . 
"k8s.io/autoscaler/cluster-autoscaler/core/test" - "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" + "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" - kube_record "k8s.io/client-go/tools/record" ) func TestAddNodeToBucket(t *testing.T) { @@ -85,7 +82,7 @@ func TestAddNodeToBucket(t *testing.T) { for _, test := range testcases { d := NodeDeletionBatcher{ ctx: &ctx, - clusterState: nil, + scaleStateNotifier: nil, nodeDeletionTracker: nil, deletionsPerNodeGroup: make(map[string][]*apiv1.Node), drainedNodeDeletions: make(map[string]bool), @@ -139,7 +136,6 @@ func TestRemove(t *testing.T) { t.Run(test.name, func(t *testing.T) { test := test fakeClient := &fake.Clientset{} - fakeLogRecorder, _ := clusterstate_utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap") failedNodeDeletion := make(map[string]bool) deletedNodes := make(chan string, 10) @@ -163,20 +159,21 @@ func TestRemove(t *testing.T) { }) ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, nil, provider, nil, nil) - clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute})) if err != nil { t.Fatalf("Couldn't set up autoscaling context: %v", err) } + scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList() + ng := "ng" provider.AddNodeGroup(ng, 1, 10, test.numNodes) nodeGroup := provider.GetNodeGroup(ng) d := NodeDeletionBatcher{ ctx: &ctx, - clusterState: clusterStateRegistry, nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(1 * time.Minute), deletionsPerNodeGroup: make(map[string][]*apiv1.Node), + scaleStateNotifier: scaleStateNotifier, drainedNodeDeletions: make(map[string]bool), } nodes := generateNodes(0, test.numNodes, ng) diff --git a/cluster-autoscaler/core/scaleup/orchestrator/executor.go b/cluster-autoscaler/core/scaleup/orchestrator/executor.go index ea433b2a5d56..baba3b73551d 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/executor.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/executor.go @@ -28,9 +28,9 @@ import ( schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" - "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" @@ -38,18 +38,18 @@ import ( // ScaleUpExecutor scales up node groups. type scaleUpExecutor struct { - autoscalingContext *context.AutoscalingContext - clusterStateRegistry *clusterstate.ClusterStateRegistry + autoscalingContext *context.AutoscalingContext + scaleStateNotifier nodegroupchange.NodeGroupChangeObserver } // New returns new instance of scale up executor. 
func newScaleUpExecutor( autoscalingContext *context.AutoscalingContext, - clusterStateRegistry *clusterstate.ClusterStateRegistry, + scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ) *scaleUpExecutor { return &scaleUpExecutor{ - autoscalingContext: autoscalingContext, - clusterStateRegistry: clusterStateRegistry, + autoscalingContext: autoscalingContext, + scaleStateNotifier: scaleStateNotifier, } } @@ -151,13 +151,13 @@ func (e *scaleUpExecutor) executeScaleUp( if err := info.Group.IncreaseSize(increase); err != nil { e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err) aerr := errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to increase node group size: ") - e.clusterStateRegistry.RegisterFailedScaleUp(info.Group, metrics.FailedScaleUpReason(string(aerr.Type())), aerr.Error(), gpuResourceName, gpuType, now) + e.scaleStateNotifier.RegisterFailedScaleUp(info.Group, string(aerr.Type()), aerr.Error(), gpuResourceName, gpuType, now) return aerr } - e.clusterStateRegistry.RegisterOrUpdateScaleUp( - info.Group, - increase, - time.Now()) + if increase < 0 { + return errors.NewAutoscalerError(errors.InternalError, fmt.Sprintf("increase in number of nodes cannot be negative, got: %v", increase)) + } + e.scaleStateNotifier.RegisterScaleUp(info.Group, increase, time.Now()) metrics.RegisterScaleUp(increase, gpuResourceName, gpuType) e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup", "Scale-up: group %s size set to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize) diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go index 656da06f274d..417d4062fb27 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go @@ -74,7 +74,7 @@ func (o *ScaleUpOrchestrator) Initialize( o.clusterStateRegistry = clusterStateRegistry o.taintConfig = taintConfig o.resourceManager = resource.NewManager(processors.CustomResourcesProcessor) - o.scaleUpExecutor = newScaleUpExecutor(autoscalingContext, clusterStateRegistry) + o.scaleUpExecutor = newScaleUpExecutor(autoscalingContext, processors.ScaleStateNotifier) o.initialized = true } diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go index b0470d9a899a..886c4e8dd01b 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go @@ -973,6 +973,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) processors := NewTestProcessors(&context) + processors.ScaleStateNotifier.Register(clusterState) orchestrator := New() orchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) expander := NewMockRepotingStrategy(t, config.ExpansionOptionToChoose) diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 1afedcfb1bb5..04a30fc609b0 100644 --- 
a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -167,12 +167,13 @@ func NewStaticAutoscaler( taintConfig := taints.NewTaintConfig(opts) processors.ScaleDownCandidatesNotifier.Register(clusterStateRegistry) + processors.ScaleStateNotifier.Register(clusterStateRegistry) // TODO: Populate the ScaleDownActuator/Planner fields in AutoscalingContext // during the struct creation rather than here. ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second) scaleDown := legacy.NewScaleDown(autoscalingContext, processors, ndt, deleteOptions, drainabilityRules) - actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor) + actuator := actuation.NewActuator(autoscalingContext, processors.ScaleStateNotifier, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor) autoscalingContext.ScaleDownActuator = actuator var scaleDownPlanner scaledown.Planner @@ -609,17 +610,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr metrics.UpdateDurationFromStart(metrics.FindUnneeded, unneededStart) - scaleDownInCooldown := a.processorCallbacks.disableScaleDownForLoop || - a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) || - a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) || - a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime) - + scaleDownInCooldown := a.isScaleDownInCooldown(currentTime, scaleDownCandidates) klog.V(4).Infof("Scale down status: lastScaleUpTime=%s lastScaleDownDeleteTime=%v "+ "lastScaleDownFailTime=%s scaleDownForbidden=%v scaleDownInCooldown=%v", a.lastScaleUpTime, a.lastScaleDownDeleteTime, a.lastScaleDownFailTime, a.processorCallbacks.disableScaleDownForLoop, scaleDownInCooldown) metrics.UpdateScaleDownInCooldown(scaleDownInCooldown) - // We want to delete unneeded Node Groups only if here is no current delete // in progress. _, drained := scaleDownActuationStatus.DeletionsInProgress() @@ -689,6 +685,18 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr return nil } +func (a *StaticAutoscaler) isScaleDownInCooldown(currentTime time.Time, scaleDownCandidates []*apiv1.Node) bool { + scaleDownInCooldown := a.processorCallbacks.disableScaleDownForLoop || len(scaleDownCandidates) == 0 + + if a.ScaleDownDelayTypeLocal { + return scaleDownInCooldown + } + return scaleDownInCooldown || + a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) || + a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) || + a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime) +} + // Sets the target size of node groups to the current number of nodes in them // if the difference was constant for a prolonged time. Returns true if managed // to fix something. 
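With `--scale-down-delay-type-local` enabled, `isScaleDownInCooldown` above skips the global delay checks; the per-nodegroup delays are instead enforced by the new `ScaleDownCandidatesDelayProcessor` registered with the scale state notifier (that processor's file appears in the diffstat but its body is outside this excerpt). A rough sketch of the idea, where `delayProcessorSketch`, `filterCandidates` and `nodeToGroup` are hypothetical names; the real processor implements all four `NodeGroupChangeObserver` callbacks and plugs into the `ScaleDownNodeProcessor` chain, so its actual signatures differ:

```go
// Sketch only; not the contents of scale_down_candidates_delay_processor.go.
package scaledowncandidates

import (
	"sync"
	"time"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// delayProcessorSketch records the last scale event seen per node group and
// filters out scale-down candidates whose group is still inside its delay window.
type delayProcessorSketch struct {
	mutex      sync.Mutex
	lastChange map[string]time.Time // node group ID -> most recent scale-up/down/failure
	delay      time.Duration        // e.g. the --scale-down-delay-after-add value
}

// RegisterScaleUp mirrors the NodeGroupChangeObserver callback introduced by this
// patch; the other three callbacks would update lastChange the same way.
func (p *delayProcessorSketch) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	p.lastChange[nodeGroup.Id()] = currentTime
}

// filterCandidates drops nodes whose group changed more recently than the delay.
// nodeToGroup maps node name -> node group ID; the real processor resolves the
// group through the cloud provider instead.
func (p *delayProcessorSketch) filterCandidates(candidates []*apiv1.Node, nodeToGroup map[string]string, now time.Time) []*apiv1.Node {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	filtered := make([]*apiv1.Node, 0, len(candidates))
	for _, node := range candidates {
		if changed, ok := p.lastChange[nodeToGroup[node.Name]]; ok && now.Sub(changed) < p.delay {
			continue // group changed recently; exclude its nodes from scale-down candidates
		}
		filtered = append(filtered, node)
	}
	return filtered
}
```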
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index dd63e6d90c5b..fb1fb1c34966 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -44,11 +44,13 @@ import ( ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" + "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" "k8s.io/autoscaler/cluster-autoscaler/simulator/options" "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" + "k8s.io/autoscaler/cluster-autoscaler/utils/drain" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -473,6 +475,221 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { mock.AssertExpectationsForObjects(t, onScaleUpMock) } +func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) { + readyNodeLister := kubernetes.NewTestNodeLister(nil) + allNodeLister := kubernetes.NewTestNodeLister(nil) + allPodListerMock := &podListerMock{} + podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{} + daemonSetListerMock := &daemonSetListerMock{} + onScaleUpMock := &onScaleUpMock{} + onScaleDownMock := &onScaleDownMock{} + deleteFinished := make(chan bool, 1) + + n1 := BuildTestNode("n1", 1000, 1000) + SetNodeReadyState(n1, true, time.Now()) + n2 := BuildTestNode("n2", 1000, 1000) + SetNodeReadyState(n2, true, time.Now()) + + tn := BuildTestNode("tn", 1000, 1000) + tni := schedulerframework.NewNodeInfo() + tni.SetNode(tn) + + provider := testprovider.NewTestAutoprovisioningCloudProvider( + func(id string, delta int) error { + return onScaleUpMock.ScaleUp(id, delta) + }, func(id string, name string) error { + ret := onScaleDownMock.ScaleDown(id, name) + deleteFinished <- true + return ret + }, + nil, nil, + nil, map[string]*schedulerframework.NodeInfo{"ng1": tni, "ng2": tni}) + assert.NotNil(t, provider) + + provider.AddNodeGroup("ng1", 0, 10, 1) + ng1 := reflect.ValueOf(provider.GetNodeGroup("ng1")).Interface().(*testprovider.TestNodeGroup) + assert.NotNil(t, ng1) + + provider.AddNodeGroup("ng2", 0, 10, 1) + ng2 := reflect.ValueOf(provider.GetNodeGroup("ng2")).Interface().(*testprovider.TestNodeGroup) + assert.NotNil(t, ng2) + + // Create context with mocked lister registry. 
+ options := config.AutoscalingOptions{ + NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ + ScaleDownUnneededTime: config.DefaultScaleDownUnneededTime, + ScaleDownUnreadyTime: time.Minute, + ScaleDownUtilizationThreshold: 0.5, + MaxNodeProvisionTime: 10 * time.Second, + }, + EstimatorName: estimator.BinpackingEstimatorName, + EnforceNodeGroupMinSize: true, + ScaleDownEnabled: true, + MaxNodesTotal: 1, + MaxCoresTotal: 10, + MaxMemoryTotal: 100000, + ScaleDownDelayTypeLocal: true, + ScaleDownDelayAfterAdd: 5 * time.Minute, + ScaleDownDelayAfterDelete: 5 * time.Minute, + ScaleDownDelayAfterFailure: 5 * time.Minute, + } + processorCallbacks := newStaticAutoscalerProcessorCallbacks() + + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil) + assert.NoError(t, err) + + setUpScaleDownActuator(&context, options) + + listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock, podDisruptionBudgetListerMock, daemonSetListerMock, + nil, nil, nil, nil) + context.ListerRegistry = listerRegistry + + clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ + OkTotalUnreadyCount: 1, + } + processors := NewTestProcessors(&context) + sddProcessor := scaledowncandidates.NewScaleDownCandidatesDelayProcessor() + processors.ScaleStateNotifier.Register(sddProcessor) + scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{} + cp := scaledowncandidates.NewCombinedScaleDownCandidatesProcessor() + cp.Register(scaledowncandidates.NewScaleDownCandidatesSortingProcessor(scaleDownCandidatesComparers)) + cp.Register(sddProcessor) + processors.ScaleDownNodeProcessor = cp + clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) + processors.ScaleStateNotifier.Register(clusterState) + + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) + suOrchestrator := orchestrator.New() + suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) + + autoscaler := &StaticAutoscaler{ + AutoscalingContext: &context, + clusterStateRegistry: clusterState, + lastScaleUpTime: time.Now(), + lastScaleDownFailTime: time.Now(), + scaleDownPlanner: sdPlanner, + scaleDownActuator: sdActuator, + scaleUpOrchestrator: suOrchestrator, + processors: processors, + processorCallbacks: processorCallbacks, + initialized: true, + } + + p1 := BuildTestPod("p1", 400, 100) + p1.Annotations[drain.PodSafeToEvictKey] = "true" + p1.Spec.NodeName = "n1" + + p2 := BuildTestPod("p2", 400, 100) + p2.Annotations[drain.PodSafeToEvictKey] = "true" + p2.Spec.NodeName = "n2" + + testCases := []struct { + description string + beforeTest func(processors *ca_processors.AutoscalingProcessors) + expectedScaleDownNG string + expectedScaleDownNode string + afterTest func(processors *ca_processors.AutoscalingProcessors) + }{ + // Case 1: + // ng1 scaled up recently + // both ng1 and ng2 have under-utilized nodes + // expectation: under-utilized node in ng2 should be scaled down + { + description: "ng1 scaled up recently - both ng1 and ng2 have under-utilized nodes", + beforeTest: func(processors *ca_processors.AutoscalingProcessors) { + // make CA think ng1 scaled up recently + processors.ScaleStateNotifier.RegisterScaleUp(ng1, 1, time.Now().Add(-3*time.Minute)) + }, + expectedScaleDownNG: "ng2", + expectedScaleDownNode: "n2", + afterTest: 
func(processors *ca_processors.AutoscalingProcessors) { + // reset scale up in ng1 so that it doesn't block scale down in the next test + // scale down is always recorded relative to time.Now(), no matter + // what currentTime time is passed to RunOnce() + processors.ScaleStateNotifier.RegisterScaleUp(ng1, 1, time.Time{}) + }, + }, + + // Case 2: + // ng2 scaled down recently + // both ng1 and ng2 have under-utilized nodes + // expectation: under-utilized node in ng1 should be scaled down + { + description: "ng2 scaled down recently - both ng1 and ng2 have under-utilized nodes", + beforeTest: func(processors *ca_processors.AutoscalingProcessors) { + // make CA think ng2 scaled down recently + processors.ScaleStateNotifier.RegisterScaleDown(ng2, "n3", time.Now().Add(-3*time.Minute), time.Now()) + }, + expectedScaleDownNG: "ng1", + expectedScaleDownNode: "n1", + afterTest: func(processors *ca_processors.AutoscalingProcessors) { + // reset scale down in ng1 and ng2 so that it doesn't block scale down in the next test + // scale down is always recorded relative to time.Now(), no matter + // what currentTime time is passed to RunOnce() + processors.ScaleStateNotifier.RegisterScaleDown(ng2, "n3", time.Time{}, time.Time{}) + processors.ScaleStateNotifier.RegisterScaleDown(ng1, "n1", time.Time{}, time.Time{}) + }, + }, + + // Case 3: + // ng1 had a scale down failure + // both ng1 and ng2 have under-utilized nodes + // expectation: under-utilized node in ng2 should be scaled down + { + description: "ng1 had scale-down failure - both ng1 and ng2 have under-utilized nodes", + beforeTest: func(processors *ca_processors.AutoscalingProcessors) { + // Make CA think scale down failed in ng1 + processors.ScaleStateNotifier.RegisterFailedScaleDown(ng1, "scale down failed", time.Now().Add(-3*time.Minute)) + }, + expectedScaleDownNG: "ng2", + expectedScaleDownNode: "n2", + afterTest: func(processors *ca_processors.AutoscalingProcessors) { + + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + + tc.beforeTest(processors) + + provider.AddNode("ng1", n1) + provider.AddNode("ng2", n2) + ng1.SetTargetSize(1) + ng2.SetTargetSize(1) + + // Mark unneeded nodes. 
+ readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() + podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() + + err = autoscaler.RunOnce(time.Now()) + assert.NoError(t, err) + mock.AssertExpectationsForObjects(t, allPodListerMock, + podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) + + // Scale down nodegroup + readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(3) + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() + podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Twice() + onScaleDownMock.On("ScaleDown", tc.expectedScaleDownNG, tc.expectedScaleDownNode).Return(nil).Once() + + err = autoscaler.RunOnce(time.Now().Add(config.DefaultScaleDownUnneededTime)) + waitForDeleteToFinish(t, deleteFinished) + assert.NoError(t, err) + mock.AssertExpectationsForObjects(t, allPodListerMock, + podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) + + tc.afterTest(processors) + }) + } +} + func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { readyNodeLister := kubernetes.NewTestNodeLister(nil) allNodeLister := kubernetes.NewTestNodeLister(nil) diff --git a/cluster-autoscaler/core/test/common.go b/cluster-autoscaler/core/test/common.go index 35c582f5d127..2f7037846ef9 100644 --- a/cluster-autoscaler/core/test/common.go +++ b/cluster-autoscaler/core/test/common.go @@ -35,6 +35,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/expander/random" "k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange" "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster" "k8s.io/autoscaler/cluster-autoscaler/processors/binpacking" @@ -193,6 +194,7 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(), ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(), + ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(), } } diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 4c7cc499de36..0c4fcd569c57 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -113,9 +113,11 @@ var ( scaleDownUnreadyEnabled = flag.Bool("scale-down-unready-enabled", true, "Should CA scale down unready nodes of the cluster") scaleDownDelayAfterAdd = flag.Duration("scale-down-delay-after-add", 10*time.Minute, "How long after scale up that scale down evaluation resumes") + scaleDownDelayTypeLocal = flag.Bool("scale-down-delay-type-local", false, + "Should --scale-down-delay-after-* flags be applied locally per nodegroup or globally across all nodegroups") scaleDownDelayAfterDelete = flag.Duration("scale-down-delay-after-delete", 0, "How long after node deletion that scale down evaluation resumes, defaults to scanInterval") - scaleDownDelayAfterFailure = flag.Duration("scale-down-delay-after-failure", 
3*time.Minute, + scaleDownDelayAfterFailure = flag.Duration("scale-down-delay-after-failure", config.DefaultScaleDownDelayAfterFailure, "How long after scale down failure that scale down evaluation resumes") scaleDownUnneededTime = flag.Duration("scale-down-unneeded-time", config.DefaultScaleDownUnneededTime, "How long a node should be unneeded before it is eligible for scale down") @@ -145,7 +147,7 @@ var ( schedulerConfigFile = flag.String(config.SchedulerConfigFileFlag, "", "scheduler-config allows changing configuration of in-tree scheduler plugins acting on PreFilter and Filter extension points") nodeDeletionDelayTimeout = flag.Duration("node-deletion-delay-timeout", 2*time.Minute, "Maximum time CA waits for removing delay-deletion.cluster-autoscaler.kubernetes.io/ annotations before deleting the node.") nodeDeletionBatcherInterval = flag.Duration("node-deletion-batcher-interval", 0*time.Second, "How long CA ScaleDown gather nodes to delete them in batch.") - scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down") + scanInterval = flag.Duration("scan-interval", config.DefaultScanInterval, "How often cluster is reevaluated for scale up or down") maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.") coresTotal = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format :. Cluster autoscaler will not scale the cluster beyond these numbers.") memoryTotal = flag.String("memory-total", minMaxFlagString(0, config.DefaultMaxClusterMemory), "Minimum and maximum number of gigabytes of memory in cluster, in the format :. Cluster autoscaler will not scale the cluster beyond these numbers.") @@ -359,6 +361,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { NodeGroups: *nodeGroupsFlag, EnforceNodeGroupMinSize: *enforceNodeGroupMinSize, ScaleDownDelayAfterAdd: *scaleDownDelayAfterAdd, + ScaleDownDelayTypeLocal: *scaleDownDelayTypeLocal, ScaleDownDelayAfterDelete: *scaleDownDelayAfterDelete, ScaleDownDelayAfterFailure: *scaleDownDelayAfterFailure, ScaleDownEnabled: *scaleDownEnabled, @@ -492,8 +495,17 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter } opts.Processors.ScaleDownCandidatesNotifier.Register(sdCandidatesSorting) } - sdProcessor := scaledowncandidates.NewScaleDownCandidatesSortingProcessor(scaleDownCandidatesComparers) - opts.Processors.ScaleDownNodeProcessor = sdProcessor + + cp := scaledowncandidates.NewCombinedScaleDownCandidatesProcessor() + cp.Register(scaledowncandidates.NewScaleDownCandidatesSortingProcessor(scaleDownCandidatesComparers)) + + if autoscalingOptions.ScaleDownDelayTypeLocal { + sdp := scaledowncandidates.NewScaleDownCandidatesDelayProcessor() + cp.Register(sdp) + opts.Processors.ScaleStateNotifier.Register(sdp) + + } + opts.Processors.ScaleDownNodeProcessor = cp var nodeInfoComparator nodegroupset.NodeInfoComparator if len(autoscalingOptions.BalancingLabels) > 0 { diff --git a/cluster-autoscaler/observers/nodegroupchange/scale_state_observer.go b/cluster-autoscaler/observers/nodegroupchange/scale_state_observer.go new file mode 100644 index 000000000000..0f771746c917 --- /dev/null +++ b/cluster-autoscaler/observers/nodegroupchange/scale_state_observer.go @@ -0,0 +1,100 @@ +/* +Copyright 2023 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodegroupchange + +import ( + "sync" + "time" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" +) + +// NodeGroupChangeObserver is an observer of: +// * scale-up(s) for a nodegroup +// * scale-down(s) for a nodegroup +// * scale-up failure(s) for a nodegroup +// * scale-down failure(s) for a nodegroup +type NodeGroupChangeObserver interface { + // RegisterScaleUp records scale up for a nodegroup. + RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, delta int, currentTime time.Time) + // RegisterScaleDown records scale down for a nodegroup. + RegisterScaleDown(nodeGroup cloudprovider.NodeGroup, nodeName string, currentTime time.Time, expectedDeleteTime time.Time) + // RegisterFailedScaleUp records failed scale-up for a nodegroup. + // reason denotes an optional reason for the failed scale-up + // errMsg denotes the actual error message + RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, reason string, errMsg string, gpuResourceName, gpuType string, currentTime time.Time) + // RegisterFailedScaleDown records failed scale-down for a nodegroup. + RegisterFailedScaleDown(nodeGroup cloudprovider.NodeGroup, reason string, currentTime time.Time) +} + +// NodeGroupChangeObserversList is a slice of observers +// of the state of scale up/down in the cluster +type NodeGroupChangeObserversList struct { + observers []NodeGroupChangeObserver + // TODO(vadasambar): consider using separate mutexes for functions not related to each other + mutex sync.Mutex +} + +// Register adds a new observer to the list. +func (l *NodeGroupChangeObserversList) Register(o NodeGroupChangeObserver) { + l.observers = append(l.observers, o) +} + +// RegisterScaleUp calls RegisterScaleUp for each observer. +func (l *NodeGroupChangeObserversList) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, + delta int, currentTime time.Time) { + l.mutex.Lock() + defer l.mutex.Unlock() + for _, observer := range l.observers { + observer.RegisterScaleUp(nodeGroup, delta, currentTime) + } +} + +// RegisterScaleDown calls RegisterScaleDown for each observer. +func (l *NodeGroupChangeObserversList) RegisterScaleDown(nodeGroup cloudprovider.NodeGroup, + nodeName string, currentTime time.Time, expectedDeleteTime time.Time) { + l.mutex.Lock() + defer l.mutex.Unlock() + for _, observer := range l.observers { + observer.RegisterScaleDown(nodeGroup, nodeName, currentTime, expectedDeleteTime) + } +} + +// RegisterFailedScaleUp calls RegisterFailedScaleUp for each observer. +func (l *NodeGroupChangeObserversList) RegisterFailedScaleUp(nodeGroup cloudprovider.NodeGroup, + reason string, errMsg, gpuResourceName, gpuType string, currentTime time.Time) { + l.mutex.Lock() + defer l.mutex.Unlock() + for _, observer := range l.observers { + observer.RegisterFailedScaleUp(nodeGroup, reason, errMsg, gpuResourceName, gpuType, currentTime) + } +} + +// RegisterFailedScaleDown calls RegisterFailedScaleDown for each observer. 
+func (l *NodeGroupChangeObserversList) RegisterFailedScaleDown(nodeGroup cloudprovider.NodeGroup, + reason string, currentTime time.Time) { + l.mutex.Lock() + defer l.mutex.Unlock() + for _, observer := range l.observers { + observer.RegisterFailedScaleDown(nodeGroup, reason, currentTime) + } +} + +// NewNodeGroupChangeObserversList returns an empty list of scale state observers. +func NewNodeGroupChangeObserversList() *NodeGroupChangeObserversList { + return &NodeGroupChangeObserversList{} +} diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go index 8d7b949540e7..43957b14889b 100644 --- a/cluster-autoscaler/processors/processors.go +++ b/cluster-autoscaler/processors/processors.go @@ -18,6 +18,7 @@ package processors import ( "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange" "k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster" "k8s.io/autoscaler/cluster-autoscaler/processors/binpacking" "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" @@ -67,6 +68,12 @@ type AutoscalingProcessors struct { ActionableClusterProcessor actionablecluster.ActionableClusterProcessor // ScaleDownCandidatesNotifier is used to Update and Register new scale down candidates observer. ScaleDownCandidatesNotifier *scaledowncandidates.ObserversList + // ScaleStateNotifier is used to notify observers about: + // * scale-ups per nodegroup + // * scale-downs per nodegroup + // * scale-up failures per nodegroup + // * scale-down failures per nodegroup + ScaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList } // DefaultProcessors returns default set of processors. @@ -97,6 +104,7 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false), ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(), + ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(), } } diff --git a/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor.go b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor.go new file mode 100644 index 000000000000..7f125fdd7faf --- /dev/null +++ b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor.go @@ -0,0 +1,124 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scaledowncandidates + +import ( + "reflect" + "time" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" +) + +// ScaleDownCandidatesDelayProcessor is a processor to filter out +// nodes according to the scale down delay per nodegroup +type ScaleDownCandidatesDelayProcessor struct { + scaleUps map[string]time.Time + scaleDowns map[string]time.Time + scaleDownFailures map[string]time.Time +} + +// GetPodDestinationCandidates returns nodes as is; no processing is required here. +func (p *ScaleDownCandidatesDelayProcessor) GetPodDestinationCandidates(ctx *context.AutoscalingContext, + nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) { + return nodes, nil +} + +// GetScaleDownCandidates filters out nodes that belong to nodegroups which are still in a scale down cooldown (per-nodegroup scale down delay). +func (p *ScaleDownCandidatesDelayProcessor) GetScaleDownCandidates(ctx *context.AutoscalingContext, + nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) { + result := []*apiv1.Node{} + + for _, node := range nodes { + nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node) + if err != nil { + klog.Warningf("Error while checking node group for %s: %v", node.Name, err) + continue + } + if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() { + klog.V(4).Infof("Node %s should not be processed by cluster autoscaler (no node group config)", node.Name) + continue + } + + currentTime := time.Now() + + recent := func(m map[string]time.Time, d time.Duration, msg string) bool { + if !m[nodeGroup.Id()].IsZero() && m[nodeGroup.Id()].Add(d).After(currentTime) { + klog.V(4).Infof("Skipping scale down on node group %s because it %s recently at %v", + nodeGroup.Id(), msg, m[nodeGroup.Id()]) + return true + } + + return false + } + + if recent(p.scaleUps, ctx.ScaleDownDelayAfterAdd, "scaled up") { + continue + } + + if recent(p.scaleDowns, ctx.ScaleDownDelayAfterDelete, "scaled down") { + continue + } + + if recent(p.scaleDownFailures, ctx.ScaleDownDelayAfterFailure, "failed to scale down") { + continue + } + + result = append(result, node) + } + return result, nil +} + +// CleanUp is called at CA termination. +func (p *ScaleDownCandidatesDelayProcessor) CleanUp() { +} + +// RegisterScaleUp records when the last scale up happened for a nodegroup. +func (p *ScaleDownCandidatesDelayProcessor) RegisterScaleUp(nodeGroup cloudprovider.NodeGroup, + _ int, currentTime time.Time) { + p.scaleUps[nodeGroup.Id()] = currentTime +} + +// RegisterScaleDown records when the last scale down happened for a nodegroup. +func (p *ScaleDownCandidatesDelayProcessor) RegisterScaleDown(nodeGroup cloudprovider.NodeGroup, + nodeName string, currentTime time.Time, _ time.Time) { + p.scaleDowns[nodeGroup.Id()] = currentTime +} + +// RegisterFailedScaleUp records when the last scale up failed for a nodegroup. +func (p *ScaleDownCandidatesDelayProcessor) RegisterFailedScaleUp(_ cloudprovider.NodeGroup, + _ string, _ string, _ string, _ string, _ time.Time) { +} + +// RegisterFailedScaleDown records when the last scale down failed for a nodegroup. +func (p *ScaleDownCandidatesDelayProcessor) RegisterFailedScaleDown(nodeGroup cloudprovider.NodeGroup, + reason string, currentTime time.Time) { + p.scaleDownFailures[nodeGroup.Id()] = currentTime +} + +// NewScaleDownCandidatesDelayProcessor returns a new ScaleDownCandidatesDelayProcessor. 
+func NewScaleDownCandidatesDelayProcessor() *ScaleDownCandidatesDelayProcessor { + return &ScaleDownCandidatesDelayProcessor{ + scaleUps: make(map[string]time.Time), + scaleDowns: make(map[string]time.Time), + scaleDownFailures: make(map[string]time.Time), + } +} diff --git a/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor_test.go b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor_test.go new file mode 100644 index 000000000000..cdc1eac303f1 --- /dev/null +++ b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_delay_processor_test.go @@ -0,0 +1,151 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scaledowncandidates + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/context" +) + +func TestGetScaleDownCandidates(t *testing.T) { + n1 := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "n1", + }, + } + + n2 := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "n2", + }, + } + + n3 := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "n3", + }, + } + + ctx := context.AutoscalingContext{ + AutoscalingOptions: config.AutoscalingOptions{ + ScaleDownDelayAfterAdd: time.Minute * 10, + ScaleDownDelayAfterDelete: time.Minute * 10, + ScaleDownDelayAfterFailure: time.Minute * 10, + ScaleDownDelayTypeLocal: true, + }, + } + + testCases := map[string]struct { + autoscalingContext context.AutoscalingContext + candidates []*v1.Node + expected []*v1.Node + setupProcessor func(p *ScaleDownCandidatesDelayProcessor) *ScaleDownCandidatesDelayProcessor + }{ + // Expectation: no nodegroups should be filtered out + "no scale ups - no scale downs - no scale down failures": { + autoscalingContext: ctx, + candidates: []*v1.Node{n1, n2, n3}, + expected: []*v1.Node{n1, n2, n3}, + setupProcessor: nil, + }, + // Expectation: only nodegroups in cool-down should be filtered out + "no scale ups - 2 scale downs - no scale down failures": { + autoscalingContext: ctx, + candidates: []*v1.Node{n1, n2, n3}, + expected: []*v1.Node{n1, n3}, + setupProcessor: func(p *ScaleDownCandidatesDelayProcessor) *ScaleDownCandidatesDelayProcessor { + // fake nodegroups for calling `RegisterScaleDown` + ng2 := test.NewTestNodeGroup("ng-2", 0, 0, 0, false, false, "", nil, nil) + ng3 := test.NewTestNodeGroup("ng-3", 0, 0, 0, false, false, "", nil, nil) + // in cool down + p.RegisterScaleDown(ng2, "n2", time.Now().Add(-time.Minute*5), time.Time{}) + // not in cool down anymore + p.RegisterScaleDown(ng3, "n3", time.Now().Add(-time.Minute*11), time.Time{}) + + return p + }, + }, + // Expectation: only nodegroups in cool-down should be filtered out + "1 scale up - no scale down - no scale down 
failures": { + autoscalingContext: ctx, + candidates: []*v1.Node{n1, n2, n3}, + expected: []*v1.Node{n1, n3}, + setupProcessor: func(p *ScaleDownCandidatesDelayProcessor) *ScaleDownCandidatesDelayProcessor { + // fake nodegroups for calling `RegisterScaleUp` + ng2 := test.NewTestNodeGroup("ng-2", 0, 0, 0, false, false, "", nil, nil) + ng3 := test.NewTestNodeGroup("ng-3", 0, 0, 0, false, false, "", nil, nil) + + // in cool down + p.RegisterScaleUp(ng2, 0, time.Now().Add(-time.Minute*5)) + // not in cool down anymore + p.RegisterScaleUp(ng3, 0, time.Now().Add(-time.Minute*11)) + return p + }, + }, + // Expectation: only nodegroups in cool-down should be filtered out + "no scale up - no scale down - 1 scale down failure": { + autoscalingContext: ctx, + candidates: []*v1.Node{n1, n2, n3}, + expected: []*v1.Node{n1, n3}, + setupProcessor: func(p *ScaleDownCandidatesDelayProcessor) *ScaleDownCandidatesDelayProcessor { + // fake nodegroups for calling `RegisterScaleUp` + ng2 := test.NewTestNodeGroup("ng-2", 0, 0, 0, false, false, "", nil, nil) + ng3 := test.NewTestNodeGroup("ng-3", 0, 0, 0, false, false, "", nil, nil) + + // in cool down + p.RegisterFailedScaleDown(ng2, "", time.Now().Add(-time.Minute*5)) + // not in cool down anymore + p.RegisterFailedScaleDown(ng3, "", time.Now().Add(-time.Minute*11)) + return p + }, + }, + } + + for description, testCase := range testCases { + t.Run(description, func(t *testing.T) { + provider := testprovider.NewTestCloudProvider(nil, nil) + + p := NewScaleDownCandidatesDelayProcessor() + + if testCase.setupProcessor != nil { + p = testCase.setupProcessor(p) + } + + provider.AddNodeGroup("ng-1", 1, 3, 2) + provider.AddNode("ng-1", n1) + provider.AddNodeGroup("ng-2", 1, 3, 2) + provider.AddNode("ng-2", n2) + provider.AddNodeGroup("ng-3", 1, 3, 2) + provider.AddNode("ng-3", n3) + + testCase.autoscalingContext.CloudProvider = provider + + no, err := p.GetScaleDownCandidates(&testCase.autoscalingContext, testCase.candidates) + + assert.NoError(t, err) + assert.Equal(t, testCase.expected, no) + }) + } +} diff --git a/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_processor.go b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_processor.go new file mode 100644 index 000000000000..854b6831624e --- /dev/null +++ b/cluster-autoscaler/processors/scaledowncandidates/scale_down_candidates_processor.go @@ -0,0 +1,72 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scaledowncandidates + +import ( + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodes" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" +) + +type combinedScaleDownCandidatesProcessor struct { + processors []nodes.ScaleDownNodeProcessor +} + +// NewCombinedScaleDownCandidatesProcessor returns a default implementation of the scale down candidates +// processor, which wraps and sequentially runs other sub-processors. 
+func NewCombinedScaleDownCandidatesProcessor() *combinedScaleDownCandidatesProcessor { + return &combinedScaleDownCandidatesProcessor{} + +} + +// Register registers a new ScaleDownNodeProcessor +func (p *combinedScaleDownCandidatesProcessor) Register(np nodes.ScaleDownNodeProcessor) { + p.processors = append(p.processors, np) +} + +// GetPodDestinationCandidates returns nodes that potentially could act as destinations for pods +// that would become unscheduled after a scale down. +func (p *combinedScaleDownCandidatesProcessor) GetPodDestinationCandidates(ctx *context.AutoscalingContext, nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) { + var err errors.AutoscalerError + for _, processor := range p.processors { + nodes, err = processor.GetPodDestinationCandidates(ctx, nodes) + if err != nil { + return nil, err + } + } + return nodes, nil +} + +// GetScaleDownCandidates returns nodes that potentially could be scaled down. +func (p *combinedScaleDownCandidatesProcessor) GetScaleDownCandidates(ctx *context.AutoscalingContext, nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) { + var err errors.AutoscalerError + for _, processor := range p.processors { + nodes, err = processor.GetScaleDownCandidates(ctx, nodes) + if err != nil { + return nil, err + } + } + return nodes, nil +} + +// CleanUp is called at CA termination +func (p *combinedScaleDownCandidatesProcessor) CleanUp() { + for _, processor := range p.processors { + processor.CleanUp() + } +}
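
Reviewer note (not part of the patch): below is a minimal, standalone Go sketch of the per-nodegroup cooldown bookkeeping that ScaleDownCandidatesDelayProcessor implements, i.e. stamp a time per nodegroup ID on every scale up/down/failure and skip that nodegroup as a scale-down candidate while any stamp is newer than its configured delay. All names in the sketch (cooldownTracker, registerScaleUp, inCooldown) are hypothetical and exist only for illustration; they are not autoscaler APIs.

// Standalone illustration of per-nodegroup scale-down cooldown; assumes only the standard library.
package main

import (
	"fmt"
	"time"
)

// cooldownTracker is a simplified stand-in for the processor's
// scaleUps/scaleDowns/scaleDownFailures maps, tracking only scale-ups here.
type cooldownTracker struct {
	lastScaleUp   map[string]time.Time
	delayAfterAdd time.Duration
}

func newCooldownTracker(delayAfterAdd time.Duration) *cooldownTracker {
	return &cooldownTracker{
		lastScaleUp:   map[string]time.Time{},
		delayAfterAdd: delayAfterAdd,
	}
}

// registerScaleUp mirrors RegisterScaleUp: remember when the nodegroup last scaled up.
func (c *cooldownTracker) registerScaleUp(nodeGroupID string, at time.Time) {
	c.lastScaleUp[nodeGroupID] = at
}

// inCooldown mirrors the `recent` helper above: a zero timestamp means "never",
// otherwise the nodegroup stays in cooldown until lastScaleUp + delayAfterAdd.
func (c *cooldownTracker) inCooldown(nodeGroupID string, now time.Time) bool {
	t := c.lastScaleUp[nodeGroupID]
	return !t.IsZero() && t.Add(c.delayAfterAdd).After(now)
}

func main() {
	tr := newCooldownTracker(10 * time.Minute)
	now := time.Now()
	tr.registerScaleUp("ng-1", now.Add(-5*time.Minute))  // scaled up 5m ago -> still cooling down
	tr.registerScaleUp("ng-2", now.Add(-15*time.Minute)) // scaled up 15m ago -> eligible again
	fmt.Println(tr.inCooldown("ng-1", now)) // true
	fmt.Println(tr.inCooldown("ng-2", now)) // false
	fmt.Println(tr.inCooldown("ng-3", now)) // false (never scaled up)
}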