cloud_storage: Fix offset_for_leader_epoch for RRR
An RRR (read-replica) partition can return an incorrect offset for a leader
epoch because it uses information from the local Raft group. This commit fixes
the issue by always using data from cloud storage when the topic is a read replica.
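The conditions this commit changes can be summarized as three predicates. The sketch below is a simplified, standalone illustration and not the Redpanda code: `partition_state` and the three helper functions are hypothetical names introduced here, and terms are modeled as plain integers.

```cpp
#include <cstdint>

// Hypothetical, simplified stand-in for the partition state that
// get_leader_epoch_last_offset_unbounded consults. Not a Redpanda type.
struct partition_state {
    bool is_read_replica;
    bool remote_fetch_enabled;
    bool cloud_data_available;
    std::int64_t first_local_term;
    std::int64_t last_local_term;
};

// A term is rejected as "in the future" only when the local Raft group is
// authoritative, i.e. the partition is not a read replica.
inline bool is_future_term(const partition_state& p, std::int64_t term) {
    return !p.is_read_replica && term > p.last_local_term;
}

// The local log is consulted only for non-read-replica partitions and only
// for terms that are still present locally.
inline bool may_use_local_log(const partition_state& p, std::int64_t term) {
    return !p.is_read_replica && term >= p.first_local_term;
}

// Cloud storage is always consulted for read replicas; otherwise only when
// remote fetch is enabled and cloud data is available.
inline bool may_use_cloud(const partition_state& p) {
    return p.is_read_replica
           || (p.remote_fetch_enabled && p.cloud_data_available);
}
```

For a read replica, `is_future_term` and `may_use_local_log` are always false and `may_use_cloud` is always true, which is the "always use cloud storage" behavior the commit message describes.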
Lazin committed Oct 24, 2023
1 parent 209f170 commit 7a8d871
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions src/v/kafka/server/replicated_partition.cc
@@ -353,14 +353,28 @@ replicated_partition::get_leader_epoch_last_offset_unbounded(
     const auto first_local_offset = _partition->raft_start_offset();
     const auto first_local_term = _partition->get_term(first_local_offset);
     const auto last_local_term = _partition->term();
-    if (term > last_local_term) {
+    const auto is_read_replica = _partition->is_read_replica_mode_enabled();
+
+    vlog(
+      klog.debug,
+      "{} get_leader_epoch_last_offset_unbounded, term {}, first local offset "
+      "{}, "
+      "first local term {}, last local term {}, is read replica {}",
+      _partition->get_ntp_config().ntp(),
+      term,
+      first_local_offset,
+      first_local_term,
+      last_local_term,
+      is_read_replica);
+
+    if (!is_read_replica && term > last_local_term) {
         // Request for term that is in the future
         co_return std::nullopt;
     }
     // Look for the highest offset in the requested term, or the first offset
     // in the next term. This mirrors behavior in Kafka, see
     // https://github.com/apache/kafka/blob/97105a8e5812135515f5a0fa4d5ff554d80df2fe/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java#L255-L281
-    if (term >= first_local_term) {
+    if (!is_read_replica && term >= first_local_term) {
         auto last_offset = _partition->get_term_last_offset(term);
         if (last_offset) {
             co_return _translator->from_log_offset(*last_offset);
@@ -370,8 +384,8 @@ replicated_partition::get_leader_epoch_last_offset_unbounded(
 
     // Check cloud storage for a viable offset.
     if (
-      _partition->is_remote_fetch_enabled()
-      && _partition->cloud_data_available()) {
+      is_read_replica
+      || (_partition->is_remote_fetch_enabled() && _partition->cloud_data_available())) {
         auto last_offset = co_await _partition->get_cloud_term_last_offset(
           term);
         if (last_offset) {
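The code comment in the diff describes the lookup rule as "the highest offset in the requested term, or the first offset in the next term". A minimal sketch of that rule over an in-memory list of terms might look like the following. It is not the Redpanda or Kafka implementation: `epoch_entry` and `end_offset_for_epoch` are hypothetical names, and the sketch assumes the returned offset is exclusive (the start offset of the next term, or the log end offset for the latest term).

```cpp
#include <algorithm>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical record: the first offset written in a given term (epoch).
struct epoch_entry {
    std::int64_t epoch;
    std::int64_t start_offset;
};

// Returns the exclusive end offset for `requested`, assuming `epochs` is
// sorted by ascending epoch:
//   - std::nullopt if nothing is known or the epoch is in the future,
//   - the log end offset if `requested` is the latest known epoch,
//   - otherwise the start offset of the first epoch greater than `requested`.
inline std::optional<std::int64_t> end_offset_for_epoch(
  const std::vector<epoch_entry>& epochs,
  std::int64_t log_end_offset,
  std::int64_t requested) {
    if (epochs.empty() || requested > epochs.back().epoch) {
        return std::nullopt;
    }
    // First entry whose epoch is strictly greater than the requested one.
    auto next = std::upper_bound(
      epochs.begin(),
      epochs.end(),
      requested,
      [](std::int64_t e, const epoch_entry& entry) { return e < entry.epoch; });
    if (next == epochs.end()) {
        // No later epoch exists, so `requested` is the latest epoch.
        return log_end_offset;
    }
    return next->start_offset;
}
```

A requested term that falls in a gap between two recorded terms resolves to the start of the next recorded term, which is why the rule is phrased as "or the first offset in the next term".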
