Skip to content

Commit

Permalink
scheduler: assure Fairness and DeadlockFree
Browse files Browse the repository at this point in the history
Signed-off-by: wangjianyu.wjy <wangjianyu.wjy@alibaba-inc.com>
  • Loading branch information
wangjianyu.wjy committed Apr 8, 2024
1 parent ae23cc9 commit a60bfd2
Show file tree
Hide file tree
Showing 8 changed files with 638 additions and 79 deletions.
93 changes: 64 additions & 29 deletions pkg/scheduler/plugins/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const (

// Manager defines the interfaces for PodGroup management.
type Manager interface {
PreFilter(context.Context, *corev1.Pod) error
PreFilter(context.Context, *corev1.Pod) (error, bool)
Permit(context.Context, *corev1.Pod) (time.Duration, Status)
PostBind(context.Context, *corev1.Pod, string)
PostFilter(context.Context, *corev1.Pod, framework.Handle, string, framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status)
Expand All @@ -73,7 +73,8 @@ type Manager interface {
GetGangSummaries() map[string]*GangSummary
IsGangMinSatisfied(*corev1.Pod) bool
GetChildScheduleCycle(*corev1.Pod) int
GetGangGroupWaitingBoundPodNum(pod *corev1.Pod) int
GetPodLastScheduleTime(pod *corev1.Pod, podLastScheduleTime time.Time) time.Time
GetPodScheduleInfo(pod *corev1.Pod) *PodScheduleInfo
}

// PodGroupManager defines the scheduling operation called
Expand Down Expand Up @@ -218,34 +219,36 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
// iii.Check whether the Gang is OnceResourceSatisfied
// iv.Check whether the Gang has met the scheduleCycleValid check, and reject the pod if negative(only Strict mode ).
// v.Try update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above.
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error {
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) (error, bool) {
if !util.IsPodNeedGang(pod) {
return nil
return nil, false
}
gang := pgMgr.GetGangByPod(pod)
if gang == nil {
return fmt.Errorf("can't find gang, gangName: %v, podName: %v", util.GetId(pod.Namespace, util.GetGangNameByPod(pod)),
util.GetId(pod.Namespace, pod.Name))
util.GetId(pod.Namespace, pod.Name)), false
}
podScheduleInfo := pgMgr.cache.getPodScheduleInfo(pod.UID, true)
podScheduleInfo.SetLastScheduleTime(gang.getLastScheduleTime())

// check if gang is initialized
if !gang.HasGangInit {
return fmt.Errorf("gang has not init, gangName: %v, podName: %v", gang.Name,
util.GetId(pod.Namespace, pod.Name))
util.GetId(pod.Namespace, pod.Name)), false
}
// resourceSatisfied means pod will directly pass the PreFilter
if gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied() {
return nil
return nil, false
}

// check minNum
if gang.getChildrenNum() < gang.getGangMinNum() {
return fmt.Errorf("gang child pod not collect enough, gangName: %v, podName: %v", gang.Name,
util.GetId(pod.Namespace, pod.Name))
util.GetId(pod.Namespace, pod.Name)), false
}

if pgMgr.args != nil && pgMgr.args.SkipCheckScheduleCycle {
return nil
return nil, false
}

// first try update the global cycle of gang
Expand All @@ -256,19 +259,19 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
gangMode := gang.getGangMode()
if gangMode == extension.GangModeStrict {
if pod.Status.NominatedNodeName != "" {
return nil
return nil, false
}
podScheduleCycle := gang.getChildScheduleCycle(pod)
if !gang.isScheduleCycleValid() {
return fmt.Errorf("gang scheduleCycle not valid, gangName: %v, podName: %v",
gang.Name, util.GetId(pod.Namespace, pod.Name))
gang.Name, util.GetId(pod.Namespace, pod.Name)), true
}
if podScheduleCycle >= gangScheduleCycle {
return fmt.Errorf("pod's schedule cycle too large, gangName: %v, podName: %v, podCycle: %v, gangCycle: %v",
gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle)
gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle), true
}
}
return nil
return nil, false
}

// PostFilter
Expand All @@ -284,6 +287,12 @@ func (pgMgr *PodGroupManager) PostFilter(ctx context.Context, pod *corev1.Pod, h
klog.Warningf(message)
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, message)
}
defer func() {
podScheduleInfo := pgMgr.cache.getPodScheduleInfo(pod.UID, false)
if podScheduleInfo != nil {
podScheduleInfo.SetLastScheduleTime(gang.getLastScheduleTime())
}
}()
if gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied() {
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
}
Expand Down Expand Up @@ -350,6 +359,11 @@ func (pgMgr *PodGroupManager) Unreserve(ctx context.Context, state *framework.Cy
klog.Warningf("Pod %q missing Gang", klog.KObj(pod))
return
}
podScheduleInfo := pgMgr.cache.getPodScheduleInfo(pod.UID, false)
if podScheduleInfo != nil {
podScheduleInfo.SetLastScheduleTime(gang.getLastScheduleTime())
podScheduleInfo.SetAlreadyBeenRejected(false)
}
// first delete the pod from gang's WaitingForBindChildren map
gang.delAssumedPod(pod)

