Skip to content

Commit

Permalink
r/tests: added raft reconfiguration tests
Browse files Browse the repository at this point in the history
Added test covering reconfiguration scenarios with and without
snapshots.

Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Oct 3, 2023
1 parent 23d5f58 commit 4b64144
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 13 deletions.
1 change: 1 addition & 0 deletions src/v/raft/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ set(gsrcs
raft_fixture.cc
basic_raft_fixture_test.cc
stm_manager_test.cc
raft_reconfiguration_test.cc
)

rp_test(
Expand Down
41 changes: 34 additions & 7 deletions src/v/raft/tests/raft_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,9 @@ ss::future<> raft_node_instance::start(
co_await _hb_manager->start();

co_await _recovery_throttle.start(
config::mock_binding<size_t>(10 * 1024 * 1024),
config::mock_binding<bool>(false));
config::mock_binding<size_t>(100_MiB), config::mock_binding<bool>(false));
co_await _recovery_throttle.invoke_on_all(
&coordinated_recovery_throttle::start);

co_await _storage.start(
[this]() {
Expand Down Expand Up @@ -451,10 +452,13 @@ void raft_node_instance::leadership_notification_callback(

ss::future<ss::circular_buffer<model::record_batch>>
raft_node_instance::read_all_data_batches() {
storage::log_reader_config cfg(
_raft->start_offset(),
model::offset::max(),
ss::default_priority_class());
return read_batches_in_range(_raft->start_offset(), model::offset::max());
}

ss::future<ss::circular_buffer<model::record_batch>>
raft_node_instance::read_batches_in_range(
model::offset min, model::offset max) {
storage::log_reader_config cfg(min, max, ss::default_priority_class());
cfg.type_filter = model::record_batch_type::raft_data;

auto rdr = co_await _raft->make_reader(cfg);
Expand All @@ -463,6 +467,20 @@ raft_node_instance::read_all_data_batches() {
std::move(rdr), model::no_timeout);
}

ss::future<model::offset>
raft_node_instance::random_batch_base_offset(model::offset max) {
model::offset read_start(
random_generators::get_int<int64_t>(_raft->start_offset(), max));
model::offset last = model::next_offset(read_start);
ss::circular_buffer<model::record_batch> batches;
while (batches.empty()) {
batches = co_await read_batches_in_range(read_start, last);
last++;
}

co_return batches.front().base_offset();
}

seastar::future<> raft_fixture::TearDownAsync() {
co_await seastar::coroutine::parallel_for_each(
_nodes, [](auto& pair) { return pair.second->stop(); });
Expand Down Expand Up @@ -543,13 +561,22 @@ ss::future<> raft_fixture::create_simple_group(size_t number_of_nodes) {

ss::future<> raft_fixture::wait_for_committed_offset(
model::offset offset, std::chrono::milliseconds timeout) {
return tests::cooperative_spin_wait_with_timeout(timeout, [this, offset] {
RPTEST_REQUIRE_EVENTUALLY_CORO(timeout, [this, offset] {
return std::all_of(
nodes().begin(), nodes().end(), [offset](auto& pair) {
return pair.second->raft()->committed_offset() >= offset;
});
});
}
ss::future<> raft_fixture::wait_for_visible_offset(
model::offset offset, std::chrono::milliseconds timeout) {
RPTEST_REQUIRE_EVENTUALLY_CORO(timeout, [this, offset] {
return std::all_of(
nodes().begin(), nodes().end(), [offset](auto& pair) {
return pair.second->raft()->last_visible_index() >= offset;
});
});
}
std::ostream& operator<<(std::ostream& o, msg_type type) {
switch (type) {
case msg_type::append_entries:
Expand Down
36 changes: 30 additions & 6 deletions src/v/raft/tests/raft_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "bytes/iobuf.h"
#include "config/property.h"
#include "features/feature_table.h"
#include "model/fundamental.h"
#include "raft/consensus.h"
#include "raft/consensus_client_protocol.h"
#include "raft/coordinated_recovery_throttle.h"
Expand All @@ -24,6 +25,7 @@
#include "raft/recovery_memory_quota.h"
#include "raft/state_machine_manager.h"
#include "raft/types.h"
#include "random/generators.h"
#include "storage/api.h"
#include "test_utils/test.h"
#include "utils/prefix_logger.h"
Expand Down Expand Up @@ -172,6 +174,10 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {

ss::future<ss::circular_buffer<model::record_batch>>
read_all_data_batches();
ss::future<ss::circular_buffer<model::record_batch>>
read_batches_in_range(model::offset min, model::offset max);

ss::future<model::offset> random_batch_base_offset(model::offset max);

private:
model::node_id _id;
Expand Down Expand Up @@ -244,23 +250,38 @@ class raft_fixture

model::record_batch_reader
make_batches(std::vector<std::pair<ss::sstring, ss::sstring>> batch_spec) {
ss::circular_buffer<model::record_batch> batches;
batches.reserve(batch_spec.size());
for (auto& [k, v] : batch_spec) {
const auto sz = batch_spec.size();
return make_batches(sz, [spec = std::move(batch_spec)](size_t idx) {
auto [k, v] = spec[idx];
storage::record_batch_builder builder(
model::record_batch_type::raft_data, model::offset(0));
builder.add_raw_kv(
serde::to_iobuf(std::move(k)), serde::to_iobuf(std::move(v)));
batches.push_back(std::move(builder).build());
return std::move(builder).build();
});
}

template<typename Generator>
model::record_batch_reader
make_batches(size_t batch_count, Generator&& generator) {
ss::circular_buffer<model::record_batch> batches;
batches.reserve(batch_count);
for (auto b_idx : boost::irange(batch_count)) {
batches.push_back(generator(b_idx));
}

return model::make_memory_record_batch_reader(std::move(batches));
}

ss::future<> assert_logs_equal() {
ss::future<>
assert_logs_equal(model::offset start_offset = model::offset{}) {
std::vector<ss::circular_buffer<model::record_batch>> node_batches;
for (auto& [id, n] : _nodes) {
node_batches.push_back(co_await n->read_all_data_batches());
auto read_from = start_offset == model::offset{}
? n->raft()->start_offset()
: start_offset;
node_batches.push_back(co_await n->read_batches_in_range(
read_from, model::offset::max()));
}
ASSERT_TRUE_CORO(std::all_of(
node_batches.begin(),
Expand All @@ -275,6 +296,9 @@ class raft_fixture
ss::future<> wait_for_committed_offset(
model::offset offset, std::chrono::milliseconds timeout);

ss::future<> wait_for_visible_offset(
model::offset offset, std::chrono::milliseconds timeout);

template<typename Func>
auto with_leader(std::chrono::milliseconds timeout, Func&& f) {
return wait_for_leader(timeout).then(
Expand Down
231 changes: 231 additions & 0 deletions src/v/raft/tests/raft_reconfiguration_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "bytes/bytes.h"
#include "gtest/gtest.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/record.h"
#include "model/record_batch_reader.h"
#include "model/record_batch_types.h"
#include "model/timeout_clock.h"
#include "raft/group_configuration.h"
#include "raft/tests/raft_fixture.h"
#include "raft/tests/raft_group_fixture.h"
#include "raft/types.h"
#include "random/generators.h"
#include "serde/rw/rw.h"
#include "serde/serde.h"
#include "storage/record_batch_builder.h"
#include "test_utils/async.h"
#include "test_utils/test.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/bool_class.hh>

#include <absl/container/flat_hash_set.h>
#include <fmt/core.h>
#include <gtest/gtest.h>

#include <optional>

using namespace raft;

/**
* Some basic Raft tests validating if Raft test fixture is working correctly
*/

using use_snapshot = ss::bool_class<struct use_snapshot_tag>;
using use_initial_learner_offset = ss::bool_class<struct use_snapshot_tag>;
struct test_params {
use_snapshot snapshot;
int initial_size;
int nodes_to_add;
int nodes_to_remove;
use_initial_learner_offset learner_start_offset;
consistency_level consistency_level = raft::consistency_level::quorum_ack;
};

struct reconfiguration_test
: testing::WithParamInterface<std::tuple<
use_snapshot,
int,
int,
int,
use_initial_learner_offset,
consistency_level>>
, raft_fixture {
ss::future<>
wait_for_reconfiguration_to_finish(std::chrono::milliseconds timeout) {
RPTEST_REQUIRE_EVENTUALLY_CORO(timeout, [this] {
for (auto& [_, n] : nodes()) {
if (
n->raft()->config().get_state()
!= raft::configuration_state::simple) {
return false;
}
}
return true;
});
}

auto make_random_batches() {
return make_batches(
random_generators::get_int(100, 500), [](size_t b_idx) {
storage::record_batch_builder builder(
model::record_batch_type::raft_data, model::offset(0));

for (int i = 0; i < random_generators::get_int(1, 10); ++i) {
auto r_size = random_generators::get_int<size_t>(32, 1_KiB);
builder.add_raw_kv(
serde::to_iobuf(fmt::format("{}-{}", b_idx, i)),
bytes_to_iobuf(random_generators::get_bytes(r_size)));
}

return std::move(builder).build();
});
}
};

TEST_P_CORO(reconfiguration_test, configuration_replace_test) {
const auto param = GetParam();
use_snapshot snapshot = std::get<0>(param);
int initial_size = std::get<1>(param);
int nodes_to_add = std::get<2>(param);
int nodes_to_remove = std::get<3>(param);
use_initial_learner_offset use_learner_start_offset = std::get<4>(param);
consistency_level consistency_level = std::get<5>(param);
// skip test cases that makes no sense
if (
nodes_to_add + initial_size - nodes_to_remove <= 0
|| initial_size < nodes_to_remove) {
co_return;
}
fmt::print(
"test parameters: {{snapshot: {}, initial_size: {}, "
"nodes_to_add: {}, nodes_to_remove: {}, use_learner_start_offset: {}, "
"consistency_lvl: {}}}\n",
snapshot,
initial_size,
nodes_to_add,
nodes_to_remove,
use_learner_start_offset,
consistency_level);
// create group with initial configuration
co_await create_simple_group(initial_size);

// wait for leader
auto leader = co_await wait_for_leader(10s);
auto& leader_node = node(leader);

// replicate batches
auto result = co_await leader_node.raft()->replicate(
make_random_batches(), replicate_options(consistency_level));

ASSERT_TRUE_CORO(result.has_value());
model::offset start_offset = leader_node.raft()->start_offset();
if (snapshot) {
if (consistency_level == consistency_level::leader_ack) {
co_await wait_for_visible_offset(
leader_node.raft()->dirty_offset(), 30s);
for (auto& [_, n] : nodes()) {
co_await n->raft()->refresh_commit_index();
}
co_await wait_for_committed_offset(result.value().last_offset, 10s);
}
auto committed_offset = leader_node.raft()->committed_offset();

auto last_included_offset = model::prev_offset(
co_await leader_node.random_batch_base_offset(
committed_offset - model::offset(50)));
start_offset = model::next_offset(last_included_offset);
co_await leader_node.raft()->write_snapshot(
raft::write_snapshot_cfg(last_included_offset, {}));
}
std::optional<model::offset> learner_start_offset;
if (use_learner_start_offset) {
model::offset offset(random_generators::get_int<int64_t>(
start_offset, leader_node.raft()->dirty_offset()));

learner_start_offset = co_await leader_node.random_batch_base_offset(
leader_node.raft()->committed_offset());
start_offset = *learner_start_offset;
}

auto current_nodes = all_vnodes();

for (int i = 0; i < nodes_to_remove; ++i) {
current_nodes.pop_back();
}
absl::flat_hash_set<raft::vnode> added_nodes;
co_await ss::coroutine::parallel_for_each(
boost::irange(nodes_to_add), [&](int i) {
auto& n = add_node(
model::node_id(initial_size + i), model::revision_id(0));
current_nodes.push_back(n.get_vnode());
added_nodes.emplace(n.get_vnode());
return n.start({});
});

ASSERT_EQ_CORO(
current_nodes.size(), initial_size + nodes_to_add - nodes_to_remove);

// update group configuration
co_await leader_node.raft()->replace_configuration(
current_nodes, model::revision_id(0), learner_start_offset);
leader = co_await wait_for_leader(10s);
auto& new_leader_node = node(leader);
// wait for committed offset to propagate
auto committed_offset = new_leader_node.raft()->committed_offset();

// wait for committed offset to propagate
if (consistency_level == raft::consistency_level::quorum_ack) {
co_await wait_for_committed_offset(committed_offset, 30s);
} else {
co_await wait_for_visible_offset(
new_leader_node.raft()->last_visible_index(), 30s);
}

co_await wait_for_reconfiguration_to_finish(30s);

co_await assert_logs_equal(start_offset);

absl::flat_hash_set<raft::vnode> current_nodes_set(
current_nodes.begin(), current_nodes.end());

// validate configuration
for (const auto& [_, n] : nodes()) {
auto cfg = n->raft()->config();
auto cfg_vnodes = cfg.all_nodes();
ASSERT_EQ_CORO(
current_nodes_set,
absl::flat_hash_set<raft::vnode>(
cfg_vnodes.begin(), cfg_vnodes.end()));
ASSERT_FALSE_CORO(cfg.old_config().has_value());
ASSERT_TRUE_CORO(cfg.current_config().learners.empty());

if (learner_start_offset && added_nodes.contains(n->get_vnode())) {
ASSERT_EQ_CORO(n->raft()->start_offset(), learner_start_offset);
}
}
}

INSTANTIATE_TEST_SUITE_P(
validate_replacing_raft_configuration,
reconfiguration_test,
testing::Combine(
testing::Values(use_snapshot::yes, use_snapshot::no),
testing::Values(1, 3), // initial size
testing::Values(0, 1, 3), // to add
testing::Values(0, 1, 3), // to remove
testing::Values(use_initial_learner_offset::yes),
testing::Values(
consistency_level::quorum_ack, consistency_level::leader_ack)));

0 comments on commit 4b64144

Please sign in to comment.