diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc
index 1745bab9af1a..be336115d834 100644
--- a/src/v/kafka/server/group_manager.cc
+++ b/src/v/kafka/server/group_manager.cc
@@ -767,6 +767,7 @@ ss::future<> group_manager::do_recover_group(
       .log_offset = meta.log_offset,
       .offset = meta.metadata.offset,
       .metadata = meta.metadata.metadata,
+      .committed_leader_epoch = meta.metadata.leader_epoch,
       .commit_timestamp = meta.metadata.commit_timestamp,
       .expiry_timestamp = expiry_timestamp,
       .non_reclaimable = meta.metadata.non_reclaimable,
diff --git a/tests/java/e2e-verifiers/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tests/java/e2e-verifiers/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 7c2bbfebd064..74259e549417 100644
--- a/tests/java/e2e-verifiers/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tests/java/e2e-verifiers/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -162,10 +162,12 @@ private boolean isFinished() {
             if (partitionRecords.isEmpty())
                 continue;
             long minOffset = partitionRecords.get(0).offset();
-            long maxOffset
-              = partitionRecords.get(partitionRecords.size() - 1).offset();
+            var maxRecord = partitionRecords.get(partitionRecords.size() - 1);
+            long maxOffset = maxRecord.offset();
 
-            offsets.put(tp, new OffsetAndMetadata(maxOffset + 1));
+            offsets.put(
+              tp,
+              new OffsetAndMetadata(maxOffset + 1, maxRecord.leaderEpoch(), ""));
             summaries.add(new RecordSetSummary(
               tp.topic(), tp.partition(), partitionRecords.size(), minOffset,
               maxOffset));
diff --git a/tests/rptest/tests/consumer_group_test.py b/tests/rptest/tests/consumer_group_test.py
index 06ed83f6ab51..871b5b662e6d 100644
--- a/tests/rptest/tests/consumer_group_test.py
+++ b/tests/rptest/tests/consumer_group_test.py
@@ -8,20 +8,28 @@
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0
 
+from dataclasses import dataclass
+from typing import Dict, List
+
 from rptest.clients.offline_log_viewer import OfflineLogViewer
 from rptest.services.cluster import cluster
 from rptest.clients.rpk import RpkException, RpkTool
 from rptest.clients.types import TopicSpec
 from rptest.services.kafka_cli_consumer import KafkaCliConsumer
-from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
+from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, RedpandaService
 from rptest.services.rpk_producer import RpkProducer
+from rptest.services.verifiable_consumer import VerifiableConsumer
 from rptest.tests.redpanda_test import RedpandaTest
+from rptest.utils.mode_checks import skip_debug_mode
+
 from ducktape.utils.util import wait_until
 from ducktape.mark import parametrize
 
-from kafka import KafkaConsumer
-
-from rptest.utils.mode_checks import skip_debug_mode
+from kafka import KafkaConsumer, TopicPartition
+from kafka.admin import KafkaAdminClient
+from kafka.protocol.commit import OffsetFetchRequest_v3
+from kafka.protocol.api import Request, Response
+import kafka.protocol.types as types
 
 
 class ConsumerGroupTest(RedpandaTest):
@@ -362,6 +370,62 @@ def test_consumer_is_removed_when_timedout(self, static_members):
         c.wait()
         c.free()
 
+    @cluster(num_nodes=4, log_allow_list=RESTART_LOG_ALLOW_LIST)
+    def test_group_recovery(self):
+        """
+        Check that group offsets and leader epochs survive a broker restart.
+        """
+        self.create_topic(1)
+
+        # Produce some messages.
+        self.start_producer(msg_cnt=1000)
+        self.producer.wait()
+        self.producer.free()
+
+        group_id = 'test-gr-1'
+
+        # Consume all messages and commit offsets.
+        self.consumer = VerifiableConsumer(self.test_context,
+                                           num_nodes=1,
+                                           redpanda=self.redpanda,
+                                           topic=self.topic_spec.name,
+                                           group_id=group_id,
+                                           max_messages=1000)
+        self.consumer.start()
+        self.consumer.wait()
+
+        test_admin = KafkaTestAdminClient(self.redpanda)
+        offsets = test_admin.list_offsets(
+            group_id, [TopicPartition(self.topic_spec.name, 0)])
+
+        # Check that the consumer committed what we expected.
+        self.logger.info(f"Got offsets: {offsets}")
+        assert len(offsets) == 1
+        assert offsets[TopicPartition(self.topic_spec.name, 0)].offset == 1000
+        assert offsets[TopicPartition(self.topic_spec.name,
+                                      0)].leader_epoch > 0
+
+        # Restart all redpanda nodes.
+        self.logger.info("Restarting redpanda nodes.")
+        self.redpanda.restart_nodes(self.redpanda.nodes)
+        self.redpanda._admin.await_stable_leader("controller",
+                                                 partition=0,
+                                                 namespace='redpanda',
+                                                 timeout_s=60,
+                                                 backoff_s=2)
+
+        prev_offsets = offsets
+
+        # Validate that the group state was recovered.
+        test_admin = KafkaTestAdminClient(self.redpanda)
+        offsets = test_admin.list_offsets(
+            group_id, [TopicPartition(self.topic_spec.name, 0)])
+
+        self.logger.info(f"Got offsets after restart: {offsets}")
+        assert len(offsets) == 1
+        assert offsets == prev_offsets, \
+            f"Expected {prev_offsets}, got {offsets}."
+
     @cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
     @parametrize(static_members=True)
     @parametrize(static_members=False)
@@ -490,3 +554,72 @@ async def create_groups(r):
 
         list = rpk.group_list()
         assert len(list) == groups_in_round * rounds
+
+
+@dataclass
+class OffsetAndMetadata:
+    offset: int
+    leader_epoch: int
+    metadata: str
+
+
+class KafkaTestAdminClient:
+    """
+    A wrapper around kafka-python's KafkaAdminClient that fetches committed
+    offsets via OffsetFetch v5. At the time of writing, KafkaAdminClient
+    does not expose the KIP-320 committed leader epoch for consumer groups.
+ """ + def __init__(self, redpanda: RedpandaService): + self._bootstrap_servers = redpanda.brokers() + self._admin = KafkaAdminClient( + bootstrap_servers=self._bootstrap_servers) + + def list_offsets( + self, group_id: str, partitions: List[TopicPartition] + ) -> Dict[TopicPartition, OffsetAndMetadata]: + coordinator = self._admin._find_coordinator_ids([group_id])[group_id] + future = self._list_offsets_send_request(group_id, coordinator, + partitions) + self._admin._wait_for_futures([future]) + response = future.value + return self._list_offsets_send_process_response(response) + + def _list_offsets_send_request(self, group_id: str, coordinator: int, + partitions: List[TopicPartition]): + request = OffsetFetchRequest_v5(consumer_group=group_id, + topics=[(p.topic, [p.partition]) + for p in partitions]) + return self._admin._send_request_to_node(coordinator, request) + + def _list_offsets_send_process_response(self, response): + offsets = {} + for topic, partitions in response.topics: + for partition, offset, leader_epoch, metadata, error_code in partitions: + if error_code != 0: + raise Exception(f"Error code: {error_code}") + offsets[(topic, partition)] = OffsetAndMetadata( + offset, leader_epoch, metadata) + return offsets + + +class OffsetFetchResponse_v5(Response): + API_KEY = 9 + API_VERSION = 5 + SCHEMA = types.Schema( + ('throttle_time_ms', types.Int32), + ('topics', + types.Array( + ('topic', types.String('utf-8')), + ('partitions', + types.Array(('partition', types.Int32), ('offset', types.Int64), + ('leader_epoch', types.Int32), + ('metadata', types.String('utf-8')), + ('error_code', types.Int16))))), + ('error_code', types.Int16)) + + +class OffsetFetchRequest_v5(Request): + API_KEY = 9 + API_VERSION = 5 + RESPONSE_TYPE = OffsetFetchResponse_v5 + SCHEMA = OffsetFetchRequest_v3.SCHEMA