tests: added retries to raft force reconfiguration test

The Raft force reconfiguration test is designed to skip one of the nodes
when force-updating the configuration. In this scenario it may happen that
the node which wasn't updated with the configuration override becomes the
leader and reverts the override. Redpanda's controller backend handles this
case by retrying. Added retries to the test to make sure it retries the
reconfiguration whenever it is reverted by the node that missed the update.

Fixes: #19938

Signed-off-by: Michał Maślanka <michal@redpanda.com>
mmaslankaprv committed Jul 4, 2024
1 parent d6cbe67 commit 30efad6
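
For context on the new test flow, the lines below are a minimal sketch of the retry-until-success pattern this change introduces, built on Seastar's repeat(). The helpers apply_override and leader_config_revision are illustrative placeholders, not the fixture's real API; the actual implementation is in the diff that follows.

// Sketch only: retries a forced configuration override until the leader
// elected afterwards carries the requested revision, or a deadline expires.
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>

#include <chrono>
#include <functional>
#include <stdexcept>

seastar::future<> retry_reconfiguration(
  std::function<seastar::future<>()> apply_override,
  std::function<int64_t()> leader_config_revision,
  int64_t requested_revision,
  std::chrono::steady_clock::time_point deadline) {
    return seastar::repeat([=] {
        if (std::chrono::steady_clock::now() > deadline) {
            throw std::runtime_error("timeout waiting for reconfiguration");
        }
        // Push the forced configuration, then check whether the new leader
        // picked up the requested revision. If the skipped node won the
        // election it reverts the override, so loop and try again.
        return apply_override().then([=] {
            return leader_config_revision() >= requested_revision
                     ? seastar::stop_iteration::yes
                     : seastar::stop_iteration::no;
        });
    });
}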
Showing 3 changed files with 86 additions and 26 deletions.
24 changes: 24 additions & 0 deletions src/v/raft/tests/raft_fixture.cc
@@ -58,10 +58,12 @@
#include <absl/container/flat_hash_set.h>
#include <fmt/core.h>

#include <algorithm>
#include <chrono>
#include <filesystem>
#include <memory>
#include <optional>
#include <ranges>
#include <stdexcept>
#include <type_traits>

@@ -655,6 +657,28 @@ raft_fixture::wait_for_leader(model::timeout_clock::time_point deadline) {

co_return get_leader().value();
}

ss::future<model::node_id> raft_fixture::wait_for_leader_change(
model::timeout_clock::time_point deadline, model::term_id term) {
auto has_new_leader = [this, term] {
auto leader_id = get_leader();
if (leader_id && _nodes.contains(*leader_id)) {
auto& leader_node = node(*leader_id);
return leader_node.raft()->is_leader()
&& leader_node.raft()->term() > term;
}
return false;
};

while (!has_new_leader()) {
if (model::timeout_clock::now() > deadline) {
throw std::runtime_error("Timeout waiting for leader");
}
co_await ss::sleep(std::chrono::milliseconds(5));
}

co_return get_leader().value();
}
ss::future<model::node_id>
raft_fixture::wait_for_leader(std::chrono::milliseconds timeout) {
return wait_for_leader(timeout + model::timeout_clock::now());
2 changes: 2 additions & 0 deletions src/v/raft/tests/raft_fixture.h
@@ -299,6 +299,8 @@ class raft_fixture
ss::future<model::node_id> wait_for_leader(std::chrono::milliseconds);
ss::future<model::node_id>
wait_for_leader(model::timeout_clock::time_point);
ss::future<model::node_id> wait_for_leader_change(
model::timeout_clock::time_point deadline, model::term_id term);
seastar::future<> TearDownAsync() override;
seastar::future<> SetUpAsync() override;

86 changes: 60 additions & 26 deletions src/v/raft/tests/raft_reconfiguration_test.cc
@@ -29,6 +29,7 @@
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/loop.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/bool_class.hh>

@@ -497,11 +498,67 @@ TEST_F_CORO(raft_fixture, test_force_reconfiguration) {

std::vector<vnode> base_replica_set = all_vnodes();
size_t reconfiguration_count = 0;
model::revision_id next_rev{1};

auto current_replicas = base_replica_set;

vlog(logger().info, "initial replicas: {}", current_replicas);

auto reconfigure_until_success = [&](
model::revision_id rev,
raft::vnode to_skip) {
auto deadline = model::timeout_clock::now() + 90s;
return ss::repeat([this, rev, deadline, to_skip, &current_replicas] {
vassert(
model::timeout_clock::now() < deadline,
"Timeout waiting for reconfiguration");
auto term = node(get_leader().value()).raft()->term();
return ss::parallel_for_each(
nodes().begin(),
nodes().end(),
[&current_replicas, to_skip, rev](
const raft_nodes_t::value_type& pair) {
auto raft = pair.second->raft();
if (pair.second->get_vnode() == to_skip) {
return ss::now();
}
return raft
->force_replace_configuration_locally(
current_replicas, {}, rev)
.discard_result();
})
.then([&current_replicas, this, rev, term] {
return wait_for_leader_change(
model::timeout_clock::now() + 10s, term)
.then([this, rev, &current_replicas](
model::node_id new_leader_id) {
vlog(
logger().info,
"new leader {} elected in term: {}",
new_leader_id,
nodes()[new_leader_id]->raft()->term());
auto replica_rev
= node(new_leader_id).raft()->config().revision_id();
if (replica_rev < rev) {
vlog(
logger().warn,
"retrying reconfiguration to {}, requested "
"revision: {}, node {} config revision: {}",
current_replicas,
rev,
new_leader_id,
replica_rev);
return ss::stop_iteration::no;
}
vlog(
logger().info,
"successfully reconfigured to {} with revision: {}",
current_replicas,
rev);
return ss::stop_iteration::yes;
});
});
});
};
auto reconfigure_all = [&, this]() {
/**
* Switch between all 5 replicas and randomly selected 3 of them
@@ -519,22 +576,8 @@ TEST_F_CORO(raft_fixture, test_force_reconfiguration) {

vlog(logger().info, "reconfiguring group to: {}", current_replicas);
auto to_skip = random_generators::random_choice(base_replica_set);

return ss::parallel_for_each(
nodes().begin(),
nodes().end(),
[&, to_skip](const raft_nodes_t::value_type& pair) {
auto raft = pair.second->raft();
model::revision_id rev = ++raft->config().revision_id();
if (
pair.second->get_vnode() == to_skip
&& !pair.second->raft()->is_leader()) {
return ss::now();
}
return raft
->force_replace_configuration_locally(current_replicas, {}, rev)
.discard_result();
});
auto revision = next_rev++;
return reconfigure_until_success(revision, to_skip);
};

auto reconfigure_fiber = ss::do_until(
@@ -551,15 +594,6 @@ TEST_F_CORO(raft_fixture, test_force_reconfiguration) {
})
.handle_exception([](const std::exception_ptr&) {
// ignore exception
})
.then([this] {
return wait_for_leader(10s).then([this](model::node_id id) {
vlog(
logger().info,
"new leader {}, term: {}",
id,
nodes()[id]->raft()->term());
});
});
});

