raft: in append_entries skip batches that we already have
This is important for the case when we already have _all_ batches locally
(possible if e.g. the request was delayed/duplicated). In this case we don't
want to truncate, otherwise we might lose already committed data.

Fixes #17731
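
To make the scenario concrete, here is a minimal, self-contained sketch of the skip (not the Redpanda code itself — batch, local_term_at and skip_matched_batches are simplified stand-ins for the find_mismatch_consumer added in consensus.cc below): walk the request's batches and, as long as the term of each batch matches the term already stored locally at that offset, advance the effective prev_log_index past it, so truncation only happens at a real divergence.

#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

// Simplified stand-ins for model::record_batch and the local log.
struct batch {
    int64_t last_offset;
    int64_t term;
};

// Term of the locally stored entry covering `offset`, or nullopt if the
// offset is past the end of our log.
std::optional<int64_t>
local_term_at(const std::vector<batch>& log, int64_t offset) {
    for (const auto& b : log) {
        if (offset <= b.last_offset) {
            return b.term;
        }
    }
    return std::nullopt;
}

// Walk the request batches and skip those we already have (matching term).
// Returns the adjusted prev_log_index: the last offset known to match.
int64_t skip_matched_batches(
  const std::vector<batch>& local_log,
  const std::vector<batch>& request_batches,
  int64_t prev_log_index) {
    int64_t last_matched = prev_log_index;
    for (const auto& b : request_batches) {
        auto term = local_term_at(local_log, b.last_offset);
        if (!term.has_value() || *term != b.term) {
            break; // first genuine divergence; truncate after last_matched
        }
        last_matched = b.last_offset; // we already have this batch
    }
    return last_matched;
}

int main() {
    // Local log already holds everything up to offset 105, all in term 3.
    std::vector<batch> local_log{{99, 2}, {105, 3}};

    // A delayed/duplicated append_entries re-sends batches (100, 103] and
    // (103, 105] with prev_log_index = 100.
    std::vector<batch> request{{103, 3}, {105, 3}};

    int64_t adjusted = skip_matched_batches(local_log, request, 100);
    std::cout << "adjusted prev_log_index: " << adjusted << "\n"; // prints 105

    // adjusted == log end, so nothing is truncated and no committed data is
    // lost; without the skip, prev_log_index = 100 < log end would trigger
    // truncation at offset 101.
}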
ztlpn committed Apr 16, 2024
1 parent 2f432c2 commit 09b8df7
Showing 2 changed files with 51 additions and 9 deletions.
59 changes: 50 additions & 9 deletions src/v/raft/consensus.cc
@@ -1988,11 +1988,52 @@ consensus::do_append_entries(append_entries_request&& r) {
co_return reply;
}

// special case heartbeat case
model::offset batch_prev_log_index = request_metadata.prev_log_index;
if (batch_prev_log_index < last_log_offset) {
// The append point is before the end of our log. We need to skip
// over batches that we already have (they will have the matching
// term) to find the true truncation point. This is important for the
// case when we already have _all_ batches locally (possible if e.g.
// the request was delayed/duplicated). In this case we don't want to
// truncate, otherwise we might lose already committed data.

struct find_mismatch_consumer {
ss::future<ss::stop_iteration>
operator()(const model::record_batch& b) {
if (parent.get_term(b.last_offset()) != b.term()) {
co_return ss::stop_iteration::yes;
}
last_matched = b.last_offset();
co_return ss::stop_iteration::no;
}

model::offset end_of_stream() { return last_matched; }

consensus& parent;
model::offset last_matched;
};

model::offset last_matched = co_await r.batches().peek_each_ref(
find_mismatch_consumer{
.parent = *this, .last_matched = batch_prev_log_index},
model::no_timeout); // no_timeout as the batches are already in memory
if (last_matched != batch_prev_log_index) {
vlog(
_ctxlog.info,
"skipped matching records in append_entries batch from {} to {}, "
"current state: {}",
batch_prev_log_index,
last_matched,
meta());
batch_prev_log_index = last_matched;
}
}

// special case for heartbeats and batches without new records.
// we need to handle it early (before executing truncation)
// as timeouts are asynchronous to append calls and can have stale data
if (r.batches().is_end_of_stream()) {
if (request_metadata.prev_log_index < last_log_offset) {
if (batch_prev_log_index < last_log_offset) {
// do not truncate on a heartbeat, just respond with false
reply.result = reply_result::failure;
co_return reply;
@@ -2026,7 +2067,7 @@ consensus::do_append_entries(append_entries_request&& r) {
co_return reply;
}

if (request_metadata.prev_log_index < request_metadata.dirty_offset) {
if (batch_prev_log_index < request_metadata.dirty_offset) {
// This is a valid recovery request. In case we haven't allowed it,
// defer to the leader and force-enter the recovery state.
upsert_recovery_state(
@@ -2035,8 +2076,8 @@
}

// section 3
if (request_metadata.prev_log_index < last_log_offset) {
if (unlikely(request_metadata.prev_log_index < _commit_index)) {
if (batch_prev_log_index < last_log_offset) {
if (unlikely(batch_prev_log_index < _commit_index)) {
reply.result = reply_result::success;
// clamp dirty offset to the current commit index not to allow
// leader reasoning about follower log beyond that point
@@ -2051,14 +2092,14 @@
co_return reply;
}
auto truncate_at = model::next_offset(
model::offset(request_metadata.prev_log_index));
model::offset(batch_prev_log_index));
vlog(
_ctxlog.info,
"Truncating log in term: {}, Request previous log index: {} is "
"earlier than log end offset: {}, last visible index: {}, leader "
"last visible index: {}. Truncating to: {}",
request_metadata.term,
request_metadata.prev_log_index,
batch_prev_log_index,
lstats.dirty_offset,
last_visible_index(),
_last_leader_visible_offset,
@@ -2089,12 +2130,12 @@
update_follower_stats(_configuration_manager.get_latest());

auto lstats = _log->offsets();
if (unlikely(lstats.dirty_offset != r.metadata().prev_log_index)) {
if (unlikely(lstats.dirty_offset != batch_prev_log_index)) {
vlog(
_ctxlog.warn,
"Log truncation error, expected offset: {}, log offsets: {}, "
"requested truncation at {}",
r.metadata().prev_log_index,
batch_prev_log_index,
lstats,
truncate_at);
_flushed_offset = std::min(
1 change: 1 addition & 0 deletions src/v/raft/types.h
@@ -257,6 +257,7 @@ struct append_entries_request
return std::move(_batches);
}

model::record_batch_reader& batches() { return _batches; }
const model::record_batch_reader& batches() const { return _batches; }

static append_entries_request make_foreign(append_entries_request&& req);
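The types.h change adds a non-const overload of batches() next to the existing const one; the new skip logic in do_append_entries calls r.batches().peek_each_ref(...), which presumably needs a mutable reference to the reader in order to peek at the buffered batches. A minimal sketch of the overload pair (stub types, not the real model::record_batch_reader):

struct reader_stub {
    // internal buffer / cursor state that peeking has to touch
};

struct request_stub {
    // mutable access for callers that must peek/advance the reader in place
    reader_stub& batches() { return _batches; }
    // read-only access for callers that only inspect it
    const reader_stub& batches() const { return _batches; }

private:
    reader_stub _batches;
};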
