Skip to content

Commit

Permalink
Avoid deadlock on start
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziopandini committed Oct 8, 2021
1 parent e1880f5 commit 69e892b
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 28 deletions.
79 changes: 55 additions & 24 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,23 @@ type controllerManager struct {
// Healthz probe handler
healthzHandler *healthz.Handler

mu sync.Mutex
started bool
startedLeader bool
// addMu protects controllerManager from Add, AddHealthzCheck, AddMetricsExtraHandler, AddReadyzCheck being
// called while the data they collect are being read.
addMu sync.RWMutex

// started tracks whether the cache has already been started (set by waitForCache).
started bool

// startedLeader tracks whether the leader election runnables have been started.
startedLeader bool

// healthzStarted is set once the health probes are being served. In other words, once it is true
// we should not add any more healthz or readyz checks to the manager.
healthzStarted bool
errChan chan error

// cacheMu protects waitForCache from being executed twice concurrently.
cacheMu sync.Mutex

errChan chan error

// controllerOptions are the global controller options.
controllerOptions v1alpha1.ControllerConfigurationSpec
Expand Down Expand Up @@ -192,8 +204,8 @@ type hasCache interface {

// Add sets dependencies on r, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.Lock()
defer cm.addMu.Unlock()
if cm.stopProcedureEngaged {
return errors.New("can't accept new runnable as stop procedure is already engaged")
}
Expand Down Expand Up @@ -248,8 +260,8 @@ func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Ha
return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint)
}

cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.Lock()
defer cm.addMu.Unlock()

if _, found := cm.metricsExtraHandlers[path]; found {
return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path)
Expand All @@ -262,8 +274,8 @@ func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Ha

// AddHealthzCheck allows you to add Healthz checker.
func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.Lock()
defer cm.addMu.Unlock()

if cm.stopProcedureEngaged {
return errors.New("can't accept new healthCheck as stop procedure is already engaged")
Expand All @@ -283,8 +295,8 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker)

// AddReadyzCheck allows you to add Readyz checker.
func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.Lock()
defer cm.addMu.Unlock()

if cm.stopProcedureEngaged {
return errors.New("can't accept new ready check as stop procedure is already engaged")
Expand Down Expand Up @@ -367,8 +379,8 @@ func (cm *controllerManager) serveMetrics() {
mux.Handle(defaultMetricsEndpoint, handler)

func() {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.RLock()
defer cm.addMu.RUnlock()

for path, extraHandler := range cm.metricsExtraHandlers {
mux.Handle(path, extraHandler)
Expand Down Expand Up @@ -401,8 +413,8 @@ func (cm *controllerManager) serveHealthProbes() {
}

func() {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.RLock()
defer cm.addMu.RUnlock()

if cm.readyzHandler != nil {
mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
Expand All @@ -422,6 +434,9 @@ func (cm *controllerManager) serveHealthProbes() {
}
return nil
}))

// Note: healthzStarted is used by AddMetricsExtraHandler, AddReadyzCheck, but it is safe to change here because
// holding addMu.RLock() prevents the above functions from being executed concurrently with this operation.
cm.healthzStarted = true
}()

Expand Down Expand Up @@ -535,8 +550,11 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
if cm.gracefulShutdownTimeout == 0 {
return nil
}
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.RLock()
defer cm.addMu.RUnlock()

// Note: stopProcedureEngaged is used by Add, AddMetricsExtraHandler, AddReadyzCheck, but it is safe to change here because
// holding addMu.RLock() prevents the above functions from being executed concurrently with this operation.
cm.stopProcedureEngaged = true

// we want to close this after the other runnables stop, because we don't
Expand Down Expand Up @@ -574,8 +592,8 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF
}

func (cm *controllerManager) startNonLeaderElectionRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.RLock()
defer cm.addMu.RUnlock()

// First start any webhook servers, which includes conversion, validation, and defaulting
// webhooks that are registered.
Expand Down Expand Up @@ -605,8 +623,8 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
}

func (cm *controllerManager) startLeaderElectionRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.RLock()
defer cm.addMu.RUnlock()

cm.waitForCache(cm.internalCtx)

Expand All @@ -617,10 +635,15 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
cm.startRunnable(c)
}

// Note: startedLeader is used by Add, but it is safe to change here because
// holding addMu.RLock() prevents the above function from being executed concurrently with this operation.
cm.startedLeader = true
}

func (cm *controllerManager) waitForCache(ctx context.Context) {
cm.cacheMu.Lock()
defer cm.cacheMu.Unlock()

if cm.started {
return
}
Expand All @@ -638,14 +661,22 @@ func (cm *controllerManager) waitForCache(ctx context.Context) {
// cm.started as check if we already started the cache so it must always become true.
// Making sure that the cache doesn't get started twice is needed to not get a "close
// of closed channel" panic

// Note: started is used by Add, so the caller is required to hold addMu (Lock or RLock) before
// calling this func in order to prevent the above function from being executed concurrently
// with this operation.
cm.started = true
}

func (cm *controllerManager) startLeaderElection() (err error) {
ctx, cancel := context.WithCancel(context.Background())
cm.mu.Lock()

// Note: leaderElectionCancel is used by engageStopProcedure, which already gets an addMu.RLock;
// thus, in order to prevent the above function from being executed concurrently with this operation, we
// require an addMu.Lock here as well.
cm.addMu.Lock()
cm.leaderElectionCancel = cancel
cm.mu.Unlock()
cm.addMu.Unlock()

if cm.onStoppedLeading == nil {
cm.onStoppedLeading = func() {
Expand Down
8 changes: 4 additions & 4 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1350,8 +1350,8 @@ var _ = Describe("manger.Manager", func() {

// Wait for the Manager to start
Eventually(func() bool {
mgr.mu.Lock()
defer mgr.mu.Unlock()
mgr.addMu.Lock()
defer mgr.addMu.Unlock()
return mgr.started
}).Should(BeTrue())

Expand Down Expand Up @@ -1381,8 +1381,8 @@ var _ = Describe("manger.Manager", func() {

// Wait for the Manager to start
Eventually(func() bool {
mgr.mu.Lock()
defer mgr.mu.Unlock()
mgr.addMu.Lock()
defer mgr.addMu.Unlock()
return mgr.started
}).Should(BeTrue())

Expand Down

0 comments on commit 69e892b

Please sign in to comment.