Skip to content

Commit

Permalink
k/group: recover leader epoch on leader change
Browse files Browse the repository at this point in the history
This was discovered while testing write caching feature. After
leadership change or node restart we would reply with default field
value `-2147483648` which breaks the KIP-320 logic.

`check_leader_epoch` in redpanda treats negative epoch values as "not
set" and, I believe, franz-go behaves the same.

As result, KIP-320 fencing is not being applied and the client ends up
with `OFFSET_OUT_OF_RANGE` error.

(cherry picked from commit 501b9d3)
  • Loading branch information
nvartolomei authored and vbotbuildovich committed Mar 26, 2024
1 parent 044c87f commit 232c128
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/v/kafka/server/group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,7 @@ ss::future<> group_manager::do_recover_group(
.log_offset = meta.log_offset,
.offset = meta.metadata.offset,
.metadata = meta.metadata.metadata,
.committed_leader_epoch = meta.metadata.leader_epoch,
.commit_timestamp = meta.metadata.commit_timestamp,
.expiry_timestamp = expiry_timestamp,
.non_reclaimable = meta.metadata.non_reclaimable,
Expand Down
7 changes: 3 additions & 4 deletions tests/rptest/tests/consumer_group_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ def test_group_recovery(self):
timeout_s=60,
backoff_s=2)

prev_offsets = offsets

# Validate that the group state is recovered.
test_admin = KafkaTestAdminClient(self.redpanda)
Expand All @@ -422,10 +423,8 @@ def test_group_recovery(self):

self.logger.info(f"Got offsets after restart: {offsets}")
assert len(offsets) == 1
assert offsets[TopicPartition(self.topic_spec.name, 0)].offset == 1000
assert offsets[TopicPartition(
self.topic_spec.name,
0)].leader_epoch < 0 # This is a bug. Fixed in followup commit.
assert offsets == prev_offsets, \
f"Expected {prev_offsets}, got {offsets}."

@cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
@parametrize(static_members=True)
Expand Down

0 comments on commit 232c128

Please sign in to comment.