Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: Implement force partition reconfiguration API #9785

Merged
merged 17 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/v/cluster/cluster_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ inline bool has_non_replicable_op_type(const topic_table_delta& d) {
case op_t::del:
case op_t::reset:
case op_t::update:
case op_t::force_update:
case op_t::update_finished:
case op_t::update_properties:
case op_t::cancel_update:
Expand Down Expand Up @@ -314,6 +315,18 @@ inline std::vector<model::broker_shard> union_replica_sets(
return ret;
}

// Checks if lhs is a proper subset of rhs
inline bool is_proper_subset(
const std::vector<model::broker_shard>& lhs,
const std::vector<model::broker_shard>& rhs) {
auto contains_all = std::all_of(
lhs.begin(), lhs.end(), [&rhs](const auto& current) {
return std::find(rhs.begin(), rhs.end(), current) != rhs.end();
});

return contains_all && rhs.size() > lhs.size();
}

/**
* Subtracts second replica set from the first one. Result contains only brokers
* that node_ids are present in the first list but not the other one
Expand Down
8 changes: 8 additions & 0 deletions src/v/cluster/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ static constexpr int8_t cancel_moving_partition_replicas_cmd_type = 7;
static constexpr int8_t move_topic_replicas_cmd_type = 8;
static constexpr int8_t revert_cancel_partition_move_cmd_type = 9;
static constexpr int8_t topic_lifecycle_transition_cmd_type = 10;
static constexpr int8_t force_partition_reconfiguration_type = 11;

static constexpr int8_t create_user_cmd_type = 5;
static constexpr int8_t delete_user_cmd_type = 6;
Expand Down Expand Up @@ -202,6 +203,13 @@ using revert_cancel_partition_move_cmd = controller_command<
model::record_batch_type::topic_management_cmd,
serde_opts::serde_only>;

using force_partition_reconfiguration_cmd = controller_command<
model::ntp,
force_partition_reconfiguration_cmd_data,
force_partition_reconfiguration_type,
model::record_batch_type::topic_management_cmd,
serde_opts::serde_only>;

using create_user_cmd = controller_command<
security::credential_user,
security::scram_credential,
Expand Down
47 changes: 45 additions & 2 deletions src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ bool is_interrupting_operation(
* Find interrupting operation following the one that is currently
* processed. Following rules apply:
*
* - all operations i.e. update, cancel_update, and force abort must be
* interrupted by deletion
* - all operations i.e. update, force_update, cancel_update, and force
* abort must be interrupted by deletion.
*
* - update & cancel update operations may be interrupted by
* force_abort_update operation
Expand Down Expand Up @@ -570,6 +570,7 @@ controller_backend::deltas_t calculate_bootstrap_deltas(
case op_t::reset:
return has_local_replicas(self, delta.new_assignment.replicas);
case op_t::update:
case op_t::force_update:
case op_t::cancel_update:
case op_t::force_abort_update:
vassert(
Expand Down Expand Up @@ -648,6 +649,38 @@ ss::future<> controller_backend::fetch_deltas() {
});
}

ss::future<std::error_code> controller_backend::force_replica_set_update(
const model::ntp& ntp,
const std::vector<model::broker_shard>& replicas,
const replicas_revision_map& replica_revisions,
model::revision_id rev) {
if (!has_local_replicas(_self, replicas)) {
// This node will no longer be a part of the raft group,
// will be cleaned up as a part of update_finished command.
co_return errc::success;
}

auto partition = _partition_manager.local().get(ntp);
if (!partition) {
co_return errc::partition_not_exists;
}

const auto current_cfg = partition->group_configuration();

// wait for configuration update, only declare success
// when configuration was actually updated
auto update_ec = check_configuration_update(
_self, partition, replicas, rev);

if (!update_ec) {
co_return errc::success;
}

// Configuration revision is lower, force update locally.
co_return co_await partition->force_update_replica_set(
create_vnode_set(replicas, replica_revisions), rev);
}

/**
* Topic files is qualified as orphan if we don't have it in topic table
* or it's revision is less than revision it topic table
Expand Down Expand Up @@ -996,6 +1029,7 @@ controller_backend::execute_partition_op(const delta_metadata& metadata) {
*delta.replica_revisions,
cmd_rev);
case op_t::update:
case op_t::force_update:
case op_t::force_abort_update:
case op_t::cancel_update:
vassert(
Expand Down Expand Up @@ -1092,6 +1126,7 @@ controller_backend::process_partition_reconfiguration(
target_assignment.replicas);
co_return std::error_code(errc::success);
}

/**
* Check if target assignment has node and core local replicas
*/
Expand Down Expand Up @@ -1308,6 +1343,11 @@ bool controller_backend::can_finish_update(
if (update_type == topic_table_delta::op_type::force_abort_update) {
return true;
}

if (update_type == topic_table_delta::op_type::force_update) {
// Wait for the leader to be elected in the new replica set.
return current_leader == _self;
}
/**
* If the revert feature is active we use current leader to dispatch
* partition move
Expand Down Expand Up @@ -1453,6 +1493,9 @@ ss::future<std::error_code> controller_backend::execute_reconfiguration(
case topic_table_delta::op_type::update:
co_return co_await update_partition_replica_set(
ntp, replica_set, replica_revisions, revision);
case topic_table_delta::op_type::force_update:
co_return co_await force_replica_set_update(
ntp, replica_set, replica_revisions, revision);
case topic_table_delta::op_type::cancel_update:
co_return co_await cancel_replica_set_update(
ntp, replica_set, replica_revisions, previous_replica_set, revision);
Expand Down
6 changes: 6 additions & 0 deletions src/v/cluster/controller_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,12 @@ class controller_backend
const std::vector<model::broker_shard>&,
model::revision_id);

ss::future<std::error_code> force_replica_set_update(
const model::ntp&,
const std::vector<model::broker_shard>& /*new replicas*/,
const replicas_revision_map&,
model::revision_id);

ss::future<std::error_code>
dispatch_update_finished(model::ntp, partition_assignment);

Expand Down
6 changes: 6 additions & 0 deletions src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ class partition {
return _raft->replace_configuration(std::move(nodes), new_revision_id);
}

ss::future<std::error_code> force_update_replica_set(
std::vector<raft::vnode> nodes, model::revision_id new_revision_id) {
return _raft->force_replace_configuration_locally(
std::move(nodes), new_revision_id);
}

raft::group_configuration group_configuration() const {
return _raft->config();
}
Expand Down
71 changes: 64 additions & 7 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ topic_table::apply(move_partition_replicas_cmd cmd, model::offset o) {
}

change_partition_replicas(
cmd.key, cmd.value, tp->second, *current_assignment_it, o);
cmd.key, cmd.value, tp->second, *current_assignment_it, o, false);
notify_waiters();

