Skip to content

Commit

Permalink
scheduler: do not reject siblings when the scheduling cycle is invalid
Browse files Browse the repository at this point in the history
Signed-off-by: wangjianyu.wjy <wangjianyu.wjy@alibaba-inc.com>
  • Loading branch information
wangjianyu.wjy committed Apr 12, 2024
1 parent 252c29d commit bf2848e
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 17 deletions.
24 changes: 12 additions & 12 deletions pkg/scheduler/plugins/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const (

// Manager defines the interfaces for PodGroup management.
type Manager interface {
PreFilter(context.Context, *corev1.Pod) error
PreFilter(context.Context, *corev1.Pod) (err error, scheduleCycleInvalid bool)
Permit(context.Context, *corev1.Pod) (time.Duration, Status)
PostBind(context.Context, *corev1.Pod, string)
PostFilter(context.Context, *corev1.Pod, framework.Handle, string, framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status)
Expand Down Expand Up @@ -218,35 +218,35 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
// iii. Check whether the Gang is OnceResourceSatisfied.
// iv. Check whether the Gang has passed the scheduleCycleValid check, and reject the pod if not (Strict mode only).
// v. Try to update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above.
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error {
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) (err error, scheduleCycleInvalid bool) {
if !util.IsPodNeedGang(pod) {
return nil
return nil, false
}
gang := pgMgr.GetGangByPod(pod)
if gang == nil {
return fmt.Errorf("can't find gang, gangName: %v, podName: %v", util.GetId(pod.Namespace, util.GetGangNameByPod(pod)),
util.GetId(pod.Namespace, pod.Name))
util.GetId(pod.Namespace, pod.Name)), false
}
pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime())

// check if gang is initialized
if !gang.HasGangInit {
return fmt.Errorf("gang has not init, gangName: %v, podName: %v", gang.Name,
util.GetId(pod.Namespace, pod.Name))
util.GetId(pod.Namespace, pod.Name)), false
}
// resourceSatisfied means pod will directly pass the PreFilter
if gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied() {
return nil
return nil, false
}

// check minNum
if gang.getChildrenNum() < gang.getGangMinNum() {
return fmt.Errorf("gang child pod not collect enough, gangName: %v, podName: %v", gang.Name,
util.GetId(pod.Namespace, pod.Name))
util.GetId(pod.Namespace, pod.Name)), false
}

if pgMgr.args != nil && pgMgr.args.SkipCheckScheduleCycle {
return nil
return nil, false
}

// first try update the global cycle of gang
Expand All @@ -257,19 +257,19 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
gangMode := gang.getGangMode()
if gangMode == extension.GangModeStrict {
if pod.Status.NominatedNodeName != "" {
return nil
return nil, false
}
podScheduleCycle := gang.getChildScheduleCycle(pod)
if !gang.isScheduleCycleValid() {
return fmt.Errorf("gang scheduleCycle not valid, gangName: %v, podName: %v",
gang.Name, util.GetId(pod.Namespace, pod.Name))
gang.Name, util.GetId(pod.Namespace, pod.Name)), true
}
if podScheduleCycle >= gangScheduleCycle {
return fmt.Errorf("pod's schedule cycle too large, gangName: %v, podName: %v, podCycle: %v, gangCycle: %v",
gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle)
gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle), false
}
}
return nil
return nil, false
}

