Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update pod.go #1

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ KOORD_SCHEDULER_IMG ?= "${REG}/${REG_NS}/koord-scheduler:${GIT_BRANCH}-${GIT_COM
KOORD_DESCHEDULER_IMG ?= "${REG}/${REG_NS}/koord-descheduler:${GIT_BRANCH}-${GIT_COMMIT_ID}"

# ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary.
ENVTEST_K8S_VERSION = 1.22
ENVTEST_K8S_VERSION = 1.28

AGENT_MODE ?= hostMode
# Set license header files.
Expand Down
1 change: 1 addition & 0 deletions pkg/koordlet/metrics/internal_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ func init() {
internalMustRegister(CoreSchedCollector...)
internalMustRegister(ResourceExecutorCollector...)
internalMustRegister(KubeletStubCollector...)
internalMustRegister(RuntimeHookCollectors...)
}
32 changes: 32 additions & 0 deletions pkg/koordlet/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,35 @@ func TestCoreSchedCollector(t *testing.T) {
testCoreSchedGroup, testCoreSchedCookie)
})
}

// TestRuntimeHookCollector drives both runtime hook duration recorders with a
// nil and a non-nil error while a test node is registered. It contains no
// assertions on the recorded values; it only exercises the recording paths.
func TestRuntimeHookCollector(t *testing.T) {
	hookErr := fmt.Errorf("expected error")
	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "test-node",
			Labels: map[string]string{},
		},
		Status: corev1.NodeStatus{
			Allocatable: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("100"),
				corev1.ResourceMemory: resource.MustParse("200Gi"),
				apiext.BatchCPU:       resource.MustParse("50000"),
				apiext.BatchMemory:    resource.MustParse("80Gi"),
			},
			Capacity: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("100"),
				corev1.ResourceMemory: resource.MustParse("200Gi"),
				apiext.BatchCPU:       resource.MustParse("50000"),
				apiext.BatchMemory:    resource.MustParse("80Gi"),
			},
		},
	}

	// One successful and one failed observation per recorder.
	samples := []struct {
		err     error
		seconds float64
	}{
		{err: nil, seconds: 10.0},
		{err: hookErr, seconds: 5.0},
	}

	t.Run("test", func(t *testing.T) {
		Register(node)
		// Reset the registered node so later tests do not see it.
		defer Register(nil)

		for _, s := range samples {
			RecordRuntimeHookInvokedDurationMilliSeconds("testHook", "testStage", s.err, s.seconds)
		}
		for _, s := range samples {
			RecordRuntimeHookReconcilerInvokedDurationMilliSeconds("pod", "cpu.cfs_quota_us", s.err, s.seconds)
		}
	})
}
83 changes: 83 additions & 0 deletions pkg/koordlet/metrics/runtime_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright 2022 The Koordinator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import "github.com/prometheus/client_golang/prometheus"

// Label keys used by the runtime hook invocation histogram.
const (
	// RuntimeHookName is the label key for the name of the invoked hook plugin.
	RuntimeHookName = "hook"
	// RuntimeHookStage is the label key for the stage at which the hook was invoked.
	RuntimeHookStage = "stage"
)

// Label keys used by the runtime hook reconciler invocation histogram.
const (
	// RuntimeHookReconcilerLevel is the label key for the level (e.g. pod-level)
	// of the invoked reconciler.
	RuntimeHookReconcilerLevel = "level"
	// RuntimeHookReconcilerResourceType is the label key for the resource type
	// (e.g. cpu.cfs_quota_us) handled by the invoked reconciler.
	RuntimeHookReconcilerResourceType = "resource_type"
)

var (
// runtimeHookInvokedDurationMilliSeconds is a histogram of runtime hook plugin
// invocation durations, labeled by NodeKey, hook name, stage and StatusKey.
runtimeHookInvokedDurationMilliSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: KoordletSubsystem,
Name: "runtime_hook_invoked_duration_milliseconds",
Help: "time duration of invocations of runtime hook plugins",
// Bucket upper bounds are in milliseconds: 8 exponential bounds with factor 4,
// from 0.01ms (10us) up to 163.84ms (0.01 * 4^7).
// Author's note: cgroup operations are expected to take <= 40us.
Buckets: prometheus.ExponentialBuckets(0.01, 4, 8),
}, []string{NodeKey, RuntimeHookName, RuntimeHookStage, StatusKey})

// runtimeHookReconcilerInvokedDurationMilliSeconds is a histogram of runtime hook
// reconciler invocation durations, labeled by NodeKey, reconciler level,
// resource type and StatusKey.
runtimeHookReconcilerInvokedDurationMilliSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: KoordletSubsystem,
Name: "runtime_hook_reconciler_invoked_duration_milliseconds",
Help: "time duration of invocations of runtime hook reconciler plugins",
// Bucket upper bounds are in milliseconds: 8 exponential bounds with factor 4,
// from 0.01ms (10us) up to 163.84ms (0.01 * 4^7).
// Author's note: cgroup operations are expected to take <= 40us.
Buckets: prometheus.ExponentialBuckets(0.01, 4, 8),
}, []string{NodeKey, RuntimeHookReconcilerLevel, RuntimeHookReconcilerResourceType, StatusKey})

