diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_topology.go b/pkg/scheduler/plugins/nodenumaresource/cpu_topology.go index 96078afa3..febd2c0c2 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_topology.go +++ b/pkg/scheduler/plugins/nodenumaresource/cpu_topology.go @@ -75,7 +75,7 @@ func (b *CPUTopologyBuilder) Result() *CPUTopology { // IsValid checks if the topology is valid func (topo *CPUTopology) IsValid() bool { - return topo.NumSockets != 0 && topo.NumNodes != 0 && topo.NumCores != 0 && topo.NumCPUs != 0 + return topo != nil && topo.NumSockets != 0 && topo.NumNodes != 0 && topo.NumCores != 0 && topo.NumCPUs != 0 } // CPUsPerCore returns the number of logical CPUs are associated with each core. diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin.go b/pkg/scheduler/plugins/nodenumaresource/plugin.go index 90f989db8..92f4c50fc 100644 --- a/pkg/scheduler/plugins/nodenumaresource/plugin.go +++ b/pkg/scheduler/plugins/nodenumaresource/plugin.go @@ -18,7 +18,6 @@ package nodenumaresource import ( "context" - "errors" "fmt" nrtv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1" @@ -47,10 +46,10 @@ const ( ) const ( - ErrNotFoundCPUTopology = "node(s) CPU Topology not found" + ErrInvalidRequestedCPUs = "the requested CPUs must be integer" ErrInvalidCPUTopology = "node(s) invalid CPU Topology" ErrSMTAlignmentError = "node(s) requested cpus not multiple cpus per core" - ErrRequiredFullPCPUsPolicy = "node(s) required FullPCPUs policy" + ErrCPUBindPolicyConflict = "node(s) cpu bind policy conflicts with pod's required cpu bind policy" ErrInvalidCPUAmplificationRatio = "node(s) invalid CPU amplification ratio" ErrInsufficientAmplifiedCPU = "Insufficient amplified cpu" ) @@ -230,9 +229,11 @@ func (p *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleState }) return nil, nil } + requestedCPU := requests.Cpu().MilliValue() state := &preFilterState{ requestCPUBind: false, requests: requests, + numCPUsNeeded: int(requestedCPU / 1000), } if AllowUseCPUSet(pod) { cpuBindPolicy := schedulingconfig.CPUBindPolicy(resourceSpec.PreferredCPUBindPolicy) @@ -249,9 +250,8 @@ func (p *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleState if cpuBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs || cpuBindPolicy == schedulingconfig.CPUBindPolicySpreadByPCPUs { - requestedCPU := requests.Cpu().MilliValue() if requestedCPU%1000 != 0 { - return nil, framework.NewStatus(framework.Error, "the requested CPUs must be integer") + return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidRequestedCPUs) } if requestedCPU > 0 { @@ -259,7 +259,6 @@ func (p *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleState state.requiredCPUBindPolicy = requiredCPUBindPolicy state.preferredCPUBindPolicy = cpuBindPolicy state.preferredCPUExclusivePolicy = resourceSpec.PreferredCPUExclusivePolicy - state.numCPUsNeeded = int(requestedCPU / 1000) } } } @@ -278,47 +277,49 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p if !status.IsSuccess() { return status } - - if status := p.filterAmplifiedCPUs(state, nodeInfo); !status.IsSuccess() { - return status + if state.skip { + return nil } node := nodeInfo.Node() topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name) + nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy) - - 
if skipTheNode(state, numaTopologyPolicy) { - return nil + requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy) + if !status.IsSuccess() { + return status } - if state.requestCPUBind { - if topologyOptions.CPUTopology == nil { - return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrNotFoundCPUTopology) - } + if status := p.filterAmplifiedCPUs(state.requests.Cpu().MilliValue(), nodeInfo, requestCPUBind); !status.IsSuccess() { + return status + } + if requestCPUBind { // It's necessary to force node to have NodeResourceTopology and CPUTopology // We must satisfy the user's CPUSet request. Even if some nodes in the cluster have resources, // they cannot be allocated without valid CPU topology. if !topologyOptions.CPUTopology.IsValid() { return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidCPUTopology) } - nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) - if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly || - state.requiredCPUBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs { + + requiredCPUBindPolicy := state.requiredCPUBindPolicy + if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly { + requiredCPUBindPolicy = schedulingconfig.CPUBindPolicyFullPCPUs + } else if nodeCPUBindPolicy == extension.NodeCPUBindPolicySpreadByPCPUs { + requiredCPUBindPolicy = schedulingconfig.CPUBindPolicySpreadByPCPUs + } + if state.requiredCPUBindPolicy != "" && state.requiredCPUBindPolicy != requiredCPUBindPolicy { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrCPUBindPolicyConflict) + } + + if requiredCPUBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs { if state.numCPUsNeeded%topologyOptions.CPUTopology.CPUsPerCore() != 0 { return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError) } - - if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly { - if (state.requiredCPUBindPolicy != "" && state.requiredCPUBindPolicy != schedulingconfig.CPUBindPolicyFullPCPUs) || - (state.preferredCPUBindPolicy != "" && state.preferredCPUBindPolicy != schedulingconfig.CPUBindPolicyFullPCPUs) { - return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy) - } - } } - if state.requiredCPUBindPolicy != "" && numaTopologyPolicy == extension.NUMATopologyPolicyNone { - resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, topologymanager.NUMATopologyHint{}, topologyOptions) + if requiredCPUBindPolicy != "" && numaTopologyPolicy == extension.NUMATopologyPolicyNone { + resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, requestCPUBind, topologymanager.NUMATopologyHint{}, topologyOptions) if err != nil { return framework.AsStatus(err) } @@ -336,9 +337,7 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p return nil } -func (p *Plugin) filterAmplifiedCPUs(state *preFilterState, nodeInfo *framework.NodeInfo) *framework.Status { - quantity := state.requests[corev1.ResourceCPU] - podRequestMilliCPU := quantity.MilliValue() +func (p *Plugin) filterAmplifiedCPUs(podRequestMilliCPU int64, nodeInfo *framework.NodeInfo, requestCPUBind bool) *framework.Status { if podRequestMilliCPU == 0 { return nil } @@ -352,16 +351,14 @@ func (p *Plugin) filterAmplifiedCPUs(state *preFilterState, nodeInfo *framework. 
return nil } - if state.requestCPUBind { + if requestCPUBind { podRequestMilliCPU = extension.Amplify(podRequestMilliCPU, cpuAmplificationRatio) } // TODO(joseph): Reservations and preemption should be considered here. - _, allocated, _ := p.resourceManager.GetAvailableCPUs(node.Name, cpuset.CPUSet{}) + _, allocated, err := p.resourceManager.GetAvailableCPUs(node.Name, cpuset.CPUSet{}) if err != nil { - if err.Error() != ErrNotFoundCPUTopology { - return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) - } + return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) } allocatedMilliCPU := int64(allocated.CPUs().Size() * 1000) requestedMilliCPU := nodeInfo.Requested.MilliCPU @@ -380,6 +377,9 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, if !status.IsSuccess() { return status } + if state.skip { + return nil + } nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) if err != nil { @@ -387,16 +387,17 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, } node := nodeInfo.Node() topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name) + nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy) - - if skipTheNode(state, numaTopologyPolicy) { + requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy) + if !status.IsSuccess() { + return status + } + if !requestCPUBind && numaTopologyPolicy == extension.NUMATopologyPolicyNone { return nil } - if state.requestCPUBind { - if topologyOptions.CPUTopology == nil { - return framework.NewStatus(framework.Error, ErrNotFoundCPUTopology) - } + if requestCPUBind { if !topologyOptions.CPUTopology.IsValid() { return framework.NewStatus(framework.Error, ErrInvalidCPUTopology) } @@ -404,7 +405,7 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, store := topologymanager.GetStore(cycleState) affinity := store.GetAffinity(nodeName) - resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, affinity, topologyOptions) + resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, requestCPUBind, affinity, topologyOptions) if err != nil { return framework.AsStatus(err) } @@ -440,12 +441,24 @@ func (p *Plugin) preBindObject(ctx context.Context, cycleState *framework.CycleS if !status.IsSuccess() { return status } - if state.allocation == nil { + if state.skip || state.allocation == nil { return nil } - if state.requestCPUBind { - if err := appendResourceSpecIfMissed(object, state); err != nil { + nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) + } + node := nodeInfo.Node() + topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name) + nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) + requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy) + if !status.IsSuccess() { + return status + } + + if requestCPUBind { + if err := appendResourceSpecIfMissed(object, state, node, &topologyOptions); err != nil { return framework.AsStatus(err) } } @@ -465,12 +478,7 @@ func (p *Plugin) preBindObject(ctx context.Context, cycleState *framework.CycleS return nil } -func (p *Plugin) getResourceOptions(cycleState 
*framework.CycleState, state *preFilterState, node *corev1.Node, pod *corev1.Pod, affinity topologymanager.NUMATopologyHint, topologyOptions TopologyOptions) (*ResourceOptions, error) { - preferredCPUBindPolicy, err := p.getPreferredCPUBindPolicy(node, state.preferredCPUBindPolicy) - if err != nil { - return nil, err - } - +func (p *Plugin) getResourceOptions(cycleState *framework.CycleState, state *preFilterState, node *corev1.Node, pod *corev1.Pod, requestCPUBind bool, affinity topologymanager.NUMATopologyHint, topologyOptions TopologyOptions) (*ResourceOptions, error) { if err := amplifyNUMANodeResources(node, &topologyOptions); err != nil { return nil, err } @@ -492,18 +500,23 @@ func (p *Plugin) getResourceOptions(cycleState *framework.CycleState, state *pre } requests := state.requests - if state.requestCPUBind && amplificationRatio > 1 { + if requestCPUBind && amplificationRatio > 1 { requests = requests.DeepCopy() extension.AmplifyResourceList(requests, topologyOptions.AmplificationRatios, corev1.ResourceCPU) } + cpuBindPolicy, requiredCPUBindPolicy, err := getCPUBindPolicy(&topologyOptions, node, state.requiredCPUBindPolicy, state.preferredCPUBindPolicy) + if err != nil { + return nil, err + } + options := &ResourceOptions{ requests: requests, originalRequests: state.requests, numCPUsNeeded: state.numCPUsNeeded, - requestCPUBind: state.requestCPUBind, - requiredCPUBindPolicy: state.requiredCPUBindPolicy != "", - cpuBindPolicy: preferredCPUBindPolicy, + requestCPUBind: requestCPUBind, + requiredCPUBindPolicy: requiredCPUBindPolicy, + cpuBindPolicy: cpuBindPolicy, cpuExclusivePolicy: state.preferredCPUExclusivePolicy, preferredCPUs: reservationReservedCPUs, reusableResources: reusableResources, @@ -536,44 +549,30 @@ func (p *Plugin) getReservationReservedCPUs(cycleState *framework.CycleState, po return reservedCPUs, nil } -func appendResourceSpecIfMissed(object metav1.Object, state *preFilterState) error { - // Write back ResourceSpec annotation if LSR Pod hasn't specified CPUBindPolicy +func appendResourceSpecIfMissed(object metav1.Object, state *preFilterState, node *corev1.Node, topologyOpts *TopologyOptions) error { + cpuBindPolicy, required, err := getCPUBindPolicy(topologyOpts, node, state.requiredCPUBindPolicy, state.preferredCPUBindPolicy) + if err != nil { + return err + } + + // Write back ResourceSpec annotation if the Pod hasn't specified CPUBindPolicy + shouldWriteBack := false annotations := object.GetAnnotations() resourceSpec, _ := extension.GetResourceSpec(annotations) - if resourceSpec.RequiredCPUBindPolicy != extension.CPUBindPolicy(state.requiredCPUBindPolicy) { - resourceSpec.RequiredCPUBindPolicy = extension.CPUBindPolicy(state.requiredCPUBindPolicy) + if required && (resourceSpec.RequiredCPUBindPolicy == "" || resourceSpec.RequiredCPUBindPolicy == extension.CPUBindPolicyDefault) { + resourceSpec.RequiredCPUBindPolicy = extension.CPUBindPolicy(cpuBindPolicy) + shouldWriteBack = true } - - preferredCPUBindPolicy := schedulingconfig.CPUBindPolicy(resourceSpec.PreferredCPUBindPolicy) - if preferredCPUBindPolicy == "" || preferredCPUBindPolicy == schedulingconfig.CPUBindPolicyDefault { - if resourceSpec.RequiredCPUBindPolicy == "" { - preferredCPUBindPolicy = state.preferredCPUBindPolicy - } else if preferredCPUBindPolicy != "" { - preferredCPUBindPolicy = state.requiredCPUBindPolicy - } + if resourceSpec.PreferredCPUBindPolicy == extension.CPUBindPolicyDefault { + resourceSpec.PreferredCPUBindPolicy = extension.CPUBindPolicy(cpuBindPolicy) + shouldWriteBack = true 
} - resourceSpec.PreferredCPUBindPolicy = extension.CPUBindPolicy(preferredCPUBindPolicy) - return extension.SetResourceSpec(object, resourceSpec) -} - -func (p *Plugin) getPreferredCPUBindPolicy(node *corev1.Node, preferredCPUBindPolicy schedulingconfig.CPUBindPolicy) (schedulingconfig.CPUBindPolicy, error) { - topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name) - if topologyOptions.CPUTopology == nil { - return preferredCPUBindPolicy, errors.New(ErrNotFoundCPUTopology) - } - if !topologyOptions.CPUTopology.IsValid() { - return preferredCPUBindPolicy, errors.New(ErrInvalidCPUTopology) + if resourceSpec.RequiredCPUBindPolicy == "" && resourceSpec.PreferredCPUBindPolicy == "" && cpuBindPolicy != "" { + resourceSpec.PreferredCPUBindPolicy = extension.CPUBindPolicy(cpuBindPolicy) + shouldWriteBack = true } - - kubeletCPUPolicy := topologyOptions.Policy - nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, kubeletCPUPolicy) - switch nodeCPUBindPolicy { - default: - case extension.NodeCPUBindPolicyNone: - case extension.NodeCPUBindPolicySpreadByPCPUs: - preferredCPUBindPolicy = schedulingconfig.CPUBindPolicySpreadByPCPUs - case extension.NodeCPUBindPolicyFullPCPUsOnly: - preferredCPUBindPolicy = schedulingconfig.CPUBindPolicyFullPCPUs + if !shouldWriteBack { + return nil } - return preferredCPUBindPolicy, nil + return extension.SetResourceSpec(object, resourceSpec) } diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go index 0bbdf1bcb..522627caf 100644 --- a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go @@ -113,6 +113,8 @@ func (f *testSharedLister) Get(nodeName string) (*framework.NodeInfo, error) { func makeNode(name string, capacity map[corev1.ResourceName]string, cpuAmpRatio extension.Ratio) *corev1.Node { node := st.MakeNode().Name(name).Capacity(capacity).Obj() + extension.SetNodeRawAllocatable(node, node.Status.Allocatable) + extension.AmplifyResourceList(node.Status.Allocatable, map[corev1.ResourceName]extension.Ratio{corev1.ResourceCPU: cpuAmpRatio}, corev1.ResourceCPU) _, _ = extension.SetNodeResourceAmplificationRatio(node, corev1.ResourceCPU, cpuAmpRatio) return node } @@ -438,6 +440,7 @@ func TestPlugin_PreFilter(t *testing.T) { requests: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("4"), }, + numCPUsNeeded: 4, }, }, { @@ -484,7 +487,7 @@ func TestPlugin_PreFilter(t *testing.T) { }, }, }, - want: framework.NewStatus(framework.Error, "the requested CPUs must be integer"), + want: framework.NewStatus(framework.UnschedulableAndUnresolvable, "the requested CPUs must be integer"), }, { name: "skip Pod with unsupported bind policy", @@ -516,6 +519,7 @@ func TestPlugin_PreFilter(t *testing.T) { requests: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("4.5"), }, + numCPUsNeeded: 4, }, }, } @@ -560,13 +564,6 @@ func TestPlugin_Filter(t *testing.T) { name: "error with missing preFilterState", want: framework.AsStatus(framework.ErrNotFound), }, - { - name: "error with missing CPUTopology", - state: &preFilterState{ - requestCPUBind: true, - }, - want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrNotFoundCPUTopology), - }, { name: "error with invalid cpu topology", state: &preFilterState{ @@ -606,6 +603,38 @@ func TestPlugin_Filter(t *testing.T) { allocationState: NewNodeAllocation("test-node-1"), want: framework.NewStatus(framework.UnschedulableAndUnresolvable, 
ErrSMTAlignmentError), }, + { + name: "LS Pod failed to verify Node FullPCPUsOnly with SMTAlignmentError", + nodeLabels: map[string]string{ + extension.LabelNodeCPUBindPolicy: string(extension.NodeCPUBindPolicyFullPCPUsOnly), + }, + state: &preFilterState{ + requestCPUBind: false, + numCPUsNeeded: 5, + requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("5"), + }, + }, + cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2), + allocationState: NewNodeAllocation("test-node-1"), + want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError), + }, + { + name: "LS Pod failed to verify Node FullPCPUsOnly with non-integer request", + nodeLabels: map[string]string{ + extension.LabelNodeCPUBindPolicy: string(extension.NodeCPUBindPolicyFullPCPUsOnly), + }, + state: &preFilterState{ + requestCPUBind: false, + numCPUsNeeded: 5, + requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("5200m"), + }, + }, + cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2), + allocationState: NewNodeAllocation("test-node-1"), + want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidRequestedCPUs), + }, { name: "verify Node FullPCPUsOnly", nodeLabels: map[string]string{ @@ -654,22 +683,35 @@ func TestPlugin_Filter(t *testing.T) { }, cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2), allocationState: NewNodeAllocation("test-node-1"), - want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy), + want: nil, + }, + { + name: "failed to verify FullPCPUsOnly with required SpreadByPCPUs", + nodeLabels: map[string]string{ + extension.LabelNodeCPUBindPolicy: string(extension.NodeCPUBindPolicySpreadByPCPUs), + }, + state: &preFilterState{ + requestCPUBind: true, + requiredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, + numCPUsNeeded: 4, + }, + cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2), + allocationState: NewNodeAllocation("test-node-1"), + want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrCPUBindPolicyConflict), }, { - name: "verify FullPCPUsOnly with required SpreadByPCPUs", + name: "verify FullPCPUsOnly with required FullPCPUs", nodeLabels: map[string]string{ extension.LabelNodeCPUBindPolicy: string(extension.NodeCPUBindPolicyFullPCPUsOnly), }, state: &preFilterState{ - requestCPUBind: true, - requiredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs, - preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs, - numCPUsNeeded: 4, + requestCPUBind: true, + requiredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, + numCPUsNeeded: 4, }, cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2), allocationState: NewNodeAllocation("test-node-1"), - want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy), + want: nil, }, { name: "verify Kubelet FullPCPUsOnly with SMTAlignmentError", @@ -689,11 +731,28 @@ func TestPlugin_Filter(t *testing.T) { want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError), }, { - name: "verify Kubelet FullPCPUsOnly with RequiredFullPCPUsPolicy", + name: "verify Kubelet FullPCPUsOnly with required SpreadByPCPUs", state: &preFilterState{ - requestCPUBind: true, - preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs, - numCPUsNeeded: 4, + requestCPUBind: true, + requiredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs, + numCPUsNeeded: 4, + }, + cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2), + allocationState: NewNodeAllocation("test-node-1"), + 
kubeletPolicy: &extension.KubeletCPUManagerPolicy{ + Policy: extension.KubeletCPUManagerPolicyStatic, + Options: map[string]string{ + extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true", + }, + }, + want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrCPUBindPolicyConflict), + }, + { + name: "verify Kubelet FullPCPUsOnly with required FullPCPUs", + state: &preFilterState{ + requestCPUBind: true, + requiredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, + numCPUsNeeded: 4, }, cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2), allocationState: NewNodeAllocation("test-node-1"), @@ -703,7 +762,7 @@ func TestPlugin_Filter(t *testing.T) { extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true", }, }, - want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy), + want: nil, }, { name: "verify required FullPCPUs with none NUMA topology policy", @@ -820,7 +879,7 @@ func TestPlugin_Filter(t *testing.T) { cycleState := framework.NewCycleState() if tt.state != nil { - if tt.state.numCPUsNeeded > 0 { + if tt.state.numCPUsNeeded > 0 && tt.state.requests == nil { tt.state.requests = corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(int64(tt.state.numCPUsNeeded), resource.DecimalSI)} } cycleState.Write(stateKey, tt.state) @@ -910,7 +969,7 @@ func TestFilterWithAmplifiedCPUs(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { numCPUs := tt.cpuTopology.NumCPUs - cpu := fmt.Sprintf("%d", extension.Amplify(int64(numCPUs), tt.nodeCPUAmplificationRatio)) + cpu := fmt.Sprintf("%d", numCPUs) node := makeNode("node-1", map[corev1.ResourceName]string{"cpu": cpu, "memory": "40Gi"}, tt.nodeCPUAmplificationRatio) suit := newPluginTestSuit(t, tt.existingPods, []*corev1.Node{node}) @@ -963,21 +1022,12 @@ func TestPlugin_Reserve(t *testing.T) { allocatedCPUs []int want *framework.Status wantCPUSet cpuset.CPUSet - wantState *preFilterState }{ { name: "error with missing preFilterState", pod: &corev1.Pod{}, want: framework.AsStatus(framework.ErrNotFound), }, - { - name: "error with missing allocationState", - state: &preFilterState{ - requestCPUBind: true, - }, - pod: &corev1.Pod{}, - want: framework.NewStatus(framework.Error, ErrNotFoundCPUTopology), - }, { name: "error with invalid cpu topology", state: &preFilterState{ @@ -1013,19 +1063,33 @@ func TestPlugin_Reserve(t *testing.T) { extension.LabelNodeCPUBindPolicy: string(extension.NodeCPUBindPolicySpreadByPCPUs), }, state: &preFilterState{ - requestCPUBind: true, - numCPUsNeeded: 4, - preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, + requestCPUBind: false, + numCPUsNeeded: 4, + requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("4"), + }, }, cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2), pod: &corev1.Pod{}, want: nil, wantCPUSet: cpuset.NewCPUSet(0, 2, 4, 6), - wantState: &preFilterState{ - requestCPUBind: true, - numCPUsNeeded: 4, - preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs, + }, + { + name: "BE Pod reserves with node cpu bind policy", + nodeLabels: map[string]string{ + extension.LabelNodeCPUBindPolicy: string(extension.NodeCPUBindPolicySpreadByPCPUs), }, + state: &preFilterState{ + requestCPUBind: false, + numCPUsNeeded: 4, + requests: corev1.ResourceList{ + extension.BatchCPU: resource.MustParse("4000"), + }, + }, + cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2), + pod: &corev1.Pod{}, + want: nil, + wantCPUSet: cpuset.NewCPUSet(), }, { name: "error with big request cpu", @@ -1215,7 
+1279,12 @@ func TestPlugin_Unreserve(t *testing.T) { } func TestPlugin_PreBind(t *testing.T) { - suit := newPluginTestSuit(t, nil, nil) + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + } + suit := newPluginTestSuit(t, nil, []*corev1.Node{node}) p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle) assert.NotNil(t, p) assert.Nil(t, err) @@ -1245,7 +1314,7 @@ func TestPlugin_PreBind(t *testing.T) { cycleState := framework.NewCycleState() cycleState.Write(stateKey, state) - s := plg.PreBind(context.TODO(), cycleState, pod, "test-node-1") + s := plg.PreBind(context.TODO(), cycleState, pod, node.Name) assert.True(t, s.IsSuccess()) resourceStatus, err := extension.GetResourceStatus(pod.Annotations) assert.NoError(t, err) @@ -1257,7 +1326,12 @@ func TestPlugin_PreBind(t *testing.T) { } func TestPlugin_PreBindWithCPUBindPolicyNone(t *testing.T) { - suit := newPluginTestSuit(t, nil, nil) + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + } + suit := newPluginTestSuit(t, nil, []*corev1.Node{node}) p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle) assert.NotNil(t, p) assert.Nil(t, err) @@ -1288,7 +1362,7 @@ func TestPlugin_PreBindWithCPUBindPolicyNone(t *testing.T) { cycleState := framework.NewCycleState() cycleState.Write(stateKey, state) - s := plg.PreBind(context.TODO(), cycleState, pod, "test-node-1") + s := plg.PreBind(context.TODO(), cycleState, pod, node.Name) assert.True(t, s.IsSuccess()) resourceStatus, err := extension.GetResourceStatus(pod.Annotations) assert.NoError(t, err) @@ -1307,7 +1381,12 @@ func TestPlugin_PreBindWithCPUBindPolicyNone(t *testing.T) { } func TestPlugin_PreBindReservation(t *testing.T) { - suit := newPluginTestSuit(t, nil, nil) + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + } + suit := newPluginTestSuit(t, nil, []*corev1.Node{node}) p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle) assert.NotNil(t, p) assert.Nil(t, err) @@ -1336,7 +1415,7 @@ func TestPlugin_PreBindReservation(t *testing.T) { cycleState := framework.NewCycleState() cycleState.Write(stateKey, state) - s := plg.PreBindReservation(context.TODO(), cycleState, reservation, "test-node-1") + s := plg.PreBindReservation(context.TODO(), cycleState, reservation, node.Name) assert.True(t, s.IsSuccess()) resourceStatus, err := extension.GetResourceStatus(reservation.Annotations) @@ -1459,6 +1538,7 @@ func Test_appendResourceSpecIfMissed(t *testing.T) { tests := []struct { name string resourceSpec *extension.ResourceSpec + nodePolicy extension.NodeCPUBindPolicy state *preFilterState wantErr bool wantSpec *extension.ResourceSpec @@ -1532,6 +1612,14 @@ func Test_appendResourceSpecIfMissed(t *testing.T) { PreferredCPUExclusivePolicy: extension.CPUExclusivePolicyPCPULevel, }, }, + { + name: "LS Pod assigned on node with FullPCPUsOnly", + state: &preFilterState{}, + nodePolicy: extension.NodeCPUBindPolicyFullPCPUsOnly, + wantSpec: &extension.ResourceSpec{ + RequiredCPUBindPolicy: extension.CPUBindPolicyFullPCPUs, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -1539,7 +1627,15 @@ func Test_appendResourceSpecIfMissed(t *testing.T) { if tt.resourceSpec != nil { assert.NoError(t, extension.SetResourceSpec(pod, tt.resourceSpec)) } - err := appendResourceSpecIfMissed(pod, tt.state) + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{}, + }, + } + if tt.nodePolicy != "" { + 
node.Labels[extension.LabelNodeCPUBindPolicy] = string(tt.nodePolicy) + } + err := appendResourceSpecIfMissed(pod, tt.state, node, &TopologyOptions{}) if tt.wantErr != (err != nil) { t.Errorf("appendResourceSpecIfMissed(%v, %v)", pod, tt.state) } diff --git a/pkg/scheduler/plugins/nodenumaresource/resource_manager.go b/pkg/scheduler/plugins/nodenumaresource/resource_manager.go index c7e580bbb..be3a438c5 100644 --- a/pkg/scheduler/plugins/nodenumaresource/resource_manager.go +++ b/pkg/scheduler/plugins/nodenumaresource/resource_manager.go @@ -361,7 +361,7 @@ func (c *resourceManager) allocateCPUSet(node *corev1.Node, pod *corev1.Pod, all func (c *resourceManager) Update(nodeName string, allocation *PodAllocation) { topologyOptions := c.topologyOptionsManager.GetTopologyOptions(nodeName) - if topologyOptions.CPUTopology == nil || !topologyOptions.CPUTopology.IsValid() { + if !topologyOptions.CPUTopology.IsValid() { return } @@ -390,7 +390,7 @@ func (c *resourceManager) GetAllocatedCPUSet(nodeName string, podUID types.UID) func (c *resourceManager) GetAvailableCPUs(nodeName string, preferredCPUs cpuset.CPUSet) (availableCPUs cpuset.CPUSet, allocated CPUDetails, err error) { topologyOptions := c.topologyOptionsManager.GetTopologyOptions(nodeName) if topologyOptions.CPUTopology == nil { - return cpuset.NewCPUSet(), nil, errors.New(ErrNotFoundCPUTopology) + return cpuset.NewCPUSet(), nil, nil } if !topologyOptions.CPUTopology.IsValid() { return cpuset.NewCPUSet(), nil, errors.New(ErrInvalidCPUTopology) diff --git a/pkg/scheduler/plugins/nodenumaresource/scoring.go b/pkg/scheduler/plugins/nodenumaresource/scoring.go index c4034bf25..f34614c57 100644 --- a/pkg/scheduler/plugins/nodenumaresource/scoring.go +++ b/pkg/scheduler/plugins/nodenumaresource/scoring.go @@ -57,6 +57,9 @@ func (p *Plugin) Score(ctx context.Context, cycleState *framework.CycleState, po if !status.IsSuccess() { return 0, status } + if state.skip { + return 0, nil + } nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) if err != nil { @@ -64,53 +67,46 @@ func (p *Plugin) Score(ctx context.Context, cycleState *framework.CycleState, po } node := nodeInfo.Node() topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name) + nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy) - - if skipTheNode(state, numaTopologyPolicy) { - if state.skip { - return 0, nil - } - return p.scoreWithAmplifiedCPUs(cycleState, state, pod, nodeInfo, topologyOptions) + requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy) + if !status.IsSuccess() { + return 0, status } - - if state.requestCPUBind && (topologyOptions.CPUTopology == nil || !topologyOptions.CPUTopology.IsValid()) { + if requestCPUBind && !topologyOptions.CPUTopology.IsValid() { return 0, nil } store := topologymanager.GetStore(cycleState) affinity := store.GetAffinity(nodeName) - resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, affinity, topologyOptions) + resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, requestCPUBind, affinity, topologyOptions) if err != nil { return 0, nil } + + if numaTopologyPolicy == extension.NUMATopologyPolicyNone { + return p.scoreWithAmplifiedCPUs(state, nodeInfo, resourceOptions) + } + podAllocation, err := p.resourceManager.Allocate(node, pod, resourceOptions) if err != nil { return 0, nil } - allocatable, requested := 
p.calculateAllocatableAndRequested(node.Name, nodeInfo, podAllocation, resourceOptions) return p.scorer.score(requested, allocatable, framework.NewResource(resourceOptions.requests)) } -func (p *Plugin) scoreWithAmplifiedCPUs(cycleState *framework.CycleState, state *preFilterState, pod *corev1.Pod, nodeInfo *framework.NodeInfo, topologyOptions TopologyOptions) (int64, *framework.Status) { - node := nodeInfo.Node() - resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, topologymanager.NUMATopologyHint{}, topologyOptions) - if err != nil { - return 0, nil - } - +func (p *Plugin) scoreWithAmplifiedCPUs(state *preFilterState, nodeInfo *framework.NodeInfo, resourceOptions *ResourceOptions) (int64, *framework.Status) { quantity := state.requests[corev1.ResourceCPU] cpuAmplificationRatio := resourceOptions.topologyOptions.AmplificationRatios[corev1.ResourceCPU] if quantity.IsZero() || cpuAmplificationRatio <= 1 { return p.scorer.score(nodeInfo.Requested, nodeInfo.Allocatable, framework.NewResource(resourceOptions.requests)) } + node := nodeInfo.Node() _, allocated, err := p.resourceManager.GetAvailableCPUs(node.Name, resourceOptions.preferredCPUs) if err != nil { - if err.Error() != ErrNotFoundCPUTopology { - return 0, nil - } - return p.scorer.score(nodeInfo.Requested, nodeInfo.Allocatable, framework.NewResource(resourceOptions.requests)) + return 0, nil } allocatedMilliCPU := int64(allocated.CPUs().Size() * 1000) requested := nodeInfo.Requested.Clone() diff --git a/pkg/scheduler/plugins/nodenumaresource/scoring_test.go b/pkg/scheduler/plugins/nodenumaresource/scoring_test.go index c965a529c..91dd4113a 100644 --- a/pkg/scheduler/plugins/nodenumaresource/scoring_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/scoring_test.go @@ -566,14 +566,15 @@ func TestScoreWithAmplifiedCPUs(t *testing.T) { wantScoreList framework.NodeScoreList }{ { - name: "ScoringStrategy MostAllocated, no cpuset pod", + name: "ScoringStrategy MostAllocated, non-cpuset pod", requestedPod: makePod(map[corev1.ResourceName]string{"cpu": "8", "memory": "16Gi"}, false), nodes: []*corev1.Node{ makeNode("node1", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 1.0), makeNode("node2", map[corev1.ResourceName]string{"cpu": "64", "memory": "60Gi"}, 2.0), + makeNode("node3", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 2.0), }, - nodeRatios: map[string]apiext.Ratio{"node1": 1.0, "node2": 2.0}, - wantScoreList: []framework.NodeScore{{Name: "node1", Score: 0}, {Name: "node2", Score: 0}}, + nodeRatios: map[string]apiext.Ratio{"node1": 1.0, "node2": 2.0, "node3": 2.0}, + wantScoreList: []framework.NodeScore{{Name: "node1", Score: 32}, {Name: "node2", Score: 16}, {Name: "node3", Score: 26}}, args: schedulerconfig.NodeNUMAResourceArgs{ ScoringStrategy: &schedulerconfig.ScoringStrategy{ Type: schedulerconfig.MostAllocated, @@ -582,7 +583,30 @@ func TestScoreWithAmplifiedCPUs(t *testing.T) { }, }, { - name: "ScoringStrategy MostAllocated, cpuset pods on node", + name: "ScoringStrategy MostAllocated, cpuset pod", + requestedPod: makePod(map[corev1.ResourceName]string{"cpu": "8", "memory": "16Gi"}, true), + nodes: []*corev1.Node{ + makeNode("node1", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 1.0), + makeNode("node2", map[corev1.ResourceName]string{"cpu": "64", "memory": "60Gi"}, 2.0), + makeNode("node3", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 2.0), + }, + nodeRatios: map[string]apiext.Ratio{"node1": 1.0, "node2": 2.0, "node3": 2.0}, + 
nodeHasNRT: []string{"node1", "node2", "node3"}, + cpuTopologies: map[string]*CPUTopology{ + "node1": buildCPUTopologyForTest(2, 1, 8, 2), + "node2": buildCPUTopologyForTest(2, 1, 8, 2), + "node3": buildCPUTopologyForTest(2, 1, 8, 2), + }, + wantScoreList: []framework.NodeScore{{Name: "node1", Score: 32}, {Name: "node2", Score: 19}, {Name: "node3", Score: 32}}, + args: schedulerconfig.NodeNUMAResourceArgs{ + ScoringStrategy: &schedulerconfig.ScoringStrategy{ + Type: schedulerconfig.MostAllocated, + Resources: defaultResources, + }, + }, + }, + { + name: "ScoringStrategy MostAllocated, non-cpuset pods, and existing cpuset pod on node", requestedPod: makePod(map[corev1.ResourceName]string{"cpu": "8", "memory": "16Gi"}, false), nodes: []*corev1.Node{ makeNode("node1", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 1.0), @@ -598,7 +622,7 @@ func TestScoreWithAmplifiedCPUs(t *testing.T) { makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node1", true), makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node2", true), }, - wantScoreList: []framework.NodeScore{{Name: "node1", Score: 68}, {Name: "node2", Score: 54}}, + wantScoreList: []framework.NodeScore{{Name: "node1", Score: 68}, {Name: "node2", Score: 35}}, args: schedulerconfig.NodeNUMAResourceArgs{ ScoringStrategy: &schedulerconfig.ScoringStrategy{ Type: schedulerconfig.MostAllocated, @@ -607,7 +631,7 @@ func TestScoreWithAmplifiedCPUs(t *testing.T) { }, }, { - name: "ScoringStrategy MostAllocated, scheduling cpuset pod", + name: "ScoringStrategy MostAllocated, scheduling cpuset pod with existing non-cpuset pods", requestedPod: makePod(map[corev1.ResourceName]string{"cpu": "8", "memory": "16Gi"}, true), nodes: []*corev1.Node{ makeNode("node1", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 1.0), @@ -623,7 +647,7 @@ func TestScoreWithAmplifiedCPUs(t *testing.T) { makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node1", false), makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node2", false), }, - wantScoreList: []framework.NodeScore{{Name: "node1", Score: 37}, {Name: "node2", Score: 29}}, + wantScoreList: []framework.NodeScore{{Name: "node1", Score: 68}, {Name: "node2", Score: 30}}, args: schedulerconfig.NodeNUMAResourceArgs{ ScoringStrategy: &schedulerconfig.ScoringStrategy{ Type: schedulerconfig.MostAllocated, @@ -648,7 +672,7 @@ func TestScoreWithAmplifiedCPUs(t *testing.T) { makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node1", true), makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node2", true), }, - wantScoreList: []framework.NodeScore{{Name: "node1", Score: 68}, {Name: "node2", Score: 60}}, + wantScoreList: []framework.NodeScore{{Name: "node1", Score: 68}, {Name: "node2", Score: 38}}, args: schedulerconfig.NodeNUMAResourceArgs{ ScoringStrategy: &schedulerconfig.ScoringStrategy{ Type: schedulerconfig.MostAllocated, @@ -668,7 +692,7 @@ func TestScoreWithAmplifiedCPUs(t *testing.T) { makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node1", false), makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node2", false), }, - wantScoreList: []framework.NodeScore{{Name: "node1", Score: 0}, {Name: "node2", Score: 0}}, + wantScoreList: []framework.NodeScore{{Name: "node1", Score: 31}, {Name: "node2", Score: 72}}, args: schedulerconfig.NodeNUMAResourceArgs{ ScoringStrategy: 
&schedulerconfig.ScoringStrategy{ Type: schedulerconfig.LeastAllocated, @@ -677,7 +701,7 @@ func TestScoreWithAmplifiedCPUs(t *testing.T) { }, }, { - name: "ScoringStrategy LeastAllocated, cpuset pods on node", + name: "ScoringStrategy LeastAllocated, non-cpuset pod with existing cpuset pods", requestedPod: makePod(map[corev1.ResourceName]string{"cpu": "8", "memory": "16Gi"}, false), nodes: []*corev1.Node{ makeNode("node1", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 1.0), @@ -693,7 +717,7 @@ func TestScoreWithAmplifiedCPUs(t *testing.T) { makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node1", true), makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node2", true), }, - wantScoreList: []framework.NodeScore{{Name: "node1", Score: 31}, {Name: "node2", Score: 45}}, + wantScoreList: []framework.NodeScore{{Name: "node1", Score: 31}, {Name: "node2", Score: 64}}, args: schedulerconfig.NodeNUMAResourceArgs{ ScoringStrategy: &schedulerconfig.ScoringStrategy{ Type: schedulerconfig.LeastAllocated, @@ -702,7 +726,7 @@ func TestScoreWithAmplifiedCPUs(t *testing.T) { }, }, { - name: "ScoringStrategy LeastAllocated, scheduling cpuset pod", + name: "ScoringStrategy LeastAllocated, scheduling cpuset pod with existing non-cpuset pods", requestedPod: makePod(map[corev1.ResourceName]string{"cpu": "8", "memory": "16Gi"}, true), nodes: []*corev1.Node{ makeNode("node1", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 1.0), @@ -718,7 +742,7 @@ func TestScoreWithAmplifiedCPUs(t *testing.T) { makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node1", false), makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node2", false), }, - wantScoreList: []framework.NodeScore{{Name: "node1", Score: 62}, {Name: "node2", Score: 70}}, + wantScoreList: []framework.NodeScore{{Name: "node1", Score: 31}, {Name: "node2", Score: 68}}, args: schedulerconfig.NodeNUMAResourceArgs{ ScoringStrategy: &schedulerconfig.ScoringStrategy{ Type: schedulerconfig.LeastAllocated, @@ -743,7 +767,7 @@ func TestScoreWithAmplifiedCPUs(t *testing.T) { makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node1", true), makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node2", true), }, - wantScoreList: []framework.NodeScore{{Name: "node1", Score: 31}, {Name: "node2", Score: 39}}, + wantScoreList: []framework.NodeScore{{Name: "node1", Score: 31}, {Name: "node2", Score: 61}}, args: schedulerconfig.NodeNUMAResourceArgs{ ScoringStrategy: &schedulerconfig.ScoringStrategy{ Type: schedulerconfig.LeastAllocated, diff --git a/pkg/scheduler/plugins/nodenumaresource/service.go b/pkg/scheduler/plugins/nodenumaresource/service.go index 780f9a321..95f92b74c 100644 --- a/pkg/scheduler/plugins/nodenumaresource/service.go +++ b/pkg/scheduler/plugins/nodenumaresource/service.go @@ -48,7 +48,7 @@ func (p *Plugin) RegisterEndpoints(group *gin.RouterGroup) { } topologyOptions := p.topologyOptionsManager.GetTopologyOptions(nodeName) - if topologyOptions.CPUTopology == nil || !topologyOptions.CPUTopology.IsValid() { + if !topologyOptions.CPUTopology.IsValid() { services.ResponseErrorMessage(c, http.StatusInternalServerError, "invalid topology, please check the NodeResourceTopology object") return } diff --git a/pkg/scheduler/plugins/nodenumaresource/topology_hint.go b/pkg/scheduler/plugins/nodenumaresource/topology_hint.go index c25d321b7..d09aba88a 100644 --- 
a/pkg/scheduler/plugins/nodenumaresource/topology_hint.go +++ b/pkg/scheduler/plugins/nodenumaresource/topology_hint.go @@ -48,9 +48,13 @@ func (p *Plugin) GetPodTopologyHints(ctx context.Context, cycleState *framework. return nil, framework.AsStatus(err) } node := nodeInfo.Node() - topologyOptions := p.topologyOptionsManager.GetTopologyOptions(nodeName) - resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, topologymanager.NUMATopologyHint{}, topologyOptions) + nodeCPUBindPolicy := apiext.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) + requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy) + if !status.IsSuccess() { + return nil, status + } + resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, requestCPUBind, topologymanager.NUMATopologyHint{}, topologyOptions) if err != nil { return nil, framework.AsStatus(err) } @@ -63,7 +67,6 @@ func (p *Plugin) GetPodTopologyHints(ctx context.Context, cycleState *framework. } func (p *Plugin) Allocate(ctx context.Context, cycleState *framework.CycleState, affinity topologymanager.NUMATopologyHint, pod *corev1.Pod, nodeName string) *framework.Status { - topologyOptions := p.topologyOptionsManager.GetTopologyOptions(nodeName) state, status := getPreFilterState(cycleState) if !status.IsSuccess() { return status @@ -74,8 +77,14 @@ func (p *Plugin) Allocate(ctx context.Context, cycleState *framework.CycleState, return framework.AsStatus(err) } node := nodeInfo.Node() + topologyOptions := p.topologyOptionsManager.GetTopologyOptions(nodeName) + nodeCPUBindPolicy := apiext.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) + requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy) + if !status.IsSuccess() { + return status + } - resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, affinity, topologyOptions) + resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, requestCPUBind, affinity, topologyOptions) if err != nil { return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) } diff --git a/pkg/scheduler/plugins/nodenumaresource/util.go b/pkg/scheduler/plugins/nodenumaresource/util.go index b4eccf845..21ed563e0 100644 --- a/pkg/scheduler/plugins/nodenumaresource/util.go +++ b/pkg/scheduler/plugins/nodenumaresource/util.go @@ -18,6 +18,7 @@ package nodenumaresource import ( corev1 "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/scheduler/framework" "github.com/koordinator-sh/koordinator/apis/extension" schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" @@ -56,10 +57,6 @@ func getNUMATopologyPolicy(nodeLabels map[string]string, kubeletTopologyManagerP return kubeletTopologyManagerPolicy } -func skipTheNode(state *preFilterState, numaTopologyPolicy extension.NUMATopologyPolicy) bool { - return state.skip || (!state.requestCPUBind && numaTopologyPolicy == extension.NUMATopologyPolicyNone) -} - // amplifyNUMANodeResources amplifies the resources per NUMA Node. // NOTE(joseph): After the NodeResource controller supports amplifying by ratios, should remove the function. 
func amplifyNUMANodeResources(node *corev1.Node, topologyOptions *TopologyOptions) error { @@ -84,3 +81,42 @@ func amplifyNUMANodeResources(node *corev1.Node, topologyOptions *TopologyOption topologyOptions.NUMANodeResources = numaNodeResources return nil } + +func getCPUBindPolicy(topologyOptions *TopologyOptions, node *corev1.Node, requiredCPUBindPolicy, preferredCPUBindPolicy schedulingconfig.CPUBindPolicy) (schedulingconfig.CPUBindPolicy, bool, error) { + if requiredCPUBindPolicy != "" { + return requiredCPUBindPolicy, true, nil + } + + cpuBindPolicy := preferredCPUBindPolicy + required := false + kubeletCPUPolicy := topologyOptions.Policy + nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, kubeletCPUPolicy) + switch nodeCPUBindPolicy { + case extension.NodeCPUBindPolicySpreadByPCPUs: + cpuBindPolicy = schedulingconfig.CPUBindPolicySpreadByPCPUs + required = true + case extension.NodeCPUBindPolicyFullPCPUsOnly: + cpuBindPolicy = schedulingconfig.CPUBindPolicyFullPCPUs + required = true + } + return cpuBindPolicy, required, nil +} + +func requestCPUBind(state *preFilterState, nodeCPUBindPolicy extension.NodeCPUBindPolicy) (bool, *framework.Status) { + if state.requestCPUBind { + return true, nil + } + + requestedCPU := state.requests.Cpu().MilliValue() + if requestedCPU == 0 { + return false, nil + } + + if nodeCPUBindPolicy != "" && nodeCPUBindPolicy != extension.NodeCPUBindPolicyNone { + if requestedCPU%1000 != 0 { + return false, framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidRequestedCPUs) + } + return true, nil + } + return false, nil +} diff --git a/pkg/scheduler/plugins/nodenumaresource/util_test.go b/pkg/scheduler/plugins/nodenumaresource/util_test.go new file mode 100644 index 000000000..853f56db0 --- /dev/null +++ b/pkg/scheduler/plugins/nodenumaresource/util_test.go @@ -0,0 +1,101 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package nodenumaresource + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/koordinator-sh/koordinator/apis/extension" + schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" +) + +func Test_getCPUBindPolicy(t *testing.T) { + tests := []struct { + name string + kubeletPolicy *extension.KubeletCPUManagerPolicy + nodePolicy extension.NodeCPUBindPolicy + requiredPolicy schedulingconfig.CPUBindPolicy + preferredPolicy schedulingconfig.CPUBindPolicy + wantPolicy schedulingconfig.CPUBindPolicy + wantRequired bool + wantError bool + }{ + { + name: "kubelet enables FullPCPUsOnly", + kubeletPolicy: &extension.KubeletCPUManagerPolicy{ + Policy: extension.KubeletCPUManagerPolicyStatic, + Options: map[string]string{ + extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true", + }, + }, + nodePolicy: "", + requiredPolicy: "", + preferredPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, + wantPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, + wantRequired: true, + wantError: false, + }, + { + name: "node enables FullPCPUsOnly", + nodePolicy: extension.NodeCPUBindPolicyFullPCPUsOnly, + requiredPolicy: "", + preferredPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, + wantPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, + wantRequired: true, + wantError: false, + }, + { + name: "pod enables required FullPCPUsOnly", + requiredPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, + wantPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, + wantRequired: true, + wantError: false, + }, + { + name: "pod enables preferred FullPCPUsOnly", + preferredPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, + wantPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, + wantRequired: false, + wantError: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + topologyOpts := &TopologyOptions{ + Policy: tt.kubeletPolicy, + } + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{}, + }, + } + if tt.nodePolicy != "" { + node.Labels[extension.LabelNodeCPUBindPolicy] = string(tt.nodePolicy) + } + policy, required, err := getCPUBindPolicy(topologyOpts, node, tt.requiredPolicy, tt.preferredPolicy) + assert.Equal(t, tt.wantPolicy, policy) + assert.Equal(t, tt.wantRequired, required) + if tt.wantError != (err != nil) { + t.Errorf("wantErr=%v, but got err=%v", tt.wantError, err) + } + }) + } +}
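The patch replaces the old skipTheNode/getPreferredCPUBindPolicy logic with the requestCPUBind and getCPUBindPolicy helpers in util.go. A minimal sketch of the intended requestCPUBind behavior follows, written as an extra test in the same package; it is not part of the patch, the test name is hypothetical, and the cases simply mirror the rules encoded in util.go above: an integer CPU request on a node that declares a CPU bind policy is promoted to a CPUSet request, a non-integer request on such a node is rejected with ErrInvalidRequestedCPUs, and a request on a node without a bind policy keeps its shared-CPU semantics.

package nodenumaresource

import (
	"testing"

	"github.com/stretchr/testify/assert"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/kubernetes/pkg/scheduler/framework"

	"github.com/koordinator-sh/koordinator/apis/extension"
)

// Sketch only: exercises requestCPUBind as defined in util.go in this patch.
func Test_requestCPUBind_sketch(t *testing.T) {
	// An LS pod with an integer CPU request on a node labeled FullPCPUsOnly
	// is implicitly treated as a CPUSet pod.
	state := &preFilterState{
		requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("4")},
	}
	bind, status := requestCPUBind(state, extension.NodeCPUBindPolicyFullPCPUsOnly)
	assert.True(t, status.IsSuccess())
	assert.True(t, bind)

	// A non-integer request on the same node is unschedulable there.
	state.requests = corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("4500m")}
	_, status = requestCPUBind(state, extension.NodeCPUBindPolicyFullPCPUsOnly)
	assert.Equal(t, framework.UnschedulableAndUnresolvable, status.Code())
	assert.Equal(t, ErrInvalidRequestedCPUs, status.Message())

	// Without a node-level bind policy the pod keeps its shared-CPU semantics.
	bind, status = requestCPUBind(state, extension.NodeCPUBindPolicyNone)
	assert.True(t, status.IsSuccess())
	assert.False(t, bind)
}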