Skip to content

Commit

Permalink
cloud_storage: Fix offset_for_leader_epoch for RRR
Browse files Browse the repository at this point in the history
RRR partition can return incorrect offset for epoch because it uses
information from local Raft group. This commit fixes the issue by always
using data from the cloud storage if the topic is a read-replica.

(cherry picked from commit 523bb54)
  • Loading branch information
Lazin authored and vbotbuildovich committed Oct 25, 2023
1 parent aa60294 commit 277d759
Showing 1 changed file with 24 additions and 4 deletions.
28 changes: 24 additions & 4 deletions src/v/kafka/server/replicated_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -341,14 +341,28 @@ replicated_partition::get_leader_epoch_last_offset_unbounded(
const auto first_local_offset = _partition->raft_start_offset();
const auto first_local_term = _partition->get_term(first_local_offset);
const auto last_local_term = _partition->term();
if (term > last_local_term) {
const auto is_read_replica = _partition->is_read_replica_mode_enabled();

vlog(
klog.debug,
"{} get_leader_epoch_last_offset_unbounded, term {}, first local offset "
"{}, "
"first local term {}, last local term {}, is read replica {}",
_partition->get_ntp_config().ntp(),
term,
first_local_offset,
first_local_term,
last_local_term,
is_read_replica);

if (!is_read_replica && term > last_local_term) {
// Request for term that is in the future
co_return std::nullopt;
}
// Look for the highest offset in the requested term, or the first offset
// in the next term. This mirrors behavior in Kafka, see
// https://github.com/apache/kafka/blob/97105a8e5812135515f5a0fa4d5ff554d80df2fe/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java#L255-L281
if (term >= first_local_term) {
if (!is_read_replica && term >= first_local_term) {
auto last_offset = _partition->get_term_last_offset(term);
if (last_offset) {
co_return _translator->from_log_offset(*last_offset);
Expand All @@ -358,8 +372,14 @@ replicated_partition::get_leader_epoch_last_offset_unbounded(

// Check cloud storage for a viable offset.
if (
_partition->is_remote_fetch_enabled()
&& _partition->cloud_data_available()) {
is_read_replica
|| (_partition->is_remote_fetch_enabled() && _partition->cloud_data_available())) {
if (is_read_replica && !_partition->cloud_data_available()) {
// If we didn't sync the manifest yet the cloud_data_available will
// return false. We can't call `get_cloud_term_last_offset` in this
// case but we also can't use `first_local_offset` for read replica.
co_return std::nullopt;
}
auto last_offset = co_await _partition->get_cloud_term_last_offset(
term);
if (last_offset) {
Expand Down

0 comments on commit 277d759

Please sign in to comment.