Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

馃尡 Introduce a new runnable group for basic servers of the manager #2337

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 26 additions & 42 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ func (cm *controllerManager) addMetricsServer() error {
})
}

func (cm *controllerManager) serveHealthProbes() {
func (cm *controllerManager) addHealthProbeServer() error {
mux := http.NewServeMux()
server := httpserver.New(mux)
srv := httpserver.New(mux)

if cm.readyzHandler != nil {
mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
Expand All @@ -332,7 +332,12 @@ func (cm *controllerManager) serveHealthProbes() {
mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
}

go cm.httpServe("health probe", cm.logger, server, cm.healthProbeListener)
return cm.add(&server{
Kind: "health probe",
Log: cm.logger,
Server: srv,
Listener: cm.healthProbeListener,
})
}

func (cm *controllerManager) addPprofServer() error {
Expand All @@ -353,42 +358,6 @@ func (cm *controllerManager) addPprofServer() error {
})
}

func (cm *controllerManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) {
log = log.WithValues("kind", kind, "addr", ln.Addr())

go func() {
log.Info("Starting server")
if err := server.Serve(ln); err != nil {
if errors.Is(err, http.ErrServerClosed) {
return
}
if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 {
// There might be cases where connections are still open and we try to shutdown
// but not having enough time to close the connection causes an error in Serve
//
// In that case we want to avoid returning an error to the main error channel.
log.Error(err, "error on Serve after stop has been engaged")
return
}
cm.errChan <- err
}
}()

// Shutdown the server when stop is closed.
<-cm.internalProceduresStop
if err := server.Shutdown(cm.shutdownCtx); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Avoid logging context related errors.
return
}
if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 {
cm.logger.Error(err, "error on Shutdown after stop has been engaged")
return
}
cm.errChan <- err
}
}

// Start starts the manager and waits indefinitely.
// There is only two ways to have start return:
// An error has occurred during in one of the internal operations,
Expand Down Expand Up @@ -449,7 +418,9 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {

// Serve health probes.
if cm.healthProbeListener != nil {
cm.serveHealthProbes()
if err := cm.addHealthProbeServer(); err != nil {
return fmt.Errorf("failed to add health probe server: %w", err)
}
}

// Add pprof server
Expand All @@ -459,7 +430,17 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
}
}

// First start any webhook servers, which includes conversion, validation, and defaulting
// First start any internal HTTP servers, which includes health probes, metrics and profiling if enabled.
//
// WARNING: Internal HTTP servers MUST start before any cache is populated, otherwise it would block
// conversion webhooks to be ready for serving which make the cache never get ready.
if err := cm.runnables.HTTPServers.Start(cm.internalCtx); err != nil {
if err != nil {
return fmt.Errorf("failed to start HTTP servers: %w", err)
}
}

// Start any webhook servers, which includes conversion, validation, and defaulting
// webhooks that are registered.
//
// WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
Expand Down Expand Up @@ -591,10 +572,13 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
cm.logger.Info("Stopping and waiting for caches")
cm.runnables.Caches.StopAndWait(cm.shutdownCtx)

// Webhooks should come last, as they might be still serving some requests.
// Webhooks and internal HTTP servers should come last, as they might be still serving some requests.
cm.logger.Info("Stopping and waiting for webhooks")
cm.runnables.Webhooks.StopAndWait(cm.shutdownCtx)

cm.logger.Info("Stopping and waiting for HTTP servers")
cm.runnables.HTTPServers.StopAndWait(cm.shutdownCtx)

// Proceed to close the manager and overall shutdown context.
cm.logger.Info("Wait completed, proceeding to shutdown the manager")
shutdownCancel()
Expand Down
4 changes: 4 additions & 0 deletions pkg/manager/runnable_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type runnableCheck func(ctx context.Context) bool
// runnables handles all the runnables for a manager by grouping them accordingly to their
// type (webhooks, caches etc.).
type runnables struct {
HTTPServers *runnableGroup
Webhooks *runnableGroup
Caches *runnableGroup
LeaderElection *runnableGroup
Expand All @@ -37,6 +38,7 @@ type runnables struct {
// newRunnables creates a new runnables object.
func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
return &runnables{
HTTPServers: newRunnableGroup(baseContext, errChan),
Webhooks: newRunnableGroup(baseContext, errChan),
Caches: newRunnableGroup(baseContext, errChan),
LeaderElection: newRunnableGroup(baseContext, errChan),
Expand All @@ -52,6 +54,8 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
// The runnables added after Start are started directly.
func (r *runnables) Add(fn Runnable) error {
switch runnable := fn.(type) {
case *server:
return r.HTTPServers.Add(fn, nil)
case hasCache:
return r.Caches.Add(fn, func(ctx context.Context) bool {
return runnable.GetCache().WaitForCacheSync(ctx)
Expand Down
7 changes: 7 additions & 0 deletions pkg/manager/runnable_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ var _ = Describe("runnables", func() {
Expect(newRunnables(defaultBaseContext, errCh)).ToNot(BeNil())
})

It("should add HTTP servers to the appropriate group", func() {
server := &server{}
r := newRunnables(defaultBaseContext, errCh)
Expect(r.Add(server)).To(Succeed())
Expect(r.HTTPServers.startQueue).To(HaveLen(1))
})

It("should add caches to the appropriate group", func() {
cache := &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}
r := newRunnables(defaultBaseContext, errCh)
Expand Down