From c326e0a43ad498c93c7759d66736e5f96fa89cab Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 14 Aug 2023 16:03:43 -0400 Subject: [PATCH 1/8] fix multiple overflow errors in exponential backoff We use capped exponential backoff in several places in the code when handling failures. The code we've copy-and-pasted all over has a check to see if the backoff is greater than the limit, but this check happens after the bitshift and we always increment the number of attempts. This causes an overflow with a fairly small number of failures (ex. at one place I tested it occurs after only 24 iterations), resulting in a negative backoff which then never recovers. The backoff becomes a tight loop consuming resources and/or DoS'ing a Nomad RPC handler or an external API such as Vault. Note this doesn't occur in places where we cap the number of iterations so the loop breaks (usually to return an error), so long as the number of iterations is reasonable. Introduce a check on the cap before the bitshift to avoid overflow in all places this can occur. Fixes: #18199 --- .changelog/18200.txt | 3 +++ client/allocrunner/taskrunner/stats_hook.go | 11 ++++++---- client/allocrunner/taskrunner/vault_hook.go | 12 ++++++----- client/devicemanager/instance.go | 13 +++++++----- .../pluginmanager/drivermanager/instance.go | 21 +++++++++++-------- command/agent/consul/version_checker.go | 13 +++++++----- drivers/docker/driver.go | 3 ++- drivers/docker/stats.go | 11 +++++----- nomad/state/state_store.go | 9 +++++--- nomad/worker.go | 21 ++++++++++++------- 10 files changed, 73 insertions(+), 44 deletions(-) create mode 100644 .changelog/18200.txt diff --git a/.changelog/18200.txt b/.changelog/18200.txt new file mode 100644 index 000000000000..378043a657a4 --- /dev/null +++ b/.changelog/18200.txt @@ -0,0 +1,3 @@ +```release-note:bug +core: Fixed a bug where exponential backoff could result in excessive CPU usage +``` diff --git a/client/allocrunner/taskrunner/stats_hook.go b/client/allocrunner/taskrunner/stats_hook.go index 997d48d17a9b..21a5f79f2a84 100644 --- a/client/allocrunner/taskrunner/stats_hook.go +++ b/client/allocrunner/taskrunner/stats_hook.go @@ -128,6 +128,8 @@ MAIN: // It logs the errors with appropriate log levels; don't log returned error func (h *statsHook) callStatsWithRetry(ctx context.Context, handle interfaces.DriverStats) (<-chan *cstructs.TaskResourceUsage, error) { var retry int + backoff := time.Duration(0) + limit := time.Second * 5 MAIN: if ctx.Err() != nil { @@ -162,10 +164,11 @@ MAIN: h.logger.Error("failed to start stats collection for task", "error", err) } - limit := time.Second * 5 - backoff := 1 << (2 * uint64(retry)) * time.Second - if backoff > limit || retry > 5 { - backoff = limit + if backoff < limit { + backoff = 1 << (2 * uint64(retry)) * time.Second + if backoff > limit || retry > 5 { + backoff = limit + } } // Increment retry counter diff --git a/client/allocrunner/taskrunner/vault_hook.go b/client/allocrunner/taskrunner/vault_hook.go index 971a0f157937..5f5a48b56acd 100644 --- a/client/allocrunner/taskrunner/vault_hook.go +++ b/client/allocrunner/taskrunner/vault_hook.go @@ -312,6 +312,7 @@ OUTER: // returns the Vault token and whether the manager should exit. 
func (h *vaultHook) deriveVaultToken() (token string, exit bool) { attempts := 0 + backoff := time.Duration(0) for { tokens, err := h.client.DeriveToken(h.alloc, []string{h.taskName}) if err == nil { @@ -339,14 +340,15 @@ func (h *vaultHook) deriveVaultToken() (token string, exit bool) { } // Handle the retry case - backoff := (1 << (2 * uint64(attempts))) * vaultBackoffBaseline - if backoff > vaultBackoffLimit { - backoff = vaultBackoffLimit + if backoff < vaultBackoffLimit { + backoff = (1 << (2 * uint64(attempts))) * vaultBackoffBaseline + if backoff > vaultBackoffLimit { + backoff = vaultBackoffLimit + } + attempts++ } h.logger.Error("failed to derive Vault token", "error", err, "recoverable", true, "backoff", backoff) - attempts++ - // Wait till retrying select { case <-h.ctx.Done(): diff --git a/client/devicemanager/instance.go b/client/devicemanager/instance.go index 70b55cd80324..6043d800e931 100644 --- a/client/devicemanager/instance.go +++ b/client/devicemanager/instance.go @@ -470,6 +470,7 @@ START: var sresp *device.StatsResponse var ok bool + backoff := time.Duration(0) for { select { case <-i.ctx.Done(): @@ -494,12 +495,14 @@ START: goto START } - // Retry with an exponential backoff - backoff := (1 << (2 * uint64(attempt))) * statsBackoffBaseline - if backoff > statsBackoffLimit { - backoff = statsBackoffLimit + // Retry with an exponential backoff, avoiding overflow + if backoff < statsBackoffLimit { + backoff = (1 << (2 * uint64(attempt))) * statsBackoffBaseline + if backoff > statsBackoffLimit { + backoff = statsBackoffLimit + } + attempt++ } - attempt++ i.logger.Error("stats returned an error", "error", err, "retry", backoff) diff --git a/client/pluginmanager/drivermanager/instance.go b/client/pluginmanager/drivermanager/instance.go index 4d6febfb93db..036db966e128 100644 --- a/client/pluginmanager/drivermanager/instance.go +++ b/client/pluginmanager/drivermanager/instance.go @@ -329,12 +329,13 @@ func (i *instanceManager) fingerprint() { i.handleFingerprintError() // Calculate the new backoff - backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline - if backoff > driverFPBackoffLimit { - backoff = driverFPBackoffLimit + if backoff < driverFPBackoffLimit { + backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline + if backoff > driverFPBackoffLimit { + backoff = driverFPBackoffLimit + } + retry++ } - // Increment retry counter - retry++ continue } cancel() @@ -453,11 +454,13 @@ func (i *instanceManager) handleEvents() { i.logger.Warn("failed to receive task events, retrying", "error", err, "retry", retry) // Calculate the new backoff - backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline - if backoff > driverFPBackoffLimit { - backoff = driverFPBackoffLimit + if backoff < driverFPBackoffLimit { + backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline + if backoff > driverFPBackoffLimit { + backoff = driverFPBackoffLimit + } + retry++ } - retry++ continue } cancel() diff --git a/command/agent/consul/version_checker.go b/command/agent/consul/version_checker.go index e9cd465819dd..00dc468ad3f8 100644 --- a/command/agent/consul/version_checker.go +++ b/command/agent/consul/version_checker.go @@ -27,6 +27,7 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age timer, stop := helper.NewSafeTimer(limit) defer stop() + backoff := time.Duration(0) for { self, err := client.Self() @@ -40,11 +41,13 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age return } - backoff := (1 << (2 * i)) * 
baseline - if backoff > limit { - backoff = limit - } else { - i++ + if backoff < limit { + backoff = (1 << (2 * i)) * baseline + if backoff > limit { + backoff = limit + } else { + i++ + } } timer.Reset(backoff) diff --git a/drivers/docker/driver.go b/drivers/docker/driver.go index 51ef03b50bfb..2008dc037a45 100644 --- a/drivers/docker/driver.go +++ b/drivers/docker/driver.go @@ -572,7 +572,8 @@ START: return recoverableErrTimeouts(startErr) } -// nextBackoff returns appropriate docker backoff durations after attempted attempts. +// nextBackoff returns appropriate docker backoff durations after attempted +// attempts. Note that the caller must cap attempted so that it doesn't overflow func nextBackoff(attempted int) time.Duration { // attempts in 200ms, 800ms, 3.2s, 12.8s, 51.2s // TODO: add randomization factor and extract to a helper diff --git a/drivers/docker/stats.go b/drivers/docker/stats.go index 9e0bdf743dfa..127125aabea0 100644 --- a/drivers/docker/stats.go +++ b/drivers/docker/stats.go @@ -137,12 +137,13 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte h.logger.Debug("error collecting stats from container", "error", err) // Calculate the new backoff - backoff = (1 << (2 * uint64(retry))) * statsCollectorBackoffBaseline - if backoff > statsCollectorBackoffLimit { - backoff = statsCollectorBackoffLimit + if backoff < statsCollectorBackoffLimit { + backoff = (1 << (2 * uint64(retry))) * statsCollectorBackoffBaseline + if backoff > statsCollectorBackoffLimit { + backoff = statsCollectorBackoffLimit + } + retry++ } - // Increment retry counter - retry++ continue } // Stats finished either because context was canceled, doneCh was closed diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 0c80ff8a5c67..021dbca2800b 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -256,6 +256,7 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State const backoffLimit = 1 * time.Second var retries uint var retryTimer *time.Timer + deadline := time.Duration(0) // XXX: Potential optimization is to set up a watch on the state // store's index table and only unblock via a trigger rather than @@ -279,9 +280,11 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State retryTimer = time.NewTimer(backoffBase) } else { // Subsequent retry, reset timer - deadline := 1 << (2 * retries) * backoffBase - if deadline > backoffLimit { - deadline = backoffLimit + if deadline < backoffLimit { + deadline = 1 << (2 * retries) * backoffBase + if deadline > backoffLimit { + deadline = backoffLimit + } } retryTimer.Reset(deadline) } diff --git a/nomad/worker.go b/nomad/worker.go index 75f3691cffd5..e41be57df45f 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -109,8 +109,9 @@ type Worker struct { // failures is the count of errors encountered while dequeueing evaluations // and is used to calculate backoff. - failures uint - evalToken string + failures uint + failureBackoff time.Duration + evalToken string // snapshotIndex is the index of the snapshot in which the scheduler was // first invoked. 
It is used to mark the SnapshotIndex of evaluations @@ -133,6 +134,7 @@ func newWorker(ctx context.Context, srv *Server, args SchedulerWorkerPoolArgs) * start: time.Now(), status: WorkerStarting, enabledSchedulers: make([]string, len(args.EnabledSchedulers)), + failureBackoff: time.Duration(0), } copy(w.enabledSchedulers, args.EnabledSchedulers) @@ -874,12 +876,17 @@ func (w *Worker) shouldResubmit(err error) bool { // backoff if the server or the worker is shutdown. func (w *Worker) backoffErr(base, limit time.Duration) bool { w.setWorkloadStatus(WorkloadBackoff) - backoff := (1 << (2 * w.failures)) * base - if backoff > limit { - backoff = limit - } else { - w.failures++ + + backoff := w.failureBackoff + if w.failureBackoff < limit { + backoff = (1 << (2 * w.failures)) * base + if backoff > limit { + backoff = limit + } else { + w.failures++ + } } + select { case <-time.After(backoff): return false From 676d15abde0ae72efbeeef1125521ee22b143024 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 14 Aug 2023 16:58:53 -0400 Subject: [PATCH 2/8] address comments from code review --- nomad/state/state_store.go | 2 +- nomad/worker.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 021dbca2800b..9c4efb37b885 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -274,13 +274,13 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State } // Exponential back off - retries++ if retryTimer == nil { // First retry, start at baseline retryTimer = time.NewTimer(backoffBase) } else { // Subsequent retry, reset timer if deadline < backoffLimit { + retries++ deadline = 1 << (2 * retries) * backoffBase if deadline > backoffLimit { deadline = backoffLimit diff --git a/nomad/worker.go b/nomad/worker.go index e41be57df45f..70b9d56fa2ae 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -884,6 +884,7 @@ func (w *Worker) backoffErr(base, limit time.Duration) bool { backoff = limit } else { w.failures++ + w.failureBackoff = backoff } } From 379a3ac5be6019903ad8129beb60a6a4ee012109 Mon Sep 17 00:00:00 2001 From: stswidwinski Date: Tue, 15 Aug 2023 06:11:21 +0800 Subject: [PATCH 3/8] Propose a more structured approach to computation of backoff. --- helper/backoff.go | 31 ++++++++++++++++++ helper/backoff_test.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 helper/backoff.go create mode 100644 helper/backoff_test.go diff --git a/helper/backoff.go b/helper/backoff.go new file mode 100644 index 000000000000..e25e1f1c2b95 --- /dev/null +++ b/helper/backoff.go @@ -0,0 +1,31 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package helper + +import ( + "time" +) + +func Backoff(backoffBase time.Duration, backoffLimit time.Duration, attempt uint64) time.Duration { + const MaxUint = ^uint64(0) + const MaxInt = int64(MaxUint >> 1) + + // Ensure lack of non-positive backoffs since these make no sense + if backoffBase.Nanoseconds() <= 0 { + return max(backoffBase, 0*time.Second) + } + + // Ensure that a large attempt will not cause an overflow + if attempt > 62 || MaxInt/backoffBase.Nanoseconds() < (1<<attempt) { + return backoffLimit + } + + // Compute the deadline and clamp it to the limit + deadline := 1 << attempt * backoffBase + if deadline > backoffLimit { + deadline = backoffLimit + } + + return deadline +} diff --git a/helper/backoff_test.go b/helper/backoff_test.go new file mode 100644 index 000000000000..29af33f7b1ec --- /dev/null +++ b/helper/backoff_test.go @@ -0,0 +1,72 @@ +// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: BUSL-1.1 + +package helper + +import ( + "testing" + "time" + + "github.com/shoenig/test/must" +) + +func Test_Backoff(t *testing.T) { + const MaxUint = ^uint64(0) + const MaxInt = int64(MaxUint >> 1) + + cases := []struct { + name string + backoffBase time.Duration + backoffLimit time.Duration + attempt uint64 + expectedResult time.Duration + }{ + { + name: "backoff limit clamps for high base", + backoffBase: time.Hour, + backoffLimit: time.Minute, + attempt: 1, + expectedResult: time.Minute, + }, + { + name: "backoff limit clamps for boundary attempt", + backoffBase: time.Hour, + backoffLimit: time.Minute, + attempt: 63, + expectedResult: time.Minute, + }, + { + name: "small retry value", + backoffBase: time.Minute, + backoffLimit: time.Hour, + attempt: 0, + expectedResult: time.Minute, + }, + { + name: "first retry value", + backoffBase: time.Minute, + backoffLimit: time.Hour, + attempt: 1, + expectedResult: 2 * time.Minute, + }, + { + name: "fifth retry value", + backoffBase: time.Minute, + backoffLimit: time.Hour, + attempt: 5, + expectedResult: 32 * time.Minute, + }, + { + name: "sixth retry value", + backoffBase: time.Minute, + backoffLimit: time.Hour, + attempt: 6, + expectedResult: time.Hour, + }, + } + + for _, tc := range cases { + result := Backoff(tc.backoffBase, tc.backoffLimit, tc.attempt) + must.Eq(t, tc.expectedResult, result) + } +} From bc7f360f645037cf55ea387a654638b91b6b1f84 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 15 Aug 2023 10:06:35 -0400 Subject: [PATCH 4/8] incorporate helper from @stswidwinski --- client/allocrunner/taskrunner/stats_hook.go | 14 +++------ client/allocrunner/taskrunner/vault_hook.go | 15 ++++------ client/devicemanager/instance.go | 19 +++++------- .../pluginmanager/drivermanager/instance.go | 12 +++----- command/agent/consul/version_checker.go | 17 ++++------- drivers/docker/driver.go | 29 ++++++++++--------- drivers/docker/stats.go | 13 +++------ nomad/state/state_store.go | 14 ++++----- nomad/worker.go | 15 +++------- 9 files changed, 55 insertions(+), 93 deletions(-) diff --git a/client/allocrunner/taskrunner/stats_hook.go b/client/allocrunner/taskrunner/stats_hook.go index 21a5f79f2a84..41c57577c7ab 100644 --- a/client/allocrunner/taskrunner/stats_hook.go +++ b/client/allocrunner/taskrunner/stats_hook.go @@ -11,6 +11,7 @@ import ( hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/interfaces" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" bstructs "github.com/hashicorp/nomad/plugins/base/structs" ) @@ -127,8 +128,8 @@ MAIN: // // It logs the errors with appropriate log levels; don't log returned error func (h *statsHook) callStatsWithRetry(ctx context.Context, handle interfaces.DriverStats) (<-chan *cstructs.TaskResourceUsage, error) { - var retry int - backoff := time.Duration(0) + var retry uint64 + var backoff time.Duration limit := time.Second * 5 MAIN: @@ -164,14 +165,7 @@ MAIN: h.logger.Error("failed to start stats collection for task", "error", err) } - if backoff < limit { - backoff = 1 << (2 * uint64(retry)) * time.Second - if backoff > limit || retry > 5 { - backoff = limit - } - } - - // Increment retry counter + backoff = helper.Backoff(time.Second, limit, retry) retry++ time.Sleep(backoff) diff --git a/client/allocrunner/taskrunner/vault_hook.go b/client/allocrunner/taskrunner/vault_hook.go index 5f5a48b56acd..7c43bee8e0ca 100644 --- 
a/client/allocrunner/taskrunner/vault_hook.go +++ b/client/allocrunner/taskrunner/vault_hook.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/interfaces" ti "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" "github.com/hashicorp/nomad/client/vaultclient" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -311,8 +312,8 @@ OUTER: // deriveVaultToken derives the Vault token using exponential backoffs. It // returns the Vault token and whether the manager should exit. func (h *vaultHook) deriveVaultToken() (token string, exit bool) { - attempts := 0 - backoff := time.Duration(0) + var attempts uint64 + var backoff time.Duration for { tokens, err := h.client.DeriveToken(h.alloc, []string{h.taskName}) if err == nil { @@ -340,13 +341,9 @@ func (h *vaultHook) deriveVaultToken() (token string, exit bool) { } // Handle the retry case - if backoff < vaultBackoffLimit { - backoff = (1 << (2 * uint64(attempts))) * vaultBackoffBaseline - if backoff > vaultBackoffLimit { - backoff = vaultBackoffLimit - } - attempts++ - } + backoff = helper.Backoff(vaultBackoffBaseline, vaultBackoffLimit, attempts) + attempts++ + h.logger.Error("failed to derive Vault token", "error", err, "recoverable", true, "backoff", backoff) // Wait till retrying diff --git a/client/devicemanager/instance.go b/client/devicemanager/instance.go index 6043d800e931..ac05a60fd791 100644 --- a/client/devicemanager/instance.go +++ b/client/devicemanager/instance.go @@ -11,6 +11,7 @@ import ( log "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/pluginutils/loader" "github.com/hashicorp/nomad/helper/pluginutils/singleton" "github.com/hashicorp/nomad/nomad/structs" @@ -450,7 +451,8 @@ func (i *instanceManager) handleFingerprint(f *device.FingerprintResponse) error // collectStats is a long lived goroutine for collecting device statistics. It // handles errors by backing off exponentially and retrying. 
func (i *instanceManager) collectStats() { - attempt := 0 + var attempt uint64 + var backoff time.Duration START: // Get a device plugin @@ -470,7 +472,6 @@ START: var sresp *device.StatsResponse var ok bool - backoff := time.Duration(0) for { select { case <-i.ctx.Done(): @@ -495,14 +496,9 @@ START: goto START } - // Retry with an exponential backoff, avoiding overflow - if backoff < statsBackoffLimit { - backoff = (1 << (2 * uint64(attempt))) * statsBackoffBaseline - if backoff > statsBackoffLimit { - backoff = statsBackoffLimit - } - attempt++ - } + // Retry with an exponential backoff + backoff = helper.Backoff(statsBackoffBaseline, statsBackoffLimit, attempt) + attempt++ i.logger.Error("stats returned an error", "error", err, "retry", backoff) @@ -514,8 +510,9 @@ START: } } - // Reset the attempt since we got statistics + // Reset the backoff since we got statistics attempt = 0 + backoff = 0 // Store the new stats if sresp.Groups != nil { diff --git a/client/pluginmanager/drivermanager/instance.go b/client/pluginmanager/drivermanager/instance.go index 036db966e128..06097f7adf5b 100644 --- a/client/pluginmanager/drivermanager/instance.go +++ b/client/pluginmanager/drivermanager/instance.go @@ -10,6 +10,7 @@ import ( "time" log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/pluginutils/loader" "github.com/hashicorp/nomad/helper/pluginutils/singleton" "github.com/hashicorp/nomad/nomad/structs" @@ -290,7 +291,7 @@ func (i *instanceManager) fingerprint() { // backoff and retry used if the RPC is closed by the other end var backoff time.Duration - var retry int + var retry uint64 for { if backoff > 0 { select { @@ -329,13 +330,8 @@ func (i *instanceManager) fingerprint() { i.handleFingerprintError() // Calculate the new backoff - if backoff < driverFPBackoffLimit { - backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline - if backoff > driverFPBackoffLimit { - backoff = driverFPBackoffLimit - } - retry++ - } + backoff = helper.Backoff(driverFPBackoffBaseline, driverFPBackoffLimit, retry) + retry++ continue } cancel() diff --git a/command/agent/consul/version_checker.go b/command/agent/consul/version_checker.go index 00dc468ad3f8..379f7afb83c7 100644 --- a/command/agent/consul/version_checker.go +++ b/command/agent/consul/version_checker.go @@ -23,11 +23,11 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age defer close(done) - i := uint64(0) - timer, stop := helper.NewSafeTimer(limit) defer stop() - backoff := time.Duration(0) + + var attempts uint64 + var backoff time.Duration for { self, err := client.Self() @@ -41,15 +41,8 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age return } - if backoff < limit { - backoff = (1 << (2 * i)) * baseline - if backoff > limit { - backoff = limit - } else { - i++ - } - } - + backoff = helper.Backoff(baseline, limit, attempts) + attempts++ timer.Reset(backoff) select { diff --git a/drivers/docker/driver.go b/drivers/docker/driver.go index 2008dc037a45..48ab548aae9d 100644 --- a/drivers/docker/driver.go +++ b/drivers/docker/driver.go @@ -476,7 +476,9 @@ type createContainerClient interface { func (d *Driver) createContainer(client createContainerClient, config docker.CreateContainerOptions, image string) (*docker.Container, error) { // Create a container - attempted := 0 + var attempted uint64 + var backoff time.Duration + CREATE: container, createErr := client.CreateContainer(config) if createErr == nil { @@ -525,17 +527,22 
@@ CREATE: } if attempted < 5 { + backoff = helper.Backoff(50*time.Millisecond, time.Minute, attempted) attempted++ - time.Sleep(nextBackoff(attempted)) + time.Sleep(backoff) goto CREATE } + + if attempted < 5 { + attempted++ + } } else if strings.Contains(strings.ToLower(createErr.Error()), "no such image") { // There is still a very small chance this is possible even with the // coordinator so retry. return nil, nstructs.NewRecoverableError(createErr, true) } else if isDockerTransientError(createErr) && attempted < 5 { attempted++ - time.Sleep(nextBackoff(attempted)) + time.Sleep(backoff) goto CREATE } @@ -550,8 +557,9 @@ func (d *Driver) startContainer(c *docker.Container) error { return err } - // Start a container - attempted := 0 + var attempted uint64 + var backoff time.Duration + START: startErr := dockerClient.StartContainer(c.ID, c.HostConfig) if startErr == nil || strings.Contains(startErr.Error(), "Container already running") { @@ -563,7 +571,8 @@ START: if isDockerTransientError(startErr) { if attempted < 5 { attempted++ - time.Sleep(nextBackoff(attempted)) + backoff = helper.Backoff(50*time.Millisecond, time.Minute, attempted) + time.Sleep(backoff) goto START } return nstructs.NewRecoverableError(startErr, true) @@ -572,14 +581,6 @@ START: return recoverableErrTimeouts(startErr) } -// nextBackoff returns appropriate docker backoff durations after attempted -// attempts. Note that the caller must cap attempted so that it doesn't overflow -func nextBackoff(attempted int) time.Duration { - // attempts in 200ms, 800ms, 3.2s, 12.8s, 51.2s - // TODO: add randomization factor and extract to a helper - return 1 << (2 * uint64(attempted)) * 50 * time.Millisecond -} - // createImage creates a docker image either by pulling it from a registry or by // loading it from the file system func (d *Driver) createImage(task *drivers.TaskConfig, driverConfig *TaskConfig, client *docker.Client) (string, error) { diff --git a/drivers/docker/stats.go b/drivers/docker/stats.go index 127125aabea0..e7e9023d3664 100644 --- a/drivers/docker/stats.go +++ b/drivers/docker/stats.go @@ -96,8 +96,8 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte defer destCh.close() // backoff and retry used if the docker stats API returns an error - var backoff time.Duration = 0 - var retry int + var backoff time.Duration + var retry uint64 // create an interval timer timer, stop := helper.NewSafeTimer(backoff) @@ -137,13 +137,8 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte h.logger.Debug("error collecting stats from container", "error", err) // Calculate the new backoff - if backoff < statsCollectorBackoffLimit { - backoff = (1 << (2 * uint64(retry))) * statsCollectorBackoffBaseline - if backoff > statsCollectorBackoffLimit { - backoff = statsCollectorBackoffLimit - } - retry++ - } + backoff = helper.Backoff(statsCollectorBackoffBaseline, statsCollectorBackoffLimit, retry) + retry++ continue } // Stats finished either because context was canceled, doneCh was closed diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 9c4efb37b885..9d00248ba056 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-set" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/lib/lang" "github.com/hashicorp/nomad/nomad/stream" @@ -254,9 +255,9 @@ func (s 
*StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State const backoffBase = 20 * time.Millisecond const backoffLimit = 1 * time.Second - var retries uint + var retries uint64 var retryTimer *time.Timer - deadline := time.Duration(0) + var deadline time.Duration // XXX: Potential optimization is to set up a watch on the state // store's index table and only unblock via a trigger rather than @@ -279,13 +280,8 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State retryTimer = time.NewTimer(backoffBase) } else { // Subsequent retry, reset timer - if deadline < backoffLimit { - retries++ - deadline = 1 << (2 * retries) * backoffBase - if deadline > backoffLimit { - deadline = backoffLimit - } - } + deadline = helper.Backoff(backoffBase, backoffLimit, retries) + retries++ retryTimer.Reset(deadline) } diff --git a/nomad/worker.go b/nomad/worker.go index 70b9d56fa2ae..d9932d56df3e 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -16,6 +16,7 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-version" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -109,7 +110,7 @@ type Worker struct { // failures is the count of errors encountered while dequeueing evaluations // and is used to calculate backoff. - failures uint + failures uint64 failureBackoff time.Duration evalToken string @@ -877,16 +878,8 @@ func (w *Worker) shouldResubmit(err error) bool { func (w *Worker) backoffErr(base, limit time.Duration) bool { w.setWorkloadStatus(WorkloadBackoff) - backoff := w.failureBackoff - if w.failureBackoff < limit { - backoff = (1 << (2 * w.failures)) * base - if backoff > limit { - backoff = limit - } else { - w.failures++ - w.failureBackoff = backoff - } - } + backoff := helper.Backoff(base, limit, w.failures) + w.failures++ select { case <-time.After(backoff): From 04dcb90842489a84600ced0bcb8f507dcffd43c0 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 15 Aug 2023 11:29:06 -0400 Subject: [PATCH 5/8] remove extraneous assignment --- drivers/docker/driver.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/drivers/docker/driver.go b/drivers/docker/driver.go index 48ab548aae9d..57ea93e121ef 100644 --- a/drivers/docker/driver.go +++ b/drivers/docker/driver.go @@ -533,9 +533,6 @@ CREATE: goto CREATE } - if attempted < 5 { - attempted++ - } } else if strings.Contains(strings.ToLower(createErr.Error()), "no such image") { // There is still a very small chance this is possible even with the // coordinator so retry. 
From dd009669731a1b9d35ae50703885a4cf5ba1294b Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 15 Aug 2023 11:37:20 -0400 Subject: [PATCH 6/8] missed a helper --- client/pluginmanager/drivermanager/instance.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/client/pluginmanager/drivermanager/instance.go b/client/pluginmanager/drivermanager/instance.go index 06097f7adf5b..642adad7111f 100644 --- a/client/pluginmanager/drivermanager/instance.go +++ b/client/pluginmanager/drivermanager/instance.go @@ -423,7 +423,7 @@ func (i *instanceManager) handleEvents() { } var backoff time.Duration - var retry int + var retry uint64 for { if backoff > 0 { select { @@ -450,13 +450,8 @@ func (i *instanceManager) handleEvents() { i.logger.Warn("failed to receive task events, retrying", "error", err, "retry", retry) // Calculate the new backoff - if backoff < driverFPBackoffLimit { - backoff = (1 << (2 * uint64(retry))) * driverFPBackoffBaseline - if backoff > driverFPBackoffLimit { - backoff = driverFPBackoffLimit - } - retry++ - } + backoff = helper.Backoff(driverFPBackoffBaseline, driverFPBackoffLimit, retry) + retry++ continue } cancel() From 1f92874a0ecdf7e55b71db7531bc845efdd81bad Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 15 Aug 2023 11:47:47 -0400 Subject: [PATCH 7/8] fix ineffectual assignment --- client/devicemanager/instance.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/client/devicemanager/instance.go b/client/devicemanager/instance.go index ac05a60fd791..350a31265608 100644 --- a/client/devicemanager/instance.go +++ b/client/devicemanager/instance.go @@ -510,9 +510,8 @@ START: } } - // Reset the backoff since we got statistics + // Reset the attempts since we got statistics attempt = 0 - backoff = 0 // Store the new stats if sresp.Groups != nil { From dd4649850ae042a536d788a20a5e321b4b446c0b Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 15 Aug 2023 13:47:17 -0400 Subject: [PATCH 8/8] fixes for docker driver --- drivers/docker/driver.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/drivers/docker/driver.go b/drivers/docker/driver.go index 57ea93e121ef..13f873c74564 100644 --- a/drivers/docker/driver.go +++ b/drivers/docker/driver.go @@ -527,8 +527,8 @@ CREATE: } if attempted < 5 { - backoff = helper.Backoff(50*time.Millisecond, time.Minute, attempted) attempted++ + backoff = helper.Backoff(50*time.Millisecond, time.Minute, attempted) time.Sleep(backoff) goto CREATE } @@ -539,6 +539,7 @@ CREATE: return nil, nstructs.NewRecoverableError(createErr, true) } else if isDockerTransientError(createErr) && attempted < 5 { attempted++ + backoff = helper.Backoff(50*time.Millisecond, time.Minute, attempted) time.Sleep(backoff) goto CREATE }
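A standalone sketch (illustrative only, not one of the patches above) contrasting the old cap-after-shift pattern with the new helper: oldBackoff mirrors the call sites that increment their counter unconditionally (vault_hook, devicemanager, docker stats), Backoff reproduces the helper added in helper/backoff.go, and the one-second base with a five-second limit mirrors the stats hook call site. Note the helper also changes the growth rate from 4^attempt (shift by 2*attempt) to 2^attempt (shift by attempt), which is what the backoff_test.go cases encode. The names oldBackoff and main are invented for the demo, and the built-in max assumes Go 1.21+.

```go
package main

import (
	"fmt"
	"time"
)

// oldBackoff mirrors the pattern this series removes (e.g. vault_hook.go,
// devicemanager/instance.go): the limit is applied only after the bitshift,
// so a large attempt count overflows int64 and the result goes negative.
// time.Sleep and timer.Reset treat a negative duration as "no wait", which
// produces the tight retry loop described in the first commit message.
func oldBackoff(base, limit time.Duration, attempt uint64) time.Duration {
	backoff := (1 << (2 * attempt)) * base
	if backoff > limit {
		backoff = limit
	}
	return backoff
}

// Backoff reproduces helper/backoff.go from PATCH 3/8: it rejects
// non-positive baselines and checks for overflow before shifting, so the
// worst case is simply the limit.
func Backoff(backoffBase time.Duration, backoffLimit time.Duration, attempt uint64) time.Duration {
	const MaxUint = ^uint64(0)
	const MaxInt = int64(MaxUint >> 1)

	// Ensure lack of non-positive backoffs since these make no sense
	if backoffBase.Nanoseconds() <= 0 {
		return max(backoffBase, 0*time.Second)
	}

	// Ensure that a large attempt will not cause an overflow
	if attempt > 62 || MaxInt/backoffBase.Nanoseconds() < (1<<attempt) {
		return backoffLimit
	}

	// Compute the deadline and clamp it to the limit
	deadline := 1 << attempt * backoffBase
	if deadline > backoffLimit {
		deadline = backoffLimit
	}

	return deadline
}

func main() {
	// One-second base and five-second limit, as in the stats hook call site.
	base, limit := time.Second, 5*time.Second
	for attempt := uint64(15); attempt <= 18; attempt++ {
		fmt.Printf("attempt %2d  old: %v  helper: %v\n",
			attempt, oldBackoff(base, limit, attempt), Backoff(base, limit, attempt))
	}
	// With a one-second base the old formula flips negative at attempt 17,
	// while the helper stays clamped at the five-second limit.
}
```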