scheduler: assure Fairness and DeadlockFree #1991

Closed
wants to merge 1 commit
91 changes: 61 additions & 30 deletions pkg/scheduler/plugins/coscheduling/core/core.go
@@ -59,7 +59,7 @@ const (

// Manager defines the interfaces for PodGroup management.
type Manager interface {
PreFilter(context.Context, *corev1.Pod) error
PreFilter(context.Context, *corev1.Pod) (error, bool)
Permit(context.Context, *corev1.Pod) (time.Duration, Status)
PostBind(context.Context, *corev1.Pod, string)
PostFilter(context.Context, *corev1.Pod, framework.Handle, string, framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status)
@@ -73,7 +73,8 @@ type Manager interface {
GetGangSummaries() map[string]*GangSummary
IsGangMinSatisfied(*corev1.Pod) bool
GetChildScheduleCycle(*corev1.Pod) int
GetGangGroupWaitingBoundPodNum(pod *corev1.Pod) int
GetPodLastScheduleTime(pod *corev1.Pod, podLastScheduleTime time.Time) time.Time
GetPodScheduleInfo(pod *corev1.Pod) *PodScheduleInfo
}

// PodGroupManager defines the scheduling operation called
@@ -218,34 +219,36 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
// iii.Check whether the Gang is OnceResourceSatisfied
// iv.Check whether the Gang has met the scheduleCycleValid check, and reject the pod if negative(only Strict mode ).
// v.Try update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above.
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error {
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) (error, bool) {
if !util.IsPodNeedGang(pod) {
return nil
return nil, false
}
gang := pgMgr.GetGangByPod(pod)
if gang == nil {
return fmt.Errorf("can't find gang, gangName: %v, podName: %v", util.GetId(pod.Namespace, util.GetGangNameByPod(pod)),
util.GetId(pod.Namespace, pod.Name))
util.GetId(pod.Namespace, pod.Name)), false
}
podScheduleInfo := pgMgr.cache.getPodScheduleInfo(pod.UID, true)
podScheduleInfo.SetLastScheduleTime(gang.getLastScheduleTime())

// check if gang is initialized
if !gang.HasGangInit {
return fmt.Errorf("gang has not init, gangName: %v, podName: %v", gang.Name,
util.GetId(pod.Namespace, pod.Name))
util.GetId(pod.Namespace, pod.Name)), false
}
// resourceSatisfied means pod will directly pass the PreFilter
if gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied() {
return nil
return nil, false
}

// check minNum
if gang.getChildrenNum() < gang.getGangMinNum() {
return fmt.Errorf("gang child pod not collect enough, gangName: %v, podName: %v", gang.Name,
util.GetId(pod.Namespace, pod.Name))
util.GetId(pod.Namespace, pod.Name)), false
}

if pgMgr.args != nil && pgMgr.args.SkipCheckScheduleCycle {
return nil
return nil, false
}

// first try update the global cycle of gang
@@ -256,19 +259,19 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
gangMode := gang.getGangMode()
if gangMode == extension.GangModeStrict {
if pod.Status.NominatedNodeName != "" {
return nil
return nil, false
}
podScheduleCycle := gang.getChildScheduleCycle(pod)
if !gang.isScheduleCycleValid() {
return fmt.Errorf("gang scheduleCycle not valid, gangName: %v, podName: %v",
gang.Name, util.GetId(pod.Namespace, pod.Name))
gang.Name, util.GetId(pod.Namespace, pod.Name)), true
}
if podScheduleCycle >= gangScheduleCycle {
return fmt.Errorf("pod's schedule cycle too large, gangName: %v, podName: %v, podCycle: %v, gangCycle: %v",
gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle)
gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle), true
}
}
return nil
return nil, false
}

// PostFilter
@@ -284,6 +287,12 @@ func (pgMgr *PodGroupManager) PostFilter(ctx context.Context, pod *corev1.Pod, h
klog.Warningf(message)
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, message)
}
defer func() {
podScheduleInfo := pgMgr.cache.getPodScheduleInfo(pod.UID, false)
if podScheduleInfo != nil {
podScheduleInfo.SetLastScheduleTime(gang.getLastScheduleTime())
}
}()
if gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied() {
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
}
@@ -350,6 +359,11 @@ func (pgMgr *PodGroupManager) Unreserve(ctx context.Context, state *framework.Cy
klog.Warningf("Pod %q missing Gang", klog.KObj(pod))
return
}
podScheduleInfo := pgMgr.cache.getPodScheduleInfo(pod.UID, false)
if podScheduleInfo != nil {
podScheduleInfo.SetLastScheduleTime(gang.getLastScheduleTime())
podScheduleInfo.SetAlreadyBeenRejected(false)
}
// first delete the pod from gang's waitingFroBindChildren map
gang.delAssumedPod(pod)

@@ -373,22 +387,42 @@ func (pgMgr *PodGroupManager) rejectGangGroupById(handle framework.Handle, plugi
rejectedPodCount := 0
if handle != nil {
handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
/*
Why the actual rejection is deferred, by example: we have one gang with a min member of 20.
1. gangA1-gangA10 are assumed; gangA11 fails due to insufficient resources.
2. gangA11 then invokes PostFilter and rejects gangA1-gangA10; gangA1-gangA10 invoke Unreserve, i.e. rejectGangGroupById, asynchronously.
3. So gangA11 should only reject gangA1-gangA10 after alreadyBeenRejected and lastScheduleTime have been cached in time.
*/
waitingGangId := util.GetId(waitingPod.GetPod().Namespace, util.GetGangNameByPod(waitingPod.GetPod()))
if gangSet.Has(waitingGangId) {
klog.V(1).InfoS("GangGroup gets rejected due to member Gang is unschedulable",
"gang", gangId, "waitingGang", waitingGangId, "waitingPod", klog.KObj(waitingPod.GetPod()))
waitingPod.Reject(pluginName, message)
rejectedPodCount++
pod := waitingPod.GetPod()
podScheduleInfo := pgMgr.cache.getPodScheduleInfo(pod.UID, false)
if podScheduleInfo != nil && !podScheduleInfo.GetAlreadyBeenRejected() {
podScheduleInfo.SetAlreadyBeenRejected(true)
rejectedPodCount++
}
}
})
defer func() {
handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
waitingGangId := util.GetId(waitingPod.GetPod().Namespace, util.GetGangNameByPod(waitingPod.GetPod()))
if gangSet.Has(waitingGangId) {
klog.V(1).InfoS("GangGroup gets rejected due to member Gang is unschedulable",
"gang", gangId, "waitingGang", waitingGangId, "waitingPod", klog.KObj(waitingPod.GetPod()))
waitingPod.Reject(pluginName, message)
}
})
}()
}
if rejectedPodCount == 0 {
return
}
gangGroupLastScheduleTime := timeNowFn()
for gang := range gangSet {
gangIns := pgMgr.cache.getGangFromCacheByGangId(gang, false)
if gangIns != nil {
gangIns.setScheduleCycleValid(false)
gangIns.setLastScheduleTime(gangGroupLastScheduleTime)
}
}
}
@@ -405,6 +439,7 @@ func (pgMgr *PodGroupManager) PostBind(ctx context.Context, pod *corev1.Pod, nod
}
// first update gang in cache
gang.addBoundPod(pod)
pgMgr.cache.deletePodScheduleInfo(pod)

// update PodGroup
_, pg := pgMgr.GetPodGroup(pod)
@@ -554,18 +589,14 @@ func (pgMgr *PodGroupManager) GetChildScheduleCycle(pod *corev1.Pod) int {
return gang.getChildScheduleCycle(pod)
}

func (pgMgr *PodGroupManager) GetGangGroupWaitingBoundPodNum(pod *corev1.Pod) int {
gang := pgMgr.GetGangByPod(pod)
if gang == nil {
return 0
func (pgMgr *PodGroupManager) GetPodLastScheduleTime(pod *corev1.Pod, podLastScheduleTime time.Time) time.Time {
podScheduleInfo := pgMgr.cache.getPodScheduleInfo(pod.UID, false)
if podScheduleInfo != nil {
return podScheduleInfo.GetLastScheduleTime()
}
gangGroup := gang.GangGroup
waitingPodNum := 0
for _, memberGangID := range gangGroup {
memberGang := pgMgr.cache.getGangFromCacheByGangId(memberGangID, false)
if memberGang != nil {
waitingPodNum += memberGang.getGangWaitingPods()
}
}
return waitingPodNum
return podLastScheduleTime
}

func (pgMgr *PodGroupManager) GetPodScheduleInfo(pod *corev1.Pod) *PodScheduleInfo {
return pgMgr.cache.getPodScheduleInfo(pod.UID, false)
}
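
The new GetPodLastScheduleTime is the hook the fairness guarantee relies on: a pod inherits the last schedule time of its gang group, so pods of equal priority can be ordered by how long their group has waited since its previous round. Below is a minimal sketch of how a queue-sort comparison might consume it; the function name, the priority helper and the enqueue-time fallback are assumptions for illustration and are not part of this diff.

package example

import (
    "time"

    corev1 "k8s.io/api/core/v1"
    corev1helpers "k8s.io/component-helpers/scheduling/corev1"
)

// lessByLastScheduleTime sketches a fairness-aware comparison: higher priority
// still wins, but pods of equal priority are ordered by their gang group's last
// schedule time (falling back to the pod's own enqueue time), so the group that
// has waited longest since its previous round is retried first.
func lessByLastScheduleTime(
    getLastScheduleTime func(pod *corev1.Pod, fallback time.Time) time.Time, // e.g. Manager.GetPodLastScheduleTime
    p1, p2 *corev1.Pod,
    enqueue1, enqueue2 time.Time,
) bool {
    prio1, prio2 := corev1helpers.PodPriority(p1), corev1helpers.PodPriority(p2)
    if prio1 != prio2 {
        return prio1 > prio2 // higher priority first
    }
    t1 := getLastScheduleTime(p1, enqueue1)
    t2 := getLastScheduleTime(p2, enqueue2)
    return t1.Before(t2) // the earlier last round goes first
}
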
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/coscheduling/core/core_test.go
@@ -315,7 +315,7 @@ func TestPlugin_PreFilter(t *testing.T) {
}()
}
// run the case
err := mgr.PreFilter(ctx, tt.pod)
err, _ := mgr.PreFilter(ctx, tt.pod)
var returnMessage string
if err == nil {
returnMessage = ""
29 changes: 28 additions & 1 deletion pkg/scheduler/plugins/coscheduling/core/gang.go
@@ -75,7 +75,10 @@ type Gang struct {
// we set the pod's cycle in `childrenScheduleRoundMap` equal with `scheduleCycle` and pass the check. If result is negative, means
// the pod has been scheduled in this cycle, so we should reject it. With `totalChildrenNum`'s help, when the last pod comes to make all
// `childrenScheduleRoundMap`'s values equal to `scheduleCycle`, Gang's `scheduleCycle` will be added by 1, which means a new schedule cycle.
ScheduleCycle int
ScheduleCycle int
// Pods with the same priority are ordered by the schedule time of their previous round, to ensure fairness.
// For a Gang, the time of its last round as a whole is recorded as LastScheduleTime; that is, all Pods belonging to the Gang share the same time, namely LastScheduleTime.
LastScheduleTime time.Time
ChildrenScheduleRoundMap map[string]int

GangFrom string
@@ -98,6 +101,7 @@ func NewGang(gangName string) *Gang {
BoundChildren: make(map[string]*v1.Pod),
ScheduleCycleValid: true,
ScheduleCycle: 1,
LastScheduleTime: timeNowFn(),
ChildrenScheduleRoundMap: make(map[string]int),
GangFrom: GangFromPodAnnotation,
HasGangInit: false,
@@ -149,6 +153,7 @@ func (gang *Gang) tryInitByPodConfig(pod *v1.Pod, args *config.CoschedulingArgs)

// here we assume that Coscheduling's CreateTime equal with the pod's CreateTime
gang.CreateTime = pod.CreationTimestamp.Time
gang.LastScheduleTime = pod.CreationTimestamp.Time

waitTime, err := time.ParseDuration(pod.Annotations[extension.AnnotationGangWaitTime])
if err != nil || waitTime <= 0 {
@@ -215,6 +220,7 @@ func (gang *Gang) tryInitByPodGroup(pg *v1alpha1.PodGroup, args *config.Coschedu

// here we assume that Coscheduling's CreateTime equal with the podGroup CRD CreateTime
gang.CreateTime = pg.CreationTimestamp.Time
gang.LastScheduleTime = pg.CreationTimestamp.Time

waitTime := util.GetWaitTimeDuration(pg, args.DefaultTimeout.Duration)
gang.WaitTime = waitTime
@@ -382,6 +388,15 @@ func (gang *Gang) setScheduleCycleValid(valid bool) {
gang.lock.Lock()
defer gang.lock.Unlock()

if !valid && !gang.ScheduleCycleValid {
/*
Why, by example: there are three gangs in one gang group: A, B and C; each gang has a min-member of 10 pods, noted as A1-A10, B1-B10, C1-C10.
1. A1-A5 assumed in gangA scheduleCycle 1, B1-B5 assumed in gangA scheduleCycle 1, C1-C5 assumed in gangA scheduleCycle 1; C7 fails Filter due to insufficient resources, then A6-A10 fail due to an invalid cycle, C6-C10 fail due to an invalid cycle, B6-B9 fail due to an invalid cycle.
2. A1-A5 assumed in gangA scheduleCycle 2, C1-C5 assumed in gangA scheduleCycle 2; C7 fails due to insufficient resources and the gangA/B/C cycles are set to invalid.
3. Then B10 fails due to an invalid cycle; B1 comes and finds its gang's scheduleCycle 2 can be valid, so it is assumed. However, we expect B1 to fail, because scheduleCycle 2 has already failed.
*/
gang.ScheduleCycle += 1

Review comment from @buptcozy (Contributor), Apr 10, 2024:
What is the purpose of incrementing scheduleCycle here? Is it meant to start the next round early? It does not seem necessary. The comment also seems problematic: each gang's scheduleCycle is independent across A/B/C, so why would C7's insufficient resources make A6-A10 fail the scheduleCycle check?

Reply (Contributor):
After discussion, this change can be removed.

}
gang.ScheduleCycleValid = valid
klog.Infof("SetScheduleCycleValid, gangName: %v, valid: %v", gang.Name, valid)
}
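
For reference against the review thread above, this is a minimal sketch of setScheduleCycleValid with the extra ScheduleCycle bump dropped, which is what the discussion concludes; it is an illustration of the suggested follow-up, not part of the diff.

func (gang *Gang) setScheduleCycleValid(valid bool) {
    gang.lock.Lock()
    defer gang.lock.Unlock()

    // Only flip the validity flag; the cycle counter itself is still advanced
    // once every child has been tried in the current cycle, as described in the
    // ScheduleCycle comment above.
    gang.ScheduleCycleValid = valid
    klog.Infof("SetScheduleCycleValid, gangName: %v, valid: %v", gang.Name, valid)
}
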
Expand Down Expand Up @@ -494,3 +509,15 @@ func (gang *Gang) isGangValidForPermit() bool {
return len(gang.WaitingForBindChildren) >= gang.MinRequiredNumber || gang.OnceResourceSatisfied == true
}
}

func (gang *Gang) getLastScheduleTime() time.Time {
gang.lock.Lock()
defer gang.lock.Unlock()
return gang.LastScheduleTime
}

func (gang *Gang) setLastScheduleTime(time time.Time) {
gang.lock.Lock()
defer gang.lock.Unlock()
gang.LastScheduleTime = time
}
79 changes: 67 additions & 12 deletions pkg/scheduler/plugins/coscheduling/core/gang_cache.go
@@ -18,8 +18,10 @@ package core

import (
"sync"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1"
@@ -32,22 +34,54 @@
)

type GangCache struct {
lock *sync.RWMutex
gangItems map[string]*Gang
pluginArgs *config.CoschedulingArgs
podLister listerv1.PodLister
pgLister pglister.PodGroupLister
pgClient pgclientset.Interface
lock *sync.RWMutex
gangItems map[string]*Gang
pluginArgs *config.CoschedulingArgs
podLister listerv1.PodLister
pgLister pglister.PodGroupLister
pgClient pgclientset.Interface
podScheduleInfos map[types.UID]*PodScheduleInfo
}

type PodScheduleInfo struct {
lock sync.RWMutex
lastScheduleTime time.Time
alreadyBeenRejected bool
}

func (p *PodScheduleInfo) SetLastScheduleTime(lastScheduleTime time.Time) {
p.lock.Lock()
defer p.lock.Unlock()
p.lastScheduleTime = lastScheduleTime
}

func (p *PodScheduleInfo) SetAlreadyBeenRejected(rejected bool) {
p.lock.Lock()
defer p.lock.Unlock()
p.alreadyBeenRejected = rejected
}

func (p *PodScheduleInfo) GetAlreadyBeenRejected() bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.alreadyBeenRejected
}

func (p *PodScheduleInfo) GetLastScheduleTime() time.Time {
p.lock.RLock()
defer p.lock.RUnlock()
return p.lastScheduleTime
}

func NewGangCache(args *config.CoschedulingArgs, podLister listerv1.PodLister, pgLister pglister.PodGroupLister, client pgclientset.Interface) *GangCache {
return &GangCache{
gangItems: make(map[string]*Gang),
lock: new(sync.RWMutex),
pluginArgs: args,
podLister: podLister,
pgLister: pgLister,
pgClient: client,
gangItems: make(map[string]*Gang),
lock: new(sync.RWMutex),
pluginArgs: args,
podLister: podLister,
pgLister: pgLister,
pgClient: client,
podScheduleInfos: make(map[types.UID]*PodScheduleInfo),
}
}

@@ -63,6 +97,17 @@ func (gangCache *GangCache) getGangFromCacheByGangId(gangId string, createIfNotE
return gang
}

func (gangCache *GangCache) getPodScheduleInfo(podUID types.UID, createIfNotExist bool) *PodScheduleInfo {
gangCache.lock.Lock()
defer gangCache.lock.Unlock()
podScheduleInfo := gangCache.podScheduleInfos[podUID]
if podScheduleInfo == nil && createIfNotExist {
podScheduleInfo = &PodScheduleInfo{}
gangCache.podScheduleInfos[podUID] = podScheduleInfo
}
return podScheduleInfo
}

func (gangCache *GangCache) getAllGangsFromCache() map[string]*Gang {
gangCache.lock.RLock()
defer gangCache.lock.RUnlock()
@@ -106,6 +151,7 @@ func (gangCache *GangCache) onPodAdd(obj interface{}) {
if pod.Spec.NodeName != "" {
gang.addBoundPod(pod)
gang.setResourceSatisfied()
gangCache.deletePodScheduleInfo(pod)
}
}

@@ -136,6 +182,7 @@ func (gangCache *GangCache) onPodDelete(obj interface{}) {
if gangName == "" {
return
}
gangCache.deletePodScheduleInfo(pod)

gangNamespace := pod.Namespace
gangId := util.GetId(gangNamespace, gangName)
@@ -150,6 +197,14 @@ func (gangCache *GangCache) onPodDelete(obj interface{}) {
}
}

func (gangCache *GangCache) deletePodScheduleInfo(pod *v1.Pod) {
gangCache.lock.Lock()
defer gangCache.lock.Unlock()

delete(gangCache.podScheduleInfos, pod.UID)
klog.Infof("delete podScheduleInfo from cache, pod: %s/%s/%v", pod.Namespace, pod.Name, pod.UID)
}

func (gangCache *GangCache) onPodGroupAdd(obj interface{}) {
pg, ok := obj.(*v1alpha1.PodGroup)
if !ok {