koordlet: revise system qos and numa aware cpuset (#1793)
Signed-off-by: saintube <saintube@foxmail.com>
saintube authored Dec 26, 2023
1 parent 47f6715 commit 265ff9e
Showing 3 changed files with 203 additions and 30 deletions.
5 changes: 5 additions & 0 deletions pkg/koordlet/runtimehooks/hooks/cpuset/cpuset_test.go
@@ -21,6 +21,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/pointer"

ext "github.com/koordinator-sh/koordinator/apis/extension"
@@ -287,6 +289,9 @@ func Test_cpusetPlugin_SetContainerCPUSet(t *testing.T) {
NUMANodeResources: []ext.NUMANodeResource{
{
Node: 0,
Resources: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI),
},
},
},
},
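The fixture change above gives NUMA node 0 an explicit CPU quantity, which is what the revised rule keys on. Below is a minimal hand-written sketch (not part of the patch; the struct is a stand-in for ext.NUMANodeResource) of the zero-check that distinguishes a NUMA allocation carrying CPU from one carrying only non-CPU resources such as hugepages:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// numaNodeResource mirrors the shape of ext.NUMANodeResource: a NUMA node ID
// plus the resources allocated on that node.
type numaNodeResource struct {
	Node      int32
	Resources corev1.ResourceList
}

// hasNUMACPU reports whether any NUMA node carries a non-zero CPU quantity,
// matching the check added to rule.go in this commit.
func hasNUMACPU(nodes []numaNodeResource) bool {
	for _, n := range nodes {
		if n.Resources != nil && !n.Resources.Cpu().IsZero() {
			return true
		}
	}
	return false
}

func main() {
	withCPU := []numaNodeResource{{
		Node: 0,
		Resources: corev1.ResourceList{
			corev1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI),
		},
	}}
	hugepagesOnly := []numaNodeResource{{
		Node: 0,
		Resources: corev1.ResourceList{
			corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("2Gi"),
		},
	}}
	fmt.Println(hasNUMACPU(withCPU), hasNUMACPU(hugepagesOnly)) // true false
}

Only the first allocation counts as NUMA-aware; the second falls through to the share-pool and QoS branches.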
56 changes: 35 additions & 21 deletions pkg/koordlet/runtimehooks/hooks/cpuset/rule.go
@@ -26,7 +26,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

ext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/features"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
@@ -35,9 +35,9 @@ import (
)

type cpusetRule struct {
kubeletPolicy ext.KubeletCPUManagerPolicy
sharePools []ext.CPUSharedPool
beSharePools []ext.CPUSharedPool
kubeletPolicy extension.KubeletCPUManagerPolicy
sharePools []extension.CPUSharedPool
beSharePools []extension.CPUSharedPool
systemQOSCPUSet string
}

@@ -54,14 +54,24 @@ func (r *cpusetRule) getContainerCPUSet(containerReq *protocol.ContainerRequest)
}
podAnnotations := containerReq.PodAnnotations
podLabels := containerReq.PodLabels
podAlloc, err := ext.GetResourceStatus(podAnnotations)
podAlloc, err := extension.GetResourceStatus(podAnnotations)
if err != nil {
return nil, err
}

podQOSClass := ext.GetQoSClassByAttrs(podLabels, podAnnotations)
if len(podAlloc.NUMANodeResources) != 0 {
getCPUFromSharePoolByAllocFn := func(sharePools []ext.CPUSharedPool, alloc *ext.ResourceStatus) string {
podQOSClass := extension.GetQoSClassByAttrs(podLabels, podAnnotations)

// check if numa-aware
isNUMAAware := false
for _, numaNode := range podAlloc.NUMANodeResources {
// check if cpu resource is allocated in numa-level since there can be numa allocation without cpu
if numaNode.Resources != nil && !numaNode.Resources.Cpu().IsZero() {
isNUMAAware = true
break
}
}
if isNUMAAware {
getCPUFromSharePoolByAllocFn := func(sharePools []extension.CPUSharedPool, alloc *extension.ResourceStatus) string {
cpusetList := make([]string, 0, len(alloc.NUMANodeResources))
for _, numaNode := range alloc.NUMANodeResources {
for _, nodeSharePool := range sharePools {
@@ -72,13 +82,13 @@ }
}
return strings.Join(cpusetList, ",")
}
if podQOSClass == ext.QoSBE && features.DefaultKoordletFeatureGate.Enabled(features.BECPUManager) {
if podQOSClass == extension.QoSBE && features.DefaultKoordletFeatureGate.Enabled(features.BECPUManager) {
// BE pods which have specified cpu share pool
cpuSetStr := getCPUFromSharePoolByAllocFn(r.beSharePools, podAlloc)
klog.V(6).Infof("get cpuset from specified be cpushare pool for container %v/%v",
containerReq.PodMeta.String(), containerReq.ContainerMeta.Name)
return pointer.String(cpuSetStr), nil
} else if podQOSClass != ext.QoSBE {
} else if podQOSClass != extension.QoSBE {
// LS pods which have specified cpu share pool
cpuSetStr := getCPUFromSharePoolByAllocFn(r.sharePools, podAlloc)
klog.V(6).Infof("get cpuset from specified cpushare pool for container %v/%v",
@@ -87,16 +97,20 @@
}
}

// SYSTEM QoS cpuset
// TBD: support numa-aware
if podQOSClass == extension.QoSSystem && len(r.systemQOSCPUSet) > 0 {
klog.V(6).Infof("get cpuset from system qos rule for container %s/%s",
containerReq.PodMeta.String(), containerReq.ContainerMeta.Name)
return pointer.String(r.systemQOSCPUSet), nil
}

allSharePoolCPUs := make([]string, 0, len(r.sharePools))
for _, nodeSharePool := range r.sharePools {
allSharePoolCPUs = append(allSharePoolCPUs, nodeSharePool.CPUSet)
}

if podQOSClass == ext.QoSSystem && len(r.systemQOSCPUSet) > 0 {
klog.V(6).Infof("get cpuset from system qos rule for container %v/%v",
containerReq.PodMeta.String(), containerReq.ContainerMeta.Name)
return pointer.String(r.systemQOSCPUSet), nil
} else if podQOSClass == ext.QoSLS {
if podQOSClass == extension.QoSLS {
// LS pods use all share pool
klog.V(6).Infof("get cpuset from all share pool for container %v/%v",
containerReq.PodMeta.String(), containerReq.ContainerMeta.Name)
@@ -113,7 +127,7 @@
return pointer.String(""), nil
}

if r.kubeletPolicy.Policy == ext.KubeletCPUManagerPolicyStatic {
if r.kubeletPolicy.Policy == extension.KubeletCPUManagerPolicyStatic {
klog.V(6).Infof("get empty cpuset if kubelet is static policy for container %v/%v",
containerReq.PodMeta.String(), containerReq.ContainerMeta.Name)
return nil, nil
@@ -129,7 +143,7 @@ func (r *cpusetRule) getHostAppCpuset(hostAppReq *protocol.HostAppRequest) (*str
if hostAppReq == nil {
return nil, nil
}
if hostAppReq.QOSClass != ext.QoSLS {
if hostAppReq.QOSClass != extension.QoSLS {
return nil, fmt.Errorf("only LS is supported for host application %v", hostAppReq.Name)
}
allSharePoolCPUs := make([]string, 0, len(r.sharePools))
@@ -146,21 +160,21 @@ func (p *cpusetPlugin) parseRule(nodeTopoIf interface{}) (bool, error) {
return false, fmt.Errorf("parse format for hook plugin %v failed, expect: %v, got: %T",
name, "*topov1alpha1.NodeResourceTopology", nodeTopoIf)
}
cpuSharePools, err := ext.GetNodeCPUSharePools(nodeTopo.Annotations)
cpuSharePools, err := extension.GetNodeCPUSharePools(nodeTopo.Annotations)
if err != nil {
return false, err
}
beCPUSharePools, err := ext.GetNodeBECPUSharePools(nodeTopo.Annotations)
beCPUSharePools, err := extension.GetNodeBECPUSharePools(nodeTopo.Annotations)
if err != nil {
return false, err
}
cpuManagerPolicy, err := ext.GetKubeletCPUManagerPolicy(nodeTopo.Annotations)
cpuManagerPolicy, err := extension.GetKubeletCPUManagerPolicy(nodeTopo.Annotations)
if err != nil {
return false, err
}

systemQOSCPUSet := ""
systemQOSRes, err := ext.GetSystemQOSResource(nodeTopo.Annotations)
systemQOSRes, err := extension.GetSystemQOSResource(nodeTopo.Annotations)
if err != nil {
return false, err
} else if systemQOSRes != nil {
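With this change, getContainerCPUSet resolves a container's cpuset in a fixed order: a pod whose annotation carries a NUMA-level CPU allocation gets the cpuset of the matching share pools (BE pods read the BE share pools when the BECPUManager feature gate is enabled, other QoS classes read the LS share pools); otherwise a SYSTEM QoS pod gets the node-level system cpuset, now checked before the all-share-pool fallback (its NUMA-aware variant is marked TBD); otherwise an LS pod gets the union of all share pools; and under kubelet's static CPU manager policy the hook returns nil so kubelet's own pinning is left untouched. A condensed, hand-written sketch of that order (simplified inputs, not the real signatures):

// pickCPUSet compresses the branch order of getContainerCPUSet after this
// commit; a nil result means "leave the cgroup untouched".
func pickCPUSet(qos string, numaCPUSet, systemQOSCPUSet, allSharePools string, kubeletStatic bool) *string {
	if numaCPUSet != "" { // 1. NUMA-aware CPU allocation wins
		return &numaCPUSet
	}
	if qos == "SYSTEM" && systemQOSCPUSet != "" { // 2. SYSTEM QoS cpuset (numa-aware support TBD)
		return &systemQOSCPUSet
	}
	if qos == "LS" { // 3. LS pods span all LS share pools
		return &allSharePools
	}
	if kubeletStatic { // 4. defer to kubelet's static CPU manager
		return nil
	}
	empty := "" // remaining branches (partly elided in this diff) reset the cpuset
	return &empty
}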
172 changes: 163 additions & 9 deletions pkg/koordlet/runtimehooks/hooks/cpuset/rule_test.go
@@ -24,6 +24,7 @@ import (
topov1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

@@ -40,9 +41,10 @@

func Test_cpusetRule_getContainerCPUSet(t *testing.T) {
type fields struct {
kubeletPoicy string
sharePools []ext.CPUSharedPool
beSharePools []ext.CPUSharedPool
kubeletPolicy string
sharePools []ext.CPUSharedPool
beSharePools []ext.CPUSharedPool
systemQOSCPUSet string
}
type args struct {
podAlloc *ext.ResourceStatus
@@ -57,7 +59,7 @@
wantErr bool
}{
{
name: "get cpuset fqrom bad annotation",
name: "get cpuset from bad annotation",
fields: fields{
sharePools: []ext.CPUSharedPool{
{
@@ -123,6 +125,9 @@ func Test_cpusetRule_getContainerCPUSet(t *testing.T) {
NUMANodeResources: []ext.NUMANodeResource{
{
Node: 0,
Resources: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI),
},
},
},
},
@@ -159,6 +164,9 @@ func Test_cpusetRule_getContainerCPUSet(t *testing.T) {
NUMANodeResources: []ext.NUMANodeResource{
{
Node: 0,
Resources: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI),
},
},
},
},
@@ -196,10 +204,50 @@ func Test_cpusetRule_getContainerCPUSet(t *testing.T) {
want: pointer.String("0-7,8-15"),
wantErr: false,
},
{
name: "get all share pools for ls pod with no cpu numa allocation",
fields: fields{
sharePools: []ext.CPUSharedPool{
{
Socket: 0,
Node: 0,
CPUSet: "0-7",
},
{
Socket: 1,
Node: 1,
CPUSet: "8-15",
},
},
},
args: args{
containerReq: &protocol.ContainerRequest{
PodMeta: protocol.PodMeta{},
ContainerMeta: protocol.ContainerMeta{},
PodLabels: map[string]string{
ext.LabelPodQoS: string(ext.QoSLS),
},
PodAnnotations: map[string]string{},
CgroupParent: "burstable/test-pod/test-container",
},
podAlloc: &ext.ResourceStatus{
NUMANodeResources: []ext.NUMANodeResource{
{
Node: 0,
Resources: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("2Gi"),
},
},
},
},
},
want: pointer.String("0-7,8-15"),
wantErr: false,
},
{
name: "get all share pools for origin burstable pod under none policy",
fields: fields{
kubeletPoicy: ext.KubeletCPUManagerPolicyNone,
kubeletPolicy: ext.KubeletCPUManagerPolicyNone,
sharePools: []ext.CPUSharedPool{
{
Socket: 0,
@@ -228,7 +276,7 @@ func Test_cpusetRule_getContainerCPUSet(t *testing.T) {
{
name: "do nothing for origin burstable pod under static policy",
fields: fields{
kubeletPoicy: ext.KubeletCPUManagerPolicyStatic,
kubeletPolicy: ext.KubeletCPUManagerPolicyStatic,
sharePools: []ext.CPUSharedPool{
{
Socket: 0,
@@ -282,15 +330,121 @@ func Test_cpusetRule_getContainerCPUSet(t *testing.T) {
want: pointer.String(""),
wantErr: false,
},
{
name: "get cpuset from annotation ls share pool",
fields: fields{
sharePools: []ext.CPUSharedPool{
{
Socket: 0,
Node: 0,
CPUSet: "1-7",
},
{
Socket: 1,
Node: 1,
CPUSet: "9-15",
},
},
beSharePools: []ext.CPUSharedPool{
{
Socket: 0,
Node: 0,
CPUSet: "0-7",
},
{
Socket: 1,
Node: 1,
CPUSet: "8-15",
},
},
},
args: args{
containerReq: &protocol.ContainerRequest{
PodMeta: protocol.PodMeta{},
ContainerMeta: protocol.ContainerMeta{},
PodLabels: map[string]string{
ext.LabelPodQoS: string(ext.QoSLS),
},
PodAnnotations: map[string]string{},
CgroupParent: "burstable/test-pod/test-container",
},
podAlloc: &ext.ResourceStatus{
NUMANodeResources: []ext.NUMANodeResource{
{
Node: 1,
Resources: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI),
},
},
},
},
},
want: pointer.String("9-15"),
wantErr: false,
},
{
name: "get cpuset from annotation system qos resource",
fields: fields{
sharePools: []ext.CPUSharedPool{
{
Socket: 0,
Node: 0,
CPUSet: "4-7",
},
{
Socket: 1,
Node: 1,
CPUSet: "9-15",
},
},
beSharePools: []ext.CPUSharedPool{
{
Socket: 0,
Node: 0,
CPUSet: "4-7",
},
{
Socket: 1,
Node: 1,
CPUSet: "8-15",
},
},
systemQOSCPUSet: "0-3",
},
args: args{
containerReq: &protocol.ContainerRequest{
PodMeta: protocol.PodMeta{},
ContainerMeta: protocol.ContainerMeta{},
PodLabels: map[string]string{
ext.LabelPodQoS: string(ext.QoSSystem),
},
PodAnnotations: map[string]string{},
CgroupParent: "burstable/test-pod/test-container",
},
podAlloc: &ext.ResourceStatus{
NUMANodeResources: []ext.NUMANodeResource{
{
Node: 1,
Resources: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("2Gi"),
},
},
},
},
},
want: pointer.String("0-3"),
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &cpusetRule{
kubeletPolicy: ext.KubeletCPUManagerPolicy{
Policy: tt.fields.kubeletPoicy,
Policy: tt.fields.kubeletPolicy,
},
sharePools: tt.fields.sharePools,
beSharePools: tt.fields.beSharePools,
sharePools: tt.fields.sharePools,
beSharePools: tt.fields.beSharePools,
systemQOSCPUSet: tt.fields.systemQOSCPUSet,
}
if tt.args.podAlloc != nil {
podAllocJson := util.DumpJSON(tt.args.podAlloc)
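All table cases drive the rule the same way: build a cpusetRule from the fixture fields, serialize the optional podAlloc into the pod annotation, then assert on getContainerCPUSet. Roughly, with the annotation key assumed to be ext.AnnotationResourceStatus (the tail of the test body is truncated in this view):

r := &cpusetRule{
	kubeletPolicy: ext.KubeletCPUManagerPolicy{
		Policy: tt.fields.kubeletPolicy,
	},
	sharePools:      tt.fields.sharePools,
	beSharePools:    tt.fields.beSharePools,
	systemQOSCPUSet: tt.fields.systemQOSCPUSet,
}
if tt.args.podAlloc != nil {
	// the allocation reaches the hook as a JSON pod annotation
	podAllocJson := util.DumpJSON(tt.args.podAlloc)
	tt.args.containerReq.PodAnnotations[ext.AnnotationResourceStatus] = podAllocJson // key assumed
}
got, err := r.getContainerCPUSet(tt.args.containerReq)
assert.Equal(t, tt.wantErr, err != nil)
assert.Equal(t, tt.want, got)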
