Skip to content

Commit

Permalink
Cleanup PostBind in coscheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
nayihz committed Mar 24, 2023
1 parent 2ea5a97 commit 280ae89
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 124 deletions.
3 changes: 0 additions & 3 deletions cmd/scheduler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,6 @@ profiles:
Enabled: append(defaults.ExpandedPluginsV1.Reserve.Enabled, config.Plugin{Name: coscheduling.Name}),
},
PreBind: defaults.ExpandedPluginsV1.PreBind,
PostBind: config.PluginSet{
Enabled: append(defaults.ExpandedPluginsV1.PostBind.Enabled, config.Plugin{Name: coscheduling.Name}),
},
},
},
},
Expand Down
7 changes: 7 additions & 0 deletions pkg/controllers/podgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -38,6 +39,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"

"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
"sigs.k8s.io/scheduler-plugins/pkg/util"
)
Expand Down Expand Up @@ -109,6 +111,11 @@ func (r *PodGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if len(pods) >= int(pg.Spec.MinMember) {
pgCopy.Status.Phase = schedv1alpha1.PodGroupPreScheduling
fillOccupiedObj(pgCopy, &pods[0])
} else {
pgCopy.Status.Phase = v1alpha1.PodGroupScheduling
if pgCopy.Status.ScheduleStartTime.IsZero() {
pgCopy.Status.ScheduleStartTime = metav1.Time{Time: time.Now()}
}
}
default:
pgCopy.Status.Running, pgCopy.Status.Succeeded, pgCopy.Status.Failed = getCurrentPodStats(pods)
Expand Down
11 changes: 5 additions & 6 deletions pkg/controllers/podgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,13 @@ func Test_Run(t *testing.T) {
podNextPhase: v1.PodSucceeded,
},
{
name: "Group status convert from scheduling to succeed",
name: "Group status convert from pending to scheduling",
pgName: "pg6",
minMember: 2,
minMember: 3,
podNames: []string{"pod1", "pod2"},
podPhase: v1.PodPending,
previousPhase: v1alpha1.PodGroupScheduling,
desiredGroupPhase: v1alpha1.PodGroupFinished,
podNextPhase: v1.PodSucceeded,
podPhase: v1.PodRunning,
previousPhase: v1alpha1.PodGroupPending,
desiredGroupPhase: v1alpha1.PodGroupScheduling,
},
{
name: "Group status convert from pending to prescheduling",
Expand Down
37 changes: 0 additions & 37 deletions pkg/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ const (
type Manager interface {
PreFilter(context.Context, *corev1.Pod) error
Permit(context.Context, *corev1.Pod) Status
PostBind(context.Context, *corev1.Pod, string)
GetPodGroup(*corev1.Pod) (string, *v1alpha1.PodGroup)
GetCreationTimestamp(*corev1.Pod, time.Time) time.Time
DeletePermittedPodGroup(string)
Expand Down Expand Up @@ -204,42 +203,6 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, pod *corev1.Pod) Statu
return Wait
}

// PostBind updates a PodGroup's status.
// TODO: move this logic to PodGroup's controller.
func (pgMgr *PodGroupManager) PostBind(ctx context.Context, pod *corev1.Pod, nodeName string) {
pgFullName, pg := pgMgr.GetPodGroup(pod)
if pgFullName == "" || pg == nil {
return
}
pgCopy := pg.DeepCopy()
pgCopy.Status.Scheduled++

if pgCopy.Status.Scheduled >= pgCopy.Spec.MinMember {
pgCopy.Status.Phase = v1alpha1.PodGroupScheduled
} else {
pgCopy.Status.Phase = v1alpha1.PodGroupScheduling
if pgCopy.Status.ScheduleStartTime.IsZero() {
pgCopy.Status.ScheduleStartTime = metav1.Time{Time: time.Now()}
}
}
if pgCopy.Status.Phase != pg.Status.Phase {
pg, err := pgMgr.pgLister.PodGroups(pgCopy.Namespace).Get(pgCopy.Name)
if err != nil {
klog.ErrorS(err, "Failed to get PodGroup", "podGroup", klog.KObj(pgCopy))
return
}
patch, err := util.CreateMergePatch(pg, pgCopy)
if err != nil {
klog.ErrorS(err, "Failed to create merge patch", "podGroup", klog.KObj(pg), "podGroup", klog.KObj(pgCopy))
return
}
if err := pgMgr.PatchPodGroup(pg.Name, pg.Namespace, patch); err != nil {
klog.ErrorS(err, "Failed to patch", "podGroup", klog.KObj(pg))
return
}
}
}

// GetCreationTimestamp returns the creation time of a podGroup or a pod.
func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) time.Time {
pgName := util.GetPodGroupLabel(pod)
Expand Down
70 changes: 0 additions & 70 deletions pkg/coscheduling/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
gochache "github.com/patrickmn/go-cache"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
clicache "k8s.io/client-go/tools/cache"
Expand All @@ -35,7 +33,6 @@ import (
"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
fakepgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned/fake"
pgformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
"sigs.k8s.io/scheduler-plugins/pkg/util"
testutil "sigs.k8s.io/scheduler-plugins/test/util"
)

Expand Down Expand Up @@ -216,73 +213,6 @@ func TestPermit(t *testing.T) {
}
}

func TestPostBind(t *testing.T) {
ctx := context.Background()
pg := testutil.MakePG("pg", "ns1", 1, nil, nil)
pg1 := testutil.MakePG("pg1", "ns1", 2, nil, nil)
pg2 := testutil.MakePG("pg2", "ns1", 3, nil, nil)
pg2.Status.Phase = v1alpha1.PodGroupScheduling
pg2.Status.Scheduled = 1
fakeClient := fakepgclientset.NewSimpleClientset(pg, pg1, pg2)

pgInformerFactory := pgformers.NewSharedInformerFactory(fakeClient, 0)
pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups()
pgInformerFactory.Start(ctx.Done())

pgInformer.Informer().GetStore().Add(pg)
pgInformer.Informer().GetStore().Add(pg1)
pgInformer.Informer().GetStore().Add(pg2)
pgLister := pgInformer.Lister()

tests := []struct {
name string
pod *corev1.Pod
desiredGroupPhase v1alpha1.PodGroupPhase
desiredScheduled int32
}{
{
name: "pg status convert to scheduled",
pod: st.MakePod().Name("p").UID("p").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg").Obj(),
desiredGroupPhase: v1alpha1.PodGroupScheduled,
desiredScheduled: 1,
},
{
name: "pg status convert to scheduling",
pod: st.MakePod().Name("p").UID("p").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(),
desiredGroupPhase: v1alpha1.PodGroupScheduling,
desiredScheduled: 1,
},
{
name: "pg status does not convert, although scheduled pods change",
pod: st.MakePod().Name("p").UID("p").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg2").Obj(),
desiredGroupPhase: v1alpha1.PodGroupScheduling,
desiredScheduled: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pgMgr := &PodGroupManager{pgClient: fakeClient, pgLister: pgLister}
pgMgr.PostBind(ctx, tt.pod, "test")
err := wait.PollImmediate(100*time.Millisecond, 1*time.Second, func() (done bool, err error) {
pg, err := pgMgr.pgClient.SchedulingV1alpha1().PodGroups(tt.pod.Namespace).Get(ctx, util.GetPodGroupLabel(tt.pod), v1.GetOptions{})
if err != nil {
return false, nil
}
if pg.Status.Phase != tt.desiredGroupPhase {
return false, nil
}
if pg.Status.Scheduled != tt.desiredScheduled {
return false, nil
}
return true, nil
})
if err != nil {
t.Error(err)
}
})
}
}