Expand All @@ -373,22 +387,42 @@ func (pgMgr *PodGroupManager) rejectGangGroupById(handle framework.Handle, plugi
rejectedPodCount := 0
if handle != nil {
handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
/*
Let's explain why with the following example: we have one gang with a min member of 20.
1. gangA1-gangA10 are assumed; gangA11 fails due to insufficient resources.
2. gangA11 then invokes PostFilter and rejects gangA1-gangA10; gangA1-gangA10 invoke Unreserve asynchronously, i.e. they invoke rejectGangGroupById asynchronously.
3. So gangA11 should reject gangA1-gangA10 only after alreadyBeenRejected and lastScheduleTime have been cached in time.
*/
waitingGangId := util.GetId(waitingPod.GetPod().Namespace, util.GetGangNameByPod(waitingPod.GetPod()))
if gangSet.Has(waitingGangId) {
klog.V(1).InfoS("GangGroup gets rejected due to member Gang is unschedulable",
"gang", gangId, "waitingGang", waitingGangId, "waitingPod", klog.KObj(waitingPod.GetPod()))
waitingPod.Reject(pluginName, message)
rejectedPodCount++
pod := waitingPod.GetPod()
podScheduleInfo := pgMgr.cache.getPodScheduleInfo(pod.UID, false)
if podScheduleInfo != nil && !podScheduleInfo.GetAlreadyBeenRejected() {
podScheduleInfo.SetAlreadyBeenRejected(true)
rejectedPodCount++
}
}
})
defer func() {
handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
waitingGangId := util.GetId(waitingPod.GetPod().Namespace, util.GetGangNameByPod(waitingPod.GetPod()))
if gangSet.Has(waitingGangId) {
klog.V(1).InfoS("GangGroup gets rejected due to member Gang is unschedulable",
"gang", gangId, "waitingGang", waitingGangId, "waitingPod", klog.KObj(waitingPod.GetPod()))
waitingPod.Reject(pluginName, message)
}
})
}()
}
if rejectedPodCount == 0 {
return
}
gangGroupLastScheduleTime := timeNowFn()
for gang := range gangSet {
gangIns := pgMgr.cache.getGangFromCacheByGangId(gang, false)
if gangIns != nil {
gangIns.setScheduleCycleValid(false)
gangIns.setLastScheduleTime(gangGroupLastScheduleTime)
}
}
}
Expand All @@ -405,6 +439,7 @@ func (pgMgr *PodGroupManager) PostBind(ctx context.Context, pod *corev1.Pod, nod
}
// first update gang in cache
gang.addBoundPod(pod)
pgMgr.cache.deletePodScheduleInfo(pod)

// update PodGroup
_, pg := pgMgr.GetPodGroup(pod)
Expand Down Expand Up @@ -554,18 +589,18 @@ func (pgMgr *PodGroupManager) GetChildScheduleCycle(pod *corev1.Pod) int {
return gang.getChildScheduleCycle(pod)
}

