Skip to content

Commit

Permalink
Merge pull request #11183 from bharathv/async_pb
Browse files Browse the repository at this point in the history
partition_balancer: futurize ticks to avoid reactor stalls with large number of topics
  • Loading branch information
piyushredpanda authored Jun 13, 2023
2 parents c198125 + 3bd3fb0 commit a867d89
Show file tree
Hide file tree
Showing 16 changed files with 348 additions and 84 deletions.
43 changes: 35 additions & 8 deletions src/v/cluster/partition_balancer_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ namespace cluster {
static constexpr std::chrono::seconds controller_stm_sync_timeout = 10s;
static constexpr std::chrono::seconds add_move_cmd_timeout = 10s;

class balancer_tick_aborted_exception final : public std::runtime_error {
public:
explicit balancer_tick_aborted_exception(const std::string& msg)
: std::runtime_error(msg) {}
};

partition_balancer_backend::partition_balancer_backend(
consensus_ptr raft0,
ss::sharded<controller_stm>& controller_stm,
Expand Down Expand Up @@ -119,6 +125,10 @@ void partition_balancer_backend::on_members_update(
if (
state == model::membership_state::active
|| state == model::membership_state::draining) {
if (_tick_in_progress) {
_tick_in_progress->request_abort_ex(balancer_tick_aborted_exception{
fmt::format("new membership update: {}", state)});
}
maybe_rearm_timer(/*now = */ true);
}
}
Expand All @@ -138,12 +148,21 @@ void partition_balancer_backend::on_topic_table_update() {
}

void partition_balancer_backend::tick() {
ssx::background = ssx::spawn_with_gate_then(_gate, [this] {
return do_tick().finally(
[this] { maybe_rearm_timer(); });
}).handle_exception([](const std::exception_ptr& e) {
vlog(clusterlog.warn, "tick error: {}", e);
});
ssx::background
= ssx::spawn_with_gate_then(
_gate,
[this] {
return do_tick().finally([this] {
_tick_in_progress = {};
maybe_rearm_timer();
});
})
.handle_exception_type([](balancer_tick_aborted_exception& e) {
vlog(clusterlog.info, "tick aborted, reason: {}", e.what());
})
.handle_exception([](const std::exception_ptr& e) {
vlog(clusterlog.warn, "tick error: {}", e);
});
}

ss::future<> partition_balancer_backend::stop() {
Expand All @@ -152,6 +171,10 @@ ss::future<> partition_balancer_backend::stop() {
_state.members().unregister_members_updated_notification(_member_updates);
_timer.cancel();
_lock.broken();
if (_tick_in_progress) {
_tick_in_progress->request_abort_ex(
balancer_tick_aborted_exception{"shutting down"});
}
return _gate.close();
}

Expand All @@ -176,6 +199,8 @@ ss::future<> partition_balancer_backend::do_tick() {
co_return;
}

_tick_in_progress = ss::abort_source{};

auto health_report = co_await _health_monitor.get_cluster_health(
cluster_report_filter{},
force_refresh::no,
Expand All @@ -200,7 +225,7 @@ ss::future<> partition_balancer_backend::do_tick() {
// status requests by default 700ms
auto const node_responsiveness_timeout = _node_status_interval() * 7;
auto plan_data
= partition_balancer_planner(
= co_await partition_balancer_planner(
planner_config{
.mode = _mode(),
.soft_max_disk_usage_ratio = soft_max_disk_usage_ratio,
Expand All @@ -213,7 +238,7 @@ ss::future<> partition_balancer_backend::do_tick() {
.node_responsiveness_timeout = node_responsiveness_timeout},
_state,
_partition_allocator)
.plan_actions(health_report.value());
.plan_actions(health_report.value(), _tick_in_progress.value());

_last_leader_term = _raft0->term();
_last_tick_time = clock_t::now();
Expand Down Expand Up @@ -252,6 +277,7 @@ ss::future<> partition_balancer_backend::do_tick() {

co_await ss::max_concurrent_for_each(
plan_data.cancellations, 32, [this, current_term](model::ntp& ntp) {
_tick_in_progress->check();
auto f = _topics_frontend.cancel_moving_partition_replicas(
ntp,
model::timeout_clock::now() + add_move_cmd_timeout,
Expand All @@ -272,6 +298,7 @@ ss::future<> partition_balancer_backend::do_tick() {
plan_data.reassignments,
32,
[this, current_term](ntp_reassignment& reassignment) {
_tick_in_progress->check();
auto f = _topics_frontend.move_partition_replicas(
reassignment.ntp,
reassignment.allocated.replicas(),
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/partition_balancer_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class partition_balancer_backend {
ss::timer<clock_t> _timer;
notification_id_type _topic_table_updates;
notification_id_type _member_updates;
std::optional<ss::abort_source> _tick_in_progress;
};

} // namespace cluster
Loading

0 comments on commit a867d89

Please sign in to comment.