scheduler: improve NodeNUMAResource handling of node CPU bind policy
Signed-off-by: Joseph <joseph.t.lee@outlook.com>
eahydra committed Feb 4, 2024
1 parent 93f2bc2 commit a2f2107
Showing 10 changed files with 408 additions and 182 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/nodenumaresource/cpu_topology.go
@@ -75,7 +75,7 @@ func (b *CPUTopologyBuilder) Result() *CPUTopology {

// IsValid checks if the topology is valid
func (topo *CPUTopology) IsValid() bool {
return topo.NumSockets != 0 && topo.NumNodes != 0 && topo.NumCores != 0 && topo.NumCPUs != 0
return topo != nil && topo.NumSockets != 0 && topo.NumNodes != 0 && topo.NumCores != 0 && topo.NumCPUs != 0
}

// CPUsPerCore returns the number of logical CPUs associated with each core.
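The added nil guard makes IsValid safe to call directly on a possibly-nil *CPUTopology, which the reworked Filter below relies on. A minimal sketch of the calling pattern this enables (hypothetical caller, not part of this commit):

var topo *CPUTopology // nil until the node reports its NodeResourceTopology

// A nil receiver is legal in Go as long as the method body never dereferences
// it, so the check inside IsValid lets callers collapse
// `topo == nil || !topo.IsValid()` into a single call.
if !topo.IsValid() {
    // reject the node: no usable CPU topology
}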
175 changes: 86 additions & 89 deletions pkg/scheduler/plugins/nodenumaresource/plugin.go
@@ -18,7 +18,6 @@ package nodenumaresource

import (
"context"
"errors"
"fmt"

nrtv1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
@@ -47,10 +46,10 @@ const (
)

const (
ErrNotFoundCPUTopology = "node(s) CPU Topology not found"
ErrInvalidRequestedCPUs = "the requested CPUs must be integer"
ErrInvalidCPUTopology = "node(s) invalid CPU Topology"
ErrSMTAlignmentError = "node(s) requested cpus not multiple cpus per core"
ErrRequiredFullPCPUsPolicy = "node(s) required FullPCPUs policy"
ErrCPUBindPolicyConflict = "node(s) cpu bind policy conflicts with pod's required cpu bind policy"
ErrInvalidCPUAmplificationRatio = "node(s) invalid CPU amplification ratio"
ErrInsufficientAmplifiedCPU = "Insufficient amplified cpu"
)
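Two of these messages are new: ErrInvalidRequestedCPUs (a fractional CPU request cannot be pinned to whole CPUs) and ErrCPUBindPolicyConflict (the node's bind policy contradicts the pod's required policy). ErrSMTAlignmentError is easiest to see with numbers; an illustrative check, assuming a hypothetical SMT-2 topology:

// FullPCPUs allocates whole physical cores, so the CPU count must be a
// multiple of CPUsPerCore(): 4 CPUs fit (2 cores), 5 do not.
cpusPerCore := 2 // e.g., topologyOptions.CPUTopology.CPUsPerCore() on SMT-2
for _, numCPUsNeeded := range []int{4, 5} {
    if numCPUsNeeded%cpusPerCore != 0 {
        fmt.Println(numCPUsNeeded, "CPUs ->", ErrSMTAlignmentError)
    }
}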
@@ -230,9 +229,11 @@ func (p *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleState
})
return nil, nil
}
requestedCPU := requests.Cpu().MilliValue()
state := &preFilterState{
requestCPUBind: false,
requests: requests,
numCPUsNeeded: int(requestedCPU / 1000),
}
if AllowUseCPUSet(pod) {
cpuBindPolicy := schedulingconfig.CPUBindPolicy(resourceSpec.PreferredCPUBindPolicy)
@@ -249,17 +250,15 @@

if cpuBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs ||
cpuBindPolicy == schedulingconfig.CPUBindPolicySpreadByPCPUs {
requestedCPU := requests.Cpu().MilliValue()
if requestedCPU%1000 != 0 {
return nil, framework.NewStatus(framework.Error, "the requested CPUs must be integer")
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidRequestedCPUs)
}

if requestedCPU > 0 {
state.requestCPUBind = true
state.requiredCPUBindPolicy = requiredCPUBindPolicy
state.preferredCPUBindPolicy = cpuBindPolicy
state.preferredCPUExclusivePolicy = resourceSpec.PreferredCPUExclusivePolicy
state.numCPUsNeeded = int(requestedCPU / 1000)
}
}
}
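PreFilter now computes numCPUsNeeded for every pod up front, and an invalid (fractional) request under a PCPU bind policy is reported as UnschedulableAndUnresolvable instead of aborting the scheduling cycle with Error. Roughly, with illustrative values and the usual corev1 and resource imports:

// 2500m cannot be pinned to whole logical CPUs, so the pod is rejected in
// PreFilter; 3000m passes and numCPUsNeeded is recorded as 3.
requests := corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("2500m")}
requestedCPU := requests.Cpu().MilliValue() // 2500
if requestedCPU%1000 != 0 {
    // return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidRequestedCPUs)
}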
@@ -278,47 +277,49 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
if !status.IsSuccess() {
return status
}

if status := p.filterAmplifiedCPUs(state, nodeInfo); !status.IsSuccess() {
return status
if state.skip {
return nil
}

node := nodeInfo.Node()
topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name)
nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy)
numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy)

if skipTheNode(state, numaTopologyPolicy) {
return nil
requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy)
if !status.IsSuccess() {
return status
}

if state.requestCPUBind {
if topologyOptions.CPUTopology == nil {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrNotFoundCPUTopology)
}
if status := p.filterAmplifiedCPUs(state.requests.Cpu().MilliValue(), nodeInfo, requestCPUBind); !status.IsSuccess() {
return status
}

if requestCPUBind {
// The node must have NodeResourceTopology with a valid CPUTopology:
// a CPUSet request cannot be satisfied on a node whose CPU topology is
// unknown, even if that node otherwise has free resources.
if !topologyOptions.CPUTopology.IsValid() {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidCPUTopology)
}
nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy)
if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly ||
state.requiredCPUBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs {

requiredCPUBindPolicy := state.requiredCPUBindPolicy
if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly {
requiredCPUBindPolicy = schedulingconfig.CPUBindPolicyFullPCPUs
} else if nodeCPUBindPolicy == extension.NodeCPUBindPolicySpreadByPCPUs {
requiredCPUBindPolicy = schedulingconfig.CPUBindPolicySpreadByPCPUs
}
if state.requiredCPUBindPolicy != "" && state.requiredCPUBindPolicy != requiredCPUBindPolicy {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrCPUBindPolicyConflict)
}

if requiredCPUBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs {
if state.numCPUsNeeded%topologyOptions.CPUTopology.CPUsPerCore() != 0 {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError)
}

if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly {
if (state.requiredCPUBindPolicy != "" && state.requiredCPUBindPolicy != schedulingconfig.CPUBindPolicyFullPCPUs) ||
(state.preferredCPUBindPolicy != "" && state.preferredCPUBindPolicy != schedulingconfig.CPUBindPolicyFullPCPUs) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy)
}
}
}

if state.requiredCPUBindPolicy != "" && numaTopologyPolicy == extension.NUMATopologyPolicyNone {
resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, topologymanager.NUMATopologyHint{}, topologyOptions)
if requiredCPUBindPolicy != "" && numaTopologyPolicy == extension.NUMATopologyPolicyNone {
resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, requestCPUBind, topologymanager.NUMATopologyHint{}, topologyOptions)
if err != nil {
return framework.AsStatus(err)
}
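Filter (and Reserve and preBindObject below) now derive the per-node decision through a new requestCPUBind helper instead of reading state.requestCPUBind directly, so a node-level CPU bind policy can force CPUSet allocation even when the pod did not request one. The helper is defined in a file not expanded on this page; a hedged reconstruction from its call sites (the real body may differ):

func requestCPUBind(state *preFilterState, nodeCPUBindPolicy extension.NodeCPUBindPolicy) (bool, *framework.Status) {
    if state.requestCPUBind {
        return true, nil // the pod itself asked for CPUSet binding in PreFilter
    }
    if nodeCPUBindPolicy == extension.NodeCPUBindPolicyFullPCPUsOnly ||
        nodeCPUBindPolicy == extension.NodeCPUBindPolicySpreadByPCPUs {
        // The node forces binding, which only works for integer CPU requests.
        if state.requests.Cpu().MilliValue()%1000 != 0 {
            return false, framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidRequestedCPUs)
        }
        return true, nil
    }
    return false, nil
}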
@@ -336,9 +337,7 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
return nil
}

func (p *Plugin) filterAmplifiedCPUs(state *preFilterState, nodeInfo *framework.NodeInfo) *framework.Status {
quantity := state.requests[corev1.ResourceCPU]
podRequestMilliCPU := quantity.MilliValue()
func (p *Plugin) filterAmplifiedCPUs(podRequestMilliCPU int64, nodeInfo *framework.NodeInfo, requestCPUBind bool) *framework.Status {
if podRequestMilliCPU == 0 {
return nil
}
@@ -352,16 +351,14 @@ func (p *Plugin) filterAmplifiedCPUs(state *preFilterState, nodeInfo *framework.
return nil
}

if state.requestCPUBind {
if requestCPUBind {
podRequestMilliCPU = extension.Amplify(podRequestMilliCPU, cpuAmplificationRatio)
}

// TODO(joseph): Reservations and preemption should be considered here.
_, allocated, _ := p.resourceManager.GetAvailableCPUs(node.Name, cpuset.CPUSet{})
_, allocated, err := p.resourceManager.GetAvailableCPUs(node.Name, cpuset.CPUSet{})
if err != nil {
if err.Error() != ErrNotFoundCPUTopology {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
allocatedMilliCPU := int64(allocated.CPUs().Size() * 1000)
requestedMilliCPU := nodeInfo.Requested.MilliCPU
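filterAmplifiedCPUs now receives the raw milli-CPU request and the per-node bind decision as plain arguments instead of reading preFilterState. When the pod will get a CPUSet, its request is amplified before being compared against node capacity; the scaling is plain multiplication (ratio value illustrative, assuming extension.Amplify as used above):

// With a CPU amplification ratio of 1.5, a 4-core CPUSet request consumes
// 6 cores of the node's amplified capacity.
podRequestMilliCPU := int64(4000)
amplified := extension.Amplify(podRequestMilliCPU, 1.5) // amplified == 6000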
@@ -380,31 +377,35 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState,
if !status.IsSuccess() {
return status
}
if state.skip {
return nil
}

nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
node := nodeInfo.Node()
topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name)
nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy)
numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy)

if skipTheNode(state, numaTopologyPolicy) {
requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy)
if !status.IsSuccess() {
return status
}
if !requestCPUBind && numaTopologyPolicy == extension.NUMATopologyPolicyNone {
return nil
}

if state.requestCPUBind {
if topologyOptions.CPUTopology == nil {
return framework.NewStatus(framework.Error, ErrNotFoundCPUTopology)
}
if requestCPUBind {
if !topologyOptions.CPUTopology.IsValid() {
return framework.NewStatus(framework.Error, ErrInvalidCPUTopology)
}
}

store := topologymanager.GetStore(cycleState)
affinity := store.GetAffinity(nodeName)
resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, affinity, topologyOptions)
resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, requestCPUBind, affinity, topologyOptions)
if err != nil {
return framework.AsStatus(err)
}
@@ -440,12 +441,24 @@ func (p *Plugin) preBindObject(ctx context.Context, cycleState *framework.CycleS
if !status.IsSuccess() {
return status
}
if state.allocation == nil {
if state.skip || state.allocation == nil {
return nil
}

if state.requestCPUBind {
if err := appendResourceSpecIfMissed(object, state); err != nil {
nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
node := nodeInfo.Node()
topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name)
nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy)
requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy)
if !status.IsSuccess() {
return status
}

if requestCPUBind {
if err := appendResourceSpecIfMissed(object, state, node, &topologyOptions); err != nil {
return framework.AsStatus(err)
}
}
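preBindObject previously wrote back whatever policy PreFilter computed; it now fetches the node and its topology options first, so the annotation records the node-resolved policy. A hedged sketch of the write-back effect, assuming koordinator's extension helpers and annotation key:

// If an LSR pod omitted a CPU bind policy and landed on a FullPCPUsOnly node,
// appendResourceSpecIfMissed persists the resolved policy on the pod, e.g. in
// the scheduling.koordinator.sh/resource-spec annotation:
spec := &extension.ResourceSpec{RequiredCPUBindPolicy: extension.CPUBindPolicyFullPCPUs}
_ = extension.SetResourceSpec(pod, spec)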
@@ -465,12 +478,7 @@ func (p *Plugin) preBindObject(ctx context.Context, cycleState *framework.CycleS
return nil
}

func (p *Plugin) getResourceOptions(cycleState *framework.CycleState, state *preFilterState, node *corev1.Node, pod *corev1.Pod, affinity topologymanager.NUMATopologyHint, topologyOptions TopologyOptions) (*ResourceOptions, error) {
preferredCPUBindPolicy, err := p.getPreferredCPUBindPolicy(node, state.preferredCPUBindPolicy)
if err != nil {
return nil, err
}

func (p *Plugin) getResourceOptions(cycleState *framework.CycleState, state *preFilterState, node *corev1.Node, pod *corev1.Pod, requestCPUBind bool, affinity topologymanager.NUMATopologyHint, topologyOptions TopologyOptions) (*ResourceOptions, error) {
if err := amplifyNUMANodeResources(node, &topologyOptions); err != nil {
return nil, err
}
@@ -492,18 +500,23 @@ func (p *Plugin) getResourceOptions(cycleState *framework.CycleState, state *pre
}

requests := state.requests
if state.requestCPUBind && amplificationRatio > 1 {
if requestCPUBind && amplificationRatio > 1 {
requests = requests.DeepCopy()
extension.AmplifyResourceList(requests, topologyOptions.AmplificationRatios, corev1.ResourceCPU)
}

cpuBindPolicy, requiredCPUBindPolicy, err := getCPUBindPolicy(&topologyOptions, node, state.requiredCPUBindPolicy, state.preferredCPUBindPolicy)
if err != nil {
return nil, err
}

options := &ResourceOptions{
requests: requests,
originalRequests: state.requests,
numCPUsNeeded: state.numCPUsNeeded,
requestCPUBind: state.requestCPUBind,
requiredCPUBindPolicy: state.requiredCPUBindPolicy != "",
cpuBindPolicy: preferredCPUBindPolicy,
requestCPUBind: requestCPUBind,
requiredCPUBindPolicy: requiredCPUBindPolicy,
cpuBindPolicy: cpuBindPolicy,
cpuExclusivePolicy: state.preferredCPUExclusivePolicy,
preferredCPUs: reservationReservedCPUs,
reusableResources: reusableResources,
@@ -536,44 +549,28 @@ func (p *Plugin) getReservationReservedCPUs(cycleState *framework.CycleState, po
return reservedCPUs, nil
}

func appendResourceSpecIfMissed(object metav1.Object, state *preFilterState) error {
func appendResourceSpecIfMissed(object metav1.Object, state *preFilterState, node *corev1.Node, topologyOpts *TopologyOptions) error {
cpuBindPolicy, required, err := getCPUBindPolicy(topologyOpts, node, state.requiredCPUBindPolicy, state.preferredCPUBindPolicy)
if err != nil {
return err
}

// Write back ResourceSpec annotation if LSR Pod hasn't specified CPUBindPolicy
shouldWriteBack := false
annotations := object.GetAnnotations()
resourceSpec, _ := extension.GetResourceSpec(annotations)
if resourceSpec.RequiredCPUBindPolicy != extension.CPUBindPolicy(state.requiredCPUBindPolicy) {
resourceSpec.RequiredCPUBindPolicy = extension.CPUBindPolicy(state.requiredCPUBindPolicy)
if resourceSpec.RequiredCPUBindPolicy == extension.CPUBindPolicyDefault && required {
resourceSpec.RequiredCPUBindPolicy = extension.CPUBindPolicy(cpuBindPolicy)
shouldWriteBack = true
}

preferredCPUBindPolicy := schedulingconfig.CPUBindPolicy(resourceSpec.PreferredCPUBindPolicy)
if preferredCPUBindPolicy == "" || preferredCPUBindPolicy == schedulingconfig.CPUBindPolicyDefault {
if resourceSpec.RequiredCPUBindPolicy == "" {
preferredCPUBindPolicy = state.preferredCPUBindPolicy
} else if preferredCPUBindPolicy != "" {
preferredCPUBindPolicy = state.requiredCPUBindPolicy
}
if resourceSpec.RequiredCPUBindPolicy == "" &&
(resourceSpec.PreferredCPUBindPolicy == "" || resourceSpec.PreferredCPUBindPolicy == extension.CPUBindPolicyDefault) {
resourceSpec.PreferredCPUBindPolicy = extension.CPUBindPolicy(cpuBindPolicy)
shouldWriteBack = true
}
resourceSpec.PreferredCPUBindPolicy = extension.CPUBindPolicy(preferredCPUBindPolicy)
return extension.SetResourceSpec(object, resourceSpec)
}

func (p *Plugin) getPreferredCPUBindPolicy(node *corev1.Node, preferredCPUBindPolicy schedulingconfig.CPUBindPolicy) (schedulingconfig.CPUBindPolicy, error) {
topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name)
if topologyOptions.CPUTopology == nil {
return preferredCPUBindPolicy, errors.New(ErrNotFoundCPUTopology)
}
if !topologyOptions.CPUTopology.IsValid() {
return preferredCPUBindPolicy, errors.New(ErrInvalidCPUTopology)
}

kubeletCPUPolicy := topologyOptions.Policy
nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, kubeletCPUPolicy)
switch nodeCPUBindPolicy {
default:
case extension.NodeCPUBindPolicyNone:
case extension.NodeCPUBindPolicySpreadByPCPUs:
preferredCPUBindPolicy = schedulingconfig.CPUBindPolicySpreadByPCPUs
case extension.NodeCPUBindPolicyFullPCPUsOnly:
preferredCPUBindPolicy = schedulingconfig.CPUBindPolicyFullPCPUs
if !shouldWriteBack {
return nil
}
return preferredCPUBindPolicy, nil
return extension.SetResourceSpec(object, resourceSpec)
}
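The deleted getPreferredCPUBindPolicy is superseded by getCPUBindPolicy, which also reports whether the resulting policy is required (pinned by the node or the pod) rather than merely preferred. The new helper lives in a file not expanded here; a hedged reconstruction from its call sites and the old switch statement (assumed shape only):

func getCPUBindPolicy(topologyOpts *TopologyOptions, node *corev1.Node,
    requiredPolicy, preferredPolicy schedulingconfig.CPUBindPolicy) (schedulingconfig.CPUBindPolicy, bool, error) {
    nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOpts.Policy)
    switch nodeCPUBindPolicy {
    case extension.NodeCPUBindPolicyFullPCPUsOnly:
        return schedulingconfig.CPUBindPolicyFullPCPUs, true, nil
    case extension.NodeCPUBindPolicySpreadByPCPUs:
        return schedulingconfig.CPUBindPolicySpreadByPCPUs, true, nil
    }
    if requiredPolicy != "" {
        return requiredPolicy, true, nil
    }
    return preferredPolicy, false, nil
}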
[8 more changed files not shown]