return ss::make_ready_future<std::error_code>(errc::success);
Expand Down Expand Up @@ -429,6 +429,7 @@ topic_table::apply(cancel_moving_partition_replicas_cmd cmd, model::offset o) {
co_return errc::no_update_in_progress;
}
break;
case reconfiguration_state::force_update:
case reconfiguration_state::force_cancelled:
// partition reconfiguration already cancelled forcibly
co_return errc::no_update_in_progress;
Expand Down Expand Up @@ -653,14 +654,48 @@ topic_table::apply(move_topic_replicas_cmd cmd, model::offset o) {
new_replicas,
tp->second,
*assignment,
o);
o,
false);
}

notify_waiters();

co_return errc::success;
}

ss::future<std::error_code>
topic_table::apply(force_partition_reconfiguration_cmd cmd, model::offset o) {
bharathv marked this conversation as resolved.
Show resolved Hide resolved
_last_applied_revision_id = model::revision_id(o);
// Check the topic exists.
auto tp = _topics.find(model::topic_namespace_view(cmd.key));
if (tp == _topics.end()) {
return ss::make_ready_future<std::error_code>(errc::topic_not_exists);
}
if (!tp->second.is_topic_replicable()) {
return ss::make_ready_future<std::error_code>(
errc::topic_operation_error);
}

auto current_assignment_it = tp->second.get_assignments().find(
cmd.key.tp.partition);

if (current_assignment_it == tp->second.get_assignments().end()) {
return ss::make_ready_future<std::error_code>(
errc::partition_not_exists);
}

if (auto it = _updates_in_progress.find(cmd.key);
it != _updates_in_progress.end()) {
return ss::make_ready_future<std::error_code>(errc::update_in_progress);
}

change_partition_replicas(
cmd.key, cmd.value.replicas, tp->second, *current_assignment_it, o, true);
notify_waiters();

return ss::make_ready_future<std::error_code>(errc::success);
}

