From eabe0769aca1855ae3cc08f4d1433745aa24acd9 Mon Sep 17 00:00:00 2001 From: saintube Date: Tue, 30 Jan 2024 11:58:33 +0800 Subject: [PATCH] koordlet: revise noderesource and podresource when device collector disabled Signed-off-by: saintube --- .../noderesource/node_resource_collector.go | 11 ++++- .../node_resource_collector_test.go | 30 +++++++++++- .../podresource/pod_resource_collector.go | 10 ++++ .../pod_resource_collector_test.go | 48 ++++++++++++++++++- 4 files changed, 94 insertions(+), 5 deletions(-) diff --git a/pkg/koordlet/metricsadvisor/collectors/noderesource/node_resource_collector.go b/pkg/koordlet/metricsadvisor/collectors/noderesource/node_resource_collector.go index 9e02f04bf..75b7d12fd 100644 --- a/pkg/koordlet/metricsadvisor/collectors/noderesource/node_resource_collector.go +++ b/pkg/koordlet/metricsadvisor/collectors/noderesource/node_resource_collector.go @@ -126,8 +126,15 @@ func (n *nodeResourceCollector) collectNodeResUsed() { } nodeMetrics = append(nodeMetrics, cpuUsageMetrics) - for _, deviceCollector := range n.deviceCollectors { - if metric, _ := deviceCollector.GetNodeMetric(); metric != nil { + for name, deviceCollector := range n.deviceCollectors { + if !deviceCollector.Enabled() { + klog.V(6).Infof("skip node metrics from the disabled device collector %s", name) + continue + } + + if metric, err := deviceCollector.GetNodeMetric(); err != nil { + klog.Warningf("get node metrics from the device collector %s failed, err: %s", name, err) + } else { nodeMetrics = append(nodeMetrics, metric...) } if info := deviceCollector.Infos(); info != nil { diff --git a/pkg/koordlet/metricsadvisor/collectors/noderesource/node_resource_collector_test.go b/pkg/koordlet/metricsadvisor/collectors/noderesource/node_resource_collector_test.go index 37d8c452a..307a15950 100644 --- a/pkg/koordlet/metricsadvisor/collectors/noderesource/node_resource_collector_test.go +++ b/pkg/koordlet/metricsadvisor/collectors/noderesource/node_resource_collector_test.go @@ -134,6 +134,7 @@ DirectMap1G: 0 kB`) CPUTick: 0, Timestamp: testLastCPUStatTime, } + testDeviceCollector := &fakeDeviceCollector{isEnabled: true} c := &nodeResourceCollector{ started: atomic.NewBool(false), @@ -141,7 +142,7 @@ DirectMap1G: 0 kB`) metricDB: metricCache, lastNodeCPUStat: testLastCPUStat, deviceCollectors: map[string]framework.DeviceCollector{ - "TestDeviceCollector": &fakeDeviceCollector{}, + "TestDeviceCollector": testDeviceCollector, }, sharedState: framework.NewSharedState(), } @@ -167,6 +168,24 @@ DirectMap1G: 0 kB`) assert.Equal(t, wantCPU, nodeCPU.Value) assert.Equal(t, wantMemory, nodeMemory.Value) + // test collect without device collector + testDeviceCollector.isEnabled = false + testDeviceCollector.getNodeMetric = func() ([]metriccache.MetricSample, error) { + panic("should not be called") + } + assert.NotPanics(t, func() { + c.collectNodeResUsed() + }) + assert.True(t, c.Started()) + // validate collected values + // assert collected time is less than 10s + got, err = testGetNodeMetrics(t, c.metricDB, testNow, 5*time.Second) + wantCPU = testCPUUsage + assert.Equal(t, wantCPU, float64(got.Cpu().MilliValue()/1000)) + // MemTotal - MemAvailable + wantMemory = float64(524288 * 1024) + assert.Equal(t, wantMemory, float64(got.Memory().Value())) + // test first cpu collection c.lastNodeCPUStat = nil assert.NotPanics(t, func() { @@ -185,9 +204,18 @@ DirectMap1G: 0 kB`) type fakeDeviceCollector struct { framework.DeviceCollector + isEnabled bool + getNodeMetric func() ([]metriccache.MetricSample, error) +} + +func (f *fakeDeviceCollector) Enabled() bool { + return f.isEnabled } func (f *fakeDeviceCollector) GetNodeMetric() ([]metriccache.MetricSample, error) { + if f.getNodeMetric != nil { + return f.getNodeMetric() + } return nil, nil } diff --git a/pkg/koordlet/metricsadvisor/collectors/podresource/pod_resource_collector.go b/pkg/koordlet/metricsadvisor/collectors/podresource/pod_resource_collector.go index ff7ad1910..dc8913382 100644 --- a/pkg/koordlet/metricsadvisor/collectors/podresource/pod_resource_collector.go +++ b/pkg/koordlet/metricsadvisor/collectors/podresource/pod_resource_collector.go @@ -164,6 +164,11 @@ func (p *podResourceCollector) collectPodResUsed() { metrics = append(metrics, cpuUsageMetric, memUsageMetric) for deviceName, deviceCollector := range p.deviceCollectors { + if !deviceCollector.Enabled() { + klog.V(6).Infof("skip pod metrics from the disabled device collector %s, pod %s", deviceName, podKey) + continue + } + if deviceMetrics, err := deviceCollector.GetPodMetric(uid, meta.CgroupDir, pod.Status.ContainerStatuses); err != nil { klog.V(4).Infof("get pod %s device usage failed for %v, error: %v", podKey, deviceName, err) } else if len(metrics) > 0 { @@ -262,6 +267,11 @@ func (p *podResourceCollector) collectContainerResUsed(meta *statesinformer.PodM containerMetrics = append(containerMetrics, cpuUsageMetric, memUsageMetric) for deviceName, deviceCollector := range p.deviceCollectors { + if !deviceCollector.Enabled() { + klog.V(6).Infof("skip container metrics from the disabled device collector %s, container %s", deviceName, containerKey) + continue + } + if metrics, err := deviceCollector.GetContainerMetric(containerStat.ContainerID, meta.CgroupDir, containerStat); err != nil { klog.Warningf("get container %s device usage failed for %v, error: %v", containerKey, deviceName, err) } else { diff --git a/pkg/koordlet/metricsadvisor/collectors/podresource/pod_resource_collector_test.go b/pkg/koordlet/metricsadvisor/collectors/podresource/pod_resource_collector_test.go index fc930dc5e..0ce4eb704 100644 --- a/pkg/koordlet/metricsadvisor/collectors/podresource/pod_resource_collector_test.go +++ b/pkg/koordlet/metricsadvisor/collectors/podresource/pod_resource_collector_test.go @@ -78,6 +78,7 @@ func Test_collector_collectPodResUsed(t *testing.T) { } type fields struct { podFilterOption framework.PodFilter + deviceCollectors map[string]framework.DeviceCollector getPodMetas []*statesinformer.PodMeta initPodLastStat func(lastState *gocache.Cache) initContainerLastStat func(lastState *gocache.Cache) @@ -96,6 +97,9 @@ func Test_collector_collectPodResUsed(t *testing.T) { name: "cgroups v1", fields: fields{ podFilterOption: framework.DefaultPodFilter, + deviceCollectors: map[string]framework.DeviceCollector{ + "TestDeviceCollector": &fakeDeviceCollector{isEnabled: true}, + }, getPodMetas: []*statesinformer.PodMeta{ { CgroupDir: testPodMetaDir, @@ -150,6 +154,9 @@ total_unevictable 0 name: "cgroups v2", fields: fields{ podFilterOption: framework.DefaultPodFilter, + deviceCollectors: map[string]framework.DeviceCollector{ + "TestDeviceCollector": &fakeDeviceCollector{isEnabled: true}, + }, getPodMetas: []*statesinformer.PodMeta{ { CgroupDir: testPodMetaDir, @@ -212,9 +219,20 @@ unevictable 0 }, }, { - name: "cgroups v1, filter non-running pods", + name: "cgroups v1, filter non-running pods, skip disabled device collector", fields: fields{ podFilterOption: &framework.TerminatedPodFilter{}, + deviceCollectors: map[string]framework.DeviceCollector{ + "TestDeviceCollector": &fakeDeviceCollector{ + isEnabled: false, + getPodMetric: func(uid, podParentDir string, cs []corev1.ContainerStatus) ([]metriccache.MetricSample, error) { + panic("should not be called") + }, + getContainerMetric: func(uid, podParentDir string, c *corev1.ContainerStatus) ([]metriccache.MetricSample, error) { + panic("should not be called") + }, + }, + }, getPodMetas: []*statesinformer.PodMeta{ { CgroupDir: testPodMetaDir, @@ -304,7 +322,8 @@ total_unevictable 0 }, }) collector.Setup(&framework.Context{ - State: framework.NewSharedState(), + State: framework.NewSharedState(), + DeviceCollectors: tt.fields.deviceCollectors, }) c := collector.(*podResourceCollector) tt.fields.initPodLastStat(c.lastPodCPUStat) @@ -350,3 +369,28 @@ func Test_podResourceCollector_Run(t *testing.T) { close(stopCh) }) } + +type fakeDeviceCollector struct { + framework.DeviceCollector + isEnabled bool + getPodMetric func(uid, podParentDir string, cs []corev1.ContainerStatus) ([]metriccache.MetricSample, error) + getContainerMetric func(uid, podParentDir string, c *corev1.ContainerStatus) ([]metriccache.MetricSample, error) +} + +func (f *fakeDeviceCollector) Enabled() bool { + return f.isEnabled +} + +func (f *fakeDeviceCollector) GetPodMetric(uid, podParentDir string, cs []corev1.ContainerStatus) ([]metriccache.MetricSample, error) { + if f.getPodMetric != nil { + return f.getPodMetric(uid, podParentDir, cs) + } + return nil, nil +} + +func (f *fakeDeviceCollector) GetContainerMetric(containerID, podParentDir string, c *corev1.ContainerStatus) ([]metriccache.MetricSample, error) { + if f.getContainerMetric != nil { + return f.getContainerMetric(containerID, podParentDir, c) + } + return nil, nil +}