From ed5ca785f04ea878b960b93430fce05699b8fa45 Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Mon, 5 Aug 2024 19:55:14 +0200 Subject: [PATCH 1/3] E2e for updating JobSet on suspend --- test/e2e/e2e_test.go | 71 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 3 deletions(-) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 8c713a721..d2f9e0f9b 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" "sigs.k8s.io/jobset/pkg/util/testing" @@ -112,7 +113,7 @@ var _ = ginkgo.Describe("JobSet", func() { // Create JobSet. testFinalizer := "fake.example.com/blockDeletion" ginkgo.By("creating jobset with ttl seconds after finished") - js := sleepTestJobSet(ns).Finalizers([]string{testFinalizer}).TTLSecondsAfterFinished(5).Obj() + js := sleepTestJobSet(ns, 20).Finalizers([]string{testFinalizer}).TTLSecondsAfterFinished(5).Obj() // Verify jobset created successfully. ginkgo.By("checking that jobset creation succeeds") @@ -131,6 +132,70 @@ var _ = ginkgo.Describe("JobSet", func() { }) }) + // This test is added to test the JobSet transitions as Kueue would when: + // doing: resume in RF1 -> suspend -> resume in RF2. + // In particular, Kueue updates the PodTemplate on suspending and resuming + // the JobSet. 
+ ginkgo.When("JobSet is suspended and resumed", func() { + + ginkgo.It("should allow to resume JobSet after updating PodTemplate", func() { + ctx := context.Background() + js := sleepTestJobSet(ns, 1).Obj() + jsKey := types.NamespacedName{Name: js.Name, Namespace: js.Namespace} + + ginkgo.By("Create a suspended JobSet", func() { + js.Spec.Suspend = ptr.To(true) + js.Spec.TTLSecondsAfterFinished = ptr.To[int32](5) + gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed()) + }) + + ginkgo.By("Unsuspend the JobSet setting nodeSelectors that prevent pods from being scheduled", func() { + gomega.Eventually(func() error { + gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed()) + js.Spec.Suspend = ptr.To(false) + podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template + if podTemplate.Spec.NodeSelector == nil { + podTemplate.Spec.NodeSelector = make(map[string]string) + } + podTemplate.Spec.NodeSelector["kubernetes.io/os"] = "non-existing-os" + return k8sClient.Update(ctx, js) + }, timeout, interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Await for all Jobs to be active", func() { + gomega.Eventually(func() int32 { + gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed()) + if js.Status.ReplicatedJobsStatus == nil { + return 0 + } + return js.Status.ReplicatedJobsStatus[0].Active + }, timeout, interval).Should(gomega.Equal(js.Spec.ReplicatedJobs[0].Replicas)) + }) + + ginkgo.By("Suspend the JobSet updating the PodTemplate properties", func() { + gomega.Eventually(func() error { + gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed()) + js.Spec.Suspend = ptr.To(true) + podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template + podTemplate.Spec.NodeSelector["kubernetes.io/os"] = "linux" + return k8sClient.Update(ctx, js) + }, timeout, interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Unsuspending the JobSet again with PodTemplate allowing completion", func() { + gomega.Eventually(func() error 
{ + gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed()) + js.Spec.Suspend = ptr.To(false) + return k8sClient.Update(ctx, js) + }, timeout, interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Await for the JobSet to complete successfully", func() { + util.JobSetCompleted(ctx, k8sClient, js, timeout) + }) + }) + }) + }) // end of Describe // getPingCommand returns ping command for 4 hostnames @@ -225,7 +290,7 @@ func pingTestJobSetSubdomain(ns *corev1.Namespace) *testing.JobSetWrapper { Obj()) } -func sleepTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper { +func sleepTestJobSet(ns *corev1.Namespace, durationSeconds int32) *testing.JobSetWrapper { jsName := "js" rjobName := "rjob" replicas := 4 @@ -239,7 +304,7 @@ func sleepTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper { Name: "sleep-test-container", Image: "bash:latest", Command: []string{"bash", "-c"}, - Args: []string{"sleep 20"}, + Args: []string{fmt.Sprintf("sleep %d", durationSeconds)}, }, }, }).Obj()). From c2d148c13cba44d8ba34cf75aaf98640abebfdd0 Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Fri, 9 Aug 2024 20:30:21 +0200 Subject: [PATCH 2/3] Allow to mutate the PodTemplate in JobSet on suspend --- pkg/webhooks/jobset_webhook.go | 4 +-- pkg/webhooks/jobset_webhook_test.go | 43 +++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/pkg/webhooks/jobset_webhook.go b/pkg/webhooks/jobset_webhook.go index f66395901..3cdf589f0 100644 --- a/pkg/webhooks/jobset_webhook.go +++ b/pkg/webhooks/jobset_webhook.go @@ -314,9 +314,9 @@ func (j *jobSetWebhook) ValidateUpdate(ctx context.Context, old, newObj runtime. } mungedSpec := js.Spec.DeepCopy() - // Allow pod template to be mutated for suspended JobSets. + // Allow pod template to be mutated for suspended JobSets, or JobSets getting suspended. // This is needed for integration with Kueue/DWS. 
- if ptr.Deref(oldJS.Spec.Suspend, false) { + if ptr.Deref(oldJS.Spec.Suspend, false) || ptr.Deref(js.Spec.Suspend, false) { for index := range js.Spec.ReplicatedJobs { // Pod values which must be mutable for Kueue are defined here: https://github.com/kubernetes-sigs/kueue/blob/a50d395c36a2cb3965be5232162cf1fded1bdb08/apis/kueue/v1beta1/workload_types.go#L256-L260 mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Annotations = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Annotations diff --git a/pkg/webhooks/jobset_webhook_test.go b/pkg/webhooks/jobset_webhook_test.go index be38b4b54..e86a6e4ae 100644 --- a/pkg/webhooks/jobset_webhook_test.go +++ b/pkg/webhooks/jobset_webhook_test.go @@ -1640,6 +1640,49 @@ func TestValidateUpdate(t *testing.T) { }, }, }, + { + name: "replicated job pod template can be updated for jobset getting suspended", + js: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + Suspend: ptr.To(true), + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + // Adding an annotation. 
+ Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{"key": "value"}, + }, + }, + }, + }, + }, + }, + }, + }, + oldJs: &jobset.JobSet{ + ObjectMeta: validObjectMeta, + Spec: jobset.JobSetSpec{ + Suspend: ptr.To(false), + ReplicatedJobs: []jobset.ReplicatedJob{ + { + Name: "test-jobset-replicated-job-0", + Replicas: 2, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + }, + }, + }, + }, + }, + }, + }, { name: "replicated job pod template cannot be updated for running jobset", js: &jobset.JobSet{ From c89e112bbc6a6c49eefd723956a1cf58a1bd3ff7 Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Fri, 9 Aug 2024 21:16:13 +0200 Subject: [PATCH 3/3] Review remarks --- test/e2e/e2e_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index d2f9e0f9b..9842db40c 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -133,7 +133,7 @@ var _ = ginkgo.Describe("JobSet", func() { }) // This test is added to test the JobSet transitions as Kueue would when: - // doing: resume in RF1 -> suspend -> resume in RF2. + // doing: resume in ResourceFlavor1 -> suspend -> resume in ResourceFlavor2. // In particular, Kueue updates the PodTemplate on suspending and resuming // the JobSet. ginkgo.When("JobSet is suspended and resumed", func() { @@ -145,7 +145,6 @@ var _ = ginkgo.Describe("JobSet", func() { ginkgo.By("Create a suspended JobSet", func() { js.Spec.Suspend = ptr.To(true) - js.Spec.TTLSecondsAfterFinished = ptr.To[int32](5) gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed()) }) @@ -163,6 +162,9 @@ var _ = ginkgo.Describe("JobSet", func() { }) ginkgo.By("Await for all Jobs to be active", func() { + // In this step the Pods remain Pending due to the nodeSelector + // which does not match any nodes. 
Still, JobSet treats any Job + // with at least one Pending or Running Pod as active. gomega.Eventually(func() int32 { gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed()) if js.Status.ReplicatedJobsStatus == nil {