From 90308dda3807cea93a4064c5ed147f9ac5128b72 Mon Sep 17 00:00:00 2001
From: Joseph
Date: Sun, 4 Feb 2024 17:44:37 +0800
Subject: [PATCH] scheduler: improve NodeNUMAResource handling of node CPU
 bind policy
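
The NodeNUMAResource Filter previously special-cased only the FullPCPUsOnly
node policy and rejected conflicting pods with ErrRequiredFullPCPUsPolicy.
It now folds the node-level CPU bind policy (from the
extension.LabelNodeCPUBindPolicy label or the kubelet static policy with the
full-pcpus-only option) into a single effective required policy, honors
NodeCPUBindPolicySpreadByPCPUs the same way, and rejects a pod whose own
required policy disagrees with the node's policy with the new
ErrCPUBindPolicyConflict. A pod whose merely preferred policy differs is no
longer rejected; the node-level policy simply overrides it. In addition,
getResourceOptions now delegates to the new getCPUBindPolicy helper in
util.go, which replaces getPreferredCPUBindPolicy and also reports whether
the resolved policy is required.

A rough sketch of the resolution order (illustrative only; the actual logic
lives in getCPUBindPolicy below):

    // Mirrors the call in getResourceOptions:
    cpuBindPolicy, requiredCPUBindPolicy, err := getCPUBindPolicy(
        &topologyOptions, node, state.requiredCPUBindPolicy, state.preferredCPUBindPolicy)
    // 1. A pod-level required policy always wins and stays required.
    // 2. Otherwise a node-level FullPCPUsOnly/SpreadByPCPUs policy overrides
    //    the pod's preferred policy and is treated as required.
    // 3. Otherwise the pod's preferred policy is used and is not required.

For example, a node labeled with NodeCPUBindPolicyFullPCPUsOnly resolves to
CPUBindPolicyFullPCPUs, so a pod that requires CPUBindPolicySpreadByPCPUs is
now rejected with ErrCPUBindPolicyConflict.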

Signed-off-by: Joseph
---
 .../plugins/nodenumaresource/plugin.go        |  64 +++++------
 .../plugins/nodenumaresource/plugin_test.go   |  54 +++++++---
 .../plugins/nodenumaresource/util.go          |  20 ++++
 .../plugins/nodenumaresource/util_test.go     | 101 ++++++++++++++++++
 4 files changed, 189 insertions(+), 50 deletions(-)
 create mode 100644 pkg/scheduler/plugins/nodenumaresource/util_test.go

diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin.go b/pkg/scheduler/plugins/nodenumaresource/plugin.go
index 90f989db87..5fa7969235 100644
--- a/pkg/scheduler/plugins/nodenumaresource/plugin.go
+++ b/pkg/scheduler/plugins/nodenumaresource/plugin.go
@@ -50,7 +50,7 @@ const (
 	ErrNotFoundCPUTopology          = "node(s) CPU Topology not found"
 	ErrInvalidCPUTopology           = "node(s) invalid CPU Topology"
 	ErrSMTAlignmentError            = "node(s) requested cpus not multiple cpus per core"
-	ErrRequiredFullPCPUsPolicy      = "node(s) required FullPCPUs policy"
+	ErrCPUBindPolicyConflict        = "node(s) cpu bind policy conflicts with pod's required cpu bind policy"
 	ErrInvalidCPUAmplificationRatio = "node(s) invalid CPU amplification ratio"
 	ErrInsufficientAmplifiedCPU     = "Insufficient amplified cpu"
 )
@@ -302,22 +302,25 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
 	if !topologyOptions.CPUTopology.IsValid() {
 		return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidCPUTopology)
 	}
+
+	requiredCPUBindPolicy := state.requiredCPUBindPolicy
 	nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy)
-	if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly ||
-		state.requiredCPUBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs {
+	if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly {
+		requiredCPUBindPolicy = schedulingconfig.CPUBindPolicyFullPCPUs
+	} else if nodeCPUBindPolicy == extension.NodeCPUBindPolicySpreadByPCPUs {
+		requiredCPUBindPolicy = schedulingconfig.CPUBindPolicySpreadByPCPUs
+	}
+	if state.requiredCPUBindPolicy != "" && state.requiredCPUBindPolicy != requiredCPUBindPolicy {
+		return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrCPUBindPolicyConflict)
+	}
+
+	if requiredCPUBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs {
 		if state.numCPUsNeeded%topologyOptions.CPUTopology.CPUsPerCore() != 0 {
 			return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError)
 		}
-
-		if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly {
-			if (state.requiredCPUBindPolicy != "" && state.requiredCPUBindPolicy != schedulingconfig.CPUBindPolicyFullPCPUs) ||
-				(state.preferredCPUBindPolicy != "" && state.preferredCPUBindPolicy != schedulingconfig.CPUBindPolicyFullPCPUs) {
-				return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy)
-			}
-		}
 	}
 
-	if state.requiredCPUBindPolicy != "" && numaTopologyPolicy == extension.NUMATopologyPolicyNone {
+	if requiredCPUBindPolicy != "" && numaTopologyPolicy == extension.NUMATopologyPolicyNone {
 		resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, topologymanager.NUMATopologyHint{}, topologyOptions)
 		if err != nil {
 			return framework.AsStatus(err)
@@ -466,9 +469,11 @@ func (p *Plugin) preBindObject(ctx context.Context, cycleState *framework.CycleS
 }
 
 func (p *Plugin) getResourceOptions(cycleState *framework.CycleState, state *preFilterState, node *corev1.Node, pod *corev1.Pod, affinity topologymanager.NUMATopologyHint, topologyOptions TopologyOptions) (*ResourceOptions, error) {
-	preferredCPUBindPolicy, err := p.getPreferredCPUBindPolicy(node, state.preferredCPUBindPolicy)
-	if err != nil {
-		return nil, err
+	if topologyOptions.CPUTopology == nil {
+		return nil, errors.New(ErrNotFoundCPUTopology)
+	}
+	if !topologyOptions.CPUTopology.IsValid() {
+		return nil, errors.New(ErrInvalidCPUTopology)
 	}
 
 	if err := amplifyNUMANodeResources(node, &topologyOptions); err != nil {
@@ -497,13 +502,18 @@ func (p *Plugin) getResourceOptions(cycleState *framework.CycleState, state *pre
 		extension.AmplifyResourceList(requests, topologyOptions.AmplificationRatios, corev1.ResourceCPU)
 	}
 
+	cpuBindPolicy, requiredCPUBindPolicy, err := getCPUBindPolicy(&topologyOptions, node, state.requiredCPUBindPolicy, state.preferredCPUBindPolicy)
+	if err != nil {
+		return nil, err
+	}
+
 	options := &ResourceOptions{
 		requests:              requests,
 		originalRequests:      state.requests,
 		numCPUsNeeded:         state.numCPUsNeeded,
 		requestCPUBind:        state.requestCPUBind,
-		requiredCPUBindPolicy: state.requiredCPUBindPolicy != "",
-		cpuBindPolicy:         preferredCPUBindPolicy,
+		requiredCPUBindPolicy: requiredCPUBindPolicy,
+		cpuBindPolicy:         cpuBindPolicy,
 		cpuExclusivePolicy:    state.preferredCPUExclusivePolicy,
 		preferredCPUs:         reservationReservedCPUs,
 		reusableResources:     reusableResources,
@@ -555,25 +565,3 @@ func appendResourceSpecIfMissed(object metav1.Object, state *preFilterState) err
 	resourceSpec.PreferredCPUBindPolicy = extension.CPUBindPolicy(preferredCPUBindPolicy)
 	return extension.SetResourceSpec(object, resourceSpec)
 }
-
-func (p *Plugin) getPreferredCPUBindPolicy(node *corev1.Node, preferredCPUBindPolicy schedulingconfig.CPUBindPolicy) (schedulingconfig.CPUBindPolicy, error) {
-	topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name)
-	if topologyOptions.CPUTopology == nil {
-		return preferredCPUBindPolicy, errors.New(ErrNotFoundCPUTopology)
-	}
-	if !topologyOptions.CPUTopology.IsValid() {
-		return preferredCPUBindPolicy, errors.New(ErrInvalidCPUTopology)
-	}
-
-	kubeletCPUPolicy := topologyOptions.Policy
-	nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, kubeletCPUPolicy)
-	switch nodeCPUBindPolicy {
-	default:
-	case extension.NodeCPUBindPolicyNone:
-	case extension.NodeCPUBindPolicySpreadByPCPUs:
-		preferredCPUBindPolicy = schedulingconfig.CPUBindPolicySpreadByPCPUs
-	case extension.NodeCPUBindPolicyFullPCPUsOnly:
-		preferredCPUBindPolicy = schedulingconfig.CPUBindPolicyFullPCPUs
-	}
-	return preferredCPUBindPolicy, nil
-}
diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go
index 0bbdf1bcbc..02984880bc 100644
--- a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go
+++ b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go
@@ -654,22 +654,35 @@ func TestPlugin_Filter(t *testing.T) {
 			},
 			cpuTopology:     buildCPUTopologyForTest(2, 1, 4, 2),
 			allocationState: NewNodeAllocation("test-node-1"),
-			want:            framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy),
+			want:            nil,
 		},
 		{
-			name: "verify FullPCPUsOnly with required SpreadByPCPUs",
+			name: "failed to verify FullPCPUsOnly with required SpreadByPCPUs",
 			nodeLabels: map[string]string{
 				extension.LabelNodeCPUBindPolicy: string(extension.NodeCPUBindPolicyFullPCPUsOnly),
 			},
 			state: &preFilterState{
-				requestCPUBind:         true,
-				requiredCPUBindPolicy:  schedulingconfig.CPUBindPolicySpreadByPCPUs,
-				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
-				numCPUsNeeded:          4,
+				requestCPUBind:        true,
+				requiredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
+				numCPUsNeeded:         4,
 			},
 			cpuTopology:     buildCPUTopologyForTest(2, 1, 4, 2),
 			allocationState: NewNodeAllocation("test-node-1"),
-			want:            framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy),
+			want:            framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrCPUBindPolicyConflict),
+		},
+		{
+			name: "verify FullPCPUsOnly with required FullPCPUs",
+			nodeLabels: map[string]string{
+				extension.LabelNodeCPUBindPolicy: string(extension.NodeCPUBindPolicyFullPCPUsOnly),
+			},
+			state: &preFilterState{
+				requestCPUBind:        true,
+				requiredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
+				numCPUsNeeded:         4,
+			},
+			cpuTopology:     buildCPUTopologyForTest(2, 1, 4, 2),
+			allocationState: NewNodeAllocation("test-node-1"),
+			want:            nil,
 		},
 		{
 			name: "verify Kubelet FullPCPUsOnly with SMTAlignmentError",
@@ -689,11 +702,28 @@
 			want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError),
 		},
 		{
-			name: "verify Kubelet FullPCPUsOnly with RequiredFullPCPUsPolicy",
+			name: "verify Kubelet FullPCPUsOnly with required SpreadByPCPUs",
 			state: &preFilterState{
-				requestCPUBind:         true,
-				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
-				numCPUsNeeded:          4,
+				requestCPUBind:        true,
+				requiredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
+				numCPUsNeeded:         4,
+			},
+			cpuTopology:     buildCPUTopologyForTest(2, 1, 4, 2),
+			allocationState: NewNodeAllocation("test-node-1"),
+			kubeletPolicy: &extension.KubeletCPUManagerPolicy{
+				Policy: extension.KubeletCPUManagerPolicyStatic,
+				Options: map[string]string{
+					extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true",
+				},
+			},
+			want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrCPUBindPolicyConflict),
+		},
+		{
+			name: "verify Kubelet FullPCPUsOnly with required FullPCPUs",
+			state: &preFilterState{
+				requestCPUBind:        true,
+				requiredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
+				numCPUsNeeded:         4,
 			},
 			cpuTopology:     buildCPUTopologyForTest(2, 1, 4, 2),
 			allocationState: NewNodeAllocation("test-node-1"),
@@ -703,7 +733,7 @@
 					extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true",
 				},
 			},
-			want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy),
+			want: nil,
 		},
 		{
 			name: "verify required FullPCPUs with none NUMA topology policy",
diff --git a/pkg/scheduler/plugins/nodenumaresource/util.go b/pkg/scheduler/plugins/nodenumaresource/util.go
index b4eccf845c..6e23a1c819 100644
--- a/pkg/scheduler/plugins/nodenumaresource/util.go
+++ b/pkg/scheduler/plugins/nodenumaresource/util.go
@@ -84,3 +84,23 @@ func amplifyNUMANodeResources(node *corev1.Node, topologyOptions *TopologyOption
 	topologyOptions.NUMANodeResources = numaNodeResources
 	return nil
 }
+
+func getCPUBindPolicy(topologyOptions *TopologyOptions, node *corev1.Node, requiredCPUBindPolicy, preferredCPUBindPolicy schedulingconfig.CPUBindPolicy) (schedulingconfig.CPUBindPolicy, bool, error) {
+	if requiredCPUBindPolicy != "" {
+		return requiredCPUBindPolicy, true, nil
+	}
+
+	cpuBindPolicy := preferredCPUBindPolicy
+	required := false
+	kubeletCPUPolicy := topologyOptions.Policy
+	nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, kubeletCPUPolicy)
+	switch nodeCPUBindPolicy {
+	case extension.NodeCPUBindPolicySpreadByPCPUs:
+		cpuBindPolicy = schedulingconfig.CPUBindPolicySpreadByPCPUs
+		required = true
+	case extension.NodeCPUBindPolicyFullPCPUsOnly:
+		cpuBindPolicy = schedulingconfig.CPUBindPolicyFullPCPUs
+		required = true
+	}
+	return cpuBindPolicy, required, nil
+}
diff --git a/pkg/scheduler/plugins/nodenumaresource/util_test.go b/pkg/scheduler/plugins/nodenumaresource/util_test.go
new file mode 100644
index 0000000000..853f56db05
--- /dev/null
+++ b/pkg/scheduler/plugins/nodenumaresource/util_test.go
@@ -0,0 +1,101 @@
+/*
+Copyright 2022 The Koordinator Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package nodenumaresource
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"github.com/koordinator-sh/koordinator/apis/extension"
+	schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
+)
+
+func Test_getCPUBindPolicy(t *testing.T) {
+	tests := []struct {
+		name            string
+		kubeletPolicy   *extension.KubeletCPUManagerPolicy
+		nodePolicy      extension.NodeCPUBindPolicy
+		requiredPolicy  schedulingconfig.CPUBindPolicy
+		preferredPolicy schedulingconfig.CPUBindPolicy
+		wantPolicy      schedulingconfig.CPUBindPolicy
+		wantRequired    bool
+		wantError       bool
+	}{
+		{
+			name: "kubelet enables FullPCPUsOnly",
+			kubeletPolicy: &extension.KubeletCPUManagerPolicy{
+				Policy: extension.KubeletCPUManagerPolicyStatic,
+				Options: map[string]string{
+					extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true",
+				},
+			},
+			nodePolicy:      "",
+			requiredPolicy:  "",
+			preferredPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
+			wantPolicy:      schedulingconfig.CPUBindPolicyFullPCPUs,
+			wantRequired:    true,
+			wantError:       false,
+		},
+		{
+			name:            "node enables FullPCPUsOnly",
+			nodePolicy:      extension.NodeCPUBindPolicyFullPCPUsOnly,
+			requiredPolicy:  "",
+			preferredPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
+			wantPolicy:      schedulingconfig.CPUBindPolicyFullPCPUs,
+			wantRequired:    true,
+			wantError:       false,
+		},
+		{
+			name:           "pod enables required FullPCPUsOnly",
+			requiredPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
+			wantPolicy:     schedulingconfig.CPUBindPolicyFullPCPUs,
+			wantRequired:   true,
+			wantError:      false,
+		},
+		{
+			name:            "pod enables preferred FullPCPUsOnly",
+			preferredPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
+			wantPolicy:      schedulingconfig.CPUBindPolicyFullPCPUs,
+			wantRequired:    false,
+			wantError:       false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			topologyOpts := &TopologyOptions{
+				Policy: tt.kubeletPolicy,
+			}
+			node := &corev1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{},
+				},
+			}
+			if tt.nodePolicy != "" {
+				node.Labels[extension.LabelNodeCPUBindPolicy] = string(tt.nodePolicy)
+			}
+			policy, required, err := getCPUBindPolicy(topologyOpts, node, tt.requiredPolicy, tt.preferredPolicy)
+			assert.Equal(t, tt.wantPolicy, policy)
+			assert.Equal(t, tt.wantRequired, required)
+			if tt.wantError != (err != nil) {
+				t.Errorf("wantErr=%v, but got err=%v", tt.wantError, err)
+			}
+		})
+	}
+}