Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: track and supply leader epoch to FetchRequest #2389

Merged
merged 1 commit into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type Client interface {
// topic/partition, as determined by querying the cluster metadata.
Leader(topic string, partitionID int32) (*Broker, error)

// LeaderAndEpoch returns the leader and its epoch for the current
// topic/partition, as determined by querying the cluster metadata.
LeaderAndEpoch(topic string, partitionID int32) (*Broker, int32, error)

// Replicas returns the set of all replica IDs for the given partition.
Replicas(topic string, partitionID int32) ([]int32, error)

Expand Down Expand Up @@ -452,21 +456,25 @@ func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32,
}

func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
leader, _, err := client.LeaderAndEpoch(topic, partitionID)
return leader, err
}

func (client *client) LeaderAndEpoch(topic string, partitionID int32) (*Broker, int32, error) {
if client.Closed() {
return nil, ErrClosedClient
return nil, -1, ErrClosedClient
}

leader, err := client.cachedLeader(topic, partitionID)

leader, epoch, err := client.cachedLeader(topic, partitionID)
if leader == nil {
err = client.RefreshMetadata(topic)
if err != nil {
return nil, err
return nil, -1, err
}
leader, err = client.cachedLeader(topic, partitionID)
leader, epoch, err = client.cachedLeader(topic, partitionID)
}

return leader, err
return leader, epoch, err
}

func (client *client) RefreshBrokers(addrs []string) error {
Expand Down Expand Up @@ -848,7 +856,7 @@ func (client *client) setPartitionCache(topic string, partitionSet partitionType
return ret
}

func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, error) {
func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, int32, error) {
client.lock.RLock()
defer client.lock.RUnlock()

Expand All @@ -857,18 +865,18 @@ func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, er
metadata, ok := partitions[partitionID]
if ok {
if errors.Is(metadata.Err, ErrLeaderNotAvailable) {
return nil, ErrLeaderNotAvailable
return nil, -1, ErrLeaderNotAvailable
}
b := client.brokers[metadata.Leader]
if b == nil {
return nil, ErrLeaderNotAvailable
return nil, -1, ErrLeaderNotAvailable
}
_ = b.Open(client.conf)
return b, nil
return b, metadata.LeaderEpoch, nil
}
}

return nil, ErrUnknownTopicOrPartition
return nil, -1, ErrUnknownTopicOrPartition
}

func (client *client) getOffset(topic string, partitionID int32, time int64) (int64, error) {
Expand Down
29 changes: 18 additions & 11 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
feeder: make(chan *FetchResponse, 1),
leaderEpoch: invalidLeaderEpoch,
preferredReadReplica: invalidPreferredReplicaID,
trigger: make(chan none, 1),
dying: make(chan none),
Expand All @@ -175,9 +176,8 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
return nil, err
}

var leader *Broker
var err error
if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
leader, epoch, err := c.client.LeaderAndEpoch(child.topic, child.partition)
if err != nil {
return nil, err
}

Expand All @@ -188,6 +188,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
go withRecover(child.dispatcher)
go withRecover(child.responseFeeder)

child.leaderEpoch = epoch
child.broker = c.refBrokerConsumer(leader)
child.broker.input <- child

