Skip to content

Commit

Permalink
Merge pull request #1374 from dwi-di/consumer-group-offsets-nil
Browse files Browse the repository at this point in the history
support ListConsumerGroupOffsets without topicPartition
  • Loading branch information
bai committed Jul 2, 2019
2 parents 2e58677 + 4a0e1a7 commit d9dbc20
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 3 deletions.
4 changes: 3 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,9 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m
partitions: topicPartitions,
}

if ca.conf.Version.IsAtLeast(V0_8_2_2) {
if ca.conf.Version.IsAtLeast(V0_10_2_0) {
request.Version = 2
} else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
request.Version = 1
}

Expand Down
2 changes: 1 addition & 1 deletion admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ func TestListConsumerGroupOffsets(t *testing.T) {
expectedOffset := int64(0)

seedBroker.SetHandlerByMap(map[string]MockResponse{
"OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
"OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError).SetError(ErrNoError),
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
Expand Down
13 changes: 12 additions & 1 deletion mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
type MockOffsetFetchResponse struct {
offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
error KError
t TestReporter
}

Expand All @@ -599,15 +600,25 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3
return mr
}

func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
mr.error = kerror
return mr
}

func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
req := reqBody.(*OffsetFetchRequest)
group := req.ConsumerGroup
res := &OffsetFetchResponse{}
res := &OffsetFetchResponse{Version: req.Version}

for topic, partitions := range mr.offsets[group] {
for partition, block := range partitions {
res.AddBlock(topic, partition, block)
}
}

if res.Version >= 2 {
res.Err = mr.error
}
return res
}

Expand Down

0 comments on commit d9dbc20

Please sign in to comment.