koordlet: revise node topology reporting for the kubelet cpu manager
Signed-off-by: saintube <saintube@foxmail.com>
saintube committed Apr 23, 2024
1 parent f85aefc commit e58a3a7
Showing 6 changed files with 422 additions and 38 deletions.
2 changes: 1 addition & 1 deletion pkg/koordlet/qosmanager/plugins/blkio/blkio_reconcile.go
@@ -296,7 +296,7 @@ func (b *blkIOReconcile) getDiskNumberFromVolumeGroup(vgName string) (string, er
 // diskNumber: 253:16
 func (b *blkIOReconcile) getDiskNumberFromPodVolume(podMeta *statesinformer.PodMeta, volumeName string) (string, error) {
     podUUID := podMeta.Pod.UID
-    mountpoint := filepath.Join(system.Conf.CgroupKubePath, "pods", string(podUUID), "volumes/kubernetes.io~csi", volumeName, "mount")
+    mountpoint := filepath.Join(system.Conf.VarLibKubeletRootDir, "pods", string(podUUID), "volumes/kubernetes.io~csi", volumeName, "mount")
     disk := getDiskByMountPoint(b.storageInfo, mountpoint)
     diskNumber := getDiskNumber(b.storageInfo, disk)
     if diskNumber == "" {
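The one-line change above swaps the cgroup-oriented base path for the kubelet root directory when resolving a pod's CSI volume mountpoint. A minimal standalone sketch of the resulting path layout, assuming the default kubelet root (configurable via the kubelet --root-dir flag); the pod UID and volume name are sample values taken from the tests below:

    package main

    import (
        "fmt"
        "path/filepath"
    )

    func main() {
        root := "/var/lib/kubelet" // default kubelet root directory
        podUID := "87d8625a-dcc9-47bf-a14a-994cf2971193"
        volumeName := "html"
        // Same construction as getDiskNumberFromPodVolume after the change.
        mountpoint := filepath.Join(root, "pods", podUID, "volumes/kubernetes.io~csi", volumeName, "mount")
        fmt.Println(mountpoint)
        // Output: /var/lib/kubelet/pods/87d8625a-dcc9-47bf-a14a-994cf2971193/volumes/kubernetes.io~csi/html/mount
    }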
10 changes: 9 additions & 1 deletion pkg/koordlet/qosmanager/plugins/blkio/blkio_reconcile_test.go
@@ -50,6 +50,7 @@ const (
 )
 
 func TestBlkIOReconcile_reconcile(t *testing.T) {
+    helper := system.NewFileTestUtil(t)
     sysFSRootDirName := BlkIOReconcileName
 
     testingNodeSLO := newNodeSLO()
@@ -92,7 +93,14 @@ func TestBlkIOReconcile_reconcile(t *testing.T) {
         "/dev/mapper/yoda--pool0-yoda--test1": "yoda-pool0",
         "/dev/mapper/yoda--pool0-yoda--test2": "yoda-pool0",
     }
-    system.Conf.CgroupKubePath = KubePath
+
+    var oldVarLibKubeletRoot string
+    helper.SetConf(func(conf *system.Config) {
+        oldVarLibKubeletRoot = conf.VarLibKubeletRootDir
+        conf.VarLibKubeletRootDir = KubePath
+    }, func(conf *system.Config) {
+        conf.VarLibKubeletRootDir = oldVarLibKubeletRoot
+    })
     mpDiskMap := map[string]string{
         fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~csi/%s/mount", KubePath, pod0.UID, "yoda-87d8625a-dcc9-47bf-a14a-994cf2971193"): "/dev/mapper/yoda--pool0-yoda--87d8625a--dcc9--47bf--a14a--994cf2971193",
         fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~csi/html/mount", KubePath, pod1.UID): "/dev/mapper/yoda--pool0-yoda--test1",
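The test now routes the kubelet root directory through the config helper with an apply/revert pair, so the global setting is restored after the test instead of leaking, as the old direct assignment did. A minimal sketch of the same save/apply/restore idea using t.Cleanup; the conf variable is a stand-in for the koordlet's global system.Conf, and withKubeletRootDir is a hypothetical helper for illustration, not part of the repo:

    package blkio_test

    import "testing"

    // conf stands in for the koordlet's global system.Conf.
    var conf = struct{ VarLibKubeletRootDir string }{VarLibKubeletRootDir: "/var/lib/kubelet"}

    // withKubeletRootDir saves the global setting, applies an override,
    // and registers a revert that runs when the test finishes.
    func withKubeletRootDir(t *testing.T, dir string) {
        old := conf.VarLibKubeletRootDir
        conf.VarLibKubeletRootDir = dir
        t.Cleanup(func() { conf.VarLibKubeletRootDir = old })
    }

    func TestWithOverriddenKubeletRoot(t *testing.T) {
        withKubeletRootDir(t, t.TempDir())
        // ... the test body now sees the overridden kubelet root ...
    }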
70 changes: 43 additions & 27 deletions pkg/koordlet/statesinformer/impl/states_noderesourcetopology.go
@@ -51,6 +51,7 @@ import (
     "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
     koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util"
     "github.com/koordinator-sh/koordinator/pkg/koordlet/util/kubelet"
+    "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
     "github.com/koordinator-sh/koordinator/pkg/util"
     "github.com/koordinator-sh/koordinator/pkg/util/cpuset"
 )
@@ -319,28 +320,16 @@ func (s *nodeTopoInformer) calcNodeTopo() (*nodeTopologyStatus, error) {
         return nil, fmt.Errorf("failed to marshal system qos resource, error %v", err)
     }
 
-    // Users can specify the kubelet RootDirectory on the host in the koordlet DaemonSet,
-    // but inside koordlet it is always mounted to the path /var/lib/kubelet
-    stateFilePath := kubelet.GetCPUManagerStateFilePath("/var/lib/kubelet")
-    data, err := os.ReadFile(stateFilePath)
+    // handle cpus allocated by the Kubelet cpu manager
+    var podAllocsJSON []byte
+    podAllocs, err := s.calKubeletAllocatedCPUs(sharedPoolCPUs)
     if err != nil {
-        if !os.IsNotExist(err) {
-            return nil, fmt.Errorf("failed to read state file, err: %v", err)
-        }
+        return nil, fmt.Errorf("failed to calculate Kubelet allocated cpus, err: %v", err)
     }
-    // TODO: report lse/lsr pod from cgroup
-    var podAllocsJSON []byte
-    if len(data) > 0 {
-        podAllocs, err := s.calGuaranteedCpu(sharedPoolCPUs, string(data))
-        if err != nil {
-            return nil, fmt.Errorf("failed to cal GuaranteedCpu, err: %v", err)
-        }
-        if len(podAllocs) != 0 {
-            podAllocsJSON, err = json.Marshal(podAllocs)
-            if err != nil {
-                return nil, fmt.Errorf("failed to marshal pod allocs, err: %v", err)
-            }
-        }
-    }
+    // "null" when the podAllocs is empty
+    podAllocsJSON, err = json.Marshal(podAllocs)
+    if err != nil {
+        return nil, fmt.Errorf("failed to marshal pod allocs, err: %v", err)
+    }
 
     cpuTopologyJSON, err := json.Marshal(cpuTopology)
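The `// "null" when the podAllocs is empty` comment relies on encoding/json semantics: marshaling a nil slice yields the literal null rather than [], so the annotation value is well-defined even when calKubeletAllocatedCPUs returns nothing. A standalone demonstration:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    func main() {
        var allocs []string // nil slice, as returned when the state file is missing
        out, err := json.Marshal(allocs)
        if err != nil {
            panic(err)
        }
        fmt.Println(string(out)) // prints: null

        empty := []string{} // a non-nil empty slice marshals differently
        out, _ = json.Marshal(empty)
        fmt.Println(string(out)) // prints: []
    }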
@@ -367,23 +356,21 @@ func (s *nodeTopoInformer) calcNodeTopo() (*nodeTopologyStatus, error) {
         return nil, fmt.Errorf("failed to marshal be cpushare pools for node, err: %v", err)
     }
 
-    annotations := map[string]string{
+    nodeTopoStatus.Annotations = map[string]string{
         extension.AnnotationCPUBasicInfo:            string(cpuBasicInfoJSON),
         extension.AnnotationNodeCPUTopology:         string(cpuTopologyJSON),
         extension.AnnotationNodeCPUSharedPools:      string(lsCPUSharePoolsJSON),
         extension.AnnotationKubeletCPUManagerPolicy: string(cpuManagerPolicyJSON),
         extension.AnnotationNodeBECPUSharedPools:    string(beCPUSharePoolsJSON),
+        extension.AnnotationNodeCPUAllocs:           string(podAllocsJSON),
     }
-    if len(podAllocsJSON) != 0 {
-        annotations[extension.AnnotationNodeCPUAllocs] = string(podAllocsJSON)
-    }
+    // set optional annotations
     if len(reservedJson) != 0 {
-        annotations[extension.AnnotationNodeReservation] = string(reservedJson)
+        nodeTopoStatus.Annotations[extension.AnnotationNodeReservation] = string(reservedJson)
     }
     if len(systemQOSJson) != 0 {
-        annotations[extension.AnnotationNodeSystemQOSResource] = string(systemQOSJson)
+        nodeTopoStatus.Annotations[extension.AnnotationNodeSystemQOSResource] = string(systemQOSJson)
     }
-    nodeTopoStatus.Annotations = annotations
 
     klog.V(6).Infof("calculate node topology status: %+v", nodeTopoStatus)
     return nodeTopoStatus, nil
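With this change the AnnotationNodeCPUAllocs annotation is always written (possibly as null), while the node reservation and system QoS annotations remain conditional. For a sense of the payload stored under that key, here is a hypothetical sketch; the struct below only mirrors the shape suggested by extension.PodCPUAlloc and its field set is an assumption, not taken from this diff:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // podCPUAlloc is an illustrative stand-in for extension.PodCPUAlloc.
    type podCPUAlloc struct {
        Namespace        string `json:"namespace,omitempty"`
        Name             string `json:"name,omitempty"`
        UID              string `json:"uid,omitempty"`
        CPUSet           string `json:"cpuset,omitempty"`
        ManagedByKubelet bool   `json:"managedByKubelet,omitempty"`
    }

    func main() {
        allocs := []podCPUAlloc{
            {Namespace: "default", Name: "static-pod", UID: "uid-0", CPUSet: "2-3", ManagedByKubelet: true},
        }
        out, _ := json.Marshal(allocs)
        fmt.Println(string(out)) // the kind of string stored under AnnotationNodeCPUAllocs
    }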
@@ -480,6 +467,7 @@ func (s *nodeTopoInformer) calGuaranteedCpu(usedCPUs map[int32]*extension.CPUInf
     }
 
     var podAllocs []extension.PodCPUAlloc
+    // entries can be empty when the kubelet cpu manager policy is none
     for podUID := range checkpoint.Entries {
         if _, ok := managedPods[types.UID(podUID)]; ok {
             continue
@@ -827,6 +815,34 @@ func (s *nodeTopoInformer) calTopologyZoneList(nodeCPUInfo *metriccache.NodeCPUI
     return zoneList, nil
 }
 
+func (s *nodeTopoInformer) calKubeletAllocatedCPUs(sharePoolCPUs map[int32]*extension.CPUInfo) ([]extension.PodCPUAlloc, error) {
+    // Users can specify the kubelet RootDirectory on the host in the koordlet DaemonSet;
+    // inside the koordlet it is mounted to /var/lib/kubelet by default.
+    stateFilePath := kubelet.GetCPUManagerStateFilePath(system.Conf.VarLibKubeletRootDir)
+    data, err := os.ReadFile(stateFilePath)
+    if err != nil && !os.IsNotExist(err) {
+        return nil, fmt.Errorf("failed to read state file, err: %v", err)
+    }
+    if err != nil || len(data) == 0 {
+        // NOTE: Normally, the kubelet-allocated cpuset is collected and reported by the koordlet, and the
+        // scheduler won't allocate from these cpus, so the kubelet cpu manager state should be available.
+        // However, in some special scenarios we accept that the cpu manager state is missing, and the kubelet
+        // allocation is ignored. e.g. when pods' cpuset.cpus should be managed by the kubelet cpu manager
+        // instead of the koordlet, we expect the scheduler to allocate the cpuset cpus already allocated by
+        // the kubelet cpu manager. For this temporary usage, we can HACK around the cpu manager state by
+        // pointing the VarLibKubeletRootDir argument at a wrong path and ignoring the not-exist error.
+        klog.Warningf("failed to read state file, cpu_manager_state is empty or does not exist, err: %v", err)
+        return nil, nil
+    }
+
+    // TODO: report lse/lsr pod from cgroup
+    podAllocs, err := s.calGuaranteedCpu(sharePoolCPUs, string(data))
+    if err != nil {
+        return nil, fmt.Errorf("failed to cal GuaranteedCpu, err: %v", err)
+    }
+    return podAllocs, nil
+}
+
 func (s *nodeTopoInformer) updateNodeTopo(newTopo *v1alpha1.NodeResourceTopology) {
     s.setNodeTopo(newTopo)
     klog.V(5).Infof("local node topology info updated %v", newTopo)
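For context on what calKubeletAllocatedCPUs parses: the kubelet static CPU manager checkpoints its assignments in <root-dir>/cpu_manager_state, a small JSON file whose entries map pod UID to container name to a cpuset string (empty under the none policy, matching the comment in calGuaranteedCpu above). The field names below follow the kubelet checkpoint as commonly observed; treat the exact schema as an assumption rather than something this diff defines. A minimal standalone reader:

    package main

    import (
        "encoding/json"
        "fmt"
        "os"
        "path/filepath"
    )

    // cpuManagerCheckpoint approximates the kubelet checkpoint layout, e.g.
    // {"policyName":"static","defaultCpuSet":"0-1,6-15","entries":{"<pod-uid>":{"<container>":"2-5"}},"checksum":1404136855}
    type cpuManagerCheckpoint struct {
        PolicyName    string                       `json:"policyName"`
        DefaultCPUSet string                       `json:"defaultCpuSet"`
        Entries       map[string]map[string]string `json:"entries,omitempty"`
        Checksum      uint64                       `json:"checksum"`
    }

    func main() {
        path := filepath.Join("/var/lib/kubelet", "cpu_manager_state")
        data, err := os.ReadFile(path)
        if err != nil {
            if os.IsNotExist(err) {
                // Mirrors the fallback above: a missing state file means the kubelet allocation is ignored.
                fmt.Println("cpu_manager_state not found")
                return
            }
            panic(err)
        }
        var cp cpuManagerCheckpoint
        if err := json.Unmarshal(data, &cp); err != nil {
            panic(err)
        }
        fmt.Printf("policy=%s defaultCpuSet=%s allocatedPods=%d\n", cp.PolicyName, cp.DefaultCPUSet, len(cp.Entries))
    }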