From baabe8fe8208a1f0575fb822e75347cb6c9010c7 Mon Sep 17 00:00:00 2001
From: Michal Maslanka
Date: Wed, 5 Jul 2023 15:27:38 +0200
Subject: [PATCH] r/consensus: refactored start method to be a coroutine

Signed-off-by: Michal Maslanka
---
 src/v/raft/consensus.cc | 378 +++++++++++++++++++---------------------
 1 file changed, 176 insertions(+), 202 deletions(-)

diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc
index dd03d5c6c0d2..fb59f511b714 100644
--- a/src/v/raft/consensus.cc
+++ b/src/v/raft/consensus.cc
@@ -1282,216 +1282,190 @@ ss::future<> consensus::start() {
 }
 
 ss::future<> consensus::do_start() {
-    vlog(_ctxlog.info, "Starting");
-    return _op_lock
-      .with([this] {
-          read_voted_for();
-
-          /*
-           * temporary workaround:
-           *
-           * if the group's ntp matches the pattern, then do not load the
-           * initial configuration snapshto from the keyvalue store. more info
-           * here:
-           *
-           * https://github.com/redpanda-data/redpanda/issues/1870
-           */
-          const auto& ntp = _log.config().ntp();
-          const auto normalized_ntp = fmt::format(
-            "{}.{}.{}", ntp.ns(), ntp.tp.topic(), ntp.tp.partition());
-          const auto& patterns = config::shard_local_cfg()
-                                   .full_raft_configuration_recovery_pattern();
-          auto initial_state = std::any_of(
-            patterns.cbegin(),
-            patterns.cend(),
-            [&normalized_ntp](const ss::sstring& pattern) {
-                return pattern == "*" || normalized_ntp.starts_with(pattern);
-            });
-          if (!initial_state) {
-              initial_state = is_initial_state();
-          }
-
-          vlog(
-            _ctxlog.info,
-            "Starting with voted_for {} term {} initial_state {}",
-            _voted_for,
-            _term,
-            initial_state);
-
-          return _configuration_manager.start(initial_state, _self.revision())
-            .then([this] {
-                vlog(
-                  _ctxlog.trace,
-                  "Configuration manager started: {}",
-                  _configuration_manager);
-            })
-            .then([this, initial_state] {
-                offset_translator::must_reset must_reset{initial_state};
-
-                absl::btree_map<model::offset, int64_t> offset2delta;
-                for (auto it = _configuration_manager.begin();
-                     it != _configuration_manager.end();
-                     ++it) {
-                    offset2delta.emplace(it->first, it->second.idx());
-                }
-
-                auto bootstrap = offset_translator::bootstrap_state{
-                  .offset2delta = std::move(offset2delta),
-                  .highest_known_offset
-                  = _configuration_manager.get_highest_known_offset(),
-                };
-
-                return _offset_translator.start(
-                  must_reset, std::move(bootstrap));
-            })
-            .then([this] {
-                return _snapshot_lock.with(
-                  [this] { return hydrate_snapshot(); });
-            })
-            .then([this] {
-                vlog(
-                  _ctxlog.debug,
-                  "Starting raft bootstrap from {}",
-                  _configuration_manager.get_highest_known_offset());
-                return details::read_bootstrap_state(
-                  _log,
-                  model::next_offset(
-                    _configuration_manager.get_highest_known_offset()),
-                  _as);
-            })
-            .then([this](configuration_bootstrap_state st) {
-                auto lstats = _log.offsets();
-
-                vlog(_ctxlog.info, "Read bootstrap state: {}", st);
-                vlog(_ctxlog.info, "Current log offsets: {}", lstats);
-
-                // if log term is newer than the one comming from voted_for
-                // state, we reset voted_for state
-                if (lstats.dirty_offset_term > _term) {
-                    _term = lstats.dirty_offset_term;
-                    _voted_for = {};
-                }
-                /**
-                 * since we are starting, there were no new writes to the log
-                 * before that point. It is safe to use dirty offset as a
-                 * initial flushed offset since it is equal to last offset that
-                 * exists on disk and was read in log recovery process.
-                 */
-                _flushed_offset = lstats.dirty_offset;
-                /**
-                 * The configuration manager state may be divereged from the log
-                 * state, as log is flushed lazily, we have to make sure that
-                 * the log and configuration manager has exactly the same
-                 * offsets range
-                 */
-                vlog(
-                  _ctxlog.info,
-                  "Truncating configurations at {}",
-                  lstats.dirty_offset);
-
-                auto f = _configuration_manager.truncate(
-                  model::next_offset(lstats.dirty_offset));
-
-                /**
-                 * We read some batches from the log and have to update the
-                 * configuration manager.
-                 */
-                if (st.config_batches_seen() > 0) {
-                    f = f.then([this, st = std::move(st)]() mutable {
-                        return _configuration_manager.add(
-                          std::move(st).release_configurations());
-                    });
-                }
-
-                return f.then([this] {
-                    update_follower_stats(_configuration_manager.get_latest());
-                });
-            })
-            .then(
-              [this] { return _offset_translator.sync_with_log(_log, _as); })
-            .then([this] {
-                /**
-                 * fix for incorrectly persisted configuration index. In
-                 * previous version of redpanda due to the issue with
-                 * incorrectly assigned raft configuration indicies
-                 * (https://github.com/redpanda-data/redpanda/issues/2326) there
-                 * may be a persistent corruption in offset translation caused
-                 * by incorrectly persited configuration index. It may cause log
-                 * offset to be negative. Here we check if this problem exists
-                 * and if so apply necessary offset translation.
-                 */
-                const auto so = start_offset();
-                // no prefix truncation was applied we do not need adjustment
-                if (so <= model::offset(0)) {
-                    return ss::now();
-                }
-                auto delta = _configuration_manager.offset_delta(
-                  start_offset());
-
-                if (so >= delta) {
-                    return ss::now();
-                }
-                // if start offset is smaller than offset delta we need to apply
-                // adjustment
-                const configuration_manager::configuration_idx new_idx(so());
-                vlog(
-                  _ctxlog.info,
-                  "adjusting configuration index, current start offset: {}, "
-                  "delta: {}, new initial index: {}",
-                  so,
-                  delta,
-                  new_idx);
-                return _configuration_manager.adjust_configuration_idx(new_idx);
-            })
-            .then([this] {
-                auto next_election = clock_type::now();
-                // set last heartbeat timestamp to prevent skipping first
-                // election
-                _hbeat = clock_type::time_point::min();
-                auto conf
-                  = _configuration_manager.get_latest().current_config();
-                if (!conf.voters.empty() && _self == conf.voters.front()) {
-                    // Arm immediate election for single node scenarios
-                    // or for the very first start of the preferred leader
-                    // in a multi-node group. Otherwise use standard election
-                    // timeout.
-                    if (conf.voters.size() > 1 && _term > model::term_id{0}) {
-                        next_election += _jit.next_duration();
-                    }
-                } else {
-                    // current node is not a preselected leader, add 2x jitter
-                    // to give opportunity to the preselected leader to win
-                    // the first round
-                    next_election += _jit.base_duration()
-                                     + 2 * _jit.next_jitter_duration();
-                }
-                if (!_bg.is_closed()) {
-                    _vote_timeout.rearm(next_election);
-                }
-            })
-            .then([this] {
-                auto last_applied = read_last_applied();
-                if (last_applied > _commit_index) {
-                    _commit_index = last_applied;
-                    maybe_update_last_visible_index(_commit_index);
-                    vlog(
-                      _ctxlog.trace,
-                      "Recovered commit_index: {}",
-                      _commit_index);
-                }
-            })
-            .then([this] { return _event_manager.start(); })
-            .then([this] { _append_requests_buffer.start(); })
-            .then([this] {
-                vlog(
-                  _ctxlog.info,
-                  "started raft, log offsets: {}, term: {}, configuration: {}",
-                  _log.offsets(),
-                  _term,
-                  _configuration_manager.get_latest());
-            });
-      })
-      .handle_exception_type([](const ss::broken_semaphore&) {});
+    try {
+        auto u = co_await _op_lock.get_units();
+
+        read_voted_for();
+
+        /*
+         * temporary workaround:
+         *
+         * if the group's ntp matches the pattern, then do not load the
+         * initial configuration snapshot from the keyvalue store. more info
+         * here:
+         *
+         * https://github.com/redpanda-data/redpanda/issues/1870
+         */
+        const auto& ntp = _log.config().ntp();
+        const auto normalized_ntp = fmt::format(
+          "{}.{}.{}", ntp.ns(), ntp.tp.topic(), ntp.tp.partition());
+        const auto& patterns = config::shard_local_cfg()
+                                 .full_raft_configuration_recovery_pattern();
+        auto initial_state = std::any_of(
+          patterns.cbegin(),
+          patterns.cend(),
+          [&normalized_ntp](const ss::sstring& pattern) {
+              return pattern == "*" || normalized_ntp.starts_with(pattern);
+          });
+        if (!initial_state) {
+            initial_state = is_initial_state();
+        }
+
+        vlog(
+          _ctxlog.info,
+          "Starting with voted_for {} term {} initial_state {}",
+          _voted_for,
+          _term,
+          initial_state);
+
+        co_await _configuration_manager.start(initial_state, _self.revision());
+
+        vlog(
+          _ctxlog.trace,
+          "Configuration manager started: {}",
+          _configuration_manager);
+
+        offset_translator::must_reset must_reset{initial_state};
+
+        absl::btree_map<model::offset, int64_t> offset2delta;
+        for (const auto& it : _configuration_manager) {
+            offset2delta.emplace(it.first, it.second.idx());
+        }
+
+        auto bootstrap = offset_translator::bootstrap_state{
+          .offset2delta = std::move(offset2delta),
+          .highest_known_offset
+          = _configuration_manager.get_highest_known_offset(),
+        };
+
+        co_await _offset_translator.start(must_reset, std::move(bootstrap));
+
+        co_await _snapshot_lock.with([this] { return hydrate_snapshot(); });
+
+        vlog(
+          _ctxlog.debug,
+          "Starting raft bootstrap from {}",
+          _configuration_manager.get_highest_known_offset());
+        auto st = co_await details::read_bootstrap_state(
+          _log,
+          model::next_offset(_configuration_manager.get_highest_known_offset()),
+          _as);
+
+        const auto lstats = _log.offsets();
+
+        vlog(
+          _ctxlog.info,
+          "Current log offsets: {}, read bootstrap state: {}",
+          lstats,
+          st);
+
+        // if log term is newer than the one coming from voted_for
+        // state, we reset voted_for state
+        if (lstats.dirty_offset_term > _term) {
+            _term = lstats.dirty_offset_term;
+            _voted_for = {};
+        }
+        /**
+         * since we are starting, there were no new writes to the log
+         * before that point. It is safe to use the dirty offset as an
+         * initial flushed offset since it is equal to the last offset that
+         * exists on disk and was read in the log recovery process.
+         */
+        _flushed_offset = lstats.dirty_offset;
+        /**
+         * The configuration manager state may have diverged from the log
+         * state; as the log is flushed lazily, we have to make sure that
+         * the log and configuration manager have exactly the same
+         * offset range
+         */
+        vlog(
+          _ctxlog.info, "Truncating configurations at {}", lstats.dirty_offset);
+
+        co_await _configuration_manager.truncate(
+          model::next_offset(lstats.dirty_offset));
+
+        /**
+         * We read some batches from the log and have to update the
+         * configuration manager.
+         */
+        if (st.config_batches_seen() > 0) {
+            co_await _configuration_manager.add(
+              std::move(st).release_configurations());
+        }
+
+        update_follower_stats(_configuration_manager.get_latest());
+
+        co_await _offset_translator.sync_with_log(_log, _as);
+
+        /**
+         * fix for an incorrectly persisted configuration index. In a
+         * previous version of redpanda, due to the issue with
+         * incorrectly assigned raft configuration indices
+         * (https://github.com/redpanda-data/redpanda/issues/2326), there
+         * may be a persistent corruption in offset translation caused
+         * by an incorrectly persisted configuration index. It may cause
+         * log offsets to be negative. Here we check if this problem
+         * exists and if so apply the necessary offset translation.
+         */
+        const auto so = start_offset();
+        const auto delta = _configuration_manager.offset_delta(start_offset());
+        // the adjustment is only needed when prefix truncation was applied
+        // and the start offset is smaller than the offset delta
+        if (so > model::offset(0) && so < delta) {
+            const configuration_manager::configuration_idx new_idx(so());
+            vlog(
+              _ctxlog.info,
+              "adjusting configuration index, current start offset: {}, "
+              "delta: {}, new initial index: {}",
+              so,
+              delta,
+              new_idx);
+            co_await _configuration_manager.adjust_configuration_idx(new_idx);
+        }
+
+        auto next_election = clock_type::now();
+        // set last heartbeat timestamp to prevent skipping first
+        // election
+        _hbeat = clock_type::time_point::min();
+        auto conf = _configuration_manager.get_latest().current_config();
+        if (!conf.voters.empty() && _self == conf.voters.front()) {
+            // Arm immediate election for single node scenarios
+            // or for the very first start of the preferred leader
+            // in a multi-node group. Otherwise use standard election
+            // timeout.
+            if (conf.voters.size() > 1 && _term > model::term_id{0}) {
+                next_election += _jit.next_duration();
+            }
+        } else {
+            // current node is not a preselected leader, add 2x jitter
+            // to give opportunity to the preselected leader to win
+            // the first round
+            next_election += _jit.base_duration()
+                             + 2 * _jit.next_jitter_duration();
+        }
+        if (!_bg.is_closed()) {
+            _vote_timeout.rearm(next_election);
+        }
+
+        auto last_applied = read_last_applied();
+        if (last_applied > _commit_index) {
+            _commit_index = last_applied;
+            maybe_update_last_visible_index(_commit_index);
+            vlog(_ctxlog.trace, "Recovered commit_index: {}", _commit_index);
+        }
+
+        co_await _event_manager.start();
+        _append_requests_buffer.start();
+
+        vlog(
+          _ctxlog.info,
+          "started raft, log offsets: {}, term: {}, configuration: {}",
+          lstats,
+          _term,
+          _configuration_manager.get_latest());
+    } catch (const ss::broken_semaphore&) {
+    }
 }
 
 ss::future<>
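
Note, illustrative only (not part of the patch): the rewrite above is
mechanical once the pattern is clear. A continuation chain that holds a lock
through semaphore::with() and maps ss::broken_semaphore via
handle_exception_type() becomes a linear coroutine body: get_units() yields
RAII units that hold the lock until scope exit, and the exception mapping
becomes an ordinary try/catch. A minimal sketch, assuming a plain
seastar::semaphore in place of Redpanda's _op_lock mutex; step_one/step_two
and the start_* function names are hypothetical stand-ins for the real
start-up steps:

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>

seastar::future<> step_one() { return seastar::make_ready_future<>(); }
seastar::future<> step_two() { return seastar::make_ready_future<>(); }

// Before: continuation style. Each step needs its own lambda, and the lock
// is released when the chain produced inside with_semaphore() resolves.
seastar::future<> start_chained(seastar::semaphore& lock) {
    return seastar::with_semaphore(lock, 1, [] {
        return step_one().then([] { return step_two(); });
    });
}

// After: coroutine style. The semaphore units are an RAII guard released at
// scope exit (cf. `auto u = co_await _op_lock.get_units();` in the patch),
// and a broken semaphore surfaces as an exception caught by plain try/catch.
seastar::future<> start_coroutine(seastar::semaphore& lock) {
    try {
        auto units = co_await seastar::get_units(lock, 1);
        co_await step_one();
        co_await step_two();
    } catch (const seastar::broken_semaphore&) {
        // shutting down: the semaphore was broken while we waited
    }
}

The gain is mostly readability: intermediate state (st, lstats, bootstrap)
lives in the coroutine frame instead of being threaded through lambda
captures, and early exits become plain control flow instead of ss::now()
returns.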