koordlet: revise node topology reporting for the kubelet cpu manager #2015

Merged
2 changes: 1 addition & 1 deletion pkg/koordlet/qosmanager/plugins/blkio/blkio_reconcile.go
@@ -296,7 +296,7 @@ func (b *blkIOReconcile) getDiskNumberFromVolumeGroup(vgName string) (string, er
// diskNumber: 253:16
func (b *blkIOReconcile) getDiskNumberFromPodVolume(podMeta *statesinformer.PodMeta, volumeName string) (string, error) {
podUUID := podMeta.Pod.UID
mountpoint := filepath.Join(system.Conf.CgroupKubePath, "pods", string(podUUID), "volumes/kubernetes.io~csi", volumeName, "mount")
mountpoint := filepath.Join(system.Conf.VarLibKubeletRootDir, "pods", string(podUUID), "volumes/kubernetes.io~csi", volumeName, "mount")
disk := getDiskByMountPoint(b.storageInfo, mountpoint)
diskNumber := getDiskNumber(b.storageInfo, disk)
if diskNumber == "" {
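For context, getDiskNumberFromPodVolume resolves a pod's CSI volume mount point under the kubelet root directory and maps it to a disk number. A minimal sketch of the path construction with hypothetical values (the real code reads the root dir from system.Conf.VarLibKubeletRootDir):

```go
package main

import (
	"fmt"
	"path/filepath"
)

// buildCSIMountPoint mirrors the layout used by getDiskNumberFromPodVolume:
//   <kubelet-root-dir>/pods/<pod-uid>/volumes/kubernetes.io~csi/<volume-name>/mount
func buildCSIMountPoint(kubeletRootDir, podUID, volumeName string) string {
	return filepath.Join(kubeletRootDir, "pods", podUID, "volumes/kubernetes.io~csi", volumeName, "mount")
}

func main() {
	// Hypothetical values for illustration only.
	fmt.Println(buildCSIMountPoint("/var/lib/kubelet", "pod-uid-1234", "yoda-test"))
	// Output: /var/lib/kubelet/pods/pod-uid-1234/volumes/kubernetes.io~csi/yoda-test/mount
}
```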
10 changes: 9 additions & 1 deletion pkg/koordlet/qosmanager/plugins/blkio/blkio_reconcile_test.go
@@ -50,6 +50,7 @@ const (
)

func TestBlkIOReconcile_reconcile(t *testing.T) {
helper := system.NewFileTestUtil(t)
sysFSRootDirName := BlkIOReconcileName

testingNodeSLO := newNodeSLO()
@@ -92,7 +93,14 @@ func TestBlkIOReconcile_reconcile(t *testing.T) {
"/dev/mapper/yoda--pool0-yoda--test1": "yoda-pool0",
"/dev/mapper/yoda--pool0-yoda--test2": "yoda-pool0",
}
system.Conf.CgroupKubePath = KubePath

var oldVarLibKubeletRoot string
helper.SetConf(func(conf *system.Config) {
oldVarLibKubeletRoot = conf.VarLibKubeletRootDir
conf.VarLibKubeletRootDir = KubePath
}, func(conf *system.Config) {
conf.VarLibKubeletRootDir = oldVarLibKubeletRoot
})
mpDiskMap := map[string]string{
fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~csi/%s/mount", KubePath, pod0.UID, "yoda-87d8625a-dcc9-47bf-a14a-994cf2971193"): "/dev/mapper/yoda--pool0-yoda--87d8625a--dcc9--47bf--a14a--994cf2971193",
fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~csi/html/mount", KubePath, pod1.UID): "/dev/mapper/yoda--pool0-yoda--test1",
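The test now scopes the override of VarLibKubeletRootDir through the file-test helper instead of mutating the global config directly. A minimal sketch of the same set-and-restore pattern, with a hypothetical reduced Config type:

```go
package main

import "fmt"

// Config mirrors only the field relevant here; the real struct lives in
// pkg/koordlet/util/system.
type Config struct {
	VarLibKubeletRootDir string
}

// setConf applies a temporary change and returns a restore function, following
// the set/recover callback pattern of helper.SetConf in the test above.
func setConf(conf *Config, set, undo func(*Config)) (restore func()) {
	set(conf)
	return func() { undo(conf) }
}

func main() {
	conf := &Config{VarLibKubeletRootDir: "/var/lib/kubelet"}

	var old string
	restore := setConf(conf,
		func(c *Config) { old = c.VarLibKubeletRootDir; c.VarLibKubeletRootDir = "/tmp/test-kubelet" },
		func(c *Config) { c.VarLibKubeletRootDir = old },
	)
	fmt.Println(conf.VarLibKubeletRootDir) // /tmp/test-kubelet
	restore()
	fmt.Println(conf.VarLibKubeletRootDir) // /var/lib/kubelet
}
```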
1 change: 1 addition & 0 deletions pkg/koordlet/runtimehooks/hooks/cpuset/rule.go
@@ -40,6 +40,7 @@ type cpusetRule struct {
sharePools []extension.CPUSharedPool
beSharePools []extension.CPUSharedPool
systemQOSCPUSet string
// TODO: support per-node disable
}

func (r *cpusetRule) getContainerCPUSet(containerReq *protocol.ContainerRequest) (*string, error) {
80 changes: 52 additions & 28 deletions pkg/koordlet/statesinformer/impl/states_noderesourcetopology.go
@@ -51,6 +51,7 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/kubelet"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
"github.com/koordinator-sh/koordinator/pkg/util"
"github.com/koordinator-sh/koordinator/pkg/util/cpuset"
)
@@ -264,7 +265,22 @@ func (s *nodeTopoInformer) calcNodeTopo() (*nodeTopologyStatus, error) {
}

var cpuManagerPolicy extension.KubeletCPUManagerPolicy
var podAllocs []extension.PodCPUAlloc
topo := kubelet.NewCPUTopology((*koordletutil.LocalCPUInfo)(nodeCPUInfo))
// NOTE: The koordlet is compatible with the kubelet static cpu manager. Users can migrate to Koordinator's CPU
// orchestration strategy with the following steps:
// 1. If users want to keep the kubelet static cpu manager in use until the node is offline or removed, they
// can disable the awareness and reporting of the kubelet cpu manager by setting DisableQueryKubeletConfig
// to true, so the scheduler may allocate cpuset cpus including those managed by the kubelet static cpu
// manager. Once the node is offline or ready to proceed to step 2, users can reset
// DisableQueryKubeletConfig to false.
// 2. By default (DisableQueryKubeletConfig = false), the koordlet takes over the cpuset cpus for new pods.
// The remaining cpuset pods managed by the kubelet static cpu manager are reported according to /configz
// and cpu_manager_state, so their cpuset cpus are excluded from the scheduler's cpu allocation.
// For newly-scheduled cpuset pods, the koordlet follows the cpuset allocation results in their annotations,
// which do not overlap with the cpuset cpus still managed by the kubelet static cpu manager. Users should
// no longer use the kubelet static cpu manager and should set its policy to "none". After the last pod
// of the static cpu manager policy is terminated, the cpuset cpus are fully managed by the koordlet.
if s.config != nil && !s.config.DisableQueryKubeletConfig {
kubeletConfiguration, err := s.kubelet.GetKubeletConfiguration()
if err != nil {
@@ -288,6 +304,12 @@ func (s *nodeTopoInformer) calcNodeTopo() (*nodeTopologyStatus, error) {
// NOTE: We should not remove reservedCPUs from sharedPoolCPUs to
// ensure that Burstable Pods (e.g. Pods request 0C but are limited to 4C)
// at least there are reservedCPUs available when nodes are allocated

// handle cpus allocated by the Kubelet cpu manager
podAllocs, err = s.calKubeletAllocatedCPUs(sharedPoolCPUs)
if err != nil {
return nil, fmt.Errorf("failed to calculate Kubelet allocated cpus, err: %v", err)
}
}

// get NRT topology policy
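The NOTE above assumes the koordlet can learn the kubelet's cpu manager policy (normally via the kubelet /configz endpoint) whenever DisableQueryKubeletConfig is false. A minimal, self-contained sketch of decoding such a response, using a hypothetical reduced struct instead of the real k8s.io/kubelet/config types:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// configzResponse mirrors only the fields this sketch needs; the kubelet's
// /configz payload wraps the KubeletConfiguration under "kubeletconfig".
type configzResponse struct {
	KubeletConfig struct {
		CPUManagerPolicy        string            `json:"cpuManagerPolicy"`
		CPUManagerPolicyOptions map[string]string `json:"cpuManagerPolicyOptions"`
		ReservedSystemCPUs      string            `json:"reservedSystemCpus"`
	} `json:"kubeletconfig"`
}

func main() {
	// Hypothetical payload for illustration only.
	raw := []byte(`{"kubeletconfig":{"cpuManagerPolicy":"static","reservedSystemCpus":"0,1"}}`)

	var resp configzResponse
	if err := json.Unmarshal(raw, &resp); err != nil {
		panic(err)
	}
	// A "static" policy means the kubelet may still pin exclusive CPUs itself,
	// which is what the reporting logic above needs to account for.
	fmt.Println(resp.KubeletConfig.CPUManagerPolicy, resp.KubeletConfig.ReservedSystemCPUs)
}
```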
@@ -319,28 +341,10 @@ func (s *nodeTopoInformer) calcNodeTopo() (*nodeTopologyStatus, error) {
return nil, fmt.Errorf("failed to marshal system qos resource, error %v", err)
}

// Users can specify the kubelet RootDirectory on the host in the koordlet DaemonSet,
// but inside koordlet it is always mounted to the path /var/lib/kubelet
stateFilePath := kubelet.GetCPUManagerStateFilePath("/var/lib/kubelet")
data, err := os.ReadFile(stateFilePath)
// "null" when the podAllocs is empty
podAllocsJSON, err := json.Marshal(podAllocs)
if err != nil {
if !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to read state file, err: %v", err)
}
}
// TODO: report lse/lsr pod from cgroup
var podAllocsJSON []byte
if len(data) > 0 {
podAllocs, err := s.calGuaranteedCpu(sharedPoolCPUs, string(data))
if err != nil {
return nil, fmt.Errorf("failed to cal GuaranteedCpu, err: %v", err)
}
if len(podAllocs) != 0 {
podAllocsJSON, err = json.Marshal(podAllocs)
if err != nil {
return nil, fmt.Errorf("failed to marshal pod allocs, err: %v", err)
}
}
return nil, fmt.Errorf("failed to marshal pod allocs, err: %v", err)
}

cpuTopologyJSON, err := json.Marshal(cpuTopology)
@@ -367,23 +371,21 @@ func (s *nodeTopoInformer) calcNodeTopo() (*nodeTopologyStatus, error) {
return nil, fmt.Errorf("failed to marshal be cpushare pools for node, err: %v", err)
}

annotations := map[string]string{
nodeTopoStatus.Annotations = map[string]string{
extension.AnnotationCPUBasicInfo: string(cpuBasicInfoJSON),
extension.AnnotationNodeCPUTopology: string(cpuTopologyJSON),
extension.AnnotationNodeCPUSharedPools: string(lsCPUSharePoolsJSON),
extension.AnnotationKubeletCPUManagerPolicy: string(cpuManagerPolicyJSON),
extension.AnnotationNodeBECPUSharedPools: string(beCPUSharePoolsJSON),
extension.AnnotationNodeCPUAllocs: string(podAllocsJSON),
}
if len(podAllocsJSON) != 0 {
annotations[extension.AnnotationNodeCPUAllocs] = string(podAllocsJSON)
}
// set optional annotations
if len(reservedJson) != 0 {
annotations[extension.AnnotationNodeReservation] = string(reservedJson)
nodeTopoStatus.Annotations[extension.AnnotationNodeReservation] = string(reservedJson)
}
if len(systemQOSJson) != 0 {
annotations[extension.AnnotationNodeSystemQOSResource] = string(systemQOSJson)
nodeTopoStatus.Annotations[extension.AnnotationNodeSystemQOSResource] = string(systemQOSJson)
}
nodeTopoStatus.Annotations = annotations

klog.V(6).Infof("calculate node topology status: %+v", nodeTopoStatus)
return nodeTopoStatus, nil
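The `"null" when the podAllocs is empty` comment above relies on Go's encoding/json behavior for nil slices, which is why the cpu-allocs annotation can now be written unconditionally. A minimal illustration, with PodCPUAlloc reduced to hypothetical fields for brevity:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PodCPUAlloc is a reduced stand-in for extension.PodCPUAlloc; the field set
// here is hypothetical and only serves the marshaling example.
type PodCPUAlloc struct {
	UID    string `json:"uid"`
	CPUSet string `json:"cpuset"`
}

func main() {
	var empty []PodCPUAlloc // nil slice: no kubelet-managed cpuset pods
	out, _ := json.Marshal(empty)
	fmt.Println(string(out)) // null

	one := []PodCPUAlloc{{UID: "pod-uid", CPUSet: "2-3"}}
	out, _ = json.Marshal(one)
	fmt.Println(string(out)) // [{"uid":"pod-uid","cpuset":"2-3"}]
}
```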
@@ -480,6 +482,7 @@ func (s *nodeTopoInformer) calGuaranteedCpu(usedCPUs map[int32]*extension.CPUInf
}

var podAllocs []extension.PodCPUAlloc
// entries can be empty when the kubelet cpu manager policy is none
for podUID := range checkpoint.Entries {
if _, ok := managedPods[types.UID(podUID)]; ok {
continue
@@ -827,6 +830,27 @@ func (s *nodeTopoInformer) calTopologyZoneList(nodeCPUInfo *metriccache.NodeCPUI
return zoneList, nil
}

func (s *nodeTopoInformer) calKubeletAllocatedCPUs(sharePoolCPUs map[int32]*extension.CPUInfo) ([]extension.PodCPUAlloc, error) {
// Users can specify the kubelet RootDirectory on the host in the koordlet DaemonSet;
// inside the koordlet it is mounted to the path /var/lib/kubelet by default.
stateFilePath := kubelet.GetCPUManagerStateFilePath(system.Conf.VarLibKubeletRootDir)
data, err := os.ReadFile(stateFilePath)
if err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to read state file, err: %v", err)
}
if err != nil || len(data) <= 0 {
klog.Warningf("failed to read state file, cpu_manager_state empty or not exist, err: %s", err)
return nil, nil
}

// TODO: report lse/lsr pod from cgroup
podAllocs, err := s.calGuaranteedCpu(sharePoolCPUs, string(data))
if err != nil {
return nil, fmt.Errorf("failed to cal GuaranteedCpu, err: %v", err)
}
return podAllocs, nil
}

func (s *nodeTopoInformer) updateNodeTopo(newTopo *v1alpha1.NodeResourceTopology) {
s.setNodeTopo(newTopo)
klog.V(5).Infof("local node topology info updated %v", newTopo)
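For reference, calKubeletAllocatedCPUs above feeds the raw cpu_manager_state bytes into calGuaranteedCpu. A minimal sketch of reading such a checkpoint, with a hypothetical reduced struct (the real kubelet checkpoint is versioned and checksummed, and the parsing in the koordlet goes through its own helpers):

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// cpuManagerCheckpoint mirrors only the fields this sketch needs; the kubelet's
// actual checkpoint format is versioned and includes a checksum.
type cpuManagerCheckpoint struct {
	PolicyName    string                       `json:"policyName"`
	DefaultCPUSet string                       `json:"defaultCpuSet"`
	Entries       map[string]map[string]string `json:"entries"` // podUID -> container -> cpuset
}

func main() {
	data, err := os.ReadFile("/var/lib/kubelet/cpu_manager_state") // path is illustrative
	if err != nil {
		if os.IsNotExist(err) {
			fmt.Println("no kubelet cpu manager state; nothing to report")
			return
		}
		panic(err)
	}

	var cp cpuManagerCheckpoint
	if err := json.Unmarshal(data, &cp); err != nil {
		panic(err)
	}
	// Entries can be empty when the kubelet cpu manager policy is "none".
	for podUID, containers := range cp.Entries {
		for name, cpus := range containers {
			fmt.Printf("pod %s container %s pinned to %s\n", podUID, name, cpus)
		}
	}
}
```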