diff --git a/pkg/scheduler/plugins/coscheduling/core/core.go b/pkg/scheduler/plugins/coscheduling/core/core.go index 9f0d0c5fd..10e2f7eef 100644 --- a/pkg/scheduler/plugins/coscheduling/core/core.go +++ b/pkg/scheduler/plugins/coscheduling/core/core.go @@ -59,7 +59,7 @@ const ( // Manager defines the interfaces for PodGroup management. type Manager interface { - PreFilter(context.Context, *corev1.Pod) error + PreFilter(context.Context, *corev1.Pod) (err error, scheduleCycleInvalid bool) Permit(context.Context, *corev1.Pod) (time.Duration, Status) PostBind(context.Context, *corev1.Pod, string) PostFilter(context.Context, *corev1.Pod, framework.Handle, string, framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) @@ -218,35 +218,35 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework // iii.Check whether the Gang is OnceResourceSatisfied // iv.Check whether the Gang has met the scheduleCycleValid check, and reject the pod if negative(only Strict mode ). // v.Try update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above. -func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error { +func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) (err error, scheduleCycleInvalid bool) { if !util.IsPodNeedGang(pod) { - return nil + return nil, false } gang := pgMgr.GetGangByPod(pod) if gang == nil { return fmt.Errorf("can't find gang, gangName: %v, podName: %v", util.GetId(pod.Namespace, util.GetGangNameByPod(pod)), - util.GetId(pod.Namespace, pod.Name)) + util.GetId(pod.Namespace, pod.Name)), false } pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime()) // check if gang is initialized if !gang.HasGangInit { return fmt.Errorf("gang has not init, gangName: %v, podName: %v", gang.Name, - util.GetId(pod.Namespace, pod.Name)) + util.GetId(pod.Namespace, pod.Name)), false } // resourceSatisfied means pod will directly pass the PreFilter if gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied() { - return nil + return nil, false } // check minNum if gang.getChildrenNum() < gang.getGangMinNum() { return fmt.Errorf("gang child pod not collect enough, gangName: %v, podName: %v", gang.Name, - util.GetId(pod.Namespace, pod.Name)) + util.GetId(pod.Namespace, pod.Name)), false } if pgMgr.args != nil && pgMgr.args.SkipCheckScheduleCycle { - return nil + return nil, false } // first try update the global cycle of gang @@ -257,19 +257,19 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er gangMode := gang.getGangMode() if gangMode == extension.GangModeStrict { if pod.Status.NominatedNodeName != "" { - return nil + return nil, false } podScheduleCycle := gang.getChildScheduleCycle(pod) if !gang.isScheduleCycleValid() { return fmt.Errorf("gang scheduleCycle not valid, gangName: %v, podName: %v", - gang.Name, util.GetId(pod.Namespace, pod.Name)) + gang.Name, util.GetId(pod.Namespace, pod.Name)), true } if podScheduleCycle >= gangScheduleCycle { return fmt.Errorf("pod's schedule cycle too large, gangName: %v, podName: %v, podCycle: %v, gangCycle: %v", - gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle) + gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle), false } } - return nil + return nil, false } // PostFilter diff --git a/pkg/scheduler/plugins/coscheduling/core/core_test.go b/pkg/scheduler/plugins/coscheduling/core/core_test.go index 7978cb113..0883f762c 100644 --- a/pkg/scheduler/plugins/coscheduling/core/core_test.go +++ b/pkg/scheduler/plugins/coscheduling/core/core_test.go @@ -315,7 +315,7 @@ func TestPlugin_PreFilter(t *testing.T) { }() } // run the case - err := mgr.PreFilter(ctx, tt.pod) + err, _ := mgr.PreFilter(ctx, tt.pod) var returnMessage string if err == nil { returnMessage = "" diff --git a/pkg/scheduler/plugins/coscheduling/coscheduling.go b/pkg/scheduler/plugins/coscheduling/coscheduling.go index 0768cbd2f..2ae5d5a65 100644 --- a/pkg/scheduler/plugins/coscheduling/coscheduling.go +++ b/pkg/scheduler/plugins/coscheduling/coscheduling.go @@ -59,7 +59,8 @@ var _ framework.EnqueueExtensions = &Coscheduling{} const ( // Name is the name of the plugin used in Registry and configurations. - Name = "Coscheduling" + Name = "Coscheduling" + stateKey = Name ) // New initializes and returns a new Coscheduling plugin. @@ -160,19 +161,44 @@ func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { // iii.Check whether the Gang has met the scheduleCycleValid check, and reject the pod if negative. // iv.Try update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above. func (cs *Coscheduling) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { - // If PreFilter fails, return framework.Error to avoid - // any preemption attempts. - if err := cs.pgMgr.PreFilter(ctx, pod); err != nil { + // If PreFilter fails, return framework.UnschedulableAndUnresolvable to avoid any preemption attempts. + // If Prefilter failed due to scheduleCycle invalid, we shouldn't reject it's assumed sibling. + if err, scheduleCycleInvalid := cs.pgMgr.PreFilter(ctx, pod); err != nil { + state.Write(stateKey, &stateData{skipPostFilter: scheduleCycleInvalid}) klog.ErrorS(err, "PreFilter failed", "pod", klog.KObj(pod)) return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) } return nil, framework.NewStatus(framework.Success, "") } +type stateData struct { + skipPostFilter bool +} + +func (s *stateData) Clone() framework.StateData { + ns := &stateData{ + skipPostFilter: s.skipPostFilter, + } + return ns +} + +func getPreFilterState(cycleState *framework.CycleState) *stateData { + value, err := cycleState.Read(stateKey) + if err != nil { + return nil + } + state := value.(*stateData) + return state +} + // PostFilter // i. If strict-mode, we will set scheduleCycleValid to false and release all assumed pods. // ii. If non-strict mode, we will do nothing. func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { + preFilterState := getPreFilterState(state) + if preFilterState != nil && preFilterState.skipPostFilter { + return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable) + } return cs.pgMgr.PostFilter(ctx, pod, cs.frameworkHandler, Name, filteredNodeStatusMap) }