Skip to content

Commit

Permalink
Merge pull request redpanda-data#15153 from bharathv/force_reconfig_e…
Browse files Browse the repository at this point in the history
…nhancement

force_reconfiguration: start new replicas as learners
  • Loading branch information
bharathv authored Nov 29, 2023
2 parents 913b4e1 + 7f82b21 commit 5f15341
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 40 deletions.
30 changes: 18 additions & 12 deletions src/v/cluster/cluster_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,19 +200,12 @@ ss::future<std::error_code> replicate_and_wait(
std::vector<custom_assignable_topic_configuration>
without_custom_assignments(std::vector<topic_configuration>);

/**
* Subtracts second replica set from the first one. Result contains only brokers
* shards that are present in first replica set but not in the second one.
*/
inline std::vector<model::broker_shard> subtract_replica_sets(
const std::vector<model::broker_shard>& lhs,
const std::vector<model::broker_shard>& rhs) {
std::vector<model::broker_shard> ret;
template<class T>
inline std::vector<T>
subtract(const std::vector<T>& lhs, const std::vector<T>& rhs) {
std::vector<T> ret;
std::copy_if(
lhs.begin(),
lhs.end(),
std::back_inserter(ret),
[&rhs](const model::broker_shard& bs) {
lhs.begin(), lhs.end(), std::back_inserter(ret), [&rhs](const T& bs) {
return std::find(rhs.begin(), rhs.end(), bs) == rhs.end();
});
return ret;
Expand Down Expand Up @@ -240,6 +233,19 @@ inline std::vector<model::broker_shard> union_replica_sets(
return ret;
}

template<class T>
inline std::vector<T>
intersect(const std::vector<T>& lhs, const std::vector<T>& rhs) {
std::vector<T> ret;
ret.reserve(std::min(lhs.size(), rhs.size()));
// Inefficient but constant time for inputs.
std::copy_if(
lhs.begin(), lhs.end(), std::back_inserter(ret), [&](const T& entry) {
return std::find(rhs.begin(), rhs.end(), entry) != rhs.end();
});
return ret;
}

// Checks if lhs is a proper subset of rhs
inline bool is_proper_subset(
const std::vector<model::broker_shard>& lhs,
Expand Down
71 changes: 66 additions & 5 deletions src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ ss::future<> controller_backend::bootstrap_partition_claims() {
ss::future<result<ss::stop_iteration>>
controller_backend::force_replica_set_update(
ss::lw_shared_ptr<partition> partition,
const replicas_t& previous_replicas,
const replicas_t& new_replicas,
const replicas_revision_map& initial_replicas_revisions,
model::revision_id cmd_rev) {
Expand All @@ -683,10 +684,12 @@ controller_backend::force_replica_set_update(
co_return ss::stop_iteration::yes;
}

auto [voters, learners] = split_voters_learners_for_force_reconfiguration(
previous_replicas, new_replicas, initial_replicas_revisions, cmd_rev);

// Force raft configuration update locally.
co_return co_await partition->force_update_replica_set(
create_vnode_set(new_replicas, initial_replicas_revisions, cmd_rev),
cmd_rev);
std::move(voters), std::move(learners), cmd_rev);
}

/**
Expand Down Expand Up @@ -1129,9 +1132,20 @@ ss::future<result<ss::stop_iteration>> controller_backend::reconcile_ntp_step(
updated_replicas
&& update_it->second.get_state()
== reconfiguration_state::force_update) {
// For force-update new replica starts with the updated
// configuration right away (joint consensus is not used)
initial_replicas = *updated_replicas;
auto [voters, learners]
= split_voters_learners_for_force_reconfiguration(
*orig_replicas,
*updated_replicas,
p_it->second.replicas_revisions,
last_cmd_revision);
// Current nodes is a voter only if we do not
// retain any of the original nodes. initial
// replicas is populated only if the replica is voter
// because for learners it automatically replicated
// via configuration update at raft level.
if (learners.empty()) {
initial_replicas = *updated_replicas;
}
} else {
// Configuration will be replicate to the new replica
initial_replicas = {};
Expand Down Expand Up @@ -1276,6 +1290,7 @@ controller_backend::reconcile_partition_reconfiguration(
case reconfiguration_state::force_update:
co_return co_await force_replica_set_update(
std::move(partition),
update.get_previous_replicas(),
update.get_target_replicas(),
replicas_revisions,
cmd_revision);
Expand Down Expand Up @@ -1915,6 +1930,52 @@ bool controller_backend::should_skip(const model::ntp& ntp) const {
return config::node().recovery_mode_enabled() && model::is_user_topic(ntp);
}

std::pair<controller_backend::vnodes, controller_backend::vnodes>
controller_backend::split_voters_learners_for_force_reconfiguration(
const replicas_t& original,
const replicas_t& new_replicas,
const replicas_revision_map& replicas_revision_map,
model::revision_id command_revision) {
auto original_vnodes = create_vnode_set(
original, replicas_revision_map, command_revision);
auto new_vnodes = create_vnode_set(
new_replicas, replicas_revision_map, command_revision);

auto enhanced_force_reconfiguration_enabled = _features.local().is_active(
features::feature::enhanced_force_reconfiguration);

vnodes voters;
vnodes learners;
if (unlikely(!enhanced_force_reconfiguration_enabled)) {
voters = std::move(new_vnodes);
} else {
auto remaining_original_nodes = intersect(original_vnodes, new_vnodes);
if (remaining_original_nodes.size() == 0) {
// we do not retain any of the original replicas, so
// the new replica set begins with every replica as a
// voter.
voters = std::move(new_vnodes);
} else {
// Here we do retain some original nodes, so making them
// as voters and the rest as learners ensures that the learners
// are first caught up before they form a majority.
voters = std::move(remaining_original_nodes);
learners = subtract(new_vnodes, original_vnodes);
}
}
vassert(
voters.size() + learners.size() == new_replicas.size(),
"Incorrect computation of voters {} and learners {} during force "
"reconfiguration, previous: {}, new replicas: {}, revision: {}. This is "
"most likely a logic error / bug.",
voters,
learners,
original,
new_replicas,
command_revision);
return std::make_pair(std::move(voters), std::move(learners));
}

std::ostream& operator<<(
std::ostream& o, const controller_backend::in_progress_operation& op) {
fmt::print(
Expand Down
13 changes: 13 additions & 0 deletions src/v/cluster/controller_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,18 @@ class controller_backend
ss::future<result<ss::stop_iteration>>
reconcile_ntp_step(const model::ntp&, ntp_reconciliation_state&);

/**
* Given the original and new replica set for a force configuration, splits
* the new replica set into voters and learners and returns the equivalent
* pair.
*/
using vnodes = std::vector<raft::vnode>;
std::pair<vnodes, vnodes> split_voters_learners_for_force_reconfiguration(
const replicas_t& original,
const replicas_t& new_replicas,
const replicas_revision_map&,
model::revision_id command_revision);

ss::future<std::error_code> create_partition(
model::ntp,
raft::group_id,
Expand Down Expand Up @@ -364,6 +376,7 @@ class controller_backend
model::revision_id cmd_revision);
ss::future<result<ss::stop_iteration>> force_replica_set_update(
ss::lw_shared_ptr<partition>,
const replicas_t& previous_replicas,
const replicas_t& new_replicas,
const replicas_revision_map& initial_replicas_revisions,
model::revision_id cmd_revision);
Expand Down
6 changes: 4 additions & 2 deletions src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,11 @@ class partition {
}

ss::future<std::error_code> force_update_replica_set(
std::vector<raft::vnode> nodes, model::revision_id new_revision_id) {
std::vector<raft::vnode> voters,
std::vector<raft::vnode> learners,
model::revision_id new_revision_id) {
return _raft->force_replace_configuration_locally(
std::move(nodes), new_revision_id);
std::move(voters), std::move(learners), new_revision_id);
}

raft::group_configuration group_configuration() const {
Expand Down
7 changes: 3 additions & 4 deletions src/v/cluster/partition_balancer_planner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,9 @@ ss::future<> partition_balancer_planner::init_ntp_sizes_from_health_report(
}
const auto& sizes = size_it->second;

auto moving_from = subtract_replica_sets(
auto moving_from = subtract(
update.get_previous_replicas(), update.get_target_replicas());
auto moving_to = subtract_replica_sets(
auto moving_to = subtract(
update.get_target_replicas(), update.get_previous_replicas());

switch (update.get_state()) {
Expand Down Expand Up @@ -594,8 +594,7 @@ class partition_balancer_planner::moving_partition {
auto sizes_it = _ctx._ntp2sizes.find(_ntp);
if (sizes_it != _ctx._ntp2sizes.end()) {
const auto& sizes = sizes_it->second;
auto moving_to = subtract_replica_sets(
_replicas, _orig_replicas);
auto moving_to = subtract(_replicas, _orig_replicas);
for (const auto& bs : moving_to) {
auto node_it = _ctx.node_disk_reports.find(bs.node_id);
if (node_it != _ctx.node_disk_reports.end()) {
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/scheduling/partition_allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ partition_allocator::apply_snapshot(const controller_snapshot& snap) {
const auto& update = it->second;
// Both old and new replicas contribute to allocator weights
// regardless of the update state.
auto additional_replicas = subtract_replica_sets(
auto additional_replicas = subtract(
update.target_assignment, partition.replicas);
for (const auto& bs : additional_replicas) {
new_state->add_allocation(bs, domain);
Expand Down
20 changes: 8 additions & 12 deletions src/v/cluster/topic_updates_dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,12 @@ ss::future<std::error_code> topic_updates_dispatcher::apply(
"currently being updated",
ntp);

auto to_add = subtract_replica_sets(
auto to_add = subtract(
*new_target_replicas, current_assignment->replicas);
_partition_allocator.local().add_final_counts(
to_add, get_allocation_domain(ntp));

auto to_remove = subtract_replica_sets(
auto to_remove = subtract(
current_assignment->replicas, *new_target_replicas);
_partition_allocator.local().remove_final_counts(
to_remove, get_allocation_domain(ntp));
Expand Down Expand Up @@ -296,8 +296,7 @@ ss::future<std::error_code> topic_updates_dispatcher::apply(
std::vector<model::broker_shard> to_delete;
// move was successful, not cancelled
if (target_replicas == command_replicas) {
to_delete = subtract_replica_sets(
*previous_replicas, command_replicas);
to_delete = subtract(*previous_replicas, command_replicas);
} else {
vassert(
previous_replicas == command_replicas,
Expand All @@ -308,8 +307,7 @@ ss::future<std::error_code> topic_updates_dispatcher::apply(
ntp,
command_replicas,
previous_replicas);
to_delete = subtract_replica_sets(
*target_replicas, command_replicas);
to_delete = subtract(*target_replicas, command_replicas);
}
_partition_allocator.local().remove_allocations(
to_delete, get_allocation_domain(ntp));
Expand Down Expand Up @@ -420,13 +418,11 @@ ss::future<std::error_code> topic_updates_dispatcher::apply(
"currently being cancelled",
ntp);

auto to_add = subtract_replica_sets(
*target_replicas, *previous_replicas);
auto to_add = subtract(*target_replicas, *previous_replicas);
_partition_allocator.local().add_final_counts(
to_add, get_allocation_domain(ntp));

auto to_delete = subtract_replica_sets(
*previous_replicas, *target_replicas);
auto to_delete = subtract(*previous_replicas, *target_replicas);
_partition_allocator.local().remove_allocations(
to_delete, get_allocation_domain(ntp));
_partition_allocator.local().remove_final_counts(
Expand Down Expand Up @@ -587,11 +583,11 @@ void topic_updates_dispatcher::update_allocations_for_reconfiguration(
const std::vector<model::broker_shard>& previous,
const std::vector<model::broker_shard>& target,
partition_allocation_domain domain) {
auto to_add = subtract_replica_sets(target, previous);
auto to_add = subtract(target, previous);
_partition_allocator.local().add_allocations(to_add, domain);
_partition_allocator.local().add_final_counts(to_add, domain);

auto to_remove = subtract_replica_sets(previous, target);
auto to_remove = subtract(previous, target);
_partition_allocator.local().remove_final_counts(to_remove, domain);
}

Expand Down
16 changes: 14 additions & 2 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1307,10 +1307,22 @@ consensus::abort_configuration_change(model::revision_id revision) {
}

ss::future<std::error_code> consensus::force_replace_configuration_locally(
std::vector<vnode> nodes, model::revision_id new_revision) {
std::vector<vnode> voters,
std::vector<vnode> learners,
model::revision_id new_revision) {
try {
auto units = co_await _op_lock.get_units();
auto new_cfg = group_configuration(std::move(nodes), new_revision);
auto new_cfg = group_configuration(
std::move(voters), std::move(learners), new_revision);
if (
new_cfg.version() == group_configuration::v_5
&& use_serde_configuration()) {
vlog(
_ctxlog.debug,
"Upgrading configuration {} version to 6",
new_cfg);
new_cfg.set_version(group_configuration::v_6);
}
vlog(_ctxlog.info, "Force replacing configuration with: {}", new_cfg);
auto batches = details::serialize_configuration_as_batches(
std::move(new_cfg));
Expand Down
4 changes: 3 additions & 1 deletion src/v/raft/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ class consensus {
* majority.
*/
ss::future<std::error_code> force_replace_configuration_locally(
std::vector<vnode>, model::revision_id);
std::vector<vnode> voters,
std::vector<vnode> learners,
model::revision_id);

// Abort ongoing configuration change - may cause data loss
ss::future<std::error_code> abort_configuration_change(model::revision_id);
Expand Down
10 changes: 10 additions & 0 deletions src/v/raft/group_configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,16 @@ group_configuration::group_configuration(
_current.voters = std::move(initial_nodes);
}

group_configuration::group_configuration(
std::vector<vnode> voters,
std::vector<vnode> learners,
model::revision_id rev)
: _version(v_5)
, _revision(rev) {
_current.voters = std::move(voters);
_current.learners = std::move(learners);
}

group_configuration::group_configuration(
std::vector<model::broker> brokers,
group_nodes current,
Expand Down
5 changes: 5 additions & 0 deletions src/v/raft/group_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ class group_configuration
* This is preferred constructor for group configuration
*/
group_configuration(std::vector<vnode>, model::revision_id);

group_configuration(
std::vector<vnode> voters,
std::vector<vnode> learners,
model::revision_id);
/**
* creates joint configuration, version 4, with brokers
*/
Expand Down
2 changes: 1 addition & 1 deletion src/v/redpanda/admin/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2664,7 +2664,7 @@ admin_server::get_decommission_progress_handler(
status.ns = p.ntp.ns;
status.topic = p.ntp.tp.topic;
status.partition = p.ntp.tp.partition;
auto added_replicas = cluster::subtract_replica_sets(
auto added_replicas = cluster::subtract(
p.current_assignment, p.previous_assignment);
// we are only interested in reconfigurations where one replica was
// added to the node
Expand Down

0 comments on commit 5f15341

Please sign in to comment.