diff --git a/fetch_request.go b/fetch_request.go index 4db9ddd3d..836e6dec1 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -1,20 +1,41 @@ package sarama type fetchRequestBlock struct { - fetchOffset int64 - maxBytes int32 + Version int16 + currentLeaderEpoch int32 + fetchOffset int64 + logStartOffset int64 + maxBytes int32 } -func (b *fetchRequestBlock) encode(pe packetEncoder) error { +func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error { + b.Version = version + if b.Version >= 9 { + pe.putInt32(b.currentLeaderEpoch) + } pe.putInt64(b.fetchOffset) + if b.Version >= 5 { + pe.putInt64(b.logStartOffset) + } pe.putInt32(b.maxBytes) return nil } -func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) { +func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) { + b.Version = version + if b.Version >= 9 { + if b.currentLeaderEpoch, err = pd.getInt32(); err != nil { + return err + } + } if b.fetchOffset, err = pd.getInt64(); err != nil { return err } + if b.Version >= 5 { + if b.logStartOffset, err = pd.getInt64(); err != nil { + return err + } + } if b.maxBytes, err = pd.getInt32(); err != nil { return err } @@ -25,12 +46,15 @@ func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) { // https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at // https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes type FetchRequest struct { - MaxWaitTime int32 - MinBytes int32 - MaxBytes int32 - Version int16 - Isolation IsolationLevel - blocks map[string]map[int32]*fetchRequestBlock + MaxWaitTime int32 + MinBytes int32 + MaxBytes int32 + Version int16 + Isolation IsolationLevel + SessionID int32 + SessionEpoch int32 + blocks map[string]map[int32]*fetchRequestBlock + forgotten map[string][]int32 } type IsolationLevel int8 @@ -50,6 +74,10 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) { if r.Version >= 4 { pe.putInt8(int8(r.Isolation)) } + if r.Version >= 7 { + pe.putInt32(r.SessionID) + pe.putInt32(r.SessionEpoch) + } err = pe.putArrayLength(len(r.blocks)) if err != nil { return err @@ -65,17 +93,38 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) { } for partition, block := range blocks { pe.putInt32(partition) - err = block.encode(pe) + err = block.encode(pe, r.Version) + if err != nil { + return err + } + } + } + if r.Version >= 7 { + err = pe.putArrayLength(len(r.forgotten)) + if err != nil { + return err + } + for topic, partitions := range r.forgotten { + err = pe.putString(topic) + if err != nil { + return err + } + err = pe.putArrayLength(len(partitions)) if err != nil { return err } + for _, partition := range partitions { + pe.putInt32(partition) + } } } + return nil } func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) { r.Version = version + if _, err = pd.getInt32(); err != nil { return err } @@ -97,6 +146,16 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) { } r.Isolation = IsolationLevel(isolation) } + if r.Version >= 7 { + r.SessionID, err = pd.getInt32() + if err != nil { + return err + } + r.SessionEpoch, err = pd.getInt32() + if err != nil { + return err + } + } topicCount, err := pd.getArrayLength() if err != nil { return err @@ -121,12 +180,43 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) { return err } fetchBlock := &fetchRequestBlock{} - if err = fetchBlock.decode(pd); err != nil { + if err = fetchBlock.decode(pd, r.Version); err != nil { return err } r.blocks[topic][partition] = fetchBlock } } + + if r.Version >= 7 { + forgottenCount, err := pd.getArrayLength() + if err != nil { + return err + } + if forgottenCount == 0 { + return nil + } + r.forgotten = make(map[string][]int32) + for i := 0; i < forgottenCount; i++ { + topic, err := pd.getString() + if err != nil { + return err + } + partitionCount, err := pd.getArrayLength() + if err != nil { + return err + } + r.forgotten[topic] = make([]int32, partitionCount) + + for j := 0; j < partitionCount; j++ { + partition, err := pd.getInt32() + if err != nil { + return err + } + r.forgotten[topic][j] = partition + } + } + } + return nil } @@ -140,16 +230,28 @@ func (r *FetchRequest) version() int16 { func (r *FetchRequest) requiredVersion() KafkaVersion { switch r.Version { + case 0: + return MinVersion case 1: return V0_9_0_0 case 2: return V0_10_0_0 case 3: return V0_10_1_0 - case 4: + case 4, 5: return V0_11_0_0 + case 6: + return V1_0_0_0 + case 7: + return V1_1_0_0 + case 8: + return V2_0_0_0 + case 9, 10: + return V2_1_0_0 + case 11: + return V2_3_0_0 default: - return MinVersion + return MaxVersion } } @@ -158,13 +260,21 @@ func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int r.blocks = make(map[string]map[int32]*fetchRequestBlock) } + if r.Version >= 7 && r.forgotten == nil { + r.forgotten = make(map[string][]int32) + } + if r.blocks[topic] == nil { r.blocks[topic] = make(map[int32]*fetchRequestBlock) } tmp := new(fetchRequestBlock) + tmp.Version = r.Version tmp.maxBytes = maxBytes tmp.fetchOffset = fetchOffset + if r.Version >= 9 { + tmp.currentLeaderEpoch = int32(-1) + } r.blocks[topic][partitionID] = tmp } diff --git a/fetch_request_test.go b/fetch_request_test.go index 1a94c2d1f..2fdd90509 100644 --- a/fetch_request_test.go +++ b/fetch_request_test.go @@ -29,20 +29,32 @@ var ( ) func TestFetchRequest(t *testing.T) { - request := new(FetchRequest) - testRequest(t, "no blocks", request, fetchRequestNoBlocks) - - request.MaxWaitTime = 0x20 - request.MinBytes = 0xEF - testRequest(t, "with properties", request, fetchRequestWithProperties) - - request.MaxWaitTime = 0 - request.MinBytes = 0 - request.AddBlock("topic", 0x12, 0x34, 0x56) - testRequest(t, "one block", request, fetchRequestOneBlock) - - request.Version = 4 - request.MaxBytes = 0xFF - request.Isolation = ReadCommitted - testRequest(t, "one block v4", request, fetchRequestOneBlockV4) + t.Run("no blocks", func(t *testing.T) { + request := new(FetchRequest) + testRequest(t, "no blocks", request, fetchRequestNoBlocks) + }) + + t.Run("with properties", func(t *testing.T) { + request := new(FetchRequest) + request.MaxWaitTime = 0x20 + request.MinBytes = 0xEF + testRequest(t, "with properties", request, fetchRequestWithProperties) + }) + + t.Run("one block", func(t *testing.T) { + request := new(FetchRequest) + request.MaxWaitTime = 0 + request.MinBytes = 0 + request.AddBlock("topic", 0x12, 0x34, 0x56) + testRequest(t, "one block", request, fetchRequestOneBlock) + }) + + t.Run("one block v4", func(t *testing.T) { + request := new(FetchRequest) + request.Version = 4 + request.MaxBytes = 0xFF + request.Isolation = ReadCommitted + request.AddBlock("topic", 0x12, 0x34, 0x56) + testRequest(t, "one block v4", request, fetchRequestOneBlockV4) + }) } diff --git a/fetch_response.go b/fetch_response.go index 3afc18778..11e4543ba 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -33,6 +33,7 @@ type FetchResponseBlock struct { Err KError HighWaterMarkOffset int64 LastStableOffset int64 + LogStartOffset int64 AbortedTransactions []*AbortedTransaction Records *Records // deprecated: use FetchResponseBlock.RecordsSet RecordsSet []*Records @@ -57,6 +58,13 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) return err } + if version >= 5 { + b.LogStartOffset, err = pd.getInt64() + if err != nil { + return err + } + } + numTransact, err := pd.getArrayLength() if err != nil { return err @@ -166,6 +174,10 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) if version >= 4 { pe.putInt64(b.LastStableOffset) + if version >= 5 { + pe.putInt64(b.LogStartOffset) + } + if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil { return err } @@ -200,7 +212,9 @@ func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction { type FetchResponse struct { Blocks map[string]map[int32]*FetchResponseBlock ThrottleTime time.Duration - Version int16 // v1 requires 0.9+, v2 requires 0.10+ + ErrorCode int16 + SessionID int32 + Version int16 LogAppendTime bool Timestamp time.Time } @@ -216,6 +230,17 @@ func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) { r.ThrottleTime = time.Duration(throttle) * time.Millisecond } + if r.Version >= 7 { + r.ErrorCode, err = pd.getInt16() + if err != nil { + return err + } + r.SessionID, err = pd.getInt32() + if err != nil { + return err + } + } + numTopics, err := pd.getArrayLength() if err != nil { return err @@ -258,6 +283,11 @@ func (r *FetchResponse) encode(pe packetEncoder) (err error) { pe.putInt32(int32(r.ThrottleTime / time.Millisecond)) } + if r.Version >= 7 { + pe.putInt16(r.ErrorCode) + pe.putInt32(r.SessionID) + } + err = pe.putArrayLength(len(r.Blocks)) if err != nil { return err @@ -296,16 +326,28 @@ func (r *FetchResponse) version() int16 { func (r *FetchResponse) requiredVersion() KafkaVersion { switch r.Version { + case 0: + return MinVersion case 1: return V0_9_0_0 case 2: return V0_10_0_0 case 3: return V0_10_1_0 - case 4: + case 4, 5: return V0_11_0_0 + case 6: + return V1_0_0_0 + case 7: + return V1_1_0_0 + case 8: + return V2_0_0_0 + case 9, 10: + return V2_1_0_0 + case 11: + return V2_3_0_0 default: - return MinVersion + return MaxVersion } } diff --git a/request.go b/request.go index 97437d67b..6e4ad8731 100644 --- a/request.go +++ b/request.go @@ -105,7 +105,7 @@ func allocateBody(key, version int16) protocolBody { case 0: return &ProduceRequest{} case 1: - return &FetchRequest{} + return &FetchRequest{Version: version} case 2: return &OffsetRequest{Version: version} case 3: