Skip to content

Commit

Permalink
fix: fill in the Fetch{Request,Response} protocol
Browse files Browse the repository at this point in the history
In order to consume zstd-compressed records the consumer needs to send
and receive version 10 FetchRequest/FetchResponses, but they need to do
so in a well-formed manner that adheres to the encoding format.

Ref: https://kafka.apache.org/protocol

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
  • Loading branch information
dnwe committed Jan 17, 2020
1 parent 7a7d874 commit 1799d6c
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 34 deletions.
138 changes: 124 additions & 14 deletions fetch_request.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
package sarama

type fetchRequestBlock struct {
fetchOffset int64
maxBytes int32
Version int16
currentLeaderEpoch int32
fetchOffset int64
logStartOffset int64
maxBytes int32
}

func (b *fetchRequestBlock) encode(pe packetEncoder) error {
func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error {
b.Version = version
if b.Version >= 9 {
pe.putInt32(b.currentLeaderEpoch)
}
pe.putInt64(b.fetchOffset)
if b.Version >= 5 {
pe.putInt64(b.logStartOffset)
}
pe.putInt32(b.maxBytes)
return nil
}

func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) {
b.Version = version
if b.Version >= 9 {
if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
return err
}
}
if b.fetchOffset, err = pd.getInt64(); err != nil {
return err
}
if b.Version >= 5 {
if b.logStartOffset, err = pd.getInt64(); err != nil {
return err
}
}
if b.maxBytes, err = pd.getInt32(); err != nil {
return err
}
Expand All @@ -25,12 +46,15 @@ func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
type FetchRequest struct {
MaxWaitTime int32
MinBytes int32
MaxBytes int32
Version int16
Isolation IsolationLevel
blocks map[string]map[int32]*fetchRequestBlock
MaxWaitTime int32
MinBytes int32
MaxBytes int32
Version int16
Isolation IsolationLevel
SessionID int32
SessionEpoch int32
blocks map[string]map[int32]*fetchRequestBlock
forgotten map[string][]int32
}

type IsolationLevel int8
Expand All @@ -50,6 +74,10 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
if r.Version >= 4 {
pe.putInt8(int8(r.Isolation))
}
if r.Version >= 7 {
pe.putInt32(r.SessionID)
pe.putInt32(r.SessionEpoch)
}
err = pe.putArrayLength(len(r.blocks))
if err != nil {
return err
Expand All @@ -65,17 +93,38 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
}
for partition, block := range blocks {
pe.putInt32(partition)
err = block.encode(pe)
err = block.encode(pe, r.Version)
if err != nil {
return err
}
}
}
if r.Version >= 7 {
err = pe.putArrayLength(len(r.forgotten))
if err != nil {
return err
}
for topic, partitions := range r.forgotten {
err = pe.putString(topic)
if err != nil {
return err
}
err = pe.putArrayLength(len(partitions))
if err != nil {
return err
}
for _, partition := range partitions {
pe.putInt32(partition)
}
}
}

return nil
}

func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version

if _, err = pd.getInt32(); err != nil {
return err
}
Expand All @@ -97,6 +146,16 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
}
r.Isolation = IsolationLevel(isolation)
}
if r.Version >= 7 {
r.SessionID, err = pd.getInt32()
if err != nil {
return err
}
r.SessionEpoch, err = pd.getInt32()
if err != nil {
return err
}
}
topicCount, err := pd.getArrayLength()
if err != nil {
return err
Expand All @@ -121,12 +180,43 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
return err
}
fetchBlock := &fetchRequestBlock{}
if err = fetchBlock.decode(pd); err != nil {
if err = fetchBlock.decode(pd, r.Version); err != nil {
return err
}
r.blocks[topic][partition] = fetchBlock
}
}

if r.Version >= 7 {
forgottenCount, err := pd.getArrayLength()
if err != nil {
return err
}
if forgottenCount == 0 {
return nil
}
r.forgotten = make(map[string][]int32)
for i := 0; i < forgottenCount; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
partitionCount, err := pd.getArrayLength()
if err != nil {
return err
}
r.forgotten[topic] = make([]int32, partitionCount)

for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
r.forgotten[topic][j] = partition
}
}
}

return nil
}

Expand All @@ -140,16 +230,28 @@ func (r *FetchRequest) version() int16 {

func (r *FetchRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 0:
return MinVersion
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
case 4:
case 4, 5:
return V0_11_0_0
case 6:
return V1_0_0_0
case 7:
return V1_1_0_0
case 8:
return V2_0_0_0
case 9, 10:
return V2_1_0_0
case 11:
return V2_3_0_0
default:
return MinVersion
return MaxVersion
}
}

Expand All @@ -158,13 +260,21 @@ func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int
r.blocks = make(map[string]map[int32]*fetchRequestBlock)
}

