Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: clear preferredReadReplica if broker shutdown #2108

Merged
merged 2 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ func (child *partitionConsumer) dispatcher() {
child.broker = nil
}

Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
if err := child.dispatch(); err != nil {
child.sendError(err)
child.trigger <- none{}
Expand All @@ -372,6 +371,14 @@ func (child *partitionConsumer) preferredBroker() (*Broker, error) {
if err == nil {
return broker, nil
}
Logger.Printf(
"consumer/%s/%d failed to find active broker for preferred read replica %d - will fallback to leader",
child.topic, child.partition, child.preferredReadReplica)

// if we couldn't find it, discard the replica preference and trigger a
// metadata refresh whilst falling back to consuming from the leader again
child.preferredReadReplica = invalidPreferredReplicaID
_ = child.consumer.client.RefreshMetadata(child.topic)
}

// if preferred replica cannot be found fallback to leader
Expand Down Expand Up @@ -856,6 +863,9 @@ func (bc *brokerConsumer) handleResponses() {
if preferredBroker, err := child.preferredBroker(); err == nil {
if bc.broker.ID() != preferredBroker.ID() {
// not an error but needs redispatching to consume from preferred replica
Logger.Printf(
"consumer/broker/%d abandoned in favor of preferred replica broker/%d\n",
bc.broker.ID(), preferredBroker.ID())
child.trigger <- none{}
delete(bc.subscriptions, child)
}
Expand All @@ -864,7 +874,7 @@ func (bc *brokerConsumer) handleResponses() {
}

// Discard any replica preference.
child.preferredReadReplica = -1
child.preferredReadReplica = invalidPreferredReplicaID

switch result {
case errTimedOut:
Expand Down
127 changes: 127 additions & 0 deletions functional_consumer_follower_fetch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
//go:build functional
// +build functional

package sarama

import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"
)

// TestConsumerFetchFollowerFailover checks that a partition consumer keeps
// receiving messages when the follower broker it is expected to fetch from is
// stopped and later started again.
//
// numMsg messages are produced in the background while the test consumes them
// in three phases: before the follower is stopped, while it is down, and
// after it has been restarted. The test fails (rather than hanging) if any
// single message takes longer than recvTimeout to arrive.
func TestConsumerFetchFollowerFailover(t *testing.T) {
	const (
		topic  = "test.1"
		numMsg = 1000
		// recvTimeout bounds the wait for one message so a consumer that never
		// recovers fails the test instead of blocking until the global timeout.
		recvTimeout = 2 * time.Minute
	)

	newConfig := func() *Config {
		config := NewConfig()
		config.ClientID = t.Name()
		config.Version = V2_8_0_0
		config.Producer.Return.Successes = true
		return config
	}

	config := newConfig()

	// pick a partition and find the ID for one of the follower brokers
	admin, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}
	defer admin.Close()

	metadata, err := admin.DescribeTopics([]string{topic})
	if err != nil {
		t.Fatal(err)
	}
	if len(metadata) == 0 || len(metadata[0].Partitions) == 0 {
		t.Fatalf("topic %s returned no partition metadata", topic)
	}
	partition := metadata[0].Partitions[0]
	leader := partition.Leader
	follower := int32(-1)
	for _, replica := range partition.Replicas {
		if replica == leader {
			continue
		}
		follower = replica
		break
	}
	if follower == -1 {
		// without a follower there is nothing to fail over from, and we must
		// not go on to stop a non-existent "kafka--1" service
		t.Fatalf("topic %s partition %d has no replica other than leader kafka-%d", topic, partition.ID, leader)
	}

	t.Logf("topic %s has leader kafka-%d and our chosen follower is kafka-%d", topic, leader, follower)

	// match our RackID to the follower's broker ID so our requests should end
	// up fetching from that follower
	// NOTE(review): presumably the test cluster sets each broker's rack to its
	// broker ID - confirm against the docker-compose configuration
	config.RackID = strconv.FormatInt(int64(follower), 10)

	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := consumer.ConsumePartition(topic, partition.ID, OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		pc.Close()
		consumer.Close()
	}()

	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
	if err != nil {
		t.Fatal(err)
	}
	defer producer.Close()

	// stop tells the producer goroutine to quit early. The deferred function
	// below runs before the producer/consumer are closed and waits for the
	// goroutine, so it can never log against a test that has already finished.
	stop := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	defer func() {
		close(stop)
		wg.Wait()
	}()

	go func() {
		defer wg.Done()
		for i := 0; i < numMsg; i++ {
			msg := &ProducerMessage{
				Topic: topic,
				Key:   nil,
				Value: StringEncoder(fmt.Sprintf("%s %-3d", t.Name(), i)),
			}
			if _, offset, err := producer.SendMessage(msg); err != nil {
				t.Error(i, err)
			} else if offset%50 == 0 {
				t.Logf("sent: %d\n", offset)
			}
			// pace the producer, but wake up immediately if the test is over
			select {
			case <-stop:
				return
			case <-time.After(time.Millisecond * 25):
			}
		}
	}()

	// received counts messages consumed so far across all three phases.
	received := 0

	// consumeUntil reads from the partition consumer until received reaches
	// target, failing the test on a closed channel or a per-message timeout.
	consumeUntil := func(target int) {
		t.Helper()
		for ; received < target; received++ {
			select {
			case msg, ok := <-pc.Messages():
				if !ok {
					t.Fatalf("messages channel closed after %d of %d messages", received, numMsg)
				}
				if msg.Offset%50 == 0 {
					t.Logf("recv: %d\n", msg.Offset)
				}
			case <-time.After(recvTimeout):
				t.Fatalf("timed out after %s waiting for message %d of %d", recvTimeout, received+1, numMsg)
			}
		}
	}

	consumeUntil(numMsg / 8)

	if err := stopDockerTestBroker(context.Background(), follower); err != nil {
		t.Fatal(err)
	}
	// make sure the follower comes back even if the test fails while it is
	// down, so later functional tests still see the full cluster
	followerRestarted := false
	defer func() {
		if followerRestarted {
			return
		}
		if err := startDockerTestBroker(context.Background(), follower); err != nil {
			t.Logf("failed to restart kafka-%d during cleanup: %v", follower, err)
		}
	}()

	consumeUntil(numMsg / 3)

	if err := startDockerTestBroker(context.Background(), follower); err != nil {
		t.Fatal(err)
	}
	followerRestarted = true

	consumeUntil(numMsg)

	// wait for the producer goroutine to finish sending
	wg.Wait()
}
23 changes: 23 additions & 0 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error
toxiproxyHost := toxiproxyURL.Hostname()

env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr)
env.Proxies = map[string]*toxiproxy.Proxy{}
for i := 1; i <= 5; i++ {
proxyName := fmt.Sprintf("kafka%d", i)
proxy, err := env.ToxiproxyClient.Proxy(proxyName)
Expand Down Expand Up @@ -262,6 +263,28 @@ func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) er
return nil
}

