Skip to content

Commit

Permalink
r/group_manager: do not hold mutex when shutting down consensus
Browse files Browse the repository at this point in the history
Individual raft groups shutdown operations are not correlated and do not
need coordination with mutex. The `group_manager::_groups` vector is
always updated in synchronous parts of code therefore it do not require
the mutex either.

Removed the mutex to make shutting down different Raft groups
independent from each other.

Fixes CORE-5686

Signed-off-by: Michał Maślanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Jul 22, 2024
1 parent 5f457ef commit e0236f6
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 33 deletions.
63 changes: 31 additions & 32 deletions src/v/raft/group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,18 @@ ss::future<ss::lw_shared_ptr<raft::consensus>> group_manager::create_group(
_feature_table,
_is_ready ? std::nullopt : std::make_optional(min_voter_priority),
keep_snapshotted_log);
return _groups_mutex.with([this, raft = std::move(raft)] {
return ss::with_gate(_gate, [this, raft] {
return _heartbeats.register_group(raft).then([this, raft] {
if (_is_ready) {
// Check _is_ready flag again to guard against the case when
// set_ready() was called after we created this consensus
// instance but before we insert it into the _groups
// collection.
raft->reset_node_priority();
}
_groups.push_back(raft);
return raft;
});

return ss::with_gate(_gate, [this, raft] {
return _heartbeats.register_group(raft).then([this, raft] {
if (_is_ready) {
// Check _is_ready flag again to guard against the case when
// set_ready() was called after we created this consensus
// instance but before we insert it into the _groups
// collection.
raft->reset_node_priority();
}
_groups.push_back(raft);
return raft;
});
});
}
Expand Down Expand Up @@ -190,30 +189,30 @@ raft::group_configuration group_manager::create_initial_configuration(
}

ss::future<> group_manager::remove(ss::lw_shared_ptr<raft::consensus> c) {
return _groups_mutex.with([this, c = std::move(c)] {
return c->stop()
.discard_result()
.then([c] { return c->remove_persistent_state(); })
.then([this, id = c->group()] {
return _heartbeats.deregister_group(id);
})
.finally([this, c] {
_groups.erase(
std::remove(_groups.begin(), _groups.end(), c), _groups.end());
});
});
return do_shutdown(std::move(c), true).discard_result();
}

ss::future<xshard_transfer_state>
group_manager::shutdown(ss::lw_shared_ptr<raft::consensus> c) {
auto units = co_await _groups_mutex.get_units();

auto xst_state = co_await c->stop();
co_await _heartbeats.deregister_group(c->group());
_groups.erase(
std::remove(_groups.begin(), _groups.end(), c), _groups.end());
return do_shutdown(std::move(c), false);
}

co_return xst_state;
ss::future<xshard_transfer_state> group_manager::do_shutdown(
ss::lw_shared_ptr<raft::consensus> c, bool remove_persistent_state) {
const auto group_id = c->group();
auto transfer_state = co_await c->stop();
if (remove_persistent_state) {
co_await c->remove_persistent_state();
}
co_await _heartbeats.deregister_group(group_id);
auto it = std::find(_groups.begin(), _groups.end(), c);
vassert(
it != _groups.end(),
"A consensus instance with group id: {} that is requested to be removed "
"must be managed by the manager",
group_id);
_groups.erase(it);
co_return transfer_state;
}

void group_manager::trigger_leadership_notification(
Expand Down
4 changes: 3 additions & 1 deletion src/v/raft/group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ class group_manager {
void trigger_config_update_notification();
void collect_learner_metrics();

ss::future<xshard_transfer_state>
do_shutdown(ss::lw_shared_ptr<consensus>, bool remove_persistent_state);

raft::group_configuration create_initial_configuration(
std::vector<model::broker>, model::revision_id) const;
mutex _groups_mutex{"group_manager"};
model::node_id _self;
ss::scheduling_group _raft_sg;
raft::consensus_client_protocol _client;
Expand Down

0 comments on commit e0236f6

Please sign in to comment.