From f1ac0e382edc8aba10c84b707bdcc049c37f9d36 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 26 Jun 2024 15:23:55 -0700 Subject: [PATCH 1/2] stm_manager: tighten interaction between bg application and snapshots wait for background applicators to finish before applying raft snapshots, else there is a danger of next_to_apply offset moving backwards with interleaved bg_apply and snapshot fibers. (cherry picked from commit c8290e598e8ef527dda8dbe5ee7195c07bcede04) --- src/v/raft/state_machine_manager.cc | 10 ++++++++-- src/v/raft/state_machine_manager.h | 4 +++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/v/raft/state_machine_manager.cc b/src/v/raft/state_machine_manager.cc index dee348909973..6ccd43c363ef 100644 --- a/src/v/raft/state_machine_manager.cc +++ b/src/v/raft/state_machine_manager.cc @@ -215,7 +215,10 @@ ss::future<> state_machine_manager::apply_raft_snapshot() { } auto fut = co_await ss::coroutine::as_future( - do_apply_raft_snapshot(std::move(snapshot->metadata), snapshot->reader)); + acquire_background_apply_mutexes().then([&, this](auto units) mutable { + return do_apply_raft_snapshot( + std::move(snapshot->metadata), snapshot->reader, std::move(units)); + })); co_await snapshot->reader.close(); if (fut.failed()) { const auto e = fut.get_exception(); @@ -228,7 +231,9 @@ ss::future<> state_machine_manager::apply_raft_snapshot() { } ss::future<> state_machine_manager::do_apply_raft_snapshot( - snapshot_metadata metadata, storage::snapshot_reader& reader) { + snapshot_metadata metadata, + storage::snapshot_reader& reader, + std::vector background_apply_units) { const auto snapshot_file_sz = co_await reader.get_snapshot_size(); const auto last_offset = metadata.last_included_index; @@ -276,6 +281,7 @@ ss::future<> state_machine_manager::do_apply_raft_snapshot( }); } _next = model::next_offset(metadata.last_included_index); + background_apply_units.clear(); } ss::future<> state_machine_manager::apply_snapshot_to_stm( diff --git a/src/v/raft/state_machine_manager.h b/src/v/raft/state_machine_manager.h index 18d1ca8db09e..e60d36446df7 100644 --- a/src/v/raft/state_machine_manager.h +++ b/src/v/raft/state_machine_manager.h @@ -152,7 +152,9 @@ class state_machine_manager final { ss::future<> apply_raft_snapshot(); ss::future<> do_apply_raft_snapshot( - raft::snapshot_metadata metadata, storage::snapshot_reader& reader); + raft::snapshot_metadata metadata, + storage::snapshot_reader& reader, + std::vector background_apply_units); ss::future<> apply(); ss::future<> try_apply_in_foreground(); From 657dcddb4f3572bec98fd8d24dd03b9fbc79d5f5 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Sun, 19 May 2024 15:31:48 -0700 Subject: [PATCH 2/2] stm_manager/tests: test racing bg apply with snapshots (cherry picked from commit 9ed66c9623b6bd76fa9cb04904e877659cd97e1a) --- src/v/raft/tests/stm_manager_test.cc | 75 ++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/src/v/raft/tests/stm_manager_test.cc b/src/v/raft/tests/stm_manager_test.cc index 7452ad6d4f04..4044a864bac3 100644 --- a/src/v/raft/tests/stm_manager_test.cc +++ b/src/v/raft/tests/stm_manager_test.cc @@ -75,6 +75,47 @@ struct local_snapshot_stm : public simple_kv { }; }; +// State machine that induces lag from the tip of +// of the log +class slow_kv : public simple_kv { +public: + static constexpr std::string_view name = "slow_kv"; + + explicit slow_kv(raft_node_instance& rn) + : simple_kv(rn) {} + + ss::future<> apply(const model::record_batch& batch) override { + co_await ss::sleep(5ms); + co_return co_await simple_kv::apply(batch); + } + + ss::future<> apply_raft_snapshot(const iobuf&) override { + return ss::now(); + } +}; + +// Fails the first apply, starts a background fiber and not lets the +// background apply fiber finish relative to slow_kv +class bg_only_kv : public slow_kv { +public: + static constexpr std::string_view name = "bg_only_stm"; + + explicit bg_only_kv(raft_node_instance& rn) + : slow_kv(rn) {} + + ss::future<> apply(const model::record_batch& batch) override { + if (_first_apply) { + _first_apply = false; + throw std::runtime_error("induced failure"); + } + co_await ss::sleep(5ms); + co_return co_await slow_kv::apply(batch); + } + +private: + bool _first_apply = true; +}; + TEST_F_CORO(state_machine_fixture, test_basic_apply) { /** * Create 3 replicas group with simple_kv STM @@ -100,6 +141,40 @@ TEST_F_CORO(state_machine_fixture, test_basic_apply) { } } +TEST_F_CORO(state_machine_fixture, test_snapshot_with_bg_fibers) { + create_nodes(); + std::vector> stms; + for (auto& [id, node] : nodes()) { + raft::state_machine_manager_builder builder; + auto slow_kv_stm = builder.create_stm(*node); + auto bg_kv_stm = builder.create_stm(*node); + co_await node->init_and_start(all_vnodes(), std::move(builder)); + stms.push_back(ss::dynamic_pointer_cast(slow_kv_stm)); + stms.push_back(ss::dynamic_pointer_cast(bg_kv_stm)); + } + auto& leader_node = node(co_await wait_for_leader(10s)); + bool stop = false; + auto write_sleep_f = ss::do_until( + [&stop] { return stop; }, + [&] { + return build_random_state(1000).discard_result().then( + [] { return ss::sleep(3ms); }); + }); + + auto truncate_sleep_f = ss::do_until( + [&stop] { return stop; }, + [&] { + return leader_node.raft() + ->write_snapshot({leader_node.raft()->committed_offset(), iobuf{}}) + .then([] { return ss::sleep(3ms); }); + }); + + co_await ss::sleep(10s); + stop = true; + co_await std::move(write_sleep_f); + co_await std::move(truncate_sleep_f); +} + TEST_F_CORO(state_machine_fixture, test_apply_throwing_exception) { /** * Create 3 replicas group with simple_kv STM