Skip to content

Commit

Permalink
Added OffsetFetch support up to v5
Browse files Browse the repository at this point in the history
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
  • Loading branch information
mimaison and edoardocomar committed Dec 5, 2018
1 parent a4ef905 commit 6c7918e
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 35 deletions.
2 changes: 1 addition & 1 deletion mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3
partitions = make(map[int32]*OffsetFetchResponseBlock)
topics[topic] = partitions
}
partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror}
partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
return mr
}

Expand Down
16 changes: 14 additions & 2 deletions offset_fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type OffsetFetchRequest struct {
}

func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
if r.Version < 0 || r.Version > 2 {
if r.Version < 0 || r.Version > 5 {
return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
}

Expand Down Expand Up @@ -42,7 +42,7 @@ func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error)
if err != nil {
return err
}
if partitionCount <= 0 {
if (partitionCount == 0 && version < 2) || partitionCount < 0 {
return nil
}
r.partitions = make(map[string][]int32)
Expand Down Expand Up @@ -74,11 +74,23 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
return V0_8_2_0
case 2:
return V0_10_2_0
case 3:
return V0_11_0_0
case 4:
return V2_0_0_0
case 5:
return V2_1_0_0
default:
return MinVersion
}
}

func (r *OffsetFetchRequest) ZeroPartitions() {
if r.partitions == nil && r.Version >= 2 {
r.partitions = make(map[string][]int32)
}
}

func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
if r.partitions == nil {
r.partitions = make(map[string][]int32)
Expand Down
33 changes: 23 additions & 10 deletions offset_fetch_request_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"fmt"
"testing"
)

Expand All @@ -25,18 +26,30 @@ var (
0xff, 0xff, 0xff, 0xff}
)

func TestOffsetFetchRequest(t *testing.T) {
request := new(OffsetFetchRequest)
testRequest(t, "no group, no partitions", request, offsetFetchRequestNoGroupNoPartitions)

request.ConsumerGroup = "blah"
testRequest(t, "no partitions", request, offsetFetchRequestNoPartitions)
func TestOffsetFetchRequestNoPartitions(t *testing.T) {
for version := 0; version <= 5; version++ {
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ZeroPartitions()
testRequest(t, fmt.Sprintf("no group, no partitions %d", version), request, offsetFetchRequestNoGroupNoPartitions)

request.AddPartition("topicTheFirst", 0x4F4F4F4F)
testRequest(t, "one partition", request, offsetFetchRequestOnePartition)
request.ConsumerGroup = "blah"
testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitions)
}
}
func TestOffsetFetchRequest(t *testing.T) {
for version := 0; version <= 5; version++ {
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ConsumerGroup = "blah"
request.AddPartition("topicTheFirst", 0x4F4F4F4F)
testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartition)
}
}

func TestOffsetFetchRequestAllPartitions(t *testing.T) {
requestV2 := &OffsetFetchRequest{Version: 2, ConsumerGroup: "blah"}
testRequest(t, "all partitions", requestV2, offsetFetchRequestAllPartitions)
for version := 2; version <= 5; version++ {
request := &OffsetFetchRequest{Version: int16(version), ConsumerGroup: "blah"}
testRequest(t, fmt.Sprintf("all partitions %d", version), request, offsetFetchRequestAllPartitions)
}
}
50 changes: 40 additions & 10 deletions offset_fetch_response.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
package sarama

type OffsetFetchResponseBlock struct {
Offset int64
Metadata string
Err KError
Offset int64
LeaderEpoch int32
Metadata string
Err KError
}

func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
b.Offset, err = pd.getInt64()
if err != nil {
return err
}

if version >= 5 {
b.LeaderEpoch, err = pd.getInt32()
if err != nil {
return err
}
}

b.Metadata, err = pd.getString()
if err != nil {
return err
Expand All @@ -26,9 +34,13 @@ func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
return nil
}

func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
pe.putInt64(b.Offset)

if version >= 5 {
pe.putInt32(b.LeaderEpoch)
}

err = pe.putString(b.Metadata)
if err != nil {
return err
Expand All @@ -40,12 +52,17 @@ func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
}

type OffsetFetchResponse struct {
Version int16
Blocks map[string]map[int32]*OffsetFetchResponseBlock
Err KError
Version int16
ThrottleTimeMs int32
Blocks map[string]map[int32]*OffsetFetchResponseBlock
Err KError
}

