Skip to content

Commit

Permalink
Merge pull request #9785 from bharathv/eh_force_reconfig
Browse files Browse the repository at this point in the history
raft: Implement force partition reconfiguration API
  • Loading branch information
bharathv authored May 10, 2023
2 parents 30624fa + a88a7c5 commit 2937f26
Show file tree
Hide file tree
Showing 25 changed files with 638 additions and 62 deletions.
13 changes: 13 additions & 0 deletions src/v/cluster/cluster_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ inline bool has_non_replicable_op_type(const topic_table_delta& d) {
case op_t::del:
case op_t::reset:
case op_t::update:
case op_t::force_update:
case op_t::update_finished:
case op_t::update_properties:
case op_t::cancel_update:
Expand Down Expand Up @@ -314,6 +315,18 @@ inline std::vector<model::broker_shard> union_replica_sets(
return ret;
}

// Tells whether lhs is a proper subset of rhs: every entry of lhs must also
// be found in rhs, and rhs must hold strictly more entries than lhs.
// NOTE(review): sizes are compared by raw element count, so this presumably
// relies on neither replica set containing duplicate entries - confirm.
inline bool is_proper_subset(
  const std::vector<model::broker_shard>& lhs,
  const std::vector<model::broker_shard>& rhs) {
    // rhs has to be strictly larger, otherwise lhs cannot be a proper subset
    if (rhs.size() <= lhs.size()) {
        return false;
    }

    for (const auto& candidate : lhs) {
        const bool present = std::find(rhs.begin(), rhs.end(), candidate)
                             != rhs.end();
        if (!present) {
            return false;
        }
    }

    return true;
}

/**
* Subtracts the second replica set from the first one. The result contains only
* brokers whose node_ids are present in the first list but not in the other one
Expand Down
8 changes: 8 additions & 0 deletions src/v/cluster/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ static constexpr int8_t cancel_moving_partition_replicas_cmd_type = 7;
static constexpr int8_t move_topic_replicas_cmd_type = 8;
static constexpr int8_t revert_cancel_partition_move_cmd_type = 9;
static constexpr int8_t topic_lifecycle_transition_cmd_type = 10;
// Command type id used by force_partition_reconfiguration_cmd.
// NOTE(review): unlike the neighbouring ids this name has no `_cmd_type`
// suffix - consider aligning the naming in a follow-up.
static constexpr int8_t force_partition_reconfiguration_type = 11;

static constexpr int8_t create_user_cmd_type = 5;
static constexpr int8_t delete_user_cmd_type = 6;
Expand Down Expand Up @@ -202,6 +203,13 @@ using revert_cancel_partition_move_cmd = controller_command<
model::record_batch_type::topic_management_cmd,
serde_opts::serde_only>;

// Controller command requesting a forced reconfiguration of a single
// partition. The key is the partition's model::ntp and the value is a
// force_partition_reconfiguration_cmd_data. The command is tagged with the
// topic_management_cmd batch type and the serde_only option.
using force_partition_reconfiguration_cmd = controller_command<
model::ntp,
force_partition_reconfiguration_cmd_data,
force_partition_reconfiguration_type,
model::record_batch_type::topic_management_cmd,
serde_opts::serde_only>;

using create_user_cmd = controller_command<
security::credential_user,
security::scram_credential,
Expand Down
47 changes: 45 additions & 2 deletions src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ bool is_interrupting_operation(
* Find interrupting operation following the one that is currently
* processed. Following rules apply:
*
* - all operations i.e. update, cancel_update, and force abort must be
* interrupted by deletion
* - all operations i.e. update, force_update, cancel_update, and force
* abort must be interrupted by deletion.
*
* - update & cancel update operations may be interrupted by
* force_abort_update operation
Expand Down Expand Up @@ -570,6 +570,7 @@ controller_backend::deltas_t calculate_bootstrap_deltas(
case op_t::reset:
return has_local_replicas(self, delta.new_assignment.replicas);
case op_t::update:
case op_t::force_update:
case op_t::cancel_update:
case op_t::force_abort_update:
vassert(
Expand Down Expand Up @@ -648,6 +649,38 @@ ss::future<> controller_backend::fetch_deltas() {
});
}

/**
 * Executes a forced replica set update for the given partition on this node.
 *
 * @param ntp partition being reconfigured
 * @param replicas requested (new) replica set
 * @param replica_revisions revisions used to build the raft vnode set
 * @param rev revision of the command that requested the update
 *
 * @return errc::success when this node hosts no replica from `replicas` or
 *         when check_configuration_update reports no error,
 *         errc::partition_not_exists when the partition is not present in
 *         the local partition manager, otherwise the result of forcing the
 *         configuration change on the local partition.
 */
ss::future<std::error_code> controller_backend::force_replica_set_update(
  const model::ntp& ntp,
  const std::vector<model::broker_shard>& replicas,
  const replicas_revision_map& replica_revisions,
  model::revision_id rev) {
    if (!has_local_replicas(_self, replicas)) {
        // This node will no longer be a part of the raft group,
        // will be cleaned up as a part of update_finished command.
        co_return errc::success;
    }

    auto partition = _partition_manager.local().get(ntp);
    if (!partition) {
        co_return errc::partition_not_exists;
    }

    // Only declare success when the configuration was actually updated.
    // The current group configuration is not needed here, so it is no
    // longer copied out of the partition.
    auto update_ec = check_configuration_update(
      _self, partition, replicas, rev);

    if (!update_ec) {
        co_return errc::success;
    }

    // Configuration revision is lower, force update locally.
    co_return co_await partition->force_update_replica_set(
      create_vnode_set(replicas, replica_revisions), rev);
}

/**
* A topic file is qualified as an orphan if we don't have it in the topic table
* or its revision is less than the revision in the topic table
Expand Down Expand Up @@ -996,6 +1029,7 @@ controller_backend::execute_partition_op(const delta_metadata& metadata) {
*delta.replica_revisions,
cmd_rev);
case op_t::update:
case op_t::force_update:
case op_t::force_abort_update:
case op_t::cancel_update:
vassert(
Expand Down Expand Up @@ -1092,6 +1126,7 @@ controller_backend::process_partition_reconfiguration(
target_assignment.replicas);
co_return std::error_code(errc::success);
}

/**
* Check if target assignment has node and core local replicas
*/
Expand Down Expand Up @@ -1308,6 +1343,11 @@ bool controller_backend::can_finish_update(
if (update_type == topic_table_delta::op_type::force_abort_update) {
return true;
}

if (update_type == topic_table_delta::op_type::force_update) {
// Wait for the leader to be elected in the new replica set.
return current_leader == _self;
}
/**
* If the revert feature is active we use current leader to dispatch
* partition move
Expand Down Expand Up @@ -1453,6 +1493,9 @@ ss::future<std::error_code> controller_backend::execute_reconfiguration(
case topic_table_delta::op_type::update:
co_return co_await update_partition_replica_set(
ntp, replica_set, replica_revisions, revision);
case topic_table_delta::op_type::force_update:
co_return co_await force_replica_set_update(
ntp, replica_set, replica_revisions, revision);
case topic_table_delta::op_type::cancel_update:
co_return co_await cancel_replica_set_update(
ntp, replica_set, replica_revisions, previous_replica_set, revision);
Expand Down
6 changes: 6 additions & 0 deletions src/v/cluster/controller_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,12 @@ class controller_backend
const std::vector<model::broker_shard>&,
model::revision_id);

// Handles a force_update reconfiguration for the given ntp. Takes the ntp,
// the new replica set, the replicas revision map and the revision of the
// update; resolves to an error code describing the outcome.
ss::future<std::error_code> force_replica_set_update(
const model::ntp&,
const std::vector<model::broker_shard>& /*new replicas*/,
const replicas_revision_map&,
model::revision_id);

ss::future<std::error_code>
dispatch_update_finished(model::ntp, partition_assignment);

Expand Down
6 changes: 6 additions & 0 deletions src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ class partition {
return _raft->replace_configuration(std::move(nodes), new_revision_id);
}

// Forwards the given nodes and revision to the underlying raft instance via
// force_replace_configuration_locally and returns the resulting future.
// `nodes` is taken by value and moved into the raft call.
ss::future<std::error_code> force_update_replica_set(
std::vector<raft::vnode> nodes, model::revision_id new_revision_id) {
return _raft->force_replace_configuration_locally(
std::move(nodes), new_revision_id);
}

// Returns, by value, the configuration reported by the underlying raft
// instance.
raft::group_configuration group_configuration() const {
return _raft->config();
}
Expand Down
71 changes: 64 additions & 7 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ topic_table::apply(move_partition_replicas_cmd cmd, model::offset o) {
}

change_partition_replicas(
cmd.key, cmd.value, tp->second, *current_assignment_it, o);
cmd.key, cmd.value, tp->second, *current_assignment_it, o, false);
notify_waiters();