// PostFilter
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/coscheduling/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func TestPlugin_PreFilter(t *testing.T) {
}()
}
// run the case
err := mgr.PreFilter(ctx, tt.pod)
err, _ := mgr.PreFilter(ctx, tt.pod)
var returnMessage string
if err == nil {
returnMessage = ""
Expand Down
34 changes: 30 additions & 4 deletions pkg/scheduler/plugins/coscheduling/coscheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ var _ framework.EnqueueExtensions = &Coscheduling{}

const (
// Name is the name of the plugin used in Registry and configurations.
Name = "Coscheduling"
Name = "Coscheduling"
stateKey = Name
)

// New initializes and returns a new Coscheduling plugin.
Expand Down Expand Up @@ -160,19 +161,44 @@ func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool {
// iii. Check whether the Gang has passed the scheduleCycleValid check, and reject the pod if not.
// iv. Try to update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above.
func (cs *Coscheduling) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
// If PreFilter fails, return framework.Error to avoid
// any preemption attempts.
if err := cs.pgMgr.PreFilter(ctx, pod); err != nil {
// If PreFilter fails, return framework.UnschedulableAndUnresolvable to avoid any preemption attempts.
// If PreFilter failed because the scheduleCycle is invalid, we shouldn't reject the pod's assumed siblings.
if err, scheduleCycleInvalid := cs.pgMgr.PreFilter(ctx, pod); err != nil {
state.Write(stateKey, &stateData{skipPostFilter: scheduleCycleInvalid})
klog.ErrorS(err, "PreFilter failed", "pod", klog.KObj(pod))
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
return nil, framework.NewStatus(framework.Success, "")
}

// stateData is the per-scheduling-cycle record that PreFilter writes into the
// CycleState under stateKey and that PostFilter reads back.
type stateData struct {
	// skipPostFilter is populated from the scheduleCycleInvalid result of the
	// gang manager's PreFilter; when true, PostFilter returns early instead of
	// delegating to the gang manager.
	skipPostFilter bool
}

// Clone returns an independent copy of the state so that it satisfies
// framework.StateData.
func (s *stateData) Clone() framework.StateData {
	dup := *s
	return &dup
}

// getPreFilterState returns the stateData stored in cycleState under stateKey.
//
// It returns nil when nothing is stored under stateKey (Read reports an error)
// or when the stored value is not a *stateData.
func getPreFilterState(cycleState *framework.CycleState) *stateData {
	value, err := cycleState.Read(stateKey)
	if err != nil {
		return nil
	}
	// Use the comma-ok form: an unchecked assertion would panic in the
	// scheduling path if some other value were ever written under stateKey.
	state, ok := value.(*stateData)
	if !ok {
		return nil
	}
	return state
}

// PostFilter
// i. If PreFilter recorded skipPostFilter for this cycle, return Unschedulable
// right away without delegating to the gang manager.
// ii. Otherwise delegate to the gang manager's PostFilter: in strict-mode it
// sets scheduleCycleValid to false and releases all assumed pods; in
// non-strict mode it does nothing.
func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
	if recorded := getPreFilterState(state); recorded == nil || !recorded.skipPostFilter {
		return cs.pgMgr.PostFilter(ctx, pod, cs.frameworkHandler, Name, filteredNodeStatusMap)
	}
	return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
}

Expand Down
114 changes: 114 additions & 0 deletions pkg/scheduler/plugins/coscheduling/coscheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1335,3 +1335,117 @@ func TestDeadLockFree(t *testing.T) {
klog.Infoln(info)
}
}

// TestNoRejectWhenInvalidCycle builds two gangs (gangA with min 1 member, gangB
// with min 2 members) that belong to the same gang group, drives one PostFilter
// and one PreFilter call by hand, and then runs simulated scheduling rounds,
// asserting that no pod is left pending in the scheduling queue afterwards.
func TestNoRejectWhenInvalidCycle(t *testing.T) {
// Both gangs list the same two gang IDs, i.e. they form a single gang group.
gangNames := []string{"gangA", "gangB"}
gangGroups := map[string][]string{
"gangA": {"default/gangA", "default/gangB"},
"gangB": {"default/gangA", "default/gangB"},
}
// Minimum member count per gang, indexed in parallel with gangNames.
gangMinRequiredNums := []int{1, 2}

podGroupCRs := map[string]*v1alpha1.PodGroup{}
memberPodsOfGang := map[string][]*corev1.Pod{}
var allPods []*corev1.Pod
for i, gangName := range gangNames {
gangID := util.GetId("default", gangName)

// Build the PodGroup CR and attach the gang-group annotation as JSON.
podGroup := makePg(gangName, "default", int32(gangMinRequiredNums[i]), nil, nil)
podGroup.Spec.ScheduleTimeoutSeconds = pointer.Int32(3600)
gangGroup := gangGroups[gangName]
rawGangGroup, err := json.Marshal(gangGroup)
assert.NoError(t, err)
podGroup.Annotations = map[string]string{extension.AnnotationGangGroups: string(rawGangGroup)}
podGroupCRs[gangID] = podGroup

// Create exactly as many member pods as the gang's minimum requires.
for j := 0; j < gangMinRequiredNums[i]; j++ {
memberPodOfGang := st.MakePod().
Namespace("default").
Name(fmt.Sprintf("%s_%d", gangName, j)).
UID(fmt.Sprintf("%s_%d", gangName, j)).
Label(v1alpha1.PodGroupLabel, gangName).
SchedulerName("koord-scheduler").Obj()
memberPodsOfGang[gangID] = append(memberPodsOfGang[gangID], memberPodOfGang)
allPods = append(allPods, memberPodOfGang)
}
}

// Register the PodGroups and pods with the fake clientsets.
pgClientSet := fakepgclientset.NewSimpleClientset()
cs := kubefake.NewSimpleClientset()
for _, podGroupCR := range podGroupCRs {
_, err := pgClientSet.SchedulingV1alpha1().PodGroups(podGroupCR.Namespace).Create(context.Background(), podGroupCR, metav1.CreateOptions{})
assert.NoError(t, err)
}
// Randomize the pod order; note this also makes allPods[0] below a random pod.
rand.Shuffle(len(allPods), func(i, j int) {
allPods[i], allPods[j] = allPods[j], allPods[i]
})
for _, pod := range allPods {
_, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
assert.NoError(t, err)
}

// Scheduler profile whose only enabled QueueSort plugin is the fake one
// registered below; all other QueueSort plugins are disabled.
cfg := configtesting.V1beta3ToInternalWithDefaults(t, v1beta3.KubeSchedulerConfiguration{
Profiles: []v1beta3.KubeSchedulerProfile{{
SchedulerName: pointer.StringPtr("koord-scheduler"),
Plugins: &v1beta3.Plugins{
QueueSort: v1beta3.PluginSet{
Enabled: []v1beta3.Plugin{
{Name: "fakeQueueSortPlugin"},
},
Disabled: []v1beta3.Plugin{
{Name: "*"},
},
},
},
}}})

// The fake queue-sort plugin is the plugin under test, exposed through its
// QueueSortPlugin interface.
suit := newPluginTestSuit(t, nil, pgClientSet, cs)
registry := frameworkruntime.Registry{
"fakeQueueSortPlugin": func(_ apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
return suit.plugin.(framework.QueueSortPlugin), nil
},
}

eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()})
// NOTE(review): context.TODO() is never cancelled, so ctx.Done() never fires
// and nothing started with it is stopped by this test — confirm that is intended.
ctx := context.TODO()
sched, err := scheduler.New(cs,
suit.SharedInformerFactory(),
nil,
profile.NewRecorderFactory(eventBroadcaster),
ctx.Done(),
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
scheduler.WithPodInitialBackoffSeconds(1),
scheduler.WithPodMaxBackoffSeconds(1),
scheduler.WithPodMaxInUnschedulablePodsDuration(0),
)

