Add atomic scale down option for node groups
kawych committed Apr 18, 2023
1 parent 1009797 commit f14d91a
Showing 8 changed files with 1,065 additions and 405 deletions.
3 changes: 3 additions & 0 deletions cluster-autoscaler/cloudprovider/test/test_cloud_provider.go
@@ -443,6 +443,9 @@ func (tng *TestNodeGroup) DeleteNodes(nodes []*apiv1.Node) error {
 	id := tng.id
 	tng.targetSize -= len(nodes)
 	tng.Unlock()
+	if tng.opts != nil && tng.opts.AtomicScaleDown && tng.targetSize != 0 {
+		return fmt.Errorf("TestNodeGroup: attempted to partially scale down a node group that should be scaled down atomically")
+	}
 	for _, node := range nodes {
		err := tng.cloudProvider.onScaleDown(id, node.Name)
		if err != nil {
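The guard rejects any deletion that would leave an atomic group partially scaled down: either the whole group goes to zero in one call, or the call fails. A standalone toy sketch of that contract (simplified types, not the real TestNodeGroup API):

package main

import "fmt"

// toyGroup models only what the guard needs: a target size and the flag.
type toyGroup struct {
	targetSize      int
	atomicScaleDown bool
}

func (g *toyGroup) deleteNodes(n int) error {
	g.targetSize -= n
	if g.atomicScaleDown && g.targetSize != 0 {
		return fmt.Errorf("attempted to partially scale down an atomic group")
	}
	return nil
}

func main() {
	g := &toyGroup{targetSize: 3, atomicScaleDown: true}
	fmt.Println(g.deleteNodes(1)) // error: 2 nodes would remain
	g = &toyGroup{targetSize: 3, atomicScaleDown: true}
	fmt.Println(g.deleteNodes(3)) // <nil>: the whole group is removed at once
}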
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -44,6 +44,8 @@ type NodeGroupAutoscalingOptions struct {
 	ScaleDownUnneededTime time.Duration
 	// ScaleDownUnreadyTime represents how long an unready node should be unneeded before it is eligible for scale down
 	ScaleDownUnreadyTime time.Duration
+	// AtomicScaleDown means that all nodes should be brought down all at once instead of one-by-one
+	AtomicScaleDown bool
 }
 
 const (
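Cloud providers surface these per-group options through NodeGroup.GetOptions, which the planner change below queries. A minimal sketch of how a provider might expose the new flag (myNodeGroup and its field are hypothetical; only config.NodeGroupAutoscalingOptions and the GetOptions signature come from the autoscaler):

package myprovider

import "k8s.io/autoscaler/cluster-autoscaler/config"

// myNodeGroup is a hypothetical provider node group that opts in to atomic
// scale down for all of its nodes.
type myNodeGroup struct {
	atomicScaleDown bool
}

// GetOptions returns the shared defaults with the per-group flag applied.
func (ng *myNodeGroup) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) {
	opts := defaults
	opts.AtomicScaleDown = ng.atomicScaleDown
	return &opts, nil
}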
307 changes: 210 additions & 97 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go

Large diffs are not rendered by default.

821 changes: 559 additions & 262 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go

Large diffs are not rendered by default.

78 changes: 39 additions & 39 deletions cluster-autoscaler/core/scaledown/actuation/delete_in_batch.go
@@ -18,7 +18,6 @@ package actuation
 
 import (
 	"fmt"
-	"reflect"
 	"sync"
 	"time"
 
@@ -67,59 +66,68 @@ func NewNodeDeletionBatcher(ctx *context.AutoscalingContext, csr *clusterstate.C
 	}
 }
 
-// AddNode adds node to delete candidates and schedule deletion.
+// AddNode adds node to delete candidates and schedules deletion.
 func (d *NodeDeletionBatcher) AddNode(node *apiv1.Node, drain bool) error {
+	nodeGroup, err := d.ctx.CloudProvider.NodeGroupForNode(node)
+	if err != nil {
+		return err
+	}
+	return d.AddNodes([]*apiv1.Node{node}, nodeGroup, drain)
+}
+
+// AddNodes adds node list to delete candidates and schedules deletion.
+func (d *NodeDeletionBatcher) AddNodes(nodes []*apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool) error {
 	// If delete interval is 0, then instantly start node deletion.
 	if d.deleteInterval == 0 {
-		nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, []*apiv1.Node{node})
-		if err != nil {
-			result := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
-			CleanUpAndRecordFailedScaleDownEvent(d.ctx, node, nodeGroup.Id(), drain, d.nodeDeletionTracker, "", result)
-		} else {
-			RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.clusterState, node, nodeGroup, drain, d.nodeDeletionTracker)
+		err := deleteNodesFromCloudProvider(d.ctx, nodes, nodeGroup)
+		for _, node := range nodes {
+			if err != nil {
+				result := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
+				CleanUpAndRecordFailedScaleDownEvent(d.ctx, node, nodeGroup.Id(), drain, d.nodeDeletionTracker, "", result)
+			} else {
+				RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.clusterState, node, nodeGroup, drain, d.nodeDeletionTracker)
+			}
 		}
 		return nil
 	}
-	nodeGroupId, first, err := d.addNodeToBucket(node, drain)
+	first, err := d.addNodesToBucket(nodes, nodeGroup, drain)
 	if err != nil {
 		return err
 	}
 	if first {
-		go func(nodeGroupId string) {
+		go func(nodeGroup cloudprovider.NodeGroup) {
 			time.Sleep(d.deleteInterval)
-			d.remove(nodeGroupId)
-		}(nodeGroupId)
+			d.executeForBucket(nodeGroup)
+		}(nodeGroup)
 	}
 	return nil
 }
 
 // AddToBucket adds node to delete candidates and return if it's a first node in the group.
-func (d *NodeDeletionBatcher) addNodeToBucket(node *apiv1.Node, drain bool) (string, bool, error) {
+func (d *NodeDeletionBatcher) addNodesToBucket(nodes []*apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool) (bool, error) {
 	d.Lock()
 	defer d.Unlock()
-	nodeGroup, err := d.ctx.CloudProvider.NodeGroupForNode(node)
-	if err != nil {
-		return "", false, err
+	for _, node := range nodes {
+		d.drainedNodeDeletions[node.Name] = drain
 	}
-	d.drainedNodeDeletions[node.Name] = drain
 	val, ok := d.deletionsPerNodeGroup[nodeGroup.Id()]
 	if !ok || len(val) == 0 {
-		d.deletionsPerNodeGroup[nodeGroup.Id()] = []*apiv1.Node{node}
-		return nodeGroup.Id(), true, nil
+		d.deletionsPerNodeGroup[nodeGroup.Id()] = nodes
+		return true, nil
 	}
-	d.deletionsPerNodeGroup[nodeGroup.Id()] = append(d.deletionsPerNodeGroup[nodeGroup.Id()], node)
-	return nodeGroup.Id(), false, nil
+	d.deletionsPerNodeGroup[nodeGroup.Id()] = append(d.deletionsPerNodeGroup[nodeGroup.Id()], nodes...)
+	return false, nil
 }
 
-// remove delete nodes of a given nodeGroup, if successful, the deletion is recorded in CSR, and an event is emitted on the node.
-func (d *NodeDeletionBatcher) remove(nodeGroupId string) error {
+// executeForBucket deletes nodes of a given nodeGroup; if successful, the deletion is recorded in CSR, and an event is emitted on the node.
+func (d *NodeDeletionBatcher) executeForBucket(nodeGroup cloudprovider.NodeGroup) error {
 	d.Lock()
 	defer d.Unlock()
-	nodes, ok := d.deletionsPerNodeGroup[nodeGroupId]
+	nodes, ok := d.deletionsPerNodeGroup[nodeGroup.Id()]
 	if !ok {
-		return fmt.Errorf("Node Group %s is not present in the batch deleter", nodeGroupId)
+		return fmt.Errorf("Node Group %s is not present in the batch deleter", nodeGroup.Id())
 	}
-	delete(d.deletionsPerNodeGroup, nodeGroupId)
+	delete(d.deletionsPerNodeGroup, nodeGroup.Id())
 	drainedNodeDeletions := make(map[string]bool)
 	for _, node := range nodes {
 		drainedNodeDeletions[node.Name] = d.drainedNodeDeletions[node.Name]
@@ -128,7 +136,7 @@ func (d *NodeDeletionBatcher) remove(nodeGroupId string) error {
 
 	go func(nodes []*apiv1.Node, drainedNodeDeletions map[string]bool) {
 		var result status.NodeDeleteResult
-		nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, nodes)
+		err := deleteNodesFromCloudProvider(d.ctx, nodes, nodeGroup)
 		for _, node := range nodes {
 			drain := drainedNodeDeletions[node.Name]
 			if err != nil {
@@ -137,26 +145,18 @@ func (d *NodeDeletionBatcher) remove(nodeGroupId string) error {
 			} else {
 				RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.clusterState, node, nodeGroup, drain, d.nodeDeletionTracker)
 			}
-
 		}
 	}(nodes, drainedNodeDeletions)
 	return nil
 }
 
 // deleteNodeFromCloudProvider removes the given nodes from cloud provider. No extra pre-deletion actions are executed on
 // the Kubernetes side.
-func deleteNodesFromCloudProvider(ctx *context.AutoscalingContext, nodes []*apiv1.Node) (cloudprovider.NodeGroup, error) {
-	nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(nodes[0])
-	if err != nil {
-		return nodeGroup, errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", nodes[0].Name, err)
-	}
-	if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
-		return nodeGroup, errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", nodes[0].Name)
+func deleteNodesFromCloudProvider(ctx *context.AutoscalingContext, nodes []*apiv1.Node, nodeGroup cloudprovider.NodeGroup) error {
+	if err := nodeGroup.DeleteNodes(nodes); err != nil {
+		return errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete nodes from group %s: %v", nodeGroup.Id(), err)
 	}
-	if err = nodeGroup.DeleteNodes(nodes); err != nil {
-		return nodeGroup, errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete %s: %v", nodes[0].Name, err)
-	}
-	return nodeGroup, nil
+	return nil
 }
 
 func nodeScaleDownReason(node *apiv1.Node, drain bool) metrics.NodeScaleDownReason {
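The reworked batcher keys its buckets by node group and flushes each bucket in a single DeleteNodes call, which is what lets an atomic group go down all at once. A simplified, self-contained model of the bucket-and-delayed-flush pattern (toy types; the real batcher also records scale-down events and tracks drained nodes):

package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher is a toy model of NodeDeletionBatcher: nodes are parked in a
// per-group bucket, and the first add for a group schedules a flush
// after the configured interval.
type batcher struct {
	sync.Mutex
	interval time.Duration
	buckets  map[string][]string
}

func (b *batcher) add(group string, nodes ...string) {
	b.Lock()
	first := len(b.buckets[group]) == 0
	b.buckets[group] = append(b.buckets[group], nodes...)
	b.Unlock()
	if first {
		time.AfterFunc(b.interval, func() { b.flush(group) })
	}
}

func (b *batcher) flush(group string) {
	b.Lock()
	nodes := b.buckets[group]
	delete(b.buckets, group)
	b.Unlock()
	// One call per group, mirroring a single nodeGroup.DeleteNodes(nodes).
	fmt.Printf("deleting %v from %s in one call\n", nodes, group)
}

func main() {
	b := &batcher{interval: 50 * time.Millisecond, buckets: map[string][]string{}}
	b.add("ng-atomic", "n1", "n2", "n3") // whole group added together
	b.add("ng-std", "m1")
	b.add("ng-std", "m2") // joins the pending ng-std bucket
	time.Sleep(100 * time.Millisecond)
}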
cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go
@@ -43,8 +43,8 @@ func TestAddNodeToBucket(t *testing.T) {
 	}
 	nodeGroup1 := "ng-1"
 	nodeGroup2 := "ng-2"
-	nodes1 := generateNodes(5, "ng-1")
-	nodes2 := generateNodes(5, "ng-2")
+	nodes1 := generateNodes(0, 5, "ng-1")
+	nodes2 := generateNodes(0, 5, "ng-2")
 	provider.AddNodeGroup(nodeGroup1, 1, 10, 5)
 	provider.AddNodeGroup(nodeGroup2, 1, 10, 5)
 	for _, node := range nodes1 {
@@ -91,7 +91,11 @@ func TestAddNodeToBucket(t *testing.T) {
 		}
 		batchCount := 0
 		for _, node := range test.nodes {
-			_, first, err := d.addNodeToBucket(node, test.drained)
+			nodeGroup, err := provider.NodeGroupForNode(node)
+			if err != nil {
+				t.Errorf("couldn't get node info for node %s: %s", node.Name, err)
+			}
+			first, err := d.addNodesToBucket([]*apiv1.Node{node}, nodeGroup, test.drained)
 			if err != nil {
 				t.Errorf("addNodeToBucket returned error %q when adding node %v", err, node)
 			}
@@ -168,6 +172,7 @@ func TestRemove(t *testing.T) {
 
 			ng := "ng"
 			provider.AddNodeGroup(ng, 1, 10, test.numNodes)
+			nodeGroup := provider.GetNodeGroup(ng)
 
 			d := NodeDeletionBatcher{
 				ctx: &ctx,
@@ -176,7 +181,7 @@ func TestRemove(t *testing.T) {
 				deletionsPerNodeGroup: make(map[string][]*apiv1.Node),
 				drainedNodeDeletions:  make(map[string]bool),
 			}
-			nodes := generateNodes(test.numNodes, ng)
+			nodes := generateNodes(0, test.numNodes, ng)
 			failedDeletion := test.failedDeletion
 			for _, node := range nodes {
 				if failedDeletion > 0 {
@@ -191,14 +196,14 @@ func TestRemove(t *testing.T) {
 					Key:    taints.ToBeDeletedTaint,
 					Effect: apiv1.TaintEffectNoSchedule,
 				})
-				_, _, err := d.addNodeToBucket(node, true)
+				_, err = d.addNodesToBucket([]*apiv1.Node{node}, nodeGroup, true)
 				if err != nil {
 					t.Errorf("addNodeToBucket returned error %q when adding node %v", err, node)
 				}
 			}
 
-			err = d.remove(ng)
+			err = d.executeForBucket(nodeGroup)
 			if test.err {
 				if err == nil {
 					t.Errorf("remove() should return error, but returned nil")
51 changes: 50 additions & 1 deletion cluster-autoscaler/core/scaledown/planner/planner.go
@@ -18,11 +18,13 @@ package planner
 
 import (
 	"fmt"
+	"math"
 	"time"
 
 	apiv1 "k8s.io/api/core/v1"
 	policyv1 "k8s.io/api/policy/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
@@ -132,6 +134,7 @@ func (p *Planner) CleanUpUnneededNodes() {
 // NodesToDelete returns all Nodes that could be removed right now, according
 // to the Planner.
 func (p *Planner) NodesToDelete(_ time.Time) (empty, needDrain []*apiv1.Node) {
+	empty, needDrain = []*apiv1.Node{}, []*apiv1.Node{}
 	nodes, err := allNodes(p.context.ClusterSnapshot)
 	if err != nil {
 		klog.Errorf("Nothing will scale down, failed to list nodes from ClusterSnapshot: %v", err)
@@ -154,17 +157,63 @@ func (p *Planner) NodesToDelete(_ time.Time) (empty, needDrain []*apiv1.Node) {
 		// downs already in progress. If we pass the empty nodes first, they will be first
 		// to get deleted, thus we decrease chances of hitting the limit on non-empty scale down.
 		append(emptyRemovable, needDrainRemovable...),
-		p.context.AutoscalingOptions.MaxScaleDownParallelism)
+		// No need to limit the number of nodes, since it will happen later, in the actuation stage.
+		// It will make a more appropriate decision by using additional information about deletions
+		// in progress.
+		math.MaxInt)
 	for _, nodeToRemove := range nodesToRemove {
 		if len(nodeToRemove.PodsToReschedule) > 0 {
 			needDrain = append(needDrain, nodeToRemove.Node)
 		} else {
 			empty = append(empty, nodeToRemove.Node)
 		}
 	}
+
+	empty, filteredOut := p.filterOutIncompleteAtomicNodeGroups(empty)
+	needDrain, _ = p.filterOutIncompleteAtomicNodeGroups(append(needDrain, filteredOut...))
 	return empty, needDrain
 }
 
+func (p *Planner) filterOutIncompleteAtomicNodeGroups(nodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) {
+	nodesByGroup := map[cloudprovider.NodeGroup][]*apiv1.Node{}
+	result := []*apiv1.Node{}
+	filteredOut := []*apiv1.Node{}
+	for _, node := range nodes {
+		nodeGroup, err := p.context.CloudProvider.NodeGroupForNode(node)
+		if err != nil {
+			klog.Errorf("Node %v will not scale down, failed to get node info: %s", node.Name, err)
+			continue
+		}
+		autoscalingOptions, err := nodeGroup.GetOptions(p.context.NodeGroupDefaults)
+		if err != nil {
+			klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
+			continue
+		}
+		if autoscalingOptions != nil && autoscalingOptions.AtomicScaleDown {
+			klog.V(2).Infof("Considering node %s for atomic scale down", node.Name)
+			nodesByGroup[nodeGroup] = append(nodesByGroup[nodeGroup], node)
+		} else {
+			klog.V(2).Infof("Considering node %s for standard scale down", node.Name)
+			result = append(result, node)
+		}
+	}
+	for nodeGroup, nodes := range nodesByGroup {
+		ngSize, err := nodeGroup.TargetSize()
+		if err != nil {
+			klog.Errorf("Nodes from group %s will not scale down, failed to get target size: %s", nodeGroup.Id(), err)
+			continue
+		}
+		if ngSize == len(nodes) {
+			klog.V(2).Infof("Scheduling atomic scale down for all %v nodes from node group %s", len(nodes), nodeGroup.Id())
+			result = append(result, nodes...)
+		} else {
+			klog.V(2).Infof("Skipping scale down for %v nodes from node group %s, all %v nodes have to be scaled down atomically", len(nodes), nodeGroup.Id(), ngSize)
+			filteredOut = append(filteredOut, nodes...)
+		}
+	}
+	return result, filteredOut
+}
+
 func allNodes(s clustersnapshot.ClusterSnapshot) ([]*apiv1.Node, error) {
 	nodeInfos, err := s.NodeInfos().List()
 	if err != nil {
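The filter's rule: a node in a group with AtomicScaleDown set is only released for deletion when every node the group currently targets is a removal candidate; otherwise the whole group is held back, and held-back empty nodes get a second chance in the needDrain pass. A toy sketch of the decision rule under those assumptions (string keys stand in for cloudprovider.NodeGroup):

package main

import "fmt"

// filterAtomic is a simplified model of filterOutIncompleteAtomicNodeGroups:
// standard groups pass through; atomic groups pass only when all of their
// targetSize nodes are removal candidates.
func filterAtomic(candidates map[string][]string, atomic map[string]bool, targetSize map[string]int) (keep, held []string) {
	for group, nodes := range candidates {
		if !atomic[group] || len(nodes) == targetSize[group] {
			keep = append(keep, nodes...) // scale down now
		} else {
			held = append(held, nodes...) // wait until the whole group is removable
		}
	}
	return keep, held
}

func main() {
	keep, held := filterAtomic(
		map[string][]string{"ng-atomic": {"n1", "n2"}, "ng-std": {"m1"}},
		map[string]bool{"ng-atomic": true},
		map[string]int{"ng-atomic": 3, "ng-std": 5},
	)
	fmt.Println(keep, held) // [m1] [n1 n2]: the 2-of-3 atomic group is held back
}

With an atomic group of target size 3 and only 2 candidate nodes, both candidates land in held, matching the "Skipping scale down" branch above.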