Skip to content

Commit

Permalink
koordlet: add metrics for kubelet and resource executor
Browse files Browse the repository at this point in the history
Signed-off-by: 佑祎 <zzw261520@alibaba-inc.com>
  • Loading branch information
zwzhang0107 committed Feb 22, 2024
1 parent b634779 commit 2c50351
Show file tree
Hide file tree
Showing 14 changed files with 202 additions and 66 deletions.
2 changes: 2 additions & 0 deletions pkg/koordlet/metrics/internal_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ func init() {
internalMustRegister(CPUBurstCollector...)
internalMustRegister(PredictionCollectors...)
internalMustRegister(CoreSchedCollector...)
internalMustRegister(ResourceExecutorCollector...)
internalMustRegister(KubeletStubCollector...)
}
59 changes: 59 additions & 0 deletions pkg/koordlet/metrics/kubelet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"time"

"github.com/prometheus/client_golang/prometheus"
)

const (
HTTPVerbKey = "verb"
HTTPPathKey = "path"
HTTPCodeKey = "code"
)

const (
HTTPVerbGet = "get"
)

var (
kubeletRequestDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: KoordletSubsystem,
Name: "kubelet_request_duration_seconds",
Help: "kubelet http request duration in seconds",
// 10s ~ 4s, /config <= 4ms, /pods <= 16ms
Buckets: prometheus.ExponentialBuckets(0.001, 4, 7),
},
[]string{HTTPVerbKey, HTTPPathKey, HTTPCodeKey},
)

KubeletStubCollector = []prometheus.Collector{
kubeletRequestDurationSeconds,
}
)

// RecordKubeletRequestDuration records the duration of kubelet http request
func RecordKubeletRequestDuration(verb, path, code string, seconds float64) {
kubeletRequestDurationSeconds.WithLabelValues(verb, path, code).Observe(seconds)
}

func SinceInSeconds(start time.Time) float64 {
return time.Since(start).Seconds()
}
22 changes: 11 additions & 11 deletions pkg/koordlet/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

apiext "github.com/koordinator-sh/koordinator/apis/extension"
slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
"github.com/koordinator-sh/koordinator/pkg/util"
)

