From e58a3a7e081d26b482b8caeeb97d509a0563b0e1 Mon Sep 17 00:00:00 2001
From: saintube
Date: Mon, 22 Apr 2024 12:21:29 +0800
Subject: [PATCH] koordlet: revise node topology reporting for the kubelet cpu manager

Signed-off-by: saintube
---
 .../plugins/blkio/blkio_reconcile.go          |   2 +-
 .../plugins/blkio/blkio_reconcile_test.go     |  10 +-
 .../impl/states_noderesourcetopology.go       |  70 ++--
 .../impl/states_noderesourcetopology_test.go  | 364 +++++++++++++++++-
 pkg/koordlet/util/system/config.go            |  10 +-
 pkg/koordlet/util/system/config_test.go       |   4 +-
 6 files changed, 422 insertions(+), 38 deletions(-)

diff --git a/pkg/koordlet/qosmanager/plugins/blkio/blkio_reconcile.go b/pkg/koordlet/qosmanager/plugins/blkio/blkio_reconcile.go
index 17aec7660..8ddad6c16 100644
--- a/pkg/koordlet/qosmanager/plugins/blkio/blkio_reconcile.go
+++ b/pkg/koordlet/qosmanager/plugins/blkio/blkio_reconcile.go
@@ -296,7 +296,7 @@ func (b *blkIOReconcile) getDiskNumberFromVolumeGroup(vgName string) (string, er
 // diskNumber: 253:16
 func (b *blkIOReconcile) getDiskNumberFromPodVolume(podMeta *statesinformer.PodMeta, volumeName string) (string, error) {
 	podUUID := podMeta.Pod.UID
-	mountpoint := filepath.Join(system.Conf.CgroupKubePath, "pods", string(podUUID), "volumes/kubernetes.io~csi", volumeName, "mount")
+	mountpoint := filepath.Join(system.Conf.VarLibKubeletRootDir, "pods", string(podUUID), "volumes/kubernetes.io~csi", volumeName, "mount")
 	disk := getDiskByMountPoint(b.storageInfo, mountpoint)
 	diskNumber := getDiskNumber(b.storageInfo, disk)
 	if diskNumber == "" {
diff --git a/pkg/koordlet/qosmanager/plugins/blkio/blkio_reconcile_test.go b/pkg/koordlet/qosmanager/plugins/blkio/blkio_reconcile_test.go
index 0664b2e0f..fae05ba4c 100644
--- a/pkg/koordlet/qosmanager/plugins/blkio/blkio_reconcile_test.go
+++ b/pkg/koordlet/qosmanager/plugins/blkio/blkio_reconcile_test.go
@@ -50,6 +50,7 @@ const (
 )
 
 func TestBlkIOReconcile_reconcile(t *testing.T) {
+	helper := system.NewFileTestUtil(t)
 	sysFSRootDirName := BlkIOReconcileName
 
 	testingNodeSLO := newNodeSLO()
@@ -92,7 +93,14 @@ func TestBlkIOReconcile_reconcile(t *testing.T) {
 		"/dev/mapper/yoda--pool0-yoda--test1": "yoda-pool0",
 		"/dev/mapper/yoda--pool0-yoda--test2": "yoda-pool0",
 	}
-	system.Conf.CgroupKubePath = KubePath
+
+	var oldVarLibKubeletRoot string
+	helper.SetConf(func(conf *system.Config) {
+		oldVarLibKubeletRoot = conf.VarLibKubeletRootDir
+		conf.VarLibKubeletRootDir = KubePath
+	}, func(conf *system.Config) {
+		conf.VarLibKubeletRootDir = oldVarLibKubeletRoot
+	})
 	mpDiskMap := map[string]string{
 		fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~csi/%s/mount", KubePath, pod0.UID, "yoda-87d8625a-dcc9-47bf-a14a-994cf2971193"): "/dev/mapper/yoda--pool0-yoda--87d8625a--dcc9--47bf--a14a--994cf2971193",
 		fmt.Sprintf("%s/pods/%s/volumes/kubernetes.io~csi/html/mount", KubePath, pod1.UID): "/dev/mapper/yoda--pool0-yoda--test1",
diff --git a/pkg/koordlet/statesinformer/impl/states_noderesourcetopology.go b/pkg/koordlet/statesinformer/impl/states_noderesourcetopology.go
index a79ae85c7..e765f5bf6 100644
--- a/pkg/koordlet/statesinformer/impl/states_noderesourcetopology.go
+++ b/pkg/koordlet/statesinformer/impl/states_noderesourcetopology.go
@@ -51,6 +51,7 @@ import (
 	"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
 	koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util"
 	"github.com/koordinator-sh/koordinator/pkg/koordlet/util/kubelet"
+	"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
 	"github.com/koordinator-sh/koordinator/pkg/util"
"github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) @@ -319,28 +320,16 @@ func (s *nodeTopoInformer) calcNodeTopo() (*nodeTopologyStatus, error) { return nil, fmt.Errorf("failed to marshal system qos resource, error %v", err) } - // Users can specify the kubelet RootDirectory on the host in the koordlet DaemonSet, - // but inside koordlet it is always mounted to the path /var/lib/kubelet - stateFilePath := kubelet.GetCPUManagerStateFilePath("/var/lib/kubelet") - data, err := os.ReadFile(stateFilePath) + // handle cpus allocated by the Kubelet cpu manager + var podAllocsJSON []byte + podAllocs, err := s.calKubeletAllocatedCPUs(sharedPoolCPUs) if err != nil { - if !os.IsNotExist(err) { - return nil, fmt.Errorf("failed to read state file, err: %v", err) - } + return nil, fmt.Errorf("failed to calculate Kubelet allocated cpus, err: %v", err) } - // TODO: report lse/lsr pod from cgroup - var podAllocsJSON []byte - if len(data) > 0 { - podAllocs, err := s.calGuaranteedCpu(sharedPoolCPUs, string(data)) - if err != nil { - return nil, fmt.Errorf("failed to cal GuaranteedCpu, err: %v", err) - } - if len(podAllocs) != 0 { - podAllocsJSON, err = json.Marshal(podAllocs) - if err != nil { - return nil, fmt.Errorf("failed to marshal pod allocs, err: %v", err) - } - } + // "null" when the podAllocs is empty + podAllocsJSON, err = json.Marshal(podAllocs) + if err != nil { + return nil, fmt.Errorf("failed to marshal pod allocs, err: %v", err) } cpuTopologyJSON, err := json.Marshal(cpuTopology) @@ -367,23 +356,21 @@ func (s *nodeTopoInformer) calcNodeTopo() (*nodeTopologyStatus, error) { return nil, fmt.Errorf("failed to marshal be cpushare pools for node, err: %v", err) } - annotations := map[string]string{ + nodeTopoStatus.Annotations = map[string]string{ extension.AnnotationCPUBasicInfo: string(cpuBasicInfoJSON), extension.AnnotationNodeCPUTopology: string(cpuTopologyJSON), extension.AnnotationNodeCPUSharedPools: string(lsCPUSharePoolsJSON), extension.AnnotationKubeletCPUManagerPolicy: string(cpuManagerPolicyJSON), extension.AnnotationNodeBECPUSharedPools: string(beCPUSharePoolsJSON), + extension.AnnotationNodeCPUAllocs: string(podAllocsJSON), } - if len(podAllocsJSON) != 0 { - annotations[extension.AnnotationNodeCPUAllocs] = string(podAllocsJSON) - } + // set optional annotations if len(reservedJson) != 0 { - annotations[extension.AnnotationNodeReservation] = string(reservedJson) + nodeTopoStatus.Annotations[extension.AnnotationNodeReservation] = string(reservedJson) } if len(systemQOSJson) != 0 { - annotations[extension.AnnotationNodeSystemQOSResource] = string(systemQOSJson) + nodeTopoStatus.Annotations[extension.AnnotationNodeSystemQOSResource] = string(systemQOSJson) } - nodeTopoStatus.Annotations = annotations klog.V(6).Infof("calculate node topology status: %+v", nodeTopoStatus) return nodeTopoStatus, nil @@ -480,6 +467,7 @@ func (s *nodeTopoInformer) calGuaranteedCpu(usedCPUs map[int32]*extension.CPUInf } var podAllocs []extension.PodCPUAlloc + // entries can be empty when the kubelet cpu manager policy is none for podUID := range checkpoint.Entries { if _, ok := managedPods[types.UID(podUID)]; ok { continue @@ -827,6 +815,34 @@ func (s *nodeTopoInformer) calTopologyZoneList(nodeCPUInfo *metriccache.NodeCPUI return zoneList, nil } +func (s *nodeTopoInformer) calKubeletAllocatedCPUs(sharePoolCPUs map[int32]*extension.CPUInfo) ([]extension.PodCPUAlloc, error) { + // Users can specify the kubelet RootDirectory on the host in the koordlet DaemonSet, + // inside koordlet it is mounted to the 
+	stateFilePath := kubelet.GetCPUManagerStateFilePath(system.Conf.VarLibKubeletRootDir)
+	data, err := os.ReadFile(stateFilePath)
+	if err != nil && !os.IsNotExist(err) {
+		return nil, fmt.Errorf("failed to read state file, err: %v", err)
+	}
+	if err != nil || len(data) <= 0 {
+		// NOTE: Normally, the kubelet-allocated cpuset is collected and reported by the koordlet, and the scheduler
+		// won't allocate from these cpus, so the kubelet cpu manager state should be available in this case. However,
+		// in some special scenarios we accept that the cpu manager state is missing and the kubelet allocation is
+		// ignored. e.g. When pods' cpuset.cpus should be managed by the kubelet cpu manager instead of the koordlet,
+		// we expect the scheduler to allocate the cpuset cpus allocated by the kubelet cpu manager. For this temporary
+		// usage, we can HACK the cpu manager state by pointing the argument VarLibKubeletRootDir to a wrong path and
+		// ignoring the non-existent error.
+		klog.Warningf("failed to read state file, cpu_manager_state is empty or does not exist, err: %s", err)
+		return nil, nil
+	}
+
+	// TODO: report lse/lsr pod from cgroup
+	podAllocs, err := s.calGuaranteedCpu(sharePoolCPUs, string(data))
+	if err != nil {
+		return nil, fmt.Errorf("failed to cal GuaranteedCpu, err: %v", err)
+	}
+	return podAllocs, nil
+}
+
 func (s *nodeTopoInformer) updateNodeTopo(newTopo *v1alpha1.NodeResourceTopology) {
 	s.setNodeTopo(newTopo)
 	klog.V(5).Infof("local node topology info updated %v", newTopo)
diff --git a/pkg/koordlet/statesinformer/impl/states_noderesourcetopology_test.go b/pkg/koordlet/statesinformer/impl/states_noderesourcetopology_test.go
index 7da063db1..e22ecf164 100644
--- a/pkg/koordlet/statesinformer/impl/states_noderesourcetopology_test.go
+++ b/pkg/koordlet/statesinformer/impl/states_noderesourcetopology_test.go
@@ -46,6 +46,7 @@ import (
 	mock_metriccache "github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache/mockmetriccache"
 	"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
 	koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util"
+	"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
 	"github.com/koordinator-sh/koordinator/pkg/util"
 )
 
@@ -306,6 +307,303 @@ func Test_calGuaranteedCpu(t *testing.T) {
 	}
 }
 
+func Test_calKubeletAllocatedCPUs(t *testing.T) {
+	testSharePoolCPUs := map[int32]*extension.CPUInfo{
+		0: {
+			ID:     0,
+			Core:   0,
+			Socket: 0,
+			Node:   0,
+		},
+		1: {
+			ID:     1,
+			Core:   1,
+			Socket: 0,
+			Node:   0,
+		},
+		2: {
+			ID:     2,
+			Core:   2,
+			Socket: 0,
+			Node:   0,
+		},
+		3: {
+			ID:     3,
+			Core:   3,
+			Socket: 0,
+			Node:   0,
+		},
+		4: {
+			ID:     4,
+			Core:   0,
+			Socket: 0,
+			Node:   0,
+		},
+		5: {
+			ID:     5,
+			Core:   1,
+			Socket: 0,
+			Node:   0,
+		},
+		6: {
+			ID:     6,
+			Core:   2,
+			Socket: 0,
+			Node:   0,
+		},
+		7: {
+			ID:     7,
+			Core:   3,
+			Socket: 0,
+			Node:   0,
+		},
+	}
+	type fields struct {
+		prepareFn func(helper *system.FileTestUtil)
+		podMap    map[string]*statesinformer.PodMeta
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		arg     map[int32]*extension.CPUInfo
+		wantErr bool
+		want    []extension.PodCPUAlloc
+	}{
+		{
+			name: "cpu_manager_state does not exist",
+			fields: fields{
+				prepareFn: func(helper *system.FileTestUtil) {
+					var oldVarKubeletLibRoot string
+					helper.SetConf(func(conf *system.Config) {
+						oldVarKubeletLibRoot = conf.VarLibKubeletRootDir
+						conf.VarLibKubeletRootDir = helper.TempDir
+					}, func(conf *system.Config) {
+						conf.VarLibKubeletRootDir = oldVarKubeletLibRoot
+					})
+				},
+			},
+			arg:     testSharePoolCPUs,
+			wantErr: false,
+			want:    nil,
+		},
+		{
+			name: "cpu manager policy is none",
+			fields: fields{
+				prepareFn: func(helper *system.FileTestUtil) {
+					var oldVarKubeletLibRoot string
+					helper.SetConf(func(conf *system.Config) {
+						oldVarKubeletLibRoot = conf.VarLibKubeletRootDir
+						conf.VarLibKubeletRootDir = helper.TempDir
+					}, func(conf *system.Config) {
+						conf.VarLibKubeletRootDir = oldVarKubeletLibRoot
+					})
+					helper.WriteFileContents("cpu_manager_state", `{"policyName":"none","defaultCpuSet":"","checksum":1000000000}`)
+				},
+			},
+			arg:     testSharePoolCPUs,
+			wantErr: false,
+			want:    nil,
+		},
+		{
+			name: "cpu manager policy is static",
+			fields: fields{
+				prepareFn: func(helper *system.FileTestUtil) {
+					var oldVarKubeletLibRoot string
+					helper.SetConf(func(conf *system.Config) {
+						oldVarKubeletLibRoot = conf.VarLibKubeletRootDir
+						conf.VarLibKubeletRootDir = helper.TempDir
+					}, func(conf *system.Config) {
+						conf.VarLibKubeletRootDir = oldVarKubeletLibRoot
+					})
+					helper.WriteFileContents("cpu_manager_state", `{"policyName":"static","defaultCpuSet":"0,2-7","entries":{"static-pod-xxx":{"demo":"1"}},"checksum":1000000000}`)
+				},
+				podMap: map[string]*statesinformer.PodMeta{
+					"static-pod": {
+						Pod: &corev1.Pod{
+							ObjectMeta: metav1.ObjectMeta{
+								Namespace: "default",
+								Name:      "static-pod",
+								UID:       types.UID("static-pod-xxx"),
+							},
+						},
+					},
+					"LSPod": {
+						Pod: &corev1.Pod{
+							ObjectMeta: metav1.ObjectMeta{
+								Namespace: "default",
+								Name:      "test-ls-pod",
+								UID:       types.UID("LSPod"),
+								Labels: map[string]string{
+									extension.LabelPodQoS: string(extension.QoSLS),
+								},
+								Annotations: map[string]string{
+									extension.AnnotationResourceStatus: `{"cpuset": "3-4"}`,
+								},
+							},
+						},
+					},
+					"BEPod": {
+						Pod: &corev1.Pod{
+							ObjectMeta: metav1.ObjectMeta{
+								Namespace: "default",
+								Name:      "test-be-pod",
+								UID:       types.UID("BEPod"),
+								Labels: map[string]string{
+									extension.LabelPodQoS: string(extension.QoSBE),
+								},
+							},
+						},
+					},
+					"LSRPod": {
+						Pod: &corev1.Pod{
+							ObjectMeta: metav1.ObjectMeta{
+								Namespace: "default",
+								Name:      "test-lsr-pod",
+								UID:       types.UID("LSRPod"),
+								Labels: map[string]string{
+									extension.LabelPodQoS: string(extension.QoSLSR),
+								},
+								Annotations: map[string]string{
+									extension.AnnotationResourceStatus: `{"cpuset": "4-5"}`,
+								},
+							},
+						},
+					},
+					"LSEPod": {
+						Pod: &corev1.Pod{
+							ObjectMeta: metav1.ObjectMeta{
+								Namespace: "default",
+								Name:      "test-lse-pod",
+								UID:       types.UID("LSEPod"),
+								Labels: map[string]string{
+									extension.LabelPodQoS: string(extension.QoSLSE),
+								},
+								Annotations: map[string]string{
+									extension.AnnotationResourceStatus: `{"cpuset": "5-6"}`,
+								},
+							},
+						},
+					},
+				},
+			},
+			arg:     testSharePoolCPUs,
+			wantErr: false,
+			want: []extension.PodCPUAlloc{
+				{
+					Name:             "static-pod",
+					Namespace:        "default",
+					UID:              "static-pod-xxx",
+					CPUSet:           "1",
+					ManagedByKubelet: true,
+				},
+			},
+		},
+		{
+			name: "failed to parse cpu manager checkpoint",
+			fields: fields{
+				prepareFn: func(helper *system.FileTestUtil) {
+					var oldVarKubeletLibRoot string
+					helper.SetConf(func(conf *system.Config) {
+						oldVarKubeletLibRoot = conf.VarLibKubeletRootDir
+						conf.VarLibKubeletRootDir = helper.TempDir
+					}, func(conf *system.Config) {
+						conf.VarLibKubeletRootDir = oldVarKubeletLibRoot
+					})
+					helper.WriteFileContents("cpu_manager_state", `invalidContent`)
+				},
+				podMap: map[string]*statesinformer.PodMeta{
+					"static-pod": {
+						Pod: &corev1.Pod{
+							ObjectMeta: metav1.ObjectMeta{
+								Namespace: "default",
+								Name:      "static-pod",
+								UID:       types.UID("static-pod-xxx"),
+							},
+						},
+					},
+					"LSPod": {
+						Pod: &corev1.Pod{
+							ObjectMeta: metav1.ObjectMeta{
+								Namespace: "default",
+								Name:      "test-ls-pod",
+								UID:       types.UID("LSPod"),
+								Labels: map[string]string{
+									extension.LabelPodQoS: string(extension.QoSLS),
+								},
+								Annotations: map[string]string{
+									extension.AnnotationResourceStatus: `{"cpuset": "3-4"}`,
+								},
+							},
+						},
+					},
+					"BEPod": {
+						Pod: &corev1.Pod{
+							ObjectMeta: metav1.ObjectMeta{
+								Namespace: "default",
+								Name:      "test-be-pod",
+								UID:       types.UID("BEPod"),
+								Labels: map[string]string{
+									extension.LabelPodQoS: string(extension.QoSBE),
+								},
+							},
+						},
+					},
+					"LSRPod": {
+						Pod: &corev1.Pod{
+							ObjectMeta: metav1.ObjectMeta{
+								Namespace: "default",
+								Name:      "test-lsr-pod",
+								UID:       types.UID("LSRPod"),
+								Labels: map[string]string{
+									extension.LabelPodQoS: string(extension.QoSLSR),
+								},
+								Annotations: map[string]string{
+									extension.AnnotationResourceStatus: `{"cpuset": "4-5"}`,
+								},
+							},
+						},
+					},
+					"LSEPod": {
+						Pod: &corev1.Pod{
+							ObjectMeta: metav1.ObjectMeta{
+								Namespace: "default",
+								Name:      "test-lse-pod",
+								UID:       types.UID("LSEPod"),
+								Labels: map[string]string{
+									extension.LabelPodQoS: string(extension.QoSLSE),
+								},
+								Annotations: map[string]string{
+									extension.AnnotationResourceStatus: `{"cpuset": "5-6"}`,
+								},
+							},
+						},
+					},
+				},
+			},
+			arg:     testSharePoolCPUs,
+			wantErr: true,
+			want:    nil,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			helper := system.NewFileTestUtil(t)
+			defer helper.Cleanup()
+			if tt.fields.prepareFn != nil {
+				tt.fields.prepareFn(helper)
+			}
+			s := &nodeTopoInformer{
+				podsInformer: &podsInformer{
+					podMap: tt.fields.podMap,
+				},
+			}
+			got, gotErr := s.calKubeletAllocatedCPUs(tt.arg)
+			assert.Equal(t, tt.wantErr, gotErr != nil, gotErr)
+			assert.Equal(t, tt.want, got)
+		})
+	}
+}
+
 func Test_reportNodeTopology(t *testing.T) {
 	ctl := gomock.NewController(t)
 	defer ctl.Finish()
@@ -436,6 +734,7 @@ func Test_reportNodeTopology(t *testing.T) {
 				ObjectMeta: metav1.ObjectMeta{
 					Name:      "pod1",
 					Namespace: "ns1",
+					UID:       "xxx-y1",
 					Labels: map[string]string{
 						extension.LabelPodQoS: string(extension.QoSLSR),
 					},
@@ -450,6 +749,7 @@ func Test_reportNodeTopology(t *testing.T) {
 				ObjectMeta: metav1.ObjectMeta{
 					Name:      "pod2",
 					Namespace: "ns2",
+					UID:       "xxx-y2",
 					Annotations: map[string]string{
 						extension.LabelPodQoS:              string(extension.QoSLSR),
 						extension.AnnotationResourceStatus: `{"cpuset": "3" }`,
@@ -460,8 +760,9 @@ func Test_reportNodeTopology(t *testing.T) {
 		"pod3-lse": {
 			Pod: &corev1.Pod{
 				ObjectMeta: metav1.ObjectMeta{
-					Name:      "pod2",
-					Namespace: "ns2",
+					Name:      "pod3",
+					Namespace: "ns3",
+					UID:       "xxx-y3",
 					Annotations: map[string]string{
 						extension.LabelPodQoS:              string(extension.QoSLSE),
 						extension.AnnotationResourceStatus: `{"cpuset": "5" }`,
@@ -469,11 +770,21 @@ func Test_reportNodeTopology(t *testing.T) {
 				},
 			},
 		},
+		"pod4-static": {
+			Pod: &corev1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "pod4",
+					Namespace: "ns4",
+					UID:       "xxx-y4",
+				},
+			},
+		},
 	}
 
 	mockMetricCache.EXPECT().Get(metriccache.NodeCPUInfoKey).Return(&mockNodeCPUInfo, true).AnyTimes()
 	mockMetricCache.EXPECT().Get(metriccache.NodeNUMAInfoKey).Return(mockNodeNUMAInfo, true).AnyTimes()
 
 	expectedCPUSharedPool := `[{"socket":0,"node":0,"cpuset":"0-2"},{"socket":1,"node":1,"cpuset":"6-7"}]`
+	expectedCPUSharedPool1 := `[{"socket":0,"node":0,"cpuset":"0,2"},{"socket":1,"node":1,"cpuset":"6-7"}]`
 	expectedBECPUSharedPool := `[{"socket":0,"node":0,"cpuset":"0-2,3-4"},{"socket":1,"node":1,"cpuset":"6-7"}]`
 	expectedCPUTopology := `{"detail":[{"id":0,"core":0,"socket":0,"node":0},{"id":1,"core":0,"socket":0,"node":0},{"id":2,"core":1,"socket":0,"node":0},{"id":3,"core":1,"socket":0,"node":0},{"id":4,"core":2,"socket":1,"node":1},{"id":5,"core":2,"socket":1,"node":1},{"id":6,"core":3,"socket":1,"node":1},{"id":7,"core":3,"socket":1,"node":1}]}`
`{"detail":[{"id":0,"core":0,"socket":0,"node":0},{"id":1,"core":0,"socket":0,"node":0},{"id":2,"core":1,"socket":0,"node":0},{"id":3,"core":1,"socket":0,"node":0},{"id":4,"core":2,"socket":1,"node":1},{"id":5,"core":2,"socket":1,"node":1},{"id":6,"core":3,"socket":1,"node":1},{"id":7,"core":3,"socket":1,"node":1}]}` expectedCPUBasicInfoBytes, err := json.Marshal(mockNodeCPUInfo.BasicInfo) @@ -747,6 +1058,7 @@ func Test_reportNodeTopology(t *testing.T) { tests := []struct { name string + prepareFn func(helper *system.FileTestUtil) config *Config kubeletStub KubeletStub disableCreateTopologyCRD bool @@ -758,6 +1070,7 @@ func Test_reportNodeTopology(t *testing.T) { expectedCPUSharedPool string expectedBECPUSharedPool string expectedCPUTopology string + expectedNodeCPUAllocs string expectedNodeReservation string expectedSystemQOS string expectedTopologyPolicies []string @@ -782,6 +1095,7 @@ func Test_reportNodeTopology(t *testing.T) { expectedCPUSharedPool: expectedCPUSharedPool, expectedBECPUSharedPool: expectedBECPUSharedPool, expectedCPUTopology: expectedCPUTopology, + expectedNodeCPUAllocs: "null", expectedNodeReservation: "{}", expectedSystemQOS: "{}", expectedTopologyPolicies: expectedTopologyPolices, @@ -812,6 +1126,7 @@ func Test_reportNodeTopology(t *testing.T) { expectedCPUBasicInfo: string(expectedCPUBasicInfoBytes), expectedCPUSharedPool: `[{"socket":0,"node":0,"cpuset":"0"},{"socket":1,"node":1,"cpuset":"6"}]`, expectedCPUTopology: expectedCPUTopology, + expectedNodeCPUAllocs: "null", expectedNodeReservation: `{"reservedCPUs":"1-2"}`, expectedSystemQOS: `{"cpuset":"7"}`, expectedTopologyPolicies: expectedTopologyPolices, @@ -837,6 +1152,7 @@ func Test_reportNodeTopology(t *testing.T) { expectedCPUBasicInfo: string(expectedCPUBasicInfoBytes), expectedCPUSharedPool: expectedCPUSharedPool, expectedCPUTopology: expectedCPUTopology, + expectedNodeCPUAllocs: "null", expectedNodeReservation: "{}", expectedSystemQOS: "{}", expectedTopologyPolicies: expectedTopologyPolices, @@ -861,6 +1177,7 @@ func Test_reportNodeTopology(t *testing.T) { expectedCPUBasicInfo: string(expectedCPUBasicInfoBytes), expectedCPUSharedPool: expectedCPUSharedPool, expectedCPUTopology: expectedCPUTopology, + expectedNodeCPUAllocs: "null", expectedNodeReservation: "{}", expectedSystemQOS: "{}", expectedTopologyPolicies: expectedTopologyPolices, @@ -886,6 +1203,7 @@ func Test_reportNodeTopology(t *testing.T) { expectedCPUSharedPool: expectedCPUSharedPool, expectedBECPUSharedPool: expectedBECPUSharedPool, expectedCPUTopology: expectedCPUTopology, + expectedNodeCPUAllocs: "null", expectedNodeReservation: "{}", expectedSystemQOS: "{}", expectedTopologyPolicies: expectedTopologyPolices, @@ -911,14 +1229,55 @@ func Test_reportNodeTopology(t *testing.T) { expectedCPUSharedPool: expectedCPUSharedPool, expectedBECPUSharedPool: expectedBECPUSharedPool, expectedCPUTopology: expectedCPUTopology, + expectedNodeCPUAllocs: "null", expectedNodeReservation: "{}", expectedSystemQOS: "{}", expectedTopologyPolicies: expectedTopologyPolices, expectedZones: mergedZones, }, + { + name: "report topology with kubelet allocated cpus", + prepareFn: func(helper *system.FileTestUtil) { + var oldVarKubeletLibRoot string + helper.SetConf(func(conf *system.Config) { + oldVarKubeletLibRoot = conf.VarLibKubeletRootDir + conf.VarLibKubeletRootDir = helper.TempDir + }, func(conf *system.Config) { + conf.VarLibKubeletRootDir = oldVarKubeletLibRoot + }) + helper.WriteFileContents("cpu_manager_state", 
`{"policyName":"static","defaultCpuSet":"2-7","entries":{"xxx-y4":{"demo":"1"}},"checksum":1000000000}`) + }, + config: NewDefaultConfig(), + kubeletStub: &testKubeletStub{ + config: &kubeletconfiginternal.KubeletConfiguration{ + CPUManagerPolicy: "static", + KubeReserved: map[string]string{ + "cpu": "2000m", + }, + }, + }, + expectedKubeletCPUManagerPolicy: extension.KubeletCPUManagerPolicy{ + Policy: "static", + ReservedCPUs: "0-1", + }, + expectedCPUBasicInfo: string(expectedCPUBasicInfoBytes), + expectedCPUSharedPool: expectedCPUSharedPool1, + expectedBECPUSharedPool: expectedBECPUSharedPool, + expectedCPUTopology: expectedCPUTopology, + expectedNodeCPUAllocs: `[{"namespace":"ns4","name":"pod4","uid":"xxx-y4","cpuset":"1","managedByKubelet":true}]`, + expectedNodeReservation: "{}", + expectedSystemQOS: "{}", + expectedTopologyPolicies: expectedTopologyPolices, + expectedZones: expectedZones, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + helper := system.NewFileTestUtil(t) + defer helper.Cleanup() + if tt.prepareFn != nil { + tt.prepareFn(helper) + } // prepare feature map enabled := features.DefaultKoordletFeatureGate.Enabled(features.NodeTopologyReport) testFeatureGates := map[string]bool{string(features.NodeTopologyReport): !tt.disableCreateTopologyCRD} @@ -990,6 +1349,7 @@ func Test_reportNodeTopology(t *testing.T) { assert.Equal(t, tt.expectedCPUBasicInfo, topo.Annotations[extension.AnnotationCPUBasicInfo]) assert.Equal(t, tt.expectedCPUSharedPool, topo.Annotations[extension.AnnotationNodeCPUSharedPools]) assert.Equal(t, tt.expectedCPUTopology, topo.Annotations[extension.AnnotationNodeCPUTopology]) + assert.Equal(t, tt.expectedNodeCPUAllocs, topo.Annotations[extension.AnnotationNodeCPUAllocs]) assert.Equal(t, tt.expectedNodeReservation, topo.Annotations[extension.AnnotationNodeReservation]) assert.Equal(t, tt.expectedSystemQOS, topo.Annotations[extension.AnnotationNodeSystemQOSResource]) assert.Equal(t, tt.expectedTopologyPolicies, topo.TopologyPolicies) diff --git a/pkg/koordlet/util/system/config.go b/pkg/koordlet/util/system/config.go index 6ff2bf885..09cf16fb1 100644 --- a/pkg/koordlet/util/system/config.go +++ b/pkg/koordlet/util/system/config.go @@ -36,11 +36,11 @@ var UseCgroupsV2 = atomic.NewBool(false) type Config struct { CgroupRootDir string - CgroupKubePath string SysRootDir string SysFSRootDir string ProcRootDir string VarRunRootDir string + VarLibKubeletRootDir string RunRootDir string RuntimeHooksConfigDir string @@ -76,12 +76,12 @@ func InitSupportConfigs() { func NewHostModeConfig() *Config { return &Config{ - CgroupKubePath: "kubepods/", CgroupRootDir: "/sys/fs/cgroup/", ProcRootDir: "/proc/", SysRootDir: "/sys/", SysFSRootDir: "/sys/fs/", VarRunRootDir: "/var/run/", + VarLibKubeletRootDir: "/var/lib/kubelet/", RunRootDir: "/run/", RuntimeHooksConfigDir: "/etc/runtime/hookserver.d", DefaultRuntimeType: "containerd", @@ -90,13 +90,13 @@ func NewHostModeConfig() *Config { func NewDsModeConfig() *Config { return &Config{ - CgroupKubePath: "kubepods/", - CgroupRootDir: "/host-cgroup/", + CgroupRootDir: "/host-cgroup/", // some dirs are not covered by ns, or unused with `hostPID` is on ProcRootDir: "/proc/", SysRootDir: "/host-sys/", SysFSRootDir: "/host-sys-fs/", VarRunRootDir: "/host-var-run/", + VarLibKubeletRootDir: "/var/lib/kubelet/", RunRootDir: "/host-run/", RuntimeHooksConfigDir: "/host-etc-hookserver/", DefaultRuntimeType: "containerd", @@ -113,9 +113,9 @@ func (c *Config) InitFlags(fs *flag.FlagSet) { fs.StringVar(&c.SysFSRootDir, 
"sys-fs-root-dir", c.SysFSRootDir, "host /sys/fs dir in container, used by resctrl fs") fs.StringVar(&c.ProcRootDir, "proc-root-dir", c.ProcRootDir, "host /proc dir in container") fs.StringVar(&c.VarRunRootDir, "var-run-root-dir", c.VarRunRootDir, "host /var/run dir in container") + fs.StringVar(&c.VarLibKubeletRootDir, "var-lib-kubelet-dir", c.VarLibKubeletRootDir, "host /var/lib/kubelet dir in container") fs.StringVar(&c.RunRootDir, "run-root-dir", c.RunRootDir, "host /run dir in container") - fs.StringVar(&c.CgroupKubePath, "cgroup-kube-dir", c.CgroupKubePath, "Cgroup kube dir") fs.StringVar(&c.ContainerdEndPoint, "containerd-endpoint", c.ContainerdEndPoint, "containerd endPoint") fs.StringVar(&c.DockerEndPoint, "docker-endpoint", c.DockerEndPoint, "docker endPoint") fs.StringVar(&c.PouchEndpoint, "pouch-endpoint", c.PouchEndpoint, "pouch endPoint") diff --git a/pkg/koordlet/util/system/config_test.go b/pkg/koordlet/util/system/config_test.go index 4de788b70..f5acc0bd7 100644 --- a/pkg/koordlet/util/system/config_test.go +++ b/pkg/koordlet/util/system/config_test.go @@ -24,12 +24,12 @@ import ( func Test_NewDsModeConfig(t *testing.T) { expectConfig := &Config{ - CgroupKubePath: "kubepods/", CgroupRootDir: "/host-cgroup/", ProcRootDir: "/proc/", SysRootDir: "/host-sys/", SysFSRootDir: "/host-sys-fs/", VarRunRootDir: "/host-var-run/", + VarLibKubeletRootDir: "/var/lib/kubelet/", RunRootDir: "/host-run/", RuntimeHooksConfigDir: "/host-etc-hookserver/", DefaultRuntimeType: "containerd", @@ -40,12 +40,12 @@ func Test_NewDsModeConfig(t *testing.T) { func Test_NewHostModeConfig(t *testing.T) { expectConfig := &Config{ - CgroupKubePath: "kubepods/", CgroupRootDir: "/sys/fs/cgroup/", ProcRootDir: "/proc/", SysRootDir: "/sys/", SysFSRootDir: "/sys/fs/", VarRunRootDir: "/var/run/", + VarLibKubeletRootDir: "/var/lib/kubelet/", RunRootDir: "/run/", RuntimeHooksConfigDir: "/etc/runtime/hookserver.d", DefaultRuntimeType: "containerd",