koordlet: revise node topology reporting for the kubelet cpu manager
Signed-off-by: saintube <saintube@foxmail.com>
saintube committed Apr 25, 2024
1 parent af172b1 commit 80a327a
Showing 7 changed files with 432 additions and 39 deletions.
2 changes: 1 addition & 1 deletion pkg/koordlet/qosmanager/plugins/blkio/blkio_reconcile.go
@@ -296,7 +296,7 @@ func (b *blkIOReconcile) getDiskNumberFromVolumeGroup(vgName string) (string, er
// diskNumber: 253:16
func (b *blkIOReconcile) getDiskNumberFromPodVolume(podMeta *statesinformer.PodMeta, volumeName string) (string, error) {
podUUID := podMeta.Pod.UID
- mountpoint := filepath.Join(system.Conf.CgroupKubePath, "pods", string(podUUID), "volumes/kubernetes.io~csi", volumeName, "mount")
+ mountpoint := filepath.Join(system.Conf.VarLibKubeletRootDir, "pods", string(podUUID), "volumes/kubernetes.io~csi", volumeName, "mount")
disk := getDiskByMountPoint(b.storageInfo, mountpoint)
diskNumber := getDiskNumber(b.storageInfo, disk)
if diskNumber == "" {
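The change above swaps the root of the CSI volume mountpoint from the cgroup path to the configurable kubelet root directory. A minimal sketch of the resulting path, reusing the pod UID and volume name that appear in the test below (values are illustrative only):

```go
package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	root := "/var/lib/kubelet" // default of system.Conf.VarLibKubeletRootDir per the code comments
	podUID := "87d8625a-dcc9-47bf-a14a-994cf2971193"
	volumeName := "yoda-87d8625a-dcc9-47bf-a14a-994cf2971193"
	mountpoint := filepath.Join(root, "pods", podUID, "volumes/kubernetes.io~csi", volumeName, "mount")
	fmt.Println(mountpoint)
	// /var/lib/kubelet/pods/87d8625a-dcc9-47bf-a14a-994cf2971193/volumes/kubernetes.io~csi/yoda-87d8625a-dcc9-47bf-a14a-994cf2971193/mount
}
```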
10 changes: 9 additions & 1 deletion pkg/koordlet/qosmanager/plugins/blkio/blkio_reconcile_test.go
@@ -50,6 +50,7 @@ const (
)

func TestBlkIOReconcile_reconcile(t *testing.T) {
+ helper := system.NewFileTestUtil(t)
sysFSRootDirName := BlkIOReconcileName

testingNodeSLO := newNodeSLO()
@@ -92,7 +93,14 @@ func TestBlkIOReconcile_reconcile(t *testing.T) {
"/dev/mapper/yoda--pool0-yoda--test1": "yoda-pool0",
"/dev/mapper/yoda--pool0-yoda--test2": "yoda-pool0",
}
- system.Conf.CgroupKubePath = KubePath
+
+ var oldVarLibKubeletRoot string
+ helper.SetConf(func(conf *system.Config) {
+ oldVarLibKubeletRoot = conf.VarLibKubeletRootDir
+ conf.VarLibKubeletRootDir = KubePath
+ }, func(conf *system.Config) {
+ conf.VarLibKubeletRootDir = oldVarLibKubeletRoot
+ })
mpDiskMap := map[string]string{
fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~csi/%s/mount", KubePath, pod0.UID, "yoda-87d8625a-dcc9-47bf-a14a-994cf2971193"): "/dev/mapper/yoda--pool0-yoda--87d8625a--dcc9--47bf--a14a--994cf2971193",
fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~csi/html/mount", KubePath, pod1.UID): "/dev/mapper/yoda--pool0-yoda--test1",
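The test now routes the override through helper.SetConf so the original value is restored on cleanup instead of leaking into other tests. The real helper lives in pkg/koordlet/util/system; the following is only a sketch of the save/mutate/restore shape its usage implies, with simplified stand-in types:

```go
package system

import "testing"

// Config is a simplified stand-in for the koordlet system config.
type Config struct{ VarLibKubeletRootDir string }

// Conf is the package-level configuration the koordlet reads at runtime.
var Conf = &Config{VarLibKubeletRootDir: "/var/lib/kubelet"}

// FileTestUtil is a simplified stand-in for the test helper.
type FileTestUtil struct{ t *testing.T }

// SetConf applies set immediately and registers restore to run when the
// test finishes, so mutations of the global config cannot leak across tests.
func (h *FileTestUtil) SetConf(set, restore func(conf *Config)) {
	set(Conf)
	h.t.Cleanup(func() { restore(Conf) })
}
```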
1 change: 1 addition & 0 deletions pkg/koordlet/runtimehooks/hooks/cpuset/rule.go
@@ -40,6 +40,7 @@ type cpusetRule struct {
sharePools []extension.CPUSharedPool
beSharePools []extension.CPUSharedPool
systemQOSCPUSet string
+ // TODO: support per-node disable
}

func (r *cpusetRule) getContainerCPUSet(containerReq *protocol.ContainerRequest) (*string, error) {
80 changes: 52 additions & 28 deletions pkg/koordlet/statesinformer/impl/states_noderesourcetopology.go
@@ -51,6 +51,7 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/kubelet"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
"github.com/koordinator-sh/koordinator/pkg/util"
"github.com/koordinator-sh/koordinator/pkg/util/cpuset"
)
@@ -264,7 +265,22 @@ func (s *nodeTopoInformer) calcNodeTopo() (*nodeTopologyStatus, error) {
}

var cpuManagerPolicy extension.KubeletCPUManagerPolicy
+ var podAllocs []extension.PodCPUAlloc
topo := kubelet.NewCPUTopology((*koordletutil.LocalCPUInfo)(nodeCPUInfo))
+ // NOTE: The koordlet is compatible with the kubelet static cpu manager. Users can migrate to the Koordinator
+ // CPU orchestration in two steps:
+ // 1. To keep the kubelet static cpu manager in use until the node is offline or removed, disable the awareness
+ //    and reporting of the kubelet cpu manager by setting DisableQueryKubeletConfig to true, so that the
+ //    scheduler can allocate cpuset cpus including the cpus managed by the kubelet static cpu manager. Once the
+ //    node is offline or ready for step 2, reset DisableQueryKubeletConfig to false.
+ // 2. By default (DisableQueryKubeletConfig = false), the koordlet takes over the cpuset cpus for new pods. The
+ //    remaining cpuset pods managed by the kubelet static cpu manager are reported from /configz and
+ //    cpu_manager_state, so their cpuset cpus are excluded from the scheduler's cpu allocation. For
+ //    newly-scheduled cpuset pods, the koordlet follows the cpuset allocation results in their annotations,
+ //    which are disjoint from the cpuset cpus still held by the kubelet static cpu manager. Users should stop
+ //    using the kubelet static cpu manager and set the policy to "none". After the last pod pinned by the
+ //    static policy terminates, the cpuset cpus are fully managed by the koordlet.
if s.config != nil && !s.config.DisableQueryKubeletConfig {
kubeletConfiguration, err := s.kubelet.GetKubeletConfiguration()
if err != nil {
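For step 2 of the note above, turning off the kubelet static policy on the node is a KubeletConfiguration change. A minimal fragment follows; the exact rollout procedure (draining the node, handling the existing cpu_manager_state file) should follow the upstream kubelet documentation:

```yaml
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
# "none" stops the kubelet from pinning new Guaranteed pods; per the note
# above, pods already pinned by the static policy keep being reported and
# excluded from scheduling until they terminate.
cpuManagerPolicy: none
```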
@@ -288,6 +304,12 @@ func (s *nodeTopoInformer) calcNodeTopo() (*nodeTopologyStatus, error) {
// NOTE: We should not remove reservedCPUs from sharedPoolCPUs to
// ensure that Burstable Pods (e.g. Pods request 0C but are limited to 4C)
// at least there are reservedCPUs available when nodes are allocated

+ // handle cpus allocated by the Kubelet cpu manager
+ podAllocs, err = s.calKubeletAllocatedCPUs(sharedPoolCPUs)
+ if err != nil {
+ return nil, fmt.Errorf("failed to calculate Kubelet allocated cpus, err: %v", err)
+ }
}

// get NRT topology policy
@@ -319,28 +341,10 @@ func (s *nodeTopoInformer) calcNodeTopo() (*nodeTopologyStatus, error) {
return nil, fmt.Errorf("failed to marshal system qos resource, error %v", err)
}

- // Users can specify the kubelet RootDirectory on the host in the koordlet DaemonSet,
- // but inside koordlet it is always mounted to the path /var/lib/kubelet
- stateFilePath := kubelet.GetCPUManagerStateFilePath("/var/lib/kubelet")
- data, err := os.ReadFile(stateFilePath)
+ // "null" when the podAllocs is empty
+ podAllocsJSON, err := json.Marshal(podAllocs)
if err != nil {
- if !os.IsNotExist(err) {
- return nil, fmt.Errorf("failed to read state file, err: %v", err)
- }
- }
- // TODO: report lse/lsr pod from cgroup
- var podAllocsJSON []byte
- if len(data) > 0 {
- podAllocs, err := s.calGuaranteedCpu(sharedPoolCPUs, string(data))
- if err != nil {
- return nil, fmt.Errorf("failed to cal GuaranteedCpu, err: %v", err)
- }
- if len(podAllocs) != 0 {
- podAllocsJSON, err = json.Marshal(podAllocs)
- if err != nil {
- return nil, fmt.Errorf("failed to marshal pod allocs, err: %v", err)
- }
- }
+ return nil, fmt.Errorf("failed to marshal pod allocs, err: %v", err)
}

cpuTopologyJSON, err := json.Marshal(cpuTopology)
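The `// "null"` comment above is literal: the annotation value is now written unconditionally, and marshaling a nil slice produces the string null rather than an empty value. A self-contained check (PodCPUAlloc here is a stand-in for extension.PodCPUAlloc):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	type PodCPUAlloc struct{ UID string } // stand-in for extension.PodCPUAlloc
	var allocs []PodCPUAlloc              // nil when no kubelet-managed pods remain
	out, err := json.Marshal(allocs)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // prints: null
}
```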
@@ -367,23 +371,21 @@ func (s *nodeTopoInformer) calcNodeTopo() (*nodeTopologyStatus, error) {
return nil, fmt.Errorf("failed to marshal be cpushare pools for node, err: %v", err)
}

- annotations := map[string]string{
+ nodeTopoStatus.Annotations = map[string]string{
extension.AnnotationCPUBasicInfo: string(cpuBasicInfoJSON),
extension.AnnotationNodeCPUTopology: string(cpuTopologyJSON),
extension.AnnotationNodeCPUSharedPools: string(lsCPUSharePoolsJSON),
extension.AnnotationKubeletCPUManagerPolicy: string(cpuManagerPolicyJSON),
extension.AnnotationNodeBECPUSharedPools: string(beCPUSharePoolsJSON),
+ extension.AnnotationNodeCPUAllocs: string(podAllocsJSON),
}
- if len(podAllocsJSON) != 0 {
- annotations[extension.AnnotationNodeCPUAllocs] = string(podAllocsJSON)
- }
+ // set optional annotations
if len(reservedJson) != 0 {
- annotations[extension.AnnotationNodeReservation] = string(reservedJson)
+ nodeTopoStatus.Annotations[extension.AnnotationNodeReservation] = string(reservedJson)
}
if len(systemQOSJson) != 0 {
- annotations[extension.AnnotationNodeSystemQOSResource] = string(systemQOSJson)
+ nodeTopoStatus.Annotations[extension.AnnotationNodeSystemQOSResource] = string(systemQOSJson)
}
- nodeTopoStatus.Annotations = annotations

klog.V(6).Infof("calculate node topology status: %+v", nodeTopoStatus)
return nodeTopoStatus, nil
@@ -480,6 +482,7 @@ func (s *nodeTopoInformer) calGuaranteedCpu(usedCPUs map[int32]*extension.CPUInf
}

var podAllocs []extension.PodCPUAlloc
+ // entries can be empty when the kubelet cpu manager policy is none
for podUID := range checkpoint.Entries {
if _, ok := managedPods[types.UID(podUID)]; ok {
continue
@@ -827,6 +830,27 @@ func (s *nodeTopoInformer) calTopologyZoneList(nodeCPUInfo *metriccache.NodeCPUI
return zoneList, nil
}

+ func (s *nodeTopoInformer) calKubeletAllocatedCPUs(sharePoolCPUs map[int32]*extension.CPUInfo) ([]extension.PodCPUAlloc, error) {
+ // Users can specify the kubelet RootDirectory on the host in the koordlet DaemonSet;
+ // inside the koordlet container it is mounted at /var/lib/kubelet by default.
+ stateFilePath := kubelet.GetCPUManagerStateFilePath(system.Conf.VarLibKubeletRootDir)
+ data, err := os.ReadFile(stateFilePath)
+ if err != nil && !os.IsNotExist(err) {
+ return nil, fmt.Errorf("failed to read state file, err: %v", err)
+ }
+ if err != nil || len(data) == 0 {
+ klog.Warningf("failed to read state file, cpu_manager_state is empty or does not exist, err: %s", err)
+ return nil, nil
+ }
+
+ // TODO: report lse/lsr pod from cgroup
+ podAllocs, err := s.calGuaranteedCpu(sharePoolCPUs, string(data))
+ if err != nil {
+ return nil, fmt.Errorf("failed to cal GuaranteedCpu, err: %v", err)
+ }
+ return podAllocs, nil
+ }

func (s *nodeTopoInformer) updateNodeTopo(newTopo *v1alpha1.NodeResourceTopology) {
s.setNodeTopo(newTopo)
klog.V(5).Infof("local node topology info updated %v", newTopo)
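calKubeletAllocatedCPUs above parses the kubelet's cpu_manager_state checkpoint. A standalone sketch of reading that file; the struct mirrors the kubelet's v2 checkpoint schema (pod UID -> container name -> cpuset string) but should be treated as an approximation, not the koordlet's actual parsing code:

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// checkpoint approximates the kubelet v2 cpu_manager_state layout, e.g.
// {"policyName":"static","defaultCpuSet":"0,4-7","entries":{"<pod-uid>":{"<container>":"1-3"}},"checksum":...}
type checkpoint struct {
	PolicyName    string                       `json:"policyName"`
	DefaultCPUSet string                       `json:"defaultCpuSet"`
	Entries       map[string]map[string]string `json:"entries"`
}

func main() {
	path := filepath.Join("/var/lib/kubelet", "cpu_manager_state")
	data, err := os.ReadFile(path)
	if err != nil {
		fmt.Println("no checkpoint:", err) // an absent file is not an error for the informer
		return
	}
	var cp checkpoint
	if err := json.Unmarshal(data, &cp); err != nil {
		panic(err)
	}
	fmt.Println("policy:", cp.PolicyName)
	for podUID, containers := range cp.Entries { // empty under the "none" policy
		for name, cpus := range containers {
			fmt.Printf("pod %s container %s pinned to %s\n", podUID, name, cpus)
		}
	}
}
```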