diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc
index f55ad3e45d603..4f26390821993 100644
--- a/src/v/raft/consensus.cc
+++ b/src/v/raft/consensus.cc
@@ -1988,11 +1988,52 @@ consensus::do_append_entries(append_entries_request&& r) {
         co_return reply;
     }
 
-    // special case heartbeat case
+    model::offset batch_prev_log_index = request_metadata.prev_log_index;
+    if (batch_prev_log_index < last_log_offset) {
+        // The append point is before the end of our log. We need to skip
+        // over batches that we already have (they will have the matching
+        // term) to find the true truncation point. This is important for the
+        // case when we already have _all_ batches locally (possible if e.g.
+        // the request was delayed/duplicated). In this case we don't want to
+        // truncate, otherwise we might lose already committed data.
+
+        struct find_mismatch_consumer {
+            ss::future<ss::stop_iteration>
+            operator()(const model::record_batch& b) {
+                if (parent.get_term(b.last_offset()) != b.term()) {
+                    co_return ss::stop_iteration::yes;
+                }
+                last_matched = b.last_offset();
+                co_return ss::stop_iteration::no;
+            }
+
+            model::offset end_of_stream() { return last_matched; }
+
+            consensus& parent;
+            model::offset last_matched;
+        };
+
+        model::offset last_matched = co_await r.batches().peek_each_ref(
+          find_mismatch_consumer{
+            .parent = *this, .last_matched = batch_prev_log_index},
+          model::no_timeout); // no_timeout as the batches are already in memory
+        if (last_matched != batch_prev_log_index) {
+            vlog(
+              _ctxlog.info,
+              "skipped matching records in append_entries batch from {} to {}, "
+              "current state: {}",
+              batch_prev_log_index,
+              last_matched,
+              meta());
+            batch_prev_log_index = last_matched;
+        }
+    }
+
+    // special case for heartbeats and batches without new records.
     // we need to handle it early (before executing truncation)
     // as timeouts are asynchronous to append calls and can have stall data
     if (r.batches().is_end_of_stream()) {
-        if (request_metadata.prev_log_index < last_log_offset) {
+        if (batch_prev_log_index < last_log_offset) {
             // do not tuncate on heartbeat just response with false
             reply.result = reply_result::failure;
             co_return reply;
@@ -2026,7 +2067,7 @@ consensus::do_append_entries(append_entries_request&& r) {
         co_return reply;
     }
 
-    if (request_metadata.prev_log_index < request_metadata.dirty_offset) {
+    if (batch_prev_log_index < request_metadata.dirty_offset) {
         // This is a valid recovery request. In case we haven't allowed it,
         // defer to the leader and force-enter the recovery state.
         upsert_recovery_state(
@@ -2035,8 +2076,8 @@ consensus::do_append_entries(append_entries_request&& r) {
     }
 
     // section 3
-    if (request_metadata.prev_log_index < last_log_offset) {
-        if (unlikely(request_metadata.prev_log_index < _commit_index)) {
+    if (batch_prev_log_index < last_log_offset) {
+        if (unlikely(batch_prev_log_index < _commit_index)) {
             reply.result = reply_result::success;
             // clamp dirty offset to the current commit index not to allow
             // leader reasoning about follower log beyond that point
@@ -2051,14 +2092,14 @@ consensus::do_append_entries(append_entries_request&& r) {
             co_return reply;
         }
         auto truncate_at = model::next_offset(
-          model::offset(request_metadata.prev_log_index));
+          model::offset(batch_prev_log_index));
         vlog(
           _ctxlog.info,
           "Truncating log in term: {}, Request previous log index: {} is "
           "earlier than log end offset: {}, last visible index: {}, leader "
           "last visible index: {}. Truncating to: {}",
           request_metadata.term,
-          request_metadata.prev_log_index,
+          batch_prev_log_index,
           lstats.dirty_offset,
           last_visible_index(),
           _last_leader_visible_offset,
@@ -2089,12 +2130,12 @@ consensus::do_append_entries(append_entries_request&& r) {
             update_follower_stats(_configuration_manager.get_latest());
 
             auto lstats = _log->offsets();
-            if (unlikely(lstats.dirty_offset != r.metadata().prev_log_index)) {
+            if (unlikely(lstats.dirty_offset != batch_prev_log_index)) {
                 vlog(
                   _ctxlog.warn,
                   "Log truncation error, expected offset: {}, log offsets: {}, "
                   "requested truncation at {}",
-                  r.metadata().prev_log_index,
+                  batch_prev_log_index,
                   lstats,
                   truncate_at);
                 _flushed_offset = std::min(
diff --git a/src/v/raft/types.h b/src/v/raft/types.h
index c96160f755534..acf7ec139e638 100644
--- a/src/v/raft/types.h
+++ b/src/v/raft/types.h
@@ -257,6 +257,7 @@ struct append_entries_request
         return std::move(_batches);
     }
 
+    model::record_batch_reader& batches() { return _batches; }
     const model::record_batch_reader& batches() const { return _batches; }
 
     static append_entries_request make_foreign(append_entries_request&& req);