diff --git a/CHANGELOG.md b/CHANGELOG.md
index e43610ab..8f9c0e44 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Exponential backoffs at degenerately high job attempts (>= 310) no longer risk overflowing `time.Duration`. [PR #698](https://github.com/riverqueue/river/pull/698).
+
 ## [0.14.3] - 2024-12-14
 
 ### Changed
diff --git a/job_executor.go b/job_executor.go
index 899eaf5f..80d49dcf 100644
--- a/job_executor.go
+++ b/job_executor.go
@@ -385,6 +385,7 @@ func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) {
 	if nextRetryScheduledAt.Before(now) {
 		e.Logger.WarnContext(ctx, e.Name+": Retry policy returned invalid next retry before current time; using default retry policy instead",
+			slog.Int("error_count", len(e.JobRow.Errors)+1),
 			slog.Time("next_retry_scheduled_at", nextRetryScheduledAt),
 			slog.Time("now", now),
 		)
diff --git a/retry_policy.go b/retry_policy.go
index f9f1f41a..9566cf1f 100644
--- a/retry_policy.go
+++ b/retry_policy.go
@@ -40,6 +40,10 @@ type DefaultClientRetryPolicy struct {
 // used instead of the attempt count. This means that snoozing a job (even
 // repeatedly) will not lead to a future error having a longer than expected
 // retry delay.
+//
+// At degenerately high retry counts (>= 310) the policy starts adding the
+// equivalent of the maximum time.Duration to each retry, about 292 years.
+// The schedule is no longer exponential past this point.
 func (p *DefaultClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time {
 	// For the purposes of calculating the backoff, we can look solely at the
 	// number of errors. If we were to use the raw attempt count, this would be
@@ -49,6 +53,7 @@ func (p *DefaultClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time {
 	// Note that we explicitly add 1 here, because the error hasn't been appended
 	// yet at the time this is called (that happens in the completer).
 	errorCount := len(job.Errors) + 1
+
 	return p.timeNowUTC().Add(timeutil.SecondsAsDuration(p.retrySeconds(errorCount)))
 }
 
@@ -60,10 +65,25 @@ func (p *DefaultClientRetryPolicy) timeNowUTC() time.Time {
 	return time.Now().UTC()
 }
 
+// The maximum value of a duration before it overflows. About 292 years.
+const maxDuration time.Duration = 1<<63 - 1
+
+// Same as the above, but changed to a float represented in seconds.
+var maxDurationSeconds = maxDuration.Seconds() //nolint:gochecknoglobals
+
 // Gets a number of retry seconds for the given attempt, random jitter included.
 func (p *DefaultClientRetryPolicy) retrySeconds(attempt int) float64 {
 	retrySeconds := p.retrySecondsWithoutJitter(attempt)
 
+	// After hitting the maximum retry duration, jitter is no longer applied
+	// because it might overflow time.Duration. That's okay though, because so
+	// much jitter will already have been applied up to this point (jitter
+	// measured in decades) that jobs will no longer run anywhere near
+	// contemporaneously unless there's been considerable manual intervention.
+	if retrySeconds == maxDurationSeconds {
+		return maxDurationSeconds
+	}
+
 	// Jitter number of seconds +/- 10%.
 	retrySeconds += retrySeconds * (rand.Float64()*0.2 - 0.1)
 
@@ -71,6 +91,10 @@
 // Gets a base number of retry seconds for the given attempt, jitter excluded.
+// If the number of seconds returned would overflow time.Duration were it to
+// be made into one, returns the maximum number of seconds that can fit in a
+// time.Duration instead, approximately 292 years.
 func (p *DefaultClientRetryPolicy) retrySecondsWithoutJitter(attempt int) float64 {
-	return math.Pow(float64(attempt), 4)
+	retrySeconds := math.Pow(float64(attempt), 4)
+	return min(retrySeconds, maxDurationSeconds)
 }
diff --git a/retry_policy_test.go b/retry_policy_test.go
index 43e23f12..f431f362 100644
--- a/retry_policy_test.go
+++ b/retry_policy_test.go
@@ -19,21 +19,68 @@ var _ ClientRetryPolicy = &DefaultClientRetryPolicy{}
 
 func TestDefaultClientRetryPolicy_NextRetry(t *testing.T) {
 	t.Parallel()
 
-	now := time.Now().UTC()
-	timeNowFunc := func() time.Time { return now }
-	retryPolicy := &DefaultClientRetryPolicy{timeNowFunc: timeNowFunc}
+	type testBundle struct {
+		now time.Time
+	}
 
-	for attempt := 1; attempt < 10; attempt++ {
-		retrySecondsWithoutJitter := retryPolicy.retrySecondsWithoutJitter(attempt)
-		allowedDelta := timeutil.SecondsAsDuration(retrySecondsWithoutJitter * 0.2)
-
-		nextRetryAt := retryPolicy.NextRetry(&rivertype.JobRow{
-			Attempt:     attempt,
-			AttemptedAt: &now,
-			Errors:      make([]rivertype.AttemptError, attempt-1),
-		})
-		require.WithinDuration(t, now.Add(timeutil.SecondsAsDuration(retrySecondsWithoutJitter)), nextRetryAt, allowedDelta)
+	setup := func(t *testing.T) (*DefaultClientRetryPolicy, *testBundle) {
+		t.Helper()
+
+		var (
+			now         = time.Now().UTC()
+			timeNowFunc = func() time.Time { return now }
+		)
+
+		return &DefaultClientRetryPolicy{timeNowFunc: timeNowFunc}, &testBundle{
+			now: now,
+		}
 	}
+
+	t.Run("Schedule", func(t *testing.T) {
+		t.Parallel()
+
+		retryPolicy, bundle := setup(t)
+
+		for attempt := 1; attempt < 10; attempt++ {
+			retrySecondsWithoutJitter := retryPolicy.retrySecondsWithoutJitter(attempt)
+			allowedDelta := timeutil.SecondsAsDuration(retrySecondsWithoutJitter * 0.2)
+
+			nextRetryAt := retryPolicy.NextRetry(&rivertype.JobRow{
+				Attempt:     attempt,
+				AttemptedAt: &bundle.now,
+				Errors:      make([]rivertype.AttemptError, attempt-1),
+			})
+			require.WithinDuration(t, bundle.now.Add(timeutil.SecondsAsDuration(retrySecondsWithoutJitter)), nextRetryAt, allowedDelta)
+		}
+	})
+
+	t.Run("MaxRetryDuration", func(t *testing.T) {
+		t.Parallel()
+
+		retryPolicy, bundle := setup(t)
+
+		maxRetryDuration := timeutil.SecondsAsDuration(maxDurationSeconds)
+
+		// First time the maximum will be hit.
+		require.Equal(t,
+			bundle.now.Add(maxRetryDuration),
+			retryPolicy.NextRetry(&rivertype.JobRow{
+				Attempt:     310,
+				AttemptedAt: &bundle.now,
+				Errors:      make([]rivertype.AttemptError, 310-1),
+			}),
+		)
+
+		// And will be hit on all subsequent attempts as well.
+		require.Equal(t,
+			bundle.now.Add(maxRetryDuration),
+			retryPolicy.NextRetry(&rivertype.JobRow{
+				Attempt:     1_000,
+				AttemptedAt: &bundle.now,
+				Errors:      make([]rivertype.AttemptError, 1_000-1),
+			}),
+		)
+	})
 }
 
 func TestDefaultRetryPolicy_retrySeconds(t *testing.T) {
@@ -64,49 +111,75 @@
 func TestDefaultRetryPolicy_retrySecondsWithoutJitter(t *testing.T) {
 	t.Parallel()
 
-	retryPolicy := &DefaultClientRetryPolicy{}
+	type testBundle struct{}
 
-	day := 24 * time.Hour
-
-	testCases := []struct {
-		attempt       int
-		expectedRetry time.Duration
-	}{
-		{attempt: 1, expectedRetry: 1 * time.Second},
-		{attempt: 2, expectedRetry: 16 * time.Second},
-		{attempt: 3, expectedRetry: 1*time.Minute + 21*time.Second},
-		{attempt: 4, expectedRetry: 4*time.Minute + 16*time.Second},
-		{attempt: 5, expectedRetry: 10*time.Minute + 25*time.Second},
-		{attempt: 6, expectedRetry: 21*time.Minute + 36*time.Second},
-		{attempt: 7, expectedRetry: 40*time.Minute + 1*time.Second},
-		{attempt: 8, expectedRetry: 1*time.Hour + 8*time.Minute + 16*time.Second},
-		{attempt: 9, expectedRetry: 1*time.Hour + 49*time.Minute + 21*time.Second},
-		{attempt: 10, expectedRetry: 2*time.Hour + 46*time.Minute + 40*time.Second},
-		{attempt: 11, expectedRetry: 4*time.Hour + 4*time.Minute + 1*time.Second},
-		{attempt: 12, expectedRetry: 5*time.Hour + 45*time.Minute + 36*time.Second},
-		{attempt: 13, expectedRetry: 7*time.Hour + 56*time.Minute + 1*time.Second},
-		{attempt: 14, expectedRetry: 10*time.Hour + 40*time.Minute + 16*time.Second},
-		{attempt: 15, expectedRetry: 14*time.Hour + 3*time.Minute + 45*time.Second},
-		{attempt: 16, expectedRetry: 18*time.Hour + 12*time.Minute + 16*time.Second},
-		{attempt: 17, expectedRetry: 23*time.Hour + 12*time.Minute + 1*time.Second},
-		{attempt: 18, expectedRetry: 1*day + 5*time.Hour + 9*time.Minute + 36*time.Second},
-		{attempt: 19, expectedRetry: 1*day + 12*time.Hour + 12*time.Minute + 1*time.Second},
-		{attempt: 20, expectedRetry: 1*day + 20*time.Hour + 26*time.Minute + 40*time.Second},
-		{attempt: 21, expectedRetry: 2*day + 6*time.Hour + 1*time.Minute + 21*time.Second},
-		{attempt: 22, expectedRetry: 2*day + 17*time.Hour + 4*time.Minute + 16*time.Second},
-		{attempt: 23, expectedRetry: 3*day + 5*time.Hour + 44*time.Minute + 1*time.Second},
-		{attempt: 24, expectedRetry: 3*day + 20*time.Hour + 9*time.Minute + 36*time.Second},
-	}
-	for _, tt := range testCases {
-		tt := tt
-		t.Run(fmt.Sprintf("Attempt_%02d", tt.attempt), func(t *testing.T) {
-			t.Parallel()
-
-			require.Equal(t,
-				tt.expectedRetry,
-				time.Duration(retryPolicy.retrySecondsWithoutJitter(tt.attempt))*time.Second)
-		})
+	setup := func(t *testing.T) (*DefaultClientRetryPolicy, *testBundle) { //nolint:unparam
+		t.Helper()
+
+		return &DefaultClientRetryPolicy{}, &testBundle{}
 	}
+
+	t.Run("Schedule", func(t *testing.T) {
+		t.Parallel()
+
+		retryPolicy, _ := setup(t)
+
+		day := 24 * time.Hour
+
+		testCases := []struct {
+			attempt       int
+			expectedRetry time.Duration
+		}{
+			{attempt: 1, expectedRetry: 1 * time.Second},
+			{attempt: 2, expectedRetry: 16 * time.Second},
+			{attempt: 3, expectedRetry: 1*time.Minute + 21*time.Second},
+			{attempt: 4, expectedRetry: 4*time.Minute + 16*time.Second},
+			{attempt: 5, expectedRetry: 10*time.Minute + 25*time.Second},
+			{attempt: 6, expectedRetry: 21*time.Minute + 36*time.Second},
+			{attempt: 7, expectedRetry: 40*time.Minute + 1*time.Second},
+			{attempt: 8, expectedRetry: 1*time.Hour + 8*time.Minute + 16*time.Second},
+			{attempt: 9, expectedRetry: 1*time.Hour + 49*time.Minute + 21*time.Second},
+			{attempt: 10, expectedRetry: 2*time.Hour + 46*time.Minute + 40*time.Second},
+			{attempt: 11, expectedRetry: 4*time.Hour + 4*time.Minute + 1*time.Second},
+			{attempt: 12, expectedRetry: 5*time.Hour + 45*time.Minute + 36*time.Second},
+			{attempt: 13, expectedRetry: 7*time.Hour + 56*time.Minute + 1*time.Second},
+			{attempt: 14, expectedRetry: 10*time.Hour + 40*time.Minute + 16*time.Second},
+			{attempt: 15, expectedRetry: 14*time.Hour + 3*time.Minute + 45*time.Second},
+			{attempt: 16, expectedRetry: 18*time.Hour + 12*time.Minute + 16*time.Second},
+			{attempt: 17, expectedRetry: 23*time.Hour + 12*time.Minute + 1*time.Second},
+			{attempt: 18, expectedRetry: 1*day + 5*time.Hour + 9*time.Minute + 36*time.Second},
+			{attempt: 19, expectedRetry: 1*day + 12*time.Hour + 12*time.Minute + 1*time.Second},
+			{attempt: 20, expectedRetry: 1*day + 20*time.Hour + 26*time.Minute + 40*time.Second},
+			{attempt: 21, expectedRetry: 2*day + 6*time.Hour + 1*time.Minute + 21*time.Second},
+			{attempt: 22, expectedRetry: 2*day + 17*time.Hour + 4*time.Minute + 16*time.Second},
+			{attempt: 23, expectedRetry: 3*day + 5*time.Hour + 44*time.Minute + 1*time.Second},
+			{attempt: 24, expectedRetry: 3*day + 20*time.Hour + 9*time.Minute + 36*time.Second},
+		}
+		for _, tt := range testCases {
+			tt := tt
+			t.Run(fmt.Sprintf("Attempt_%02d", tt.attempt), func(t *testing.T) {
+				t.Parallel()
+
+				require.Equal(t,
+					tt.expectedRetry,
+					time.Duration(retryPolicy.retrySecondsWithoutJitter(tt.attempt))*time.Second)
+			})
+		}
+	})
+
+	t.Run("MaxDurationSeconds", func(t *testing.T) {
+		t.Parallel()
+
+		retryPolicy, _ := setup(t)
+
+		require.NotEqual(t, time.Duration(maxDurationSeconds)*time.Second, time.Duration(retryPolicy.retrySecondsWithoutJitter(309))*time.Second)
+
+		// Attempt number 310 hits the ceiling, and we'll always hit it from then on.
+		require.Equal(t, time.Duration(maxDuration.Seconds())*time.Second, time.Duration(retryPolicy.retrySecondsWithoutJitter(310))*time.Second)
+		require.Equal(t, time.Duration(maxDuration.Seconds())*time.Second, time.Duration(retryPolicy.retrySecondsWithoutJitter(311))*time.Second)
+		require.Equal(t, time.Duration(maxDuration.Seconds())*time.Second, time.Duration(retryPolicy.retrySecondsWithoutJitter(1_000))*time.Second)
+		require.Equal(t, time.Duration(maxDuration.Seconds())*time.Second, time.Duration(retryPolicy.retrySecondsWithoutJitter(1_000_000))*time.Second)
+	})
 }
 
 func TestDefaultRetryPolicy_stress(t *testing.T) {
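
A note on the 310-attempt threshold used throughout the diff above: the default policy's un-jittered backoff is attempt^4 seconds, and the maximum time.Duration is about 9,223,372,036 seconds (roughly 292 years). 309^4 = 9,116,621,361 seconds still fits, while 310^4 = 9,235,210,000 seconds does not, so the cap first engages at attempt 310. The following minimal standalone sketch (not part of the diff; all identifiers are local to the example) checks that arithmetic:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Maximum value representable by time.Duration, expressed in seconds (~292 years).
	maxDurationSeconds := time.Duration(math.MaxInt64).Seconds()

	for _, attempt := range []int{309, 310, 311} {
		// The default policy's un-jittered backoff is attempt^4 seconds.
		backoffSeconds := math.Pow(float64(attempt), 4)
		fmt.Printf("attempt %d: %.0f seconds (exceeds max duration: %t)\n",
			attempt, backoffSeconds, backoffSeconds > maxDurationSeconds)
	}
	// attempt 309: 9116621361 seconds (exceeds max duration: false)
	// attempt 310: 9235210000 seconds (exceeds max duration: true)
	// attempt 311: 9354951841 seconds (exceeds max duration: true)
}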