Skip to content

Commit

Permalink
support numa topology policy on pod
Browse files Browse the repository at this point in the history
Signed-off-by: KunWuLuan <kunwuluan@gmail.com>
  • Loading branch information
KunWuLuan committed Mar 6, 2024
1 parent 987c076 commit 6997a04
Show file tree
Hide file tree
Showing 16 changed files with 166 additions and 22 deletions.
21 changes: 21 additions & 0 deletions apis/extension/numa_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ const (

// ResourceSpec describes extra attributes of the resource requirements.
type ResourceSpec struct {
// NumaTopologyPolicy represents the NUMA topology policy to apply when scheduling the pod
NumaTopologyPolicy NUMATopologyPolicy `json:"numaTopologyPolicy,omitempty"`
// SingleNUMANodeExclusive indicates whether a Pod that will use a single NUMA node (or multiple NUMA nodes)
// may be scheduled onto a NUMA node that is already used by another Pod that uses multiple NUMA nodes
// (or, respectively, a single NUMA node) on the same node.
SingleNUMANodeExclusive NUMATopologyExclusive `json:"singleNUMANodeExclusive,omitempty"`
// RequiredCPUBindPolicy indicates that the CPU is allocated strictly
// according to the specified CPUBindPolicy, otherwise the scheduling fails
RequiredCPUBindPolicy CPUBindPolicy `json:"requiredCPUBindPolicy,omitempty"`
Expand Down Expand Up @@ -135,6 +141,21 @@ const (
NodeNUMAAllocateStrategyMostAllocated = NUMAMostAllocated
)

// NUMATopologyExclusive is the string-typed value of ResourceSpec.SingleNUMANodeExclusive.
// It selects how strictly pods that use a single NUMA node are kept apart from
// pods that span multiple NUMA nodes.
type NUMATopologyExclusive string

const (
// NUMATopologyExclusivePreferred is the non-strict setting; none of the
// exclusivity checks shown in this change trigger for this value.
NUMATopologyExclusivePreferred NUMATopologyExclusive = "Preferred"
// NUMATopologyExclusiveRequired is the strict setting: the topology manager
// rejects a multi-NUMA hint that touches a NUMA node in the "single" state,
// and a single-NUMA hint that lands on a NUMA node in the "shared" state.
NUMATopologyExclusiveRequired NUMATopologyExclusive = "Required"
)

// NUMANodeStatus describes how a NUMA node is currently occupied by allocated
// pods, as reported by NodeAllocation.NUMANodeSharedStatus.
type NUMANodeStatus string

const (
// NUMANodeStatusIdle means no pod is recorded as using the NUMA node.
NUMANodeStatusIdle NUMANodeStatus = "idle"
// NUMANodeStatusShared means at least one pod whose CPUs span more than one
// NUMA node is recorded on the NUMA node.
NUMANodeStatusShared NUMANodeStatus = "shared"
// NUMANodeStatusSingle means the NUMA node is used only by pods whose CPUs
// all belong to this one NUMA node.
NUMANodeStatusSingle NUMANodeStatus = "single"
)

type NUMATopologyPolicy string

const (
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/frameworkext/framework_extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,8 @@ func (ext *frameworkExtenderImpl) ForgetPod(pod *corev1.Pod) error {
return nil
}

func (ext *frameworkExtenderImpl) RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status {
return ext.topologyManager.Admit(ctx, cycleState, pod, nodeName, numaNodes, policyType)
func (ext *frameworkExtenderImpl) RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) *framework.Status {
return ext.topologyManager.Admit(ctx, cycleState, pod, nodeName, numaNodes, policyType, exclusivePolicy, allNUMANodeStatus)
}

func (ext *frameworkExtenderImpl) GetNUMATopologyHintProvider() []topologymanager.NUMATopologyHintProvider {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/frameworkext/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type FrameworkExtender interface {
RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status
RunReservationScorePlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfos []*ReservationInfo, nodeName string) (PluginToReservationScores, *framework.Status)

RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status
RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) *framework.Status

RunResizePod(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status
}
Expand Down
28 changes: 23 additions & 5 deletions pkg/scheduler/frameworkext/topologymanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

type Interface interface {
Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status
Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) *framework.Status
}

type NUMATopologyHintProvider interface {
Expand Down Expand Up @@ -55,7 +55,7 @@ func New(hintProviderFactory NUMATopologyHintProviderFactory) Interface {
}
}

func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status {
func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) *framework.Status {
s, err := cycleState.Read(affinityStateKey)
if err != nil {
return framework.AsStatus(err)
Expand All @@ -64,7 +64,7 @@ func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.Cycle

policy := createNUMATopologyPolicy(policyType, numaNodes)

bestHint, admit := m.calculateAffinity(ctx, cycleState, policy, pod, nodeName)
bestHint, admit := m.calculateAffinity(ctx, cycleState, policy, pod, nodeName, exclusivePolicy, allNUMANodeStatus)
klog.V(5).Infof("Best TopologyHint for (pod: %v): %v on node: %v", klog.KObj(pod), bestHint, nodeName)
if !admit {
return framework.NewStatus(framework.Unschedulable, "node(s) NUMA Topology affinity error")
Expand All @@ -79,9 +79,27 @@ func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.Cycle
return nil
}

func (m *topologyManager) calculateAffinity(ctx context.Context, cycleState *framework.CycleState, policy Policy, pod *corev1.Pod, nodeName string) (NUMATopologyHint, bool) {
func (m *topologyManager) calculateAffinity(ctx context.Context, cycleState *framework.CycleState, policy Policy, pod *corev1.Pod, nodeName string, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) (NUMATopologyHint, bool) {
providersHints := m.accumulateProvidersHints(ctx, cycleState, pod, nodeName)
bestHint, admit := policy.Merge(providersHints)
bestHint, admit := policy.Merge(providersHints, exclusivePolicy, allNUMANodeStatus)
if exclusivePolicy == apiext.NUMATopologyExclusiveRequired {
if bestHint.NUMANodeAffinity.Count() > 1 {
// we should make sure no numa is in single state
for _, nodeid := range bestHint.NUMANodeAffinity.GetBits() {
if allNUMANodeStatus[nodeid] == apiext.NUMANodeStatusSingle {
klog.V(5).Infof("bestHint violated the exclusivePolicy requirement: bestHint: %v, policy: %v, numaStatus: %v, nodeName: %v, pod: %v",
bestHint, exclusivePolicy, allNUMANodeStatus, nodeName, pod.Name)
return bestHint, false
}
}
} else {
if allNUMANodeStatus[bestHint.NUMANodeAffinity.GetBits()[0]] == apiext.NUMANodeStatusShared {
klog.V(5).Infof("bestHint violated the exclusivePolicy requirement: bestHint: %v, policy: %v, numaStatus: %v, nodeName: %v, pod: %v",
bestHint, exclusivePolicy, allNUMANodeStatus, nodeName, pod.Name)
return bestHint, false
}
}
}
klog.V(5).Infof("PodTopologyHint: %v", bestHint)
return bestHint, admit
}
Expand Down
19 changes: 17 additions & 2 deletions pkg/scheduler/frameworkext/topologymanager/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ package topologymanager
import (
"k8s.io/klog/v2"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/util/bitmask"
)

type Policy interface {
// Name returns Policy Name
Name() string
// Merge returns a merged NUMATopologyHint based on input from hint providers
Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool)
Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) (NUMATopologyHint, bool)
}

// NUMATopologyHint is a struct containing the NUMANodeAffinity for a Container
Expand Down Expand Up @@ -124,7 +125,7 @@ func filterProvidersHints(providersHints []map[string][]NUMATopologyHint) [][]NU
return allProviderHints
}

func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint) NUMATopologyHint {
func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) NUMATopologyHint {
// Set the default affinity as an any-numa affinity containing the list
// of NUMA Nodes available on this machine.
defaultAffinity, _ := bitmask.NewBitMask(numaNodes...)
Expand All @@ -142,6 +143,20 @@ func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint) NUM
if mergedHint.NUMANodeAffinity.Count() == 0 {
return
}
if exclusivePolicy == apiext.NUMATopologyExclusiveRequired {
if mergedHint.NUMANodeAffinity.Count() > 1 {
// we should make sure no numa is in single state
for _, nodeid := range mergedHint.NUMANodeAffinity.GetBits() {
if allNUMANodeStatus[nodeid] == apiext.NUMANodeStatusSingle {
return
}
}
} else {
if allNUMANodeStatus[mergedHint.NUMANodeAffinity.GetBits()[0]] == apiext.NUMANodeStatusShared {
return
}
}
}

for _, v := range permutation {
if v.NUMANodeAffinity != nil && mergedHint.NUMANodeAffinity.IsEqual(v.NUMANodeAffinity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.

package topologymanager

import apiext "github.com/koordinator-sh/koordinator/apis/extension"

type bestEffortPolicy struct {
//List of NUMA Nodes available on the underlying machine
numaNodes []int
Expand All @@ -40,9 +42,9 @@ func (p *bestEffortPolicy) canAdmitPodResult(hint *NUMATopologyHint) bool {
return true
}

func (p *bestEffortPolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) {
func (p *bestEffortPolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) (NUMATopologyHint, bool) {
filteredProvidersHints := filterProvidersHints(providersHints)
bestHint := mergeFilteredHints(p.numaNodes, filteredProvidersHints)
bestHint := mergeFilteredHints(p.numaNodes, filteredProvidersHints, exclusivePolicy, allNUMANodeStatus)
admit := p.canAdmitPodResult(&bestHint)
return bestHint, admit
}
4 changes: 3 additions & 1 deletion pkg/scheduler/frameworkext/topologymanager/policy_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.

package topologymanager

import apiext "github.com/koordinator-sh/koordinator/apis/extension"

type nonePolicy struct{}

var _ Policy = &nonePolicy{}
Expand All @@ -37,6 +39,6 @@ func (p *nonePolicy) canAdmitPodResult(hint *NUMATopologyHint) bool {
return true
}

func (p *nonePolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) {
func (p *nonePolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) (NUMATopologyHint, bool) {
return NUMATopologyHint{}, p.canAdmitPodResult(nil)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package topologymanager

import (
"testing"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
)

func TestPolicyNoneName(t *testing.T) {
Expand Down Expand Up @@ -104,7 +106,7 @@ func TestPolicyNoneMerge(t *testing.T) {

for _, tc := range tcases {
policy := NewNonePolicy()
result, admit := policy.Merge(tc.providersHints)
result, admit := policy.Merge(tc.providersHints, apiext.NUMATopologyExclusivePreferred, []apiext.NUMANodeStatus{})
if !result.IsEqual(tc.expectedHint) || admit != tc.expectedAdmit {
t.Errorf("Test Case: %s: Expected merge hint to be %v, got %v", tc.name, tc.expectedHint, result)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.

package topologymanager

import apiext "github.com/koordinator-sh/koordinator/apis/extension"

type restrictedPolicy struct {
bestEffortPolicy
}
Expand All @@ -39,9 +41,9 @@ func (p *restrictedPolicy) canAdmitPodResult(hint *NUMATopologyHint) bool {
return hint.Preferred
}

func (p *restrictedPolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) {
func (p *restrictedPolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) (NUMATopologyHint, bool) {
filteredHints := filterProvidersHints(providersHints)
hint := mergeFilteredHints(p.numaNodes, filteredHints)
hint := mergeFilteredHints(p.numaNodes, filteredHints, exclusivePolicy, allNUMANodeStatus)
admit := p.canAdmitPodResult(&hint)
return hint, admit
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package topologymanager

import (
apiext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/util/bitmask"
)

Expand Down Expand Up @@ -62,11 +63,11 @@ func filterSingleNumaHints(allResourcesHints [][]NUMATopologyHint) [][]NUMATopol
return filteredResourcesHints
}

func (p *singleNumaNodePolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) {
func (p *singleNumaNodePolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NUMATopologyExclusive, allNUMANodeStatus []apiext.NUMANodeStatus) (NUMATopologyHint, bool) {
filteredHints := filterProvidersHints(providersHints)
// Filter to only include don't care and hints with a single NUMA node.
singleNumaHints := filterSingleNumaHints(filteredHints)
bestHint := mergeFilteredHints(p.numaNodes, singleNumaHints)
bestHint := mergeFilteredHints(p.numaNodes, singleNumaHints, exclusivePolicy, allNUMANodeStatus)

defaultAffinity, _ := bitmask.NewBitMask(p.numaNodes...)
if bestHint.NUMANodeAffinity.IsEqual(defaultAffinity) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/frameworkext/topologymanager/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"

"github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/util/bitmask"
)

Expand Down Expand Up @@ -891,7 +892,7 @@ func testPolicyMerge(policy Policy, tcases []policyMergeTestCase, t *testing.T)
providersHints = append(providersHints, hints)
}

actual, _ := policy.Merge(providersHints)
actual, _ := policy.Merge(providersHints, extension.NUMATopologyExclusivePreferred, []extension.NUMANodeStatus{})
if !reflect.DeepEqual(actual, tc.expected) {
t.Errorf("%v: Expected Topology Hint to be %v, got %v:", tc.name, tc.expected, actual)
}
Expand Down
47 changes: 47 additions & 0 deletions pkg/scheduler/plugins/nodenumaresource/node_allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
quotav1 "k8s.io/apiserver/pkg/quota/v1"

"github.com/koordinator-sh/koordinator/apis/extension"
Expand All @@ -35,6 +36,8 @@ type NodeAllocation struct {
allocatedPods map[types.UID]PodAllocation
allocatedCPUs CPUDetails
allocatedResources map[int]*NUMANodeResource
sharedNode map[int]sets.String
singleNUMANode map[int]sets.String
}

type PodAllocation struct {
Expand All @@ -46,12 +49,32 @@ type PodAllocation struct {
NUMANodeResources []NUMANodeResource `json:"numaNodeResources,omitempty"`
}

// GetAllNUMANodeStatus returns the occupancy status of NUMA nodes 0..numaNodes-1.
// The returned slice has exactly numaNodes entries and is indexed by NUMA node
// ID, so status[i] is the status of NUMA node i.
func (n *NodeAllocation) GetAllNUMANodeStatus(numaNodes int) []extension.NUMANodeStatus {
	// The slice is allocated at its final length and filled by index. Appending
	// to an already-sized slice would leave numaNodes empty entries at the front
	// and shift the real statuses out of the range callers index by node ID.
	status := make([]extension.NUMANodeStatus, numaNodes)
	for i := range status {
		status[i] = n.NUMANodeSharedStatus(i)
	}
	return status
}

// NUMANodeSharedStatus classifies the NUMA node identified by nodeid from the
// pods currently recorded on it: shared if any multi-NUMA pod uses it, single
// if only single-NUMA pods use it, and idle if no pod uses it.
func (n *NodeAllocation) NUMANodeSharedStatus(nodeid int) extension.NUMANodeStatus {
	multiNUMAPods := len(n.sharedNode[nodeid])
	singleNUMAPods := len(n.singleNUMANode[nodeid])

	switch {
	case multiNUMAPods > 0:
		// Any pod spanning several NUMA nodes makes this node shared,
		// regardless of how many single-NUMA pods also run here.
		return extension.NUMANodeStatusShared
	case singleNUMAPods > 0:
		return extension.NUMANodeStatusSingle
	default:
		return extension.NUMANodeStatusIdle
	}
}

// NewNodeAllocation builds an empty NodeAllocation for the node called nodeName.
// Every bookkeeping map is initialized so it can be written to immediately.
func NewNodeAllocation(nodeName string) *NodeAllocation {
	allocation := &NodeAllocation{nodeName: nodeName}
	allocation.allocatedPods = make(map[types.UID]PodAllocation)
	allocation.allocatedCPUs = NewCPUDetails()
	allocation.allocatedResources = make(map[int]*NUMANodeResource)
	// Per-NUMA-node sets of pod UIDs: pods spanning several NUMA nodes and
	// pods confined to exactly one NUMA node, respectively.
	allocation.sharedNode = make(map[int]sets.String)
	allocation.singleNUMANode = make(map[int]sets.String)
	return allocation
}

Expand All @@ -78,6 +101,7 @@ func (n *NodeAllocation) addPodAllocation(request *PodAllocation, cpuTopology *C
return
}
n.allocatedPods[request.UID] = *request
usedNUMA := sets.NewInt()

for _, cpuID := range request.CPUSet.ToSliceNoSort() {
cpuInfo, ok := n.allocatedCPUs[cpuID]
Expand All @@ -87,6 +111,23 @@ func (n *NodeAllocation) addPodAllocation(request *PodAllocation, cpuTopology *C
cpuInfo.ExclusivePolicy = request.CPUExclusivePolicy
cpuInfo.RefCount++
n.allocatedCPUs[cpuID] = cpuInfo
usedNUMA.Insert(cpuInfo.NodeID)
}
if len(usedNUMA) > 1 {
for ni := range usedNUMA {
if ps := n.sharedNode[ni]; ps == nil {
n.sharedNode[ni] = sets.NewString(string(request.UID))
} else {
n.sharedNode[ni].Insert(string(request.UID))
}
}
} else if len(usedNUMA) == 1 {
ni := usedNUMA.UnsortedList()[0]
if ps := n.singleNUMANode[ni]; ps == nil {
n.singleNUMANode[ni] = sets.NewString(string(request.UID))
} else {
n.singleNUMANode[ni].Insert(string(request.UID))
}
}

for nodeID, numaNodeRes := range request.NUMANodeResources {
Expand All @@ -109,6 +150,7 @@ func (n *NodeAllocation) release(podUID types.UID) {
}
delete(n.allocatedPods, podUID)

usedNUMA := sets.NewInt()
for _, cpuID := range request.CPUSet.ToSliceNoSort() {
cpuInfo, ok := n.allocatedCPUs[cpuID]
if !ok {
Expand All @@ -120,6 +162,11 @@ func (n *NodeAllocation) release(podUID types.UID) {
} else {
n.allocatedCPUs[cpuID] = cpuInfo
}
usedNUMA.Insert(cpuInfo.NodeID)
}
for ni := range usedNUMA {
delete(n.sharedNode[ni], string(podUID))
delete(n.singleNUMANode[ni], string(podUID))
}

for _, numaNodeRes := range request.NUMANodeResources {
Expand Down
Loading

0 comments on commit 6997a04

Please sign in to comment.