Skip to content

Commit

Permalink
r/state_machine: wait for the last snapshot to be applied
Browse files Browse the repository at this point in the history
When state machine is applying the snapshot it may be the case that
another install snapshot request is received by underlying Raft protocol
instance. In this case the STM should immediately proceed to applying
the new snapshot instead of trying to read log from an offset which is
lower than the new log start offset.

Signed-off-by: Michal Maslanka <michal@redpanda.com>
(cherry picked from commit 78d2b21)
  • Loading branch information
mmaslankaprv authored and Michal Maslanka committed Jan 22, 2024
1 parent 1b92494 commit c0b16a7
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
28 changes: 22 additions & 6 deletions src/v/raft/state_machine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@

#include "raft/state_machine.h"

#include "likely.h"
#include "model/fundamental.h"
#include "model/record_batch_reader.h"
#include "raft/consensus.h"
#include "ssx/future-util.h"
#include "storage/log.h"
#include "storage/record_batch_builder.h"

#include <exception>

namespace raft {

state_machine::state_machine(
Expand Down Expand Up @@ -124,17 +128,29 @@ ss::future<result<replicate_result>> state_machine::quorum_write_empty_batch(
});
});
}

ss::future<> state_machine::maybe_apply_raft_snapshot() {
// a loop here is required as install snapshot request may be processed by
// Raft while handling the other snapshot. In this case a new snapshot
// should be applied to the STM.

while (_next < _raft->start_offset()) {
try {
co_await handle_raft_snapshot();
} catch (...) {
const auto& e = std::current_exception();
if (!ssx::is_shutdown_exception(e)) {
vlog(_log.error, "Error applying Raft snapshot - {}", e);
}
std::rethrow_exception(e);
}
}
}
ss::future<> state_machine::apply() {
// wait until consensus commit index is >= _next
return _raft->events()
.wait(_next, model::no_timeout, _as)
.then([this] {
auto f = ss::now();
if (_next < _raft->start_offset()) {
f = handle_raft_snapshot();
}
return f.then([this] {
return maybe_apply_raft_snapshot().then([this] {
/**
* Raft make_reader method allows callers reading up to
* last_visible index. In order to make the STMs safe and working
Expand Down
1 change: 1 addition & 0 deletions src/v/raft/state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class state_machine {
friend batch_applicator;

ss::future<> apply();
ss::future<> maybe_apply_raft_snapshot();
bool stop_batch_applicator();

ss::io_priority_class _io_prio;
Expand Down

0 comments on commit c0b16a7

Please sign in to comment.