Skip to content

Commit

Permalink
rptest: handle not_coordinator error when fetching offsets
Browse files Browse the repository at this point in the history
After a broker restart it might take a while before the group offsets
are available, during which time redpanda will reply with
`error_code::not_coordinator`[^0]. We need to keep retrying until they
become available.

Fixes redpanda-data#17466

[^0]: https://github.com/redpanda-data/redpanda/blob/501b9d35882a303def6061bdc522f67f0502ac1c/src/v/kafka/server/group_manager.cc#L1653

(cherry picked from commit 97b4c33)
  • Loading branch information
nvartolomei authored and vbotbuildovich committed Apr 6, 2024
1 parent 3fe9c0d commit 7f09ee5
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions tests/rptest/tests/consumer_group_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from rptest.services.rpk_producer import RpkProducer
from rptest.services.verifiable_consumer import VerifiableConsumer
from rptest.tests.redpanda_test import RedpandaTest
from rptest.util import wait_until_result
from rptest.utils.mode_checks import skip_debug_mode

from ducktape.utils.util import wait_until
Expand Down Expand Up @@ -406,21 +407,28 @@ def test_group_recovery(self):
assert offsets[TopicPartition(self.topic_spec.name,
0)].leader_epoch > 0

# Remember the old offsets to compare them after the restart.
prev_offsets = offsets

# Restart the broker.
self.logger.info("Restarting redpanda nodes.")
self.redpanda.restart_nodes(self.redpanda.nodes)
self.redpanda._admin.await_stable_leader("controller",
partition=0,
namespace='redpanda',
timeout_s=60,
backoff_s=2)

prev_offsets = offsets

# Validate that the group state is recovered.
test_admin = KafkaTestAdminClient(self.redpanda)
offsets = test_admin.list_offsets(
group_id, [TopicPartition(self.topic_spec.name, 0)])
def try_list_offsets():
    """Make a single attempt to list the group's offsets for partition 0.

    Returns the value produced by ``list_offsets`` on success. Any
    exception is logged at debug level and converted into ``None``;
    presumably the polling helper this is passed to treats ``None`` as
    "not ready yet" and calls again -- confirm against its definition.
    """
    try:
        # A new admin client is constructed on every attempt.
        admin_client = KafkaTestAdminClient(self.redpanda)
        listed = admin_client.list_offsets(
            group_id, [TopicPartition(self.topic_spec.name, 0)])
    except Exception as e:
        self.logger.debug(f"Failed to list offsets: {e}")
        return None
    else:
        return listed

offsets = wait_until_result(
try_list_offsets,
timeout_sec=30,
backoff_sec=3,
err_msg="Failed to make list_offsets request")

self.logger.info(f"Got offsets after restart: {offsets}")
assert len(offsets) == 1
Expand Down

0 comments on commit 7f09ee5

Please sign in to comment.