Skip to content

Commit

Permalink
Add support for all-or-nothing scale-up strategy (#6821)
Browse files Browse the repository at this point in the history
* Add support for all-or-nothing scale-up strategy

* Review fixes
  • Loading branch information
aleksandra-malinowska authored May 16, 2024
1 parent fa9969a commit e795ac9
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 25 deletions.
7 changes: 4 additions & 3 deletions cluster-autoscaler/core/scaleup/equivalence/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (

// PodGroup contains a group of pods that are equivalent in terms of schedulability.
type PodGroup struct {
Pods []*apiv1.Pod
SchedulingErrors map[string]status.Reasons
Schedulable bool
Pods []*apiv1.Pod
SchedulingErrors map[string]status.Reasons
SchedulableGroups []string
Schedulable bool
}

// BuildPodGroups prepares pod groups with equivalent scheduling properties.
Expand Down
25 changes: 20 additions & 5 deletions cluster-autoscaler/core/scaleup/orchestrator/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,20 @@ func (e *scaleUpExecutor) ExecuteScaleUps(
scaleUpInfos []nodegroupset.ScaleUpInfo,
nodeInfos map[string]*schedulerframework.NodeInfo,
now time.Time,
atomic bool,
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
options := e.autoscalingContext.AutoscalingOptions
if options.ParallelScaleUp {
return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now)
return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now, atomic)
}
return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now)
return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now, atomic)
}

func (e *scaleUpExecutor) executeScaleUpsSync(
scaleUpInfos []nodegroupset.ScaleUpInfo,
nodeInfos map[string]*schedulerframework.NodeInfo,
now time.Time,
atomic bool,
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
availableGPUTypes := e.autoscalingContext.CloudProvider.GetAvailableGPUTypes()
for _, scaleUpInfo := range scaleUpInfos {
Expand All @@ -81,7 +83,7 @@ func (e *scaleUpExecutor) executeScaleUpsSync(
klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", scaleUpInfo.Group.Id())
continue
}
if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now); aErr != nil {
if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now, atomic); aErr != nil {
return aErr, []cloudprovider.NodeGroup{scaleUpInfo.Group}
}
}
Expand All @@ -92,6 +94,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
scaleUpInfos []nodegroupset.ScaleUpInfo,
nodeInfos map[string]*schedulerframework.NodeInfo,
now time.Time,
atomic bool,
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
if err := checkUniqueNodeGroups(scaleUpInfos); err != nil {
return err, extractNodeGroups(scaleUpInfos)
Expand All @@ -113,7 +116,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", info.Group.Id())
return
}
if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now); aErr != nil {
if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now, atomic); aErr != nil {
errResults <- errResult{err: aErr, info: &info}
}
}(scaleUpInfo)
Expand All @@ -136,19 +139,31 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
return nil, nil
}

func (e *scaleUpExecutor) increaseSize(nodeGroup cloudprovider.NodeGroup, increase int, atomic bool) error {
if atomic {
if err := nodeGroup.AtomicIncreaseSize(increase); err != cloudprovider.ErrNotImplemented {
return err
}
// If error is cloudprovider.ErrNotImplemented, fall back to non-atomic
// increase - cloud provider doesn't support it.
}
return nodeGroup.IncreaseSize(increase)
}

func (e *scaleUpExecutor) executeScaleUp(
info nodegroupset.ScaleUpInfo,
nodeInfo *schedulerframework.NodeInfo,
availableGPUTypes map[string]struct{},
now time.Time,
atomic bool,
) errors.AutoscalerError {
gpuConfig := e.autoscalingContext.CloudProvider.GetNodeGpuConfig(nodeInfo.Node())
gpuResourceName, gpuType := gpu.GetGpuInfoForMetrics(gpuConfig, availableGPUTypes, nodeInfo.Node(), nil)
klog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
"Scale-up: setting group %s size to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
increase := info.NewSize - info.CurrentSize
if err := info.Group.IncreaseSize(increase); err != nil {
if err := e.increaseSize(info.Group, increase, atomic); err != nil {
e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err)
aerr := errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to increase node group size: ")
e.scaleStateNotifier.RegisterFailedScaleUp(info.Group, string(aerr.Type()), aerr.Error(), gpuResourceName, gpuType, now)
Expand Down
79 changes: 75 additions & 4 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
allOrNothing bool, // Either request enough capacity for all unschedulablePods, or don't request it at all.
) (*status.ScaleUpStatus, errors.AutoscalerError) {
if !o.initialized {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
Expand Down Expand Up @@ -146,11 +147,13 @@ func (o *ScaleUpOrchestrator) ScaleUp(
}

for _, nodeGroup := range validNodeGroups {
option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now)
option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now, allOrNothing)
o.processors.BinpackingLimiter.MarkProcessed(o.autoscalingContext, nodeGroup.Id())

if len(option.Pods) == 0 || option.NodeCount == 0 {
klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id())
} else if allOrNothing && len(option.Pods) < len(unschedulablePods) {
klog.V(4).Infof("Some pods can't fit to %s, giving up due to all-or-nothing scale-up strategy", nodeGroup.Id())
} else {
options = append(options, option)
}
Expand Down Expand Up @@ -211,9 +214,26 @@ func (o *ScaleUpOrchestrator) ScaleUp(
aErr)
}

