Skip to content

Commit

Permalink
Merge pull request #205 from ahg-g/ahg-kueue
Browse files Browse the repository at this point in the history
Added an option to control processing jobs with no queue name set
  • Loading branch information
k8s-ci-robot authored Apr 11, 2022
2 parents 23aa676 + 0c2c7b0 commit 9e87558
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 59 deletions.
48 changes: 41 additions & 7 deletions pkg/controller/workload/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,45 @@ var (

// JobReconciler reconciles a Job object
type JobReconciler struct {
client client.Client
scheme *runtime.Scheme
record record.EventRecorder
client client.Client
scheme *runtime.Scheme
record record.EventRecorder
processJobsWithoutQueueName bool
}

func NewReconciler(scheme *runtime.Scheme, client client.Client, record record.EventRecorder) *JobReconciler {
type options struct {
processJobsWithoutQueueName bool
}

// Option configures the reconciler.
type Option func(*options)

// WithProcessJobsWithoutQueueName indicates if the controller should reconcile
// jobs that don't set the queue name annotation.
func WithProcessJobsWithoutQueueName(f bool) Option {
return func(o *options) {
o.processJobsWithoutQueueName = f
}
}

var defaultOptions = options{}

func NewReconciler(
scheme *runtime.Scheme,
client client.Client,
record record.EventRecorder,
opts ...Option) *JobReconciler {

options := defaultOptions
for _, opt := range opts {
opt(&options)
}

return &JobReconciler{
scheme: scheme,
client: client,
record: record,
scheme: scheme,
client: client,
record: record,
processJobsWithoutQueueName: options.processJobsWithoutQueueName,
}
}

Expand Down Expand Up @@ -104,6 +133,11 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R

log := ctrl.LoggerFrom(ctx).WithValues("job", klog.KObj(&job))
ctx = ctrl.LoggerInto(ctx, log)
if queueName(&job) == "" && !r.processJobsWithoutQueueName {
log.V(3).Info(fmt.Sprintf("%s annotation is not set, ignoring the job", constants.QueueAnnotation))
return ctrl.Result{}, nil
}

log.V(2).Info("Reconciling Job")

var childWorkloads kueue.WorkloadList
Expand Down
15 changes: 7 additions & 8 deletions test/integration/controller/core/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/onsi/gomega"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/manager"

"sigs.k8s.io/kueue/pkg/cache"
Expand All @@ -38,9 +37,8 @@ import (
var (
cfg *rest.Config
k8sClient client.Client
testEnv *envtest.Environment
ctx context.Context
cancel context.CancelFunc
fwk *framework.Framework
)

func TestAPIs(t *testing.T) {
Expand All @@ -52,14 +50,15 @@ func TestAPIs(t *testing.T) {
}

var _ = ginkgo.BeforeSuite(func() {
ctx, cancel = context.WithCancel(context.Background())
crdPath := filepath.Join("..", "..", "..", "..", "config", "crd", "bases")
cfg, k8sClient, testEnv = framework.BeforeSuite(ctx, crdPath, managerSetup)
fwk = &framework.Framework{
ManagerSetup: managerSetup,
CRDPath: filepath.Join("..", "..", "..", "..", "config", "crd", "bases"),
}
ctx, cfg, k8sClient = fwk.Setup()
})

var _ = ginkgo.AfterSuite(func() {
cancel()
framework.AfterSuite(testEnv)
fwk.Teardown()
})

func managerSetup(mgr manager.Manager) {
Expand Down
59 changes: 58 additions & 1 deletion test/integration/controller/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package job

import (
"context"
"fmt"
"path/filepath"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
Expand All @@ -27,9 +29,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/core/v1alpha1"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/workload/job"
workloadjob "sigs.k8s.io/kueue/pkg/controller/workload/job"
"sigs.k8s.io/kueue/pkg/util/testing"
"sigs.k8s.io/kueue/pkg/workload"
Expand All @@ -46,10 +51,28 @@ const (
priorityValue = 10
)

var (
cfg *rest.Config
k8sClient client.Client
ctx context.Context
fwk *framework.Framework
crdPath = filepath.Join("..", "..", "..", "..", "config", "crd", "bases")
)

// +kubebuilder:docs-gen:collapse=Imports

var _ = ginkgo.Describe("Job controller", func() {
ginkgo.It("Should reconcile workload and job", func() {
ginkgo.BeforeEach(func() {
fwk = &framework.Framework{
ManagerSetup: managerSetup(job.WithProcessJobsWithoutQueueName(true)),
CRDPath: crdPath,
}
ctx, cfg, k8sClient = fwk.Setup()
})
ginkgo.AfterEach(func() {
fwk.Teardown()
})
ginkgo.It("Should reconcile workload and job for all jobs", func() {
ginkgo.By("checking the job gets suspended when created unsuspended")
priorityClass := testing.MakePriorityClass(priorityClassName).
PriorityValue(int32(priorityValue)).Obj()
Expand Down Expand Up @@ -221,3 +244,37 @@ var _ = ginkgo.Describe("Job controller", func() {
}, framework.Timeout, framework.Interval).Should(gomega.BeTrue())
})
})

var _ = ginkgo.Describe("Job controller for workloads with no queue set", func() {
ginkgo.BeforeEach(func() {
fwk = &framework.Framework{
ManagerSetup: managerSetup(),
CRDPath: crdPath,
}
ctx, cfg, k8sClient = fwk.Setup()
})
ginkgo.AfterEach(func() {
fwk.Teardown()
})
ginkgo.It("Should reconcile jobs only when queue is set", func() {
ginkgo.By("checking the workload is not created when queue name is not set")
job := testing.MakeJob(jobName, jobNamespace).Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
lookupKey := types.NamespacedName{Name: jobName, Namespace: jobNamespace}
createdJob := &batchv1.Job{}
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())

createdWorkload := &kueue.Workload{}
gomega.Consistently(func() bool {
return apierrors.IsNotFound(k8sClient.Get(ctx, lookupKey, createdWorkload))
}, framework.ConsistentDuration, framework.Interval).Should(gomega.BeTrue())

ginkgo.By("checking the workload is created when queue name is set")
jobQueueName := "test-queue"
createdJob.Annotations = map[string]string{constants.QueueAnnotation: jobQueueName}
gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed())
gomega.Eventually(func() error {
return k8sClient.Get(ctx, lookupKey, createdWorkload)
}, framework.Timeout, framework.Interval).Should(gomega.Succeed())
})
})
37 changes: 10 additions & 27 deletions test/integration/controller/job/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@ limitations under the License.
package job

import (
"context"
"path/filepath"
"testing"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/manager"

"sigs.k8s.io/kueue/pkg/constants"
Expand All @@ -34,14 +29,6 @@ import (
//+kubebuilder:scaffold:imports
)

var (
cfg *rest.Config
k8sClient client.Client
testEnv *envtest.Environment
ctx context.Context
cancel context.CancelFunc
)

func TestAPIs(t *testing.T) {
gomega.RegisterFailHandler(ginkgo.Fail)

Expand All @@ -50,18 +37,14 @@ func TestAPIs(t *testing.T) {
)
}

var _ = ginkgo.BeforeSuite(func() {
ctx, cancel = context.WithCancel(context.Background())
crdPath := filepath.Join("..", "..", "..", "..", "config", "crd", "bases")
cfg, k8sClient, testEnv = framework.BeforeSuite(ctx, crdPath, managerSetup)
})

var _ = ginkgo.AfterSuite(func() {
cancel()
framework.AfterSuite(testEnv)
})

func managerSetup(mgr manager.Manager) {
err := job.NewReconciler(mgr.GetScheme(), mgr.GetClient(), mgr.GetEventRecorderFor(constants.JobControllerName)).SetupWithManager(mgr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
func managerSetup(opts ...job.Option) framework.ManagerSetup {
return func(mgr manager.Manager) {
reconciler := job.NewReconciler(
mgr.GetScheme(),
mgr.GetClient(),
mgr.GetEventRecorderFor(constants.JobControllerName),
opts...)
err := reconciler.SetupWithManager(mgr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
}
26 changes: 18 additions & 8 deletions test/integration/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,23 @@ import (

type ManagerSetup func(manager.Manager)

func BeforeSuite(ctx context.Context, crdPath string, mgrSetup ManagerSetup) (*rest.Config, client.Client, *envtest.Environment) {
type Framework struct {
CRDPath string
ManagerSetup ManagerSetup
testEnv *envtest.Environment
cancel context.CancelFunc
}

func (f *Framework) Setup() (context.Context, *rest.Config, client.Client) {
ctrl.SetLogger(zap.New(zap.WriteTo(ginkgo.GinkgoWriter), zap.UseDevMode(true), zap.Level(zapcore.Level(-3))))

ginkgo.By("bootstrapping test environment")
testEnv := &envtest.Environment{
CRDDirectoryPaths: []string{crdPath},
f.testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{f.CRDPath},
ErrorIfCRDPathMissing: true,
}

cfg, err := testEnv.Start()
cfg, err := f.testEnv.Start()
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
gomega.ExpectWithOffset(1, cfg).NotTo(gomega.BeNil())

Expand All @@ -69,20 +76,23 @@ func BeforeSuite(ctx context.Context, crdPath string, mgrSetup ManagerSetup) (*r
})
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred(), "failed to create manager")

mgrSetup(mgr)
f.ManagerSetup(mgr)
ctx, cancel := context.WithCancel(context.Background())
f.cancel = cancel

go func() {
defer ginkgo.GinkgoRecover()
err := mgr.Start(ctx)
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred(), "failed to run manager")
}()

return cfg, k8sClient, testEnv
return ctx, cfg, k8sClient
}

func AfterSuite(testEnv *envtest.Environment) {
func (f *Framework) Teardown() {
ginkgo.By("tearing down the test environment")
err := testEnv.Stop()
f.cancel()
err := f.testEnv.Stop()
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
}

Expand Down
15 changes: 7 additions & 8 deletions test/integration/scheduler/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/onsi/gomega"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/kueue/pkg/scheduler"

Expand All @@ -41,9 +40,8 @@ import (
var (
cfg *rest.Config
k8sClient client.Client
testEnv *envtest.Environment
ctx context.Context
cancel context.CancelFunc
fwk *framework.Framework
)

func TestScheduler(t *testing.T) {
Expand All @@ -55,14 +53,15 @@ func TestScheduler(t *testing.T) {
}

var _ = ginkgo.BeforeSuite(func() {
ctx, cancel = context.WithCancel(context.Background())
crdPath := filepath.Join("..", "..", "..", "config", "crd", "bases")
cfg, k8sClient, testEnv = framework.BeforeSuite(ctx, crdPath, managerAndSchedulerSetup)
fwk = &framework.Framework{
ManagerSetup: managerAndSchedulerSetup,
CRDPath: filepath.Join("..", "..", "..", "config", "crd", "bases"),
}
ctx, cfg, k8sClient = fwk.Setup()
})

var _ = ginkgo.AfterSuite(func() {
cancel()
framework.AfterSuite(testEnv)
fwk.Teardown()
})

func managerAndSchedulerSetup(mgr manager.Manager) {
Expand Down

0 comments on commit 9e87558

Please sign in to comment.