func TestCheckClusterResource(t *testing.T) {
nodeRes := map[corev1.ResourceName]string{corev1.ResourceMemory: "300"}
node := st.MakeNode().Name("fake-node").Capacity(nodeRes).Obj()
Expand Down
8 changes: 1 addition & 7 deletions pkg/coscheduling/coscheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var _ framework.PreFilterPlugin = &Coscheduling{}
var _ framework.PostFilterPlugin = &Coscheduling{}
var _ framework.PermitPlugin = &Coscheduling{}
var _ framework.ReservePlugin = &Coscheduling{}
var _ framework.PostBindPlugin = &Coscheduling{}

var _ framework.EnqueueExtensions = &Coscheduling{}

const (
Expand Down Expand Up @@ -230,9 +230,3 @@ func (cs *Coscheduling) Unreserve(ctx context.Context, state *framework.CycleSta
})
cs.pgMgr.DeletePermittedPodGroup(pgName)
}

// PostBind is called after a pod is successfully bound. These plugins are used update PodGroup when pod is bound.
func (cs *Coscheduling) PostBind(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) {
klog.V(5).InfoS("PostBind", "pod", klog.KObj(pod))
cs.pgMgr.PostBind(ctx, pod, nodeName)
}
1 change: 0 additions & 1 deletion test/integration/coscheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func TestCoschedulingPlugin(t *testing.T) {
cfg.Profiles[0].Plugins.PreFilter.Enabled = append(cfg.Profiles[0].Plugins.PreFilter.Enabled, schedapi.Plugin{Name: coscheduling.Name})
cfg.Profiles[0].Plugins.PostFilter.Enabled = append(cfg.Profiles[0].Plugins.PostFilter.Enabled, schedapi.Plugin{Name: coscheduling.Name})
cfg.Profiles[0].Plugins.Permit.Enabled = append(cfg.Profiles[0].Plugins.Permit.Enabled, schedapi.Plugin{Name: coscheduling.Name})
cfg.Profiles[0].Plugins.PostBind.Enabled = append(cfg.Profiles[0].Plugins.PostBind.Enabled, schedapi.Plugin{Name: coscheduling.Name})
cfg.Profiles[0].PluginConfig = append(cfg.Profiles[0].PluginConfig, schedapi.PluginConfig{
Name: coscheduling.Name,
Args: &schedconfig.CoschedulingArgs{
Expand Down

0 comments on commit 280ae89

Please sign in to comment.