Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: protocol support for V11 fetch w/ rackID #1605

Merged
merged 1 commit into from
Feb 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,10 @@ type Config struct {
// debugging, and auditing purposes. Defaults to "sarama", but you should
// probably set it to something specific to your application.
ClientID string
// A rack identifier for this client. This can be any string value which
// indicates where this client is physically located.
// It corresponds with the broker config 'broker.rack'
RackID string
// The number of events to buffer in internal and external channels. This
// permits the producer and consumer to continue processing some messages
// in the background while user code is working, greatly improving throughput.
Expand Down
5 changes: 4 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,10 +887,13 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
request.Version = 4
request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
}

if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
request.Version = 10
}
if bc.consumer.conf.Version.IsAtLeast(V2_3_0_0) {
request.Version = 11
request.RackID = bc.consumer.conf.RackID
}

for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
Expand Down
17 changes: 14 additions & 3 deletions fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type FetchRequest struct {
SessionEpoch int32
blocks map[string]map[int32]*fetchRequestBlock
forgotten map[string][]int32
RackID string
}

type IsolationLevel int8
Expand Down Expand Up @@ -118,6 +119,12 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
}
}
}
if r.Version >= 11 {
err = pe.putString(r.RackID)
if err != nil {
return err
}
}

return nil
}
Expand Down Expand Up @@ -192,9 +199,6 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
if err != nil {
return err
}
if forgottenCount == 0 {
return nil
}
r.forgotten = make(map[string][]int32)
for i := 0; i < forgottenCount; i++ {
topic, err := pd.getString()
Expand All @@ -217,6 +221,13 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
}
}

if r.Version >= 11 {
r.RackID, err = pd.getString()
if err != nil {
return err
}
}

return nil
}

Expand Down
30 changes: 30 additions & 0 deletions fetch_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ var (
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56}

fetchRequestOneBlockV11 = []byte{
0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0xFF,
0x01,
0x00, 0x00, 0x00, 0xAA, // sessionID
0x00, 0x00, 0x00, 0xEE, // sessionEpoch
0x00, 0x00, 0x00, 0x01,
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x12, // partitionID
0xFF, 0xFF, 0xFF, 0xFF, // currentLeaderEpoch
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, // fetchOffset
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // logStartOffset
0x00, 0x00, 0x00, 0x56, // maxBytes
0x00, 0x00, 0x00, 0x00,
0x00, 0x06, 'r', 'a', 'c', 'k', '0', '1', // rackID
}
)

func TestFetchRequest(t *testing.T) {
Expand Down Expand Up @@ -57,4 +75,16 @@ func TestFetchRequest(t *testing.T) {
request.AddBlock("topic", 0x12, 0x34, 0x56)
testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
})

t.Run("one block v11 rackid", func(t *testing.T) {
request := new(FetchRequest)
request.Version = 11
request.MaxBytes = 0xFF
request.Isolation = ReadCommitted
request.SessionID = 0xAA
request.SessionEpoch = 0xEE
request.AddBlock("topic", 0x12, 0x34, 0x56)
request.RackID = "rack01"
testRequest(t, "one block v11 rackid", request, fetchRequestOneBlockV11)
})
}
28 changes: 20 additions & 8 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
}

type FetchResponseBlock struct {
Err KError
HighWaterMarkOffset int64
LastStableOffset int64
LogStartOffset int64
AbortedTransactions []*AbortedTransaction
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
Partial bool
Err KError
HighWaterMarkOffset int64
LastStableOffset int64
LogStartOffset int64
AbortedTransactions []*AbortedTransaction
PreferredReadReplica int32
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
Partial bool
}

func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
Expand Down Expand Up @@ -83,6 +84,13 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
}
}

if version >= 11 {
b.PreferredReadReplica, err = pd.getInt32()
if err != nil {
return err
}
}

recordsSize, err := pd.getInt32()
if err != nil {
return err
Expand Down Expand Up @@ -188,6 +196,10 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
}
}

if version >= 11 {
pe.putInt32(b.PreferredReadReplica)
}

pe.push(&lengthField{})
for _, records := range b.RecordsSet {
err = records.encode(pe)
Expand Down
97 changes: 97 additions & 0 deletions fetch_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,31 @@ var (
0x00,
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}

preferredReplicaFetchResponseV11 = []byte{
0x00, 0x00, 0x00, 0x00, // ThrottleTime
0x00, 0x02, // ErrorCode
0x00, 0x00, 0x00, 0xAC, // SessionID
0x00, 0x00, 0x00, 0x01, // Number of Topics
0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
0x00, 0x00, 0x00, 0x01, // Number of Partitions
0x00, 0x00, 0x00, 0x05, // Partition
0x00, 0x01, // Error
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x09, // Last Stable Offset
0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, // Log Start Offset
0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
0x00, 0x00, 0x00, 0x03, // Preferred Read Replica
0x00, 0x00, 0x00, 0x1C,
// messageSet
0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
0x00, 0x00, 0x00, 0x10,
// message
0x23, 0x96, 0x4a, 0xf7, // CRC
0x00,
0x00,
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
)

func TestEmptyFetchResponse(t *testing.T) {
Expand Down Expand Up @@ -398,3 +423,75 @@ func TestOneMessageFetchResponseV4(t *testing.T) {
t.Error("Decoding produced incorrect message value.")
}
}

func TestPreferredReplicaFetchResponseV11(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(
t, "preferred replica fetch response v11", &response,
preferredReplicaFetchResponseV11, 11)

if response.ErrorCode != 0x0002 {
t.Fatal("Decoding produced incorrect error code.")
}

if response.SessionID != 0x000000AC {
t.Fatal("Decoding produced incorrect session ID.")
}

if len(response.Blocks) != 1 {
t.Fatal("Decoding produced incorrect number of topic blocks.")
}

if len(response.Blocks["topic"]) != 1 {
t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
}

block := response.GetBlock("topic", 5)
if block == nil {
t.Fatal("GetBlock didn't return block.")
}
if block.Err != ErrOffsetOutOfRange {
t.Error("Decoding didn't produce correct error code.")
}
if block.HighWaterMarkOffset != 0x10101010 {
t.Error("Decoding didn't produce correct high water mark offset.")
}
if block.LastStableOffset != 0x10101009 {
t.Error("Decoding didn't produce correct last stable offset.")
}
if block.LogStartOffset != 0x01010101 {
t.Error("Decoding didn't produce correct log start offset.")
}
if block.PreferredReadReplica != 0x0003 {
t.Error("Decoding didn't produce correct preferred read replica.")
}
partial, err := block.isPartial()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if partial {
t.Error("Decoding detected a partial trailing record where there wasn't one.")
}

n, err := block.numRecords()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 1 {
t.Fatal("Decoding produced incorrect number of records.")
}
msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
if msgBlock.Offset != 0x550000 {
t.Error("Decoding produced incorrect message offset.")
}
msg := msgBlock.Msg
if msg.Codec != CompressionNone {
t.Error("Decoding produced incorrect message compression.")
}
if msg.Key != nil {
t.Error("Decoding produced message key where there was none.")
}
if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
t.Error("Decoding produced incorrect message value.")
}
}