From 5e3f33c14b8c479b245fa2afac235c109396f4f9 Mon Sep 17 00:00:00 2001
From: Nicolae Vartolomei
Date: Mon, 25 Mar 2024 14:35:53 +0000
Subject: [PATCH 1/3] tests/java: commit offsets with leaderEpoch

VerifiableConsumer is now KIP-320 compliant, at least to the degree the
Java Kafka client is.

This will facilitate testing of the redpanda implementation of KIP-320
in group recovery (https://github.com/redpanda-data/redpanda/pull/17260)
and of data loss handling when write caching is enabled.

(cherry picked from commit 9316309038fc63f2cc2731ff8b136fca6d848706)
---
 .../java/org/apache/kafka/tools/VerifiableConsumer.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/tests/java/e2e-verifiers/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tests/java/e2e-verifiers/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 7c2bbfebd0649..74259e5494178 100644
--- a/tests/java/e2e-verifiers/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tests/java/e2e-verifiers/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -162,10 +162,12 @@ private boolean isFinished() {
             if (partitionRecords.isEmpty()) continue;
 
             long minOffset = partitionRecords.get(0).offset();
-            long maxOffset
-              = partitionRecords.get(partitionRecords.size() - 1).offset();
+            var maxRecord = partitionRecords.get(partitionRecords.size() - 1);
+            long maxOffset = maxRecord.offset();
 
-            offsets.put(tp, new OffsetAndMetadata(maxOffset + 1));
+            offsets.put(
+              tp,
+              new OffsetAndMetadata(maxOffset + 1, maxRecord.leaderEpoch(), ""));
             summaries.add(new RecordSetSummary(
               tp.topic(), tp.partition(), partitionRecords.size(), minOffset,
               maxOffset));

From 044c87ff67be2b5ed42014c87d56adbd0e86cc14 Mon Sep 17 00:00:00 2001
From: Nicolae Vartolomei
Date: Fri, 22 Mar 2024 13:22:01 +0000
Subject: [PATCH 2/3] rptest: Test consumer group commit

This tests consumer group commits.

The test also shows a bug in which the committed leader_epoch is reset
after a cluster restart. This is fixed in a subsequent commit.

(cherry picked from commit e1a3e6540959bd1584e88d94908c6af41631016f)
---
 tests/rptest/tests/consumer_group_test.py | 142 +++++++++++++++++++++-
 1 file changed, 138 insertions(+), 4 deletions(-)

diff --git a/tests/rptest/tests/consumer_group_test.py b/tests/rptest/tests/consumer_group_test.py
index 06ed83f6ab51f..22439bd5b04f6 100644
--- a/tests/rptest/tests/consumer_group_test.py
+++ b/tests/rptest/tests/consumer_group_test.py
@@ -8,20 +8,28 @@
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0
 
+from dataclasses import dataclass
+from typing import Dict, List
+
 from rptest.clients.offline_log_viewer import OfflineLogViewer
 from rptest.services.cluster import cluster
 from rptest.clients.rpk import RpkException, RpkTool
 from rptest.clients.types import TopicSpec
 from rptest.services.kafka_cli_consumer import KafkaCliConsumer
-from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
+from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, RedpandaService
 from rptest.services.rpk_producer import RpkProducer
+from rptest.services.verifiable_consumer import VerifiableConsumer
 from rptest.tests.redpanda_test import RedpandaTest
+from rptest.utils.mode_checks import skip_debug_mode
+
 from ducktape.utils.util import wait_until
 from ducktape.mark import parametrize
 
-from kafka import KafkaConsumer
-
-from rptest.utils.mode_checks import skip_debug_mode
+from kafka import KafkaConsumer, TopicPartition
+from kafka.admin import KafkaAdminClient
+from kafka.protocol.commit import OffsetFetchRequest_v3
+from kafka.protocol.api import Request, Response
+import kafka.protocol.types as types
 
 
 class ConsumerGroupTest(RedpandaTest):
@@ -362,6 +370,63 @@ def test_consumer_is_removed_when_timedout(self, static_members):
             c.wait()
             c.free()
 
+    @cluster(num_nodes=4, log_allow_list=RESTART_LOG_ALLOW_LIST)
+    def test_group_recovery(self):
+        """
+        Test validating that group state is recovered after broker restart.
+        """
+        self.create_topic(1)
+
+        # Produce some messages.
+        self.start_producer(msg_cnt=1000)
+        self.producer.wait()
+        self.producer.free()
+
+        group_id = 'test-gr-1'
+
+        # Consume all messages and commit offsets.
+        self.consumer = VerifiableConsumer(self.test_context,
+                                           num_nodes=1,
+                                           redpanda=self.redpanda,
+                                           topic=self.topic_spec.name,
+                                           group_id=group_id,
+                                           max_messages=1000)
+        self.consumer.start()
+        self.consumer.wait()
+
+        test_admin = KafkaTestAdminClient(self.redpanda)
+        offsets = test_admin.list_offsets(
+            group_id, [TopicPartition(self.topic_spec.name, 0)])
+
+        # Test that the consumer committed what we expected.
+        self.logger.info(f"Got offsets: {offsets}")
+        assert len(offsets) == 1
+        assert offsets[TopicPartition(self.topic_spec.name, 0)].offset == 1000
+        assert offsets[TopicPartition(self.topic_spec.name,
+                                      0)].leader_epoch > 0
+
+        # Restart the broker.
+        self.logger.info("Restarting redpanda nodes.")
+        self.redpanda.restart_nodes(self.redpanda.nodes)
+        self.redpanda._admin.await_stable_leader("controller",
+                                                 partition=0,
+                                                 namespace='redpanda',
+                                                 timeout_s=60,
+                                                 backoff_s=2)
+
+
+        # Validate that the group state is recovered.
+        test_admin = KafkaTestAdminClient(self.redpanda)
+        offsets = test_admin.list_offsets(
+            group_id, [TopicPartition(self.topic_spec.name, 0)])
+
+        self.logger.info(f"Got offsets after restart: {offsets}")
+        assert len(offsets) == 1
+        assert offsets[TopicPartition(self.topic_spec.name, 0)].offset == 1000
+        assert offsets[TopicPartition(
+            self.topic_spec.name,
+            0)].leader_epoch < 0  # This is a bug. Fixed in followup commit.
+
     @cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
     @parametrize(static_members=True)
     @parametrize(static_members=False)
@@ -490,3 +555,72 @@ async def create_groups(r):
 
         list = rpk.group_list()
         assert len(list) == groups_in_round * rounds
+
+
+@dataclass
+class OffsetAndMetadata():
+    offset: int
+    leader_epoch: int
+    metadata: str
+
+
+class KafkaTestAdminClient():
+    """
+    A wrapper around KafkaAdminClient with support for newer Kafka versions.
+    At the time of writing, KafkaAdminClient doesn't support KIP-320
+    (leader epoch) for consumer groups.
+    """
+    def __init__(self, redpanda: RedpandaService):
+        self._bootstrap_servers = redpanda.brokers()
+        self._admin = KafkaAdminClient(
+            bootstrap_servers=self._bootstrap_servers)
+
+    def list_offsets(
+        self, group_id: str, partitions: List[TopicPartition]
+    ) -> Dict[TopicPartition, OffsetAndMetadata]:
+        coordinator = self._admin._find_coordinator_ids([group_id])[group_id]
+        future = self._list_offsets_send_request(group_id, coordinator,
+                                                 partitions)
+        self._admin._wait_for_futures([future])
+        response = future.value
+        return self._list_offsets_send_process_response(response)
+
+    def _list_offsets_send_request(self, group_id: str, coordinator: int,
+                                   partitions: List[TopicPartition]):
+        request = OffsetFetchRequest_v5(consumer_group=group_id,
+                                        topics=[(p.topic, [p.partition])
+                                                for p in partitions])
+        return self._admin._send_request_to_node(coordinator, request)
+
+    def _list_offsets_send_process_response(self, response):
+        offsets = {}
+        for topic, partitions in response.topics:
+            for partition, offset, leader_epoch, metadata, error_code in partitions:
+                if error_code != 0:
+                    raise Exception(f"Error code: {error_code}")
+                offsets[(topic, partition)] = OffsetAndMetadata(
+                    offset, leader_epoch, metadata)
+        return offsets
+
+
+class OffsetFetchResponse_v5(Response):
+    API_KEY = 9
+    API_VERSION = 5
+    SCHEMA = types.Schema(
+        ('throttle_time_ms', types.Int32),
+        ('topics',
+         types.Array(
+             ('topic', types.String('utf-8')),
+             ('partitions',
+              types.Array(('partition', types.Int32), ('offset', types.Int64),
+                          ('leader_epoch', types.Int32),
+                          ('metadata', types.String('utf-8')),
+                          ('error_code', types.Int16))))),
+        ('error_code', types.Int16))
+
+
+class OffsetFetchRequest_v5(Request):
+    API_KEY = 9
+    API_VERSION = 5
+    RESPONSE_TYPE = OffsetFetchResponse_v5
+    SCHEMA = OffsetFetchRequest_v3.SCHEMA

From 232c1288a182ab8449a8aea6a4284ba99cbb29ee Mon Sep 17 00:00:00 2001
From: Nicolae Vartolomei
Date: Fri, 22 Mar 2024 13:22:01 +0000
Subject: [PATCH 3/3] k/group: recover leader epoch on leader change

This was discovered while testing the write caching feature.

After a leadership change or node restart we would reply with the default
field value `-2147483648`, which breaks the KIP-320 logic.
`check_leader_epoch` in redpanda treats negative epoch values as "not set"
and, I believe, franz-go behaves the same. As a result, KIP-320 fencing is
not applied and the client ends up with an `OFFSET_OUT_OF_RANGE` error.
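To illustrate the failure mode (this sketch is not part of the patch and
the class/method names are made up), KIP-320-aware clients are generally
assumed to map the committed leader epoch from the wire into an optional
value, so any negative epoch, including the `-2147483648` default above,
silently turns fencing off:

    // Illustrative sketch only: how a client is assumed to interpret the
    // committed leader epoch returned by OffsetFetch.
    import java.util.Optional;

    final class CommittedEpoch {
        // int32 minimum: the default value returned after a restart or
        // leadership change before this fix.
        static final int UNSET = Integer.MIN_VALUE; // -2147483648

        // A negative epoch reads as "unknown", so log-truncation detection
        // (KIP-320 fencing) is skipped and the consumer falls back to its
        // offset-reset / OFFSET_OUT_OF_RANGE handling.
        static Optional<Integer> fromWire(int leaderEpoch) {
            return leaderEpoch < 0 ? Optional.empty()
                                   : Optional.of(leaderEpoch);
        }
    }
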
(cherry picked from commit 501b9d35882a303def6061bdc522f67f0502ac1c)
---
 src/v/kafka/server/group_manager.cc       | 1 +
 tests/rptest/tests/consumer_group_test.py | 7 +++----
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc
index 1745bab9af1a1..be336115d8340 100644
--- a/src/v/kafka/server/group_manager.cc
+++ b/src/v/kafka/server/group_manager.cc
@@ -767,6 +767,7 @@ ss::future<> group_manager::do_recover_group(
                   .log_offset = meta.log_offset,
                   .offset = meta.metadata.offset,
                   .metadata = meta.metadata.metadata,
+                  .committed_leader_epoch = meta.metadata.leader_epoch,
                   .commit_timestamp = meta.metadata.commit_timestamp,
                   .expiry_timestamp = expiry_timestamp,
                   .non_reclaimable = meta.metadata.non_reclaimable,
diff --git a/tests/rptest/tests/consumer_group_test.py b/tests/rptest/tests/consumer_group_test.py
index 22439bd5b04f6..871b5b662e6d6 100644
--- a/tests/rptest/tests/consumer_group_test.py
+++ b/tests/rptest/tests/consumer_group_test.py
@@ -414,6 +414,7 @@ def test_group_recovery(self):
                                                  timeout_s=60,
                                                  backoff_s=2)
 
+        prev_offsets = offsets
 
         # Validate that the group state is recovered.
         test_admin = KafkaTestAdminClient(self.redpanda)
@@ -422,10 +423,8 @@ def test_group_recovery(self):
         self.logger.info(f"Got offsets after restart: {offsets}")
         assert len(offsets) == 1
-        assert offsets[TopicPartition(self.topic_spec.name, 0)].offset == 1000
-        assert offsets[TopicPartition(
-            self.topic_spec.name,
-            0)].leader_epoch < 0  # This is a bug. Fixed in followup commit.
+        assert offsets == prev_offsets, \
+            f"Expected {prev_offsets}, got {offsets}."
 
     @cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
     @parametrize(static_members=True)
     @parametrize(static_members=False)