Skip to content

Commit

Permalink
Merge pull request #2337 from zqzten/basic_servers_runnable_group
Browse files Browse the repository at this point in the history
🌱 Introduce a new runnable group for basic servers of the manager
  • Loading branch information
k8s-ci-robot authored Jul 28, 2023
2 parents 21779fb + ad70da2 commit 1000fac
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 42 deletions.
68 changes: 26 additions & 42 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ func (cm *controllerManager) addMetricsServer() error {
})
}

func (cm *controllerManager) serveHealthProbes() {
func (cm *controllerManager) addHealthProbeServer() error {
mux := http.NewServeMux()
server := httpserver.New(mux)
srv := httpserver.New(mux)

if cm.readyzHandler != nil {
mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
Expand All @@ -332,7 +332,12 @@ func (cm *controllerManager) serveHealthProbes() {
mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
}

go cm.httpServe("health probe", cm.logger, server, cm.healthProbeListener)
return cm.add(&server{
Kind: "health probe",
Log: cm.logger,
Server: srv,
Listener: cm.healthProbeListener,
})
}

func (cm *controllerManager) addPprofServer() error {
Expand All @@ -353,42 +358,6 @@ func (cm *controllerManager) addPprofServer() error {
})
}

func (cm *controllerManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) {
log = log.WithValues("kind", kind, "addr", ln.Addr())

go func() {
log.Info("Starting server")
if err := server.Serve(ln); err != nil {
if errors.Is(err, http.ErrServerClosed) {
return
}
if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 {
// There might be cases where connections are still open and we try to shutdown
// but not having enough time to close the connection causes an error in Serve
//
// In that case we want to avoid returning an error to the main error channel.
log.Error(err, "error on Serve after stop has been engaged")
return
}
cm.errChan <- err
}
}()

// Shutdown the server when stop is closed.
<-cm.internalProceduresStop
if err := server.Shutdown(cm.shutdownCtx); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Avoid logging context related errors.
return
}
if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 {
cm.logger.Error(err, "error on Shutdown after stop has been engaged")
return
}
cm.errChan <- err
}
}

// Start starts the manager and waits indefinitely.
// There are only two ways to have Start return:
// An error has occurred during one of the internal operations,
Expand Down Expand Up @@ -449,7 +418,9 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {

// Serve health probes.
if cm.healthProbeListener != nil {
cm.serveHealthProbes()
if err := cm.addHealthProbeServer(); err != nil {
return fmt.Errorf("failed to add health probe server: %w", err)
}
}

// Add pprof server
Expand All @@ -459,7 +430,17 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
}
}

// First start any webhook servers, which includes conversion, validation, and defaulting
// First start any internal HTTP servers, which includes health probes, metrics and profiling if enabled.
//
// WARNING: Internal HTTP servers MUST start before any cache is populated, otherwise conversion
// webhooks would be blocked from becoming ready to serve, which would keep the cache from ever getting ready.
if err := cm.runnables.HTTPServers.Start(cm.internalCtx); err != nil {
if err != nil {
return fmt.Errorf("failed to start HTTP servers: %w", err)
}
}

// Start any webhook servers, which includes conversion, validation, and defaulting
// webhooks that are registered.
//
// WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
Expand Down Expand Up @@ -591,10 +572,13 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
cm.logger.Info("Stopping and waiting for caches")
cm.runnables.Caches.StopAndWait(cm.shutdownCtx)

// Webhooks should come last, as they might be still serving some requests.
// Webhooks and internal HTTP servers should come last, as they might still be serving some requests.
cm.logger.Info("Stopping and waiting for webhooks")
cm.runnables.Webhooks.StopAndWait(cm.shutdownCtx)

cm.logger.Info("Stopping and waiting for HTTP servers")
cm.runnables.HTTPServers.StopAndWait(cm.shutdownCtx)

// Proceed to close the manager and overall shutdown context.
cm.logger.Info("Wait completed, proceeding to shutdown the manager")
shutdownCancel()
Expand Down
4 changes: 4 additions & 0 deletions pkg/manager/runnable_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type runnableCheck func(ctx context.Context) bool
// runnables handles all the runnables for a manager by grouping them according to their
// type (webhooks, caches etc.).
type runnables struct {
HTTPServers *runnableGroup
Webhooks *runnableGroup
Caches *runnableGroup
LeaderElection *runnableGroup
Expand All @@ -37,6 +38,7 @@ type runnables struct {
// newRunnables creates a new runnables object.
func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
return &runnables{
HTTPServers: newRunnableGroup(baseContext, errChan),
Webhooks: newRunnableGroup(baseContext, errChan),
Caches: newRunnableGroup(baseContext, errChan),
LeaderElection: newRunnableGroup(baseContext, errChan),
Expand All @@ -52,6 +54,8 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
// The runnables added after Start are started directly.
func (r *runnables) Add(fn Runnable) error {
switch runnable := fn.(type) {
case *server:
return r.HTTPServers.Add(fn, nil)
case hasCache:
return r.Caches.Add(fn, func(ctx context.Context) bool {
return runnable.GetCache().WaitForCacheSync(ctx)
Expand Down
7 changes: 7 additions & 0 deletions pkg/manager/runnable_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ var _ = Describe("runnables", func() {
Expect(newRunnables(defaultBaseContext, errCh)).ToNot(BeNil())
})

It("should add HTTP servers to the appropriate group", func() {
server := &server{}
r := newRunnables(defaultBaseContext, errCh)
Expect(r.Add(server)).To(Succeed())
Expect(r.HTTPServers.startQueue).To(HaveLen(1))
})

It("should add caches to the appropriate group", func() {
cache := &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}
r := newRunnables(defaultBaseContext, errCh)
Expand Down

0 comments on commit 1000fac

Please sign in to comment.