Skip to content

Commit

Permalink
scheduler: no rejecting sbiling when invalid scheduling cycle
Browse files Browse the repository at this point in the history
Signed-off-by: wangjianyu.wjy <wangjianyu.wjy@alibaba-inc.com>
  • Loading branch information
wangjianyu.wjy committed Apr 12, 2024
1 parent 252c29d commit 34a4b55
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 17 deletions.
24 changes: 12 additions & 12 deletions pkg/scheduler/plugins/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const (

// Manager defines the interfaces for PodGroup management.
type Manager interface {
PreFilter(context.Context, *corev1.Pod) error
PreFilter(context.Context, *corev1.Pod) (err error, scheduleCycleInvalid bool)
Permit(context.Context, *corev1.Pod) (time.Duration, Status)
PostBind(context.Context, *corev1.Pod, string)
PostFilter(context.Context, *corev1.Pod, framework.Handle, string, framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status)
Expand Down Expand Up @@ -218,35 +218,35 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
// iii.Check whether the Gang is OnceResourceSatisfied
// iv.Check whether the Gang has met the scheduleCycleValid check, and reject the pod if negative(only Strict mode ).
// v.Try update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above.
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error {
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) (err error, scheduleCycleInvalid bool) {
if !util.IsPodNeedGang(pod) {
return nil
return nil, false
}
gang := pgMgr.GetGangByPod(pod)
if gang == nil {
return fmt.Errorf("can't find gang, gangName: %v, podName: %v", util.GetId(pod.Namespace, util.GetGangNameByPod(pod)),
util.GetId(pod.Namespace, pod.Name))
util.GetId(pod.Namespace, pod.Name)), false
}
pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime())

// check if gang is initialized
if !gang.HasGangInit {
return fmt.Errorf("gang has not init, gangName: %v, podName: %v", gang.Name,
util.GetId(pod.Namespace, pod.Name))
util.GetId(pod.Namespace, pod.Name)), false
}
// resourceSatisfied means pod will directly pass the PreFilter
if gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied() {
return nil
return nil, false
}

// check minNum
if gang.getChildrenNum() < gang.getGangMinNum() {
return fmt.Errorf("gang child pod not collect enough, gangName: %v, podName: %v", gang.Name,
util.GetId(pod.Namespace, pod.Name))
util.GetId(pod.Namespace, pod.Name)), false
}

if pgMgr.args != nil && pgMgr.args.SkipCheckScheduleCycle {
return nil
return nil, false
}

// first try update the global cycle of gang
Expand All @@ -257,19 +257,19 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
gangMode := gang.getGangMode()
if gangMode == extension.GangModeStrict {
if pod.Status.NominatedNodeName != "" {
return nil
return nil, false
}
podScheduleCycle := gang.getChildScheduleCycle(pod)
if !gang.isScheduleCycleValid() {
return fmt.Errorf("gang scheduleCycle not valid, gangName: %v, podName: %v",
gang.Name, util.GetId(pod.Namespace, pod.Name))
gang.Name, util.GetId(pod.Namespace, pod.Name)), true
}
if podScheduleCycle >= gangScheduleCycle {
return fmt.Errorf("pod's schedule cycle too large, gangName: %v, podName: %v, podCycle: %v, gangCycle: %v",
gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle)
gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle), false
}
}
return nil
return nil, false
}

// PostFilter
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/coscheduling/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func TestPlugin_PreFilter(t *testing.T) {
}()
}
// run the case
err := mgr.PreFilter(ctx, tt.pod)
err, _ := mgr.PreFilter(ctx, tt.pod)
var returnMessage string
if err == nil {
returnMessage = ""
Expand Down
34 changes: 30 additions & 4 deletions pkg/scheduler/plugins/coscheduling/coscheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ var _ framework.EnqueueExtensions = &Coscheduling{}

const (
// Name is the name of the plugin used in Registry and configurations.
Name = "Coscheduling"
Name = "Coscheduling"
stateKey = Name
)

// New initializes and returns a new Coscheduling plugin.
Expand Down Expand Up @@ -160,19 +161,44 @@ func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool {
// iii.Check whether the Gang has met the scheduleCycleValid check, and reject the pod if negative.
// iv.Try update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above.
func (cs *Coscheduling) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
// If PreFilter fails, return framework.Error to avoid
// any preemption attempts.
if err := cs.pgMgr.PreFilter(ctx, pod); err != nil {
// If PreFilter fails, return framework.UnschedulableAndUnresolvable to avoid any preemption attempts.
// If Prefilter failed due to scheduleCycle invalid, we shouldn't reject it's assumed sibling.
if err, scheduleCycleInvalid := cs.pgMgr.PreFilter(ctx, pod); err != nil {
state.Write(stateKey, &stateData{skipPostFilter: scheduleCycleInvalid})
klog.ErrorS(err, "PreFilter failed", "pod", klog.KObj(pod))
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
return nil, framework.NewStatus(framework.Success, "")
}

type stateData struct {
skipPostFilter bool
}

func (s *stateData) Clone() framework.StateData {
ns := &stateData{
skipPostFilter: s.skipPostFilter,
}
return ns
}

func getPreFilterState(cycleState *framework.CycleState) *stateData {
value, err := cycleState.Read(stateKey)
if err != nil {
return nil
}
state := value.(*stateData)
return state
}

// PostFilter
// i. If strict-mode, we will set scheduleCycleValid to false and release all assumed pods.
// ii. If non-strict mode, we will do nothing.
func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
preFilterState := getPreFilterState(state)
if preFilterState != nil && preFilterState.skipPostFilter {
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
}
return cs.pgMgr.PostFilter(ctx, pod, cs.frameworkHandler, Name, filteredNodeStatusMap)
}

Expand Down

0 comments on commit 34a4b55

Please sign in to comment.