diff --git a/src/v/model/record_batch_reader.h b/src/v/model/record_batch_reader.h index 847ff2ab0bc4a..225b1a65c8ca4 100644 --- a/src/v/model/record_batch_reader.h +++ b/src/v/model/record_batch_reader.h @@ -98,6 +98,12 @@ class record_batch_reader final { return do_consume(consumer, timeout); }); } + template + auto peek_each_ref(ReferenceConsumer c, timeout_clock::time_point tm) { + return ss::do_with(std::move(c), [this, tm](ReferenceConsumer& c) { + return do_peek_each_ref(c, tm); + }); + } private: record_batch pop_batch() { @@ -142,6 +148,31 @@ class record_batch_reader final { return c(pop_batch()); }); } + template + auto do_peek_each_ref( + ReferenceConsumer& refc, timeout_clock::time_point timeout) { + return do_action(refc, timeout, [this](ReferenceConsumer& c) { + return ss::visit( + _slice, + [&c](data_t& d) { + return c(d.front()).then([&](ss::stop_iteration stop) { + if (!stop) { + d.pop_front(); + } + return stop; + }); + }, + [&c](foreign_data_t& d) { + return c((*d.buffer)[d.index]) + .then([&](ss::stop_iteration stop) { + if (!stop) { + ++d.index; + } + return stop; + }); + }); + }); + } template auto do_action( ConsumerType& consumer, @@ -249,6 +280,16 @@ class record_batch_reader final { }); } + /// Similar to for_each_ref, but advances only if the consumer returns + /// ss::stop_iteration::no. I.e. the batch where the consumer stopped + /// remains available for reading by subsequent consumers. + template + requires ReferenceBatchReaderConsumer + auto peek_each_ref( + ReferenceConsumer consumer, timeout_clock::time_point timeout) & { + return _impl->peek_each_ref(std::move(consumer), timeout); + } + std::unique_ptr release() && { return std::move(_impl); } private: diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 9f56d19945827..635c559095a47 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -1896,7 +1896,7 @@ consensus::do_append_entries(append_entries_request&& r) { _probe->append_request(); if (unlikely(is_request_target_node_invalid("append_entries", r))) { - return ss::make_ready_future(reply); + co_return reply; } // no need to trigger timeout vlog(_ctxlog.trace, "Received append entries request: {}", r); @@ -1904,7 +1904,7 @@ consensus::do_append_entries(append_entries_request&& r) { // raft.pdf: Reply false if term < currentTerm (§5.1) if (request_metadata.term < _term) { reply.result = reply_result::failure; - return ss::make_ready_future(std::move(reply)); + co_return reply; } /** * When the current leader is alive, whenever a follower receives heartbeat, @@ -1923,7 +1923,7 @@ consensus::do_append_entries(append_entries_request&& r) { _voted_for = {}; maybe_update_leader(r.source_node()); - return do_append_entries(std::move(r)); + co_return co_await do_append_entries(std::move(r)); } // raft.pdf:If AppendEntries RPC received from new leader: convert to // follower (§5.2) @@ -1952,7 +1952,7 @@ consensus::do_append_entries(append_entries_request&& r) { request_metadata.dirty_offset > request_metadata.prev_log_index); reply.may_recover = _follower_recovery_state->is_active(); - return ss::make_ready_future(std::move(reply)); + co_return reply; } // section 2 @@ -1985,18 +1985,66 @@ consensus::do_append_entries(append_entries_request&& r) { request_metadata.dirty_offset > request_metadata.prev_log_index); reply.may_recover = _follower_recovery_state->is_active(); - return ss::make_ready_future(std::move(reply)); + co_return reply; } - // special case heartbeat case + model::offset adjusted_prev_log_index = request_metadata.prev_log_index; + if (adjusted_prev_log_index < last_log_offset) { + // The append point is before the end of our log. We need to skip + // over batches that we already have (they will have the matching + // term) to find the true truncation point. This is important for the + // case when we already have _all_ batches locally (possible if e.g. + // the request was delayed/duplicated). In this case we don't want to + // truncate, otherwise we might lose already committed data. + + struct find_mismatch_consumer { + const consensus& parent; + model::offset last_log_offset; + model::offset last_matched; + + ss::future + operator()(const model::record_batch& b) { + model::offset last_batch_offset + = last_matched + + model::offset(b.header().last_offset_delta + 1); + if ( + last_batch_offset > last_log_offset + || parent.get_term(last_batch_offset) != b.term()) { + co_return ss::stop_iteration::yes; + } + last_matched = last_batch_offset; + co_return ss::stop_iteration::no; + } + + model::offset end_of_stream() { return last_matched; } + }; + + model::offset last_matched = co_await r.batches().peek_each_ref( + find_mismatch_consumer{ + .parent = *this, + .last_log_offset = last_log_offset, + .last_matched = adjusted_prev_log_index}, + model::no_timeout); // no_timeout as the batches are already in memory + if (last_matched != adjusted_prev_log_index) { + vlog( + _ctxlog.info, + "skipped matching records in append_entries batch from {} to {}, " + "current state: {}", + adjusted_prev_log_index, + last_matched, + meta()); + adjusted_prev_log_index = last_matched; + } + } + + // special case for heartbeats and batches without new records. // we need to handle it early (before executing truncation) // as timeouts are asynchronous to append calls and can have stall data if (r.batches().is_end_of_stream()) { - if (request_metadata.prev_log_index < last_log_offset) { + if (adjusted_prev_log_index < last_log_offset) { // do not tuncate on heartbeat just response with false reply.result = reply_result::failure; - return ss::make_ready_future( - std::move(reply)); + co_return reply; } auto f = ss::now(); if (r.is_flush_required() && lstats.dirty_offset > _flushed_offset) { @@ -2019,16 +2067,15 @@ consensus::do_append_entries(append_entries_request&& r) { _follower_recovery_state.reset(); } - return f.then([this, reply, request_metadata]() mutable { - maybe_update_follower_commit_idx( - model::offset(request_metadata.commit_index)); - reply.last_flushed_log_index = _flushed_offset; - reply.result = reply_result::success; - return reply; - }); + co_await std::move(f); + maybe_update_follower_commit_idx( + model::offset(request_metadata.commit_index)); + reply.last_flushed_log_index = _flushed_offset; + reply.result = reply_result::success; + co_return reply; } - if (request_metadata.prev_log_index < request_metadata.dirty_offset) { + if (adjusted_prev_log_index < request_metadata.dirty_offset) { // This is a valid recovery request. In case we haven't allowed it, // defer to the leader and force-enter the recovery state. upsert_recovery_state( @@ -2037,8 +2084,8 @@ consensus::do_append_entries(append_entries_request&& r) { } // section 3 - if (request_metadata.prev_log_index < last_log_offset) { - if (unlikely(request_metadata.prev_log_index < _commit_index)) { + if (adjusted_prev_log_index < last_log_offset) { + if (unlikely(adjusted_prev_log_index < _commit_index)) { reply.result = reply_result::success; // clamp dirty offset to the current commit index not to allow // leader reasoning about follower log beyond that point @@ -2050,18 +2097,17 @@ consensus::do_append_entries(append_entries_request&& r) { "present, request: {}, current state: {}", request_metadata, meta()); - return ss::make_ready_future( - std::move(reply)); + co_return reply; } auto truncate_at = model::next_offset( - model::offset(request_metadata.prev_log_index)); + model::offset(adjusted_prev_log_index)); vlog( _ctxlog.info, "Truncating log in term: {}, Request previous log index: {} is " "earlier than log end offset: {}, last visible index: {}, leader " "last visible index: {}. Truncating to: {}", request_metadata.term, - request_metadata.prev_log_index, + adjusted_prev_log_index, lstats.dirty_offset, last_visible_index(), _last_leader_visible_offset, @@ -2077,95 +2123,99 @@ consensus::do_append_entries(append_entries_request&& r) { // flushed entries _flushed_offset = std::min( model::prev_offset(truncate_at), _flushed_offset); - return _log - ->truncate( - storage::truncate_config(truncate_at, _scheduling.default_iopc)) - .then([this, truncate_at] { - // update flushed offset once again after truncation as flush is - // executed concurrently to append entries and it may race with - // the truncation - _flushed_offset = std::min( - model::prev_offset(truncate_at), _flushed_offset); - - return _configuration_manager.truncate(truncate_at).then([this] { - _probe->configuration_update(); - update_follower_stats(_configuration_manager.get_latest()); - }); - }) - .then([this, r = std::move(r), truncate_at]() mutable { - auto lstats = _log->offsets(); - if (unlikely( - lstats.dirty_offset != r.metadata().prev_log_index)) { - vlog( - _ctxlog.warn, - "Log truncation error, expected offset: {}, log " - "offsets: " - "{}, requested truncation at {}", - r.metadata().prev_log_index, - lstats, - truncate_at); - _flushed_offset = std::min( - model::prev_offset(lstats.dirty_offset), _flushed_offset); - } - return do_append_entries(std::move(r)); - }) - .handle_exception([this, reply](const std::exception_ptr& e) mutable { - vlog(_ctxlog.warn, "Error occurred while truncating log - {}", e); - reply.result = reply_result::failure; - return ss::make_ready_future(reply); - }); + + try { + co_await _log->truncate( + storage::truncate_config(truncate_at, _scheduling.default_iopc)); + // update flushed offset once again after truncation as flush is + // executed concurrently to append entries and it may race with + // the truncation + _flushed_offset = std::min( + model::prev_offset(truncate_at), _flushed_offset); + + co_await _configuration_manager.truncate(truncate_at); + _probe->configuration_update(); + update_follower_stats(_configuration_manager.get_latest()); + + auto lstats = _log->offsets(); + if (unlikely(lstats.dirty_offset != adjusted_prev_log_index)) { + vlog( + _ctxlog.warn, + "Log truncation error, expected offset: {}, log offsets: {}, " + "requested truncation at {}", + adjusted_prev_log_index, + lstats, + truncate_at); + _flushed_offset = std::min( + model::prev_offset(lstats.dirty_offset), _flushed_offset); + } + } catch (...) { + vlog( + _ctxlog.warn, + "Error occurred while truncating log - {}", + std::current_exception()); + reply.result = reply_result::failure; + co_return reply; + } + + co_return co_await do_append_entries(std::move(r)); } // success. copy entries for each subsystem - using offsets_ret = storage::append_result; - return disk_append( - std::move(r).release_batches(), update_last_quorum_index::no) - .then([this, m = request_metadata, target = reply.target_node_id]( - offsets_ret ofs) { - auto last_visible = std::min(ofs.last_offset, m.last_visible_index); - maybe_update_last_visible_index(last_visible); - - _last_leader_visible_offset = std::max( - m.last_visible_index, _last_leader_visible_offset); - _confirmed_term = _term; - - maybe_update_follower_commit_idx(model::offset(m.commit_index)); - - if (_follower_recovery_state) { - _follower_recovery_state->update_progress( - ofs.last_offset, std::max(m.dirty_offset, ofs.last_offset)); - - if (m.dirty_offset == m.prev_log_index) { - // Normal (non-recovery, non-heartbeat) append_entries - // request means that recovery is over. - vlog( - _ctxlog.debug, - "exiting follower_recovery_state, leader meta: {} " - "(our offset: {})", - m, - ofs.last_offset); - _follower_recovery_state.reset(); - } - // m.dirty_offset can be bogus here if we are talking to - // a pre-23.3 redpanda. In this case we can't reliably - // distinguish between recovery and normal append_entries - // and will exit recovery only via heartbeats (which is okay - // but can inflate the number of recovering partitions - // statistic a bit). - } - return make_append_entries_reply(target, ofs); - }) - .handle_exception([this, reply](const std::exception_ptr& e) mutable { - vlog( - _ctxlog.warn, "Error occurred while appending log entries - {}", e); - reply.result = reply_result::failure; - return ss::make_ready_future(reply); - }) - .finally([this] { - // we do not want to include our disk flush latency into - // the leader vote timeout - _hbeat = clock_type::now(); - }); + + try { + auto deferred = ss::defer([this] { + // we do not want to include our disk flush latency into + // the leader vote timeout + _hbeat = clock_type::now(); + }); + + storage::append_result ofs = co_await disk_append( + std::move(r).release_batches(), update_last_quorum_index::no); + auto last_visible = std::min( + ofs.last_offset, request_metadata.last_visible_index); + maybe_update_last_visible_index(last_visible); + + _last_leader_visible_offset = std::max( + request_metadata.last_visible_index, _last_leader_visible_offset); + _confirmed_term = _term; + + maybe_update_follower_commit_idx(request_metadata.commit_index); + + if (_follower_recovery_state) { + _follower_recovery_state->update_progress( + ofs.last_offset, + std::max(request_metadata.dirty_offset, ofs.last_offset)); + + if ( + request_metadata.dirty_offset + == request_metadata.prev_log_index) { + // Normal (non-recovery, non-heartbeat) append_entries + // request means that recovery is over. + vlog( + _ctxlog.debug, + "exiting follower_recovery_state, leader meta: {} " + "(our offset: {})", + request_metadata, + ofs.last_offset); + _follower_recovery_state.reset(); + } + // m.dirty_offset can be bogus here if we are talking to + // a pre-23.3 redpanda. In this case we can't reliably + // distinguish between recovery and normal append_entries + // and will exit recovery only via heartbeats (which is okay + // but can inflate the number of recovering partitions + // statistic a bit). + } + co_return make_append_entries_reply(reply.target_node_id, ofs); + } catch (...) { + vlog( + _ctxlog.warn, + "Error occurred while appending log entries - {}", + std::current_exception()); + reply.result = reply_result::failure; + co_return reply; + } } void consensus::maybe_update_leader(vnode request_node) { diff --git a/src/v/raft/consensus_utils.cc b/src/v/raft/consensus_utils.cc index 82d87af0e78ab..a9482e30e8c8f 100644 --- a/src/v/raft/consensus_utils.cc +++ b/src/v/raft/consensus_utils.cc @@ -230,84 +230,6 @@ group_configuration deserialize_nested_configuration(iobuf_parser& parser) { return reflection::adl{}.from(parser); } -model::record_batch_reader make_config_extracting_reader( - model::offset base_offset, - std::vector& target, - model::record_batch_reader&& source) { - class extracting_reader final : public model::record_batch_reader::impl { - private: - using storage_t = model::record_batch_reader::storage_t; - using data_t = model::record_batch_reader::data_t; - using foreign_t = model::record_batch_reader::foreign_data_t; - - public: - explicit extracting_reader( - model::offset o, - std::vector& target, - std::unique_ptr src) - : _next_offset( - o < model::offset(0) ? model::offset(0) : o + model::offset(1)) - , _configurations(target) - , _ptr(std::move(src)) {} - extracting_reader(const extracting_reader&) = delete; - extracting_reader& operator=(const extracting_reader&) = delete; - extracting_reader(extracting_reader&&) = delete; - extracting_reader& operator=(extracting_reader&&) = delete; - ~extracting_reader() override = default; - - bool is_end_of_stream() const final { - // ok to copy a bool - return _ptr->is_end_of_stream(); - } - - void print(std::ostream& os) final { - fmt::print(os, "configuration extracting reader, proxy for "); - _ptr->print(os); - } - - data_t& get_batches(storage_t& st) { - if (std::holds_alternative(st)) { - return std::get(st); - } else { - return *std::get(st).buffer; - } - } - - ss::future - do_load_slice(model::timeout_clock::time_point t) final { - return _ptr->do_load_slice(t).then([this](storage_t recs) { - for (auto& batch : get_batches(recs)) { - if ( - batch.header().type - == model::record_batch_type::raft_configuration) { - extract_configuration(batch); - } - // calculate next offset - _next_offset += model::offset( - batch.header().last_offset_delta) - + model::offset(1); - } - return recs; - }); - } - - void extract_configuration(model::record_batch& batch) { - iobuf_parser parser(batch.copy_records().begin()->release_value()); - _configurations.emplace_back( - _next_offset, deserialize_configuration(parser)); - } - - private: - model::offset _next_offset; - std::vector& _configurations; - std::unique_ptr _ptr; - }; - auto reader = std::make_unique( - base_offset, target, std::move(source).release()); - - return model::record_batch_reader(std::move(reader)); -} - bytes serialize_group_key(raft::group_id group, metadata_key key_type) { iobuf buf; reflection::serialize(buf, key_type, group); diff --git a/src/v/raft/consensus_utils.h b/src/v/raft/consensus_utils.h index 64756c5cb0329..5f60ac7d7b2b7 100644 --- a/src/v/raft/consensus_utils.h +++ b/src/v/raft/consensus_utils.h @@ -129,17 +129,6 @@ class do_for_each_batch_consumer { Func _f; }; -/** - * Extracts all configurations from underlying reader. Configuration are stored - * in a vector passed as a reference to reader. The reader can will - * automatically assing offsets to following batches using provided base offset - * as a staring point - */ -model::record_batch_reader make_config_extracting_reader( - model::offset, - std::vector&, - model::record_batch_reader&&); - /** * Function that allow consuming batches with given consumer while lazily * extracting raft::group_configuration from the reader. @@ -152,20 +141,43 @@ auto for_each_ref_extract_configuration( model::record_batch_reader&& rdr, ReferenceConsumer c, model::timeout_clock::time_point tm) { - using conf_t = std::vector; - - return ss::do_with( - conf_t{}, - [tm, c = std::move(c), base_offset, rdr = std::move(rdr)]( - conf_t& configurations) mutable { - return make_config_extracting_reader( - base_offset, configurations, std::move(rdr)) - .for_each_ref(std::move(c), tm) - .then([&configurations](auto res) { - return std::make_tuple( - std::move(res), std::move(configurations)); - }); - }); + struct extracting_consumer { + ss::future operator()(model::record_batch& batch) { + if ( + batch.header().type + == model::record_batch_type::raft_configuration) { + iobuf_parser parser( + batch.copy_records().begin()->release_value()); + configurations.emplace_back( + next_offset, deserialize_configuration(parser)); + } + + // we have to calculate offsets manually because the batch may not + // yet have the base offset assigned. + next_offset += model::offset(batch.header().last_offset_delta) + + model::offset(1); + + return wrapped(batch); + } + + auto end_of_stream() { + return ss::futurize_invoke( + [this] { return wrapped.end_of_stream(); }) + .then([confs = std::move(configurations)](auto ret) mutable { + return std::make_tuple(std::move(ret), std::move(confs)); + }); + } + + ReferenceConsumer wrapped; + model::offset next_offset; + std::vector configurations; + }; + + return std::move(rdr).for_each_ref( + extracting_consumer{ + .wrapped = std::move(c), + .next_offset = model::next_offset(base_offset)}, + tm); } bytes serialize_group_key(raft::group_id, metadata_key); diff --git a/src/v/raft/tests/foreign_entry_test.cc b/src/v/raft/tests/foreign_entry_test.cc index aefbbaf94f4db..5532df4856927 100644 --- a/src/v/raft/tests/foreign_entry_test.cc +++ b/src/v/raft/tests/foreign_entry_test.cc @@ -155,18 +155,17 @@ struct foreign_entry_fixture { ss::future extract_configuration(model::record_batch_reader&& rdr) { - using cfgs_t = std::vector; - return ss::do_with(cfgs_t{}, [rdr = std::move(rdr)](cfgs_t& cfgs) mutable { - auto wrapping_rdr = raft::details::make_config_extracting_reader( - model::offset(0), cfgs, std::move(rdr)); - - return model::consume_reader_to_memory( - std::move(wrapping_rdr), model::no_timeout) - .then([&cfgs](ss::circular_buffer) { - BOOST_REQUIRE(!cfgs.empty()); - return cfgs.begin()->cfg; - }); - }); + struct noop_consumer { + ss::future operator()(model::record_batch&) { + co_return ss::stop_iteration::no; + } + int end_of_stream() { return 0; } + }; + + auto [_, cfgs] = co_await raft::details::for_each_ref_extract_configuration( + model::offset(0), std::move(rdr), noop_consumer{}, model::no_timeout); + BOOST_REQUIRE(!cfgs.empty()); + co_return cfgs.begin()->cfg; } FIXTURE_TEST(sharing_one_reader, foreign_entry_fixture) { diff --git a/src/v/raft/types.h b/src/v/raft/types.h index c96160f755534..acf7ec139e638 100644 --- a/src/v/raft/types.h +++ b/src/v/raft/types.h @@ -257,6 +257,7 @@ struct append_entries_request return std::move(_batches); } + model::record_batch_reader& batches() { return _batches; } const model::record_batch_reader& batches() const { return _batches; } static append_entries_request make_foreign(append_entries_request&& req);