Have DefaultClientRetryPolicy account for degenerately high attempt counts (#698)

This one's here to address #697, where it's been reported that it may be
possible for retry schedules to overflow into the past when reaching
degenerately high numbers of attempts. I couldn't reproduce the reported
problem, but it is possible for `time.Duration` to overflow, so here we
shore up `DefaultClientRetryPolicy` so that we explicitly define what
behavior should occur under these conditions.

The maximum length of time that can go in a `time.Duration` is about 292
years. Here's a sample program that demonstrates an overflow happening:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        const maxDuration time.Duration = 1<<63 - 1
        var maxDurationSeconds = float64(maxDuration / time.Second)

        notOverflowed := time.Duration(maxDurationSeconds) * time.Second
        fmt.Printf("not overflowed: %+v\n", notOverflowed)

        overflowed := time.Duration(maxDurationSeconds+1) * time.Second
        fmt.Printf("overflowed: %+v\n", overflowed)
    }

    not overflowed: 2562047h47m16s
    overflowed: -2562047h47m16.709551616s

Here, modify `DefaultClientRetryPolicy` so that if it would otherwise return a
next retry duration greater than what fits in a `time.Duration`, it just
returns 292 years instead. The maximum bound is first reached at 310 retries,
so every retry from 310 onward is scheduled roughly 292 years out.
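
For reference, here's a quick standalone check (not part of the diff) of why
attempt 310 is where the ceiling is first hit: attempt^4 seconds first exceeds
the ~9.22 billion seconds that fit in a `time.Duration` between attempts 309
and 310.

    package main

    import (
        "fmt"
        "math"
        "time"
    )

    func main() {
        const maxDuration time.Duration = 1<<63 - 1
        maxDurationSeconds := maxDuration.Seconds()

        fmt.Println(math.Pow(309, 4) > maxDurationSeconds) // false: 309^4 ≈ 9.12e9 seconds still fits
        fmt.Println(math.Pow(310, 4) > maxDurationSeconds) // true: 310^4 ≈ 9.24e9 seconds would overflow
    }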

I didn't bother putting a maximum bound on the time returned by
`NextRetry` because the maximum `time.Time` in Go is somewhere in the
year 219250468, so even in 292-year increments, you'll never get there.
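
As a usage sketch (not part of the diff, and assuming the standard
`github.com/riverqueue/river` and `github.com/riverqueue/river/rivertype`
import paths), a caller exercising the clamped path looks like this:

    package main

    import (
        "fmt"
        "time"

        "github.com/riverqueue/river"
        "github.com/riverqueue/river/rivertype"
    )

    func main() {
        retryPolicy := &river.DefaultClientRetryPolicy{}
        now := time.Now().UTC()

        // With this change, a degenerately high error count yields a retry
        // clamped to roughly 292 years out rather than overflowing into the past.
        nextRetryAt := retryPolicy.NextRetry(&rivertype.JobRow{
            Attempt:     1_000,
            AttemptedAt: &now,
            Errors:      make([]rivertype.AttemptError, 1_000-1),
        })
        fmt.Println(nextRetryAt.Sub(now))
    }
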
brandur authored Dec 21, 2024
1 parent 13155b7 commit f524461
Showing 4 changed files with 157 additions and 55 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Exponential backoffs at degenerately high job attempts (>= 310) no longer risk overflowing `time.Duration`. [PR #698](https://github.com/riverqueue/river/pull/698).

## [0.14.3] - 2024-12-14

### Changed
1 change: 1 addition & 0 deletions job_executor.go
@@ -385,6 +385,7 @@ func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) {
if nextRetryScheduledAt.Before(now) {
e.Logger.WarnContext(ctx,
e.Name+": Retry policy returned invalid next retry before current time; using default retry policy instead",
slog.Int("error_count", len(e.JobRow.Errors)+1),
slog.Time("next_retry_scheduled_at", nextRetryScheduledAt),
slog.Time("now", now),
)
26 changes: 25 additions & 1 deletion retry_policy.go
@@ -40,6 +40,10 @@ type DefaultClientRetryPolicy struct {
// used instead of the attempt count. This means that snoozing a job (even
// repeatedly) will not lead to a future error having a longer than expected
// retry delay.
//
// At degenerately high retry counts (>= 310) the policy starts adding the
// equivalent of the maximum time.Duration to each retry, about 292 years.
// The schedule is no longer exponential past this point.
func (p *DefaultClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time {
// For the purposes of calculating the backoff, we can look solely at the
// number of errors. If we were to use the raw attempt count, this would be
@@ -49,6 +53,7 @@ func (p *DefaultClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time {
// Note that we explicitly add 1 here, because the error hasn't been appended
// yet at the time this is called (that happens in the completer).
errorCount := len(job.Errors) + 1

return p.timeNowUTC().Add(timeutil.SecondsAsDuration(p.retrySeconds(errorCount)))
}

@@ -60,17 +65,36 @@ func (p *DefaultClientRetryPolicy) timeNowUTC() time.Time {
return time.Now().UTC()
}

// The maximum value of a duration before it overflows. About 292 years.
const maxDuration time.Duration = 1<<63 - 1

// Same as the above, but changed to a float represented in seconds.
var maxDurationSeconds = maxDuration.Seconds() //nolint:gochecknoglobals

// Gets a number of retry seconds for the given attempt, random jitter included.
func (p *DefaultClientRetryPolicy) retrySeconds(attempt int) float64 {
retrySeconds := p.retrySecondsWithoutJitter(attempt)

// After hitting maximum retry durations jitter is no longer applied because
// it might overflow time.Duration. That's okay though because so much
// jitter will already have been applied up to this point (jitter measured
// in decades) that jobs will no longer run anywhere near contemporaneously
// unless there's been considerable manual intervention.
if retrySeconds == maxDurationSeconds {
return maxDurationSeconds
}

// Jitter number of seconds +/- 10%.
retrySeconds += retrySeconds * (rand.Float64()*0.2 - 0.1)

return retrySeconds
}

// Gets a base number of retry seconds for the given attempt, jitter excluded.
// If the number of seconds returned would overflow time.Duration if it were to
// be made one, returns the maximum number of seconds that can fit in a
// time.Duration instead, approximately 292 years.
func (p *DefaultClientRetryPolicy) retrySecondsWithoutJitter(attempt int) float64 {
return math.Pow(float64(attempt), 4)
retrySeconds := math.Pow(float64(attempt), 4)
return min(retrySeconds, maxDurationSeconds)
}
181 changes: 127 additions & 54 deletions retry_policy_test.go
@@ -19,21 +19,68 @@ var _ ClientRetryPolicy = &DefaultClientRetryPolicy{}
func TestDefaultClientRetryPolicy_NextRetry(t *testing.T) {
t.Parallel()

now := time.Now().UTC()
timeNowFunc := func() time.Time { return now }
retryPolicy := &DefaultClientRetryPolicy{timeNowFunc: timeNowFunc}
type testBundle struct {
now time.Time
}

for attempt := 1; attempt < 10; attempt++ {
retrySecondsWithoutJitter := retryPolicy.retrySecondsWithoutJitter(attempt)
allowedDelta := timeutil.SecondsAsDuration(retrySecondsWithoutJitter * 0.2)

nextRetryAt := retryPolicy.NextRetry(&rivertype.JobRow{
Attempt: attempt,
AttemptedAt: &now,
Errors: make([]rivertype.AttemptError, attempt-1),
})
require.WithinDuration(t, now.Add(timeutil.SecondsAsDuration(retrySecondsWithoutJitter)), nextRetryAt, allowedDelta)
setup := func(t *testing.T) (*DefaultClientRetryPolicy, *testBundle) {
t.Helper()

var (
now = time.Now().UTC()
timeNowFunc = func() time.Time { return now }
)

return &DefaultClientRetryPolicy{timeNowFunc: timeNowFunc}, &testBundle{
now: now,
}
}

t.Run("Schedule", func(t *testing.T) {
t.Parallel()

retryPolicy, bundle := setup(t)

for attempt := 1; attempt < 10; attempt++ {
retrySecondsWithoutJitter := retryPolicy.retrySecondsWithoutJitter(attempt)
allowedDelta := timeutil.SecondsAsDuration(retrySecondsWithoutJitter * 0.2)

nextRetryAt := retryPolicy.NextRetry(&rivertype.JobRow{
Attempt: attempt,
AttemptedAt: &bundle.now,
Errors: make([]rivertype.AttemptError, attempt-1),
})
require.WithinDuration(t, bundle.now.Add(timeutil.SecondsAsDuration(retrySecondsWithoutJitter)), nextRetryAt, allowedDelta)
}
})

t.Run("MaxRetryDuration", func(t *testing.T) {
t.Parallel()

retryPolicy, bundle := setup(t)

maxRetryDuration := timeutil.SecondsAsDuration(maxDurationSeconds)

// First time the maximum will be hit.
require.Equal(t,
bundle.now.Add(maxRetryDuration),
retryPolicy.NextRetry(&rivertype.JobRow{
Attempt: 310,
AttemptedAt: &bundle.now,
Errors: make([]rivertype.AttemptError, 310-1),
}),
)

// And will be hit on all subsequent attempts as well.
require.Equal(t,
bundle.now.Add(maxRetryDuration),
retryPolicy.NextRetry(&rivertype.JobRow{
Attempt: 1_000,
AttemptedAt: &bundle.now,
Errors: make([]rivertype.AttemptError, 1_000-1),
}),
)
})
}

func TestDefaultRetryPolicy_retrySeconds(t *testing.T) {
@@ -64,49 +111,75 @@ func TestDefaultRetryPolicy_retrySeconds(t *testing.T) {
func TestDefaultRetryPolicy_retrySecondsWithoutJitter(t *testing.T) {
t.Parallel()

retryPolicy := &DefaultClientRetryPolicy{}
type testBundle struct{}

day := 24 * time.Hour

testCases := []struct {
attempt int
expectedRetry time.Duration
}{
{attempt: 1, expectedRetry: 1 * time.Second},
{attempt: 2, expectedRetry: 16 * time.Second},
{attempt: 3, expectedRetry: 1*time.Minute + 21*time.Second},
{attempt: 4, expectedRetry: 4*time.Minute + 16*time.Second},
{attempt: 5, expectedRetry: 10*time.Minute + 25*time.Second},
{attempt: 6, expectedRetry: 21*time.Minute + 36*time.Second},
{attempt: 7, expectedRetry: 40*time.Minute + 1*time.Second},
{attempt: 8, expectedRetry: 1*time.Hour + 8*time.Minute + 16*time.Second},
{attempt: 9, expectedRetry: 1*time.Hour + 49*time.Minute + 21*time.Second},
{attempt: 10, expectedRetry: 2*time.Hour + 46*time.Minute + 40*time.Second},
{attempt: 11, expectedRetry: 4*time.Hour + 4*time.Minute + 1*time.Second},
{attempt: 12, expectedRetry: 5*time.Hour + 45*time.Minute + 36*time.Second},
{attempt: 13, expectedRetry: 7*time.Hour + 56*time.Minute + 1*time.Second},
{attempt: 14, expectedRetry: 10*time.Hour + 40*time.Minute + 16*time.Second},
{attempt: 15, expectedRetry: 14*time.Hour + 3*time.Minute + 45*time.Second},
{attempt: 16, expectedRetry: 18*time.Hour + 12*time.Minute + 16*time.Second},
{attempt: 17, expectedRetry: 23*time.Hour + 12*time.Minute + 1*time.Second},
{attempt: 18, expectedRetry: 1*day + 5*time.Hour + 9*time.Minute + 36*time.Second},
{attempt: 19, expectedRetry: 1*day + 12*time.Hour + 12*time.Minute + 1*time.Second},
{attempt: 20, expectedRetry: 1*day + 20*time.Hour + 26*time.Minute + 40*time.Second},
{attempt: 21, expectedRetry: 2*day + 6*time.Hour + 1*time.Minute + 21*time.Second},
{attempt: 22, expectedRetry: 2*day + 17*time.Hour + 4*time.Minute + 16*time.Second},
{attempt: 23, expectedRetry: 3*day + 5*time.Hour + 44*time.Minute + 1*time.Second},
{attempt: 24, expectedRetry: 3*day + 20*time.Hour + 9*time.Minute + 36*time.Second},
}
for _, tt := range testCases {
tt := tt
t.Run(fmt.Sprintf("Attempt_%02d", tt.attempt), func(t *testing.T) {
t.Parallel()

require.Equal(t,
tt.expectedRetry,
time.Duration(retryPolicy.retrySecondsWithoutJitter(tt.attempt))*time.Second)
})
setup := func(t *testing.T) (*DefaultClientRetryPolicy, *testBundle) { //nolint:unparam
t.Helper()

return &DefaultClientRetryPolicy{}, &testBundle{}
}

t.Run("Schedule", func(t *testing.T) {
t.Parallel()

retryPolicy, _ := setup(t)

day := 24 * time.Hour

testCases := []struct {
attempt int
expectedRetry time.Duration
}{
{attempt: 1, expectedRetry: 1 * time.Second},
{attempt: 2, expectedRetry: 16 * time.Second},
{attempt: 3, expectedRetry: 1*time.Minute + 21*time.Second},
{attempt: 4, expectedRetry: 4*time.Minute + 16*time.Second},
{attempt: 5, expectedRetry: 10*time.Minute + 25*time.Second},
{attempt: 6, expectedRetry: 21*time.Minute + 36*time.Second},
{attempt: 7, expectedRetry: 40*time.Minute + 1*time.Second},
{attempt: 8, expectedRetry: 1*time.Hour + 8*time.Minute + 16*time.Second},
{attempt: 9, expectedRetry: 1*time.Hour + 49*time.Minute + 21*time.Second},
{attempt: 10, expectedRetry: 2*time.Hour + 46*time.Minute + 40*time.Second},
{attempt: 11, expectedRetry: 4*time.Hour + 4*time.Minute + 1*time.Second},
{attempt: 12, expectedRetry: 5*time.Hour + 45*time.Minute + 36*time.Second},
{attempt: 13, expectedRetry: 7*time.Hour + 56*time.Minute + 1*time.Second},
{attempt: 14, expectedRetry: 10*time.Hour + 40*time.Minute + 16*time.Second},
{attempt: 15, expectedRetry: 14*time.Hour + 3*time.Minute + 45*time.Second},
{attempt: 16, expectedRetry: 18*time.Hour + 12*time.Minute + 16*time.Second},
{attempt: 17, expectedRetry: 23*time.Hour + 12*time.Minute + 1*time.Second},
{attempt: 18, expectedRetry: 1*day + 5*time.Hour + 9*time.Minute + 36*time.Second},
{attempt: 19, expectedRetry: 1*day + 12*time.Hour + 12*time.Minute + 1*time.Second},
{attempt: 20, expectedRetry: 1*day + 20*time.Hour + 26*time.Minute + 40*time.Second},
{attempt: 21, expectedRetry: 2*day + 6*time.Hour + 1*time.Minute + 21*time.Second},
{attempt: 22, expectedRetry: 2*day + 17*time.Hour + 4*time.Minute + 16*time.Second},
{attempt: 23, expectedRetry: 3*day + 5*time.Hour + 44*time.Minute + 1*time.Second},
{attempt: 24, expectedRetry: 3*day + 20*time.Hour + 9*time.Minute + 36*time.Second},
}
for _, tt := range testCases {
tt := tt
t.Run(fmt.Sprintf("Attempt_%02d", tt.attempt), func(t *testing.T) {
t.Parallel()

require.Equal(t,
tt.expectedRetry,
time.Duration(retryPolicy.retrySecondsWithoutJitter(tt.attempt))*time.Second)
})
}
})

t.Run("MaxDurationSeconds", func(t *testing.T) {
t.Parallel()

retryPolicy, _ := setup(t)

require.NotEqual(t, time.Duration(maxDurationSeconds)*time.Second, time.Duration(retryPolicy.retrySecondsWithoutJitter(309))*time.Second)

// Attempt number 310 hits the ceiling, and we'll always hit it from then on.
require.Equal(t, time.Duration(maxDuration.Seconds())*time.Second, time.Duration(retryPolicy.retrySecondsWithoutJitter(310))*time.Second)
require.Equal(t, time.Duration(maxDuration.Seconds())*time.Second, time.Duration(retryPolicy.retrySecondsWithoutJitter(311))*time.Second)
require.Equal(t, time.Duration(maxDuration.Seconds())*time.Second, time.Duration(retryPolicy.retrySecondsWithoutJitter(1_000))*time.Second)
require.Equal(t, time.Duration(maxDuration.Seconds())*time.Second, time.Duration(retryPolicy.retrySecondsWithoutJitter(1_000_000))*time.Second)
})
}

func TestDefaultRetryPolicy_stress(t *testing.T) {
