From 374cf611b74479897abde179e044a9093dbbaa29 Mon Sep 17 00:00:00 2001
From: Karol Wychowaniec
Date: Tue, 27 Jun 2023 12:28:19 +0000
Subject: [PATCH] Address next set of comments

---
 .../core/scaledown/actuation/actuator.go      |  4 +-
 .../core/scaledown/actuation/actuator_test.go |  4 +-
 .../core/scaledown/budgets/budgets.go         | 12 ++---
 .../core/scaledown/budgets/budgets_test.go    |  4 +-
 .../core/scaledown/planner/planner.go         | 43 -----------------
 .../core/scaledown/planner/planner_test.go    |  8 ++--
 .../nodes/post_filtering_processor.go         | 46 +++++++++++++++++--
 7 files changed, 58 insertions(+), 63 deletions(-)

diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go
index a144ae398da9..6b4d0a065ece 100644
--- a/cluster-autoscaler/core/scaledown/actuation/actuator.go
+++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -62,7 +62,7 @@ func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterState
 		clusterState:          csr,
 		nodeDeletionTracker:   ndt,
 		nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
-		budgetProcessor:       budgets.NewScaleDownBudgetProcessor(ctx, ndt),
+		budgetProcessor:       budgets.NewScaleDownBudgetProcessor(ctx),
 		deleteOptions:         deleteOptions,
 	}
 }
@@ -85,7 +85,7 @@ func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (*status.ScaleDownS
 	results, ts := a.nodeDeletionTracker.DeletionResults()
 	scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: results, NodeDeleteResultsAsOf: ts}
 
-	emptyToDelete, drainToDelete := a.budgetProcessor.CropNodes(empty, drain)
+	emptyToDelete, drainToDelete := a.budgetProcessor.CropNodes(a.nodeDeletionTracker, empty, drain)
 	if len(emptyToDelete) == 0 && len(drainToDelete) == 0 {
 		scaleDownStatus.Result = status.ScaleDownNoNodeDeleted
 		return scaleDownStatus, nil
diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go
index 01791cab13a1..93bec833cab1 100644
--- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go
+++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go
@@ -834,7 +834,7 @@ func TestStartDeletion(t *testing.T) {
 			actuator := Actuator{
 				ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
 				nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
-				budgetProcessor:       budgets.NewScaleDownBudgetProcessor(&ctx, ndt),
+				budgetProcessor:       budgets.NewScaleDownBudgetProcessor(&ctx),
 			}
 			gotStatus, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes)
 			if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
@@ -1068,7 +1068,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
 			actuator := Actuator{
 				ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
 				nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
-				budgetProcessor:       budgets.NewScaleDownBudgetProcessor(&ctx, ndt),
+				budgetProcessor:       budgets.NewScaleDownBudgetProcessor(&ctx),
 			}
 
 			for _, nodes := range deleteNodes {
diff --git a/cluster-autoscaler/core/scaledown/budgets/budgets.go b/cluster-autoscaler/core/scaledown/budgets/budgets.go
index cc13b926a4eb..3c33f3fe6b0f 100644
--- a/cluster-autoscaler/core/scaledown/budgets/budgets.go
+++ b/cluster-autoscaler/core/scaledown/budgets/budgets.go
@@ -35,25 +35,23 @@ type NodeGroupView struct {
 
 // ScaleDownBudgetProcessor is responsible for keeping the number of nodes deleted in parallel within defined limits.
 type ScaleDownBudgetProcessor struct {
-	ctx             *context.AutoscalingContext
-	actuationStatus scaledown.ActuationStatus
+	ctx *context.AutoscalingContext
 }
 
 // NewScaleDownBudgetProcessor creates a ScaleDownBudgetProcessor instance.
-func NewScaleDownBudgetProcessor(ctx *context.AutoscalingContext, as scaledown.ActuationStatus) *ScaleDownBudgetProcessor {
+func NewScaleDownBudgetProcessor(ctx *context.AutoscalingContext) *ScaleDownBudgetProcessor {
 	return &ScaleDownBudgetProcessor{
-		ctx:             ctx,
-		actuationStatus: as,
+		ctx: ctx,
 	}
 }
 
 // CropNodes crops the provided node lists to respect scale-down max parallelism budgets.
 // The returned nodes are grouped by a node group.
-func (bp *ScaleDownBudgetProcessor) CropNodes(empty, drain []*apiv1.Node) (emptyToDelete, drainToDelete []*NodeGroupView) {
+func (bp *ScaleDownBudgetProcessor) CropNodes(as scaledown.ActuationStatus, empty, drain []*apiv1.Node) (emptyToDelete, drainToDelete []*NodeGroupView) {
 	emptyIndividual, emptyAtomic := bp.categorize(bp.group(empty))
 	drainIndividual, drainAtomic := bp.categorize(bp.group(drain))
 
-	emptyInProgress, drainInProgress := bp.actuationStatus.DeletionsInProgress()
+	emptyInProgress, drainInProgress := as.DeletionsInProgress()
 	parallelismBudget := bp.ctx.MaxScaleDownParallelism - len(emptyInProgress) - len(drainInProgress)
 	drainBudget := bp.ctx.MaxDrainParallelism - len(drainInProgress)
 
diff --git a/cluster-autoscaler/core/scaledown/budgets/budgets_test.go b/cluster-autoscaler/core/scaledown/budgets/budgets_test.go
index 773b93de2c18..ade0c9682b0d 100644
--- a/cluster-autoscaler/core/scaledown/budgets/budgets_test.go
+++ b/cluster-autoscaler/core/scaledown/budgets/budgets_test.go
@@ -335,8 +335,8 @@ func TestCropNodesToBudgets(t *testing.T) {
 				drainList = append(drainList, bucket.Nodes...)
 			}
 
-			budgeter := NewScaleDownBudgetProcessor(ctx, ndt)
-			gotEmpty, gotDrain := budgeter.CropNodes(emptyList, drainList)
+			budgeter := NewScaleDownBudgetProcessor(ctx)
+			gotEmpty, gotDrain := budgeter.CropNodes(ndt, emptyList, drainList)
 			if diff := cmp.Diff(tc.wantEmpty, gotEmpty, cmpopts.EquateEmpty(), transformNodeGroupView); diff != "" {
 				t.Errorf("cropNodesToBudgets empty nodes diff (-want +got):\n%s", diff)
 			}
diff --git a/cluster-autoscaler/core/scaledown/planner/planner.go b/cluster-autoscaler/core/scaledown/planner/planner.go
index 6e0b1edb2c40..0cd4002c4af3 100644
--- a/cluster-autoscaler/core/scaledown/planner/planner.go
+++ b/cluster-autoscaler/core/scaledown/planner/planner.go
@@ -24,7 +24,6 @@ import (
 	apiv1 "k8s.io/api/core/v1"
 	policyv1 "k8s.io/api/policy/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
@@ -169,51 +168,9 @@ func (p *Planner) NodesToDelete(_ time.Time) (empty, needDrain []*apiv1.Node) {
 		}
 	}
 
-	empty, filteredOut := p.filterOutIncompleteAtomicNodeGroups(empty)
-	needDrain, _ = p.filterOutIncompleteAtomicNodeGroups(append(needDrain, filteredOut...))
 	return empty, needDrain
 }
 
-func (p *Planner) filterOutIncompleteAtomicNodeGroups(nodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) {
-	nodesByGroup := map[cloudprovider.NodeGroup][]*apiv1.Node{}
-	result := []*apiv1.Node{}
-	filteredOut := []*apiv1.Node{}
-	for _, node := range nodes {
-		nodeGroup, err := p.context.CloudProvider.NodeGroupForNode(node)
-		if err != nil {
-			klog.Errorf("Node %v will not scale down, failed to get node info: %s", node.Name, err)
-			continue
-		}
-		autoscalingOptions, err := nodeGroup.GetOptions(p.context.NodeGroupDefaults)
-		if err != nil {
-			klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
-			continue
-		}
-		if autoscalingOptions != nil && autoscalingOptions.AtomicScaling {
-			klog.V(2).Infof("Considering node %s for atomic scale down", node.Name)
-			nodesByGroup[nodeGroup] = append(nodesByGroup[nodeGroup], node)
-		} else {
-			klog.V(2).Infof("Considering node %s for standard scale down", node.Name)
-			result = append(result, node)
-		}
-	}
-	for nodeGroup, nodes := range nodesByGroup {
-		ngSize, err := nodeGroup.TargetSize()
-		if err != nil {
-			klog.Errorf("Nodes from group %s will not scale down, failed to get target size: %s", nodeGroup.Id(), err)
-			continue
-		}
-		if ngSize == len(nodes) {
-			klog.V(2).Infof("Scheduling atomic scale down for all %v nodes from node group %s", len(nodes), nodeGroup.Id())
-			result = append(result, nodes...)
-		} else {
-			klog.V(2).Infof("Skipping scale down for %v nodes from node group %s, all %v nodes have to be scaled down atomically", len(nodes), nodeGroup.Id(), ngSize)
-			filteredOut = append(filteredOut, nodes...)
-		}
-	}
-	return result, filteredOut
-}
-
 func allNodes(s clustersnapshot.ClusterSnapshot) ([]*apiv1.Node, error) {
 	nodeInfos, err := s.NodeInfos().List()
 	if err != nil {
diff --git a/cluster-autoscaler/core/scaledown/planner/planner_test.go b/cluster-autoscaler/core/scaledown/planner/planner_test.go
index bf16ea2b7a5a..255c3cc37b3b 100644
--- a/cluster-autoscaler/core/scaledown/planner/planner_test.go
+++ b/cluster-autoscaler/core/scaledown/planner/planner_test.go
@@ -623,7 +623,6 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) {
 }
 
 func TestNodesToDelete(t *testing.T) {
-
 	testCases := []struct {
 		name      string
 		nodes     map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved
@@ -695,10 +694,11 @@ func TestNodesToDelete(t *testing.T) {
 					buildRemovableNode("node-3", 1),
 				},
 			},
-			wantEmpty: []*apiv1.Node{},
-			wantDrain: []*apiv1.Node{
+			wantEmpty: []*apiv1.Node{
 				buildRemovableNode("node-1", 0).Node,
 				buildRemovableNode("node-2", 0).Node,
+			},
+			wantDrain: []*apiv1.Node{
 				buildRemovableNode("node-3", 1).Node,
 			},
 		},
@@ -743,6 +743,7 @@ func TestNodesToDelete(t *testing.T) {
 				buildRemovableNode("node-10", 0).Node,
 				buildRemovableNode("node-11", 0).Node,
 				buildRemovableNode("node-12", 0).Node,
+				buildRemovableNode("node-13", 0).Node,
 			},
 			wantDrain: []*apiv1.Node{
 				buildRemovableNode("node-4", 0).Node,
@@ -750,7 +751,6 @@ func TestNodesToDelete(t *testing.T) {
 				buildRemovableNode("node-6", 0).Node,
 				buildRemovableNode("node-8", 0).Node,
 				buildRemovableNode("node-9", 0).Node,
-				buildRemovableNode("node-13", 0).Node,
 				buildRemovableNode("node-14", 0).Node,
 				buildRemovableNode("node-15", 0).Node,
 			},
diff --git a/cluster-autoscaler/processors/nodes/post_filtering_processor.go b/cluster-autoscaler/processors/nodes/post_filtering_processor.go
index a7dfba82402a..60ef5a1f660b 100644
--- a/cluster-autoscaler/processors/nodes/post_filtering_processor.go
+++ b/cluster-autoscaler/processors/nodes/post_filtering_processor.go
@@ -17,8 +17,10 @@ limitations under the License.
 package nodes
 
 import (
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator"
+	klog "k8s.io/klog/v2"
 )
 
 // PostFilteringScaleDownNodeProcessor selects first maxCount nodes (if possible) to be removed
@@ -26,19 +28,57 @@ type PostFilteringScaleDownNodeProcessor struct {
 }
 
 // GetNodesToRemove selects up to maxCount nodes for deletion, by selecting a first maxCount candidates
-func (n *PostFilteringScaleDownNodeProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved {
+func (p *PostFilteringScaleDownNodeProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved {
 	end := len(candidates)
 	if len(candidates) > maxCount {
 		end = maxCount
 	}
-	return candidates[:end]
+	return p.filterOutIncompleteAtomicNodeGroups(ctx, candidates[:end])
 }
 
 // CleanUp is called at CA termination
-func (n *PostFilteringScaleDownNodeProcessor) CleanUp() {
+func (p *PostFilteringScaleDownNodeProcessor) CleanUp() {
 }
 
 // NewPostFilteringScaleDownNodeProcessor returns a new PostFilteringScaleDownNodeProcessor
 func NewPostFilteringScaleDownNodeProcessor() *PostFilteringScaleDownNodeProcessor {
 	return &PostFilteringScaleDownNodeProcessor{}
 }
+
+func (p *PostFilteringScaleDownNodeProcessor) filterOutIncompleteAtomicNodeGroups(ctx *context.AutoscalingContext, nodes []simulator.NodeToBeRemoved) []simulator.NodeToBeRemoved {
+	nodesByGroup := map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved{}
+	result := []simulator.NodeToBeRemoved{}
+	for _, node := range nodes {
+		nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node.Node)
+		if err != nil {
+			klog.Errorf("Node %v will not scale down, failed to get node info: %s", node.Node.Name, err)
+			continue
+		}
+		autoscalingOptions, err := nodeGroup.GetOptions(ctx.NodeGroupDefaults)
+		if err != nil {
+			klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
+			continue
+		}
+		if autoscalingOptions != nil && autoscalingOptions.AtomicScaling {
+			klog.V(2).Infof("Considering node %s for atomic scale down", node.Node.Name)
+			nodesByGroup[nodeGroup] = append(nodesByGroup[nodeGroup], node)
+		} else {
+			klog.V(2).Infof("Considering node %s for standard scale down", node.Node.Name)
+			result = append(result, node)
+		}
+	}
+	for nodeGroup, nodes := range nodesByGroup {
+		ngSize, err := nodeGroup.TargetSize()
+		if err != nil {
+			klog.Errorf("Nodes from group %s will not scale down, failed to get target size: %s", nodeGroup.Id(), err)
+			continue
+		}
+		if ngSize == len(nodes) {
+			klog.V(2).Infof("Scheduling atomic scale down for all %v nodes from node group %s", len(nodes), nodeGroup.Id())
+			result = append(result, nodes...)
+		} else {
+			klog.V(2).Infof("Skipping scale down for %v nodes from node group %s, all %v nodes have to be scaled down atomically", len(nodes), nodeGroup.Id(), ngSize)
+		}
+	}
+	return result
+}
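
Reviewer note (not part of the commit): a minimal sketch of how the reworked budgets API reads after this patch. The wrapper function cropForDeletion, the example package name, and the parameter names are illustrative assumptions; the NewScaleDownBudgetProcessor and CropNodes signatures, and the fact that an ActuationStatus (such as the actuator's NodeDeletionTracker) is now passed per call rather than stored on the processor, are taken from the diff above.

package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
)

// cropForDeletion is a hypothetical caller of the reworked API: the budget
// processor is constructed from the AutoscalingContext alone, and the
// ActuationStatus is supplied on every CropNodes call instead of being
// captured at construction time.
func cropForDeletion(ctx *context.AutoscalingContext, as scaledown.ActuationStatus, empty, drain []*apiv1.Node) (emptyToDelete, drainToDelete []*budgets.NodeGroupView) {
	bp := budgets.NewScaleDownBudgetProcessor(ctx)
	return bp.CropNodes(as, empty, drain)
}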