diff --git a/apis/extension/numa_aware.go b/apis/extension/numa_aware.go index 9b86414dc..03d9e087b 100644 --- a/apis/extension/numa_aware.go +++ b/apis/extension/numa_aware.go @@ -26,6 +26,9 @@ import ( // Defines the pod level annotations and labels const ( + // AnnotationNumaTopologySpec represents the NUMA topology allocation API defined by Koordinator. + // The user specifies the desired NUMA topology policy by setting the annotation. + AnnotationNumaTopologySpec = SchedulingDomainPrefix + "/numa-topology-spec" // AnnotationResourceSpec represents resource allocation API defined by Koordinator. // The user specifies the desired CPU orchestration policy by setting the annotation. AnnotationResourceSpec = SchedulingDomainPrefix + "/resource-spec" @@ -67,6 +70,15 @@ type ResourceSpec struct { PreferredCPUExclusivePolicy CPUExclusivePolicy `json:"preferredCPUExclusivePolicy,omitempty"` } +type NumaTopologySpec struct { + // NUMATopologyPolicy represents the NUMA topology policy to apply when scheduling the pod. + NUMATopologyPolicy NUMATopologyPolicy `json:"numaTopologyPolicy,omitempty"` + // SingleNUMANodeExclusive indicates whether a Pod that uses a single NUMA node (or multiple NUMA nodes) + // can be scheduled onto a NUMA node on which another Pod that uses multiple NUMA nodes (or a single NUMA node, + // respectively) is already running. + SingleNUMANodeExclusive NumaTopologyExclusive `json:"singleNUMANodeExclusive,omitempty"` +} + // ResourceStatus describes resource allocation result, such as how to bind CPU. type ResourceStatus struct { // CPUSet represents the allocated CPUs. It is Linux CPU list formatted string. @@ -135,6 +147,21 @@ const ( NodeNUMAAllocateStrategyMostAllocated = NUMAMostAllocated ) +type NumaTopologyExclusive string + +const ( + NumaTopologyExclusivePreferred NumaTopologyExclusive = "Preferred" + NumaTopologyExclusiveRequired NumaTopologyExclusive = "Required" +) + +type NumaNodeStatus string + +const ( + NumaNodeStatusIdle NumaNodeStatus = "idle" + NumaNodeStatusShared NumaNodeStatus = "shared" + NumaNodeStatusSingle NumaNodeStatus = "single" +) + type NUMATopologyPolicy string const ( @@ -187,6 +214,19 @@ type KubeletCPUManagerPolicy struct { ReservedCPUs string `json:"reservedCPUs,omitempty"` } +func GetNumaTopologySpec(annotations map[string]string) (*NumaTopologySpec, error) { + numaSpec := &NumaTopologySpec{} + data, ok := annotations[AnnotationNumaTopologySpec] + if !ok { + return numaSpec, nil + } + err := json.Unmarshal([]byte(data), numaSpec) + if err != nil { + return nil, err + } + return numaSpec, nil +} + // GetResourceSpec parses ResourceSpec from annotations func GetResourceSpec(annotations map[string]string) (*ResourceSpec, error) { resourceSpec := &ResourceSpec{} diff --git a/pkg/scheduler/frameworkext/framework_extender.go b/pkg/scheduler/frameworkext/framework_extender.go index ff61e2432..dc900c792 100644 --- a/pkg/scheduler/frameworkext/framework_extender.go +++ b/pkg/scheduler/frameworkext/framework_extender.go @@ -446,8 +446,8 @@ func (ext *frameworkExtenderImpl) ForgetPod(pod *corev1.Pod) error { return nil } -func (ext *frameworkExtenderImpl) RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status { - return ext.topologyManager.Admit(ctx, cycleState, pod, nodeName, numaNodes, policyType) +func (ext *frameworkExtenderImpl) RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName
string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status { + return ext.topologyManager.Admit(ctx, cycleState, pod, nodeName, numaNodes, policyType, exclusivePolicy, allNUMANodeStatus) } func (ext *frameworkExtenderImpl) GetNUMATopologyHintProvider() []topologymanager.NUMATopologyHintProvider { diff --git a/pkg/scheduler/frameworkext/interface.go b/pkg/scheduler/frameworkext/interface.go index ec9f35c1a..e26dc09c3 100644 --- a/pkg/scheduler/frameworkext/interface.go +++ b/pkg/scheduler/frameworkext/interface.go @@ -64,7 +64,7 @@ type FrameworkExtender interface { RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status RunReservationScorePlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfos []*ReservationInfo, nodeName string) (PluginToReservationScores, *framework.Status) - RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status + RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status RunResizePod(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status } diff --git a/pkg/scheduler/frameworkext/topologymanager/manager.go b/pkg/scheduler/frameworkext/topologymanager/manager.go index 2812d407f..697039de2 100644 --- a/pkg/scheduler/frameworkext/topologymanager/manager.go +++ b/pkg/scheduler/frameworkext/topologymanager/manager.go @@ -27,7 +27,7 @@ import ( ) type Interface interface { - Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status + Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status } type NUMATopologyHintProvider interface { @@ -55,7 +55,7 @@ func New(hintProviderFactory NUMATopologyHintProviderFactory) Interface { } } -func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status { +func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status { s, err := cycleState.Read(affinityStateKey) if err != nil { return framework.AsStatus(err) @@ -64,7 +64,7 @@ func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.Cycle policy := createNUMATopologyPolicy(policyType, numaNodes) - bestHint, admit := m.calculateAffinity(ctx, cycleState, policy, pod, nodeName) + bestHint, admit := m.calculateAffinity(ctx, cycleState, policy, pod, nodeName, exclusivePolicy, allNUMANodeStatus) klog.V(5).Infof("Best TopologyHint for (pod: %v): %v on node: %v", 
klog.KObj(pod), bestHint, nodeName) if !admit { return framework.NewStatus(framework.Unschedulable, "node(s) NUMA Topology affinity error") @@ -79,9 +79,13 @@ func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.Cycle return nil } -func (m *topologyManager) calculateAffinity(ctx context.Context, cycleState *framework.CycleState, policy Policy, pod *corev1.Pod, nodeName string) (NUMATopologyHint, bool) { +func (m *topologyManager) calculateAffinity(ctx context.Context, cycleState *framework.CycleState, policy Policy, pod *corev1.Pod, nodeName string, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) { providersHints := m.accumulateProvidersHints(ctx, cycleState, pod, nodeName) - bestHint, admit := policy.Merge(providersHints) + bestHint, admit := policy.Merge(providersHints, exclusivePolicy, allNUMANodeStatus) + if !checkExclusivePolicy(bestHint, exclusivePolicy, allNUMANodeStatus) { + klog.V(5).Infof("bestHint violated the exclusivePolicy requirement: bestHint: %v, policy: %v, numaStatus: %v, nodeName: %v, pod: %v", + bestHint, exclusivePolicy, allNUMANodeStatus, nodeName, pod.Name) + } klog.V(5).Infof("PodTopologyHint: %v", bestHint) return bestHint, admit } diff --git a/pkg/scheduler/frameworkext/topologymanager/policy.go b/pkg/scheduler/frameworkext/topologymanager/policy.go index 4de0c93d2..8c37847c9 100644 --- a/pkg/scheduler/frameworkext/topologymanager/policy.go +++ b/pkg/scheduler/frameworkext/topologymanager/policy.go @@ -20,6 +20,7 @@ package topologymanager import ( "k8s.io/klog/v2" + apiext "github.com/koordinator-sh/koordinator/apis/extension" "github.com/koordinator-sh/koordinator/pkg/util/bitmask" ) @@ -27,7 +28,7 @@ type Policy interface { // Name returns Policy Name Name() string // Merge returns a merged NUMATopologyHint based on input from hint providers - Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) + Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) } // NUMATopologyHint is a struct containing the NUMANodeAffinity for a Container @@ -62,6 +63,29 @@ func (th *NUMATopologyHint) LessThan(other NUMATopologyHint) bool { return th.NUMANodeAffinity.IsNarrowerThan(other.NUMANodeAffinity) } +// Check if the affinity match the exclusive policy, return true if match or false otherwise. +func checkExclusivePolicy(affinity NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) bool { + // check bestHint again if default hint is the best + if affinity.NUMANodeAffinity == nil { + return false + } + if exclusivePolicy == apiext.NumaTopologyExclusiveRequired { + if affinity.NUMANodeAffinity.Count() > 1 { + // we should make sure no numa is in single state + for _, nodeid := range affinity.NUMANodeAffinity.GetBits() { + if allNUMANodeStatus[nodeid] == apiext.NumaNodeStatusSingle { + return false + } + } + } else { + if allNUMANodeStatus[affinity.NUMANodeAffinity.GetBits()[0]] == apiext.NumaNodeStatusShared { + return false + } + } + } + return true +} + // Merge a TopologyHints permutation to a single hint by performing a bitwise-AND // of their affinity masks. The hint shall be preferred if all hits in the permutation // are preferred. 
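For clarity, here is a minimal, self-contained sketch of the rule that checkExclusivePolicy above enforces (simplified, invented names; not Koordinator code): under the Required exclusive policy, a hint that spans multiple NUMA nodes must not touch a node already in the "single" state, and a single-node hint must not land on a node already in the "shared" state. Aside from the nil-affinity guard, the Preferred policy never flags a hint.

package main

import "fmt"

type numaStatus string

const (
	statusIdle   numaStatus = "idle"
	statusShared numaStatus = "shared"
	statusSingle numaStatus = "single"
)

// violatesRequiredExclusive reports whether a candidate affinity (a set of NUMA
// node IDs) breaks the Required single-NUMA-node exclusive rule for the given
// per-node statuses: a multi-node affinity may not include a node already in the
// "single" state, and a single-node affinity may not use a node already "shared".
func violatesRequiredExclusive(affinity []int, statuses []numaStatus) bool {
	if len(affinity) > 1 {
		for _, id := range affinity {
			if statuses[id] == statusSingle {
				return true
			}
		}
		return false
	}
	return statuses[affinity[0]] == statusShared
}

func main() {
	statuses := []numaStatus{statusShared, statusSingle}
	fmt.Println(violatesRequiredExclusive([]int{0}, statuses))    // true: NUMA 0 is already shared by a multi-node pod
	fmt.Println(violatesRequiredExclusive([]int{0, 1}, statuses)) // true: NUMA 1 is already used by a single-node pod
	fmt.Println(violatesRequiredExclusive([]int{1}, statuses))    // false: single-node pods may co-locate on a "single" node
}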
@@ -126,7 +150,7 @@ func filterProvidersHints(providersHints []map[string][]NUMATopologyHint) [][]NU return allProviderHints } -func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint) NUMATopologyHint { +func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) NUMATopologyHint { // Set the default affinity as an any-numa affinity containing the list // of NUMA Nodes available on this machine. defaultAffinity, _ := bitmask.NewBitMask(numaNodes...) @@ -144,6 +168,9 @@ func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint) NUM if mergedHint.NUMANodeAffinity.Count() == 0 { return } + if !checkExclusivePolicy(mergedHint, exclusivePolicy, allNUMANodeStatus) { + mergedHint.Preferred = false + } for _, v := range permutation { if v.NUMANodeAffinity != nil && mergedHint.NUMANodeAffinity.IsEqual(v.NUMANodeAffinity) { diff --git a/pkg/scheduler/frameworkext/topologymanager/policy_best_effort.go b/pkg/scheduler/frameworkext/topologymanager/policy_best_effort.go index c128b1fbd..53cc6c98e 100644 --- a/pkg/scheduler/frameworkext/topologymanager/policy_best_effort.go +++ b/pkg/scheduler/frameworkext/topologymanager/policy_best_effort.go @@ -17,6 +17,8 @@ limitations under the License. package topologymanager +import apiext "github.com/koordinator-sh/koordinator/apis/extension" + type bestEffortPolicy struct { //List of NUMA Nodes available on the underlying machine numaNodes []int @@ -40,9 +42,9 @@ func (p *bestEffortPolicy) canAdmitPodResult(hint *NUMATopologyHint) bool { return true } -func (p *bestEffortPolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) { +func (p *bestEffortPolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) { filteredProvidersHints := filterProvidersHints(providersHints) - bestHint := mergeFilteredHints(p.numaNodes, filteredProvidersHints) + bestHint := mergeFilteredHints(p.numaNodes, filteredProvidersHints, exclusivePolicy, allNUMANodeStatus) admit := p.canAdmitPodResult(&bestHint) return bestHint, admit } diff --git a/pkg/scheduler/frameworkext/topologymanager/policy_none.go b/pkg/scheduler/frameworkext/topologymanager/policy_none.go index 0fd264db6..ca12d555b 100644 --- a/pkg/scheduler/frameworkext/topologymanager/policy_none.go +++ b/pkg/scheduler/frameworkext/topologymanager/policy_none.go @@ -17,6 +17,8 @@ limitations under the License. 
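Stepping back to the API added in apis/extension/numa_aware.go at the top of this diff: the exclusivePolicy argument now threaded through Merge originates from the pod-level annotation. Below is a hedged usage sketch (not part of this PR) of how that annotation might be written and parsed with the new GetNumaTopologySpec helper; the nodenumaresource PreFilter later in this diff stores both fields in its cycle state and, defaulting SingleNUMANodeExclusive to Required when only a policy is set, passes it to the topology manager as exclusivePolicy.

package main

import (
	"fmt"

	"github.com/koordinator-sh/koordinator/apis/extension"
)

func main() {
	// Annotations as they could appear on a Pod; the JSON mirrors the
	// NumaTopologySpec struct introduced in this change.
	annotations := map[string]string{
		extension.AnnotationNumaTopologySpec: `{"numaTopologyPolicy":"SingleNUMANode","singleNUMANodeExclusive":"Required"}`,
	}

	spec, err := extension.GetNumaTopologySpec(annotations)
	if err != nil {
		panic(err)
	}
	fmt.Println(spec.NUMATopologyPolicy)      // SingleNUMANode
	fmt.Println(spec.SingleNUMANodeExclusive) // Required
}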
package topologymanager +import apiext "github.com/koordinator-sh/koordinator/apis/extension" + type nonePolicy struct{} var _ Policy = &nonePolicy{} @@ -37,6 +39,6 @@ func (p *nonePolicy) canAdmitPodResult(hint *NUMATopologyHint) bool { return true } -func (p *nonePolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) { +func (p *nonePolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) { return NUMATopologyHint{}, p.canAdmitPodResult(nil) } diff --git a/pkg/scheduler/frameworkext/topologymanager/policy_none_test.go b/pkg/scheduler/frameworkext/topologymanager/policy_none_test.go index 33482427d..dc86c4149 100644 --- a/pkg/scheduler/frameworkext/topologymanager/policy_none_test.go +++ b/pkg/scheduler/frameworkext/topologymanager/policy_none_test.go @@ -19,6 +19,8 @@ package topologymanager import ( "testing" + + apiext "github.com/koordinator-sh/koordinator/apis/extension" ) func TestPolicyNoneName(t *testing.T) { @@ -104,7 +106,7 @@ func TestPolicyNoneMerge(t *testing.T) { for _, tc := range tcases { policy := NewNonePolicy() - result, admit := policy.Merge(tc.providersHints) + result, admit := policy.Merge(tc.providersHints, apiext.NumaTopologyExclusivePreferred, []apiext.NumaNodeStatus{}) if !result.IsEqual(tc.expectedHint) || admit != tc.expectedAdmit { t.Errorf("Test Case: %s: Expected merge hint to be %v, got %v", tc.name, tc.expectedHint, result) } diff --git a/pkg/scheduler/frameworkext/topologymanager/policy_restricted.go b/pkg/scheduler/frameworkext/topologymanager/policy_restricted.go index db4a7c9ef..6d489ea3a 100644 --- a/pkg/scheduler/frameworkext/topologymanager/policy_restricted.go +++ b/pkg/scheduler/frameworkext/topologymanager/policy_restricted.go @@ -17,6 +17,8 @@ limitations under the License. package topologymanager +import apiext "github.com/koordinator-sh/koordinator/apis/extension" + type restrictedPolicy struct { bestEffortPolicy } @@ -39,9 +41,9 @@ func (p *restrictedPolicy) canAdmitPodResult(hint *NUMATopologyHint) bool { return hint.Preferred } -func (p *restrictedPolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) { +func (p *restrictedPolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) { filteredHints := filterProvidersHints(providersHints) - hint := mergeFilteredHints(p.numaNodes, filteredHints) + hint := mergeFilteredHints(p.numaNodes, filteredHints, exclusivePolicy, allNUMANodeStatus) admit := p.canAdmitPodResult(&hint) return hint, admit } diff --git a/pkg/scheduler/frameworkext/topologymanager/policy_single_numa_node.go b/pkg/scheduler/frameworkext/topologymanager/policy_single_numa_node.go index 7c8e853af..2947bf23b 100644 --- a/pkg/scheduler/frameworkext/topologymanager/policy_single_numa_node.go +++ b/pkg/scheduler/frameworkext/topologymanager/policy_single_numa_node.go @@ -18,6 +18,7 @@ limitations under the License. 
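To make the effect of the hint demotion in mergeFilteredHints concrete: a hint that violates the Required exclusive policy is only marked Preferred=false, and whether that actually blocks the pod depends on each policy's admission rule: best-effort always admits, while restricted admits only hints marked Preferred. A minimal standalone sketch (simplified names, not Koordinator code):

package main

import "fmt"

type hint struct{ preferred bool }

// bestEffortAdmit mirrors bestEffortPolicy.canAdmitPodResult: always admit.
func bestEffortAdmit(h hint) bool { return true }

// restrictedAdmit mirrors restrictedPolicy.canAdmitPodResult: admit only preferred hints.
func restrictedAdmit(h hint) bool { return h.preferred }

func main() {
	// A merged hint demoted because it violated the Required exclusive policy.
	demoted := hint{preferred: false}
	fmt.Println("best-effort admits:", bestEffortAdmit(demoted)) // true
	fmt.Println("restricted admits:", restrictedAdmit(demoted))  // false
}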
package topologymanager import ( + apiext "github.com/koordinator-sh/koordinator/apis/extension" "github.com/koordinator-sh/koordinator/pkg/util/bitmask" ) @@ -62,11 +63,11 @@ func filterSingleNumaHints(allResourcesHints [][]NUMATopologyHint) [][]NUMATopol return filteredResourcesHints } -func (p *singleNumaNodePolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) { +func (p *singleNumaNodePolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) { filteredHints := filterProvidersHints(providersHints) // Filter to only include don't care and hints with a single NUMA node. singleNumaHints := filterSingleNumaHints(filteredHints) - bestHint := mergeFilteredHints(p.numaNodes, singleNumaHints) + bestHint := mergeFilteredHints(p.numaNodes, singleNumaHints, exclusivePolicy, allNUMANodeStatus) defaultAffinity, _ := bitmask.NewBitMask(p.numaNodes...) if bestHint.NUMANodeAffinity.IsEqual(defaultAffinity) { diff --git a/pkg/scheduler/frameworkext/topologymanager/policy_test.go b/pkg/scheduler/frameworkext/topologymanager/policy_test.go index 95193479c..26a7f985b 100644 --- a/pkg/scheduler/frameworkext/topologymanager/policy_test.go +++ b/pkg/scheduler/frameworkext/topologymanager/policy_test.go @@ -26,6 +26,8 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/scheduler/framework" + "github.com/koordinator-sh/koordinator/apis/extension" + apiext "github.com/koordinator-sh/koordinator/apis/extension" "github.com/koordinator-sh/koordinator/pkg/util/bitmask" ) @@ -928,9 +930,130 @@ func testPolicyMerge(policy Policy, tcases []policyMergeTestCase, t *testing.T) providersHints = append(providersHints, hints) } - actual, _ := policy.Merge(providersHints) + actual, _ := policy.Merge(providersHints, extension.NumaTopologyExclusivePreferred, []extension.NumaNodeStatus{}) if !reflect.DeepEqual(actual, tc.expected) { t.Errorf("%v: Expected Topology Hint to be %v, got %v:", tc.name, tc.expected, actual) } } } + +func Test_checkExclusivePolicy(t *testing.T) { + type args struct { + affinity NUMATopologyHint + exclusivePolicy apiext.NumaTopologyExclusive + allNUMANodeStatus []apiext.NumaNodeStatus + } + tests := []struct { + name string + args args + want bool + }{ + // TODO: Add test cases. 
+ { + name: "preferred policy 1", + args: args{ + affinity: NUMATopologyHint{NUMANodeAffinity: NewTestBitMask(0), Preferred: true, Score: 0}, + exclusivePolicy: apiext.NumaTopologyExclusivePreferred, + allNUMANodeStatus: []apiext.NumaNodeStatus{apiext.NumaNodeStatusShared, apiext.NumaNodeStatusShared}, + }, + want: true, + }, + { + name: "preferred policy 2", + args: args{ + affinity: NUMATopologyHint{NUMANodeAffinity: NewTestBitMask(0, 1), Preferred: true, Score: 0}, + exclusivePolicy: apiext.NumaTopologyExclusivePreferred, + allNUMANodeStatus: []apiext.NumaNodeStatus{apiext.NumaNodeStatusShared, apiext.NumaNodeStatusShared}, + }, + want: true, + }, + { + name: "preferred policy 3", + args: args{ + affinity: NUMATopologyHint{NUMANodeAffinity: NewTestBitMask(0), Preferred: true, Score: 0}, + exclusivePolicy: apiext.NumaTopologyExclusivePreferred, + allNUMANodeStatus: []apiext.NumaNodeStatus{apiext.NumaNodeStatusIdle, apiext.NumaNodeStatusSingle}, + }, + want: true, + }, + { + name: "preferred policy 4", + args: args{ + affinity: NUMATopologyHint{NUMANodeAffinity: NewTestBitMask(0, 1), Preferred: true, Score: 0}, + exclusivePolicy: apiext.NumaTopologyExclusivePreferred, + allNUMANodeStatus: []apiext.NumaNodeStatus{apiext.NumaNodeStatusIdle, apiext.NumaNodeStatusSingle}, + }, + want: true, + }, + { + name: "required policy 1", + args: args{ + affinity: NUMATopologyHint{NUMANodeAffinity: NewTestBitMask(0), Preferred: true, Score: 0}, + exclusivePolicy: apiext.NumaTopologyExclusiveRequired, + allNUMANodeStatus: []apiext.NumaNodeStatus{apiext.NumaNodeStatusIdle, apiext.NumaNodeStatusSingle}, + }, + want: true, + }, + { + name: "required policy 2", + args: args{ + affinity: NUMATopologyHint{NUMANodeAffinity: NewTestBitMask(0), Preferred: true, Score: 0}, + exclusivePolicy: apiext.NumaTopologyExclusiveRequired, + allNUMANodeStatus: []apiext.NumaNodeStatus{apiext.NumaNodeStatusShared, apiext.NumaNodeStatusSingle}, + }, + want: false, + }, + { + name: "required policy 3", + args: args{ + affinity: NUMATopologyHint{NUMANodeAffinity: NewTestBitMask(0), Preferred: true, Score: 0}, + exclusivePolicy: apiext.NumaTopologyExclusiveRequired, + allNUMANodeStatus: []apiext.NumaNodeStatus{apiext.NumaNodeStatusShared, apiext.NumaNodeStatusShared}, + }, + want: false, + }, + { + name: "required policy 4", + args: args{ + affinity: NUMATopologyHint{NUMANodeAffinity: NewTestBitMask(0), Preferred: true, Score: 0}, + exclusivePolicy: apiext.NumaTopologyExclusiveRequired, + allNUMANodeStatus: []apiext.NumaNodeStatus{apiext.NumaNodeStatusSingle, apiext.NumaNodeStatusShared}, + }, + want: true, + }, + { + name: "required policy 5", + args: args{ + affinity: NUMATopologyHint{NUMANodeAffinity: NewTestBitMask(0, 1), Preferred: true, Score: 0}, + exclusivePolicy: apiext.NumaTopologyExclusiveRequired, + allNUMANodeStatus: []apiext.NumaNodeStatus{apiext.NumaNodeStatusShared, apiext.NumaNodeStatusSingle}, + }, + want: false, + }, + { + name: "required policy 6", + args: args{ + affinity: NUMATopologyHint{NUMANodeAffinity: NewTestBitMask(0, 1), Preferred: true, Score: 0}, + exclusivePolicy: apiext.NumaTopologyExclusiveRequired, + allNUMANodeStatus: []apiext.NumaNodeStatus{apiext.NumaNodeStatusShared, apiext.NumaNodeStatusIdle}, + }, + want: true, + }, + { + name: "required policy 7", + args: args{ + affinity: NUMATopologyHint{NUMANodeAffinity: NewTestBitMask(0, 1), Preferred: true, Score: 0}, + exclusivePolicy: apiext.NumaTopologyExclusiveRequired, + allNUMANodeStatus: []apiext.NumaNodeStatus{apiext.NumaNodeStatusShared, 
apiext.NumaNodeStatusShared}, + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := checkExclusivePolicy(tt.args.affinity, tt.args.exclusivePolicy, tt.args.allNUMANodeStatus); got != tt.want { + t.Errorf("checkExclusivePolicy() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/scheduler/plugins/deviceshare/topology_hint.go b/pkg/scheduler/plugins/deviceshare/topology_hint.go index 354559ceb..044f74936 100644 --- a/pkg/scheduler/plugins/deviceshare/topology_hint.go +++ b/pkg/scheduler/plugins/deviceshare/topology_hint.go @@ -157,11 +157,11 @@ func (p *Plugin) generateTopologyHints(cycleState *framework.CycleState, state * } } - for resourceName, affinitySize := range minAffinitySize { - if nodeCount < affinitySize { - minAffinitySize[resourceName] = nodeCount - } - } + // for resourceName, affinitySize := range minAffinitySize { + // if nodeCount < affinitySize { + // minAffinitySize[resourceName] = nodeCount + // } + // } allocateResult, status := p.tryAllocateFromReservation(allocator, state, restoreState, restoreState.matched, node, preemptible, false) if !status.IsSuccess() { @@ -175,7 +175,10 @@ func (p *Plugin) generateTopologyHints(cycleState *framework.CycleState, state * } } - for resourceName := range minAffinitySize { + for resourceName, affinitySize := range minAffinitySize { + if nodeCount < affinitySize { + minAffinitySize[resourceName] = nodeCount + } if _, ok := hints[string(resourceName)]; !ok { hints[string(resourceName)] = []topologymanager.NUMATopologyHint{} } diff --git a/pkg/scheduler/plugins/nodenumaresource/node_allocation.go b/pkg/scheduler/plugins/nodenumaresource/node_allocation.go index ae4a2515f..9cd352af1 100644 --- a/pkg/scheduler/plugins/nodenumaresource/node_allocation.go +++ b/pkg/scheduler/plugins/nodenumaresource/node_allocation.go @@ -22,6 +22,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" quotav1 "k8s.io/apiserver/pkg/quota/v1" "github.com/koordinator-sh/koordinator/apis/extension" @@ -35,6 +36,8 @@ type NodeAllocation struct { allocatedPods map[types.UID]PodAllocation allocatedCPUs CPUDetails allocatedResources map[int]*NUMANodeResource + sharedNode map[int]sets.String + singleNUMANode map[int]sets.String } type PodAllocation struct { @@ -46,12 +49,32 @@ type PodAllocation struct { NUMANodeResources []NUMANodeResource `json:"numaNodeResources,omitempty"` } +func (n *NodeAllocation) GetAllNUMANodeStatus(numaNodes int) []extension.NumaNodeStatus { + status := make([]extension.NumaNodeStatus, 0, numaNodes) + for i := 0; i < numaNodes; i++ { + status = append(status, n.NUMANodeSharedStatus(i)) + } + return status +} + +func (n *NodeAllocation) NUMANodeSharedStatus(nodeid int) extension.NumaNodeStatus { + if len(n.singleNUMANode[nodeid]) == 0 && len(n.sharedNode[nodeid]) == 0 { + return extension.NumaNodeStatusIdle + } + if len(n.singleNUMANode[nodeid]) > 0 && len(n.sharedNode[nodeid]) == 0 { + return extension.NumaNodeStatusSingle + } + return extension.NumaNodeStatusShared +} + func NewNodeAllocation(nodeName string) *NodeAllocation { return &NodeAllocation{ nodeName: nodeName, allocatedPods: map[types.UID]PodAllocation{}, allocatedCPUs: NewCPUDetails(), allocatedResources: map[int]*NUMANodeResource{}, + sharedNode: map[int]sets.String{}, + singleNUMANode: map[int]sets.String{}, } } @@ -78,6 +101,7 @@ func (n *NodeAllocation) addPodAllocation(request *PodAllocation, cpuTopology *C return 
} n.allocatedPods[request.UID] = *request + usedNUMA := sets.NewInt() for _, cpuID := range request.CPUSet.ToSliceNoSort() { cpuInfo, ok := n.allocatedCPUs[cpuID] @@ -87,6 +111,23 @@ func (n *NodeAllocation) addPodAllocation(request *PodAllocation, cpuTopology *C cpuInfo.ExclusivePolicy = request.CPUExclusivePolicy cpuInfo.RefCount++ n.allocatedCPUs[cpuID] = cpuInfo + usedNUMA.Insert(cpuInfo.NodeID) + } + if len(usedNUMA) > 1 { + for ni := range usedNUMA { + if ps := n.sharedNode[ni]; ps == nil { + n.sharedNode[ni] = sets.NewString(string(request.UID)) + } else { + n.sharedNode[ni].Insert(string(request.UID)) + } + } + } else if len(usedNUMA) == 1 { + ni := usedNUMA.UnsortedList()[0] + if ps := n.singleNUMANode[ni]; ps == nil { + n.singleNUMANode[ni] = sets.NewString(string(request.UID)) + } else { + n.singleNUMANode[ni].Insert(string(request.UID)) + } } for nodeID, numaNodeRes := range request.NUMANodeResources { @@ -109,6 +150,7 @@ func (n *NodeAllocation) release(podUID types.UID) { } delete(n.allocatedPods, podUID) + usedNUMA := sets.NewInt() for _, cpuID := range request.CPUSet.ToSliceNoSort() { cpuInfo, ok := n.allocatedCPUs[cpuID] if !ok { @@ -120,6 +162,11 @@ func (n *NodeAllocation) release(podUID types.UID) { } else { n.allocatedCPUs[cpuID] = cpuInfo } + usedNUMA.Insert(cpuInfo.NodeID) + } + for ni := range usedNUMA { + delete(n.sharedNode[ni], string(podUID)) + delete(n.singleNUMANode[ni], string(podUID)) } for _, numaNodeRes := range request.NUMANodeResources { diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin.go b/pkg/scheduler/plugins/nodenumaresource/plugin.go index 92f4c50fc..30d05da7f 100644 --- a/pkg/scheduler/plugins/nodenumaresource/plugin.go +++ b/pkg/scheduler/plugins/nodenumaresource/plugin.go @@ -46,6 +46,7 @@ const ( ) const ( + ErrNotMatchNUMATopology = "node(s) NUMA Topology policy not match" ErrInvalidRequestedCPUs = "the requested CPUs must be integer" ErrInvalidCPUTopology = "node(s) invalid CPU Topology" ErrSMTAlignmentError = "node(s) requested cpus not multiple cpus per core" @@ -180,6 +181,8 @@ type preFilterState struct { requiredCPUBindPolicy schedulingconfig.CPUBindPolicy preferredCPUBindPolicy schedulingconfig.CPUBindPolicy preferredCPUExclusivePolicy schedulingconfig.CPUExclusivePolicy + podNUMATopologyPolicy extension.NUMATopologyPolicy + podNUMAExclusive extension.NumaTopologyExclusive numCPUsNeeded int allocation *PodAllocation } @@ -221,6 +224,10 @@ func (p *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleState if err != nil { return nil, framework.NewStatus(framework.Error, err.Error()) } + numaSpec, err := extension.GetNumaTopologySpec(pod.Annotations) + if err != nil { + return nil, framework.NewStatus(framework.Error, err.Error()) + } requests, _ := resourceapi.PodRequestsAndLimits(pod) if quotav1.IsZero(requests) { @@ -231,9 +238,11 @@ func (p *Plugin) PreFilter(ctx context.Context, cycleState *framework.CycleState } requestedCPU := requests.Cpu().MilliValue() state := &preFilterState{ - requestCPUBind: false, - requests: requests, - numCPUsNeeded: int(requestedCPU / 1000), + requestCPUBind: false, + requests: requests, + numCPUsNeeded: int(requestedCPU / 1000), + podNUMATopologyPolicy: numaSpec.NUMATopologyPolicy, + podNUMAExclusive: numaSpec.SingleNUMANodeExclusive, } if AllowUseCPUSet(pod) { cpuBindPolicy := schedulingconfig.CPUBindPolicy(resourceSpec.PreferredCPUBindPolicy) @@ -284,7 +293,18 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p node := nodeInfo.Node() 
topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name) nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) + podNUMAExclusive := state.podNUMAExclusive + podNUMATopologyPolicy := state.podNUMATopologyPolicy + // when numa topology policy is set on node, we should maintain the same behavior as before, so we only + // set default podNUMAExclusive when podNUMATopologyPolicy is not none + if podNUMAExclusive == "" && podNUMATopologyPolicy != "" { + podNUMAExclusive = extension.NumaTopologyExclusiveRequired + } numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy) + numaTopologyPolicy, err := mergeTopologyPolicy(numaTopologyPolicy, podNUMATopologyPolicy) + if err != nil { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrNotMatchNUMATopology) + } requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy) if !status.IsSuccess() { return status @@ -331,7 +351,7 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p } if numaTopologyPolicy != extension.NUMATopologyPolicyNone { - return p.FilterByNUMANode(ctx, cycleState, pod, node.Name, numaTopologyPolicy, topologyOptions) + return p.FilterByNUMANode(ctx, cycleState, pod, node.Name, numaTopologyPolicy, podNUMAExclusive, topologyOptions) } return nil @@ -388,7 +408,10 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, node := nodeInfo.Node() topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name) nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) + podNUMATopologyPolicy := state.podNUMATopologyPolicy numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy) + // we have check in filter, so we will not get error in reserve + numaTopologyPolicy, _ = mergeTopologyPolicy(numaTopologyPolicy, podNUMATopologyPolicy) requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy) if !status.IsSuccess() { return status diff --git a/pkg/scheduler/plugins/nodenumaresource/resource_manager.go b/pkg/scheduler/plugins/nodenumaresource/resource_manager.go index 458eafd30..e36557f24 100644 --- a/pkg/scheduler/plugins/nodenumaresource/resource_manager.go +++ b/pkg/scheduler/plugins/nodenumaresource/resource_manager.go @@ -567,12 +567,12 @@ func (g *hintsGenerator) generateHints(mask bitmask.BitMask, score int64, totalA } nodeCount := mask.Count() - for _, resourceName := range resourceNames { - affinitySize := g.minAffinitySize[resourceName] - if nodeCount < affinitySize { - g.minAffinitySize[resourceName] = nodeCount - } - } + // for _, resourceName := range resourceNames { + // affinitySize := g.minAffinitySize[resourceName] + // if nodeCount < affinitySize { + // g.minAffinitySize[resourceName] = nodeCount + // } + // } for _, resourceName := range resourceNames { free, request := totalFree[resourceName], podRequests[resourceName] @@ -582,6 +582,10 @@ func (g *hintsGenerator) generateHints(mask bitmask.BitMask, score int64, totalA } for _, resourceName := range resourceNames { + affinitySize := g.minAffinitySize[resourceName] + if nodeCount < affinitySize { + g.minAffinitySize[resourceName] = nodeCount + } if _, ok := g.hints[string(resourceName)]; !ok { g.hints[string(resourceName)] = []topologymanager.NUMATopologyHint{} } diff --git a/pkg/scheduler/plugins/nodenumaresource/resource_manager_test.go b/pkg/scheduler/plugins/nodenumaresource/resource_manager_test.go index 
21091011e..a37a7d5eb 100644 --- a/pkg/scheduler/plugins/nodenumaresource/resource_manager_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/resource_manager_test.go @@ -1573,7 +1573,7 @@ func TestResourceManagerGetTopologyHint(t *testing.T) { mask, _ := bitmask.NewBitMask(0, 1) return mask }(), - Preferred: false, + Preferred: true, }, }, }, diff --git a/pkg/scheduler/plugins/nodenumaresource/scoring.go b/pkg/scheduler/plugins/nodenumaresource/scoring.go index f34614c57..6f74a2a00 100644 --- a/pkg/scheduler/plugins/nodenumaresource/scoring.go +++ b/pkg/scheduler/plugins/nodenumaresource/scoring.go @@ -68,7 +68,10 @@ func (p *Plugin) Score(ctx context.Context, cycleState *framework.CycleState, po node := nodeInfo.Node() topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name) nodeCPUBindPolicy := extension.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) + podNUMATopologyPolicy := state.podNUMATopologyPolicy numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy) + // we have check in filter, so we will not get error in reserve + numaTopologyPolicy, _ = mergeTopologyPolicy(numaTopologyPolicy, podNUMATopologyPolicy) requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy) if !status.IsSuccess() { return 0, status diff --git a/pkg/scheduler/plugins/nodenumaresource/topology_hint.go b/pkg/scheduler/plugins/nodenumaresource/topology_hint.go index 43f3b95a9..0f4eb4192 100644 --- a/pkg/scheduler/plugins/nodenumaresource/topology_hint.go +++ b/pkg/scheduler/plugins/nodenumaresource/topology_hint.go @@ -27,7 +27,7 @@ import ( "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/topologymanager" ) -func (p *Plugin) FilterByNUMANode(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, policyType apiext.NUMATopologyPolicy, topologyOptions TopologyOptions) *framework.Status { +func (p *Plugin) FilterByNUMANode(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, topologyOptions TopologyOptions) *framework.Status { if policyType == apiext.NUMATopologyPolicyNone { return nil } @@ -35,7 +35,8 @@ func (p *Plugin) FilterByNUMANode(ctx context.Context, cycleState *framework.Cyc if len(numaNodes) == 0 { return framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) missing NUMA resources") } - return p.handle.(frameworkext.FrameworkExtender).RunNUMATopologyManagerAdmit(ctx, cycleState, pod, nodeName, numaNodes, policyType) + numaNodesStatus := p.resourceManager.GetNodeAllocation(nodeName).GetAllNUMANodeStatus(len(numaNodes)) + return p.handle.(frameworkext.FrameworkExtender).RunNUMATopologyManagerAdmit(ctx, cycleState, pod, nodeName, numaNodes, policyType, exclusivePolicy, numaNodesStatus) } func (p *Plugin) GetPodTopologyHints(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) (map[string][]topologymanager.NUMATopologyHint, *framework.Status) { @@ -49,6 +50,10 @@ func (p *Plugin) GetPodTopologyHints(ctx context.Context, cycleState *framework. 
} node := nodeInfo.Node() topologyOptions := p.topologyOptionsManager.GetTopologyOptions(nodeName) + podNUMATopologyPolicy := state.podNUMATopologyPolicy + numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy) + // we have check in filter, so we will not get error in reserve + numaTopologyPolicy, _ = mergeTopologyPolicy(numaTopologyPolicy, podNUMATopologyPolicy) nodeCPUBindPolicy := apiext.GetNodeCPUBindPolicy(node.Labels, topologyOptions.Policy) requestCPUBind, status := requestCPUBind(state, nodeCPUBindPolicy) if !status.IsSuccess() { @@ -59,7 +64,7 @@ func (p *Plugin) GetPodTopologyHints(ctx context.Context, cycleState *framework. return nil, framework.AsStatus(err) } resourceOptions.numaScorer = p.numaScorer - hints, err := p.resourceManager.GetTopologyHints(node, pod, resourceOptions, topologyOptions.NUMATopologyPolicy) + hints, err := p.resourceManager.GetTopologyHints(node, pod, resourceOptions, numaTopologyPolicy) if err != nil { return nil, framework.NewStatus(framework.Unschedulable, "node(s) Insufficient NUMA Node resources") } diff --git a/pkg/scheduler/plugins/nodenumaresource/util.go b/pkg/scheduler/plugins/nodenumaresource/util.go index 21ed563e0..8bba59ad7 100644 --- a/pkg/scheduler/plugins/nodenumaresource/util.go +++ b/pkg/scheduler/plugins/nodenumaresource/util.go @@ -17,6 +17,8 @@ limitations under the License. package nodenumaresource import ( + "errors" + corev1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -49,6 +51,16 @@ func AllowUseCPUSet(pod *corev1.Pod) bool { return (qosClass == extension.QoSLSE || qosClass == extension.QoSLSR) && priorityClass == extension.PriorityProd } +func mergeTopologyPolicy(nodePolicy, podPolicy extension.NUMATopologyPolicy) (extension.NUMATopologyPolicy, error) { + if nodePolicy != "" && podPolicy != "" && podPolicy != nodePolicy { + return "", errors.New(ErrNotMatchNUMATopology) + } + if podPolicy != "" { + nodePolicy = podPolicy + } + return nodePolicy, nil +} + func getNUMATopologyPolicy(nodeLabels map[string]string, kubeletTopologyManagerPolicy extension.NUMATopologyPolicy) extension.NUMATopologyPolicy { policyType := extension.GetNodeNUMATopologyPolicy(nodeLabels) if policyType != extension.NUMATopologyPolicyNone { diff --git a/pkg/scheduler/plugins/nodenumaresource/util_test.go b/pkg/scheduler/plugins/nodenumaresource/util_test.go index 853f56db0..b0eaf2864 100644 --- a/pkg/scheduler/plugins/nodenumaresource/util_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/util_test.go @@ -17,8 +17,12 @@ limitations under the License. package nodenumaresource import ( + "errors" + "reflect" "testing" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -99,3 +103,64 @@ func Test_getCPUBindPolicy(t *testing.T) { }) } } + +func Test_mergeTopologyPolicy(t *testing.T) { + type args struct { + nodePolicy extension.NUMATopologyPolicy + podPolicy extension.NUMATopologyPolicy + } + tests := []struct { + name string + args args + want extension.NUMATopologyPolicy + wantErr error + }{ + // TODO: Add test cases. 
+ { + name: "no policy on pod", + args: args{ + nodePolicy: extension.NUMATopologyPolicyRestricted, + podPolicy: extension.NUMATopologyPolicyNone, + }, + want: extension.NUMATopologyPolicyRestricted, + }, + { + name: "policy on pod", + args: args{ + nodePolicy: extension.NUMATopologyPolicyRestricted, + podPolicy: extension.NUMATopologyPolicyRestricted, + }, + want: extension.NUMATopologyPolicyRestricted, + }, + { + name: "policy on pod not match policy on node", + args: args{ + nodePolicy: extension.NUMATopologyPolicyRestricted, + podPolicy: extension.NUMATopologyPolicyBestEffort, + }, + want: extension.NUMATopologyPolicyNone, + wantErr: errors.New(ErrNotMatchNUMATopology), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := mergeTopologyPolicy(tt.args.nodePolicy, tt.args.podPolicy) + if err == nil && err != tt.wantErr { + t.Errorf("mergeTopologyPolicy() error = %v, wantErr %v", err, tt.wantErr) + return + } else if tt.wantErr == nil && err != tt.wantErr { + t.Errorf("mergeTopologyPolicy() error = %v, wantErr %v", err, tt.wantErr) + return + } + if err != nil && tt.wantErr != nil { + if diff := cmp.Diff(err.Error(), tt.wantErr.Error(), cmpopts.EquateErrors()); diff != "" { + t.Errorf("mergeTopologyPolicy() error = %v, wantErr %v, diff: %v", err, tt.wantErr, diff) + return + } + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("mergeTopologyPolicy() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/test/e2e/scheduling/nodenumaresource.go b/test/e2e/scheduling/nodenumaresource.go index 6f7a01cdf..5d67bb694 100644 --- a/test/e2e/scheduling/nodenumaresource.go +++ b/test/e2e/scheduling/nodenumaresource.go @@ -385,6 +385,88 @@ var _ = SIGDescribe("NodeNUMAResource", func() { } }) + framework.ConformanceIt("NUMA topology is set on pod, the pod is scheduled on a node without any policy", func() { + nrt := getSuitableNodeResourceTopology(nrtClient, 2) + node, err := f.ClientSet.CoreV1().Nodes().Get(context.TODO(), nrt.Name, metav1.GetOptions{}) + framework.ExpectNoError(err, "unable to get node") + targetNodeName = node.Name + ginkgo.By("Create two pods allocate 56% resources of Node, and every Pod allocates 28% resources per NUMA Node") + cpuQuantity := node.Status.Allocatable[corev1.ResourceCPU] + memoryQuantity := node.Status.Allocatable[corev1.ResourceMemory] + percent := intstr.FromString("28%") + cpu, _ := intstr.GetScaledValueFromIntOrPercent(&percent, int(cpuQuantity.MilliValue()), false) + memory, _ := intstr.GetScaledValueFromIntOrPercent(&percent, int(memoryQuantity.Value()), false) + requests := corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewMilliQuantity(int64(cpu), resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(int64(memory), resource.DecimalSI), + } + rsConfig := pauseRSConfig{ + Replicas: int32(2), + PodConfig: pausePodConfig{ + Name: "request-resource-with-single-numa-pod", + Namespace: f.Namespace.Name, + Labels: map[string]string{ + "test-app": "true", + }, + Annotations: map[string]string{ + extension.AnnotationNumaTopologySpec: `{"numaTopologyPolicy":"SingleNUMANode"}`, + }, + Resources: &corev1.ResourceRequirements{ + Limits: requests, + Requests: requests, + }, + SchedulerName: koordSchedulerName, + NodeName: node.Name, + }, + } + runPauseRS(f, rsConfig) + + ginkgo.By("Check the two pods allocated in two NUMA Nodes") + podList, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{}) + framework.ExpectNoError(err) + nodes := sets.NewInt() + for i := range 
podList.Items { + pod := &podList.Items[i] + ginkgo.By(fmt.Sprintf("pod %q, resourceStatus: %s", pod.Name, pod.Annotations[extension.AnnotationResourceStatus])) + resourceStatus, err := extension.GetResourceStatus(pod.Annotations) + framework.ExpectNoError(err, "invalid resourceStatus") + gomega.Expect(len(resourceStatus.NUMANodeResources)).Should(gomega.Equal(1)) + r := equality.Semantic.DeepEqual(resourceStatus.NUMANodeResources[0].Resources, requests) + gomega.Expect(r).Should(gomega.Equal(true)) + gomega.Expect(false).Should(gomega.Equal(nodes.Has(int(resourceStatus.NUMANodeResources[0].Node)))) + nodes.Insert(int(resourceStatus.NUMANodeResources[0].Node)) + } + gomega.Expect(nodes.Len()).Should(gomega.Equal(2)) + + ginkgo.By("Create the third Pod allocates 30% resources of Node, expect it failed to schedule") + percent = intstr.FromString("30%") + cpu, _ = intstr.GetScaledValueFromIntOrPercent(&percent, int(cpuQuantity.MilliValue()), false) + memory, _ = intstr.GetScaledValueFromIntOrPercent(&percent, int(memoryQuantity.Value()), false) + requests = corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewMilliQuantity(int64(cpu), resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(int64(memory), resource.DecimalSI), + } + pod := createPausePod(f, pausePodConfig{ + Name: "must-be-failed-pod", + Namespace: f.Namespace.Name, + Resources: &corev1.ResourceRequirements{ + Limits: requests, + Requests: requests, + }, + SchedulerName: koordSchedulerName, + NodeName: node.Name, + }) + ginkgo.By("Wait for Pod schedule failed") + framework.ExpectNoError(e2epod.WaitForPodCondition(f.ClientSet, pod.Namespace, pod.Name, "wait for pod schedule failed", 60*time.Second, func(pod *corev1.Pod) (bool, error) { + _, scheduledCondition := k8spodutil.GetPodCondition(&pod.Status, corev1.PodScheduled) + if scheduledCondition != nil && scheduledCondition.Status == corev1.ConditionFalse && + strings.Contains(scheduledCondition.Message, "NUMA Topology affinity") { + return true, nil + } + return false, nil + })) + }) + // SingleNUMANode with 2 NUMA Nodes framework.ConformanceIt("SingleNUMANode with 2 NUMA Nodes", func() { nrt := getSuitableNodeResourceTopology(nrtClient, 2)