Commit
Merge branch 'koordinator-sh:main' into cri-o-runtime
ocichina001 authored Apr 1, 2024
2 parents 5985170 + 2dc8735 commit 73dd932
Showing 8 changed files with 202 additions and 50 deletions.
1 change: 1 addition & 0 deletions pkg/koordlet/metrics/internal_metrics.go
@@ -43,4 +43,5 @@ func init() {
	internalMustRegister(CoreSchedCollector...)
	internalMustRegister(ResourceExecutorCollector...)
	internalMustRegister(KubeletStubCollector...)
	internalMustRegister(RuntimeHookCollectors...)
}
32 changes: 32 additions & 0 deletions pkg/koordlet/metrics/metrics_test.go
@@ -380,3 +380,35 @@ func TestCoreSchedCollector(t *testing.T) {
testCoreSchedGroup, testCoreSchedCookie)
})
}

func TestRuntimeHookCollector(t *testing.T) {
	testingNode := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "test-node",
			Labels: map[string]string{},
		},
		Status: corev1.NodeStatus{
			Allocatable: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("100"),
				corev1.ResourceMemory: resource.MustParse("200Gi"),
				apiext.BatchCPU:       resource.MustParse("50000"),
				apiext.BatchMemory:    resource.MustParse("80Gi"),
			},
			Capacity: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("100"),
				corev1.ResourceMemory: resource.MustParse("200Gi"),
				apiext.BatchCPU:       resource.MustParse("50000"),
				apiext.BatchMemory:    resource.MustParse("80Gi"),
			},
		},
	}
	testErr := fmt.Errorf("expected error")
	t.Run("test", func(t *testing.T) {
		Register(testingNode)
		defer Register(nil)
		RecordRuntimeHookInvokedDurationMilliSeconds("testHook", "testStage", nil, 10.0)
		RecordRuntimeHookInvokedDurationMilliSeconds("testHook", "testStage", testErr, 5.0)
		RecordRuntimeHookReconcilerInvokedDurationMilliSeconds("pod", "cpu.cfs_quota_us", nil, 10.0)
		RecordRuntimeHookReconcilerInvokedDurationMilliSeconds("pod", "cpu.cfs_quota_us", testErr, 5.0)
	})
}
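
As a side note (not part of the commit), the test above could also assert that the two histograms were actually populated. This is only a hedged sketch: it assumes the test file is compiled in the same metrics package (so the unexported collectors are visible) and that testify's assert and client_golang's testutil helpers are available here.

	// add to the import block (assumption: not already imported):
	//   "github.com/prometheus/client_golang/prometheus/testutil"
	//
	// then, inside the t.Run("test", ...) closure, after the Record* calls:
	// each helper was invoked once with err == nil and once with err != nil,
	// so each histogram should expose two label combinations (one per status).
	assert.Equal(t, 2, testutil.CollectAndCount(runtimeHookInvokedDurationMilliSeconds))
	assert.Equal(t, 2, testutil.CollectAndCount(runtimeHookReconcilerInvokedDurationMilliSeconds))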
83 changes: 83 additions & 0 deletions pkg/koordlet/metrics/runtime_hook.go
@@ -0,0 +1,83 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import "github.com/prometheus/client_golang/prometheus"

const (
	// RuntimeHookName represents the hook plugin name of runtime hook.
	RuntimeHookName = "hook"
	// RuntimeHookStage represents the stage of invoked runtime hook.
	RuntimeHookStage = "stage"
	// RuntimeHookReconcilerLevel represents the level (e.g. pod-level) of invoked runtime hook reconciler.
	RuntimeHookReconcilerLevel = "level"
	// RuntimeHookReconcilerResourceType represents the resource type (e.g. cpu.cfs_quota_us) of invoked runtime hook reconciler.
	RuntimeHookReconcilerResourceType = "resource_type"
)

var (
	runtimeHookInvokedDurationMilliSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Subsystem: KoordletSubsystem,
		Name:      "runtime_hook_invoked_duration_milliseconds",
		Help:      "time duration of invocations of runtime hook plugins",
		// 10us ~ 10.24ms, cgroup <= 40us
		Buckets: prometheus.ExponentialBuckets(0.01, 4, 8),
	}, []string{NodeKey, RuntimeHookName, RuntimeHookStage, StatusKey})

	runtimeHookReconcilerInvokedDurationMilliSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Subsystem: KoordletSubsystem,
		Name:      "runtime_hook_reconciler_invoked_duration_milliseconds",
		Help:      "time duration of invocations of runtime hook reconciler plugins",
		// 10us ~ 10.24ms, cgroup <= 40us
		Buckets: prometheus.ExponentialBuckets(0.01, 4, 8),
	}, []string{NodeKey, RuntimeHookReconcilerLevel, RuntimeHookReconcilerResourceType, StatusKey})

	RuntimeHookCollectors = []prometheus.Collector{
		runtimeHookInvokedDurationMilliSeconds,
		runtimeHookReconcilerInvokedDurationMilliSeconds,
	}
)

func RecordRuntimeHookInvokedDurationMilliSeconds(hookName, stage string, err error, seconds float64) {
	labels := genNodeLabels()
	if labels == nil {
		return
	}
	labels[RuntimeHookName] = hookName
	labels[RuntimeHookStage] = stage
	labels[StatusKey] = StatusSucceed
	if err != nil {
		labels[StatusKey] = StatusFailed
	}
	// convert seconds to milliseconds
	runtimeHookInvokedDurationMilliSeconds.With(labels).Observe(seconds * 1000)
}

func RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(level, resourceType string, err error, seconds float64) {
	labels := genNodeLabels()
	if labels == nil {
		return
	}
	labels[RuntimeHookReconcilerLevel] = level
	labels[RuntimeHookReconcilerResourceType] = resourceType
	labels[StatusKey] = StatusSucceed
	if err != nil {
		labels[StatusKey] = StatusFailed
	}
	// convert seconds to milliseconds
	runtimeHookReconcilerInvokedDurationMilliSeconds.With(labels).Observe(seconds * 1000)
}
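
For reference on the bucket comment above, here is a small standalone snippet (not part of the commit) showing what prometheus.ExponentialBuckets(0.01, 4, 8) expands to: eight upper bounds starting at 0.01 ms (10us) and growing by a factor of 4, i.e. the histograms cover 0.01, 0.04, 0.16, 0.64, 2.56, 10.24, 40.96 and 163.84 milliseconds.

	package main

	import (
		"fmt"

		"github.com/prometheus/client_golang/prometheus"
	)

	func main() {
		// Print the histogram bucket upper bounds used by the runtime hook metrics.
		fmt.Println(prometheus.ExponentialBuckets(0.01, 4, 8))
		// Output: [0.01 0.04 0.16 0.64 2.56 10.24 40.96 163.84]
	}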
7 changes: 6 additions & 1 deletion pkg/koordlet/runtimehooks/hooks/hooks.go
@@ -18,9 +18,11 @@ package hooks

import (
"fmt"
"time"

"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol"
rmconfig "github.com/koordinator-sh/koordinator/pkg/runtimeproxy/config"
@@ -81,8 +83,11 @@ func RunHooks(failPolicy rmconfig.FailurePolicyType, stage rmconfig.RuntimeHookT
hooks := getHooksByStage(stage)
klog.V(5).Infof("start run %v hooks at %s", len(hooks), stage)
for _, hook := range hooks {
start := time.Now()
klog.V(5).Infof("call hook %v", hook.name)
if err := hook.fn(protocol); err != nil {
err := hook.fn(protocol)
metrics.RecordRuntimeHookInvokedDurationMilliSeconds(hook.name, string(stage), err, metrics.SinceInSeconds(start))
if err != nil {
klog.Errorf("failed to run hook %s in stage %s, reason: %v", hook.name, stage, err)
if failPolicy == rmconfig.PolicyFail {
return err
25 changes: 19 additions & 6 deletions pkg/koordlet/runtimehooks/reconciler/reconciler.go
@@ -24,6 +24,7 @@ import (
"k8s.io/klog/v2"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
@@ -91,7 +92,7 @@ func (d *noneFilter) Filter(podMeta *statesinformer.PodMeta) string {
var singletonNoneFilter *noneFilter

// NoneFilter returns a Filter which skips filtering anything (all pods fall into the same condition)
func NoneFilter() *noneFilter {
func NoneFilter() Filter {
if singletonNoneFilter == nil {
singletonNoneFilter = &noneFilter{}
}
@@ -125,7 +126,7 @@ func (p *podQOSFilter) Filter(podMeta *statesinformer.PodMeta) string {
var singletonPodQOSFilter *podQOSFilter

// PodQOSFilter returns a Filter which filters by pod QoS class
func PodQOSFilter() *podQOSFilter {
func PodQOSFilter() Filter {
if singletonPodQOSFilter == nil {
singletonPodQOSFilter = &podQOSFilter{}
}
@@ -286,19 +287,22 @@ func (c *reconciler) reconcileKubeQOSCgroup(stopCh <-chan struct{}) {
func doKubeQOSCgroup(e resourceexecutor.ResourceUpdateExecutor) {
for _, kubeQOS := range []corev1.PodQOSClass{
corev1.PodQOSGuaranteed, corev1.PodQOSBurstable, corev1.PodQOSBestEffort} {
for _, r := range globalCgroupReconcilers.kubeQOSLevel {
for resourceType, r := range globalCgroupReconcilers.kubeQOSLevel {
kubeQOSCtx := protocol.HooksProtocolBuilder.KubeQOS(kubeQOS)
reconcileFn, ok := r.fn[NoneFilterCondition]
if !ok { // all kube qos reconcilers should register in this condition
klog.Warningf("calling reconcile function %v failed, error condition %s not registered",
r.description, NoneFilterCondition)
continue
}
start := time.Now()
if err := reconcileFn(kubeQOSCtx); err != nil {
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(KubeQOSLevel), resourceType, err, metrics.SinceInSeconds(start))
klog.Warningf("calling reconcile function %v for kube qos %v failed, error %v",
r.description, kubeQOS, err)
} else {
kubeQOSCtx.ReconcilerDone(e)
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(KubeQOSLevel), resourceType, nil, metrics.SinceInSeconds(start))
klog.V(5).Infof("calling reconcile function %v for kube qos %v finish",
r.description, kubeQOS)
}
@@ -314,7 +318,7 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) {
case <-c.podUpdated:
podsMeta := c.getPodsMeta()
for _, podMeta := range podsMeta {
for _, r := range globalCgroupReconcilers.podLevel {
for resourceType, r := range globalCgroupReconcilers.podLevel {
reconcileFn, ok := r.fn[r.filter.Filter(podMeta)]
if !ok {
klog.V(5).Infof("calling reconcile function %v aborted for pod %v, condition %s not registered",
@@ -323,36 +327,42 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) {
}

podCtx := protocol.HooksProtocolBuilder.Pod(podMeta)
start := time.Now()
if err := reconcileFn(podCtx); err != nil {
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(PodLevel), resourceType, err, metrics.SinceInSeconds(start))
klog.Warningf("calling reconcile function %v for pod %v failed, error %v",
r.description, podMeta.Key(), err)
} else {
podCtx.ReconcilerDone(c.executor)
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(PodLevel), resourceType, nil, metrics.SinceInSeconds(start))
klog.V(5).Infof("calling reconcile function %v for pod %v finished",
r.description, podMeta.Key())
}
}

for _, r := range globalCgroupReconcilers.sandboxContainerLevel {
for resourceType, r := range globalCgroupReconcilers.sandboxContainerLevel {
reconcileFn, ok := r.fn[r.filter.Filter(podMeta)]
if !ok {
klog.V(5).Infof("calling reconcile function %v aborted for pod %v, condition %s not registered",
r.description, podMeta.Key(), r.filter.Filter(podMeta))
continue
}
sandboxContainerCtx := protocol.HooksProtocolBuilder.Sandbox(podMeta)
start := time.Now()
if err := reconcileFn(sandboxContainerCtx); err != nil {
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(SandboxLevel), resourceType, err, metrics.SinceInSeconds(start))
klog.Warningf("calling reconcile function %v failed for sandbox %v, error %v",
r.description, podMeta.Key(), err)
} else {
sandboxContainerCtx.ReconcilerDone(c.executor)
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(SandboxLevel), resourceType, nil, metrics.SinceInSeconds(start))
klog.V(5).Infof("calling reconcile function %v for pod sandbox %v finished",
r.description, podMeta.Key())
}
}

for _, containerStat := range podMeta.Pod.Status.ContainerStatuses {
for _, r := range globalCgroupReconcilers.containerLevel {
for resourceType, r := range globalCgroupReconcilers.containerLevel {
reconcileFn, ok := r.fn[r.filter.Filter(podMeta)]
if !ok {
klog.V(5).Infof("calling reconcile function %v aborted for container %v/%v, condition %s not registered",
@@ -361,11 +371,14 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) {
}

containerCtx := protocol.HooksProtocolBuilder.Container(podMeta, containerStat.Name)
start := time.Now()
if err := reconcileFn(containerCtx); err != nil {
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(ContainerLevel), resourceType, err, metrics.SinceInSeconds(start))
klog.Warningf("calling reconcile function %v for container %v/%v failed, error %v",
r.description, podMeta.Key(), containerStat.Name, err)
} else {
containerCtx.ReconcilerDone(c.executor)
metrics.RecordRuntimeHookReconcilerInvokedDurationMilliSeconds(string(ContainerLevel), resourceType, nil, metrics.SinceInSeconds(start))
klog.V(5).Infof("calling reconcile function %v for container %v/%v finish",
r.description, podMeta.Key(), containerStat.Name)
}
17 changes: 17 additions & 0 deletions pkg/scheduler/plugins/coscheduling/core/core.go
@@ -73,6 +73,7 @@ type Manager interface {
GetGangSummaries() map[string]*GangSummary
IsGangMinSatisfied(*corev1.Pod) bool
GetChildScheduleCycle(*corev1.Pod) int
GetGangGroupWaitingBoundPodNum(pod *corev1.Pod) int
}

// PodGroupManager defines the scheduling operation called
@@ -552,3 +553,19 @@ func (pgMgr *PodGroupManager) GetChildScheduleCycle(pod *corev1.Pod) int {

return gang.getChildScheduleCycle(pod)
}

func (pgMgr *PodGroupManager) GetGangGroupWaitingBoundPodNum(pod *corev1.Pod) int {
	gang := pgMgr.GetGangByPod(pod)
	if gang == nil {
		return 0
	}
	gangGroup := gang.GangGroup
	waitingPodNum := 0
	for _, memberGangID := range gangGroup {
		memberGang := pgMgr.cache.getGangFromCacheByGangId(memberGangID, false)
		if memberGang != nil {
			waitingPodNum += memberGang.getGangWaitingPods()
		}
	}
	return waitingPodNum
}
41 changes: 21 additions & 20 deletions pkg/scheduler/plugins/coscheduling/coscheduling.go
@@ -135,28 +135,29 @@ func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool {

group1, _ := cs.pgMgr.GetGroupId(podInfo1.Pod)
group2, _ := cs.pgMgr.GetGroupId(podInfo2.Pod)
if group1 != group2 {
return group1 < group2
}

isgang1satisfied := cs.pgMgr.IsGangMinSatisfied(podInfo1.Pod)
isgang2satisfied := cs.pgMgr.IsGangMinSatisfied(podInfo2.Pod)
if isgang1satisfied != isgang2satisfied {
return !isgang1satisfied
}

childScheduleCycle1 := cs.pgMgr.GetChildScheduleCycle(podInfo1.Pod)
childScheduleCycle2 := cs.pgMgr.GetChildScheduleCycle(podInfo2.Pod)
if childScheduleCycle1 != childScheduleCycle2 {
return childScheduleCycle1 < childScheduleCycle2
}

creationTime1 := cs.pgMgr.GetCreatTime(podInfo1)
creationTime2 := cs.pgMgr.GetCreatTime(podInfo2)
if creationTime1.Equal(creationTime2) {
return util.GetId(podInfo1.Pod.Namespace, podInfo1.Pod.Name) < util.GetId(podInfo2.Pod.Namespace, podInfo2.Pod.Name)
waitingBoundPodNum1 := cs.pgMgr.GetGangGroupWaitingBoundPodNum(podInfo1.Pod)
waitingBoundPodNum2 := cs.pgMgr.GetGangGroupWaitingBoundPodNum(podInfo2.Pod)
if waitingBoundPodNum1 != 0 || waitingBoundPodNum2 != 0 {
// At any one time, only the member pods of one podGroup should be assumed, so we prefer the pod that already has siblings assumed; then they can succeed together.
if waitingBoundPodNum1 == 0 || waitingBoundPodNum2 == 0 {
return waitingBoundPodNum1 != 0
}
/*
Two gang groups may both already have some assumed sibling pods.
For example:
1. GroupA submitted 6 members and already has 5 assumed;
2. then the sixth member was deleted;
3. then GroupB submitted its pods and already has 3 assumed;
4. then GroupA submits its sixth pod again.
In this case waitingPodNum is meaningless, so we simply sort by group ID to give a fixed order.
No matter whether the former group succeeds or fails, its waitingPodNum will be zeroed, so the deadlock is avoided.
*/
return group1 < group2
}
return creationTime1.Before(creationTime2)
// If no pod has succeeded yet, we schedule all pods round-robin to ensure fairness.
// If a time-consuming member pod of one gang fails, its siblings will fail soon (because the scheduling cycle becomes invalid), so there is no need to ensure that all siblings fail together.
return podInfo1.Timestamp.Before(podInfo2.Timestamp)
}

// PreFilter