Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: support numa topology policy on pod #1939

Merged
merged 1 commit into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions apis/extension/numa_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (

// Defines the pod level annotations and labels
const (
// AnnotationNUMATopologySpec represents numa allocation API defined by Koordinator.
// The user specifies the desired numa policy by setting the annotation.
AnnotationNUMATopologySpec = SchedulingDomainPrefix + "/numa-topology-spec"
// AnnotationResourceSpec represents resource allocation API defined by Koordinator.
// The user specifies the desired CPU orchestration policy by setting the annotation.
AnnotationResourceSpec = SchedulingDomainPrefix + "/resource-spec"
Expand Down Expand Up @@ -67,6 +70,15 @@ type ResourceSpec struct {
PreferredCPUExclusivePolicy CPUExclusivePolicy `json:"preferredCPUExclusivePolicy,omitempty"`
}

// NUMATopologySpec is the pod-level NUMA scheduling request. It is read as JSON
// from the AnnotationNUMATopologySpec annotation (see GetNUMATopologySpec).
type NUMATopologySpec struct {
// NUMATopologyPolicy represents the NUMA topology policy to apply when scheduling the pod.
NUMATopologyPolicy NUMATopologyPolicy `json:"numaTopologyPolicy,omitempty"`
// SingleNUMANodeExclusive controls whether a pod that will use a single NUMA node
// can be scheduled onto a NUMA node where a pod spanning multiple NUMA nodes is
// already running, and, conversely, whether a pod spanning multiple NUMA nodes can
// use a NUMA node where a single-NUMA-node pod is already running.
SingleNUMANodeExclusive NumaTopologyExclusive `json:"singleNUMANodeExclusive,omitempty"`
}

// ResourceStatus describes resource allocation result, such as how to bind CPU.
type ResourceStatus struct {
// CPUSet represents the allocated CPUs. It is Linux CPU list formatted string.
Expand Down Expand Up @@ -135,6 +147,21 @@ const (
NodeNUMAAllocateStrategyMostAllocated = NUMAMostAllocated
)

// NumaTopologyExclusive is the value type of NUMATopologySpec.SingleNUMANodeExclusive.
type NumaTopologyExclusive string
KunWuLuan marked this conversation as resolved.
Show resolved Hide resolved

const (
// NumaTopologyExclusivePreferred is the "Preferred" exclusive policy.
// NOTE(review): presumably a soft preference that never rules out a placement — confirm.
NumaTopologyExclusivePreferred NumaTopologyExclusive = "Preferred"
// NumaTopologyExclusiveRequired is the "Required" exclusive policy. It is the value
// under which the scheduler's checkExclusivePolicy inspects per-NUMA-node statuses.
NumaTopologyExclusiveRequired NumaTopologyExclusive = "Required"
)

// NumaNodeStatus is the usage state of one NUMA node, as passed to the scheduler's
// NUMA topology manager alongside the exclusive policy.
type NumaNodeStatus string

const (
// NumaNodeStatusIdle is the "idle" state.
// NOTE(review): presumably no pod currently uses the NUMA node — confirm.
NumaNodeStatusIdle NumaNodeStatus = "idle"
// NumaNodeStatusShared is the "shared" state. Under the Required exclusive policy,
// checkExclusivePolicy rejects a hint that targets exactly one NUMA node in this state.
NumaNodeStatusShared NumaNodeStatus = "shared"
// NumaNodeStatusSingle is the "single" state. Under the Required exclusive policy,
// checkExclusivePolicy rejects a hint spanning several NUMA nodes if any is in this state.
NumaNodeStatusSingle NumaNodeStatus = "single"
)

type NUMATopologyPolicy string

const (
Expand Down Expand Up @@ -187,6 +214,19 @@ type KubeletCPUManagerPolicy struct {
ReservedCPUs string `json:"reservedCPUs,omitempty"`
}

// GetNUMATopologySpec parses NUMATopologySpec from annotations.
//
// When the AnnotationNUMATopologySpec key is absent, an empty spec and a nil
// error are returned. When the key is present but its value is not valid JSON
// for NUMATopologySpec, the result is nil together with the decoding error.
func GetNUMATopologySpec(annotations map[string]string) (*NUMATopologySpec, error) {
	spec := &NUMATopologySpec{}
	if raw, found := annotations[AnnotationNUMATopologySpec]; found {
		if err := json.Unmarshal([]byte(raw), spec); err != nil {
			return nil, err
		}
	}
	return spec, nil
}

// GetResourceSpec parses ResourceSpec from annotations
func GetResourceSpec(annotations map[string]string) (*ResourceSpec, error) {
resourceSpec := &ResourceSpec{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/frameworkext/framework_extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ func (ext *frameworkExtenderImpl) ForgetPod(pod *corev1.Pod) error {
return nil
}

func (ext *frameworkExtenderImpl) RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status {
return ext.topologyManager.Admit(ctx, cycleState, pod, nodeName, numaNodes, policyType)
// RunNUMATopologyManagerAdmit delegates the NUMA admission decision to the extender's
// topology manager. Every argument, including exclusivePolicy and allNUMANodeStatus,
// is forwarded unchanged, and the manager's status is returned as-is.
func (ext *frameworkExtenderImpl) RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status {
return ext.topologyManager.Admit(ctx, cycleState, pod, nodeName, numaNodes, policyType, exclusivePolicy, allNUMANodeStatus)
}

func (ext *frameworkExtenderImpl) GetNUMATopologyHintProvider() []topologymanager.NUMATopologyHintProvider {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/frameworkext/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type FrameworkExtender interface {
RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status
RunReservationScorePlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfos []*ReservationInfo, nodeName string) (PluginToReservationScores, *framework.Status)

RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status
RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status

RunResizePod(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status
}
Expand Down
14 changes: 9 additions & 5 deletions pkg/scheduler/frameworkext/topologymanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

// Interface is the NUMA topology manager as seen by the scheduler framework extender.
type Interface interface {
Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status
// Admit decides whether pod can be placed on nodeName under policyType. exclusivePolicy
// and allNUMANodeStatus are handed on to the policy's hint merging. The topologyManager
// implementation returns an Unschedulable status when the merged hint is not admitted.
Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status
}

type NUMATopologyHintProvider interface {
Expand Down Expand Up @@ -55,7 +55,7 @@ func New(hintProviderFactory NUMATopologyHintProviderFactory) Interface {
}
}

func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status {
func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status {
s, err := cycleState.Read(affinityStateKey)
if err != nil {
return framework.AsStatus(err)
Expand All @@ -64,7 +64,7 @@ func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.Cycle

policy := createNUMATopologyPolicy(policyType, numaNodes)

bestHint, admit := m.calculateAffinity(ctx, cycleState, policy, pod, nodeName)
bestHint, admit := m.calculateAffinity(ctx, cycleState, policy, pod, nodeName, exclusivePolicy, allNUMANodeStatus)
klog.V(5).Infof("Best TopologyHint for (pod: %v): %v on node: %v", klog.KObj(pod), bestHint, nodeName)
if !admit {
return framework.NewStatus(framework.Unschedulable, "node(s) NUMA Topology affinity error")
Expand All @@ -79,9 +79,13 @@ func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.Cycle
return nil
}

// calculateAffinity collects the hints of all providers for pod on nodeName, lets policy
// merge them, and returns the merged hint together with the policy's admit decision.
func (m *topologyManager) calculateAffinity(ctx context.Context, cycleState *framework.CycleState, policy Policy, pod *corev1.Pod, nodeName string) (NUMATopologyHint, bool) {
func (m *topologyManager) calculateAffinity(ctx context.Context, cycleState *framework.CycleState, policy Policy, pod *corev1.Pod, nodeName string, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) {
providersHints := m.accumulateProvidersHints(ctx, cycleState, pod, nodeName)
bestHint, admit := policy.Merge(providersHints)
bestHint, admit := policy.Merge(providersHints, exclusivePolicy, allNUMANodeStatus)
// This re-check is diagnostic only: a violation is logged at verbosity 5, while
// bestHint and admit are returned exactly as the policy produced them.
if !checkExclusivePolicy(bestHint, exclusivePolicy, allNUMANodeStatus) {
klog.V(5).Infof("bestHint violated the exclusivePolicy requirement: bestHint: %v, policy: %v, numaStatus: %v, nodeName: %v, pod: %v",
bestHint, exclusivePolicy, allNUMANodeStatus, nodeName, pod.Name)
}
klog.V(5).Infof("PodTopologyHint: %v", bestHint)
return bestHint, admit
}
Expand Down
31 changes: 29 additions & 2 deletions pkg/scheduler/frameworkext/topologymanager/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ package topologymanager
import (
"k8s.io/klog/v2"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/util/bitmask"
)

// Policy is a NUMA topology policy: it has a name and merges the hints gathered from
// hint providers into a single hint plus an admit decision.
type Policy interface {
// Name returns Policy Name
Name() string
// Merge returns a merged NUMATopologyHint based on input from hint providers
Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool)
// Merge with exclusivePolicy and allNUMANodeStatus: the policies built on
// mergeFilteredHints pass both values to it; nonePolicy ignores them.
Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool)
}

// NUMATopologyHint is a struct containing the NUMANodeAffinity for a Container
Expand Down Expand Up @@ -62,6 +63,29 @@ func (th *NUMATopologyHint) LessThan(other NUMATopologyHint) bool {
return th.NUMANodeAffinity.IsNarrowerThan(other.NUMANodeAffinity)
}

// checkExclusivePolicy reports whether affinity satisfies exclusivePolicy given the
// per-NUMA-node states in allNUMANodeStatus, which is indexed by NUMA node id.
//
// A hint without a NUMA affinity mask never matches. Any policy other than
// NumaTopologyExclusiveRequired imposes no restriction. Under Required:
//   - a hint spanning more than one NUMA node must not include a node whose
//     status is NumaNodeStatusSingle;
//   - a hint on a single NUMA node must not target a node whose status is
//     NumaNodeStatusShared.
//
// NUMA node ids without an entry in allNUMANodeStatus are treated as having no
// recorded status and therefore never violate the policy. An empty mask has no
// nodes to inspect and matches. Both situations used to index past the end of
// a slice and panic.
func checkExclusivePolicy(affinity NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) bool {
	if affinity.NUMANodeAffinity == nil {
		return false
	}
	if exclusivePolicy != apiext.NumaTopologyExclusiveRequired {
		return true
	}

	// The status that rules a node out depends on how wide the hint is:
	// multi-node hints must avoid "single" nodes, one-node hints must avoid
	// "shared" nodes.
	conflicting := apiext.NumaNodeStatusShared
	if affinity.NUMANodeAffinity.Count() > 1 {
		conflicting = apiext.NumaNodeStatusSingle
	}

	for _, nodeID := range affinity.NUMANodeAffinity.GetBits() {
		// Guard the lookup: the status slice may be shorter than the highest
		// NUMA id in the mask (for example when the caller passes an empty slice).
		if nodeID < 0 || nodeID >= len(allNUMANodeStatus) {
			continue
		}
		if allNUMANodeStatus[nodeID] == conflicting {
			return false
		}
	}
	return true
}

// Merge a TopologyHints permutation to a single hint by performing a bitwise-AND
// of their affinity masks. The hint shall be preferred if all hints in the permutation
// are preferred.
Expand Down Expand Up @@ -126,7 +150,7 @@ func filterProvidersHints(providersHints []map[string][]NUMATopologyHint) [][]NU
return allProviderHints
}

func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint) NUMATopologyHint {
func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) NUMATopologyHint {
// Set the default affinity as an any-numa affinity containing the list
// of NUMA Nodes available on this machine.
defaultAffinity, _ := bitmask.NewBitMask(numaNodes...)
Expand All @@ -144,6 +168,9 @@ func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint) NUM
if mergedHint.NUMANodeAffinity.Count() == 0 {
return
}
if !checkExclusivePolicy(mergedHint, exclusivePolicy, allNUMANodeStatus) {
mergedHint.Preferred = false
}

for _, v := range permutation {
if v.NUMANodeAffinity != nil && mergedHint.NUMANodeAffinity.IsEqual(v.NUMANodeAffinity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.

package topologymanager

import apiext "github.com/koordinator-sh/koordinator/apis/extension"

type bestEffortPolicy struct {
//List of NUMA Nodes available on the underlying machine
numaNodes []int
Expand All @@ -40,9 +42,9 @@ func (p *bestEffortPolicy) canAdmitPodResult(hint *NUMATopologyHint) bool {
return true
}

// Merge filters the provider hints, merges them over the policy's NUMA nodes with
// mergeFilteredHints, and returns the resulting hint together with the verdict of
// canAdmitPodResult for that hint.
func (p *bestEffortPolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) {
func (p *bestEffortPolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) {
filteredProvidersHints := filterProvidersHints(providersHints)
bestHint := mergeFilteredHints(p.numaNodes, filteredProvidersHints)
// exclusivePolicy and allNUMANodeStatus are only passed through to the merge step.
bestHint := mergeFilteredHints(p.numaNodes, filteredProvidersHints, exclusivePolicy, allNUMANodeStatus)
admit := p.canAdmitPodResult(&bestHint)
return bestHint, admit
}
4 changes: 3 additions & 1 deletion pkg/scheduler/frameworkext/topologymanager/policy_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.

package topologymanager

import apiext "github.com/koordinator-sh/koordinator/apis/extension"

// nonePolicy is a stateless Policy implementation.
type nonePolicy struct{}

// Compile-time assertion that *nonePolicy satisfies Policy.
var _ Policy = &nonePolicy{}
Expand All @@ -37,6 +39,6 @@ func (p *nonePolicy) canAdmitPodResult(hint *NUMATopologyHint) bool {
return true
}

func (p *nonePolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) {
// Merge ignores providersHints, exclusivePolicy and allNUMANodeStatus. It always returns
// an empty NUMATopologyHint together with the result of canAdmitPodResult(nil).
func (p *nonePolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) {
return NUMATopologyHint{}, p.canAdmitPodResult(nil)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package topologymanager

import (
"testing"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
)

func TestPolicyNoneName(t *testing.T) {
Expand Down Expand Up @@ -104,7 +106,7 @@ func TestPolicyNoneMerge(t *testing.T) {

for _, tc := range tcases {
policy := NewNonePolicy()
result, admit := policy.Merge(tc.providersHints)
result, admit := policy.Merge(tc.providersHints, apiext.NumaTopologyExclusivePreferred, []apiext.NumaNodeStatus{})
if !result.IsEqual(tc.expectedHint) || admit != tc.expectedAdmit {
t.Errorf("Test Case: %s: Expected merge hint to be %v, got %v", tc.name, tc.expectedHint, result)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.

package topologymanager

import apiext "github.com/koordinator-sh/koordinator/apis/extension"

// restrictedPolicy embeds bestEffortPolicy, reusing its fields (such as numaNodes),
// and provides its own Merge and canAdmitPodResult.
type restrictedPolicy struct {
bestEffortPolicy
}
Expand All @@ -39,9 +41,9 @@ func (p *restrictedPolicy) canAdmitPodResult(hint *NUMATopologyHint) bool {
return hint.Preferred
}

// Merge filters the provider hints, merges them over the policy's NUMA nodes with
// mergeFilteredHints, and returns the resulting hint together with the verdict of
// the restricted policy's canAdmitPodResult for that hint.
func (p *restrictedPolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) {
func (p *restrictedPolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) {
filteredHints := filterProvidersHints(providersHints)
hint := mergeFilteredHints(p.numaNodes, filteredHints)
// exclusivePolicy and allNUMANodeStatus are only passed through to the merge step.
hint := mergeFilteredHints(p.numaNodes, filteredHints, exclusivePolicy, allNUMANodeStatus)
admit := p.canAdmitPodResult(&hint)
return hint, admit
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package topologymanager

import (
apiext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/util/bitmask"
)

Expand Down Expand Up @@ -62,11 +63,11 @@ func filterSingleNumaHints(allResourcesHints [][]NUMATopologyHint) [][]NUMATopol
return filteredResourcesHints
}

func (p *singleNumaNodePolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) {
func (p *singleNumaNodePolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) {
filteredHints := filterProvidersHints(providersHints)
// Filter to only include don't care and hints with a single NUMA node.
singleNumaHints := filterSingleNumaHints(filteredHints)
bestHint := mergeFilteredHints(p.numaNodes, singleNumaHints)
bestHint := mergeFilteredHints(p.numaNodes, singleNumaHints, exclusivePolicy, allNUMANodeStatus)

defaultAffinity, _ := bitmask.NewBitMask(p.numaNodes...)
if bestHint.NUMANodeAffinity.IsEqual(defaultAffinity) {
Expand Down
Loading