Add new multikueue test without use of GC. (#2381)
Allow modifying the manager controller's garbage collector interval, making it possible to disable GC.
mszadkow authored Jun 12, 2024
1 parent 0a7f87c commit 8370e98
Showing 2 changed files with 235 additions and 83 deletions.
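For context, the change works by making the garbage-collector interval configurable and treating a zero interval as "GC off". The sketch below illustrates that functional-option pattern only; WithGCInterval is the option name used in the diff, but the Option/options types, runGC, and the zero-interval check are assumptions about how disabling might be implemented, not code from this commit.

// Sketch only: illustrates the functional-option pattern behind WithGCInterval.
// Option, options, runGC and the zero-interval check are assumptions, not code
// from this commit.
package gcoptions

import (
	"context"
	"time"
)

// Option configures the MultiKueue controllers.
type Option func(*options)

type options struct {
	gcInterval time.Duration
}

// WithGCInterval sets how often orphaned remote objects are garbage collected.
// A non-positive interval is assumed to disable the periodic GC loop entirely.
func WithGCInterval(d time.Duration) Option {
	return func(o *options) { o.gcInterval = d }
}

// runGC is a sketch of a periodic GC loop guarded by the configured interval.
func runGC(ctx context.Context, opts options, collect func(context.Context)) {
	if opts.gcInterval <= 0 {
		return // GC disabled: never start the ticker.
	}
	ticker := time.NewTicker(opts.gcInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			collect(ctx)
		}
	}
}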
266 changes: 208 additions & 58 deletions test/integration/multikueue/multikueue_test.go
@@ -48,7 +48,7 @@ import (
"sigs.k8s.io/kueue/test/util"
)

var _ = ginkgo.Describe("Multikueue", func() {
var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
var (
managerNs *corev1.Namespace
worker1Ns *corev1.Namespace
@@ -69,6 +69,15 @@ var _ = ginkgo.Describe("Multikueue", func() {
worker2Cq *kueue.ClusterQueue
worker2Lq *kueue.LocalQueue
)

ginkgo.BeforeAll(func() {
multiclusterSetup(2 * time.Second)
})

ginkgo.AfterAll(func() {
multiclusterTeardown()
})

ginkgo.BeforeEach(func() {
managerNs = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
@@ -760,63 +769,6 @@ var _ = ginkgo.Describe("Multikueue", func() {
})
})

ginkgo.It("Should remove the worker's workload and job when managers job is deleted", func() {
job := testingjob.MakeJob("job", managerNs.Name).
Queue(managerLq.Name).
Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, job)).Should(gomega.Succeed())

createdWorkload := &kueue.Workload{}
wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: managerNs.Name}

ginkgo.By("setting workload reservation in the management cluster", func() {
admission := utiltesting.MakeAdmission(managerCq.Name).Obj()
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("checking the workload creation in the worker clusters", func() {
managerWl := &kueue.Workload{}
gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("setting workload reservation in worker1, the job is created in worker1", func() {
admission := utiltesting.MakeAdmission(managerCq.Name).Obj()

gomega.Eventually(func(g gomega.Gomega) {
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(util.SetQuotaReservation(worker1TestCluster.ctx, worker1TestCluster.client, createdWorkload, admission)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

gomega.Eventually(func(g gomega.Gomega) {
createdJob := batchv1.Job{}
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("removing the managers job and workload, the workload and job in worker1 are removed", func() {
gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, job)).Should(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, createdWorkload)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

gomega.Eventually(func(g gomega.Gomega) {
createdJob := batchv1.Job{}
g.Expect(worker1TestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError())
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
})

ginkgo.It("Should remove the worker's workload and job after reconnect when the managers job and workload are deleted", func() {
job := testingjob.MakeJob("job", managerNs.Name).
Queue(managerLq.Name).
@@ -1122,3 +1074,201 @@ var _ = ginkgo.Describe("Multikueue", func() {
})
})
})

var _ = ginkgo.Describe("Multikueue no GC", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
var (
managerNs *corev1.Namespace
worker1Ns *corev1.Namespace
worker2Ns *corev1.Namespace

managerMultikueueSecret1 *corev1.Secret
managerMultikueueSecret2 *corev1.Secret
workerCluster1 *kueuealpha.MultiKueueCluster
workerCluster2 *kueuealpha.MultiKueueCluster
managerMultiKueueConfig *kueuealpha.MultiKueueConfig
multikueueAC *kueue.AdmissionCheck
managerCq *kueue.ClusterQueue
managerLq *kueue.LocalQueue

worker1Cq *kueue.ClusterQueue
worker1Lq *kueue.LocalQueue

worker2Cq *kueue.ClusterQueue
worker2Lq *kueue.LocalQueue
)

ginkgo.BeforeAll(func() {
multiclusterSetup(0)
})

ginkgo.AfterAll(func() {
multiclusterTeardown()
})

ginkgo.BeforeEach(func() {
managerNs = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "multikueue-",
},
}
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerNs)).To(gomega.Succeed())

worker1Ns = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: managerNs.Name,
},
}
gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Ns)).To(gomega.Succeed())

worker2Ns = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: managerNs.Name,
},
}
gomega.Expect(worker2TestCluster.client.Create(worker2TestCluster.ctx, worker2Ns)).To(gomega.Succeed())

w1Kubeconfig, err := worker1TestCluster.kubeConfigBytes()
gomega.Expect(err).NotTo(gomega.HaveOccurred())

w2Kubeconfig, err := worker2TestCluster.kubeConfigBytes()
gomega.Expect(err).NotTo(gomega.HaveOccurred())

managerMultikueueSecret1 = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "multikueue1",
Namespace: managersConfigNamespace.Name,
},
Data: map[string][]byte{
kueuealpha.MultiKueueConfigSecretKey: w1Kubeconfig,
},
}
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultikueueSecret1)).To(gomega.Succeed())

managerMultikueueSecret2 = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "multikueue2",
Namespace: managersConfigNamespace.Name,
},
Data: map[string][]byte{
kueuealpha.MultiKueueConfigSecretKey: w2Kubeconfig,
},
}
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultikueueSecret2)).To(gomega.Succeed())

workerCluster1 = utiltesting.MakeMultiKueueCluster("worker1").KubeConfig(kueuealpha.SecretLocationType, managerMultikueueSecret1.Name).Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, workerCluster1)).To(gomega.Succeed())

workerCluster2 = utiltesting.MakeMultiKueueCluster("worker2").KubeConfig(kueuealpha.SecretLocationType, managerMultikueueSecret2.Name).Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, workerCluster2)).To(gomega.Succeed())

managerMultiKueueConfig = utiltesting.MakeMultiKueueConfig("multikueueconfig").Clusters(workerCluster1.Name, workerCluster2.Name).Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerMultiKueueConfig)).Should(gomega.Succeed())

multikueueAC = utiltesting.MakeAdmissionCheck("ac1").
ControllerName(multikueue.ControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", managerMultiKueueConfig.Name).
Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, multikueueAC)).Should(gomega.Succeed())

ginkgo.By("wait for check active", func() {
updatedAc := kueue.AdmissionCheck{}
acKey := client.ObjectKeyFromObject(multikueueAC)
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, acKey, &updatedAc)).To(gomega.Succeed())
cond := apimeta.FindStatusCondition(updatedAc.Status.Conditions, kueue.AdmissionCheckActive)
g.Expect(cond).NotTo(gomega.BeNil())
g.Expect(cond.Status).To(gomega.Equal(metav1.ConditionTrue), "Reason: %s, Message: %q", cond.Reason, cond.Message)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

managerCq = utiltesting.MakeClusterQueue("q1").
AdmissionChecks(multikueueAC.Name).
Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerCq)).Should(gomega.Succeed())

managerLq = utiltesting.MakeLocalQueue(managerCq.Name, managerNs.Name).ClusterQueue(managerCq.Name).Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, managerLq)).Should(gomega.Succeed())

worker1Cq = utiltesting.MakeClusterQueue("q1").Obj()
gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Cq)).Should(gomega.Succeed())
worker1Lq = utiltesting.MakeLocalQueue(worker1Cq.Name, worker1Ns.Name).ClusterQueue(worker1Cq.Name).Obj()
gomega.Expect(worker1TestCluster.client.Create(worker1TestCluster.ctx, worker1Lq)).Should(gomega.Succeed())

worker2Cq = utiltesting.MakeClusterQueue("q1").Obj()
gomega.Expect(worker2TestCluster.client.Create(worker2TestCluster.ctx, worker2Cq)).Should(gomega.Succeed())
worker2Lq = utiltesting.MakeLocalQueue(worker2Cq.Name, worker2Ns.Name).ClusterQueue(worker2Cq.Name).Obj()
gomega.Expect(worker2TestCluster.client.Create(worker2TestCluster.ctx, worker2Lq)).Should(gomega.Succeed())
})

ginkgo.AfterEach(func() {
gomega.Expect(util.DeleteNamespace(managerTestCluster.ctx, managerTestCluster.client, managerNs)).To(gomega.Succeed())
gomega.Expect(util.DeleteNamespace(worker1TestCluster.ctx, worker1TestCluster.client, worker1Ns)).To(gomega.Succeed())
gomega.Expect(util.DeleteNamespace(worker2TestCluster.ctx, worker2TestCluster.client, worker2Ns)).To(gomega.Succeed())
util.ExpectClusterQueueToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, managerCq, true)
util.ExpectClusterQueueToBeDeleted(worker1TestCluster.ctx, worker1TestCluster.client, worker1Cq, true)
util.ExpectClusterQueueToBeDeleted(worker2TestCluster.ctx, worker2TestCluster.client, worker2Cq, true)
util.ExpectAdmissionCheckToBeDeleted(managerTestCluster.ctx, managerTestCluster.client, multikueueAC, true)
gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, managerMultiKueueConfig)).To(gomega.Succeed())
gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, workerCluster1)).To(gomega.Succeed())
gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, workerCluster2)).To(gomega.Succeed())
gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, managerMultikueueSecret1)).To(gomega.Succeed())
gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, managerMultikueueSecret2)).To(gomega.Succeed())
})

ginkgo.It("Should remove the worker's workload and job when managers job is deleted", func() {
job := testingjob.MakeJob("job", managerNs.Name).
Queue(managerLq.Name).
Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, job)).Should(gomega.Succeed())

createdWorkload := &kueue.Workload{}
wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: managerNs.Name}

ginkgo.By("setting workload reservation in the management cluster", func() {
admission := utiltesting.MakeAdmission(managerCq.Name).Obj()
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("checking the workload creation in the worker clusters", func() {
managerWl := &kueue.Workload{}
gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("setting workload reservation in worker1, the job is created in worker1", func() {
admission := utiltesting.MakeAdmission(managerCq.Name).Obj()

gomega.Eventually(func(g gomega.Gomega) {
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(util.SetQuotaReservation(worker1TestCluster.ctx, worker1TestCluster.client, createdWorkload, admission)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

gomega.Eventually(func(g gomega.Gomega) {
createdJob := batchv1.Job{}
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("removing the managers job and workload, the workload and job in worker1 are removed", func() {
gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, job)).Should(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, createdWorkload)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

gomega.Eventually(func(g gomega.Gomega) {
createdJob := batchv1.Job{}
g.Expect(worker1TestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError())
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
})
})
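The test file's top-level Describe blocks are now Ordered containers with BeforeAll/AfterAll, so the envtest clusters are created once per container instead of once for the whole suite. A minimal, self-contained sketch of that Ginkgo pattern follows; setupClusters and teardownClusters are placeholders standing in for the suite's multiclusterSetup and multiclusterTeardown.

// Sketch only: the Ordered/BeforeAll/AfterAll layout used above, with
// setupClusters and teardownClusters as placeholders for the suite's
// multiclusterSetup and multiclusterTeardown helpers.
package multikueue_test

import (
	"time"

	"github.com/onsi/ginkgo/v2"
)

func setupClusters(gcInterval time.Duration) { /* build envtest clusters once */ }
func teardownClusters()                      { /* tear the clusters down once */ }

var _ = ginkgo.Describe("Ordered container example", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
	ginkgo.BeforeAll(func() {
		// Runs once, before the first spec in this container.
		setupClusters(2 * time.Second)
	})

	ginkgo.AfterAll(func() {
		// Runs once, after the last spec; ContinueOnFailure lets the
		// remaining specs run even if an earlier one fails.
		teardownClusters()
	})

	ginkgo.It("runs against the shared clusters", func() {})
})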
52 changes: 27 additions & 25 deletions test/integration/multikueue/suite_test.go
@@ -96,29 +96,6 @@ func createCluster(setupFnc framework.ManagerSetup, apiFeatureGates ...string) c
return c
}

var _ = ginkgo.BeforeSuite(func() {
var managerFeatureGates []string
version, err := versionutil.ParseGeneric(os.Getenv("ENVTEST_K8S_VERSION"))
if err != nil || !version.LessThan(versionutil.MustParseSemantic("1.30.0")) {
managerFeatureGates = []string{"JobManagedBy=true"}
}

managerTestCluster = createCluster(managerAndMultiKueueSetup, managerFeatureGates...)
worker1TestCluster = createCluster(managerSetup)
worker2TestCluster = createCluster(managerSetup)

discoveryClient, err := discovery.NewDiscoveryClientForConfig(managerTestCluster.cfg)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
managerK8sVersion, err = kubeversion.FetchServerVersion(discoveryClient)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})

var _ = ginkgo.AfterSuite(func() {
managerTestCluster.fwk.Teardown()
worker1TestCluster.fwk.Teardown()
worker2TestCluster.fwk.Teardown()
})

func managerSetup(mgr manager.Manager, ctx context.Context) {
err := indexer.Setup(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())
@@ -160,7 +137,7 @@ func managerSetup(mgr manager.Manager, ctx context.Context) {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

func managerAndMultiKueueSetup(mgr manager.Manager, ctx context.Context) {
func managerAndMultiKueueSetup(mgr manager.Manager, ctx context.Context, gcInterval time.Duration) {
managerSetup(mgr, ctx)

managersConfigNamespace = &corev1.Namespace{
@@ -174,9 +151,34 @@
gomega.Expect(err).NotTo(gomega.HaveOccurred())

err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name,
multikueue.WithGCInterval(2*time.Second),
multikueue.WithGCInterval(gcInterval),
multikueue.WithWorkerLostTimeout(testingWorkerLostTimeout),
multikueue.WithEventsBatchPeriod(100*time.Millisecond),
)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

func multiclusterSetup(gcInterval time.Duration) {
var managerFeatureGates []string
version, err := versionutil.ParseGeneric(os.Getenv("ENVTEST_K8S_VERSION"))
if err != nil || !version.LessThan(versionutil.MustParseSemantic("1.30.0")) {
managerFeatureGates = []string{"JobManagedBy=true"}
}

managerTestCluster = createCluster(func(mgr manager.Manager, ctx context.Context) {
managerAndMultiKueueSetup(mgr, ctx, gcInterval)
}, managerFeatureGates...)
worker1TestCluster = createCluster(managerSetup)
worker2TestCluster = createCluster(managerSetup)

discoveryClient, err := discovery.NewDiscoveryClientForConfig(managerTestCluster.cfg)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
managerK8sVersion, err = kubeversion.FetchServerVersion(discoveryClient)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

func multiclusterTeardown() {
managerTestCluster.fwk.Teardown()
worker1TestCluster.fwk.Teardown()
worker2TestCluster.fwk.Teardown()
}
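Because managerAndMultiKueueSetup now takes the GC interval as an extra argument, multiclusterSetup wraps it in a closure so it still matches the setup signature expected by createCluster. The sketch below shows only that closure pattern with simplified stand-in types; newSetup and the printed message are illustrative, not part of the suite.

// Sketch only: binding the GC interval into a closure that matches a fixed
// setup signature, mirroring the anonymous func passed to createCluster.
package main

import (
	"context"
	"fmt"
	"time"
)

// newSetup returns a setup function with the GC interval already captured.
func newSetup(gcInterval time.Duration) func(ctx context.Context) {
	return func(ctx context.Context) {
		// Here the real suite calls multikueue.SetupControllers with
		// multikueue.WithGCInterval(gcInterval).
		fmt.Println("configuring MultiKueue controllers with GC interval:", gcInterval)
	}
}

func main() {
	withGC := newSetup(2 * time.Second) // default suite: GC runs every 2s
	noGC := newSetup(0)                 // "no GC" suite: zero interval disables GC

	withGC(context.Background())
	noGC(context.Background())
}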
