Skip to content

Commit

Permalink
Merge pull request #7059 from kisieland/add-option-to-filer-some-node…
Browse files Browse the repository at this point in the history
…s-in-simulation

Add option to pass custom filter funtion for nodes
  • Loading branch information
k8s-ci-robot committed Jul 16, 2024
2 parents d8c49f3 + 4d845bd commit eb4f907
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
klog "k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type filterOutSchedulablePodListProcessor struct {
schedulingSimulator *scheduling.HintingSimulator
nodeFilter func(*schedulerframework.NodeInfo) bool
}

// NewFilterOutSchedulablePodListProcessor creates a PodListProcessor filtering out schedulable pods
func NewFilterOutSchedulablePodListProcessor(predicateChecker predicatechecker.PredicateChecker) *filterOutSchedulablePodListProcessor {
func NewFilterOutSchedulablePodListProcessor(predicateChecker predicatechecker.PredicateChecker, nodeFilter func(*schedulerframework.NodeInfo) bool) *filterOutSchedulablePodListProcessor {
return &filterOutSchedulablePodListProcessor{
schedulingSimulator: scheduling.NewHintingSimulator(predicateChecker),
nodeFilter: nodeFilter,
}
}

Expand Down Expand Up @@ -98,7 +101,7 @@ func (p *filterOutSchedulablePodListProcessor) filterOutSchedulableByPacking(uns
return corev1helpers.PodPriority(unschedulableCandidates[i]) > corev1helpers.PodPriority(unschedulableCandidates[j])
})

statuses, overflowingControllerCount, err := p.schedulingSimulator.TrySchedulePods(clusterSnapshot, unschedulableCandidates, scheduling.ScheduleAnywhere, false)
statuses, overflowingControllerCount, err := p.schedulingSimulator.TrySchedulePods(clusterSnapshot, unschedulableCandidates, p.nodeFilter, false)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,26 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

