diff --git a/produce_response.go b/produce_response.go index 3f05dd9fb..fc926161f 100644 --- a/produce_response.go +++ b/produce_response.go @@ -1,6 +1,9 @@ package sarama -import "time" +import ( + "fmt" + "time" +) type ProduceResponseBlock struct { Err KError @@ -32,6 +35,23 @@ func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err erro return nil } +func (b *ProduceResponseBlock) encode(pe packetEncoder, version int16) (err error) { + pe.putInt16(int16(b.Err)) + pe.putInt64(b.Offset) + + if version >= 2 { + timestamp := int64(-1) + if !b.Timestamp.Before(time.Unix(0, 0)) { + timestamp = b.Timestamp.UnixNano() / int64(time.Millisecond) + } else if !b.Timestamp.IsZero() { + return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", b.Timestamp)} + } + pe.putInt64(timestamp) + } + + return nil +} + type ProduceResponse struct { Blocks map[string]map[int32]*ProduceResponseBlock Version int16 @@ -103,8 +123,10 @@ func (r *ProduceResponse) encode(pe packetEncoder) error { } for id, prb := range partitions { pe.putInt32(id) - pe.putInt16(int16(prb.Err)) - pe.putInt64(prb.Offset) + err = prb.encode(pe, r.Version) + if err != nil { + return err + } } } if r.Version >= 1 { diff --git a/produce_response_test.go b/produce_response_test.go index f71709fe8..197c7fb50 100644 --- a/produce_response_test.go +++ b/produce_response_test.go @@ -1,67 +1,128 @@ package sarama -import "testing" +import ( + "fmt" + "testing" + "time" +) var ( - produceResponseNoBlocks = []byte{ + produceResponseNoBlocksV0 = []byte{ 0x00, 0x00, 0x00, 0x00} - produceResponseManyBlocks = []byte{ - 0x00, 0x00, 0x00, 0x02, + produceResponseManyBlocksVersions = [][]byte{ + { + 0x00, 0x00, 0x00, 0x01, + + 0x00, 0x03, 'f', 'o', 'o', + 0x00, 0x00, 0x00, 0x01, + + 0x00, 0x00, 0x00, 0x01, // Partition 1 + 0x00, 0x02, // ErrInvalidMessage + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255 + }, { + 0x00, 0x00, 0x00, 0x01, - 0x00, 0x03, 'f', 'o', 'o', - 0x00, 0x00, 0x00, 0x00, + 0x00, 0x03, 'f', 'o', 'o', + 0x00, 0x00, 0x00, 0x01, - 0x00, 0x03, 'b', 'a', 'r', - 0x00, 0x00, 0x00, 0x02, + 0x00, 0x00, 0x00, 0x01, // Partition 1 + 0x00, 0x02, // ErrInvalidMessage + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255 - 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, + 0x00, 0x00, 0x00, 0x64, // 100 ms throttle time + }, { + 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x00, 0x02, - 0x00, 0x02, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + 0x00, 0x03, 'f', 'o', 'o', + 0x00, 0x00, 0x00, 0x01, + + 0x00, 0x00, 0x00, 0x01, // Partition 1 + 0x00, 0x02, // ErrInvalidMessage + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255 + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used) + + 0x00, 0x00, 0x00, 0x64, // 100 ms throttle time + }, + } ) -func TestProduceResponse(t *testing.T) { +func TestProduceResponseDecode(t *testing.T) { response := ProduceResponse{} - testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocks, 0) + testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocksV0, 0) if len(response.Blocks) != 0 { t.Error("Decoding produced", len(response.Blocks), "topics where there were none") } - testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, 0) - if len(response.Blocks) != 2 { - t.Error("Decoding produced", len(response.Blocks), "topics where there were 2") - } - if len(response.Blocks["foo"]) != 0 { - t.Error("Decoding produced", len(response.Blocks["foo"]), "partitions for 'foo' where there were none") - } - if len(response.Blocks["bar"]) != 2 { - t.Error("Decoding produced", len(response.Blocks["bar"]), "partitions for 'bar' where there were two") - } - block := response.GetBlock("bar", 1) - if block == nil { - t.Error("Decoding did not produce a block for bar/1") - } else { - if block.Err != ErrNoError { - t.Error("Decoding failed for bar/1/Err, got:", int16(block.Err)) + for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions { + t.Logf("Decoding produceResponseManyBlocks version %d", v) + testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, int16(v)) + if len(response.Blocks) != 1 { + t.Error("Decoding produced", len(response.Blocks), "topics where there was 1") } - if block.Offset != 0xFF { - t.Error("Decoding failed for bar/1/Offset, got:", block.Offset) + if len(response.Blocks["foo"]) != 1 { + t.Error("Decoding produced", len(response.Blocks["foo"]), "partitions for 'foo' where there was one") } - } - block = response.GetBlock("bar", 2) - if block == nil { - t.Error("Decoding did not produce a block for bar/2") - } else { - if block.Err != ErrInvalidMessage { - t.Error("Decoding failed for bar/2/Err, got:", int16(block.Err)) + block := response.GetBlock("foo", 1) + if block == nil { + t.Error("Decoding did not produce a block for foo/1") + } else { + if block.Err != ErrInvalidMessage { + t.Error("Decoding failed for foo/2/Err, got:", int16(block.Err)) + } + if block.Offset != 255 { + t.Error("Decoding failed for foo/1/Offset, got:", block.Offset) + } + if v >= 2 { + if block.Timestamp != time.Unix(1, 0) { + t.Error("Decoding failed for foo/2/Timestamp, got:", block.Timestamp) + } + } } - if block.Offset != 0 { - t.Error("Decoding failed for bar/2/Offset, got:", block.Offset) + if v >= 1 { + if expected := 100 * time.Millisecond; response.ThrottleTime != expected { + t.Error("Failed decoding produced throttle time, expected:", expected, ", got:", response.ThrottleTime) + } } } } + +func TestProduceResponseEncode(t *testing.T) { + response := ProduceResponse{} + response.Blocks = make(map[string]map[int32]*ProduceResponseBlock) + testEncodable(t, "empty", &response, produceResponseNoBlocksV0) + + response.Blocks["foo"] = make(map[int32]*ProduceResponseBlock) + response.Blocks["foo"][1] = &ProduceResponseBlock{ + Err: ErrInvalidMessage, + Offset: 255, + Timestamp: time.Unix(1, 0), + } + response.ThrottleTime = 100 * time.Millisecond + for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions { + response.Version = int16(v) + testEncodable(t, fmt.Sprintf("many blocks version %d", v), &response, produceResponseManyBlocks) + } +} + +func TestProduceResponseEncodeInvalidTimestamp(t *testing.T) { + response := ProduceResponse{} + response.Version = 2 + response.Blocks = make(map[string]map[int32]*ProduceResponseBlock) + response.Blocks["t"] = make(map[int32]*ProduceResponseBlock) + response.Blocks["t"][0] = &ProduceResponseBlock{ + Err: ErrNoError, + Offset: 0, + // Use a timestamp before Unix time + Timestamp: time.Unix(0, 0).Add(-1 * time.Millisecond), + } + response.ThrottleTime = 100 * time.Millisecond + _, err := encode(&response, nil) + if err == nil { + t.Error("Expecting error, got nil") + } + if _, ok := err.(PacketEncodingError); !ok { + t.Error("Expecting PacketEncodingError, got:", err) + } +}