Skip to content

Commit

Permalink
Use patch instead of update on jobframework.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbobrovskyi committed Jul 8, 2024
1 parent ecca594 commit f3d4347
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 26 deletions.
2 changes: 2 additions & 0 deletions charts/kueue/templates/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ rules:
- jobs/status
verbs:
- get
- patch
- update
- apiGroups:
- flowcontrol.apiserver.k8s.io
Expand Down Expand Up @@ -179,6 +180,7 @@ rules:
- jobsets/status
verbs:
- get
- patch
- update
- apiGroups:
- kubeflow.org
Expand Down
2 changes: 2 additions & 0 deletions config/components/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ rules:
- jobs/status
verbs:
- get
- patch
- update
- apiGroups:
- flowcontrol.apiserver.k8s.io
Expand Down Expand Up @@ -178,6 +179,7 @@ rules:
- jobsets/status
verbs:
- get
- patch
- update
- apiGroups:
- kubeflow.org
Expand Down
12 changes: 8 additions & 4 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
"sigs.k8s.io/kueue/pkg/util/equality"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
"sigs.k8s.io/kueue/pkg/util/maps"
Expand Down Expand Up @@ -293,9 +294,10 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
log.Error(err, "couldn't get the parent job workload")
return ctrl.Result{}, err
} else if parentWorkload == nil || !workload.IsAdmitted(parentWorkload) {
objectOriginal := object.DeepCopyObject().(client.Object)
// suspend it
job.Suspend()
if err := r.client.Update(ctx, object); err != nil {
if err := clientutil.Patch(ctx, r.client, objectOriginal, object); err != nil {
log.Error(err, "suspending child job failed")
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -798,11 +800,11 @@ func (r *JobReconciler) startJob(ctx context.Context, job GenericJob, object cli
return err
}
} else {
objectOriginal := object.DeepCopyObject().(client.Object)
if runErr := job.RunWithPodSetsInfo(info); runErr != nil {
return runErr
}

if err := r.client.Update(ctx, object); err != nil {
if err := clientutil.Patch(ctx, r.client, objectOriginal, object); err != nil {
return err
}
r.record.Event(object, corev1.EventTypeNormal, ReasonStarted, msg)
Expand Down Expand Up @@ -844,11 +846,13 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, wl *kueue.W
return nil
}

objectOriginal := object.DeepCopyObject().(client.Object)

job.Suspend()
if info != nil {
job.RestorePodSetsInfo(info)
}
if err := r.client.Update(ctx, object); err != nil {
if err := clientutil.Patch(ctx, r.client, objectOriginal, object); err != nil {
return err
}

Expand Down
22 changes: 17 additions & 5 deletions pkg/controller/jobs/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/podset"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
)

var (
Expand Down Expand Up @@ -68,7 +69,7 @@ func init() {
// +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;update
// +kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=batch,resources=jobs/finalizers,verbs=get;update;patch
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch
Expand Down Expand Up @@ -162,28 +163,39 @@ func (j *Job) Suspend() {
j.Spec.Suspend = ptr.To(true)
}

func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, _ jobframework.StopReason, eventMsg string) (bool, error) {
func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, _ jobframework.StopReason, _ string) (bool, error) {
stoppedNow := false
if !j.IsSuspended() {
objectOriginal := j.Object().DeepCopyObject().(client.Object)
j.Suspend()
if err := c.Update(ctx, j.Object()); err != nil {
object := j.Object()
if err := clientutil.Patch(ctx, c, objectOriginal, object); err != nil {
return false, fmt.Errorf("suspend: %w", err)
}
if err := c.Get(ctx, client.ObjectKeyFromObject(object), object); err != nil {
return false, fmt.Errorf("get after suspend: %w", err)
}
stoppedNow = true
}

// Reset start time if necessary, so we can update the scheduling directives.
if j.Status.StartTime != nil {
objectOriginal := j.Object().DeepCopyObject().(client.Object)
j.Status.StartTime = nil
if err := c.Status().Update(ctx, j.Object()); err != nil {
object := j.Object()
if err := clientutil.PatchStatus(ctx, c, objectOriginal, object); err != nil {
return stoppedNow, fmt.Errorf("reset status: %w", err)
}
if err := c.Get(ctx, client.ObjectKeyFromObject(object), object); err != nil {
return false, fmt.Errorf("get after resetting the start time: %w", err)
}
}

objectOriginal := j.Object().DeepCopyObject().(client.Object)
if changed := j.RestorePodSetsInfo(podSetsInfo); !changed {
return stoppedNow, nil
}
if err := c.Update(ctx, j.Object()); err != nil {
if err := clientutil.Patch(ctx, c, objectOriginal, j.Object()); err != nil {
return false, fmt.Errorf("restore info: %w", err)
}
return stoppedNow, nil
Expand Down
20 changes: 11 additions & 9 deletions pkg/controller/jobs/job/job_multikueue_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package job

import (
Expand All @@ -34,21 +35,23 @@ import (
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/util/api"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
)

type multikueueAdapter struct{}

var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil)

func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error {
localJob := batchv1.Job{}
err := localClient.Get(ctx, key, &localJob)
localJob := &batchv1.Job{}
err := localClient.Get(ctx, key, localJob)
if err != nil {
return err
}
localJobOriginal := localJob.DeepCopy()

remoteJob := batchv1.Job{}
err = remoteClient.Get(ctx, key, &remoteJob)
remoteJob := &batchv1.Job{}
err = remoteClient.Get(ctx, key, remoteJob)
if client.IgnoreNotFound(err) != nil {
return err
}
Expand All @@ -57,7 +60,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie
if err == nil {
if features.Enabled(features.MultiKueueBatchJobWithManagedBy) {
localJob.Status = remoteJob.Status
return localClient.Status().Update(ctx, &localJob)
return clientutil.PatchStatus(ctx, localClient, localJobOriginal, localJob)
}
remoteFinished := false
for _, c := range remoteJob.Status.Conditions {
Expand All @@ -66,15 +69,14 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie
break
}
}

if remoteFinished {
localJob.Status = remoteJob.Status
return localClient.Status().Update(ctx, &localJob)
return clientutil.PatchStatus(ctx, localClient, localJobOriginal, localJob)
}
return nil
}

remoteJob = batchv1.Job{
remoteJob = &batchv1.Job{
ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta),
Spec: *localJob.Spec.DeepCopy(),
}
Expand All @@ -99,7 +101,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie
remoteJob.Spec.ManagedBy = nil
}

return remoteClient.Create(ctx, &remoteJob)
return remoteClient.Create(ctx, remoteJob)
}

func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobs/jobset/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func init() {
// +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
// +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;update
// +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/finalizers,verbs=get;update
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch
Expand Down
17 changes: 10 additions & 7 deletions pkg/controller/jobs/jobset/jobset_multikueue_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package jobset

import (
Expand All @@ -32,32 +33,34 @@ import (
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/util/api"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
)

type multikueueAdapter struct{}

var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil)

func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error {
localJob := jobset.JobSet{}
err := localClient.Get(ctx, key, &localJob)
localJob := &jobset.JobSet{}
err := localClient.Get(ctx, key, localJob)
if err != nil {
return err
}
localJobOriginal := localJob.DeepCopy()

remoteJob := jobset.JobSet{}
err = remoteClient.Get(ctx, key, &remoteJob)
remoteJob := &jobset.JobSet{}
err = remoteClient.Get(ctx, key, remoteJob)
if client.IgnoreNotFound(err) != nil {
return err
}

// if the remote exists, just copy the status
if err == nil {
localJob.Status = remoteJob.Status
return localClient.Status().Update(ctx, &localJob)
return clientutil.PatchStatus(ctx, localClient, localJobOriginal, localJob)
}

remoteJob = jobset.JobSet{
remoteJob = &jobset.JobSet{
ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta),
Spec: *localJob.Spec.DeepCopy(),
}
Expand All @@ -72,7 +75,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie
// clear the managedBy enables the JobSet controller to take over
remoteJob.Spec.ManagedBy = nil

return remoteClient.Create(ctx, &remoteJob)
return remoteClient.Create(ctx, remoteJob)
}

func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error {
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,14 @@ func Patch(ctx context.Context, c client.Client, before, after client.Object) er
}
return nil
}

func PatchStatus(ctx context.Context, c client.Client, before, after client.Object) error {
patch, err := CreatePatch(before, after)
if err != nil {
return err
}
if err = c.Status().Patch(ctx, before, patch); err != nil {
return err
}
return nil
}

0 comments on commit f3d4347

Please sign in to comment.