Skip to content

Commit

Permalink
Merge pull request #21255 from vbotbuildovich/backport-pr-18576-v23.3…
Browse files Browse the repository at this point in the history
….x-406

[v23.3.x] stm_manager: tighten interaction between bg application and snapshots
  • Loading branch information
piyushredpanda authored Jul 5, 2024
2 parents 54af94f + f2f9302 commit 002d8cd
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 3 deletions.
10 changes: 8 additions & 2 deletions src/v/raft/state_machine_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ ss::future<> state_machine_manager::apply_raft_snapshot() {
}

auto fut = co_await ss::coroutine::as_future(
do_apply_raft_snapshot(std::move(snapshot->metadata), snapshot->reader));
acquire_background_apply_mutexes().then([&, this](auto units) mutable {
return do_apply_raft_snapshot(
std::move(snapshot->metadata), snapshot->reader, std::move(units));
}));
co_await snapshot->reader.close();
if (fut.failed()) {
const auto e = fut.get_exception();
Expand All @@ -228,7 +231,9 @@ ss::future<> state_machine_manager::apply_raft_snapshot() {
}

ss::future<> state_machine_manager::do_apply_raft_snapshot(
snapshot_metadata metadata, storage::snapshot_reader& reader) {
snapshot_metadata metadata,
storage::snapshot_reader& reader,
std::vector<ssx::semaphore_units> background_apply_units) {
const auto snapshot_file_sz = co_await reader.get_snapshot_size();
const auto last_offset = metadata.last_included_index;

Expand Down Expand Up @@ -276,6 +281,7 @@ ss::future<> state_machine_manager::do_apply_raft_snapshot(
});
}
_next = model::next_offset(metadata.last_included_index);
background_apply_units.clear();
}

ss::future<> state_machine_manager::apply_snapshot_to_stm(
Expand Down
4 changes: 3 additions & 1 deletion src/v/raft/state_machine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ class state_machine_manager final {

ss::future<> apply_raft_snapshot();
ss::future<> do_apply_raft_snapshot(
raft::snapshot_metadata metadata, storage::snapshot_reader& reader);
raft::snapshot_metadata metadata,
storage::snapshot_reader& reader,
std::vector<ssx::semaphore_units> background_apply_units);
ss::future<> apply();
ss::future<> try_apply_in_foreground();

Expand Down
75 changes: 75 additions & 0 deletions src/v/raft/tests/stm_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,47 @@ struct local_snapshot_stm : public simple_kv {
};
};

// Test state machine that applies deliberately slowly, so that it lags
// behind the tip of the log.
class slow_kv : public simple_kv {
public:
    static constexpr std::string_view name = "slow_kv";

    explicit slow_kv(raft_node_instance& rn)
      : simple_kv(rn) {}

    // Delays for 5ms, then hands the batch over to simple_kv::apply.
    ss::future<> apply(const model::record_batch& batch) override {
        co_await ss::sleep(5ms);
        co_await simple_kv::apply(batch);
    }

    // The snapshot payload is ignored; the returned future is already
    // resolved.
    ss::future<> apply_raft_snapshot(const iobuf&) override {
        return ss::make_ready_future<>();
    }
};

// Test state machine whose very first apply() fails with an induced error.
// Every later apply() sleeps 5ms here and then goes through slow_kv::apply
// (which sleeps another 5ms), so this STM is always slower than slow_kv.
// NOTE(review): the initial failure is meant to push this STM onto a
// background apply fiber that never catches up with slow_kv - confirm
// against state_machine_manager.
class bg_only_kv : public slow_kv {
public:
    static constexpr std::string_view name = "bg_only_stm";

    explicit bg_only_kv(raft_node_instance& rn)
      : slow_kv(rn) {}

    ss::future<> apply(const model::record_batch& batch) override {
        if (!_first_apply) {
            // Regular path: extra delay on top of the one in slow_kv.
            co_await ss::sleep(5ms);
            co_await slow_kv::apply(batch);
            co_return;
        }
        // First invocation only: clear the flag and fail the apply.
        _first_apply = false;
        throw std::runtime_error("induced failure");
    }

private:
    // True until apply() has been invoked once.
    bool _first_apply = true;
};

TEST_F_CORO(state_machine_fixture, test_basic_apply) {
/**
* Create 3 replicas group with simple_kv STM
Expand All @@ -100,6 +141,40 @@ TEST_F_CORO(state_machine_fixture, test_basic_apply) {
}
}

TEST_F_CORO(state_machine_fixture, test_snapshot_with_bg_fibers) {
    /**
     * Every node hosts a slow_kv STM and a bg_only_kv STM. For 10 seconds
     * one loop keeps writing random state while another keeps writing raft
     * snapshots at the leader's committed offset. The test contains no
     * explicit assertions; it passes when both loops finish without error.
     */
    create_nodes();
    std::vector<ss::shared_ptr<simple_kv>> stms;
    for (auto& [node_id, instance] : nodes()) {
        raft::state_machine_manager_builder stm_builder;
        auto slow_stm = stm_builder.create_stm<slow_kv>(*instance);
        auto bg_stm = stm_builder.create_stm<bg_only_kv>(*instance);
        co_await instance->init_and_start(
          all_vnodes(), std::move(stm_builder));
        stms.push_back(ss::dynamic_pointer_cast<simple_kv>(slow_stm));
        stms.push_back(ss::dynamic_pointer_cast<simple_kv>(bg_stm));
    }
    auto& leader_node = node(co_await wait_for_leader(10s));

    // Shared stop flag polled by both loops below.
    bool stop = false;
    auto should_stop = [&stop] { return stop; };

    // Writer loop: replicate random state, pause 3ms, repeat.
    auto writer_done = ss::do_until(should_stop, [&] {
        return build_random_state(1000).discard_result().then(
          [] { return ss::sleep(3ms); });
    });

    // Snapshot loop: write a snapshot at the committed offset, pause 3ms,
    // repeat.
    auto snapshotter_done = ss::do_until(should_stop, [&] {
        return leader_node.raft()
          ->write_snapshot({leader_node.raft()->committed_offset(), iobuf{}})
          .then([] { return ss::sleep(3ms); });
    });

    // Let both loops run concurrently, then stop them and wait for each.
    co_await ss::sleep(10s);
    stop = true;
    co_await std::move(writer_done);
    co_await std::move(snapshotter_done);
}

TEST_F_CORO(state_machine_fixture, test_apply_throwing_exception) {
/**
* Create 3 replicas group with simple_kv STM
Expand Down

0 comments on commit 002d8cd

Please sign in to comment.