scheduler: assure Fairness and DeadlockFree (koordinator-sh#1996)
Signed-off-by: wangjianyu.wjy <wangjianyu.wjy@alibaba-inc.com>
Co-authored-by: wangjianyu.wjy <wangjianyu.wjy@alibaba-inc.com>
Signed-off-by: george <xiangzhihua@gmail.com>
2 people authored and georgexiang committed Apr 15, 2024
1 parent e4c83e6 commit b175fe8
Showing 7 changed files with 560 additions and 90 deletions.
58 changes: 33 additions & 25 deletions pkg/scheduler/plugins/coscheduling/core/core.go
@@ -73,7 +73,7 @@ type Manager interface {
GetGangSummaries() map[string]*GangSummary
IsGangMinSatisfied(*corev1.Pod) bool
GetChildScheduleCycle(*corev1.Pod) int
GetGangGroupWaitingBoundPodNum(pod *corev1.Pod) int
GetGangGroupLastScheduleTimeOfPod(pod *corev1.Pod, podLastScheduleTime time.Time) time.Time
}

// PodGroupManager defines the scheduling operation called
@@ -227,6 +227,7 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
return fmt.Errorf("can't find gang, gangName: %v, podName: %v", util.GetId(pod.Namespace, util.GetGangNameByPod(pod)),
util.GetId(pod.Namespace, pod.Name))
}
pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime())

// check if gang is initialized
if !gang.HasGangInit {
@@ -284,6 +285,9 @@ func (pgMgr *PodGroupManager) PostFilter(ctx context.Context, pod *corev1.Pod, h
klog.Warningf(message)
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, message)
}
defer func() {
pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime())
}()
if gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied() {
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
}
@@ -350,6 +354,7 @@ func (pgMgr *PodGroupManager) Unreserve(ctx context.Context, state *framework.Cy
klog.Warningf("Pod %q missing Gang", klog.KObj(pod))
return
}
pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime())
// first delete the pod from the gang's waitingForBindChildren map
gang.delAssumedPod(pod)

@@ -370,25 +375,35 @@ func (pgMgr *PodGroupManager) rejectGangGroupById(handle framework.Handle, plugi
gangGroup := gang.getGangGroup()
gangSet := sets.NewString(gangGroup...)

rejectedPodCount := 0
if handle != nil {
handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
waitingGangId := util.GetId(waitingPod.GetPod().Namespace, util.GetGangNameByPod(waitingPod.GetPod()))
if gangSet.Has(waitingGangId) {
klog.V(1).InfoS("GangGroup gets rejected due to member Gang is unschedulable",
"gang", gangId, "waitingGang", waitingGangId, "waitingPod", klog.KObj(waitingPod.GetPod()))
waitingPod.Reject(pluginName, message)
rejectedPodCount++
}
})
/*
Why the rejection below is deferred: consider one gang group with three gangs, noted gangA, gangB and gangC.
The following can happen without the defer:
1. gangB and gangC are assumed, then gangA fails.
2. gangA invokes PostFilter and rejects gangB and gangC; gangB and gangC then invoke Unreserve asynchronously, i.e. rejectGangGroupById runs concurrently.
3. gangB or gangC may still find its scheduling cycle valid if it checks the cycle before gangA rejects it, so different lastScheduleTime values become possible.
*/
defer func() {
handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
waitingGangId := util.GetId(waitingPod.GetPod().Namespace, util.GetGangNameByPod(waitingPod.GetPod()))
if gangSet.Has(waitingGangId) {
klog.V(1).InfoS("GangGroup gets rejected due to member Gang is unschedulable",
"gang", gangId, "waitingGang", waitingGangId, "waitingPod", klog.KObj(waitingPod.GetPod()))
waitingPod.Reject(pluginName, message)
}
})
}()
}
if rejectedPodCount == 0 {
if !gang.isScheduleCycleValid() {
// Within a schedule cycle, a gang can reject itself and its sibling gangs only once
return
}
gangGroupLastScheduleTime := timeNowFn()
for gang := range gangSet {
gangIns := pgMgr.cache.getGangFromCacheByGangId(gang, false)
if gangIns != nil {
gangIns.setScheduleCycleValid(false)
gangIns.setLastScheduleTime(gangGroupLastScheduleTime)
}
}
}
@@ -405,6 +420,7 @@ func (pgMgr *PodGroupManager) PostBind(ctx context.Context, pod *corev1.Pod, nod
}
// first update gang in cache
gang.addBoundPod(pod)
pgMgr.cache.deleteGangGroupLastScheduleTimeOfPod(pod)

// update PodGroup
_, pg := pgMgr.GetPodGroup(pod)
@@ -554,18 +570,10 @@ func (pgMgr *PodGroupManager) GetChildScheduleCycle(pod *corev1.Pod) int {
return gang.getChildScheduleCycle(pod)
}

func (pgMgr *PodGroupManager) GetGangGroupWaitingBoundPodNum(pod *corev1.Pod) int {
gang := pgMgr.GetGangByPod(pod)
if gang == nil {
return 0
}
gangGroup := gang.GangGroup
waitingPodNum := 0
for _, memberGangID := range gangGroup {
memberGang := pgMgr.cache.getGangFromCacheByGangId(memberGangID, false)
if memberGang != nil {
waitingPodNum += memberGang.getGangWaitingPods()
}
func (pgMgr *PodGroupManager) GetGangGroupLastScheduleTimeOfPod(pod *corev1.Pod, podLastScheduleTime time.Time) time.Time {
gangGroupLastScheduleTime := pgMgr.cache.getGangGroupLastScheduleTimeOfPod(pod.UID)
if gangGroupLastScheduleTime != nil {
return *gangGroupLastScheduleTime
}
return waitingPodNum
return podLastScheduleTime
}
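
Illustrative sketch, not part of the commit: GetGangGroupLastScheduleTimeOfPod above prefers the gang group time recorded in the cache and otherwise falls back to the pod's own last schedule time. The standalone Go program below mirrors only that fallback rule; the helper name lastScheduleTimeOrDefault is hypothetical.

package main

import (
	"fmt"
	"time"
)

// lastScheduleTimeOrDefault mirrors the fallback: a recorded gang-group time
// wins, otherwise the pod's own timestamp is used.
func lastScheduleTimeOrDefault(cached *time.Time, podLastScheduleTime time.Time) time.Time {
	if cached != nil {
		return *cached
	}
	return podLastScheduleTime
}

func main() {
	podTime := time.Now()
	groupTime := podTime.Add(-30 * time.Second)
	fmt.Println(lastScheduleTimeOrDefault(&groupTime, podTime).Equal(groupTime)) // true: cached group time wins
	fmt.Println(lastScheduleTimeOrDefault(nil, podTime).Equal(podTime))          // true: no record, fall back to pod time
}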
33 changes: 32 additions & 1 deletion pkg/scheduler/plugins/coscheduling/core/gang.go
@@ -75,7 +75,10 @@ type Gang struct {
// we set the pod's cycle in `childrenScheduleRoundMap` equal to `scheduleCycle` so it passes the check. If the result is negative, it means
// the pod has already been scheduled in this cycle, so we should reject it. With `totalChildrenNum`'s help, when the last pod comes and makes all
// `childrenScheduleRoundMap`'s values equal to `scheduleCycle`, the Gang's `scheduleCycle` is incremented by 1, which starts a new schedule cycle.
ScheduleCycle int
ScheduleCycle int
// Pods with the same priority are ordered by the scheduling time of their previous round to ensure fairness.
// A Gang records the time of its last round as a whole in LastScheduleTime, i.e. all Pods belonging to the Gang share that LastScheduleTime.
LastScheduleTime time.Time
ChildrenScheduleRoundMap map[string]int

GangFrom string
@@ -98,6 +101,7 @@ func NewGang(gangName string) *Gang {
BoundChildren: make(map[string]*v1.Pod),
ScheduleCycleValid: true,
ScheduleCycle: 1,
LastScheduleTime: timeNowFn(),
ChildrenScheduleRoundMap: make(map[string]int),
GangFrom: GangFromPodAnnotation,
HasGangInit: false,
@@ -149,6 +153,7 @@ func (gang *Gang) tryInitByPodConfig(pod *v1.Pod, args *config.CoschedulingArgs)

// here we assume that Coscheduling's CreateTime equals the pod's CreateTime
gang.CreateTime = pod.CreationTimestamp.Time
gang.LastScheduleTime = pod.CreationTimestamp.Time

waitTime, err := time.ParseDuration(pod.Annotations[extension.AnnotationGangWaitTime])
if err != nil || waitTime <= 0 {
@@ -215,6 +220,7 @@ func (gang *Gang) tryInitByPodGroup(pg *v1alpha1.PodGroup, args *config.Coschedu

// here we assume that Coscheduling's CreateTime equals the PodGroup CRD's CreateTime
gang.CreateTime = pg.CreationTimestamp.Time
gang.LastScheduleTime = pg.CreationTimestamp.Time

waitTime := util.GetWaitTimeDuration(pg, args.DefaultTimeout.Duration)
gang.WaitTime = waitTime
@@ -382,6 +388,19 @@ func (gang *Gang) setScheduleCycleValid(valid bool) {
gang.lock.Lock()
defer gang.lock.Unlock()

if !valid && !gang.ScheduleCycleValid {
/*
Why: consider three gangs of one group, A, B and C, each with a min-member of 10 pods, noted A1-A10, B1-B10 and C1-C10.
1. A1-A5, B1-B5 and C1-C5 are assumed in scheduleCycle 1; C6 fails Filter due to insufficient resources, then A6-A10, C7-C10 and B6-B9 fail because the cycle is invalid.
2. A1-A5 and C1-C5 are assumed in scheduleCycle 2; C6 fails again due to insufficient resources and the gangA/B/C cycles are set invalid.
3. A6-A10 and C7-C10 then fail because the cycle is invalid.
4. B10 then fails because the cycle is invalid, but B1 arrives, finds its gang's scheduling cycle 2 valid, and is assumed.
5. However, B1's siblings will not arrive until the next round of all gangs, and by then those gangs face insufficient resources because of what B1 has pre-allocated.
TODO: this logic can be optimized by giving gangs of the same group the same scheduling cycle.
*/
gang.ScheduleCycle += 1
}
gang.ScheduleCycleValid = valid
klog.Infof("SetScheduleCycleValid, gangName: %v, valid: %v", gang.Name, valid)
}
@@ -494,3 +513,15 @@ func (gang *Gang) isGangValidForPermit() bool {
return len(gang.WaitingForBindChildren) >= gang.MinRequiredNumber || gang.OnceResourceSatisfied == true
}
}

func (gang *Gang) getLastScheduleTime() time.Time {
gang.lock.Lock()
defer gang.lock.Unlock()
return gang.LastScheduleTime
}

func (gang *Gang) setLastScheduleTime(time time.Time) {
gang.lock.Lock()
defer gang.lock.Unlock()
gang.LastScheduleTime = time
}
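
Illustrative sketch, not part of the commit: the new branch in setScheduleCycleValid advances ScheduleCycle when an already-invalid cycle is invalidated again, so a straggler from the previous round cannot treat that cycle as valid. The minimal state machine below (cycleState is a hypothetical stand-in for the relevant Gang fields, without locking or logging) shows the transition.

package main

import "fmt"

// cycleState stands in for Gang.ScheduleCycle and Gang.ScheduleCycleValid.
type cycleState struct {
	ScheduleCycle      int
	ScheduleCycleValid bool
}

// setValid mirrors the setScheduleCycleValid logic shown in the diff.
func (s *cycleState) setValid(valid bool) {
	if !valid && !s.ScheduleCycleValid {
		s.ScheduleCycle++ // already invalid: push the gang into the next cycle
	}
	s.ScheduleCycleValid = valid
}

func main() {
	g := &cycleState{ScheduleCycle: 2, ScheduleCycleValid: true}
	g.setValid(false)                                  // first rejection in cycle 2
	fmt.Println(g.ScheduleCycle, g.ScheduleCycleValid) // 2 false
	g.setValid(false)                                  // a sibling rejects again while the cycle is still invalid
	fmt.Println(g.ScheduleCycle, g.ScheduleCycleValid) // 3 false: the gang has moved on to cycle 3
}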
50 changes: 38 additions & 12 deletions pkg/scheduler/plugins/coscheduling/core/gang_cache.go
@@ -18,8 +18,10 @@ package core

import (
"sync"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1"
@@ -32,22 +34,24 @@ import (
)

type GangCache struct {
lock *sync.RWMutex
gangItems map[string]*Gang
pluginArgs *config.CoschedulingArgs
podLister listerv1.PodLister
pgLister pglister.PodGroupLister
pgClient pgclientset.Interface
lock *sync.RWMutex
gangItems map[string]*Gang
pluginArgs *config.CoschedulingArgs
podLister listerv1.PodLister
pgLister pglister.PodGroupLister
pgClient pgclientset.Interface
gangGroupLastScheduleTimeOfPod map[types.UID]*time.Time
}

func NewGangCache(args *config.CoschedulingArgs, podLister listerv1.PodLister, pgLister pglister.PodGroupLister, client pgclientset.Interface) *GangCache {
return &GangCache{
gangItems: make(map[string]*Gang),
lock: new(sync.RWMutex),
pluginArgs: args,
podLister: podLister,
pgLister: pgLister,
pgClient: client,
gangItems: make(map[string]*Gang),
lock: new(sync.RWMutex),
pluginArgs: args,
podLister: podLister,
pgLister: pgLister,
pgClient: client,
gangGroupLastScheduleTimeOfPod: make(map[types.UID]*time.Time),
}
}

@@ -63,6 +67,26 @@ func (gangCache *GangCache) getGangFromCacheByGangId(gangId string, createIfNotE
return gang
}

func (gangCache *GangCache) getGangGroupLastScheduleTimeOfPod(podUID types.UID) *time.Time {
gangCache.lock.Lock()
defer gangCache.lock.Unlock()
return gangCache.gangGroupLastScheduleTimeOfPod[podUID]
}

func (gangCache *GangCache) setGangGroupLastScheduleTimeOfPod(podUID types.UID, lastScheduleTime time.Time) {
gangCache.lock.Lock()
defer gangCache.lock.Unlock()
gangCache.gangGroupLastScheduleTimeOfPod[podUID] = &lastScheduleTime
}

func (gangCache *GangCache) deleteGangGroupLastScheduleTimeOfPod(pod *v1.Pod) {
gangCache.lock.Lock()
defer gangCache.lock.Unlock()

delete(gangCache.gangGroupLastScheduleTimeOfPod, pod.UID)
klog.Infof("delete podScheduleInfo from cache, pod: %s/%s/%v", pod.Namespace, pod.Name, pod.UID)
}

func (gangCache *GangCache) getAllGangsFromCache() map[string]*Gang {
gangCache.lock.RLock()
defer gangCache.lock.RUnlock()
@@ -106,6 +130,7 @@ func (gangCache *GangCache) onPodAdd(obj interface{}) {
if pod.Spec.NodeName != "" {
gang.addBoundPod(pod)
gang.setResourceSatisfied()
gangCache.deleteGangGroupLastScheduleTimeOfPod(pod)
}
}

@@ -136,6 +161,7 @@ func (gangCache *GangCache) onPodDelete(obj interface{}) {
if gangName == "" {
return
}
gangCache.deleteGangGroupLastScheduleTimeOfPod(pod)

gangNamespace := pod.Namespace
gangId := util.GetId(gangNamespace, gangName)
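
Illustrative sketch, not part of the commit: the new gangGroupLastScheduleTimeOfPod map is written at PreFilter/PostFilter/Unreserve, read during queue sorting, and cleared at PostBind or when the pod is deleted. The self-contained sketch below assumes a plain string UID and its own mutex; the real cache uses types.UID and reuses the GangCache lock.

package main

import (
	"fmt"
	"sync"
	"time"
)

// lastScheduleTimeCache is a minimal stand-in for the per-pod map in GangCache.
type lastScheduleTimeCache struct {
	lock  sync.RWMutex
	times map[string]*time.Time
}

func newCache() *lastScheduleTimeCache {
	return &lastScheduleTimeCache{times: map[string]*time.Time{}}
}

func (c *lastScheduleTimeCache) set(podUID string, t time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.times[podUID] = &t // each call stores its own copy of t
}

func (c *lastScheduleTimeCache) get(podUID string) *time.Time {
	c.lock.RLock()
	defer c.lock.RUnlock()
	return c.times[podUID]
}

func (c *lastScheduleTimeCache) del(podUID string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	delete(c.times, podUID)
}

func main() {
	c := newCache()
	c.set("pod-a", time.Now())         // recorded when the pod enters scheduling
	fmt.Println(c.get("pod-a") != nil) // true: sorting can use the gang group time
	c.del("pod-a")                     // cleared at PostBind or on pod deletion
	fmt.Println(c.get("pod-a") == nil) // true: sorting falls back to the pod's own timestamp
}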
2 changes: 2 additions & 0 deletions pkg/scheduler/plugins/coscheduling/core/gang_summary.go
@@ -21,6 +21,7 @@ type GangSummary struct {
OnceResourceSatisfied bool `json:"onceResourceSatisfied"`
ScheduleCycleValid bool `json:"scheduleCycleValid"`
ScheduleCycle int `json:"scheduleCycle"`
LastScheduleTime time.Time `json:"lastScheduleTime"`
ChildrenScheduleRoundMap map[string]int `json:"childrenScheduleRoundMap"`
GangFrom string `json:"gangFrom"`
HasGangInit bool `json:"hasGangInit"`
@@ -51,6 +52,7 @@ func (gang *Gang) GetGangSummary() *GangSummary {
gangSummary.OnceResourceSatisfied = gang.OnceResourceSatisfied
gangSummary.ScheduleCycleValid = gang.ScheduleCycleValid
gangSummary.ScheduleCycle = gang.ScheduleCycle
gangSummary.LastScheduleTime = gang.LastScheduleTime
gangSummary.GangFrom = gang.GangFrom
gangSummary.HasGangInit = gang.HasGangInit
gangSummary.GangGroup = append(gangSummary.GangGroup, gang.GangGroup...)
42 changes: 20 additions & 22 deletions pkg/scheduler/plugins/coscheduling/coscheduling.go
@@ -133,30 +133,28 @@ func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool {
return subPrio1 > subPrio2
}

group1, _ := cs.pgMgr.GetGroupId(podInfo1.Pod)
group2, _ := cs.pgMgr.GetGroupId(podInfo2.Pod)

waitingBoundPodNum1 := cs.pgMgr.GetGangGroupWaitingBoundPodNum(podInfo1.Pod)
waitingBoundPodNum2 := cs.pgMgr.GetGangGroupWaitingBoundPodNum(podInfo2.Pod)
if waitingBoundPodNum1 != 0 || waitingBoundPodNum2 != 0 {
// At any time, only member pods of one podGroup should be assumed, so we prefer the pod that already has assumed siblings, so they can succeed together.
if waitingBoundPodNum1 == 0 || waitingBoundPodNum2 == 0 {
return waitingBoundPodNum1 != 0
lastScheduleTime1 := cs.pgMgr.GetGangGroupLastScheduleTimeOfPod(podInfo1.Pod, podInfo1.Timestamp)
lastScheduleTime2 := cs.pgMgr.GetGangGroupLastScheduleTimeOfPod(podInfo2.Pod, podInfo2.Timestamp)
if !lastScheduleTime1.Equal(lastScheduleTime2) {
return lastScheduleTime1.Before(lastScheduleTime2)
}
gangId1 := util.GetId(podInfo1.Pod.Namespace, util.GetGangNameByPod(podInfo1.Pod))
gangId2 := util.GetId(podInfo2.Pod.Namespace, util.GetGangNameByPod(podInfo2.Pod))
// for member gangs of the same gangGroup, the gang that hasn't been satisfied yet takes precedence
if gangId1 != gangId2 {
isGang1Satisfied := cs.pgMgr.IsGangMinSatisfied(podInfo1.Pod)
isGang2Satisfied := cs.pgMgr.IsGangMinSatisfied(podInfo2.Pod)
if isGang1Satisfied != isGang2Satisfied {
return !isGang1Satisfied
}
} else {
// for member pods of the same gang, the pod with the smaller scheduling cycle takes precedence so that the gang's scheduling cycle can become valid and advance
childScheduleCycle1 := cs.pgMgr.GetChildScheduleCycle(podInfo1.Pod)
childScheduleCycle2 := cs.pgMgr.GetChildScheduleCycle(podInfo2.Pod)
if childScheduleCycle1 != childScheduleCycle2 {
return childScheduleCycle1 < childScheduleCycle2
}
/*
Two gang groups may both already have some assumed sibling pods.
For example:
1. GroupA has submitted 6 members and has 5 already assumed;
2. then the sixth is deleted;
3. then GroupB submits its pods and has 3 already assumed;
4. GroupA submits the sixth pod.
In this case, waitingPodNum makes no sense, so just sort by group id to give a fixed order.
No matter whether the former succeeds or fails, its waitingPodNum will be zeroed, and the deadlock is avoided.
*/
return group1 < group2
}
// If no pod has succeeded, we schedule all pods round-robin to assure fairness.
// If some time-consuming member pod of a gang fails, its siblings will fail soon afterwards (because the scheduling cycle becomes invalid), so there is no need to ensure that all siblings fail together.
return podInfo1.Timestamp.Before(podInfo2.Timestamp)
}

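
Illustrative sketch, not part of the commit: after priority and sub-priority compare equal, the reworked Less orders pods by the gang group's last schedule time, then prefers the not-yet-satisfied member gang within a group, then the smaller child schedule cycle within a gang, and finally the pod's own queue timestamp. The comparator below uses hypothetical field names, not the plugin's real types.

package main

import (
	"fmt"
	"time"
)

// queuedPod is an illustrative stand-in for the data Less consults.
type queuedPod struct {
	gangGroupLastScheduleTime time.Time
	gangID                    string
	gangMinSatisfied          bool
	childScheduleCycle        int
	timestamp                 time.Time
}

// less mirrors the ordering added in the diff (after priorities are equal).
func less(p1, p2 queuedPod) bool {
	if !p1.gangGroupLastScheduleTime.Equal(p2.gangGroupLastScheduleTime) {
		return p1.gangGroupLastScheduleTime.Before(p2.gangGroupLastScheduleTime)
	}
	if p1.gangID != p2.gangID {
		if p1.gangMinSatisfied != p2.gangMinSatisfied {
			return !p1.gangMinSatisfied // unsatisfied member gang goes first
		}
	} else if p1.childScheduleCycle != p2.childScheduleCycle {
		return p1.childScheduleCycle < p2.childScheduleCycle // smaller cycle first, so the gang cycle can advance
	}
	return p1.timestamp.Before(p2.timestamp) // round-robin fallback for fairness
}

func main() {
	now := time.Now()
	a := queuedPod{gangGroupLastScheduleTime: now.Add(-time.Minute), gangID: "ns/gangA", timestamp: now}
	b := queuedPod{gangGroupLastScheduleTime: now, gangID: "ns/gangB", timestamp: now.Add(-time.Hour)}
	fmt.Println(less(a, b)) // true: the group that has waited longer since its last round goes first
}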