diff --git a/src/v/raft/tests/CMakeLists.txt b/src/v/raft/tests/CMakeLists.txt
index 524904d320fa6..363dc16812c4c 100644
--- a/src/v/raft/tests/CMakeLists.txt
+++ b/src/v/raft/tests/CMakeLists.txt
@@ -51,6 +51,7 @@ set(gsrcs
   raft_fixture.cc
   basic_raft_fixture_test.cc
   stm_manager_test.cc
+  raft_reconfiguration_test.cc
 )
 
 rp_test(
diff --git a/src/v/raft/tests/raft_fixture.cc b/src/v/raft/tests/raft_fixture.cc
index 17c59098e72cb..94366625f6e83 100644
--- a/src/v/raft/tests/raft_fixture.cc
+++ b/src/v/raft/tests/raft_fixture.cc
@@ -370,8 +370,9 @@ ss::future<> raft_node_instance::start(
     co_await _hb_manager->start();
 
     co_await _recovery_throttle.start(
-      config::mock_binding<size_t>(10 * 1024 * 1024),
-      config::mock_binding(false));
+      config::mock_binding<size_t>(100_MiB), config::mock_binding(false));
+    co_await _recovery_throttle.invoke_on_all(
+      &coordinated_recovery_throttle::start);
 
     co_await _storage.start(
       [this]() {
@@ -451,10 +452,13 @@ void raft_node_instance::leadership_notification_callback(
 
 ss::future<ss::circular_buffer<model::record_batch>>
 raft_node_instance::read_all_data_batches() {
-    storage::log_reader_config cfg(
-      _raft->start_offset(),
-      model::offset::max(),
-      ss::default_priority_class());
+    return read_batches_in_range(_raft->start_offset(), model::offset::max());
+}
+
+ss::future<ss::circular_buffer<model::record_batch>>
+raft_node_instance::read_batches_in_range(
+  model::offset min, model::offset max) {
+    storage::log_reader_config cfg(min, max, ss::default_priority_class());
     cfg.type_filter = model::record_batch_type::raft_data;
 
     auto rdr = co_await _raft->make_reader(cfg);
@@ -463,6 +467,20 @@ raft_node_instance::read_all_data_batches() {
       std::move(rdr), model::no_timeout);
 }
 
+ss::future<model::offset>
+raft_node_instance::random_batch_base_offset(model::offset max) {
+    model::offset read_start(
+      random_generators::get_int<int64_t>(_raft->start_offset(), max));
+    model::offset last = model::next_offset(read_start);
+    ss::circular_buffer<model::record_batch> batches;
+    while (batches.empty()) {
+        batches = co_await read_batches_in_range(read_start, last);
+        last++;
+    }
+
+    co_return batches.front().base_offset();
+}
+
 seastar::future<> raft_fixture::TearDownAsync() {
     co_await seastar::coroutine::parallel_for_each(
       _nodes, [](auto& pair) { return pair.second->stop(); });
@@ -543,13 +561,22 @@ ss::future<> raft_fixture::create_simple_group(size_t number_of_nodes) {
 ss::future<> raft_fixture::wait_for_committed_offset(
   model::offset offset, std::chrono::milliseconds timeout) {
-    return tests::cooperative_spin_wait_with_timeout(timeout, [this, offset] {
+    RPTEST_REQUIRE_EVENTUALLY_CORO(timeout, [this, offset] {
         return std::all_of(
           nodes().begin(), nodes().end(), [offset](auto& pair) {
               return pair.second->raft()->committed_offset() >= offset;
           });
     });
 }
 
+ss::future<> raft_fixture::wait_for_visible_offset(
+  model::offset offset, std::chrono::milliseconds timeout) {
+    RPTEST_REQUIRE_EVENTUALLY_CORO(timeout, [this, offset] {
+        return std::all_of(
+          nodes().begin(), nodes().end(), [offset](auto& pair) {
+              return pair.second->raft()->last_visible_index() >= offset;
+          });
+    });
+}
 std::ostream& operator<<(std::ostream& o, msg_type type) {
     switch (type) {
     case msg_type::append_entries:
diff --git a/src/v/raft/tests/raft_fixture.h b/src/v/raft/tests/raft_fixture.h
index d8ed8f779ff11..5dbbb1953c2d5 100644
--- a/src/v/raft/tests/raft_fixture.h
+++ b/src/v/raft/tests/raft_fixture.h
@@ -15,6 +15,7 @@
 #include "bytes/iobuf.h"
 #include "config/property.h"
 #include "features/feature_table.h"
+#include "model/fundamental.h"
 #include "raft/consensus.h"
 #include "raft/consensus_client_protocol.h"
 #include "raft/coordinated_recovery_throttle.h"
"raft/coordinated_recovery_throttle.h" @@ -24,6 +25,7 @@ #include "raft/recovery_memory_quota.h" #include "raft/state_machine_manager.h" #include "raft/types.h" +#include "random/generators.h" #include "storage/api.h" #include "test_utils/test.h" #include "utils/prefix_logger.h" @@ -172,6 +174,10 @@ class raft_node_instance : public ss::weakly_referencable { ss::future> read_all_data_batches(); + ss::future> + read_batches_in_range(model::offset min, model::offset max); + + ss::future random_batch_base_offset(model::offset max); private: model::node_id _id; @@ -244,23 +250,38 @@ class raft_fixture model::record_batch_reader make_batches(std::vector> batch_spec) { - ss::circular_buffer batches; - batches.reserve(batch_spec.size()); - for (auto& [k, v] : batch_spec) { + const auto sz = batch_spec.size(); + return make_batches(sz, [spec = std::move(batch_spec)](size_t idx) { + auto [k, v] = spec[idx]; storage::record_batch_builder builder( model::record_batch_type::raft_data, model::offset(0)); builder.add_raw_kv( serde::to_iobuf(std::move(k)), serde::to_iobuf(std::move(v))); - batches.push_back(std::move(builder).build()); + return std::move(builder).build(); + }); + } + + template + model::record_batch_reader + make_batches(size_t batch_count, Generator&& generator) { + ss::circular_buffer batches; + batches.reserve(batch_count); + for (auto b_idx : boost::irange(batch_count)) { + batches.push_back(generator(b_idx)); } return model::make_memory_record_batch_reader(std::move(batches)); } - ss::future<> assert_logs_equal() { + ss::future<> + assert_logs_equal(model::offset start_offset = model::offset{}) { std::vector> node_batches; for (auto& [id, n] : _nodes) { - node_batches.push_back(co_await n->read_all_data_batches()); + auto read_from = start_offset == model::offset{} + ? n->raft()->start_offset() + : start_offset; + node_batches.push_back(co_await n->read_batches_in_range( + read_from, model::offset::max())); } ASSERT_TRUE_CORO(std::all_of( node_batches.begin(), @@ -275,6 +296,9 @@ class raft_fixture ss::future<> wait_for_committed_offset( model::offset offset, std::chrono::milliseconds timeout); + ss::future<> wait_for_visible_offset( + model::offset offset, std::chrono::milliseconds timeout); + template auto with_leader(std::chrono::milliseconds timeout, Func&& f) { return wait_for_leader(timeout).then( diff --git a/src/v/raft/tests/raft_reconfiguration_test.cc b/src/v/raft/tests/raft_reconfiguration_test.cc new file mode 100644 index 0000000000000..4a8ff86fedfbb --- /dev/null +++ b/src/v/raft/tests/raft_reconfiguration_test.cc @@ -0,0 +1,231 @@ +// Copyright 2023 Redpanda Data, Inc. 
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+
+#include "bytes/bytes.h"
+#include "gtest/gtest.h"
+#include "model/fundamental.h"
+#include "model/metadata.h"
+#include "model/record.h"
+#include "model/record_batch_reader.h"
+#include "model/record_batch_types.h"
+#include "model/timeout_clock.h"
+#include "raft/group_configuration.h"
+#include "raft/tests/raft_fixture.h"
+#include "raft/tests/raft_group_fixture.h"
+#include "raft/types.h"
+#include "random/generators.h"
+#include "serde/rw/rw.h"
+#include "serde/serde.h"
+#include "storage/record_batch_builder.h"
+#include "test_utils/async.h"
+#include "test_utils/test.h"
+
+#include <seastar/core/coroutine.hh>
+#include <seastar/core/loop.hh>
+#include <seastar/coroutine/parallel_for_each.hh>
+#include <seastar/util/bool_class.hh>
+
+#include <absl/container/flat_hash_set.h>
+#include <boost/range/irange.hpp>
+#include <fmt/core.h>
+
+#include <optional>
+
+using namespace raft;
+
+/**
+ * Tests validating Raft group reconfiguration: replacing the configuration
+ * with nodes added and removed, with and without a snapshot and a learner
+ * start offset, under both consistency levels.
+ */
+
+using use_snapshot = ss::bool_class<struct use_snapshot_tag>;
+using use_initial_learner_offset = ss::bool_class<struct learner_offset_tag>;
+
+struct test_params {
+    use_snapshot snapshot;
+    int initial_size;
+    int nodes_to_add;
+    int nodes_to_remove;
+    use_initial_learner_offset learner_start_offset;
+    consistency_level consistency_level = raft::consistency_level::quorum_ack;
+};
+
+struct reconfiguration_test
+  : testing::WithParamInterface<std::tuple<
+      use_snapshot,
+      int,
+      int,
+      int,
+      use_initial_learner_offset,
+      consistency_level>>
+  , raft_fixture {
+    ss::future<>
+    wait_for_reconfiguration_to_finish(std::chrono::milliseconds timeout) {
+        RPTEST_REQUIRE_EVENTUALLY_CORO(timeout, [this] {
+            for (auto& [_, n] : nodes()) {
+                if (
+                  n->raft()->config().get_state()
+                  != raft::configuration_state::simple) {
+                    return false;
+                }
+            }
+            return true;
+        });
+    }
+
+    auto make_random_batches() {
+        return make_batches(
+          random_generators::get_int(100, 500), [](size_t b_idx) {
+              storage::record_batch_builder builder(
+                model::record_batch_type::raft_data, model::offset(0));
+
+              for (int i = 0; i < random_generators::get_int(1, 10); ++i) {
+                  auto r_size = random_generators::get_int<size_t>(32, 1_KiB);
+                  builder.add_raw_kv(
+                    serde::to_iobuf(fmt::format("{}-{}", b_idx, i)),
+                    bytes_to_iobuf(random_generators::get_bytes(r_size)));
+              }
+
+              return std::move(builder).build();
+          });
+    }
+};
+
+TEST_P_CORO(reconfiguration_test, configuration_replace_test) {
+    const auto param = GetParam();
+    use_snapshot snapshot = std::get<0>(param);
+    int initial_size = std::get<1>(param);
+    int nodes_to_add = std::get<2>(param);
+    int nodes_to_remove = std::get<3>(param);
+    use_initial_learner_offset use_learner_start_offset = std::get<4>(param);
+    consistency_level consistency_level = std::get<5>(param);
+    // skip test cases that make no sense
+    if (
+      nodes_to_add + initial_size - nodes_to_remove <= 0
+      || initial_size < nodes_to_remove) {
+        co_return;
+    }
+    fmt::print(
+      "test parameters: {{snapshot: {}, initial_size: {}, "
+      "nodes_to_add: {}, nodes_to_remove: {}, use_learner_start_offset: {}, "
+      "consistency_lvl: {}}}\n",
+      snapshot,
+      initial_size,
+      nodes_to_add,
+      nodes_to_remove,
+      use_learner_start_offset,
+      consistency_level);
+    // create group with initial configuration
+    co_await create_simple_group(initial_size);
+
+    // wait for leader
+    auto leader = co_await wait_for_leader(10s);
+    auto& leader_node = node(leader);
+
+    // replicate batches
+    auto result = co_await leader_node.raft()->replicate(
+      make_random_batches(), replicate_options(consistency_level));
+
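+    // the replicate call must succeed before the group is reconfigured below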
+    ASSERT_TRUE_CORO(result.has_value());
+    model::offset start_offset = leader_node.raft()->start_offset();
+    if (snapshot) {
+        if (consistency_level == consistency_level::leader_ack) {
+            co_await wait_for_visible_offset(
+              leader_node.raft()->dirty_offset(), 30s);
+            for (auto& [_, n] : nodes()) {
+                co_await n->raft()->refresh_commit_index();
+            }
+            co_await wait_for_committed_offset(result.value().last_offset, 10s);
+        }
+        auto committed_offset = leader_node.raft()->committed_offset();
+
+        auto last_included_offset = model::prev_offset(
+          co_await leader_node.random_batch_base_offset(
+            committed_offset - model::offset(50)));
+        start_offset = model::next_offset(last_included_offset);
+        co_await leader_node.raft()->write_snapshot(
+          raft::write_snapshot_cfg(last_included_offset, {}));
+    }
+    std::optional<model::offset> learner_start_offset;
+    if (use_learner_start_offset) {
+        model::offset offset(random_generators::get_int<int64_t>(
+          start_offset, leader_node.raft()->dirty_offset()));
+
+        learner_start_offset = co_await leader_node.random_batch_base_offset(
+          leader_node.raft()->committed_offset());
+        start_offset = *learner_start_offset;
+    }
+
+    auto current_nodes = all_vnodes();
+
+    for (int i = 0; i < nodes_to_remove; ++i) {
+        current_nodes.pop_back();
+    }
+    absl::flat_hash_set<vnode> added_nodes;
+    co_await ss::coroutine::parallel_for_each(
+      boost::irange(nodes_to_add), [&](int i) {
+          auto& n = add_node(
+            model::node_id(initial_size + i), model::revision_id(0));
+          current_nodes.push_back(n.get_vnode());
+          added_nodes.emplace(n.get_vnode());
+          return n.start({});
+      });
+
+    ASSERT_EQ_CORO(
+      current_nodes.size(), initial_size + nodes_to_add - nodes_to_remove);
+
+    // update group configuration
+    co_await leader_node.raft()->replace_configuration(
+      current_nodes, model::revision_id(0), learner_start_offset);
+    leader = co_await wait_for_leader(10s);
+    auto& new_leader_node = node(leader);
+
+    // wait for committed offset to propagate
+    auto committed_offset = new_leader_node.raft()->committed_offset();
+
+    // wait for all replicas to catch up with the new leader
+    if (consistency_level == raft::consistency_level::quorum_ack) {
+        co_await wait_for_committed_offset(committed_offset, 30s);
+    } else {
+        co_await wait_for_visible_offset(
+          new_leader_node.raft()->last_visible_index(), 30s);
+    }
+
+    co_await wait_for_reconfiguration_to_finish(30s);
+
+    co_await assert_logs_equal(start_offset);
+
+    absl::flat_hash_set<vnode> current_nodes_set(
+      current_nodes.begin(), current_nodes.end());
+
+    // validate configuration
+    for (const auto& [_, n] : nodes()) {
+        auto cfg = n->raft()->config();
+        auto cfg_vnodes = cfg.all_nodes();
+        ASSERT_EQ_CORO(
+          current_nodes_set,
+          absl::flat_hash_set<vnode>(
+            cfg_vnodes.begin(), cfg_vnodes.end()));
+        ASSERT_FALSE_CORO(cfg.old_config().has_value());
+        ASSERT_TRUE_CORO(cfg.current_config().learners.empty());
+
+        if (learner_start_offset && added_nodes.contains(n->get_vnode())) {
+            ASSERT_EQ_CORO(n->raft()->start_offset(), learner_start_offset);
+        }
+    }
+}
+
+INSTANTIATE_TEST_SUITE_P(
+  validate_replacing_raft_configuration,
+  reconfiguration_test,
+  testing::Combine(
+    testing::Values(use_snapshot::yes, use_snapshot::no),
+    testing::Values(1, 3),    // initial size
+    testing::Values(0, 1, 3), // to add
+    testing::Values(0, 1, 3), // to remove
+    testing::Values(use_initial_learner_offset::yes),
+    testing::Values(
+      consistency_level::quorum_ack, consistency_level::leader_ack)));