From b7fc93f5ba42ebbcd54345702be24c3f76d7ea26 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Mon, 29 Jan 2024 14:11:56 +0100 Subject: [PATCH] Use leader election defaulting to set up controllers HA (#1652) * Use leader election defaulting to set up controllers HA * Add configuration defaulting to integration tests --- .../core/leader_aware_reconciler.go | 27 ++++++------------- .../integration/controller/core/suite_test.go | 2 ++ .../controller/jobs/job/suite_test.go | 5 +++- .../controller/jobs/jobset/suite_test.go | 5 +++- .../controller/jobs/mpijob/suite_test.go | 5 +++- .../controller/jobs/mxjob/suite_test.go | 5 +++- .../controller/jobs/paddlejob/suite_test.go | 5 +++- .../controller/jobs/pod/suite_test.go | 10 +++++-- .../controller/jobs/pytorchjob/suite_test.go | 5 +++- .../controller/jobs/rayjob/suite_test.go | 5 +++- .../controller/jobs/tfjob/suite_test.go | 5 +++- .../controller/jobs/xgboostjob/suite_test.go | 5 +++- test/integration/framework/framework.go | 6 ++++- test/integration/multikueue/suite_test.go | 5 +++- .../scheduler/podsready/suite_test.go | 5 ++-- test/integration/scheduler/suite_test.go | 5 +++- test/integration/webhook/suite_test.go | 6 ++++- 17 files changed, 75 insertions(+), 36 deletions(-) diff --git a/pkg/controller/core/leader_aware_reconciler.go b/pkg/controller/core/leader_aware_reconciler.go index 3d54aad04e..7853ad66c4 100644 --- a/pkg/controller/core/leader_aware_reconciler.go +++ b/pkg/controller/core/leader_aware_reconciler.go @@ -20,7 +20,6 @@ import ( "context" "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -29,11 +28,6 @@ import ( config "sigs.k8s.io/kueue/apis/config/v1beta1" ) -// defaultRequeueDuration defaults the duration used by non-leading replicas -// to requeue events, so no events are missed over the period it takes for -// leader election to fail over a new replica. -const defaultRequeueDuration = 15 * time.Second - // WithLeadingManager returns a decorating reconcile.Reconciler that discards reconciliation requests // for the controllers that are started with the controller.Options.NeedLeaderElection // option set to false in non-leading replicas. @@ -54,28 +48,23 @@ func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler, obj c return reconciler } - // Default to the recommended lease duration, that's used for core components - requeueDuration := defaultRequeueDuration - // Otherwise used the configured lease duration for the manager - zero := metav1.Duration{} - if duration := cfg.LeaderElection.LeaseDuration; duration != zero { - requeueDuration = duration.Duration - } - return &leaderAwareReconciler{ elected: mgr.Elected(), client: mgr.GetClient(), delegate: reconciler, object: obj, - requeueDuration: requeueDuration, + requeueDuration: cfg.LeaderElection.LeaseDuration.Duration, } } type leaderAwareReconciler struct { - elected <-chan struct{} - client client.Client - delegate reconcile.Reconciler - object client.Object + elected <-chan struct{} + client client.Client + delegate reconcile.Reconciler + object client.Object + // the duration used by non-leading replicas to requeue events, + // so no events are missed over the period it takes for + // leader election to fail over a new replica. requeueDuration time.Duration } diff --git a/test/integration/controller/core/suite_test.go b/test/integration/controller/core/suite_test.go index 4a15689f3e..8b14e3dc3b 100644 --- a/test/integration/controller/core/suite_test.go +++ b/test/integration/controller/core/suite_test.go @@ -62,6 +62,8 @@ func managerSetup(mgr manager.Manager, ctx context.Context) { gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook) controllersCfg := &config.Configuration{} + mgr.GetScheme().Default(controllersCfg) + controllersCfg.Metrics.EnableClusterQueueResources = true controllersCfg.QueueVisibility = &config.QueueVisibility{ UpdateIntervalSeconds: 2, diff --git a/test/integration/controller/jobs/job/suite_test.go b/test/integration/controller/jobs/job/suite_test.go index 2f23adbff2..62d5854a48 100644 --- a/test/integration/controller/jobs/job/suite_test.go +++ b/test/integration/controller/jobs/job/suite_test.go @@ -82,7 +82,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) err = job.SetupIndexes(ctx, mgr.GetFieldIndexer()) diff --git a/test/integration/controller/jobs/jobset/suite_test.go b/test/integration/controller/jobs/jobset/suite_test.go index f55fd3962e..90d76c56c5 100644 --- a/test/integration/controller/jobs/jobset/suite_test.go +++ b/test/integration/controller/jobs/jobset/suite_test.go @@ -81,7 +81,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) err = jobset.SetupIndexes(ctx, mgr.GetFieldIndexer()) diff --git a/test/integration/controller/jobs/mpijob/suite_test.go b/test/integration/controller/jobs/mpijob/suite_test.go index 83749edf04..c40e659361 100644 --- a/test/integration/controller/jobs/mpijob/suite_test.go +++ b/test/integration/controller/jobs/mpijob/suite_test.go @@ -95,7 +95,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) err = mpijob.SetupIndexes(ctx, mgr.GetFieldIndexer()) diff --git a/test/integration/controller/jobs/mxjob/suite_test.go b/test/integration/controller/jobs/mxjob/suite_test.go index 6f16f59d02..07834dc742 100644 --- a/test/integration/controller/jobs/mxjob/suite_test.go +++ b/test/integration/controller/jobs/mxjob/suite_test.go @@ -79,7 +79,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) err = mxjob.SetupIndexes(ctx, mgr.GetFieldIndexer()) diff --git a/test/integration/controller/jobs/paddlejob/suite_test.go b/test/integration/controller/jobs/paddlejob/suite_test.go index 3b7c8a9c54..2d135b9b65 100644 --- a/test/integration/controller/jobs/paddlejob/suite_test.go +++ b/test/integration/controller/jobs/paddlejob/suite_test.go @@ -79,7 +79,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) err = paddlejob.SetupIndexes(ctx, mgr.GetFieldIndexer()) diff --git a/test/integration/controller/jobs/pod/suite_test.go b/test/integration/controller/jobs/pod/suite_test.go index c95df97c4a..d68d5d93b2 100644 --- a/test/integration/controller/jobs/pod/suite_test.go +++ b/test/integration/controller/jobs/pod/suite_test.go @@ -90,7 +90,10 @@ func managerSetup(opts ...jobframework.Option) framework.ManagerSetup { cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) err = pod.SetupWebhook(mgr, opts...) @@ -118,7 +121,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) err = pod.SetupWebhook(mgr, opts...) diff --git a/test/integration/controller/jobs/pytorchjob/suite_test.go b/test/integration/controller/jobs/pytorchjob/suite_test.go index 7d0f6c47d3..623e670a71 100644 --- a/test/integration/controller/jobs/pytorchjob/suite_test.go +++ b/test/integration/controller/jobs/pytorchjob/suite_test.go @@ -80,7 +80,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) err = pytorchjob.SetupIndexes(ctx, mgr.GetFieldIndexer()) diff --git a/test/integration/controller/jobs/rayjob/suite_test.go b/test/integration/controller/jobs/rayjob/suite_test.go index 73629351ce..44020fe9c9 100644 --- a/test/integration/controller/jobs/rayjob/suite_test.go +++ b/test/integration/controller/jobs/rayjob/suite_test.go @@ -81,7 +81,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) err = rayjob.SetupIndexes(ctx, mgr.GetFieldIndexer()) diff --git a/test/integration/controller/jobs/tfjob/suite_test.go b/test/integration/controller/jobs/tfjob/suite_test.go index 1850c507e2..0c67ac219d 100644 --- a/test/integration/controller/jobs/tfjob/suite_test.go +++ b/test/integration/controller/jobs/tfjob/suite_test.go @@ -80,7 +80,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) err = tfjob.SetupIndexes(ctx, mgr.GetFieldIndexer()) diff --git a/test/integration/controller/jobs/xgboostjob/suite_test.go b/test/integration/controller/jobs/xgboostjob/suite_test.go index 10bfc0d19f..d3d74e853f 100644 --- a/test/integration/controller/jobs/xgboostjob/suite_test.go +++ b/test/integration/controller/jobs/xgboostjob/suite_test.go @@ -80,7 +80,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) err = xgboostjob.SetupIndexes(ctx, mgr.GetFieldIndexer()) diff --git a/test/integration/framework/framework.go b/test/integration/framework/framework.go index 3794e93b27..92ccc216f1 100644 --- a/test/integration/framework/framework.go +++ b/test/integration/framework/framework.go @@ -42,6 +42,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" jobsetapi "sigs.k8s.io/jobset/api/jobset/v1alpha2" + config "sigs.k8s.io/kueue/apis/config/v1beta1" kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" ) @@ -85,7 +86,10 @@ func (f *Framework) Init() *rest.Config { } func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (context.Context, client.Client) { - err := kueue.AddToScheme(scheme.Scheme) + err := config.AddToScheme(scheme.Scheme) + gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) + + err = kueue.AddToScheme(scheme.Scheme) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) err = kueuealpha.AddToScheme(scheme.Scheme) diff --git a/test/integration/multikueue/suite_test.go b/test/integration/multikueue/suite_test.go index 49f2f255a2..383b56beda 100644 --- a/test/integration/multikueue/suite_test.go +++ b/test/integration/multikueue/suite_test.go @@ -125,7 +125,10 @@ func managerSetup(mgr manager.Manager, ctx context.Context) { cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) failedWebhook, err := webhooks.Setup(mgr) diff --git a/test/integration/scheduler/podsready/suite_test.go b/test/integration/scheduler/podsready/suite_test.go index a649a4111f..68f5ad945c 100644 --- a/test/integration/scheduler/podsready/suite_test.go +++ b/test/integration/scheduler/podsready/suite_test.go @@ -58,7 +58,7 @@ func TestSchedulerWithWaitForPodsReady(t *testing.T) { } func managerAndSchedulerSetupWithTimeoutAdmission(mgr manager.Manager, ctx context.Context, value time.Duration, blockAdmission bool, requeuingTimestamp config.RequeuingTimestamp) { - cfg := config.Configuration{ + cfg := &config.Configuration{ WaitForPodsReady: &config.WaitForPodsReady{ Enable: true, BlockAdmission: &blockAdmission, @@ -66,6 +66,7 @@ func managerAndSchedulerSetupWithTimeoutAdmission(mgr manager.Manager, ctx conte RequeuingTimestamp: ptr.To(requeuingTimestamp), }, } + mgr.GetScheme().Default(cfg) err := indexer.Setup(ctx, mgr.GetFieldIndexer()) gomega.Expect(err).NotTo(gomega.HaveOccurred()) @@ -76,7 +77,7 @@ func managerAndSchedulerSetupWithTimeoutAdmission(mgr manager.Manager, ctx conte queue.WithPodsReadyRequeuingTimestamp(requeuingTimestamp), ) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &cfg) + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, cfg) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) failedWebhook, err := webhooks.Setup(mgr) diff --git a/test/integration/scheduler/suite_test.go b/test/integration/scheduler/suite_test.go index 88e0d1c6a2..89bbaf82f4 100644 --- a/test/integration/scheduler/suite_test.go +++ b/test/integration/scheduler/suite_test.go @@ -75,7 +75,10 @@ func managerAndSchedulerSetup(mgr manager.Manager, ctx context.Context) { cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) failedWebhook, err := webhooks.Setup(mgr) diff --git a/test/integration/webhook/suite_test.go b/test/integration/webhook/suite_test.go index 3c3b26d437..6595ef98ff 100644 --- a/test/integration/webhook/suite_test.go +++ b/test/integration/webhook/suite_test.go @@ -66,7 +66,11 @@ var _ = ginkgo.BeforeSuite(func() { cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache) - failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) }) })