Skip to content

Commit

Permalink
scheduler: improve NodeNUMAResource handling node cpu bind policy
Browse files Browse the repository at this point in the history
Signed-off-by: Joseph <joseph.t.lee@outlook.com>
  • Loading branch information
eahydra committed Feb 4, 2024
1 parent 93f2bc2 commit 90308dd
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 50 deletions.
64 changes: 26 additions & 38 deletions pkg/scheduler/plugins/nodenumaresource/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
ErrNotFoundCPUTopology = "node(s) CPU Topology not found"
ErrInvalidCPUTopology = "node(s) invalid CPU Topology"
ErrSMTAlignmentError = "node(s) requested cpus not multiple cpus per core"
ErrRequiredFullPCPUsPolicy = "node(s) required FullPCPUs policy"
ErrCPUBindPolicyConflict = "node(s) cpu bind policy conflicts with pod's required cpu bind policy"
ErrInvalidCPUAmplificationRatio = "node(s) invalid CPU amplification ratio"
ErrInsufficientAmplifiedCPU = "Insufficient amplified cpu"
)
Expand Down Expand Up @@ -302,22 +302,25 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
if !topologyOptions.CPUTopology.IsValid() {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidCPUTopology)
}

requiredCPUBindPolicy := state.requiredCPUBindPolicy
nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy)
if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly ||
state.requiredCPUBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs {
if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly {
requiredCPUBindPolicy = schedulingconfig.CPUBindPolicyFullPCPUs
} else if nodeCPUBindPolicy == extension.NodeCPUBindPolicySpreadByPCPUs {
requiredCPUBindPolicy = schedulingconfig.CPUBindPolicySpreadByPCPUs
}
if state.requiredCPUBindPolicy != "" && state.requiredCPUBindPolicy != requiredCPUBindPolicy {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrCPUBindPolicyConflict)
}

if requiredCPUBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs {
if state.numCPUsNeeded%topologyOptions.CPUTopology.CPUsPerCore() != 0 {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError)
}

if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly {
if (state.requiredCPUBindPolicy != "" && state.requiredCPUBindPolicy != schedulingconfig.CPUBindPolicyFullPCPUs) ||
(state.preferredCPUBindPolicy != "" && state.preferredCPUBindPolicy != schedulingconfig.CPUBindPolicyFullPCPUs) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy)
}
}
}

