From f3c86b03173ed8fe312f9f65d58858a64b38ec18 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Fri, 27 Mar 2015 14:28:20 +0000 Subject: [PATCH] The wiki of the offset API has been updated again This should bring us up to date. --- offset_commit_request.go | 61 ++++++++++++++++++++++++----------- offset_commit_request_test.go | 49 ++++++++++++++++++++++++---- offset_fetch_request.go | 7 +++- 3 files changed, 90 insertions(+), 27 deletions(-) diff --git a/offset_commit_request.go b/offset_commit_request.go index f807e9b76..68756c8f2 100644 --- a/offset_commit_request.go +++ b/offset_commit_request.go @@ -13,44 +13,71 @@ type offsetCommitRequestBlock struct { func (r *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error { pe.putInt64(r.offset) - if version >= 1 { + if version == 1 { pe.putInt64(r.timestamp) + } else if r.timestamp != 0 { + Logger.Println("Non-zero timestamp specified for OffsetCommitRequest not v1, it will be ignored") } + return pe.putString(r.metadata) } type OffsetCommitRequest struct { - ConsumerGroup string - Version int16 // 0 (0.8.1 and later) or 1 (0.8.2 and later, includes timestamp field) - blocks map[string]map[int32]*offsetCommitRequestBlock + ConsumerGroup string + ConsumerGroupGeneration int32 // v1 or later + ConsumerID string // v1 or later + RetentionTime int64 // v2 or later + + // Version can be: + // - 0 (kafka 0.8.1 and later) + // - 1 (kafka 0.8.2 and later) + // - 2 (kafka 0.8.3 and later) + Version int16 + blocks map[string]map[int32]*offsetCommitRequestBlock } func (r *OffsetCommitRequest) encode(pe packetEncoder) error { - if r.Version < 0 || r.Version > 1 { + if r.Version < 0 || r.Version > 2 { return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"} } - err := pe.putString(r.ConsumerGroup) - if err != nil { + if err := pe.putString(r.ConsumerGroup); err != nil { return err } - err = pe.putArrayLength(len(r.blocks)) - if err != nil { + + if r.Version >= 1 { + pe.putInt32(r.ConsumerGroupGeneration) + if err := pe.putString(r.ConsumerID); err != nil { + return err + } + } else { + if r.ConsumerGroupGeneration != 0 { + Logger.Println("Non-zero ConsumerGroupGeneration specified for OffsetCommitRequest v0, it will be ignored") + } + if r.ConsumerID != "" { + Logger.Println("Non-empty ConsumerID specified for OffsetCommitRequest v0, it will be ignored") + } + } + + if r.Version >= 2 { + pe.putInt64(r.RetentionTime) + } else if r.RetentionTime != 0 { + Logger.Println("Non-zero RetentionTime specified for OffsetCommitRequest version <2, it will be ignored") + } + + if err := pe.putArrayLength(len(r.blocks)); err != nil { return err } for topic, partitions := range r.blocks { - err = pe.putString(topic) - if err != nil { + if err := pe.putString(topic); err != nil { return err } - err = pe.putArrayLength(len(partitions)) - if err != nil { + if err := pe.putArrayLength(len(partitions)); err != nil { return err } for partition, block := range partitions { pe.putInt32(partition) - err = block.encode(pe, r.Version) - if err != nil { + if err := block.encode(pe, r.Version); err != nil { return err } } @@ -75,9 +102,5 @@ func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset i r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock) } - if r.Version == 0 && timestamp != 0 { - Logger.Println("Non-zero timestamp specified for OffsetCommitRequest v0, it will be ignored") - } - r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata} } diff --git a/offset_commit_request_test.go b/offset_commit_request_test.go index 89dd4d974..6e7f79bfc 100644 --- a/offset_commit_request_test.go +++ b/offset_commit_request_test.go @@ -3,12 +3,21 @@ package sarama import "testing" var ( - offsetCommitRequestNoGroupNoBlocks = []byte{ - 0x00, 0x00, + offsetCommitRequestNoBlocksV0 = []byte{ + 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', + 0x00, 0x00, 0x00, 0x00} + + offsetCommitRequestNoBlocksV1 = []byte{ + 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', + 0x00, 0x00, 0x11, 0x22, + 0x00, 0x04, 'c', 'o', 'n', 's', 0x00, 0x00, 0x00, 0x00} - offsetCommitRequestNoBlocks = []byte{ + offsetCommitRequestNoBlocksV2 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', + 0x00, 0x00, 0x11, 0x22, + 0x00, 0x04, 'c', 'o', 'n', 's', + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x44, 0x33, 0x00, 0x00, 0x00, 0x00} offsetCommitRequestOneBlockV0 = []byte{ @@ -22,6 +31,8 @@ var ( offsetCommitRequestOneBlockV1 = []byte{ 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', + 0x00, 0x00, 0x11, 0x22, + 0x00, 0x04, 'c', 'o', 'n', 's', 0x00, 0x00, 0x00, 0x01, 0x00, 0x05, 't', 'o', 'p', 'i', 'c', 0x00, 0x00, 0x00, 0x01, @@ -29,18 +40,42 @@ var ( 0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'} + + offsetCommitRequestOneBlockV2 = []byte{ + 0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r', + 0x00, 0x00, 0x11, 0x22, + 0x00, 0x04, 'c', 'o', 'n', 's', + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x44, 0x33, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x05, 't', 'o', 'p', 'i', 'c', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x52, 0x21, + 0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, + 0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'} ) func TestOffsetCommitRequest(t *testing.T) { request := new(OffsetCommitRequest) - testEncodable(t, "no group, no blocks", request, offsetCommitRequestNoGroupNoBlocks) request.ConsumerGroup = "foobar" - testEncodable(t, "no blocks", request, offsetCommitRequestNoBlocks) + testEncodable(t, "no blocks v0", request, offsetCommitRequestNoBlocksV0) + + request.ConsumerGroupGeneration = 0x1122 + request.ConsumerID = "cons" + request.Version = 1 + testEncodable(t, "no blocks v1", request, offsetCommitRequestNoBlocksV1) + + request.RetentionTime = 0x4433 + request.Version = 2 + testEncodable(t, "no blocks v2", request, offsetCommitRequestNoBlocksV2) request.AddBlock("topic", 0x5221, 0xDEADBEEF, ReceiveTime, "metadata") - testEncodable(t, "one block", request, offsetCommitRequestOneBlockV0) + request.Version = 0 + testEncodable(t, "one block v0", request, offsetCommitRequestOneBlockV0) request.Version = 1 - testEncodable(t, "one block", request, offsetCommitRequestOneBlockV1) + testEncodable(t, "one block v1", request, offsetCommitRequestOneBlockV1) + + request.Version = 2 + testEncodable(t, "one block v2", request, offsetCommitRequestOneBlockV2) } diff --git a/offset_fetch_request.go b/offset_fetch_request.go index 295bb840e..c5815a0af 100644 --- a/offset_fetch_request.go +++ b/offset_fetch_request.go @@ -2,10 +2,15 @@ package sarama type OffsetFetchRequest struct { ConsumerGroup string + Version int16 partitions map[string][]int32 } func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) { + if r.Version < 0 || r.Version > 1 { + return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"} + } + if err = pe.putString(r.ConsumerGroup); err != nil { return err } @@ -28,7 +33,7 @@ func (r *OffsetFetchRequest) key() int16 { } func (r *OffsetFetchRequest) version() int16 { - return 0 + return r.Version } func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {