Made prevote phase not resetting last heartbeat timestamp #11726

Merged · 4 commits · Jul 11, 2023
77 changes: 37 additions & 40 deletions src/v/raft/consensus.cc
@@ -1626,7 +1626,7 @@ consensus::get_last_entry_term(const storage::offset_stats& lstats) const {
return _last_snapshot_term;
}

ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
ss::future<vote_reply> consensus::do_vote(vote_request r) {
vote_reply reply;
reply.term = _term;
reply.target_node_id = r.node_id;
@@ -1635,12 +1635,13 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
auto last_log_index = lstats.dirty_offset;
_probe->vote_request();
auto last_entry_term = get_last_entry_term(lstats);
vlog(_ctxlog.trace, "Vote request: {}", r);
bool term_changed = false;
vlog(_ctxlog.trace, "Received vote request: {}", r);

if (unlikely(is_request_target_node_invalid("vote", r))) {
reply.log_ok = false;
reply.granted = false;
return ss::make_ready_future<vote_reply>(reply);
co_return reply;
}

// Optimization: for vote requests from nodes that are likely
@@ -1664,10 +1665,8 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
"Vote request from peer {} with heartbeat failures, "
"resetting backoff",
r.node_id);
return _client_protocol.reset_backoff(r.node_id.id())
.then([reply]() {
return ss::make_ready_future<vote_reply>(reply);
});
co_await _client_protocol.reset_backoff(r.node_id.id());
co_return reply;
}
}
}
@@ -1690,7 +1689,7 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
"Already heard from the leader, not granting vote to node {}",
r.node_id);
reply.granted = false;
return ss::make_ready_future<vote_reply>(std::move(reply));
co_return reply;
}

/// set to true if the caller's log is as up to date as the recipient's
@@ -1702,7 +1701,7 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {

// raft.pdf: reply false if term < currentTerm (§5.1)
if (r.term < _term) {
return ss::make_ready_future<vote_reply>(std::move(reply));
co_return reply;
}
auto n_priority = get_node_priority(r.node_id);
// do not grant vote if voter priority is lower than current target
@@ -1716,21 +1715,21 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
n_priority,
_target_priority);
reply.granted = false;
return ss::make_ready_future<vote_reply>(reply);
co_return reply;
}

if (r.term > _term) {
vlog(
_ctxlog.info,
"Received vote request with larger term from node {}, received "
"{}, "
"Received vote request with larger term from node {}, received {}, "
"current {}",
r.node_id,
r.term,
_term);
reply.term = r.term;
_term = r.term;
_voted_for = {};
term_changed = true;
do_step_down("voter_term_greater");
if (_leader_id) {
_leader_id = std::nullopt;
@@ -1743,45 +1742,42 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
// would cause subsequent votes to fail (_hbeat is updated by the
// leader)
_hbeat = clock_type::time_point::min();
return ss::make_ready_future<vote_reply>(reply);
co_return reply;
}
}

// do not grant vote if log isn't ok
if (!reply.log_ok) {
return ss::make_ready_future<vote_reply>(reply);
co_return reply;
}

if (_voted_for.id()() < 0) {
auto node_id = r.node_id;
return write_voted_for({node_id, _term})
.then_wrapped([this, reply = std::move(reply), r = std::move(r)](
ss::future<> f) mutable {
bool granted = false;

if (f.failed()) {
vlog(
_ctxlog.warn,
"Unable to persist raft group state, vote not granted "
"- {}",
f.get_exception());
} else {
_voted_for = r.node_id;
_hbeat = clock_type::now();
granted = true;
}

reply.granted = granted;
try {
co_await write_voted_for({r.node_id, _term});
} catch (...) {
vlog(
_ctxlog.warn,
"Unable to persist raft group state, vote not granted "
"- {}",
std::current_exception());
reply.granted = false;
co_return reply;
}
_voted_for = r.node_id;
reply.granted = true;

return ss::make_ready_future<vote_reply>(std::move(reply));
});
} else {
reply.granted = (r.node_id == _voted_for);
if (reply.granted) {
_hbeat = clock_type::now();
}
return ss::make_ready_future<vote_reply>(std::move(reply));
}
/**
 * Only update the last heartbeat value (indicating the existence of a
 * leader) if the term changed, i.e. the node is not processing a
 * prevote_request.
 */
if (reply.granted && term_changed) {
_hbeat = clock_type::now();
}

co_return reply;
}

ss::future<append_entries_reply>
@@ -1809,7 +1805,8 @@ consensus::do_append_entries(append_entries_request&& r) {
return ss::make_ready_future<append_entries_reply>(reply);
}
// no need to trigger timeout
vlog(_ctxlog.trace, "Append entries request: {}", request_metadata);
vlog(
_ctxlog.trace, "Received append entries request: {}", request_metadata);

// raft.pdf: Reply false if term < currentTerm (§5.1)
if (request_metadata.term < _term) {
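
The behavioral core of the diff above is the new term_changed flag: do_vote now refreshes _hbeat, the follower's record of when it last heard from a leader, only when it grants a vote that actually advanced the term. Because a prevote request does not change the receiver's term (per the comment in the diff), granting a prevote no longer makes followers behave as if a leader were alive, which previously could suppress the real election round that follows. Below is a simplified, standalone Python model of that rule; it is not the Seastar/C++ implementation, the names NodeState, VoteRequest, and handle_vote are invented for illustration, and it omits voter priorities, the heartbeat-failure backoff reset, stepping down, and persisting the vote.

from dataclasses import dataclass
from typing import Optional


@dataclass
class NodeState:
    term: int = 0
    voted_for: Optional[int] = None
    last_heartbeat: float = float("-inf")  # plays the role of _hbeat


@dataclass
class VoteRequest:
    node_id: int
    term: int
    log_ok: bool = True


def handle_vote(state: NodeState, req: VoteRequest, now: float) -> bool:
    # raft.pdf: reply false if term < currentTerm (§5.1)
    if req.term < state.term:
        return False
    term_changed = False
    if req.term > state.term:
        # A genuine election round: adopt the larger term, forget the old vote.
        state.term = req.term
        state.voted_for = None
        term_changed = True
    if not req.log_ok:
        return False
    if state.voted_for is None:
        state.voted_for = req.node_id
        granted = True
    else:
        granted = state.voted_for == req.node_id
    # The change in this PR: only a granted vote that advanced the term counts
    # as evidence of a prospective leader; a same-term prevote does not.
    if granted and term_changed:
        state.last_heartbeat = now
    return granted


s = NodeState(term=5)
assert handle_vote(s, VoteRequest(node_id=1, term=5), now=100.0)  # prevote-like, same term
assert s.last_heartbeat == float("-inf")                          # heartbeat untouched
assert handle_vote(s, VoteRequest(node_id=1, term=6), now=101.0)  # real vote, larger term
assert s.last_heartbeat == 101.0                                  # heartbeat refreshed
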
2 changes: 1 addition & 1 deletion src/v/raft/consensus.h
@@ -462,7 +462,7 @@ class consensus {
// all these private functions assume that we are under exclusive operations
// via the _op_sem
void do_step_down(std::string_view);
ss::future<vote_reply> do_vote(vote_request&&);
ss::future<vote_reply> do_vote(vote_request);
ss::future<append_entries_reply>
do_append_entries(append_entries_request&&);
ss::future<install_snapshot_reply>
96 changes: 96 additions & 0 deletions tests/rptest/tests/controller_availability_test.py
@@ -0,0 +1,96 @@
# Copyright 2020 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from math import ceil
import random
from rptest.services.admin import Admin
from rptest.services.cluster import cluster
from ducktape.utils.util import wait_until

from ducktape.tests.test import Test
from ducktape.mark import matrix

from rptest.services.redpanda import make_redpanda_service
from rptest.utils.mode_checks import cleanup_on_early_exit


class ControllerAvailabilityTest(Test):
def __init__(self, test_ctx, *args, **kwargs):
self.ctx = test_ctx
self.redpanda = None
self.admin = None
super().__init__(test_ctx, *args, **kwargs)

def start_redpanda(self, cluster_size):
self.redpanda = make_redpanda_service(self.ctx,
num_brokers=cluster_size)
self.redpanda.start()
self.admin = Admin(self.redpanda)

def _tolerated_failures(self, cluster_size):
return int(ceil(cluster_size / 2.0) - 1)

def _controller_stable(self):

started_ids = set(
[self.redpanda.node_id(n) for n in self.redpanda.started_nodes()])
self.logger.info(f"started redpanda nodes: {started_ids}")

controller = self.redpanda.controller()

if controller is None:
self.logger.warn("No controller elected")
return False
self.logger.info(
f"controller exists in the cluster: {controller.account.hostname} node_id: {self.redpanda.node_id(controller)}"
)

if self.redpanda.node_id(controller) not in started_ids:
self.logger.info(
f"Reported controller node {controller} is obsolete as it was stopped"
)
return False

statuses = []
for n in self.redpanda.started_nodes():
controller_status = self.admin.get_controller_status(n)
self.logger.info(
f"Status: {controller_status} from {n.account.hostname}")
statuses.append(controller_status)

return all([cs == statuses[0] for cs in statuses[1:]])

def _check_metrics(self, cluster_size):
sent_vote_metrics = self.redpanda.metrics_sample(
"sent_vote_requests", self.redpanda.started_nodes())

for m in sent_vote_metrics.samples:
self.logger.debug(f"Vote requests metric sample: {m}")
assert m.value <= 2 * cluster_size, (
    f"two rounds of leader election must be enough to elect a leader, "
    f"current node vote request count: {m.value}")

@cluster(num_nodes=5)
@matrix(cluster_size=[3, 4, 5], stop=["single", "minority"])
def test_controller_availability_with_nodes_down(self, cluster_size, stop):
# start cluster
self.start_redpanda(cluster_size)
to_kill = self._tolerated_failures(
cluster_size) if stop == "minority" else 1

for i in range(0, to_kill):
self.logger.info(f"stopping node {i}")
self.redpanda.stop_node(self.redpanda.nodes[i], forced=True)

wait_until(lambda: self._controller_stable(), 10, 0.5,
"Controller is not available")
self._check_metrics(cluster_size)

cleanup_on_early_exit(self)
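
For reference, the arithmetic the test relies on: _tolerated_failures returns the largest number of nodes that can be stopped while a strict majority of the original cluster stays up, and _check_metrics allows each node at most two election rounds' worth of sent_vote_requests. A small standalone sketch of those bounds using the same formulas (the helper names below are illustrative, not part of the test):

from math import ceil


def tolerated_failures(cluster_size: int) -> int:
    # n - (ceil(n / 2) - 1) = floor(n / 2) + 1 nodes remain, i.e. a strict majority.
    return int(ceil(cluster_size / 2.0) - 1)


def max_expected_vote_requests(cluster_size: int, rounds: int = 2) -> int:
    # Mirrors the metric bound: two rounds of leader election must be enough.
    return rounds * cluster_size


for n in (3, 4, 5):
    print(n, tolerated_failures(n), max_expected_vote_requests(n))
# 3 nodes -> 1 tolerated failure,  <= 6 vote requests sent per node
# 4 nodes -> 1 tolerated failure,  <= 8 vote requests sent per node
# 5 nodes -> 2 tolerated failures, <= 10 vote requests sent per node
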