// RuntimeHookCollectors groups the two runtime hook histograms in one slice so
// that callers can register them together.
RuntimeHookCollectors = []prometheus.Collector{
runtimeHookInvokedDurationMilliSeconds,
runtimeHookReconcilerInvokedDurationMilliSeconds,
}
)

// RecordRuntimeHookInvokedDurationMilliSeconds observes the duration of one
// runtime hook invocation in the runtime hook histogram.
//
// hookName and stage become the RuntimeHookName and RuntimeHookStage labels.
// The StatusKey label is StatusFailed when err is non-nil and StatusSucceed
// otherwise. The duration is passed in seconds and recorded in milliseconds.
// Nothing is recorded when genNodeLabels returns nil.
func RecordRuntimeHookInvokedDurationMilliSeconds(hookName, stage string, err error, seconds float64) {
	labels := genNodeLabels()
	if labels == nil {
		return
	}

	status := StatusSucceed
	if err != nil {
		status = StatusFailed
	}
	labels[RuntimeHookName] = hookName
	labels[RuntimeHookStage] = stage
	labels[StatusKey] = status

	// The histogram is denominated in milliseconds; callers pass seconds.
	millis := seconds * 1000
	runtimeHookInvokedDurationMilliSeconds.With(labels).Observe(millis)
}

// RecordRuntimeHookReconcilerInvokedDurationMilliSeconds observes the duration
// of one runtime hook reconciler invocation in the reconciler histogram.
//
// level and resourceType become the RuntimeHookReconcilerLevel and
// RuntimeHookReconcilerResourceType labels. The StatusKey label is
// StatusFailed when err is non-nil and StatusSucceed otherwise. The duration
// is passed in seconds and recorded in milliseconds. Nothing is recorded when
// genNodeLabels returns nil.
func RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(level, resourceType string, err error, seconds float64) {
	labels := genNodeLabels()
	if labels == nil {
		return
	}

	status := StatusSucceed
	if err != nil {
		status = StatusFailed
	}
	labels[RuntimeHookReconcilerLevel] = level
	labels[RuntimeHookReconcilerResourceType] = resourceType
	labels[StatusKey] = status

	// The histogram is denominated in milliseconds; callers pass seconds.
	millis := seconds * 1000
	runtimeHookReconcilerInvokedDurationMilliSeconds.With(labels).Observe(millis)
}
7 changes: 6 additions & 1 deletion pkg/koordlet/runtimehooks/hooks/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package hooks

