From 277d759d0db2da3ee7847f8dc442c70617df58f0 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Tue, 24 Oct 2023 13:56:20 -0400 Subject: [PATCH] cloud_storage: Fix offset_for_leader_epoch for RRR A read-replica (RRR) partition can return an incorrect offset for an epoch because it uses information from the local Raft group. This commit fixes the issue by always using data from cloud storage if the topic is a read replica. (cherry picked from commit 523bb5435c1d1c3ccc3aa2ca0b2aa2296c451b09) --- src/v/kafka/server/replicated_partition.cc | 28 ++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index 8ca56a4f5909..8e72b75a34e6 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -341,14 +341,28 @@ replicated_partition::get_leader_epoch_last_offset_unbounded( const auto first_local_offset = _partition->raft_start_offset(); const auto first_local_term = _partition->get_term(first_local_offset); const auto last_local_term = _partition->term(); - if (term > last_local_term) { + const auto is_read_replica = _partition->is_read_replica_mode_enabled(); + + vlog( + klog.debug, + "{} get_leader_epoch_last_offset_unbounded, term {}, first local offset " + "{}, " + "first local term {}, last local term {}, is read replica {}", + _partition->get_ntp_config().ntp(), + term, + first_local_offset, + first_local_term, + last_local_term, + is_read_replica); + + if (!is_read_replica && term > last_local_term) { // Request for term that is in the future co_return std::nullopt; } // Look for the highest offset in the requested term, or the first offset // in the next term. 
This mirrors behavior in Kafka, see // https://github.com/apache/kafka/blob/97105a8e5812135515f5a0fa4d5ff554d80df2fe/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java#L255-L281 - if (term >= first_local_term) { + if (!is_read_replica && term >= first_local_term) { auto last_offset = _partition->get_term_last_offset(term); if (last_offset) { co_return _translator->from_log_offset(*last_offset); @@ -358,8 +372,14 @@ replicated_partition::get_leader_epoch_last_offset_unbounded( // Check cloud storage for a viable offset. if ( - _partition->is_remote_fetch_enabled() - && _partition->cloud_data_available()) { + is_read_replica + || (_partition->is_remote_fetch_enabled() && _partition->cloud_data_available())) { + if (is_read_replica && !_partition->cloud_data_available()) { + // If we didn't sync the manifest yet the cloud_data_available will + // return false. We can't call `get_cloud_term_last_offset` in this + // case but we also can't use `first_local_offset` for read replica. + co_return std::nullopt; + } auto last_offset = co_await _partition->get_cloud_term_last_offset( term); if (last_offset) {