Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

馃尡 Make metrics and health probe servers be Runnables #2275

Merged
merged 1 commit into from
May 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 24 additions & 45 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,24 +297,30 @@ func (cm *controllerManager) GetControllerOptions() config.Controller {
return cm.controllerConfig
}

func (cm *controllerManager) serveMetrics() {
func (cm *controllerManager) addMetricsServer() error {
mux := http.NewServeMux()
srv := httpserver.New(mux)

handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
ErrorHandling: promhttp.HTTPErrorOnError,
})
// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
mux := http.NewServeMux()
mux.Handle(defaultMetricsEndpoint, handler)
for path, extraHandler := range cm.metricsExtraHandlers {
mux.Handle(path, extraHandler)
}

server := httpserver.New(mux)
go cm.httpServe("metrics", cm.logger.WithValues("path", defaultMetricsEndpoint), server, cm.metricsListener)
return cm.add(&server{
Kind: "metrics",
Log: cm.logger.WithValues("path", defaultMetricsEndpoint),
Server: srv,
Listener: cm.metricsListener,
})
}

func (cm *controllerManager) serveHealthProbes() {
func (cm *controllerManager) addHealthProbeServer() error {
mux := http.NewServeMux()
server := httpserver.New(mux)
srv := httpserver.New(mux)

if cm.readyzHandler != nil {
mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
Expand All @@ -327,7 +333,12 @@ func (cm *controllerManager) serveHealthProbes() {
mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
}

go cm.httpServe("health probe", cm.logger, server, cm.healthProbeListener)
return cm.add(&server{
Kind: "health probe",
Log: cm.logger,
Server: srv,
Listener: cm.healthProbeListener,
})
}

func (cm *controllerManager) addPprofServer() error {
Expand All @@ -348,42 +359,6 @@ func (cm *controllerManager) addPprofServer() error {
})
}

func (cm *controllerManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) {
log = log.WithValues("kind", kind, "addr", ln.Addr())

go func() {
log.Info("Starting server")
if err := server.Serve(ln); err != nil {
if errors.Is(err, http.ErrServerClosed) {
return
}
if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 {
// There might be cases where connections are still open and we try to shutdown
// but not having enough time to close the connection causes an error in Serve
//
// In that case we want to avoid returning an error to the main error channel.
log.Error(err, "error on Serve after stop has been engaged")
return
}
cm.errChan <- err
}
}()

// Shutdown the server when stop is closed.
<-cm.internalProceduresStop
if err := server.Shutdown(cm.shutdownCtx); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Avoid logging context related errors.
return
}
if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 {
cm.logger.Error(err, "error on Shutdown after stop has been engaged")
return
}
cm.errChan <- err
}
}

// Start starts the manager and waits indefinitely.
// There is only two ways to have start return:
// An error has occurred during in one of the internal operations,
Expand Down Expand Up @@ -437,12 +412,16 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
// (If we don't serve metrics for non-leaders, prometheus will still scrape
// the pod but will get a connection refused).
if cm.metricsListener != nil {
cm.serveMetrics()
if err := cm.addMetricsServer(); err != nil {
return fmt.Errorf("failed to add metrics server: %w", err)
}
}

// Serve health probes.
if cm.healthProbeListener != nil {
cm.serveHealthProbes()
if err := cm.addHealthProbeServer(); err != nil {
return fmt.Errorf("failed to add health probe server: %w", err)
}
}

// Add pprof server
Expand Down