if state.requiredCPUBindPolicy != "" && numaTopologyPolicy == extension.NUMATopologyPolicyNone {
if requiredCPUBindPolicy != "" && numaTopologyPolicy == extension.NUMATopologyPolicyNone {
resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, topologymanager.NUMATopologyHint{}, topologyOptions)
if err != nil {
return framework.AsStatus(err)
Expand Down Expand Up @@ -466,9 +469,11 @@ func (p *Plugin) preBindObject(ctx context.Context, cycleState *framework.CycleS
}

func (p *Plugin) getResourceOptions(cycleState *framework.CycleState, state *preFilterState, node *corev1.Node, pod *corev1.Pod, affinity topologymanager.NUMATopologyHint, topologyOptions TopologyOptions) (*ResourceOptions, error) {
preferredCPUBindPolicy, err := p.getPreferredCPUBindPolicy(node, state.preferredCPUBindPolicy)
if err != nil {
return nil, err
if topologyOptions.CPUTopology == nil {
return nil, errors.New(ErrNotFoundCPUTopology)
}
if !topologyOptions.CPUTopology.IsValid() {
return nil, errors.New(ErrInvalidCPUTopology)
}

if err := amplifyNUMANodeResources(node, &topologyOptions); err != nil {
Expand Down Expand Up @@ -497,13 +502,18 @@ func (p *Plugin) getResourceOptions(cycleState *framework.CycleState, state *pre
extension.AmplifyResourceList(requests, topologyOptions.AmplificationRatios, corev1.ResourceCPU)
}

cpuBindPolicy, requiredCPUBindPolicy, err := getCPUBindPolicy(&topologyOptions, node, state.requiredCPUBindPolicy, state.preferredCPUBindPolicy)
if err != nil {
return nil, err
}

options := &ResourceOptions{
requests: requests,
originalRequests: state.requests,
numCPUsNeeded: state.numCPUsNeeded,
requestCPUBind: state.requestCPUBind,
requiredCPUBindPolicy: state.requiredCPUBindPolicy != "",
cpuBindPolicy: preferredCPUBindPolicy,
requiredCPUBindPolicy: requiredCPUBindPolicy,
cpuBindPolicy: cpuBindPolicy,
cpuExclusivePolicy: state.preferredCPUExclusivePolicy,
preferredCPUs: reservationReservedCPUs,
reusableResources: reusableResources,
Expand Down Expand Up @@ -555,25 +565,3 @@ func appendResourceSpecIfMissed(object metav1.Object, state *preFilterState) err
resourceSpec.PreferredCPUBindPolicy = extension.CPUBindPolicy(preferredCPUBindPolicy)
return extension.SetResourceSpec(object, resourceSpec)
}

// getPreferredCPUBindPolicy returns the CPU bind policy to use for a pod on
// the given node: it starts from the pod's preferred policy and lets a
// node-level bind policy (resolved from node labels together with the kubelet
// CPU manager policy) override it. It returns an error when the node's CPU
// topology has not been reported or is invalid; in that case the pod's
// preference is returned unchanged alongside the error.
func (p *Plugin) getPreferredCPUBindPolicy(node *corev1.Node, preferredCPUBindPolicy schedulingconfig.CPUBindPolicy) (schedulingconfig.CPUBindPolicy, error) {
	topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name)
	if topologyOptions.CPUTopology == nil {
		return preferredCPUBindPolicy, errors.New(ErrNotFoundCPUTopology)
	}
	if !topologyOptions.CPUTopology.IsValid() {
		return preferredCPUBindPolicy, errors.New(ErrInvalidCPUTopology)
	}

	nodePolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy)
	if nodePolicy == extension.NodeCPUBindPolicySpreadByPCPUs {
		return schedulingconfig.CPUBindPolicySpreadByPCPUs, nil
	}
	if nodePolicy == extension.NodeCPUBindPolicyFullPCPUsOnly {
		return schedulingconfig.CPUBindPolicyFullPCPUs, nil
	}
	// NodeCPUBindPolicyNone and any unrecognized value leave the pod's
	// preference untouched.
	return preferredCPUBindPolicy, nil
}
54 changes: 42 additions & 12 deletions pkg/scheduler/plugins/nodenumaresource/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,22 +654,35 @@ func TestPlugin_Filter(t *testing.T) {
},
cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2),
allocationState: NewNodeAllocation("test-node-1"),
want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy),
want: nil,
},
{
name: "verify FullPCPUsOnly with required SpreadByPCPUs",
name: "failed to verify FullPCPUsOnly with required SpreadByPCPUs",
nodeLabels: map[string]string{
extension.LabelNodeCPUBindPolicy: string(extension.NodeCPUBindPolicyFullPCPUsOnly),
},
state: &preFilterState{
requestCPUBind: true,
requiredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
numCPUsNeeded: 4,
requestCPUBind: true,
requiredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
numCPUsNeeded: 4,
},
cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2),
allocationState: NewNodeAllocation("test-node-1"),
want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy),
want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrCPUBindPolicyConflict),
},
{
name: "verify FullPCPUsOnly with required FullPCPUs",
nodeLabels: map[string]string{
extension.LabelNodeCPUBindPolicy: string(extension.NodeCPUBindPolicyFullPCPUsOnly),
},
state: &preFilterState{
requestCPUBind: true,
requiredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
numCPUsNeeded: 4,
},
cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2),
allocationState: NewNodeAllocation("test-node-1"),
want: nil,
},
{
name: "verify Kubelet FullPCPUsOnly with SMTAlignmentError",
Expand All @@ -689,11 +702,28 @@ func TestPlugin_Filter(t *testing.T) {
want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError),
},
{
name: "verify Kubelet FullPCPUsOnly with RequiredFullPCPUsPolicy",
name: "verify Kubelet FullPCPUsOnly with required SpreadByPCPUs",
state: &preFilterState{
requestCPUBind: true,
preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
numCPUsNeeded: 4,
requestCPUBind: true,
requiredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
numCPUsNeeded: 4,
},
cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2),
allocationState: NewNodeAllocation("test-node-1"),
kubeletPolicy: &extension.KubeletCPUManagerPolicy{
Policy: extension.KubeletCPUManagerPolicyStatic,
Options: map[string]string{
extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true",
},
},
want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrCPUBindPolicyConflict),
},
{
name: "verify Kubelet FullPCPUsOnly with required FullPCPUs",
state: &preFilterState{
requestCPUBind: true,
requiredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
numCPUsNeeded: 4,
},
cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2),
allocationState: NewNodeAllocation("test-node-1"),
Expand All @@ -703,7 +733,7 @@ func TestPlugin_Filter(t *testing.T) {
extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true",
},
},
want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy),
want: nil,
},
{
name: "verify required FullPCPUs with none NUMA topology policy",
Expand Down
20 changes: 20 additions & 0 deletions pkg/scheduler/plugins/nodenumaresource/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,23 @@ func amplifyNUMANodeResources(node *corev1.Node, topologyOptions *TopologyOption
topologyOptions.NUMANodeResources = numaNodeResources
return nil
}

// getCPUBindPolicy resolves the effective CPU bind policy for a pod on the
// given node. A policy the pod itself requires always wins. Otherwise, a
// node-level bind policy (resolved from node labels together with the kubelet
// CPU manager policy) overrides the pod's preferred policy and becomes
// mandatory. The boolean result reports whether the returned policy is
// required rather than merely preferred. The error result is currently always
// nil and is kept for interface stability.
func getCPUBindPolicy(topologyOptions *TopologyOptions, node *corev1.Node, requiredCPUBindPolicy, preferredCPUBindPolicy schedulingconfig.CPUBindPolicy) (schedulingconfig.CPUBindPolicy, bool, error) {
	if requiredCPUBindPolicy != "" {
		return requiredCPUBindPolicy, true, nil
	}

	switch extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) {
	case extension.NodeCPUBindPolicySpreadByPCPUs:
		// The node mandates spreading allocations across physical cores.
		return schedulingconfig.CPUBindPolicySpreadByPCPUs, true, nil
	case extension.NodeCPUBindPolicyFullPCPUsOnly:
		// The node mandates allocating whole physical cores.
		return schedulingconfig.CPUBindPolicyFullPCPUs, true, nil
	}
	// No node-level constraint; fall back to the pod's preference, which
	// remains non-mandatory.
	return preferredCPUBindPolicy, false, nil
}
101 changes: 101 additions & 0 deletions pkg/scheduler/plugins/nodenumaresource/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodenumaresource

