scheduler: improve NodeNUMAResource handling of node CPU bind policy #1892

Merged
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/nodenumaresource/cpu_topology.go
@@ -75,7 +75,7 @@ func (b *CPUTopologyBuilder) Result() *CPUTopology {

 // IsValid checks if the topology is valid
 func (topo *CPUTopology) IsValid() bool {
-	return topo.NumSockets != 0 && topo.NumNodes != 0 && topo.NumCores != 0 && topo.NumCPUs != 0
+	return topo != nil && topo.NumSockets != 0 && topo.NumNodes != 0 && topo.NumCores != 0 && topo.NumCPUs != 0
 }

 // CPUsPerCore returns the number of logical CPUs are associated with each core.
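Reviewer note: the added topo != nil guard makes IsValid safe to call through a nil receiver, which is legal in Go as long as the method never dereferences the pointer. A minimal standalone sketch (types simplified, not the plugin's actual structs):

package main

import "fmt"

// CPUTopology is a trimmed stand-in for the plugin's real struct.
type CPUTopology struct {
	NumSockets, NumNodes, NumCores, NumCPUs int
}

// IsValid mirrors the patched check: a nil topology is simply invalid
// instead of panicking when a caller forgets its own nil check.
func (topo *CPUTopology) IsValid() bool {
	return topo != nil && topo.NumSockets != 0 && topo.NumNodes != 0 && topo.NumCores != 0 && topo.NumCPUs != 0
}

func main() {
	var missing *CPUTopology       // e.g. a node that reported no topology
	fmt.Println(missing.IsValid()) // false, no panic

	ok := &CPUTopology{NumSockets: 1, NumNodes: 1, NumCores: 4, NumCPUs: 8}
	fmt.Println(ok.IsValid()) // true
}

This is what lets the Filter and Reserve paths below drop their separate topologyOptions.CPUTopology == nil checks.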
179 changes: 89 additions & 90 deletions pkg/scheduler/plugins/nodenumaresource/plugin.go
@@ -18,7 +18,6 @@ package nodenumaresource

 import (
 	"context"
-	"errors"
 	"fmt"

 	nrtv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
@@ -47,10 +46,10 @@ const (
 )

 const (
 	ErrNotFoundCPUTopology          = "node(s) CPU Topology not found"
+	ErrInvalidRequestedCPUs         = "the requested CPUs must be integer"
 	ErrInvalidCPUTopology           = "node(s) invalid CPU Topology"
 	ErrSMTAlignmentError            = "node(s) requested cpus not multiple cpus per core"
-	ErrRequiredFullPCPUsPolicy      = "node(s) required FullPCPUs policy"
+	ErrCPUBindPolicyConflict        = "node(s) cpu bind policy conflicts with pod's required cpu bind policy"
 	ErrInvalidCPUAmplificationRatio = "node(s) invalid CPU amplification ratio"
 	ErrInsufficientAmplifiedCPU     = "Insufficient amplified cpu"
 )
@@ -230,9 +229,11 @@ func (p *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleState
 		})
 		return nil, nil
 	}
+	requestedCPU := requests.Cpu().MilliValue()
 	state := &preFilterState{
 		requestCPUBind: false,
 		requests:       requests,
+		numCPUsNeeded:  int(requestedCPU / 1000),
 	}
 	if AllowUseCPUSet(pod) {
 		cpuBindPolicy := schedulingconfig.CPUBindPolicy(resourceSpec.PreferredCPUBindPolicy)
@@ -249,17 +250,15 @@

 		if cpuBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs ||
 			cpuBindPolicy == schedulingconfig.CPUBindPolicySpreadByPCPUs {
-			requestedCPU := requests.Cpu().MilliValue()
 			if requestedCPU%1000 != 0 {
-				return nil, framework.NewStatus(framework.Error, "the requested CPUs must be integer")
+				return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidRequestedCPUs)
 			}

 			if requestedCPU > 0 {
 				state.requestCPUBind = true
 				state.requiredCPUBindPolicy = requiredCPUBindPolicy
 				state.preferredCPUBindPolicy = cpuBindPolicy
 				state.preferredCPUExclusivePolicy = resourceSpec.PreferredCPUExclusivePolicy
-				state.numCPUsNeeded = int(requestedCPU / 1000)
 			}
 		}
 	}
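Reviewer note: PreFilter now computes numCPUsNeeded once for every pod, and a non-integer CPU request under a core-aligned bind policy is rejected as UnschedulableAndUnresolvable (a pod-level verdict) instead of a scheduler-internal Error. A rough standalone illustration of the arithmetic, with the requests type reduced to a milli-value:

package main

import (
	"errors"
	"fmt"
)

const errInvalidRequestedCPUs = "the requested CPUs must be integer"

// checkCPUSetRequest mirrors the PreFilter rule: CPUSet binding with
// FullPCPUs/SpreadByPCPUs only makes sense for whole-CPU requests.
func checkCPUSetRequest(requestedMilliCPU int64, coreAlignedPolicy bool) (numCPUs int, err error) {
	if coreAlignedPolicy && requestedMilliCPU%1000 != 0 {
		return 0, errors.New(errInvalidRequestedCPUs)
	}
	return int(requestedMilliCPU / 1000), nil
}

func main() {
	if _, err := checkCPUSetRequest(2500, true); err != nil {
		fmt.Println("2500m rejected:", err) // 2.5 CPUs cannot be pinned
	}
	n, _ := checkCPUSetRequest(4000, true)
	fmt.Println("4000m ->", n, "CPUs") // 4 CPUs
}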
@@ -278,47 +277,49 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
 	if !status.IsSuccess() {
 		return status
 	}
-
-	if status := p.filterAmplifiedCPUs(state, nodeInfo); !status.IsSuccess() {
-		return status
+	if state.skip {
+		return nil
 	}

 	node := nodeInfo.Node()
 	topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name)
+	nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy)
 	numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy)

-	if skipTheNode(state, numaTopologyPolicy) {
-		return nil
+	requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy)
+	if !status.IsSuccess() {
+		return status
 	}

-	if state.requestCPUBind {
-		if topologyOptions.CPUTopology == nil {
-			return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrNotFoundCPUTopology)
-		}
+	if status := p.filterAmplifiedCPUs(state.requests.Cpu().MilliValue(), nodeInfo, requestCPUBind); !status.IsSuccess() {
+		return status
+	}
+
+	if requestCPUBind {
 		// It's necessary to force node to have NodeResourceTopology and CPUTopology
 		// We must satisfy the user's CPUSet request. Even if some nodes in the cluster have resources,
 		// they cannot be allocated without valid CPU topology.
 		if !topologyOptions.CPUTopology.IsValid() {
 			return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidCPUTopology)
 		}
-		nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy)
-		if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly ||
-			state.requiredCPUBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs {

+		requiredCPUBindPolicy := state.requiredCPUBindPolicy
+		if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly {
+			requiredCPUBindPolicy = schedulingconfig.CPUBindPolicyFullPCPUs
+		} else if nodeCPUBindPolicy == extension.NodeCPUBindPolicySpreadByPCPUs {
+			requiredCPUBindPolicy = schedulingconfig.CPUBindPolicySpreadByPCPUs
+		}
+		if state.requiredCPUBindPolicy != "" && state.requiredCPUBindPolicy != requiredCPUBindPolicy {
+			return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrCPUBindPolicyConflict)
+		}
+
+		if requiredCPUBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs {
 			if state.numCPUsNeeded%topologyOptions.CPUTopology.CPUsPerCore() != 0 {
 				return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError)
 			}
-
-			if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly {
-				if (state.requiredCPUBindPolicy != "" && state.requiredCPUBindPolicy != schedulingconfig.CPUBindPolicyFullPCPUs) ||
-					(state.preferredCPUBindPolicy != "" && state.preferredCPUBindPolicy != schedulingconfig.CPUBindPolicyFullPCPUs) {
-					return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy)
-				}
-			}
 		}

-		if state.requiredCPUBindPolicy != "" && numaTopologyPolicy == extension.NUMATopologyPolicyNone {
-			resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, topologymanager.NUMATopologyHint{}, topologyOptions)
+		if requiredCPUBindPolicy != "" && numaTopologyPolicy == extension.NUMATopologyPolicyNone {
+			resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, requestCPUBind, topologymanager.NUMATopologyHint{}, topologyOptions)
 			if err != nil {
 				return framework.AsStatus(err)
 			}
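Reviewer note: the heart of this hunk is that the node-level bind policy now overrides the pod's policy rather than merely vetoing it, and a genuine mismatch surfaces as the new ErrCPUBindPolicyConflict instead of the old FullPCPUs-only error. A hedged sketch of the resolution rule, with plain strings standing in for the extension.NodeCPUBindPolicy* and schedulingconfig.CPUBindPolicy* types:

package main

import (
	"errors"
	"fmt"
)

const (
	nodePolicyNone          = "None"
	nodePolicyFullPCPUsOnly = "FullPCPUsOnly"
	nodePolicySpreadByPCPUs = "SpreadByPCPUs"

	podPolicyFullPCPUs     = "FullPCPUs"
	podPolicySpreadByPCPUs = "SpreadByPCPUs"
)

var errCPUBindPolicyConflict = errors.New("node(s) cpu bind policy conflicts with pod's required cpu bind policy")

// resolveRequiredPolicy mirrors the Filter logic above: the node label
// wins, and a pod that *requires* something different cannot land there.
func resolveRequiredPolicy(nodePolicy, podRequired string) (string, error) {
	required := podRequired
	switch nodePolicy {
	case nodePolicyFullPCPUsOnly:
		required = podPolicyFullPCPUs
	case nodePolicySpreadByPCPUs:
		required = podPolicySpreadByPCPUs
	}
	if podRequired != "" && podRequired != required {
		return "", errCPUBindPolicyConflict
	}
	return required, nil
}

func main() {
	// Node enforces FullPCPUs; pod requires SpreadByPCPUs -> conflict.
	if _, err := resolveRequiredPolicy(nodePolicyFullPCPUsOnly, podPolicySpreadByPCPUs); err != nil {
		fmt.Println(err)
	}
	// Node has no policy; the pod's requirement passes through unchanged.
	p, _ := resolveRequiredPolicy(nodePolicyNone, podPolicyFullPCPUs)
	fmt.Println(p) // FullPCPUs
}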
@@ -336,9 +337,7 @@
 	return nil
 }

-func (p *Plugin) filterAmplifiedCPUs(state *preFilterState, nodeInfo *framework.NodeInfo) *framework.Status {
-	quantity := state.requests[corev1.ResourceCPU]
-	podRequestMilliCPU := quantity.MilliValue()
+func (p *Plugin) filterAmplifiedCPUs(podRequestMilliCPU int64, nodeInfo *framework.NodeInfo, requestCPUBind bool) *framework.Status {
 	if podRequestMilliCPU == 0 {
 		return nil
 	}
@@ -352,16 +351,14 @@ func (p *Plugin) filterAmplifiedCPUs(state *preFilterState, nodeInfo *framework.
 		return nil
 	}

-	if state.requestCPUBind {
+	if requestCPUBind {
 		podRequestMilliCPU = extension.Amplify(podRequestMilliCPU, cpuAmplificationRatio)
 	}

 	// TODO(joseph): Reservations and preemption should be considered here.
-	_, allocated, _ := p.resourceManager.GetAvailableCPUs(node.Name, cpuset.CPUSet{})
+	_, allocated, err := p.resourceManager.GetAvailableCPUs(node.Name, cpuset.CPUSet{})
 	if err != nil {
-		if err.Error() != ErrNotFoundCPUTopology {
-			return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
-		}
+		return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
 	}
 	allocatedMilliCPU := int64(allocated.CPUs().Size() * 1000)
 	requestedMilliCPU := nodeInfo.Requested.MilliCPU
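Reviewer note: filterAmplifiedCPUs now receives the raw milli-CPU request plus the already-resolved requestCPUBind flag, and the error from GetAvailableCPUs is captured into its own err variable instead of being discarded (the diff suggests the old branch was testing a leftover err from the earlier amplification-ratio call). The amplification itself is plain proportional scaling; a small sketch of the accounting, where rounding up is my assumption about extension.Amplify, not something shown in this diff:

package main

import (
	"fmt"
	"math"
)

// amplify mimics what extension.Amplify is assumed to do here: scale a
// raw milli-CPU quantity by the node's CPU amplification ratio.
func amplify(milliCPU int64, ratio float64) int64 {
	return int64(math.Ceil(float64(milliCPU) * ratio))
}

func main() {
	const ratio = 1.5         // node advertises 1.5x its physical CPUs
	podRequest := int64(4000) // 4 CPUs, CPUSet-bound

	// A CPUSet-bound pod pins physical CPUs, so its request is scaled up
	// before the fit check against the node's amplified allocatable.
	fmt.Println(amplify(podRequest, ratio)) // 6000

	// A plain shared pod is compared as-is.
	fmt.Println(amplify(2000, 1.0)) // 2000
}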
@@ -380,31 +377,35 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState,
 	if !status.IsSuccess() {
 		return status
 	}
+	if state.skip {
+		return nil
+	}

 	nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
 	if err != nil {
 		return framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
 	}
 	node := nodeInfo.Node()
 	topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name)
+	nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy)
 	numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy)

-	if skipTheNode(state, numaTopologyPolicy) {
+	requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy)
+	if !status.IsSuccess() {
+		return status
+	}
+	if !requestCPUBind && numaTopologyPolicy == extension.NUMATopologyPolicyNone {
 		return nil
 	}

-	if state.requestCPUBind {
-		if topologyOptions.CPUTopology == nil {
-			return framework.NewStatus(framework.Error, ErrNotFoundCPUTopology)
-		}
+	if requestCPUBind {
 		if !topologyOptions.CPUTopology.IsValid() {
 			return framework.NewStatus(framework.Error, ErrInvalidCPUTopology)
 		}
 	}

 	store := topologymanager.GetStore(cycleState)
 	affinity := store.GetAffinity(nodeName)
-	resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, affinity, topologyOptions)
+	resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, requestCPUBind, affinity, topologyOptions)
 	if err != nil {
 		return framework.AsStatus(err)
 	}
@@ -440,12 +441,24 @@ func (p *Plugin) preBindObject(ctx context.Context, cycleState *framework.CycleS
 	if !status.IsSuccess() {
 		return status
 	}
-	if state.allocation == nil {
+	if state.skip || state.allocation == nil {
 		return nil
 	}

-	if state.requestCPUBind {
-		if err := appendResourceSpecIfMissed(object, state); err != nil {
+	nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
+	if err != nil {
+		return framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
+	}
+	node := nodeInfo.Node()
+	topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name)
+	nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy)
+	requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy)
+	if !status.IsSuccess() {
+		return status
+	}
+
+	if requestCPUBind {
+		if err := appendResourceSpecIfMissed(object, state, node, &topologyOptions); err != nil {
 			return framework.AsStatus(err)
 		}
 	}
@@ -465,12 +478,7 @@ func (p *Plugin) preBindObject(ctx context.Context, cycleState *framework.CycleS
 	return nil
 }

-func (p *Plugin) getResourceOptions(cycleState *framework.CycleState, state *preFilterState, node *corev1.Node, pod *corev1.Pod, affinity topologymanager.NUMATopologyHint, topologyOptions TopologyOptions) (*ResourceOptions, error) {
-	preferredCPUBindPolicy, err := p.getPreferredCPUBindPolicy(node, state.preferredCPUBindPolicy)
-	if err != nil {
-		return nil, err
-	}
-
+func (p *Plugin) getResourceOptions(cycleState *framework.CycleState, state *preFilterState, node *corev1.Node, pod *corev1.Pod, requestCPUBind bool, affinity topologymanager.NUMATopologyHint, topologyOptions TopologyOptions) (*ResourceOptions, error) {
 	if err := amplifyNUMANodeResources(node, &topologyOptions); err != nil {
 		return nil, err
 	}
@@ -492,18 +500,23 @@
 	}

 	requests := state.requests
-	if state.requestCPUBind && amplificationRatio > 1 {
+	if requestCPUBind && amplificationRatio > 1 {
 		requests = requests.DeepCopy()
 		extension.AmplifyResourceList(requests, topologyOptions.AmplificationRatios, corev1.ResourceCPU)
 	}

+	cpuBindPolicy, requiredCPUBindPolicy, err := getCPUBindPolicy(&topologyOptions, node, state.requiredCPUBindPolicy, state.preferredCPUBindPolicy)
+	if err != nil {
+		return nil, err
+	}
+
 	options := &ResourceOptions{
 		requests:           requests,
 		originalRequests:   state.requests,
 		numCPUsNeeded:      state.numCPUsNeeded,
-		requestCPUBind:     state.requestCPUBind,
-		requiredCPUBindPolicy: state.requiredCPUBindPolicy != "",
-		cpuBindPolicy:      preferredCPUBindPolicy,
+		requestCPUBind:     requestCPUBind,
+		requiredCPUBindPolicy: requiredCPUBindPolicy,
+		cpuBindPolicy:      cpuBindPolicy,
 		cpuExclusivePolicy: state.preferredCPUExclusivePolicy,
 		preferredCPUs:      reservationReservedCPUs,
 		reusableResources:  reusableResources,
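Reviewer note: getResourceOptions and appendResourceSpecIfMissed now share a single getCPUBindPolicy helper that returns the effective policy plus whether it is required, replacing the removed getPreferredCPUBindPolicy. The helper's body is not part of this diff; the following is only a guess at its shape, inferred from the call sites above (the argument and result order are the only things taken from the diff):

package main

import (
	"errors"
	"fmt"
)

// getCPUBindPolicySketch is a sketch only: inferred from the call sites
// in this diff, not the actual implementation of getCPUBindPolicy.
func getCPUBindPolicySketch(nodePolicy, podRequired, podPreferred string) (policy string, required bool, err error) {
	switch nodePolicy {
	case "FullPCPUsOnly": // node label forces FullPCPUs
		policy, required = "FullPCPUs", true
	case "SpreadByPCPUs": // node label forces SpreadByPCPUs
		policy, required = "SpreadByPCPUs", true
	default: // no node policy: fall back to the pod's own spec
		if podRequired != "" {
			policy, required = podRequired, true
		} else {
			policy = podPreferred
		}
	}
	if podRequired != "" && policy != podRequired {
		return "", false, errors.New("node(s) cpu bind policy conflicts with pod's required cpu bind policy")
	}
	return policy, required, nil
}

func main() {
	fmt.Println(getCPUBindPolicySketch("FullPCPUsOnly", "", "SpreadByPCPUs"))
	// -> FullPCPUs true <nil>: the node policy wins and becomes required.
}

Centralizing this decision is what keeps Filter, Reserve, and PreBind from drifting apart in how they resolve the same node/pod policy pair.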
@@ -536,44 +549,30 @@ func (p *Plugin) getReservationReservedCPUs(cycleState *framework.CycleState, po
 	return reservedCPUs, nil
 }

-func appendResourceSpecIfMissed(object metav1.Object, state *preFilterState) error {
-	// Write back ResourceSpec annotation if LSR Pod hasn't specified CPUBindPolicy
+func appendResourceSpecIfMissed(object metav1.Object, state *preFilterState, node *corev1.Node, topologyOpts *TopologyOptions) error {
+	cpuBindPolicy, required, err := getCPUBindPolicy(topologyOpts, node, state.requiredCPUBindPolicy, state.preferredCPUBindPolicy)
+	if err != nil {
+		return err
+	}
+
+	// Write back ResourceSpec annotation if the Pod hasn't specified CPUBindPolicy
+	shouldWriteBack := false
 	annotations := object.GetAnnotations()
 	resourceSpec, _ := extension.GetResourceSpec(annotations)
-	if resourceSpec.RequiredCPUBindPolicy != extension.CPUBindPolicy(state.requiredCPUBindPolicy) {
-		resourceSpec.RequiredCPUBindPolicy = extension.CPUBindPolicy(state.requiredCPUBindPolicy)
+	if required && (resourceSpec.RequiredCPUBindPolicy == "" || resourceSpec.RequiredCPUBindPolicy == extension.CPUBindPolicyDefault) {
+		resourceSpec.RequiredCPUBindPolicy = extension.CPUBindPolicy(cpuBindPolicy)
+		shouldWriteBack = true
 	}

-	preferredCPUBindPolicy := schedulingconfig.CPUBindPolicy(resourceSpec.PreferredCPUBindPolicy)
-	if preferredCPUBindPolicy == "" || preferredCPUBindPolicy == schedulingconfig.CPUBindPolicyDefault {
-		if resourceSpec.RequiredCPUBindPolicy == "" {
-			preferredCPUBindPolicy = state.preferredCPUBindPolicy
-		} else if preferredCPUBindPolicy != "" {
-			preferredCPUBindPolicy = state.requiredCPUBindPolicy
-		}
+	if resourceSpec.PreferredCPUBindPolicy == extension.CPUBindPolicyDefault {
+		resourceSpec.PreferredCPUBindPolicy = extension.CPUBindPolicy(cpuBindPolicy)
+		shouldWriteBack = true
 	}

-	resourceSpec.PreferredCPUBindPolicy = extension.CPUBindPolicy(preferredCPUBindPolicy)
-	return extension.SetResourceSpec(object, resourceSpec)
-}
-
-func (p *Plugin) getPreferredCPUBindPolicy(node *corev1.Node, preferredCPUBindPolicy schedulingconfig.CPUBindPolicy) (schedulingconfig.CPUBindPolicy, error) {
-	topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name)
-	if topologyOptions.CPUTopology == nil {
-		return preferredCPUBindPolicy, errors.New(ErrNotFoundCPUTopology)
-	}
-	if !topologyOptions.CPUTopology.IsValid() {
-		return preferredCPUBindPolicy, errors.New(ErrInvalidCPUTopology)
+	if resourceSpec.RequiredCPUBindPolicy == "" && resourceSpec.PreferredCPUBindPolicy == "" && cpuBindPolicy != "" {
+		resourceSpec.PreferredCPUBindPolicy = extension.CPUBindPolicy(cpuBindPolicy)
+		shouldWriteBack = true
 	}

-	kubeletCPUPolicy := topologyOptions.Policy
-	nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, kubeletCPUPolicy)
-	switch nodeCPUBindPolicy {
-	default:
-	case extension.NodeCPUBindPolicyNone:
-	case extension.NodeCPUBindPolicySpreadByPCPUs:
-		preferredCPUBindPolicy = schedulingconfig.CPUBindPolicySpreadByPCPUs
-	case extension.NodeCPUBindPolicyFullPCPUsOnly:
-		preferredCPUBindPolicy = schedulingconfig.CPUBindPolicyFullPCPUs
+	if !shouldWriteBack {
+		return nil
 	}
-	return preferredCPUBindPolicy, nil
+	return extension.SetResourceSpec(object, resourceSpec)
 }
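Reviewer note on the end-to-end effect: when the scheduler settles on a bind policy the pod did not spell out, appendResourceSpecIfMissed persists it into the pod's resource-spec annotation at PreBind, so the agent on the node applies the same policy the scheduler assumed. Assuming the usual scheduling.koordinator.sh/resource-spec annotation format (the key and JSON shape are my assumption, trimmed from Koordinator's extension package, not shown in this diff), the write-back would look roughly like:

package main

import (
	"encoding/json"
	"fmt"
)

// ResourceSpec is a trimmed stand-in for extension.ResourceSpec.
type ResourceSpec struct {
	RequiredCPUBindPolicy  string `json:"requiredCPUBindPolicy,omitempty"`
	PreferredCPUBindPolicy string `json:"preferredCPUBindPolicy,omitempty"`
}

func main() {
	// A pod scheduled onto a node labeled FullPCPUsOnly without declaring
	// any policy itself: the resolved policy is persisted as required.
	spec := ResourceSpec{RequiredCPUBindPolicy: "FullPCPUs"}
	raw, _ := json.Marshal(spec)

	annotations := map[string]string{
		// Annotation key assumed, per Koordinator's docs.
		"scheduling.koordinator.sh/resource-spec": string(raw),
	}
	fmt.Println(annotations)
}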