diff --git a/pkg/scheduler/plugins/coscheduling/core/core.go b/pkg/scheduler/plugins/coscheduling/core/core.go
index 9f0d0c5fd..10e2f7eef 100644
--- a/pkg/scheduler/plugins/coscheduling/core/core.go
+++ b/pkg/scheduler/plugins/coscheduling/core/core.go
@@ -59,7 +59,7 @@ const (
 // Manager defines the interfaces for PodGroup management.
 type Manager interface {
-	PreFilter(context.Context, *corev1.Pod) error
+	PreFilter(context.Context, *corev1.Pod) (err error, scheduleCycleInvalid bool)
 	Permit(context.Context, *corev1.Pod) (time.Duration, Status)
 	PostBind(context.Context, *corev1.Pod, string)
 	PostFilter(context.Context, *corev1.Pod, framework.Handle, string, framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status)
@@ -218,35 +218,35 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
 // iii.Check whether the Gang is OnceResourceSatisfied
 // iv.Check whether the Gang has met the scheduleCycleValid check, and reject the pod if negative(only Strict mode ).
 // v.Try update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above.
-func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error {
+func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) (err error, scheduleCycleInvalid bool) {
 	if !util.IsPodNeedGang(pod) {
-		return nil
+		return nil, false
 	}
 	gang := pgMgr.GetGangByPod(pod)
 	if gang == nil {
 		return fmt.Errorf("can't find gang, gangName: %v, podName: %v",
 			util.GetId(pod.Namespace, util.GetGangNameByPod(pod)),
-			util.GetId(pod.Namespace, pod.Name))
+			util.GetId(pod.Namespace, pod.Name)), false
 	}
 	pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime())
 	// check if gang is initialized
 	if !gang.HasGangInit {
 		return fmt.Errorf("gang has not init, gangName: %v, podName: %v", gang.Name,
-			util.GetId(pod.Namespace, pod.Name))
+			util.GetId(pod.Namespace, pod.Name)), false
 	}
 	// resourceSatisfied means pod will directly pass the PreFilter
 	if gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied() {
-		return nil
+		return nil, false
 	}
 	// check minNum
 	if gang.getChildrenNum() < gang.getGangMinNum() {
 		return fmt.Errorf("gang child pod not collect enough, gangName: %v, podName: %v", gang.Name,
-			util.GetId(pod.Namespace, pod.Name))
+			util.GetId(pod.Namespace, pod.Name)), false
 	}
 	if pgMgr.args != nil && pgMgr.args.SkipCheckScheduleCycle {
-		return nil
+		return nil, false
 	}
 
 	// first try update the global cycle of gang
@@ -257,19 +257,19 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
 	gangMode := gang.getGangMode()
 	if gangMode == extension.GangModeStrict {
 		if pod.Status.NominatedNodeName != "" {
-			return nil
+			return nil, false
 		}
 		podScheduleCycle := gang.getChildScheduleCycle(pod)
 		if !gang.isScheduleCycleValid() {
 			return fmt.Errorf("gang scheduleCycle not valid, gangName: %v, podName: %v",
-				gang.Name, util.GetId(pod.Namespace, pod.Name))
+				gang.Name, util.GetId(pod.Namespace, pod.Name)), true
 		}
 		if podScheduleCycle >= gangScheduleCycle {
 			return fmt.Errorf("pod's schedule cycle too large, gangName: %v, podName: %v, podCycle: %v, gangCycle: %v",
-				gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle)
+				gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle), false
 		}
 	}
-	return nil
+	return nil, false
 }
 
 // PostFilter
diff --git a/pkg/scheduler/plugins/coscheduling/core/core_test.go b/pkg/scheduler/plugins/coscheduling/core/core_test.go
index 7978cb113..0883f762c 100644
--- a/pkg/scheduler/plugins/coscheduling/core/core_test.go
+++ b/pkg/scheduler/plugins/coscheduling/core/core_test.go
@@ -315,7 +315,7 @@ func TestPlugin_PreFilter(t *testing.T) {
 			}()
 		}
 		// run the case
-		err := mgr.PreFilter(ctx, tt.pod)
+		err, _ := mgr.PreFilter(ctx, tt.pod)
 		var returnMessage string
 		if err == nil {
 			returnMessage = ""
diff --git a/pkg/scheduler/plugins/coscheduling/coscheduling.go b/pkg/scheduler/plugins/coscheduling/coscheduling.go
index 0768cbd2f..2ae5d5a65 100644
--- a/pkg/scheduler/plugins/coscheduling/coscheduling.go
+++ b/pkg/scheduler/plugins/coscheduling/coscheduling.go
@@ -59,7 +59,8 @@ var _ framework.EnqueueExtensions = &Coscheduling{}
 
 const (
 	// Name is the name of the plugin used in Registry and configurations.
-	Name = "Coscheduling"
+	Name     = "Coscheduling"
+	stateKey = Name
 )
 
 // New initializes and returns a new Coscheduling plugin.
@@ -160,19 +161,44 @@ func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool {
 // iii.Check whether the Gang has met the scheduleCycleValid check, and reject the pod if negative.
 // iv.Try update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above.
 func (cs *Coscheduling) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
-	// If PreFilter fails, return framework.Error to avoid
-	// any preemption attempts.
-	if err := cs.pgMgr.PreFilter(ctx, pod); err != nil {
+	// If PreFilter fails, return framework.UnschedulableAndUnresolvable to avoid any preemption attempts.
+	// If PreFilter failed because the scheduleCycle is invalid, we shouldn't reject its assumed siblings.
+	if err, scheduleCycleInvalid := cs.pgMgr.PreFilter(ctx, pod); err != nil {
+		state.Write(stateKey, &stateData{skipPostFilter: scheduleCycleInvalid})
 		klog.ErrorS(err, "PreFilter failed", "pod", klog.KObj(pod))
 		return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
 	}
 	return nil, framework.NewStatus(framework.Success, "")
 }
 
+type stateData struct {
+	skipPostFilter bool
+}
+
+func (s *stateData) Clone() framework.StateData {
+	ns := &stateData{
+		skipPostFilter: s.skipPostFilter,
+	}
+	return ns
+}
+
+func getPreFilterState(cycleState *framework.CycleState) *stateData {
+	value, err := cycleState.Read(stateKey)
+	if err != nil {
+		return nil
+	}
+	state := value.(*stateData)
+	return state
+}
+
 // PostFilter
 // i. If strict-mode, we will set scheduleCycleValid to false and release all assumed pods.
 // ii. If non-strict mode, we will do nothing.
 func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
+	preFilterState := getPreFilterState(state)
+	if preFilterState != nil && preFilterState.skipPostFilter {
+		return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
+	}
 	return cs.pgMgr.PostFilter(ctx, pod, cs.frameworkHandler, Name, filteredNodeStatusMap)
 }
diff --git a/pkg/scheduler/plugins/coscheduling/coscheduling_test.go b/pkg/scheduler/plugins/coscheduling/coscheduling_test.go
index 69b4a5fc8..d4710b1f9 100644
--- a/pkg/scheduler/plugins/coscheduling/coscheduling_test.go
+++ b/pkg/scheduler/plugins/coscheduling/coscheduling_test.go
@@ -1335,3 +1335,117 @@ func TestDeadLockFree(t *testing.T) {
 		klog.Infoln(info)
 	}
 }
+
+func TestNoRejectWhenInvalidCycle(t *testing.T) {
+	gangNames := []string{"gangA", "gangB"}
+	gangGroups := map[string][]string{
+		"gangA": {"default/gangA", "default/gangB"},
+		"gangB": {"default/gangA", "default/gangB"},
+	}
+	gangMinRequiredNums := []int{1, 2}
+
+	podGroupCRs := map[string]*v1alpha1.PodGroup{}
+	memberPodsOfGang := map[string][]*corev1.Pod{}
+	var allPods []*corev1.Pod
+	for i, gangName := range gangNames {
+		gangID := util.GetId("default", gangName)
+
+		podGroup := makePg(gangName, "default", int32(gangMinRequiredNums[i]), nil, nil)
+		podGroup.Spec.ScheduleTimeoutSeconds = pointer.Int32(3600)
+		gangGroup := gangGroups[gangName]
+		rawGangGroup, err := json.Marshal(gangGroup)
+		assert.NoError(t, err)
+		podGroup.Annotations = map[string]string{extension.AnnotationGangGroups: string(rawGangGroup)}
+		podGroupCRs[gangID] = podGroup
+
+		for j := 0; j < gangMinRequiredNums[i]; j++ {
+			memberPodOfGang := st.MakePod().
+				Namespace("default").
+				Name(fmt.Sprintf("%s_%d", gangName, j)).
+				UID(fmt.Sprintf("%s_%d", gangName, j)).
+				Label(v1alpha1.PodGroupLabel, gangName).
+				SchedulerName("koord-scheduler").Obj()
+			memberPodsOfGang[gangID] = append(memberPodsOfGang[gangID], memberPodOfGang)
+			allPods = append(allPods, memberPodOfGang)
+		}
+	}
+
+	pgClientSet := fakepgclientset.NewSimpleClientset()
+	cs := kubefake.NewSimpleClientset()
+	for _, podGroupCR := range podGroupCRs {
+		_, err := pgClientSet.SchedulingV1alpha1().PodGroups(podGroupCR.Namespace).Create(context.Background(), podGroupCR, metav1.CreateOptions{})
+		assert.NoError(t, err)
+	}
+	rand.Shuffle(len(allPods), func(i, j int) {
+		allPods[i], allPods[j] = allPods[j], allPods[i]
+	})
+	for _, pod := range allPods {
+		_, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+		assert.NoError(t, err)
+	}
+
+	cfg := configtesting.V1beta3ToInternalWithDefaults(t, v1beta3.KubeSchedulerConfiguration{
+		Profiles: []v1beta3.KubeSchedulerProfile{{
+			SchedulerName: pointer.StringPtr("koord-scheduler"),
+			Plugins: &v1beta3.Plugins{
+				QueueSort: v1beta3.PluginSet{
+					Enabled: []v1beta3.Plugin{
+						{Name: "fakeQueueSortPlugin"},
+					},
+					Disabled: []v1beta3.Plugin{
+						{Name: "*"},
+					},
+				},
+			},
+		}},
+	})
+
+	suit := newPluginTestSuit(t, nil, pgClientSet, cs)
+	registry := frameworkruntime.Registry{
+		"fakeQueueSortPlugin": func(_ apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
+			return suit.plugin.(framework.QueueSortPlugin), nil
+		},
+	}
+
+	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()})
+	ctx := context.TODO()
+	sched, err := scheduler.New(cs,
+		suit.SharedInformerFactory(),
+		nil,
+		profile.NewRecorderFactory(eventBroadcaster),
+		ctx.Done(),
+		scheduler.WithProfiles(cfg.Profiles...),
+		scheduler.WithFrameworkOutOfTreeRegistry(registry),
+		scheduler.WithPodInitialBackoffSeconds(1),
+		scheduler.WithPodMaxBackoffSeconds(1),
+		scheduler.WithPodMaxInUnschedulablePodsDuration(0),
+	)
+
+	assert.NoError(t, err)
+	eventBroadcaster.StartRecordingToSink(ctx.Done())
+	suit.start()
+	sched.SchedulingQueue.Run()
+
+	var scheduleOrder []*debugPodScheduleInfo
+
+	_, status := suit.plugin.(*Coscheduling).PostFilter(ctx, framework.NewCycleState(), allPods[0], nil)
+	assert.False(t, status.IsSuccess())
+
+	_, status = suit.plugin.(*Coscheduling).PreFilter(ctx, framework.NewCycleState(), memberPodsOfGang[util.GetId("default", gangNames[0])][0])
+	assert.False(t, status.IsSuccess())
+
+	for i := 0; i < 5; i++ {
+		for j := 0; j < len(allPods); j++ {
+			if len(sched.SchedulingQueue.PendingPods()) == 0 {
+				break
+			}
+
+			simulateScheduleOne(t, ctx, sched, suit, &scheduleOrder, func(pod *corev1.Pod) bool {
+				return false
+			})
+		}
+	}
+	assert.Equal(t, 0, len(sched.SchedulingQueue.PendingPods()))
+	for _, info := range scheduleOrder {
+		klog.Infoln(info)
+	}
+}
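
Note on the CycleState pattern used above: a plugin writes per-scheduling-cycle data under a key during PreFilter and reads it back in a later extension point of the same cycle; that is how the scheduleCycleInvalid flag travels from PreFilter to PostFilter without touching the gang's shared state. Below is a minimal standalone sketch of that pattern against the upstream scheduler framework types; the main harness and its output strings are purely illustrative and not part of this patch.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

const stateKey = "Coscheduling"

// stateData mirrors the patch's per-cycle flag: set in PreFilter,
// consumed in PostFilter of the same scheduling cycle.
type stateData struct {
	skipPostFilter bool
}

// Clone satisfies framework.StateData; the framework copies cycle
// state when it snapshots it, e.g. for preemption simulations.
func (s *stateData) Clone() framework.StateData {
	return &stateData{skipPostFilter: s.skipPostFilter}
}

func main() {
	state := framework.NewCycleState()

	// PreFilter side: the failure came from an invalid scheduleCycle,
	// so flag this cycle instead of letting PostFilter reject siblings.
	state.Write(stateKey, &stateData{skipPostFilter: true})

	// PostFilter side: a missing key just means PreFilter never flagged
	// this cycle, so the normal rejection path would run.
	value, err := state.Read(stateKey)
	if err != nil {
		fmt.Println("no flag recorded; run the normal PostFilter path")
		return
	}
	if s, ok := value.(*stateData); ok && s.skipPostFilter {
		fmt.Println("scheduleCycle invalid: skip rejecting assumed siblings")
	}
}

Writing the flag only on the failure path keeps the success path free of extra allocations, and having getPreFilterState tolerate a missing key means PostFilter behaves exactly as before whenever PreFilter succeeded or failed for some reason other than an invalid scheduleCycle.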