func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
if r.Version >= 3 {
pe.putInt32(r.ThrottleTimeMs)
}

if err := pe.putArrayLength(len(r.Blocks)); err != nil {
return err
}
Expand All @@ -58,7 +75,7 @@ func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
}
for partition, block := range partitions {
pe.putInt32(partition)
if err := block.encode(pe); err != nil {
if err := block.encode(pe, r.Version); err != nil {
return err
}
}
Expand All @@ -72,6 +89,13 @@ func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
r.Version = version

if version >= 3 {
r.ThrottleTimeMs, err = pd.getInt32()
if err != nil {
return err
}
}

numTopics, err := pd.getArrayLength()
if err != nil {
return err
Expand Down Expand Up @@ -103,7 +127,7 @@ func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error
}

block := new(OffsetFetchResponseBlock)
err = block.decode(pd)
err = block.decode(pd, version)
if err != nil {
return err
}
Expand Down Expand Up @@ -137,6 +161,12 @@ func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
return V0_8_2_0
case 2:
return V0_10_2_0
case 3:
return V0_11_0_0
case 4:
return V2_0_0_0
case 5:
return V2_1_0_0
default:
return MinVersion
}
Expand Down
54 changes: 42 additions & 12 deletions offset_fetch_response_test.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,65 @@
package sarama

import "testing"
import (
"fmt"
"testing"
)

var (
emptyOffsetFetchResponse = []byte{
0x00, 0x00, 0x00, 0x00}

emptyOffsetFetchResponseV2 = []byte{
0x00, 0x00, 0x00, 0x00,
0x00, 0x2A}

emptyOffsetFetchResponseV3 = []byte{
0x00, 0x00, 0x00, 0x09,
0x00, 0x00, 0x00, 0x00,
0x00, 0x2A}
)

func TestEmptyOffsetFetchResponse(t *testing.T) {
response := OffsetFetchResponse{}
testResponse(t, "empty", &response, emptyOffsetFetchResponse)
for version := 0; version <= 1; version++ {
response := OffsetFetchResponse{Version: int16(version)}
testResponse(t, fmt.Sprintf("empty v%d", version), &response, emptyOffsetFetchResponse)
}

responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
testResponse(t, "emptyV2", &responseV2, emptyOffsetFetchResponseV2)
testResponse(t, "empty V2", &responseV2, emptyOffsetFetchResponseV2)

for version := 3; version <= 5; version++ {
responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9}
testResponse(t, fmt.Sprintf("empty v%d", version), &responseV3, emptyOffsetFetchResponseV3)
}
}

func TestNormalOffsetFetchResponse(t *testing.T) {
response := OffsetFetchResponse{}
response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, "md", ErrRequestTimedOut})
response.Blocks["m"] = nil
// The response encoded form cannot be checked for it varies due to
// unpredictable map traversal order.
testResponse(t, "normal", &response, nil)
// Hence the 'nil' as byte[] parameter in the 'testResponse(..)' calls

for version := 0; version <= 1; version++ {
response := OffsetFetchResponse{Version: int16(version)}
response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
response.Blocks["m"] = nil
testResponse(t, fmt.Sprintf("Normal v%d", version), &response, nil)
}

responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, "md", ErrRequestTimedOut})
responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
responseV2.Blocks["m"] = nil
// The response encoded form cannot be checked for it varies due to
// unpredictable map traversal order.
testResponse(t, "normalV2", &responseV2, nil)
testResponse(t, "normal V2", &responseV2, nil)

for version := 3; version <= 4; version++ {
responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9}
responseV3.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
responseV3.Blocks["m"] = nil
testResponse(t, fmt.Sprintf("Normal v%d", version), &responseV3, nil)
}

responseV5 := OffsetFetchResponse{Version: 5, Err: ErrInvalidRequest, ThrottleTimeMs: 9}
responseV5.AddBlock("t", 0, &OffsetFetchResponseBlock{Offset: 10, LeaderEpoch: 100, Metadata: "md", Err: ErrRequestTimedOut})
responseV5.Blocks["m"] = nil
testResponse(t, "normal V5", &responseV5, nil)
}

0 comments on commit 6c7918e

Please sign in to comment.