Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The wiki of the offset API has been updated again #400

Merged
merged 1 commit into from
Mar 30, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 42 additions & 19 deletions offset_commit_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,44 +13,71 @@ type offsetCommitRequestBlock struct {

func (r *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error {
pe.putInt64(r.offset)
if version >= 1 {
if version == 1 {
pe.putInt64(r.timestamp)
} else if r.timestamp != 0 {
Logger.Println("Non-zero timestamp specified for OffsetCommitRequest not v1, it will be ignored")
}

return pe.putString(r.metadata)
}

type OffsetCommitRequest struct {
ConsumerGroup string
Version int16 // 0 (0.8.1 and later) or 1 (0.8.2 and later, includes timestamp field)
blocks map[string]map[int32]*offsetCommitRequestBlock
ConsumerGroup string
ConsumerGroupGeneration int32 // v1 or later
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have any idea what this used for? Couldn't find anything useful in the wiki.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConsumerID string // v1 or later
RetentionTime int64 // v2 or later

// Version can be:
// - 0 (kafka 0.8.1 and later)
// - 1 (kafka 0.8.2 and later)
// - 2 (kafka 0.8.3 and later)
Version int16
blocks map[string]map[int32]*offsetCommitRequestBlock
}

func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
if r.Version < 0 || r.Version > 1 {
if r.Version < 0 || r.Version > 2 {
return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
}

err := pe.putString(r.ConsumerGroup)
if err != nil {
if err := pe.putString(r.ConsumerGroup); err != nil {
return err
}
err = pe.putArrayLength(len(r.blocks))
if err != nil {

if r.Version >= 1 {
pe.putInt32(r.ConsumerGroupGeneration)
if err := pe.putString(r.ConsumerID); err != nil {
return err
}
} else {
if r.ConsumerGroupGeneration != 0 {
Logger.Println("Non-zero ConsumerGroupGeneration specified for OffsetCommitRequest v0, it will be ignored")
}
if r.ConsumerID != "" {
Logger.Println("Non-empty ConsumerID specified for OffsetCommitRequest v0, it will be ignored")
}
}

if r.Version >= 2 {
pe.putInt64(r.RetentionTime)
} else if r.RetentionTime != 0 {
Logger.Println("Non-zero RetentionTime specified for OffsetCommitRequest version <2, it will be ignored")
}

if err := pe.putArrayLength(len(r.blocks)); err != nil {
return err
}
for topic, partitions := range r.blocks {
err = pe.putString(topic)
if err != nil {
if err := pe.putString(topic); err != nil {
return err
}
err = pe.putArrayLength(len(partitions))
if err != nil {
if err := pe.putArrayLength(len(partitions)); err != nil {
return err
}
for partition, block := range partitions {
pe.putInt32(partition)
err = block.encode(pe, r.Version)
if err != nil {
if err := block.encode(pe, r.Version); err != nil {
return err
}
}
Expand All @@ -75,9 +102,5 @@ func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset i
r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
}

if r.Version == 0 && timestamp != 0 {
Logger.Println("Non-zero timestamp specified for OffsetCommitRequest v0, it will be ignored")
}

r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata}
}
49 changes: 42 additions & 7 deletions offset_commit_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@ package sarama
import "testing"

var (
offsetCommitRequestNoGroupNoBlocks = []byte{
0x00, 0x00,
offsetCommitRequestNoBlocksV0 = []byte{
0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
0x00, 0x00, 0x00, 0x00}

offsetCommitRequestNoBlocksV1 = []byte{
0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
0x00, 0x00, 0x11, 0x22,
0x00, 0x04, 'c', 'o', 'n', 's',
0x00, 0x00, 0x00, 0x00}

offsetCommitRequestNoBlocks = []byte{
offsetCommitRequestNoBlocksV2 = []byte{
0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
0x00, 0x00, 0x11, 0x22,
0x00, 0x04, 'c', 'o', 'n', 's',
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x44, 0x33,
0x00, 0x00, 0x00, 0x00}

offsetCommitRequestOneBlockV0 = []byte{
Expand All @@ -22,25 +31,51 @@ var (

offsetCommitRequestOneBlockV1 = []byte{
0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
0x00, 0x00, 0x11, 0x22,
0x00, 0x04, 'c', 'o', 'n', 's',
0x00, 0x00, 0x00, 0x01,
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x52, 0x21,
0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF,
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'}

offsetCommitRequestOneBlockV2 = []byte{
0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
0x00, 0x00, 0x11, 0x22,
0x00, 0x04, 'c', 'o', 'n', 's',
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x44, 0x33,
0x00, 0x00, 0x00, 0x01,
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x52, 0x21,
0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF,
0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'}
)

func TestOffsetCommitRequest(t *testing.T) {
request := new(OffsetCommitRequest)
testEncodable(t, "no group, no blocks", request, offsetCommitRequestNoGroupNoBlocks)

request.ConsumerGroup = "foobar"
testEncodable(t, "no blocks", request, offsetCommitRequestNoBlocks)
testEncodable(t, "no blocks v0", request, offsetCommitRequestNoBlocksV0)

request.ConsumerGroupGeneration = 0x1122
request.ConsumerID = "cons"
request.Version = 1
testEncodable(t, "no blocks v1", request, offsetCommitRequestNoBlocksV1)

request.RetentionTime = 0x4433
request.Version = 2
testEncodable(t, "no blocks v2", request, offsetCommitRequestNoBlocksV2)

request.AddBlock("topic", 0x5221, 0xDEADBEEF, ReceiveTime, "metadata")
testEncodable(t, "one block", request, offsetCommitRequestOneBlockV0)
request.Version = 0
testEncodable(t, "one block v0", request, offsetCommitRequestOneBlockV0)

request.Version = 1
testEncodable(t, "one block", request, offsetCommitRequestOneBlockV1)
testEncodable(t, "one block v1", request, offsetCommitRequestOneBlockV1)

request.Version = 2
testEncodable(t, "one block v2", request, offsetCommitRequestOneBlockV2)
}
7 changes: 6 additions & 1 deletion offset_fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ package sarama

type OffsetFetchRequest struct {
ConsumerGroup string
Version int16
partitions map[string][]int32
}

func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
if r.Version < 0 || r.Version > 1 {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not r.version != 1, you don't have decimal version do you ?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually it should be > 2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean? Based on my reading of the protocol guide (and Jun's comment in JIRA) the version can be 0 or 1, so if it is <0 or >1 we need to bail.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind then, that looks good

return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
}

if err = pe.putString(r.ConsumerGroup); err != nil {
return err
}
Expand All @@ -28,7 +33,7 @@ func (r *OffsetFetchRequest) key() int16 {
}

func (r *OffsetFetchRequest) version() int16 {
return 0
return r.Version
}

func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
Expand Down