From 9501679e9096ac81ac5a5b15991f6d1aec3e0d46 Mon Sep 17 00:00:00 2001 From: Luke Addison Date: Sat, 2 Dec 2023 12:55:59 +0000 Subject: [PATCH 01/11] Add support for shutdown delay Signed-off-by: Luke Addison --- pkg/manager/signals/signal.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/pkg/manager/signals/signal.go b/pkg/manager/signals/signal.go index a79cfb42df..1a89f634ed 100644 --- a/pkg/manager/signals/signal.go +++ b/pkg/manager/signals/signal.go @@ -20,14 +20,19 @@ import ( "context" "os" "os/signal" + "time" ) var onlyOneSignalHandler = make(chan struct{}) -// SetupSignalHandler registers for SIGTERM and SIGINT. A context is returned -// which is canceled on one of these signals. If a second signal is caught, the program -// is terminated with exit code 1. -func SetupSignalHandler() context.Context { +// SetupSignalHandlerWithDelay registers for SIGTERM and SIGINT. A context is +// returned which is canceled on one of these signals after waiting for the +// specified delay. In particular, the delay can be used to give external +// Kubernetes controllers (such as kube-proxy) time to observe the termination +// of this manager before starting shutdown of any webhook servers to avoid +// receiving connection attempts after closing webhook listeners. If a second +// signal is caught, the program is terminated with exit code 1. +func SetupSignalHandlerWithDelay(delay time.Duration) context.Context { close(onlyOneSignalHandler) // panics when called twice ctx, cancel := context.WithCancel(context.Background()) @@ -36,10 +41,21 @@ func SetupSignalHandler() context.Context { signal.Notify(c, shutdownSignals...) go func() { <-c - cancel() + // Cancel the context after delaying for the specified duration but + // avoid blocking if a second signal is caught + go func() { + <-time.After(delay) + cancel() + }() <-c os.Exit(1) // second signal. Exit directly. }() return ctx } + +// SetupSignalHandler is a special case of SetupSignalHandlerWithDelay with no +// delay for backwards compatibility +func SetupSignalHandler() context.Context { + return SetupSignalHandlerWithDelay(time.Duration(0)) +} From bf963c8654094323ee8d771a8f822cde1bcb48e6 Mon Sep 17 00:00:00 2001 From: Luke Addison Date: Sat, 2 Dec 2023 13:38:46 +0000 Subject: [PATCH 02/11] Add unit test Signed-off-by: Luke Addison --- pkg/manager/signals/signal.go | 13 ++++--- pkg/manager/signals/signal_test.go | 61 +++++++----------------------- 2 files changed, 22 insertions(+), 52 deletions(-) diff --git a/pkg/manager/signals/signal.go b/pkg/manager/signals/signal.go index 1a89f634ed..1c1e4901d0 100644 --- a/pkg/manager/signals/signal.go +++ b/pkg/manager/signals/signal.go @@ -23,7 +23,11 @@ import ( "time" ) -var onlyOneSignalHandler = make(chan struct{}) +var ( + onlyOneSignalHandler = make(chan struct{}) + // Define global signal channel for testing + signalCh = make(chan os.Signal, 2) +) // SetupSignalHandlerWithDelay registers for SIGTERM and SIGINT. A context is // returned which is canceled on one of these signals after waiting for the @@ -37,17 +41,16 @@ func SetupSignalHandlerWithDelay(delay time.Duration) context.Context { ctx, cancel := context.WithCancel(context.Background()) - c := make(chan os.Signal, 2) - signal.Notify(c, shutdownSignals...) + signal.Notify(signalCh, shutdownSignals...) go func() { - <-c + <-signalCh // Cancel the context after delaying for the specified duration but // avoid blocking if a second signal is caught go func() { <-time.After(delay) cancel() }() - <-c + <-signalCh os.Exit(1) // second signal. Exit directly. }() diff --git a/pkg/manager/signals/signal_test.go b/pkg/manager/signals/signal_test.go index 134937e012..23378b5bcc 100644 --- a/pkg/manager/signals/signal_test.go +++ b/pkg/manager/signals/signal_test.go @@ -17,10 +17,7 @@ limitations under the License. package signals import ( - "fmt" "os" - "os/signal" - "sync" "time" . "github.com/onsi/ginkgo/v2" @@ -31,53 +28,23 @@ var _ = Describe("runtime signal", func() { Context("SignalHandler Test", func() { - It("test signal handler", func() { - ctx := SetupSignalHandler() - task := &Task{ - ticker: time.NewTicker(time.Second * 2), - } - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - task.wg.Add(1) - go func(c chan os.Signal) { - defer task.wg.Done() - task.Run(c) - }(c) + It("test signal handler with delay", func() { + delay := time.Second + ctx := SetupSignalHandlerWithDelay(delay) - select { - case sig := <-c: - fmt.Printf("Got %s signal. Aborting...\n", sig) - case _, ok := <-ctx.Done(): - Expect(ok).To(BeFalse()) - } + // Save time before sending signal + beforeSendingSignal := time.Now() + + // Send signal + signalCh <- os.Interrupt + + _, ok := <-ctx.Done() + // Verify that the channel was closed + Expect(ok).To(BeFalse()) + // Verify that our delay was respected + Expect(time.Since(beforeSendingSignal)).To(BeNumerically(">=", delay)) }) }) }) - -type Task struct { - wg sync.WaitGroup - ticker *time.Ticker -} - -func (t *Task) Run(c chan os.Signal) { - for { - go sendSignal(c) - handle() - } -} - -func handle() { - for i := 0; i < 5; i++ { - fmt.Print("#") - time.Sleep(time.Millisecond * 100) - } - fmt.Println() -} - -func sendSignal(stopChan chan os.Signal) { - fmt.Printf("...") - time.Sleep(1 * time.Second) - stopChan <- os.Interrupt -} From f9393cbbe8a72def1e79e1f7f8056c9efacd943e Mon Sep 17 00:00:00 2001 From: Luke Addison Date: Sun, 3 Dec 2023 11:32:17 +0000 Subject: [PATCH 03/11] Add ShutdownDelay field --- pkg/manager/signals/signal.go | 39 +++++-------------- pkg/manager/signals/signal_test.go | 61 +++++++++++++++++++++++------- pkg/webhook/server.go | 15 +++++++- 3 files changed, 71 insertions(+), 44 deletions(-) diff --git a/pkg/manager/signals/signal.go b/pkg/manager/signals/signal.go index 1c1e4901d0..a79cfb42df 100644 --- a/pkg/manager/signals/signal.go +++ b/pkg/manager/signals/signal.go @@ -20,45 +20,26 @@ import ( "context" "os" "os/signal" - "time" ) -var ( - onlyOneSignalHandler = make(chan struct{}) - // Define global signal channel for testing - signalCh = make(chan os.Signal, 2) -) +var onlyOneSignalHandler = make(chan struct{}) -// SetupSignalHandlerWithDelay registers for SIGTERM and SIGINT. A context is -// returned which is canceled on one of these signals after waiting for the -// specified delay. In particular, the delay can be used to give external -// Kubernetes controllers (such as kube-proxy) time to observe the termination -// of this manager before starting shutdown of any webhook servers to avoid -// receiving connection attempts after closing webhook listeners. If a second -// signal is caught, the program is terminated with exit code 1. -func SetupSignalHandlerWithDelay(delay time.Duration) context.Context { +// SetupSignalHandler registers for SIGTERM and SIGINT. A context is returned +// which is canceled on one of these signals. If a second signal is caught, the program +// is terminated with exit code 1. +func SetupSignalHandler() context.Context { close(onlyOneSignalHandler) // panics when called twice ctx, cancel := context.WithCancel(context.Background()) - signal.Notify(signalCh, shutdownSignals...) + c := make(chan os.Signal, 2) + signal.Notify(c, shutdownSignals...) go func() { - <-signalCh - // Cancel the context after delaying for the specified duration but - // avoid blocking if a second signal is caught - go func() { - <-time.After(delay) - cancel() - }() - <-signalCh + <-c + cancel() + <-c os.Exit(1) // second signal. Exit directly. }() return ctx } - -// SetupSignalHandler is a special case of SetupSignalHandlerWithDelay with no -// delay for backwards compatibility -func SetupSignalHandler() context.Context { - return SetupSignalHandlerWithDelay(time.Duration(0)) -} diff --git a/pkg/manager/signals/signal_test.go b/pkg/manager/signals/signal_test.go index 23378b5bcc..134937e012 100644 --- a/pkg/manager/signals/signal_test.go +++ b/pkg/manager/signals/signal_test.go @@ -17,7 +17,10 @@ limitations under the License. package signals import ( + "fmt" "os" + "os/signal" + "sync" "time" . "github.com/onsi/ginkgo/v2" @@ -28,23 +31,53 @@ var _ = Describe("runtime signal", func() { Context("SignalHandler Test", func() { - It("test signal handler with delay", func() { - delay := time.Second - ctx := SetupSignalHandlerWithDelay(delay) + It("test signal handler", func() { + ctx := SetupSignalHandler() + task := &Task{ + ticker: time.NewTicker(time.Second * 2), + } + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + task.wg.Add(1) + go func(c chan os.Signal) { + defer task.wg.Done() + task.Run(c) + }(c) - // Save time before sending signal - beforeSendingSignal := time.Now() - - // Send signal - signalCh <- os.Interrupt - - _, ok := <-ctx.Done() - // Verify that the channel was closed - Expect(ok).To(BeFalse()) - // Verify that our delay was respected - Expect(time.Since(beforeSendingSignal)).To(BeNumerically(">=", delay)) + select { + case sig := <-c: + fmt.Printf("Got %s signal. Aborting...\n", sig) + case _, ok := <-ctx.Done(): + Expect(ok).To(BeFalse()) + } }) }) }) + +type Task struct { + wg sync.WaitGroup + ticker *time.Ticker +} + +func (t *Task) Run(c chan os.Signal) { + for { + go sendSignal(c) + handle() + } +} + +func handle() { + for i := 0; i < 5; i++ { + fmt.Print("#") + time.Sleep(time.Millisecond * 100) + } + fmt.Println() +} + +func sendSignal(stopChan chan os.Signal) { + fmt.Printf("...") + time.Sleep(1 * time.Second) + stopChan <- os.Interrupt +} diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go index f8820e8b7c..1da3d848d7 100644 --- a/pkg/webhook/server.go +++ b/pkg/webhook/server.go @@ -101,6 +101,12 @@ type Options struct { // WebhookMux is the multiplexer that handles different webhooks. WebhookMux *http.ServeMux + + // ShutdownDelay delays server shutdown to wait for clients to stop opening + // new connections before closing server listeners. HTTP keep-alives are + // disabled during this time to allow persistent connections to be closed + // gracefully. Defaults to 0. + ShutdownDelay time.Duration } // NewServer constructs a new webhook.Server from the provided options. @@ -247,9 +253,16 @@ func (s *DefaultServer) Start(ctx context.Context) error { go func() { <-ctx.Done() log.Info("Shutting down webhook server with timeout of 1 minute") - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() + // Disable HTTP keep-alives to close persistent connections gracefully + srv.SetKeepAlivesEnabled(false) + // Wait for the specified shutdown delay or until the shutdown context + // expires, whichever happens first + select { + case <-time.After(s.Options.ShutdownDelay): + case <-ctx.Done(): + } if err := srv.Shutdown(ctx); err != nil { // Error from closing listeners, or context timeout log.Error(err, "error shutting down the HTTP server") From 12f986450dfd4e7a504f4aff02cc573e2248af94 Mon Sep 17 00:00:00 2001 From: Luke Addison Date: Sun, 3 Dec 2023 11:34:40 +0000 Subject: [PATCH 04/11] Replace newline --- pkg/webhook/server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go index 1da3d848d7..b4bb182860 100644 --- a/pkg/webhook/server.go +++ b/pkg/webhook/server.go @@ -253,6 +253,7 @@ func (s *DefaultServer) Start(ctx context.Context) error { go func() { <-ctx.Done() log.Info("Shutting down webhook server with timeout of 1 minute") + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() // Disable HTTP keep-alives to close persistent connections gracefully From 5c3c96edc09defaa163e17e74efce09236568543 Mon Sep 17 00:00:00 2001 From: Luke Addison Date: Sun, 3 Dec 2023 12:08:07 +0000 Subject: [PATCH 05/11] Do not disable keep-alives --- pkg/webhook/server.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go index b4bb182860..7a3c585ae1 100644 --- a/pkg/webhook/server.go +++ b/pkg/webhook/server.go @@ -103,9 +103,7 @@ type Options struct { WebhookMux *http.ServeMux // ShutdownDelay delays server shutdown to wait for clients to stop opening - // new connections before closing server listeners. HTTP keep-alives are - // disabled during this time to allow persistent connections to be closed - // gracefully. Defaults to 0. + // new connections before closing server listeners. Defaults to 0. ShutdownDelay time.Duration } @@ -256,8 +254,6 @@ func (s *DefaultServer) Start(ctx context.Context) error { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() - // Disable HTTP keep-alives to close persistent connections gracefully - srv.SetKeepAlivesEnabled(false) // Wait for the specified shutdown delay or until the shutdown context // expires, whichever happens first select { From 4bd1de4a608eb22da15c8f9f6ade8bbba7e3a30b Mon Sep 17 00:00:00 2001 From: Luke Addison Date: Sun, 3 Dec 2023 14:02:16 +0000 Subject: [PATCH 06/11] Address comments --- pkg/webhook/server.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go index 7a3c585ae1..21bc9a2041 100644 --- a/pkg/webhook/server.go +++ b/pkg/webhook/server.go @@ -251,9 +251,13 @@ func (s *DefaultServer) Start(ctx context.Context) error { go func() { <-ctx.Done() log.Info("Shutting down webhook server with timeout of 1 minute") - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() + // Disable HTTP keep-alives to close persistent connections after next + // HTTP request. Clients may reconnect until routes are updated and + // server listeners are closed however this should start gradually + // migrating clients to server instances that are not about to shutdown + srv.SetKeepAlivesEnabled(false) // Wait for the specified shutdown delay or until the shutdown context // expires, whichever happens first select { From 384d8ce13504fefd34a103c71a5b1baad1c75093 Mon Sep 17 00:00:00 2001 From: Luke Addison Date: Sun, 3 Dec 2023 21:41:43 +0000 Subject: [PATCH 07/11] Keep shutdown timeout --- pkg/webhook/server.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go index 21bc9a2041..f260d866db 100644 --- a/pkg/webhook/server.go +++ b/pkg/webhook/server.go @@ -250,20 +250,15 @@ func (s *DefaultServer) Start(ctx context.Context) error { idleConnsClosed := make(chan struct{}) go func() { <-ctx.Done() - log.Info("Shutting down webhook server with timeout of 1 minute") - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer cancel() // Disable HTTP keep-alives to close persistent connections after next // HTTP request. Clients may reconnect until routes are updated and // server listeners are closed however this should start gradually // migrating clients to server instances that are not about to shutdown srv.SetKeepAlivesEnabled(false) - // Wait for the specified shutdown delay or until the shutdown context - // expires, whichever happens first - select { - case <-time.After(s.Options.ShutdownDelay): - case <-ctx.Done(): - } + <-time.After(s.Options.ShutdownDelay) + log.Info("Shutting down webhook server with timeout of 1 minute") + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() if err := srv.Shutdown(ctx); err != nil { // Error from closing listeners, or context timeout log.Error(err, "error shutting down the HTTP server") From 2b4c651e03ff45e2ceeaf1e5e4033e13ec3cd551 Mon Sep 17 00:00:00 2001 From: Luke Addison Date: Sun, 3 Dec 2023 21:42:07 +0000 Subject: [PATCH 08/11] Add newline --- pkg/webhook/server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go index f260d866db..1356219da0 100644 --- a/pkg/webhook/server.go +++ b/pkg/webhook/server.go @@ -257,6 +257,7 @@ func (s *DefaultServer) Start(ctx context.Context) error { srv.SetKeepAlivesEnabled(false) <-time.After(s.Options.ShutdownDelay) log.Info("Shutting down webhook server with timeout of 1 minute") + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() if err := srv.Shutdown(ctx); err != nil { From 3529110db7a9f15488f4dd4528795153177dd51b Mon Sep 17 00:00:00 2001 From: Luke Addison Date: Sun, 3 Dec 2023 21:49:12 +0000 Subject: [PATCH 09/11] Improve comments --- pkg/webhook/server.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go index 1356219da0..4642407056 100644 --- a/pkg/webhook/server.go +++ b/pkg/webhook/server.go @@ -250,14 +250,16 @@ func (s *DefaultServer) Start(ctx context.Context) error { idleConnsClosed := make(chan struct{}) go func() { <-ctx.Done() + // Disable HTTP keep-alives to close persistent connections after next // HTTP request. Clients may reconnect until routes are updated and // server listeners are closed however this should start gradually // migrating clients to server instances that are not about to shutdown srv.SetKeepAlivesEnabled(false) + // Wait before shutting down webhook server <-time.After(s.Options.ShutdownDelay) - log.Info("Shutting down webhook server with timeout of 1 minute") + log.Info("Shutting down webhook server with timeout of 1 minute") ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() if err := srv.Shutdown(ctx); err != nil { From dca33cb17191a12aed2d0dc3353490e2dea2a518 Mon Sep 17 00:00:00 2001 From: Luke Addison Date: Sun, 3 Dec 2023 21:53:04 +0000 Subject: [PATCH 10/11] Add back newline --- pkg/webhook/server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go index 4642407056..d45c4f2ca4 100644 --- a/pkg/webhook/server.go +++ b/pkg/webhook/server.go @@ -260,6 +260,7 @@ func (s *DefaultServer) Start(ctx context.Context) error { <-time.After(s.Options.ShutdownDelay) log.Info("Shutting down webhook server with timeout of 1 minute") + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() if err := srv.Shutdown(ctx); err != nil { From 9351414034aa71b1a78b2f33b7a55603806dbaea Mon Sep 17 00:00:00 2001 From: Luke Addison Date: Tue, 12 Dec 2023 15:12:49 +0000 Subject: [PATCH 11/11] Remove unnecessary channel --- pkg/webhook/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go index d45c4f2ca4..6aeb5b8d92 100644 --- a/pkg/webhook/server.go +++ b/pkg/webhook/server.go @@ -257,7 +257,7 @@ func (s *DefaultServer) Start(ctx context.Context) error { // migrating clients to server instances that are not about to shutdown srv.SetKeepAlivesEnabled(false) // Wait before shutting down webhook server - <-time.After(s.Options.ShutdownDelay) + time.Sleep(s.Options.ShutdownDelay) log.Info("Shutting down webhook server with timeout of 1 minute")