From 893be7d533d1d10e18a417108f6e4b9f0e05e059 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Wed, 6 Dec 2017 09:20:12 +0100 Subject: [PATCH 1/2] Permit setting version on mock produce response This allows setting the version of the message data struct for MockProduceResponse. This change is very similar to what was already done in pull request #939 In order to mock a "produce" request for Kafka version 0.10.2.0 you would use the following: leader.SetHandlerByMap(map[string]MockResponse{ "ProduceRequest": NewMockProduceResponse(t). SetVersion(2). SetError("my_topic", 0, ErrNoError), }) --- async_producer_test.go | 1 + mockresponses.go | 10 +++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/async_producer_test.go b/async_producer_test.go index 07d23533b..716b654ea 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -639,6 +639,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { leader.SetHandlerByMap(map[string]MockResponse{ "ProduceRequest": NewMockProduceResponse(t). + SetVersion(0). SetError("my_topic", 0, ErrNoError), }) diff --git a/mockresponses.go b/mockresponses.go index 9659757b7..1c2922d91 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -384,6 +384,7 @@ func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int3 // MockProduceResponse is a `ProduceResponse` builder. type MockProduceResponse struct { + version int16 errors map[string]map[int32]KError t TestReporter } @@ -392,6 +393,11 @@ func NewMockProduceResponse(t TestReporter) *MockProduceResponse { return &MockProduceResponse{t: t} } +func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse { + mr.version = version + return mr +} + func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse { if mr.errors == nil { mr.errors = make(map[string]map[int32]KError) @@ -407,7 +413,9 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*ProduceRequest) - res := &ProduceResponse{} + res := &ProduceResponse{ + Version: mr.version, + } for topic, partitions := range req.records { for partition := range partitions { res.AddTopicPartition(topic, partition, mr.getError(topic, partition)) From 5169c3158b1810893421810e7fdade82553af965 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedrich=20Gro=C3=9Fe?= Date: Thu, 4 Jan 2018 11:01:31 +0100 Subject: [PATCH 2/2] Fix formatting --- mockresponses.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mockresponses.go b/mockresponses.go index 1c2922d91..f79a9d5e9 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -384,9 +384,9 @@ func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int3 // MockProduceResponse is a `ProduceResponse` builder. type MockProduceResponse struct { - version int16 - errors map[string]map[int32]KError - t TestReporter + version int16 + errors map[string]map[int32]KError + t TestReporter } func NewMockProduceResponse(t TestReporter) *MockProduceResponse {