Skip to content

Commit

Permalink
Revert "Use patch instead of update on jobframework. (#2501)" (#2565)
Browse files Browse the repository at this point in the history
This reverts commit 4d89521.
  • Loading branch information
mbobrovskyi committed Jul 9, 2024
1 parent 408f2cb commit e632fe7
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 60 deletions.
2 changes: 0 additions & 2 deletions charts/kueue/templates/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ rules:
- jobs/status
verbs:
- get
- patch
- update
- apiGroups:
- flowcontrol.apiserver.k8s.io
Expand Down Expand Up @@ -180,7 +179,6 @@ rules:
- jobsets/status
verbs:
- get
- patch
- update
- apiGroups:
- kubeflow.org
Expand Down
2 changes: 0 additions & 2 deletions config/components/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ rules:
- jobs/status
verbs:
- get
- patch
- update
- apiGroups:
- flowcontrol.apiserver.k8s.io
Expand Down Expand Up @@ -179,7 +178,6 @@ rules:
- jobsets/status
verbs:
- get
- patch
- update
- apiGroups:
- kubeflow.org
Expand Down
12 changes: 4 additions & 8 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
"sigs.k8s.io/kueue/pkg/util/equality"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
"sigs.k8s.io/kueue/pkg/util/maps"
Expand Down Expand Up @@ -294,10 +293,9 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
log.Error(err, "couldn't get the parent job workload")
return ctrl.Result{}, err
} else if parentWorkload == nil || !workload.IsAdmitted(parentWorkload) {
objectOriginal := object.DeepCopyObject().(client.Object)
// suspend it
job.Suspend()
if err := clientutil.Patch(ctx, r.client, objectOriginal, object); err != nil {
if err := r.client.Update(ctx, object); err != nil {
log.Error(err, "suspending child job failed")
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -800,11 +798,11 @@ func (r *JobReconciler) startJob(ctx context.Context, job GenericJob, object cli
return err
}
} else {
objectOriginal := object.DeepCopyObject().(client.Object)
if runErr := job.RunWithPodSetsInfo(info); runErr != nil {
return runErr
}
if err := clientutil.Patch(ctx, r.client, objectOriginal, object); err != nil {

if err := r.client.Update(ctx, object); err != nil {
return err
}
r.record.Event(object, corev1.EventTypeNormal, ReasonStarted, msg)
Expand Down Expand Up @@ -846,13 +844,11 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, wl *kueue.W
return nil
}

objectOriginal := object.DeepCopyObject().(client.Object)

job.Suspend()
if info != nil {
job.RestorePodSetsInfo(info)
}
if err := clientutil.Patch(ctx, r.client, objectOriginal, object); err != nil {
if err := r.client.Update(ctx, object); err != nil {
return err
}

Expand Down
22 changes: 5 additions & 17 deletions pkg/controller/jobs/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/podset"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
)

var (
Expand Down Expand Up @@ -69,7 +68,7 @@ func init() {
// +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;update
// +kubebuilder:rbac:groups=batch,resources=jobs/finalizers,verbs=get;update;patch
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch
Expand Down Expand Up @@ -163,39 +162,28 @@ func (j *Job) Suspend() {
j.Spec.Suspend = ptr.To(true)
}

func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, _ jobframework.StopReason, _ string) (bool, error) {
func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, _ jobframework.StopReason, eventMsg string) (bool, error) {
stoppedNow := false
if !j.IsSuspended() {
objectOriginal := j.Object().DeepCopyObject().(client.Object)
j.Suspend()
object := j.Object()
if err := clientutil.Patch(ctx, c, objectOriginal, object); err != nil {
if err := c.Update(ctx, j.Object()); err != nil {
return false, fmt.Errorf("suspend: %w", err)
}
if err := c.Get(ctx, client.ObjectKeyFromObject(object), object); err != nil {
return false, fmt.Errorf("get after suspend: %w", err)
}
stoppedNow = true
}

// Reset start time if necessary, so we can update the scheduling directives.
if j.Status.StartTime != nil {
objectOriginal := j.Object().DeepCopyObject().(client.Object)
j.Status.StartTime = nil
object := j.Object()
if err := clientutil.PatchStatus(ctx, c, objectOriginal, object); err != nil {
if err := c.Status().Update(ctx, j.Object()); err != nil {
return stoppedNow, fmt.Errorf("reset status: %w", err)
}
if err := c.Get(ctx, client.ObjectKeyFromObject(object), object); err != nil {
return false, fmt.Errorf("get after resetting the start time: %w", err)
}
}

objectOriginal := j.Object().DeepCopyObject().(client.Object)
if changed := j.RestorePodSetsInfo(podSetsInfo); !changed {
return stoppedNow, nil
}
if err := clientutil.Patch(ctx, c, objectOriginal, j.Object()); err != nil {
if err := c.Update(ctx, j.Object()); err != nil {
return false, fmt.Errorf("restore info: %w", err)
}
return stoppedNow, nil
Expand Down
19 changes: 9 additions & 10 deletions pkg/controller/jobs/job/job_multikueue_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,21 @@ import (
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/util/api"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
)

type multikueueAdapter struct{}

var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil)

func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error {
localJob := &batchv1.Job{}
err := localClient.Get(ctx, key, localJob)
localJob := batchv1.Job{}
err := localClient.Get(ctx, key, &localJob)
if err != nil {
return err
}
localJobOriginal := localJob.DeepCopy()

remoteJob := &batchv1.Job{}
err = remoteClient.Get(ctx, key, remoteJob)
remoteJob := batchv1.Job{}
err = remoteClient.Get(ctx, key, &remoteJob)
if client.IgnoreNotFound(err) != nil {
return err
}
Expand All @@ -60,7 +58,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie
if err == nil {
if features.Enabled(features.MultiKueueBatchJobWithManagedBy) {
localJob.Status = remoteJob.Status
return clientutil.PatchStatus(ctx, localClient, localJobOriginal, localJob)
return localClient.Status().Update(ctx, &localJob)
}
remoteFinished := false
for _, c := range remoteJob.Status.Conditions {
Expand All @@ -69,14 +67,15 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie
break
}
}

if remoteFinished {
localJob.Status = remoteJob.Status
return clientutil.PatchStatus(ctx, localClient, localJobOriginal, localJob)
return localClient.Status().Update(ctx, &localJob)
}
return nil
}

remoteJob = &batchv1.Job{
remoteJob = batchv1.Job{
ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta),
Spec: *localJob.Spec.DeepCopy(),
}
Expand All @@ -101,7 +100,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie
remoteJob.Spec.ManagedBy = nil
}

return remoteClient.Create(ctx, remoteJob)
return remoteClient.Create(ctx, &remoteJob)
}

func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobs/jobset/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func init() {
// +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
// +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;update
// +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/finalizers,verbs=get;update
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch
Expand Down
16 changes: 7 additions & 9 deletions pkg/controller/jobs/jobset/jobset_multikueue_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,32 @@ import (
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/util/api"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
)

type multikueueAdapter struct{}

var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil)

func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error {
localJob := &jobset.JobSet{}
err := localClient.Get(ctx, key, localJob)
localJob := jobset.JobSet{}
err := localClient.Get(ctx, key, &localJob)
if err != nil {
return err
}
localJobOriginal := localJob.DeepCopy()

remoteJob := &jobset.JobSet{}
err = remoteClient.Get(ctx, key, remoteJob)
remoteJob := jobset.JobSet{}
err = remoteClient.Get(ctx, key, &remoteJob)
if client.IgnoreNotFound(err) != nil {
return err
}

// if the remote exists, just copy the status
if err == nil {
localJob.Status = remoteJob.Status
return clientutil.PatchStatus(ctx, localClient, localJobOriginal, localJob)
return localClient.Status().Update(ctx, &localJob)
}

remoteJob = &jobset.JobSet{
remoteJob = jobset.JobSet{
ObjectMeta: api.CloneObjectMetaForCreation(&localJob.ObjectMeta),
Spec: *localJob.Spec.DeepCopy(),
}
Expand All @@ -75,7 +73,7 @@ func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Clie
// clearing managedBy enables the JobSet controller to take over
remoteJob.Spec.ManagedBy = nil

return remoteClient.Create(ctx, remoteJob)
return remoteClient.Create(ctx, &remoteJob)
}

func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error {
Expand Down
11 changes: 0 additions & 11 deletions pkg/util/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,3 @@ func Patch(ctx context.Context, c client.Client, before, after client.Object) er
}
return nil
}

func PatchStatus(ctx context.Context, c client.Client, before, after client.Object) error {
patch, err := CreatePatch(before, after)
if err != nil {
return err
}
if err = c.Status().Patch(ctx, before, patch); err != nil {
return err
}
return nil
}

0 comments on commit e632fe7

Please sign in to comment.