Skip to content

Commit

Permalink
fix: make fetchInitialOffset use correct protocol (#2705)
Browse files Browse the repository at this point in the history
The `fetchInitialOffset` func in offsetManager was hardcoded to send a
Version 1 OffsetFetchRequest rather than sending the appropriate version
based on the config KafkaVersion. As discussed in #2694 this meant that
LeaderEpoch was always being decoded as the default value '0' (because
it was only returned in Version >= 5 OffsetFetchRequest). However, other
areas of the offsetManager code were sending the newer protocol
versions, so for example the OffsetCommitRequest would include a leader
epoch value of 0 rather than an accurate one.

Correct this bug by sending the correct protocol version in
fetchInitialOffset and also ensure we default to `-1` when we decode an
OffsetFetchResponse of a Version < 5

Fixes #2694

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
  • Loading branch information
dnwe committed Nov 3, 2023
1 parent a46917f commit 27710af
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 37 deletions.
30 changes: 1 addition & 29 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,35 +1018,7 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m
return nil, err
}

request := &OffsetFetchRequest{
ConsumerGroup: group,
partitions: topicPartitions,
}

if ca.conf.Version.IsAtLeast(V2_5_0_0) {
// Version 7 is adding the require stable flag.
request.Version = 7
} else if ca.conf.Version.IsAtLeast(V2_4_0_0) {
// Version 6 is the first flexible version.
request.Version = 6
} else if ca.conf.Version.IsAtLeast(V2_1_0_0) {
// Version 3, 4, and 5 are the same as version 2.
request.Version = 5
} else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 4
} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 3
} else if ca.conf.Version.IsAtLeast(V0_10_2_0) {
// Starting in version 2, the request can contain a null topics array to indicate that offsets
// for all topics should be fetched. It also returns a top level error code
// for group or coordinator level errors.
request.Version = 2
} else if ca.conf.Version.IsAtLeast(V0_8_2_0) {
// In version 0, the request read offsets from ZK.
//
// Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
request.Version = 1
}
request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions)

return coordinator.FetchOffset(request)
}
Expand Down
37 changes: 37 additions & 0 deletions offset_fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,43 @@ type OffsetFetchRequest struct {
partitions map[string][]int32
}

func NewOffsetFetchRequest(
version KafkaVersion,
group string,
partitions map[string][]int32,
) *OffsetFetchRequest {
request := &OffsetFetchRequest{
ConsumerGroup: group,
partitions: partitions,
}
if version.IsAtLeast(V2_5_0_0) {
// Version 7 is adding the require stable flag.
request.Version = 7
} else if version.IsAtLeast(V2_4_0_0) {
// Version 6 is the first flexible version.
request.Version = 6
} else if version.IsAtLeast(V2_1_0_0) {
// Version 3, 4, and 5 are the same as version 2.
request.Version = 5
} else if version.IsAtLeast(V2_0_0_0) {
request.Version = 4
} else if version.IsAtLeast(V0_11_0_0) {
request.Version = 3
} else if version.IsAtLeast(V0_10_2_0) {
// Starting in version 2, the request can contain a null topics array to indicate that offsets
// for all topics should be fetched. It also returns a top level error code
// for group or coordinator level errors.
request.Version = 2
} else if version.IsAtLeast(V0_8_2_0) {
// In version 0, the request read offsets from ZK.
//
// Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
request.Version = 1
}

return request
}

func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
if r.Version < 0 || r.Version > 7 {
return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
Expand Down
2 changes: 2 additions & 0 deletions offset_fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err
if err != nil {
return err
}
} else {
b.LeaderEpoch = -1
}

if isFlexible {
Expand Down
6 changes: 3 additions & 3 deletions offset_fetch_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ func TestNormalOffsetFetchResponse(t *testing.T) {

for version := 0; version <= 1; version++ {
response := OffsetFetchResponse{Version: int16(version)}
response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, -1, "md", ErrRequestTimedOut})
response.Blocks["m"] = nil
testResponse(t, fmt.Sprintf("Normal v%d", version), &response, nil)
}

responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, -1, "md", ErrRequestTimedOut})
responseV2.Blocks["m"] = nil
testResponse(t, "normal V2", &responseV2, nil)

for version := 3; version <= 4; version++ {
responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9}
responseV3.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
responseV3.AddBlock("t", 0, &OffsetFetchResponseBlock{0, -1, "md", ErrRequestTimedOut})
responseV3.Blocks["m"] = nil
testResponse(t, fmt.Sprintf("Normal v%d", version), &responseV3, nil)
}
Expand Down
7 changes: 2 additions & 5 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,8 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri
return om.fetchInitialOffset(topic, partition, retries-1)
}

req := new(OffsetFetchRequest)
req.Version = 1
req.ConsumerGroup = om.group
req.AddPartition(topic, partition)

partitions := map[string][]int32{topic: {partition}}
req := NewOffsetFetchRequest(om.conf.Version, om.group, partitions)
resp, err := broker.FetchOffset(req)
if err != nil {
if retries <= 0 {
Expand Down

0 comments on commit 27710af

Please sign in to comment.