diff --git a/pkg/coscheduling/core/core.go b/pkg/coscheduling/core/core.go index cedddd093..fd9decdca 100644 --- a/pkg/coscheduling/core/core.go +++ b/pkg/coscheduling/core/core.go @@ -32,11 +32,9 @@ import ( listerv1 "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" - pgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" - pginformer "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions/scheduling/v1alpha1" - pglister "sigs.k8s.io/scheduler-plugins/pkg/generated/listers/scheduling/v1alpha1" "sigs.k8s.io/scheduler-plugins/pkg/util" ) @@ -56,7 +54,7 @@ const ( type Manager interface { PreFilter(context.Context, *corev1.Pod) error Permit(context.Context, *corev1.Pod) Status - GetPodGroup(*corev1.Pod) (string, *v1alpha1.PodGroup) + GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup) GetCreationTimestamp(*corev1.Pod, time.Time) time.Time DeletePermittedPodGroup(string) CalculateAssignedPods(string, string) int @@ -66,8 +64,8 @@ type Manager interface { // PodGroupManager defines the scheduling operation called type PodGroupManager struct { - // pgClient is a podGroup client - pgClient pgclientset.Interface + // client is a generic controller-runtime client to manipulate both core resources and PodGroups. + client client.Client // snapshotSharedLister is pod shared list snapshotSharedLister framework.SharedLister // scheduleTimeout is the default timeout for podgroup scheduling. @@ -77,21 +75,17 @@ type PodGroupManager struct { permittedPG *gochache.Cache // backedOffPG stores the podgorup name which failed scheudling recently. backedOffPG *gochache.Cache - // pgLister is podgroup lister - pgLister pglister.PodGroupLister // podLister is pod lister podLister listerv1.PodLister sync.RWMutex } // NewPodGroupManager creates a new operation object. -func NewPodGroupManager(pgClient pgclientset.Interface, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, - pgInformer pginformer.PodGroupInformer, podInformer informerv1.PodInformer) *PodGroupManager { +func NewPodGroupManager(client client.Client, snapshotSharedLister framework.SharedLister, scheduleTimeout *time.Duration, podInformer informerv1.PodInformer) *PodGroupManager { pgMgr := &PodGroupManager{ - pgClient: pgClient, + client: client, snapshotSharedLister: snapshotSharedLister, scheduleTimeout: scheduleTimeout, - pgLister: pgInformer.Lister(), podLister: podInformer.Lister(), permittedPG: gochache.New(3*time.Second, 3*time.Second), backedOffPG: gochache.New(10*time.Second, 10*time.Second), @@ -149,7 +143,7 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework // that is required to be scheduled. func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error { klog.V(5).InfoS("Pre-filter", "pod", klog.KObj(pod)) - pgFullName, pg := pgMgr.GetPodGroup(pod) + pgFullName, pg := pgMgr.GetPodGroup(ctx, pod) if pg == nil { return nil } @@ -200,7 +194,7 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er // Permit permits a pod to run, if the minMember match, it would send a signal to chan. 
func (pgMgr *PodGroupManager) Permit(ctx context.Context, pod *corev1.Pod) Status { - pgFullName, pg := pgMgr.GetPodGroup(pod) + pgFullName, pg := pgMgr.GetPodGroup(ctx, pod) if pgFullName == "" { return PodGroupNotSpecified } @@ -224,8 +218,8 @@ func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time if len(pgName) == 0 { return ts } - pg, err := pgMgr.pgLister.PodGroups(pod.Namespace).Get(pgName) - if err != nil { + var pg v1alpha1.PodGroup + if err := pgMgr.client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil { return ts } return pg.CreationTimestamp.Time @@ -236,27 +230,17 @@ func (pgMgr *PodGroupManager) DeletePermittedPodGroup(pgFullName string) { pgMgr.permittedPG.Delete(pgFullName) } -// PatchPodGroup patches a podGroup. -func (pgMgr *PodGroupManager) PatchPodGroup(pgName string, namespace string, patch []byte) error { - if len(patch) == 0 { - return nil - } - _, err := pgMgr.pgClient.SchedulingV1alpha1().PodGroups(namespace).Patch(context.TODO(), pgName, - types.MergePatchType, patch, metav1.PatchOptions{}) - return err -} - // GetPodGroup returns the PodGroup that a Pod belongs to in cache. -func (pgMgr *PodGroupManager) GetPodGroup(pod *corev1.Pod) (string, *v1alpha1.PodGroup) { +func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod) (string, *v1alpha1.PodGroup) { pgName := util.GetPodGroupLabel(pod) if len(pgName) == 0 { return "", nil } - pg, err := pgMgr.pgLister.PodGroups(pod.Namespace).Get(pgName) - if err != nil { + var pg v1alpha1.PodGroup + if err := pgMgr.client.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil { return fmt.Sprintf("%v/%v", pod.Namespace, pgName), nil } - return fmt.Sprintf("%v/%v", pod.Namespace, pgName), pg + return fmt.Sprintf("%v/%v", pod.Namespace, pgName), &pg } // CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound. 
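Editor's note on the core.go changes above: the generated PodGroup lister is replaced by a context-aware controller-runtime client, so `GetPodGroup` and `GetCreationTimestamp` now read PodGroups through `client.Get`. The sketch below is not part of the patch; it only illustrates the lookup pattern. `getPodGroup` is a hypothetical standalone helper, while `util.GetPodGroupLabel` and `v1alpha1.PodGroup` come from the repository.

```go
// Sketch only: mirrors how core.go now looks PodGroups up via the generic
// client instead of pgLister. getPodGroup is a hypothetical helper, not code
// from this patch.
package example

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
	"sigs.k8s.io/scheduler-plugins/pkg/util"
)

// getPodGroup resolves the PodGroup referenced by the pod's pod-group label.
// client.Reader is satisfied by both a direct client and a cache-backed one,
// so callers don't need to know which flavor they were given.
func getPodGroup(ctx context.Context, c client.Reader, pod *corev1.Pod) (string, *v1alpha1.PodGroup, error) {
	pgName := util.GetPodGroupLabel(pod)
	if pgName == "" {
		return "", nil, nil
	}
	fullName := fmt.Sprintf("%v/%v", pod.Namespace, pgName)
	var pg v1alpha1.PodGroup
	if err := c.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil {
		return fullName, nil, err
	}
	return fullName, &pg, nil
}
```

One design note: `client.New` (used in coscheduling.go below) builds a direct, uncached client, so unlike the lister-based code each read is an API-server round trip unless the plugin is later wired to a cache-backed client.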
diff --git a/pkg/coscheduling/core/core_test.go b/pkg/coscheduling/core/core_test.go index 86eeae440..c57f659c8 100644 --- a/pkg/coscheduling/core/core_test.go +++ b/pkg/coscheduling/core/core_test.go @@ -24,247 +24,336 @@ import ( gochache "github.com/patrickmn/go-cache" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" clicache "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/scheduler/framework" st "k8s.io/kubernetes/pkg/scheduler/testing" - "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" - fakepgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned/fake" - pgformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions" - testutil "sigs.k8s.io/scheduler-plugins/test/util" + tu "sigs.k8s.io/scheduler-plugins/test/util" ) func TestPreFilter(t *testing.T) { - ctx := context.Background() - cs := fakepgclientset.NewSimpleClientset() - - pgInformerFactory := pgformers.NewSharedInformerFactory(cs, 0) - pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() - pgInformerFactory.Start(ctx.Done()) scheduleTimeout := 10 * time.Second - pg := testutil.MakePG("pg", "ns1", 2, nil, nil) - pg1 := testutil.MakePG("pg1", "ns1", 2, nil, nil) - pg2 := testutil.MakePG("pg2", "ns1", 2, nil, &corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("4")}) - pg3 := testutil.MakePG("pg3", "ns1", 2, nil, &corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("40")}) - pgInformer.Informer().GetStore().Add(pg) - pgInformer.Informer().GetStore().Add(pg1) - pgInformer.Informer().GetStore().Add(pg2) - pgInformer.Informer().GetStore().Add(pg3) - pgLister := pgInformer.Lister() + capacity := map[corev1.ResourceName]string{ + corev1.ResourceCPU: "4", + } + nodes := []*corev1.Node{ + st.MakeNode().Name("node-a").Capacity(capacity).Obj(), + st.MakeNode().Name("node-b").Capacity(capacity).Obj(), + } tests := []struct { name string pod *corev1.Pod - pods []*corev1.Pod + pendingPods []*corev1.Pod + pgs []*v1alpha1.PodGroup expectedSuccess bool }{ { name: "pod does not belong to any pg", - pod: st.MakePod().Name("p").UID("p").Namespace("ns1").Obj(), - pods: []*corev1.Pod{ - st.MakePod().Name("pg1-1").UID("pg1-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), - st.MakePod().Name("pg2-1").UID("pg2-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg2").Obj(), + pod: st.MakePod().Name("p").Obj(), + pendingPods: []*corev1.Pod{ + st.MakePod().Name("p1").Namespace("ns").UID("p1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + st.MakePod().Name("p2").Namespace("ns").UID("p2").Label(v1alpha1.PodGroupLabel, "pg2").Obj(), + }, + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(1).Obj(), + tu.MakePodGroup().Name("pg2").Namespace("ns").MinMember(1).Obj(), }, expectedSuccess: true, }, { - name: "pod belongs to a non-existing pg", - pod: st.MakePod().Name("p2").UID("p2").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg-notexisting").Obj(), + name: "pod belongs to a non-existent pg", + pod: st.MakePod().Name("p").Namespace("ns").UID("p").Label(v1alpha1.PodGroupLabel, "pg-non-existent").Obj(), + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(1).Obj(), + }, expectedSuccess: true, }, { name: "pod count less than minMember", - pod: st.MakePod().Name("p2").UID("p2").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), - pods: []*corev1.Pod{ - 
st.MakePod().Name("pg2-1").UID("pg2-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg2").Obj(), + pod: st.MakePod().Name("p").Namespace("ns").UID("p").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + pendingPods: []*corev1.Pod{ + st.MakePod().Name("pg2-1").Namespace("ns").UID("p2-1").Label(v1alpha1.PodGroupLabel, "pg2").Obj(), + }, + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(2).Obj(), + tu.MakePodGroup().Name("pg2").Namespace("ns").MinMember(2).Obj(), }, expectedSuccess: false, }, { name: "pod count equal minMember", - pod: st.MakePod().Name("p2").UID("p2").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), - pods: []*corev1.Pod{ - st.MakePod().Name("pg1-1").UID("pg1-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), - st.MakePod().Name("pg2-1").UID("pg2-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + pod: st.MakePod().Name("p1a").Namespace("ns").UID("p1a").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + pendingPods: []*corev1.Pod{ + st.MakePod().Name("p1b").Namespace("ns").UID("p1b").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + st.MakePod().Name("p1c").Namespace("ns").UID("p1c").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + }, + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(2).Obj(), + tu.MakePodGroup().Name("pg2").Namespace("ns").MinMember(2).Obj(), }, expectedSuccess: true, }, { - name: "pod count more minMember", - pod: st.MakePod().Name("p2").UID("p2").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), - pods: []*corev1.Pod{ - st.MakePod().Name("pg1-1").UID("pg1-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), - st.MakePod().Name("pg2-1").UID("pg2-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), - st.MakePod().Name("pg3-1").UID("pg3-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + name: "pod count more than minMember", + pod: st.MakePod().Name("p1a").Namespace("ns").UID("p1a").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + pendingPods: []*corev1.Pod{ + st.MakePod().Name("p1b").Namespace("ns").UID("p1b").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + st.MakePod().Name("p1c").Namespace("ns").UID("p1c").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + }, + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(2).Obj(), + tu.MakePodGroup().Name("pg2").Namespace("ns").MinMember(2).Obj(), }, expectedSuccess: true, }, { - name: "cluster resource enough, min Resource", - pod: st.MakePod().Name("p2-1").UID("p2-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg2"). - Req(map[corev1.ResourceName]string{corev1.ResourceCPU: "1"}).Obj(), - pods: []*corev1.Pod{ - st.MakePod().Name("pg1-1").UID("pg1-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg2").Obj(), - st.MakePod().Name("pg2-1").UID("pg2-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg2").Obj(), + // Previously we defined 2 nodes, each with 4 cpus. Now the PodGroup's minResources req is 6 cpus. + name: "cluster's resource satisfies minResource", // Although it'd fail in Filter() + pod: st.MakePod().Name("p1a").Namespace("ns").UID("p1a").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + pendingPods: []*corev1.Pod{ + st.MakePod().Name("p1b").Namespace("ns").UID("p1b").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + st.MakePod().Name("p1c").Namespace("ns").UID("p1c").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + }, + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(2). 
+ MinResources(map[corev1.ResourceName]string{corev1.ResourceCPU: "6"}).Obj(), }, expectedSuccess: true, }, { - name: "cluster resource not enough, min Resource", - pod: st.MakePod().Name("p2-1").UID("p2-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg3"). - Req(map[corev1.ResourceName]string{corev1.ResourceCPU: "20"}).Obj(), - pods: []*corev1.Pod{ - st.MakePod().Name("pg1-1").UID("pg1-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg3").Obj(), - st.MakePod().Name("pg2-1").UID("pg2-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg3").Obj(), + // Previously we defined 2 nodes, each with 4 cpus. Now the PodGroup's minResources req is 10 cpus. + name: "cluster's resource cannot satisfy minResource", + pod: st.MakePod().Name("p1a").Namespace("ns").UID("p1a").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + pendingPods: []*corev1.Pod{ + st.MakePod().Name("p1b").Namespace("ns").UID("p1b").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + st.MakePod().Name("p1c").Namespace("ns").UID("p1c").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), }, - expectedSuccess: false, - }, - { - name: "cluster resource enough not required", - pod: st.MakePod().Name("p2-1").UID("p2-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), - pods: []*corev1.Pod{ - st.MakePod().Name("pg1-1").UID("pg1-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), - st.MakePod().Name("pg2-1").UID("pg2-1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(2). + MinResources(map[corev1.ResourceName]string{corev1.ResourceCPU: "10"}).Obj(), }, - expectedSuccess: true, + expectedSuccess: false, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Compile all objects into `objs`. + var objs []runtime.Object + for _, pod := range append(tt.pendingPods, tt.pod) { + objs = append(objs, pod) + } + for _, pg := range tt.pgs { + objs = append(objs, pg) + } + + client, err := tu.NewFakeClient(objs...) 
+ if err != nil { + t.Fatal(err) + } + cs := clientsetfake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(cs, 0) podInformer := informerFactory.Core().V1().Pods() - existingPods, allNodes := testutil.MakeNodesAndPods(map[string]string{"test": "a"}, 60, 30) - snapshot := testutil.NewFakeSharedLister(existingPods, allNodes) - pgMgr := &PodGroupManager{pgLister: pgLister, permittedPG: newCache(), - snapshotSharedLister: snapshot, podLister: podInformer.Lister(), scheduleTimeout: &scheduleTimeout, backedOffPG: gochache.New(10*time.Second, 10*time.Second)} + + pgMgr := &PodGroupManager{ + client: client, + snapshotSharedLister: tu.NewFakeSharedLister(tt.pendingPods, nodes), + podLister: podInformer.Lister(), + scheduleTimeout: &scheduleTimeout, + permittedPG: newCache(), + backedOffPG: newCache(), + } + informerFactory.Start(ctx.Done()) if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) { t.Fatal("WaitForCacheSync failed") } - for _, p := range tt.pods { + for _, p := range tt.pendingPods { podInformer.Informer().GetStore().Add(p) } - err := pgMgr.PreFilter(ctx, tt.pod) + + err = pgMgr.PreFilter(ctx, tt.pod) if (err == nil) != tt.expectedSuccess { - t.Errorf("desire %v, get %v", tt.expectedSuccess, err == nil) + t.Errorf("Want %v, but got %v", tt.expectedSuccess, err == nil) } }) } } func TestPermit(t *testing.T) { - ctx := context.Background() - pg := testutil.MakePG("pg", "ns1", 2, nil, nil) - pg1 := testutil.MakePG("pg1", "ns1", 2, nil, nil) - fakeClient := fakepgclientset.NewSimpleClientset(pg, pg1) - - pgInformerFactory := pgformers.NewSharedInformerFactory(fakeClient, 0) - pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() - pgInformerFactory.Start(ctx.Done()) - - pgInformer.Informer().GetStore().Add(pg) - pgInformer.Informer().GetStore().Add(pg1) - pgLister := pgInformer.Lister() + scheduleTimeout := 10 * time.Second + capacity := map[corev1.ResourceName]string{ + corev1.ResourceCPU: "4", + } + nodes := []*corev1.Node{ + st.MakeNode().Name("node").Capacity(capacity).Obj(), + } - existingPods, allNodes := testutil.MakeNodesAndPods(map[string]string{v1alpha1.PodGroupLabel: "pg1"}, 1, 1) - existingPods[0].Spec.NodeName = allNodes[0].Name - existingPods[0].Namespace = "ns1" - snapshot := testutil.NewFakeSharedLister(existingPods, allNodes) - timeout := 10 * time.Second tests := []struct { - name string - pod *corev1.Pod - snapshot framework.SharedLister - want Status + name string + pod *corev1.Pod + existingPods []*corev1.Pod + pgs []*v1alpha1.PodGroup + want Status }{ { - name: "pod does not belong to any pg, allow", - pod: st.MakePod().Name("p").UID("p").Namespace("ns1").Obj(), + name: "pod does not belong to any pg", + pod: st.MakePod().Name("p").Namespace("ns").UID("p").Obj(), + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(2).Obj(), + }, want: PodGroupNotSpecified, }, { - name: "pod belongs to a non-existing pg", - pod: st.MakePod().Name("p").UID("p").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg-noexist").Obj(), + name: "pod specifies a non-existent pg", + pod: st.MakePod().Name("p").Namespace("ns").UID("p").Label(v1alpha1.PodGroupLabel, "pg-non-existent").Obj(), + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(2).Obj(), + }, want: PodGroupNotFound, }, { - name: "pod belongs to a pg that doesn't have enough pods", - pod: st.MakePod().Name("p").UID("p").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), - snapshot: 
testutil.NewFakeSharedLister([]*corev1.Pod{}, []*corev1.Node{}), - want: Wait, + name: "pod belongs to a pg that doesn't have quorum", + pod: st.MakePod().Name("p").Namespace("ns").UID("p").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(2).Obj(), + }, + want: Wait, }, { - name: "pod belongs to a pg that has enough pods", - pod: st.MakePod().Name("p").UID("p").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), - snapshot: snapshot, - want: Success, + name: "pod belongs to a pg that have quorum satisfied", + pod: st.MakePod().Name("p1a").Namespace("ns").UID("p1a").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + existingPods: []*corev1.Pod{ + st.MakePod().Name("p1b").Namespace("ns").UID("p1b").Label(v1alpha1.PodGroupLabel, "pg1").Node("node").Obj(), + }, + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(2).Obj(), + }, + want: Success, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pgMgr := &PodGroupManager{pgClient: fakeClient, pgLister: pgLister, scheduleTimeout: &timeout, snapshotSharedLister: tt.snapshot} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Compile all objects into `objs`. + var objs []runtime.Object + for _, pod := range append(tt.existingPods, tt.pod) { + objs = append(objs, pod) + } + for _, pg := range tt.pgs { + objs = append(objs, pg) + } + + client, err := tu.NewFakeClient(objs...) + if err != nil { + t.Fatal(err) + } + + cs := clientsetfake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(cs, 0) + podInformer := informerFactory.Core().V1().Pods() + + pgMgr := &PodGroupManager{ + client: client, + snapshotSharedLister: tu.NewFakeSharedLister(tt.existingPods, nodes), + podLister: podInformer.Lister(), + scheduleTimeout: &scheduleTimeout, + } + + informerFactory.Start(ctx.Done()) + if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) { + t.Fatal("WaitForCacheSync failed") + } + for _, p := range tt.existingPods { + podInformer.Informer().GetStore().Add(p) + } + if got := pgMgr.Permit(ctx, tt.pod); got != tt.want { - t.Errorf("Expect %v, but got %v", tt.want, got) + t.Errorf("Want %v, but got %v", tt.want, got) } }) } } func TestCheckClusterResource(t *testing.T) { - nodeRes := map[corev1.ResourceName]string{corev1.ResourceMemory: "300"} - node := st.MakeNode().Name("fake-node").Capacity(nodeRes).Obj() - snapshot := testutil.NewFakeSharedLister(nil, []*corev1.Node{node}) - nodeInfo, _ := snapshot.NodeInfos().List() + capacity := map[corev1.ResourceName]string{ + corev1.ResourceCPU: "3", + } + // In total, the cluster has 3*2 = 6 cpus. 
+ nodes := []*corev1.Node{ + st.MakeNode().Name("node-a").Capacity(capacity).Obj(), + st.MakeNode().Name("node-b").Capacity(capacity).Obj(), + } - pod := st.MakePod().Name("t1-p1-3").Req(map[corev1.ResourceName]string{corev1.ResourceMemory: "100"}).Label(v1alpha1.PodGroupLabel, - "pg1-1").ZeroTerminationGracePeriod().Obj() - snapshotWithAssumedPod := testutil.NewFakeSharedLister([]*corev1.Pod{pod}, []*corev1.Node{node}) - scheduledNodeInfo, _ := snapshotWithAssumedPod.NodeInfos().List() tests := []struct { - name string - resourceRequest corev1.ResourceList - desiredPGName string - nodeList []*framework.NodeInfo - desiredResourceEnough bool + name string + existingPods []*corev1.Pod + minResources corev1.ResourceList + pgName string + want bool }{ { name: "Cluster resource enough", - resourceRequest: corev1.ResourceList{ - corev1.ResourceMemory: *resource.NewQuantity(10, resource.DecimalSI), + existingPods: []*corev1.Pod{ + st.MakePod().Name("p1").Namespace("ns").UID("p1").Node("node-a"). + Req(map[corev1.ResourceName]string{corev1.ResourceCPU: "2"}).Obj(), + }, + minResources: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewQuantity(4, resource.DecimalSI), }, - nodeList: nodeInfo, - desiredResourceEnough: true, + pgName: "ns/pg1", + want: true, }, { name: "Cluster resource not enough", - resourceRequest: corev1.ResourceList{ - corev1.ResourceMemory: *resource.NewQuantity(1000, resource.DecimalSI), + existingPods: []*corev1.Pod{ + st.MakePod().Name("p1").Namespace("ns").UID("p1").Node("node-a"). + Req(map[corev1.ResourceName]string{corev1.ResourceCPU: "2"}).Obj(), + st.MakePod().Name("p2").Namespace("ns").UID("p2").Node("node-b"). + Req(map[corev1.ResourceName]string{corev1.ResourceCPU: "1"}).Obj(), }, - nodeList: nodeInfo, - desiredResourceEnough: false, + minResources: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewQuantity(4, resource.DecimalSI), + }, + pgName: "ns/pg1", + want: false, }, { - name: "Cluster resource enough, some resources of the pods belonging to the group have been included", - resourceRequest: corev1.ResourceList{ - corev1.ResourceMemory: *resource.NewQuantity(250, resource.DecimalSI), + name: "Cluster resource enough as p1's resource needs to be excluded from minResources", + existingPods: []*corev1.Pod{ + st.MakePod().Name("p1").Namespace("ns").UID("p1").Label(v1alpha1.PodGroupLabel, "pg1").Node("node-a"). + Req(map[corev1.ResourceName]string{corev1.ResourceCPU: "2"}).Obj(), + st.MakePod().Name("p2").Namespace("ns").UID("p2").Node("node-b"). 
+ Req(map[corev1.ResourceName]string{corev1.ResourceCPU: "1"}).Obj(), + }, + minResources: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewQuantity(4, resource.DecimalSI), }, - nodeList: scheduledNodeInfo, - desiredResourceEnough: true, - desiredPGName: "pg1-1", + pgName: "ns/pg1", + want: true, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := CheckClusterResource(tt.nodeList, tt.resourceRequest, tt.desiredPGName) - if (err == nil) != tt.desiredResourceEnough { - t.Errorf("want resource enough %v, but got %v", tt.desiredResourceEnough, err != nil) + snapshotSharedLister := tu.NewFakeSharedLister(tt.existingPods, nodes) + nodeInfoList, _ := snapshotSharedLister.NodeInfos().List() + err := CheckClusterResource(nodeInfoList, tt.minResources, tt.pgName) + if (err == nil) != tt.want { + t.Errorf("Expect the cluster resource to be satified: %v, but got %v", tt.want, err == nil) } }) } - } func newCache() *gochache.Cache { diff --git a/pkg/coscheduling/coscheduling.go b/pkg/coscheduling/coscheduling.go index 01a04407f..81e96e8e7 100644 --- a/pkg/coscheduling/coscheduling.go +++ b/pkg/coscheduling/coscheduling.go @@ -24,17 +24,16 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/cache" + clientscheme "k8s.io/client-go/kubernetes/scheme" corev1helpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/scheduler-plugins/apis/config" "sigs.k8s.io/scheduler-plugins/apis/scheduling" "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" "sigs.k8s.io/scheduler-plugins/pkg/coscheduling/core" - pgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" - pgformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions" "sigs.k8s.io/scheduler-plugins/pkg/util" ) @@ -66,16 +65,23 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) return nil, fmt.Errorf("want args to be of type CoschedulingArgs, got %T", obj) } - pgClient := pgclientset.NewForConfigOrDie(handle.KubeConfig()) - pgInformerFactory := pgformers.NewSharedInformerFactory(pgClient, 0) - pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() - podInformer := handle.SharedInformerFactory().Core().V1().Pods() + scheme := runtime.NewScheme() + _ = clientscheme.AddToScheme(scheme) + _ = v1.AddToScheme(scheme) + _ = v1alpha1.AddToScheme(scheme) + client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme}) + if err != nil { + return nil, err + } scheduleTimeDuration := time.Duration(args.PermitWaitingTimeSeconds) * time.Second - - ctx := context.TODO() - - pgMgr := core.NewPodGroupManager(pgClient, handle.SnapshotSharedLister(), &scheduleTimeDuration, pgInformer, podInformer) + pgMgr := core.NewPodGroupManager( + client, + handle.SnapshotSharedLister(), + &scheduleTimeDuration, + // Keep the podInformer (from frameworkHandle) as the single source of Pods. 
+ handle.SharedInformerFactory().Core().V1().Pods(), + ) plugin := &Coscheduling{ frameworkHandler: handle, pgMgr: pgMgr, @@ -89,12 +95,6 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) pgBackoff := time.Duration(args.PodGroupBackoffSeconds) * time.Second plugin.pgBackoff = &pgBackoff } - pgInformerFactory.Start(ctx.Done()) - if !cache.WaitForCacheSync(ctx.Done(), pgInformer.Informer().HasSynced) { - err := fmt.Errorf("WaitForCacheSync failed") - klog.ErrorS(err, "Cannot sync caches") - return nil, err - } return plugin, nil } @@ -147,7 +147,7 @@ func (cs *Coscheduling) PreFilter(ctx context.Context, state *framework.CycleSta // PostFilter is used to reject a group of pods if a pod does not pass PreFilter or Filter. func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { - pgName, pg := cs.pgMgr.GetPodGroup(pod) + pgName, pg := cs.pgMgr.GetPodGroup(ctx, pod) if pg == nil { klog.V(4).InfoS("Pod does not belong to any group", "pod", klog.KObj(pod)) return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, "can not find pod group") @@ -209,7 +209,7 @@ func (cs *Coscheduling) Permit(ctx context.Context, state *framework.CycleState, return framework.NewStatus(framework.Unschedulable, "PodGroup not found"), 0 case core.Wait: klog.InfoS("Pod is waiting to be scheduled to node", "pod", klog.KObj(pod), "nodeName", nodeName) - _, pg := cs.pgMgr.GetPodGroup(pod) + _, pg := cs.pgMgr.GetPodGroup(ctx, pod) if wait := util.GetWaitTimeDuration(pg, cs.scheduleTimeout); wait != 0 { waitTime = wait } @@ -239,7 +239,7 @@ func (cs *Coscheduling) Reserve(ctx context.Context, state *framework.CycleState // Unreserve rejects all other Pods in the PodGroup when one of the pods in the group times out. 
func (cs *Coscheduling) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { - pgName, pg := cs.pgMgr.GetPodGroup(pod) + pgName, pg := cs.pgMgr.GetPodGroup(ctx, pod) if pg == nil { return } diff --git a/pkg/coscheduling/coscheduling_test.go b/pkg/coscheduling/coscheduling_test.go index 59cb17955..db6923538 100644 --- a/pkg/coscheduling/coscheduling_test.go +++ b/pkg/coscheduling/coscheduling_test.go @@ -20,81 +20,117 @@ import ( "context" "fmt" "reflect" + "sort" "testing" "time" + "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/tools/events" + clicache "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" - frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + fwkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/utils/pointer" _ "sigs.k8s.io/scheduler-plugins/apis/config/scheme" "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" "sigs.k8s.io/scheduler-plugins/pkg/coscheduling/core" - fakepgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned/fake" - pgformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions" - testutil "sigs.k8s.io/scheduler-plugins/test/util" + tu "sigs.k8s.io/scheduler-plugins/test/util" ) func TestPodGroupBackoffTime(t *testing.T) { + scheduleDuration := 10 * time.Second + capacity := map[v1.ResourceName]string{ + v1.ResourceCPU: "4", + } + nodes := []*v1.Node{ + st.MakeNode().Name("node").Capacity(capacity).Obj(), + } + tests := []struct { - name string - pod1 *v1.Pod - pod2 *v1.Pod - pod3 *v1.Pod + name string + pods []*v1.Pod + pgs []*v1alpha1.PodGroup + wantActivatedPods []string + want framework.Code }{ { - name: "pod in infinite scheduling loop.", - pod1: st.MakePod().Name("pod1").UID("pod1").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), - pod2: st.MakePod().Name("pod2").UID("pod2").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), - pod3: st.MakePod().Name("pod3").UID("pod3").Namespace("ns1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + name: "prevent pod falling into infinite scheduling loop", + pods: []*v1.Pod{ + st.MakePod().Name("pod1").UID("pod1").Namespace("ns").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + st.MakePod().Name("pod2").UID("pod2").Namespace("ns").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + st.MakePod().Name("pod3").UID("pod3").Namespace("ns").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + }, + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(3).Obj(), + }, + wantActivatedPods: []string{"ns/pod2", "ns/pod3"}, + want: framework.UnschedulableAndUnresolvable, }, } - ctx := context.Background() - cs := fakepgclientset.NewSimpleClientset() - pgInformerFactory := pgformers.NewSharedInformerFactory(cs, 0) - pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() - pgInformerFactory.Start(ctx.Done()) - pg1 := testutil.MakePG("pg1", "ns1", 3, nil, nil) - pgInformer.Informer().GetStore().Add(pg1) - - fakeClient := clientsetfake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(fakeClient, 0) - podInformer := informerFactory.Core().V1().Pods() - informerFactory.Start(ctx.Done()) - existingPods, 
allNodes := testutil.MakeNodesAndPods(map[string]string{"test": "a"}, 60, 30) - snapshot := testutil.NewFakeSharedLister(existingPods, allNodes) - // Compose a framework handle. - registeredPlugins := []st.RegisterPluginFunc{ - st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), - st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), - } - f, err := st.NewFramework(registeredPlugins, "", ctx.Done(), - frameworkruntime.WithClientSet(fakeClient), - frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), - frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithSnapshotSharedLister(snapshot), - ) - if err != nil { - t.Fatal(err) - } - scheduleDuration := 10 * time.Second + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - podInformer.Informer().GetStore().Add(tt.pod1) - podInformer.Informer().GetStore().Add(tt.pod2) - podInformer.Informer().GetStore().Add(tt.pod3) - pgMgr := core.NewPodGroupManager(cs, snapshot, &scheduleDuration, pgInformer, podInformer) - coscheduling := &Coscheduling{pgMgr: pgMgr, frameworkHandler: f, scheduleTimeout: &scheduleDuration, pgBackoff: pointer.Duration(time.Duration(1 * time.Second))} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Compile all objects into `objs`. + var objs []runtime.Object + for _, pod := range tt.pods { + objs = append(objs, pod) + } + for _, pg := range tt.pgs { + objs = append(objs, pg) + } + + client, err := tu.NewFakeClient(objs...) + if err != nil { + t.Fatal(err) + } + + // Compose a fake framework handle. + cs := clientsetfake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(cs, 0) + podInformer := informerFactory.Core().V1().Pods() + registeredPlugins := []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + } + f, err := st.NewFramework(registeredPlugins, "default-scheduler", ctx.Done(), fwkruntime.WithInformerFactory(informerFactory)) + if err != nil { + t.Fatal(err) + } + + pgMgr := core.NewPodGroupManager( + client, + tu.NewFakeSharedLister(tt.pods, nodes), + // In this UT, 5 seconds should suffice to test the PreFilter's return code. 
+ pointer.Duration(5*time.Second), + podInformer, + ) + pl := &Coscheduling{ + frameworkHandler: f, + pgMgr: pgMgr, + scheduleTimeout: &scheduleDuration, + pgBackoff: pointer.Duration(1 * time.Second), + } + + informerFactory.Start(ctx.Done()) + if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) { + t.Fatal("WaitForCacheSync failed") + } + for _, p := range tt.pods { + podInformer.Informer().GetStore().Add(p) + } + state := framework.NewCycleState() state.Write(framework.PodsToActivateKey, framework.NewPodsToActivate()) - code, _ := coscheduling.Permit(context.Background(), state, tt.pod1, "test") + code, _ := pl.Permit(ctx, state, tt.pods[0], "test") if code.Code() != framework.Wait { t.Errorf("expected %v, got %v", framework.Wait, code.Code()) return @@ -110,23 +146,24 @@ func TestPodGroupBackoffTime(t *testing.T) { t.Errorf("cannot convert type %t to *framework.PodsToActivate", podsToActiveObj) return } - - var expectPodsToActivate = map[string]*v1.Pod{ - "ns1/pod2": tt.pod2, "ns1/pod3": tt.pod3, + var got []string + for podName := range podsToActive.Map { + got = append(got, podName) } - if !reflect.DeepEqual(expectPodsToActivate, podsToActive.Map) { - t.Errorf("expected %v, got %v", expectPodsToActivate, podsToActive.Map) + sort.Strings(got) + if diff := cmp.Diff(got, tt.wantActivatedPods); diff != "" { + t.Errorf("unexpected activatedPods (-want, +got): %s\n", diff) return } - coscheduling.PostFilter(context.Background(), framework.NewCycleState(), tt.pod2, nil) + pl.PostFilter(ctx, framework.NewCycleState(), tt.pods[1], nil) - _, code = coscheduling.PreFilter(context.Background(), framework.NewCycleState(), tt.pod3) - if code.Code() != framework.UnschedulableAndUnresolvable { - t.Errorf("expected %v, got %v", framework.UnschedulableAndUnresolvable, code.Code()) + _, code = pl.PreFilter(ctx, framework.NewCycleState(), tt.pods[2]) + if code.Code() != tt.want { + t.Errorf("expected %v, got %v", tt.want, code.Code()) return } - pgFullName, _ := pgMgr.GetPodGroup(tt.pod1) + pgFullName, _ := pgMgr.GetPodGroup(ctx, tt.pods[0]) if code.Reasons()[0] != fmt.Sprintf("podGroup %v failed recently", pgFullName) { t.Errorf("expected %v, got %v", pgFullName, code.Reasons()[0]) return @@ -135,371 +172,462 @@ func TestPodGroupBackoffTime(t *testing.T) { } } +func ptrTime(tm time.Time) *time.Time { + return &tm +} + func TestLess(t *testing.T) { + lowPriority, highPriority := int32(10), int32(100) now := time.Now() - times := make([]time.Time, 0) - for _, d := range []time.Duration{0, 1, 2, 3, -2, -1} { - times = append(times, now.Add(d*time.Second)) - } - ctx := context.Background() - cs := fakepgclientset.NewSimpleClientset() - pgInformerFactory := pgformers.NewSharedInformerFactory(cs, 0) - pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() - pgInformerFactory.Start(ctx.Done()) - for _, pgInfo := range []struct { - createTime time.Time - pgNme string - ns string - minMember int32 - }{ - { - createTime: times[2], - pgNme: "pg1", - ns: "namespace1", - }, - { - createTime: times[3], - pgNme: "pg2", - ns: "namespace2", - }, - { - createTime: times[4], - pgNme: "pg3", - ns: "namespace2", - }, - { - createTime: times[5], - pgNme: "pg4", - ns: "namespace2", - }, - } { - pg := testutil.MakePG(pgInfo.pgNme, pgInfo.ns, 5, &pgInfo.createTime, nil) - pgInformer.Informer().GetStore().Add(pg) - } - fakeClient := clientsetfake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(fakeClient, 0) - podInformer := informerFactory.Core().V1().Pods() - 
informerFactory.Start(ctx.Done()) - - existingPods, allNodes := testutil.MakeNodesAndPods(map[string]string{"test": "a"}, 60, 30) - snapshot := testutil.NewFakeSharedLister(existingPods, allNodes) - scheduleDuration := 10 * time.Second - var lowPriority, highPriority = int32(10), int32(100) - ns1, ns2 := "namespace1", "namespace2" - for _, tt := range []struct { - name string - p1 *framework.QueuedPodInfo - p2 *framework.QueuedPodInfo - expected bool + tests := []struct { + name string + p1 *framework.QueuedPodInfo + p2 *framework.QueuedPodInfo + pgs []*v1alpha1.PodGroup + want bool }{ { name: "p1.priority less than p2.priority", p1: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns1).Name("pod1").Priority(lowPriority).Obj()), + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p1").Namespace("ns1").Priority(lowPriority).Obj()), }, p2: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns2).Name("pod2").Priority(highPriority).Obj()), + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p2").Namespace("ns2").Priority(highPriority).Obj()), }, - expected: false, // p2 should be ahead of p1 in the queue + want: false, }, { name: "p1.priority greater than p2.priority", p1: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns1).Name("pod1").Priority(highPriority).Obj()), + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p1").Namespace("ns1").Priority(highPriority).Obj()), }, p2: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns2).Name("pod2").Priority(lowPriority).Obj()), + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p2").Namespace("ns2").Priority(lowPriority).Obj()), }, - expected: true, // p1 should be ahead of p2 in the queue + want: true, }, { name: "equal priority. p1 is added to schedulingQ earlier than p2", p1: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns1).Name("pod1").Priority(highPriority).Obj()), - InitialAttemptTimestamp: ×[0], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p1").Namespace("ns1").Priority(highPriority).Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 1)), }, p2: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns2).Name("pod2").Priority(highPriority).Obj()), - InitialAttemptTimestamp: ×[1], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p2").Namespace("ns2").Priority(highPriority).Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 2)), }, - expected: true, // p1 should be ahead of p2 in the queue + want: true, }, { name: "equal priority. 
p2 is added to schedulingQ earlier than p1", p1: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns1).Name("pod1").Priority(highPriority).Obj()), - InitialAttemptTimestamp: ×[1], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p1").Namespace("ns1").Priority(highPriority).Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 2)), }, p2: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns2).Name("pod2").Priority(highPriority).Obj()), - InitialAttemptTimestamp: ×[0], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p2").Namespace("ns2").Priority(highPriority).Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 1)), }, - expected: false, // p2 should be ahead of p1 in the queue + want: false, }, { - name: "p1.priority less than p2.priority, p1 belongs to podGroup1", + name: "p1.priority less than p2.priority, p1 belongs to pg1", p1: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns1).Name("pod1").Priority(lowPriority).Label(v1alpha1.PodGroupLabel, "pg1").Obj()), + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p1").Namespace("ns1").Priority(lowPriority). + Label(v1alpha1.PodGroupLabel, "pg1").Obj()), }, p2: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns2).Name("pod2").Priority(highPriority).Obj()), + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p2").Namespace("ns2").Priority(highPriority).Obj()), + }, + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns1").Obj(), }, - expected: false, // p2 should be ahead of p1 in the queue + want: false, }, { - name: "p1.priority greater than p2.priority, p1 belongs to podGroup1", + name: "p1.priority greater than p2.priority, p1 belongs to pg1", p1: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns1).Name("pod1").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "pg1").Obj()), + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p1").Namespace("ns1").Priority(highPriority). + Label(v1alpha1.PodGroupLabel, "pg1").Obj()), }, p2: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns2).Name("pod2").Priority(lowPriority).Obj()), + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p2").Namespace("ns2").Priority(lowPriority).Obj()), }, - expected: true, // p1 should be ahead of p2 in the queue + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns1").Obj(), + }, + want: true, }, { - name: "equal priority. p1 is added to schedulingQ earlier than p2, p1 belongs to podGroup3", + name: "equal priority. p1 is added to schedulingQ earlier than p2, p1 belongs to pg1", p1: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns1).Name("pod1").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "pg3").Obj()), - InitialAttemptTimestamp: ×[0], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p1").Namespace("ns1").Priority(highPriority). 
+ Label(v1alpha1.PodGroupLabel, "pg1").Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 1)), }, p2: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns2).Name("pod2").Priority(highPriority).Obj()), - InitialAttemptTimestamp: ×[1], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p2").Namespace("ns2").Priority(highPriority).Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 2)), + }, + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns1").Time(now.Add(time.Second * 1)).Obj(), }, - expected: true, // p1 should be ahead of p2 in the queue + want: true, }, { - name: "equal priority. p2 is added to schedulingQ earlier than p1, p1 belongs to podGroup3", + name: "equal priority. p2 is added to schedulingQ earlier than p1, p1 belongs to pg1", p1: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns1).Name("pod1").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "pg3").Obj()), - InitialAttemptTimestamp: ×[1], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p1").Namespace("ns1").Priority(highPriority). + Label(v1alpha1.PodGroupLabel, "pg1").Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 2)), }, p2: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns2).Name("pod2").Priority(highPriority).Obj()), - InitialAttemptTimestamp: ×[0], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p2").Namespace("ns2").Priority(highPriority).Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 1)), }, - expected: false, // p2 should be ahead of p1 in the queue + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns1").Time(now.Add(time.Second * 2)).Obj(), + }, + want: false, }, - { - name: "p1.priority less than p2.priority, p1 belongs to podGroup1 and p2 belongs to podGroup2", + name: "p1.priority less than p2.priority, p1 belongs to pg1 and p2 belongs to pg2", p1: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns1).Name("pod1").Priority(lowPriority).Label(v1alpha1.PodGroupLabel, "pg1").Obj()), + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p1").Namespace("ns1").Priority(lowPriority). + Label(v1alpha1.PodGroupLabel, "pg1").Obj()), }, p2: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns2).Name("pod2").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "pg2").Obj()), + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p2").Namespace("ns2").Priority(highPriority). + Label(v1alpha1.PodGroupLabel, "pg2").Obj()), + }, + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns1").Time(now.Add(time.Second * 1)).Obj(), + tu.MakePodGroup().Name("pg2").Namespace("ns2").Time(now.Add(time.Second * 2)).Obj(), }, - expected: false, // p2 should be ahead of p1 in the queue + want: false, }, { - name: "p1.priority greater than p2.priority, p1 belongs to podGroup1 and p2 belongs to podGroup2", + name: "p1.priority greater than p2.priority, p1 belongs to pg1 and and p2 belongs to pg2", p1: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns1).Name("pod1").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "pg1").Obj()), + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p1").Namespace("ns1").Priority(highPriority). 
+ Label(v1alpha1.PodGroupLabel, "pg1").Obj()), }, p2: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns2).Name("pod2").Priority(lowPriority).Label(v1alpha1.PodGroupLabel, "pg2").Obj()), + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p2").Namespace("ns2").Priority(lowPriority). + Label(v1alpha1.PodGroupLabel, "pg1").Obj()), }, - expected: true, // p1 should be ahead of p2 in the queue + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns1").Time(now.Add(time.Second * 1)).Obj(), + tu.MakePodGroup().Name("pg2").Namespace("ns2").Time(now.Add(time.Second * 2)).Obj(), + }, + want: true, }, { - name: "equal priority. p1 is added to schedulingQ earlier than p2, p1 belongs to podGroup1 and p2 belongs to podGroup2", + name: "equal priority. p1 is added to schedulingQ earlier than p2, p1 belongs to pg1 and p2 belongs to pg2", p1: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns1).Name("pod1").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "pg1").Obj()), - InitialAttemptTimestamp: ×[0], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p1").Namespace("ns1").Priority(highPriority). + Label(v1alpha1.PodGroupLabel, "pg1").Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 1)), }, p2: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns2).Name("pod2").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "pg2").Obj()), - InitialAttemptTimestamp: ×[1], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p2").Namespace("ns2").Priority(highPriority). + Label(v1alpha1.PodGroupLabel, "pg2").Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 2)), + }, + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns1").Time(now.Add(time.Second * 1)).Obj(), + tu.MakePodGroup().Name("pg2").Namespace("ns2").Time(now.Add(time.Second * 2)).Obj(), }, - expected: true, // p1 should be ahead of p2 in the queue + want: true, }, { - name: "equal priority. p2 is added to schedulingQ earlier than p1, p1 belongs to podGroup4 and p2 belongs to podGroup3", + name: "equal priority. p2 is added to schedulingQ earlier than p1, p1 belongs to pg2 and p2 belongs to pg1", p1: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns1).Name("pod1").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "pg4").Obj()), - InitialAttemptTimestamp: ×[1], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p1").Namespace("ns").Priority(lowPriority). + Label(v1alpha1.PodGroupLabel, "pg2").Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 1)), }, p2: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns2).Name("pod2").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "pg3").Obj()), - InitialAttemptTimestamp: ×[0], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p2").Namespace("ns").Priority(highPriority). 
+ Label(v1alpha1.PodGroupLabel, "pg1").Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 2)), }, - expected: false, // p2 should be ahead of p1 in the queue + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").Time(now.Add(time.Second * 1)).Obj(), + tu.MakePodGroup().Name("pg2").Namespace("ns").Time(now.Add(time.Second * 2)).Obj(), + }, + want: false, }, { - name: "equal priority and creation time, p1 belongs to podGroup1 and p2 belongs to podGroup2", + name: "equal priority and creation time, p1 belongs pg1 and p2 belong to pg2", p1: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns1).Name("pod1").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "pg1").Obj()), - InitialAttemptTimestamp: ×[0], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p1").Namespace("ns1").Priority(highPriority). + Label(v1alpha1.PodGroupLabel, "pg2").Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 1)), }, p2: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns2).Name("pod2").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "pg2").Obj()), - InitialAttemptTimestamp: ×[0], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p2").Namespace("ns2").Priority(highPriority). + Label(v1alpha1.PodGroupLabel, "pg2").Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 1)), + }, + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns1").Time(now.Add(time.Second * 1)).Obj(), + tu.MakePodGroup().Name("pg2").Namespace("ns2").Time(now.Add(time.Second * 2)).Obj(), }, - expected: true, // p1 should be ahead of p2 in the queue + want: true, }, { - name: "equal priority and creation time, p2 belong to podGroup2", + name: "equal priority and creation time, and p2 belong to pg2", p1: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns1).Name("pod1").Priority(highPriority).Obj()), - InitialAttemptTimestamp: ×[0], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p1").Namespace("ns1").Priority(highPriority).Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 1)), }, p2: &framework.QueuedPodInfo{ - PodInfo: testutil.MustNewPodInfo(t, st.MakePod().Namespace(ns2).Name("pod2").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "pg2").Obj()), - InitialAttemptTimestamp: ×[0], + PodInfo: tu.MustNewPodInfo(t, st.MakePod().Name("p2").Namespace("ns2").Priority(highPriority). + Label(v1alpha1.PodGroupLabel, "pg2").Obj()), + InitialAttemptTimestamp: ptrTime(now.Add(time.Second * 1)), }, - expected: true, // p1 should be ahead of p2 in the queue + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg2").Namespace("ns2").Time(now.Add(time.Second * 2)).Obj(), + }, + want: true, }, - } { + } + + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pgMgr := core.NewPodGroupManager(cs, snapshot, &scheduleDuration, pgInformer, podInformer) - coscheduling := &Coscheduling{pgMgr: pgMgr} - if got := coscheduling.Less(tt.p1, tt.p2); got != tt.expected { - t.Errorf("expected %v, got %v", tt.expected, got) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Compile all objects into `objs`. + var objs []runtime.Object + for _, pg := range tt.pgs { + objs = append(objs, pg) + } + + client, err := tu.NewFakeClient(objs...) 
+ if err != nil { + t.Fatal(err) + } + cs := clientsetfake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(cs, 0) + podInformer := informerFactory.Core().V1().Pods() + + pl := &Coscheduling{pgMgr: core.NewPodGroupManager(client, nil, nil, podInformer)} + + informerFactory.Start(ctx.Done()) + if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) { + t.Fatal("WaitForCacheSync failed") + } + + if got := pl.Less(tt.p1, tt.p2); got != tt.want { + t.Errorf("Want %v, got %v", tt.want, got) } }) } } func TestPermit(t *testing.T) { + scheduleTimeout := 10 * time.Second + capacity := map[v1.ResourceName]string{ + v1.ResourceCPU: "4", + } + nodes := []*v1.Node{ + st.MakeNode().Name("node").Capacity(capacity).Obj(), + } + tests := []struct { - name string - pod *v1.Pod - expected framework.Code + name string + pod *v1.Pod + pgs []*v1alpha1.PodGroup + want framework.Code }{ { - name: "pods do not belong to any podGroup", - pod: st.MakePod().Name("pod1").UID("pod1").Obj(), - expected: framework.Success, + name: "pods do not belong to any podGroup", + pod: st.MakePod().Name("p").Namespace("ns").UID("p").Obj(), + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(1).Obj(), + tu.MakePodGroup().Name("pg2").Namespace("ns").MinMember(2).Obj(), + }, + want: framework.Success, }, { - name: "pods belong to a podGroup, Wait", - pod: st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), - expected: framework.Wait, + name: "pods belong to a pg1, but quorum not satisfied", + pod: st.MakePod().Name("p").Namespace("ns").UID("p").Label(v1alpha1.PodGroupLabel, "pg2").Obj(), + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(1).Obj(), + tu.MakePodGroup().Name("pg2").Namespace("ns").MinMember(2).Obj(), + }, + want: framework.Wait, }, { - name: "pods belong to a podGroup, Allow", - pod: st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Label(v1alpha1.PodGroupLabel, "pg2").Obj(), - expected: framework.Success, + name: "pods belong to a podGroup, and quorum satisfied", + pod: st.MakePod().Name("p").Namespace("ns").UID("p").Label(v1alpha1.PodGroupLabel, "pg1").Obj(), + pgs: []*v1alpha1.PodGroup{ + tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(1).Obj(), + tu.MakePodGroup().Name("pg2").Namespace("ns").MinMember(2).Obj(), + }, + want: framework.Success, }, } - ctx := context.Background() - cs := fakepgclientset.NewSimpleClientset() - pgInformerFactory := pgformers.NewSharedInformerFactory(cs, 0) - pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() - pgInformerFactory.Start(ctx.Done()) - pg1 := testutil.MakePG("pg1", "ns1", 2, nil, nil) - pg2 := testutil.MakePG("pg2", "ns1", 1, nil, nil) - pgInformer.Informer().GetStore().Add(pg1) - pgInformer.Informer().GetStore().Add(pg2) - - fakeClient := clientsetfake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(fakeClient, 0) - podInformer := informerFactory.Core().V1().Pods() - informerFactory.Start(ctx.Done()) - existingPods, allNodes := testutil.MakeNodesAndPods(map[string]string{"test": "a"}, 60, 30) - snapshot := testutil.NewFakeSharedLister(existingPods, allNodes) - // Compose a framework handle. 
- registeredPlugins := []st.RegisterPluginFunc{ - st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), - st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), - } - f, err := st.NewFramework(registeredPlugins, "", ctx.Done(), - frameworkruntime.WithClientSet(fakeClient), - frameworkruntime.WithEventRecorder(&events.FakeRecorder{}), - frameworkruntime.WithInformerFactory(informerFactory), - frameworkruntime.WithSnapshotSharedLister(snapshot), - ) - if err != nil { - t.Fatal(err) - } - scheduleDuration := 10 * time.Second + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pgMgr := core.NewPodGroupManager(cs, snapshot, &scheduleDuration, pgInformer, podInformer) - coscheduling := &Coscheduling{pgMgr: pgMgr, frameworkHandler: f, scheduleTimeout: &scheduleDuration} - code, _ := coscheduling.Permit(context.Background(), framework.NewCycleState(), tt.pod, "test") - if code.Code() != tt.expected { - t.Errorf("expected %v, got %v", tt.expected, code.Code()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Compile all objects into `objs`. + var objs []runtime.Object + objs = append(objs, tt.pod) + for _, pg := range tt.pgs { + objs = append(objs, pg) + } + + client, err := tu.NewFakeClient(objs...) + if err != nil { + t.Fatal(err) + } + + // Compose a fake framework handle. + registeredPlugins := []st.RegisterPluginFunc{ + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + } + f, err := st.NewFramework(registeredPlugins, "default-scheduler", ctx.Done()) + if err != nil { + t.Fatal(err) + } + cs := clientsetfake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(cs, 0) + podInformer := informerFactory.Core().V1().Pods() + + pl := &Coscheduling{ + frameworkHandler: f, + pgMgr: core.NewPodGroupManager(client, tu.NewFakeSharedLister(nil, nodes), nil, podInformer), + scheduleTimeout: &scheduleTimeout, + } + + informerFactory.Start(ctx.Done()) + if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) { + t.Fatal("WaitForCacheSync failed") + } + + code, _ := pl.Permit(ctx, framework.NewCycleState(), tt.pod, "node") + if got := code.Code(); got != tt.want { + t.Errorf("Want %v, but got %v", tt.want, got) } }) } } func TestPostFilter(t *testing.T) { - nodeStatusMap := framework.NodeToStatusMap{"node1": framework.NewStatus(framework.Success, "")} - ctx := context.Background() - cs := fakepgclientset.NewSimpleClientset() - pgInformerFactory := pgformers.NewSharedInformerFactory(cs, 0) - pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() - pgInformerFactory.Start(ctx.Done()) - pg := testutil.MakePG("pg", "ns1", 2, nil, nil) - pgInformer.Informer().GetStore().Add(pg) - fakeClient := clientsetfake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(fakeClient, 0) - podInformer := informerFactory.Core().V1().Pods() - informerFactory.Start(ctx.Done()) - - existingPods, allNodes := testutil.MakeNodesAndPods(map[string]string{"test": "a"}, 60, 30) - snapshot := testutil.NewFakeSharedLister(existingPods, allNodes) - // Compose a framework handle. 
-    registeredPlugins := []st.RegisterPluginFunc{
-        st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
-        st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
+    scheduleTimeout := 10 * time.Second
+    capacity := map[v1.ResourceName]string{
+        v1.ResourceCPU: "4",
     }
-    f, err := st.NewFramework(registeredPlugins, "", ctx.Done(),
-        frameworkruntime.WithClientSet(fakeClient),
-        frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
-        frameworkruntime.WithInformerFactory(informerFactory),
-        frameworkruntime.WithSnapshotSharedLister(snapshot),
-    )
-    if err != nil {
-        t.Fatal(err)
+    nodes := []*v1.Node{
+        st.MakeNode().Name("node").Capacity(capacity).Obj(),
     }
+    nodeStatusMap := framework.NodeToStatusMap{"node": framework.NewStatus(framework.Success, "")}
 
-    existingPods, allNodes = testutil.MakeNodesAndPods(map[string]string{v1alpha1.PodGroupLabel: "pg"}, 10, 30)
-    for _, pod := range existingPods {
-        pod.Namespace = "ns1"
-    }
-    groupPodSnapshot := testutil.NewFakeSharedLister(existingPods, allNodes)
-    scheduleDuration := 10 * time.Second
     tests := []struct {
-        name                 string
-        pod                  *v1.Pod
-        expectedEmptyMsg     bool
-        snapshotSharedLister framework.SharedLister
+        name         string
+        pod          *v1.Pod
+        existingPods []*v1.Pod
+        pgs          []*v1alpha1.PodGroup
+        want         *framework.Status
     }{
         {
-            name:             "pod does not belong to any pod group",
-            pod:              st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Obj(),
-            expectedEmptyMsg: false,
+            name: "pod does not belong to any pod group",
+            pod:  st.MakePod().Name("p").Namespace("ns").UID("p").Obj(),
+            pgs: []*v1alpha1.PodGroup{
+                tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(2).Obj(),
+            },
+            want: framework.NewStatus(framework.Unschedulable, "can not find pod group"),
         },
         {
-            name:                 "enough pods assigned, do not reject all",
-            pod:                  st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Label(v1alpha1.PodGroupLabel, "pg").Obj(),
-            expectedEmptyMsg:     true,
-            snapshotSharedLister: groupPodSnapshot,
+            name: "enough pods assigned, do not reject all",
+            pod:  st.MakePod().Name("p").Namespace("ns").UID("p").Label(v1alpha1.PodGroupLabel, "pg1").Obj(),
+            existingPods: []*v1.Pod{
+                st.MakePod().Name("p1").Namespace("ns").UID("p1").Node("node").Label(v1alpha1.PodGroupLabel, "pg1").Obj(),
+            },
+            pgs: []*v1alpha1.PodGroup{
+                tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(1).Obj(),
+            },
+            want: framework.NewStatus(framework.Unschedulable),
         },
         {
-            name:             "pod failed at filter phase, reject all pods",
-            pod:              st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Label(v1alpha1.PodGroupLabel, "pg").Obj(),
-            expectedEmptyMsg: false,
+            name: "pod failed at filter phase, reject all pods",
+            pod:  st.MakePod().Name("p").Namespace("ns").UID("p").Label(v1alpha1.PodGroupLabel, "pg1").Obj(),
+            existingPods: []*v1.Pod{
+                st.MakePod().Name("p1").Namespace("ns").UID("p1").Node("node").Label(v1alpha1.PodGroupLabel, "pg1").Obj(),
+            },
+            pgs: []*v1alpha1.PodGroup{
+                tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(2).Obj(),
+            },
+            want: framework.NewStatus(
+                framework.Unschedulable,
+                "PodGroup ns/pg1 gets rejected due to Pod p is unschedulable even after PostFilter",
+            ),
         },
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            cycleState := framework.NewCycleState()
-            mgrSnapShot := snapshot
-            if tt.snapshotSharedLister != nil {
-                mgrSnapShot = tt.snapshotSharedLister
+            ctx, cancel := context.WithCancel(context.Background())
+            defer cancel()
+
+            // Compile all objects into `objs`.
+            var objs []runtime.Object
+            for _, pod := range append(tt.existingPods, tt.pod) {
+                objs = append(objs, pod)
+            }
+            for _, pg := range tt.pgs {
+                objs = append(objs, pg)
+            }
+
+            client, err := tu.NewFakeClient(objs...)
+            if err != nil {
+                t.Fatal(err)
+            }
+
+            // Compose a fake framework handle.
+            registeredPlugins := []st.RegisterPluginFunc{
+                st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
+                st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
+            }
+            f, err := st.NewFramework(registeredPlugins, "default-scheduler", ctx.Done())
+            if err != nil {
+                t.Fatal(err)
+            }
+
+            cs := clientsetfake.NewSimpleClientset()
+            informerFactory := informers.NewSharedInformerFactory(cs, 0)
+            podInformer := informerFactory.Core().V1().Pods()
+
+            pl := &Coscheduling{
+                frameworkHandler: f,
+                pgMgr: core.NewPodGroupManager(
+                    client,
+                    tu.NewFakeSharedLister(tt.existingPods, nodes),
+                    &scheduleTimeout,
+                    podInformer,
+                ),
+                scheduleTimeout: &scheduleTimeout,
+            }
+
+            informerFactory.Start(ctx.Done())
+            if !clicache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced) {
+                t.Fatal("WaitForCacheSync failed")
+            }
+            for _, p := range tt.existingPods {
+                podInformer.Informer().GetStore().Add(p)
             }
-            pgMgr := core.NewPodGroupManager(cs, mgrSnapShot, &scheduleDuration, pgInformer, podInformer)
-            coscheduling := &Coscheduling{pgMgr: pgMgr, frameworkHandler: f, scheduleTimeout: &scheduleDuration}
-            _, code := coscheduling.PostFilter(context.Background(), cycleState, tt.pod, nodeStatusMap)
-            if code.Message() == "" != tt.expectedEmptyMsg {
-                t.Errorf("expectedEmptyMsg %v, got %v", tt.expectedEmptyMsg, code.Message() == "")
+            _, got := pl.PostFilter(ctx, framework.NewCycleState(), tt.pod, nodeStatusMap)
+            if !reflect.DeepEqual(got, tt.want) {
+                t.Errorf("Want %v, but got %v", tt.want, got)
             }
         })
     }
diff --git a/test/integration/coscheduling_test.go b/test/integration/coscheduling_test.go
index 6a4c88665..4a4e2c9f9 100644
--- a/test/integration/coscheduling_test.go
+++ b/test/integration/coscheduling_test.go
@@ -23,7 +23,7 @@ import (
     "time"
 
     v1 "k8s.io/api/core/v1"
-    "k8s.io/apimachinery/pkg/api/errors"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/uuid"
@@ -35,12 +35,12 @@ import (
     fwkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
     st "k8s.io/kubernetes/pkg/scheduler/testing"
     imageutils "k8s.io/kubernetes/test/utils/image"
+    "sigs.k8s.io/controller-runtime/pkg/client"
 
     schedconfig "sigs.k8s.io/scheduler-plugins/apis/config"
     "sigs.k8s.io/scheduler-plugins/apis/scheduling"
     "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
     "sigs.k8s.io/scheduler-plugins/pkg/coscheduling"
-    "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
     "sigs.k8s.io/scheduler-plugins/test/util"
 )
@@ -49,7 +49,7 @@ func TestCoschedulingPlugin(t *testing.T) {
     testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
 
     cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
-    extClient := versioned.NewForConfigOrDie(globalKubeConfig)
+    extClient := util.NewClientOrDie(globalKubeConfig)
     testCtx.ClientSet = cs
     testCtx.KubeConfig = globalKubeConfig
@@ -382,7 +382,7 @@ func TestPodgroupBackoff(t *testing.T) {
     testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
 
     cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
-    extClient := versioned.NewForConfigOrDie(globalKubeConfig)
+    extClient := util.NewClientOrDie(globalKubeConfig)
     testCtx.ClientSet = cs
     testCtx.KubeConfig = globalKubeConfig
@@ -561,18 +561,17 @@ func WithContainer(pod *v1.Pod, image string) *v1.Pod {
     return pod
 }
 
-func createPodGroups(ctx context.Context, client versioned.Interface, podGroups []*v1alpha1.PodGroup) error {
+func createPodGroups(ctx context.Context, c client.Client, podGroups []*v1alpha1.PodGroup) error {
     for _, pg := range podGroups {
-        _, err := client.SchedulingV1alpha1().PodGroups(pg.Namespace).Create(ctx, pg, metav1.CreateOptions{})
-        if err != nil && !errors.IsAlreadyExists(err) {
+        if err := c.Create(ctx, pg); err != nil && !apierrors.IsAlreadyExists(err) {
             return err
         }
     }
     return nil
 }
 
-func cleanupPodGroups(ctx context.Context, client versioned.Interface, podGroups []*v1alpha1.PodGroup) {
+func cleanupPodGroups(ctx context.Context, c client.Client, podGroups []*v1alpha1.PodGroup) {
     for _, pg := range podGroups {
-        _ = client.SchedulingV1alpha1().PodGroups(pg.Namespace).Delete(ctx, pg.Name, metav1.DeleteOptions{})
+        _ = c.Delete(ctx, pg)
     }
 }
diff --git a/test/integration/podgroup_controller_test.go b/test/integration/podgroup_controller_test.go
index 9444ed263..f24a3b016 100644
--- a/test/integration/podgroup_controller_test.go
+++ b/test/integration/podgroup_controller_test.go
@@ -25,6 +25,7 @@ import (
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/apimachinery/pkg/util/uuid"
     "k8s.io/apimachinery/pkg/util/wait"
@@ -41,7 +42,6 @@ import (
     "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
     "sigs.k8s.io/scheduler-plugins/pkg/controllers"
     "sigs.k8s.io/scheduler-plugins/pkg/coscheduling"
-    "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
     "sigs.k8s.io/scheduler-plugins/test/util"
 
     gocmp "github.com/google/go-cmp/cmp"
@@ -53,7 +53,7 @@ func TestPodGroupController(t *testing.T) {
     testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
 
     cs := kubernetes.NewForConfigOrDie(globalKubeConfig)
-    extClient := versioned.NewForConfigOrDie(globalKubeConfig)
+    extClient := util.NewClientOrDie(globalKubeConfig)
     testCtx.ClientSet = cs
     testCtx.KubeConfig = globalKubeConfig
@@ -301,8 +301,8 @@ func TestPodGroupController(t *testing.T) {
 
             if err := wait.Poll(time.Millisecond*200, 10*time.Second, func() (bool, error) {
                 for _, v := range tt.intermediatePGState {
-                    pg, err := extClient.SchedulingV1alpha1().PodGroups(v.Namespace).Get(testCtx.Ctx, v.Name, metav1.GetOptions{})
-                    if err != nil {
+                    var pg v1alpha1.PodGroup
+                    if err := extClient.Get(testCtx.Ctx, types.NamespacedName{Namespace: v.Namespace, Name: v.Name}, &pg); err != nil {
                         // This could be a connection error so we want to retry.
                         klog.ErrorS(err, "Failed to obtain the PodGroup clientSet")
                         return false, err
@@ -337,8 +337,8 @@ func TestPodGroupController(t *testing.T) {
 
             if err := wait.Poll(time.Millisecond*200, 10*time.Second, func() (bool, error) {
                 for _, v := range tt.expectedPGState {
-                    pg, err := extClient.SchedulingV1alpha1().PodGroups(v.Namespace).Get(testCtx.Ctx, v.Name, metav1.GetOptions{})
-                    if err != nil {
+                    var pg v1alpha1.PodGroup
+                    if err := extClient.Get(testCtx.Ctx, types.NamespacedName{Namespace: v.Namespace, Name: v.Name}, &pg); err != nil {
                         // This could be a connection error so we want to retry.
                         klog.ErrorS(err, "Failed to obtain the PodGroup clientSet")
                         return false, err
diff --git a/test/util/fake.go b/test/util/fake.go
index 888f09490..f99dd90ec 100644
--- a/test/util/fake.go
+++ b/test/util/fake.go
@@ -20,10 +20,16 @@ import (
     "sync"
 
     v1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/types"
+    clientscheme "k8s.io/client-go/kubernetes/scheme"
     listersv1 "k8s.io/client-go/listers/core/v1"
+    "k8s.io/client-go/rest"
     "k8s.io/klog/v2"
     "k8s.io/kubernetes/pkg/scheduler/framework"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/client/fake"
+    "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
 )
 
 var _ framework.SharedLister = &fakeSharedLister{}
@@ -245,3 +251,32 @@ func NewPodNominator(podLister listersv1.PodLister) framework.PodNominator {
 func NominatedNodeName(pod *v1.Pod) string {
     return pod.Status.NominatedNodeName
 }
+
+// NewFakeClient returns a generic controller-runtime client with all given `objs` as internal runtime objects.
+// It also registers the core v1 scheme and this repo's v1alpha1 scheme.
+// This function is used by unit tests.
+func NewFakeClient(objs ...runtime.Object) (client.Client, error) {
+    scheme := runtime.NewScheme()
+    if err := v1.AddToScheme(scheme); err != nil {
+        return nil, err
+    }
+    if err := v1alpha1.AddToScheme(scheme); err != nil {
+        return nil, err
+    }
+    return fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(objs...).Build(), nil
+}
+
+// NewClientOrDie returns a generic controller-runtime client or panics upon any error.
+// This function is used by integration tests.
+func NewClientOrDie(cfg *rest.Config) client.Client {
+    scheme := runtime.NewScheme()
+    _ = clientscheme.AddToScheme(scheme)
+    _ = v1.AddToScheme(scheme)
+    _ = v1alpha1.AddToScheme(scheme)
+
+    c, err := client.New(cfg, client.Options{Scheme: scheme})
+    if err != nil {
+        panic(err)
+    }
+    return c
+}
diff --git a/test/util/podgroup_wrapper.go b/test/util/podgroup_wrapper.go
new file mode 100644
index 000000000..ce6f6268a
--- /dev/null
+++ b/test/util/podgroup_wrapper.go
@@ -0,0 +1,70 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+    "time"
+
+    v1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/api/resource"
+
+    "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
+)
+
+type PodGroupWrapper struct{ v1alpha1.PodGroup }
+
+func MakePodGroup() *PodGroupWrapper {
+    return &PodGroupWrapper{v1alpha1.PodGroup{}}
+}
+
+func (p *PodGroupWrapper) Obj() *v1alpha1.PodGroup {
+    return &p.PodGroup
+}
+
+func (p *PodGroupWrapper) Name(s string) *PodGroupWrapper {
+    p.SetName(s)
+    return p
+}
+
+func (p *PodGroupWrapper) Namespace(s string) *PodGroupWrapper {
+    p.SetNamespace(s)
+    return p
+}
+
+func (p *PodGroupWrapper) MinMember(i int32) *PodGroupWrapper {
+    p.Spec.MinMember = i
+    return p
+}
+
+func (p *PodGroupWrapper) Time(t time.Time) *PodGroupWrapper {
+    p.CreationTimestamp.Time = t
+    return p
+}
+
+func (p *PodGroupWrapper) MinResources(resources map[v1.ResourceName]string) *PodGroupWrapper {
+    res := make(v1.ResourceList)
+    for name, value := range resources {
+        res[name] = resource.MustParse(value)
+    }
+    p.PodGroup.Spec.MinResources = res
+    return p
+}
+
+func (p *PodGroupWrapper) Phase(phase v1alpha1.PodGroupPhase) *PodGroupWrapper {
+    p.Status.Phase = phase
+    return p
+}
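
For reference, a minimal sketch of how the new test helpers in test/util compose once this patch is applied: MakePodGroup builds a PodGroup fixture, NewFakeClient backs a controller-runtime fake client with it, and reads then go through the same client.Get call the plugin code now uses instead of a PodGroup lister. This snippet is illustrative only; the package name, test name, and import alias below are assumptions and are not part of the patch.

// Illustrative sketch, not part of the patch.
package util_test

import (
    "context"
    "testing"

    "k8s.io/apimachinery/pkg/types"

    "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
    tu "sigs.k8s.io/scheduler-plugins/test/util"
)

func TestFakeClientSketch(t *testing.T) {
    // Build a PodGroup fixture with the new wrapper instead of testutil.MakePG.
    pg := tu.MakePodGroup().Name("pg1").Namespace("ns").MinMember(2).Obj()

    // Back a controller-runtime fake client with the fixture; no generated
    // clientset or PodGroup informer is needed any more.
    c, err := tu.NewFakeClient(pg)
    if err != nil {
        t.Fatal(err)
    }

    // Reads use the same generic Get call the PodGroupManager now issues.
    var got v1alpha1.PodGroup
    if err := c.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "pg1"}, &got); err != nil {
        t.Fatalf("unexpected error: %v", err)
    }
    if got.Spec.MinMember != 2 {
        t.Errorf("want MinMember 2, got %d", got.Spec.MinMember)
    }
}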