diff --git a/mockresponses.go b/mockresponses.go index 172044199..fe55200c6 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -523,7 +523,7 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3 partitions = make(map[int32]*OffsetFetchResponseBlock) topics[topic] = partitions } - partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror} + partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror} return mr } diff --git a/offset_fetch_request.go b/offset_fetch_request.go index 5a05014b4..68608241f 100644 --- a/offset_fetch_request.go +++ b/offset_fetch_request.go @@ -1,28 +1,33 @@ package sarama type OffsetFetchRequest struct { - ConsumerGroup string Version int16 + ConsumerGroup string partitions map[string][]int32 } func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) { - if r.Version < 0 || r.Version > 1 { + if r.Version < 0 || r.Version > 5 { return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"} } if err = pe.putString(r.ConsumerGroup); err != nil { return err } - if err = pe.putArrayLength(len(r.partitions)); err != nil { - return err - } - for topic, partitions := range r.partitions { - if err = pe.putString(topic); err != nil { + + if r.Version >= 2 && r.partitions == nil { + pe.putInt32(-1) + } else { + if err = pe.putArrayLength(len(r.partitions)); err != nil { return err } - if err = pe.putInt32Array(partitions); err != nil { - return err + for topic, partitions := range r.partitions { + if err = pe.putString(topic); err != nil { + return err + } + if err = pe.putInt32Array(partitions); err != nil { + return err + } } } return nil @@ -37,7 +42,7 @@ func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error) if err != nil { return err } - if partitionCount == 0 { + if (partitionCount == 0 && version < 2) || partitionCount < 0 { return nil } r.partitions = make(map[string][]int32) @@ -67,11 +72,25 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion { switch r.Version { case 1: return V0_8_2_0 + case 2: + return V0_10_2_0 + case 3: + return V0_11_0_0 + case 4: + return V2_0_0_0 + case 5: + return V2_1_0_0 default: return MinVersion } } +func (r *OffsetFetchRequest) ZeroPartitions() { + if r.partitions == nil && r.Version >= 2 { + r.partitions = make(map[string][]int32) + } +} + func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) { if r.partitions == nil { r.partitions = make(map[string][]int32) diff --git a/offset_fetch_request_test.go b/offset_fetch_request_test.go index 025d725c9..55b46eea7 100644 --- a/offset_fetch_request_test.go +++ b/offset_fetch_request_test.go @@ -1,6 +1,9 @@ package sarama -import "testing" +import ( + "fmt" + "testing" +) var ( offsetFetchRequestNoGroupNoPartitions = []byte{ @@ -17,15 +20,36 @@ var ( 0x00, 0x0D, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't', 0x00, 0x00, 0x00, 0x01, 0x4F, 0x4F, 0x4F, 0x4F} + + offsetFetchRequestAllPartitions = []byte{ + 0x00, 0x04, 'b', 'l', 'a', 'h', + 0xff, 0xff, 0xff, 0xff} ) -func TestOffsetFetchRequest(t *testing.T) { - request := new(OffsetFetchRequest) - testRequest(t, "no group, no partitions", request, offsetFetchRequestNoGroupNoPartitions) +func TestOffsetFetchRequestNoPartitions(t *testing.T) { + for version := 0; version <= 5; version++ { + request := new(OffsetFetchRequest) + request.Version = int16(version) + request.ZeroPartitions() + testRequest(t, fmt.Sprintf("no group, no partitions %d", version), request, offsetFetchRequestNoGroupNoPartitions) - request.ConsumerGroup = "blah" - testRequest(t, "no partitions", request, offsetFetchRequestNoPartitions) + request.ConsumerGroup = "blah" + testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitions) + } +} +func TestOffsetFetchRequest(t *testing.T) { + for version := 0; version <= 5; version++ { + request := new(OffsetFetchRequest) + request.Version = int16(version) + request.ConsumerGroup = "blah" + request.AddPartition("topicTheFirst", 0x4F4F4F4F) + testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartition) + } +} - request.AddPartition("topicTheFirst", 0x4F4F4F4F) - testRequest(t, "one partition", request, offsetFetchRequestOnePartition) +func TestOffsetFetchRequestAllPartitions(t *testing.T) { + for version := 2; version <= 5; version++ { + request := &OffsetFetchRequest{Version: int16(version), ConsumerGroup: "blah"} + testRequest(t, fmt.Sprintf("all partitions %d", version), request, offsetFetchRequestAllPartitions) + } } diff --git a/offset_fetch_response.go b/offset_fetch_response.go index 11e4b1f3f..9e2570280 100644 --- a/offset_fetch_response.go +++ b/offset_fetch_response.go @@ -1,17 +1,25 @@ package sarama type OffsetFetchResponseBlock struct { - Offset int64 - Metadata string - Err KError + Offset int64 + LeaderEpoch int32 + Metadata string + Err KError } -func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) { +func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) { b.Offset, err = pd.getInt64() if err != nil { return err } + if version >= 5 { + b.LeaderEpoch, err = pd.getInt32() + if err != nil { + return err + } + } + b.Metadata, err = pd.getString() if err != nil { return err @@ -26,9 +34,13 @@ func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) { return nil } -func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) { +func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) { pe.putInt64(b.Offset) + if version >= 5 { + pe.putInt32(b.LeaderEpoch) + } + err = pe.putString(b.Metadata) if err != nil { return err @@ -40,10 +52,17 @@ func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) { } type OffsetFetchResponse struct { - Blocks map[string]map[int32]*OffsetFetchResponseBlock + Version int16 + ThrottleTimeMs int32 + Blocks map[string]map[int32]*OffsetFetchResponseBlock + Err KError } func (r *OffsetFetchResponse) encode(pe packetEncoder) error { + if r.Version >= 3 { + pe.putInt32(r.ThrottleTimeMs) + } + if err := pe.putArrayLength(len(r.Blocks)); err != nil { return err } @@ -56,51 +75,73 @@ func (r *OffsetFetchResponse) encode(pe packetEncoder) error { } for partition, block := range partitions { pe.putInt32(partition) - if err := block.encode(pe); err != nil { + if err := block.encode(pe, r.Version); err != nil { return err } } } + if r.Version >= 2 { + pe.putInt16(int16(r.Err)) + } return nil } func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) { - numTopics, err := pd.getArrayLength() - if err != nil || numTopics == 0 { - return err - } - - r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics) - for i := 0; i < numTopics; i++ { - name, err := pd.getString() - if err != nil { - return err - } + r.Version = version - numBlocks, err := pd.getArrayLength() + if version >= 3 { + r.ThrottleTimeMs, err = pd.getInt32() if err != nil { return err } + } - if numBlocks == 0 { - r.Blocks[name] = nil - continue - } - r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks) + numTopics, err := pd.getArrayLength() + if err != nil { + return err + } - for j := 0; j < numBlocks; j++ { - id, err := pd.getInt32() + if numTopics > 0 { + r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics) + for i := 0; i < numTopics; i++ { + name, err := pd.getString() if err != nil { return err } - block := new(OffsetFetchResponseBlock) - err = block.decode(pd) + numBlocks, err := pd.getArrayLength() if err != nil { return err } - r.Blocks[name][id] = block + + if numBlocks == 0 { + r.Blocks[name] = nil + continue + } + r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks) + + for j := 0; j < numBlocks; j++ { + id, err := pd.getInt32() + if err != nil { + return err + } + + block := new(OffsetFetchResponseBlock) + err = block.decode(pd, version) + if err != nil { + return err + } + r.Blocks[name][id] = block + } + } + } + + if version >= 2 { + kerr, err := pd.getInt16() + if err != nil { + return err } + r.Err = KError(kerr) } return nil @@ -111,11 +152,24 @@ func (r *OffsetFetchResponse) key() int16 { } func (r *OffsetFetchResponse) version() int16 { - return 0 + return r.Version } func (r *OffsetFetchResponse) requiredVersion() KafkaVersion { - return MinVersion + switch r.Version { + case 1: + return V0_8_2_0 + case 2: + return V0_10_2_0 + case 3: + return V0_11_0_0 + case 4: + return V2_0_0_0 + case 5: + return V2_1_0_0 + default: + return MinVersion + } } func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock { diff --git a/offset_fetch_response_test.go b/offset_fetch_response_test.go index 7614ae424..b564f70f9 100644 --- a/offset_fetch_response_test.go +++ b/offset_fetch_response_test.go @@ -1,22 +1,65 @@ package sarama -import "testing" +import ( + "fmt" + "testing" +) var ( emptyOffsetFetchResponse = []byte{ 0x00, 0x00, 0x00, 0x00} + + emptyOffsetFetchResponseV2 = []byte{ + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x2A} + + emptyOffsetFetchResponseV3 = []byte{ + 0x00, 0x00, 0x00, 0x09, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x2A} ) func TestEmptyOffsetFetchResponse(t *testing.T) { - response := OffsetFetchResponse{} - testResponse(t, "empty", &response, emptyOffsetFetchResponse) + for version := 0; version <= 1; version++ { + response := OffsetFetchResponse{Version: int16(version)} + testResponse(t, fmt.Sprintf("empty v%d", version), &response, emptyOffsetFetchResponse) + } + + responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest} + testResponse(t, "empty V2", &responseV2, emptyOffsetFetchResponseV2) + + for version := 3; version <= 5; version++ { + responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9} + testResponse(t, fmt.Sprintf("empty v%d", version), &responseV3, emptyOffsetFetchResponseV3) + } } func TestNormalOffsetFetchResponse(t *testing.T) { - response := OffsetFetchResponse{} - response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, "md", ErrRequestTimedOut}) - response.Blocks["m"] = nil // The response encoded form cannot be checked for it varies due to // unpredictable map traversal order. - testResponse(t, "normal", &response, nil) + // Hence the 'nil' as byte[] parameter in the 'testResponse(..)' calls + + for version := 0; version <= 1; version++ { + response := OffsetFetchResponse{Version: int16(version)} + response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut}) + response.Blocks["m"] = nil + testResponse(t, fmt.Sprintf("Normal v%d", version), &response, nil) + } + + responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest} + responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut}) + responseV2.Blocks["m"] = nil + testResponse(t, "normal V2", &responseV2, nil) + + for version := 3; version <= 4; version++ { + responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9} + responseV3.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut}) + responseV3.Blocks["m"] = nil + testResponse(t, fmt.Sprintf("Normal v%d", version), &responseV3, nil) + } + + responseV5 := OffsetFetchResponse{Version: 5, Err: ErrInvalidRequest, ThrottleTimeMs: 9} + responseV5.AddBlock("t", 0, &OffsetFetchResponseBlock{Offset: 10, LeaderEpoch: 100, Metadata: "md", Err: ErrRequestTimedOut}) + responseV5.Blocks["m"] = nil + testResponse(t, "normal V5", &responseV5, nil) }