Skip to content

Commit

Permalink
Support ProduceResponse v1 and v2 encoding
Browse files Browse the repository at this point in the history
- sort Blocks by increasing order prior to encoding to ensure
  deterministic payload to simplify unit tests
- encode ProduceResponse ThrottleTime when version >= 1
- encode ProduceResponseBlock Timestamp when version >= 2
- add unit tests for ProduceResponse decoding (version 1 and 2)
- add unit tests for ProduceResponse encoding
  • Loading branch information
slaunay committed Oct 18, 2017
1 parent 70f6a70 commit c1bc8e1
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 47 deletions.
35 changes: 31 additions & 4 deletions produce_response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package sarama

import "time"
import (
"fmt"
"sort"
"time"
)

type ProduceResponseBlock struct {
Err KError
Expand Down Expand Up @@ -92,7 +96,14 @@ func (r *ProduceResponse) encode(pe packetEncoder) error {
if err != nil {
return err
}
for topic, partitions := range r.Blocks {
// Sort by topic name to make the encoding deterministic (map order is undefined)
var topics []string
for t := range r.Blocks {
topics = append(topics, t)
}
sort.Strings(topics)
for _, topic := range topics {
partitions := r.Blocks[topic]
err = pe.putString(topic)
if err != nil {
return err
Expand All @@ -101,10 +112,26 @@ func (r *ProduceResponse) encode(pe packetEncoder) error {
if err != nil {
return err
}
for id, prb := range partitions {
pe.putInt32(id)
// Sort by partition number to make the encoding deterministic (map order is undefined)
var partitionNumbers []int
for p := range partitions {
partitionNumbers = append(partitionNumbers, int(p))
}
sort.Ints(partitionNumbers)
for _, p := range partitionNumbers {
prb := partitions[int32(p)]
pe.putInt32(int32(p))
pe.putInt16(int16(prb.Err))
pe.putInt64(prb.Offset)
if r.Version >= 2 {
timestamp := int64(-1)
if !prb.Timestamp.Before(time.Unix(0, 0)) {
timestamp = prb.Timestamp.UnixNano() / int64(time.Millisecond)
} else if !prb.Timestamp.IsZero() {
return PacketDecodingError{fmt.Sprintf("invalid timestamp (%v)", prb.Timestamp)}
}
pe.putInt64(timestamp)
}
}
}
if r.Version >= 1 {
Expand Down
194 changes: 151 additions & 43 deletions produce_response_test.go
Original file line number Diff line number Diff line change
@@ -1,67 +1,175 @@
package sarama

import "testing"
import (
"fmt"
"testing"
"time"
)

var (
produceResponseNoBlocks = []byte{
produceResponseNoBlocksV0 = []byte{
0x00, 0x00, 0x00, 0x00}

produceResponseManyBlocks = []byte{
0x00, 0x00, 0x00, 0x02,
produceResponseManyBlocksVersions = [][]byte{
{
0x00, 0x00, 0x00, 0x02,

0x00, 0x03, 'b', 'a', 'r',
0x00, 0x00, 0x00, 0x00,

0x00, 0x03, 'f', 'o', 'o',
0x00, 0x00, 0x00, 0x02,

0x00, 0x00, 0x00, 0x01, // Partition 1
0x00, 0x00, // ErrNoError
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255

0x00, 0x00, 0x00, 0x02, // Partition 2
0x00, 0x02, // ErrInvalidMessage
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Offset 0
}, {
0x00, 0x00, 0x00, 0x02,

0x00, 0x03, 'b', 'a', 'r',
0x00, 0x00, 0x00, 0x00,

0x00, 0x03, 'f', 'o', 'o',
0x00, 0x00, 0x00, 0x02,

0x00, 0x00, 0x00, 0x01, // Partition 1
0x00, 0x00, // ErrNoError
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255

0x00, 0x03, 'f', 'o', 'o',
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x02, // Partition 2
0x00, 0x02, // ErrInvalidMessage
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Offset 0

0x00, 0x03, 'b', 'a', 'r',
0x00, 0x00, 0x00, 0x02,
0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
}, {
0x00, 0x00, 0x00, 0x02,

0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF,
0x00, 0x03, 'b', 'a', 'r',
0x00, 0x00, 0x00, 0x00,

0x00, 0x00, 0x00, 0x02,
0x00, 0x02,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
0x00, 0x03, 'f', 'o', 'o',
0x00, 0x00, 0x00, 0x02,

0x00, 0x00, 0x00, 0x01, // Partition 1
0x00, 0x00, // ErrNoError
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // Timestamp -1 (CreateTime was used)

0x00, 0x00, 0x00, 0x02, // Partition 2
0x00, 0x02, // ErrInvalidMessage
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Offset 0
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used)

0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
},
}
)

func TestProduceResponse(t *testing.T) {
func TestProduceResponseDecode(t *testing.T) {
response := ProduceResponse{}

testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocks, 0)
testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocksV0, 0)
if len(response.Blocks) != 0 {
t.Error("Decoding produced", len(response.Blocks), "topics where there were none")
}

testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, 0)
if len(response.Blocks) != 2 {
t.Error("Decoding produced", len(response.Blocks), "topics where there were 2")
}
if len(response.Blocks["foo"]) != 0 {
t.Error("Decoding produced", len(response.Blocks["foo"]), "partitions for 'foo' where there were none")
}
if len(response.Blocks["bar"]) != 2 {
t.Error("Decoding produced", len(response.Blocks["bar"]), "partitions for 'bar' where there were two")
}
block := response.GetBlock("bar", 1)
if block == nil {
t.Error("Decoding did not produce a block for bar/1")
} else {
if block.Err != ErrNoError {
t.Error("Decoding failed for bar/1/Err, got:", int16(block.Err))
for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {
t.Logf("Decoding produceResponseManyBlocks version %d", v)
testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, int16(v))
if len(response.Blocks) != 2 {
t.Error("Decoding produced", len(response.Blocks), "topics where there were 2")
}
if block.Offset != 0xFF {
t.Error("Decoding failed for bar/1/Offset, got:", block.Offset)
if len(response.Blocks["bar"]) != 0 {
t.Error("Decoding produced", len(response.Blocks["bar"]), "partitions for 'bar' where there were none")
}
}
block = response.GetBlock("bar", 2)
if block == nil {
t.Error("Decoding did not produce a block for bar/2")
} else {
if block.Err != ErrInvalidMessage {
t.Error("Decoding failed for bar/2/Err, got:", int16(block.Err))
if len(response.Blocks["foo"]) != 2 {
t.Error("Decoding produced", len(response.Blocks["foo"]), "partitions for 'foo' where there were two")
}
if block.Offset != 0 {
t.Error("Decoding failed for bar/2/Offset, got:", block.Offset)
block := response.GetBlock("foo", 1)
if block == nil {
t.Error("Decoding did not produce a block for foo/1")
} else {
if block.Err != ErrNoError {
t.Error("Decoding failed for foo/1/Err, got:", int16(block.Err))
}
if block.Offset != 255 {
t.Error("Decoding failed for foo/1/Offset, got:", block.Offset)
}
if v >= 2 {
if !block.Timestamp.IsZero() {
t.Error("Decoding failed for foo/1/Timestamp, got:", block.Timestamp)
}
}
}
block = response.GetBlock("foo", 2)
if block == nil {
t.Error("Decoding did not produce a block for foo/2")
} else {
if block.Err != ErrInvalidMessage {
t.Error("Decoding failed for foo/2/Err, got:", int16(block.Err))
}
if block.Offset != 0 {
t.Error("Decoding failed for foo/2/Offset, got:", block.Offset)
}
if v >= 2 {
if block.Timestamp != time.Unix(1, 0) {
t.Error("Decoding failed for foo/2/Timestamp, got:", block.Timestamp)
}
}
}
if v >= 1 {
if expected := 100 * time.Millisecond; response.ThrottleTime != expected {
t.Error("Failed decoding produced throttle time, expected:", expected, ", got:", response.ThrottleTime)
}
}
}
}

func TestProduceResponseEncode(t *testing.T) {
response := ProduceResponse{}
response.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
testEncodable(t, "empty", &response, produceResponseNoBlocksV0)

response.Blocks["bar"] = make(map[int32]*ProduceResponseBlock)
response.Blocks["foo"] = make(map[int32]*ProduceResponseBlock)
response.Blocks["foo"][1] = &ProduceResponseBlock{
Err: ErrNoError,
Offset: 255,
Timestamp: time.Time{},
}
response.Blocks["foo"][2] = &ProduceResponseBlock{
Err: ErrInvalidMessage,
Offset: 0,
Timestamp: time.Unix(1, 0),
}
response.ThrottleTime = 100 * time.Millisecond
for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {
response.Version = int16(v)
testEncodable(t, fmt.Sprintf("many blocks version %d", v), &response, produceResponseManyBlocks)
}
}

func TestProduceResponseEncodeInvalidTimestamp(t *testing.T) {
response := ProduceResponse{}
response.Version = 2
response.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
response.Blocks["t"] = make(map[int32]*ProduceResponseBlock)
response.Blocks["t"][0] = &ProduceResponseBlock{
Err: ErrNoError,
Offset: 0,
// Use a timestamp before Unix time
Timestamp: time.Unix(0, 0).Add(-1 * time.Millisecond),
}
response.ThrottleTime = 100 * time.Millisecond
_, err := encode(&response, nil)
if err == nil {
t.Error("Expecting error, got nil")
}
if _, ok := err.(PacketDecodingError); !ok {
t.Error("Expecting PacketDecodingErrorerror, got:", err)
}
}

0 comments on commit c1bc8e1

Please sign in to comment.