Skip to content

Commit

Permalink
Add option to pass custom filter funtion for nodes
Browse files Browse the repository at this point in the history
This will allow users to filter-out some of the nodes when
filtering out pods potentially schedulable.
  • Loading branch information
kisieland committed Jul 16, 2024
1 parent 6da986f commit 890bf24
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
klog "k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type filterOutSchedulablePodListProcessor struct {
schedulingSimulator *scheduling.HintingSimulator
nodesToConsider func(*schedulerframework.NodeInfo) bool
}

// NewFilterOutSchedulablePodListProcessor creates a PodListProcessor filtering out schedulable pods
func NewFilterOutSchedulablePodListProcessor(predicateChecker predicatechecker.PredicateChecker) *filterOutSchedulablePodListProcessor {
func NewFilterOutSchedulablePodListProcessor(predicateChecker predicatechecker.PredicateChecker, nodesToConsider func(*schedulerframework.NodeInfo) bool) *filterOutSchedulablePodListProcessor {
return &filterOutSchedulablePodListProcessor{
schedulingSimulator: scheduling.NewHintingSimulator(predicateChecker),
nodesToConsider: nodesToConsider,
}
}

Expand Down Expand Up @@ -98,7 +101,7 @@ func (p *filterOutSchedulablePodListProcessor) filterOutSchedulableByPacking(uns
return corev1helpers.PodPriority(unschedulableCandidates[i]) > corev1helpers.PodPriority(unschedulableCandidates[j])
})

statuses, overflowingControllerCount, err := p.schedulingSimulator.TrySchedulePods(clusterSnapshot, unschedulableCandidates, scheduling.ScheduleAnywhere, false)
statuses, overflowingControllerCount, err := p.schedulingSimulator.TrySchedulePods(clusterSnapshot, unschedulableCandidates, p.nodesToConsider, false)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

func TestFilterOutSchedulable(t *testing.T) {
node := buildReadyTestNode("node", 2000, 100)
matchesAllNodes := func(*schedulerframework.NodeInfo) bool { return true }
matchesNoNodes := func(*schedulerframework.NodeInfo) bool { return false }

testCases := map[string]struct {
nodesWithPods map[*apiv1.Node][]*apiv1.Pod
Expand Down Expand Up @@ -151,6 +153,21 @@ func TestFilterOutSchedulable(t *testing.T) {
},
nodeMatches: matchesAllNodes,
},
"single empty node, various pods, node should not be considered": {
nodesWithPods: map[*apiv1.Node][]*apiv1.Pod{node: {}},
unschedulableCandidates: []*apiv1.Pod{
BuildTestPod("pod1", 200, 10),
BuildTestPod("pod2", 500, 10),
BuildTestPod("pod3", 1800, 10),
},
expectedScheduledPods: []*apiv1.Pod{},
expectedUnscheduledPods: []*apiv1.Pod{
BuildTestPod("pod1", 200, 10),
BuildTestPod("pod2", 500, 10),
BuildTestPod("pod3", 1800, 10),
},
nodeMatches: matchesNoNodes,
},
}

for tn, tc := range testCases {
Expand All @@ -177,7 +194,7 @@ func TestFilterOutSchedulable(t *testing.T) {

clusterSnapshot.Fork()

processor := NewFilterOutSchedulablePodListProcessor(predicateChecker)
processor := NewFilterOutSchedulablePodListProcessor(predicateChecker, tc.nodeMatches)
unschedulablePods, err := processor.filterOutSchedulableByPacking(tc.unschedulableCandidates, clusterSnapshot)

assert.NoError(t, err)
Expand Down Expand Up @@ -278,7 +295,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
processor := NewFilterOutSchedulablePodListProcessor(predicateChecker)
processor := NewFilterOutSchedulablePodListProcessor(predicateChecker, scheduling.ScheduleAnywhere)
if stillPending, err := processor.filterOutSchedulableByPacking(pendingPods, clusterSnapshot); err != nil {
assert.NoError(b, err)
} else if len(stillPending) < tc.pendingPods {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ package podlistprocessor
import (
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// NewDefaultPodListProcessor returns a default implementation of the pod list
// processor, which wraps and sequentially runs other sub-processors.
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *pods.CombinedPodListProcessor {
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker, nodesToConsider func(*schedulerframework.NodeInfo) bool) *pods.CombinedPodListProcessor {
return pods.NewCombinedPodListProcessor([]pods.PodListProcessor{
NewClearTPURequestsPodListProcessor(),
NewFilterOutExpendablePodListProcessor(),
NewCurrentlyDrainedNodesPodListProcessor(),
NewFilterOutSchedulablePodListProcessor(predicateChecker),
NewFilterOutSchedulablePodListProcessor(predicateChecker, nodesToConsider),
NewFilterOutDaemonSetPodListProcessor(),
})
}
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/test/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
Expand Down Expand Up @@ -176,7 +177,7 @@ func ExtractPodNames(pods []*apiv1.Pod) []string {
// NewTestProcessors returns a set of simple processors for use in tests.
func NewTestProcessors(context *context.AutoscalingContext) *processors.AutoscalingProcessors {
return &processors.AutoscalingProcessors{
PodListProcessor: podlistprocessor.NewDefaultPodListProcessor(context.PredicateChecker),
PodListProcessor: podlistprocessor.NewDefaultPodListProcessor(context.PredicateChecker, scheduling.ScheduleAnywhere),
NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{},
BinpackingLimiter: binpacking.NewTimeLimiter(context.MaxNodeGroupBinpackingDuration),
NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}, config.NodeGroupDifferenceRatios{}),
Expand Down
3 changes: 2 additions & 1 deletion cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"

"github.com/spf13/pflag"
Expand Down Expand Up @@ -495,7 +496,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter

opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions)
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker)
podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker, scheduling.ScheduleAnywhere)

if autoscalingOptions.ProvisioningRequestEnabled {
podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))
Expand Down

0 comments on commit 890bf24

Please sign in to comment.