Skip to content

Commit

Permalink
r/recovery_stm: deliver an on demand snapshot to learner
Browse files Browse the repository at this point in the history
If the learner start offset is set in the Raft configuration, the recovery
STM requests an on demand snapshot from the stm manager (or uses an empty
snapshot if the stm manager is not set) and delivers it to the follower as
the log starting point.

Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Oct 3, 2023
1 parent 65d9655 commit 23d5f58
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 70 deletions.
225 changes: 160 additions & 65 deletions src/v/raft/recovery_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "raft/recovery_stm.h"

#include "bytes/iostream.h"
#include "model/fundamental.h"
#include "model/record_batch_reader.h"
#include "outcome_future_utils.h"
Expand All @@ -18,6 +19,7 @@
#include "raft/logger.h"
#include "raft/raftgen_service.h"
#include "ssx/sformat.h"
#include "storage/snapshot.h"

#include <seastar/core/condition-variable.hh>
#include <seastar/core/coroutine.hh>
Expand Down Expand Up @@ -81,8 +83,10 @@ ss::future<> recovery_stm::do_recover(ss::io_priority_class iopc) {
auto lstats = _ptr->_log->offsets();

// follower last index was already evicted at the leader, use snapshot
if (meta.value()->next_index <= _ptr->_last_snapshot_index) {
co_return co_await install_snapshot();
const required_snapshot_type snapshot_needed = get_required_snapshot_type(
*meta.value());
if (snapshot_needed != required_snapshot_type::none) {
co_return co_await install_snapshot(snapshot_needed);
}

/**
Expand Down Expand Up @@ -205,6 +209,24 @@ recovery_stm::should_flush(model::offset follower_committed_match_index) const {
return flush_after_append(is_last || should_checkpoint_flush);
}

/**
 * Determines whether the follower has to be recovered with a snapshot and,
 * if so, which kind of snapshot should be used.
 *
 * The two checks intentionally use different comparisons. The learner start
 * offset is the offset immediately following the last offset included in an
 * on demand snapshot, so an on demand snapshot is needed only when the
 * follower next index is strictly smaller than it. `_last_snapshot_index` is
 * the last offset included in the current snapshot, so a next index equal to
 * it still has to be served from the snapshot.
 */
recovery_stm::required_snapshot_type recovery_stm::get_required_snapshot_type(
  const follower_index_metadata& follower_metadata) const {
    const auto& next_idx = follower_metadata.next_index;

    // learner start offset is consulted only when the follower is a learner
    const bool behind_learner_start
      = follower_metadata.is_learner
        && next_idx < _ptr->get_learner_start_offset();
    if (behind_learner_start) {
        return required_snapshot_type::on_demand;
    }

    if (next_idx <= _ptr->_last_snapshot_index) {
        return required_snapshot_type::current;
    }

    return required_snapshot_type::none;
}

ss::future<std::optional<model::record_batch_reader>>
recovery_stm::read_range_for_recovery(
model::offset start_offset,
Expand Down Expand Up @@ -287,67 +309,78 @@ recovery_stm::read_range_for_recovery(
}
}

ss::future<> recovery_stm::open_snapshot_reader() {
ss::future<> recovery_stm::open_current_snapshot() {
return _ptr->_snapshot_mgr.open_snapshot().then(
[this](std::optional<storage::snapshot_reader> rdr) {
if (rdr) {
_snapshot_reader = std::make_unique<storage::snapshot_reader>(
_snapshot_reader = std::make_unique<snapshot_reader_t>(
std::move(*rdr));
return _snapshot_reader->get_snapshot_size().then(
[this](size_t sz) { _snapshot_size = sz; });
_last_included_snapshot_index = _ptr->_last_snapshot_index;
return std::get<storage::snapshot_reader>(*_snapshot_reader)
.get_snapshot_size()
.then([this](size_t sz) { _snapshot_size = sz; });
}
return ss::now();
});
}

ss::future<> recovery_stm::send_install_snapshot_request() {
// send 32KB at a time
return read_iobuf_exactly(_snapshot_reader->input(), 32_KiB)
.then([this](iobuf chunk) mutable {
auto chunk_size = chunk.size_bytes();
install_snapshot_request req{
.target_node_id = _node_id,
.term = _ptr->term(),
.group = _ptr->group(),
.node_id = _ptr->_self,
.last_included_index = _ptr->_last_snapshot_index,
.file_offset = _sent_snapshot_bytes,
.chunk = std::move(chunk),
.done = (_sent_snapshot_bytes + chunk_size) == _snapshot_size,
.dirty_offset = _ptr->dirty_offset()};

_sent_snapshot_bytes += chunk_size;
if (req.done) {
auto meta = get_follower_meta();
if (!meta) {
// stop recovery when node was removed
_stop_requested = true;
return ss::make_ready_future<>();
}
(*meta)->expected_log_end_offset = _ptr->_last_snapshot_index;
}
vlog(_ctxlog.trace, "sending install_snapshot request: {}", req);
auto hb_guard = _ptr->suppress_heartbeats(_node_id);
return _ptr->_client_protocol
.install_snapshot(
_node_id.id(),
std::move(req),
rpc::client_opts(append_entries_timeout()))
.then([this](result<install_snapshot_reply> reply) {
return handle_install_snapshot_reply(
_ptr->validate_reply_target_node(
"install_snapshot", reply, _node_id.id()));
})
.finally([hb_guard = std::move(hb_guard)] {});
});
/**
 * If the follower is being sent the current raft snapshot, the snapshot
 * content may change while it is being delivered. In this case
 * read_snapshot_chunk will throw, forcing recovery to stop. The new snapshot
 * will be delivered in the next recovery round.
 */
return read_snapshot_chunk().then([this](iobuf chunk) mutable {
auto chunk_size = chunk.size_bytes();
install_snapshot_request req{
.target_node_id = _node_id,
.term = _ptr->term(),
.group = _ptr->group(),
.node_id = _ptr->_self,
.last_included_index = _last_included_snapshot_index,
.file_offset = _sent_snapshot_bytes,
.chunk = std::move(chunk),
.done = (_sent_snapshot_bytes + chunk_size) == _snapshot_size,
.dirty_offset = _ptr->dirty_offset()};

_sent_snapshot_bytes += chunk_size;
if (req.done) {
auto meta = get_follower_meta();
if (!meta) {
// stop recovery when node was removed
_stop_requested = true;
return ss::make_ready_future<>();
}
(*meta)->expected_log_end_offset = _last_included_snapshot_index;
}
vlog(_ctxlog.trace, "sending install_snapshot request: {}", req);
auto hb_guard = _ptr->suppress_heartbeats(_node_id);
return _ptr->_client_protocol
.install_snapshot(
_node_id.id(),
std::move(req),
rpc::client_opts(append_entries_timeout()))
.then([this](result<install_snapshot_reply> reply) {
return handle_install_snapshot_reply(
_ptr->validate_reply_target_node(
"install_snapshot", reply, _node_id.id()));
})
.finally([hb_guard = std::move(hb_guard)] {});
});
}
/**
 * Reads the next chunk of the snapshot that is being delivered to the
 * follower. Works with either reader type held by `_snapshot_reader`, as both
 * expose an `input()` stream.
 */
ss::future<iobuf> recovery_stm::read_snapshot_chunk() {
    return ss::visit(*_snapshot_reader, [](auto& reader) {
        auto& snapshot_input = reader.input();
        // snapshot is sent 32KiB at a time
        return read_iobuf_exactly(snapshot_input, 32_KiB);
    });
}

ss::future<> recovery_stm::close_snapshot_reader() {
return _snapshot_reader->close().then([this] {
_snapshot_reader.reset();
_snapshot_size = 0;
_sent_snapshot_bytes = 0;
});
return ss::visit(*_snapshot_reader, [](auto& rdr) { return rdr.close(); })
.then([this] {
_snapshot_reader.reset();
_snapshot_size = 0;
_sent_snapshot_bytes = 0;
});
}

ss::future<> recovery_stm::handle_install_snapshot_reply(
Expand Down Expand Up @@ -379,25 +412,86 @@ ss::future<> recovery_stm::handle_install_snapshot_reply(
}

// snapshot received by the follower, continue with recovery
(*meta)->match_index = _ptr->_last_snapshot_index;
(*meta)->next_index = model::next_offset(_ptr->_last_snapshot_index);
(*meta)->match_index = _last_included_snapshot_index;
(*meta)->next_index = model::next_offset(_last_included_snapshot_index);
return close_snapshot_reader();
}

ss::future<> recovery_stm::install_snapshot() {
ss::future<> recovery_stm::install_snapshot(required_snapshot_type s_type) {
const auto learner_start_offset = _ptr->get_learner_start_offset();
// open reader if not yet available
auto f = _snapshot_reader != nullptr ? ss::now() : open_snapshot_reader();

return f.then([this]() mutable {
// we are outside of raft operation lock if snapshot isn't yet ready we
// have to wait for it till next recovery loop
if (!_snapshot_reader) {
_stop_requested = true;
return ss::now();
if (!_snapshot_reader) {
if (
s_type == required_snapshot_type::on_demand
&& learner_start_offset > _ptr->start_offset()) {
co_await take_on_demand_snapshot(
model::prev_offset(*learner_start_offset));
} else {
co_await open_current_snapshot();
}
}

return send_install_snapshot_request();
});
// we are outside of raft operation lock if snapshot isn't yet ready we
// have to wait for it till next recovery loop
if (!_snapshot_reader) {
_stop_requested = true;
co_return;
}
co_return co_await send_install_snapshot_request();
}

/**
 * Builds an in-memory snapshot with the given last included offset and
 * installs a reader over it in `_snapshot_reader`.
 *
 * The snapshot payload is taken from the stm manager when one is set;
 * otherwise the payload is empty. When either the configuration or the term
 * for `last_included_offset` is not available, recovery is requested to stop
 * and no reader is created.
 *
 * @param last_included_offset last offset covered by the created snapshot
 */
ss::future<>
recovery_stm::take_on_demand_snapshot(model::offset last_included_offset) {
vlog(
_ctxlog.debug,
"creating on demand snapshot with last included offset: {}",
last_included_offset);

// recorded up front, it stays set even if the function bails out below
_last_included_snapshot_index = last_included_offset;
// if there is no stm_manager available for the raft group use empty
// snapshot
iobuf snapshot_data;

if (_ptr->stm_manager()) {
snapshot_data = co_await _ptr->stm_manager()->take_snapshot(
last_included_offset);
}
// configuration and term are looked up after the stm snapshot was taken,
// i.e. after the suspension point above
auto cfg = _ptr->_configuration_manager.get(last_included_offset);
const auto term = _ptr->log()->get_term(last_included_offset);

if (!cfg || !term) {
vlog(
_ctxlog.info,
"Configuration or term for on demand snapshot offset {} is not "
"available, stopping recovery",
last_included_offset);
_stop_requested = true;
co_return;
}

// buffer the serialized snapshot is written into
iobuf snapshot;
// using snapshot writer to populate all relevant snapshot metadata i.e.
// header and crc
storage::snapshot_writer writer(make_iobuf_ref_output_stream(snapshot));

snapshot_metadata metadata{
.last_included_index = last_included_offset,
.last_included_term = *term,
.latest_configuration = std::move(*cfg),
.log_start_delta = offset_translator_delta(
_ptr->_offset_translator.state()->delta(last_included_offset)),
};

// metadata is written first, followed by the stm snapshot payload
co_await writer.write_metadata(reflection::to_iobuf(std::move(metadata)));
co_await write_iobuf_to_output_stream(
std::move(snapshot_data), writer.output());
co_await writer.close();

// size is captured before the buffer is moved into the input stream
_snapshot_size = snapshot.size_bytes();
_snapshot_reader = std::make_unique<snapshot_reader_t>(
on_demand_snapshot_reader{
.stream = make_iobuf_input_stream(std::move(snapshot)),
});
}

ss::future<> recovery_stm::replicate(
Expand All @@ -418,8 +512,9 @@ ss::future<> recovery_stm::replicate(
} else if (prev_log_idx == _ptr->_last_snapshot_index) {
prev_log_term = _ptr->_last_snapshot_term;
} else {
// no entry for prev_log_idx, fallback to install snapshot
return install_snapshot();
// no entry for prev_log_idx, will fallback to install snapshot on next
// iteration
return ss::now();
}

// calculate commit index for follower to update immediately
Expand Down
42 changes: 37 additions & 5 deletions src/v/raft/recovery_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,34 @@ class recovery_stm {
ss::future<> apply();

private:
/**
 * Indicates whether a snapshot is required for a follower to recover. The
 * recovery process will either not require a snapshot, use the currently
 * available snapshot or create an on demand snapshot.
 */
enum class required_snapshot_type {
// follower can be recovered without a snapshot
none,
// follower next index is not greater than the last snapshot index, the
// currently available snapshot is used
current,
// follower is a learner and its next index is smaller than the learner
// start offset, an on demand snapshot is used
on_demand,
};
/**
 * Holder for the input stream an on demand snapshot is read from. It exposes
 * `input()` and `close()` so that it can be used interchangeably with
 * `storage::snapshot_reader` when visiting `snapshot_reader_t`.
 *
 * Right now Redpanda uses simple empty snapshots for all data partition STMs,
 * hence there is not much overhead in creating a separate snapshot for each
 * follower even if it has the same content.
 *
 * In the future it may be worth caching on demand snapshots if more than one
 * learner is being recovered.
 */
struct on_demand_snapshot_reader {
    // stream providing the snapshot content
    ss::input_stream<char> stream;

    // gives access to the stream the snapshot chunks are read from
    ss::input_stream<char>& input() { return stream; }

    // closes the underlying stream
    ss::future<> close() { return stream.close(); }
};
// variant encapsulating two different reader types
using snapshot_reader_t
= std::variant<storage::snapshot_reader, on_demand_snapshot_reader>;
ss::future<> recover();
ss::future<> do_recover(ss::io_priority_class);
ss::future<std::optional<model::record_batch_reader>>
Expand All @@ -40,24 +68,28 @@ class recovery_stm {
std::optional<follower_index_metadata*> get_follower_meta();
clock_type::time_point append_entries_timeout();

ss::future<> install_snapshot();
ss::future<> install_snapshot(required_snapshot_type);
ss::future<> send_install_snapshot_request();
ss::future<> handle_install_snapshot_reply(result<install_snapshot_reply>);
ss::future<> open_snapshot_reader();
ss::future<> open_current_snapshot();
ss::future<> take_on_demand_snapshot(model::offset);
ss::future<iobuf> read_snapshot_chunk();
ss::future<> close_snapshot_reader();

required_snapshot_type get_required_snapshot_type(
const follower_index_metadata& follower_metadata) const;
bool is_recovery_finished();
flush_after_append should_flush(model::offset) const;
consensus* _ptr;
vnode _node_id;
model::offset _base_batch_offset;
model::offset _last_batch_offset;
model::offset _committed_offset;
model::offset _last_included_snapshot_index;
model::term_id _term;
scheduling_config _scheduling;
prefix_logger _ctxlog;
// tracking follower snapshot delivery
std::unique_ptr<storage::snapshot_reader> _snapshot_reader;

std::unique_ptr<snapshot_reader_t> _snapshot_reader;
size_t _sent_snapshot_bytes = 0;
size_t _snapshot_size = 0;
// needed to early exit. (node down)
Expand Down

0 comments on commit 23d5f58

Please sign in to comment.