Convert scale-down checks to drainability rules #6164

Merged · 9 commits · Oct 11, 2023
10 changes: 10 additions & 0 deletions cluster-autoscaler/core/scaledown/pdb/basic.go
@@ -63,6 +63,16 @@ func (t *basicRemainingPdbTracker) GetPdbs() []*policyv1.PodDisruptionBudget {
return pdbs
}

func (t *basicRemainingPdbTracker) MatchingPdbs(pod *apiv1.Pod) []*policyv1.PodDisruptionBudget {
var pdbs []*policyv1.PodDisruptionBudget
for _, pdbInfo := range t.pdbInfos {
if pod.Namespace == pdbInfo.pdb.Namespace && pdbInfo.selector.Matches(labels.Set(pod.Labels)) {
pdbs = append(pdbs, pdbInfo.pdb)
}
}
return pdbs
}

func (t *basicRemainingPdbTracker) CanRemovePods(pods []*apiv1.Pod) (canRemove, inParallel bool, blockingPod *drain.BlockingPod) {
inParallel = true
for _, pdbInfo := range t.pdbInfos {
2 changes: 2 additions & 0 deletions cluster-autoscaler/core/scaledown/pdb/pdb.go
@@ -28,6 +28,8 @@ type RemainingPdbTracker interface {
SetPdbs(pdbs []*policyv1.PodDisruptionBudget) error
// GetPdbs returns the current remaining PDBs.
GetPdbs() []*policyv1.PodDisruptionBudget
// MatchingPdbs returns all PDBs matching the pod.
MatchingPdbs(pod *apiv1.Pod) []*policyv1.PodDisruptionBudget

// CanRemovePods checks if the set of pods can be removed.
// inParallel indicates if the pods can be removed in parallel. If it is false
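For context, the new MatchingPdbs method returns every PDB whose namespace and label selector cover a given pod. A minimal sketch of how a caller might use it, assuming the usual k8s.io/autoscaler/cluster-autoscaler module path; the reportCoveringPdbs helper is hypothetical and not part of this PR:

```go
package example

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
)

// reportCoveringPdbs lists the PDBs that cover a pod, using the MatchingPdbs
// method added in this PR.
func reportCoveringPdbs(tracker pdb.RemainingPdbTracker, pod *apiv1.Pod) []*policyv1.PodDisruptionBudget {
	matching := tracker.MatchingPdbs(pod)
	for _, p := range matching {
		fmt.Printf("pod %s/%s is covered by PDB %s/%s\n", pod.Namespace, pod.Name, p.Namespace, p.Name)
	}
	return matching
}
```

A tracker can be built with pdb.NewBasicRemainingPdbTracker() and populated via SetPdbs, as done further down in this diff.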
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler_test.go
@@ -153,7 +153,7 @@ func (m *onNodeGroupDeleteMock) Delete(id string) error {

func setUpScaleDownActuator(ctx *context.AutoscalingContext, autoscalingOptions config.AutoscalingOptions) {
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(), NewTestProcessors(ctx).NodeGroupConfigProcessor)
ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), NewTestProcessors(ctx).NodeGroupConfigProcessor)
}

func TestStaticAutoscalerRunOnce(t *testing.T) {
2 changes: 1 addition & 1 deletion cluster-autoscaler/main.go
@@ -481,7 +481,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
if autoscalingOptions.ParallelDrain {
sdCandidatesSorting := previouscandidates.NewPreviousCandidates()
scaleDownCandidatesComparers = []scaledowncandidates.CandidatesComparer{
emptycandidates.NewEmptySortingProcessor(emptycandidates.NewNodeInfoGetter(opts.ClusterSnapshot), deleteOptions, rules.Default()),
emptycandidates.NewEmptySortingProcessor(emptycandidates.NewNodeInfoGetter(opts.ClusterSnapshot), deleteOptions, rules.Default(deleteOptions)),
sdCandidatesSorting,
}
opts.Processors.ScaleDownCandidatesNotifier.Register(sdCandidatesSorting)
9 changes: 3 additions & 6 deletions cluster-autoscaler/simulator/cluster.go
@@ -123,9 +123,6 @@ func (r *RemovalSimulator) FindNodesToRemove(
timestamp time.Time,
remainingPdbTracker pdb.RemainingPdbTracker,
) (nodesToRemove []NodeToBeRemoved, unremovableNodes []*UnremovableNode) {
result := make([]NodeToBeRemoved, 0)
unremovable := make([]*UnremovableNode, 0)

destinationMap := make(map[string]bool, len(destinations))
for _, destination := range destinations {
destinationMap[destination] = true
@@ -134,12 +131,12 @@
for _, nodeName := range candidates {
rn, urn := r.SimulateNodeRemoval(nodeName, destinationMap, timestamp, remainingPdbTracker)
if rn != nil {
result = append(result, *rn)
nodesToRemove = append(nodesToRemove, *rn)
} else if urn != nil {
unremovable = append(unremovable, urn)
unremovableNodes = append(unremovableNodes, urn)
}
}
return result, unremovable
return nodesToRemove, unremovableNodes
}

// SimulateNodeRemoval simulates removing a node from the cluster to check
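The refactor above switches FindNodesToRemove to appending directly to its named return values; since append allocates a nil slice on first use, the explicit make(...) calls are unnecessary. One behavioral nuance: when nothing is found the function now returns nil rather than empty, non-nil slices, which lines up with the test changes below dropping their empty-slice literals. A standalone illustration of the pattern (not autoscaler code):

```go
package main

import "fmt"

// split demonstrates appending to nil named-return slices: append allocates
// the backing array on first use, so no make(...) is needed.
func split(values []int) (evens, odds []int) {
	for _, v := range values {
		if v%2 == 0 {
			evens = append(evens, v)
		} else {
			odds = append(odds, v)
		}
	}
	return evens, odds
}

func main() {
	e, o := split(nil)
	fmt.Println(e == nil, o == nil) // prints "true true": nil, not empty slices
}
```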
37 changes: 12 additions & 25 deletions cluster-autoscaler/simulator/cluster_test.go
@@ -136,14 +136,11 @@ func TestFindNodesToRemove(t *testing.T) {
fullNodeInfo.AddPod(pod4)

emptyNodeToRemove := NodeToBeRemoved{
Node: emptyNode,
PodsToReschedule: []*apiv1.Pod{},
DaemonSetPods: []*apiv1.Pod{},
Node: emptyNode,
}
drainableNodeToRemove := NodeToBeRemoved{
Node: drainableNode,
PodsToReschedule: []*apiv1.Pod{pod1, pod2},
DaemonSetPods: []*apiv1.Pod{},
}

clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
@@ -152,25 +149,19 @@
tracker := NewUsageTracker()

tests := []findNodesToRemoveTestConfig{
// just an empty node, should be removed
{
name: "just an empty node, should be removed",
pods: []*apiv1.Pod{},
candidates: []string{emptyNode.Name},
allNodes: []*apiv1.Node{emptyNode},
toRemove: []NodeToBeRemoved{emptyNodeToRemove},
unremovable: []*UnremovableNode{},
name: "just an empty node, should be removed",
candidates: []string{emptyNode.Name},
allNodes: []*apiv1.Node{emptyNode},
toRemove: []NodeToBeRemoved{emptyNodeToRemove},
},
// just a drainable node, but nowhere for pods to go to
{
name: "just a drainable node, but nowhere for pods to go to",
pods: []*apiv1.Pod{pod1, pod2},
candidates: []string{drainableNode.Name},
allNodes: []*apiv1.Node{drainableNode},
toRemove: []NodeToBeRemoved{},
unremovable: []*UnremovableNode{{Node: drainableNode, Reason: NoPlaceToMovePods}},
},
// drainable node, and a mostly empty node that can take its pods
{
name: "drainable node, and a mostly empty node that can take its pods",
pods: []*apiv1.Pod{pod1, pod2, pod3},
@@ -179,23 +170,19 @@
toRemove: []NodeToBeRemoved{drainableNodeToRemove},
unremovable: []*UnremovableNode{{Node: nonDrainableNode, Reason: BlockedByPod, BlockingPod: &drain.BlockingPod{Pod: pod3, Reason: drain.NotReplicated}}},
},
// drainable node, and a full node that cannot fit anymore pods
{
name: "drainable node, and a full node that cannot fit anymore pods",
pods: []*apiv1.Pod{pod1, pod2, pod4},
candidates: []string{drainableNode.Name},
allNodes: []*apiv1.Node{drainableNode, fullNode},
toRemove: []NodeToBeRemoved{},
unremovable: []*UnremovableNode{{Node: drainableNode, Reason: NoPlaceToMovePods}},
},
// 4 nodes, 1 empty, 1 drainable
{
name: "4 nodes, 1 empty, 1 drainable",
pods: []*apiv1.Pod{pod1, pod2, pod3, pod4},
candidates: []string{emptyNode.Name, drainableNode.Name},
allNodes: []*apiv1.Node{emptyNode, drainableNode, fullNode, nonDrainableNode},
toRemove: []NodeToBeRemoved{emptyNodeToRemove, drainableNodeToRemove},
unremovable: []*UnremovableNode{},
name: "4 nodes, 1 empty, 1 drainable",
pods: []*apiv1.Pod{pod1, pod2, pod3, pod4},
candidates: []string{emptyNode.Name, drainableNode.Name},
allNodes: []*apiv1.Node{emptyNode, drainableNode, fullNode, nonDrainableNode},
toRemove: []NodeToBeRemoved{emptyNodeToRemove, drainableNodeToRemove},
},
}

@@ -209,8 +196,8 @@
r := NewRemovalSimulator(registry, clusterSnapshot, predicateChecker, tracker, testDeleteOptions(), nil, false)
toRemove, unremovable := r.FindNodesToRemove(test.candidates, destinations, time.Now(), nil)
fmt.Printf("Test scenario: %s, found len(toRemove)=%v, expected len(test.toRemove)=%v\n", test.name, len(toRemove), len(test.toRemove))
assert.Equal(t, toRemove, test.toRemove)
assert.Equal(t, unremovable, test.unremovable)
assert.Equal(t, test.toRemove, toRemove)
assert.Equal(t, test.unremovable, unremovable)
})
}
}
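The swapped assert.Equal arguments above follow testify's expected-then-actual convention, which keeps failure messages labeled correctly. A throwaway illustration (not part of the PR):

```go
package example

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

// TestExpectedThenActual shows testify's argument order: assert.Equal(t,
// expected, actual). Swapping the arguments does not change pass/fail, but it
// makes failure output label the expected and actual values the wrong way
// around.
func TestExpectedThenActual(t *testing.T) {
	expected := []string{"n1", "n2"}
	actual := []string{"n1", "n2"}
	assert.Equal(t, expected, actual)
}
```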
57 changes: 16 additions & 41 deletions cluster-autoscaler/simulator/drain.go
@@ -17,7 +17,6 @@ limitations under the License.
package simulator

import (
"fmt"
"time"

apiv1 "k8s.io/api/core/v1"
@@ -31,66 +30,42 @@
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// GetPodsToMove returns a list of pods that should be moved elsewhere
// and a list of DaemonSet pods that should be evicted if the node
// is drained. Raises error if there is an unreplicated pod.
// Based on kubectl drain code. If listers is nil it makes an assumption that RC, DS, Jobs and RS were deleted
// along with their pods (no abandoned pods with dangling created-by annotation).
// If listers is not nil it checks whether RC, DS, Jobs and RS that created these pods
// still exist.
// TODO(x13n): Rewrite GetPodsForDeletionOnNodeDrain into a set of DrainabilityRules.
// GetPodsToMove returns a list of pods that should be moved elsewhere and a
// list of DaemonSet pods that should be evicted if the node is drained.
// Raises error if there is an unreplicated pod.
// Based on kubectl drain code. If listers is nil it makes an assumption that
// RC, DS, Jobs and RS were deleted along with their pods (no abandoned pods
// with dangling created-by annotation).
// If listers is not nil it checks whether RC, DS, Jobs and RS that created
// these pods still exist.
func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, listers kube_util.ListerRegistry, remainingPdbTracker pdb.RemainingPdbTracker, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) {
var drainPods, drainDs []*apiv1.Pod
if drainabilityRules == nil {
drainabilityRules = rules.Default()
drainabilityRules = rules.Default(deleteOptions)
}
if remainingPdbTracker == nil {
remainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
}
drainCtx := &drainability.DrainContext{
RemainingPdbTracker: remainingPdbTracker,
DeleteOptions: deleteOptions,
Listers: listers,
Timestamp: timestamp,
}
for _, podInfo := range nodeInfo.Pods {
pod := podInfo.Pod
status := drainabilityRules.Drainable(drainCtx, pod)
switch status.Outcome {
case drainability.UndefinedOutcome:
pods = append(pods, podInfo.Pod)
case drainability.DrainOk:
case drainability.UndefinedOutcome, drainability.DrainOk:
if pod_util.IsDaemonSetPod(pod) {
drainDs = append(drainDs, pod)
daemonSetPods = append(daemonSetPods, pod)
} else {
drainPods = append(drainPods, pod)
pods = append(pods, pod)
}
case drainability.BlockDrain:
blockingPod = &drain.BlockingPod{
return nil, nil, &drain.BlockingPod{
Pod: pod,
Reason: status.BlockingReason,
}
err = status.Error
return
}, status.Error
}
}

pods, daemonSetPods, blockingPod, err = drain.GetPodsForDeletionOnNodeDrain(
pods,
remainingPdbTracker.GetPdbs(),
deleteOptions.SkipNodesWithSystemPods,
deleteOptions.SkipNodesWithLocalStorage,
deleteOptions.SkipNodesWithCustomControllerPods,
listers,
int32(deleteOptions.MinReplicaCount),
timestamp)
pods = append(pods, drainPods...)
daemonSetPods = append(daemonSetPods, drainDs...)
if err != nil {
return pods, daemonSetPods, blockingPod, err
}
if canRemove, _, blockingPodInfo := remainingPdbTracker.CanRemovePods(pods); !canRemove {
pod := blockingPodInfo.Pod
return []*apiv1.Pod{}, []*apiv1.Pod{}, blockingPodInfo, fmt.Errorf("not enough pod disruption budget to move %s/%s", pod.Namespace, pod.Name)
}

return pods, daemonSetPods, nil, nil
}
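To illustrate the direction of this PR, here is a sketch of a custom drainability rule with the same Drainable signature that GetPodsToMove calls above. The Outcome and BlockingReason fields and the outcome constants are taken from the diff; the Status type name, the rule shape, the do-not-evict annotation, and the choice of blocking reason are assumptions for illustration only, so check them against the drainability and rules packages before reusing:

```go
package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
	"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
)

// doNotEvictAnnotation is a made-up annotation used only in this sketch.
const doNotEvictAnnotation = "example.com/do-not-evict"

// noEvictAnnotationRule blocks draining any pod carrying the annotation above;
// for every other pod it returns UndefinedOutcome so the remaining rules (and
// the fallback in GetPodsToMove) decide.
type noEvictAnnotationRule struct{}

func (noEvictAnnotationRule) Drainable(drainCtx *drainability.DrainContext, pod *apiv1.Pod) drainability.Status {
	if _, blocked := pod.Annotations[doNotEvictAnnotation]; blocked {
		return drainability.Status{
			Outcome:        drainability.BlockDrain,
			BlockingReason: drain.NotSafeToEvictAnnotation, // assumed reason constant; substitute whichever drain.BlockingPodReason fits
		}
	}
	return drainability.Status{Outcome: drainability.UndefinedOutcome}
}
```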