Skip to content

Commit

Permalink
koordlet: record invoke metrics for runtime hook (koordinator-sh#1961)
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <saintube@foxmail.com>
Signed-off-by: george <xiangzhihua@gmail.com>
  • Loading branch information
saintube authored and georgexiang committed Apr 15, 2024
1 parent c8a3002 commit 3ed8b5e
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/koordlet/metrics/internal_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ func init() {
internalMustRegister(CoreSchedCollector...)
internalMustRegister(ResourceExecutorCollector...)
internalMustRegister(KubeletStubCollector...)
internalMustRegister(RuntimeHookCollectors...)
}
32 changes: 32 additions & 0 deletions pkg/koordlet/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,35 @@ func TestCoreSchedCollector(t *testing.T) {
testCoreSchedGroup, testCoreSchedCookie)
})
}

func TestRuntimeHookCollector(t *testing.T) {
testingNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
Labels: map[string]string{},
},
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100"),
corev1.ResourceMemory: resource.MustParse("200Gi"),
apiext.BatchCPU: resource.MustParse("50000"),
apiext.BatchMemory: resource.MustParse("80Gi"),
},
Capacity: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100"),
corev1.ResourceMemory: resource.MustParse("200Gi"),
apiext.BatchCPU: resource.MustParse("50000"),
apiext.BatchMemory: resource.MustParse("80Gi"),
},
},
}
testErr := fmt.Errorf("expected error")
t.Run("test", func(t *testing.T) {
Register(testingNode)
defer Register(nil)
RecordRuntimeHookInvokedDurationMilliSeconds("testHook", "testStage", nil, 10.0)
RecordRuntimeHookInvokedDurationMilliSeconds("testHook", "testStage", testErr, 5.0)
RecordRuntimeHookReconcilerInvokedDurationMilliSeconds("pod", "cpu.cfs_quota_us", nil, 10.0)
RecordRuntimeHookReconcilerInvokedDurationMilliSeconds("pod", "cpu.cfs_quota_us", testErr, 5.0)
})
}
83 changes: 83 additions & 0 deletions pkg/koordlet/metrics/runtime_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import "github.com/prometheus/client_golang/prometheus"

const (
// RuntimeHookName represents the hook plugin name of runtime hook.
RuntimeHookName = "hook"
// RuntimeHookStage represents the stage of invoked runtime hook.
RuntimeHookStage = "stage"
// RuntimeHookReconcilerLevel represents the level (e.g. pod-level) of invoked runtime hook reconciler.
RuntimeHookReconcilerLevel = "level"
// RuntimeHookReconcilerResourceType represents the resource type (e.g. cpu.cfs_quota_us) of invoked runtime hook reconciler.
RuntimeHookReconcilerResourceType = "resource_type"
)

var (
runtimeHookInvokedDurationMilliSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: KoordletSubsystem,
Name: "runtime_hook_invoked_duration_milliseconds",
Help: "time duration of invocations of runtime hook plugins",
// 10us ~ 10.24ms, cgroup <= 40us
Buckets: prometheus.ExponentialBuckets(0.01, 4, 8),
}, []string{NodeKey, RuntimeHookName, RuntimeHookStage, StatusKey})

runtimeHookReconcilerInvokedDurationMilliSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: KoordletSubsystem,
Name: "runtime_hook_reconciler_invoked_duration_milliseconds",
Help: "time duration of invocations of runtime hook reconciler plugins",
// 10us ~ 10.24ms, cgroup <= 40us
Buckets: prometheus.ExponentialBuckets(0.01, 4, 8),
}, []string{NodeKey, RuntimeHookReconcilerLevel, RuntimeHookReconcilerResourceType, StatusKey})

RuntimeHookCollectors = []prometheus.Collector{
runtimeHookInvokedDurationMilliSeconds,
runtimeHookReconcilerInvokedDurationMilliSeconds,
}
)