Expand Down Expand Up @@ -400,6 +401,7 @@ type partitionConsumer struct {
errors chan *ConsumerError
feeder chan *FetchResponse

leaderEpoch int32
preferredReadReplica int32

trigger, dying chan none
Expand Down Expand Up @@ -463,11 +465,11 @@ func (child *partitionConsumer) dispatcher() {
close(child.feeder)
}

func (child *partitionConsumer) preferredBroker() (*Broker, error) {
func (child *partitionConsumer) preferredBroker() (*Broker, int32, error) {
if child.preferredReadReplica >= 0 {
broker, err := child.consumer.client.Broker(child.preferredReadReplica)
if err == nil {
return broker, nil
return broker, child.leaderEpoch, nil
}
Logger.Printf(
"consumer/%s/%d failed to find active broker for preferred read replica %d - will fallback to leader",
Expand All @@ -480,21 +482,21 @@ func (child *partitionConsumer) preferredBroker() (*Broker, error) {
}

// if preferred replica cannot be found fallback to leader
return child.consumer.client.Leader(child.topic, child.partition)
return child.consumer.client.LeaderAndEpoch(child.topic, child.partition)
}

func (child *partitionConsumer) dispatch() error {
if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
return err
}

broker, err := child.preferredBroker()
broker, epoch, err := child.preferredBroker()
if err != nil {
return err
}

child.leaderEpoch = epoch
child.broker = child.consumer.refBrokerConsumer(broker)

child.broker.input <- child

return nil
Expand Down Expand Up @@ -987,7 +989,7 @@ func (bc *brokerConsumer) handleResponses() {
child.responseResult = nil

if result == nil {
if preferredBroker, err := child.preferredBroker(); err == nil {
if preferredBroker, _, err := child.preferredBroker(); err == nil {
if bc.broker.ID() != preferredBroker.ID() {
// not an error but needs redispatching to consume from preferred replica
Logger.Printf(
Expand All @@ -1014,7 +1016,12 @@ func (bc *brokerConsumer) handleResponses() {
Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
close(child.trigger)
delete(bc.subscriptions, child)
} else if errors.Is(result, ErrUnknownTopicOrPartition) || errors.Is(result, ErrNotLeaderForPartition) || errors.Is(result, ErrLeaderNotAvailable) || errors.Is(result, ErrReplicaNotAvailable) {
} else if errors.Is(result, ErrUnknownTopicOrPartition) ||
errors.Is(result, ErrNotLeaderForPartition) ||
errors.Is(result, ErrLeaderNotAvailable) ||
errors.Is(result, ErrReplicaNotAvailable) ||
errors.Is(result, ErrFencedLeaderEpoch) ||
errors.Is(result, ErrUnknownLeaderEpoch) {
// not an error, but does need redispatching
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, result)
Expand Down Expand Up @@ -1092,7 +1099,7 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {

for child := range bc.subscriptions {
if !child.IsPaused() {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize, child.leaderEpoch)
}
}

Expand Down
4 changes: 2 additions & 2 deletions fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (r *FetchRequest) requiredVersion() KafkaVersion {
}
}

func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32, leaderEpoch int32) {
if r.blocks == nil {
r.blocks = make(map[string]map[int32]*fetchRequestBlock)
}
Expand All @@ -320,7 +320,7 @@ func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int
tmp.maxBytes = maxBytes
tmp.fetchOffset = fetchOffset
if r.Version >= 9 {
tmp.currentLeaderEpoch = int32(-1)
tmp.currentLeaderEpoch = leaderEpoch
}

r.blocks[topic][partitionID] = tmp
Expand Down
10 changes: 5 additions & 5 deletions fetch_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var (
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x12, // partitionID
0xFF, 0xFF, 0xFF, 0xFF, // currentLeaderEpoch
0x00, 0x00, 0x00, 0x66, // currentLeaderEpoch
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, // fetchOffset
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // logStartOffset
0x00, 0x00, 0x00, 0x56, // maxBytes
Expand All @@ -67,7 +67,7 @@ func TestFetchRequest(t *testing.T) {
request := new(FetchRequest)
request.MaxWaitTime = 0
request.MinBytes = 0
request.AddBlock("topic", 0x12, 0x34, 0x56)
request.AddBlock("topic", 0x12, 0x34, 0x56, -1)
testRequest(t, "one block", request, fetchRequestOneBlock)
})

Expand All @@ -76,18 +76,18 @@ func TestFetchRequest(t *testing.T) {
request.Version = 4
request.MaxBytes = 0xFF
request.Isolation = ReadCommitted
request.AddBlock("topic", 0x12, 0x34, 0x56)
request.AddBlock("topic", 0x12, 0x34, 0x56, -1)
testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
})

t.Run("one block v11 rackid", func(t *testing.T) {
t.Run("one block v11 rackid and leader epoch", func(t *testing.T) {
request := new(FetchRequest)
request.Version = 11
request.MaxBytes = 0xFF
request.Isolation = ReadCommitted
request.SessionID = 0xAA
request.SessionEpoch = 0xEE
request.AddBlock("topic", 0x12, 0x34, 0x56)
request.AddBlock("topic", 0x12, 0x34, 0x56, 0x66)
request.RackID = "rack01"
testRequest(t, "one block v11 rackid", request, fetchRequestOneBlockV11)
})
Expand Down
5 changes: 4 additions & 1 deletion fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (
"github.com/rcrowley/go-metrics"
)

const invalidPreferredReplicaID = -1
const (
invalidLeaderEpoch = -1
invalidPreferredReplicaID = -1
)

type AbortedTransaction struct {
// ProducerID contains the producer id associated with the aborted transaction.
Expand Down