From 86644261a26750205398b25e472beb42b4f3604d Mon Sep 17 00:00:00 2001 From: Dirk Wilden Date: Tue, 14 May 2019 13:32:54 +0200 Subject: [PATCH 1/2] support ListConsumerGroupOffsets without topicPartition --- admin.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/admin.go b/admin.go index d88a0a493..f212c85cf 100644 --- a/admin.go +++ b/admin.go @@ -608,7 +608,9 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m partitions: topicPartitions, } - if ca.conf.Version.IsAtLeast(V0_8_2_2) { + if ca.conf.Version.IsAtLeast(V0_10_2_0) { + request.Version = 2 + } else if ca.conf.Version.IsAtLeast(V0_8_2_2) { request.Version = 1 } From 4a0e1a7328c54bc5864fb4d2e0f62a971cbfaf84 Mon Sep 17 00:00:00 2001 From: Dirk Wilden Date: Tue, 21 May 2019 13:32:35 +0200 Subject: [PATCH 2/2] include error in mocked OffsetFetchResponseV2 --- admin_test.go | 2 +- mockresponses.go | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/admin_test.go b/admin_test.go index aafb1481a..0134770ac 100644 --- a/admin_test.go +++ b/admin_test.go @@ -822,7 +822,7 @@ func TestListConsumerGroupOffsets(t *testing.T) { expectedOffset := int64(0) seedBroker.SetHandlerByMap(map[string]MockResponse{ - "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError), + "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError).SetError(ErrNoError), "MetadataRequest": NewMockMetadataResponse(t). SetController(seedBroker.BrokerID()). SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), diff --git a/mockresponses.go b/mockresponses.go index 919d8bb07..0c08f6a66 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -574,6 +574,7 @@ func (mr *MockProduceResponse) getError(topic string, partition int32) KError { // MockOffsetFetchResponse is a `OffsetFetchResponse` builder. type MockOffsetFetchResponse struct { offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock + error KError t TestReporter } @@ -599,15 +600,25 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3 return mr } +func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse { + mr.error = kerror + return mr +} + func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*OffsetFetchRequest) group := req.ConsumerGroup - res := &OffsetFetchResponse{} + res := &OffsetFetchResponse{Version: req.Version} + for topic, partitions := range mr.offsets[group] { for partition, block := range partitions { res.AddBlock(topic, partition, block) } } + + if res.Version >= 2 { + res.Err = mr.error + } return res }