Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[multikueue] Job live status update #1668

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions pkg/controller/admissionchecks/multikueue/batchjob_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ import (
"context"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/controller/constants"
kueuejob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
)
Expand All @@ -30,14 +32,41 @@ type batchJobAdapter struct{}

// Compile-time assertion that *batchJobAdapter implements the jobAdapter interface.
var _ jobAdapter = (*batchJobAdapter)(nil)

func (b *batchJobAdapter) CreateRemoteObject(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName string) error {
func (b *batchJobAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error {
localJob := batchv1.Job{}
err := localClient.Get(ctx, key, &localJob)
if err != nil {
return err
}

remoteJob := batchv1.Job{
remoteJob := batchv1.Job{}
err = remoteClient.Get(ctx, key, &remoteJob)
if client.IgnoreNotFound(err) != nil {
return err
}

// the remote job exists
if err == nil {
// This check will no longer be necessary once batch Jobs support live status updates; until then
// we should only sync the Status of the job if it's "Finished".
remoteFinished := false
for _, c := range remoteJob.Status.Conditions {
trasc marked this conversation as resolved.
Show resolved Hide resolved
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
remoteFinished = true
break
}
}

if remoteFinished {
localJob.Status = remoteJob.Status
return localClient.Status().Update(ctx, &localJob)
} else {
return nil
}

}

remoteJob = batchv1.Job{
ObjectMeta: cleanObjectMeta(&localJob.ObjectMeta),
Spec: *localJob.Spec.DeepCopy(),
}
Expand All @@ -55,26 +84,11 @@ func (b *batchJobAdapter) CreateRemoteObject(ctx context.Context, localClient cl
remoteJob.Labels = map[string]string{}
}
remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName
remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin

return remoteClient.Create(ctx, &remoteJob)
}

// CopyStatusRemoteObject overwrites the status of the local batch Job identified by key
// with the status of the Job stored under the same key in the remote cluster.
//
// A local Job that cannot be found is not reported as an error; every other lookup
// failure (local or remote) and any failure of the status update is returned unchanged.
func (b *batchJobAdapter) CopyStatusRemoteObject(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName) error {
	var local batchv1.Job
	if getErr := localClient.Get(ctx, key, &local); getErr != nil {
		// Nothing to update when the local Job is already gone.
		return client.IgnoreNotFound(getErr)
	}

	var remote batchv1.Job
	if getErr := remoteClient.Get(ctx, key, &remote); getErr != nil {
		return getErr
	}

	// Take the remote status wholesale and persist it through the status subresource.
	local.Status = remote.Status
	return localClient.Status().Update(ctx, &local)
}

func (b *batchJobAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error {
job := batchv1.Job{}
err := remoteClient.Get(ctx, key, &job)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/admissionchecks/multikueue/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e
return err
}

wlRec := newWlReconciler(mgr.GetClient(), helper, cRec)
wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin)
return wlRec.setupWithManager(mgr)
}
58 changes: 40 additions & 18 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,43 @@ package multikueue

import (
"context"
"errors"
"fmt"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/controller/constants"
)

// jobsetAdapter is the jobAdapter implementation used for JobSet objects.
// It is an empty struct and carries no state of its own.
type jobsetAdapter struct{}

// Compile-time assertion that *jobsetAdapter implements the jobAdapter interface.
var _ jobAdapter = (*jobsetAdapter)(nil)

func (b *jobsetAdapter) CreateRemoteObject(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName string) error {
func (b *jobsetAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error {
localJob := jobset.JobSet{}
err := localClient.Get(ctx, key, &localJob)
if err != nil {
return err
}

remoteJob := jobset.JobSet{
remoteJob := jobset.JobSet{}
err = remoteClient.Get(ctx, key, &remoteJob)
if client.IgnoreNotFound(err) != nil {
return err
}

// if the remote exists, just copy the status
if err == nil {
localJob.Status = remoteJob.Status
return localClient.Status().Update(ctx, &localJob)
}

remoteJob = jobset.JobSet{
ObjectMeta: cleanObjectMeta(&localJob.ObjectMeta),
Spec: *localJob.Spec.DeepCopy(),
}
Expand All @@ -46,26 +63,11 @@ func (b *jobsetAdapter) CreateRemoteObject(ctx context.Context, localClient clie
remoteJob.Labels = map[string]string{}
}
remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName
remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin

return remoteClient.Create(ctx, &remoteJob)
}

// CopyStatusRemoteObject overwrites the status of the local JobSet identified by key
// with the status of the JobSet stored under the same key in the remote cluster.
//
// A local JobSet that cannot be found is not reported as an error; every other lookup
// failure (local or remote) and any failure of the status update is returned unchanged.
func (b *jobsetAdapter) CopyStatusRemoteObject(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName) error {
	var local jobset.JobSet
	if getErr := localClient.Get(ctx, key, &local); getErr != nil {
		// Nothing to update when the local JobSet is already gone.
		return client.IgnoreNotFound(getErr)
	}

	var remote jobset.JobSet
	if getErr := remoteClient.Get(ctx, key, &remote); getErr != nil {
		return getErr
	}

	// Take the remote status wholesale and persist it through the status subresource.
	local.Status = remote.Status
	return localClient.Status().Update(ctx, &local)
}

func (b *jobsetAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error {
job := jobset.JobSet{}
err := remoteClient.Get(ctx, key, &job)
Expand All @@ -78,3 +80,23 @@ func (b *jobsetAdapter) DeleteRemoteObject(ctx context.Context, remoteClient cli
// KeepAdmissionCheckPending always reports false for the JobSet adapter.
// NOTE(review): judging by the name, false presumably means the admission check
// does not have to stay pending for JobSets; confirm against the jobAdapter interface.
func (*jobsetAdapter) KeepAdmissionCheckPending() bool {
	const keepPending = false
	return keepPending
}

// Compile-time assertion that *jobsetAdapter implements the multiKueueWatcher interface.
var _ multiKueueWatcher = (*jobsetAdapter)(nil)

// GetEmptyList returns a pointer to a freshly allocated, empty JobSetList.
func (*jobsetAdapter) GetEmptyList() client.ObjectList {
	emptyList := new(jobset.JobSetList)
	return emptyList
}

// GetWorkloadKey derives the key of the prebuilt workload associated with a JobSet.
//
// The returned key uses the value of the JobSet's constants.PrebuiltWorkloadLabel label
// as Name and the JobSet's own namespace as Namespace.
//
// An error is returned when o is not a *jobset.JobSet or when the JobSet does not
// carry the prebuilt workload label; in both cases the returned key is empty.
func (*jobsetAdapter) GetWorkloadKey(o runtime.Object) (types.NamespacedName, error) {
	js, ok := o.(*jobset.JobSet)
	if !ok {
		return types.NamespacedName{}, errors.New("not a jobset")
	}

	if wlName, labelled := js.Labels[constants.PrebuiltWorkloadLabel]; labelled {
		return types.NamespacedName{Name: wlName, Namespace: js.Namespace}, nil
	}

	return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for jobset: %s", klog.KObj(js))
}
100 changes: 96 additions & 4 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,98 @@ func TestWlReconcileJobset(t *testing.T) {
wantWorker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
},
},
"remote jobset status is changed, the status is copied in the local Jobset ": {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{
Name: "ac1",
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},

managersJobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().Obj(),
},

worker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
JobsStatus(
jobset.ReplicatedJobStatus{
Name: "replicated-job-1",
Ready: 1,
Succeeded: 1,
},
jobset.ReplicatedJobStatus{
Name: "replicated-job-2",
Ready: 3,
Succeeded: 0,
},
).
Obj(),
},

worker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{
Name: "ac1",
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersJobsSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
JobsStatus(
jobset.ReplicatedJobStatus{
Name: "replicated-job-1",
Ready: 1,
Succeeded: 1,
},
jobset.ReplicatedJobStatus{
Name: "replicated-job-2",
Ready: 3,
Succeeded: 0,
},
).
Obj(),
},

wantWorker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantWorker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
JobsStatus(
jobset.ReplicatedJobStatus{
Name: "replicated-job-1",
Ready: 1,
Succeeded: 1,
},
jobset.ReplicatedJobStatus{
Name: "replicated-job-2",
Ready: 3,
Succeeded: 0,
},
).
Obj(),
},
},
Expand Down Expand Up @@ -142,7 +234,7 @@ func TestWlReconcileJobset(t *testing.T) {
}).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `by test`}).
Obj(),
},
wantManagersJobsSets: []jobset.JobSet{
Expand Down Expand Up @@ -174,7 +266,7 @@ func TestWlReconcileJobset(t *testing.T) {
}).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `by test`}).
Obj(),
},

Expand Down Expand Up @@ -206,7 +298,7 @@ func TestWlReconcileJobset(t *testing.T) {
}).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `by test`}).
Obj(),
},
wantManagersJobsSets: []jobset.JobSet{
Expand Down Expand Up @@ -245,7 +337,7 @@ func TestWlReconcileJobset(t *testing.T) {
cRec.remoteClients["worker1"] = w1remoteClient

helper, _ := newMultiKueueStoreHelper(managerClient)
reconciler := newWlReconciler(managerClient, helper, cRec)
reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin)

_, gotErr := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "wl1", Namespace: TestNamespace}})
if gotErr != nil {
Expand Down
Loading