// startDockerTestBroker runs `docker-compose start kafka-<brokerID>` and waits
// for the command to finish, forwarding its output to this process's
// stdout/stderr.
//
// The command is bound to ctx: if ctx is cancelled or its deadline passes
// before docker-compose exits, the process is killed and an error is returned.
// Any failure is wrapped with the broker name.
func startDockerTestBroker(ctx context.Context, brokerID int32) error {
	service := fmt.Sprintf("kafka-%d", brokerID)
	c := exec.CommandContext(ctx, "docker-compose", "start", service)
	c.Stdout = os.Stdout
	c.Stderr = os.Stderr
	if err := c.Run(); err != nil {
		return fmt.Errorf("failed to run docker-compose to start test broker kafka-%d: %w", brokerID, err)
	}
	return nil
}

// stopDockerTestBroker runs `docker-compose stop kafka-<brokerID>` and waits
// for the command to finish, forwarding its output to this process's
// stdout/stderr.
//
// The command is bound to ctx: if ctx is cancelled or its deadline passes
// before docker-compose exits, the process is killed and an error is returned.
// Any failure is wrapped with the broker name.
func stopDockerTestBroker(ctx context.Context, brokerID int32) error {
	service := fmt.Sprintf("kafka-%d", brokerID)
	c := exec.CommandContext(ctx, "docker-compose", "stop", service)
	c.Stdout = os.Stdout
	c.Stderr = os.Stderr
	if err := c.Run(); err != nil {
		return fmt.Errorf("failed to run docker-compose to stop test broker kafka-%d: %w", brokerID, err)
	}
	return nil
}

func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
Logger.Println("creating test topics")
var testTopicNames []string
Expand Down