Skip to content

Commit

Permalink
Use leader election defaulting to set up controllers HA (#1652)
Browse files Browse the repository at this point in the history
* Use leader election defaulting to set up controllers HA

* Add configuration defaulting to integration tests
  • Loading branch information
astefanutti committed Jan 29, 2024
1 parent 0edd519 commit b7fc93f
Show file tree
Hide file tree
Showing 17 changed files with 75 additions and 36 deletions.
27 changes: 8 additions & 19 deletions pkg/controller/core/leader_aware_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -29,11 +28,6 @@ import (
config "sigs.k8s.io/kueue/apis/config/v1beta1"
)

// defaultRequeueDuration is the default duration used by non-leading replicas
// to requeue events, so that no events are missed over the period it takes for
// leader election to fail over to a new replica.
const defaultRequeueDuration = 15 * time.Second

// WithLeadingManager returns a decorating reconcile.Reconciler that discards reconciliation requests
// for the controllers that are started with the controller.Options.NeedLeaderElection
// option set to false in non-leading replicas.
Expand All @@ -54,28 +48,23 @@ func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler, obj c
return reconciler
}

// Default to the recommended lease duration, that's used for core components
requeueDuration := defaultRequeueDuration
// Otherwise use the configured lease duration for the manager
zero := metav1.Duration{}
if duration := cfg.LeaderElection.LeaseDuration; duration != zero {
requeueDuration = duration.Duration
}

return &leaderAwareReconciler{
elected: mgr.Elected(),
client: mgr.GetClient(),
delegate: reconciler,
object: obj,
requeueDuration: requeueDuration,
requeueDuration: cfg.LeaderElection.LeaseDuration.Duration,
}
}

type leaderAwareReconciler struct {
elected <-chan struct{}
client client.Client
delegate reconcile.Reconciler
object client.Object
elected <-chan struct{}
client client.Client
delegate reconcile.Reconciler
object client.Object
// requeueDuration is the duration used by non-leading replicas to requeue
// events, so that no events are missed over the period it takes for
// leader election to fail over to a new replica.
requeueDuration time.Duration
}

Expand Down
2 changes: 2 additions & 0 deletions test/integration/controller/core/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func managerSetup(mgr manager.Manager, ctx context.Context) {
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook)

controllersCfg := &config.Configuration{}
mgr.GetScheme().Default(controllersCfg)

