From 3f16fb0e7af8c355bc8a1d1f16b2b28e5da4e611 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 22 Oct 2018 18:20:41 +0100 Subject: [PATCH] Add support for newer OffsetCommit request/response --- offset_commit_request.go | 8 +++++++- offset_commit_request_test.go | 27 ++++++++++++++++----------- offset_commit_response.go | 31 ++++++++++++++++++++++++++++--- offset_commit_response_test.go | 15 +++++++++++++++ 4 files changed, 66 insertions(+), 15 deletions(-) diff --git a/offset_commit_request.go b/offset_commit_request.go index 37e99fbf5..1ec583e6d 100644 --- a/offset_commit_request.go +++ b/offset_commit_request.go @@ -52,12 +52,14 @@ type OffsetCommitRequest struct { // - 0 (kafka 0.8.1 and later) // - 1 (kafka 0.8.2 and later) // - 2 (kafka 0.9.0 and later) + // - 3 (kafka 0.11.0 and later) + // - 4 (kafka 2.0.0 and later) Version int16 blocks map[string]map[int32]*offsetCommitRequestBlock } func (r *OffsetCommitRequest) encode(pe packetEncoder) error { - if r.Version < 0 || r.Version > 2 { + if r.Version < 0 || r.Version > 4 { return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"} } @@ -174,6 +176,10 @@ func (r *OffsetCommitRequest) requiredVersion() KafkaVersion { return V0_8_2_0 case 2: return V0_9_0_0 + case 3: + return V0_11_0_0 + case 4: + return V2_0_0_0 default: return MinVersion } diff --git a/offset_commit_request_test.go b/offset_commit_request_test.go index afc25b7b3..efb3d33f1 100644 --- a/offset_commit_request_test.go +++ b/offset_commit_request_test.go @@ -1,6 +1,9 @@ package sarama -import "testing" +import ( + "fmt" + "testing" +) var ( offsetCommitRequestNoBlocksV0 = []byte{ @@ -76,15 +79,17 @@ func TestOffsetCommitRequestV1(t *testing.T) { testRequest(t, "one block v1", request, offsetCommitRequestOneBlockV1) } -func TestOffsetCommitRequestV2(t *testing.T) { - request := new(OffsetCommitRequest) - request.ConsumerGroup = "foobar" - request.ConsumerID = "cons" - request.ConsumerGroupGeneration = 0x1122 - request.RetentionTime = 0x4433 - request.Version = 2 - testRequest(t, "no blocks v2", request, offsetCommitRequestNoBlocksV2) +func TestOffsetCommitRequestV2ToV4(t *testing.T) { + for version := 2; version <= 4; version++ { + request := new(OffsetCommitRequest) + request.ConsumerGroup = "foobar" + request.ConsumerID = "cons" + request.ConsumerGroupGeneration = 0x1122 + request.RetentionTime = 0x4433 + request.Version = int16(version) + testRequest(t, fmt.Sprintf("no blocks v%d", version), request, offsetCommitRequestNoBlocksV2) - request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata") - testRequest(t, "one block v2", request, offsetCommitRequestOneBlockV2) + request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata") + testRequest(t, fmt.Sprintf("one block v%d", version), request, offsetCommitRequestOneBlockV2) + } } diff --git a/offset_commit_response.go b/offset_commit_response.go index a4b18acdf..e842298db 100644 --- a/offset_commit_response.go +++ b/offset_commit_response.go @@ -1,7 +1,9 @@ package sarama type OffsetCommitResponse struct { - Errors map[string]map[int32]KError + Version int16 + ThrottleTimeMs int32 + Errors map[string]map[int32]KError } func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) { @@ -17,6 +19,9 @@ func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KE } func (r *OffsetCommitResponse) encode(pe packetEncoder) error { + if r.Version >= 3 { + pe.putInt32(r.ThrottleTimeMs) + } if err := pe.putArrayLength(len(r.Errors)); err != nil { return err } @@ -36,6 +41,15 @@ func (r *OffsetCommitResponse) encode(pe packetEncoder) error { } func (r *OffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + + if version >= 3 { + r.ThrottleTimeMs, err = pd.getInt32() + if err != nil { + return err + } + } + numTopics, err := pd.getArrayLength() if err != nil || numTopics == 0 { return err @@ -77,9 +91,20 @@ func (r *OffsetCommitResponse) key() int16 { } func (r *OffsetCommitResponse) version() int16 { - return 0 + return r.Version } func (r *OffsetCommitResponse) requiredVersion() KafkaVersion { - return MinVersion + switch r.Version { + case 1: + return V0_8_2_0 + case 2: + return V0_9_0_0 + case 3: + return V0_11_0_0 + case 4: + return V2_0_0_0 + default: + return MinVersion + } } diff --git a/offset_commit_response_test.go b/offset_commit_response_test.go index 074ec9232..3c85713c7 100644 --- a/offset_commit_response_test.go +++ b/offset_commit_response_test.go @@ -1,6 +1,7 @@ package sarama import ( + "fmt" "testing" ) @@ -22,3 +23,17 @@ func TestNormalOffsetCommitResponse(t *testing.T) { // unpredictable map traversal order. testResponse(t, "normal", &response, nil) } + +func TestOffsetCommitResponseWithThrottleTime(t *testing.T) { + for version := 3; version <= 4; version++ { + response := OffsetCommitResponse{ + Version: int16(version), + ThrottleTimeMs: 123, + } + response.AddError("t", 0, ErrNotLeaderForPartition) + response.Errors["m"] = make(map[int32]KError) + // The response encoded form cannot be checked for it varies due to + // unpredictable map traversal order. + testResponse(t, fmt.Sprintf("v%d with throttle time", version), &response, nil) + } +}