if newNodes < bestOption.NodeCount {
klog.V(1).Infof("Only %d nodes can be added to %s due to cluster-wide limits", newNodes, bestOption.NodeGroup.Id())
if allOrNothing {
// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason)
return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
}
}

// If necessary, create the node group. This is no longer simulation, an empty node group will be created by cloud provider if supported.
createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
if !bestOption.NodeGroup.Exist() {
if allOrNothing && bestOption.NodeGroup.MaxSize() < newNodes {
klog.V(1).Infof("Can only create a new node group with max %d nodes, need %d nodes", bestOption.NodeGroup.MaxSize(), newNodes)
// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason)
return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
}
var scaleUpStatus *status.ScaleUpStatus
createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets)
if aErr != nil {
Expand Down Expand Up @@ -256,9 +276,24 @@ func (o *ScaleUpOrchestrator) ScaleUp(
aErr)
}

// Last check before scale-up. Node group capacity (both due to max size limits & current size) is only checked when balancing.
totalCapacity := 0
for _, sui := range scaleUpInfos {
totalCapacity += sui.NewSize - sui.CurrentSize
}
if totalCapacity < newNodes {
klog.V(1).Infof("Can only add %d nodes due to node group limits, need %d nodes", totalCapacity, newNodes)
if allOrNothing {
// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason)
return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
}
}