import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/koordinator-sh/koordinator/apis/extension"
schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
)

// Test_getCPUBindPolicy verifies how getCPUBindPolicy combines the kubelet
// CPU manager policy, the node-level bind policy label, and the pod's
// required/preferred policies into a single effective policy plus a
// "required" flag.
func Test_getCPUBindPolicy(t *testing.T) {
	tests := []struct {
		name            string
		kubeletPolicy   *extension.KubeletCPUManagerPolicy
		nodePolicy      extension.NodeCPUBindPolicy
		requiredPolicy  schedulingconfig.CPUBindPolicy
		preferredPolicy schedulingconfig.CPUBindPolicy
		wantPolicy      schedulingconfig.CPUBindPolicy
		wantRequired    bool
		wantError       bool
	}{
		{
			name: "kubelet enables FullPCPUsOnly",
			kubeletPolicy: &extension.KubeletCPUManagerPolicy{
				Policy: extension.KubeletCPUManagerPolicyStatic,
				Options: map[string]string{
					extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true",
				},
			},
			nodePolicy:      "",
			requiredPolicy:  "",
			preferredPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
			wantPolicy:      schedulingconfig.CPUBindPolicyFullPCPUs,
			wantRequired:    true,
			wantError:       false,
		},
		{
			name:            "node enables FullPCPUsOnly",
			nodePolicy:      extension.NodeCPUBindPolicyFullPCPUsOnly,
			requiredPolicy:  "",
			preferredPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
			wantPolicy:      schedulingconfig.CPUBindPolicyFullPCPUs,
			wantRequired:    true,
			wantError:       false,
		},
		{
			name:           "pod enables required FullPCPUsOnly",
			requiredPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
			wantPolicy:     schedulingconfig.CPUBindPolicyFullPCPUs,
			wantRequired:   true,
			wantError:      false,
		},
		{
			name:            "pod enables preferred FullPCPUsOnly",
			preferredPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
			wantPolicy:      schedulingconfig.CPUBindPolicyFullPCPUs,
			wantRequired:    false,
			wantError:       false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			topologyOpts := &TopologyOptions{
				Policy: tt.kubeletPolicy,
			}
			node := &corev1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{},
				},
			}
			if tt.nodePolicy != "" {
				node.Labels[extension.LabelNodeCPUBindPolicy] = string(tt.nodePolicy)
			}
			policy, required, err := getCPUBindPolicy(topologyOpts, node, tt.requiredPolicy, tt.preferredPolicy)
			assert.Equal(t, tt.wantPolicy, policy)
			assert.Equal(t, tt.wantRequired, required)
			// Use the same testify assertion style as the checks above
			// instead of a hand-rolled t.Errorf comparison.
			assert.Equal(t, tt.wantError, err != nil)
		})
	}
}

0 comments on commit 90308dd

Please sign in to comment.