Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add ability to register runnables as pre-start hooks #2044

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const (
defaultRenewDeadline = 10 * time.Second
defaultRetryPeriod = 2 * time.Second
defaultGracefulShutdownPeriod = 30 * time.Second
defaultHookPeriod = 15 * time.Second

defaultReadinessEndpoint = "/readyz"
defaultLivenessEndpoint = "/healthz"
Expand Down Expand Up @@ -161,6 +162,13 @@ type controllerManager struct {
// internalProceduresStop channel is used internally to the manager when coordinating
// the proper shutdown of servers. This channel is also used for dependency injection.
internalProceduresStop chan struct{}

// prestartHooks are functions that are run immediately before calling the Start functions
// of the leader election runnables.
prestartHooks []Runnable

// hookTimeout is the duration given to each hook to return successfully.
hookTimeout time.Duration
}

type hasCache interface {
Expand Down Expand Up @@ -235,6 +243,23 @@ func (cm *controllerManager) GetHTTPClient() *http.Client {
return cm.cluster.GetHTTPClient()
}

// Hook allows you to add hooks.
func (cm *controllerManager) Hook(hook HookType, runnable Runnable) error {
cm.Lock()
defer cm.Unlock()

if cm.started {
return fmt.Errorf("unable to add new hook because the manager has already been started")
}

switch hook {
case HookPrestartType:
cm.prestartHooks = append(cm.prestartHooks, runnable)
}

return nil
}

func (cm *controllerManager) GetConfig() *rest.Config {
return cm.cluster.GetConfig()
}
Expand Down Expand Up @@ -615,6 +640,27 @@ func (cm *controllerManager) initLeaderElector() (*leaderelection.LeaderElector,
}

func (cm *controllerManager) startLeaderElectionRunnables() error {
cm.logger.Info("Running prestart hooks")
for _, hook := range cm.prestartHooks {
var ctx context.Context
var cancel context.CancelFunc

if cm.hookTimeout < 0 {
ctx, cancel = context.WithCancel(cm.internalCtx)
} else {
ctx, cancel = context.WithTimeout(cm.internalCtx, cm.hookTimeout)
}

if err := hook.Start(ctx); err != nil {
cancel()
return err
}
cancel()
}

// All the prestart hooks have ben run, clear the slice to free the underlying resources.
cm.prestartHooks = nil

return cm.runnables.LeaderElection.Start(cm.internalCtx)
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ type Manager interface {
// AddReadyzCheck allows you to add Readyz checker
AddReadyzCheck(name string, check healthz.Checker) error

// Hook allows to add Runnables as hooks to modify the behavior.
Hook(hook HookType, runnable Runnable) error

// Start starts all registered Controllers and blocks until the context is cancelled.
// Returns an error if there is an error starting any controller.
//
Expand Down Expand Up @@ -269,6 +272,10 @@ type Options struct {
// +optional
Controller config.Controller

// HookTimeout is the duration given to each hook to return successfully.
// To use hooks without timeout, set to a negative duration, e.g. time.Duration(-1)
terinjokes marked this conversation as resolved.
Show resolved Hide resolved
HookTimeout *time.Duration

// makeBroadcaster allows deferring the creation of the broadcaster to
// avoid leaking goroutines if we never call Start on this manager. It also
// returns whether or not this is a "owned" broadcaster, and as such should be
Expand All @@ -283,6 +290,15 @@ type Options struct {
newPprofListener func(addr string) (net.Listener, error)
}

// HookType defines hooks for use with AddHook.
type HookType int

const (
// HookPrestartType defines a hook that is run after leader election and immediately before
// calling Start on the runnables that needed leader election.
HookPrestartType HookType = iota
)

// BaseContextFunc is a function used to provide a base Context to Runnables
// managed by a Manager.
type BaseContextFunc func() context.Context
Expand Down Expand Up @@ -438,6 +454,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
livenessEndpointName: options.LivenessEndpointName,
pprofListener: pprofListener,
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
hookTimeout: *options.HookTimeout,
internalProceduresStop: make(chan struct{}),
leaderElectionStopped: make(chan struct{}),
leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
Expand Down Expand Up @@ -539,6 +556,11 @@ func setOptionsDefaults(options Options) Options {
options.GracefulShutdownTimeout = &gracefulShutdownTimeout
}

if options.HookTimeout == nil {
hookTimeout := defaultHookPeriod
options.HookTimeout = &hookTimeout
}

if options.Logger.GetSink() == nil {
options.Logger = log.Log
}
Expand Down
115 changes: 115 additions & 0 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,121 @@ var _ = Describe("manger.Manager", func() {
Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond))
})

It("should run prestart hooks before calling Start on leader election runnables", func() {
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
for _, cb := range callbacks {
cb(m)
}

runnableRan := make(chan struct{})

Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
close(runnableRan)
return nil
}))).ToNot(HaveOccurred())

Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
Expect(m.Elected()).ShouldNot(BeClosed())
Consistently(runnableRan).ShouldNot(BeClosed())
return nil
}))).ToNot(HaveOccurred())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(m.Elected()).ShouldNot(BeClosed())
Expect(m.Start(ctx)).NotTo(HaveOccurred())
}()

<-m.Elected()
})

It("should run prestart hooks with timeout", func() {
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
for _, cb := range callbacks {
cb(m)
}
m.(*controllerManager).hookTimeout = 1 * time.Nanosecond

Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Second):
return errors.New("prestart hook timeout exceeded expected")
}
}))).ToNot(HaveOccurred())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expect(m.Start(ctx)).Should(MatchError(context.DeadlineExceeded))
})

It("should run prestart hooks without timeout", func() {
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
for _, cb := range callbacks {
cb(m)
}
m.(*controllerManager).hookTimeout = -1 * time.Second

Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
fmt.Println("runnable returning")
return nil
}))).ToNot(HaveOccurred())

Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Second):
fmt.Println("prestart hook returning")
return nil
}
}))).ToNot(HaveOccurred())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
defer GinkgoRecover()
Expect(m.Elected()).ShouldNot(BeClosed())
Expect(m.Start(ctx)).NotTo(HaveOccurred())
}()

<-m.Elected()
})

It("should not run leader election runnables if prestart hooks fail", func() {
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
for _, cb := range callbacks {
cb(m)
}

runnableRan := make(chan struct{})

Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
close(runnableRan)
return nil
}))).ToNot(HaveOccurred())

Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
Expect(m.Elected()).ShouldNot(BeClosed())
Consistently(runnableRan).ShouldNot(BeClosed())
return errors.New("prestart hook failed")
}))).ToNot(HaveOccurred())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expect(m.Elected()).ShouldNot(BeClosed())
Expect(m.Start(ctx)).Should(MatchError(ContainSubstring("prestart hook failed")))
})
}

Context("with defaults", func() {
Expand Down