c/leader_balancer: mute just restarted nodes
Just-restarted nodes may have incomplete health reports because not all
of their partitions have started yet. Right after a restart a node is
also likely busy catching up, replicating the data that was produced in
its absence. For these two reasons just-restarted nodes are bad
candidates for leadership transfers, so mute them.
ztlpn committed Jun 11, 2024
1 parent 0ca0de7 commit 9042537
Showing 2 changed files with 63 additions and 13 deletions.
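
In effect, the commit extends the balancer's per-node muting predicate with two new conditions on top of the existing heartbeat-age and maintenance-state checks: no health report for the node yet, or uptime below `leader_activation_delay`. A condensed, standalone sketch of that decision (the `node_status` struct and parameter names here are illustrative stand-ins, not the balancer's real types; see the diff below for the actual implementation):

#include <chrono>
#include <optional>

// Illustrative stand-in for the balancer's per-node inputs.
struct node_status {
    std::chrono::milliseconds heartbeat_age; // time since last raft heartbeat
    bool in_maintenance;                     // node is draining leadership
    std::optional<std::chrono::seconds> uptime; // nullopt: no health report
};

// Mirrors the muting decision after this commit: mute a node that is
// unresponsive, in maintenance, missing from the health report, or that
// restarted too recently to be a good leadership-transfer target.
bool should_mute(
  const node_status& st,
  std::chrono::milliseconds mute_timeout,
  std::chrono::seconds activation_delay) {
    if (st.heartbeat_age > mute_timeout) {
        return true; // pre-existing check
    }
    if (st.in_maintenance) {
        return true; // pre-existing check
    }
    if (!st.uptime) {
        return true; // new: no health report for this node yet
    }
    return *st.uptime < activation_delay; // new: just restarted
}
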
69 changes: 58 additions & 11 deletions src/v/cluster/scheduling/leader_balancer.cc
@@ -403,7 +403,20 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
         _need_controller_refresh = false;
     }

-    auto group_replicas = co_await collect_group_replicas_from_health_report();
+    auto health_report = co_await _health_monitor.get_cluster_health(
+      cluster_report_filter{},
+      force_refresh::no,
+      model::timeout_clock::now() + 5s);
+    if (!health_report) {
+        vlog(
+          clusterlog.warn,
+          "couldn't get health report: {}",
+          health_report.error());
+        co_return ss::stop_iteration::no;
+    }
+
+    auto group_replicas = co_await collect_group_replicas_from_health_report(
+      health_report.value());
     auto index = build_index(std::move(group_replicas));
     auto group_id_to_topic = build_group_id_to_topic_rev();

@@ -412,6 +425,8 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
         co_return ss::stop_iteration::no;
     }

+    auto muted_nodes = collect_muted_nodes(health_report.value());
+
     auto mode = config::shard_local_cfg().leader_balancer_mode();
     std::unique_ptr<leader_balancer_strategy> strategy;

@@ -422,12 +437,12 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
           leader_balancer_types::random_hill_climbing_strategy>(
           std::move(index),
           std::move(group_id_to_topic),
-          leader_balancer_types::muted_index{muted_nodes(), {}});
+          leader_balancer_types::muted_index{std::move(muted_nodes), {}});
        break;
    case model::leader_balancer_mode::greedy_balanced_shards:
        vlog(clusterlog.debug, "using greedy_balanced_shards");
        strategy = std::make_unique<greedy_balanced_shards>(
-          std::move(index), muted_nodes());
+          std::move(index), std::move(muted_nodes));
        break;
    default:
        vlog(clusterlog.error, "unexpected mode value: {}", mode);
@@ -564,7 +579,8 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
     co_return ss::stop_iteration::no;
 }

-absl::flat_hash_set<model::node_id> leader_balancer::muted_nodes() const {
+absl::flat_hash_set<model::node_id>
+leader_balancer::collect_muted_nodes(const cluster_health_report& hr) {
     absl::flat_hash_set<model::node_id> nodes;
     const auto now = raft::clock_type::now();
     for (const auto& follower : _raft0->get_follower_metrics()) {
@@ -578,6 +594,7 @@ absl::flat_hash_set<model::node_id> leader_balancer::muted_nodes() const {
               std::chrono::duration_cast<std::chrono::milliseconds>(
                 last_hbeat_age)
                 .count());
+            continue;
         }

         if (auto nm = _members.get_node_metadata_ref(follower.id); nm) {
@@ -590,8 +607,42 @@ absl::flat_hash_set<model::node_id> leader_balancer::muted_nodes() const {
                   "Leadership rebalancer muting node {} in a maintenance "
                   "state.",
                   follower.id);
+                continue;
             }
         }
+
+        auto report_it = std::find_if(
+          hr.node_reports.begin(),
+          hr.node_reports.end(),
+          [id = follower.id](const node_health_report_ptr& n) {
+              return n->id == id;
+          });
+        if (report_it == hr.node_reports.end()) {
+            nodes.insert(follower.id);
+            vlog(
+              clusterlog.info,
+              "Leadership rebalancer muting node {} without a health report.",
+              follower.id);
+            continue;
+        }
+
+        auto uptime = (*report_it)->local_state.uptime;
+        if (uptime < leader_activation_delay) {
+            nodes.insert(follower.id);
+            vlog(
+              clusterlog.info,
+              "Leadership rebalancer muting node {} that "
+              "just restarted ({}s. ago)",
+              follower.id,
+              uptime / 1s);
+
+            // schedule a tick soon so that we can rebalance to the restarted
+            // node.
+            _timer.cancel();
+            _timer.arm(leader_activation_delay);
+
+            continue;
+        }
     }
     return nodes;
 }
@@ -632,20 +683,16 @@ leader_balancer::build_group_id_to_topic_rev() const {
 /// Returns nullopt if shard info from health report can not yet be used. In
 /// this case callers have to rely on shard info from topic table.
 ss::future<std::optional<leader_balancer::group_replicas_t>>
-leader_balancer::collect_group_replicas_from_health_report() {
+leader_balancer::collect_group_replicas_from_health_report(
+  const cluster_health_report& hr) {
     if (!_feature_table.is_active(
           features::feature::partition_shard_in_health_report)) {
         co_return std::nullopt;
     }

-    auto hr = co_await _health_monitor.get_cluster_health(
-      cluster_report_filter{},
-      force_refresh::no,
-      model::timeout_clock::now() + 5s);
-
     group_replicas_t group_replicas;
     ssx::async_counter counter;
-    for (const auto& node : hr.value().node_reports) {
+    for (const auto& node : hr.node_reports) {
         for (const auto& topic : node->topics) {
             auto maybe_meta = _topics.get_topic_metadata_ref(topic.tp_ns);
             if (!maybe_meta) {
7 changes: 5 additions & 2 deletions src/v/cluster/scheduling/leader_balancer.h
@@ -11,6 +11,7 @@
 #pragma once
 #include "absl/container/flat_hash_map.h"
 #include "base/seastarx.h"
+#include "cluster/health_monitor_types.h"
 #include "cluster/partition_manager.h"
 #include "cluster/scheduling/leader_balancer_probe.h"
 #include "cluster/scheduling/leader_balancer_strategy.h"
@@ -105,12 +106,14 @@ class leader_balancer {

     using group_replicas_t = absl::btree_map<raft::group_id, replicas_t>;
     ss::future<std::optional<group_replicas_t>>
-    collect_group_replicas_from_health_report();
+    collect_group_replicas_from_health_report(const cluster_health_report&);
     leader_balancer_types::group_id_to_topic_revision_t
     build_group_id_to_topic_rev() const;
     index_type build_index(std::optional<group_replicas_t>);
+    absl::flat_hash_set<model::node_id>
+    collect_muted_nodes(const cluster_health_report&);

     leader_balancer_types::muted_groups_t muted_groups() const;
-    absl::flat_hash_set<model::node_id> muted_nodes() const;

     ss::future<bool> do_transfer(reassignment);
     ss::future<bool> do_transfer_local(reassignment) const;
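
One side effect of muting is worth noting: a freshly restarted node would stay leaderless until some later tick, so the `.cc` hunk above cancels and re-arms the balancer timer to fire after `leader_activation_delay`, letting leadership flow back as soon as the node becomes eligible again. A toy illustration of that cancel-and-rearm pattern (a plain C++ stand-in for Seastar's `ss::timer`, which the real code uses; the 5-minute period and 30s delay are made-up values):

#include <chrono>
#include <iostream>

using namespace std::chrono_literals;

// Toy one-shot timer standing in for seastar's ss::timer.
struct one_shot_timer {
    std::chrono::milliseconds deadline{0};
    bool armed = false;
    void cancel() { armed = false; }
    void arm(std::chrono::milliseconds delta) {
        deadline = delta;
        armed = true;
    }
};

int main() {
    one_shot_timer timer;
    timer.arm(5min); // regular balancing period (illustrative)

    // During a tick we discover a node that restarted moments ago and mute
    // it; re-arm so the next tick runs once the node has warmed up.
    auto activation_delay = 30s; // hypothetical leader_activation_delay
    timer.cancel();
    timer.arm(activation_delay);

    std::cout << "next tick in " << timer.deadline.count() << " ms\n";
}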