func RecordRuntimeHookInvokedDurationMilliSeconds(hookName, stage string, err error, seconds float64) {
labels := genNodeLabels()
if labels == nil {
return
}
labels[RuntimeHookName] = hookName
labels[RuntimeHookStage] = stage
labels[StatusKey] = StatusSucceed
if err != nil {
labels[StatusKey] = StatusFailed
}
// convert seconds to milliseconds
runtimeHookInvokedDurationMilliSeconds.With(labels).Observe(seconds * 1000)
}

func RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(level, resourceType string, err error, seconds float64) {
labels := genNodeLabels()
if labels == nil {
return
}
labels[RuntimeHookReconcilerLevel] = level
labels[RuntimeHookReconcilerResourceType] = resourceType
labels[StatusKey] = StatusSucceed
if err != nil {
labels[StatusKey] = StatusFailed
}
// convert seconds to milliseconds
runtimeHookReconcilerInvokedDurationMilliSeconds.With(labels).Observe(seconds * 1000)
}
7 changes: 6 additions & 1 deletion pkg/koordlet/runtimehooks/hooks/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package hooks

import (
"fmt"
"time"

"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol"
rmconfig "github.com/koordinator-sh/koordinator/pkg/runtimeproxy/config"
Expand Down Expand Up @@ -81,8 +83,11 @@ func RunHooks(failPolicy rmconfig.FailurePolicyType, stage rmconfig.RuntimeHookT
hooks := getHooksByStage(stage)
klog.V(5).Infof("start run %v hooks at %s", len(hooks), stage)
for _, hook := range hooks {
start := time.Now()
klog.V(5).Infof("call hook %v", hook.name)
if err := hook.fn(protocol); err != nil {
err := hook.fn(protocol)
metrics.RecordRuntimeHookInvokedDurationMilliSeconds(hook.name, string(stage), err, metrics.SinceInSeconds(start))
if err != nil {
klog.Errorf("failed to run hook %s in stage %s, reason: %v", hook.name, stage, err)
if failPolicy == rmconfig.PolicyFail {
return err
Expand Down
25 changes: 19 additions & 6 deletions pkg/koordlet/runtimehooks/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/klog/v2"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
Expand Down Expand Up @@ -91,7 +92,7 @@ func (d *noneFilter) Filter(podMeta *statesinformer.PodMeta) string {
var singletonNoneFilter *noneFilter

// NoneFilter returns a Filter which skip filtering anything (into the same condition)
func NoneFilter() *noneFilter {
func NoneFilter() Filter {
if singletonNoneFilter == nil {
singletonNoneFilter = &noneFilter{}
}
Expand Down Expand Up @@ -125,7 +126,7 @@ func (p *podQOSFilter) Filter(podMeta *statesinformer.PodMeta) string {
var singletonPodQOSFilter *podQOSFilter

// PodQOSFilter returns a Filter which filters pod qos class
func PodQOSFilter() *podQOSFilter {
func PodQOSFilter() Filter {
if singletonPodQOSFilter == nil {
singletonPodQOSFilter = &podQOSFilter{}
}
Expand Down Expand Up @@ -286,19 +287,22 @@ func (c *reconciler) reconcileKubeQOSCgroup(stopCh <-chan struct{}) {
func doKubeQOSCgroup(e resourceexecutor.ResourceUpdateExecutor) {
for _, kubeQOS := range []corev1.PodQOSClass{
corev1.PodQOSGuaranteed, corev1.PodQOSBurstable, corev1.PodQOSBestEffort} {
for _, r := range globalCgroupReconcilers.kubeQOSLevel {
for resourceType, r := range globalCgroupReconcilers.kubeQOSLevel {
kubeQOSCtx := protocol.HooksProtocolBuilder.KubeQOS(kubeQOS)
reconcileFn, ok := r.fn[NoneFilterCondition]
if !ok { // all kube qos reconcilers should register in this condition
klog.Warningf("calling reconcile function %v failed, error condition %s not registered",
r.description, NoneFilterCondition)
continue
}
start := time.Now()
if err := reconcileFn(kubeQOSCtx); err != nil {
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(KubeQOSLevel), resourceType, err, metrics.SinceInSeconds(start))
klog.Warningf("calling reconcile function %v for kube qos %v failed, error %v",
r.description, kubeQOS, err)
} else {
kubeQOSCtx.ReconcilerDone(e)
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(KubeQOSLevel), resourceType, nil, metrics.SinceInSeconds(start))
klog.V(5).Infof("calling reconcile function %v for kube qos %v finish",
r.description, kubeQOS)
}
Expand All @@ -314,7 +318,7 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) {
case <-c.podUpdated:
podsMeta := c.getPodsMeta()
for _, podMeta := range podsMeta {
for _, r := range globalCgroupReconcilers.podLevel {
for resourceType, r := range globalCgroupReconcilers.podLevel {
reconcileFn, ok := r.fn[r.filter.Filter(podMeta)]
if !ok {
klog.V(5).Infof("calling reconcile function %v aborted for pod %v, condition %s not registered",
Expand All @@ -323,36 +327,42 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) {
}

podCtx := protocol.HooksProtocolBuilder.Pod(podMeta)
start := time.Now()
if err := reconcileFn(podCtx); err != nil {
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(PodLevel), resourceType, err, metrics.SinceInSeconds(start))
klog.Warningf("calling reconcile function %v for pod %v failed, error %v",
r.description, podMeta.Key(), err)
} else {
podCtx.ReconcilerDone(c.executor)
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(PodLevel), resourceType, nil, metrics.SinceInSeconds(start))
klog.V(5).Infof("calling reconcile function %v for pod %v finished",
r.description, podMeta.Key())
}
}

for _, r := range globalCgroupReconcilers.sandboxContainerLevel {
for resourceType, r := range globalCgroupReconcilers.sandboxContainerLevel {
reconcileFn, ok := r.fn[r.filter.Filter(podMeta)]
if !ok {
klog.V(5).Infof("calling reconcile function %v aborted for pod %v, condition %s not registered",
r.description, podMeta.Key(), r.filter.Filter(podMeta))
continue
}
sandboxContainerCtx := protocol.HooksProtocolBuilder.Sandbox(podMeta)
start := time.Now()
if err := reconcileFn(sandboxContainerCtx); err != nil {
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(SandboxLevel), resourceType, err, metrics.SinceInSeconds(start))
klog.Warningf("calling reconcile function %v failed for sandbox %v, error %v",
r.description, podMeta.Key(), err)
} else {
sandboxContainerCtx.ReconcilerDone(c.executor)
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(SandboxLevel), resourceType, nil, metrics.SinceInSeconds(start))
klog.V(5).Infof("calling reconcile function %v for pod sandbox %v finished",
r.description, podMeta.Key())
}
}

for _, containerStat := range podMeta.Pod.Status.ContainerStatuses {
for _, r := range globalCgroupReconcilers.containerLevel {
for resourceType, r := range globalCgroupReconcilers.containerLevel {
reconcileFn, ok := r.fn[r.filter.Filter(podMeta)]
if !ok {
klog.V(5).Infof("calling reconcile function %v aborted for container %v/%v, condition %s not registered",
Expand All @@ -361,11 +371,14 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) {
}

containerCtx := protocol.HooksProtocolBuilder.Container(podMeta, containerStat.Name)
start := time.Now()
if err := reconcileFn(containerCtx); err != nil {
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(ContainerLevel), resourceType, err, metrics.SinceInSeconds(start))
klog.Warningf("calling reconcile function %v for container %v/%v failed, error %v",
r.description, podMeta.Key(), containerStat.Name, err)
} else {
containerCtx.ReconcilerDone(c.executor)
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(ContainerLevel), resourceType, nil, metrics.SinceInSeconds(start))
klog.V(5).Infof("calling reconcile function %v for container %v/%v finish",
r.description, podMeta.Key(), containerStat.Name)
}
Expand Down

0 comments on commit 3ed8b5e

Please sign in to comment.