Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow partition balancing in presence of moving partitions #10724

Merged
merged 15 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions src/v/cluster/partition_balancer_planner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "ssx/sformat.h"

#include <seastar/core/sstring.hh>
#include <seastar/util/defer.hh>

#include <optional>

Expand Down Expand Up @@ -86,6 +87,11 @@ class partition_balancer_planner::request_context {
return planned_moves_size >= _parent._config.movement_disk_size_batch;
}

void for_each_partition(
ss::noncopyable_function<ss::stop_iteration(partition&)>);
void with_partition(
const model::ntp&, ss::noncopyable_function<void(partition&)>);

bool is_partition_movement_possible(
const std::vector<model::broker_shard>& current_replicas) const;

Expand All @@ -100,6 +106,12 @@ class partition_balancer_planner::request_context {

bool all_reports_received() const;

template<typename Visitor>
auto do_with_partition(
const model::ntp& ntp,
const std::vector<model::broker_shard>& orig_replicas,
Visitor&);

void collect_actions(plan_data&);

private:
Expand Down Expand Up @@ -317,6 +329,148 @@ result<model::broker_shard> partition_balancer_planner::move_replica(
return moved;
}

class partition_balancer_planner::partition {
public:
const model::ntp& ntp() const { return _ntp; }
const std::vector<model::broker_shard>& replicas() const {
return (_reallocated ? _reallocated->replicas() : _orig_replicas);
};

bool is_reassignment_possible() const { return _is_reassignment_possible; }

bool is_original(const model::broker_shard& replica) const {
return !_reallocated || _reallocated->is_original(replica);
}

std::optional<size_t> size_bytes() const { return _size_bytes; }

result<model::broker_shard> move_replica(
model::node_id replica,
double max_disk_usage_ratio,
std::string_view reason);

private:
friend class request_context;

partition(
model::ntp ntp,
std::optional<size_t> size_bytes,
std::optional<allocated_partition> reallocated,
const std::vector<model::broker_shard>& orig_replicas,
request_context& ctx)
: _ntp(std::move(ntp))
, _size_bytes(size_bytes)
, _reallocated(std::move(reallocated))
, _orig_replicas(orig_replicas)
, _is_reassignment_possible(
_size_bytes && ctx.is_partition_movement_possible(orig_replicas))
, _ctx(ctx) {}

bool has_changes() const {
return _reallocated && _reallocated->has_node_changes();
}

private:
model::ntp _ntp;
std::optional<size_t> _size_bytes;
std::optional<allocated_partition> _reallocated;
const std::vector<model::broker_shard>& _orig_replicas;
bool _is_reassignment_possible = false;
request_context& _ctx;
};

template<typename Visitor>
auto partition_balancer_planner::request_context::do_with_partition(
const model::ntp& ntp,
const std::vector<model::broker_shard>& orig_replicas,
Visitor& visitor) {
std::optional<allocated_partition> reallocated;
auto reassignment_it = reassignments.find(ntp);
if (reassignment_it != reassignments.end()) {
// borrow the allocated_partition object
reallocated = std::move(reassignment_it->second);
}

std::optional<size_t> size_bytes;
auto size_it = ntp_sizes.find(ntp);
if (size_it != ntp_sizes.end()) {
size_bytes = size_it->second;
}

auto part = partition{
ntp, size_bytes, std::move(reallocated), orig_replicas, *this};
auto deferred = ss::defer([&] {
// insert or return part._reallocated to reassignments
if (reassignment_it != reassignments.end()) {
reassignment_it->second = std::move(*part._reallocated);
} else if (part._reallocated && part._reallocated->has_node_changes()) {
reassignments.emplace(ntp, std::move(*part._reallocated));
planned_moves_size += part._size_bytes.value();
}
});

return visitor(part);
}

void partition_balancer_planner::request_context::for_each_partition(
mmaslankaprv marked this conversation as resolved.
Show resolved Hide resolved
ss::noncopyable_function<ss::stop_iteration(partition&)> callback) {
for (const auto& t : _parent._state.topics().topics_map()) {
for (const auto& a : t.second.get_assignments()) {
auto ntp = model::ntp(t.first.ns, t.first.tp, a.id);
auto stop = do_with_partition(ntp, a.replicas, callback);
if (stop == ss::stop_iteration::yes) {
return;
}
}
}
}

void partition_balancer_planner::request_context::with_partition(
const model::ntp& ntp, ss::noncopyable_function<void(partition&)> callback) {
auto topic = model::topic_namespace_view(ntp);
auto topic_meta = _parent._state.topics().get_topic_metadata_ref(topic);
if (!topic_meta) {
vlog(clusterlog.warn, "topic {} not found", topic);
return;
}
auto it = topic_meta->get().get_assignments().find(ntp.tp.partition);
if (it == topic_meta->get().get_assignments().end()) {
vlog(
clusterlog.warn,
"partition {} of topic {} not found",
ntp.tp.partition,
topic);
return;
}

do_with_partition(ntp, it->replicas, callback);
}

result<model::broker_shard> partition_balancer_planner::partition::move_replica(
model::node_id replica,
double max_disk_usage_ratio,
std::string_view reason) {
if (!_is_reassignment_possible) {
return errc::invalid_request;
}

if (!_reallocated) {
_reallocated
= _ctx._parent._partition_allocator.make_allocated_partition(
replicas(), get_allocation_domain(_ntp));
}

auto constraints = _ctx.get_allocation_constraints(
_size_bytes.value(), max_disk_usage_ratio);

auto moved = _ctx._parent.move_replica(
_ntp, *_reallocated, *_size_bytes, replica, constraints, reason, _ctx);
if (!moved) {
_ctx.failed_reassignments_count += 1;
}
return moved;
}

/*
* Function is trying to move ntp out of unavailable nodes
* It can move to nodes that are violating soft_max_disk_usage_ratio constraint
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/partition_balancer_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class partition_balancer_planner {

private:
class request_context;
class partition;

void init_per_node_state(
const cluster_health_report&,
Expand Down