From 209f170e9c935552b3114f490ec0432f938e8ce9 Mon Sep 17 00:00:00 2001
From: Evgeny Lazin <4lazin@gmail.com>
Date: Tue, 24 Oct 2023 13:55:56 -0400
Subject: [PATCH 1/2] rptest: Add OffsetForLeaderEpoch test for RRR

---
 ...fset_for_leader_epoch_read_replica_test.py | 172 ++++++++++++++++++
 1 file changed, 172 insertions(+)
 create mode 100644 tests/rptest/tests/offset_for_leader_epoch_read_replica_test.py

diff --git a/tests/rptest/tests/offset_for_leader_epoch_read_replica_test.py b/tests/rptest/tests/offset_for_leader_epoch_read_replica_test.py
new file mode 100644
index 0000000000000..3e3e3f4b044b6
--- /dev/null
+++ b/tests/rptest/tests/offset_for_leader_epoch_read_replica_test.py
@@ -0,0 +1,172 @@
+# Copyright 2023 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+from rptest.services.cluster import cluster
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from rptest.services.kgo_verifier_services import KgoVerifierProducer
+from rptest.clients.kcl import KCL
+from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, SISettings, MetricsEndpoint, make_redpanda_service
+from rptest.tests.end_to_end import EndToEndTest
+from rptest.clients.types import TopicSpec
+from rptest.clients.rpk import RpkTool
+from rptest.services.admin import Admin
+import time
+import random
+
+
+class OffsetForLeaderEpochReadReplicaTest(EndToEndTest):
+    # Size of the segment in the local storage
+    segment_size = 0x2000
+    # Generate at least 20 terms
+    num_terms = 20
+
+    def __init__(self, test_context):
+        self.extra_rp_conf = {
+            'enable_leader_balancer': False,
+            "log_compaction_interval_ms": 1000
+        }
+        super(OffsetForLeaderEpochReadReplicaTest, self).__init__(
+            test_context=test_context,
+            si_settings=SISettings(
+                test_context,
+                cloud_storage_max_connections=5,
+                log_segment_size=OffsetForLeaderEpochReadReplicaTest.
+                segment_size,
+                cloud_storage_enable_remote_write=True,
+                cloud_storage_enable_remote_read=True,
+                cloud_storage_readreplica_manifest_sync_timeout_ms=500,
+                cloud_storage_segment_max_upload_interval_sec=5,
+                fast_uploads=True),
+            extra_rp_conf=self.extra_rp_conf)
+
+        self.rr_settings = SISettings(
+            test_context,
+            bypass_bucket_creation=True,
+            cloud_storage_enable_remote_write=False,
+            cloud_storage_max_connections=5,
+            cloud_storage_readreplica_manifest_sync_timeout_ms=100,
+            cloud_storage_segment_max_upload_interval_sec=5,
+            cloud_storage_housekeeping_interval_ms=10)
+
+        self.topic_name = "panda-topic"
+        self.second_cluster = None
+        self.epoch_offsets = {}
+
+    def start_second_cluster(self) -> None:
+        self.second_cluster = make_redpanda_service(
+            self.test_context, num_brokers=3, si_settings=self.rr_settings)
+        self.second_cluster.start(start_si=False)
+
+    def create_source_topic(self):
+        self.rpk_client().create_topic(self.topic_name,
+                                       partitions=1,
+                                       replicas=3)
+
+    def create_read_replica_topic(self) -> None:
+        rpk_dst_cluster = RpkTool(self.second_cluster)
+        bucket_name = self.si_settings.cloud_storage_bucket
+
+        conf = {
+            'redpanda.remote.readreplica': bucket_name,
+        }
+
+        rpk_dst_cluster.create_topic(self.topic_name, config=conf)
+
+        def has_leader():
+            partitions = list(
+                rpk_dst_cluster.describe_topic(self.topic_name, tolerant=True))
+            for part in partitions:
+                if part.leader == -1:
+                    return False
+            return True
+
+        wait_until(has_leader,
+                   timeout_sec=60,
+                   backoff_sec=10,
+                   err_msg="Failed to create a read-replica, no leadership")
+
+    def produce(self, topic, msg_count):
+        msg_size = 64
+        for _ in range(0, self.num_terms):
+            KgoVerifierProducer.oneshot(context=self.test_context,
+                                        redpanda=self.redpanda,
+                                        topic=topic,
+                                        msg_size=msg_size,
+                                        msg_count=100,
+                                        use_transactions=False,
+                                        debug_logs=True)
+            self.transfer_leadership()
+            time.sleep(1)
+
+    def transfer_leadership(self):
+        res = list(self.rpk_client().describe_topic(self.topic_name))
+        self.epoch_offsets[res[0].leader_epoch] = res[0].high_watermark
+        self.logger.info(
+            f"leadership transfer, epoch {res[0].leader_epoch}, high watermark {res[0].high_watermark}")
+
+        admin = Admin(self.redpanda)
+        leader = admin.get_partition_leader(namespace='kafka',
+                                            topic=self.topic_name,
+                                            partition=0)
+
+        broker_ids = [x['node_id'] for x in admin.get_brokers()]
+        transfer_to = random.choice([n for n in broker_ids if n != leader])
+        admin.transfer_leadership_to(namespace="kafka",
+                                     topic=self.topic_name,
+                                     partition=0,
+                                     target_id=transfer_to,
+                                     leader_id=leader)
+
+        admin.await_stable_leader(self.topic_name,
+                                  partition=0,
+                                  namespace='kafka',
+                                  timeout_s=20,
+                                  backoff_s=2,
+                                  check=lambda node_id: node_id == transfer_to)
+
+    def query_offset_for_leader_epoch(self, query_rrr):
+        self.logger.info(
+            f"query offset for leader epoch started, RRR={query_rrr}")
+
+        kcl = KCL(self.second_cluster) if query_rrr else KCL(self.redpanda)
+
+        for epoch, offset in self.epoch_offsets.items():
+
+            def check():
+                self.logger.info(
+                    f"querying epoch {epoch} end offsets, expected to get {offset}"
+                )
+                epoch_end_offset = kcl.offset_for_leader_epoch(
+                    topics=self.topic_name,
+                    leader_epoch=epoch)[0].epoch_end_offset
+                if epoch_end_offset < 0:
+                    # NOT_LEADER_FOR_PARTITION error, retry
+                    return False
+                self.logger.info(
+                    f"epoch {epoch} end_offset: {epoch_end_offset}, expected offset: {offset}"
+                )
+                assert epoch_end_offset == offset, f"{epoch_end_offset} vs {offset}"
+                return True
+
+            wait_until(check,
+                       timeout_sec=30,
+                       backoff_sec=10,
+                       err_msg=f"Failed query epoch/offset {epoch}/{offset}")
+
+    @cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
+    def test_offset_for_leader_epoch(self):
+        self.start_redpanda(num_nodes=3)
+        self.create_source_topic()
+        self.produce(self.topic_name, 1000)
+        self.query_offset_for_leader_epoch(False)
+        self.start_second_cluster()
+        self.create_read_replica_topic()
+        self.query_offset_for_leader_epoch(True)

From 523bb5435c1d1c3ccc3aa2ca0b2aa2296c451b09 Mon Sep 17 00:00:00 2001
From: Evgeny Lazin <4lazin@gmail.com>
Date: Tue, 24 Oct 2023 13:56:20 -0400
Subject: [PATCH 2/2] cloud_storage: Fix offset_for_leader_epoch for RRR

A read-replica (RRR) partition can return an incorrect offset for an
epoch because it uses information from the local Raft group. This
commit fixes the issue by always using data from cloud storage if the
topic is a read replica.
---
 src/v/kafka/server/replicated_partition.cc | 28 ++++++++++++++++++----
 1 file changed, 24 insertions(+), 4 deletions(-)

diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc
index ef946c2fbb4a6..4e2170ee2522e 100644
--- a/src/v/kafka/server/replicated_partition.cc
+++ b/src/v/kafka/server/replicated_partition.cc
@@ -353,14 +353,28 @@ replicated_partition::get_leader_epoch_last_offset_unbounded(
     const auto first_local_offset = _partition->raft_start_offset();
     const auto first_local_term = _partition->get_term(first_local_offset);
     const auto last_local_term = _partition->term();
-    if (term > last_local_term) {
+    const auto is_read_replica = _partition->is_read_replica_mode_enabled();
+
+    vlog(
+      klog.debug,
+      "{} get_leader_epoch_last_offset_unbounded, term {}, first local offset "
+      "{}, "
+      "first local term {}, last local term {}, is read replica {}",
+      _partition->get_ntp_config().ntp(),
+      term,
+      first_local_offset,
+      first_local_term,
+      last_local_term,
+      is_read_replica);
+
+    if (!is_read_replica && term > last_local_term) {
         // Request for term that is in the future
         co_return std::nullopt;
     }
     // Look for the highest offset in the requested term, or the first offset
     // in the next term. This mirrors behavior in Kafka, see
     // https://github.com/apache/kafka/blob/97105a8e5812135515f5a0fa4d5ff554d80df2fe/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java#L255-L281
-    if (term >= first_local_term) {
+    if (!is_read_replica && term >= first_local_term) {
         auto last_offset = _partition->get_term_last_offset(term);
         if (last_offset) {
             co_return _translator->from_log_offset(*last_offset);
@@ -370,8 +384,14 @@ replicated_partition::get_leader_epoch_last_offset_unbounded(
 
     // Check cloud storage for a viable offset.
     if (
-      _partition->is_remote_fetch_enabled()
-      && _partition->cloud_data_available()) {
+      is_read_replica
+      || (_partition->is_remote_fetch_enabled() && _partition->cloud_data_available())) {
+        if (is_read_replica && !_partition->cloud_data_available()) {
+            // If we didn't sync the manifest yet the cloud_data_available will
+            // return false. We can't call `get_cloud_term_last_offset` in this
+            // case but we also can't use `first_local_offset` for read replica.
+            co_return std::nullopt;
+        }
         auto last_offset = co_await _partition->get_cloud_term_last_offset(
           term);
         if (last_offset) {
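
Note for readers: the verification performed by the test above reduces to
comparing the epoch -> end-offset map recorded on the source cluster against
what the read replica reports for OffsetForLeaderEpoch. A minimal sketch,
assuming the rptest KCL wrapper and the self.epoch_offsets map built by
transfer_leadership(); the helper name verify_epoch_end_offsets is
illustrative and not part of the patch:

    def verify_epoch_end_offsets(self, use_read_replica: bool):
        # Query either the read-replica cluster or the source cluster.
        kcl = KCL(self.second_cluster if use_read_replica else self.redpanda)
        for epoch, expected in self.epoch_offsets.items():
            # offset_for_leader_epoch returns one result per topic/partition;
            # the real test wraps this check in wait_until() because a negative
            # epoch_end_offset indicates a retriable error such as
            # NOT_LEADER_FOR_PARTITION.
            result = kcl.offset_for_leader_epoch(topics=self.topic_name,
                                                 leader_epoch=epoch)[0]
            assert result.epoch_end_offset == expected, \
                f"epoch {epoch}: got {result.epoch_end_offset}, want {expected}"

Without the second patch, a read replica answers this query from its own local
Raft group rather than from the uploaded log, which is exactly the mismatch the
assertion above catches.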