Skip to content

Commit

Permalink
r/consensus: refactored start method to be a coroutine
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Jul 6, 2023
1 parent 4daab41 commit baabe8f
Showing 1 changed file with 176 additions and 202 deletions.
378 changes: 176 additions & 202 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1282,216 +1282,190 @@ ss::future<> consensus::start() {
}

ss::future<> consensus::do_start() {
vlog(_ctxlog.info, "Starting");
return _op_lock
.with([this] {
read_voted_for();

/*
* temporary workaround:
*
* if the group's ntp matches the pattern, then do not load the
* initial configuration snapshto from the keyvalue store. more info
* here:
*
* https://github.com/redpanda-data/redpanda/issues/1870
*/
const auto& ntp = _log.config().ntp();
const auto normalized_ntp = fmt::format(
"{}.{}.{}", ntp.ns(), ntp.tp.topic(), ntp.tp.partition());
const auto& patterns = config::shard_local_cfg()
.full_raft_configuration_recovery_pattern();
auto initial_state = std::any_of(
patterns.cbegin(),
patterns.cend(),
[&normalized_ntp](const ss::sstring& pattern) {
return pattern == "*" || normalized_ntp.starts_with(pattern);
});
if (!initial_state) {
initial_state = is_initial_state();
}
try {
auto u = co_await _op_lock.get_units();

vlog(
_ctxlog.info,
"Starting with voted_for {} term {} initial_state {}",
_voted_for,
_term,
initial_state);
read_voted_for();

return _configuration_manager.start(initial_state, _self.revision())
.then([this] {
vlog(
_ctxlog.trace,
"Configuration manager started: {}",
_configuration_manager);
})
.then([this, initial_state] {
offset_translator::must_reset must_reset{initial_state};

absl::btree_map<model::offset, int64_t> offset2delta;
for (auto it = _configuration_manager.begin();
it != _configuration_manager.end();
++it) {
offset2delta.emplace(it->first, it->second.idx());
}
/*
* temporary workaround:
*
* if the group's ntp matches the pattern, then do not load the
* initial configuration snapshot from the keyvalue store. more info
* here:
*
* https://github.com/redpanda-data/redpanda/issues/1870
*/
const auto& ntp = _log.config().ntp();
const auto normalized_ntp = fmt::format(
"{}.{}.{}", ntp.ns(), ntp.tp.topic(), ntp.tp.partition());
const auto& patterns = config::shard_local_cfg()
.full_raft_configuration_recovery_pattern();
auto initial_state = std::any_of(
patterns.cbegin(),
patterns.cend(),
[&normalized_ntp](const ss::sstring& pattern) {
return pattern == "*" || normalized_ntp.starts_with(pattern);
});
if (!initial_state) {
initial_state = is_initial_state();
}

auto bootstrap = offset_translator::bootstrap_state{
.offset2delta = std::move(offset2delta),
.highest_known_offset
= _configuration_manager.get_highest_known_offset(),
};
vlog(
_ctxlog.info,
"Starting with voted_for {} term {} initial_state {}",
_voted_for,
_term,
initial_state);

return _offset_translator.start(
must_reset, std::move(bootstrap));
})
.then([this] {
return _snapshot_lock.with(
[this] { return hydrate_snapshot(); });
})
.then([this] {
vlog(
_ctxlog.debug,
"Starting raft bootstrap from {}",
_configuration_manager.get_highest_known_offset());
return details::read_bootstrap_state(
_log,
model::next_offset(
_configuration_manager.get_highest_known_offset()),
_as);
})
.then([this](configuration_bootstrap_state st) {
auto lstats = _log.offsets();
co_await _configuration_manager.start(initial_state, _self.revision());

vlog(_ctxlog.info, "Read bootstrap state: {}", st);
vlog(_ctxlog.info, "Current log offsets: {}", lstats);
vlog(
_ctxlog.trace,
"Configuration manager started: {}",
_configuration_manager);
offset_translator::must_reset must_reset{initial_state};

// if log term is newer than the one comming from voted_for
// state, we reset voted_for state
if (lstats.dirty_offset_term > _term) {
_term = lstats.dirty_offset_term;
_voted_for = {};
}
/**
* since we are starting, there were no new writes to the log
* before that point. It is safe to use dirty offset as a
* initial flushed offset since it is equal to last offset that
* exists on disk and was read in log recovery process.
*/
_flushed_offset = lstats.dirty_offset;
/**
* The configuration manager state may be divereged from the log
* state, as log is flushed lazily, we have to make sure that
* the log and configuration manager has exactly the same
* offsets range
*/
vlog(
_ctxlog.info,
"Truncating configurations at {}",
lstats.dirty_offset);

auto f = _configuration_manager.truncate(
model::next_offset(lstats.dirty_offset));

/**
* We read some batches from the log and have to update the
* configuration manager.
*/
if (st.config_batches_seen() > 0) {
f = f.then([this, st = std::move(st)]() mutable {
return _configuration_manager.add(
std::move(st).release_configurations());
});
}
absl::btree_map<model::offset, int64_t> offset2delta;
for (const auto& it : _configuration_manager) {
offset2delta.emplace(it.first, it.second.idx());
}

return f.then([this] {
update_follower_stats(_configuration_manager.get_latest());
});
})
.then(
[this] { return _offset_translator.sync_with_log(_log, _as); })
.then([this] {
/**
* fix for incorrectly persisted configuration index. In
* previous version of redpanda due to the issue with
* incorrectly assigned raft configuration indicies
* (https://github.com/redpanda-data/redpanda/issues/2326) there
* may be a persistent corruption in offset translation caused
* by incorrectly persited configuration index. It may cause log
* offset to be negative. Here we check if this problem exists
* and if so apply necessary offset translation.
*/
const auto so = start_offset();
// no prefix truncation was applied we do not need adjustment
if (so <= model::offset(0)) {
return ss::now();
}
auto delta = _configuration_manager.offset_delta(
start_offset());
auto bootstrap = offset_translator::bootstrap_state{
.offset2delta = std::move(offset2delta),
.highest_known_offset
= _configuration_manager.get_highest_known_offset(),
};

if (so >= delta) {
return ss::now();
}
// if start offset is smaller than offset delta we need to apply
// adjustment
const configuration_manager::configuration_idx new_idx(so());
vlog(
_ctxlog.info,
"adjusting configuration index, current start offset: {}, "
"delta: {}, new initial index: {}",
so,
delta,
new_idx);
return _configuration_manager.adjust_configuration_idx(new_idx);
})
.then([this] {
auto next_election = clock_type::now();
// set last heartbeat timestamp to prevent skipping first
// election
_hbeat = clock_type::time_point::min();
auto conf
= _configuration_manager.get_latest().current_config();
if (!conf.voters.empty() && _self == conf.voters.front()) {
// Arm immediate election for single node scenarios
// or for the very first start of the preferred leader
// in a multi-node group. Otherwise use standard election
// timeout.
if (conf.voters.size() > 1 && _term > model::term_id{0}) {
next_election += _jit.next_duration();
}
} else {
// current node is not a preselected leader, add 2x jitter
// to give opportunity to the preselected leader to win
// the first round
next_election += _jit.base_duration()
+ 2 * _jit.next_jitter_duration();
}
if (!_bg.is_closed()) {
_vote_timeout.rearm(next_election);
}
})
.then([this] {
auto last_applied = read_last_applied();
if (last_applied > _commit_index) {
_commit_index = last_applied;
maybe_update_last_visible_index(_commit_index);
vlog(
_ctxlog.trace,
"Recovered commit_index: {}",
_commit_index);
}
})
.then([this] { return _event_manager.start(); })
.then([this] { _append_requests_buffer.start(); })
.then([this] {
vlog(
_ctxlog.info,
"started raft, log offsets: {}, term: {}, configuration: {}",
_log.offsets(),
_term,
_configuration_manager.get_latest());
});
})
.handle_exception_type([](const ss::broken_semaphore&) {});
co_await _offset_translator.start(must_reset, std::move(bootstrap));

co_await _snapshot_lock.with([this] { return hydrate_snapshot(); });

vlog(
_ctxlog.debug,
"Starting raft bootstrap from {}",
_configuration_manager.get_highest_known_offset());
auto st = co_await details::read_bootstrap_state(
_log,
model::next_offset(_configuration_manager.get_highest_known_offset()),
_as);

const auto lstats = _log.offsets();

vlog(
_ctxlog.info,
"Current log offsets: {}, read bootstrap state: {}",
lstats,
st);

// if log term is newer than the one coming from voted_for
// state, we reset voted_for state
if (lstats.dirty_offset_term > _term) {
_term = lstats.dirty_offset_term;
_voted_for = {};
}
/**
* since we are starting, there were no new writes to the log
* before that point. It is safe to use dirty offset as a
* initial flushed offset since it is equal to last offset that
* exists on disk and was read in log recovery process.
*/
_flushed_offset = lstats.dirty_offset;
/**
* The configuration manager state may be divereged from the log
* state, as log is flushed lazily, we have to make sure that
* the log and configuration manager has exactly the same
* offsets range
*/
vlog(
_ctxlog.info, "Truncating configurations at {}", lstats.dirty_offset);

co_await _configuration_manager.truncate(
model::next_offset(lstats.dirty_offset));

/**
* We read some batches from the log and have to update the
* configuration manager.
*/
if (st.config_batches_seen() > 0) {
co_await _configuration_manager.add(
std::move(st).release_configurations());
}

update_follower_stats(_configuration_manager.get_latest());

co_await _offset_translator.sync_with_log(_log, _as);

/**
* fix for incorrectly persisted configuration index. In
* previous version of redpanda due to the issue with
* incorrectly assigned raft configuration indicies
* (https://github.com/redpanda-data/redpanda/issues/2326) there
* may be a persistent corruption in offset translation caused
* by incorrectly persited configuration index. It may cause log
* offset to be negative. Here we check if this problem exists
* and if so apply necessary offset translation.
*/
const auto so = start_offset();
const auto delta = _configuration_manager.offset_delta(start_offset());
// no prefix truncation was applied we do not need adjustment
if (so > model::offset(0) || so < delta) {
// if start offset is smaller than offset delta we need to apply
// adjustment
const configuration_manager::configuration_idx new_idx(so());
vlog(
_ctxlog.info,
"adjusting configuration index, current start offset: {}, "
"delta: {}, new initial index: {}",
so,
delta,
new_idx);
co_await _configuration_manager.adjust_configuration_idx(new_idx);
}

auto next_election = clock_type::now();
// set last heartbeat timestamp to prevent skipping first
// election
_hbeat = clock_type::time_point::min();
auto conf = _configuration_manager.get_latest().current_config();
if (!conf.voters.empty() && _self == conf.voters.front()) {
// Arm immediate election for single node scenarios
// or for the very first start of the preferred leader
// in a multi-node group. Otherwise use standard election
// timeout.
if (conf.voters.size() > 1 && _term > model::term_id{0}) {
next_election += _jit.next_duration();
}
} else {
// current node is not a preselected leader, add 2x jitter
// to give opportunity to the preselected leader to win
// the first round
next_election += _jit.base_duration()
+ 2 * _jit.next_jitter_duration();
}
if (!_bg.is_closed()) {
_vote_timeout.rearm(next_election);
}

auto last_applied = read_last_applied();
if (last_applied > _commit_index) {
_commit_index = last_applied;
maybe_update_last_visible_index(_commit_index);
vlog(_ctxlog.trace, "Recovered commit_index: {}", _commit_index);
}

co_await _event_manager.start();
_append_requests_buffer.start();

vlog(
_ctxlog.info,
"started raft, log offsets: {}, term: {}, configuration: {}",
lstats,
_term,
_configuration_manager.get_latest());

} catch (ss::broken_semaphore&) {
}
}

ss::future<>
Expand Down

0 comments on commit baabe8f

Please sign in to comment.