Expand Down Expand Up @@ -77,45 +77,45 @@ func TestCommonCollectors(t *testing.T) {
UID: "test01",
},
}
testingPSI := &resourceexecutor.PSIByResource{
CPU: resourceexecutor.PSIStats{
Some: &resourceexecutor.PSILine{
testingPSI := &system.PSIByResource{
CPU: system.PSIStats{
Some: &system.PSILine{
Avg10: 1,
Avg60: 1,
Avg300: 1,
Total: 1,
},
Full: &resourceexecutor.PSILine{
Full: &system.PSILine{
Avg10: 1,
Avg60: 1,
Avg300: 1,
Total: 1,
},
FullSupported: true,
},
Mem: resourceexecutor.PSIStats{
Some: &resourceexecutor.PSILine{
Mem: system.PSIStats{
Some: &system.PSILine{
Avg10: 1,
Avg60: 1,
Avg300: 1,
Total: 1,
},
Full: &resourceexecutor.PSILine{
Full: &system.PSILine{
Avg10: 1,
Avg60: 1,
Avg300: 1,
Total: 1,
},
FullSupported: true,
},
IO: resourceexecutor.PSIStats{
Some: &resourceexecutor.PSILine{
IO: system.PSIStats{
Some: &system.PSILine{
Avg10: 1,
Avg60: 1,
Avg300: 1,
Total: 1,
},
Full: &resourceexecutor.PSILine{
Full: &system.PSILine{
Avg10: 1,
Avg60: 1,
Avg300: 1,
Expand Down
10 changes: 5 additions & 5 deletions pkg/koordlet/metrics/psi.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"

"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
)

const (
Expand Down Expand Up @@ -73,15 +73,15 @@ type PSIRecord struct {
CPUFullSupported bool
}

func getPSIRecords(psi *resourceexecutor.PSIByResource) []PSIRecord {
func getPSIRecords(psi *system.PSIByResource) []PSIRecord {
var psiRecordAll []PSIRecord
psiRecordAll = append(psiRecordAll, makePSIRecordSlice(ResourceTypeCPU, psi.CPU)...)
psiRecordAll = append(psiRecordAll, makePSIRecordSlice(ResourceTypeMem, psi.Mem)...)
psiRecordAll = append(psiRecordAll, makePSIRecordSlice(ResourceTypeIO, psi.IO)...)
return psiRecordAll
}

func makePSIRecordSlice(resourceType string, psiStats resourceexecutor.PSIStats) []PSIRecord {
func makePSIRecordSlice(resourceType string, psiStats system.PSIStats) []PSIRecord {
records := []PSIRecord{
{
ResourceType: resourceType,
Expand Down Expand Up @@ -134,7 +134,7 @@ func makePSIRecordSlice(resourceType string, psiStats resourceexecutor.PSIStats)
return records
}

func RecordContainerPSI(status *corev1.ContainerStatus, pod *corev1.Pod, psi *resourceexecutor.PSIByResource) {
func RecordContainerPSI(status *corev1.ContainerStatus, pod *corev1.Pod, psi *system.PSIByResource) {
psiRecords := getPSIRecords(psi)
for _, record := range psiRecords {
labels := genNodeLabels()
Expand All @@ -155,7 +155,7 @@ func RecordContainerPSI(status *corev1.ContainerStatus, pod *corev1.Pod, psi *re
}
}

func RecordPodPSI(pod *corev1.Pod, psi *resourceexecutor.PSIByResource) {
func RecordPodPSI(pod *corev1.Pod, psi *system.PSIByResource) {
psiRecords := getPSIRecords(psi)
for _, record := range psiRecords {
labels := genNodeLabels()
Expand Down
22 changes: 11 additions & 11 deletions pkg/koordlet/metrics/psi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,24 @@ import (

"github.com/stretchr/testify/assert"

"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
)

func TestGetPSIRecords(t *testing.T) {
testingRecords := &resourceexecutor.PSIByResource{
CPU: resourceexecutor.PSIStats{
Some: &resourceexecutor.PSILine{},
Full: &resourceexecutor.PSILine{},
testingRecords := &system.PSIByResource{
CPU: system.PSIStats{
Some: &system.PSILine{},
Full: &system.PSILine{},
FullSupported: true,
},
Mem: resourceexecutor.PSIStats{
Some: &resourceexecutor.PSILine{},
Full: &resourceexecutor.PSILine{},
Mem: system.PSIStats{
Some: &system.PSILine{},
Full: &system.PSILine{},
FullSupported: true,
},
IO: resourceexecutor.PSIStats{
Some: &resourceexecutor.PSILine{},
Full: &resourceexecutor.PSILine{},
IO: system.PSIStats{
Some: &system.PSILine{},
Full: &system.PSILine{},
FullSupported: true,
},
}
Expand Down
51 changes: 51 additions & 0 deletions pkg/koordlet/metrics/resource_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"github.com/prometheus/client_golang/prometheus"
)

const (
// ResourceUpdaterType represents the type of resource udpater, including cgroup files, resctrl files, etc
ResourceUpdaterType = "type"
// ResourceUpdateStatusKey represents the status of resource update
ResourceUpdateStatusKey = "status"
)

const (
ResourceUpdateStatusSuccess = "success"
ResourceUpdateStatusFailed = "failed"
)

var (
resourceUpdateDurationMilliSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: KoordletSubsystem,
Name: "resource_update_duration_milliseconds",
Help: "time duration of resource update such as cgroup files",
// 10us ~ 10.24ms, cgroup <= 40us
Buckets: prometheus.ExponentialBuckets(0.01, 4, 6),
}, []string{ResourceUpdaterType, ResourceUpdateStatusKey})

ResourceExecutorCollector = []prometheus.Collector{
resourceUpdateDurationMilliSeconds,
}
)

func RecordResourceUpdateDuration(updaterType, status string, seconds float64) {
resourceUpdateDurationMilliSeconds.WithLabelValues(updaterType, status).Observe(seconds * 1000)
}
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,8 @@ func mockLSPod() *corev1.Pod {
// Mem: /sys/fs/cgroup/cpu/kubepods.slice/kubepods-burstable.slice/kubepods-pod7712555c_ce62_454a_9e18_9ff0217b8941.slice/memory.pressure
// IO: /sys/fs/cgroup/cpu/kubepods.slice/kubepods-burstable.slice/kubepods-pod7712555c_ce62_454a_9e18_9ff0217b8941.slice/io.pressure
// }
func getPodCgroupCPUAcctPSIPath(podParentDir string) resourceexecutor.PSIPath {
return resourceexecutor.PSIPath{
func getPodCgroupCPUAcctPSIPath(podParentDir string) system.PSIPath {
return system.PSIPath{
CPU: system.GetCgroupFilePath(podParentDir, system.CPUAcctCPUPressure),
Mem: system.GetCgroupFilePath(podParentDir, system.CPUAcctMemoryPressure),
IO: system.GetCgroupFilePath(podParentDir, system.CPUAcctIOPressure),
Expand Down
19 changes: 13 additions & 6 deletions pkg/koordlet/resourceexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
sysutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
"github.com/koordinator-sh/koordinator/pkg/util/cache"
)
Expand Down Expand Up @@ -220,30 +221,36 @@ func (e *ResourceUpdateExecutorImpl) needUpdate(updater ResourceUpdater) bool {
}

func (e *ResourceUpdateExecutorImpl) update(updater ResourceUpdater) error {
start := time.Now()
err := updater.update()
if err != nil && e.isUpdateErrIgnored(err) {
klog.V(5).Infof("failed to update resource %s to %v, ignored err: %v", updater.Key(), updater.Value(), err)
return nil
}
if err != nil {
if err != nil && !e.isUpdateErrIgnored(err) {
metrics.RecordResourceUpdateDuration(updater.Name(), metrics.ResourceUpdateStatusFailed, metrics.SinceInSeconds(start))
klog.V(5).Infof("failed to update resource %s to %v, err: %v", updater.Key(), updater.Value(), err)
return err
} else if err != nil {
// error can be ignored
klog.V(5).Infof("failed to update resource %s to %v, ignored err: %v", updater.Key(), updater.Value(), err)
} else {
metrics.RecordResourceUpdateDuration(updater.Name(), metrics.ResourceUpdateStatusSuccess, metrics.SinceInSeconds(start))
klog.V(6).Infof("successfully update resource %s to %v", updater.Key(), updater.Value())
}
klog.V(6).Infof("successfully update resource %s to %v", updater.Key(), updater.Value())
return nil
}

func (e *ResourceUpdateExecutorImpl) updateByCache(updater ResourceUpdater) (bool, error) {
if e.needUpdate(updater) {
start := time.Now()
err := updater.update()
if err != nil && e.isUpdateErrIgnored(err) {
klog.V(5).Infof("failed to cacheable update resource %s to %v, ignored err: %v", updater.Key(), updater.Value(), err)
return false, nil
}
if err != nil {
metrics.RecordResourceUpdateDuration(updater.Name(), metrics.ResourceUpdateStatusFailed, metrics.SinceInSeconds(start))
klog.V(5).Infof("failed to cacheable update resource %s to %v, err: %v", updater.Key(), updater.Value(), err)
return false, err
}
metrics.RecordResourceUpdateDuration(updater.Name(), metrics.ResourceUpdateStatusSuccess, metrics.SinceInSeconds(start))
updater.UpdateLastUpdateTimestamp(time.Now())
err = e.ResourceCache.SetDefault(updater.Key(), updater)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions pkg/koordlet/resourceexecutor/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type CgroupReader interface {
ReadMemoryNumaStat(parentDir string) ([]sysutil.NumaMemoryPages, error)
ReadCPUTasks(parentDir string) ([]int32, error)
ReadCPUProcs(parentDir string) ([]uint32, error)
ReadPSI(parentDir string) (*PSIByResource, error)
ReadPSI(parentDir string) (*sysutil.PSIByResource, error)
ReadMemoryColdPageUsage(parentDir string) (uint64, error)
}

Expand Down Expand Up @@ -204,7 +204,7 @@ func (r *CgroupV1Reader) ReadCPUProcs(parentDir string) ([]uint32, error) {
return sysutil.ParseCgroupProcs(s)
}

func (r *CgroupV1Reader) ReadPSI(parentDir string) (*PSIByResource, error) {
func (r *CgroupV1Reader) ReadPSI(parentDir string) (*sysutil.PSIByResource, error) {
cpuPressureResource, ok := sysutil.DefaultRegistry.Get(sysutil.CgroupVersionV1, sysutil.CPUAcctCPUPressureName)
if !ok {
return nil, ErrResourceNotRegistered
Expand All @@ -218,12 +218,12 @@ func (r *CgroupV1Reader) ReadPSI(parentDir string) (*PSIByResource, error) {
return nil, ErrResourceNotRegistered
}

paths := PSIPath{
paths := sysutil.PSIPath{
CPU: cpuPressureResource.Path(parentDir),
Mem: memPressureResource.Path(parentDir),
IO: ioPressureResource.Path(parentDir),
}
psi, err := getPSIByResource(paths)
psi, err := sysutil.GetPSIByResource(paths)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -410,7 +410,7 @@ func (r *CgroupV2Reader) ReadCPUProcs(parentDir string) ([]uint32, error) {
return sysutil.ParseCgroupProcs(s)
}

func (r *CgroupV2Reader) ReadPSI(parentDir string) (*PSIByResource, error) {
func (r *CgroupV2Reader) ReadPSI(parentDir string) (*sysutil.PSIByResource, error) {
cpuPressureResource, ok := sysutil.DefaultRegistry.Get(sysutil.CgroupVersionV2, sysutil.CPUAcctCPUPressureName)
if !ok {
return nil, ErrResourceNotRegistered
Expand All @@ -424,12 +424,12 @@ func (r *CgroupV2Reader) ReadPSI(parentDir string) (*PSIByResource, error) {
return nil, ErrResourceNotRegistered
}

paths := PSIPath{
paths := sysutil.PSIPath{
CPU: cpuPressureResource.Path(parentDir),
Mem: memPressureResource.Path(parentDir),
IO: ioPressureResource.Path(parentDir),
}
psi, err := getPSIByResource(paths)
psi, err := sysutil.GetPSIByResource(paths)
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/koordlet/resourceexecutor/resctrl_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type ResctrlSchemataResourceUpdater struct {
schemataRaw *sysutil.ResctrlSchemataRaw
}

func (r *ResctrlSchemataResourceUpdater) Name() string {
return "resctrl-schema"
}

func (r *ResctrlSchemataResourceUpdater) Key() string {
return r.schemataRaw.Prefix() + r.file
}
Expand Down
Loading

0 comments on commit 2c50351

Please sign in to comment.