diff --git a/charts/kueue/templates/rbac/role.yaml b/charts/kueue/templates/rbac/role.yaml index c7ed355a1e..e1af9b089b 100644 --- a/charts/kueue/templates/rbac/role.yaml +++ b/charts/kueue/templates/rbac/role.yaml @@ -135,6 +135,7 @@ rules: - jobs/status verbs: - get + - patch - update - apiGroups: - flowcontrol.apiserver.k8s.io @@ -179,6 +180,7 @@ rules: - jobsets/status verbs: - get + - patch - update - apiGroups: - kubeflow.org diff --git a/config/components/rbac/role.yaml b/config/components/rbac/role.yaml index c7ed22eb2d..48da53c4c0 100644 --- a/config/components/rbac/role.yaml +++ b/config/components/rbac/role.yaml @@ -134,6 +134,7 @@ rules: - jobs/status verbs: - get + - patch - update - apiGroups: - flowcontrol.apiserver.k8s.io @@ -178,6 +179,7 @@ rules: - jobsets/status verbs: - get + - patch - update - apiGroups: - kubeflow.org diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index 97bacba304..d83fcdce51 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -45,6 +45,7 @@ import ( "sigs.k8s.io/kueue/pkg/podset" "sigs.k8s.io/kueue/pkg/queue" "sigs.k8s.io/kueue/pkg/util/admissioncheck" + clientutil "sigs.k8s.io/kueue/pkg/util/client" "sigs.k8s.io/kueue/pkg/util/equality" "sigs.k8s.io/kueue/pkg/util/kubeversion" "sigs.k8s.io/kueue/pkg/util/maps" @@ -293,9 +294,10 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques log.Error(err, "couldn't get the parent job workload") return ctrl.Result{}, err } else if parentWorkload == nil || !workload.IsAdmitted(parentWorkload) { + objectOriginal := object.DeepCopyObject().(client.Object) // suspend it job.Suspend() - if err := r.client.Update(ctx, object); err != nil { + if err := clientutil.Patch(ctx, r.client, objectOriginal, object); err != nil { log.Error(err, "suspending child job failed") return ctrl.Result{}, err } @@ -798,11 +800,11 @@ func (r *JobReconciler) startJob(ctx context.Context, job GenericJob, object cli return err } } else { + objectOriginal := object.DeepCopyObject().(client.Object) if runErr := job.RunWithPodSetsInfo(info); runErr != nil { return runErr } - - if err := r.client.Update(ctx, object); err != nil { + if err := clientutil.Patch(ctx, r.client, objectOriginal, object); err != nil { return err } r.record.Event(object, corev1.EventTypeNormal, ReasonStarted, msg) @@ -844,11 +846,13 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, wl *kueue.W return nil } + objectOriginal := object.DeepCopyObject().(client.Object) + job.Suspend() if info != nil { job.RestorePodSetsInfo(info) } - if err := r.client.Update(ctx, object); err != nil { + if err := clientutil.Patch(ctx, r.client, objectOriginal, object); err != nil { return err } diff --git a/pkg/controller/jobs/job/job_controller.go b/pkg/controller/jobs/job/job_controller.go index 5b4fd8c4bb..56f965a61d 100644 --- a/pkg/controller/jobs/job/job_controller.go +++ b/pkg/controller/jobs/job/job_controller.go @@ -41,6 +41,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/podset" + clientutil "sigs.k8s.io/kueue/pkg/util/client" ) var ( @@ -68,7 +69,7 @@ func init() { // +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch // +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch // +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;update;patch -// +kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;update +// +kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;update;patch // +kubebuilder:rbac:groups=batch,resources=jobs/finalizers,verbs=get;update;patch // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch @@ -162,28 +163,39 @@ func (j *Job) Suspend() { j.Spec.Suspend = ptr.To(true) } -func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, _ jobframework.StopReason, eventMsg string) (bool, error) { +func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, _ jobframework.StopReason, _ string) (bool, error) { stoppedNow := false if !j.IsSuspended() { + objectOriginal := j.Object().DeepCopyObject().(client.Object) j.Suspend() - if err := c.Update(ctx, j.Object()); err != nil { + object := j.Object() + if err := clientutil.Patch(ctx, c, objectOriginal, object); err != nil { return false, fmt.Errorf("suspend: %w", err) } + if err := c.Get(ctx, client.ObjectKeyFromObject(object), object); err != nil { + return false, fmt.Errorf("get after suspend: %w", err) + } stoppedNow = true } // Reset start time if necessary, so we can update the scheduling directives. if j.Status.StartTime != nil { + objectOriginal := j.Object().DeepCopyObject().(client.Object) j.Status.StartTime = nil - if err := c.Status().Update(ctx, j.Object()); err != nil { + object := j.Object() + if err := clientutil.PatchStatus(ctx, c, objectOriginal, object); err != nil { return stoppedNow, fmt.Errorf("reset status: %w", err) } + if err := c.Get(ctx, client.ObjectKeyFromObject(object), object); err != nil { + return false, fmt.Errorf("get after resetting the start time: %w", err) + } } + objectOriginal := j.Object().DeepCopyObject().(client.Object) if changed := j.RestorePodSetsInfo(podSetsInfo); !changed { return stoppedNow, nil } - if err := c.Update(ctx, j.Object()); err != nil { + if err := clientutil.Patch(ctx, c, objectOriginal, j.Object()); err != nil { return false, fmt.Errorf("restore info: %w", err) } return stoppedNow, nil diff --git a/pkg/controller/jobs/job/job_multikueue_adapter.go b/pkg/controller/jobs/job/job_multikueue_adapter.go index b07b7a4f0c..a96331e125 100644 --- a/pkg/controller/jobs/job/job_multikueue_adapter.go +++ b/pkg/controller/jobs/job/job_multikueue_adapter.go @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package job import ( @@ -34,6 +35,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/util/api" + clientutil "sigs.k8s.io/kueue/pkg/util/client" ) type multikueueAdapter struct{} @@ -41,14 +43,15 @@ type multikueueAdapter struct{} var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { - localJob := batchv1.Job{} - err := localClient.Get(ctx, key, &localJob) + localJob := &batchv1.Job{} + err := localClient.Get(ctx, key, localJob) if err != nil { return err } + localJobOriginal := localJob.DeepCopy() - remoteJob := batchv1.Job{} - err = remoteClient.Get(ctx, key, &remoteJob) + remoteJob := &batchv1.Job{} + err = remoteClient.Get(ctx, key, remoteJob) if client.IgnoreNotFound(err) != nil { return err } @@ -57,7 +60,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie if err == nil { if features.Enabled(features.MultiKueueBatchJobWithManagedBy) { localJob.Status = remoteJob.Status - return localClient.Status().Update(ctx, &localJob) + return clientutil.PatchStatus(ctx, localClient, localJobOriginal, localJob) } remoteFinished := false for _, c := range remoteJob.Status.Conditions { @@ -66,15 +69,14 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie break } } - if remoteFinished { localJob.Status = remoteJob.Status - return localClient.Status().Update(ctx, &localJob) + return clientutil.PatchStatus(ctx, localClient, localJobOriginal, localJob) } return nil } - remoteJob = batchv1.Job{ + remoteJob = &batchv1.Job{ ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta), Spec: *localJob.Spec.DeepCopy(), } @@ -99,7 +101,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie remoteJob.Spec.ManagedBy = nil } - return remoteClient.Create(ctx, &remoteJob) + return remoteClient.Create(ctx, remoteJob) } func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { diff --git a/pkg/controller/jobs/jobset/jobset_controller.go b/pkg/controller/jobs/jobset/jobset_controller.go index 36ae0d7b94..57a388e090 100644 --- a/pkg/controller/jobs/jobset/jobset_controller.go +++ b/pkg/controller/jobs/jobset/jobset_controller.go @@ -56,7 +56,7 @@ func init() { // +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch // +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch // +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch;update;patch -// +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;update +// +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;update;patch // +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/finalizers,verbs=get;update // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch diff --git a/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go b/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go index 4dbd31b040..4383d34bb0 100644 --- a/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go +++ b/pkg/controller/jobs/jobset/jobset_multikueue_adapter.go @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package jobset import ( @@ -32,6 +33,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/util/api" + clientutil "sigs.k8s.io/kueue/pkg/util/client" ) type multikueueAdapter struct{} @@ -39,14 +41,15 @@ type multikueueAdapter struct{} var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil) func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error { - localJob := jobset.JobSet{} - err := localClient.Get(ctx, key, &localJob) + localJob := &jobset.JobSet{} + err := localClient.Get(ctx, key, localJob) if err != nil { return err } + localJobOriginal := localJob.DeepCopy() - remoteJob := jobset.JobSet{} - err = remoteClient.Get(ctx, key, &remoteJob) + remoteJob := &jobset.JobSet{} + err = remoteClient.Get(ctx, key, remoteJob) if client.IgnoreNotFound(err) != nil { return err } @@ -54,10 +57,10 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie // if the remote exists, just copy the status if err == nil { localJob.Status = remoteJob.Status - return localClient.Status().Update(ctx, &localJob) + return clientutil.PatchStatus(ctx, localClient, localJobOriginal, localJob) } - remoteJob = jobset.JobSet{ + remoteJob = &jobset.JobSet{ ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta), Spec: *localJob.Spec.DeepCopy(), } @@ -72,7 +75,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie // clear the managedBy enables the JobSet controller to take over remoteJob.Spec.ManagedBy = nil - return remoteClient.Create(ctx, &remoteJob) + return remoteClient.Create(ctx, remoteJob) } func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error { diff --git a/pkg/util/client/client.go b/pkg/util/client/client.go index d7192e7f84..df24bf1e5f 100644 --- a/pkg/util/client/client.go +++ b/pkg/util/client/client.go @@ -41,3 +41,14 @@ func Patch(ctx context.Context, c client.Client, before, after client.Object) er } return nil } + +func PatchStatus(ctx context.Context, c client.Client, before, after client.Object) error { + patch, err := CreatePatch(before, after) + if err != nil { + return err + } + if err = c.Status().Patch(ctx, before, patch); err != nil { + return err + } + return nil +}