import (
"fmt"
"time"

"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol"
rmconfig "github.com/koordinator-sh/koordinator/pkg/runtimeproxy/config"
Expand Down Expand Up @@ -81,8 +83,11 @@ func RunHooks(failPolicy rmconfig.FailurePolicyType, stage rmconfig.RuntimeHookT
hooks := getHooksByStage(stage)
klog.V(5).Infof("start run %v hooks at %s", len(hooks), stage)
for _, hook := range hooks {
start := time.Now()
klog.V(5).Infof("call hook %v", hook.name)
if err := hook.fn(protocol); err != nil {
err := hook.fn(protocol)
metrics.RecordRuntimeHookInvokedDurationMilliSeconds(hook.name, string(stage), err, metrics.SinceInSeconds(start))
if err != nil {
klog.Errorf("failed to run hook %s in stage %s, reason: %v", hook.name, stage, err)
if failPolicy == rmconfig.PolicyFail {
return err
Expand Down
25 changes: 19 additions & 6 deletions pkg/koordlet/runtimehooks/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/klog/v2"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
Expand Down Expand Up @@ -91,7 +92,7 @@ func (d *noneFilter) Filter(podMeta *statesinformer.PodMeta) string {
var singletonNoneFilter *noneFilter

// NoneFilter returns a Filter which skip filtering anything (into the same condition)
func NoneFilter() *noneFilter {
func NoneFilter() Filter {
if singletonNoneFilter == nil {
singletonNoneFilter = &noneFilter{}
}
Expand Down Expand Up @@ -125,7 +126,7 @@ func (p *podQOSFilter) Filter(podMeta *statesinformer.PodMeta) string {
var singletonPodQOSFilter *podQOSFilter

// PodQOSFilter returns a Filter which filters pod qos class
func PodQOSFilter() *podQOSFilter {
func PodQOSFilter() Filter {
if singletonPodQOSFilter == nil {
singletonPodQOSFilter = &podQOSFilter{}
}
Expand Down Expand Up @@ -286,19 +287,22 @@ func (c *reconciler) reconcileKubeQOSCgroup(stopCh <-chan struct{}) {
func doKubeQOSCgroup(e resourceexecutor.ResourceUpdateExecutor) {
for _, kubeQOS := range []corev1.PodQOSClass{
corev1.PodQOSGuaranteed, corev1.PodQOSBurstable, corev1.PodQOSBestEffort} {
for _, r := range globalCgroupReconcilers.kubeQOSLevel {
for resourceType, r := range globalCgroupReconcilers.kubeQOSLevel {
kubeQOSCtx := protocol.HooksProtocolBuilder.KubeQOS(kubeQOS)
reconcileFn, ok := r.fn[NoneFilterCondition]
if !ok { // all kube qos reconcilers should register in this condition
klog.Warningf("calling reconcile function %v failed, error condition %s not registered",
r.description, NoneFilterCondition)
continue
}
start := time.Now()
if err := reconcileFn(kubeQOSCtx); err != nil {
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(KubeQOSLevel), resourceType, err, metrics.SinceInSeconds(start))
klog.Warningf("calling reconcile function %v for kube qos %v failed, error %v",
r.description, kubeQOS, err)
} else {
kubeQOSCtx.ReconcilerDone(e)
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(KubeQOSLevel), resourceType, nil, metrics.SinceInSeconds(start))
klog.V(5).Infof("calling reconcile function %v for kube qos %v finish",
r.description, kubeQOS)
}
Expand All @@ -314,7 +318,7 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) {
case <-c.podUpdated:
podsMeta := c.getPodsMeta()
for _, podMeta := range podsMeta {
for _, r := range globalCgroupReconcilers.podLevel {
for resourceType, r := range globalCgroupReconcilers.podLevel {
reconcileFn, ok := r.fn[r.filter.Filter(podMeta)]
if !ok {
klog.V(5).Infof("calling reconcile function %v aborted for pod %v, condition %s not registered",
Expand All @@ -323,36 +327,42 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) {
}

podCtx := protocol.HooksProtocolBuilder.Pod(podMeta)
start := time.Now()
if err := reconcileFn(podCtx); err != nil {
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(PodLevel), resourceType, err, metrics.SinceInSeconds(start))
klog.Warningf("calling reconcile function %v for pod %v failed, error %v",
r.description, podMeta.Key(), err)
} else {
podCtx.ReconcilerDone(c.executor)
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(PodLevel), resourceType, nil, metrics.SinceInSeconds(start))
klog.V(5).Infof("calling reconcile function %v for pod %v finished",
r.description, podMeta.Key())
}
}

for _, r := range globalCgroupReconcilers.sandboxContainerLevel {
for resourceType, r := range globalCgroupReconcilers.sandboxContainerLevel {
reconcileFn, ok := r.fn[r.filter.Filter(podMeta)]
if !ok {
klog.V(5).Infof("calling reconcile function %v aborted for pod %v, condition %s not registered",
r.description, podMeta.Key(), r.filter.Filter(podMeta))
continue
}
sandboxContainerCtx := protocol.HooksProtocolBuilder.Sandbox(podMeta)
start := time.Now()
if err := reconcileFn(sandboxContainerCtx); err != nil {
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(SandboxLevel), resourceType, err, metrics.SinceInSeconds(start))
klog.Warningf("calling reconcile function %v failed for sandbox %v, error %v",
r.description, podMeta.Key(), err)
} else {
sandboxContainerCtx.ReconcilerDone(c.executor)
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(SandboxLevel), resourceType, nil, metrics.SinceInSeconds(start))
klog.V(5).Infof("calling reconcile function %v for pod sandbox %v finished",
r.description, podMeta.Key())
}
}

for _, containerStat := range podMeta.Pod.Status.ContainerStatuses {
for _, r := range globalCgroupReconcilers.containerLevel {
for resourceType, r := range globalCgroupReconcilers.containerLevel {
reconcileFn, ok := r.fn[r.filter.Filter(podMeta)]
if !ok {
klog.V(5).Infof("calling reconcile function %v aborted for container %v/%v, condition %s not registered",
Expand All @@ -361,11 +371,14 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) {
}

containerCtx := protocol.HooksProtocolBuilder.Container(podMeta, containerStat.Name)
start := time.Now()
if err := reconcileFn(containerCtx); err != nil {
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(ContainerLevel), resourceType, err, metrics.SinceInSeconds(start))
klog.Warningf("calling reconcile function %v for container %v/%v failed, error %v",
r.description, podMeta.Key(), containerStat.Name, err)
} else {
containerCtx.ReconcilerDone(c.executor)
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(ContainerLevel), resourceType, nil, metrics.SinceInSeconds(start))
klog.V(5).Infof("calling reconcile function %v for container %v/%v finish",
r.description, podMeta.Key(), containerStat.Name)
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/koordlet/util/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"fmt"
"os"
"path/filepath"
"strings"

corev1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -79,7 +80,13 @@
continue
}
if _, exist := containerSubDirNames[containerDir.Name()]; !exist {
sandboxCandidates = append(sandboxCandidates, containerDir.Name())
if strings.HasPrefix(containerDir.Name(), "crio-") {
if ! strings.HasSuffix(containerDir.Name(), ".scope") {

Check failure on line 84 in pkg/koordlet/util/pod.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gofmt`-ed with `-s` (gofmt)
sandboxCandidates = append(sandboxCandidates, containerDir.Name())
}
}else {
sandboxCandidates = append(sandboxCandidates, containerDir.Name())
}
}
}

Expand Down
Loading
Loading