assert.NoError(t, err)
eventBroadcaster.StartRecordingToSink(ctx.Done())
suit.start()
sched.SchedulingQueue.Run()

var scheduleOrder []*debugPodScheduleInfo

// PostFilter with a fresh CycleState (no PreFilter state recorded) must not succeed.
// Presumably this marks the gang group's schedule cycle invalid — TODO confirm
// against the gang manager's PostFilter.
_, status := suit.plugin.(*Coscheduling).PostFilter(ctx, framework.NewCycleState(), allPods[0], nil)
assert.False(t, status.IsSuccess())

// PreFilter for the first gangA member is then expected to fail as well.
_, status = suit.plugin.(*Coscheduling).PreFilter(ctx, framework.NewCycleState(), memberPodsOfGang[util.GetId("default", gangNames[0])][0])
assert.False(t, status.IsSuccess())

// Run up to 5 rounds of one simulated scheduling attempt per pod; the inner
// loop exits early once the queue reports no pending pods.
for i := 0; i < 5; i++ {
for j := 0; j < len(allPods); j++ {
if len(sched.SchedulingQueue.PendingPods()) == 0 {
break
}

// NOTE(review): the callback always returns false; its meaning is defined by
// simulateScheduleOne, which is not visible here — verify.
simulateScheduleOne(t, ctx, sched, suit, &scheduleOrder, func(pod *corev1.Pod) bool {
return false
})
}
}
// Every pod must have left the pending queue by now.
assert.Equal(t, 0, len(sched.SchedulingQueue.PendingPods()))
for _, info := range scheduleOrder {
klog.Infoln(info)
}
}

0 comments on commit bf2848e

Please sign in to comment.