func (pgMgr *PodGroupManager) GetGangGroupWaitingBoundPodNum(pod *corev1.Pod) int {
gang := pgMgr.GetGangByPod(pod)
if gang == nil {
return 0
func (pgMgr *PodGroupManager) GetPodLastScheduleTime(pod *corev1.Pod, podLastScheduleTime time.Time) time.Time {
podScheduleInfo := pgMgr.cache.getPodScheduleInfo(pod.UID, false)
if podScheduleInfo != nil {
return podScheduleInfo.GetLastScheduleTime()
}
gangGroup := gang.GangGroup
waitingPodNum := 0
for _, memberGangID := range gangGroup {
memberGang := pgMgr.cache.getGangFromCacheByGangId(memberGangID, false)
if memberGang != nil {
waitingPodNum += memberGang.getGangWaitingPods()
}
gang := pgMgr.GetGangByPod(pod)
if gang != nil {
return gang.getLastScheduleTime()
}
return waitingPodNum
return podLastScheduleTime
}

// GetPodScheduleInfo looks up the PodScheduleInfo that the manager's cache
// holds for the given pod, keyed by the pod's UID.
// NOTE(review): the `false` argument presumably means "do not create a missing
// entry", in which case the result may be nil — confirm against
// cache.getPodScheduleInfo.
func (pgMgr *PodGroupManager) GetPodScheduleInfo(pod *corev1.Pod) *PodScheduleInfo {
	scheduleInfo := pgMgr.cache.getPodScheduleInfo(pod.UID, false)
	return scheduleInfo
}
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/coscheduling/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func TestPlugin_PreFilter(t *testing.T) {
}()
}
// run the case
err := mgr.PreFilter(ctx, tt.pod)
err, _ := mgr.PreFilter(ctx, tt.pod)
var returnMessage string
if err == nil {
returnMessage = ""
Expand Down
29 changes: 28 additions & 1 deletion pkg/scheduler/plugins/coscheduling/core/gang.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ type Gang struct {
// we set the pod's cycle in `childrenScheduleRoundMap` equal with `scheduleCycle` and pass the check. If result is negative, means
// the pod has been scheduled in this cycle, so we should reject it. With `totalChildrenNum`'s help, when the last pod comes to make all
// `childrenScheduleRoundMap`'s values equal to `scheduleCycle`, Gang's `scheduleCycle` will be added by 1, which means a new schedule cycle.
ScheduleCycle int
ScheduleCycle int
// Pods with the same priority are scheduled according to the scheduling time of the previous round to ensure fairness.
// For a Gang, its last round time as a whole is recorded as LastScheduleTime, that is, all Pods below it have the same time, which is LastScheduleTime.
LastScheduleTime time.Time
ChildrenScheduleRoundMap map[string]int

GangFrom string
Expand All @@ -98,6 +101,7 @@ func NewGang(gangName string) *Gang {
BoundChildren: make(map[string]*v1.Pod),
ScheduleCycleValid: true,
ScheduleCycle: 1,
LastScheduleTime: timeNowFn(),
ChildrenScheduleRoundMap: make(map[string]int),
GangFrom: GangFromPodAnnotation,
HasGangInit: false,
Expand Down Expand Up @@ -149,6 +153,7 @@ func (gang *Gang) tryInitByPodConfig(pod *v1.Pod, args *config.CoschedulingArgs)

// here we assume that Coscheduling's CreateTime equal with the pod's CreateTime
gang.CreateTime = pod.CreationTimestamp.Time
gang.LastScheduleTime = pod.CreationTimestamp.Time

waitTime, err := time.ParseDuration(pod.Annotations[extension.AnnotationGangWaitTime])
if err != nil || waitTime <= 0 {
Expand Down Expand Up @@ -215,6 +220,7 @@ func (gang *Gang) tryInitByPodGroup(pg *v1alpha1.PodGroup, args *config.Coschedu

// here we assume that Coscheduling's CreateTime equal with the podGroup CRD CreateTime
gang.CreateTime = pg.CreationTimestamp.Time
gang.LastScheduleTime = pg.CreationTimestamp.Time

waitTime := util.GetWaitTimeDuration(pg, args.DefaultTimeout.Duration)
gang.WaitTime = waitTime
Expand Down Expand Up @@ -382,6 +388,15 @@ func (gang *Gang) setScheduleCycleValid(valid bool) {
gang.lock.Lock()
defer gang.lock.Unlock()

if !valid && !gang.ScheduleCycleValid {
/*
let's explain why by following example, there are three gang of one group: A, B and C, every gang group have min-member of 10 pod, noted as A1-A10, B1-B10, C1-C10.
1. A1-A5 assumed in gangA scheduleCycle 1, B1-B5 assumed in gangA scheduleCycle 1, C1-C5 assumed in gangA scheduleCycle 1, C7 filter failed due to insufficient resource, then A6-10 failed due to cycle invalid, C6-C10 failed due to cycle invalid, B6-B9 failed due to cycle invalid
2. A1-A5 assumed in gangA scheduleCycle 2, C1-C5 assumed in gangA scheduleCycle 2, C7 failed due to insufficient resource and gangA\B\C cycle set to invalid,
3. then B10 failed due to cycle invalid, B1 comes and find its gang scheduling cycle 2 can be valid, so it's assumed. however, we expect B1 should fail because schedule cycle 2 already failed.
*/
gang.ScheduleCycle += 1
}
gang.ScheduleCycleValid = valid
klog.Infof("SetScheduleCycleValid, gangName: %v, valid: %v", gang.Name, valid)
}
Expand Down Expand Up @@ -494,3 +509,15 @@ func (gang *Gang) isGangValidForPermit() bool {
return len(gang.WaitingForBindChildren) >= gang.MinRequiredNumber || gang.OnceResourceSatisfied == true
}
}

// getLastScheduleTime returns the gang's LastScheduleTime. The field is read
// while gang.lock is held, and the lock is released before returning.
func (gang *Gang) getLastScheduleTime() time.Time {
	gang.lock.Lock()
	lastScheduleTime := gang.LastScheduleTime
	gang.lock.Unlock()
	return lastScheduleTime
}

// setLastScheduleTime stores lastScheduleTime as the gang's LastScheduleTime
// while holding gang.lock.
//
// The parameter is deliberately not named `time`: that name would shadow the
// imported time package for the whole function body.
func (gang *Gang) setLastScheduleTime(lastScheduleTime time.Time) {
	gang.lock.Lock()
	defer gang.lock.Unlock()
	gang.LastScheduleTime = lastScheduleTime
}
Loading

0 comments on commit a60bfd2

Please sign in to comment.