diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index fb79c55441..6345a4ba60 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -173,6 +173,10 @@ type controllerManager struct { // internalProceduresStop channel is used internally to the manager when coordinating // the proper shutdown of servers. This channel is also used for dependency injection. internalProceduresStop chan struct{} + + // prestartHooks are functions that are run immediately before calling the Start functions + // of the leader election runnables. + prestartHooks []func(ctx context.Context) error } type hasCache interface { @@ -269,6 +273,19 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) return nil } +// AddPrestartHook allows you to add prestart hooks. +func (cm *controllerManager) AddPrestartHook(hook func(ctx context.Context) error) error { + cm.Lock() + defer cm.Unlock() + + if cm.started { + return fmt.Errorf("unable to add new prestart hook because the manager has already been started") + } + + cm.prestartHooks = append(cm.prestartHooks, hook) + return nil +} + func (cm *controllerManager) GetConfig() *rest.Config { return cm.cluster.GetConfig() } @@ -606,6 +623,15 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e } func (cm *controllerManager) startLeaderElectionRunnables() error { + for _, hook := range cm.prestartHooks { + if err := hook(cm.internalCtx); err != nil { + return err + } + } + + // All the prestart hooks have been run, clear the slice to free the underlying resources. + cm.prestartHooks = nil + return cm.runnables.LeaderElection.Start(cm.internalCtx) } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 072919058a..2048f75cf6 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -79,6 +79,12 @@ type Manager interface { // AddReadyzCheck allows you to add Readyz checker AddReadyzCheck(name string, check healthz.Checker) error + // AddPrestartHook allows you to add a hook that runs after leader election and immediately + // before starting controllers needing leader election. Prestart hooks block execution of + // leader election controllers until all return nil error. The manager is stopped on non-nil + // errors. + AddPrestartHook(func(ctx context.Context) error) error + // Start starts all registered Controllers and blocks until the context is cancelled. // Returns an error if there is an error starting any controller. // diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 6b01d48293..3176d67b09 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1076,6 +1076,63 @@ var _ = Describe("manger.Manager", func() { <-runnableStopped }) + It("should run prestart hooks before calling Start on leader election runnables", func() { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + + runnableRan := make(chan struct{}) + + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + close(runnableRan) + return nil + }))) + + Expect(m.AddPrestartHook(func(ctx context.Context) error { + Expect(m.Elected()).ShouldNot(BeClosed()) + Consistently(runnableRan).ShouldNot(BeClosed()) + return nil + })) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(m.Elected()).ShouldNot(BeClosed()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + + <-m.Elected() + }) + + It("should not run leader election runnables if prestart hooks fail", func() { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + + runnableRan := make(chan struct{}) + + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + close(runnableRan) + return nil + }))) + + Expect(m.AddPrestartHook(func(ctx context.Context) error { + Expect(m.Elected()).ShouldNot(BeClosed()) + Consistently(runnableRan).ShouldNot(BeClosed()) + return errors.New("prestart hook failed") + })) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + Expect(m.Elected()).ShouldNot(BeClosed()) + Expect(m.Start(ctx)).Should(MatchError(ContainSubstring("prestart hook failed"))) + }) } Context("with defaults", func() {