func TestFilterOutSchedulable(t *testing.T) {
node := buildReadyTestNode("node", 2000, 100)
matchesAllNodes := func(*schedulerframework.NodeInfo) bool { return true }
matchesNoNodes := func(*schedulerframework.NodeInfo) bool { return false }

testCases := map[string]struct {
nodesWithPods map[*apiv1.Node][]*apiv1.Pod
unschedulableCandidates []*apiv1.Pod
expectedScheduledPods []*apiv1.Pod
expectedUnscheduledPods []*apiv1.Pod
nodeMatches func(*schedulerframework.NodeInfo) bool
nodeFilter func(*schedulerframework.NodeInfo) bool
}{
"single empty node, no pods": {
nodesWithPods: map[*apiv1.Node][]*apiv1.Pod{node: {}},
nodeMatches: matchesAllNodes,
nodeFilter: matchesAllNodes,
},
"single empty node, single schedulable pod": {
nodesWithPods: map[*apiv1.Node][]*apiv1.Pod{node: {}},
Expand All @@ -52,7 +54,7 @@ func TestFilterOutSchedulable(t *testing.T) {
expectedScheduledPods: []*apiv1.Pod{
BuildTestPod("pod", 500, 10),
},
nodeMatches: matchesAllNodes,
nodeFilter: matchesAllNodes,
},
"single empty node, many schedulable pods": {
nodesWithPods: map[*apiv1.Node][]*apiv1.Pod{node: {}},
Expand All @@ -66,7 +68,7 @@ func TestFilterOutSchedulable(t *testing.T) {
BuildTestPod("pod2", 500, 10),
BuildTestPod("pod3", 800, 10),
},
nodeMatches: matchesAllNodes,
nodeFilter: matchesAllNodes,
},
"single empty node, single unschedulable pod": {
nodesWithPods: map[*apiv1.Node][]*apiv1.Pod{node: {}},
Expand All @@ -76,7 +78,7 @@ func TestFilterOutSchedulable(t *testing.T) {
expectedUnscheduledPods: []*apiv1.Pod{
BuildTestPod("pod1", 3000, 10),
},
nodeMatches: matchesAllNodes,
nodeFilter: matchesAllNodes,
},
"single empty node, various pods": {
nodesWithPods: map[*apiv1.Node][]*apiv1.Pod{node: {}},
Expand All @@ -92,7 +94,7 @@ func TestFilterOutSchedulable(t *testing.T) {
expectedUnscheduledPods: []*apiv1.Pod{
BuildTestPod("pod3", 1800, 10),
},
nodeMatches: matchesAllNodes,
nodeFilter: matchesAllNodes,
},
"single empty node, some priority pods": {
nodesWithPods: map[*apiv1.Node][]*apiv1.Pod{node: {}},
Expand All @@ -108,7 +110,7 @@ func TestFilterOutSchedulable(t *testing.T) {
expectedUnscheduledPods: []*apiv1.Pod{
buildPriorityTestPod("pod2", 500, 10, 10),
},
nodeMatches: matchesAllNodes,
nodeFilter: matchesAllNodes,
},
"non-empty node with a single pods scheduled": {
nodesWithPods: map[*apiv1.Node][]*apiv1.Pod{
Expand All @@ -128,7 +130,7 @@ func TestFilterOutSchedulable(t *testing.T) {
expectedUnscheduledPods: []*apiv1.Pod{
BuildTestPod("pod4", 300, 10),
},
nodeMatches: matchesAllNodes,
nodeFilter: matchesAllNodes,
},
"non-empty node with many pods scheduled": {
nodesWithPods: map[*apiv1.Node][]*apiv1.Pod{
Expand All @@ -149,7 +151,22 @@ func TestFilterOutSchedulable(t *testing.T) {
BuildTestPod("pod3", 1000, 10),
BuildTestPod("pod5", 300, 10),
},
nodeMatches: matchesAllNodes,
nodeFilter: matchesAllNodes,
},
"single empty node, various pods, node should not be considered": {
nodesWithPods: map[*apiv1.Node][]*apiv1.Pod{node: {}},
unschedulableCandidates: []*apiv1.Pod{
BuildTestPod("pod1", 200, 10),
BuildTestPod("pod2", 500, 10),
BuildTestPod("pod3", 1800, 10),
},
expectedScheduledPods: []*apiv1.Pod{},
expectedUnscheduledPods: []*apiv1.Pod{
BuildTestPod("pod1", 200, 10),
BuildTestPod("pod2", 500, 10),
BuildTestPod("pod3", 1800, 10),
},
nodeFilter: matchesNoNodes,
},
}

Expand Down Expand Up @@ -177,7 +194,7 @@ func TestFilterOutSchedulable(t *testing.T) {

clusterSnapshot.Fork()

processor := NewFilterOutSchedulablePodListProcessor(predicateChecker)
processor := NewFilterOutSchedulablePodListProcessor(predicateChecker, tc.nodeFilter)
unschedulablePods, err := processor.filterOutSchedulableByPacking(tc.unschedulableCandidates, clusterSnapshot)

assert.NoError(t, err)
Expand Down Expand Up @@ -278,7 +295,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
processor := NewFilterOutSchedulablePodListProcessor(predicateChecker)
processor := NewFilterOutSchedulablePodListProcessor(predicateChecker, scheduling.ScheduleAnywhere)
if stillPending, err := processor.filterOutSchedulableByPacking(pendingPods, clusterSnapshot); err != nil {
assert.NoError(b, err)
} else if len(stillPending) < tc.pendingPods {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ package podlistprocessor
import (
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// NewDefaultPodListProcessor returns a default implementation of the pod list
// processor, which wraps and sequentially runs other sub-processors.
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *pods.CombinedPodListProcessor {
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker, nodeFilter func(*schedulerframework.NodeInfo) bool) *pods.CombinedPodListProcessor {
return pods.NewCombinedPodListProcessor([]pods.PodListProcessor{
NewClearTPURequestsPodListProcessor(),
NewFilterOutExpendablePodListProcessor(),
NewCurrentlyDrainedNodesPodListProcessor(),
NewFilterOutSchedulablePodListProcessor(predicateChecker),
NewFilterOutSchedulablePodListProcessor(predicateChecker, nodeFilter),
NewFilterOutDaemonSetPodListProcessor(),
})
}
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/test/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
Expand Down Expand Up @@ -176,7 +177,7 @@ func ExtractPodNames(pods []*apiv1.Pod) []string {
// NewTestProcessors returns a set of simple processors for use in tests.
func NewTestProcessors(context *context.AutoscalingContext) *processors.AutoscalingProcessors {
return &processors.AutoscalingProcessors{
PodListProcessor: podlistprocessor.NewDefaultPodListProcessor(context.PredicateChecker),
PodListProcessor: podlistprocessor.NewDefaultPodListProcessor(context.PredicateChecker, scheduling.ScheduleAnywhere),
NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{},
BinpackingLimiter: binpacking.NewTimeLimiter(context.MaxNodeGroupBinpackingDuration),
NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}, config.NodeGroupDifferenceRatios{}),
Expand Down
3 changes: 2 additions & 1 deletion cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"

"github.com/spf13/pflag"
Expand Down Expand Up @@ -495,7 +496,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter

opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions)
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker)
podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker, scheduling.ScheduleAnywhere)

if autoscalingOptions.ProvisioningRequestEnabled {
podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))
Expand Down

0 comments on commit eb4f907

Please sign in to comment.