Skip to content

Commit

Permalink
TAS: Support for LWS.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbobrovskyi committed Feb 13, 2025
1 parent 79e9141 commit 57c2550
Show file tree
Hide file tree
Showing 7 changed files with 434 additions and 20 deletions.
1 change: 1 addition & 0 deletions Makefile-test.mk
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ run-test-tas-e2e-%: FORCE
E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) \
ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" \
JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) KUBEFLOW_MPI_VERSION=$(KUBEFLOW_MPI_VERSION) \
LEADERWORKERSET_VERSION=$(LEADERWORKERSET_VERSION) \
KIND_CLUSTER_FILE="tas-kind-cluster.yaml" E2E_TARGET_FOLDER="tas" \
./hack/e2e-test.sh
$(PROJECT_DIR)/bin/ginkgo-top -i $(ARTIFACTS)/$@/e2e.json > $(ARTIFACTS)/$@/e2e-top.yaml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (r *Reconciler) podSets(lws *leaderworkersetv1.LeaderWorkerSet) []kueue.Pod
},
TopologyRequest: jobframework.PodSetTopologyRequest(
&lws.Spec.LeaderWorkerTemplate.WorkerTemplate.ObjectMeta,
nil,
ptr.To(leaderworkersetv1.WorkerIndexLabelKey),
nil,
nil,
),
Expand Down
110 changes: 110 additions & 0 deletions pkg/controller/jobs/leaderworkerset/leaderworkerset_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
leaderworkersetv1 "sigs.k8s.io/lws/api/leaderworkerset/v1"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
podcontroller "sigs.k8s.io/kueue/pkg/controller/jobs/pod"
Expand Down Expand Up @@ -273,6 +275,114 @@ func TestReconciler(t *testing.T) {
},
},
},
"should create prebuild workload with required topology annotation": {
leaderWorkerSet: leaderworkerset.MakeLeaderWorkerSet(testLWS, testNS).
UID(testUID).
Size(3).
LeaderTemplate(corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetRequiredTopologyAnnotation: "cloud.com/block",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "c", Image: "pause"},
},
},
}).
WorkerTemplate(corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetRequiredTopologyAnnotation: "cloud.com/block",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "c", Image: "pause"},
},
},
}).
Obj(),
wantLeaderWorkerSet: leaderworkerset.MakeLeaderWorkerSet(testLWS, testNS).
UID(testUID).
Size(3).
LeaderTemplate(corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetRequiredTopologyAnnotation: "cloud.com/block",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "c", Image: "pause"},
},
},
}).
WorkerTemplate(corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetRequiredTopologyAnnotation: "cloud.com/block",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "c", Image: "pause"},
},
},
}).
Obj(),
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload(GetWorkloadName(types.UID(testUID), testLWS, "0"), testNS).
Annotation(podcontroller.IsGroupWorkloadAnnotationKey, podcontroller.IsGroupWorkloadAnnotationValue).
Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(
kueue.PodSet{
Name: leaderPodSetName,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "c", Image: "pause"},
},
},
},
Count: 1,
TopologyRequest: &kueue.PodSetTopologyRequest{
Required: ptr.To("cloud.com/block"),
PodIndexLabel: ptr.To(leaderworkersetv1.WorkerIndexLabelKey),
},
},
kueue.PodSet{
Name: workerPodSetName,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "c", Image: "pause"},
},
},
},
Count: 2,
TopologyRequest: &kueue.PodSetTopologyRequest{
Required: ptr.To("cloud.com/block"),
PodIndexLabel: ptr.To(leaderworkersetv1.WorkerIndexLabelKey),
},
},
).
Obj(),
},
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Name: testLWS, Namespace: testNS},
EventType: corev1.EventTypeNormal,
Reason: jobframework.ReasonCreatedWorkload,
Message: fmt.Sprintf(
"Created Workload: %s/%s",
testNS,
GetWorkloadName(types.UID(testUID), testLWS, "0"),
),
},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
26 changes: 23 additions & 3 deletions pkg/controller/jobs/leaderworkerset/leaderworkerset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ var (
specPath = field.NewPath("spec")
leaderWorkerTemplatePath = specPath.Child("leaderWorkerTemplate")
leaderTemplatePath = leaderWorkerTemplatePath.Child("leaderTemplate")
leaderTemplateMetaPath = leaderTemplatePath.Child("metadata")
workerTemplatePath = leaderWorkerTemplatePath.Child("workerTemplate")
workerTemplateMetaPath = workerTemplatePath.Child("metadata")
)

