diff --git a/admin.go b/admin.go index 18ca4918d..a4d1bc510 100644 --- a/admin.go +++ b/admin.go @@ -611,7 +611,9 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m partitions: topicPartitions, } - if ca.conf.Version.IsAtLeast(V0_8_2_2) { + if ca.conf.Version.IsAtLeast(V0_10_2_0) { + request.Version = 2 + } else if ca.conf.Version.IsAtLeast(V0_8_2_2) { request.Version = 1 } diff --git a/admin_test.go b/admin_test.go index 40061113b..6182adf1d 100644 --- a/admin_test.go +++ b/admin_test.go @@ -822,7 +822,7 @@ func TestListConsumerGroupOffsets(t *testing.T) { expectedOffset := int64(0) seedBroker.SetHandlerByMap(map[string]MockResponse{ - "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError), + "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError).SetError(ErrNoError), "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), diff --git a/mockresponses.go b/mockresponses.go index 3b0747c1f..c78f0acc0 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -574,6 +574,7 @@ func (mr *MockProduceResponse) getError(topic string, partition int32) KError { // MockOffsetFetchResponse is a `OffsetFetchResponse` builder. type MockOffsetFetchResponse struct { offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock + error KError t TestReporter } @@ -599,15 +600,25 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3 return mr } +func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse { + mr.error = kerror + return mr +} + func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*OffsetFetchRequest) group := req.ConsumerGroup - res := &OffsetFetchResponse{} + res := &OffsetFetchResponse{Version: req.Version} + for topic, partitions := range mr.offsets[group] { for partition, block := range partitions { res.AddBlock(topic, partition, block) } } + + if res.Version >= 2 { + res.Err = mr.error + } return res }