diff --git a/async_producer_test.go b/async_producer_test.go index 07d23533b..716b654ea 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -639,6 +639,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { leader.SetHandlerByMap(map[string]MockResponse{ "ProduceRequest": NewMockProduceResponse(t). + SetVersion(0). SetError("my_topic", 0, ErrNoError), }) diff --git a/mockresponses.go b/mockresponses.go index 9659757b7..1c2922d91 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -384,6 +384,7 @@ func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int3 // MockProduceResponse is a `ProduceResponse` builder. type MockProduceResponse struct { + version int16 errors map[string]map[int32]KError t TestReporter } @@ -392,6 +393,11 @@ func NewMockProduceResponse(t TestReporter) *MockProduceResponse { return &MockProduceResponse{t: t} } +func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse { + mr.version = version + return mr +} + func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse { if mr.errors == nil { mr.errors = make(map[string]map[int32]KError) @@ -407,7 +413,9 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*ProduceRequest) - res := &ProduceResponse{} + res := &ProduceResponse{ + Version: mr.version, + } for topic, partitions := range req.records { for partition := range partitions { res.AddTopicPartition(topic, partition, mr.getError(topic, partition))