From c74f2df3d61c3fd88a3808c3ef9a608936abd5f5 Mon Sep 17 00:00:00 2001 From: Aleksandra Malinowska Date: Tue, 30 Apr 2024 12:31:09 +0200 Subject: [PATCH 1/2] Add support for all-or-nothing scale-up strategy --- .../core/scaleup/equivalence/groups.go | 7 ++- .../core/scaleup/orchestrator/executor.go | 22 +++++-- .../core/scaleup/orchestrator/orchestrator.go | 63 ++++++++++++++++++- .../scaleup/orchestrator/orchestrator_test.go | 14 ++--- .../scaleup/orchestrator/rejectedreasons.go | 37 +++++++++++ cluster-autoscaler/core/scaleup/scaleup.go | 1 + cluster-autoscaler/core/static_autoscaler.go | 2 +- .../orchestrator/orchestrator.go | 1 + .../orchestrator/orchestrator_test.go | 2 +- .../orchestrator/wrapper_orchestrator.go | 5 +- .../orchestrator/wrapper_orchestrator_test.go | 5 +- 11 files changed, 135 insertions(+), 24 deletions(-) create mode 100644 cluster-autoscaler/core/scaleup/orchestrator/rejectedreasons.go diff --git a/cluster-autoscaler/core/scaleup/equivalence/groups.go b/cluster-autoscaler/core/scaleup/equivalence/groups.go index c805b9f01bcb..5fcefd620162 100644 --- a/cluster-autoscaler/core/scaleup/equivalence/groups.go +++ b/cluster-autoscaler/core/scaleup/equivalence/groups.go @@ -30,9 +30,10 @@ import ( // PodGroup contains a group of pods that are equivalent in terms of schedulability. type PodGroup struct { - Pods []*apiv1.Pod - SchedulingErrors map[string]status.Reasons - Schedulable bool + Pods []*apiv1.Pod + SchedulingErrors map[string]status.Reasons + SchedulableGroups []string + Schedulable bool } // BuildPodGroups prepares pod groups with equivalent scheduling properties. diff --git a/cluster-autoscaler/core/scaleup/orchestrator/executor.go b/cluster-autoscaler/core/scaleup/orchestrator/executor.go index baba3b73551d..5a685fa7781f 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/executor.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/executor.go @@ -61,18 +61,20 @@ func (e *scaleUpExecutor) ExecuteScaleUps( scaleUpInfos []nodegroupset.ScaleUpInfo, nodeInfos map[string]*schedulerframework.NodeInfo, now time.Time, + allOrNothing bool, ) (errors.AutoscalerError, []cloudprovider.NodeGroup) { options := e.autoscalingContext.AutoscalingOptions if options.ParallelScaleUp { - return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now) + return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now, allOrNothing) } - return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now) + return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now, allOrNothing) } func (e *scaleUpExecutor) executeScaleUpsSync( scaleUpInfos []nodegroupset.ScaleUpInfo, nodeInfos map[string]*schedulerframework.NodeInfo, now time.Time, + allOrNothing bool, ) (errors.AutoscalerError, []cloudprovider.NodeGroup) { availableGPUTypes := e.autoscalingContext.CloudProvider.GetAvailableGPUTypes() for _, scaleUpInfo := range scaleUpInfos { @@ -81,7 +83,7 @@ func (e *scaleUpExecutor) executeScaleUpsSync( klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", scaleUpInfo.Group.Id()) continue } - if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now); aErr != nil { + if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now, allOrNothing); aErr != nil { return aErr, []cloudprovider.NodeGroup{scaleUpInfo.Group} } } @@ -92,6 +94,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel( scaleUpInfos []nodegroupset.ScaleUpInfo, nodeInfos map[string]*schedulerframework.NodeInfo, now time.Time, + allOrNothing bool, ) (errors.AutoscalerError, []cloudprovider.NodeGroup) { if err := checkUniqueNodeGroups(scaleUpInfos); err != nil { return err, extractNodeGroups(scaleUpInfos) @@ -113,7 +116,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel( klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", info.Group.Id()) return } - if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now); aErr != nil { + if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now, allOrNothing); aErr != nil { errResults <- errResult{err: aErr, info: &info} } }(scaleUpInfo) @@ -141,6 +144,7 @@ func (e *scaleUpExecutor) executeScaleUp( nodeInfo *schedulerframework.NodeInfo, availableGPUTypes map[string]struct{}, now time.Time, + allOrNothing bool, ) errors.AutoscalerError { gpuConfig := e.autoscalingContext.CloudProvider.GetNodeGpuConfig(nodeInfo.Node()) gpuResourceName, gpuType := gpu.GetGpuInfoForMetrics(gpuConfig, availableGPUTypes, nodeInfo.Node(), nil) @@ -148,7 +152,15 @@ func (e *scaleUpExecutor) executeScaleUp( e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup", "Scale-up: setting group %s size to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize) increase := info.NewSize - info.CurrentSize - if err := info.Group.IncreaseSize(increase); err != nil { + var err error + if allOrNothing { + if err = info.Group.AtomicIncreaseSize(increase); err == cloudprovider.ErrNotImplemented { + err = info.Group.IncreaseSize(increase) + } + } else { + err = info.Group.IncreaseSize(increase) + } + if err != nil { e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err) aerr := errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to increase node group size: ") e.scaleStateNotifier.RegisterFailedScaleUp(info.Group, string(aerr.Type()), aerr.Error(), gpuResourceName, gpuType, now) diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go index e94ed4ccae49..497c10b15db4 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go @@ -89,6 +89,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulerframework.NodeInfo, + allOrNothing bool, ) (*status.ScaleUpStatus, errors.AutoscalerError) { if !o.initialized { return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized")) @@ -146,11 +147,13 @@ func (o *ScaleUpOrchestrator) ScaleUp( } for _, nodeGroup := range validNodeGroups { - option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now) + option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now, allOrNothing) o.processors.BinpackingLimiter.MarkProcessed(o.autoscalingContext, nodeGroup.Id()) if len(option.Pods) == 0 || option.NodeCount == 0 { klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id()) + } else if allOrNothing && len(option.Pods) < len(unschedulablePods) { + klog.V(4).Infof("Some pods can't fit to %s, giving up due to all-or-nothing scale-up strategy", nodeGroup.Id()) } else { options = append(options, option) } @@ -211,9 +214,20 @@ func (o *ScaleUpOrchestrator) ScaleUp( aErr) } + if newNodes < bestOption.NodeCount { + klog.V(1).Infof("Only %d nodes can be added to %s due to cluster-wide limits", newNodes, bestOption.NodeGroup.Id()) + if allOrNothing { + return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups) + } + } + // If necessary, create the node group. This is no longer simulation, an empty node group will be created by cloud provider if supported. createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0) if !bestOption.NodeGroup.Exist() { + if allOrNothing && bestOption.NodeGroup.MaxSize() < newNodes { + klog.V(1).Infof("Can only create a new node group with max %d nodes, need %d nodes", bestOption.NodeGroup.MaxSize(), newNodes) + return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups) + } var scaleUpStatus *status.ScaleUpStatus createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets) if aErr != nil { @@ -256,9 +270,21 @@ func (o *ScaleUpOrchestrator) ScaleUp( aErr) } + // Last check before scale-up. Node group capacity (both due to max size limits & current size) is only checked when balancing. + totalCapacity := 0 + for _, sui := range scaleUpInfos { + totalCapacity += sui.NewSize - sui.CurrentSize + } + if totalCapacity < newNodes { + klog.V(1).Infof("Can only add %d nodes due to node group limits, need %d nodes", totalCapacity, newNodes) + if allOrNothing { + return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups) + } + } + // Execute scale up. klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos) - aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now) + aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now, allOrNothing) if aErr != nil { return status.UpdateScaleUpError( &status.ScaleUpStatus{ @@ -364,7 +390,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize( } klog.V(1).Infof("ScaleUpToNodeGroupMinSize: final scale-up plan: %v", scaleUpInfos) - aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now) + aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now, false /* allOrNothing disabled */) if aErr != nil { return status.UpdateScaleUpError( &status.ScaleUpStatus{ @@ -447,6 +473,7 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption( nodeInfos map[string]*schedulerframework.NodeInfo, currentNodeCount int, now time.Time, + allOrNothing bool, ) expander.Option { option := expander.Option{NodeGroup: nodeGroup} podGroups := schedulablePodGroups[nodeGroup.Id()] @@ -472,6 +499,15 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption( klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err) } if autoscalingOptions != nil && autoscalingOptions.ZeroOrMaxNodeScaling { + if allOrNothing && option.NodeCount > nodeGroup.MaxSize() { + // The following check can quietly cap the number of nodes and so breaks the + // assumption that as long as we're able to provision option.NodeCount of + // nodes, all pods will be accommodated. + // This fix isn't applicable to non-atomic node groups as we'll operate + // on uncapped number of nodes in that case. + option.Pods = nil + option.NodeCount = 0 + } if option.NodeCount > 0 && option.NodeCount != nodeGroup.MaxSize() { option.NodeCount = nodeGroup.MaxSize() } @@ -564,6 +600,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups( }) // Mark pod group as (theoretically) schedulable. eg.Schedulable = true + eg.SchedulableGroups = append(eg.SchedulableGroups, nodeGroup.Id()) } else { klog.V(2).Infof("Pod %s/%s can't be scheduled on %s, predicate checking error: %v", samplePod.Namespace, samplePod.Name, nodeGroup.Id(), err.VerboseMessage()) if podCount := len(eg.Pods); podCount > 1 { @@ -709,6 +746,26 @@ func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, sim return true } +func stopAllOrNothingScaleUp(egs []*equivalence.PodGroup, skipped map[string]status.Reasons, ngs []cloudprovider.NodeGroup) (*status.ScaleUpStatus, errors.AutoscalerError) { + // Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable. + for _, eg := range egs { + if eg.Schedulable { + errs := map[string]status.Reasons{} + for _, sg := range eg.SchedulableGroups { + errs[sg] = AllOrNothingReason + } + eg.Schedulable = false + } + } + klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated") + return &status.ScaleUpStatus{ + Result: status.ScaleUpNoOptionsAvailable, + PodsRemainUnschedulable: GetRemainingPods(egs, skipped), + ConsideredNodeGroups: ngs, + }, nil + +} + // GetRemainingPods returns information about pods which CA is unable to help // at this moment. func GetRemainingPods(egs []*equivalence.PodGroup, skipped map[string]status.Reasons) []status.NoScaleUpInfo { diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go index 2c446dcf0f44..9b887a108313 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go @@ -1032,7 +1032,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR context.ExpanderStrategy = expander // scale up - scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos) + scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus) // aggregate group size changes @@ -1131,7 +1131,7 @@ func TestScaleUpUnhealthy(t *testing.T) { processors := NewTestProcessors(&context) suOrchestrator := New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) - scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos) + scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) assert.NoError(t, err) // Node group is unhealthy. @@ -1185,7 +1185,7 @@ func TestBinpackingLimiter(t *testing.T) { expander := NewMockRepotingStrategy(t, nil) context.ExpanderStrategy = expander - scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos) + scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus) assert.NoError(t, err) assert.True(t, scaleUpStatus.WasSuccessful()) @@ -1231,7 +1231,7 @@ func TestScaleUpNoHelp(t *testing.T) { processors := NewTestProcessors(&context) suOrchestrator := New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) - scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos) + scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus) assert.NoError(t, err) @@ -1453,7 +1453,7 @@ func TestScaleUpBalanceGroups(t *testing.T) { processors := NewTestProcessors(&context) suOrchestrator := New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) - scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos) + scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) assert.NoError(t, typedErr) assert.True(t, scaleUpStatus.WasSuccessful()) @@ -1515,7 +1515,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) { suOrchestrator := New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) - scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos) + scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) assert.NoError(t, err) assert.True(t, scaleUpStatus.WasSuccessful()) assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups)) @@ -1570,7 +1570,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) { suOrchestrator := New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) - scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos) + scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) assert.NoError(t, err) assert.True(t, scaleUpStatus.WasSuccessful()) assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups)) diff --git a/cluster-autoscaler/core/scaleup/orchestrator/rejectedreasons.go b/cluster-autoscaler/core/scaleup/orchestrator/rejectedreasons.go new file mode 100644 index 000000000000..b5e0ab92392a --- /dev/null +++ b/cluster-autoscaler/core/scaleup/orchestrator/rejectedreasons.go @@ -0,0 +1,37 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package orchestrator + +// RejectedReasons contains information why given node group was rejected as a scale-up option. +type RejectedReasons struct { + messages []string +} + +// NewRejectedReasons creates new RejectedReason object. +func NewRejectedReasons(m string) *RejectedReasons { + return &RejectedReasons{[]string{m}} +} + +// Reasons returns a slice of reasons why the node group was not considered for scale up. +func (sr *RejectedReasons) Reasons() []string { + return sr.messages +} + +var ( + // AllOrNothingReason means the node group was rejected because not all pods would fit it when using all-or-nothing strategy. + AllOrNothingReason = NewRejectedReasons("not all pods would fit and scale-up is using all-or-nothing strategy") +) diff --git a/cluster-autoscaler/core/scaleup/scaleup.go b/cluster-autoscaler/core/scaleup/scaleup.go index 6f9781bff0e2..0da619134ea2 100644 --- a/cluster-autoscaler/core/scaleup/scaleup.go +++ b/cluster-autoscaler/core/scaleup/scaleup.go @@ -48,6 +48,7 @@ type Orchestrator interface { nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulerframework.NodeInfo, + allOrNothing bool, ) (*status.ScaleUpStatus, errors.AutoscalerError) // ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes // than the configured min size. The source of truth for the current node group diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index a56ad8e4268f..b45733bdbb20 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -576,7 +576,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more") } else { scaleUpStart := preScaleUp() - scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups) + scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, false) if exit, err := postScaleUp(scaleUpStart); exit { return err } diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go index beadd9acd1d3..70e75057ac78 100644 --- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go @@ -91,6 +91,7 @@ func (o *provReqOrchestrator) ScaleUp( nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulerframework.NodeInfo, + _ bool, ) (*status.ScaleUpStatus, ca_errors.AutoscalerError) { if !o.initialized { return &status.ScaleUpStatus{}, ca_errors.ToAutoscalerError(ca_errors.InternalError, fmt.Errorf("provisioningrequest.Orchestrator is not initialized")) diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go index 5d2bd3bf0249..2cb99b21bd75 100644 --- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go @@ -115,7 +115,7 @@ func TestScaleUp(t *testing.T) { provisioningClasses: []provisioningClass{checkcapacity.New(client)}, } orchestrator.Initialize(&autoscalingContext, nil, nil, nil, taints.TaintConfig{}) - st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{}) + st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{}, false) if !tc.err { assert.NoError(t, err) assert.Equal(t, tc.scaleUpResult, st.Result) diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go index cda3bc297d70..5c7f792e55b2 100644 --- a/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator.go @@ -68,6 +68,7 @@ func (o *WrapperOrchestrator) ScaleUp( nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulerframework.NodeInfo, + allOrNothing bool, ) (*status.ScaleUpStatus, errors.AutoscalerError) { defer func() { o.scaleUpRegularPods = !o.scaleUpRegularPods }() @@ -79,9 +80,9 @@ func (o *WrapperOrchestrator) ScaleUp( } if o.scaleUpRegularPods { - return o.podsOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos) + return o.podsOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos, allOrNothing) } - return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos) + return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos, allOrNothing) } func splitOut(unschedulablePods []*apiv1.Pod) (provReqPods, regularPods []*apiv1.Pod) { diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go index 1e28b8f10779..bf31f59f8b1b 100644 --- a/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/wrapper_orchestrator_test.go @@ -56,9 +56,9 @@ func TestWrapperScaleUp(t *testing.T) { pod.Annotations[provreq.ProvisioningRequestPodAnnotationKey] = "true" } unschedulablePods := append(regularPods, provReqPods...) - _, err := o.ScaleUp(unschedulablePods, nil, nil, nil) + _, err := o.ScaleUp(unschedulablePods, nil, nil, nil, false) assert.Equal(t, err.Error(), provisioningRequestErrorMsg) - _, err = o.ScaleUp(unschedulablePods, nil, nil, nil) + _, err = o.ScaleUp(unschedulablePods, nil, nil, nil, false) assert.Equal(t, err.Error(), regularPodsErrorMsg) } @@ -71,6 +71,7 @@ func (f *fakeScaleUp) ScaleUp( nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulerframework.NodeInfo, + allOrNothing bool, ) (*status.ScaleUpStatus, errors.AutoscalerError) { return nil, errors.NewAutoscalerError(errors.InternalError, f.errorMsg) } From 12374e6c786f27c470b31daf19b0e06b6a961fcd Mon Sep 17 00:00:00 2001 From: Aleksandra Malinowska Date: Wed, 15 May 2024 13:14:51 +0200 Subject: [PATCH 2/2] Review fixes --- .../core/scaleup/orchestrator/executor.go | 37 +++++++------- .../core/scaleup/orchestrator/orchestrator.go | 48 ++++++++++++------- .../scaleup/orchestrator/orchestrator_test.go | 33 ++++++++++++- cluster-autoscaler/core/test/common.go | 1 + .../orchestrator/orchestrator.go | 2 +- 5 files changed, 85 insertions(+), 36 deletions(-) diff --git a/cluster-autoscaler/core/scaleup/orchestrator/executor.go b/cluster-autoscaler/core/scaleup/orchestrator/executor.go index 5a685fa7781f..6d41849f67ee 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/executor.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/executor.go @@ -61,20 +61,20 @@ func (e *scaleUpExecutor) ExecuteScaleUps( scaleUpInfos []nodegroupset.ScaleUpInfo, nodeInfos map[string]*schedulerframework.NodeInfo, now time.Time, - allOrNothing bool, + atomic bool, ) (errors.AutoscalerError, []cloudprovider.NodeGroup) { options := e.autoscalingContext.AutoscalingOptions if options.ParallelScaleUp { - return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now, allOrNothing) + return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now, atomic) } - return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now, allOrNothing) + return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now, atomic) } func (e *scaleUpExecutor) executeScaleUpsSync( scaleUpInfos []nodegroupset.ScaleUpInfo, nodeInfos map[string]*schedulerframework.NodeInfo, now time.Time, - allOrNothing bool, + atomic bool, ) (errors.AutoscalerError, []cloudprovider.NodeGroup) { availableGPUTypes := e.autoscalingContext.CloudProvider.GetAvailableGPUTypes() for _, scaleUpInfo := range scaleUpInfos { @@ -83,7 +83,7 @@ func (e *scaleUpExecutor) executeScaleUpsSync( klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", scaleUpInfo.Group.Id()) continue } - if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now, allOrNothing); aErr != nil { + if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now, atomic); aErr != nil { return aErr, []cloudprovider.NodeGroup{scaleUpInfo.Group} } } @@ -94,7 +94,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel( scaleUpInfos []nodegroupset.ScaleUpInfo, nodeInfos map[string]*schedulerframework.NodeInfo, now time.Time, - allOrNothing bool, + atomic bool, ) (errors.AutoscalerError, []cloudprovider.NodeGroup) { if err := checkUniqueNodeGroups(scaleUpInfos); err != nil { return err, extractNodeGroups(scaleUpInfos) @@ -116,7 +116,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel( klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", info.Group.Id()) return } - if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now, allOrNothing); aErr != nil { + if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now, atomic); aErr != nil { errResults <- errResult{err: aErr, info: &info} } }(scaleUpInfo) @@ -139,12 +139,23 @@ func (e *scaleUpExecutor) executeScaleUpsParallel( return nil, nil } +func (e *scaleUpExecutor) increaseSize(nodeGroup cloudprovider.NodeGroup, increase int, atomic bool) error { + if atomic { + if err := nodeGroup.AtomicIncreaseSize(increase); err != cloudprovider.ErrNotImplemented { + return err + } + // If error is cloudprovider.ErrNotImplemented, fall back to non-atomic + // increase - cloud provider doesn't support it. + } + return nodeGroup.IncreaseSize(increase) +} + func (e *scaleUpExecutor) executeScaleUp( info nodegroupset.ScaleUpInfo, nodeInfo *schedulerframework.NodeInfo, availableGPUTypes map[string]struct{}, now time.Time, - allOrNothing bool, + atomic bool, ) errors.AutoscalerError { gpuConfig := e.autoscalingContext.CloudProvider.GetNodeGpuConfig(nodeInfo.Node()) gpuResourceName, gpuType := gpu.GetGpuInfoForMetrics(gpuConfig, availableGPUTypes, nodeInfo.Node(), nil) @@ -152,15 +163,7 @@ func (e *scaleUpExecutor) executeScaleUp( e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup", "Scale-up: setting group %s size to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize) increase := info.NewSize - info.CurrentSize - var err error - if allOrNothing { - if err = info.Group.AtomicIncreaseSize(increase); err == cloudprovider.ErrNotImplemented { - err = info.Group.IncreaseSize(increase) - } - } else { - err = info.Group.IncreaseSize(increase) - } - if err != nil { + if err := e.increaseSize(info.Group, increase, atomic); err != nil { e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err) aerr := errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to increase node group size: ") e.scaleStateNotifier.RegisterFailedScaleUp(info.Group, string(aerr.Type()), aerr.Error(), gpuResourceName, gpuType, now) diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go index 497c10b15db4..7fb533570288 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go @@ -89,7 +89,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulerframework.NodeInfo, - allOrNothing bool, + allOrNothing bool, // Either request enough capacity for all unschedulablePods, or don't request it at all. ) (*status.ScaleUpStatus, errors.AutoscalerError) { if !o.initialized { return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized")) @@ -217,7 +217,10 @@ func (o *ScaleUpOrchestrator) ScaleUp( if newNodes < bestOption.NodeCount { klog.V(1).Infof("Only %d nodes can be added to %s due to cluster-wide limits", newNodes, bestOption.NodeGroup.Id()) if allOrNothing { - return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups) + // Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable. + klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated") + markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason) + return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil } } @@ -226,7 +229,10 @@ func (o *ScaleUpOrchestrator) ScaleUp( if !bestOption.NodeGroup.Exist() { if allOrNothing && bestOption.NodeGroup.MaxSize() < newNodes { klog.V(1).Infof("Can only create a new node group with max %d nodes, need %d nodes", bestOption.NodeGroup.MaxSize(), newNodes) - return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups) + // Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable. + klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated") + markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason) + return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil } var scaleUpStatus *status.ScaleUpStatus createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets) @@ -278,7 +284,10 @@ func (o *ScaleUpOrchestrator) ScaleUp( if totalCapacity < newNodes { klog.V(1).Infof("Can only add %d nodes due to node group limits, need %d nodes", totalCapacity, newNodes) if allOrNothing { - return stopAllOrNothingScaleUp(podEquivalenceGroups, skippedNodeGroups, nodeGroups) + // Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable. + klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated") + markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason) + return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil } } @@ -498,20 +507,22 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption( if err != nil && err != cloudprovider.ErrNotImplemented { klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err) } + + // Special handling for groups that only scale from zero to max. if autoscalingOptions != nil && autoscalingOptions.ZeroOrMaxNodeScaling { + // For zero-or-max scaling groups, the only valid value of node count is node group's max size. if allOrNothing && option.NodeCount > nodeGroup.MaxSize() { - // The following check can quietly cap the number of nodes and so breaks the - // assumption that as long as we're able to provision option.NodeCount of - // nodes, all pods will be accommodated. - // This fix isn't applicable to non-atomic node groups as we'll operate - // on uncapped number of nodes in that case. + // We would have to cap the node count, which means not all pods will be + // accommodated. This violates the principle of all-or-nothing strategy. option.Pods = nil option.NodeCount = 0 } - if option.NodeCount > 0 && option.NodeCount != nodeGroup.MaxSize() { + if option.NodeCount > 0 { + // Cap or increase the number of nodes to the only valid value - node group's max size. option.NodeCount = nodeGroup.MaxSize() } } + return option } @@ -746,24 +757,27 @@ func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, sim return true } -func stopAllOrNothingScaleUp(egs []*equivalence.PodGroup, skipped map[string]status.Reasons, ngs []cloudprovider.NodeGroup) (*status.ScaleUpStatus, errors.AutoscalerError) { - // Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable. +func markAllGroupsAsUnschedulable(egs []*equivalence.PodGroup, reason status.Reasons) []*equivalence.PodGroup { for _, eg := range egs { if eg.Schedulable { - errs := map[string]status.Reasons{} + if eg.SchedulingErrors == nil { + eg.SchedulingErrors = map[string]status.Reasons{} + } for _, sg := range eg.SchedulableGroups { - errs[sg] = AllOrNothingReason + eg.SchedulingErrors[sg] = reason } eg.Schedulable = false } } - klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated") + return egs +} + +func buildNoOptionsAvailableStatus(egs []*equivalence.PodGroup, skipped map[string]status.Reasons, ngs []cloudprovider.NodeGroup) *status.ScaleUpStatus { return &status.ScaleUpStatus{ Result: status.ScaleUpNoOptionsAvailable, PodsRemainUnschedulable: GetRemainingPods(egs, skipped), ConsideredNodeGroups: ngs, - }, nil - + } } // GetRemainingPods returns information about pods which CA is unable to help diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go index 9b887a108313..40b779f545fe 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go @@ -861,6 +861,37 @@ func TestNoCreateNodeGroupMaxCoresLimitHit(t *testing.T) { simpleNoScaleUpTest(t, config, results) } +func TestAllOrNothing(t *testing.T) { + options := defaultOptions + + extraPods := []PodConfig{} + extraPodNames := []string{} + for i := 0; i < 11; i++ { + podName := fmt.Sprintf("pod-%d", i) + extraPods = append(extraPods, PodConfig{Name: podName, Cpu: 1000, Memory: 100}) + extraPodNames = append(extraPodNames, podName) + } + + config := &ScaleUpTestConfig{ + Nodes: []NodeConfig{ + {Name: "n1", Cpu: 1000, Memory: 1000, Gpu: 0, Ready: true, Group: "ng"}, + }, + Pods: []PodConfig{}, + ExtraPods: extraPods, + Options: &options, + AllOrNothing: true, + } + + result := &ScaleTestResults{ + NoScaleUpReason: "all-or-nothing", + ScaleUpStatus: ScaleUpStatusInfo{ + PodsRemainUnschedulable: extraPodNames, + }, + } + + simpleNoScaleUpTest(t, config, result) +} + func simpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig, expectedResults *ScaleTestResults) { results := runSimpleScaleUpTest(t, config) assert.NotNil(t, results.GroupSizeChanges, "Expected scale up event") @@ -1032,7 +1063,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR context.ExpanderStrategy = expander // scale up - scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) + scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, config.AllOrNothing) processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus) // aggregate group size changes diff --git a/cluster-autoscaler/core/test/common.go b/cluster-autoscaler/core/test/common.go index ff9ef91a545c..0bbb6bdc8815 100644 --- a/cluster-autoscaler/core/test/common.go +++ b/cluster-autoscaler/core/test/common.go @@ -127,6 +127,7 @@ type ScaleUpTestConfig struct { Options *config.AutoscalingOptions NodeTemplateConfigs map[string]*NodeTemplateConfig EnableAutoprovisioning bool + AllOrNothing bool } // ScaleUpTestResult represents a node groups scale up result diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go index 70e75057ac78..6738ab5ea006 100644 --- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go @@ -91,7 +91,7 @@ func (o *provReqOrchestrator) ScaleUp( nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulerframework.NodeInfo, - _ bool, + _ bool, // Provision() doesn't use this parameter. ) (*status.ScaleUpStatus, ca_errors.AutoscalerError) { if !o.initialized { return &status.ScaleUpStatus{}, ca_errors.ToAutoscalerError(ca_errors.InternalError, fmt.Errorf("provisioningrequest.Orchestrator is not initialized"))