controllersCfg.Metrics.EnableClusterQueueResources = true
controllersCfg.QueueVisibility = &config.QueueVisibility{
UpdateIntervalSeconds: 2,
Expand Down
5 changes: 4 additions & 1 deletion test/integration/controller/jobs/job/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu
cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
configuration := &config.Configuration{}
mgr.GetScheme().Default(configuration)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

err = job.SetupIndexes(ctx, mgr.GetFieldIndexer())
Expand Down
5 changes: 4 additions & 1 deletion test/integration/controller/jobs/jobset/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu
cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
configuration := &config.Configuration{}
mgr.GetScheme().Default(configuration)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

err = jobset.SetupIndexes(ctx, mgr.GetFieldIndexer())
Expand Down
5 changes: 4 additions & 1 deletion test/integration/controller/jobs/mpijob/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu
cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
configuration := &config.Configuration{}
mgr.GetScheme().Default(configuration)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

err = mpijob.SetupIndexes(ctx, mgr.GetFieldIndexer())
Expand Down
5 changes: 4 additions & 1 deletion test/integration/controller/jobs/mxjob/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu
cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
configuration := &config.Configuration{}
mgr.GetScheme().Default(configuration)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

err = mxjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
Expand Down
5 changes: 4 additions & 1 deletion test/integration/controller/jobs/paddlejob/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu
cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
configuration := &config.Configuration{}
mgr.GetScheme().Default(configuration)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

err = paddlejob.SetupIndexes(ctx, mgr.GetFieldIndexer())
Expand Down
10 changes: 8 additions & 2 deletions test/integration/controller/jobs/pod/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func managerSetup(opts ...jobframework.Option) framework.ManagerSetup {
cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
configuration := &config.Configuration{}
mgr.GetScheme().Default(configuration)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

err = pod.SetupWebhook(mgr, opts...)
Expand Down Expand Up @@ -118,7 +121,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu
cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
configuration := &config.Configuration{}
mgr.GetScheme().Default(configuration)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

err = pod.SetupWebhook(mgr, opts...)
Expand Down
5 changes: 4 additions & 1 deletion test/integration/controller/jobs/pytorchjob/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu
cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
configuration := &config.Configuration{}
mgr.GetScheme().Default(configuration)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

err = pytorchjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
Expand Down
5 changes: 4 additions & 1 deletion test/integration/controller/jobs/rayjob/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu
cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
configuration := &config.Configuration{}
mgr.GetScheme().Default(configuration)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

err = rayjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
Expand Down
5 changes: 4 additions & 1 deletion test/integration/controller/jobs/tfjob/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu
cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
configuration := &config.Configuration{}
mgr.GetScheme().Default(configuration)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

err = tfjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
Expand Down
5 changes: 4 additions & 1 deletion test/integration/controller/jobs/xgboostjob/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu
cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
configuration := &config.Configuration{}
mgr.GetScheme().Default(configuration)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

err = xgboostjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
Expand Down
6 changes: 5 additions & 1 deletion test/integration/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook"
jobsetapi "sigs.k8s.io/jobset/api/jobset/v1alpha2"

config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)
Expand Down Expand Up @@ -85,7 +86,10 @@ func (f *Framework) Init() *rest.Config {
}

func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (context.Context, client.Client) {
err := kueue.AddToScheme(scheme.Scheme)
err := config.AddToScheme(scheme.Scheme)
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())

err = kueue.AddToScheme(scheme.Scheme)
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())

err = kueuealpha.AddToScheme(scheme.Scheme)
Expand Down
5 changes: 4 additions & 1 deletion test/integration/multikueue/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ func managerSetup(mgr manager.Manager, ctx context.Context) {
cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
configuration := &config.Configuration{}
mgr.GetScheme().Default(configuration)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

failedWebhook, err := webhooks.Setup(mgr)
Expand Down
5 changes: 3 additions & 2 deletions test/integration/scheduler/podsready/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,15 @@ func TestSchedulerWithWaitForPodsReady(t *testing.T) {
}

func managerAndSchedulerSetupWithTimeoutAdmission(mgr manager.Manager, ctx context.Context, value time.Duration, blockAdmission bool, requeuingTimestamp config.RequeuingTimestamp) {
cfg := config.Configuration{
cfg := &config.Configuration{
WaitForPodsReady: &config.WaitForPodsReady{
Enable: true,
BlockAdmission: &blockAdmission,
Timeout: &metav1.Duration{Duration: value},
RequeuingTimestamp: ptr.To(requeuingTimestamp),
},
}
mgr.GetScheme().Default(cfg)

err := indexer.Setup(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())
Expand All @@ -76,7 +77,7 @@ func managerAndSchedulerSetupWithTimeoutAdmission(mgr manager.Manager, ctx conte
queue.WithPodsReadyRequeuingTimestamp(requeuingTimestamp),
)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &cfg)
failedCtrl, err := core.SetupControllers(mgr, queues, cCache, cfg)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

failedWebhook, err := webhooks.Setup(mgr)
Expand Down
5 changes: 4 additions & 1 deletion test/integration/scheduler/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ func managerAndSchedulerSetup(mgr manager.Manager, ctx context.Context) {
cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
configuration := &config.Configuration{}
mgr.GetScheme().Default(configuration)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)

failedWebhook, err := webhooks.Setup(mgr)
Expand Down
6 changes: 5 additions & 1 deletion test/integration/webhook/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ var _ = ginkgo.BeforeSuite(func() {

cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)
failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})

configuration := &config.Configuration{}
mgr.GetScheme().Default(configuration)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)
})
})
Expand Down

0 comments on commit b7fc93f

Please sign in to comment.