Add unit test for group_deletion_scheduler
kawych committed Jun 21, 2023
1 parent 0ed8e0c commit 0316e4f
Showing 4 changed files with 202 additions and 12 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -61,7 +61,7 @@ func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterState
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
-nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, deleteOptions, NewDefaultEvictor(deleteOptions, ndt)),
+nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx, ndt),
deleteOptions: deleteOptions,
}
7 changes: 3 additions & 4 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go
@@ -43,7 +43,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
@@ -665,7 +664,7 @@ func TestStartDeletion(t *testing.T) {
wantNodeDeleteResults: map[string]status.NodeDeleteResult{
"test-node-0": {ResultType: status.NodeDeleteOk},
"test-node-1": {ResultType: status.NodeDeleteErrorInternal, Err: cmpopts.AnyError},
"atomic-2-node-0": {ResultType: status.NodeDeleteErrorInternal, Err: cmpopts.AnyError},
"atomic-2-node-0": {ResultType: status.NodeDeleteErrorFailedToDelete, Err: cmpopts.AnyError},
"atomic-2-node-1": {ResultType: status.NodeDeleteErrorInternal, Err: cmpopts.AnyError},
},
},
@@ -834,7 +833,7 @@ func TestStartDeletion(t *testing.T) {
evictor := Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
-nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, simulator.NodeDeleteOptions{}, evictor),
+nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx, ndt),
}
gotStatus, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes)
@@ -1068,7 +1067,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
evictor := Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
-nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, simulator.NodeDeleteOptions{}, evictor),
+nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx, ndt),
}

17 changes: 10 additions & 7 deletions cluster-autoscaler/core/scaledown/actuation/group_deletion_scheduler.go
@@ -29,28 +29,31 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

+type batcher interface {
+AddNodes(nodes []*apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool)
+}

// GroupDeletionScheduler is a wrapper over NodeDeletionBatcher responsible for grouping nodes for deletion
// and rolling back deletion of all nodes from a group in case deletion fails for any of the other nodes.
type GroupDeletionScheduler struct {
sync.Mutex
ctx *context.AutoscalingContext
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
-nodeDeletionBatcher *NodeDeletionBatcher
+nodeDeletionBatcher batcher
evictor Evictor
nodeQueue map[string][]*apiv1.Node
failuresForGroup map[string]bool
}

// NewGroupDeletionScheduler creates an instance of GroupDeletionScheduler.
-func NewGroupDeletionScheduler(ctx *context.AutoscalingContext, ndt *deletiontracker.NodeDeletionTracker, ndb *NodeDeletionBatcher, deleteOptions simulator.NodeDeleteOptions, evictor Evictor) *GroupDeletionScheduler {
+func NewGroupDeletionScheduler(ctx *context.AutoscalingContext, ndt *deletiontracker.NodeDeletionTracker, b batcher, evictor Evictor) *GroupDeletionScheduler {
return &GroupDeletionScheduler{
ctx: ctx,
nodeDeletionTracker: ndt,
-nodeDeletionBatcher: ndb,
+nodeDeletionBatcher: b,
evictor: evictor,
nodeQueue: map[string][]*apiv1.Node{},
failuresForGroup: map[string]bool{},
@@ -83,7 +86,7 @@ func (ds *GroupDeletionScheduler) ScheduleDeletion(nodeInfo *framework.NodeInfo,
func (ds *GroupDeletionScheduler) prepareNodeForDeletion(nodeInfo *framework.NodeInfo, drain bool) status.NodeDeleteResult {
node := nodeInfo.Node()
if drain {
-if evictionResults, err := ds.evictor.DrainNode(ds.nodeDeletionBatcher.ctx, nodeInfo); err != nil {
+if evictionResults, err := ds.evictor.DrainNode(ds.ctx, nodeInfo); err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToEvictPods, Err: err, PodEvictionResults: evictionResults}
}
} else {
@@ -104,7 +107,7 @@ func (ds *GroupDeletionScheduler) addToBatcher(nodeInfo *framework.NodeInfo, nod
ds.nodeQueue[nodeGroup.Id()] = append(ds.nodeQueue[nodeGroup.Id()], nodeInfo.Node())
if atomic {
if ds.failuresForGroup[nodeGroup.Id()] {
-nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "couldn't scale down other nodes in this node group")}
+nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: errors.NewAutoscalerError(errors.TransientError, "couldn't scale down other nodes in this node group")}
CleanUpAndRecordFailedScaleDownEvent(ds.ctx, nodeInfo.Node(), nodeGroup.Id(), drain, ds.nodeDeletionTracker, "scale down failed for node group as a whole", nodeDeleteResult)
delete(ds.nodeQueue, nodeGroup.Id())
}
@@ -127,7 +130,7 @@ func (ds *GroupDeletionScheduler) AbortNodeDeletion(node *apiv1.Node, nodeGroupI
if otherNode == node {
continue
}
-nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "couldn't scale down other nodes in this node group")}
+nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: errors.NewAutoscalerError(errors.TransientError, "couldn't scale down other nodes in this node group")}
CleanUpAndRecordFailedScaleDownEvent(ds.ctx, otherNode, nodeGroupId, drain, ds.nodeDeletionTracker, "scale down failed for node group as a whole", nodeDeleteResult)
}
delete(ds.nodeQueue, nodeGroupId)
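Because Go interfaces are satisfied implicitly, the production wiring in actuator.go above keeps passing the concrete *NodeDeletionBatcher, while tests can now inject any stand-in that implements AddNodes; the new test below does exactly that with a countingBatcher. A minimal sketch of a compile-time guard that would document this relationship (hypothetical, not part of this commit, assuming it sits in the same actuation package):

package actuation

// Hypothetical guard, not in this commit: the build breaks if *NodeDeletionBatcher
// ever stops satisfying the batcher interface that GroupDeletionScheduler now depends on.
var _ batcher = (*NodeDeletionBatcher)(nil)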
188 changes: 188 additions & 0 deletions cluster-autoscaler/core/scaledown/actuation/group_deletion_scheduler_test.go
@@ -0,0 +1,188 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package actuation

import (
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/framework"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

func TestScheduleDeletion(t *testing.T) {
testNg := testprovider.NewTestNodeGroup("test", 0, 100, 3, true, false, "n1-standard-2", nil, nil)
atomic2 := sizedNodeGroup("atomic-2", 2, true)
atomic4 := sizedNodeGroup("atomic-4", 4, true)

testCases := []struct {
name string
toSchedule []*budgets.NodeGroupView
toAbort []*budgets.NodeGroupView
toScheduleAfterAbort []*budgets.NodeGroupView
wantDeleted int
wantNodeDeleteResults map[string]status.NodeDeleteResult
}{
{
name: "no nodes",
toSchedule: []*budgets.NodeGroupView{},
},
{
name: "individual nodes are deleted right away",
toSchedule: generateNodeGroupViewList(testNg, 0, 3),
toAbort: generateNodeGroupViewList(testNg, 3, 6),
toScheduleAfterAbort: generateNodeGroupViewList(testNg, 6, 9),
wantDeleted: 6,
wantNodeDeleteResults: map[string]status.NodeDeleteResult{
"test-node-3": {ResultType: status.NodeDeleteErrorFailedToDelete, Err: cmpopts.AnyError},
"test-node-4": {ResultType: status.NodeDeleteErrorFailedToDelete, Err: cmpopts.AnyError},
"test-node-5": {ResultType: status.NodeDeleteErrorFailedToDelete, Err: cmpopts.AnyError},
},
},
{
name: "whole atomic node groups deleted",
toSchedule: mergeLists(
generateNodeGroupViewList(atomic4, 0, 1),
generateNodeGroupViewList(atomic2, 0, 1),
generateNodeGroupViewList(atomic4, 1, 2),
generateNodeGroupViewList(atomic2, 1, 2),
generateNodeGroupViewList(atomic4, 2, 4),
),
wantDeleted: 6,
},
{
name: "atomic node group aborted in the process",
toSchedule: mergeLists(
generateNodeGroupViewList(atomic4, 0, 1),
generateNodeGroupViewList(atomic2, 0, 1),
generateNodeGroupViewList(atomic4, 1, 2),
generateNodeGroupViewList(atomic2, 1, 2),
),
toAbort: generateNodeGroupViewList(atomic4, 2, 3),
toScheduleAfterAbort: generateNodeGroupViewList(atomic4, 3, 4),
wantDeleted: 2,
wantNodeDeleteResults: map[string]status.NodeDeleteResult{
"atomic-4-node-0": {ResultType: status.NodeDeleteErrorFailedToDelete, Err: cmpopts.AnyError},
"atomic-4-node-1": {ResultType: status.NodeDeleteErrorFailedToDelete, Err: cmpopts.AnyError},
"atomic-4-node-2": {ResultType: status.NodeDeleteErrorFailedToDelete, Err: cmpopts.AnyError},
"atomic-4-node-3": {ResultType: status.NodeDeleteErrorFailedToDelete, Err: cmpopts.AnyError},
},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
return nil
})
for _, bucket := range append(append(tc.toSchedule, tc.toAbort...), tc.toScheduleAfterAbort...) {
bucket.Group.(*testprovider.TestNodeGroup).SetCloudProvider(provider)
provider.InsertNodeGroup(bucket.Group)
for _, node := range bucket.Nodes {
provider.AddNode(bucket.Group.Id(), node)
}
}

batcher := &countingBatcher{}
tracker := deletiontracker.NewNodeDeletionTracker(0)
opts := config.AutoscalingOptions{}
fakeClient := &fake.Clientset{}
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
dsLister, err := kube_util.NewTestDaemonSetLister([]*appsv1.DaemonSet{})
if err != nil {
t.Fatalf("Couldn't create daemonset lister")
}
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, pdbLister, dsLister, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil)
if err != nil {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
}
scheduler := NewGroupDeletionScheduler(&ctx, tracker, batcher, Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom})

if err := scheduleAll(tc.toSchedule, scheduler); err != nil {
t.Fatal(err)
}
for _, bucket := range tc.toAbort {
for _, node := range bucket.Nodes {
nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: cmpopts.AnyError}
scheduler.AbortNodeDeletion(node, bucket.Group.Id(), false, "simulated abort", nodeDeleteResult)
}
}
if err := scheduleAll(tc.toScheduleAfterAbort, scheduler); err != nil {
t.Fatal(err)
}

if batcher.addedNodes != tc.wantDeleted {
t.Errorf("Incorrect number of deleted nodes, want %v but got %v", tc.wantDeleted, batcher.addedNodes)
}
gotDeletionResult, _ := tracker.DeletionResults()
if diff := cmp.Diff(tc.wantNodeDeleteResults, gotDeletionResult, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" {
t.Errorf("NodeDeleteResults diff (-want +got):\n%s", diff)
}
})
}
}

type countingBatcher struct {
addedNodes int
}

func (b *countingBatcher) AddNodes(nodes []*apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool) {
b.addedNodes += len(nodes)
}

func scheduleAll(toSchedule []*budgets.NodeGroupView, scheduler *GroupDeletionScheduler) error {
for _, bucket := range toSchedule {
bucketSize, err := bucket.Group.TargetSize()
if err != nil {
return fmt.Errorf("failed to get target size for node group %q: %s", bucket.Group.Id(), err)
}
for _, node := range bucket.Nodes {
scheduler.ScheduleDeletion(infoForNode(node), bucket.Group, bucketSize, false)
}
}
return nil
}

func infoForNode(n *apiv1.Node) *framework.NodeInfo {
info := schedulerframework.NewNodeInfo()
info.SetNode(n)
return info
}

func mergeLists(lists ...[]*budgets.NodeGroupView) []*budgets.NodeGroupView {
merged := []*budgets.NodeGroupView{}
for _, l := range lists {
merged = append(merged, l...)
}
return merged
}
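
Usage note: assuming cluster-autoscaler remains its own Go module (as in this repository's layout), the new test can be run in isolation from the cluster-autoscaler directory with something like go test -run TestScheduleDeletion ./core/scaledown/actuation/... using standard go tooling; the command is illustrative and not part of the commit.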
