From 6c35525e4d9317aadc8fd43d4d67d71e6115c383 Mon Sep 17 00:00:00 2001 From: Terin Stock Date: Mon, 14 Nov 2022 15:51:09 +0100 Subject: [PATCH] feat(manager): add prestart hook support When implementing a controller that uses leader election, there maybe be work that needs to be done after winning the election but before processing enqueued requests. For example, a controller may need to build up an internal mapping of the current state of the cluster before it can begin reconciling. This changeset adds support for adding prestart hooks to controller-runtime's manager implementation. This hook runs after the manager has been elected leader, immediately before the leader election controllers are started. Related #607 --- pkg/manager/internal.go | 38 ++++++++++++++++++ pkg/manager/manager.go | 26 ++++++++++++ pkg/manager/manager_test.go | 80 +++++++++++++++++++++++++++++++++++++ 3 files changed, 144 insertions(+) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index a16f354a1b..da78dc0613 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -53,6 +53,7 @@ const ( defaultRenewDeadline = 10 * time.Second defaultRetryPeriod = 2 * time.Second defaultGracefulShutdownPeriod = 30 * time.Second + defaultHookPeriod = 15 * time.Second defaultReadinessEndpoint = "/readyz" defaultLivenessEndpoint = "/healthz" @@ -161,6 +162,13 @@ type controllerManager struct { // internalProceduresStop channel is used internally to the manager when coordinating // the proper shutdown of servers. This channel is also used for dependency injection. internalProceduresStop chan struct{} + + // prestartHooks are functions that are run immediately before calling the Start functions + // of the leader election runnables. + prestartHooks []Runnable + + // hookTimeout is the duration given to each hook to return successfully. + hookTimeout time.Duration } type hasCache interface { @@ -217,6 +225,23 @@ func (cm *controllerManager) GetHTTPClient() *http.Client { return cm.cluster.GetHTTPClient() } +// Hook allows you to add hooks. +func (cm *controllerManager) Hook(hook HookType, runnable Runnable) error { + cm.Lock() + defer cm.Unlock() + + if cm.started { + return fmt.Errorf("unable to add new hook because the manager has already been started") + } + + switch hook { + case HookPrestartType: + cm.prestartHooks = append(cm.prestartHooks, runnable) + } + + return nil +} + func (cm *controllerManager) GetConfig() *rest.Config { return cm.cluster.GetConfig() } @@ -554,6 +579,19 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e } func (cm *controllerManager) startLeaderElectionRunnables() error { + cm.logger.Info("Starting prestart hooks") + for _, hook := range cm.prestartHooks { + ctx, cancel := context.WithTimeout(cm.internalCtx, cm.hookTimeout) + if err := hook.Start(ctx); err != nil { + cancel() + return err + } + cancel() + } + + // All the prestart hooks have been run, clear the slice to free the underlying resources. + cm.prestartHooks = nil + return cm.runnables.LeaderElection.Start(cm.internalCtx) } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 25c3c7375b..459902e6cf 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -263,6 +263,10 @@ type Options struct { // +optional Controller config.Controller + // HookTimeout is the duration given to each hook to return successfully. + // To use hooks without timeout, set to a negative duration, e.g. time.Duration(-1) + HookTimeout *time.Duration + // makeBroadcaster allows deferring the creation of the broadcaster to // avoid leaking goroutines if we never call Start on this manager. It also // returns whether or not this is a "owned" broadcaster, and as such should be @@ -277,6 +281,22 @@ type Options struct { newPprofListener func(addr string) (net.Listener, error) } +// Hookable is implemented by Managers that support running hooks during +// controller lifecycle phases. +type Hookable interface { + // Hook allows to add Runnables as hooks to modify the behavior. + Hook(hook HookType, runnable Runnable) error +} + +// HookType defines hooks for use with AddHook. +type HookType int + +const ( + // HookPrestartType defines a hook that is run after leader election and immediately before + // calling Start on the runnalbes that needed leader election. + HookPrestartType HookType = iota +) + // BaseContextFunc is a function used to provide a base Context to Runnables // managed by a Manager. type BaseContextFunc func() context.Context @@ -432,6 +452,7 @@ func New(config *rest.Config, options Options) (Manager, error) { livenessEndpointName: options.LivenessEndpointName, pprofListener: pprofListener, gracefulShutdownTimeout: *options.GracefulShutdownTimeout, + hookTimeout: *options.HookTimeout, internalProceduresStop: make(chan struct{}), leaderElectionStopped: make(chan struct{}), leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel, @@ -653,6 +674,11 @@ func setOptionsDefaults(options Options) Options { options.GracefulShutdownTimeout = &gracefulShutdownTimeout } + if options.HookTimeout == nil { + hookTimeout := defaultHookPeriod + options.HookTimeout = &hookTimeout + } + if options.Logger.GetSink() == nil { options.Logger = log.Log } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 90596e9ace..12ae459ddb 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1207,6 +1207,86 @@ var _ = Describe("manger.Manager", func() { Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond)) }) + It("should run prestart hooks before calling Start on leader election runnables", func() { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + + runnableRan := make(chan struct{}) + + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + close(runnableRan) + return nil + }))).ToNot(HaveOccurred()) + + Expect(m.(Hookable).Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error { + Expect(m.Elected()).ShouldNot(BeClosed()) + Consistently(runnableRan).ShouldNot(BeClosed()) + return nil + }))).ToNot(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(m.Elected()).ShouldNot(BeClosed()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + + <-m.Elected() + }) + + It("should run prestart hooks with timeout", func() { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + m.(*controllerManager).hookTimeout = 1 * time.Nanosecond + + Expect(m.(Hookable).Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(1 * time.Second): + return errors.New("prestart hook timeout exceeded expected") + } + }))).ToNot(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + Expect(m.Start(ctx)).Should(MatchError(context.DeadlineExceeded)) + }) + + It("should not run leader election runnables if prestart hooks fail", func() { + m, err := New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + for _, cb := range callbacks { + cb(m) + } + + runnableRan := make(chan struct{}) + + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + close(runnableRan) + return nil + }))).ToNot(HaveOccurred()) + + Expect(m.(Hookable).Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error { + Expect(m.Elected()).ShouldNot(BeClosed()) + Consistently(runnableRan).ShouldNot(BeClosed()) + return errors.New("prestart hook failed") + }))).ToNot(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + Expect(m.Elected()).ShouldNot(BeClosed()) + Expect(m.Start(ctx)).Should(MatchError(ContainSubstring("prestart hook failed"))) + }) } Context("with defaults", func() {