From e632fe7d802884ba15dff7d09124fcef99f9093d Mon Sep 17 00:00:00 2001 From: Mykhailo Bobrovskyi Date: Tue, 9 Jul 2024 23:52:23 +0300 Subject: [PATCH] Revert "Use patch instead of update on jobframework. (#2501)" (#2565) This reverts commit 4d89521fc4e7f57007a796e70539b7f110b35845. --- charts/kueue/templates/rbac/role.yaml | 2 -- config/components/rbac/role.yaml | 2 -- pkg/controller/jobframework/reconciler.go | 12 ++++------ pkg/controller/jobs/job/job_controller.go | 22 +++++-------------- .../jobs/job/job_multikueue_adapter.go | 19 ++++++++-------- .../jobs/jobset/jobset_controller.go | 2 +- .../jobs/jobset/jobset_multikueue_adapter.go | 16 ++++++-------- pkg/util/client/client.go | 11 ---------- 8 files changed, 26 insertions(+), 60 deletions(-) diff --git a/charts/kueue/templates/rbac/role.yaml b/charts/kueue/templates/rbac/role.yaml index e1af9b089b..c7ed355a1e 100644 --- a/charts/kueue/templates/rbac/role.yaml +++ b/charts/kueue/templates/rbac/role.yaml @@ -135,7 +135,6 @@ rules: - jobs/status verbs: - get - - patch - update - apiGroups: - flowcontrol.apiserver.k8s.io @@ -180,7 +179,6 @@ rules: - jobsets/status verbs: - get - - patch - update - apiGroups: - kubeflow.org diff --git a/config/components/rbac/role.yaml b/config/components/rbac/role.yaml index 48da53c4c0..c7ed22eb2d 100644 --- a/config/components/rbac/role.yaml +++ b/config/components/rbac/role.yaml @@ -134,7 +134,6 @@ rules: - jobs/status verbs: - get - - patch - update - apiGroups: - flowcontrol.apiserver.k8s.io @@ -179,7 +178,6 @@ rules: - jobsets/status verbs: - get - - patch - update - apiGroups: - kubeflow.org diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index d83fcdce51..97bacba304 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -45,7 +45,6 @@ import ( "sigs.k8s.io/kueue/pkg/podset" "sigs.k8s.io/kueue/pkg/queue" "sigs.k8s.io/kueue/pkg/util/admissioncheck" - clientutil "sigs.k8s.io/kueue/pkg/util/client" "sigs.k8s.io/kueue/pkg/util/equality" "sigs.k8s.io/kueue/pkg/util/kubeversion" "sigs.k8s.io/kueue/pkg/util/maps" @@ -294,10 +293,9 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques log.Error(err, "couldn't get the parent job workload") return ctrl.Result{}, err } else if parentWorkload == nil || !workload.IsAdmitted(parentWorkload) { - objectOriginal := object.DeepCopyObject().(client.Object) // suspend it job.Suspend() - if err := clientutil.Patch(ctx, r.client, objectOriginal, object); err != nil { + if err := r.client.Update(ctx, object); err != nil { log.Error(err, "suspending child job failed") return ctrl.Result{}, err } @@ -800,11 +798,11 @@ func (r *JobReconciler) startJob(ctx context.Context, job GenericJob, object cli return err } } else { - objectOriginal := object.DeepCopyObject().(client.Object) if runErr := job.RunWithPodSetsInfo(info); runErr != nil { return runErr } - if err := clientutil.Patch(ctx, r.client, objectOriginal, object); err != nil { + + if err := r.client.Update(ctx, object); err != nil { return err } r.record.Event(object, corev1.EventTypeNormal, ReasonStarted, msg) @@ -846,13 +844,11 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, wl *kueue.W return nil } - objectOriginal := object.DeepCopyObject().(client.Object) - job.Suspend() if info != nil { job.RestorePodSetsInfo(info) } - if err := clientutil.Patch(ctx, r.client, objectOriginal, object); err != nil { + if err := r.client.Update(ctx, object); err != nil { return err } diff --git a/pkg/controller/jobs/job/job_controller.go b/pkg/controller/jobs/job/job_controller.go index 56f965a61d..5b4fd8c4bb 100644 --- a/pkg/controller/jobs/job/job_controller.go +++ b/pkg/controller/jobs/job/job_controller.go @@ -41,7 +41,6 @@ import ( "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/podset" - clientutil "sigs.k8s.io/kueue/pkg/util/client" ) var ( @@ -69,7 +68,7 @@ func init() { // +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch // +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch // +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;update;patch -// +kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;update // +kubebuilder:rbac:groups=batch,resources=jobs/finalizers,verbs=get;update;patch // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch @@ -163,39 +162,28 @@ func (j *Job) Suspend() { j.Spec.Suspend = ptr.To(true) } -func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, _ jobframework.StopReason, _ string) (bool, error) { +func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, _ jobframework.StopReason, eventMsg string) (bool, error) { stoppedNow := false if !j.IsSuspended() { - objectOriginal := j.Object().DeepCopyObject().(client.Object) j.Suspend() - object := j.Object() - if err := clientutil.Patch(ctx, c, objectOriginal, object); err != nil { + if err := c.Update(ctx, j.Object()); err != nil { return false, fmt.Errorf("suspend: %w", err) } - if err := c.Get(ctx, client.ObjectKeyFromObject(object), object); err != nil { - return false, fmt.Errorf("get after suspend: %w", err) - } stoppedNow = true } // Reset start time if necessary, so we can update the scheduling directives. if j.Status.StartTime != nil { - objectOriginal := j.Object().DeepCopyObject().(client.Object) j.Status.StartTime = nil - object := j.Object() - if err := clientutil.PatchStatus(ctx, c, objectOriginal, object); err != nil { + if err := c.Status().Update(ctx, j.Object()); err != nil { return stoppedNow, fmt.Errorf("reset status: %w", err) } - if err := c.Get(ctx, client.ObjectKeyFromObject(object), object); err != nil { - return false, fmt.Errorf("get after resetting the start time: %w", err) - } } - objectOriginal := j.Object().DeepCopyObject().(client.Object) if changed := j.RestorePodSetsInfo(podSetsInfo); !changed { return stoppedNow, nil } - if err := clientutil.Patch(ctx, c, objectOriginal, j.Object()); err != nil { + if err := c.Update(ctx, j.Object()); err != nil { return false, fmt.Errorf("restore info: %w", err) } return stoppedNow, nil diff --git a/pkg/controller/jobs/job/job_multikueue_adapter.go b/pkg/controller/jobs/job/job_multikueue_adapter.go index a96331e125..136cb93eaa 100644 --- a/pkg/controller/jobs/job/job_multikueue_adapter.go +++ b/pkg/controller/jobs/job/job_multikueue_adapter.go @@ -35,7 +35,6 @@ import ( "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/util/api" - clientutil "sigs.k8s.io/kueue/pkg/util/client" ) type multikueueAdapter struct{} @@ -43,15 +42,14 @@ type multikueueAdapter struct{} var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { - localJob := &batchv1.Job{} - err := localClient.Get(ctx, key, localJob) + localJob := batchv1.Job{} + err := localClient.Get(ctx, key, &localJob) if err != nil { return err } - localJobOriginal := localJob.DeepCopy() - remoteJob := &batchv1.Job{} - err = remoteClient.Get(ctx, key, remoteJob) + remoteJob := batchv1.Job{} + err = remoteClient.Get(ctx, key, &remoteJob) if client.IgnoreNotFound(err) != nil { return err } @@ -60,7 +58,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie if err == nil { if features.Enabled(features.MultiKueueBatchJobWithManagedBy) { localJob.Status = remoteJob.Status - return clientutil.PatchStatus(ctx, localClient, localJobOriginal, localJob) + return localClient.Status().Update(ctx, &localJob) } remoteFinished := false for _, c := range remoteJob.Status.Conditions { @@ -69,14 +67,15 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie break } } + if remoteFinished { localJob.Status = remoteJob.Status - return clientutil.PatchStatus(ctx, localClient, localJobOriginal, localJob) + return localClient.Status().Update(ctx, &localJob) } return nil } - remoteJob = &batchv1.Job{ + remoteJob = batchv1.Job{ ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta), Spec: *localJob.Spec.DeepCopy(), } @@ -101,7 +100,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie remoteJob.Spec.ManagedBy = nil } - return remoteClient.Create(ctx, remoteJob) + return remoteClient.Create(ctx, &remoteJob) } func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { diff --git a/pkg/controller/jobs/jobset/jobset_controller.go b/pkg/controller/jobs/jobset/jobset_controller.go index 57a388e090..36ae0d7b94 100644 --- a/pkg/controller/jobs/jobset/jobset_controller.go +++ b/pkg/controller/jobs/jobset/jobset_controller.go @@ -56,7 +56,7 @@ func init() { // +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch // +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch // +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch;update;patch -// +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;update // +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/finalizers,verbs=get;update // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch diff --git a/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go b/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go index 4383d34bb0..59a71698e5 100644 --- a/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go +++ b/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go @@ -33,7 +33,6 @@ import ( "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/util/api" - clientutil "sigs.k8s.io/kueue/pkg/util/client" ) type multikueueAdapter struct{} @@ -41,15 +40,14 @@ type multikueueAdapter struct{} var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { - localJob := &jobset.JobSet{} - err := localClient.Get(ctx, key, localJob) + localJob := jobset.JobSet{} + err := localClient.Get(ctx, key, &localJob) if err != nil { return err } - localJobOriginal := localJob.DeepCopy() - remoteJob := &jobset.JobSet{} - err = remoteClient.Get(ctx, key, remoteJob) + remoteJob := jobset.JobSet{} + err = remoteClient.Get(ctx, key, &remoteJob) if client.IgnoreNotFound(err) != nil { return err } @@ -57,10 +55,10 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie // if the remote exists, just copy the status if err == nil { localJob.Status = remoteJob.Status - return clientutil.PatchStatus(ctx, localClient, localJobOriginal, localJob) + return localClient.Status().Update(ctx, &localJob) } - remoteJob = &jobset.JobSet{ + remoteJob = jobset.JobSet{ ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta), Spec: *localJob.Spec.DeepCopy(), } @@ -75,7 +73,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie // clear the managedBy enables the JobSet controller to take over remoteJob.Spec.ManagedBy = nil - return remoteClient.Create(ctx, remoteJob) + return remoteClient.Create(ctx, &remoteJob) } func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { diff --git a/pkg/util/client/client.go b/pkg/util/client/client.go index df24bf1e5f..d7192e7f84 100644 --- a/pkg/util/client/client.go +++ b/pkg/util/client/client.go @@ -41,14 +41,3 @@ func Patch(ctx context.Context, c client.Client, before, after client.Object) er } return nil } - -func PatchStatus(ctx context.Context, c client.Client, before, after client.Object) error { - patch, err := CreatePatch(before, after) - if err != nil { - return err - } - if err = c.Status().Patch(ctx, before, patch); err != nil { - return err - } - return nil -}