From b175fe837a5a37c3d4d94de45691ea0efb7313f4 Mon Sep 17 00:00:00 2001
From: wangjianyu
Date: Thu, 11 Apr 2024 11:33:34 +0800
Subject: [PATCH] scheduler: assure Fairness and DeadlockFree (#1996)

Signed-off-by: wangjianyu.wjy
Co-authored-by: wangjianyu.wjy
Signed-off-by: george
---
 .../plugins/coscheduling/core/core.go         |  58 ++-
 .../plugins/coscheduling/core/gang.go         |  33 +-
 .../plugins/coscheduling/core/gang_cache.go   |  50 +-
 .../plugins/coscheduling/core/gang_summary.go |   2 +
 .../plugins/coscheduling/coscheduling.go      |  42 +-
 .../plugins/coscheduling/coscheduling_test.go | 463 ++++++++++++++++--
 .../coscheduling/plugin_service_test.go       |   2 +
 7 files changed, 560 insertions(+), 90 deletions(-)

diff --git a/pkg/scheduler/plugins/coscheduling/core/core.go b/pkg/scheduler/plugins/coscheduling/core/core.go
index c0f2a11d5..9f0d0c5fd 100644
--- a/pkg/scheduler/plugins/coscheduling/core/core.go
+++ b/pkg/scheduler/plugins/coscheduling/core/core.go
@@ -73,7 +73,7 @@ type Manager interface {
 	GetGangSummaries() map[string]*GangSummary
 	IsGangMinSatisfied(*corev1.Pod) bool
 	GetChildScheduleCycle(*corev1.Pod) int
-	GetGangGroupWaitingBoundPodNum(pod *corev1.Pod) int
+	GetGangGroupLastScheduleTimeOfPod(pod *corev1.Pod, podLastScheduleTime time.Time) time.Time
 }
 
 // PodGroupManager defines the scheduling operation called
@@ -227,6 +227,7 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
 		return fmt.Errorf("can't find gang, gangName: %v, podName: %v", util.GetId(pod.Namespace, util.GetGangNameByPod(pod)),
 			util.GetId(pod.Namespace, pod.Name))
 	}
+	pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime())
 
 	// check if gang is initialized
 	if !gang.HasGangInit {
@@ -284,6 +285,9 @@ func (pgMgr *PodGroupManager) PostFilter(ctx context.Context, pod *corev1.Pod, h
 		klog.Warningf(message)
 		return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, message)
 	}
+	defer func() {
+		pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime())
+	}()
 	if gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied() {
 		return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
 	}
@@ -350,6 +354,7 @@ func (pgMgr *PodGroupManager) Unreserve(ctx context.Context, state *framework.Cy
 		klog.Warningf("Pod %q missing Gang", klog.KObj(pod))
 		return
 	}
+	pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime())
 
 	// first delete the pod from gang's waitingFroBindChildren map
 	gang.delAssumedPod(pod)
@@ -370,25 +375,35 @@ func (pgMgr *PodGroupManager) rejectGangGroupById(handle framework.Handle, plugi
 	gangGroup := gang.getGangGroup()
 	gangSet := sets.NewString(gangGroup...)
 
-	rejectedPodCount := 0
 	if handle != nil {
-		handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
-			waitingGangId := util.GetId(waitingPod.GetPod().Namespace, util.GetGangNameByPod(waitingPod.GetPod()))
-			if gangSet.Has(waitingGangId) {
-				klog.V(1).InfoS("GangGroup gets rejected due to member Gang is unschedulable",
-					"gang", gangId, "waitingGang", waitingGangId, "waitingPod", klog.KObj(waitingPod.GetPod()))
-				waitingPod.Reject(pluginName, message)
-				rejectedPodCount++
-			}
-		})
+		/*
+			Let's explain why we need the defer with the following example. There are three gangs in one gang group, noted as gangA, gangB and gangC;
+			the following is what happens without the defer.
+			1. gangB/C assumed, gangA failed
+			2. then gangA invokes PostFilter and rejects gangB|gangC; gangB|gangC invoke Unreserve asynchronously, i.e. they invoke rejectGangGroupById concurrently
+			3. gangB or gangC may find its schedule cycle still valid if it checks the cycle before gangA rejects it, and then different lastScheduleTime values are possible
+		*/
+		defer func() {
+			handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
+				waitingGangId := util.GetId(waitingPod.GetPod().Namespace, util.GetGangNameByPod(waitingPod.GetPod()))
+				if gangSet.Has(waitingGangId) {
+					klog.V(1).InfoS("GangGroup gets rejected due to member Gang is unschedulable",
+						"gang", gangId, "waitingGang", waitingGangId, "waitingPod", klog.KObj(waitingPod.GetPod()))
+					waitingPod.Reject(pluginName, message)
+				}
+			})
+		}()
 	}
-	if rejectedPodCount == 0 {
+	if !gang.isScheduleCycleValid() {
+		// In one schedule cycle, a gang can only reject itself and its sibling gangs once
 		return
 	}
+	gangGroupLastScheduleTime := timeNowFn()
 	for gang := range gangSet {
 		gangIns := pgMgr.cache.getGangFromCacheByGangId(gang, false)
 		if gangIns != nil {
 			gangIns.setScheduleCycleValid(false)
+			gangIns.setLastScheduleTime(gangGroupLastScheduleTime)
 		}
 	}
 }
@@ -405,6 +420,7 @@ func (pgMgr *PodGroupManager) PostBind(ctx context.Context, pod *corev1.Pod, nod
 	}
 	// first update gang in cache
 	gang.addBoundPod(pod)
+	pgMgr.cache.deleteGangGroupLastScheduleTimeOfPod(pod)
 
 	// update PodGroup
 	_, pg := pgMgr.GetPodGroup(pod)
@@ -554,18 +570,10 @@ func (pgMgr *PodGroupManager) GetChildScheduleCycle(pod *corev1.Pod) int {
 	return gang.getChildScheduleCycle(pod)
 }
 
-func (pgMgr *PodGroupManager) GetGangGroupWaitingBoundPodNum(pod *corev1.Pod) int {
-	gang := pgMgr.GetGangByPod(pod)
-	if gang == nil {
-		return 0
-	}
-	gangGroup := gang.GangGroup
-	waitingPodNum := 0
-	for _, memberGangID := range gangGroup {
-		memberGang := pgMgr.cache.getGangFromCacheByGangId(memberGangID, false)
-		if memberGang != nil {
-			waitingPodNum += memberGang.getGangWaitingPods()
-		}
+func (pgMgr *PodGroupManager) GetGangGroupLastScheduleTimeOfPod(pod *corev1.Pod, podLastScheduleTime time.Time) time.Time {
+	gangGroupLastScheduleTime := pgMgr.cache.getGangGroupLastScheduleTimeOfPod(pod.UID)
+	if gangGroupLastScheduleTime != nil {
+		return *gangGroupLastScheduleTime
 	}
-	return waitingPodNum
+	return podLastScheduleTime
 }
diff --git a/pkg/scheduler/plugins/coscheduling/core/gang.go b/pkg/scheduler/plugins/coscheduling/core/gang.go
index 89a2a53db..0a7271d82 100644
--- a/pkg/scheduler/plugins/coscheduling/core/gang.go
+++ b/pkg/scheduler/plugins/coscheduling/core/gang.go
@@ -75,7 +75,10 @@ type Gang struct {
 	// we set the pod's cycle in `childrenScheduleRoundMap` equal with `scheduleCycle` and pass the check. If result is negative, means
 	// the pod has been scheduled in this cycle, so we should reject it. With `totalChildrenNum`'s help, when the last pod comes to make all
 	// `childrenScheduleRoundMap`'s values equal to `scheduleCycle`, Gang's `scheduleCycle` will be added by 1, which means a new schedule cycle.
-	ScheduleCycle            int
+	ScheduleCycle            int
+	// Pods with the same priority are sorted by the scheduling time of their previous round to ensure fairness.
+	// For a Gang, the time of its last round as a whole is recorded as LastScheduleTime; that is, all Pods belonging to the Gang share the same LastScheduleTime.
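+	// When a gang group gets rejected, rejectGangGroupById resets every member Gang's LastScheduleTime to one shared timestamp, so sibling Gangs share the same sort key in the queue.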
+	LastScheduleTime         time.Time
 	ChildrenScheduleRoundMap map[string]int
 
 	GangFrom    string
@@ -98,6 +101,7 @@ func NewGang(gangName string) *Gang {
 		BoundChildren:            make(map[string]*v1.Pod),
 		ScheduleCycleValid:       true,
 		ScheduleCycle:            1,
+		LastScheduleTime:         timeNowFn(),
 		ChildrenScheduleRoundMap: make(map[string]int),
 		GangFrom:                 GangFromPodAnnotation,
 		HasGangInit:              false,
@@ -149,6 +153,7 @@ func (gang *Gang) tryInitByPodConfig(pod *v1.Pod, args *config.CoschedulingArgs)
 
 	// here we assume that Coscheduling's CreateTime equal with the pod's CreateTime
 	gang.CreateTime = pod.CreationTimestamp.Time
+	gang.LastScheduleTime = pod.CreationTimestamp.Time
 
 	waitTime, err := time.ParseDuration(pod.Annotations[extension.AnnotationGangWaitTime])
 	if err != nil || waitTime <= 0 {
@@ -215,6 +220,7 @@ func (gang *Gang) tryInitByPodGroup(pg *v1alpha1.PodGroup, args *config.Coschedu
 
 	// here we assume that Coscheduling's CreateTime equal with the podGroup CRD CreateTime
 	gang.CreateTime = pg.CreationTimestamp.Time
+	gang.LastScheduleTime = pg.CreationTimestamp.Time
 
 	waitTime := util.GetWaitTimeDuration(pg, args.DefaultTimeout.Duration)
 	gang.WaitTime = waitTime
@@ -382,6 +388,19 @@ func (gang *Gang) setScheduleCycleValid(valid bool) {
 	gang.lock.Lock()
 	defer gang.lock.Unlock()
 
+	if !valid && !gang.ScheduleCycleValid {
+		/*
+			Let's explain why with the following example. There are three gangs in one gang group: A, B and C; each gang has a min-member of 10 pods, noted as A1-A10, B1-B10, C1-C10.
+			1. A1-A5 assumed in gangA scheduleCycle 1, B1-B5 assumed in gangB scheduleCycle 1, C1-C5 assumed in gangC scheduleCycle 1; then C6 fails Filter due to insufficient resources, so A6-A10 fail due to an invalid cycle, C7-C10 fail due to an invalid cycle, and B6-B9 fail due to an invalid cycle
+			2. A1-A5 assumed in gangA scheduleCycle 2, C1-C5 assumed in gangC scheduleCycle 2; C6 fails again due to insufficient resources and the cycles of gangA/B/C are set to invalid,
+			3. then A6-A10, C7-C10 fail due to the invalid cycle
+			4. then B10 fails due to the invalid cycle; B1 comes and finds that its gang's scheduleCycle 2 can become valid, so it is assumed.
+			5. however, all of its siblings will not come until the next round of all gangs, and by then these gangs will face insufficient resources due to the resources pre-allocated for B10
+
+			TODO: this logic can be optimized by giving gangs of the same group the same schedule cycle
+		*/
+		gang.ScheduleCycle += 1
+	}
 	gang.ScheduleCycleValid = valid
 	klog.Infof("SetScheduleCycleValid, gangName: %v, valid: %v", gang.Name, valid)
 }
@@ -494,3 +513,15 @@ func (gang *Gang) isGangValidForPermit() bool {
 		return len(gang.WaitingForBindChildren) >= gang.MinRequiredNumber || gang.OnceResourceSatisfied == true
 	}
 }
+
+func (gang *Gang) getLastScheduleTime() time.Time {
+	gang.lock.Lock()
+	defer gang.lock.Unlock()
+	return gang.LastScheduleTime
+}
+
+func (gang *Gang) setLastScheduleTime(time time.Time) {
+	gang.lock.Lock()
+	defer gang.lock.Unlock()
+	gang.LastScheduleTime = time
+}
diff --git a/pkg/scheduler/plugins/coscheduling/core/gang_cache.go b/pkg/scheduler/plugins/coscheduling/core/gang_cache.go
index 3d818a2af..df96dbd82 100644
--- a/pkg/scheduler/plugins/coscheduling/core/gang_cache.go
+++ b/pkg/scheduler/plugins/coscheduling/core/gang_cache.go
@@ -18,8 +18,10 @@ package core
 
 import (
 	"sync"
+	"time"
 
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
 	listerv1 "k8s.io/client-go/listers/core/v1"
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1"
@@ -32,22 +34,24 @@ import (
 )
 
 type GangCache struct {
-	lock       *sync.RWMutex
-	gangItems  map[string]*Gang
-	pluginArgs *config.CoschedulingArgs
-	podLister  listerv1.PodLister
-	pgLister   pglister.PodGroupLister
-	pgClient   pgclientset.Interface
+	lock                           *sync.RWMutex
+	gangItems                      map[string]*Gang
+	pluginArgs                     *config.CoschedulingArgs
+	podLister                      listerv1.PodLister
+	pgLister                       pglister.PodGroupLister
+	pgClient                       pgclientset.Interface
+	gangGroupLastScheduleTimeOfPod map[types.UID]*time.Time
 }
 
 func NewGangCache(args *config.CoschedulingArgs, podLister listerv1.PodLister, pgLister pglister.PodGroupLister, client pgclientset.Interface) *GangCache {
 	return &GangCache{
-		gangItems:  make(map[string]*Gang),
-		lock:       new(sync.RWMutex),
-		pluginArgs: args,
-		podLister:  podLister,
-		pgLister:   pgLister,
-		pgClient:   client,
+		gangItems:                      make(map[string]*Gang),
+		lock:                           new(sync.RWMutex),
+		pluginArgs:                     args,
+		podLister:                      podLister,
+		pgLister:                       pgLister,
+		pgClient:                       client,
+		gangGroupLastScheduleTimeOfPod: make(map[types.UID]*time.Time),
 	}
 }
 
@@ -63,6 +67,26 @@ func (gangCache *GangCache) getGangFromCacheByGangId(gangId string, createIfNotE
 	return gang
 }
 
+func (gangCache *GangCache) getGangGroupLastScheduleTimeOfPod(podUID types.UID) *time.Time {
+	gangCache.lock.Lock()
+	defer gangCache.lock.Unlock()
+	return gangCache.gangGroupLastScheduleTimeOfPod[podUID]
+}
+
+func (gangCache *GangCache) setGangGroupLastScheduleTimeOfPod(podUID types.UID, lastScheduleTime time.Time) {
+	gangCache.lock.Lock()
+	defer gangCache.lock.Unlock()
+	gangCache.gangGroupLastScheduleTimeOfPod[podUID] = &lastScheduleTime
+}
+
+func (gangCache *GangCache) deleteGangGroupLastScheduleTimeOfPod(pod *v1.Pod) {
+	gangCache.lock.Lock()
+	defer gangCache.lock.Unlock()
+
+	delete(gangCache.gangGroupLastScheduleTimeOfPod, pod.UID)
+	klog.Infof("delete podScheduleInfo from cache, pod: %s/%s/%v", pod.Namespace, pod.Name, pod.UID)
+}
+
 func (gangCache *GangCache) getAllGangsFromCache() map[string]*Gang {
 	gangCache.lock.RLock()
 	defer gangCache.lock.RUnlock()
@@ -106,6 +130,7 @@ func (gangCache *GangCache) onPodAdd(obj interface{}) {
 		if pod.Spec.NodeName != "" {
 			gang.addBoundPod(pod)
 			gang.setResourceSatisfied()
+			gangCache.deleteGangGroupLastScheduleTimeOfPod(pod)
 		}
 	}
 
@@ -136,6 +161,7 @@ func (gangCache *GangCache) onPodDelete(obj interface{}) {
 	if gangName == "" {
 		return
 	}
+	gangCache.deleteGangGroupLastScheduleTimeOfPod(pod)
 
 	gangNamespace := pod.Namespace
 	gangId := util.GetId(gangNamespace, gangName)
diff --git a/pkg/scheduler/plugins/coscheduling/core/gang_summary.go b/pkg/scheduler/plugins/coscheduling/core/gang_summary.go
index 4326789a8..4a08975ee 100644
--- a/pkg/scheduler/plugins/coscheduling/core/gang_summary.go
+++ b/pkg/scheduler/plugins/coscheduling/core/gang_summary.go
@@ -21,6 +21,7 @@ type GangSummary struct {
 	OnceResourceSatisfied    bool           `json:"onceResourceSatisfied"`
 	ScheduleCycleValid       bool           `json:"scheduleCycleValid"`
 	ScheduleCycle            int            `json:"scheduleCycle"`
+	LastScheduleTime         time.Time      `json:"lastScheduleTime"`
 	ChildrenScheduleRoundMap map[string]int `json:"childrenScheduleRoundMap"`
 	GangFrom                 string         `json:"gangFrom"`
 	HasGangInit              bool           `json:"hasGangInit"`
@@ -51,6 +52,7 @@ func (gang *Gang) GetGangSummary() *GangSummary {
 	gangSummary.OnceResourceSatisfied = gang.OnceResourceSatisfied
 	gangSummary.ScheduleCycleValid = gang.ScheduleCycleValid
 	gangSummary.ScheduleCycle = gang.ScheduleCycle
+	gangSummary.LastScheduleTime = gang.LastScheduleTime
 	gangSummary.GangFrom = gang.GangFrom
 	gangSummary.HasGangInit = gang.HasGangInit
 	gangSummary.GangGroup = append(gangSummary.GangGroup, gang.GangGroup...)
diff --git a/pkg/scheduler/plugins/coscheduling/coscheduling.go b/pkg/scheduler/plugins/coscheduling/coscheduling.go
index f59a12eb8..46531cf3f 100644
--- a/pkg/scheduler/plugins/coscheduling/coscheduling.go
+++ b/pkg/scheduler/plugins/coscheduling/coscheduling.go
@@ -133,30 +133,28 @@ func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool {
 		return subPrio1 > subPrio2
 	}
 
-	group1, _ := cs.pgMgr.GetGroupId(podInfo1.Pod)
-	group2, _ := cs.pgMgr.GetGroupId(podInfo2.Pod)
-
-	waitingBoundPodNum1 := cs.pgMgr.GetGangGroupWaitingBoundPodNum(podInfo1.Pod)
-	waitingBoundPodNum2 := cs.pgMgr.GetGangGroupWaitingBoundPodNum(podInfo2.Pod)
-	if waitingBoundPodNum1 != 0 || waitingBoundPodNum2 != 0 {
-		// At the same time, only member pod of one podGroup should be assumed, so we prefer the pod already having sibling assumed, then they can succeed together.
-		if waitingBoundPodNum1 == 0 || waitingBoundPodNum2 == 0 {
-			return waitingBoundPodNum1 != 0
+	lastScheduleTime1 := cs.pgMgr.GetGangGroupLastScheduleTimeOfPod(podInfo1.Pod, podInfo1.Timestamp)
+	lastScheduleTime2 := cs.pgMgr.GetGangGroupLastScheduleTimeOfPod(podInfo2.Pod, podInfo2.Timestamp)
+	if !lastScheduleTime1.Equal(lastScheduleTime2) {
+		return lastScheduleTime1.Before(lastScheduleTime2)
+	}
+	gangId1 := util.GetId(podInfo1.Pod.Namespace, util.GetGangNameByPod(podInfo1.Pod))
+	gangId2 := util.GetId(podInfo2.Pod.Namespace, util.GetGangNameByPod(podInfo2.Pod))
+	// for member gangs of the same gangGroup, the gang that has not been satisfied yet takes precedence
+	if gangId1 != gangId2 {
+		isGang1Satisfied := cs.pgMgr.IsGangMinSatisfied(podInfo1.Pod)
+		isGang2Satisfied := cs.pgMgr.IsGangMinSatisfied(podInfo2.Pod)
+		if isGang1Satisfied != isGang2Satisfied {
+			return !isGang1Satisfied
+		}
+	} else {
+		// for member pods of the same gang, the pod with the smaller child schedule cycle takes precedence so that the gang's schedule cycle can become valid and advance
+		childScheduleCycle1 := cs.pgMgr.GetChildScheduleCycle(podInfo1.Pod)
+		childScheduleCycle2 := cs.pgMgr.GetChildScheduleCycle(podInfo2.Pod)
+		if childScheduleCycle1 != childScheduleCycle2 {
+			return childScheduleCycle1 < childScheduleCycle2
 		}
-		/*
-			Two gang groups may both already have some assumed sibling pods.
-			For example:
-			1. GroupA has submitted 6 member, and have 5 already assumed;
-			2. then the sixth has been deleted;
-			3. then GroupB submitted its pods and have 3 already assumed;
-			4. GroupA submit the sixth pod
-			In this case, waitingPodNum will make no sense, so just sort it by group id to give fixed order.
-			Because no matter former succeed or fail, it's waitingPodNum will be zeroed. And the deadlock will be avoided
-		*/
-		return group1 < group2
 	}
-	// If no pod succeed, we will schedule all pod by RoundRobin to assure fairness.
-	// If some time-consuming member pod of one gang failed, then it's sibling will fail soon(because scheduling cycle invalid), so no need to assure all sibling should fail together.
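+	// otherwise fall back to FIFO ordering by the pods' queue timestamps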
return podInfo1.Timestamp.Before(podInfo2.Timestamp) } diff --git a/pkg/scheduler/plugins/coscheduling/coscheduling_test.go b/pkg/scheduler/plugins/coscheduling/coscheduling_test.go index b3535131a..a36460905 100644 --- a/pkg/scheduler/plugins/coscheduling/coscheduling_test.go +++ b/pkg/scheduler/plugins/coscheduling/coscheduling_test.go @@ -18,10 +18,24 @@ package coscheduling import ( "context" + "encoding/json" + "fmt" + "math" + "math/rand" + "strings" "sync" "testing" "time" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/events" + "k8s.io/kube-scheduler/config/v1beta3" + "k8s.io/kubernetes/pkg/scheduler" + configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" + "k8s.io/kubernetes/pkg/scheduler/profile" + "k8s.io/utils/pointer" + + "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper" "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/coscheduling/core" "github.com/stretchr/testify/assert" @@ -136,6 +150,7 @@ func makePg(name, namespace string, min int32, creationTime *time.Time, minResou } return pg } + func (f *testSharedLister) NodeInfos() framework.NodeInfoLister { return f } @@ -187,10 +202,13 @@ func newPluginTestSuit(t *testing.T, nodes []*corev1.Node, pgClientSet pgclients schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), schedulertesting.RegisterQueueSortPlugin(Name, proxyNew), schedulertesting.RegisterPreFilterPlugin(Name, proxyNew), + schedulertesting.RegisterReservePlugin(Name, proxyNew), schedulertesting.RegisterPermitPlugin(Name, proxyNew), + schedulertesting.RegisterPluginAsExtensions(Name, proxyNew, "PostBind"), } informerFactory := informers.NewSharedInformerFactory(cs, 0) + informerFactory = helper.NewForceSyncSharedInformerFactory(informerFactory) snapshot := newTestSharedLister(nil, nodes) fh, err := schedulertesting.NewFramework( registeredPlugins, @@ -252,6 +270,9 @@ func TestLess(t *testing.T) { "pod-group.scheduling.sigs.k8s.io/name": "gangC", }, }, + Spec: corev1.PodSpec{ + NodeName: "fake-node", + }, } // GangB by PodGroup gangBCreatTime := now.Add(5 * time.Second) @@ -413,18 +434,6 @@ func TestLess(t *testing.T) { }, expected: true, // p1 should be ahead of p2 in the queue }, - { - name: "equal priority, p1 is added to schedulingQ earlier than p2, but p1 belongs to gangB", - p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod1").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "gangB").Obj()), - InitialAttemptTimestamp: earltTime, - }, - p2: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangA_ns).Name("pod2").Priority(highPriority).Obj()), - InitialAttemptTimestamp: lateTime, - }, - expected: false, // p2 should be ahead of p1 in the queue - }, { name: "p1.priority less than p2.priority, p1 belongs to gangA and p2 belongs to gangB", p1: &framework.QueuedPodInfo{ @@ -437,20 +446,7 @@ func TestLess(t *testing.T) { expected: false, // p1 should be ahead of p2 in the queue }, { - name: "equal priority. 
p2 is added to schedulingQ earlier than p1, p1 belongs to gangA and p2 belongs to gangB", - p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangA_ns).Name("pod1").Priority(highPriority).Obj()), - Timestamp: lateTime, - }, - annotations: map[string]string{extension.AnnotationGangName: "gangA"}, - p2: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod2").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "gangB").Obj()), - Timestamp: earltTime, - }, - expected: false, // p1 should be ahead of p2 in the queue - }, - { - name: "equal priority and creation time, both belongs to gangB", + name: "equal priority and creation time, both belongs to gangB, earlier lastScheduleTime pod take precedence", p1: &framework.QueuedPodInfo{ PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod1").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "gangB").Obj()), Timestamp: lateTime, @@ -476,16 +472,16 @@ func TestLess(t *testing.T) { expected: false, // p1 should be ahead of p2 in the queue }, { - name: "equal priority and creation time, p1 belongs to gangA that has been satisfied", + name: "equal priority and creation time, p1 belongs to gang that has been satisfied", p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod1").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "gangD").Obj()), - Timestamp: lateTime, + PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangC_ns).Name("pod1").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "gangD").Obj()), + Timestamp: earltTime, }, p2: &framework.QueuedPodInfo{ PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangC_ns).Name("pod2").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "gangC").Obj()), Timestamp: earltTime, }, - expected: false, + expected: true, }, } { t.Run(tt.name, func(t *testing.T) { @@ -903,3 +899,410 @@ func TestUnreserve(t *testing.T) { }) } } + +func TestFairness(t *testing.T) { + gangNames := []string{"gangA", "gangB", "gangC", "gangD", "gangE", "gangF", "gangG", "gangH", "gangI", "gangJ"} + gangGroups := map[string][]string{ + "gangA": {"default/gangA", "default/gangB", "default/gangC"}, + "gangB": {"default/gangA", "default/gangB", "default/gangC"}, + "gangC": {"default/gangA", "default/gangB", "default/gangC"}, + "gangD": {"default/gangD", "default/gangE"}, + "gangE": {"default/gangD", "default/gangE"}, + "gangF": {"default/gangF", "default/gangG"}, + "gangG": {"default/gangF", "default/gangG"}, + "gangH": {"default/gangH", "default/gangI", "default/gangJ"}, + "gangI": {"default/gangH", "default/gangI", "default/gangJ"}, + "gangJ": {"default/gangH", "default/gangI", "default/gangJ"}, + } + gangMinRequiredNums := []int{20, 10, 32, 20, 20, 18, 43, 20, 30, 20} + gangInjectFilterError := []bool{false, false, true, false, false, true, true, false, false, true} + var gangInjectFilterErrorIndex []int + for _, gangMinRequiredNum := range gangMinRequiredNums { + gangInjectFilterErrorIndex = append(gangInjectFilterErrorIndex, rand.Intn(gangMinRequiredNum)) + } + + podGroupCRs := map[string]*v1alpha1.PodGroup{} + memberPodsOfGang := map[string][]*corev1.Pod{} + var allPods []*corev1.Pod + for i, gangName := range gangNames { + gangID := util.GetId("default", gangName) + + podGroup := makePg(gangName, "default", int32(gangMinRequiredNums[i]), nil, nil) + podGroup.Spec.ScheduleTimeoutSeconds = pointer.Int32(3600) + gangGroup := gangGroups[gangName] + rawGangGroup, err := 
json.Marshal(gangGroup) + assert.NoError(t, err) + podGroup.Annotations = map[string]string{extension.AnnotationGangGroups: string(rawGangGroup)} + podGroupCRs[gangID] = podGroup + + for j := 0; j < gangMinRequiredNums[i]; j++ { + memberPodOfGang := st.MakePod(). + Namespace("default"). + Name(fmt.Sprintf("%s_%d", gangName, j)). + UID(fmt.Sprintf("%s_%d", gangName, j)). + Label(v1alpha1.PodGroupLabel, gangName). + SchedulerName("koord-scheduler").Obj() + if gangInjectFilterError[i] && j == gangInjectFilterErrorIndex[i] { + memberPodOfGang.Labels["filterError"] = "true" + } + memberPodsOfGang[gangID] = append(memberPodsOfGang[gangID], memberPodOfGang) + allPods = append(allPods, memberPodOfGang) + } + } + + pgClientSet := fakepgclientset.NewSimpleClientset() + cs := kubefake.NewSimpleClientset() + for _, podGroupCR := range podGroupCRs { + _, err := pgClientSet.SchedulingV1alpha1().PodGroups(podGroupCR.Namespace).Create(context.Background(), podGroupCR, metav1.CreateOptions{}) + assert.NoError(t, err) + } + rand.Shuffle(len(allPods), func(i, j int) { + allPods[i], allPods[j] = allPods[j], allPods[i] + }) + for _, pod := range allPods { + _, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + assert.NoError(t, err) + } + + cfg := configtesting.V1beta3ToInternalWithDefaults(t, v1beta3.KubeSchedulerConfiguration{ + Profiles: []v1beta3.KubeSchedulerProfile{{ + SchedulerName: pointer.StringPtr("koord-scheduler"), + Plugins: &v1beta3.Plugins{ + QueueSort: v1beta3.PluginSet{ + Enabled: []v1beta3.Plugin{ + {Name: "fakeQueueSortPlugin"}, + }, + Disabled: []v1beta3.Plugin{ + {Name: "*"}, + }, + }, + }, + }}}) + + suit := newPluginTestSuit(t, nil, pgClientSet, cs) + registry := frameworkruntime.Registry{ + "fakeQueueSortPlugin": func(_ apiruntime.Object, fh framework.Handle) (framework.Plugin, error) { + return suit.plugin.(framework.QueueSortPlugin), nil + }, + } + + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()}) + ctx := context.TODO() + sched, err := scheduler.New( + cs, + suit.SharedInformerFactory(), + nil, + profile.NewRecorderFactory(eventBroadcaster), + ctx.Done(), + scheduler.WithProfiles(cfg.Profiles...), + scheduler.WithFrameworkOutOfTreeRegistry(registry), + scheduler.WithPodInitialBackoffSeconds(1), + scheduler.WithPodMaxBackoffSeconds(1), + scheduler.WithPodMaxInUnschedulablePodsDuration(0), + ) + assert.NoError(t, err) + eventBroadcaster.StartRecordingToSink(ctx.Done()) + suit.start() + sched.SchedulingQueue.Run() + + var scheduleOrder []*debugPodScheduleInfo + + for i := 0; i < 3; i++ { + for j := 0; j < len(allPods); j++ { + simulateScheduleOne(t, ctx, sched, suit, &scheduleOrder, func(pod *corev1.Pod) bool { + if gangName := util.GetGangNameByPod(pod); gangName == "gangD" || gangName == "gangE" { + waitingPod := 0 + gangSummaries := suit.plugin.(*Coscheduling).pgMgr.GetGangSummaries() + for _, gangSummary := range gangSummaries { + if !gangSummary.ScheduleCycleValid { + continue + } + waitingPod += gangSummary.WaitingForBindChildren.Len() + } + + return waitingPod >= 40 + } + return pod.Labels["filterError"] == "true" + }) + } + } + + waitingPodNum := 0 + suit.Handle.IterateOverWaitingPods(func(pod framework.WaitingPod) { + waitingPodNum++ + }) + minGangSchedulingCycle, maxGangSchedulingCycle := math.MaxInt, 0 + gangSummaries := suit.plugin.(*Coscheduling).pgMgr.GetGangSummaries() + + nonZeroWaitingBoundGroup := map[string]bool{} + for _, gangSummary := range gangSummaries { + if gangSummary.Name == 
"default/gangD" || gangSummary.Name == "default/gangE" { + assert.True(t, gangSummary.OnceResourceSatisfied) + assert.Zero(t, len(gangSummary.WaitingForBindChildren)) + continue + } + if len(gangSummary.WaitingForBindChildren) != 0 { + nonZeroWaitingBoundGroup[strings.Join(gangSummary.GangGroup, ",")] = true + } + if gangSummary.ScheduleCycle < minGangSchedulingCycle { + minGangSchedulingCycle = gangSummary.ScheduleCycle + } + if gangSummary.ScheduleCycle > maxGangSchedulingCycle { + maxGangSchedulingCycle = gangSummary.ScheduleCycle + } + } + assert.LessOrEqual(t, 3, minGangSchedulingCycle) + for _, info := range scheduleOrder { + klog.Infoln(info) + } +} + +type debugPodScheduleInfo struct { + podID string + result string + childScheduleCycle int + gangScheduleCycle int + hasWaitForBoundChildren bool + schedulingCycleValid bool + lastScheduleTime time.Time +} + +func (p *debugPodScheduleInfo) String() string { + return fmt.Sprintf("%s,%s,%d,%d,%t,%s,%t", p.podID, p.lastScheduleTime, p.childScheduleCycle, p.gangScheduleCycle, p.schedulingCycleValid, p.result, p.hasWaitForBoundChildren) +} + +func simulateScheduleOne(t *testing.T, ctx context.Context, sched *scheduler.Scheduler, suit *pluginTestSuit, scheduleOrder *[]*debugPodScheduleInfo, injectFilterErr func(pod *corev1.Pod) bool) { + podInfo := sched.NextPod() + pod := podInfo.Pod + + scheduleInfo := &debugPodScheduleInfo{ + podID: pod.Name, + childScheduleCycle: suit.plugin.(*Coscheduling).pgMgr.GetChildScheduleCycle(pod), + } + summary, exists := suit.plugin.(*Coscheduling).pgMgr.GetGangSummary(util.GetId(pod.Namespace, util.GetGangNameByPod(pod))) + if exists { + scheduleInfo.gangScheduleCycle = summary.ScheduleCycle + scheduleInfo.schedulingCycleValid = summary.ScheduleCycleValid + scheduleInfo.hasWaitForBoundChildren = summary.WaitingForBindChildren.Len() != 0 + scheduleInfo.lastScheduleTime = suit.plugin.(*Coscheduling).pgMgr.GetGangGroupLastScheduleTimeOfPod(pod, time.Time{}) + } + *scheduleOrder = append(*scheduleOrder, scheduleInfo) + fwk := suit.Handle.(framework.Framework) + klog.InfoS("Attempting to schedule pod", "pod", klog.KObj(pod)) + + // Synchronously attempt to find a fit for the pod. + state := framework.NewCycleState() + // Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty. + podsToActivate := framework.NewPodsToActivate() + state.Write(framework.PodsToActivateKey, podsToActivate) + + schedulingCycleCtx, cancel := context.WithCancel(ctx) + defer cancel() + scheduleResult, err := schedulePod(schedulingCycleCtx, fwk, state, pod, scheduleInfo, injectFilterErr(pod)) + if err != nil { + // SchedulePod() may have failed because the pod would not fit on any host, so we try to + // preempt, with the expectation that the next time the pod is tried for scheduling it + // will fit due to the preemption. It is also possible that a different pod will schedule + // into the resources that were preempted, but this is harmless. + if fitError, ok := err.(*framework.FitError); ok { + // Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle. + _, status := suit.plugin.(*Coscheduling).PostFilter(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap) + assert.False(t, status.IsSuccess()) + } + klog.Info("sched.Error:" + podInfo.Pod.Name) + sched.Error(podInfo, err) + return + } + + // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet. + // This allows us to keep scheduling without waiting on binding to occur. 
+ assumedPodInfo := podInfo.DeepCopy() + assumedPod := assumedPodInfo.Pod + assumedPod.Spec.NodeName = scheduleResult.SuggestedHost + scheduleInfo.result = "assumed" + + // Run "permit" plugins. + runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() { + // One of the plugins returned status different from success or wait. + fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + klog.Info("sched.Error:" + assumedPodInfo.Pod.Name) + sched.Error(assumedPodInfo, runPermitStatus.AsError()) + return + } + + // At the end of a successful scheduling cycle, pop and move up Pods if needed. + if len(podsToActivate.Map) != 0 { + sched.SchedulingQueue.Activate(podsToActivate.Map) + // Clear the entries after activation. + podsToActivate.Map = make(map[string]*corev1.Pod) + } + + // bind the pod to its host asynchronously (we can do this b/c of the assumption step above). + go func() { + bindingCycleCtx, cancel := context.WithCancel(ctx) + defer cancel() + + waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod) + if !waitOnPermitStatus.IsSuccess() { + // trigger un-reserve plugins to clean up state associated with the reserved Pod + fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + klog.Info("sched.Error:" + assumedPodInfo.Pod.Name) + sched.Error(assumedPodInfo, waitOnPermitStatus.AsError()) + return + } + + // Run "PostBind" plugins. + fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + + // At the end of a successful binding cycle, move up Pods if needed. + if len(podsToActivate.Map) != 0 { + sched.SchedulingQueue.Activate(podsToActivate.Map) + // Unlike the logic in scheduling cycle, we don't bother deleting the entries + // as `podsToActivate.Map` is no longer consumed. + } + }() +} + +func schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *corev1.Pod, info *debugPodScheduleInfo, injectFilterError bool) (result scheduler.ScheduleResult, err error) { + diagnosis := framework.Diagnosis{ + NodeToStatusMap: make(framework.NodeToStatusMap), + UnschedulablePlugins: sets.NewString(), + } + + // Run "prefilter" plugins. 
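+	// Only PreFilter is actually executed; a non-success status or an injected filter error is returned as a FitError so that the caller exercises PostFilter.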
+ _, s := fwk.RunPreFilterPlugins(ctx, state, pod) + if !s.IsSuccess() { + info.result = "PreFiler" + } else if injectFilterError { + info.result = "Filter" + } + if info.result != "" { + return result, &framework.FitError{ + Pod: pod, + NumAllNodes: 1, + Diagnosis: diagnosis, + } + } + return scheduler.ScheduleResult{ + SuggestedHost: "fake-host", + }, err +} + +func TestDeadLockFree(t *testing.T) { + gangNames := []string{"gangA", "gangB", "gangC", "gangD"} + gangGroups := map[string][]string{ + "gangA": {"default/gangA"}, + "gangB": {"default/gangB"}, + "gangC": {"default/gangC"}, + "gangD": {"default/gangD"}, + } + gangMinRequiredNums := []int{13, 13, 13, 13} + + podGroupCRs := map[string]*v1alpha1.PodGroup{} + memberPodsOfGang := map[string][]*corev1.Pod{} + var allPods []*corev1.Pod + for i, gangName := range gangNames { + gangID := util.GetId("default", gangName) + + podGroup := makePg(gangName, "default", int32(gangMinRequiredNums[i]), nil, nil) + podGroup.Spec.ScheduleTimeoutSeconds = pointer.Int32(3600) + gangGroup := gangGroups[gangName] + rawGangGroup, err := json.Marshal(gangGroup) + assert.NoError(t, err) + podGroup.Annotations = map[string]string{extension.AnnotationGangGroups: string(rawGangGroup)} + podGroupCRs[gangID] = podGroup + + for j := 0; j < gangMinRequiredNums[i]; j++ { + memberPodOfGang := st.MakePod(). + Namespace("default"). + Name(fmt.Sprintf("%s_%d", gangName, j)). + UID(fmt.Sprintf("%s_%d", gangName, j)). + Label(v1alpha1.PodGroupLabel, gangName). + SchedulerName("koord-scheduler").Obj() + memberPodsOfGang[gangID] = append(memberPodsOfGang[gangID], memberPodOfGang) + allPods = append(allPods, memberPodOfGang) + } + } + + pgClientSet := fakepgclientset.NewSimpleClientset() + cs := kubefake.NewSimpleClientset() + for _, podGroupCR := range podGroupCRs { + _, err := pgClientSet.SchedulingV1alpha1().PodGroups(podGroupCR.Namespace).Create(context.Background(), podGroupCR, metav1.CreateOptions{}) + assert.NoError(t, err) + } + rand.Shuffle(len(allPods), func(i, j int) { + allPods[i], allPods[j] = allPods[j], allPods[i] + }) + for _, pod := range allPods { + _, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + assert.NoError(t, err) + } + + cfg := configtesting.V1beta3ToInternalWithDefaults(t, v1beta3.KubeSchedulerConfiguration{ + Profiles: []v1beta3.KubeSchedulerProfile{{ + SchedulerName: pointer.StringPtr("koord-scheduler"), + Plugins: &v1beta3.Plugins{ + QueueSort: v1beta3.PluginSet{ + Enabled: []v1beta3.Plugin{ + {Name: "fakeQueueSortPlugin"}, + }, + Disabled: []v1beta3.Plugin{ + {Name: "*"}, + }, + }, + }, + }}}) + + suit := newPluginTestSuit(t, nil, pgClientSet, cs) + registry := frameworkruntime.Registry{ + "fakeQueueSortPlugin": func(_ apiruntime.Object, fh framework.Handle) (framework.Plugin, error) { + return suit.plugin.(framework.QueueSortPlugin), nil + }, + } + + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()}) + ctx := context.TODO() + sched, err := scheduler.New(cs, + suit.SharedInformerFactory(), + nil, + profile.NewRecorderFactory(eventBroadcaster), + ctx.Done(), + scheduler.WithProfiles(cfg.Profiles...), + scheduler.WithFrameworkOutOfTreeRegistry(registry), + scheduler.WithPodInitialBackoffSeconds(1), + scheduler.WithPodMaxBackoffSeconds(1), + scheduler.WithPodMaxInUnschedulablePodsDuration(0), + ) + + assert.NoError(t, err) + eventBroadcaster.StartRecordingToSink(ctx.Done()) + suit.start() + sched.SchedulingQueue.Run() + + var scheduleOrder 
[]*debugPodScheduleInfo + + for i := 0; i < 3; i++ { + for j := 0; j < len(allPods); j++ { + if len(sched.SchedulingQueue.PendingPods()) == 0 { + break + } + + simulateScheduleOne(t, ctx, sched, suit, &scheduleOrder, func(pod *corev1.Pod) bool { + waitingPod := 0 + gangSummaries := suit.plugin.(*Coscheduling).pgMgr.GetGangSummaries() + for _, gangSummary := range gangSummaries { + if gangSummary.ScheduleCycleValid == false { + continue + } + waitingPod += gangSummary.WaitingForBindChildren.Len() + } + return waitingPod >= 13 + }) + } + } + assert.Equal(t, 0, len(sched.SchedulingQueue.PendingPods())) + for _, info := range scheduleOrder { + klog.Infoln(info) + } +} diff --git a/pkg/scheduler/plugins/coscheduling/plugin_service_test.go b/pkg/scheduler/plugins/coscheduling/plugin_service_test.go index 619c10395..13a5c3ff7 100644 --- a/pkg/scheduler/plugins/coscheduling/plugin_service_test.go +++ b/pkg/scheduler/plugins/coscheduling/plugin_service_test.go @@ -110,6 +110,7 @@ func TestEndpointsQueryGangInfo(t *testing.T) { gangMarshal := &core.GangSummary{} err = json.NewDecoder(w.Result().Body).Decode(gangMarshal) assert.NoError(t, err) + gangMarshal.LastScheduleTime = gangExpected.LastScheduleTime assert.Equal(t, &gangExpected, gangMarshal) } { @@ -122,6 +123,7 @@ func TestEndpointsQueryGangInfo(t *testing.T) { gangMarshalMap := make(map[string]*core.GangSummary) err = json.Unmarshal([]byte(w.Body.String()), &gangMarshalMap) assert.NoError(t, err) + gangMarshalMap["ganga_ns/ganga"].LastScheduleTime = gangExpected.LastScheduleTime assert.Equal(t, &gangExpected, gangMarshalMap["ganga_ns/ganga"]) } }