if r.Version >= 7 && r.forgotten == nil {
r.forgotten = make(map[string][]int32)
}

if r.blocks[topic] == nil {
r.blocks[topic] = make(map[int32]*fetchRequestBlock)
}

tmp := new(fetchRequestBlock)
tmp.Version = r.Version
tmp.maxBytes = maxBytes
tmp.fetchOffset = fetchOffset
if r.Version >= 9 {
tmp.currentLeaderEpoch = int32(-1)
}

r.blocks[topic][partitionID] = tmp
}
44 changes: 28 additions & 16 deletions fetch_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,32 @@ var (
)

func TestFetchRequest(t *testing.T) {
request := new(FetchRequest)
testRequest(t, "no blocks", request, fetchRequestNoBlocks)

request.MaxWaitTime = 0x20
request.MinBytes = 0xEF
testRequest(t, "with properties", request, fetchRequestWithProperties)

request.MaxWaitTime = 0
request.MinBytes = 0
request.AddBlock("topic", 0x12, 0x34, 0x56)
testRequest(t, "one block", request, fetchRequestOneBlock)

request.Version = 4
request.MaxBytes = 0xFF
request.Isolation = ReadCommitted
testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
t.Run("no blocks", func(t *testing.T) {
request := new(FetchRequest)
testRequest(t, "no blocks", request, fetchRequestNoBlocks)
})

t.Run("with properties", func(t *testing.T) {
request := new(FetchRequest)
request.MaxWaitTime = 0x20
request.MinBytes = 0xEF
testRequest(t, "with properties", request, fetchRequestWithProperties)
})

t.Run("one block", func(t *testing.T) {
request := new(FetchRequest)
request.MaxWaitTime = 0
request.MinBytes = 0
request.AddBlock("topic", 0x12, 0x34, 0x56)
testRequest(t, "one block", request, fetchRequestOneBlock)
})

t.Run("one block v4", func(t *testing.T) {
request := new(FetchRequest)
request.Version = 4
request.MaxBytes = 0xFF
request.Isolation = ReadCommitted
request.AddBlock("topic", 0x12, 0x34, 0x56)
testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
})
}
48 changes: 45 additions & 3 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type FetchResponseBlock struct {
Err KError
HighWaterMarkOffset int64
LastStableOffset int64
LogStartOffset int64
AbortedTransactions []*AbortedTransaction
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
Expand All @@ -57,6 +58,13 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
return err
}

if version >= 5 {
b.LogStartOffset, err = pd.getInt64()
if err != nil {
return err
}
}

numTransact, err := pd.getArrayLength()
if err != nil {
return err
Expand Down Expand Up @@ -166,6 +174,10 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
if version >= 4 {
pe.putInt64(b.LastStableOffset)

if version >= 5 {
pe.putInt64(b.LogStartOffset)
}

if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
return err
}
Expand Down Expand Up @@ -200,7 +212,9 @@ func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
Version int16 // v1 requires 0.9+, v2 requires 0.10+
ErrorCode int16
SessionID int32
Version int16
LogAppendTime bool
Timestamp time.Time
}
Expand All @@ -216,6 +230,17 @@ func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
r.ThrottleTime = time.Duration(throttle) * time.Millisecond
}

if r.Version >= 7 {
r.ErrorCode, err = pd.getInt16()
if err != nil {
return err
}
r.SessionID, err = pd.getInt32()
if err != nil {
return err
}
}

numTopics, err := pd.getArrayLength()
if err != nil {
return err
Expand Down Expand Up @@ -258,6 +283,11 @@ func (r *FetchResponse) encode(pe packetEncoder) (err error) {
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
}

if r.Version >= 7 {
pe.putInt16(r.ErrorCode)
pe.putInt32(r.SessionID)
}

err = pe.putArrayLength(len(r.Blocks))
if err != nil {
return err
Expand Down Expand Up @@ -296,16 +326,28 @@ func (r *FetchResponse) version() int16 {

func (r *FetchResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 0:
return MinVersion
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
case 4:
case 4, 5:
return V0_11_0_0
case 6:
return V1_0_0_0
case 7:
return V1_1_0_0
case 8:
return V2_0_0_0
case 9, 10:
return V2_1_0_0
case 11:
return V2_3_0_0
default:
return MinVersion
return MaxVersion
}
}

Expand Down
2 changes: 1 addition & 1 deletion request.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func allocateBody(key, version int16) protocolBody {
case 0:
return &ProduceRequest{}
case 1:
return &FetchRequest{}
return &FetchRequest{Version: version}
case 2:
return &OffsetRequest{Version: version}
case 3:
Expand Down

0 comments on commit 1799d6c

Please sign in to comment.