From 4ff810bc8b872fa0ae04533af8d911173fb7e916 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Tue, 27 Jun 2023 14:24:52 +0200
Subject: [PATCH 1/4] r/consensus: made request log entries more obvious

Made the log entries indicating receipt of append entries and vote
requests more obvious.

Signed-off-by: Michal Maslanka
---
 src/v/raft/consensus.cc | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc
index dd03d5c6c0d2c..46c5ea5c71f14 100644
--- a/src/v/raft/consensus.cc
+++ b/src/v/raft/consensus.cc
@@ -1635,7 +1635,7 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
     auto last_log_index = lstats.dirty_offset;
     _probe->vote_request();
     auto last_entry_term = get_last_entry_term(lstats);
-    vlog(_ctxlog.trace, "Vote request: {}", r);
+    vlog(_ctxlog.trace, "Received vote request: {}", r);
 
     if (unlikely(is_request_target_node_invalid("vote", r))) {
         reply.log_ok = false;
@@ -1722,8 +1722,7 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
     if (r.term > _term) {
         vlog(
           _ctxlog.info,
-          "Received vote request with larger term from node {}, received "
-          "{}, "
+          "Received vote request with larger term from node {}, received {}, "
           "current {}",
           r.node_id,
           r.term,
From 7d68e46def71138a77cc71529039b66eb5324a3d Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Tue, 27 Jun 2023 14:33:47 +0200
Subject: [PATCH 2/4] r/consensus: replaced do_vote implementation with coroutines

Signed-off-by: Michal Maslanka
---
 src/v/raft/consensus.cc | 67 ++++++++++++++++++-----------------
 src/v/raft/consensus.h  |  2 +-
 2 files changed, 31 insertions(+), 38 deletions(-)

diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc
index 46c5ea5c71f14..a2a2e4ad20780 100644
--- a/src/v/raft/consensus.cc
+++ b/src/v/raft/consensus.cc
@@ -1626,7 +1626,7 @@ consensus::get_last_entry_term(const storage::offset_stats& lstats) const {
     return _last_snapshot_term;
 }
 
-ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
+ss::future<vote_reply> consensus::do_vote(vote_request r) {
     vote_reply reply;
     reply.term = _term;
     reply.target_node_id = r.node_id;
@@ -1640,7 +1640,7 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
     if (unlikely(is_request_target_node_invalid("vote", r))) {
         reply.log_ok = false;
         reply.granted = false;
-        return ss::make_ready_future<vote_reply>(reply);
+        co_return reply;
     }
 
     // Optimization: for vote requests from nodes that are likely
@@ -1664,10 +1664,8 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
                   "Vote request from peer {} with heartbeat failures, "
                   "resetting backoff",
                   r.node_id);
-                return _client_protocol.reset_backoff(r.node_id.id())
-                  .then([reply]() {
-                      return ss::make_ready_future<vote_reply>(reply);
-                  });
+                co_await _client_protocol.reset_backoff(r.node_id.id());
+                co_return reply;
             }
         }
     }
@@ -1690,7 +1688,7 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
           "Already heard from the leader, not granting vote to node {}",
           r.node_id);
         reply.granted = false;
-        return ss::make_ready_future<vote_reply>(std::move(reply));
+        co_return reply;
     }
 
     /// set to true if the caller's log is as up to date as the recipient's
@@ -1702,7 +1700,7 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
 
     // raft.pdf: reply false if term < currentTerm (§5.1)
     if (r.term < _term) {
-        return ss::make_ready_future<vote_reply>(std::move(reply));
+        co_return reply;
     }
     auto n_priority = get_node_priority(r.node_id);
     // do not grant vote if voter priority is lower than current target
@@ -1716,7 +1714,7 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
           n_priority,
          _target_priority);
         reply.granted = false;
-        return ss::make_ready_future<vote_reply>(reply);
+        co_return reply;
     }
 
     if (r.term > _term) {
@@ -1742,45 +1740,39 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
             // would cause subsequent votes to fail (_hbeat is updated by the
             // leader)
             _hbeat = clock_type::time_point::min();
-            return ss::make_ready_future<vote_reply>(reply);
+            co_return reply;
         }
     }
 
     // do not grant vote if log isn't ok
     if (!reply.log_ok) {
-        return ss::make_ready_future<vote_reply>(reply);
+        co_return reply;
     }
 
     if (_voted_for.id()() < 0) {
-        auto node_id = r.node_id;
-        return write_voted_for({node_id, _term})
-          .then_wrapped([this, reply = std::move(reply), r = std::move(r)](
-                          ss::future<> f) mutable {
-              bool granted = false;
-
-              if (f.failed()) {
-                  vlog(
-                    _ctxlog.warn,
-                    "Unable to persist raft group state, vote not granted "
-                    "- {}",
-                    f.get_exception());
-              } else {
-                  _voted_for = r.node_id;
-                  _hbeat = clock_type::now();
-                  granted = true;
-              }
-
-              reply.granted = granted;
+        try {
+            co_await write_voted_for({r.node_id, _term});
+        } catch (...) {
+            vlog(
+              _ctxlog.warn,
+              "Unable to persist raft group state, vote not granted "
+              "- {}",
+              std::current_exception());
+            reply.granted = false;
+            co_return reply;
+        }
+        _voted_for = r.node_id;
+        reply.granted = true;
-
-              return ss::make_ready_future<vote_reply>(std::move(reply));
-          });
     } else {
         reply.granted = (r.node_id == _voted_for);
-        if (reply.granted) {
-            _hbeat = clock_type::now();
-        }
-        return ss::make_ready_future<vote_reply>(std::move(reply));
     }
+
+    if (reply.granted) {
+        _hbeat = clock_type::now();
+    }
+
+    co_return reply;
 }
 
 ss::future<append_entries_reply>
@@ -1808,7 +1800,8 @@ consensus::do_append_entries(append_entries_request&& r) {
         return ss::make_ready_future<append_entries_reply>(reply);
     }
     // no need to trigger timeout
-    vlog(_ctxlog.trace, "Append entries request: {}", request_metadata);
+    vlog(
+      _ctxlog.trace, "Received append entries request: {}", request_metadata);
 
     // raft.pdf: Reply false if term < currentTerm (§5.1)
     if (request_metadata.term < _term) {
diff --git a/src/v/raft/consensus.h b/src/v/raft/consensus.h
index 6c1da6f18747a..97dc70275945f 100644
--- a/src/v/raft/consensus.h
+++ b/src/v/raft/consensus.h
@@ -462,7 +462,7 @@ class consensus {
     // all these private functions assume that we are under exclusive operations
     // via the _op_sem
     void do_step_down(std::string_view);
-    ss::future<vote_reply> do_vote(vote_request&&);
+    ss::future<vote_reply> do_vote(vote_request);
     ss::future<append_entries_reply>
     do_append_entries(append_entries_request&&);
     ss::future
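
The conversion above follows the usual shape of turning a Seastar continuation chain into a coroutine. The sketch below is illustrative only and is not Redpanda code: it assumes a Seastar build with coroutine support, and persist_vote() and fake_reply are hypothetical stand-ins for write_voted_for() and vote_reply. It shows how then_wrapped error handling and explicit make_ready_future returns collapse into an ordinary try/catch with plain co_return statements.

    // Illustrative sketch only -- not Redpanda code. persist_vote() and
    // fake_reply are hypothetical stand-ins for write_voted_for() / vote_reply.
    #include <seastar/core/coroutine.hh>
    #include <seastar/core/future.hh>

    #include <stdexcept>

    struct fake_reply {
        bool granted{false};
    };

    // Pretend to persist voting state; fail so the error path is exercised.
    seastar::future<> persist_vote() {
        return seastar::make_exception_future<>(std::runtime_error("io error"));
    }

    // Continuation style: the reply has to be captured by the lambda and the
    // outcome is inspected through the future handed to then_wrapped().
    seastar::future<fake_reply> vote_with_continuations() {
        fake_reply reply;
        return persist_vote().then_wrapped([reply](seastar::future<> f) mutable {
            bool failed = f.failed();
            f.ignore_ready_future(); // mark a possible exception as handled
            reply.granted = !failed;
            return seastar::make_ready_future<fake_reply>(reply);
        });
    }

    // Coroutine style: the same control flow reads top to bottom, the error
    // becomes a regular try/catch, and every early exit is a plain co_return.
    seastar::future<fake_reply> vote_with_coroutine() {
        fake_reply reply;
        try {
            co_await persist_vote();
            reply.granted = true;
        } catch (...) {
            reply.granted = false;
        }
        co_return reply;
    }
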
From 995c7750fdcd6315785c1b982dc415979facb31e Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Tue, 27 Jun 2023 16:50:34 +0200
Subject: [PATCH 3/4] r/consensus: do not update heartbeat timestamp with prevote request

When a voter receives a vote request and votes for the candidate, it
updates the last heartbeat timestamp. If this happens during the prevote
phase in a deployment with an even number of nodes, it may lead to a
temporary live lock in which no leader can be elected.

Fixes: #11657
Signed-off-by: Michal Maslanka
---
 src/v/raft/consensus.cc | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc
index a2a2e4ad20780..feb795fe74500 100644
--- a/src/v/raft/consensus.cc
+++ b/src/v/raft/consensus.cc
@@ -1635,6 +1635,7 @@ ss::future<vote_reply> consensus::do_vote(vote_request r) {
     auto last_log_index = lstats.dirty_offset;
     _probe->vote_request();
     auto last_entry_term = get_last_entry_term(lstats);
+    bool term_changed = false;
     vlog(_ctxlog.trace, "Received vote request: {}", r);
 
     if (unlikely(is_request_target_node_invalid("vote", r))) {
@@ -1728,6 +1729,7 @@ ss::future<vote_reply> consensus::do_vote(vote_request r) {
         reply.term = r.term;
         _term = r.term;
         _voted_for = {};
+        term_changed = true;
         do_step_down("voter_term_greater");
         if (_leader_id) {
             _leader_id = std::nullopt;
@@ -1767,8 +1769,11 @@ ss::future<vote_reply> consensus::do_vote(vote_request r) {
     } else {
         reply.granted = (r.node_id == _voted_for);
     }
-
-    if (reply.granted) {
+    /**
+     * Only update the last heartbeat value (indicating the existence of a
+     * leader) if the term changed, i.e. not for a prevote_request.
+     */
+    if (reply.granted && term_changed) {
         _hbeat = clock_type::now();
     }
 
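
The interaction this patch removes can be seen in a stripped-down vote handler. This is an illustrative sketch, not Redpanda code: follower_state, handle_vote and the bare term/log_ok parameters are made up, and the real do_vote() also checks _voted_for and node priority. The only point it demonstrates is that a granted request now refreshes the leader-heartbeat timestamp just when it actually advanced the local term, so a prevote probe that does not advance the term can no longer keep pushing out the receiver's own election timeout.

    // Illustrative sketch only -- not Redpanda code. Mirrors the rule the
    // patch introduces: refresh the heartbeat timestamp only for a request
    // that also advanced the local term (term_changed).
    #include <chrono>

    using clock_type = std::chrono::steady_clock;

    struct follower_state {
        long term{0};
        clock_type::time_point hbeat{clock_type::time_point::min()};
    };

    // Returns true when the (pre)vote is granted.
    bool handle_vote(follower_state& st, long request_term, bool log_ok) {
        bool term_changed = false;
        if (request_term > st.term) {
            st.term = request_term;
            term_changed = true;
        }
        if (request_term < st.term || !log_ok) {
            return false; // stale term or out-of-date log: reject
        }
        // Previously this ran for every granted request, so granted prevote
        // probes could keep resetting peers' election timers; now only a
        // term-advancing vote suppresses the local election timeout.
        if (term_changed) {
            st.hbeat = clock_type::now();
        }
        return true;
    }
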
From c6555b6c6076f760902efb3b97aba87e118c4acf Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 28 Jun 2023 09:23:15 +0200
Subject: [PATCH 4/4] tests: added a very basic controller availability test

Added a test verifying that a controller is elected in a timely fashion
when some of the cluster nodes are down.

Signed-off-by: Michal Maslanka
---
 .../tests/controller_availability_test.py | 96 +++++++++++++++++++
 1 file changed, 96 insertions(+)
 create mode 100644 tests/rptest/tests/controller_availability_test.py

diff --git a/tests/rptest/tests/controller_availability_test.py b/tests/rptest/tests/controller_availability_test.py
new file mode 100644
index 0000000000000..ed48c01ffb7d7
--- /dev/null
+++ b/tests/rptest/tests/controller_availability_test.py
@@ -0,0 +1,96 @@
+# Copyright 2020 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+
+from math import ceil
+import random
+from rptest.services.admin import Admin
+from rptest.services.cluster import cluster
+from ducktape.utils.util import wait_until
+
+from ducktape.tests.test import Test
+from ducktape.mark import matrix
+
+from rptest.services.redpanda import make_redpanda_service
+from rptest.utils.mode_checks import cleanup_on_early_exit
+
+
+class ControllerAvailabilityTest(Test):
+    def __init__(self, test_ctx, *args, **kwargs):
+        self.ctx = test_ctx
+        self.redpanda = None
+        self.admin = None
+        super().__init__(test_ctx, *args, **kwargs)
+
+    def start_redpanda(self, cluster_size):
+        self.redpanda = make_redpanda_service(self.ctx,
+                                              num_brokers=cluster_size)
+        self.redpanda.start()
+        self.admin = Admin(self.redpanda)
+
+    def _tolerated_failures(self, cluster_size):
+        return int(ceil(cluster_size / 2.0) - 1)
+
+    def _controller_stable(self):
+        started_ids = set(
+            [self.redpanda.node_id(n) for n in self.redpanda.started_nodes()])
+        self.logger.info(f"started redpanda nodes: {started_ids}")
+
+        controller = self.redpanda.controller()
+
+        if controller is None:
+            self.logger.warn("No controller elected")
+            return False
+        self.logger.info(
+            f"controller exists in the cluster: {controller.account.hostname} node_id: {self.redpanda.node_id(controller)}"
+        )
+
+        if self.redpanda.node_id(controller) not in started_ids:
+            self.logger.info(
+                f"Reported controller node {controller} is obsolete as it was stopped"
+            )
+            return False
+
+        statuses = []
+        for n in self.redpanda.started_nodes():
+            controller_status = self.admin.get_controller_status(n)
+            self.logger.info(
+                f"Status: {controller_status} from {n.account.hostname}")
+            statuses.append(controller_status)
+
+        return all([cs == statuses[0] for cs in statuses[1:]])
+
+    def _check_metrics(self, cluster_size):
+        sent_vote_metrics = self.redpanda.metrics_sample(
+            "sent_vote_requests", self.redpanda.started_nodes())
+
+        for m in sent_vote_metrics.samples:
+            self.logger.debug(f"Vote requests metric sample: {m}")
+            assert m.value <= 2 * cluster_size, (
+                f"two rounds of leader election must be enough to elect a leader, current node vote request count: {m.value}"
+            )
+
+    @cluster(num_nodes=5)
+    @matrix(cluster_size=[3, 4, 5], stop=["single", "minority"])
+    def test_controller_availability_with_nodes_down(self, cluster_size, stop):
+        # start cluster
+        self.start_redpanda(cluster_size)
+
+        to_kill = self._tolerated_failures(
+            cluster_size) if stop == "minority" else 1
+
+        for i in range(0, to_kill):
+            self.logger.info(f"stopping node {i}")
+            self.redpanda.stop_node(self.redpanda.nodes[i], forced=True)
+
+        wait_until(lambda: self._controller_stable(), 10, 0.5,
+                   "Controller is not available")
+
+        self._check_metrics(cluster_size)
+
+        cleanup_on_early_exit(self)
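
For reference, the _tolerated_failures() helper in the new test encodes the standard Raft majority arithmetic. The stand-alone snippet below is illustrative only and is not part of the test suite; it simply prints what that formula yields for the cluster sizes in the test matrix: a quorum needs floor(n/2) + 1 voters, so clusters of 3 and 4 nodes tolerate one stopped node and a cluster of 5 tolerates two, which is what the "single"/"minority" stop modes exercise.

    // Illustrative stand-alone check, not part of the test suite: the Raft
    // majority arithmetic behind _tolerated_failures() in the test above.
    #include <iostream>

    int main() {
        for (int n : {3, 4, 5}) {
            int quorum = n / 2 + 1;     // floor(n/2) + 1 voters form a majority
            int tolerated = n - quorum; // equals ceil(n/2) - 1
            std::cout << "cluster_size=" << n << " quorum=" << quorum
                      << " tolerated_failures=" << tolerated << '\n';
        }
        return 0;
    }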