Skip to content

Commit

Permalink
[multikueue] Use batch/Job spec.managedBy field. (kubernetes-sigs#2331
Browse files Browse the repository at this point in the history
)

* [multikueue] Use batch/Job `spec.managedBy` field.

Use `spec.managedBy` to detect delegatable jobs.
Enable live status updates for batch/Jobs.
No longer keep the multikueue admission check state `Pending`
when the job is running in a worker cluster.

* Review remarks

* [multikueue] Add `MultiKueueBatchJobWithManageBy` featuregate

* Review remarks

* Review remarks

* Fix linter issues.
  • Loading branch information
trasc authored and Fiona-Waters committed Jun 25, 2024
1 parent d9a58d0 commit 5f08ee9
Show file tree
Hide file tree
Showing 15 changed files with 613 additions and 39 deletions.
1 change: 1 addition & 0 deletions Makefile-test.mk
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ test: gotestsum ## Run tests.
test-integration: gomod-download envtest ginkgo mpi-operator-crd ray-operator-crd jobset-operator-crd kf-training-operator-crd cluster-autoscaler-crd kueuectl ## Run tests.
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" \
KUEUE_BIN=$(PROJECT_DIR)/bin \
ENVTEST_K8S_VERSION=$(ENVTEST_K8S_VERSION) \
$(GINKGO) $(GINKGO_ARGS) -procs=$(INTEGRATION_NPROCS) --junit-report=junit.xml --output-dir=$(ARTIFACTS) -v $(INTEGRATION_TARGET)

CREATE_KIND_CLUSTER ?= true
Expand Down
10 changes: 9 additions & 1 deletion hack/multikueue-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,16 @@ function startup {
if [ ! -d "$ARTIFACTS" ]; then
mkdir -p "$ARTIFACTS"
fi

KIND_VERSION=${E2E_KIND_VERSION/"kindest/node:v"/}
MANAGER_KIND_CONFIG="${SOURCE_DIR}/multikueue/manager-cluster.kind-${KIND_VERSION}.yaml"
if [ ! -f $MANAGER_KIND_CONFIG ]; then
MANAGER_KIND_CONFIG="${SOURCE_DIR}/multikueue/manager-cluster.kind.yaml"
fi

cluster_create "$MANAGER_KIND_CLUSTER_NAME" "$SOURCE_DIR/multikueue/manager-cluster.kind.yaml"
echo "Using manager config: $MANAGER_KIND_CONFIG"

cluster_create "$MANAGER_KIND_CLUSTER_NAME" "$MANAGER_KIND_CONFIG"
cluster_create $WORKER1_KIND_CLUSTER_NAME "$SOURCE_DIR/multikueue/worker-cluster.kind.yaml"
cluster_create $WORKER2_KIND_CLUSTER_NAME "$SOURCE_DIR/multikueue/worker-cluster.kind.yaml"
fi
Expand Down
20 changes: 20 additions & 0 deletions hack/multikueue/manager-cluster.kind-1.30.0.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
featureGates:
JobManagedBy: true
nodes:
- role: control-plane
kubeadmConfigPatches:
- |
kind: ClusterConfiguration
apiVersion: kubeadm.k8s.io/v1beta3
scheduler:
extraArgs:
v: "2"
controllerManager:
extraArgs:
v: "2"
apiServer:
extraArgs:
enable-aggregator-routing: "true"
v: "2"
57 changes: 51 additions & 6 deletions pkg/controller/admissionchecks/multikueue/batchjob_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@ package multikueue

import (
"context"
"errors"
"fmt"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/controller/constants"
kueuejob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
"sigs.k8s.io/kueue/pkg/features"
)

type batchJobAdapter struct{}
Expand All @@ -47,8 +53,10 @@ func (b *batchJobAdapter) SyncJob(ctx context.Context, localClient client.Client

// the remote job exists
if err == nil {
// This will no longer be necessary when batchJob will support live status update, by then
// we should only sync the Status of the job if it's "Finished".
if features.Enabled(features.MultiKueueBatchJobWithManagedBy) {
localJob.Status = remoteJob.Status
return localClient.Status().Update(ctx, &localJob)
}
remoteFinished := false
for _, c := range remoteJob.Status.Conditions {
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
Expand All @@ -60,9 +68,8 @@ func (b *batchJobAdapter) SyncJob(ctx context.Context, localClient client.Client
if remoteFinished {
localJob.Status = remoteJob.Status
return localClient.Status().Update(ctx, &localJob)
} else {
return nil
}
return nil
}

remoteJob = batchv1.Job{
Expand All @@ -85,6 +92,11 @@ func (b *batchJobAdapter) SyncJob(ctx context.Context, localClient client.Client
remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName
remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin

if features.Enabled(features.MultiKueueBatchJobWithManagedBy) {
// clear the managedBy enables the batch/Job controller to take over
remoteJob.Spec.ManagedBy = nil
}

return remoteClient.Create(ctx, &remoteJob)
}

Expand All @@ -98,9 +110,42 @@ func (b *batchJobAdapter) DeleteRemoteObject(ctx context.Context, remoteClient c
}

func (b *batchJobAdapter) KeepAdmissionCheckPending() bool {
return true
return !features.Enabled(features.MultiKueueBatchJobWithManagedBy)
}

func (b *batchJobAdapter) IsJobManagedByKueue(_ context.Context, _ client.Client, _ types.NamespacedName) (bool, string, error) {
func (b *batchJobAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) {
if !features.Enabled(features.MultiKueueBatchJobWithManagedBy) {
return true, "", nil
}

job := batchv1.Job{}
err := c.Get(ctx, key, &job)
if err != nil {
return false, "", err
}
jobControllerName := ptr.Deref(job.Spec.ManagedBy, "")
if jobControllerName != ControllerName {
return false, fmt.Sprintf("Expecting spec.managedBy to be %q not %q", ControllerName, jobControllerName), nil
}
return true, "", nil
}

var _ multiKueueWatcher = (*batchJobAdapter)(nil)

func (*batchJobAdapter) GetEmptyList() client.ObjectList {
return &batchv1.JobList{}
}

func (*batchJobAdapter) GetWorkloadKey(o runtime.Object) (types.NamespacedName, error) {
job, isJob := o.(*batchv1.Job)
if !isJob {
return types.NamespacedName{}, errors.New("not a job")
}

prebuiltWl, hasPrebuiltWorkload := job.Labels[constants.PrebuiltWorkloadLabel]
if !hasPrebuiltWorkload {
return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for job: %s", klog.KObj(job))
}

return types.NamespacedName{Name: prebuiltWl, Namespace: job.Namespace}, nil
}
Loading

0 comments on commit 5f08ee9

Please sign in to comment.