Skip to content

Commit

Permalink
fix(consumer): don't retry FindCoordinator forever (IBM#2427)
Browse files Browse the repository at this point in the history
If the consumer group hit an error finding the coordinator whilst
setting up a session it would end up retrying forever because the retry
count didn't get decremented and there was no exit of the loop. Fix that
up and add a unittest to cover the scenario

Fixes IBM#2426
  • Loading branch information
dnwe authored Jan 25, 2023
1 parent 6acb276 commit 66e60c7
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 1 deletion.
5 changes: 4 additions & 1 deletion consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,10 @@ func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, ha
if refreshCoordinator {
err := c.client.RefreshCoordinator(c.groupID)
if err != nil {
return c.retryNewSession(ctx, topics, handler, retries, true)
if retries <= 0 {
return nil, err
}
return c.retryNewSession(ctx, topics, handler, retries-1, true)
}
}

Expand Down
45 changes: 45 additions & 0 deletions consumer_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"testing"
"time"

assert "github.com/stretchr/testify/require"
)

type handler struct {
Expand Down Expand Up @@ -200,3 +202,46 @@ outerFor:

cancel()
}

// TestConsumerGroupSessionDoesNotRetryForever ensures that an error fetching
// the coordinator decrements the retry attempts and doesn't end up retrying
// forever
func TestConsumerGroupSessionDoesNotRetryForever(t *testing.T) {
config := NewTestConfig()
config.ClientID = t.Name()
config.Version = V2_0_0_0
config.Consumer.Return.Errors = true
config.Consumer.Group.Rebalance.Retry.Max = 1
config.Consumer.Group.Rebalance.Retry.Backoff = 0

broker0 := NewMockBroker(t, 0)

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my-topic", 0, broker0.BrokerID()),
"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).
SetError(CoordinatorGroup, "my-group", ErrGroupAuthorizationFailed),
})

group, err := NewConsumerGroup([]string{broker0.Addr()}, "my-group", config)
if err != nil {
t.Fatal(err)
}
defer func() { _ = group.Close() }()

ctx, cancel := context.WithCancel(context.Background())
h := &handler{t, cancel}

var wg sync.WaitGroup
wg.Add(1)

go func() {
topics := []string{"my-topic"}
err := group.Consume(ctx, topics, h)
assert.Error(t, err)
wg.Done()
}()

wg.Wait()
}

0 comments on commit 66e60c7

Please sign in to comment.