Merge pull request redpanda-data#18497 from ztlpn/fix-17150
Mute just restarted nodes in leader_balancer
piyushredpanda authored Jun 12, 2024
2 parents 996183e + 5084da5 commit d4b5ad4
Showing 3 changed files with 64 additions and 14 deletions.
69 changes: 58 additions & 11 deletions src/v/cluster/scheduling/leader_balancer.cc
@@ -403,7 +403,20 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
         _need_controller_refresh = false;
     }
 
-    auto group_replicas = co_await collect_group_replicas_from_health_report();
+    auto health_report = co_await _health_monitor.get_cluster_health(
+      cluster_report_filter{},
+      force_refresh::no,
+      model::timeout_clock::now() + 5s);
+    if (!health_report) {
+        vlog(
+          clusterlog.warn,
+          "couldn't get health report: {}",
+          health_report.error());
+        co_return ss::stop_iteration::no;
+    }
+
+    auto group_replicas = co_await collect_group_replicas_from_health_report(
+      health_report.value());
     auto index = build_index(std::move(group_replicas));
     auto group_id_to_topic = build_group_id_to_topic_rev();
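The shape of this hunk: balance() now fetches the cluster health report once up front, aborts the tick with a warning if the fetch fails, and hands the same report to every consumer below. A minimal standalone sketch of the pattern, using hypothetical Report and fetch_report stand-ins for cluster_health_report and _health_monitor.get_cluster_health():

#include <iostream>
#include <optional>
#include <vector>

// Illustrative stand-ins only; not the Redpanda types.
struct Report {
    std::vector<int> node_ids;
};

std::optional<Report> fetch_report() { return Report{{0, 1, 2}}; }

void collect_replicas(const Report& r) { /* reads r.node_ids */ }
void collect_muted(const Report& r) { /* reads r.node_ids */ }

int main() {
    auto report = fetch_report();
    if (!report) {
        // one failure path up front instead of one per consumer
        std::cerr << "couldn't get health report\n";
        return 1;
    }
    collect_replicas(*report); // both consumers share the single fetch
    collect_muted(*report);
    return 0;
}

Fetching once per tick also keeps the two consumers consistent: they observe the same snapshot of cluster state rather than two reports taken moments apart.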

@@ -412,6 +425,8 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
         co_return ss::stop_iteration::no;
     }
 
+    auto muted_nodes = collect_muted_nodes(health_report.value());
+
     auto mode = config::shard_local_cfg().leader_balancer_mode();
     std::unique_ptr<leader_balancer_strategy> strategy;
 
@@ -422,12 +437,12 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
           leader_balancer_types::random_hill_climbing_strategy>(
           std::move(index),
           std::move(group_id_to_topic),
-          leader_balancer_types::muted_index{muted_nodes(), {}});
+          leader_balancer_types::muted_index{std::move(muted_nodes), {}});
         break;
     case model::leader_balancer_mode::greedy_balanced_shards:
         vlog(clusterlog.debug, "using greedy_balanced_shards");
         strategy = std::make_unique<greedy_balanced_shards>(
-          std::move(index), muted_nodes());
+          std::move(index), std::move(muted_nodes));
         break;
     default:
         vlog(clusterlog.error, "unexpected mode value: {}", mode);
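A knock-on effect of the refactor: muted_nodes is now a value computed once earlier in balance(), so each strategy constructor receives it by move instead of calling the removed muted_nodes() accessor. A reduced sketch of that hand-off, with illustrative Strategy/Greedy/RandomHillClimbing stand-ins:

#include <memory>
#include <set>
#include <utility>

// Illustrative stand-ins; not the Redpanda strategy types.
struct Strategy {
    virtual ~Strategy() = default;
};

struct Greedy : Strategy {
    explicit Greedy(std::set<int> muted) : muted_(std::move(muted)) {}
    std::set<int> muted_;
};

struct RandomHillClimbing : Strategy {
    explicit RandomHillClimbing(std::set<int> muted)
      : muted_(std::move(muted)) {}
    std::set<int> muted_;
};

std::unique_ptr<Strategy> make_strategy(bool greedy, std::set<int> muted) {
    // the set is computed once by the caller and moved into whichever
    // strategy is selected -- no second computation, no copy
    if (greedy) {
        return std::make_unique<Greedy>(std::move(muted));
    }
    return std::make_unique<RandomHillClimbing>(std::move(muted));
}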
@@ -564,7 +579,8 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
     co_return ss::stop_iteration::no;
 }
 
-absl::flat_hash_set<model::node_id> leader_balancer::muted_nodes() const {
+absl::flat_hash_set<model::node_id>
+leader_balancer::collect_muted_nodes(const cluster_health_report& hr) {
     absl::flat_hash_set<model::node_id> nodes;
     const auto now = raft::clock_type::now();
     for (const auto& follower : _raft0->get_follower_metrics()) {
@@ -578,6 +594,7 @@ absl::flat_hash_set<model::node_id> leader_balancer::muted_nodes() const {
               std::chrono::duration_cast<std::chrono::milliseconds>(
                 last_hbeat_age)
                 .count());
+            continue;
         }
 
         if (auto nm = _members.get_node_metadata_ref(follower.id); nm) {
@@ -590,8 +607,42 @@ absl::flat_hash_set<model::node_id> leader_balancer::muted_nodes() const {
                   "Leadership rebalancer muting node {} in a maintenance "
                   "state.",
                   follower.id);
+                continue;
             }
         }
+
+        auto report_it = std::find_if(
+          hr.node_reports.begin(),
+          hr.node_reports.end(),
+          [id = follower.id](const node_health_report_ptr& n) {
+              return n->id == id;
+          });
+        if (report_it == hr.node_reports.end()) {
+            nodes.insert(follower.id);
+            vlog(
+              clusterlog.info,
+              "Leadership rebalancer muting node {} without a health report.",
+              follower.id);
+            continue;
+        }
+
+        auto uptime = (*report_it)->local_state.uptime;
+        if (uptime < leader_activation_delay) {
+            nodes.insert(follower.id);
+            vlog(
+              clusterlog.info,
+              "Leadership rebalancer muting node {} that "
+              "just restarted ({}s. ago)",
+              follower.id,
+              uptime / 1s);
+
+            // schedule a tick soon so that we can rebalance to the restarted
+            // node.
+            _timer.cancel();
+            _timer.arm(leader_activation_delay);
+
+            continue;
+        }
     }
     return nodes;
 }
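This hunk carries the core of the change: a follower is now also muted when it has no health report at all, or when its reported uptime is below leader_activation_delay, meaning it has just restarted. A self-contained sketch of the uptime rule (the 30s delay is an assumed value for illustration; the real constant is defined elsewhere in Redpanda):

#include <chrono>
#include <cstdio>
#include <set>
#include <vector>

using namespace std::chrono_literals;

// Assumed value for illustration only.
constexpr std::chrono::seconds leader_activation_delay{30};

struct Follower {
    int id;
    std::chrono::seconds uptime; // as reported in the node's health report
};

std::set<int> collect_muted(const std::vector<Follower>& followers) {
    std::set<int> muted;
    for (const auto& f : followers) {
        if (f.uptime < leader_activation_delay) {
            // just restarted: not a leadership-transfer target yet
            muted.insert(f.id);
            std::printf("muting node %d (restarted %llds ago)\n", f.id,
                        static_cast<long long>(f.uptime / 1s));
        }
    }
    return muted;
}

int main() {
    // expected: nodes 1 and 3 muted, node 2 eligible
    auto muted = collect_muted({{1, 5s}, {2, 120s}, {3, 12s}});
    return muted.size() == 2 ? 0 : 1;
}

The _timer.cancel()/_timer.arm(leader_activation_delay) pair at the end of the hunk schedules another balancing tick right after the activation delay expires, so leadership can be transferred back to the restarted node promptly instead of waiting for the next regular tick.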
@@ -632,20 +683,16 @@ leader_balancer::build_group_id_to_topic_rev() const {
 /// Returns nullopt if shard info from health report can not yet be used. In
 /// this case callers have to rely on shard info from topic table.
 ss::future<std::optional<leader_balancer::group_replicas_t>>
-leader_balancer::collect_group_replicas_from_health_report() {
+leader_balancer::collect_group_replicas_from_health_report(
+  const cluster_health_report& hr) {
     if (!_feature_table.is_active(
           features::feature::partition_shard_in_health_report)) {
         co_return std::nullopt;
     }
 
-    auto hr = co_await _health_monitor.get_cluster_health(
-      cluster_report_filter{},
-      force_refresh::no,
-      model::timeout_clock::now() + 5s);
-
     group_replicas_t group_replicas;
     ssx::async_counter counter;
-    for (const auto& node : hr.value().node_reports) {
+    for (const auto& node : hr.node_reports) {
         for (const auto& topic : node->topics) {
             auto maybe_meta = _topics.get_topic_metadata_ref(topic.tp_ns);
             if (!maybe_meta) {
7 changes: 5 additions & 2 deletions src/v/cluster/scheduling/leader_balancer.h
@@ -11,6 +11,7 @@
 #pragma once
 #include "absl/container/flat_hash_map.h"
 #include "base/seastarx.h"
+#include "cluster/health_monitor_types.h"
 #include "cluster/partition_manager.h"
 #include "cluster/scheduling/leader_balancer_probe.h"
 #include "cluster/scheduling/leader_balancer_strategy.h"
@@ -105,12 +106,14 @@ class leader_balancer {
 
     using group_replicas_t = absl::btree_map<raft::group_id, replicas_t>;
     ss::future<std::optional<group_replicas_t>>
-    collect_group_replicas_from_health_report();
+    collect_group_replicas_from_health_report(const cluster_health_report&);
     leader_balancer_types::group_id_to_topic_revision_t
     build_group_id_to_topic_rev() const;
     index_type build_index(std::optional<group_replicas_t>);
+    absl::flat_hash_set<model::node_id>
+    collect_muted_nodes(const cluster_health_report&);
+
     leader_balancer_types::muted_groups_t muted_groups() const;
-    absl::flat_hash_set<model::node_id> muted_nodes() const;
 
     ss::future<bool> do_transfer(reassignment);
     ss::future<bool> do_transfer_local(reassignment) const;
2 changes: 1 addition & 1 deletion tests/rptest/tests/topic_recovery_test.py
@@ -1762,7 +1762,7 @@ def test_many_partitions(self, cloud_storage_type, check_mode):
             num_topics=5,
             num_partitions_per_topic=20,
             check_mode=check_mode)
-        self.do_run(test_case)
+        self.do_run(test_case, upload_delay_sec=120)
 
     @cluster(num_nodes=4,
              log_allow_list=TRANSIENT_ERRORS +
