diff --git a/src/v/raft/group_manager.cc b/src/v/raft/group_manager.cc index 290cac0f659ee..d0cddb723d610 100644 --- a/src/v/raft/group_manager.cc +++ b/src/v/raft/group_manager.cc @@ -143,19 +143,18 @@ ss::future> group_manager::create_group( _feature_table, _is_ready ? std::nullopt : std::make_optional(min_voter_priority), keep_snapshotted_log); - return _groups_mutex.with([this, raft = std::move(raft)] { - return ss::with_gate(_gate, [this, raft] { - return _heartbeats.register_group(raft).then([this, raft] { - if (_is_ready) { - // Check _is_ready flag again to guard against the case when - // set_ready() was called after we created this consensus - // instance but before we insert it into the _groups - // collection. - raft->reset_node_priority(); - } - _groups.push_back(raft); - return raft; - }); + + return ss::with_gate(_gate, [this, raft] { + return _heartbeats.register_group(raft).then([this, raft] { + if (_is_ready) { + // Check _is_ready flag again to guard against the case when + // set_ready() was called after we created this consensus + // instance but before we insert it into the _groups + // collection. + raft->reset_node_priority(); + } + _groups.push_back(raft); + return raft; }); }); } @@ -190,30 +189,30 @@ raft::group_configuration group_manager::create_initial_configuration( } ss::future<> group_manager::remove(ss::lw_shared_ptr c) { - return _groups_mutex.with([this, c = std::move(c)] { - return c->stop() - .discard_result() - .then([c] { return c->remove_persistent_state(); }) - .then([this, id = c->group()] { - return _heartbeats.deregister_group(id); - }) - .finally([this, c] { - _groups.erase( - std::remove(_groups.begin(), _groups.end(), c), _groups.end()); - }); - }); + return do_shutdown(std::move(c), true).discard_result(); } ss::future group_manager::shutdown(ss::lw_shared_ptr c) { - auto units = co_await _groups_mutex.get_units(); - - auto xst_state = co_await c->stop(); - co_await _heartbeats.deregister_group(c->group()); - _groups.erase( - std::remove(_groups.begin(), _groups.end(), c), _groups.end()); + return do_shutdown(std::move(c), false); +} - co_return xst_state; +ss::future group_manager::do_shutdown( + ss::lw_shared_ptr c, bool remove_persistent_state) { + const auto group_id = c->group(); + auto transfer_state = co_await c->stop(); + if (remove_persistent_state) { + co_await c->remove_persistent_state(); + } + co_await _heartbeats.deregister_group(group_id); + auto it = std::find(_groups.begin(), _groups.end(), c); + vassert( + it != _groups.end(), + "A consensus instance with group id: {} that is requested to be removed " + "must be managed by the manager", + group_id); + _groups.erase(it); + co_return transfer_state; } void group_manager::trigger_leadership_notification( diff --git a/src/v/raft/group_manager.h b/src/v/raft/group_manager.h index cdb3e19af4793..3c2d113b12694 100644 --- a/src/v/raft/group_manager.h +++ b/src/v/raft/group_manager.h @@ -103,9 +103,11 @@ class group_manager { void trigger_config_update_notification(); void collect_learner_metrics(); + ss::future + do_shutdown(ss::lw_shared_ptr, bool remove_persistent_state); + raft::group_configuration create_initial_configuration( std::vector, model::revision_id) const; - mutex _groups_mutex{"group_manager"}; model::node_id _self; ss::scheduling_group _raft_sg; raft::consensus_client_protocol _client;