Skip to content

Commit

Permalink
Merge pull request #1605 from dnwe/fetch-v11-protocol
Browse files Browse the repository at this point in the history
feat: protocol support for V11 fetch w/ rackID
  • Loading branch information
dnwe authored Feb 11, 2020
2 parents 82c97b2 + 48ba0ca commit 4ba4c2f
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 12 deletions.
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,10 @@ type Config struct {
// debugging, and auditing purposes. Defaults to "sarama", but you should
// probably set it to something specific to your application.
ClientID string
// A rack identifier for this client. This can be any string value which
// indicates where this client is physically located.
// It corresponds with the broker config 'broker.rack'
RackID string
// The number of events to buffer in internal and external channels. This
// permits the producer and consumer to continue processing some messages
// in the background while user code is working, greatly improving throughput.
Expand Down
5 changes: 4 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,10 +887,13 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
request.Version = 4
request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
}

if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
request.Version = 10
}
if bc.consumer.conf.Version.IsAtLeast(V2_3_0_0) {
request.Version = 11
request.RackID = bc.consumer.conf.RackID
}

for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
Expand Down
17 changes: 14 additions & 3 deletions fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type FetchRequest struct {
SessionEpoch int32
blocks map[string]map[int32]*fetchRequestBlock
forgotten map[string][]int32
RackID string
}

type IsolationLevel int8
Expand Down Expand Up @@ -118,6 +119,12 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
}
}
}
if r.Version >= 11 {
err = pe.putString(r.RackID)
if err != nil {
return err
}
}

return nil
}
Expand Down Expand Up @@ -192,9 +199,6 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
if err != nil {
return err
}
if forgottenCount == 0 {
return nil
}
r.forgotten = make(map[string][]int32)
for i := 0; i < forgottenCount; i++ {
topic, err := pd.getString()
Expand All @@ -217,6 +221,13 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
}
}

if r.Version >= 11 {
r.RackID, err = pd.getString()
if err != nil {
return err
}
}

return nil
}

Expand Down
30 changes: 30 additions & 0 deletions fetch_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ var (
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56}

fetchRequestOneBlockV11 = []byte{
0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0xFF,
0x01,
0x00, 0x00, 0x00, 0xAA, // sessionID
0x00, 0x00, 0x00, 0xEE, // sessionEpoch
0x00, 0x00, 0x00, 0x01,
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x12, // partitionID
0xFF, 0xFF, 0xFF, 0xFF, // currentLeaderEpoch
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, // fetchOffset
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // logStartOffset
0x00, 0x00, 0x00, 0x56, // maxBytes
0x00, 0x00, 0x00, 0x00,
0x00, 0x06, 'r', 'a', 'c', 'k', '0', '1', // rackID
}
)

func TestFetchRequest(t *testing.T) {
Expand Down Expand Up @@ -57,4 +75,16 @@ func TestFetchRequest(t *testing.T) {
request.AddBlock("topic", 0x12, 0x34, 0x56)
testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
})

t.Run("one block v11 rackid", func(t *testing.T) {
request := new(FetchRequest)
request.Version = 11
request.MaxBytes = 0xFF
request.Isolation = ReadCommitted
request.SessionID = 0xAA
request.SessionEpoch = 0xEE
request.AddBlock("topic", 0x12, 0x34, 0x56)
request.RackID = "rack01"
testRequest(t, "one block v11 rackid", request, fetchRequestOneBlockV11)
})
}
28 changes: 20 additions & 8 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
}

type FetchResponseBlock struct {
Err KError
HighWaterMarkOffset int64
LastStableOffset int64
LogStartOffset int64
AbortedTransactions []*AbortedTransaction
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
Partial bool
Err KError
HighWaterMarkOffset int64
LastStableOffset int64
LogStartOffset int64
AbortedTransactions []*AbortedTransaction
PreferredReadReplica int32
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
Partial bool
}

func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
Expand Down Expand Up @@ -83,6 +84,13 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
}
}

if version >= 11 {
b.PreferredReadReplica, err = pd.getInt32()
if err != nil {
return err
}
}

recordsSize, err := pd.getInt32()
if err != nil {
return err
Expand Down Expand Up @@ -188,6 +196,10 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
}
}

if version >= 11 {
pe.putInt32(b.PreferredReadReplica)
}

pe.push(&lengthField{})
for _, records := range b.RecordsSet {
err = records.encode(pe)
Expand Down
97 changes: 97 additions & 0 deletions fetch_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,31 @@ var (
0x00,
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}

preferredReplicaFetchResponseV11 = []byte{
0x00, 0x00, 0x00, 0x00, // ThrottleTime
0x00, 0x02, // ErrorCode
0x00, 0x00, 0x00, 0xAC, // SessionID
0x00, 0x00, 0x00, 0x01, // Number of Topics
0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
0x00, 0x00, 0x00, 0x01, // Number of Partitions
0x00, 0x00, 0x00, 0x05, // Partition
0x00, 0x01, // Error
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x09, // Last Stable Offset
0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, // Log Start Offset
0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
0x00, 0x00, 0x00, 0x03, // Preferred Read Replica
0x00, 0x00, 0x00, 0x1C,
// messageSet
0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
0x00, 0x00, 0x00, 0x10,
// message
0x23, 0x96, 0x4a, 0xf7, // CRC
0x00,
0x00,
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
)

func TestEmptyFetchResponse(t *testing.T) {
Expand Down Expand Up @@ -398,3 +423,75 @@ func TestOneMessageFetchResponseV4(t *testing.T) {
t.Error("Decoding produced incorrect message value.")
}
}

func TestPreferredReplicaFetchResponseV11(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(
t, "preferred replica fetch response v11", &response,
preferredReplicaFetchResponseV11, 11)

if response.ErrorCode != 0x0002 {
t.Fatal("Decoding produced incorrect error code.")
}

if response.SessionID != 0x000000AC {
t.Fatal("Decoding produced incorrect session ID.")
}

if len(response.Blocks) != 1 {
t.Fatal("Decoding produced incorrect number of topic blocks.")
}

if len(response.Blocks["topic"]) != 1 {
t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
}

block := response.GetBlock("topic", 5)
if block == nil {
t.Fatal("GetBlock didn't return block.")
}
if block.Err != ErrOffsetOutOfRange {
t.Error("Decoding didn't produce correct error code.")
}
if block.HighWaterMarkOffset != 0x10101010 {
t.Error("Decoding didn't produce correct high water mark offset.")
}
if block.LastStableOffset != 0x10101009 {
t.Error("Decoding didn't produce correct last stable offset.")
}
if block.LogStartOffset != 0x01010101 {
t.Error("Decoding didn't produce correct log start offset.")
}
if block.PreferredReadReplica != 0x0003 {
t.Error("Decoding didn't produce correct preferred read replica.")
}
partial, err := block.isPartial()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if partial {
t.Error("Decoding detected a partial trailing record where there wasn't one.")
}

n, err := block.numRecords()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 1 {
t.Fatal("Decoding produced incorrect number of records.")
}
msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
if msgBlock.Offset != 0x550000 {
t.Error("Decoding produced incorrect message offset.")
}
msg := msgBlock.Msg
if msg.Codec != CompressionNone {
t.Error("Decoding produced incorrect message compression.")
}
if msg.Key != nil {
t.Error("Decoding produced message key where there was none.")
}
if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
t.Error("Decoding produced incorrect message value.")
}
}

0 comments on commit 4ba4c2f

Please sign in to comment.