
Commit

Merge pull request #11726 from mmaslankaprv/fix-11657
Made the prevote phase not reset the last heartbeat timestamp
mmaslankaprv authored Jul 11, 2023
2 parents 412f31a + c6555b6 commit e2c9962
Showing 3 changed files with 134 additions and 41 deletions.
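The core of the change is in consensus::do_vote: the handler now records whether the request actually bumped the local term, and a granted vote refreshes _hbeat (the "last heard from a leader" timestamp) only when the term changed. Prevote requests leave the term untouched, so granting them no longer makes a node believe a leader is alive and delay its own election. The rest of the diff converts do_vote from continuation style to a coroutine and adds a ducktape test for controller availability with nodes stopped. Below is a minimal standalone sketch of the heartbeat rule, using simplified stand-in types (node_state, vote_request, handle_vote); it is not Redpanda's actual consensus API.

#include <chrono>
#include <cstdint>
#include <iostream>
#include <optional>

using clock_type = std::chrono::steady_clock;

struct node_state {
    int64_t term{0};
    std::optional<int> voted_for;
    // "last heard from a leader"; min() means never
    clock_type::time_point hbeat{clock_type::time_point::min()};
};

struct vote_request {
    int candidate;
    int64_t term;
    bool log_ok; // candidate's log is at least as up to date as ours
};

bool handle_vote(node_state& st, const vote_request& r) {
    bool term_changed = false;
    if (r.term > st.term) {
        // a real vote carries a higher term; a prevote does not
        st.term = r.term;
        st.voted_for.reset();
        term_changed = true;
    }
    if (r.term < st.term || !r.log_ok) {
        return false;
    }
    const bool granted = !st.voted_for.has_value()
                         || *st.voted_for == r.candidate;
    if (granted) {
        st.voted_for = r.candidate;
        // the fix: only a granted vote that changed the term counts as
        // evidence of an emerging leader; a granted prevote leaves hbeat
        // alone, so this node can still time out and start its own election
        if (term_changed) {
            st.hbeat = clock_type::now();
        }
    }
    return granted;
}

int main() {
    node_state st;
    // prevote (same term): granted, but hbeat stays untouched
    bool prevote = handle_vote(st, vote_request{1, /*term=*/0, /*log_ok=*/true});
    std::cout << prevote << ' '
              << (st.hbeat == clock_type::time_point::min()) << '\n';
    // real vote (higher term): granted, and hbeat is refreshed
    bool vote = handle_vote(st, vote_request{1, /*term=*/1, /*log_ok=*/true});
    std::cout << vote << ' '
              << (st.hbeat != clock_type::time_point::min()) << '\n';
}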
77 changes: 37 additions & 40 deletions src/v/raft/consensus.cc
@@ -1619,7 +1619,7 @@ consensus::get_last_entry_term(const storage::offset_stats& lstats) const {
return _last_snapshot_term;
}

ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
ss::future<vote_reply> consensus::do_vote(vote_request r) {
vote_reply reply;
reply.term = _term;
reply.target_node_id = r.node_id;
@@ -1628,12 +1628,13 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
auto last_log_index = lstats.dirty_offset;
_probe->vote_request();
auto last_entry_term = get_last_entry_term(lstats);
vlog(_ctxlog.trace, "Vote request: {}", r);
bool term_changed = false;
vlog(_ctxlog.trace, "Received vote request: {}", r);

if (unlikely(is_request_target_node_invalid("vote", r))) {
reply.log_ok = false;
reply.granted = false;
return ss::make_ready_future<vote_reply>(reply);
co_return reply;
}

// Optimization: for vote requests from nodes that are likely
@@ -1657,10 +1658,8 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
"Vote request from peer {} with heartbeat failures, "
"resetting backoff",
r.node_id);
return _client_protocol.reset_backoff(r.node_id.id())
.then([reply]() {
return ss::make_ready_future<vote_reply>(reply);
});
co_await _client_protocol.reset_backoff(r.node_id.id());
co_return reply;
}
}
}
@@ -1683,7 +1682,7 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
"Already heard from the leader, not granting vote to node {}",
r.node_id);
reply.granted = false;
return ss::make_ready_future<vote_reply>(std::move(reply));
co_return reply;
}

/// set to true if the caller's log is as up to date as the recipient's
@@ -1695,7 +1694,7 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {

// raft.pdf: reply false if term < currentTerm (§5.1)
if (r.term < _term) {
return ss::make_ready_future<vote_reply>(std::move(reply));
co_return reply;
}
auto n_priority = get_node_priority(r.node_id);
// do not grant vote if voter priority is lower than current target
@@ -1709,21 +1708,21 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
n_priority,
_target_priority);
reply.granted = false;
return ss::make_ready_future<vote_reply>(reply);
co_return reply;
}

if (r.term > _term) {
vlog(
_ctxlog.info,
"Received vote request with larger term from node {}, received "
"{}, "
"Received vote request with larger term from node {}, received {}, "
"current {}",
r.node_id,
r.term,
_term);
reply.term = r.term;
_term = r.term;
_voted_for = {};
term_changed = true;
do_step_down("voter_term_greater");
if (_leader_id) {
_leader_id = std::nullopt;
@@ -1736,45 +1735,42 @@ ss::future<vote_reply> consensus::do_vote(vote_request&& r) {
// would cause subsequent votes to fail (_hbeat is updated by the
// leader)
_hbeat = clock_type::time_point::min();
return ss::make_ready_future<vote_reply>(reply);
co_return reply;
}
}

// do not grant vote if log isn't ok
if (!reply.log_ok) {
return ss::make_ready_future<vote_reply>(reply);
co_return reply;
}

if (_voted_for.id()() < 0) {
auto node_id = r.node_id;
return write_voted_for({node_id, _term})
.then_wrapped([this, reply = std::move(reply), r = std::move(r)](
ss::future<> f) mutable {
bool granted = false;

if (f.failed()) {
vlog(
_ctxlog.warn,
"Unable to persist raft group state, vote not granted "
"- {}",
f.get_exception());
} else {
_voted_for = r.node_id;
_hbeat = clock_type::now();
granted = true;
}

reply.granted = granted;
try {
co_await write_voted_for({r.node_id, _term});
} catch (...) {
vlog(
_ctxlog.warn,
"Unable to persist raft group state, vote not granted "
"- {}",
std::current_exception());
reply.granted = false;
co_return reply;
}
_voted_for = r.node_id;
reply.granted = true;

return ss::make_ready_future<vote_reply>(std::move(reply));
});
} else {
reply.granted = (r.node_id == _voted_for);
if (reply.granted) {
_hbeat = clock_type::now();
}
return ss::make_ready_future<vote_reply>(std::move(reply));
}
    /**
     * Only update the last heartbeat value, which indicates the presence of a
     * leader, when the term changed, i.e. when the node is not processing a
     * prevote request.
     */
if (reply.granted && term_changed) {
_hbeat = clock_type::now();
}

co_return reply;
}

ss::future<append_entries_reply>
@@ -1802,7 +1798,8 @@ consensus::do_append_entries(append_entries_request&& r) {
return ss::make_ready_future<append_entries_reply>(reply);
}
// no need to trigger timeout
vlog(_ctxlog.trace, "Append entries request: {}", request_metadata);
vlog(
_ctxlog.trace, "Received append entries request: {}", request_metadata);

// raft.pdf: Reply false if term < currentTerm (§5.1)
if (request_metadata.term < _term) {
2 changes: 1 addition & 1 deletion src/v/raft/consensus.h
@@ -462,7 +462,7 @@ class consensus {
// all these private functions assume that we are under exclusive operations
// via the _op_sem
void do_step_down(std::string_view);
ss::future<vote_reply> do_vote(vote_request&&);
ss::future<vote_reply> do_vote(vote_request);
ss::future<append_entries_reply>
do_append_entries(append_entries_request&&);
ss::future<install_snapshot_reply>
96 changes: 96 additions & 0 deletions tests/rptest/tests/controller_availability_test.py
@@ -0,0 +1,96 @@
# Copyright 2020 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from math import ceil
import random
from rptest.services.admin import Admin
from rptest.services.cluster import cluster
from ducktape.utils.util import wait_until

from ducktape.tests.test import Test
from ducktape.mark import matrix

from rptest.services.redpanda import make_redpanda_service
from rptest.utils.mode_checks import cleanup_on_early_exit


class ControllerAvailabilityTest(Test):
def __init__(self, test_ctx, *args, **kwargs):
self.ctx = test_ctx
self.redpanda = None
self.admin = None
super().__init__(test_ctx, *args, **kwargs)

def start_redpanda(self, cluster_size):
self.redpanda = make_redpanda_service(self.ctx,
num_brokers=cluster_size)
self.redpanda.start()
self.admin = Admin(self.redpanda)

def _tolerated_failures(self, cluster_size):
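        # A Raft quorum needs floor(n/2) + 1 live nodes, so a cluster of
        # size n keeps a working controller with up to ceil(n/2) - 1 nodes
        # stopped.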
return int(ceil(cluster_size / 2.0) - 1)

def _controller_stable(self):

started_ids = set(
[self.redpanda.node_id(n) for n in self.redpanda.started_nodes()])
self.logger.info(f"started redpanda nodes: {started_ids}")

controller = self.redpanda.controller()

if controller is None:
self.logger.warn("No controller elected")
return False
self.logger.info(
f"controller exists in the cluster: {controller.account.hostname} node_id: {self.redpanda.node_id(controller)}"
)

if self.redpanda.node_id(controller) not in started_ids:
self.logger.info(
f"Reported controller node {controller} is obsolete as it was stopped"
)
return False

statuses = []
for n in self.redpanda.started_nodes():
controller_status = self.admin.get_controller_status(n)
self.logger.info(
f"Status: {controller_status} from {n.account.hostname}")
statuses.append(controller_status)

return all([cs == statuses[0] for cs in statuses[1:]])

def _check_metrics(self, cluster_size):
sent_vote_metrics = self.redpanda.metrics_sample(
"sent_vote_requests", self.redpanda.started_nodes())

for m in sent_vote_metrics.samples:
            self.logger.debug(f"Vote requests metric sample: {m}")
            assert m.value <= 2 * cluster_size, (
                f"two rounds of leader election must be enough to elect a "
                f"leader, current node vote request count: {m.value}")

@cluster(num_nodes=5)
@matrix(cluster_size=[3, 4, 5], stop=["single", "minority"])
def test_controller_availability_with_nodes_down(self, cluster_size, stop):
# start cluster
self.start_redpanda(cluster_size)
to_kill = self._tolerated_failures(
cluster_size) if stop == "minority" else 1

for i in range(0, to_kill):
self.logger.info(f"stopping node {i}")
self.redpanda.stop_node(self.redpanda.nodes[i], forced=True)

wait_until(lambda: self._controller_stable(), 10, 0.5,
"Controller is not available")
self._check_metrics(cluster_size)

cleanup_on_early_exit(self)