return ss::make_ready_future<std::error_code>(errc::success);
Expand Down Expand Up @@ -429,6 +429,7 @@ topic_table::apply(cancel_moving_partition_replicas_cmd cmd, model::offset o) {
co_return errc::no_update_in_progress;
}
break;
case reconfiguration_state::force_update:
case reconfiguration_state::force_cancelled:
// partition reconfiguration already cancelled forcibly
co_return errc::no_update_in_progress;
Expand Down Expand Up @@ -653,14 +654,48 @@ topic_table::apply(move_topic_replicas_cmd cmd, model::offset o) {
new_replicas,
tp->second,
*assignment,
o);
o,
false);
}

notify_waiters();

co_return errc::success;
}

/**
 * Applies a force partition reconfiguration command to the topic table.
 *
 * The last applied revision is always advanced to the command offset. The
 * command is then rejected with:
 *  - errc::topic_not_exists when the topic is not present in the table,
 *  - errc::topic_operation_error when the topic is not replicable,
 *  - errc::partition_not_exists when the partition has no assignment,
 *  - errc::update_in_progress when an update is already tracked for the ntp.
 *
 * Otherwise the partition replicas are changed with the forced flag set,
 * waiters are notified and errc::success is returned.
 */
ss::future<std::error_code>
topic_table::apply(force_partition_reconfiguration_cmd cmd, model::offset o) {
    // small helper wrapping an error code into an already resolved future
    const auto ready = [](errc code) {
        return ss::make_ready_future<std::error_code>(code);
    };

    _last_applied_revision_id = model::revision_id(o);

    // the topic has to be known to the table
    auto topic_it = _topics.find(model::topic_namespace_view(cmd.key));
    if (topic_it == _topics.end()) {
        return ready(errc::topic_not_exists);
    }

    auto& topic_md = topic_it->second;
    if (!topic_md.is_topic_replicable()) {
        return ready(errc::topic_operation_error);
    }

    // the partition has to have an assignment
    auto& assignments = topic_md.get_assignments();
    auto assignment_it = assignments.find(cmd.key.tp.partition);
    if (assignment_it == assignments.end()) {
        return ready(errc::partition_not_exists);
    }

    // refuse to stack a forced update on top of an ongoing one
    const bool already_updating = _updates_in_progress.find(cmd.key)
                                  != _updates_in_progress.end();
    if (already_updating) {
        return ready(errc::update_in_progress);
    }

    change_partition_replicas(
      cmd.key, cmd.value.replicas, topic_md, *assignment_it, o, true);
    notify_waiters();

    return ready(errc::success);
}

