From 0c2c7b08696445d74ea0fb63aeb63a77593d4d72 Mon Sep 17 00:00:00 2001 From: Abdullah Gharaibeh Date: Mon, 11 Apr 2022 14:17:48 -0400 Subject: [PATCH] Added a flag to control whether or not to reconcile jobs with no queue name set; by default those jobs are not processed --- pkg/controller/workload/job/job_controller.go | 48 ++++++++++++--- .../integration/controller/core/suite_test.go | 15 +++-- .../controller/job/job_controller_test.go | 59 ++++++++++++++++++- test/integration/controller/job/suite_test.go | 37 ++++-------- test/integration/framework/framework.go | 26 +++++--- test/integration/scheduler/suite_test.go | 15 +++-- 6 files changed, 141 insertions(+), 59 deletions(-) diff --git a/pkg/controller/workload/job/job_controller.go b/pkg/controller/workload/job/job_controller.go index f8791e3090..786a3da4f3 100644 --- a/pkg/controller/workload/job/job_controller.go +++ b/pkg/controller/workload/job/job_controller.go @@ -46,16 +46,45 @@ var ( // JobReconciler reconciles a Job object type JobReconciler struct { - client client.Client - scheme *runtime.Scheme - record record.EventRecorder + client client.Client + scheme *runtime.Scheme + record record.EventRecorder + processJobsWithoutQueueName bool } -func NewReconciler(scheme *runtime.Scheme, client client.Client, record record.EventRecorder) *JobReconciler { +type options struct { + processJobsWithoutQueueName bool +} + +// Option configures the reconciler. +type Option func(*options) + +// WithProcessJobsWithoutQueueName indicates if the controller should reconcile +// jobs that don't set the queue name annotation. +func WithProcessJobsWithoutQueueName(f bool) Option { + return func(o *options) { + o.processJobsWithoutQueueName = f + } +} + +var defaultOptions = options{} + +func NewReconciler( + scheme *runtime.Scheme, + client client.Client, + record record.EventRecorder, + opts ...Option) *JobReconciler { + + options := defaultOptions + for _, opt := range opts { + opt(&options) + } + return &JobReconciler{ - scheme: scheme, - client: client, - record: record, + scheme: scheme, + client: client, + record: record, + processJobsWithoutQueueName: options.processJobsWithoutQueueName, } } @@ -104,6 +133,11 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R log := ctrl.LoggerFrom(ctx).WithValues("job", klog.KObj(&job)) ctx = ctrl.LoggerInto(ctx, log) + if queueName(&job) == "" && !r.processJobsWithoutQueueName { + log.V(3).Info(fmt.Sprintf("%s annotation is not set, ignoring the job", constants.QueueAnnotation)) + return ctrl.Result{}, nil + } + log.V(2).Info("Reconciling Job") var childWorkloads kueue.WorkloadList diff --git a/test/integration/controller/core/suite_test.go b/test/integration/controller/core/suite_test.go index e0251fe46c..37da99586b 100644 --- a/test/integration/controller/core/suite_test.go +++ b/test/integration/controller/core/suite_test.go @@ -25,7 +25,6 @@ import ( "github.com/onsi/gomega" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/kueue/pkg/cache" @@ -38,9 +37,8 @@ import ( var ( cfg *rest.Config k8sClient client.Client - testEnv *envtest.Environment ctx context.Context - cancel context.CancelFunc + fwk *framework.Framework ) func TestAPIs(t *testing.T) { @@ -52,14 +50,15 @@ func TestAPIs(t *testing.T) { } var _ = ginkgo.BeforeSuite(func() { - ctx, cancel = context.WithCancel(context.Background()) - crdPath := filepath.Join("..", "..", "..", "..", "config", "crd", "bases") - cfg, k8sClient, testEnv = framework.BeforeSuite(ctx, crdPath, managerSetup) + fwk = &framework.Framework{ + ManagerSetup: managerSetup, + CRDPath: filepath.Join("..", "..", "..", "..", "config", "crd", "bases"), + } + ctx, cfg, k8sClient = fwk.Setup() }) var _ = ginkgo.AfterSuite(func() { - cancel() - framework.AfterSuite(testEnv) + fwk.Teardown() }) func managerSetup(mgr manager.Manager) { diff --git a/test/integration/controller/job/job_controller_test.go b/test/integration/controller/job/job_controller_test.go index b7a33e0c23..c1ada5c202 100644 --- a/test/integration/controller/job/job_controller_test.go +++ b/test/integration/controller/job/job_controller_test.go @@ -17,7 +17,9 @@ limitations under the License. package job import ( + "context" "fmt" + "path/filepath" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" @@ -27,9 +29,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" kueue "sigs.k8s.io/kueue/apis/core/v1alpha1" "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/controller/workload/job" workloadjob "sigs.k8s.io/kueue/pkg/controller/workload/job" "sigs.k8s.io/kueue/pkg/util/testing" "sigs.k8s.io/kueue/pkg/workload" @@ -46,10 +51,28 @@ const ( priorityValue = 10 ) +var ( + cfg *rest.Config + k8sClient client.Client + ctx context.Context + fwk *framework.Framework + crdPath = filepath.Join("..", "..", "..", "..", "config", "crd", "bases") +) + // +kubebuilder:docs-gen:collapse=Imports var _ = ginkgo.Describe("Job controller", func() { - ginkgo.It("Should reconcile workload and job", func() { + ginkgo.BeforeEach(func() { + fwk = &framework.Framework{ + ManagerSetup: managerSetup(job.WithProcessJobsWithoutQueueName(true)), + CRDPath: crdPath, + } + ctx, cfg, k8sClient = fwk.Setup() + }) + ginkgo.AfterEach(func() { + fwk.Teardown() + }) + ginkgo.It("Should reconcile workload and job for all jobs", func() { ginkgo.By("checking the job gets suspended when created unsuspended") priorityClass := testing.MakePriorityClass(priorityClassName). PriorityValue(int32(priorityValue)).Obj() @@ -221,3 +244,37 @@ var _ = ginkgo.Describe("Job controller", func() { }, framework.Timeout, framework.Interval).Should(gomega.BeTrue()) }) }) + +var _ = ginkgo.Describe("Job controller for workloads with no queue set", func() { + ginkgo.BeforeEach(func() { + fwk = &framework.Framework{ + ManagerSetup: managerSetup(), + CRDPath: crdPath, + } + ctx, cfg, k8sClient = fwk.Setup() + }) + ginkgo.AfterEach(func() { + fwk.Teardown() + }) + ginkgo.It("Should reconcile jobs only when queue is set", func() { + ginkgo.By("checking the workload is not created when queue name is not set") + job := testing.MakeJob(jobName, jobNamespace).Obj() + gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) + lookupKey := types.NamespacedName{Name: jobName, Namespace: jobNamespace} + createdJob := &batchv1.Job{} + gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed()) + + createdWorkload := &kueue.Workload{} + gomega.Consistently(func() bool { + return apierrors.IsNotFound(k8sClient.Get(ctx, lookupKey, createdWorkload)) + }, framework.ConsistentDuration, framework.Interval).Should(gomega.BeTrue()) + + ginkgo.By("checking the workload is created when queue name is set") + jobQueueName := "test-queue" + createdJob.Annotations = map[string]string{constants.QueueAnnotation: jobQueueName} + gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed()) + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdWorkload) + }, framework.Timeout, framework.Interval).Should(gomega.Succeed()) + }) +}) diff --git a/test/integration/controller/job/suite_test.go b/test/integration/controller/job/suite_test.go index 83092a3af2..1a234ee590 100644 --- a/test/integration/controller/job/suite_test.go +++ b/test/integration/controller/job/suite_test.go @@ -17,15 +17,10 @@ limitations under the License. package job import ( - "context" - "path/filepath" "testing" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/kueue/pkg/constants" @@ -34,14 +29,6 @@ import ( //+kubebuilder:scaffold:imports ) -var ( - cfg *rest.Config - k8sClient client.Client - testEnv *envtest.Environment - ctx context.Context - cancel context.CancelFunc -) - func TestAPIs(t *testing.T) { gomega.RegisterFailHandler(ginkgo.Fail) @@ -50,18 +37,14 @@ func TestAPIs(t *testing.T) { ) } -var _ = ginkgo.BeforeSuite(func() { - ctx, cancel = context.WithCancel(context.Background()) - crdPath := filepath.Join("..", "..", "..", "..", "config", "crd", "bases") - cfg, k8sClient, testEnv = framework.BeforeSuite(ctx, crdPath, managerSetup) -}) - -var _ = ginkgo.AfterSuite(func() { - cancel() - framework.AfterSuite(testEnv) -}) - -func managerSetup(mgr manager.Manager) { - err := job.NewReconciler(mgr.GetScheme(), mgr.GetClient(), mgr.GetEventRecorderFor(constants.JobControllerName)).SetupWithManager(mgr) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) +func managerSetup(opts ...job.Option) framework.ManagerSetup { + return func(mgr manager.Manager) { + reconciler := job.NewReconciler( + mgr.GetScheme(), + mgr.GetClient(), + mgr.GetEventRecorderFor(constants.JobControllerName), + opts...) + err := reconciler.SetupWithManager(mgr) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } } diff --git a/test/integration/framework/framework.go b/test/integration/framework/framework.go index 1c529c8b98..bc8ccbdcc5 100644 --- a/test/integration/framework/framework.go +++ b/test/integration/framework/framework.go @@ -41,16 +41,23 @@ import ( type ManagerSetup func(manager.Manager) -func BeforeSuite(ctx context.Context, crdPath string, mgrSetup ManagerSetup) (*rest.Config, client.Client, *envtest.Environment) { +type Framework struct { + CRDPath string + ManagerSetup ManagerSetup + testEnv *envtest.Environment + cancel context.CancelFunc +} + +func (f *Framework) Setup() (context.Context, *rest.Config, client.Client) { ctrl.SetLogger(zap.New(zap.WriteTo(ginkgo.GinkgoWriter), zap.UseDevMode(true), zap.Level(zapcore.Level(-3)))) ginkgo.By("bootstrapping test environment") - testEnv := &envtest.Environment{ - CRDDirectoryPaths: []string{crdPath}, + f.testEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{f.CRDPath}, ErrorIfCRDPathMissing: true, } - cfg, err := testEnv.Start() + cfg, err := f.testEnv.Start() gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) gomega.ExpectWithOffset(1, cfg).NotTo(gomega.BeNil()) @@ -69,7 +76,9 @@ func BeforeSuite(ctx context.Context, crdPath string, mgrSetup ManagerSetup) (*r }) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred(), "failed to create manager") - mgrSetup(mgr) + f.ManagerSetup(mgr) + ctx, cancel := context.WithCancel(context.Background()) + f.cancel = cancel go func() { defer ginkgo.GinkgoRecover() @@ -77,12 +86,13 @@ func BeforeSuite(ctx context.Context, crdPath string, mgrSetup ManagerSetup) (*r gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred(), "failed to run manager") }() - return cfg, k8sClient, testEnv + return ctx, cfg, k8sClient } -func AfterSuite(testEnv *envtest.Environment) { +func (f *Framework) Teardown() { ginkgo.By("tearing down the test environment") - err := testEnv.Stop() + f.cancel() + err := f.testEnv.Stop() gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) } diff --git a/test/integration/scheduler/suite_test.go b/test/integration/scheduler/suite_test.go index c364bfba12..a0db2b86fc 100644 --- a/test/integration/scheduler/suite_test.go +++ b/test/integration/scheduler/suite_test.go @@ -25,7 +25,6 @@ import ( "github.com/onsi/gomega" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/kueue/pkg/scheduler" @@ -41,9 +40,8 @@ import ( var ( cfg *rest.Config k8sClient client.Client - testEnv *envtest.Environment ctx context.Context - cancel context.CancelFunc + fwk *framework.Framework ) func TestScheduler(t *testing.T) { @@ -55,14 +53,15 @@ func TestScheduler(t *testing.T) { } var _ = ginkgo.BeforeSuite(func() { - ctx, cancel = context.WithCancel(context.Background()) - crdPath := filepath.Join("..", "..", "..", "config", "crd", "bases") - cfg, k8sClient, testEnv = framework.BeforeSuite(ctx, crdPath, managerAndSchedulerSetup) + fwk = &framework.Framework{ + ManagerSetup: managerAndSchedulerSetup, + CRDPath: filepath.Join("..", "..", "..", "config", "crd", "bases"), + } + ctx, cfg, k8sClient = fwk.Setup() }) var _ = ginkgo.AfterSuite(func() { - cancel() - framework.AfterSuite(testEnv) + fwk.Teardown() }) func managerAndSchedulerSetup(mgr manager.Manager) {