diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc
index ef946c2fbb4a6..4e2170ee2522e 100644
--- a/src/v/kafka/server/replicated_partition.cc
+++ b/src/v/kafka/server/replicated_partition.cc
@@ -353,14 +353,28 @@ replicated_partition::get_leader_epoch_last_offset_unbounded(
     const auto first_local_offset = _partition->raft_start_offset();
     const auto first_local_term = _partition->get_term(first_local_offset);
     const auto last_local_term = _partition->term();
-    if (term > last_local_term) {
+    const auto is_read_replica = _partition->is_read_replica_mode_enabled();
+
+    vlog(
+      klog.debug,
+      "{} get_leader_epoch_last_offset_unbounded, term {}, first local offset "
+      "{}, "
+      "first local term {}, last local term {}, is read replica {}",
+      _partition->get_ntp_config().ntp(),
+      term,
+      first_local_offset,
+      first_local_term,
+      last_local_term,
+      is_read_replica);
+
+    if (!is_read_replica && term > last_local_term) {
         // Request for term that is in the future
         co_return std::nullopt;
     }
     // Look for the highest offset in the requested term, or the first offset
     // in the next term. This mirrors behavior in Kafka, see
     // https://github.com/apache/kafka/blob/97105a8e5812135515f5a0fa4d5ff554d80df2fe/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java#L255-L281
-    if (term >= first_local_term) {
+    if (!is_read_replica && term >= first_local_term) {
         auto last_offset = _partition->get_term_last_offset(term);
         if (last_offset) {
             co_return _translator->from_log_offset(*last_offset);
@@ -370,8 +384,14 @@ replicated_partition::get_leader_epoch_last_offset_unbounded(
 
     // Check cloud storage for a viable offset.
     if (
-      _partition->is_remote_fetch_enabled()
-      && _partition->cloud_data_available()) {
+      is_read_replica
+      || (_partition->is_remote_fetch_enabled() && _partition->cloud_data_available())) {
+        if (is_read_replica && !_partition->cloud_data_available()) {
+            // If we didn't sync the manifest yet, cloud_data_available will
+            // return false. We can't call `get_cloud_term_last_offset` in this
+            // case, but we also can't use `first_local_offset` for a read replica.
+            co_return std::nullopt;
+        }
         auto last_offset = co_await _partition->get_cloud_term_last_offset(
           term);
         if (last_offset) {
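The hunks above gate every local-log shortcut on is_read_replica: a read replica's own raft terms say nothing about the leader epochs recorded in the uploaded data, so it must answer OffsetForLeaderEpoch from cloud storage, or not at all while the manifest is unsynced. As a readability aid only (not part of the patch), a minimal Python sketch of that control flow follows; local_term_last_offset and cloud_term_last_offset are hypothetical stand-ins for _partition->get_term_last_offset and _partition->get_cloud_term_last_offset, and anything outside the two hunks is not modelled.

# Illustrative sketch only -- not Redpanda code.
from typing import Callable, Optional


def leader_epoch_last_offset(
        term: int,
        is_read_replica: bool,
        first_local_term: int,
        last_local_term: int,
        remote_fetch_enabled: bool,
        cloud_data_available: bool,
        local_term_last_offset: Callable[[int], Optional[int]],
        cloud_term_last_offset: Callable[[int], Optional[int]]
) -> Optional[int]:
    # A regular replica can reject a term from the future outright; a read
    # replica cannot, because its local terms are unrelated to the epochs in
    # the uploaded log.
    if not is_read_replica and term > last_local_term:
        return None
    # A regular replica serves terms covered by its local log directly
    # (offset translation is omitted in this sketch).
    if not is_read_replica and term >= first_local_term:
        offset = local_term_last_offset(term)
        if offset is not None:
            return offset
    # A read replica always consults cloud storage; other replicas do so only
    # when remote fetch is enabled and data has actually been uploaded.
    if is_read_replica or (remote_fetch_enabled and cloud_data_available):
        if is_read_replica and not cloud_data_available:
            # Manifest not synced yet: there is nothing trustworthy to answer
            # with, so report "no offset".
            return None
        return cloud_term_last_offset(term)
    # Whatever the function does after the shown hunks is not modelled here.
    return None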
diff --git a/tests/rptest/tests/offset_for_leader_epoch_read_replica_test.py b/tests/rptest/tests/offset_for_leader_epoch_read_replica_test.py
new file mode 100644
index 0000000000000..3e3e3f4b044b6
--- /dev/null
+++ b/tests/rptest/tests/offset_for_leader_epoch_read_replica_test.py
@@ -0,0 +1,172 @@
+# Copyright 2023 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+from rptest.services.cluster import cluster
+from ducktape.mark import parametrize
+from ducktape.utils.util import wait_until
+
+from rptest.services.kgo_verifier_services import KgoVerifierProducer
+from rptest.clients.kcl import KCL
+from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, SISettings, MetricsEndpoint, make_redpanda_service
+from rptest.tests.end_to_end import EndToEndTest
+from rptest.clients.types import TopicSpec
+from rptest.clients.rpk import RpkTool
+from rptest.services.admin import Admin
+import time
+import random
+
+
+class OffsetForLeaderEpochReadReplicaTest(EndToEndTest):
+    # Size of the segment in the local storage
+    segment_size = 0x2000
+    # Generate at least 20 terms (one leadership transfer per produce round)
+    num_terms = 20
+
+    def __init__(self, test_context):
+        self.extra_rp_conf = {
+            'enable_leader_balancer': False,
+            "log_compaction_interval_ms": 1000
+        }
+        super(OffsetForLeaderEpochReadReplicaTest, self).__init__(
+            test_context=test_context,
+            si_settings=SISettings(
+                test_context,
+                cloud_storage_max_connections=5,
+                log_segment_size=OffsetForLeaderEpochReadReplicaTest.
+                segment_size,
+                cloud_storage_enable_remote_write=True,
+                cloud_storage_enable_remote_read=True,
+                cloud_storage_readreplica_manifest_sync_timeout_ms=500,
+                cloud_storage_segment_max_upload_interval_sec=5,
+                fast_uploads=True),
+            extra_rp_conf=self.extra_rp_conf)
+
+        self.rr_settings = SISettings(
+            test_context,
+            bypass_bucket_creation=True,
+            cloud_storage_enable_remote_write=False,
+            cloud_storage_max_connections=5,
+            cloud_storage_readreplica_manifest_sync_timeout_ms=100,
+            cloud_storage_segment_max_upload_interval_sec=5,
+            cloud_storage_housekeeping_interval_ms=10)
+
+        self.topic_name = "panda-topic"
+        self.second_cluster = None
+        self.epoch_offsets = {}
+
+    def start_second_cluster(self) -> None:
+        self.second_cluster = make_redpanda_service(
+            self.test_context, num_brokers=3, si_settings=self.rr_settings)
+        self.second_cluster.start(start_si=False)
+
+    def create_source_topic(self):
+        self.rpk_client().create_topic(self.topic_name,
+                                       partitions=1,
+                                       replicas=3)
+
+    def create_read_replica_topic(self) -> None:
+        rpk_dst_cluster = RpkTool(self.second_cluster)
+        bucket_name = self.si_settings.cloud_storage_bucket
+
+        conf = {
+            'redpanda.remote.readreplica': bucket_name,
+        }
+
+        rpk_dst_cluster.create_topic(self.topic_name, config=conf)
+
+        def has_leader():
+            partitions = list(
+                rpk_dst_cluster.describe_topic(self.topic_name, tolerant=True))
+            for part in partitions:
+                if part.leader == -1:
+                    return False
+            return True
+
+        wait_until(has_leader,
+                   timeout_sec=60,
+                   backoff_sec=10,
+                   err_msg="Failed to create a read-replica, no leadership")
+
+    def produce(self, topic, msg_count):
+        msg_size = 64
+        for _ in range(0, self.num_terms):
+            KgoVerifierProducer.oneshot(context=self.test_context,
+                                        redpanda=self.redpanda,
+                                        topic=topic,
+                                        msg_size=msg_size,
+                                        msg_count=100,
+                                        use_transactions=False,
+                                        debug_logs=True)
+            self.transfer_leadership()
+            time.sleep(1)
+
+    def transfer_leadership(self):
+        res = list(self.rpk_client().describe_topic(self.topic_name))
+        self.epoch_offsets[res[0].leader_epoch] = res[0].high_watermark
+        self.logger.info(
+            f"leadership transfer, epoch {res[0].leader_epoch}, "
+            f"high watermark {res[0].high_watermark}")
+
+        admin = Admin(self.redpanda)
+        leader = admin.get_partition_leader(namespace='kafka',
+                                            topic=self.topic_name,
+                                            partition=0)
+
+        broker_ids = [x['node_id'] for x in admin.get_brokers()]
+        transfer_to = random.choice([n for n in broker_ids if n != leader])
+        admin.transfer_leadership_to(namespace="kafka",
+                                     topic=self.topic_name,
+                                     partition=0,
+                                     target_id=transfer_to,
+                                     leader_id=leader)
+
+        admin.await_stable_leader(self.topic_name,
+                                  partition=0,
+                                  namespace='kafka',
+                                  timeout_s=20,
+                                  backoff_s=2,
+                                  check=lambda node_id: node_id == transfer_to)
+
+    def query_offset_for_leader_epoch(self, query_rrr):
+        self.logger.info(
+            f"query offset for leader epoch started, RRR={query_rrr}")
+
+        kcl = KCL(self.second_cluster) if query_rrr else KCL(self.redpanda)
+
+        for epoch, offset in self.epoch_offsets.items():
+
+            def check():
+                self.logger.info(
+                    f"querying epoch {epoch} end offsets, expected to get {offset}"
+                )
+                epoch_end_offset = kcl.offset_for_leader_epoch(
+                    topics=self.topic_name,
+                    leader_epoch=epoch)[0].epoch_end_offset
+                if epoch_end_offset < 0:
+                    # NOT_LEADER_FOR_PARTITION error, retry
+                    return False
+                self.logger.info(
+                    f"epoch {epoch} end_offset: {epoch_end_offset}, expected offset: {offset}"
+                )
+                assert epoch_end_offset == offset, f"{epoch_end_offset} vs {offset}"
+                return True
+
+            wait_until(check,
+                       timeout_sec=30,
+                       backoff_sec=10,
+                       err_msg=f"Failed to query epoch/offset {epoch}/{offset}")
+
+    @cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
+    def test_offset_for_leader_epoch(self):
+        self.start_redpanda(num_nodes=3)
+        self.create_source_topic()
+        self.produce(self.topic_name, 1000)
+        self.query_offset_for_leader_epoch(False)
+        self.start_second_cluster()
+        self.create_read_replica_topic()
+        self.query_offset_for_leader_epoch(True)
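The retry inside check() is the important part: a read replica answers OffsetForLeaderEpoch from the uploaded manifest, so queries issued before the manifest has synced (or before leadership settles) come back with a negative, undefined epoch end offset and must be retried rather than treated as failures. Below is a standalone sketch of that polling pattern, reusing the same rptest KCL wrapper and ducktape wait_until the test already imports; wait_for_epoch_end_offset is a hypothetical helper name, not an existing rptest utility, and this code is not part of the patch.

# Sketch only -- not part of the patch or of rptest.
from ducktape.utils.util import wait_until

from rptest.clients.kcl import KCL


def wait_for_epoch_end_offset(cluster, topic, epoch, expected_offset,
                              timeout_sec=30):
    """Poll OffsetForLeaderEpoch until `epoch` resolves to `expected_offset`."""
    kcl = KCL(cluster)

    def resolved():
        end_offset = kcl.offset_for_leader_epoch(
            topics=topic, leader_epoch=epoch)[0].epoch_end_offset
        if end_offset < 0:
            # Undefined offset: manifest not synced yet or no stable leader.
            return False
        assert end_offset == expected_offset, \
            f"epoch {epoch}: got {end_offset}, expected {expected_offset}"
        return True

    wait_until(resolved,
               timeout_sec=timeout_sec,
               backoff_sec=5,
               err_msg=f"epoch {epoch} never resolved to {expected_offset}")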