From 00f4d520db7cc41042c911d8f953e35075e4ea84 Mon Sep 17 00:00:00 2001 From: WEI-YU YEN Date: Wed, 29 May 2024 22:09:33 -0700 Subject: [PATCH] fix volcano podgroup update issue (#2079) * fix volcano podgroup update issue Signed-off-by: Weiyu Yen * queue value shouldn't be reset once it has been set Signed-off-by: Weiyu Yen * make queue immutable Signed-off-by: Weiyu Yen * add unit test Signed-off-by: Weiyu Yen * add retry for update operation Signed-off-by: Weiyu Yen --------- Signed-off-by: Weiyu Yen --- manifests/base/crds/kubeflow.org_mpijobs.yaml | 3 ++ manifests/base/crds/kubeflow.org_mxjobs.yaml | 3 ++ .../base/crds/kubeflow.org_paddlejobs.yaml | 3 ++ .../base/crds/kubeflow.org_pytorchjobs.yaml | 3 ++ manifests/base/crds/kubeflow.org_tfjobs.yaml | 3 ++ .../base/crds/kubeflow.org_xgboostjobs.yaml | 3 ++ pkg/apis/kubeflow.org/v1/common_types.go | 3 +- pkg/controller.v1/common/job.go | 5 ++++ .../pytorch/pytorchjob_controller_test.go | 28 +++++++++++++++++++ 9 files changed, 53 insertions(+), 1 deletion(-) diff --git a/manifests/base/crds/kubeflow.org_mpijobs.yaml b/manifests/base/crds/kubeflow.org_mpijobs.yaml index 7938cd0797..4139d49fef 100644 --- a/manifests/base/crds/kubeflow.org_mpijobs.yaml +++ b/manifests/base/crds/kubeflow.org_mpijobs.yaml @@ -7330,6 +7330,9 @@ spec: type: string queue: type: string + x-kubernetes-validations: + - message: spec.runPolicy.schedulingPolicy.queue is immutable + rule: self == oldSelf scheduleTimeoutSeconds: format: int32 type: integer diff --git a/manifests/base/crds/kubeflow.org_mxjobs.yaml b/manifests/base/crds/kubeflow.org_mxjobs.yaml index ead44a5576..470f4a8f1a 100644 --- a/manifests/base/crds/kubeflow.org_mxjobs.yaml +++ b/manifests/base/crds/kubeflow.org_mxjobs.yaml @@ -7333,6 +7333,9 @@ spec: type: string queue: type: string + x-kubernetes-validations: + - message: spec.runPolicy.schedulingPolicy.queue is immutable + rule: self == oldSelf scheduleTimeoutSeconds: format: int32 type: integer diff --git a/manifests/base/crds/kubeflow.org_paddlejobs.yaml b/manifests/base/crds/kubeflow.org_paddlejobs.yaml index ee07b19763..c8a5acb8c3 100644 --- a/manifests/base/crds/kubeflow.org_paddlejobs.yaml +++ b/manifests/base/crds/kubeflow.org_paddlejobs.yaml @@ -7812,6 +7812,9 @@ spec: type: string queue: type: string + x-kubernetes-validations: + - message: spec.runPolicy.schedulingPolicy.queue is immutable + rule: self == oldSelf scheduleTimeoutSeconds: format: int32 type: integer diff --git a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml index 5286139833..a5c261e124 100644 --- a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml +++ b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml @@ -7849,6 +7849,9 @@ spec: type: string queue: type: string + x-kubernetes-validations: + - message: spec.runPolicy.schedulingPolicy.queue is immutable + rule: self == oldSelf scheduleTimeoutSeconds: format: int32 type: integer diff --git a/manifests/base/crds/kubeflow.org_tfjobs.yaml b/manifests/base/crds/kubeflow.org_tfjobs.yaml index c00c720a2e..153243aae6 100644 --- a/manifests/base/crds/kubeflow.org_tfjobs.yaml +++ b/manifests/base/crds/kubeflow.org_tfjobs.yaml @@ -90,6 +90,9 @@ spec: type: string queue: type: string + x-kubernetes-validations: + - message: spec.runPolicy.schedulingPolicy.queue is immutable + rule: self == oldSelf scheduleTimeoutSeconds: format: int32 type: integer diff --git a/manifests/base/crds/kubeflow.org_xgboostjobs.yaml b/manifests/base/crds/kubeflow.org_xgboostjobs.yaml index 9553e3b0a3..34237c6aba 100644 --- a/manifests/base/crds/kubeflow.org_xgboostjobs.yaml +++ b/manifests/base/crds/kubeflow.org_xgboostjobs.yaml @@ -86,6 +86,9 @@ spec: type: string queue: type: string + x-kubernetes-validations: + - message: spec.runPolicy.schedulingPolicy.queue is immutable + rule: self == oldSelf scheduleTimeoutSeconds: format: int32 type: integer diff --git a/pkg/apis/kubeflow.org/v1/common_types.go b/pkg/apis/kubeflow.org/v1/common_types.go index b880486f7e..42fd1d0dad 100644 --- a/pkg/apis/kubeflow.org/v1/common_types.go +++ b/pkg/apis/kubeflow.org/v1/common_types.go @@ -226,7 +226,8 @@ type RunPolicy struct { // SchedulingPolicy encapsulates various scheduling policies of the distributed training // job, for example `minAvailable` for gang-scheduling. type SchedulingPolicy struct { - MinAvailable *int32 `json:"minAvailable,omitempty"` + MinAvailable *int32 `json:"minAvailable,omitempty"` + // +kubebuilder:validation:XValidation:rule="self == oldSelf", message="spec.runPolicy.schedulingPolicy.queue is immutable" Queue string `json:"queue,omitempty"` MinResources *map[v1.ResourceName]resource.Quantity `json:"minResources,omitempty"` PriorityClass string `json:"priorityClass,omitempty"` diff --git a/pkg/controller.v1/common/job.go b/pkg/controller.v1/common/job.go index b4e7df83cc..825afdf8b1 100644 --- a/pkg/controller.v1/common/job.go +++ b/pkg/controller.v1/common/job.go @@ -284,6 +284,11 @@ func (jc *JobController) ReconcileJobs( if !match { return fmt.Errorf("unable to recognize PodGroup: %v", klog.KObj(pg)) } + + if q := volcanoPodGroup.Spec.Queue; len(q) > 0 { + queue = q + } + volcanoPodGroup.Spec = volcanov1beta1.PodGroupSpec{ MinMember: minMember, Queue: queue, diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go index e64dc65e03..b035251fa3 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller_test.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller_test.go @@ -195,6 +195,34 @@ var _ = Describe("PyTorchJob controller", func() { cond := getCondition(created.Status, kubeflowv1.JobSucceeded) Expect(cond.Status).To(Equal(corev1.ConditionTrue)) }) + It("Shouldn't be updated resources if spec.runPolicy.schedulingPolicy.queue is changed after the job is created", func() { + By("Creating a PyTorchJob with a specific queue") + job.Spec.RunPolicy.SchedulingPolicy = &kubeflowv1.SchedulingPolicy{} + job.Spec.RunPolicy.SchedulingPolicy.Queue = "initial-queue" + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + By("Attempting to update the PyTorchJob with a different queue value") + updatedJob := &kubeflowv1.PyTorchJob{} + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, updatedJob) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue(), "Failed to get PyTorchJob") + + Eventually(func() bool { + updatedJob.Spec.RunPolicy.SchedulingPolicy.Queue = "test" + err := testK8sClient.Update(ctx, updatedJob) + By("Checking that the queue update fails") + Expect(err).To(HaveOccurred(), "Expected an error when updating the queue, but update succeeded") + Expect(err).To(MatchError(ContainSubstring("spec.runPolicy.schedulingPolicy.queue is immutable"), "The error message did not contain the expected message")) + return err != nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + By("Validating the queue was not updated") + freshJob := &kubeflowv1.PyTorchJob{} + Expect(testK8sClient.Get(ctx, client.ObjectKeyFromObject(job), freshJob)).Should(Succeed(), "Failed to get PyTorchJob after update attempt") + Expect(freshJob.Spec.RunPolicy.SchedulingPolicy.Queue).To(Equal("initial-queue"), "The queue should remain as the initial value since it should be immutable") + + }) It("Shouldn't create resources if PyTorchJob is suspended", func() { By("By creating a new PyTorchJob with suspend=true")