Address next set of comments
kawych committed Jun 27, 2023
1 parent 05e1fe1 commit 374cf61
Showing 7 changed files with 58 additions and 63 deletions.
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -62,7 +62,7 @@ func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterState
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx, ndt),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
}
}
@@ -85,7 +85,7 @@ func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (*status.ScaleDownS
results, ts := a.nodeDeletionTracker.DeletionResults()
scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: results, NodeDeleteResultsAsOf: ts}

emptyToDelete, drainToDelete := a.budgetProcessor.CropNodes(empty, drain)
emptyToDelete, drainToDelete := a.budgetProcessor.CropNodes(a.nodeDeletionTracker, empty, drain)
if len(emptyToDelete) == 0 && len(drainToDelete) == 0 {
scaleDownStatus.Result = status.ScaleDownNoNodeDeleted
return scaleDownStatus, nil
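The net effect of the two hunks above: the Actuator no longer hands its NodeDeletionTracker to the budget processor at construction time, but passes it to CropNodes on each call, where it is consumed as a scaledown.ActuationStatus (see budgets.go below). A minimal, hypothetical sketch of that shape — the types here are simplified stand-ins, not the real cluster-autoscaler ones:

package main

import "fmt"

// Hypothetical, simplified stand-ins for the real cluster-autoscaler types;
// only the shape of the dependency change is modeled here.
type actuationStatus interface {
    // The real method's return types differ; []string keeps the sketch small.
    DeletionsInProgress() (empty, drain []string)
}

type nodeDeletionTracker struct {
    empty, drain []string
}

func (t *nodeDeletionTracker) DeletionsInProgress() (empty, drain []string) {
    return t.empty, t.drain
}

// budgetProcessor no longer stores the tracker; CropNodes receives the
// actuation status on every call instead.
type budgetProcessor struct{}

func (bp *budgetProcessor) CropNodes(as actuationStatus, empty, drain []string) ([]string, []string) {
    e, d := as.DeletionsInProgress()
    fmt.Printf("deletions in progress: %d empty, %d drain\n", len(e), len(d))
    return empty, drain // real budgeting logic elided
}

func main() {
    ndt := &nodeDeletionTracker{}
    bp := &budgetProcessor{}
    bp.CropNodes(ndt, []string{"empty-node"}, []string{"drain-node"})
}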
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go
@@ -834,7 +834,7 @@ func TestStartDeletion(t *testing.T) {
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx, ndt),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
}
gotStatus, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes)
if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
@@ -1068,7 +1068,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx, ndt),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
}

for _, nodes := range deleteNodes {
12 changes: 5 additions & 7 deletions cluster-autoscaler/core/scaledown/budgets/budgets.go
@@ -35,25 +35,23 @@ type NodeGroupView struct {

// ScaleDownBudgetProcessor is responsible for keeping the number of nodes deleted in parallel within defined limits.
type ScaleDownBudgetProcessor struct {
ctx *context.AutoscalingContext
actuationStatus scaledown.ActuationStatus
ctx *context.AutoscalingContext
}

// NewScaleDownBudgetProcessor creates a ScaleDownBudgetProcessor instance.
func NewScaleDownBudgetProcessor(ctx *context.AutoscalingContext, as scaledown.ActuationStatus) *ScaleDownBudgetProcessor {
func NewScaleDownBudgetProcessor(ctx *context.AutoscalingContext) *ScaleDownBudgetProcessor {
return &ScaleDownBudgetProcessor{
ctx: ctx,
actuationStatus: as,
ctx: ctx,
}
}

// CropNodes crops the provided node lists to respect scale-down max parallelism budgets.
// The returned nodes are grouped by a node group.
func (bp *ScaleDownBudgetProcessor) CropNodes(empty, drain []*apiv1.Node) (emptyToDelete, drainToDelete []*NodeGroupView) {
func (bp *ScaleDownBudgetProcessor) CropNodes(as scaledown.ActuationStatus, empty, drain []*apiv1.Node) (emptyToDelete, drainToDelete []*NodeGroupView) {
emptyIndividual, emptyAtomic := bp.categorize(bp.group(empty))
drainIndividual, drainAtomic := bp.categorize(bp.group(drain))

emptyInProgress, drainInProgress := bp.actuationStatus.DeletionsInProgress()
emptyInProgress, drainInProgress := as.DeletionsInProgress()
parallelismBudget := bp.ctx.MaxScaleDownParallelism - len(emptyInProgress) - len(drainInProgress)
drainBudget := bp.ctx.MaxDrainParallelism - len(drainInProgress)

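The last two context lines above derive the remaining budgets by subtracting deletions already in flight from the configured limits. A tiny standalone illustration of that arithmetic, with made-up limits and in-progress counts:

package main

import "fmt"

func main() {
    // Made-up values; in the real code these come from
    // AutoscalingContext.MaxScaleDownParallelism / MaxDrainParallelism
    // and from ActuationStatus.DeletionsInProgress().
    maxScaleDownParallelism := 10
    maxDrainParallelism := 3
    emptyInProgress := []string{"n1", "n2"} // 2 empty-node deletions running
    drainInProgress := []string{"n3"}       // 1 drained deletion running

    parallelismBudget := maxScaleDownParallelism - len(emptyInProgress) - len(drainInProgress)
    drainBudget := maxDrainParallelism - len(drainInProgress)

    fmt.Println(parallelismBudget, drainBudget) // prints: 7 2
}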
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/scaledown/budgets/budgets_test.go
@@ -335,8 +335,8 @@ func TestCropNodesToBudgets(t *testing.T) {
drainList = append(drainList, bucket.Nodes...)
}

budgeter := NewScaleDownBudgetProcessor(ctx, ndt)
gotEmpty, gotDrain := budgeter.CropNodes(emptyList, drainList)
budgeter := NewScaleDownBudgetProcessor(ctx)
gotEmpty, gotDrain := budgeter.CropNodes(ndt, emptyList, drainList)
if diff := cmp.Diff(tc.wantEmpty, gotEmpty, cmpopts.EquateEmpty(), transformNodeGroupView); diff != "" {
t.Errorf("cropNodesToBudgets empty nodes diff (-want +got):\n%s", diff)
}
43 changes: 0 additions & 43 deletions cluster-autoscaler/core/scaledown/planner/planner.go
@@ -24,7 +24,6 @@ import (
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
@@ -169,51 +168,9 @@ func (p *Planner) NodesToDelete(_ time.Time) (empty, needDrain []*apiv1.Node) {
}
}

empty, filteredOut := p.filterOutIncompleteAtomicNodeGroups(empty)
needDrain, _ = p.filterOutIncompleteAtomicNodeGroups(append(needDrain, filteredOut...))
return empty, needDrain
}

func (p *Planner) filterOutIncompleteAtomicNodeGroups(nodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) {
nodesByGroup := map[cloudprovider.NodeGroup][]*apiv1.Node{}
result := []*apiv1.Node{}
filteredOut := []*apiv1.Node{}
for _, node := range nodes {
nodeGroup, err := p.context.CloudProvider.NodeGroupForNode(node)
if err != nil {
klog.Errorf("Node %v will not scale down, failed to get node info: %s", node.Name, err)
continue
}
autoscalingOptions, err := nodeGroup.GetOptions(p.context.NodeGroupDefaults)
if err != nil {
klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
continue
}
if autoscalingOptions != nil && autoscalingOptions.AtomicScaling {
klog.V(2).Infof("Considering node %s for atomic scale down", node.Name)
nodesByGroup[nodeGroup] = append(nodesByGroup[nodeGroup], node)
} else {
klog.V(2).Infof("Considering node %s for standard scale down", node.Name)
result = append(result, node)
}
}
for nodeGroup, nodes := range nodesByGroup {
ngSize, err := nodeGroup.TargetSize()
if err != nil {
klog.Errorf("Nodes from group %s will not scale down, failed to get target size: %s", nodeGroup.Id(), err)
continue
}
if ngSize == len(nodes) {
klog.V(2).Infof("Scheduling atomic scale down for all %v nodes from node group %s", len(nodes), nodeGroup.Id())
result = append(result, nodes...)
} else {
klog.V(2).Infof("Skipping scale down for %v nodes from node group %s, all %v nodes have to be scaled down atomically", len(nodes), nodeGroup.Id(), ngSize)
filteredOut = append(filteredOut, nodes...)
}
}
return result, filteredOut
}

func allNodes(s clustersnapshot.ClusterSnapshot) ([]*apiv1.Node, error) {
nodeInfos, err := s.NodeInfos().List()
if err != nil {
8 changes: 4 additions & 4 deletions cluster-autoscaler/core/scaledown/planner/planner_test.go
@@ -623,7 +623,6 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) {
}

func TestNodesToDelete(t *testing.T) {

testCases := []struct {
name string
nodes map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved
@@ -695,10 +694,11 @@
buildRemovableNode("node-3", 1),
},
},
wantEmpty: []*apiv1.Node{},
wantDrain: []*apiv1.Node{
wantEmpty: []*apiv1.Node{
buildRemovableNode("node-1", 0).Node,
buildRemovableNode("node-2", 0).Node,
},
wantDrain: []*apiv1.Node{
buildRemovableNode("node-3", 1).Node,
},
},
@@ -743,14 +743,14 @@
buildRemovableNode("node-10", 0).Node,
buildRemovableNode("node-11", 0).Node,
buildRemovableNode("node-12", 0).Node,
buildRemovableNode("node-13", 0).Node,
},
wantDrain: []*apiv1.Node{
buildRemovableNode("node-4", 0).Node,
buildRemovableNode("node-5", 0).Node,
buildRemovableNode("node-6", 0).Node,
buildRemovableNode("node-8", 0).Node,
buildRemovableNode("node-9", 0).Node,
buildRemovableNode("node-13", 0).Node,
buildRemovableNode("node-14", 0).Node,
buildRemovableNode("node-15", 0).Node,
},
46 changes: 43 additions & 3 deletions cluster-autoscaler/processors/nodes/post_filtering_processor.go
@@ -17,28 +17,68 @@ limitations under the License.
package nodes

import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
klog "k8s.io/klog/v2"
)

// PostFilteringScaleDownNodeProcessor selects the first maxCount nodes (if possible) to be removed
type PostFilteringScaleDownNodeProcessor struct {
}

// GetNodesToRemove selects up to maxCount nodes for deletion by taking the first maxCount candidates
func (n *PostFilteringScaleDownNodeProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved {
func (p *PostFilteringScaleDownNodeProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved {
end := len(candidates)
if len(candidates) > maxCount {
end = maxCount
}
return candidates[:end]
return p.filterOutIncompleteAtomicNodeGroups(ctx, candidates[:end])
}

// CleanUp is called at CA termination
func (n *PostFilteringScaleDownNodeProcessor) CleanUp() {
func (p *PostFilteringScaleDownNodeProcessor) CleanUp() {
}

// NewPostFilteringScaleDownNodeProcessor returns a new PostFilteringScaleDownNodeProcessor
func NewPostFilteringScaleDownNodeProcessor() *PostFilteringScaleDownNodeProcessor {
return &PostFilteringScaleDownNodeProcessor{}
}

func (p *PostFilteringScaleDownNodeProcessor) filterOutIncompleteAtomicNodeGroups(ctx *context.AutoscalingContext, nodes []simulator.NodeToBeRemoved) []simulator.NodeToBeRemoved {
nodesByGroup := map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved{}
result := []simulator.NodeToBeRemoved{}
for _, node := range nodes {
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node.Node)
if err != nil {
klog.Errorf("Node %v will not scale down, failed to get node info: %s", node.Node.Name, err)
continue
}
autoscalingOptions, err := nodeGroup.GetOptions(ctx.NodeGroupDefaults)
if err != nil {
klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
continue
}
if autoscalingOptions != nil && autoscalingOptions.AtomicScaling {
klog.V(2).Infof("Considering node %s for atomic scale down", node.Node.Name)
nodesByGroup[nodeGroup] = append(nodesByGroup[nodeGroup], node)
} else {
klog.V(2).Infof("Considering node %s for standard scale down", node.Node.Name)
result = append(result, node)
}
}
for nodeGroup, nodes := range nodesByGroup {
ngSize, err := nodeGroup.TargetSize()
if err != nil {
klog.Errorf("Nodes from group %s will not scale down, failed to get target size: %s", nodeGroup.Id(), err)
continue
}
if ngSize == len(nodes) {
klog.V(2).Infof("Scheduling atomic scale down for all %v nodes from node group %s", len(nodes), nodeGroup.Id())
result = append(result, nodes...)
} else {
klog.V(2).Infof("Skipping scale down for %v nodes from node group %s, all %v nodes have to be scaled down atomically", len(nodes), nodeGroup.Id(), ngSize)
}
}
return result
}
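
To make the behaviour of the relocated filterOutIncompleteAtomicNodeGroups easier to follow, here is a self-contained, simplified sketch: node groups are plain structs instead of cloudprovider.NodeGroup, the AtomicScaling option becomes a bool field, and error handling is omitted. Candidates from atomic groups survive only when the whole group (per its target size) is up for removal; everything else passes through unchanged:

package main

import "fmt"

// Hypothetical, simplified model of the filtering above.
type group struct {
    id         string
    atomic     bool // stands in for autoscalingOptions.AtomicScaling
    targetSize int  // stands in for nodeGroup.TargetSize()
}

type candidate struct {
    name  string
    group *group
}

// filterAtomic keeps individually-scalable candidates as-is and keeps
// atomic-scaling candidates only when every node of their group is a candidate.
func filterAtomic(cands []candidate) []candidate {
    byGroup := map[*group][]candidate{}
    var result []candidate
    for _, c := range cands {
        if c.group.atomic {
            byGroup[c.group] = append(byGroup[c.group], c)
        } else {
            result = append(result, c)
        }
    }
    for g, nodes := range byGroup {
        if len(nodes) == g.targetSize {
            result = append(result, nodes...) // the whole group goes down together
        }
        // otherwise the group is skipped entirely, mirroring the code above
    }
    return result
}

func main() {
    atomic := &group{id: "atomic-ng", atomic: true, targetSize: 3}
    std := &group{id: "std-ng", atomic: false, targetSize: 5}
    out := filterAtomic([]candidate{
        {"a-1", atomic}, {"a-2", atomic}, // only 2 of 3 atomic nodes: dropped
        {"s-1", std}, // standard node: kept
    })
    for _, c := range out {
        fmt.Println(c.name) // prints: s-1
    }
}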
