Skip to content

Commit

Permalink
[multikueue] Allow local admission if supported by the job.
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Jan 18, 2024
1 parent bd11b3f commit 1dbec50
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 13 deletions.
4 changes: 4 additions & 0 deletions pkg/controller/admissionchecks/multikueue/batchjob_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,7 @@ func (b *batchJobAdapter) DeleteRemoteObject(ctx context.Context, remoteClient c
}
return client.IgnoreNotFound(remoteClient.Delete(ctx, &job))
}

func (b *batchJobAdapter) KeepAdmissionCheckPending() bool {
return true
}
4 changes: 4 additions & 0 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,7 @@ func (b *jobsetAdapter) DeleteRemoteObject(ctx context.Context, remoteClient cli
}
return client.IgnoreNotFound(remoteClient.Delete(ctx, &job))
}

func (b *jobsetAdapter) KeepAdmissionCheckPending() bool {
return false
}
10 changes: 5 additions & 5 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestWlReconcileJobset(t *testing.T) {
*baseWorklodBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{
Name: "ac1",
State: kueue.CheckStatePending,
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestWlReconcileJobset(t *testing.T) {
*baseWorklodBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{
Name: "ac1",
State: kueue.CheckStatePending,
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestWlReconcileJobset(t *testing.T) {
*baseWorklodBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{
Name: "ac1",
State: kueue.CheckStatePending,
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestWlReconcileJobset(t *testing.T) {
*baseWorklodBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{
Name: "ac1",
State: kueue.CheckStatePending,
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestWlReconcileJobset(t *testing.T) {
*baseWorklodBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{
Name: "ac1",
State: kueue.CheckStatePending,
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference("jobset.x-k8s.io/v1alpha2", "JobSet", "jobset1", "uid1", true, true).
Expand Down
14 changes: 11 additions & 3 deletions pkg/controller/admissionchecks/multikueue/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ type jobAdapter interface {
CopyStatusRemoteObject(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName) error
// Deletes the Job in the worker cluster.
DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error
// KeepAdmissionCheckPending returns true if the state of the multikueue admission check should be
// kept Pending while the job runs in a worker. This might be needed to keep the managers job
// suspended and not start the execution locally.
// Going forward, if the job management is controlled at object label, this should get a workload
// reference as argument.
KeepAdmissionCheckPending() bool
}

type wlGroup struct {
Expand Down Expand Up @@ -287,9 +293,11 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error
}

if acs.State != kueue.CheckStateRetry && acs.State != kueue.CheckStateRejected {
// For now, the admission check is keept in pending to avoid the execution in the
// local cluster.
acs.State = kueue.CheckStatePending
if group.jobAdapter.KeepAdmissionCheckPending() {
acs.State = kueue.CheckStatePending
} else {
acs.State = kueue.CheckStateReady
}
// update the message
acs.Message = fmt.Sprintf("The workload got reservation on %q", reservingRemote)
wlPatch := workload.BaseSSAWorkload(group.local)
Expand Down
12 changes: 9 additions & 3 deletions test/e2e/multikueue/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,14 +280,20 @@ var _ = ginkgo.Describe("MultiKueue", func() {
wlLookupKey := types.NamespacedName{Name: workloadjobset.GetWorkloadNameForJobSet(jobSet.Name), Namespace: managerNs.Name}

// the execution should be given to the worker
ginkgo.By("Waiting to be admitted in worker1", func() {
ginkgo.By("Waiting to be admitted in worker1 and manager", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed())
g.Expect(workload.FindAdmissionCheck(createdLeaderWorkload.Status.AdmissionChecks, multiKueueAc.Name)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{
Name: multiKueueAc.Name,
State: kueue.CheckStatePending,
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime")))
g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
Reason: "Admitted",
Message: "The workload is admitted",
}, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

Expand Down Expand Up @@ -316,7 +322,7 @@ var _ = ginkgo.Describe("MultiKueue", func() {

createdJobSet := &jobset.JobSet{}
gomega.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(jobSet), createdJobSet)).To(gomega.Succeed())
gomega.Expect(ptr.Deref(createdJobSet.Spec.Suspend, false)).To(gomega.BeTrue())
gomega.Expect(ptr.Deref(createdJobSet.Spec.Suspend, true)).To(gomega.BeFalse())
gomega.Expect(createdJobSet.Status.Conditions).To(gomega.ContainElement(gomega.BeComparableTo(
metav1.Condition{
Type: string(jobset.JobSetCompleted),
Expand Down
12 changes: 10 additions & 2 deletions test/integration/multikueue/multikueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ var _ = ginkgo.Describe("Multikueue", func() {
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("setting workload reservation in worker1, acs is updated in manager amd worker2 wl is removed", func() {
ginkgo.By("setting workload reservation in worker1, the workload is admitted in manager amd worker2 wl is removed", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(worker1Cluster.client.Get(worker1Cluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(util.SetQuotaReservation(worker1Cluster.ctx, worker1Cluster.client, createdWorkload, admission)).To(gomega.Succeed())
Expand All @@ -322,8 +322,16 @@ var _ = ginkgo.Describe("Multikueue", func() {
g.Expect(managerCluster.client.Get(managerCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
acs := workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, multikueueAC.Name)
g.Expect(acs).NotTo(gomega.BeNil())
g.Expect(acs.State).To(gomega.Equal(kueue.CheckStatePending))
g.Expect(acs.State).To(gomega.Equal(kueue.CheckStateReady))
g.Expect(acs.Message).To(gomega.Equal(`The workload got reservation on "worker1"`))

g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
Reason: "Admitted",
Message: "The workload is admitted",
}, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")))

}, util.Timeout, util.Interval).Should(gomega.Succeed())

gomega.Eventually(func(g gomega.Gomega) {
Expand Down

0 comments on commit 1dbec50

Please sign in to comment.