Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.2.x] CORE-5686 Decouple different raft group shutdown sequences #21584

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/v/cluster/partition_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ partition_manager::remove(const model::ntp& ntp, partition_removal_mode mode) {
"manager",
ntp));
}
vlog(clusterlog.debug, "removing partition {}", ntp);
partition_shutdown_state shutdown_state(partition);
_partitions_shutting_down.push_back(shutdown_state);
auto group_id = partition->group();
Expand Down
63 changes: 31 additions & 32 deletions src/v/raft/group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,18 @@ ss::future<ss::lw_shared_ptr<raft::consensus>> group_manager::create_group(
_feature_table,
_is_ready ? std::nullopt : std::make_optional(min_voter_priority),
keep_snapshotted_log);
return _groups_mutex.with([this, raft = std::move(raft)] {
return ss::with_gate(_gate, [this, raft] {
return _heartbeats.register_group(raft).then([this, raft] {
if (_is_ready) {
// Check _is_ready flag again to guard against the case when
// set_ready() was called after we created this consensus
// instance but before we insert it into the _groups
// collection.
raft->reset_node_priority();
}
_groups.push_back(raft);
return raft;
});

return ss::with_gate(_gate, [this, raft] {
return _heartbeats.register_group(raft).then([this, raft] {
if (_is_ready) {
// Check _is_ready flag again to guard against the case when
// set_ready() was called after we created this consensus
// instance but before we insert it into the _groups
// collection.
raft->reset_node_priority();
}
_groups.push_back(raft);
return raft;
});
});
}
Expand Down Expand Up @@ -190,30 +189,30 @@ raft::group_configuration group_manager::create_initial_configuration(
}

ss::future<> group_manager::remove(ss::lw_shared_ptr<raft::consensus> c) {
return _groups_mutex.with([this, c = std::move(c)] {
return c->stop()
.discard_result()
.then([c] { return c->remove_persistent_state(); })
.then([this, id = c->group()] {
return _heartbeats.deregister_group(id);
})
.finally([this, c] {
_groups.erase(
std::remove(_groups.begin(), _groups.end(), c), _groups.end());
});
});
return do_shutdown(std::move(c), true).discard_result();
}

ss::future<xshard_transfer_state>
group_manager::shutdown(ss::lw_shared_ptr<raft::consensus> c) {
auto units = co_await _groups_mutex.get_units();

auto xst_state = co_await c->stop();
co_await _heartbeats.deregister_group(c->group());
_groups.erase(
std::remove(_groups.begin(), _groups.end(), c), _groups.end());
return do_shutdown(std::move(c), false);
}

co_return xst_state;
ss::future<xshard_transfer_state> group_manager::do_shutdown(
ss::lw_shared_ptr<raft::consensus> c, bool remove_persistent_state) {
const auto group_id = c->group();
auto transfer_state = co_await c->stop();
if (remove_persistent_state) {
co_await c->remove_persistent_state();
}
co_await _heartbeats.deregister_group(group_id);
auto it = std::find(_groups.begin(), _groups.end(), c);
vassert(
it != _groups.end(),
"A consensus instance with group id: {} that is requested to be removed "
"must be managed by the manager",
group_id);
_groups.erase(it);
co_return transfer_state;
}

void group_manager::trigger_leadership_notification(
Expand Down
4 changes: 3 additions & 1 deletion src/v/raft/group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ class group_manager {
void trigger_config_update_notification();
void collect_learner_metrics();

ss::future<xshard_transfer_state>
do_shutdown(ss::lw_shared_ptr<consensus>, bool remove_persistent_state);

raft::group_configuration create_initial_configuration(
std::vector<model::broker>, model::revision_id) const;
mutex _groups_mutex{"group_manager"};
model::node_id _self;
ss::scheduling_group _raft_sg;
raft::consensus_client_protocol _client;
Expand Down
Loading