From 6997a042ec86645092cd2480748dfeb218fd9a7b Mon Sep 17 00:00:00 2001 From: KunWuLuan Date: Wed, 6 Mar 2024 15:20:25 +0800 Subject: [PATCH] support numa topology policy on pod Signed-off-by: KunWuLuan --- apis/extension/numa_aware.go | 21 +++++++++ .../frameworkext/framework_extender.go | 4 +- pkg/scheduler/frameworkext/interface.go | 2 +- .../frameworkext/topologymanager/manager.go | 28 +++++++++-- .../frameworkext/topologymanager/policy.go | 19 +++++++- .../topologymanager/policy_best_effort.go | 6 ++- .../topologymanager/policy_none.go | 4 +- .../topologymanager/policy_none_test.go | 4 +- .../topologymanager/policy_restricted.go | 6 ++- .../policy_single_numa_node.go | 5 +- .../topologymanager/policy_test.go | 3 +- .../nodenumaresource/node_allocation.go | 47 +++++++++++++++++++ .../plugins/nodenumaresource/plugin.go | 19 +++++++- .../plugins/nodenumaresource/scoring.go | 3 ++ .../plugins/nodenumaresource/topology_hint.go | 5 +- .../plugins/nodenumaresource/util.go | 12 +++++ 16 files changed, 166 insertions(+), 22 deletions(-) diff --git a/apis/extension/numa_aware.go b/apis/extension/numa_aware.go index 9b86414dc..9be740bc3 100644 --- a/apis/extension/numa_aware.go +++ b/apis/extension/numa_aware.go @@ -58,6 +58,12 @@ const ( // ResourceSpec describes extra attributes of the resource requirements. type ResourceSpec struct { + // NumaTopologyPolicy represents the numa topology policy when schedule pod + NumaTopologyPolicy NUMATopologyPolicy `json:"numaTopologyPolicy,omitempty"` + // SingleNUMANodeExclusive represents whether a Pod that will use a single NUMA node/multiple NUMA nodes + // on a NUMA node can be scheduled to use the NUMA node when another Pod that uses multiple NUMA nodes/a single NUMA node + // is already running on the same node. 
+ SingleNUMANodeExclusive NUMATopologyExclusive `json:"singleNUMANodeExclusive,omitempty"` // RequiredCPUBindPolicy indicates that the CPU is allocated strictly // according to the specified CPUBindPolicy, otherwise the scheduling fails RequiredCPUBindPolicy CPUBindPolicy `json:"requiredCPUBindPolicy,omitempty"` @@ -135,6 +141,21 @@ const ( NodeNUMAAllocateStrategyMostAllocated = NUMAMostAllocated ) +type NUMATopologyExclusive string + +const ( + NUMATopologyExclusivePreferred NUMATopologyExclusive = "Preferred" + NUMATopologyExclusiveRequired NUMATopologyExclusive = "Required" +) + +type NUMANodeStatus string + +const ( + NUMANodeStatusIdle NUMANodeStatus = "idle" + NUMANodeStatusShared NUMANodeStatus = "shared" + NUMANodeStatusSingle NUMANodeStatus = "single" +) + type NUMATopologyPolicy string const ( diff --git a/pkg/scheduler/frameworkext/framework_extender.go b/pkg/scheduler/frameworkext/framework_extender.go index 7ee2df40a..f00d24c94 100644 --- a/pkg/scheduler/frameworkext/framework_extender.go +++ b/pkg/scheduler/frameworkext/framework_extender.go @@ -445,8 +445,8 @@ func (ext *frameworkExtenderImpl) ForgetPod(pod *corev1.Pod) error { return nil } -func (ext *frameworkExtenderImpl) RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status { - return ext.topologyManager.Admit(ctx, cycleState, pod, nodeName, numaNodes, policyType) +func (ext *frameworkExtenderImpl) RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) *framework.Status { + return ext.topologyManager.Admit(ctx, cycleState, pod, nodeName, numaNodes, policyType, exclusivePolicy, allNUMANodeStatus) } func (ext *frameworkExtenderImpl) GetNUMATopologyHintProvider() 
[]topologymanager.NUMATopologyHintProvider { diff --git a/pkg/scheduler/frameworkext/interface.go b/pkg/scheduler/frameworkext/interface.go index 89aea0a53..408298d26 100644 --- a/pkg/scheduler/frameworkext/interface.go +++ b/pkg/scheduler/frameworkext/interface.go @@ -64,7 +64,7 @@ type FrameworkExtender interface { RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status RunReservationScorePlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfos []*ReservationInfo, nodeName string) (PluginToReservationScores, *framework.Status) - RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status + RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) *framework.Status RunResizePod(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status } diff --git a/pkg/scheduler/frameworkext/topologymanager/manager.go b/pkg/scheduler/frameworkext/topologymanager/manager.go index 2812d407f..1214b8263 100644 --- a/pkg/scheduler/frameworkext/topologymanager/manager.go +++ b/pkg/scheduler/frameworkext/topologymanager/manager.go @@ -27,7 +27,7 @@ import ( ) type Interface interface { - Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status + Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NUMATopologyExclusive, 
allNUMANodeStatus []apiext.NUMANodeStatus) *framework.Status } type NUMATopologyHintProvider interface { @@ -55,7 +55,7 @@ func New(hintProviderFactory NUMATopologyHintProviderFactory) Interface { } } -func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status { +func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) *framework.Status { s, err := cycleState.Read(affinityStateKey) if err != nil { return framework.AsStatus(err) @@ -64,7 +64,7 @@ func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.Cycle policy := createNUMATopologyPolicy(policyType, numaNodes) - bestHint, admit := m.calculateAffinity(ctx, cycleState, policy, pod, nodeName) + bestHint, admit := m.calculateAffinity(ctx, cycleState, policy, pod, nodeName, exclusivePolicy, allNUMANodeStatus) klog.V(5).Infof("Best TopologyHint for (pod: %v): %v on node: %v", klog.KObj(pod), bestHint, nodeName) if !admit { return framework.NewStatus(framework.Unschedulable, "node(s) NUMA Topology affinity error") @@ -79,9 +79,27 @@ func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.Cycle return nil } -func (m *topologyManager) calculateAffinity(ctx context.Context, cycleState *framework.CycleState, policy Policy, pod *corev1.Pod, nodeName string) (NUMATopologyHint, bool) { +func (m *topologyManager) calculateAffinity(ctx context.Context, cycleState *framework.CycleState, policy Policy, pod *corev1.Pod, nodeName string, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) (NUMATopologyHint, bool) { providersHints := m.accumulateProvidersHints(ctx, cycleState, pod, nodeName) - bestHint, admit := 
policy.Merge(providersHints) + bestHint, admit := policy.Merge(providersHints, exclusivePolicy, allNUMANodeStatus) + if exclusivePolicy == apiext.NUMATopologyExclusiveRequired { + if bestHint.NUMANodeAffinity.Count() > 1 { + // we should make sure no numa is in single state + for _, nodeid := range bestHint.NUMANodeAffinity.GetBits() { + if allNUMANodeStatus[nodeid] == apiext.NUMANodeStatusSingle { + klog.V(5).Infof("bestHint violated the exclusivePolicy requirement: bestHint: %v, policy: %v, numaStatus: %v, nodeName: %v, pod: %v", + bestHint, exclusivePolicy, allNUMANodeStatus, nodeName, pod.Name) + return bestHint, false + } + } + } else { + if allNUMANodeStatus[bestHint.NUMANodeAffinity.GetBits()[0]] == apiext.NUMANodeStatusShared { + klog.V(5).Infof("bestHint violated the exclusivePolicy requirement: bestHint: %v, policy: %v, numaStatus: %v, nodeName: %v, pod: %v", + bestHint, exclusivePolicy, allNUMANodeStatus, nodeName, pod.Name) + return bestHint, false + } + } + } klog.V(5).Infof("PodTopologyHint: %v", bestHint) return bestHint, admit } diff --git a/pkg/scheduler/frameworkext/topologymanager/policy.go b/pkg/scheduler/frameworkext/topologymanager/policy.go index ac1e2f763..48f02ca06 100644 --- a/pkg/scheduler/frameworkext/topologymanager/policy.go +++ b/pkg/scheduler/frameworkext/topologymanager/policy.go @@ -20,6 +20,7 @@ package topologymanager import ( "k8s.io/klog/v2" + apiext "github.com/koordinator-sh/koordinator/apis/extension" "github.com/koordinator-sh/koordinator/pkg/util/bitmask" ) @@ -27,7 +28,7 @@ type Policy interface { // Name returns Policy Name Name() string // Merge returns a merged NUMATopologyHint based on input from hint providers - Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) + Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) (NUMATopologyHint, bool) } // NUMATopologyHint is a struct containing the 
NUMANodeAffinity for a Container @@ -124,7 +125,7 @@ func filterProvidersHints(providersHints []map[string][]NUMATopologyHint) [][]NU return allProviderHints } -func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint) NUMATopologyHint { +func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) NUMATopologyHint { // Set the default affinity as an any-numa affinity containing the list // of NUMA Nodes available on this machine. defaultAffinity, _ := bitmask.NewBitMask(numaNodes...) @@ -142,6 +143,20 @@ func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint) NUM if mergedHint.NUMANodeAffinity.Count() == 0 { return } + if exclusivePolicy == apiext.NUMATopologyExclusiveRequired { + if mergedHint.NUMANodeAffinity.Count() > 1 { + // we should make sure no numa is in single state + for _, nodeid := range mergedHint.NUMANodeAffinity.GetBits() { + if allNUMANodeStatus[nodeid] == apiext.NUMANodeStatusSingle { + return + } + } + } else { + if allNUMANodeStatus[mergedHint.NUMANodeAffinity.GetBits()[0]] == apiext.NUMANodeStatusShared { + return + } + } + } for _, v := range permutation { if v.NUMANodeAffinity != nil && mergedHint.NUMANodeAffinity.IsEqual(v.NUMANodeAffinity) { diff --git a/pkg/scheduler/frameworkext/topologymanager/policy_best_effort.go b/pkg/scheduler/frameworkext/topologymanager/policy_best_effort.go index c128b1fbd..928965700 100644 --- a/pkg/scheduler/frameworkext/topologymanager/policy_best_effort.go +++ b/pkg/scheduler/frameworkext/topologymanager/policy_best_effort.go @@ -17,6 +17,8 @@ limitations under the License. 
package topologymanager +import apiext "github.com/koordinator-sh/koordinator/apis/extension" + type bestEffortPolicy struct { //List of NUMA Nodes available on the underlying machine numaNodes []int @@ -40,9 +42,9 @@ func (p *bestEffortPolicy) canAdmitPodResult(hint *NUMATopologyHint) bool { return true } -func (p *bestEffortPolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) { +func (p *bestEffortPolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) (NUMATopologyHint, bool) { filteredProvidersHints := filterProvidersHints(providersHints) - bestHint := mergeFilteredHints(p.numaNodes, filteredProvidersHints) + bestHint := mergeFilteredHints(p.numaNodes, filteredProvidersHints, exclusivePolicy, allNUMANodeStatus) admit := p.canAdmitPodResult(&bestHint) return bestHint, admit } diff --git a/pkg/scheduler/frameworkext/topologymanager/policy_none.go b/pkg/scheduler/frameworkext/topologymanager/policy_none.go index 0fd264db6..6de365c28 100644 --- a/pkg/scheduler/frameworkext/topologymanager/policy_none.go +++ b/pkg/scheduler/frameworkext/topologymanager/policy_none.go @@ -17,6 +17,8 @@ limitations under the License. 
package topologymanager +import apiext "github.com/koordinator-sh/koordinator/apis/extension" + type nonePolicy struct{} var _ Policy = &nonePolicy{} @@ -37,6 +39,6 @@ func (p *nonePolicy) canAdmitPodResult(hint *NUMATopologyHint) bool { return true } -func (p *nonePolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) { +func (p *nonePolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) (NUMATopologyHint, bool) { return NUMATopologyHint{}, p.canAdmitPodResult(nil) } diff --git a/pkg/scheduler/frameworkext/topologymanager/policy_none_test.go b/pkg/scheduler/frameworkext/topologymanager/policy_none_test.go index 33482427d..7850a75d3 100644 --- a/pkg/scheduler/frameworkext/topologymanager/policy_none_test.go +++ b/pkg/scheduler/frameworkext/topologymanager/policy_none_test.go @@ -19,6 +19,8 @@ package topologymanager import ( "testing" + + apiext "github.com/koordinator-sh/koordinator/apis/extension" ) func TestPolicyNoneName(t *testing.T) { @@ -104,7 +106,7 @@ func TestPolicyNoneMerge(t *testing.T) { for _, tc := range tcases { policy := NewNonePolicy() - result, admit := policy.Merge(tc.providersHints) + result, admit := policy.Merge(tc.providersHints, apiext.NUMATopologyExclusivePreferred, []apiext.NUMANodeStatus{}) if !result.IsEqual(tc.expectedHint) || admit != tc.expectedAdmit { t.Errorf("Test Case: %s: Expected merge hint to be %v, got %v", tc.name, tc.expectedHint, result) } diff --git a/pkg/scheduler/frameworkext/topologymanager/policy_restricted.go b/pkg/scheduler/frameworkext/topologymanager/policy_restricted.go index db4a7c9ef..a7bfc1e55 100644 --- a/pkg/scheduler/frameworkext/topologymanager/policy_restricted.go +++ b/pkg/scheduler/frameworkext/topologymanager/policy_restricted.go @@ -17,6 +17,8 @@ limitations under the License. 
package topologymanager +import apiext "github.com/koordinator-sh/koordinator/apis/extension" + type restrictedPolicy struct { bestEffortPolicy } @@ -39,9 +41,9 @@ func (p *restrictedPolicy) canAdmitPodResult(hint *NUMATopologyHint) bool { return hint.Preferred } -func (p *restrictedPolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) { +func (p *restrictedPolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) (NUMATopologyHint, bool) { filteredHints := filterProvidersHints(providersHints) - hint := mergeFilteredHints(p.numaNodes, filteredHints) + hint := mergeFilteredHints(p.numaNodes, filteredHints, exclusivePolicy, allNUMANodeStatus) admit := p.canAdmitPodResult(&hint) return hint, admit } diff --git a/pkg/scheduler/frameworkext/topologymanager/policy_single_numa_node.go b/pkg/scheduler/frameworkext/topologymanager/policy_single_numa_node.go index 7c8e853af..194594915 100644 --- a/pkg/scheduler/frameworkext/topologymanager/policy_single_numa_node.go +++ b/pkg/scheduler/frameworkext/topologymanager/policy_single_numa_node.go @@ -18,6 +18,7 @@ limitations under the License. package topologymanager import ( + apiext "github.com/koordinator-sh/koordinator/apis/extension" "github.com/koordinator-sh/koordinator/pkg/util/bitmask" ) @@ -62,11 +63,11 @@ func filterSingleNumaHints(allResourcesHints [][]NUMATopologyHint) [][]NUMATopol return filteredResourcesHints } -func (p *singleNumaNodePolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) { +func (p *singleNumaNodePolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) (NUMATopologyHint, bool) { filteredHints := filterProvidersHints(providersHints) // Filter to only include don't care and hints with a single NUMA node. 
singleNumaHints := filterSingleNumaHints(filteredHints) - bestHint := mergeFilteredHints(p.numaNodes, singleNumaHints) + bestHint := mergeFilteredHints(p.numaNodes, singleNumaHints, exclusivePolicy, allNUMANodeStatus) defaultAffinity, _ := bitmask.NewBitMask(p.numaNodes...) if bestHint.NUMANodeAffinity.IsEqual(defaultAffinity) { diff --git a/pkg/scheduler/frameworkext/topologymanager/policy_test.go b/pkg/scheduler/frameworkext/topologymanager/policy_test.go index 34bcf8faf..a9bc90fa7 100644 --- a/pkg/scheduler/frameworkext/topologymanager/policy_test.go +++ b/pkg/scheduler/frameworkext/topologymanager/policy_test.go @@ -26,6 +26,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/scheduler/framework" + "github.com/koordinator-sh/koordinator/apis/extension" "github.com/koordinator-sh/koordinator/pkg/util/bitmask" ) @@ -891,7 +892,7 @@ func testPolicyMerge(policy Policy, tcases []policyMergeTestCase, t *testing.T) providersHints = append(providersHints, hints) } - actual, _ := policy.Merge(providersHints) + actual, _ := policy.Merge(providersHints, extension.NUMATopologyExclusivePreferred, []extension.NUMANodeStatus{}) if !reflect.DeepEqual(actual, tc.expected) { t.Errorf("%v: Expected Topology Hint to be %v, got %v:", tc.name, tc.expected, actual) } diff --git a/pkg/scheduler/plugins/nodenumaresource/node_allocation.go b/pkg/scheduler/plugins/nodenumaresource/node_allocation.go index ae4a2515f..a72ff0709 100644 --- a/pkg/scheduler/plugins/nodenumaresource/node_allocation.go +++ b/pkg/scheduler/plugins/nodenumaresource/node_allocation.go @@ -22,6 +22,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" quotav1 "k8s.io/apiserver/pkg/quota/v1" "github.com/koordinator-sh/koordinator/apis/extension" @@ -35,6 +36,8 @@ type NodeAllocation struct { allocatedPods map[types.UID]PodAllocation allocatedCPUs CPUDetails allocatedResources map[int]*NUMANodeResource + 
sharedNode map[int]sets.String + singleNUMANode map[int]sets.String } type PodAllocation struct { @@ -46,12 +49,32 @@ type PodAllocation struct { NUMANodeResources []NUMANodeResource `json:"numaNodeResources,omitempty"` } +func (n *NodeAllocation) GetAllNUMANodeStatus(numaNodes int) []extension.NUMANodeStatus { + status := make([]extension.NUMANodeStatus, 0, numaNodes) + for i := 0; i < numaNodes; i++ { + status = append(status, n.NUMANodeSharedStatus(i)) + } + return status +} + +func (n *NodeAllocation) NUMANodeSharedStatus(nodeid int) extension.NUMANodeStatus { + if len(n.singleNUMANode[nodeid]) == 0 && len(n.sharedNode[nodeid]) == 0 { + return extension.NUMANodeStatusIdle + } + if len(n.singleNUMANode[nodeid]) > 0 && len(n.sharedNode[nodeid]) == 0 { + return extension.NUMANodeStatusSingle + } + return extension.NUMANodeStatusShared +} + func NewNodeAllocation(nodeName string) *NodeAllocation { return &NodeAllocation{ nodeName: nodeName, allocatedPods: map[types.UID]PodAllocation{}, allocatedCPUs: NewCPUDetails(), allocatedResources: map[int]*NUMANodeResource{}, + sharedNode: map[int]sets.String{}, + singleNUMANode: map[int]sets.String{}, } } @@ -78,6 +101,7 @@ func (n *NodeAllocation) addPodAllocation(request *PodAllocation, cpuTopology *C return } n.allocatedPods[request.UID] = *request + usedNUMA := sets.NewInt() for _, cpuID := range request.CPUSet.ToSliceNoSort() { cpuInfo, ok := n.allocatedCPUs[cpuID] @@ -87,6 +111,23 @@ func (n *NodeAllocation) addPodAllocation(request *PodAllocation, cpuTopology *C cpuInfo.ExclusivePolicy = request.CPUExclusivePolicy cpuInfo.RefCount++ n.allocatedCPUs[cpuID] = cpuInfo + usedNUMA.Insert(cpuInfo.NodeID) + } + if len(usedNUMA) > 1 { + for ni := range usedNUMA { + if ps := n.sharedNode[ni]; ps == nil { + n.sharedNode[ni] = sets.NewString(string(request.UID)) + } else { + n.sharedNode[ni].Insert(string(request.UID)) + } + } + } else if len(usedNUMA) == 1 { + ni := usedNUMA.UnsortedList()[0] + if ps := n.singleNUMANode[ni]; ps ==
nil { + n.singleNUMANode[ni] = sets.NewString(string(request.UID)) + } else { + n.singleNUMANode[ni].Insert(string(request.UID)) + } } for nodeID, numaNodeRes := range request.NUMANodeResources { @@ -109,6 +150,7 @@ func (n *NodeAllocation) release(podUID types.UID) { } delete(n.allocatedPods, podUID) + usedNUMA := sets.NewInt() for _, cpuID := range request.CPUSet.ToSliceNoSort() { cpuInfo, ok := n.allocatedCPUs[cpuID] if !ok { @@ -120,6 +162,11 @@ func (n *NodeAllocation) release(podUID types.UID) { } else { n.allocatedCPUs[cpuID] = cpuInfo } + usedNUMA.Insert(cpuInfo.NodeID) + } + for ni := range usedNUMA { + delete(n.sharedNode[ni], string(podUID)) + delete(n.singleNUMANode[ni], string(podUID)) } for _, numaNodeRes := range request.NUMANodeResources { diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin.go b/pkg/scheduler/plugins/nodenumaresource/plugin.go index 92f4c50fc..3d37644c0 100644 --- a/pkg/scheduler/plugins/nodenumaresource/plugin.go +++ b/pkg/scheduler/plugins/nodenumaresource/plugin.go @@ -46,6 +46,7 @@ const ( ) const ( + ErrNotMatchNUMATopology = "node(s) NUMA Topology policy not match" ErrInvalidRequestedCPUs = "the requested CPUs must be integer" ErrInvalidCPUTopology = "node(s) invalid CPU Topology" ErrSMTAlignmentError = "node(s) requested cpus not multiple cpus per core" @@ -180,6 +181,8 @@ type preFilterState struct { requiredCPUBindPolicy schedulingconfig.CPUBindPolicy preferredCPUBindPolicy schedulingconfig.CPUBindPolicy preferredCPUExclusivePolicy schedulingconfig.CPUExclusivePolicy + podNUMATopologyPolicy extension.NUMATopologyPolicy + podNUMAExclusive extension.NUMATopologyExclusive numCPUsNeeded int allocation *PodAllocation } @@ -284,7 +287,18 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p node := nodeInfo.Node() topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name) nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) + podNUMAExclusive 
:= state.podNUMAExclusive + podNUMATopologyPolicy := state.podNUMATopologyPolicy + // when numa topology policy is set on node, we should maintain the same behavior as before, so we only + // set default podNUMAExclusive when podNUMATopologyPolicy is not none + if podNUMAExclusive == "" && podNUMATopologyPolicy != "" { + podNUMAExclusive = extension.NUMATopologyExclusiveRequired + } numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy) + numaTopologyPolicy, err := mergeTopologyPolicy(numaTopologyPolicy, podNUMATopologyPolicy) + if err != nil { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrNotMatchNUMATopology) + } requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy) if !status.IsSuccess() { return status @@ -331,7 +345,7 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p } if numaTopologyPolicy != extension.NUMATopologyPolicyNone { - return p.FilterByNUMANode(ctx, cycleState, pod, node.Name, numaTopologyPolicy, topologyOptions) + return p.FilterByNUMANode(ctx, cycleState, pod, node.Name, numaTopologyPolicy, podNUMAExclusive, topologyOptions) } return nil @@ -388,7 +402,7 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, node := nodeInfo.Node() topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name) nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) + podNUMATopologyPolicy := state.podNUMATopologyPolicy numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy) + // we have checked in Filter, so we will not get an error in Reserve + numaTopologyPolicy, _ = mergeTopologyPolicy(numaTopologyPolicy, podNUMATopologyPolicy) requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy) if !status.IsSuccess() { return status diff --git a/pkg/scheduler/plugins/nodenumaresource/scoring.go b/pkg/scheduler/plugins/nodenumaresource/scoring.go
index f34614c57..6f74a2a00 100644 --- a/pkg/scheduler/plugins/nodenumaresource/scoring.go +++ b/pkg/scheduler/plugins/nodenumaresource/scoring.go @@ -68,7 +68,10 @@ func (p *Plugin) Score(ctx context.Context, cycleState *framework.CycleState, po node := nodeInfo.Node() topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name) nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) + podNUMATopologyPolicy := state.podNUMATopologyPolicy numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy) + // we have checked in Filter, so we will not get an error in Score + numaTopologyPolicy, _ = mergeTopologyPolicy(numaTopologyPolicy, podNUMATopologyPolicy) requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy) if !status.IsSuccess() { return 0, status diff --git a/pkg/scheduler/plugins/nodenumaresource/topology_hint.go b/pkg/scheduler/plugins/nodenumaresource/topology_hint.go index d09aba88a..61ba18806 100644 --- a/pkg/scheduler/plugins/nodenumaresource/topology_hint.go +++ b/pkg/scheduler/plugins/nodenumaresource/topology_hint.go @@ -27,7 +27,7 @@ import ( "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/topologymanager" ) -func (p *Plugin) FilterByNUMANode(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, policyType apiext.NUMATopologyPolicy, topologyOptions TopologyOptions) *framework.Status { +func (p *Plugin) FilterByNUMANode(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NUMATopologyExclusive, topologyOptions TopologyOptions) *framework.Status { if policyType == apiext.NUMATopologyPolicyNone { return nil } @@ -35,7 +35,8 @@ func (p *Plugin) FilterByNUMANode(ctx context.Context, cycleState *framework.Cyc if len(numaNodes) == 0 { return framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) missing NUMA
resources") } - return p.handle.(frameworkext.FrameworkExtender).RunNUMATopologyManagerAdmit(ctx, cycleState, pod, nodeName, numaNodes, policyType) + numaNodesStatus := p.resourceManager.GetNodeAllocation(nodeName).GetAllNUMANodeStatus(len(numaNodes)) + return p.handle.(frameworkext.FrameworkExtender).RunNUMATopologyManagerAdmit(ctx, cycleState, pod, nodeName, numaNodes, policyType, exclusivePolicy, numaNodesStatus) } func (p *Plugin) GetPodTopologyHints(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) (map[string][]topologymanager.NUMATopologyHint, *framework.Status) { diff --git a/pkg/scheduler/plugins/nodenumaresource/util.go b/pkg/scheduler/plugins/nodenumaresource/util.go index 21ed563e0..c850bf572 100644 --- a/pkg/scheduler/plugins/nodenumaresource/util.go +++ b/pkg/scheduler/plugins/nodenumaresource/util.go @@ -17,6 +17,8 @@ limitations under the License. package nodenumaresource import ( + "errors" + corev1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -49,6 +51,16 @@ func AllowUseCPUSet(pod *corev1.Pod) bool { return (qosClass == extension.QoSLSE || qosClass == extension.QoSLSR) && priorityClass == extension.PriorityProd } +func mergeTopologyPolicy(nodePolicy, podPolicy extension.NUMATopologyPolicy) (extension.NUMATopologyPolicy, error) { + if nodePolicy != "" && podPolicy != "" && podPolicy != nodePolicy { + return "", errors.New(ErrNotMatchNUMATopology) + } + if podPolicy != "" { + nodePolicy = extension.NUMATopologyPolicy(podPolicy) + } + return nodePolicy, nil +} + func getNUMATopologyPolicy(nodeLabels map[string]string, kubeletTopologyManagerPolicy extension.NUMATopologyPolicy) extension.NUMATopologyPolicy { policyType := extension.GetNodeNUMATopologyPolicy(nodeLabels) if policyType != extension.NUMATopologyPolicyNone {