template<typename T>
void incremental_update(
std::optional<T>& property, property_update<std::optional<T>> override) {
Expand Down Expand Up @@ -1092,13 +1127,20 @@ class topic_table::snapshot_applier {
update_it != topic.updates.end()) {
const auto& update = update_it->second;

if (update.state == reconfiguration_state::in_progress) {
if (
update.state == reconfiguration_state::in_progress
|| update.state == reconfiguration_state::force_update) {
cur_assignment.replicas = update_it->second.target_assignment;
}

auto initial_state = update.state
== reconfiguration_state::force_update
? update.state
: reconfiguration_state::in_progress;
in_progress_update inp_update{
partition.replicas,
update.target_assignment,
initial_state,
update.revision,
_probe,
};
Expand Down Expand Up @@ -1126,14 +1168,18 @@ class topic_table::snapshot_applier {
// cancellation because if later the "cancel revert" event
// happens, controller_backend has to execute the update
// delta to make progress.
auto op_type = update.state
== reconfiguration_state::force_update
? delta::op_type::force_update
: delta::op_type::update;
_pending_deltas.emplace_back(
ntp,
partition_assignment(
partition.group,
p_id,
update_it->second.target_assignment),
model::offset{update.revision},
delta::op_type::update,
op_type,
partition.replicas,
update_replicas_revisions(
partition.replicas_revisions,
Expand All @@ -1153,6 +1199,7 @@ class topic_table::snapshot_applier {

switch (update.state) {
case reconfiguration_state::in_progress:
case reconfiguration_state::force_update:
break;
case reconfiguration_state::cancelled:
add_cancel_delta(topic_table_delta::op_type::cancel_update);
Expand Down Expand Up @@ -1595,7 +1642,8 @@ void topic_table::change_partition_replicas(
const std::vector<model::broker_shard>& new_assignment,
topic_metadata_item& metadata,
partition_assignment& current_assignment,
model::offset o) {
model::offset o,
bool is_forced) {
ztlpn marked this conversation as resolved.
Show resolved Hide resolved
if (are_replica_sets_equal(current_assignment.replicas, new_assignment)) {
return;
}
Expand All @@ -1605,7 +1653,12 @@ void topic_table::change_partition_replicas(
_updates_in_progress.emplace(
ntp,
in_progress_update(
current_assignment.replicas, new_assignment, update_revision, _probe));
current_assignment.replicas,
new_assignment,
is_forced ? reconfiguration_state::force_update
: reconfiguration_state::in_progress,
update_revision,
_probe));
auto previous_assignment = current_assignment.replicas;
// replace partition replica set
current_assignment.replicas = new_assignment;
Expand All @@ -1620,6 +1673,7 @@ void topic_table::change_partition_replicas(
in_progress_update(
current_assignment.replicas,
new_assignment,
reconfiguration_state::in_progress,
update_revision,
_probe));
vassert(
Expand Down Expand Up @@ -1655,11 +1709,14 @@ void topic_table::change_partition_replicas(
"partition {} must exist in the partition map",
ntp);

delta::op_type move_type = is_forced ? delta::op_type::force_update
: delta::op_type::update;

_pending_deltas.emplace_back(
std::move(ntp),
current_assignment,
o,
delta::op_type::update,
move_type,
std::move(previous_assignment),
update_replicas_revisions(
partition_it->second.replicas_revisions,
Expand Down
8 changes: 6 additions & 2 deletions src/v/cluster/topic_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,12 @@ class topic_table {
explicit in_progress_update(
std::vector<model::broker_shard> previous_replicas,
std::vector<model::broker_shard> target_replicas,
reconfiguration_state state,
model::revision_id update_revision,
topic_table_probe& probe)
: _previous_replicas(std::move(previous_replicas))
, _target_replicas(std::move(target_replicas))
, _state(reconfiguration_state::in_progress)
, _state(state)
, _update_revision(update_revision)
, _last_cmd_revision(update_revision)
, _probe(probe) {
Expand Down Expand Up @@ -288,6 +289,8 @@ class topic_table {
ss::future<std::error_code> apply(move_topic_replicas_cmd, model::offset);
ss::future<std::error_code>
apply(revert_cancel_partition_move_cmd, model::offset);
ss::future<std::error_code>
apply(force_partition_reconfiguration_cmd, model::offset);

ss::future<> fill_snapshot(controller_snapshot&) const;
ss::future<>
Expand Down Expand Up @@ -480,7 +483,8 @@ class topic_table {
const std::vector<model::broker_shard>& new_assignment,
topic_metadata_item& metadata,
partition_assignment& current_assignment,
model::offset o);
model::offset o,
bool is_forced);

class snapshot_applier;

Expand Down
7 changes: 7 additions & 0 deletions src/v/cluster/topic_updates_dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,13 @@ ss::future<std::error_code> topic_updates_dispatcher::apply(
});
}

ss::future<std::error_code> topic_updates_dispatcher::apply(
force_partition_reconfiguration_cmd cmd, model::offset base_offset) {
// Post dispatch, allocator updates are skipped because the target
// replica set is a subset of the original replica set.
return dispatch_updates_to_cores(std::move(cmd), base_offset);
ztlpn marked this conversation as resolved.
Show resolved Hide resolved
}

topic_updates_dispatcher::in_progress_map
topic_updates_dispatcher::collect_in_progress(
const model::topic_namespace& tp_ns,
Expand Down
Loading