// Execute scale up.
klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now, allOrNothing)
if aErr != nil {
return status.UpdateScaleUpError(
&status.ScaleUpStatus{
Expand Down Expand Up @@ -364,7 +399,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
}

klog.V(1).Infof("ScaleUpToNodeGroupMinSize: final scale-up plan: %v", scaleUpInfos)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now, false /* allOrNothing disabled */)
if aErr != nil {
return status.UpdateScaleUpError(
&status.ScaleUpStatus{
Expand Down Expand Up @@ -447,6 +482,7 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
nodeInfos map[string]*schedulerframework.NodeInfo,
currentNodeCount int,
now time.Time,
allOrNothing bool,
) expander.Option {
option := expander.Option{NodeGroup: nodeGroup}
podGroups := schedulablePodGroups[nodeGroup.Id()]
Expand All @@ -471,11 +507,22 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
if err != nil && err != cloudprovider.ErrNotImplemented {
klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
}

// Special handling for groups that only scale from zero to max.
if autoscalingOptions != nil && autoscalingOptions.ZeroOrMaxNodeScaling {
if option.NodeCount > 0 && option.NodeCount != nodeGroup.MaxSize() {
// For zero-or-max scaling groups, the only valid value of node count is node group's max size.
if allOrNothing && option.NodeCount > nodeGroup.MaxSize() {
// We would have to cap the node count, which means not all pods will be
// accommodated. This violates the principle of all-or-nothing strategy.
option.Pods = nil
option.NodeCount = 0
}
if option.NodeCount > 0 {
// Cap or increase the number of nodes to the only valid value - node group's max size.
option.NodeCount = nodeGroup.MaxSize()
}
}

return option
}

Expand Down Expand Up @@ -564,6 +611,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups(
})
// Mark pod group as (theoretically) schedulable.
eg.Schedulable = true
eg.SchedulableGroups = append(eg.SchedulableGroups, nodeGroup.Id())
} else {
klog.V(2).Infof("Pod %s/%s can't be scheduled on %s, predicate checking error: %v", samplePod.Namespace, samplePod.Name, nodeGroup.Id(), err.VerboseMessage())
if podCount := len(eg.Pods); podCount > 1 {
Expand Down Expand Up @@ -709,6 +757,29 @@ func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, sim
return true
}

func markAllGroupsAsUnschedulable(egs []*equivalence.PodGroup, reason status.Reasons) []*equivalence.PodGroup {
for _, eg := range egs {
if eg.Schedulable {
if eg.SchedulingErrors == nil {
eg.SchedulingErrors = map[string]status.Reasons{}
}
for _, sg := range eg.SchedulableGroups {
eg.SchedulingErrors[sg] = reason
}
eg.Schedulable = false
}
}
return egs
}

func buildNoOptionsAvailableStatus(egs []*equivalence.PodGroup, skipped map[string]status.Reasons, ngs []cloudprovider.NodeGroup) *status.ScaleUpStatus {
return &status.ScaleUpStatus{
Result: status.ScaleUpNoOptionsAvailable,
PodsRemainUnschedulable: GetRemainingPods(egs, skipped),
ConsideredNodeGroups: ngs,
}
}

// GetRemainingPods returns information about pods which CA is unable to help
// at this moment.
func GetRemainingPods(egs []*equivalence.PodGroup, skipped map[string]status.Reasons) []status.NoScaleUpInfo {
Expand Down
45 changes: 38 additions & 7 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,37 @@ func TestNoCreateNodeGroupMaxCoresLimitHit(t *testing.T) {
simpleNoScaleUpTest(t, config, results)
}

func TestAllOrNothing(t *testing.T) {
options := defaultOptions

extraPods := []PodConfig{}
extraPodNames := []string{}
for i := 0; i < 11; i++ {
podName := fmt.Sprintf("pod-%d", i)
extraPods = append(extraPods, PodConfig{Name: podName, Cpu: 1000, Memory: 100})
extraPodNames = append(extraPodNames, podName)
}

config := &ScaleUpTestConfig{
Nodes: []NodeConfig{
{Name: "n1", Cpu: 1000, Memory: 1000, Gpu: 0, Ready: true, Group: "ng"},
},
Pods: []PodConfig{},
ExtraPods: extraPods,
Options: &options,
AllOrNothing: true,
}

result := &ScaleTestResults{
NoScaleUpReason: "all-or-nothing",
ScaleUpStatus: ScaleUpStatusInfo{
PodsRemainUnschedulable: extraPodNames,
},
}

simpleNoScaleUpTest(t, config, result)
}

func simpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig, expectedResults *ScaleTestResults) {
results := runSimpleScaleUpTest(t, config)
assert.NotNil(t, results.GroupSizeChanges, "Expected scale up event")
Expand Down Expand Up @@ -1032,7 +1063,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
context.ExpanderStrategy = expander

// scale up
scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, config.AllOrNothing)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

// aggregate group size changes
Expand Down Expand Up @@ -1131,7 +1162,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
processors := NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)

assert.NoError(t, err)
// Node group is unhealthy.
Expand Down Expand Up @@ -1185,7 +1216,7 @@ func TestBinpackingLimiter(t *testing.T) {
expander := NewMockRepotingStrategy(t, nil)
context.ExpanderStrategy = expander

scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
Expand Down Expand Up @@ -1231,7 +1262,7 @@ func TestScaleUpNoHelp(t *testing.T) {
processors := NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

assert.NoError(t, err)
Expand Down Expand Up @@ -1453,7 +1484,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
processors := NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)

assert.NoError(t, typedErr)
assert.True(t, scaleUpStatus.WasSuccessful())
Expand Down Expand Up @@ -1515,7 +1546,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {

suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
Expand Down Expand Up @@ -1570,7 +1601,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {

suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
Expand Down
37 changes: 37 additions & 0 deletions cluster-autoscaler/core/scaleup/orchestrator/rejectedreasons.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package orchestrator

// RejectedReasons contains information why given node group was rejected as a scale-up option.
type RejectedReasons struct {
messages []string
}

// NewRejectedReasons creates new RejectedReason object.
func NewRejectedReasons(m string) *RejectedReasons {
return &RejectedReasons{[]string{m}}
}

// Reasons returns a slice of reasons why the node group was not considered for scale up.
func (sr *RejectedReasons) Reasons() []string {
return sr.messages
}

var (
// AllOrNothingReason means the node group was rejected because not all pods would fit it when using all-or-nothing strategy.
AllOrNothingReason = NewRejectedReasons("not all pods would fit and scale-up is using all-or-nothing strategy")
)
1 change: 1 addition & 0 deletions cluster-autoscaler/core/scaleup/scaleup.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Orchestrator interface {
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
allOrNothing bool,
) (*status.ScaleUpStatus, errors.AutoscalerError)
// ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes
// than the configured min size. The source of truth for the current node group
Expand Down
Loading

0 comments on commit e795ac9

Please sign in to comment.