diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go
index f8820e8b7c..a0ecde0407 100644
--- a/pkg/webhook/server.go
+++ b/pkg/webhook/server.go
@@ -27,6 +27,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"sigs.k8s.io/controller-runtime/pkg/certwatcher"
@@ -122,9 +123,13 @@ type DefaultServer struct {
 
 	// started is set to true immediately before the server is started
 	// and thus can be used to check if the server has been started
-	started bool
+	started atomic.Bool
 
-	// mu protects access to the webhook map & setFields for Start, Register, etc
+	// stopping is set to true after the server received a stop signal
+	// and is used to set the readiness probe to unhealthy
+	stopping atomic.Bool
+
+	// mu protects access to the webhook map for Register
 	mu sync.Mutex
 
 	webhookMux *http.ServeMux
@@ -246,9 +251,26 @@ func (s *DefaultServer) Start(ctx context.Context) error {
 	idleConnsClosed := make(chan struct{})
 	go func() {
 		<-ctx.Done()
-		log.Info("Shutting down webhook server with timeout of 1 minute")
-
-		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
+		// The context is done; we are shutting down. To shut down cleanly, we
+		// first fail the readiness probe, so that Kubernetes stops routing
+		// traffic to us (in case this was still happening). Additionally, we close
+		// all idle connections, so that long-running requests are redirected to another
+		// instance of the webhook server.
+		log.Info("Draining webhook server for 10 seconds")
+		// We set the stopping flag to true, so that the readiness probe fails. If Kubernetes
+		// is causing the shutdown, it should have already stopped sending traffic to us. This
+		// is for the case where we triggered a shutdown without Kubernetes knowing about it.
+		s.stopping.Store(true)
+		srv.SetKeepAlivesEnabled(false)
+
+		// Wait for 10 seconds before we stop accepting new connections.
+		time.Sleep(10 * time.Second)
+
+		// After 10 seconds, we stop accepting new connections. We now start the
+		// webserver shutdown, which will wait for all active connections to be
+		// closed before returning.
+		log.Info("Shutting down webhook server with timeout of 50 seconds")
+		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second)
 		defer cancel()
 		if err := srv.Shutdown(ctx); err != nil {
 			// Error from closing listeners, or context timeout
@@ -257,9 +279,7 @@ func (s *DefaultServer) Start(ctx context.Context) error {
 		close(idleConnsClosed)
 	}()
 
-	s.mu.Lock()
-	s.started = true
-	s.mu.Unlock()
+	s.started.Store(true)
 	if err := srv.Serve(listener); err != nil && err != http.ErrServerClosed {
 		return err
 	}
@@ -275,13 +295,14 @@ func (s *DefaultServer) StartedChecker() healthz.Checker {
 		InsecureSkipVerify: true, //nolint:gosec // config is used to connect to our own webhook port.
 	}
 	return func(req *http.Request) error {
-		s.mu.Lock()
-		defer s.mu.Unlock()
-
-		if !s.started {
+		if !s.started.Load() {
 			return fmt.Errorf("webhook server has not been started yet")
 		}
 
+		if s.stopping.Load() {
+			return fmt.Errorf("webhook server is stopping")
+		}
+
 		d := &net.Dialer{Timeout: 10 * time.Second}
 		conn, err := tls.DialWithDialer(d, "tcp", net.JoinHostPort(s.Options.Host, strconv.Itoa(s.Options.Port)), config)
 		if err != nil {
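
For context, here is a minimal, self-contained sketch of the drain-then-shutdown sequence this patch implements, using only the standard library. The port, drain period, and the `/readyz` handler are illustrative assumptions; the real server reports readiness through controller-runtime's `healthz.Checker` (see `StartedChecker` above) rather than a plain HTTP handler.

```go
package main

import (
	"context"
	"net/http"
	"os"
	"os/signal"
	"sync/atomic"
	"syscall"
	"time"
)

func main() {
	var stopping atomic.Bool

	mux := http.NewServeMux()
	// Readiness endpoint: report unhealthy as soon as draining begins so the
	// kubelet stops routing new traffic to this instance.
	mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
		if stopping.Load() {
			http.Error(w, "shutting down", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	})

	srv := &http.Server{Addr: ":9443", Handler: mux} // illustrative port

	// Treat SIGTERM/SIGINT as the stop signal, mirroring <-ctx.Done() in the patch.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	idleConnsClosed := make(chan struct{})
	go func() {
		<-ctx.Done()
		// 1. Fail the readiness probe and disable keep-alives so idle
		//    connections close and clients reconnect to another instance.
		stopping.Store(true)
		srv.SetKeepAlivesEnabled(false)
		// 2. Keep serving in-flight and late-arriving requests for the drain period.
		time.Sleep(10 * time.Second)
		// 3. Stop accepting new connections and wait, up to the timeout, for
		//    active requests to complete.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 50*time.Second)
		defer cancel()
		_ = srv.Shutdown(shutdownCtx)
		close(idleConnsClosed)
	}()

	if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		panic(err)
	}
	<-idleConnsClosed
}
```

Note that the 10-second drain window plus the 50-second `Shutdown` timeout together stay within the one-minute budget the old code spent entirely on `Shutdown`.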