Skip to content

Commit

Permalink
koordlet: revise noderesource and podresource when device collector d…
Browse files Browse the repository at this point in the history
…isabled

Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube committed Jan 30, 2024
1 parent 83a18bf commit eabe076
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,15 @@ func (n *nodeResourceCollector) collectNodeResUsed() {
}
nodeMetrics = append(nodeMetrics, cpuUsageMetrics)

for _, deviceCollector := range n.deviceCollectors {
if metric, _ := deviceCollector.GetNodeMetric(); metric != nil {
for name, deviceCollector := range n.deviceCollectors {
if !deviceCollector.Enabled() {
klog.V(6).Infof("skip node metrics from the disabled device collector %s", name)
continue
}

if metric, err := deviceCollector.GetNodeMetric(); err != nil {
klog.Warningf("get node metrics from the device collector %s failed, err: %s", name, err)
} else {
nodeMetrics = append(nodeMetrics, metric...)
}
if info := deviceCollector.Infos(); info != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,15 @@ DirectMap1G: 0 kB`)
CPUTick: 0,
Timestamp: testLastCPUStatTime,
}
testDeviceCollector := &fakeDeviceCollector{isEnabled: true}

c := &nodeResourceCollector{
started: atomic.NewBool(false),
appendableDB: metricCache,
metricDB: metricCache,
lastNodeCPUStat: testLastCPUStat,
deviceCollectors: map[string]framework.DeviceCollector{
"TestDeviceCollector": &fakeDeviceCollector{},
"TestDeviceCollector": testDeviceCollector,
},
sharedState: framework.NewSharedState(),
}
Expand All @@ -167,6 +168,24 @@ DirectMap1G: 0 kB`)
assert.Equal(t, wantCPU, nodeCPU.Value)
assert.Equal(t, wantMemory, nodeMemory.Value)

// test collect without device collector
testDeviceCollector.isEnabled = false
testDeviceCollector.getNodeMetric = func() ([]metriccache.MetricSample, error) {
panic("should not be called")
}
assert.NotPanics(t, func() {
c.collectNodeResUsed()
})
assert.True(t, c.Started())
// validate collected values
// assert collected time is less than 10s
got, err = testGetNodeMetrics(t, c.metricDB, testNow, 5*time.Second)
wantCPU = testCPUUsage
assert.Equal(t, wantCPU, float64(got.Cpu().MilliValue()/1000))
// MemTotal - MemAvailable
wantMemory = float64(524288 * 1024)
assert.Equal(t, wantMemory, float64(got.Memory().Value()))

// test first cpu collection
c.lastNodeCPUStat = nil
assert.NotPanics(t, func() {
Expand All @@ -185,9 +204,18 @@ DirectMap1G: 0 kB`)

type fakeDeviceCollector struct {
framework.DeviceCollector
isEnabled bool
getNodeMetric func() ([]metriccache.MetricSample, error)
}

func (f *fakeDeviceCollector) Enabled() bool {
return f.isEnabled
}

func (f *fakeDeviceCollector) GetNodeMetric() ([]metriccache.MetricSample, error) {
if f.getNodeMetric != nil {
return f.getNodeMetric()
}
return nil, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ func (p *podResourceCollector) collectPodResUsed() {

metrics = append(metrics, cpuUsageMetric, memUsageMetric)
for deviceName, deviceCollector := range p.deviceCollectors {
if !deviceCollector.Enabled() {
klog.V(6).Infof("skip pod metrics from the disabled device collector %s, pod %s", deviceName, podKey)
continue
}

if deviceMetrics, err := deviceCollector.GetPodMetric(uid, meta.CgroupDir, pod.Status.ContainerStatuses); err != nil {
klog.V(4).Infof("get pod %s device usage failed for %v, error: %v", podKey, deviceName, err)
} else if len(metrics) > 0 {
Expand Down Expand Up @@ -262,6 +267,11 @@ func (p *podResourceCollector) collectContainerResUsed(meta *statesinformer.PodM
containerMetrics = append(containerMetrics, cpuUsageMetric, memUsageMetric)

for deviceName, deviceCollector := range p.deviceCollectors {
if !deviceCollector.Enabled() {
klog.V(6).Infof("skip container metrics from the disabled device collector %s, container %s", deviceName, containerKey)
continue
}

if metrics, err := deviceCollector.GetContainerMetric(containerStat.ContainerID, meta.CgroupDir, containerStat); err != nil {
klog.Warningf("get container %s device usage failed for %v, error: %v", containerKey, deviceName, err)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func Test_collector_collectPodResUsed(t *testing.T) {
}
type fields struct {
podFilterOption framework.PodFilter
deviceCollectors map[string]framework.DeviceCollector
getPodMetas []*statesinformer.PodMeta
initPodLastStat func(lastState *gocache.Cache)
initContainerLastStat func(lastState *gocache.Cache)
Expand All @@ -96,6 +97,9 @@ func Test_collector_collectPodResUsed(t *testing.T) {
name: "cgroups v1",
fields: fields{
podFilterOption: framework.DefaultPodFilter,
deviceCollectors: map[string]framework.DeviceCollector{
"TestDeviceCollector": &fakeDeviceCollector{isEnabled: true},
},
getPodMetas: []*statesinformer.PodMeta{
{
CgroupDir: testPodMetaDir,
Expand Down Expand Up @@ -150,6 +154,9 @@ total_unevictable 0
name: "cgroups v2",
fields: fields{
podFilterOption: framework.DefaultPodFilter,
deviceCollectors: map[string]framework.DeviceCollector{
"TestDeviceCollector": &fakeDeviceCollector{isEnabled: true},
},
getPodMetas: []*statesinformer.PodMeta{
{
CgroupDir: testPodMetaDir,
Expand Down Expand Up @@ -212,9 +219,20 @@ unevictable 0
},
},
{
name: "cgroups v1, filter non-running pods",
name: "cgroups v1, filter non-running pods, skip disabled device collector",
fields: fields{
podFilterOption: &framework.TerminatedPodFilter{},
deviceCollectors: map[string]framework.DeviceCollector{
"TestDeviceCollector": &fakeDeviceCollector{
isEnabled: false,
getPodMetric: func(uid, podParentDir string, cs []corev1.ContainerStatus) ([]metriccache.MetricSample, error) {
panic("should not be called")
},
getContainerMetric: func(uid, podParentDir string, c *corev1.ContainerStatus) ([]metriccache.MetricSample, error) {
panic("should not be called")
},
},
},
getPodMetas: []*statesinformer.PodMeta{
{
CgroupDir: testPodMetaDir,
Expand Down Expand Up @@ -304,7 +322,8 @@ total_unevictable 0
},
})
collector.Setup(&framework.Context{
State: framework.NewSharedState(),
State: framework.NewSharedState(),
DeviceCollectors: tt.fields.deviceCollectors,
})
c := collector.(*podResourceCollector)
tt.fields.initPodLastStat(c.lastPodCPUStat)
Expand Down Expand Up @@ -350,3 +369,28 @@ func Test_podResourceCollector_Run(t *testing.T) {
close(stopCh)
})
}

type fakeDeviceCollector struct {
framework.DeviceCollector
isEnabled bool
getPodMetric func(uid, podParentDir string, cs []corev1.ContainerStatus) ([]metriccache.MetricSample, error)
getContainerMetric func(uid, podParentDir string, c *corev1.ContainerStatus) ([]metriccache.MetricSample, error)
}

func (f *fakeDeviceCollector) Enabled() bool {
return f.isEnabled
}

func (f *fakeDeviceCollector) GetPodMetric(uid, podParentDir string, cs []corev1.ContainerStatus) ([]metriccache.MetricSample, error) {
if f.getPodMetric != nil {
return f.getPodMetric(uid, podParentDir, cs)
}
return nil, nil
}

func (f *fakeDeviceCollector) GetContainerMetric(containerID, podParentDir string, c *corev1.ContainerStatus) ([]metriccache.MetricSample, error) {
if f.getContainerMetric != nil {
return f.getContainerMetric(containerID, podParentDir, c)
}
return nil, nil
}

0 comments on commit eabe076

Please sign in to comment.