diff --git a/async_producer_test.go b/async_producer_test.go index 07d23533b..716b654ea 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -639,6 +639,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { leader.SetHandlerByMap(map[string]MockResponse{ "ProduceRequest": NewMockProduceResponse(t). + SetVersion(0). SetError("my_topic", 0, ErrNoError), }) diff --git a/mockresponses.go b/mockresponses.go index 9659757b7..f79a9d5e9 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -384,14 +384,20 @@ func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int3 // MockProduceResponse is a `ProduceResponse` builder. type MockProduceResponse struct { - errors map[string]map[int32]KError - t TestReporter + version int16 + errors map[string]map[int32]KError + t TestReporter } func NewMockProduceResponse(t TestReporter) *MockProduceResponse { return &MockProduceResponse{t: t} } +func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse { + mr.version = version + return mr +} + func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse { if mr.errors == nil { mr.errors = make(map[string]map[int32]KError) @@ -407,7 +413,9 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*ProduceRequest) - res := &ProduceResponse{} + res := &ProduceResponse{ + Version: mr.version, + } for topic, partitions := range req.records { for partition := range partitions { res.AddTopicPartition(topic, partition, mr.getError(topic, partition))