Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support new OffsetFetch request/response #1198

Merged
merged 2 commits into from
Dec 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3
partitions = make(map[int32]*OffsetFetchResponseBlock)
topics[topic] = partitions
}
partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror}
partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
return mr
}

Expand Down
39 changes: 29 additions & 10 deletions offset_fetch_request.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@
package sarama

type OffsetFetchRequest struct {
ConsumerGroup string
Version int16
ConsumerGroup string
partitions map[string][]int32
}

func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
if r.Version < 0 || r.Version > 1 {
if r.Version < 0 || r.Version > 5 {
return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
}

if err = pe.putString(r.ConsumerGroup); err != nil {
return err
}
if err = pe.putArrayLength(len(r.partitions)); err != nil {
return err
}
for topic, partitions := range r.partitions {
if err = pe.putString(topic); err != nil {

if r.Version >= 2 && r.partitions == nil {
pe.putInt32(-1)
} else {
if err = pe.putArrayLength(len(r.partitions)); err != nil {
return err
}
if err = pe.putInt32Array(partitions); err != nil {
return err
for topic, partitions := range r.partitions {
if err = pe.putString(topic); err != nil {
return err
}
if err = pe.putInt32Array(partitions); err != nil {
return err
}
}
}
return nil
Expand All @@ -37,7 +42,7 @@ func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error)
if err != nil {
return err
}
if partitionCount == 0 {
if (partitionCount == 0 && version < 2) || partitionCount < 0 {
return nil
}
r.partitions = make(map[string][]int32)
Expand Down Expand Up @@ -67,11 +72,25 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V0_8_2_0
case 2:
return V0_10_2_0
case 3:
return V0_11_0_0
case 4:
return V2_0_0_0
case 5:
return V2_1_0_0
default:
return MinVersion
}
}

func (r *OffsetFetchRequest) ZeroPartitions() {
if r.partitions == nil && r.Version >= 2 {
r.partitions = make(map[string][]int32)
}
}

func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
if r.partitions == nil {
r.partitions = make(map[string][]int32)
Expand Down
40 changes: 32 additions & 8 deletions offset_fetch_request_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package sarama

import "testing"
import (
"fmt"
"testing"
)

var (
offsetFetchRequestNoGroupNoPartitions = []byte{
Expand All @@ -17,15 +20,36 @@ var (
0x00, 0x0D, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't',
0x00, 0x00, 0x00, 0x01,
0x4F, 0x4F, 0x4F, 0x4F}

offsetFetchRequestAllPartitions = []byte{
0x00, 0x04, 'b', 'l', 'a', 'h',
0xff, 0xff, 0xff, 0xff}
)

func TestOffsetFetchRequest(t *testing.T) {
request := new(OffsetFetchRequest)
testRequest(t, "no group, no partitions", request, offsetFetchRequestNoGroupNoPartitions)
func TestOffsetFetchRequestNoPartitions(t *testing.T) {
for version := 0; version <= 5; version++ {
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ZeroPartitions()
testRequest(t, fmt.Sprintf("no group, no partitions %d", version), request, offsetFetchRequestNoGroupNoPartitions)

request.ConsumerGroup = "blah"
testRequest(t, "no partitions", request, offsetFetchRequestNoPartitions)
request.ConsumerGroup = "blah"
testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitions)
}
}
func TestOffsetFetchRequest(t *testing.T) {
for version := 0; version <= 5; version++ {
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ConsumerGroup = "blah"
request.AddPartition("topicTheFirst", 0x4F4F4F4F)
testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartition)
}
}

request.AddPartition("topicTheFirst", 0x4F4F4F4F)
testRequest(t, "one partition", request, offsetFetchRequestOnePartition)
func TestOffsetFetchRequestAllPartitions(t *testing.T) {
for version := 2; version <= 5; version++ {
request := &OffsetFetchRequest{Version: int16(version), ConsumerGroup: "blah"}
testRequest(t, fmt.Sprintf("all partitions %d", version), request, offsetFetchRequestAllPartitions)
}
}
116 changes: 85 additions & 31 deletions offset_fetch_response.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
package sarama

type OffsetFetchResponseBlock struct {
Offset int64
Metadata string
Err KError
Offset int64
LeaderEpoch int32
Metadata string
Err KError
}

func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
b.Offset, err = pd.getInt64()
if err != nil {
return err
}

if version >= 5 {
b.LeaderEpoch, err = pd.getInt32()
if err != nil {
return err
}
}

b.Metadata, err = pd.getString()
if err != nil {
return err
Expand All @@ -26,9 +34,13 @@ func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
return nil
}

func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
pe.putInt64(b.Offset)

if version >= 5 {
pe.putInt32(b.LeaderEpoch)
}

err = pe.putString(b.Metadata)
if err != nil {
return err
Expand All @@ -40,10 +52,17 @@ func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
}

type OffsetFetchResponse struct {
Blocks map[string]map[int32]*OffsetFetchResponseBlock
Version int16
ThrottleTimeMs int32
Blocks map[string]map[int32]*OffsetFetchResponseBlock
Err KError
}

func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
if r.Version >= 3 {
pe.putInt32(r.ThrottleTimeMs)
}

if err := pe.putArrayLength(len(r.Blocks)); err != nil {
return err
}
Expand All @@ -56,51 +75,73 @@ func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
}
for partition, block := range partitions {
pe.putInt32(partition)
if err := block.encode(pe); err != nil {
if err := block.encode(pe, r.Version); err != nil {
return err
}
}
}
if r.Version >= 2 {
pe.putInt16(int16(r.Err))
}
return nil
}

func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
numTopics, err := pd.getArrayLength()
if err != nil || numTopics == 0 {
return err
}

r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
for i := 0; i < numTopics; i++ {
name, err := pd.getString()
if err != nil {
return err
}
r.Version = version

numBlocks, err := pd.getArrayLength()
if version >= 3 {
r.ThrottleTimeMs, err = pd.getInt32()
if err != nil {
return err
}
}

if numBlocks == 0 {
r.Blocks[name] = nil
continue
}
r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
numTopics, err := pd.getArrayLength()
if err != nil {
return err
}

for j := 0; j < numBlocks; j++ {
id, err := pd.getInt32()
if numTopics > 0 {
r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
for i := 0; i < numTopics; i++ {
name, err := pd.getString()
if err != nil {
return err
}

block := new(OffsetFetchResponseBlock)
err = block.decode(pd)
numBlocks, err := pd.getArrayLength()
if err != nil {
return err
}
r.Blocks[name][id] = block

if numBlocks == 0 {
r.Blocks[name] = nil
continue
}
r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)

for j := 0; j < numBlocks; j++ {
id, err := pd.getInt32()
if err != nil {
return err
}

block := new(OffsetFetchResponseBlock)
err = block.decode(pd, version)
if err != nil {
return err
}
r.Blocks[name][id] = block
}
}
}

if version >= 2 {
kerr, err := pd.getInt16()
if err != nil {
return err
}
r.Err = KError(kerr)
}

return nil
Expand All @@ -111,11 +152,24 @@ func (r *OffsetFetchResponse) key() int16 {
}

func (r *OffsetFetchResponse) version() int16 {
return 0
return r.Version
}

func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
return MinVersion
switch r.Version {
case 1:
return V0_8_2_0
case 2:
return V0_10_2_0
case 3:
return V0_11_0_0
case 4:
return V2_0_0_0
case 5:
return V2_1_0_0
default:
return MinVersion
}
}

func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {
Expand Down
Loading