Skip to content

Commit

Permalink
Merge pull request #2517 from hindessm/mrh/fix-some-retry-issues
Browse files Browse the repository at this point in the history
Fix some retry issues
  • Loading branch information
dnwe authored Jul 23, 2023
2 parents 6ecdb50 + 39c18fc commit 08ff0ff
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 9 deletions.
12 changes: 5 additions & 7 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,19 +207,17 @@ func isErrNoController(err error) bool {
// provided retryable func) up to the maximum number of tries permitted by
// the admin client configuration
func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
var err error
for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
err = fn()
if err == nil || !retryable(err) {
for attemptsRemaining := ca.conf.Admin.Retry.Max; ; {
err := fn()
attemptsRemaining--
if err == nil || attemptsRemaining == 0 || !retryable(err) {
return err
}
Logger.Printf(
"admin/request retrying after %dms... (%d attempts remaining)\n",
ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
ca.conf.Admin.Retry.Backoff/time.Millisecond, attemptsRemaining)
time.Sleep(ca.conf.Admin.Retry.Backoff)
continue
}
return err
}

func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
Expand Down
70 changes: 70 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1790,3 +1790,73 @@ func TestDescribeLogDirsUnknownBroker(t *testing.T) {
}
}
}

// Test_retryOnError exercises clusterAdmin.retryOnError with Admin.Retry.Max
// set to 3 and a 100ms backoff. Each subtest checks the returned error, the
// number of calls made to the wrapped function and the elapsed wall-clock
// time.
func Test_retryOnError(t *testing.T) {
	const testBackoffTime = 100 * time.Millisecond

	conf := NewTestConfig()
	conf.Version = V1_0_0_0
	conf.Admin.Retry.Max = 3
	conf.Admin.Retry.Backoff = testBackoffTime

	admin := &clusterAdmin{conf: conf}

	// invoke runs retryOnError with a predicate that always answers retry and
	// a function that fails on every call when fail is set. It reports when
	// the run began, how many times the function was called, and the result.
	invoke := func(retry, fail bool) (time.Time, int, error) {
		began := time.Now()
		calls := 0
		err := admin.retryOnError(
			func(error) bool { return retry },
			func() error {
				calls++
				if fail {
					return errors.New("mock error")
				}
				return nil
			})
		return began, calls, err
	}

	t.Run("immediate success", func(t *testing.T) {
		began, calls, err := invoke(true, false)
		if err != nil {
			t.Fatalf("expected no error but was %v", err)
		}
		if calls != 1 {
			t.Fatalf("expected 1 attempt to have been made but was %d", calls)
		}
		// No retry happened, so no backoff sleep should have been taken.
		if elapsed := time.Since(began); elapsed >= testBackoffTime {
			t.Fatalf("single attempt should take less than backoff time")
		}
	})

	t.Run("immediate failure", func(t *testing.T) {
		began, calls, err := invoke(false, true)
		if err == nil {
			t.Fatalf("expected error but was nil")
		}
		if calls != 1 {
			t.Fatalf("expected 1 attempt to have been made but was %d", calls)
		}
		// A non-retryable error must return without sleeping.
		if elapsed := time.Since(began); elapsed >= testBackoffTime {
			t.Fatalf("single attempt should take less than backoff time")
		}
	})

	t.Run("failing all attempts", func(t *testing.T) {
		began, calls, err := invoke(true, true)
		if err == nil {
			t.Errorf("expected error but was nil")
		}
		if calls != 3 {
			t.Errorf("expected 3 attempts to have been made but was %d", calls)
		}
		// Three attempts are separated by two sleeps, so the total must stay
		// under three backoff periods.
		if elapsed := time.Since(began); elapsed >= 3*testBackoffTime {
			t.Errorf("attempt+sleep+attempt+sleep+attempt should take less than 3 * backoff time")
		}
	})
}
6 changes: 4 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,9 +979,10 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
if time.Since(time.Unix(t/1e3, 0)) < backoff {
return err
}
attemptsRemaining--
Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)

return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline)
return client.tryRefreshMetadata(topics, attemptsRemaining, deadline)
}
return err
}
Expand Down Expand Up @@ -1160,9 +1161,10 @@ func (client *client) findCoordinator(coordinatorKey string, coordinatorType Coo
retry := func(err error) (*FindCoordinatorResponse, error) {
if attemptsRemaining > 0 {
backoff := client.computeBackoff(attemptsRemaining)
attemptsRemaining--
Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
time.Sleep(backoff)
return client.findCoordinator(coordinatorKey, coordinatorType, attemptsRemaining-1)
return client.findCoordinator(coordinatorKey, coordinatorType, attemptsRemaining)
}
return nil, err
}
Expand Down

0 comments on commit 08ff0ff

Please sign in to comment.