diff --git a/src/v/cluster/partition_balancer_backend.cc b/src/v/cluster/partition_balancer_backend.cc index 80a91e32e514..8ab1dbc71bf7 100644 --- a/src/v/cluster/partition_balancer_backend.cc +++ b/src/v/cluster/partition_balancer_backend.cc @@ -163,20 +163,19 @@ ss::future<> partition_balancer_backend::do_tick() { .segment_fallocation_step = _segment_fallocation_step()}, _state, _partition_allocator) - .plan_reassignments(health_report.value(), follower_metrics); + .plan_actions(health_report.value(), follower_metrics); _last_leader_term = _raft0->term(); _last_tick_time = ss::lowres_clock::now(); _last_violations = std::move(plan_data.violations); if ( _state.topics().has_updates_in_progress() - || plan_data.status == planner_status::cancellations_planned - || plan_data.status == planner_status::movement_planned) { + || plan_data.status == planner_status::actions_planned) { _last_status = partition_balancer_status::in_progress; } else if (plan_data.status == planner_status::waiting_for_reports) { _last_status = partition_balancer_status::starting; } else if ( - plan_data.failed_reassignments_count > 0 + plan_data.failed_actions_count > 0 || plan_data.status == planner_status::waiting_for_maintenance_end) { _last_status = partition_balancer_status::stalled; } else { @@ -189,14 +188,14 @@ ss::future<> partition_balancer_backend::do_tick() { "last status: {}; " "violations: unavailable nodes: {}, full nodes: {}; " "updates in progress: {}; " - "reassignments planned: {}, cancelled: {}, failed: {}", + "action counts: reassignments: {}, cancellations: {}, failed: {}", _last_status, _last_violations.unavailable_nodes.size(), _last_violations.full_nodes.size(), _state.topics().updates_in_progress().size(), plan_data.reassignments.size(), plan_data.cancellations.size(), - plan_data.failed_reassignments_count); + plan_data.failed_actions_count); } co_await ss::max_concurrent_for_each( diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc index 2bfd212728fc..34102cdd0bff 100644 --- a/src/v/cluster/partition_balancer_planner.cc +++ b/src/v/cluster/partition_balancer_planner.cc @@ -19,6 +19,7 @@ #include "ssx/sformat.h" #include +#include #include @@ -66,10 +67,60 @@ partition_balancer_planner::partition_balancer_planner( _config.soft_max_disk_usage_ratio, _config.hard_max_disk_usage_ratio); } +class partition_balancer_planner::request_context { +public: + std::vector all_nodes; + absl::flat_hash_set all_unavailable_nodes; + absl::flat_hash_set timed_out_unavailable_nodes; + size_t num_nodes_in_maintenance = 0; + absl::flat_hash_set decommissioning_nodes; + absl::flat_hash_map node_disk_reports; + + void for_each_partition( + ss::noncopyable_function); + void with_partition( + const model::ntp&, ss::noncopyable_function); + + const partition_balancer_state& state() const { return _parent._state; } + + const planner_config& config() const { return _parent._config; } + + bool is_batch_full() const { + return _planned_moves_size_bytes + >= _parent._config.movement_disk_size_batch; + } + +private: + friend class partition_balancer_planner; + + request_context(partition_balancer_planner& parent) + : _parent(parent) {} + + bool all_reports_received() const; + + template + auto do_with_partition( + const model::ntp& ntp, + const std::vector& orig_replicas, + Visitor&); + + void collect_actions(plan_data&); + +private: + partition_balancer_planner& _parent; + absl::node_hash_map _ntp2size; + absl::node_hash_map> + _moving_ntp2replica_sizes; + 
absl::node_hash_map _reassignments; + uint64_t _planned_moves_size_bytes = 0; + size_t _failed_actions_count = 0; + absl::node_hash_set _cancellations; +}; + void partition_balancer_planner::init_per_node_state( const cluster_health_report& health_report, const std::vector& follower_metrics, - reallocation_request_state& rrs, + request_context& ctx, plan_data& result) const { for (const auto& [id, broker] : _state.members().nodes()) { if ( @@ -78,20 +129,20 @@ void partition_balancer_planner::init_per_node_state( continue; } - rrs.all_nodes.push_back(id); + ctx.all_nodes.push_back(id); if ( broker.state.get_maintenance_state() == model::maintenance_state::active) { vlog(clusterlog.debug, "node {}: in maintenance", id); - rrs.num_nodes_in_maintenance += 1; + ctx.num_nodes_in_maintenance += 1; } if ( broker.state.get_membership_state() == model::membership_state::draining) { vlog(clusterlog.debug, "node {}: decommissioning", id); - rrs.decommissioning_nodes.insert(id); + ctx.decommissioning_nodes.insert(id); } } @@ -110,10 +161,10 @@ void partition_balancer_planner::init_per_node_state( continue; } - rrs.all_unavailable_nodes.insert(follower.id); + ctx.all_unavailable_nodes.insert(follower.id); if (unavailable_dur > _config.node_availability_timeout_sec) { - rrs.timed_out_unavailable_nodes.insert(follower.id); + ctx.timed_out_unavailable_nodes.insert(follower.id); model::timestamp unavailable_since = model::to_timestamp( model::timestamp_clock::now() - std::chrono::duration_cast( @@ -127,11 +178,11 @@ void partition_balancer_planner::init_per_node_state( const uint64_t total = node_report.local_state.data_disk.total; const uint64_t free = node_report.local_state.data_disk.free; - rrs.node_disk_reports.emplace( + ctx.node_disk_reports.emplace( node_report.id, node_disk_space(node_report.id, total, total - free)); } - for (const auto& [id, disk] : rrs.node_disk_reports) { + for (const auto& [id, disk] : ctx.node_disk_reports) { double used_space_ratio = disk.original_used_ratio(); vlog( clusterlog.debug, @@ -148,24 +199,80 @@ void partition_balancer_planner::init_per_node_state( } void partition_balancer_planner::init_ntp_sizes_from_health_report( - const cluster_health_report& health_report, reallocation_request_state& rrs) { + const cluster_health_report& health_report, request_context& ctx) { for (const auto& node_report : health_report.node_reports) { for (const auto& tp_ns : node_report.topics) { for (const auto& partition : tp_ns.partitions) { - rrs.ntp_sizes[model::ntp( - tp_ns.tp_ns.ns, tp_ns.tp_ns.tp, partition.id)] - = partition.size_bytes; + model::ntp ntp{tp_ns.tp_ns.ns, tp_ns.tp_ns.tp, partition.id}; + auto& ntp_size = ctx._ntp2size[ntp]; + ntp_size = std::max(ntp_size, partition.size_bytes); + + if (_state.topics().is_update_in_progress(ntp)) { + ctx._moving_ntp2replica_sizes[ntp][node_report.id] + = partition.size_bytes; + } } } } + + // Add moving partitions contribution to batch size and node disk sizes. 
+ for (const auto& [ntp, replica2size] : ctx._moving_ntp2replica_sizes) { + const auto& update = _state.topics().updates_in_progress().at(ntp); + + auto moving_from = subtract_replica_sets( + update.get_previous_replicas(), update.get_target_replicas()); + auto moving_to = subtract_replica_sets( + update.get_target_replicas(), update.get_previous_replicas()); + + size_t max_size = ctx._ntp2size.at(ntp); + + switch (update.get_state()) { + case reconfiguration_state::in_progress: + case reconfiguration_state::force_update: + ctx._planned_moves_size_bytes += max_size; + + for (const auto& bs : moving_from) { + auto node_it = ctx.node_disk_reports.find(bs.node_id); + if (node_it != ctx.node_disk_reports.end()) { + auto size_it = replica2size.find(bs.node_id); + size_t replica_size + = (size_it != replica2size.end() ? size_it->second : max_size); + node_it->second.released += replica_size; + } + } + + for (const auto& bs : moving_to) { + auto node_it = ctx.node_disk_reports.find(bs.node_id); + if (node_it != ctx.node_disk_reports.end()) { + auto size_it = replica2size.find(bs.node_id); + size_t replica_size + = (size_it != replica2size.end() ? size_it->second : 0); + node_it->second.assigned += (max_size - replica_size); + } + } + + break; + case reconfiguration_state::cancelled: + case reconfiguration_state::force_cancelled: + for (const auto& bs : moving_to) { + auto node_it = ctx.node_disk_reports.find(bs.node_id); + if (node_it != ctx.node_disk_reports.end()) { + auto size_it = replica2size.find(bs.node_id); + if (size_it != replica2size.end()) { + node_it->second.released += size_it->second; + } + } + } + break; + } + } } -bool partition_balancer_planner::all_reports_received( - const reallocation_request_state& rrs) { - for (auto id : rrs.all_nodes) { +bool partition_balancer_planner::request_context::all_reports_received() const { + for (auto id : all_nodes) { if ( - !rrs.all_unavailable_nodes.contains(id) - && !rrs.node_disk_reports.contains(id)) { + !all_unavailable_nodes.contains(id) + && !node_disk_reports.contains(id)) { vlog(clusterlog.info, "No disk report for node {}", id); return false; } @@ -173,15 +280,15 @@ bool partition_balancer_planner::all_reports_received( return true; } -bool partition_balancer_planner::is_partition_movement_possible( - const std::vector& current_replicas, - const reallocation_request_state& rrs) { +static bool has_quorum( + const absl::flat_hash_set& all_unavailable_nodes, + const std::vector& current_replicas) { // Check that nodes quorum is available size_t available_nodes_amount = std::count_if( current_replicas.begin(), current_replicas.end(), - [&rrs](const model::broker_shard& bs) { - return !rrs.all_unavailable_nodes.contains(bs.node_id); + [&](const model::broker_shard& bs) { + return !all_unavailable_nodes.contains(bs.node_id); }); if (available_nodes_amount * 2 < current_replicas.size()) { return false; @@ -189,193 +296,498 @@ bool partition_balancer_planner::is_partition_movement_possible( return true; } -std::optional partition_balancer_planner::get_partition_size( - const model::ntp& ntp, const reallocation_request_state& rrs) { - const auto ntp_data = rrs.ntp_sizes.find(ntp); - if (ntp_data == rrs.ntp_sizes.end()) { +class partition_balancer_planner::reassignable_partition { +public: + const model::ntp& ntp() const { return _ntp; } + const std::vector& replicas() const { + return (_reallocated ? 
_reallocated->replicas() : _orig_replicas); + }; + + bool is_original(const model::broker_shard& replica) const { + return !_reallocated || _reallocated->is_original(replica); + } + + size_t size_bytes() const { return _size_bytes; } + + result move_replica( + model::node_id replica, + double max_disk_usage_ratio, + std::string_view reason); + +private: + friend class request_context; + + reassignable_partition( + model::ntp ntp, + size_t size_bytes, + std::optional reallocated, + const std::vector& orig_replicas, + request_context& ctx) + : _ntp(std::move(ntp)) + , _size_bytes(size_bytes) + , _reallocated(std::move(reallocated)) + , _orig_replicas(orig_replicas) + , _ctx(ctx) {} + + bool has_changes() const { + return _reallocated && _reallocated->has_node_changes(); + } + + allocation_constraints + get_allocation_constraints(double max_disk_usage_ratio) const; + +private: + model::ntp _ntp; + size_t _size_bytes; + std::optional _reallocated; + const std::vector& _orig_replicas; + request_context& _ctx; +}; + +class partition_balancer_planner::moving_partition { +public: + const model::ntp& ntp() const { return _ntp; } + const std::vector& replicas() const { + return (_cancel_requested ? _orig_replicas : _replicas); + } + + const std::vector& orig_replicas() const { + return _orig_replicas; + } + + bool cancel_requested() const { return _cancel_requested; } + + void request_cancel(std::string_view reason) { + if (!_cancel_requested) { + vlog( + clusterlog.info, + "ntp: {}, cancelling move {} -> {}, reason: {}", + ntp(), + orig_replicas(), + replicas(), + reason); + + _ctx._cancellations.insert(_ntp); + _cancel_requested = true; + + // Adjust partition contribution to final disk space + auto size_it = _ctx._moving_ntp2replica_sizes.find(_ntp); + if (size_it != _ctx._moving_ntp2replica_sizes.end()) { + const auto& replica2size = size_it->second; + auto moving_to = subtract_replica_sets( + _replicas, _orig_replicas); + for (const auto& bs : moving_to) { + auto node_it = _ctx.node_disk_reports.find(bs.node_id); + if (node_it != _ctx.node_disk_reports.end()) { + auto size_it = replica2size.find(bs.node_id); + if (size_it != replica2size.end()) { + node_it->second.released += size_it->second; + } + } + } + } + + // TODO: adjust contribution to final partition counts + } + } + + void + report_failure(std::string_view reason, std::string_view change_reason) { vlog( clusterlog.info, - "Partition {} status was not found in cluster health " - "report", - ntp); + "[ntp {}, replicas: {}]: can't change replicas with cancellation: {} " + "(change reason: {})", + _ntp, + _replicas, + reason, + change_reason); + ++_ctx._failed_actions_count; + } + +private: + friend class request_context; + + moving_partition( + model::ntp ntp, + const std::vector& replicas, + const std::vector& orig_replicas, + request_context& ctx) + : _ntp(std::move(ntp)) + , _replicas(replicas) + , _orig_replicas(orig_replicas) + , _cancel_requested(ctx._cancellations.contains(_ntp)) + , _ctx(ctx) {} + +private: + model::ntp _ntp; + const std::vector& _replicas; + const std::vector& _orig_replicas; + bool _cancel_requested; + request_context& _ctx; +}; + +/// Partition that we for some reason cannot do anything about. 
+class partition_balancer_planner::immutable_partition { +public: + const model::ntp& ntp() const { return _ntp; } + const std::vector& replicas() const { + return _replicas; + } + + enum class immutability_reason { + // not enough replicas on live nodes, reassignment unlikely to succeed + no_quorum, + // no partition size information + no_size_info, + // partition reconfiguration + reconfiguration_state, + }; + + immutability_reason reason() const { return _reason; } + + void report_failure(std::string_view change_reason) { + ss::sstring reason; + switch (_reason) { + case immutability_reason::no_quorum: + reason = "no raft quorum"; + break; + case immutability_reason::no_size_info: + reason = "partition size information unavailable"; + break; + case immutability_reason::reconfiguration_state: + reason = ssx::sformat( + "reconfiguration in progress, state: {}", _reconfiguration_state); + break; + } + vlog( + clusterlog.info, + "[ntp {}, replicas: {}]: can't change replicas: {} (change reason: " + "{})", + _ntp, + _replicas, + reason, + change_reason); + ++_ctx._failed_actions_count; + } + +private: + friend class request_context; + + immutable_partition( + model::ntp ntp, + const std::vector& replicas, + immutability_reason reason, + std::optional state, + request_context& ctx) + : _ntp(std::move(ntp)) + , _replicas(replicas) + , _reason(reason) + , _reconfiguration_state(state) + , _ctx(ctx) {} + +private: + model::ntp _ntp; + const std::vector& _replicas; + immutability_reason _reason; + std::optional _reconfiguration_state; + request_context& _ctx; +}; + +class partition_balancer_planner::partition { +public: + const model::ntp& ntp() const { + return std::visit( + [](const auto& p) -> const model::ntp& { return p.ntp(); }, _variant); + } + + const std::vector& replicas() const { + return std::visit( + [](const auto& p) -> const std::vector& { + return p.replicas(); + }, + _variant); + } + + template + auto match_variant(Visitors&&... 
vs) { + return ss::visit(_variant, std::forward(vs)...); + } + +private: + friend class partition_balancer_planner::request_context; + + template + partition(T&& variant) + : _variant(std::forward(variant)) {} + + std::variant + _variant; +}; + +template +auto partition_balancer_planner::request_context::do_with_partition( + const model::ntp& ntp, + const std::vector& orig_replicas, + Visitor& visitor) { + auto in_progress_it = _parent._state.topics().updates_in_progress().find( + ntp); + if (in_progress_it != _parent._state.topics().updates_in_progress().end()) { + const auto& replicas = in_progress_it->second.get_target_replicas(); + const auto& orig_replicas + = in_progress_it->second.get_previous_replicas(); + auto state = in_progress_it->second.get_state(); + + if (state == reconfiguration_state::in_progress) { + partition part{ + moving_partition{ntp, replicas, orig_replicas, *this}}; + return visitor(part); + } else { + partition part{immutable_partition{ + ntp, + replicas, + immutable_partition::immutability_reason::reconfiguration_state, + state, + *this}}; + return visitor(part); + } + } + + size_t size_bytes = 0; + auto size_it = _ntp2size.find(ntp); + if (size_it != _ntp2size.end()) { + size_bytes = size_it->second; } else { - return ntp_data->second; + partition part{immutable_partition{ + ntp, + orig_replicas, + immutable_partition::immutability_reason::no_size_info, + std::nullopt, + *this}}; + return visitor(part); } - return std::nullopt; + + if (!has_quorum(all_unavailable_nodes, orig_replicas)) { + partition part{immutable_partition{ + ntp, + orig_replicas, + immutable_partition::immutability_reason::no_quorum, + std::nullopt, + *this}}; + return visitor(part); + } + + std::optional reallocated; + auto reassignment_it = _reassignments.find(ntp); + if (reassignment_it != _reassignments.end()) { + // borrow the allocated_partition object + reallocated = std::move(reassignment_it->second); + } + + partition part{reassignable_partition{ + ntp, size_bytes, std::move(reallocated), orig_replicas, *this}}; + auto deferred = ss::defer([&] { + auto& reassignable = std::get(part._variant); + // insert or return part._reallocated to reassignments + if (reassignment_it != _reassignments.end()) { + reassignment_it->second = std::move(*reassignable._reallocated); + } else if ( + reassignable._reallocated + && reassignable._reallocated->has_node_changes()) { + _reassignments.emplace(ntp, std::move(*reassignable._reallocated)); + _planned_moves_size_bytes += reassignable._size_bytes; + } + }); + + return visitor(part); } -partition_constraints partition_balancer_planner::get_partition_constraints( - const partition_assignment& assignments, - size_t partition_size, - double max_disk_usage_ratio, - reallocation_request_state& rrs) const { - allocation_constraints allocation_constraints; +void partition_balancer_planner::request_context::for_each_partition( + ss::noncopyable_function visitor) { + for (const auto& t : _parent._state.topics().topics_map()) { + for (const auto& a : t.second.get_assignments()) { + auto ntp = model::ntp(t.first.ns, t.first.tp, a.id); + auto stop = do_with_partition(ntp, a.replicas, visitor); + if (stop == ss::stop_iteration::yes) { + return; + } + } + } +} + +void partition_balancer_planner::request_context::with_partition( + const model::ntp& ntp, ss::noncopyable_function visitor) { + auto topic = model::topic_namespace_view(ntp); + auto topic_meta = _parent._state.topics().get_topic_metadata_ref(topic); + if (!topic_meta) { + vlog(clusterlog.warn, "topic {} 
not found", topic); + return; + } + auto it = topic_meta->get().get_assignments().find(ntp.tp.partition); + if (it == topic_meta->get().get_assignments().end()) { + vlog( + clusterlog.warn, + "partition {} of topic {} not found", + ntp.tp.partition, + topic); + return; + } + + do_with_partition(ntp, it->replicas, visitor); +} + +allocation_constraints +partition_balancer_planner::reassignable_partition::get_allocation_constraints( + double max_disk_usage_ratio) const { + allocation_constraints constraints; // Add constraint on least disk usage - allocation_constraints.add( - least_disk_filled(max_disk_usage_ratio, rrs.node_disk_reports)); + constraints.add( + least_disk_filled(max_disk_usage_ratio, _ctx.node_disk_reports)); // Add constraint on partition max_disk_usage_ratio overfill - size_t upper_bound_for_partition_size = partition_size - + _config.segment_fallocation_step; - allocation_constraints.add(disk_not_overflowed_by_partition( + size_t upper_bound_for_partition_size + = _size_bytes + _ctx.config().segment_fallocation_step; + constraints.add(disk_not_overflowed_by_partition( max_disk_usage_ratio, upper_bound_for_partition_size, - rrs.node_disk_reports)); + _ctx.node_disk_reports)); // Add constraint on unavailable nodes - allocation_constraints.add(distinct_from(rrs.timed_out_unavailable_nodes)); + constraints.add(distinct_from(_ctx.timed_out_unavailable_nodes)); // Add constraint on decommissioning nodes - if (!rrs.decommissioning_nodes.empty()) { - allocation_constraints.add(distinct_from(rrs.decommissioning_nodes)); + if (!_ctx.decommissioning_nodes.empty()) { + constraints.add(distinct_from(_ctx.decommissioning_nodes)); } - return partition_constraints( - assignments.id, - assignments.replicas.size(), - std::move(allocation_constraints)); + return constraints; } -result partition_balancer_planner::get_reallocation( - const model::ntp& ntp, - const partition_assignment& assignments, - size_t partition_size, - partition_constraints constraints, - const std::vector& stable_replicas, - reallocation_request_state& rrs) { +result +partition_balancer_planner::reassignable_partition::move_replica( + model::node_id replica, + double max_disk_usage_ratio, + std::string_view reason) { vlog( clusterlog.debug, - "trying to find reallocation for ntp {} with stable_replicas: {}", - ntp, - stable_replicas); - - auto stable_assigments = partition_assignment( - assignments.group, assignments.id, stable_replicas); + "ntp {} (size: {}, current replicas: {}): trying to move replica on " + "node: {}, reason: {}", + _ntp, + _size_bytes, + replicas(), + replica, + reason); - auto reallocation = _partition_allocator.reallocate_partition( - std::move(constraints), stable_assigments, get_allocation_domain(ntp)); + if (!_reallocated) { + _reallocated + = _ctx._parent._partition_allocator.make_allocated_partition( + replicas(), get_allocation_domain(_ntp)); + } - if (!reallocation) { + auto constraints = get_allocation_constraints(max_disk_usage_ratio); + auto moved = _ctx._parent._partition_allocator.reallocate_replica( + *_reallocated, replica, std::move(constraints)); + if (!moved) { vlog( clusterlog.info, - "attempt to find reallocation for ntp {} with " - "stable_replicas: {} failed, error: {}", - ntp, - stable_replicas, - reallocation.error().message()); - - return reallocation; + "ntp {}: attempt to move replica {} (reason: {}) failed, error: " + "{}", + _ntp, + replica, + reason, + moved.error().message()); + _ctx._failed_actions_count += 1; + return moved; } - rrs.moving_partitions.insert(ntp); 
- rrs.planned_moves_size += partition_size; - for (const auto r : reallocation.value().replicas()) { - if ( - std::find(stable_replicas.begin(), stable_replicas.end(), r) - == stable_replicas.end()) { - auto disk_it = rrs.node_disk_reports.find(r.node_id); - if (disk_it != rrs.node_disk_reports.end()) { - disk_it->second.assigned += partition_size; - } + if (moved.value().node_id != replica) { + auto from_it = _ctx.node_disk_reports.find(replica); + if (from_it != _ctx.node_disk_reports.end()) { + from_it->second.released += _size_bytes; } - } - for (const auto r : assignments.replicas) { - if ( - std::find(stable_replicas.begin(), stable_replicas.end(), r) - == stable_replicas.end()) { - auto disk_it = rrs.node_disk_reports.find(r.node_id); - if (disk_it != rrs.node_disk_reports.end()) { - disk_it->second.released += partition_size; - } + + auto to_it = _ctx.node_disk_reports.find(moved.value().node_id); + if (to_it != _ctx.node_disk_reports.end()) { + to_it->second.assigned += _size_bytes; } + } else { + // TODO: revert? } - return reallocation; -} - -void partition_balancer_planner::plan_data::add_reassignment( - model::ntp ntp, - const std::vector& orig_replicas, - allocated_partition reallocation, - std::string_view reason) { - vlog( - clusterlog.info, - "ntp: {}, planning move {} -> {} (reason: {})", - ntp, - orig_replicas, - reallocation.replicas(), - reason); - - reassignments.emplace_back( - ntp_reassignment{.ntp = ntp, .allocated = std::move(reallocation)}); + return moved; } /* * Function is trying to move ntp out of unavailable nodes * It can move to nodes that are violating soft_max_disk_usage_ratio constraint */ -void partition_balancer_planner::get_unavailable_nodes_reassignments( - plan_data& result, reallocation_request_state& rrs) { - if (rrs.timed_out_unavailable_nodes.empty()) { +void partition_balancer_planner::get_unavailable_nodes_actions( + request_context& ctx) { + if (ctx.timed_out_unavailable_nodes.empty()) { return; } - for (const auto& t : _state.topics().topics_map()) { - for (const auto& a : t.second.get_assignments()) { - // End adding movements if batch is collected - if (rrs.planned_moves_size >= _config.movement_disk_size_batch) { - return; - } - - auto ntp = model::ntp(t.first.ns, t.first.tp, a.id); - if (rrs.moving_partitions.contains(ntp)) { - continue; - } - - std::vector stable_replicas; - for (const auto& bs : a.replicas) { - if (!rrs.timed_out_unavailable_nodes.contains(bs.node_id)) { - stable_replicas.push_back(bs); - } - } - - if (stable_replicas.size() == a.replicas.size()) { - continue; - } + ctx.for_each_partition([&](partition& part) { + // End adding movements if batch is collected + if (ctx.is_batch_full()) { + return ss::stop_iteration::yes; + } - auto partition_size = get_partition_size(ntp, rrs); - if ( - !partition_size.has_value() - || !is_partition_movement_possible(a.replicas, rrs)) { - result.failed_reassignments_count += 1; - continue; + std::vector to_move; + for (const auto& bs : part.replicas()) { + if (ctx.timed_out_unavailable_nodes.contains(bs.node_id)) { + to_move.push_back(bs.node_id); } + } - auto constraints = get_partition_constraints( - a, - partition_size.value(), - _config.hard_max_disk_usage_ratio, - rrs); - - auto new_allocation_units = get_reallocation( - ntp, - a, - partition_size.value(), - std::move(constraints), - stable_replicas, - rrs); - if (new_allocation_units) { - result.add_reassignment( - ntp, - a.replicas, - std::move(new_allocation_units.value()), - "unavailable nodes"); - } else { - 
result.failed_reassignments_count += 1; - } + if (to_move.empty()) { + return ss::stop_iteration::no; } - } + + part.match_variant( + [&](reassignable_partition& part) { + for (const auto& replica : to_move) { + // ignore result + (void)part.move_replica( + replica, + ctx.config().hard_max_disk_usage_ratio, + "unavailable nodes"); + } + }, + [&](moving_partition& part) { + if (part.cancel_requested()) { + return; + } + + absl::flat_hash_set previous_replicas_set; + bool was_on_decommissioning_node = false; + for (const auto& r : part.orig_replicas()) { + previous_replicas_set.insert(r.node_id); + if (ctx.decommissioning_nodes.contains(r.node_id)) { + was_on_decommissioning_node = true; + } + } + + for (const auto& r : to_move) { + if (!previous_replicas_set.contains(r)) { + if (!was_on_decommissioning_node) { + // makes sense to cancel + part.request_cancel("unavailable nodes"); + } else { + part.report_failure( + "move related to decommission", + "unavailable nodes"); + } + break; + } + } + }, + [](immutable_partition& part) { + part.report_failure("unavailable nodes"); + }); + + return ss::stop_iteration::no; + }); } /// Try to fix ntps that have several replicas in one rack (these ntps can @@ -388,93 +800,68 @@ void partition_balancer_planner::get_unavailable_nodes_reassignments( /// the ntp is replicated to, we try to schedule a move. For each rack we /// arbitrarily choose the first appearing replica to remain there (note: this /// is probably not optimal choice). -void partition_balancer_planner::get_rack_constraint_repair_reassignments( - plan_data& result, reallocation_request_state& rrs) { - if (_state.ntps_with_broken_rack_constraint().empty()) { +void partition_balancer_planner::get_rack_constraint_repair_actions( + request_context& ctx) { + if (ctx.state().ntps_with_broken_rack_constraint().empty()) { return; } absl::flat_hash_set available_racks; - for (auto node_id : rrs.all_nodes) { - if (!rrs.timed_out_unavailable_nodes.contains(node_id)) { - auto rack = _state.members().get_node_rack_id(node_id); + for (auto node_id : ctx.all_nodes) { + if (!ctx.timed_out_unavailable_nodes.contains(node_id)) { + auto rack = ctx.state().members().get_node_rack_id(node_id); if (rack) { available_racks.insert(*rack); } } } - for (const auto& ntp : _state.ntps_with_broken_rack_constraint()) { - if (rrs.planned_moves_size >= _config.movement_disk_size_batch) { + for (const auto& ntp : ctx.state().ntps_with_broken_rack_constraint()) { + if (ctx.is_batch_full()) { return; } - if (rrs.moving_partitions.contains(ntp)) { - continue; - } - - auto assignment = _state.topics().get_partition_assignment(ntp); - if (!assignment) { - vlog(clusterlog.warn, "assignment for ntp {} not found", ntp); - continue; - } - - const auto& orig_replicas = assignment->replicas; - - std::vector stable_replicas; - absl::flat_hash_set cur_racks; - for (const auto& bs : orig_replicas) { - auto rack = _state.members().get_node_rack_id(bs.node_id); - if (rack) { - auto [it, inserted] = cur_racks.insert(*rack); - if (inserted) { - stable_replicas.push_back(bs); + ctx.with_partition(ntp, [&](partition& part) { + std::vector to_move; + absl::flat_hash_set cur_racks; + for (const auto& bs : part.replicas()) { + auto rack = ctx.state().members().get_node_rack_id(bs.node_id); + if (rack) { + auto [it, inserted] = cur_racks.insert(*rack); + if (!inserted) { + to_move.push_back(bs); + } } - } else { - stable_replicas.push_back(bs); } - } - if (stable_replicas.size() == orig_replicas.size()) { - continue; - } - - if 
(available_racks.size() <= cur_racks.size()) { - // Can't repair the constraint if we don't have an available rack to - // place a replica there. - continue; - } - - auto partition_size = get_partition_size(ntp, rrs); - if ( - !partition_size.has_value() - || !is_partition_movement_possible(orig_replicas, rrs)) { - result.failed_reassignments_count += 1; - continue; - } + if (to_move.empty()) { + return; + } - auto constraints = get_partition_constraints( - *assignment, - partition_size.value(), - _config.hard_max_disk_usage_ratio, - rrs); + if (available_racks.size() <= cur_racks.size()) { + // Can't repair the constraint if we don't have an available + // rack to place a replica there. + return; + } - auto new_allocation_units = get_reallocation( - ntp, - *assignment, - partition_size.value(), - std::move(constraints), - stable_replicas, - rrs); - if (new_allocation_units) { - result.add_reassignment( - ntp, - orig_replicas, - std::move(new_allocation_units.value()), - "rack constraint repair"); - } else { - result.failed_reassignments_count += 1; - } + part.match_variant( + [&](reassignable_partition& part) { + for (const auto& bs : to_move) { + if (part.is_original(bs)) { + // only move replicas that haven't been moved for + // other reasons + (void)part.move_replica( + bs.node_id, + ctx.config().hard_max_disk_usage_ratio, + "rack constraint repair"); + } + } + }, + [](immutable_partition& part) { + part.report_failure("rack constraint repair"); + }, + [](moving_partition&) {}); + }); } } @@ -485,15 +872,16 @@ void partition_balancer_planner::get_rack_constraint_repair_reassignments( * are selected in ascending order of their size. * * If more than one replica in a group is on a node violating disk usage - * constraints, we try to reallocate all such replicas. But if a reallocation - * request fails, we retry while leaving some of these replicas intact. + * constraints, we try to reallocate all such replicas. Some reallocation + * requests may fail; in that case we move only the replicas that we can.
*/ -void partition_balancer_planner::get_full_node_reassignments( - plan_data& result, reallocation_request_state& rrs) { +void partition_balancer_planner::get_full_node_actions(request_context& ctx) { std::vector sorted_full_nodes; - for (const auto& kv : rrs.node_disk_reports) { + for (const auto& kv : ctx.node_disk_reports) { const auto* node_disk = &kv.second; - if (node_disk->final_used_ratio() > _config.soft_max_disk_usage_ratio) { + if ( + node_disk->final_used_ratio() + > ctx.config().soft_max_disk_usage_ratio) { sorted_full_nodes.push_back(node_disk); } } @@ -508,212 +896,150 @@ void partition_balancer_planner::get_full_node_reassignments( return; } - absl::flat_hash_map> ntp_on_nodes; - for (const auto& t : _state.topics().topics_map()) { - for (const auto& a : t.second.get_assignments()) { - for (const auto& r : a.replicas) { - ntp_on_nodes[r.node_id].emplace_back( - t.first.ns, t.first.tp, a.id); - } + auto find_full_node = [&](model::node_id id) -> const node_disk_space* { + auto it = ctx.node_disk_reports.find(id); + if (it == ctx.node_disk_reports.end()) { + return nullptr; + } else if ( + it->second.final_used_ratio() + > ctx.config().soft_max_disk_usage_ratio) { + return &it->second; + } else { + return nullptr; } - } + }; + // build an index of move candidates: full node -> movement priority -> ntp + absl:: + flat_hash_map> + full_node2priority2ntp; + ctx.for_each_partition([&](partition& part) { + part.match_variant( + [&](reassignable_partition& part) { + std::vector replicas_on_full_nodes; + for (const auto& bs : part.replicas()) { + if (part.is_original(bs) && find_full_node(bs.node_id)) { + replicas_on_full_nodes.push_back(bs.node_id); + } + } + + for (model::node_id node_id : replicas_on_full_nodes) { + full_node2priority2ntp[node_id].emplace( + part.size_bytes(), part.ntp()); + } + }, + [](auto&) {}); + + return ss::stop_iteration::no; + }); + + // move partitions, starting from partitions with replicas on the most full + // node for (const auto* node_disk : sorted_full_nodes) { - if (rrs.planned_moves_size >= _config.movement_disk_size_batch) { + if (ctx.is_batch_full()) { return; } - absl::btree_multimap ntp_on_node_sizes; - for (const auto& ntp : ntp_on_nodes[node_disk->node_id]) { - auto partition_size_opt = get_partition_size(ntp, rrs); - if (partition_size_opt.has_value()) { - ntp_on_node_sizes.emplace(partition_size_opt.value(), ntp); - } else { - result.failed_reassignments_count += 1; - } + auto ntp_index_it = full_node2priority2ntp.find(node_disk->node_id); + if (ntp_index_it == full_node2priority2ntp.end()) { + // no eligible partitions, skip node + continue; } - auto ntp_size_it = ntp_on_node_sizes.begin(); - while (node_disk->final_used_ratio() > _config.soft_max_disk_usage_ratio - && ntp_size_it != ntp_on_node_sizes.end()) { - if (rrs.planned_moves_size >= _config.movement_disk_size_batch) { + for (const auto& [score, ntp_to_move] : ntp_index_it->second) { + if (ctx.is_batch_full()) { return; } - - const auto& partition_to_move = ntp_size_it->second; - if (rrs.moving_partitions.contains(partition_to_move)) { - ntp_size_it++; - continue; - } - - const auto& topic_metadata = _state.topics().topics_map().at( - model::topic_namespace_view(partition_to_move)); - const auto& current_assignments - = topic_metadata.get_assignments().find( - partition_to_move.tp.partition); - - if (!is_partition_movement_possible( - current_assignments->replicas, rrs)) { - result.failed_reassignments_count += 1; - ntp_size_it++; - continue; - } - - auto constraints = 
get_partition_constraints( - *current_assignments, - ntp_size_it->first, - _config.soft_max_disk_usage_ratio, - rrs); - - struct full_node_replica { - model::broker_shard bs; - node_disk_space disk; - }; - std::vector full_node_replicas; - std::vector stable_replicas; - - for (const auto& r : current_assignments->replicas) { - if (rrs.timed_out_unavailable_nodes.contains(r.node_id)) { - continue; - } - - auto disk_it = rrs.node_disk_reports.find(r.node_id); - if (disk_it == rrs.node_disk_reports.end()) { - // A replica on a node we recently lost contact with (but - // availability timeout hasn't elapsed yet). Better leave it - // where it is. - stable_replicas.push_back(r); - continue; - } - - const auto& disk = disk_it->second; - if ( - disk.final_used_ratio() < _config.soft_max_disk_usage_ratio) { - stable_replicas.push_back(r); - } else { - full_node_replicas.push_back(full_node_replica{ - .bs = r, - .disk = disk, - }); - } - } - - // We start with a small set of stable replicas that are on "good" - // nodes and try to find a reallocation. If that fails, we add one - // replica from the set of full_node_replicas (starting from the - // least full) to stable replicas and retry until we get a valid - // reallocation. - std::sort( - full_node_replicas.begin(), - full_node_replicas.end(), - [](const auto& lhs, const auto& rhs) { - return lhs.disk.final_used_ratio() - < rhs.disk.final_used_ratio(); - }); - - bool success = false; - for (const auto& replica : full_node_replicas) { - auto new_allocation_units = get_reallocation( - partition_to_move, - *current_assignments, - ntp_size_it->first, - constraints, - stable_replicas, - rrs); - - if (new_allocation_units) { - result.add_reassignment( - partition_to_move, - current_assignments->replicas, - std::move(new_allocation_units.value()), - "full nodes"); - success = true; - break; - } else { - stable_replicas.push_back(replica.bs); - } - } - if (!success) { - result.failed_reassignments_count += 1; + if ( + node_disk->final_used_ratio() + < ctx.config().soft_max_disk_usage_ratio) { + break; } - ntp_size_it++; + ctx.with_partition(ntp_to_move, [&](partition& part) { + part.match_variant( + [&](reassignable_partition& part) { + struct full_node_replica { + model::node_id node_id; + double final_used_ratio; + }; + std::vector full_node_replicas; + + for (const auto& r : part.replicas()) { + if ( + ctx.timed_out_unavailable_nodes.contains(r.node_id) + || !part.is_original(r)) { + continue; + } + + const auto* full_node = find_full_node(r.node_id); + if (full_node) { + full_node_replicas.push_back(full_node_replica{ + .node_id = r.node_id, + .final_used_ratio + = full_node->final_used_ratio()}); + } + } + + // Try to reallocate replicas starting from the most full + // node + std::sort( + full_node_replicas.begin(), + full_node_replicas.end(), + [](const auto& lhs, const auto& rhs) { + return lhs.final_used_ratio > rhs.final_used_ratio; + }); + + for (const auto& replica : full_node_replicas) { + (void)part.move_replica( + replica.node_id, + ctx.config().soft_max_disk_usage_ratio, + "full_nodes"); + } + }, + [](auto&) {}); + }); } } } -/* - * Cancel movement if new assignments contains unavailble node - * and previous replica set doesn't contain this node - */ -void partition_balancer_planner::get_unavailable_node_movement_cancellations( - plan_data& result, const reallocation_request_state& rrs) { - for (const auto& update : _state.topics().updates_in_progress()) { - if (update.second.get_state() != reconfiguration_state::in_progress) { - 
continue; - } +void partition_balancer_planner::request_context::collect_actions( + partition_balancer_planner::plan_data& result) { + result.reassignments.reserve(_reassignments.size()); + for (auto& [ntp, reallocated] : _reassignments) { + result.reassignments.push_back( + ntp_reassignment{.ntp = ntp, .allocated = std::move(reallocated)}); + } - absl::flat_hash_set previous_replicas_set; - bool was_on_decommissioning_node = false; - for (const auto& r : update.second.get_previous_replicas()) { - previous_replicas_set.insert(r.node_id); - if (rrs.decommissioning_nodes.contains(r.node_id)) { - was_on_decommissioning_node = true; - } - } + result.failed_actions_count = _failed_actions_count; - auto current_assignments = _state.topics().get_partition_assignment( - update.first); - if (!current_assignments.has_value()) { - continue; - } - for (const auto& r : current_assignments->replicas) { - if ( - rrs.timed_out_unavailable_nodes.contains(r.node_id) - && !previous_replicas_set.contains(r.node_id)) { - if (!was_on_decommissioning_node) { - vlog( - clusterlog.info, - "ntp: {}, cancelling move {} -> {}", - update.first, - update.second.get_previous_replicas(), - current_assignments->replicas); - - result.cancellations.push_back(update.first); - } else { - result.failed_reassignments_count += 1; - } - break; - } - } + result.cancellations.reserve(_cancellations.size()); + std::move( + _cancellations.begin(), + _cancellations.end(), + std::back_inserter(result.cancellations)); + + if (!result.cancellations.empty() || !result.reassignments.empty()) { + result.status = status::actions_planned; } } -partition_balancer_planner::plan_data -partition_balancer_planner::plan_reassignments( +partition_balancer_planner::plan_data partition_balancer_planner::plan_actions( const cluster_health_report& health_report, const std::vector& follower_metrics) { - reallocation_request_state rrs; + request_context ctx(*this); plan_data result; - init_per_node_state(health_report, follower_metrics, rrs, result); + init_per_node_state(health_report, follower_metrics, ctx, result); - if (rrs.num_nodes_in_maintenance > 0) { + if (ctx.num_nodes_in_maintenance > 0) { if (!result.violations.is_empty()) { result.status = status::waiting_for_maintenance_end; } return result; } - if (_state.topics().has_updates_in_progress()) { - get_unavailable_node_movement_cancellations(result, rrs); - if (!result.cancellations.empty()) { - result.status = status::cancellations_planned; - } - return result; - } - - if (!all_reports_received(rrs)) { + if (!ctx.all_reports_received()) { result.status = status::waiting_for_reports; return result; } @@ -725,16 +1051,13 @@ partition_balancer_planner::plan_reassignments( return result; } - init_ntp_sizes_from_health_report(health_report, rrs); - - get_unavailable_nodes_reassignments(result, rrs); - get_rack_constraint_repair_reassignments(result, rrs); - get_full_node_reassignments(result, rrs); + init_ntp_sizes_from_health_report(health_report, ctx); - if (!result.reassignments.empty()) { - result.status = status::movement_planned; - } + get_unavailable_nodes_actions(ctx); + get_rack_constraint_repair_actions(ctx); + get_full_node_actions(ctx); + ctx.collect_actions(result); return result; } diff --git a/src/v/cluster/partition_balancer_planner.h b/src/v/cluster/partition_balancer_planner.h index a3284117560c..f87bead24234 100644 --- a/src/v/cluster/partition_balancer_planner.h +++ b/src/v/cluster/partition_balancer_planner.h @@ -47,8 +47,7 @@ class partition_balancer_planner { enum class 
status { empty, - movement_planned, - cancellations_planned, + actions_planned, waiting_for_maintenance_end, waiting_for_reports, }; @@ -57,77 +56,32 @@ class partition_balancer_planner { partition_balancer_violations violations; std::vector reassignments; std::vector cancellations; - size_t failed_reassignments_count = 0; + size_t failed_actions_count = 0; status status = status::empty; - - void add_reassignment( - model::ntp, - const std::vector& orig_replicas, - allocated_partition, - std::string_view reason); }; - plan_data plan_reassignments( + plan_data plan_actions( const cluster_health_report&, const std::vector&); private: - struct reallocation_request_state { - std::vector all_nodes; - absl::flat_hash_set all_unavailable_nodes; - absl::flat_hash_set timed_out_unavailable_nodes; - size_t num_nodes_in_maintenance = 0; - absl::flat_hash_set decommissioning_nodes; - absl::flat_hash_map node_disk_reports; - - absl::flat_hash_map ntp_sizes; - - // Partitions that are planned to move in current planner request - absl::flat_hash_set moving_partitions; - uint64_t planned_moves_size = 0; - }; - - partition_constraints get_partition_constraints( - const partition_assignment& assignments, - size_t partition_size, - double max_disk_usage_ratio, - reallocation_request_state&) const; - - result get_reallocation( - const model::ntp&, - const partition_assignment&, - size_t partition_size, - partition_constraints, - const std::vector& stable_replicas, - reallocation_request_state&); - - void get_unavailable_nodes_reassignments( - plan_data&, reallocation_request_state&); - - void get_rack_constraint_repair_reassignments( - plan_data&, reallocation_request_state&); - - void get_full_node_reassignments(plan_data&, reallocation_request_state&); + class request_context; + class partition; + class reassignable_partition; + class moving_partition; + class immutable_partition; void init_per_node_state( const cluster_health_report&, const std::vector&, - reallocation_request_state&, + request_context&, plan_data&) const; - void get_unavailable_node_movement_cancellations( - plan_data&, const reallocation_request_state&); - - bool is_partition_movement_possible( - const std::vector& current_replicas, - const reallocation_request_state&); - void init_ntp_sizes_from_health_report( - const cluster_health_report& health_report, reallocation_request_state&); - - std::optional get_partition_size( - const model::ntp& ntp, const reallocation_request_state&); + const cluster_health_report& health_report, request_context&); - bool all_reports_received(const reallocation_request_state&); + static void get_unavailable_nodes_actions(request_context&); + static void get_rack_constraint_repair_actions(request_context&); + static void get_full_node_actions(request_context&); planner_config _config; partition_balancer_state& _state; diff --git a/src/v/cluster/scheduling/types.cc b/src/v/cluster/scheduling/types.cc index f70970684ca0..96d3b88b7f7c 100644 --- a/src/v/cluster/scheduling/types.cc +++ b/src/v/cluster/scheduling/types.cc @@ -208,6 +208,29 @@ bool allocated_partition::has_changes() const { return false; } +bool allocated_partition::has_node_changes() const { + if (!_original) { + return false; + } + if (_replicas.size() != _original->size()) { + return true; + } + + absl::flat_hash_set original; + original.reserve(_original->size()); + for (const auto& bs : *_original) { + original.insert(bs.node_id); + } + + absl::flat_hash_set current; + current.reserve(_replicas.size()); + for (const auto& bs : _replicas) { + 
current.insert(bs.node_id); + } + + return original != current; +} + bool allocated_partition::is_original( const model::broker_shard& replica) const { if (_original) { diff --git a/src/v/cluster/scheduling/types.h b/src/v/cluster/scheduling/types.h index 87c894393659..7f76442bf4a1 100644 --- a/src/v/cluster/scheduling/types.h +++ b/src/v/cluster/scheduling/types.h @@ -218,6 +218,8 @@ class allocated_partition { } bool has_changes() const; + // true if the set of nodes differs from the original + bool has_node_changes() const; bool is_original(const model::broker_shard&) const; diff --git a/src/v/cluster/tests/CMakeLists.txt b/src/v/cluster/tests/CMakeLists.txt index 17e5cd158778..5706aa5abbb1 100644 --- a/src/v/cluster/tests/CMakeLists.txt +++ b/src/v/cluster/tests/CMakeLists.txt @@ -110,3 +110,11 @@ rp_test( LIBRARIES Boost::unit_test_framework v::cluster LABELS cluster ) + +rp_test( + BENCHMARK_TEST + BINARY_NAME partition_balancer + SOURCES partition_balancer_bench.cc + LIBRARIES Seastar::seastar_perf_testing v::seastar_testing_main v::cluster + LABELS cluster +) diff --git a/src/v/cluster/tests/partition_balancer_bench.cc b/src/v/cluster/tests/partition_balancer_bench.cc new file mode 100644 index 000000000000..9d244541351f --- /dev/null +++ b/src/v/cluster/tests/partition_balancer_bench.cc @@ -0,0 +1,43 @@ +// Copyright 2023 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/tests/partition_balancer_planner_fixture.h" + +#include + +PERF_TEST_C(partition_balancer_planner_fixture, unavailable_nodes) { + static bool initialized = false; + if (!initialized) { + ss::thread_attributes thread_attr; + co_await ss::async(thread_attr, [this] { + allocator_register_nodes(3); + create_topic("topic-1", 2000, 3); + allocator_register_nodes(2); + }); + + initialized = true; + } + + uint64_t local_partition_size = 10_KiB; + uint64_t movement_batch_partitions_amount = (reallocation_batch_size + + local_partition_size - 1) + / local_partition_size; + + auto hr = create_health_report({}, {}, local_partition_size); + + std::set unavailable_nodes = {0}; + auto fm = create_follower_metrics(unavailable_nodes); + + perf_tests::start_measuring_time(); + auto plan_data = planner.plan_actions(hr, fm); + perf_tests::stop_measuring_time(); + + const auto& reassignments = plan_data.reassignments; + BOOST_REQUIRE_EQUAL(reassignments.size(), movement_batch_partitions_amount); +} diff --git a/src/v/cluster/tests/partition_balancer_planner_test.cc b/src/v/cluster/tests/partition_balancer_planner_test.cc index 4177e7b9497d..a244257a688b 100644 --- a/src/v/cluster/tests/partition_balancer_planner_test.cc +++ b/src/v/cluster/tests/partition_balancer_planner_test.cc @@ -71,7 +71,7 @@ FIXTURE_TEST(test_stable, partition_balancer_planner_fixture) { auto hr = create_health_report(); auto fm = create_follower_metrics(); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, {}, {}); BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 0); } @@ -100,7 +100,7 @@ FIXTURE_TEST(test_node_down, partition_balancer_planner_fixture) { std::set unavailable_nodes = {0}; auto fm = create_follower_metrics(unavailable_nodes); - auto plan_data = planner.plan_reassignments(hr, fm); + 
auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, unavailable_nodes, {}); @@ -137,7 +137,7 @@ FIXTURE_TEST(test_no_quorum_for_partition, partition_balancer_planner_fixture) { std::set unavailable_nodes = {0, 1}; auto fm = create_follower_metrics(unavailable_nodes); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 0); } @@ -170,7 +170,7 @@ FIXTURE_TEST( std::set unavailable_nodes = {0}; auto fm = create_follower_metrics(unavailable_nodes); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, unavailable_nodes, {}); @@ -208,7 +208,7 @@ FIXTURE_TEST( std::set unavailable_nodes = {0}; auto fm = create_follower_metrics(unavailable_nodes); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, unavailable_nodes, {}); @@ -246,7 +246,7 @@ FIXTURE_TEST( std::set unavailable_nodes = {0}; auto fm = create_follower_metrics(unavailable_nodes); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, unavailable_nodes, full_nodes); @@ -277,7 +277,7 @@ FIXTURE_TEST(test_move_from_full_node, partition_balancer_planner_fixture) { auto fm = create_follower_metrics(); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, {}, full_nodes); @@ -317,7 +317,7 @@ FIXTURE_TEST( std::set unavailable_nodes = {0}; auto fm = create_follower_metrics(unavailable_nodes); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, unavailable_nodes, {}); @@ -364,7 +364,7 @@ FIXTURE_TEST( auto hr = create_health_report(full_nodes); auto fm = create_follower_metrics(); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, {}, full_nodes); const auto& reassignments = plan_data.reassignments; @@ -383,7 +383,7 @@ FIXTURE_TEST( /* * 5 nodes; 1 topic; 1 node down; 1 node full; - * Move from unavailable node leaves a replica on the full node. + * One replica is moved from unavailable node, another from the full one. 
* Actual * node_0: partitions: 1; down: True; disk: unfilled; * node_1: partitions: 1; down: False; disk: full; @@ -410,7 +410,7 @@ FIXTURE_TEST( std::set unavailable_nodes = {0}; auto fm = create_follower_metrics(unavailable_nodes); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, unavailable_nodes, full_nodes); const auto& reassignments = plan_data.reassignments; @@ -422,12 +422,10 @@ FIXTURE_TEST( new_replicas_set.insert(bs.node_id); } - std::unordered_set expected1{ - model::node_id(1), model::node_id(2), model::node_id(3)}; - std::unordered_set expected2{ - model::node_id(1), model::node_id(2), model::node_id(4)}; + std::unordered_set expected{ + model::node_id(2), model::node_id(3), model::node_id(4)}; BOOST_REQUIRE_MESSAGE( - new_replicas_set == expected1 || new_replicas_set == expected2, + new_replicas_set == expected, "unexpected new replica set: " << new_replicas); } @@ -462,7 +460,7 @@ FIXTURE_TEST(test_move_part_of_replicas, partition_balancer_planner_fixture) { hr.node_reports[1].local_state.data_disk.free -= 1_MiB; hr.node_reports[2].local_state.data_disk.free -= 2_MiB; - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, {}, full_nodes); @@ -519,25 +517,23 @@ FIXTURE_TEST( } } - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, {}, full_nodes); const auto& reassignments = plan_data.reassignments; BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 2); - std::unordered_set expected_nodes({model::node_id(2)}); - - auto new_replicas_1 = reassignments[0].allocated.replicas(); - - check_expected_assignments(new_replicas_1, expected_nodes); - // First move less size node - BOOST_REQUIRE_EQUAL(reassignments[0].ntp.tp.topic, "topic-1"); - BOOST_REQUIRE_EQUAL(reassignments[0].ntp.tp.partition, 2); - auto new_replicas_2 = reassignments[1].allocated.replicas(); - check_expected_assignments(new_replicas_2, expected_nodes); - BOOST_REQUIRE_EQUAL(reassignments[1].ntp.tp.topic, "topic-1"); - BOOST_REQUIRE_EQUAL(reassignments[1].ntp.tp.partition, 1); + std::unordered_set expected_nodes({model::node_id(2)}); + for (const auto& reassignment : reassignments) { + BOOST_REQUIRE_EQUAL(reassignment.ntp.tp.topic, "topic-1"); + check_expected_assignments( + reassignment.allocated.replicas(), expected_nodes); + auto partition = reassignment.ntp.tp.partition; + BOOST_REQUIRE_MESSAGE( + partition == 1 || partition == 2, + "unexpected partition: " << partition); + } } /* @@ -574,7 +570,7 @@ FIXTURE_TEST(test_lot_of_partitions, partition_balancer_planner_fixture) { std::set unavailable_nodes = {0}; auto fm = create_follower_metrics(unavailable_nodes); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, unavailable_nodes, {}); const auto& reassignments = plan_data.reassignments; @@ -632,7 +628,7 @@ FIXTURE_TEST(test_node_cancelation, partition_balancer_planner_fixture) { std::set unavailable_nodes = {0}; auto fm = create_follower_metrics(unavailable_nodes); - auto planner_result = planner.plan_reassignments(hr, fm); + auto planner_result = planner.plan_actions(hr, fm); BOOST_REQUIRE_EQUAL(planner_result.reassignments.size(), 1); @@ -654,7 +650,7 @@ FIXTURE_TEST(test_node_cancelation, partition_balancer_planner_fixture) { unavailable_nodes = {0, 3}; fm = 
create_follower_metrics(unavailable_nodes); - planner_result = planner.plan_reassignments(hr, fm); + planner_result = planner.plan_actions(hr, fm); BOOST_REQUIRE(planner_result.reassignments.size() == 0); BOOST_REQUIRE(planner_result.cancellations.size() == 1); BOOST_REQUIRE(planner_result.cancellations.front() == ntp); @@ -692,7 +688,7 @@ FIXTURE_TEST(test_rack_awareness, partition_balancer_planner_fixture) { std::set unavailable_nodes = {0}; auto fm = create_follower_metrics(unavailable_nodes); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, unavailable_nodes, {}); @@ -726,7 +722,7 @@ FIXTURE_TEST( set_maintenance_mode(model::node_id{3}); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, unavailable_nodes, {}); BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 0); @@ -755,11 +751,11 @@ FIXTURE_TEST( auto fm = create_follower_metrics(unavailable_nodes); set_decommissioning(model::node_id{3}); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, unavailable_nodes, {}); BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 0); - BOOST_REQUIRE_EQUAL(plan_data.failed_reassignments_count, 1); + BOOST_REQUIRE_EQUAL(plan_data.failed_actions_count, 1); } /* @@ -790,12 +786,12 @@ FIXTURE_TEST( model::broker_shard{model::node_id{3}, 0}, }); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, unavailable_nodes, {}); BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 0); BOOST_REQUIRE_EQUAL(plan_data.cancellations.size(), 0); - BOOST_REQUIRE_EQUAL(plan_data.failed_reassignments_count, 1); + BOOST_REQUIRE_EQUAL(plan_data.failed_actions_count, 1); } FIXTURE_TEST( @@ -855,7 +851,7 @@ FIXTURE_TEST(test_rack_awareness_repair, partition_balancer_planner_fixture) { auto hr = create_health_report(); auto fm = create_follower_metrics({}); - auto plan_data = planner.plan_reassignments(hr, fm); + auto plan_data = planner.plan_actions(hr, fm); check_violations(plan_data, {}, {}); BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 2); @@ -871,5 +867,5 @@ FIXTURE_TEST(test_rack_awareness_repair, partition_balancer_planner_fixture) { BOOST_REQUIRE_EQUAL(racks.size(), 3); } BOOST_REQUIRE_EQUAL(plan_data.cancellations.size(), 0); - BOOST_REQUIRE_EQUAL(plan_data.failed_reassignments_count, 0); + BOOST_REQUIRE_EQUAL(plan_data.failed_actions_count, 0); } diff --git a/tests/rptest/tests/partition_balancer_test.py b/tests/rptest/tests/partition_balancer_test.py index 288e7ce3374b..0997e799595b 100644 --- a/tests/rptest/tests/partition_balancer_test.py +++ b/tests/rptest/tests/partition_balancer_test.py @@ -347,6 +347,7 @@ def node_removed(): self.run_validation(min_records=100, consumer_timeout_sec=CONSUMER_TIMEOUT) + @skip_debug_mode @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST) def test_unavailable_nodes(self): self.start_redpanda(num_nodes=5) @@ -396,6 +397,7 @@ def _throttle_recovery(self, new_value): self.redpanda.set_cluster_config( {"raft_learner_recovery_rate": str(new_value)}) + @skip_debug_mode @cluster(num_nodes=6, log_allow_list=CHAOS_LOG_ALLOW_LIST) def test_movement_cancellations(self): self.start_redpanda(num_nodes=4) @@ -486,6 +488,7 @@ def test_rack_awareness(self): ns.make_available() self.run_validation(consumer_timeout_sec=CONSUMER_TIMEOUT) + @skip_debug_mode 
@cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST) def test_rack_constraint_repair(self): """ @@ -555,6 +558,7 @@ def num_with_broken_rack_constraint() -> int: self.run_validation(consumer_timeout_sec=CONSUMER_TIMEOUT) assert num_with_broken_rack_constraint() == 0 + @skip_debug_mode @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST + RACE_BETWEEN_DELETION_AND_ADDING_PARTITION) @@ -731,6 +735,7 @@ def is_ready_and_stable(s): # and partition balancing is not invoked yet assert used_ratio < 0.81 + @skip_debug_mode @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST) @matrix(kill_same_node=[True, False]) def test_maintenance_mode(self, kill_same_node): @@ -810,6 +815,7 @@ def entered_maintenance_mode(node): self.run_validation(enable_idempotence=False, consumer_timeout_sec=CONSUMER_TIMEOUT) + @skip_debug_mode @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST + STARTUP_SEQUENCE_ABORTED) @matrix(kill_same_node=[True, False], decommission_first=[True, False]) @@ -926,6 +932,7 @@ def node_removed(): self.run_validation(enable_idempotence=False, consumer_timeout_sec=CONSUMER_TIMEOUT) + @skip_debug_mode @cluster(num_nodes=4, log_allow_list=CHAOS_LOG_ALLOW_LIST) def test_transfer_controller_leadership(self): """
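
The core of this refactor is the new visitor-style dispatch: planner stages no longer pull sizes and assignments out of a reallocation_request_state struct, but iterate partitions through request_context::for_each_partition / with_partition and branch on the partition kind via partition::match_variant. For readers unfamiliar with that pattern, below is a minimal, self-contained sketch of the same dispatch style in plain C++17. It is an illustration only: the struct names and fields are simplified stand-ins for the reassignable_partition, moving_partition and immutable_partition classes added by this patch (which dispatch through ss::visit), so nothing here is meant to compile against the Redpanda tree.

// Simplified analogue of partition::match_variant (not actual Redpanda types).
#include <cstddef>
#include <iostream>
#include <string>
#include <utility>
#include <variant>
#include <vector>

struct reassignable { std::string ntp; std::size_t size_bytes; };
struct moving       { std::string ntp; bool cancel_requested = false; };
struct immutable    { std::string ntp; std::string reason; };

// Build an overload set from a pack of lambdas (same idea as ss::visit).
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
template <class... Ts> overloaded(Ts...) -> overloaded<Ts...>;

struct partition {
    std::variant<reassignable, moving, immutable> state;

    template <class... Visitors>
    auto match_variant(Visitors&&... vs) {
        return std::visit(overloaded{std::forward<Visitors>(vs)...}, state);
    }
};

int main() {
    std::vector<partition> partitions = {
      {reassignable{"kafka/topic-1/0", 10 * 1024}},
      {moving{"kafka/topic-1/1"}},
      {immutable{"kafka/topic-1/2", "no size info"}},
    };

    // A planner stage walks the partitions and handles each case separately,
    // mirroring get_unavailable_nodes_actions() above.
    for (auto& p : partitions) {
        p.match_variant(
          [](reassignable& r) {
              std::cout << r.ntp << ": try to move a replica (" << r.size_bytes
                        << " bytes)\n";
          },
          [](moving& m) {
              if (!m.cancel_requested) {
                  std::cout << m.ntp << ": consider cancelling the move\n";
              }
          },
          [](immutable& i) {
              std::cout << i.ntp << ": skip, reason: " << i.reason << "\n";
          });
    }
}

The same overload-set trick is what lets each planner stage (unavailable nodes, rack constraint repair, full nodes) handle only the partition kinds it cares about and fall through with a no-op lambda for the rest.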