From 23d5f583cfa9af8e111bf79886e02478a25bea96 Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Fri, 29 Sep 2023 12:30:36 +0200 Subject: [PATCH] r/recovery_stm: deliver an on demand snapshot to learner If learner start offset is set in Raft configuration. Recovery STM is going to request an on demand snapshot from stm manager (or an empty snapshot if stm manager is not set) and deliver it to the follower as a log starting point. Signed-off-by: Michal Maslanka --- src/v/raft/recovery_stm.cc | 225 ++++++++++++++++++++++++++----------- src/v/raft/recovery_stm.h | 42 ++++++- 2 files changed, 197 insertions(+), 70 deletions(-) diff --git a/src/v/raft/recovery_stm.cc b/src/v/raft/recovery_stm.cc index b2212a70e83c4..e82420c4c7d00 100644 --- a/src/v/raft/recovery_stm.cc +++ b/src/v/raft/recovery_stm.cc @@ -9,6 +9,7 @@ #include "raft/recovery_stm.h" +#include "bytes/iostream.h" #include "model/fundamental.h" #include "model/record_batch_reader.h" #include "outcome_future_utils.h" @@ -18,6 +19,7 @@ #include "raft/logger.h" #include "raft/raftgen_service.h" #include "ssx/sformat.h" +#include "storage/snapshot.h" #include #include @@ -81,8 +83,10 @@ ss::future<> recovery_stm::do_recover(ss::io_priority_class iopc) { auto lstats = _ptr->_log->offsets(); // follower last index was already evicted at the leader, use snapshot - if (meta.value()->next_index <= _ptr->_last_snapshot_index) { - co_return co_await install_snapshot(); + const required_snapshot_type snapshot_needed = get_required_snapshot_type( + *meta.value()); + if (snapshot_needed != required_snapshot_type::none) { + co_return co_await install_snapshot(snapshot_needed); } /** @@ -205,6 +209,24 @@ recovery_stm::should_flush(model::offset follower_committed_match_index) const { return flush_after_append(is_last || should_checkpoint_flush); } +recovery_stm::required_snapshot_type recovery_stm::get_required_snapshot_type( + const follower_index_metadata& follower_metadata) const { + /** + * For on demand snapshot we compare next index with follower start offset + * i.e. next offset of last included in on demand snapshot hence we need to + * use greater than (not greater than or equal) while the other branch is + * comparing next index with last included snapshot offset + */ + if ( + follower_metadata.is_learner + && follower_metadata.next_index < _ptr->get_learner_start_offset()) { + return required_snapshot_type::on_demand; + } else if (follower_metadata.next_index <= _ptr->_last_snapshot_index) { + return required_snapshot_type::current; + } + return required_snapshot_type::none; +} + ss::future> recovery_stm::read_range_for_recovery( model::offset start_offset, @@ -287,67 +309,78 @@ recovery_stm::read_range_for_recovery( } } -ss::future<> recovery_stm::open_snapshot_reader() { +ss::future<> recovery_stm::open_current_snapshot() { return _ptr->_snapshot_mgr.open_snapshot().then( [this](std::optional rdr) { if (rdr) { - _snapshot_reader = std::make_unique( + _snapshot_reader = std::make_unique( std::move(*rdr)); - return _snapshot_reader->get_snapshot_size().then( - [this](size_t sz) { _snapshot_size = sz; }); + _last_included_snapshot_index = _ptr->_last_snapshot_index; + return std::get(*_snapshot_reader) + .get_snapshot_size() + .then([this](size_t sz) { _snapshot_size = sz; }); } return ss::now(); }); } ss::future<> recovery_stm::send_install_snapshot_request() { - // send 32KB at a time - return read_iobuf_exactly(_snapshot_reader->input(), 32_KiB) - .then([this](iobuf chunk) mutable { - auto chunk_size = chunk.size_bytes(); - install_snapshot_request req{ - .target_node_id = _node_id, - .term = _ptr->term(), - .group = _ptr->group(), - .node_id = _ptr->_self, - .last_included_index = _ptr->_last_snapshot_index, - .file_offset = _sent_snapshot_bytes, - .chunk = std::move(chunk), - .done = (_sent_snapshot_bytes + chunk_size) == _snapshot_size, - .dirty_offset = _ptr->dirty_offset()}; - - _sent_snapshot_bytes += chunk_size; - if (req.done) { - auto meta = get_follower_meta(); - if (!meta) { - // stop recovery when node was removed - _stop_requested = true; - return ss::make_ready_future<>(); - } - (*meta)->expected_log_end_offset = _ptr->_last_snapshot_index; - } - vlog(_ctxlog.trace, "sending install_snapshot request: {}", req); - auto hb_guard = _ptr->suppress_heartbeats(_node_id); - return _ptr->_client_protocol - .install_snapshot( - _node_id.id(), - std::move(req), - rpc::client_opts(append_entries_timeout())) - .then([this](result reply) { - return handle_install_snapshot_reply( - _ptr->validate_reply_target_node( - "install_snapshot", reply, _node_id.id())); - }) - .finally([hb_guard = std::move(hb_guard)] {}); - }); + /** + * If follower is being sent current raft snapshot its content may change. + * In this case the read_snapshot_chunk will thrown and will force recovery + * to stop. New snapshot will be delivered with new recovery round. + */ + return read_snapshot_chunk().then([this](iobuf chunk) mutable { + auto chunk_size = chunk.size_bytes(); + install_snapshot_request req{ + .target_node_id = _node_id, + .term = _ptr->term(), + .group = _ptr->group(), + .node_id = _ptr->_self, + .last_included_index = _last_included_snapshot_index, + .file_offset = _sent_snapshot_bytes, + .chunk = std::move(chunk), + .done = (_sent_snapshot_bytes + chunk_size) == _snapshot_size, + .dirty_offset = _ptr->dirty_offset()}; + + _sent_snapshot_bytes += chunk_size; + if (req.done) { + auto meta = get_follower_meta(); + if (!meta) { + // stop recovery when node was removed + _stop_requested = true; + return ss::make_ready_future<>(); + } + (*meta)->expected_log_end_offset = _last_included_snapshot_index; + } + vlog(_ctxlog.trace, "sending install_snapshot request: {}", req); + auto hb_guard = _ptr->suppress_heartbeats(_node_id); + return _ptr->_client_protocol + .install_snapshot( + _node_id.id(), + std::move(req), + rpc::client_opts(append_entries_timeout())) + .then([this](result reply) { + return handle_install_snapshot_reply( + _ptr->validate_reply_target_node( + "install_snapshot", reply, _node_id.id())); + }) + .finally([hb_guard = std::move(hb_guard)] {}); + }); +} +ss::future recovery_stm::read_snapshot_chunk() { + return ss::visit(*_snapshot_reader, [](auto& rdr) { + return read_iobuf_exactly(rdr.input(), 32_KiB); + }); } ss::future<> recovery_stm::close_snapshot_reader() { - return _snapshot_reader->close().then([this] { - _snapshot_reader.reset(); - _snapshot_size = 0; - _sent_snapshot_bytes = 0; - }); + return ss::visit(*_snapshot_reader, [](auto& rdr) { return rdr.close(); }) + .then([this] { + _snapshot_reader.reset(); + _snapshot_size = 0; + _sent_snapshot_bytes = 0; + }); } ss::future<> recovery_stm::handle_install_snapshot_reply( @@ -379,25 +412,86 @@ ss::future<> recovery_stm::handle_install_snapshot_reply( } // snapshot received by the follower, continue with recovery - (*meta)->match_index = _ptr->_last_snapshot_index; - (*meta)->next_index = model::next_offset(_ptr->_last_snapshot_index); + (*meta)->match_index = _last_included_snapshot_index; + (*meta)->next_index = model::next_offset(_last_included_snapshot_index); return close_snapshot_reader(); } -ss::future<> recovery_stm::install_snapshot() { +ss::future<> recovery_stm::install_snapshot(required_snapshot_type s_type) { + const auto learner_start_offset = _ptr->get_learner_start_offset(); // open reader if not yet available - auto f = _snapshot_reader != nullptr ? ss::now() : open_snapshot_reader(); - - return f.then([this]() mutable { - // we are outside of raft operation lock if snapshot isn't yet ready we - // have to wait for it till next recovery loop - if (!_snapshot_reader) { - _stop_requested = true; - return ss::now(); + if (!_snapshot_reader) { + if ( + s_type == required_snapshot_type::on_demand + && learner_start_offset > _ptr->start_offset()) { + co_await take_on_demand_snapshot( + model::prev_offset(*learner_start_offset)); + } else { + co_await open_current_snapshot(); } + } - return send_install_snapshot_request(); - }); + // we are outside of raft operation lock if snapshot isn't yet ready we + // have to wait for it till next recovery loop + if (!_snapshot_reader) { + _stop_requested = true; + co_return; + } + co_return co_await send_install_snapshot_request(); +} + +ss::future<> +recovery_stm::take_on_demand_snapshot(model::offset last_included_offset) { + vlog( + _ctxlog.debug, + "creating on demand snapshot with last included offset: {}", + last_included_offset); + + _last_included_snapshot_index = last_included_offset; + // if there is no stm_manager available for the raft group use empty + // snapshot + iobuf snapshot_data; + + if (_ptr->stm_manager()) { + snapshot_data = co_await _ptr->stm_manager()->take_snapshot( + last_included_offset); + } + auto cfg = _ptr->_configuration_manager.get(last_included_offset); + const auto term = _ptr->log()->get_term(last_included_offset); + + if (!cfg || !term) { + vlog( + _ctxlog.info, + "Configuration or term for on demand snapshot offset {} is not " + "available, stopping recovery", + last_included_offset); + _stop_requested = true; + co_return; + } + + iobuf snapshot; + // using snapshot writer to populate all relevant snapshot metadata i.e. + // header and crc + storage::snapshot_writer writer(make_iobuf_ref_output_stream(snapshot)); + + snapshot_metadata metadata{ + .last_included_index = last_included_offset, + .last_included_term = *term, + .latest_configuration = std::move(*cfg), + .log_start_delta = offset_translator_delta( + _ptr->_offset_translator.state()->delta(last_included_offset)), + }; + + co_await writer.write_metadata(reflection::to_iobuf(std::move(metadata))); + co_await write_iobuf_to_output_stream( + std::move(snapshot_data), writer.output()); + co_await writer.close(); + + _snapshot_size = snapshot.size_bytes(); + _snapshot_reader = std::make_unique( + on_demand_snapshot_reader{ + .stream = make_iobuf_input_stream(std::move(snapshot)), + }); } ss::future<> recovery_stm::replicate( @@ -418,8 +512,9 @@ ss::future<> recovery_stm::replicate( } else if (prev_log_idx == _ptr->_last_snapshot_index) { prev_log_term = _ptr->_last_snapshot_term; } else { - // no entry for prev_log_idx, fallback to install snapshot - return install_snapshot(); + // no entry for prev_log_idx, will fallback to install snapshot on next + // iteration + return ss::now(); } // calculate commit index for follower to update immediately diff --git a/src/v/raft/recovery_stm.h b/src/v/raft/recovery_stm.h index d57a9cd77ca2c..c6e44eb02a0ac 100644 --- a/src/v/raft/recovery_stm.h +++ b/src/v/raft/recovery_stm.h @@ -28,6 +28,34 @@ class recovery_stm { ss::future<> apply(); private: + /** + * Indicates if a snapshot is required for follower to recover, the recovery + * process will either not require a snapshot, create an on demand snapshot + * or will use a currently available snapshot. + */ + enum class required_snapshot_type { + none, + current, + on_demand, + }; + /** + * Holder for on demand snapshot output stream. Right now Redpanda uses + * simple empty snapshots for all data partition STMs hence there is no much + * overhead in creating a separate snapshot for each follower even if it has + * the same content. + * + * In future it may be worth to cache on demand snapshots if more than one + * learner is being recovered. + */ + struct on_demand_snapshot_reader { + ss::input_stream& input() { return stream; } + ss::future<> close() { return stream.close(); } + + ss::input_stream stream; + }; + // variant encapsulating two different reader types + using snapshot_reader_t + = std::variant; ss::future<> recover(); ss::future<> do_recover(ss::io_priority_class); ss::future> @@ -40,12 +68,15 @@ class recovery_stm { std::optional get_follower_meta(); clock_type::time_point append_entries_timeout(); - ss::future<> install_snapshot(); + ss::future<> install_snapshot(required_snapshot_type); ss::future<> send_install_snapshot_request(); ss::future<> handle_install_snapshot_reply(result); - ss::future<> open_snapshot_reader(); + ss::future<> open_current_snapshot(); + ss::future<> take_on_demand_snapshot(model::offset); + ss::future read_snapshot_chunk(); ss::future<> close_snapshot_reader(); - + required_snapshot_type get_required_snapshot_type( + const follower_index_metadata& follower_metadata) const; bool is_recovery_finished(); flush_after_append should_flush(model::offset) const; consensus* _ptr; @@ -53,11 +84,12 @@ class recovery_stm { model::offset _base_batch_offset; model::offset _last_batch_offset; model::offset _committed_offset; + model::offset _last_included_snapshot_index; model::term_id _term; scheduling_config _scheduling; prefix_logger _ctxlog; - // tracking follower snapshot delivery - std::unique_ptr _snapshot_reader; + + std::unique_ptr _snapshot_reader; size_t _sent_snapshot_bytes = 0; size_t _snapshot_size = 0; // needed to early exit. (node down)