From 7e37e0cc9419e5328c133307c69ea748fa99698f Mon Sep 17 00:00:00 2001 From: Mykhailo Bobrovskyi Date: Wed, 26 Jun 2024 14:08:21 +0300 Subject: [PATCH] [kjobctl] Add job builder. (#2394) --- cmd/experimental/kjobctl/go.mod | 7 +- cmd/experimental/kjobctl/go.sum | 2 + .../kjobctl/pkg/builder/builder.go | 217 ++++++++++ .../kjobctl/pkg/builder/builder_test.go | 277 +++++++++++++ .../kjobctl/pkg/builder/job_builder.go | 128 ++++++ .../kjobctl/pkg/builder/job_builder_test.go | 389 ++++++++++++++++++ .../kjobctl/pkg/cmd/completion/completion.go | 2 +- .../kjobctl/pkg/cmd/util/client_getter.go | 4 +- .../kjobctl/pkg/constants/constants.go | 21 + 9 files changed, 1040 insertions(+), 7 deletions(-) create mode 100644 cmd/experimental/kjobctl/pkg/builder/builder.go create mode 100644 cmd/experimental/kjobctl/pkg/builder/builder_test.go create mode 100644 cmd/experimental/kjobctl/pkg/builder/job_builder.go create mode 100644 cmd/experimental/kjobctl/pkg/builder/job_builder_test.go create mode 100644 cmd/experimental/kjobctl/pkg/constants/constants.go diff --git a/cmd/experimental/kjobctl/go.mod b/cmd/experimental/kjobctl/go.mod index 1dde6b5f5a..1ad55b3ce5 100644 --- a/cmd/experimental/kjobctl/go.mod +++ b/cmd/experimental/kjobctl/go.mod @@ -3,13 +3,16 @@ module sigs.k8s.io/kueue/cmd/experimental/kjobctl go 1.22.3 require ( + github.com/google/go-cmp v0.6.0 github.com/spf13/cobra v1.8.1 k8s.io/api v0.30.2 k8s.io/apimachinery v0.30.2 k8s.io/cli-runtime v0.30.2 k8s.io/client-go v0.30.2 k8s.io/klog/v2 v2.120.1 + k8s.io/utils v0.0.0-20240102154912-e7106e64919e sigs.k8s.io/controller-runtime v0.18.4 + sigs.k8s.io/kueue v0.7.0 ) require ( @@ -41,12 +44,9 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/onsi/ginkgo/v2 v2.19.0 // indirect - github.com/onsi/gomega v1.33.1 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/stretchr/objx v0.5.2 // indirect github.com/xlab/treeprint v1.2.0 // indirect go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect golang.org/x/net v0.25.0 // indirect @@ -62,7 +62,6 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect - k8s.io/utils v0.0.0-20240102154912-e7106e64919e // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/kustomize/api v0.13.5-0.20230601165947-6ce0bf390ce3 // indirect sigs.k8s.io/kustomize/kyaml v0.14.3-0.20230601165947-6ce0bf390ce3 // indirect diff --git a/cmd/experimental/kjobctl/go.sum b/cmd/experimental/kjobctl/go.sum index fae4f7f56a..2729d28b3b 100644 --- a/cmd/experimental/kjobctl/go.sum +++ b/cmd/experimental/kjobctl/go.sum @@ -274,6 +274,8 @@ sigs.k8s.io/controller-runtime v0.18.4 h1:87+guW1zhvuPLh1PHybKdYFLU0YJp4FhJRmiHv sigs.k8s.io/controller-runtime v0.18.4/go.mod h1:TVoGrfdpbA9VRFaRnKgk9P5/atA0pMwq+f+msb9M8Sg= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/kueue v0.7.0 h1:Rwg2Ce/0kjZwdov1XdsAFb11QQtOOHx7HCTwFSMd8wc= +sigs.k8s.io/kueue v0.7.0/go.mod h1:tjzIB8Y1vWwBJRWJsXByRIx89PGUq5/mzeLnYHZoFtk= sigs.k8s.io/kustomize/api v0.13.5-0.20230601165947-6ce0bf390ce3 h1:XX3Ajgzov2RKUdc5jW3t5jwY7Bo7dcRm+tFxT+NfgY0= sigs.k8s.io/kustomize/api v0.13.5-0.20230601165947-6ce0bf390ce3/go.mod h1:9n16EZKMhXBNSiUC5kSdFQJkdH3zbxS/JoO619G1VAY= sigs.k8s.io/kustomize/kyaml v0.14.3-0.20230601165947-6ce0bf390ce3 h1:W6cLQc5pnqM7vh3b7HvGNfXrJ/xL6BDMS0v1V/HHg5U= diff --git a/cmd/experimental/kjobctl/pkg/builder/builder.go b/cmd/experimental/kjobctl/pkg/builder/builder.go new file mode 100644 index 0000000000..8c36af611e --- /dev/null +++ b/cmd/experimental/kjobctl/pkg/builder/builder.go @@ -0,0 +1,217 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package builder + +import ( + "context" + "errors" + "slices" + + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8s "k8s.io/client-go/kubernetes" + + "sigs.k8s.io/kueue/cmd/experimental/kjobctl/apis/v1alpha1" + "sigs.k8s.io/kueue/cmd/experimental/kjobctl/client-go/clientset/versioned" + "sigs.k8s.io/kueue/cmd/experimental/kjobctl/pkg/cmd/util" +) + +var ( + noNamespaceSpecifiedErr = errors.New("no namespace specified") + noApplicationProfileSpecifiedErr = errors.New("no application profile specified") + noApplicationProfileModeSpecifiedErr = errors.New("no application profile mode specified") + invalidApplicationProfileModeErr = errors.New("invalid application profile mode") + applicationProfileModeNotConfiguredErr = errors.New("application profile mode not configured") + noCommandSpecifiedErr = errors.New("no command specified") + noParallelismSpecifiedErr = errors.New("no parallelism specified") + noCompletionsSpecifiedErr = errors.New("no completions specified") + noRequestsSpecifiedErr = errors.New("no requests specified") + noLocalQueueSpecifiedErr = errors.New("no local queue specified") +) + +type builder interface { + build(ctx context.Context) (runtime.Object, error) +} + +type Builder struct { + clientGetter util.ClientGetter + kjobctlClientset versioned.Interface + k8sClientset k8s.Interface + + namespace string + profileName string + modeName v1alpha1.ApplicationProfileMode + + command []string + parallelism *int32 + completions *int32 + requests corev1.ResourceList + localQueue string + + profile *v1alpha1.ApplicationProfile + mode *v1alpha1.SupportedMode + volumeBundles []v1alpha1.VolumeBundle +} + +func NewBuilder(clientGetter util.ClientGetter) *Builder { + return &Builder{clientGetter: clientGetter} +} + +func (b *Builder) WithNamespace(namespace string) *Builder { + b.namespace = namespace + return b +} + +func (b *Builder) WithProfileName(profileName string) *Builder { + b.profileName = profileName + return b +} + +func (b *Builder) WithModeName(modeName v1alpha1.ApplicationProfileMode) *Builder { + b.modeName = modeName + return b +} + +func (b *Builder) WithCommand(command []string) *Builder { + b.command = command + return b +} + +func (b *Builder) WithParallelism(parallelism *int32) *Builder { + b.parallelism = parallelism + return b +} + +func (b *Builder) WithCompletions(completions *int32) *Builder { + b.completions = completions + return b +} + +func (b *Builder) WithRequests(requests corev1.ResourceList) *Builder { + b.requests = requests + return b +} + +func (b *Builder) WithLocalQueue(localQueue string) *Builder { + b.localQueue = localQueue + return b +} + +func (b *Builder) validateGeneral() error { + if b.namespace == "" { + return noNamespaceSpecifiedErr + } + + if b.profileName == "" { + return noApplicationProfileSpecifiedErr + } + + if b.modeName == "" { + return noApplicationProfileModeSpecifiedErr + } + + return nil +} + +func (b *Builder) complete(ctx context.Context) error { + var err error + + b.kjobctlClientset, err = b.clientGetter.KjobctlClientset() + if err != nil { + return err + } + + b.k8sClientset, err = b.clientGetter.K8sClientset() + if err != nil { + return err + } + + b.profile, err = b.kjobctlClientset.KjobctlV1alpha1().ApplicationProfiles(b.namespace).Get(ctx, b.profileName, v1.GetOptions{}) + if err != nil { + return err + } + + for i, mode := range b.profile.Spec.SupportedModes { + if mode.Name == b.modeName { + b.mode = &b.profile.Spec.SupportedModes[i] + } + } + + if b.mode == nil { + return applicationProfileModeNotConfiguredErr + } + + volumeBundlesList, err := b.kjobctlClientset.KjobctlV1alpha1().VolumeBundles(b.profile.Namespace).List(ctx, v1.ListOptions{}) + if err != nil { + return err + } + + b.volumeBundles = volumeBundlesList.Items + + return nil +} + +func (b *Builder) validateFlags() error { + if slices.Contains(b.mode.RequiredFlags, v1alpha1.CmdFlag) && len(b.command) == 0 { + return noCommandSpecifiedErr + } + + if slices.Contains(b.mode.RequiredFlags, v1alpha1.ParallelismFlag) && b.parallelism == nil { + return noParallelismSpecifiedErr + } + + if slices.Contains(b.mode.RequiredFlags, v1alpha1.CompletionsFlag) && b.completions == nil { + return noCompletionsSpecifiedErr + } + + if slices.Contains(b.mode.RequiredFlags, v1alpha1.RequestFlag) && b.requests == nil { + return noRequestsSpecifiedErr + } + + if slices.Contains(b.mode.RequiredFlags, v1alpha1.LocalQueueFlag) && b.localQueue == "" { + return noLocalQueueSpecifiedErr + } + + return nil +} + +func (b *Builder) Do(ctx context.Context) (runtime.Object, error) { + if err := b.validateGeneral(); err != nil { + return nil, err + } + + var bImpl builder + + if b.modeName == v1alpha1.JobMode { + bImpl = newJobBuilder(b) + } + + if bImpl == nil { + return nil, invalidApplicationProfileModeErr + } + + if err := b.complete(ctx); err != nil { + return nil, err + } + + if err := b.validateFlags(); err != nil { + return nil, err + } + + return bImpl.build(ctx) +} diff --git a/cmd/experimental/kjobctl/pkg/builder/builder_test.go b/cmd/experimental/kjobctl/pkg/builder/builder_test.go new file mode 100644 index 0000000000..cf665d063b --- /dev/null +++ b/cmd/experimental/kjobctl/pkg/builder/builder_test.go @@ -0,0 +1,277 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package builder + +import ( + "context" + "errors" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + batchv1 "k8s.io/api/batch/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + + "sigs.k8s.io/kueue/cmd/experimental/kjobctl/apis/v1alpha1" + "sigs.k8s.io/kueue/cmd/experimental/kjobctl/client-go/clientset/versioned/fake" + cmdtesting "sigs.k8s.io/kueue/cmd/experimental/kjobctl/pkg/cmd/testing" + "sigs.k8s.io/kueue/cmd/experimental/kjobctl/pkg/constants" +) + +func TestBuilder(t *testing.T) { + testCases := map[string]struct { + namespace string + profile string + mode v1alpha1.ApplicationProfileMode + kjobctlObjs []runtime.Object + wantObj runtime.Object + wantErr error + }{ + "shouldn't build job because no namespace specified": { + wantErr: noNamespaceSpecifiedErr, + }, + "shouldn't build job because no application profile specified": { + namespace: metav1.NamespaceDefault, + wantErr: noApplicationProfileSpecifiedErr, + }, + "shouldn't build job because application profile not found": { + namespace: metav1.NamespaceDefault, + profile: "profile", + mode: v1alpha1.JobMode, + wantErr: apierrors.NewNotFound(schema.GroupResource{Group: "kjobctl.x-k8s.io", Resource: "applicationprofiles"}, "profile"), + }, + "shouldn't build job because no application profile mode specified": { + namespace: metav1.NamespaceDefault, + profile: "profile", + kjobctlObjs: []runtime.Object{ + &v1alpha1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "profile", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.ApplicationProfileSpec{ + SupportedModes: []v1alpha1.SupportedMode{{Name: v1alpha1.JobMode}}, + }, + }, + }, + wantErr: noApplicationProfileModeSpecifiedErr, + }, + "shouldn't build job because application profile mode not configured": { + namespace: metav1.NamespaceDefault, + profile: "profile", + mode: v1alpha1.JobMode, + kjobctlObjs: []runtime.Object{ + &v1alpha1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "profile", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.ApplicationProfileSpec{ + SupportedModes: []v1alpha1.SupportedMode{{Name: v1alpha1.InteractiveMode}}, + }, + }, + }, + wantErr: applicationProfileModeNotConfiguredErr, + }, + "shouldn't build job because invalid application profile mode": { + namespace: metav1.NamespaceDefault, + profile: "profile", + mode: "Invalid", + kjobctlObjs: []runtime.Object{ + &v1alpha1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "profile", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.ApplicationProfileSpec{ + SupportedModes: []v1alpha1.SupportedMode{{Name: v1alpha1.InteractiveMode}}, + }, + }, + }, + wantErr: invalidApplicationProfileModeErr, + }, + "shouldn't build job because command not specified with required flags": { + namespace: metav1.NamespaceDefault, + profile: "profile", + mode: v1alpha1.JobMode, + kjobctlObjs: []runtime.Object{ + &v1alpha1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "profile", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.ApplicationProfileSpec{ + SupportedModes: []v1alpha1.SupportedMode{{ + Name: v1alpha1.JobMode, + RequiredFlags: []v1alpha1.Flag{v1alpha1.CmdFlag}, + }}, + }, + }, + }, + wantErr: noCommandSpecifiedErr, + }, + "shouldn't build job because parallelism not specified with required flags": { + namespace: metav1.NamespaceDefault, + profile: "profile", + mode: v1alpha1.JobMode, + kjobctlObjs: []runtime.Object{ + &v1alpha1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "profile", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.ApplicationProfileSpec{ + SupportedModes: []v1alpha1.SupportedMode{{ + Name: v1alpha1.JobMode, + RequiredFlags: []v1alpha1.Flag{v1alpha1.ParallelismFlag}, + }}, + }, + }, + }, + wantErr: noParallelismSpecifiedErr, + }, + "shouldn't build job because completions not specified with required flags": { + namespace: metav1.NamespaceDefault, + profile: "profile", + mode: v1alpha1.JobMode, + kjobctlObjs: []runtime.Object{ + &v1alpha1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "profile", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.ApplicationProfileSpec{ + SupportedModes: []v1alpha1.SupportedMode{{ + Name: v1alpha1.JobMode, + RequiredFlags: []v1alpha1.Flag{v1alpha1.CompletionsFlag}, + }}, + }, + }, + }, + wantErr: noCompletionsSpecifiedErr, + }, + "shouldn't build job because request not specified with required flags": { + namespace: metav1.NamespaceDefault, + profile: "profile", + mode: v1alpha1.JobMode, + kjobctlObjs: []runtime.Object{ + &v1alpha1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "profile", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.ApplicationProfileSpec{ + SupportedModes: []v1alpha1.SupportedMode{{ + Name: v1alpha1.JobMode, + RequiredFlags: []v1alpha1.Flag{v1alpha1.RequestFlag}, + }}, + }, + }, + }, + wantErr: noRequestsSpecifiedErr, + }, + "shouldn't build job because local queue not specified with required flags": { + namespace: metav1.NamespaceDefault, + profile: "profile", + mode: v1alpha1.JobMode, + kjobctlObjs: []runtime.Object{ + &v1alpha1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "profile", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.ApplicationProfileSpec{ + SupportedModes: []v1alpha1.SupportedMode{{ + Name: v1alpha1.JobMode, + RequiredFlags: []v1alpha1.Flag{v1alpha1.LocalQueueFlag}, + }}, + }, + }, + }, + wantErr: noLocalQueueSpecifiedErr, + }, + "should build job": { + namespace: metav1.NamespaceDefault, + profile: "profile", + mode: v1alpha1.JobMode, + kjobctlObjs: []runtime.Object{ + &v1alpha1.JobTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "job-template", + }, + }, + &v1alpha1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "profile", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.ApplicationProfileSpec{ + SupportedModes: []v1alpha1.SupportedMode{{ + Name: v1alpha1.JobMode, + Template: "job-template", + }}, + }, + }, + }, + wantObj: &batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "profile-", + Namespace: metav1.NamespaceDefault, + Labels: map[string]string{ + constants.ProfileLabel: "profile", + }, + }, + }, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tcg := cmdtesting.NewTestClientGetter(). + WithKjobctlClientset(fake.NewSimpleClientset(tc.kjobctlObjs...)) + gotObjs, gotErr := NewBuilder(tcg). + WithNamespace(tc.namespace). + WithProfileName(tc.profile). + WithModeName(tc.mode). + Do(ctx) + + var opts []cmp.Option + var statusError *apierrors.StatusError + if !errors.As(tc.wantErr, &statusError) { + opts = append(opts, cmpopts.EquateErrors()) + } + if diff := cmp.Diff(tc.wantErr, gotErr, opts...); diff != "" { + t.Errorf("Unexpected error (-want/+got)\n%s", diff) + return + } + + if diff := cmp.Diff(tc.wantObj, gotObjs); diff != "" { + t.Errorf("Objects after build (-want,+got):\n%s", diff) + } + }) + } +} diff --git a/cmd/experimental/kjobctl/pkg/builder/job_builder.go b/cmd/experimental/kjobctl/pkg/builder/job_builder.go new file mode 100644 index 0000000000..71039ab294 --- /dev/null +++ b/cmd/experimental/kjobctl/pkg/builder/job_builder.go @@ -0,0 +1,128 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package builder + +import ( + "context" + "slices" + + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "sigs.k8s.io/kueue/cmd/experimental/kjobctl/pkg/constants" + kueueconstants "sigs.k8s.io/kueue/pkg/controller/constants" +) + +type jobBuilder struct { + *Builder +} + +var _ builder = (*jobBuilder)(nil) + +func (b *jobBuilder) build(ctx context.Context) (runtime.Object, error) { + template, err := b.kjobctlClientset.KjobctlV1alpha1().JobTemplates(b.profile.Namespace). + Get(ctx, string(b.mode.Template), metav1.GetOptions{}) + if err != nil { + return nil, err + } + + job := &batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: b.profile.Namespace, + GenerateName: b.profile.Name + "-", + Labels: map[string]string{}, + }, + Spec: template.Template.Spec, + } + + if b.profile != nil { + job.Labels[constants.ProfileLabel] = b.profile.Name + } + + for _, vb := range b.volumeBundles { + for _, volume := range vb.Spec.Volumes { + index := slices.IndexFunc(job.Spec.Template.Spec.Volumes, func(v v1.Volume) bool { + return v.Name == volume.Name + }) + if index != -1 { + job.Spec.Template.Spec.Volumes[index] = volume + } else { + job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes, volume) + } + } + } + + for i := range job.Spec.Template.Spec.Containers { + container := &job.Spec.Template.Spec.Containers[i] + + if len(b.command) > 0 { + container.Command = b.command + } + + if len(b.requests) > 0 { + container.Resources.Requests = b.requests + } + + for _, vb := range b.volumeBundles { + for _, volumeMount := range vb.Spec.ContainerVolumeMounts { + index := slices.IndexFunc(container.VolumeMounts, func(vm v1.VolumeMount) bool { + return vm.Name == volumeMount.Name + }) + if index != -1 { + container.VolumeMounts[index] = volumeMount + } else { + container.VolumeMounts = append(container.VolumeMounts, volumeMount) + } + } + + for _, envVar := range vb.Spec.EnvVars { + index := slices.IndexFunc(container.Env, func(ev v1.EnvVar) bool { + return ev.Name == envVar.Name + }) + if index != -1 { + container.Env[index] = envVar + } else { + container.Env = append(container.Env, envVar) + } + } + } + } + + if b.parallelism != nil { + job.Spec.Parallelism = b.parallelism + } + + if b.completions != nil { + job.Spec.Completions = b.completions + } + + if len(b.localQueue) > 0 { + job.ObjectMeta.Labels[kueueconstants.QueueLabel] = b.localQueue + } + + return job, nil +} + +func newJobBuilder(b *Builder) *jobBuilder { + return &jobBuilder{Builder: b} +} diff --git a/cmd/experimental/kjobctl/pkg/builder/job_builder_test.go b/cmd/experimental/kjobctl/pkg/builder/job_builder_test.go new file mode 100644 index 0000000000..60174ff4ab --- /dev/null +++ b/cmd/experimental/kjobctl/pkg/builder/job_builder_test.go @@ -0,0 +1,389 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package builder + +import ( + "context" + "errors" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/utils/ptr" + + "sigs.k8s.io/kueue/cmd/experimental/kjobctl/apis/v1alpha1" + kjobctlfake "sigs.k8s.io/kueue/cmd/experimental/kjobctl/client-go/clientset/versioned/fake" + cmdtesting "sigs.k8s.io/kueue/cmd/experimental/kjobctl/pkg/cmd/testing" + "sigs.k8s.io/kueue/cmd/experimental/kjobctl/pkg/constants" + kueueconstants "sigs.k8s.io/kueue/pkg/controller/constants" +) + +func TestJobBuilder(t *testing.T) { + testJobTemplate := &v1alpha1.JobTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "job-template", + Labels: map[string]string{"foo": "bar"}, + }, + Template: v1alpha1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](1), + Completions: ptr.To[int32](1), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "c1", + Command: []string{""}, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + }, + Env: []corev1.EnvVar{ + {Name: "e1", Value: "default-value1"}, + {Name: "e2", Value: "default-value2"}, + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "vm1", MountPath: "/etc/default-config1"}, + {Name: "vm2", MountPath: "/etc/default-config2"}, + }, + }, + { + Name: "c2", + Command: []string{""}, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + }, + }, + Env: []corev1.EnvVar{ + {Name: "e1", Value: "default-value1"}, + {Name: "e2", Value: "default-value2"}, + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "vm1", MountPath: "/etc/default-config1"}, + {Name: "vm2", MountPath: "/etc/default-config2"}, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "v1", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "default-config1", + }, + }, + }, + }, + { + Name: "v2", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "default-config2", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + testCases := map[string]struct { + namespace string + profile string + mode v1alpha1.ApplicationProfileMode + command []string + parallelism *int32 + completions *int32 + requests corev1.ResourceList + localQueue string + kjobctlObjs []runtime.Object + wantObj runtime.Object + wantErr error + }{ + "shouldn't build job because template not found": { + namespace: metav1.NamespaceDefault, + profile: "profile", + mode: v1alpha1.JobMode, + kjobctlObjs: []runtime.Object{ + &v1alpha1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "profile", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.ApplicationProfileSpec{ + SupportedModes: []v1alpha1.SupportedMode{{ + Name: v1alpha1.JobMode, + Template: "job-template", + }}, + }, + }, + }, + wantErr: apierrors.NewNotFound(schema.GroupResource{Group: "kjobctl.x-k8s.io", Resource: "jobtemplates"}, "job-template"), + }, + "should build job without replacements": { + namespace: metav1.NamespaceDefault, + profile: "profile", + mode: v1alpha1.JobMode, + kjobctlObjs: []runtime.Object{ + testJobTemplate, + &v1alpha1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "profile", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.ApplicationProfileSpec{ + SupportedModes: []v1alpha1.SupportedMode{{ + Name: v1alpha1.JobMode, + Template: "job-template", + }}, + }, + }, + }, + wantObj: &batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "profile-", + Namespace: metav1.NamespaceDefault, + Labels: map[string]string{ + constants.ProfileLabel: "profile", + }, + }, + Spec: testJobTemplate.Template.Spec, + }, + }, + "should build job with replacements": { + namespace: metav1.NamespaceDefault, + profile: "profile", + mode: v1alpha1.JobMode, + command: []string{"sleep"}, + parallelism: ptr.To[int32](2), + completions: ptr.To[int32](3), + requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("3"), + }, + localQueue: "lq1", + kjobctlObjs: []runtime.Object{ + testJobTemplate, + &v1alpha1.ApplicationProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "profile", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.ApplicationProfileSpec{ + SupportedModes: []v1alpha1.SupportedMode{{ + Name: v1alpha1.JobMode, + Template: "job-template", + }}, + VolumeBundles: []v1alpha1.VolumeBundleReference{"vb1", "vb2"}, + }, + }, + &v1alpha1.VolumeBundle{ + ObjectMeta: metav1.ObjectMeta{ + Name: "vb1", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.VolumeBundleSpec{ + Volumes: []corev1.Volume{ + { + Name: "v1", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "config1", + }, + }, + }, + }, + { + Name: "v3", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "config3", + }, + }, + }, + }, + }, + ContainerVolumeMounts: []corev1.VolumeMount{ + {Name: "vm1", MountPath: "/etc/config1"}, + {Name: "vm3", MountPath: "/etc/config3"}, + }, + EnvVars: []corev1.EnvVar{ + {Name: "e1", Value: "value1"}, + {Name: "e3", Value: "value3"}, + }, + }, + }, + &v1alpha1.VolumeBundle{ + ObjectMeta: metav1.ObjectMeta{ + Name: "vb2", + Namespace: metav1.NamespaceDefault, + }, + Spec: v1alpha1.VolumeBundleSpec{}, + }, + }, + wantObj: &batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "profile-", + Namespace: metav1.NamespaceDefault, + Labels: map[string]string{ + constants.ProfileLabel: "profile", + kueueconstants.QueueLabel: "lq1", + }, + }, + Spec: batchv1.JobSpec{ + Parallelism: ptr.To[int32](2), + Completions: ptr.To[int32](3), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "c1", + Command: []string{"sleep"}, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("3"), + }, + }, + Env: []corev1.EnvVar{ + {Name: "e1", Value: "value1"}, + {Name: "e2", Value: "default-value2"}, + {Name: "e3", Value: "value3"}, + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "vm1", MountPath: "/etc/config1"}, + {Name: "vm2", MountPath: "/etc/default-config2"}, + {Name: "vm3", MountPath: "/etc/config3"}, + }, + }, + { + Name: "c2", + Command: []string{"sleep"}, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("3"), + }, + }, + Env: []corev1.EnvVar{ + {Name: "e1", Value: "value1"}, + {Name: "e2", Value: "default-value2"}, + {Name: "e3", Value: "value3"}, + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "vm1", MountPath: "/etc/config1"}, + {Name: "vm2", MountPath: "/etc/default-config2"}, + {Name: "vm3", MountPath: "/etc/config3"}, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "v1", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "config1", + }, + }, + }, + }, + { + Name: "v2", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "default-config2", + }, + }, + }, + }, + { + Name: "v3", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "config3", + }, + }, + }, + }, + }, + }, + }, + }, + }, + wantErr: nil, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tcg := cmdtesting.NewTestClientGetter(). + WithKjobctlClientset(kjobctlfake.NewSimpleClientset(tc.kjobctlObjs...)) + + gotObjs, gotErr := NewBuilder(tcg). + WithNamespace(tc.namespace). + WithProfileName(tc.profile). + WithModeName(tc.mode). + WithCommand(tc.command). + WithParallelism(tc.parallelism). + WithCompletions(tc.completions). + WithRequests(tc.requests). + WithLocalQueue(tc.localQueue). + Do(ctx) + + var opts []cmp.Option + var statusError *apierrors.StatusError + if !errors.As(tc.wantErr, &statusError) { + opts = append(opts, cmpopts.EquateErrors()) + } + if diff := cmp.Diff(tc.wantErr, gotErr, opts...); diff != "" { + t.Errorf("Unexpected error (-want/+got)\n%s", diff) + return + } + + if diff := cmp.Diff(tc.wantObj, gotObjs); diff != "" { + t.Errorf("Objects after build (-want,+got):\n%s", diff) + } + }) + } +} diff --git a/cmd/experimental/kjobctl/pkg/cmd/completion/completion.go b/cmd/experimental/kjobctl/pkg/cmd/completion/completion.go index 8358b946fc..86d100067e 100644 --- a/cmd/experimental/kjobctl/pkg/cmd/completion/completion.go +++ b/cmd/experimental/kjobctl/pkg/cmd/completion/completion.go @@ -29,7 +29,7 @@ const completionLimit = 100 func NamespaceNameFunc(clientGetter util.ClientGetter) func(*cobra.Command, []string, string) ([]string, cobra.ShellCompDirective) { return func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { - clientSet, err := clientGetter.K8sClientSet() + clientSet, err := clientGetter.K8sClientset() if err != nil { return []string{}, cobra.ShellCompDirectiveError } diff --git a/cmd/experimental/kjobctl/pkg/cmd/util/client_getter.go b/cmd/experimental/kjobctl/pkg/cmd/util/client_getter.go index e7e56e55ff..803cbc5be1 100644 --- a/cmd/experimental/kjobctl/pkg/cmd/util/client_getter.go +++ b/cmd/experimental/kjobctl/pkg/cmd/util/client_getter.go @@ -26,7 +26,7 @@ import ( type ClientGetter interface { genericclioptions.RESTClientGetter - K8sClientSet() (k8s.Interface, error) + K8sClientset() (k8s.Interface, error) KjobctlClientset() (versioned.Interface, error) } @@ -42,7 +42,7 @@ func NewClientGetter(clientGetter genericclioptions.RESTClientGetter) ClientGett } } -func (cg *clientGetterImpl) K8sClientSet() (k8s.Interface, error) { +func (cg *clientGetterImpl) K8sClientset() (k8s.Interface, error) { config, err := cg.ToRESTConfig() if err != nil { return nil, err diff --git a/cmd/experimental/kjobctl/pkg/constants/constants.go b/cmd/experimental/kjobctl/pkg/constants/constants.go new file mode 100644 index 0000000000..8684994474 --- /dev/null +++ b/cmd/experimental/kjobctl/pkg/constants/constants.go @@ -0,0 +1,21 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package constants + +const ( + ProfileLabel = "kjobctl.x-k8s.io/profile" +)