Skip to content

Commit

Permalink
Merge pull request #12990 from mmaslankaprv/fix-lb
Browse files Browse the repository at this point in the history
r/consensus: fixed reusing follower sequence id
  • Loading branch information
mmaslankaprv authored Sep 4, 2023
2 parents 559d110 + 343a92b commit f1cd0f0
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 23 deletions.
26 changes: 20 additions & 6 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -620,18 +620,30 @@ ss::future<result<model::offset>> consensus::linearizable_barrier() {
if (_vstate != vote_state::leader) {
co_return result<model::offset>(make_error_code(errc::not_leader));
}
// store current commit index
auto cfg = config();
auto dirty_offset = _log->offsets().dirty_offset;
/**
* Flush log on leader, to make sure the _commited_index will be updated
*/
co_await flush_log();
const auto cfg = config();
const auto offsets = _log->offsets();

vlog(
_ctxlog.trace,
"Linearizable barrier requested. Log state: {}, flushed offset: {}",
offsets,
_flushed_offset);

/**
* Dispatch round of heartbeats
*/

absl::flat_hash_map<vnode, follower_req_seq> sequences;
std::vector<ss::future<>> send_futures;
send_futures.reserve(cfg.unique_voter_count());
cfg.for_each_voter([this, dirty_offset, &sequences, &send_futures](
vnode target) {
cfg.for_each_voter([this,
dirty_offset = offsets.dirty_offset,
&sequences,
&send_futures](vnode target) {
// do not send request to self
if (target == _self) {
return;
Expand Down Expand Up @@ -696,7 +708,9 @@ ss::future<result<model::offset>> consensus::linearizable_barrier() {
} catch (const ss::broken_condition_variable& e) {
co_return ret_t(make_error_code(errc::shutting_down));
}

// grab an oplock to serialize state updates i.e. wait for all updates in
// the state that were caused by follower replies
auto units = co_await _op_lock.get_units();
// term have changed, not longer a leader
if (term != _term) {
co_return ret_t(make_error_code(errc::not_leader));
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/heartbeat_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ heartbeat_manager::requests_for_range_v2() {
}
vlog(r->_ctxlog.trace, "[{}] full heartbeat", id);
r->_probe->full_heartbeat();
auto const seq_id = ++follower_metadata.last_sent_seq;
auto const seq_id = follower_metadata.last_sent_seq++;

follower_metadata.last_sent_protocol_meta = raft_metadata;
group_beat.data = heartbeat_request_data{
Expand Down
33 changes: 17 additions & 16 deletions src/v/raft/tests/append_entries_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "model/tests/random_batch.h"
#include "model/timestamp.h"
#include "raft/consensus_utils.h"
#include "raft/errc.h"
#include "raft/tests/raft_group_fixture.h"
#include "raft/types.h"
#include "random/generators.h"
Expand Down Expand Up @@ -740,24 +741,24 @@ FIXTURE_TEST(test_linarizable_barrier, raft_test_fixture) {
auto leader_id = wait_for_group_leader(gr);
auto leader_raft = gr.get_member(leader_id).consensus;

bool success = replicate_random_batches(gr, 5).get0();
bool success = replicate_random_batches(
gr, 500, raft::consistency_level::leader_ack, 10s)
.get0();
BOOST_REQUIRE(success);

leader_id = wait_for_group_leader(gr);
leader_raft = gr.get_member(leader_id).consensus;
auto r = leader_raft->linearizable_barrier().get();

std::vector<size_t> sizes;
if (r) {
auto logs = gr.read_all_logs();
for (auto& l : logs) {
sizes.push_back(l.second.size());
}
std::sort(sizes.begin(), sizes.end());
// at least 2 out of 3 nodes MUST have all entries replicated
BOOST_REQUIRE_GT(sizes[2], 1);
BOOST_REQUIRE_EQUAL(sizes[2], sizes[1]);
}
result<model::offset> r(raft::errc::timeout);
retry_with_leader(gr, 5, 30s, [&r](raft_node& leader_node) {
return leader_node.consensus->linearizable_barrier().then(
[&r](result<model::offset> l_offset) {
r = l_offset;
return l_offset.has_value();
});
}).get();

// linerizable barrier must succeed with stable leader
BOOST_REQUIRE(r.has_value());
BOOST_REQUIRE_EQUAL(r.value(), leader_raft->committed_offset());
BOOST_REQUIRE_EQUAL(r.value(), leader_raft->dirty_offset());
};

FIXTURE_TEST(test_linarizable_barrier_single_node, raft_test_fixture) {
Expand Down

0 comments on commit f1cd0f0

Please sign in to comment.