diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index f298229e57..fbe829bb22 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -317,9 +317,9 @@ func (cm *controllerManager) addMetricsServer() error { }) } -func (cm *controllerManager) serveHealthProbes() { +func (cm *controllerManager) addHealthProbeServer() error { mux := http.NewServeMux() - server := httpserver.New(mux) + srv := httpserver.New(mux) if cm.readyzHandler != nil { mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler)) @@ -332,7 +332,12 @@ func (cm *controllerManager) serveHealthProbes() { mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler)) } - go cm.httpServe("health probe", cm.logger, server, cm.healthProbeListener) + return cm.add(&server{ + Kind: "health probe", + Log: cm.logger, + Server: srv, + Listener: cm.healthProbeListener, + }) } func (cm *controllerManager) addPprofServer() error { @@ -353,42 +358,6 @@ func (cm *controllerManager) addPprofServer() error { }) } -func (cm *controllerManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) { - log = log.WithValues("kind", kind, "addr", ln.Addr()) - - go func() { - log.Info("Starting server") - if err := server.Serve(ln); err != nil { - if errors.Is(err, http.ErrServerClosed) { - return - } - if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 { - // There might be cases where connections are still open and we try to shutdown - // but not having enough time to close the connection causes an error in Serve - // - // In that case we want to avoid returning an error to the main error channel. - log.Error(err, "error on Serve after stop has been engaged") - return - } - cm.errChan <- err - } - }() - - // Shutdown the server when stop is closed. - <-cm.internalProceduresStop - if err := server.Shutdown(cm.shutdownCtx); err != nil { - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - // Avoid logging context related errors. - return - } - if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 { - cm.logger.Error(err, "error on Shutdown after stop has been engaged") - return - } - cm.errChan <- err - } -} - // Start starts the manager and waits indefinitely. // There is only two ways to have start return: // An error has occurred during in one of the internal operations, @@ -449,7 +418,9 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { // Serve health probes. if cm.healthProbeListener != nil { - cm.serveHealthProbes() + if err := cm.addHealthProbeServer(); err != nil { + return fmt.Errorf("failed to add health probe server: %w", err) + } } // Add pprof server @@ -459,7 +430,17 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { } } - // First start any webhook servers, which includes conversion, validation, and defaulting + // First start any internal HTTP servers, which includes health probes, metrics and profiling if enabled. + // + // WARNING: Internal HTTP servers MUST start before any cache is populated, otherwise it would block + // conversion webhooks to be ready for serving which make the cache never get ready. + if err := cm.runnables.HTTPServers.Start(cm.internalCtx); err != nil { + if err != nil { + return fmt.Errorf("failed to start HTTP servers: %w", err) + } + } + + // Start any webhook servers, which includes conversion, validation, and defaulting // webhooks that are registered. // // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition @@ -591,10 +572,13 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e cm.logger.Info("Stopping and waiting for caches") cm.runnables.Caches.StopAndWait(cm.shutdownCtx) - // Webhooks should come last, as they might be still serving some requests. + // Webhooks and internal HTTP servers should come last, as they might be still serving some requests. cm.logger.Info("Stopping and waiting for webhooks") cm.runnables.Webhooks.StopAndWait(cm.shutdownCtx) + cm.logger.Info("Stopping and waiting for HTTP servers") + cm.runnables.HTTPServers.StopAndWait(cm.shutdownCtx) + // Proceed to close the manager and overall shutdown context. cm.logger.Info("Wait completed, proceeding to shutdown the manager") shutdownCancel() diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go index 549741e6e5..96566f5df1 100644 --- a/pkg/manager/runnable_group.go +++ b/pkg/manager/runnable_group.go @@ -28,6 +28,7 @@ type runnableCheck func(ctx context.Context) bool // runnables handles all the runnables for a manager by grouping them accordingly to their // type (webhooks, caches etc.). type runnables struct { + HTTPServers *runnableGroup Webhooks *runnableGroup Caches *runnableGroup LeaderElection *runnableGroup @@ -37,6 +38,7 @@ type runnables struct { // newRunnables creates a new runnables object. func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables { return &runnables{ + HTTPServers: newRunnableGroup(baseContext, errChan), Webhooks: newRunnableGroup(baseContext, errChan), Caches: newRunnableGroup(baseContext, errChan), LeaderElection: newRunnableGroup(baseContext, errChan), @@ -52,6 +54,8 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables { // The runnables added after Start are started directly. func (r *runnables) Add(fn Runnable) error { switch runnable := fn.(type) { + case *server: + return r.HTTPServers.Add(fn, nil) case hasCache: return r.Caches.Add(fn, func(ctx context.Context) bool { return runnable.GetCache().WaitForCacheSync(ctx) diff --git a/pkg/manager/runnable_group_test.go b/pkg/manager/runnable_group_test.go index 456b8d7ac0..251ce46fb3 100644 --- a/pkg/manager/runnable_group_test.go +++ b/pkg/manager/runnable_group_test.go @@ -21,6 +21,13 @@ var _ = Describe("runnables", func() { Expect(newRunnables(defaultBaseContext, errCh)).ToNot(BeNil()) }) + It("should add HTTP servers to the appropriate group", func() { + server := &server{} + r := newRunnables(defaultBaseContext, errCh) + Expect(r.Add(server)).To(Succeed()) + Expect(r.HTTPServers.startQueue).To(HaveLen(1)) + }) + It("should add caches to the appropriate group", func() { cache := &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}} r := newRunnables(defaultBaseContext, errCh)