Skip to content

Commit

Permalink
scheduler: try best to distribute cpu and memory evenly across numa (#2017)
Browse files Browse the repository at this point in the history

Signed-off-by: wangjianyu.wjy <wangjianyu.wjy@alibaba-inc.com>
Co-authored-by: wangjianyu.wjy <wangjianyu.wjy@alibaba-inc.com>
  • Loading branch information
ZiMengSheng and wangjianyu.wjy committed Apr 28, 2024
1 parent 9b4c5b1 commit e7624d0
Show file tree
Hide file tree
Showing 3 changed files with 778 additions and 34 deletions.
109 changes: 75 additions & 34 deletions pkg/scheduler/plugins/nodenumaresource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nodenumaresource
import (
"errors"
"fmt"
"sort"
"sync"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -151,15 +152,12 @@ func (c *resourceManager) trimNUMANodeResources(nodeName string, totalAvailable
if cpuQuantity.IsZero() {
continue
}
availableCPUs := cpuDetails.CPUsInNUMANodes(numaNode)
if int64(availableCPUs.Size()*1000) >= cpuQuantity.MilliValue() {
availableCPUs = filterCPUsByRequiredCPUBindPolicy(
options.cpuBindPolicy,
availableCPUs,
cpuDetails,
options.topologyOptions.CPUTopology.CPUsPerCore(),
)
}
availableCPUs := filterCPUsByRequiredCPUBindPolicy(
options.cpuBindPolicy,
cpuDetails.CPUsInNUMANodes(numaNode),
cpuDetails,
options.topologyOptions.CPUTopology.CPUsPerCore(),
)
if int64(availableCPUs.Size())*1000 < cpuQuantity.MilliValue() {
cpuQuantity.SetMilli(int64(availableCPUs.Size() * 1000))
available[corev1.ResourceCPU] = cpuQuantity
Expand Down Expand Up @@ -202,51 +200,94 @@ func (c *resourceManager) allocateResourcesByHint(node *corev1.Node, pod *corev1
return nil, err
}

if err := c.trimNUMANodeResources(node.Name, totalAvailable, options); err != nil {
return nil, err
}

var requests corev1.ResourceList
if options.requestCPUBind {
requests = options.originalRequests.DeepCopy()
} else {
requests = options.requests.DeepCopy()
}

intersectionResources := sets.NewString()
var result []NUMANodeResource
for _, numaNodeID := range options.hint.NUMANodeAffinity.GetBits() {
allocatable := totalAvailable[numaNodeID]
r := NUMANodeResource{
Node: numaNodeID,
Resources: corev1.ResourceList{},
result, reasons := tryBestToDistributeEvenly(requests, totalAvailable, options)
if len(reasons) > 0 {
return nil, framework.NewStatus(framework.Unschedulable, reasons...).AsError()
}
return result, nil
}

func tryBestToDistributeEvenly(requests corev1.ResourceList, totalAvailable map[int]corev1.ResourceList, options *ResourceOptions) ([]NUMANodeResource, []string) {
resourceNamesByNUMA := sets.NewString()
for _, available := range totalAvailable {
for resourceName := range available {
resourceNamesByNUMA.Insert(string(resourceName))
}
for resourceName, quantity := range requests {
if allocatableQuantity, ok := allocatable[resourceName]; ok {
intersectionResources.Insert(string(resourceName))
var allocated resource.Quantity
allocatable[resourceName], requests[resourceName], allocated = allocateRes(allocatableQuantity, quantity)
if !allocated.IsZero() {
r.Resources[resourceName] = allocated
}
sortedNUMANodeByResource := map[corev1.ResourceName][]int{}
numaNodes := options.hint.NUMANodeAffinity.GetBits()
for resourceName := range resourceNamesByNUMA {
sortedNUMANodes := make([]int, len(numaNodes))
copy(sortedNUMANodes, numaNodes)
sort.Slice(sortedNUMANodes, func(i, j int) bool {
iAvailableOfResource := totalAvailable[i][corev1.ResourceName(resourceName)]
return (&iAvailableOfResource).Cmp(totalAvailable[j][corev1.ResourceName(resourceName)]) < 0
})
sortedNUMANodeByResource[corev1.ResourceName(resourceName)] = sortedNUMANodes
}
allocatedNUMANodeResources := map[int]*NUMANodeResource{}
for resourceName, quantity := range requests {
for i, numaNodeID := range sortedNUMANodeByResource[resourceName] {
splittedQuantity := splitQuantity(resourceName, quantity, len(numaNodes)-i, options)
_, _, allocated := allocateRes(totalAvailable[numaNodeID][resourceName], splittedQuantity)
if !allocated.IsZero() {
allocatedNUMANodeResource := allocatedNUMANodeResources[numaNodeID]
if allocatedNUMANodeResource == nil {
allocatedNUMANodeResource = &NUMANodeResource{
Node: numaNodeID,
Resources: corev1.ResourceList{},
}
allocatedNUMANodeResources[numaNodeID] = allocatedNUMANodeResource
}
allocatedNUMANodeResource.Resources[resourceName] = allocated
quantity.Sub(allocated)
}
}
if !quotav1.IsZero(r.Resources) {
result = append(result, r)
}
if quotav1.IsZero(requests) {
break
}
requests[resourceName] = quantity
}

var reasons []string
for resourceName, quantity := range requests {
if intersectionResources.Has(string(resourceName)) {
if resourceNamesByNUMA.Has(string(resourceName)) {
if !quantity.IsZero() {
reasons = append(reasons, fmt.Sprintf("Insufficient NUMA %s", resourceName))
}
}
}
if len(reasons) > 0 {
return nil, framework.NewStatus(framework.Unschedulable, reasons...).AsError()
result := make([]NUMANodeResource, 0, len(allocatedNUMANodeResources))
for _, numaNodeResource := range allocatedNUMANodeResources {
result = append(result, *numaNodeResource)
}
return result, nil
sort.Slice(result, func(i, j int) bool {
return result[i].Node < result[j].Node
})
return result, reasons
}

// splitQuantity returns the portion of quantity that a single NUMA node should
// receive when quantity is divided among numaNodeCount nodes. All divisions
// are integer divisions, so the result is rounded down. numaNodeCount is used
// as a divisor and must therefore be positive.
//
//   - CPU without CPU binding (options.requestCPUBind is false) is divided at
//     milli-CPU granularity.
//   - CPU with a required FullPCPUs bind policy is divided in units of
//     options.topologyOptions.CPUTopology.CPUsPerCore(), so the result is a
//     multiple of that value.
//   - Everything else (non-CPU resources and the remaining bound-CPU cases)
//     is divided on its Value().
//
// The returned quantity keeps the Format of the input.
func splitQuantity(resourceName corev1.ResourceName, quantity resource.Quantity, numaNodeCount int, options *ResourceOptions) resource.Quantity {
	nodes := int64(numaNodeCount)
	isCPU := resourceName == corev1.ResourceCPU

	switch {
	case isCPU && !options.requestCPUBind:
		return *resource.NewMilliQuantity(quantity.MilliValue()/nodes, quantity.Format)
	case isCPU && options.requiredCPUBindPolicy && options.cpuBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs:
		perCore := int64(options.topologyOptions.CPUTopology.CPUsPerCore())
		coresPerNode := quantity.Value() / perCore / nodes
		return *resource.NewQuantity(coresPerNode*perCore, quantity.Format)
	default:
		return *resource.NewQuantity(quantity.Value()/nodes, quantity.Format)
	}
}

func allocateRes(available, request resource.Quantity) (resource.Quantity, resource.Quantity, resource.Quantity) {
Expand Down
Loading

0 comments on commit e7624d0

Please sign in to comment.