raft: in append_entries skip batches that we already have
This is important for the case when we already have _all_ batches locally
(possible if e.g. the request was delayed/duplicated). In this case we don't
want to truncate, otherwise we might lose already committed data.

Fixes #17731
ztlpn committed Apr 18, 2024
1 parent 2f432c2 commit f0c5772
Showing 2 changed files with 59 additions and 9 deletions.
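For context before the diff, here is a minimal standalone sketch of the idea in the commit message. The names (Batch, local_terms, skip_matching_prefix) are invented stand-ins for illustration, not the actual Redpanda code: a delayed or duplicated append_entries request can carry only batches the follower already stores, and walking past that matching prefix moves the effective append point forward, so a fully matching request triggers no truncation at all.

    #include <cstdint>
    #include <vector>

    // Illustrative stand-in for Redpanda's model::record_batch header fields.
    struct Batch {
        int64_t last_offset_delta; // last record offset minus the batch base offset
        int64_t term;              // term in which the batch was replicated
    };

    // Skip every leading batch that is already in the local log (same offsets,
    // same term) and return the adjusted append point. `local_terms[o]` plays
    // the role of consensus::get_term(o) for offsets 0..last_log_offset.
    int64_t skip_matching_prefix(
      int64_t prev_log_index,
      int64_t last_log_offset,
      const std::vector<int64_t>& local_terms,
      const std::vector<Batch>& batches) {
        int64_t last_matched = prev_log_index;
        for (const Batch& b : batches) {
            int64_t last_batch_offset = last_matched + b.last_offset_delta + 1;
            if (
              last_batch_offset > last_log_offset
              || local_terms[last_batch_offset] != b.term) {
                break; // first genuine mismatch: truncate from here, if at all
            }
            last_matched = last_batch_offset; // batch already present, skip it
        }
        // If every batch matched (e.g. a duplicated request), nothing is
        // truncated and already committed data stays intact.
        return last_matched;
    }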
src/v/raft/consensus.cc (58 additions, 9 deletions)
@@ -1988,11 +1988,60 @@ consensus::do_append_entries(append_entries_request&& r) {
         co_return reply;
     }
 
-    // special case heartbeat case
+    model::offset adjusted_prev_log_index = request_metadata.prev_log_index;
+    if (adjusted_prev_log_index < last_log_offset) {
+        // The append point is before the end of our log. We need to skip
+        // over batches that we already have (they will have the matching
+        // term) to find the true truncation point. This is important for the
+        // case when we already have _all_ batches locally (possible if e.g.
+        // the request was delayed/duplicated). In this case we don't want to
+        // truncate, otherwise we might lose already committed data.
+
+        struct find_mismatch_consumer {
+            const consensus& parent;
+            model::offset last_log_offset;
+            model::offset last_matched;
+
+            ss::future<ss::stop_iteration>
+            operator()(const model::record_batch& b) {
+                model::offset last_batch_offset
+                  = last_matched
+                    + model::offset(b.header().last_offset_delta + 1);
+                if (
+                  last_batch_offset > last_log_offset
+                  || parent.get_term(last_batch_offset) != b.term()) {
+                    co_return ss::stop_iteration::yes;
+                }
+                last_matched = last_batch_offset;
+                co_return ss::stop_iteration::no;
+            }
+
+            model::offset end_of_stream() { return last_matched; }
+        };
+
+        model::offset last_matched = co_await r.batches().peek_each_ref(
+          find_mismatch_consumer{
+            .parent = *this,
+            .last_log_offset = last_log_offset,
+            .last_matched = adjusted_prev_log_index},
+          model::no_timeout); // no_timeout as the batches are already in memory
+        if (last_matched != adjusted_prev_log_index) {
+            vlog(
+              _ctxlog.info,
+              "skipped matching records in append_entries batch from {} to {}, "
+              "current state: {}",
+              adjusted_prev_log_index,
+              last_matched,
+              meta());
+            adjusted_prev_log_index = last_matched;
+        }
+    }
+
+    // special case for heartbeats and batches without new records.
+    // we need to handle this early (before executing truncation),
+    // as timeouts are asynchronous to append calls and can carry stale data
     if (r.batches().is_end_of_stream()) {
-        if (request_metadata.prev_log_index < last_log_offset) {
+        if (adjusted_prev_log_index < last_log_offset) {
            // do not truncate on heartbeat, just respond with false
            reply.result = reply_result::failure;
            co_return reply;
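As a worked example of the consumer's offset arithmetic above (illustrative values only): last_offset_delta is the offset of a batch's last record minus its base offset, so a 3-record batch appended right after last_matched = 10 occupies offsets 11 through 13.

    #include <cassert>
    #include <cstdint>

    int main() {
        int64_t last_matched = 10;     // last offset known to match so far
        int32_t last_offset_delta = 2; // 3-record batch: offsets 11, 12, 13
        int64_t last_batch_offset = last_matched + last_offset_delta + 1;
        assert(last_batch_offset == 13);
        // The consumer skips this batch only if offset 13 is within the local
        // log and the local term at 13 equals the batch's term; otherwise it
        // stops, and any truncation starts at the first non-matching offset
        // (11 here, i.e. next_offset(last_matched)).
        return 0;
    }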
@@ -2026,7 +2075,7 @@ consensus::do_append_entries(append_entries_request&& r) {
         co_return reply;
     }
 
-    if (request_metadata.prev_log_index < request_metadata.dirty_offset) {
+    if (adjusted_prev_log_index < request_metadata.dirty_offset) {
         // This is a valid recovery request. In case we haven't allowed it,
         // defer to the leader and force-enter the recovery state.
         upsert_recovery_state(
@@ -2035,8 +2084,8 @@
     }
 
     // section 3
-    if (request_metadata.prev_log_index < last_log_offset) {
-        if (unlikely(request_metadata.prev_log_index < _commit_index)) {
+    if (adjusted_prev_log_index < last_log_offset) {
+        if (unlikely(adjusted_prev_log_index < _commit_index)) {
            reply.result = reply_result::success;
            // clamp dirty offset to the current commit index not to allow
            // leader reasoning about follower log beyond that point
@@ -2051,14 +2100,14 @@
            co_return reply;
        }
        auto truncate_at = model::next_offset(
-          model::offset(request_metadata.prev_log_index));
+          model::offset(adjusted_prev_log_index));
        vlog(
          _ctxlog.info,
          "Truncating log in term: {}, Request previous log index: {} is "
          "earlier than log end offset: {}, last visible index: {}, leader "
          "last visible index: {}. Truncating to: {}",
          request_metadata.term,
-          request_metadata.prev_log_index,
+          adjusted_prev_log_index,
          lstats.dirty_offset,
          last_visible_index(),
          _last_leader_visible_offset,
@@ -2089,12 +2138,12 @@
    update_follower_stats(_configuration_manager.get_latest());
 
    auto lstats = _log->offsets();
-    if (unlikely(lstats.dirty_offset != r.metadata().prev_log_index)) {
+    if (unlikely(lstats.dirty_offset != adjusted_prev_log_index)) {
        vlog(
          _ctxlog.warn,
          "Log truncation error, expected offset: {}, log offsets: {}, "
          "requested truncation at {}",
-          r.metadata().prev_log_index,
+          adjusted_prev_log_index,
          lstats,
          truncate_at);
        _flushed_offset = std::min(
src/v/raft/types.h (1 addition, 0 deletions)
@@ -257,6 +257,7 @@ struct append_entries_request
        return std::move(_batches);
    }
 
+    model::record_batch_reader& batches() { return _batches; }
    const model::record_batch_reader& batches() const { return _batches; }
 
    static append_entries_request make_foreign(append_entries_request&& req);
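
A note on why this one-line accessor is needed: do_append_entries now peeks at the request's batches via peek_each_ref, which is evidently a non-const operation on model::record_batch_reader even though it leaves the batches in place. Before this commit the struct exposed only the const accessor and the move-out overload above (return std::move(_batches);), neither of which fits a non-destructive scan of the batches prior to the truncation decision.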