template<typename T>
void incremental_update(
std::optional<T>& property, property_update<std::optional<T>> override) {
Expand Down Expand Up @@ -1092,13 +1127,20 @@ class topic_table::snapshot_applier {
update_it != topic.updates.end()) {
const auto& update = update_it->second;

if (update.state == reconfiguration_state::in_progress) {
if (
update.state == reconfiguration_state::in_progress
|| update.state == reconfiguration_state::force_update) {
cur_assignment.replicas = update_it->second.target_assignment;
}

auto initial_state = update.state
== reconfiguration_state::force_update
? update.state
: reconfiguration_state::in_progress;
in_progress_update inp_update{
partition.replicas,
update.target_assignment,
initial_state,
update.revision,
_probe,
};
Expand Down Expand Up @@ -1126,14 +1168,18 @@ class topic_table::snapshot_applier {
// cancellation because if later the "cancel revert" event
// happens, controller_backend has to execute the update
// delta to make progress.
auto op_type = update.state
== reconfiguration_state::force_update
? delta::op_type::force_update
: delta::op_type::update;
_pending_deltas.emplace_back(
ntp,
partition_assignment(
partition.group,
p_id,
update_it->second.target_assignment),
model::offset{update.revision},
delta::op_type::update,
op_type,
partition.replicas,
update_replicas_revisions(
partition.replicas_revisions,
Expand All @@ -1153,6 +1199,7 @@ class topic_table::snapshot_applier {

switch (update.state) {
case reconfiguration_state::in_progress:
case reconfiguration_state::force_update:
break;
case reconfiguration_state::cancelled:
add_cancel_delta(topic_table_delta::op_type::cancel_update);
Expand Down Expand Up @@ -1595,7 +1642,8 @@ void topic_table::change_partition_replicas(
const std::vector<model::broker_shard>& new_assignment,
topic_metadata_item& metadata,
partition_assignment& current_assignment,
model::offset o) {
model::offset o,
bool is_forced) {
if (are_replica_sets_equal(current_assignment.replicas, new_assignment)) {
return;
}
Expand All @@ -1605,7 +1653,12 @@ void topic_table::change_partition_replicas(
_updates_in_progress.emplace(
ntp,
in_progress_update(
current_assignment.replicas, new_assignment, update_revision, _probe));
current_assignment.replicas,
new_assignment,
is_forced ? reconfiguration_state::force_update
: reconfiguration_state::in_progress,
update_revision,
_probe));
auto previous_assignment = current_assignment.replicas;
// replace partition replica set
current_assignment.replicas = new_assignment;
Expand All @@ -1620,6 +1673,7 @@ void topic_table::change_partition_replicas(
in_progress_update(
current_assignment.replicas,
new_assignment,
reconfiguration_state::in_progress,
update_revision,
_probe));
vassert(
Expand Down Expand Up @@ -1655,11 +1709,14 @@ void topic_table::change_partition_replicas(
"partition {} must exist in the partition map",
ntp);

delta::op_type move_type = is_forced ? delta::op_type::force_update
: delta::op_type::update;

_pending_deltas.emplace_back(
std::move(ntp),
current_assignment,
o,
delta::op_type::update,
move_type,
std::move(previous_assignment),
update_replicas_revisions(
partition_it->second.replicas_revisions,
Expand Down
8 changes: 6 additions & 2 deletions src/v/cluster/topic_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,12 @@ class topic_table {
explicit in_progress_update(
std::vector<model::broker_shard> previous_replicas,
std::vector<model::broker_shard> target_replicas,
reconfiguration_state state,
model::revision_id update_revision,
topic_table_probe& probe)
: _previous_replicas(std::move(previous_replicas))
, _target_replicas(std::move(target_replicas))
, _state(reconfiguration_state::in_progress)
, _state(state)
, _update_revision(update_revision)
, _last_cmd_revision(update_revision)
, _probe(probe) {
Expand Down Expand Up @@ -288,6 +289,8 @@ class topic_table {
ss::future<std::error_code> apply(move_topic_replicas_cmd, model::offset);
ss::future<std::error_code>
apply(revert_cancel_partition_move_cmd, model::offset);
ss::future<std::error_code>
apply(force_partition_reconfiguration_cmd, model::offset);

ss::future<> fill_snapshot(controller_snapshot&) const;
ss::future<>
Expand Down Expand Up @@ -480,7 +483,8 @@ class topic_table {
const std::vector<model::broker_shard>& new_assignment,
topic_metadata_item& metadata,
partition_assignment& current_assignment,
model::offset o);
model::offset o,
bool is_forced);

class snapshot_applier;

Expand Down
7 changes: 7 additions & 0 deletions src/v/cluster/topic_updates_dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,13 @@ ss::future<std::error_code> topic_updates_dispatcher::apply(
});
}

// Handles a force partition reconfiguration command by handing it, together
// with the batch base offset, to dispatch_updates_to_cores and returning the
// resulting future. No further processing is done here once the command has
// been dispatched.
ss::future<std::error_code> topic_updates_dispatcher::apply(
force_partition_reconfiguration_cmd cmd, model::offset base_offset) {
// Post dispatch, allocator updates are skipped because the target
// replica set is a subset of the original replica set.
return dispatch_updates_to_cores(std::move(cmd), base_offset);
}

topic_updates_dispatcher::in_progress_map
topic_updates_dispatcher::collect_in_progress(
const model::topic_namespace& tp_ns,
Expand Down
Loading

0 comments on commit 2937f26

Please sign in to comment.