diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index a4749ed64cd11..98c5bd3d9ba3d 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -620,9 +620,19 @@ ss::future> consensus::linearizable_barrier() { if (_vstate != vote_state::leader) { co_return result(make_error_code(errc::not_leader)); } - // store current commit index - auto cfg = config(); - auto dirty_offset = _log->offsets().dirty_offset; + /** + * Flush log on leader, to make sure the _commited_index will be updated + */ + co_await flush_log(); + const auto cfg = config(); + const auto offsets = _log->offsets(); + + vlog( + _ctxlog.trace, + "Linearizable barrier requested. Log state: {}, flushed offset: {}", + offsets, + _flushed_offset); + /** * Dispatch round of heartbeats */ @@ -630,8 +640,10 @@ ss::future> consensus::linearizable_barrier() { absl::flat_hash_map sequences; std::vector> send_futures; send_futures.reserve(cfg.unique_voter_count()); - cfg.for_each_voter([this, dirty_offset, &sequences, &send_futures]( - vnode target) { + cfg.for_each_voter([this, + dirty_offset = offsets.dirty_offset, + &sequences, + &send_futures](vnode target) { // do not send request to self if (target == _self) { return; @@ -696,7 +708,9 @@ ss::future> consensus::linearizable_barrier() { } catch (const ss::broken_condition_variable& e) { co_return ret_t(make_error_code(errc::shutting_down)); } - + // grab an oplock to serialize state updates i.e. wait for all updates in + // the state that were caused by follower replies + auto units = co_await _op_lock.get_units(); // term have changed, not longer a leader if (term != _term) { co_return ret_t(make_error_code(errc::not_leader)); diff --git a/src/v/raft/heartbeat_manager.cc b/src/v/raft/heartbeat_manager.cc index ce4ff31854370..cbeb214abb015 100644 --- a/src/v/raft/heartbeat_manager.cc +++ b/src/v/raft/heartbeat_manager.cc @@ -181,7 +181,7 @@ heartbeat_manager::requests_for_range_v2() { } vlog(r->_ctxlog.trace, "[{}] full heartbeat", id); r->_probe->full_heartbeat(); - auto const seq_id = ++follower_metadata.last_sent_seq; + auto const seq_id = follower_metadata.last_sent_seq++; follower_metadata.last_sent_protocol_meta = raft_metadata; group_beat.data = heartbeat_request_data{ diff --git a/src/v/raft/tests/append_entries_test.cc b/src/v/raft/tests/append_entries_test.cc index c00f79cfa0067..13e9ba497bfe7 100644 --- a/src/v/raft/tests/append_entries_test.cc +++ b/src/v/raft/tests/append_entries_test.cc @@ -16,6 +16,7 @@ #include "model/tests/random_batch.h" #include "model/timestamp.h" #include "raft/consensus_utils.h" +#include "raft/errc.h" #include "raft/tests/raft_group_fixture.h" #include "raft/types.h" #include "random/generators.h" @@ -740,24 +741,24 @@ FIXTURE_TEST(test_linarizable_barrier, raft_test_fixture) { auto leader_id = wait_for_group_leader(gr); auto leader_raft = gr.get_member(leader_id).consensus; - bool success = replicate_random_batches(gr, 5).get0(); + bool success = replicate_random_batches( + gr, 500, raft::consistency_level::leader_ack, 10s) + .get0(); BOOST_REQUIRE(success); - - leader_id = wait_for_group_leader(gr); leader_raft = gr.get_member(leader_id).consensus; - auto r = leader_raft->linearizable_barrier().get(); - - std::vector sizes; - if (r) { - auto logs = gr.read_all_logs(); - for (auto& l : logs) { - sizes.push_back(l.second.size()); - } - std::sort(sizes.begin(), sizes.end()); - // at least 2 out of 3 nodes MUST have all entries replicated - BOOST_REQUIRE_GT(sizes[2], 1); - BOOST_REQUIRE_EQUAL(sizes[2], sizes[1]); - } + result r(raft::errc::timeout); + retry_with_leader(gr, 5, 30s, [&r](raft_node& leader_node) { + return leader_node.consensus->linearizable_barrier().then( + [&r](result l_offset) { + r = l_offset; + return l_offset.has_value(); + }); + }).get(); + + // linerizable barrier must succeed with stable leader + BOOST_REQUIRE(r.has_value()); + BOOST_REQUIRE_EQUAL(r.value(), leader_raft->committed_offset()); + BOOST_REQUIRE_EQUAL(r.value(), leader_raft->dirty_offset()); }; FIXTURE_TEST(test_linarizable_barrier_single_node, raft_test_fixture) {