Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added an option to control processing jobs with no queue name set #205

Merged
merged 1 commit into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions pkg/controller/workload/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,45 @@ var (

// JobReconciler reconciles a Job object
type JobReconciler struct {
client client.Client
scheme *runtime.Scheme
record record.EventRecorder
client client.Client
scheme *runtime.Scheme
record record.EventRecorder
processJobsWithoutQueueName bool
}

func NewReconciler(scheme *runtime.Scheme, client client.Client, record record.EventRecorder) *JobReconciler {
type options struct {
processJobsWithoutQueueName bool
}

// Option configures the reconciler.
type Option func(*options)

// WithProcessJobsWithoutQueueName indicates if the controller should reconcile
// jobs that don't set the queue name annotation.
func WithProcessJobsWithoutQueueName(f bool) Option {
return func(o *options) {
o.processJobsWithoutQueueName = f
}
}

var defaultOptions = options{}

func NewReconciler(
scheme *runtime.Scheme,
client client.Client,
record record.EventRecorder,
opts ...Option) *JobReconciler {

options := defaultOptions
for _, opt := range opts {
opt(&options)
}

return &JobReconciler{
scheme: scheme,
client: client,
record: record,
scheme: scheme,
client: client,
record: record,
processJobsWithoutQueueName: options.processJobsWithoutQueueName,
}
}

Expand Down Expand Up @@ -104,6 +133,11 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R

log := ctrl.LoggerFrom(ctx).WithValues("job", klog.KObj(&job))
ctx = ctrl.LoggerInto(ctx, log)
if queueName(&job) == "" && !r.processJobsWithoutQueueName {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Should this be queueName(&job) || "" && !r.processJobsWithoutQueueName, if users don't set the queue name, we do not have a default queue name for them now IIRC.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify again the question? this basically says if r.processJobsWithoutQueueName=false, then don't manage jobs that don't set the queue-name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A default queue name can be added, but it needs to be set via a webhook at job creation time if desired.

Copy link
Contributor

@kerthcet kerthcet Apr 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify again the question? this basically says if r.processJobsWithoutQueueName=false, then don't manage jobs that don't set the queue-name.

I mean whether we should only check r.processJobsWithoutQueueName here and if queueName not set, we will add a default queue name like default automatically in ConstructWorkloadFor. But this means we should have default queue and default cq in initialization.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem we are trying to solve here is not that the queue is missing per se; see #63 (comment)

basically we don't want to miss up with potentially existing jobs when kueue is first installed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved discussion here #208

log.V(3).Info(fmt.Sprintf("%s annotation is not set, ignoring the job", constants.QueueAnnotation))
return ctrl.Result{}, nil
}

log.V(2).Info("Reconciling Job")

var childWorkloads kueue.WorkloadList
Expand Down
15 changes: 7 additions & 8 deletions test/integration/controller/core/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/onsi/gomega"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/manager"

"sigs.k8s.io/kueue/pkg/cache"
Expand All @@ -38,9 +37,8 @@ import (
var (
cfg *rest.Config
k8sClient client.Client
testEnv *envtest.Environment
ctx context.Context
cancel context.CancelFunc
fwk *framework.Framework
)

func TestAPIs(t *testing.T) {
Expand All @@ -52,14 +50,15 @@ func TestAPIs(t *testing.T) {
}

var _ = ginkgo.BeforeSuite(func() {
ctx, cancel = context.WithCancel(context.Background())
crdPath := filepath.Join("..", "..", "..", "..", "config", "crd", "bases")
cfg, k8sClient, testEnv = framework.BeforeSuite(ctx, crdPath, managerSetup)
fwk = &framework.Framework{
ManagerSetup: managerSetup,
CRDPath: filepath.Join("..", "..", "..", "..", "config", "crd", "bases"),
}
ctx, cfg, k8sClient = fwk.Setup()
})

var _ = ginkgo.AfterSuite(func() {
cancel()
framework.AfterSuite(testEnv)
fwk.Teardown()
})

func managerSetup(mgr manager.Manager) {
Expand Down
59 changes: 58 additions & 1 deletion test/integration/controller/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package job

import (
"context"
"fmt"
"path/filepath"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
Expand All @@ -27,9 +29,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/core/v1alpha1"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/workload/job"
workloadjob "sigs.k8s.io/kueue/pkg/controller/workload/job"
"sigs.k8s.io/kueue/pkg/util/testing"
"sigs.k8s.io/kueue/pkg/workload"
Expand All @@ -46,10 +51,28 @@ const (
priorityValue = 10
)

var (
cfg *rest.Config
k8sClient client.Client
ctx context.Context
fwk *framework.Framework
crdPath = filepath.Join("..", "..", "..", "..", "config", "crd", "bases")
)

// +kubebuilder:docs-gen:collapse=Imports

var _ = ginkgo.Describe("Job controller", func() {
ginkgo.It("Should reconcile workload and job", func() {
ginkgo.BeforeEach(func() {
fwk = &framework.Framework{
ManagerSetup: managerSetup(job.WithProcessJobsWithoutQueueName(true)),
CRDPath: crdPath,
}
ctx, cfg, k8sClient = fwk.Setup()
})
ginkgo.AfterEach(func() {
fwk.Teardown()
})
ginkgo.It("Should reconcile workload and job for all jobs", func() {
ginkgo.By("checking the job gets suspended when created unsuspended")
priorityClass := testing.MakePriorityClass(priorityClassName).
PriorityValue(int32(priorityValue)).Obj()
Expand Down Expand Up @@ -221,3 +244,37 @@ var _ = ginkgo.Describe("Job controller", func() {
}, framework.Timeout, framework.Interval).Should(gomega.BeTrue())
})
})

var _ = ginkgo.Describe("Job controller for workloads with no queue set", func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would use a single ginkgo.Describe with two ginkgo.Context, because you are describing the same controller.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the indentation is too much for me :)

ginkgo.BeforeEach(func() {
fwk = &framework.Framework{
ManagerSetup: managerSetup(),
CRDPath: crdPath,
}
ctx, cfg, k8sClient = fwk.Setup()
})
ginkgo.AfterEach(func() {
fwk.Teardown()
})
ginkgo.It("Should reconcile jobs only when queue is set", func() {
ginkgo.By("checking the workload is not created when queue name is not set")
job := testing.MakeJob(jobName, jobNamespace).Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
lookupKey := types.NamespacedName{Name: jobName, Namespace: jobNamespace}
createdJob := &batchv1.Job{}
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())

createdWorkload := &kueue.Workload{}
gomega.Consistently(func() bool {
return apierrors.IsNotFound(k8sClient.Get(ctx, lookupKey, createdWorkload))
}, framework.ConsistentDuration, framework.Interval).Should(gomega.BeTrue())

ginkgo.By("checking the workload is created when queue name is set")
jobQueueName := "test-queue"
createdJob.Annotations = map[string]string{constants.QueueAnnotation: jobQueueName}
gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed())
gomega.Eventually(func() error {
return k8sClient.Get(ctx, lookupKey, createdWorkload)
}, framework.Timeout, framework.Interval).Should(gomega.Succeed())
})
})
37 changes: 10 additions & 27 deletions test/integration/controller/job/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@ limitations under the License.
package job

import (
"context"
"path/filepath"
"testing"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/manager"

"sigs.k8s.io/kueue/pkg/constants"
Expand All @@ -34,14 +29,6 @@ import (
//+kubebuilder:scaffold:imports
)

var (
cfg *rest.Config
k8sClient client.Client
testEnv *envtest.Environment
ctx context.Context
cancel context.CancelFunc
)

func TestAPIs(t *testing.T) {
gomega.RegisterFailHandler(ginkgo.Fail)

Expand All @@ -50,18 +37,14 @@ func TestAPIs(t *testing.T) {
)
}

var _ = ginkgo.BeforeSuite(func() {
ctx, cancel = context.WithCancel(context.Background())
crdPath := filepath.Join("..", "..", "..", "..", "config", "crd", "bases")
cfg, k8sClient, testEnv = framework.BeforeSuite(ctx, crdPath, managerSetup)
})

var _ = ginkgo.AfterSuite(func() {
cancel()
framework.AfterSuite(testEnv)
})

func managerSetup(mgr manager.Manager) {
err := job.NewReconciler(mgr.GetScheme(), mgr.GetClient(), mgr.GetEventRecorderFor(constants.JobControllerName)).SetupWithManager(mgr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
func managerSetup(opts ...job.Option) framework.ManagerSetup {
return func(mgr manager.Manager) {
reconciler := job.NewReconciler(
mgr.GetScheme(),
mgr.GetClient(),
mgr.GetEventRecorderFor(constants.JobControllerName),
opts...)
err := reconciler.SetupWithManager(mgr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
}
26 changes: 18 additions & 8 deletions test/integration/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,23 @@ import (

type ManagerSetup func(manager.Manager)

func BeforeSuite(ctx context.Context, crdPath string, mgrSetup ManagerSetup) (*rest.Config, client.Client, *envtest.Environment) {
type Framework struct {
CRDPath string
ManagerSetup ManagerSetup
testEnv *envtest.Environment
cancel context.CancelFunc
}

func (f *Framework) Setup() (context.Context, *rest.Config, client.Client) {
ctrl.SetLogger(zap.New(zap.WriteTo(ginkgo.GinkgoWriter), zap.UseDevMode(true), zap.Level(zapcore.Level(-3))))

ginkgo.By("bootstrapping test environment")
testEnv := &envtest.Environment{
CRDDirectoryPaths: []string{crdPath},
f.testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{f.CRDPath},
ErrorIfCRDPathMissing: true,
}

cfg, err := testEnv.Start()
cfg, err := f.testEnv.Start()
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
gomega.ExpectWithOffset(1, cfg).NotTo(gomega.BeNil())

Expand All @@ -69,20 +76,23 @@ func BeforeSuite(ctx context.Context, crdPath string, mgrSetup ManagerSetup) (*r
})
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred(), "failed to create manager")

mgrSetup(mgr)
f.ManagerSetup(mgr)
ctx, cancel := context.WithCancel(context.Background())
f.cancel = cancel

go func() {
defer ginkgo.GinkgoRecover()
err := mgr.Start(ctx)
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred(), "failed to run manager")
}()

return cfg, k8sClient, testEnv
return ctx, cfg, k8sClient
}

func AfterSuite(testEnv *envtest.Environment) {
func (f *Framework) Teardown() {
ginkgo.By("tearing down the test environment")
err := testEnv.Stop()
f.cancel()
err := f.testEnv.Stop()
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
}

Expand Down
15 changes: 7 additions & 8 deletions test/integration/scheduler/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/onsi/gomega"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/kueue/pkg/scheduler"

Expand All @@ -41,9 +40,8 @@ import (
var (
cfg *rest.Config
k8sClient client.Client
testEnv *envtest.Environment
ctx context.Context
cancel context.CancelFunc
fwk *framework.Framework
)

func TestScheduler(t *testing.T) {
Expand All @@ -55,14 +53,15 @@ func TestScheduler(t *testing.T) {
}

var _ = ginkgo.BeforeSuite(func() {
ctx, cancel = context.WithCancel(context.Background())
crdPath := filepath.Join("..", "..", "..", "config", "crd", "bases")
cfg, k8sClient, testEnv = framework.BeforeSuite(ctx, crdPath, managerAndSchedulerSetup)
fwk = &framework.Framework{
ManagerSetup: managerAndSchedulerSetup,
CRDPath: filepath.Join("..", "..", "..", "config", "crd", "bases"),
}
ctx, cfg, k8sClient = fwk.Setup()
})

var _ = ginkgo.AfterSuite(func() {
cancel()
framework.AfterSuite(testEnv)
fwk.Teardown()
})

func managerAndSchedulerSetup(mgr manager.Manager) {
Expand Down