From 7f09ee5e86dc8137f6e414935662b06327ec10e2 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Thu, 4 Apr 2024 11:34:33 +0100 Subject: [PATCH] rptest: handle not_coordinator error when fetching offsets After broker restart it might take a while before the group offsets are available during which redpanda will reply with `error_code::not_coordinator`[^0]. We need to keep retrying after they become available. Fixes #17466 [^0]: https://github.com/redpanda-data/redpanda/blob/501b9d35882a303def6061bdc522f67f0502ac1c/src/v/kafka/server/group_manager.cc#L1653 (cherry picked from commit 97b4c33b1363bae0c49c4bae82d51fc43b198da2) --- tests/rptest/tests/consumer_group_test.py | 28 +++++++++++++++-------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/tests/rptest/tests/consumer_group_test.py b/tests/rptest/tests/consumer_group_test.py index 7f8e75a09f07..c3e3c9376205 100644 --- a/tests/rptest/tests/consumer_group_test.py +++ b/tests/rptest/tests/consumer_group_test.py @@ -21,6 +21,7 @@ from rptest.services.rpk_producer import RpkProducer from rptest.services.verifiable_consumer import VerifiableConsumer from rptest.tests.redpanda_test import RedpandaTest +from rptest.util import wait_until_result from rptest.utils.mode_checks import skip_debug_mode from ducktape.utils.util import wait_until @@ -406,21 +407,28 @@ def test_group_recovery(self): assert offsets[TopicPartition(self.topic_spec.name, 0)].leader_epoch > 0 + # Remember the old offsets to compare them after the restart. + prev_offsets = offsets + # Restart the broker. self.logger.info("Restarting redpanda nodes.") self.redpanda.restart_nodes(self.redpanda.nodes) - self.redpanda._admin.await_stable_leader("controller", - partition=0, - namespace='redpanda', - timeout_s=60, - backoff_s=2) - - prev_offsets = offsets # Validate that the group state is recovered. - test_admin = KafkaTestAdminClient(self.redpanda) - offsets = test_admin.list_offsets( - group_id, [TopicPartition(self.topic_spec.name, 0)]) + def try_list_offsets(): + try: + test_admin = KafkaTestAdminClient(self.redpanda) + return test_admin.list_offsets( + group_id, [TopicPartition(self.topic_spec.name, 0)]) + except Exception as e: + self.logger.debug(f"Failed to list offsets: {e}") + return None + + offsets = wait_until_result( + try_list_offsets, + timeout_sec=30, + backoff_sec=3, + err_msg="Failed to make list_offsets request") self.logger.info(f"Got offsets after restart: {offsets}") assert len(offsets) == 1