func (wh *Webhook) ValidateCreate(ctx context.Context, obj runtime.Object) (warnings admission.Warnings, err error) {
Expand All @@ -108,7 +110,7 @@ func (wh *Webhook) ValidateCreate(ctx context.Context, obj runtime.Object) (warn
log := ctrl.LoggerFrom(ctx).WithName("leaderworkerset-webhook")
log.V(5).Info("Validating create")

allErrs := jobframework.ValidateQueueName(lws.Object())
allErrs := validateCreate(lws)

return nil, allErrs.ToAggregate()
}
Expand All @@ -120,11 +122,13 @@ func (wh *Webhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Ob
log := ctrl.LoggerFrom(ctx).WithName("leaderworkerset-webhook")
log.V(5).Info("Validating update")

allErrs := apivalidation.ValidateImmutableField(
allErrs := validateCreate(newLeaderWorkerSet)

allErrs = append(allErrs, apivalidation.ValidateImmutableField(
jobframework.QueueNameForObject(newLeaderWorkerSet.Object()),
jobframework.QueueNameForObject(oldLeaderWorkerSet.Object()),
queueNameLabelPath,
)
)...)
allErrs = append(allErrs, jobframework.ValidateUpdateForWorkloadPriorityClassName(
newLeaderWorkerSet.Object(),
oldLeaderWorkerSet.Object(),
Expand Down Expand Up @@ -155,6 +159,22 @@ func GetWorkloadName(uid types.UID, name string, groupIndex string) string {
return jobframework.GetWorkloadNameForOwnerWithGVK(fmt.Sprintf("%s-%s", name, groupIndex), uid, gvk)
}

func validateCreate(lws *LeaderWorkerSet) field.ErrorList {
var allErrs field.ErrorList
allErrs = append(allErrs, jobframework.ValidateQueueName(lws.Object())...)
allErrs = append(allErrs, validateTopologyRequest(lws)...)
return allErrs
}

func validateTopologyRequest(lws *LeaderWorkerSet) field.ErrorList {
var allErrs field.ErrorList
if lws.Spec.LeaderWorkerTemplate.LeaderTemplate != nil {
allErrs = append(allErrs, jobframework.ValidateTASPodSetRequest(leaderTemplateMetaPath, &lws.Spec.LeaderWorkerTemplate.LeaderTemplate.ObjectMeta)...)
}
allErrs = append(allErrs, jobframework.ValidateTASPodSetRequest(workerTemplateMetaPath, &lws.Spec.LeaderWorkerTemplate.WorkerTemplate.ObjectMeta)...)
return allErrs
}

func validateImmutablePodTemplateSpec(newPodTemplateSpec *corev1.PodTemplateSpec, oldPodTemplateSpec *corev1.PodTemplateSpec, fieldPath *field.Path) field.ErrorList {
allErrors := field.ErrorList{}
if newPodTemplateSpec == nil || oldPodTemplateSpec == nil {
Expand Down
114 changes: 114 additions & 0 deletions pkg/controller/jobs/leaderworkerset/leaderworkerset_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
leaderworkersetv1 "sigs.k8s.io/lws/api/leaderworkerset/v1"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
Expand Down Expand Up @@ -153,6 +155,58 @@ func TestValidateCreate(t *testing.T) {
},
}.ToAggregate(),
},
"valid topology request": {
lws: testingleaderworkerset.MakeLeaderWorkerSet("test-lws", "").
LeaderTemplate(corev1.PodTemplateSpec{}).
Queue("test-queue").
LeaderTemplate(corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetRequiredTopologyAnnotation: "cloud.com/block",
},
},
}).
WorkerTemplate(corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetRequiredTopologyAnnotation: "cloud.com/block",
},
},
}).
Obj(),
},
"invalid topology request": {
lws: testingleaderworkerset.MakeLeaderWorkerSet("test-lws", "").
LeaderTemplate(corev1.PodTemplateSpec{}).
Queue("test-queue").
LeaderTemplate(corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetRequiredTopologyAnnotation: "cloud.com/block",
kueuealpha.PodSetPreferredTopologyAnnotation: "cloud.com/block",
},
},
}).
WorkerTemplate(corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetRequiredTopologyAnnotation: "cloud.com/block",
kueuealpha.PodSetPreferredTopologyAnnotation: "cloud.com/block",
},
},
}).
Obj(),
wantErr: field.ErrorList{
&field.Error{
Type: field.ErrorTypeInvalid,
Field: "spec.leaderWorkerTemplate.leaderTemplate.metadata.annotations",
},
&field.Error{
Type: field.ErrorTypeInvalid,
Field: "spec.leaderWorkerTemplate.workerTemplate.metadata.annotations",
},
}.ToAggregate(),
},
}

for name, tc := range testCases {
Expand Down Expand Up @@ -519,6 +573,66 @@ func TestValidateUpdate(t *testing.T) {
},
}.ToAggregate(),
},
"set valid topology request": {
oldObj: testingleaderworkerset.MakeLeaderWorkerSet("test-lws", "").
LeaderTemplate(corev1.PodTemplateSpec{}).
WorkerTemplate(corev1.PodTemplateSpec{}).
Queue("test-queue").
Obj(),
newObj: testingleaderworkerset.MakeLeaderWorkerSet("test-lws", "").
LeaderTemplate(corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetRequiredTopologyAnnotation: "cloud.com/block",
},
},
}).
WorkerTemplate(corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetRequiredTopologyAnnotation: "cloud.com/block",
},
},
}).
Queue("test-queue").
Obj(),
},
"set invalid topology request": {
oldObj: testingleaderworkerset.MakeLeaderWorkerSet("test-lws", "").
LeaderTemplate(corev1.PodTemplateSpec{}).
WorkerTemplate(corev1.PodTemplateSpec{}).
Queue("test-queue").
Obj(),
newObj: testingleaderworkerset.MakeLeaderWorkerSet("test-lws", "").
LeaderTemplate(corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetRequiredTopologyAnnotation: "cloud.com/block",
kueuealpha.PodSetPreferredTopologyAnnotation: "cloud.com/block",
},
},
}).
WorkerTemplate(corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetRequiredTopologyAnnotation: "cloud.com/block",
kueuealpha.PodSetPreferredTopologyAnnotation: "cloud.com/block",
},
},
}).
Queue("test-queue").
Obj(),
wantErr: field.ErrorList{
&field.Error{
Type: field.ErrorTypeInvalid,
Field: "spec.leaderWorkerTemplate.leaderTemplate.metadata.annotations",
},
&field.Error{
Type: field.ErrorTypeInvalid,
Field: "spec.leaderWorkerTemplate.workerTemplate.metadata.annotations",
},
}.ToAggregate(),
},
}

for name, tc := range testCases {
Expand Down
Loading

0 comments on commit 57c2550

Please sign in to comment.