Merge pull request #21461 from vbotbuildovich/backport-pr-21435-v24.1.x-820

[v24.1.x] Maintenance mode test fixes
ztlpn authored Jul 17, 2024
2 parents a022cde + fed355e commit 7937fab
Showing 2 changed files with 34 additions and 20 deletions.
44 changes: 27 additions & 17 deletions src/v/cluster/scheduling/leader_balancer.cc
@@ -134,6 +134,10 @@ void leader_balancer::on_leadership_change(
     // Update in flight state
     if (auto it = _in_flight_changes.find(group);
         it != _in_flight_changes.end()) {
+        vlog(
+          clusterlog.trace,
+          "transfer of group {} finished, removing from in-flight set",
+          group);
         _in_flight_changes.erase(it);
         check_unregister_leadership_change_notification();

@@ -511,6 +515,7 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
         co_return ss::stop_iteration::yes;
     }

+    size_t num_dispatched = 0;
     for (size_t i = 0; i < allowed_change_cnt; i++) {
         if (should_stop_balance()) {
             co_return ss::stop_iteration::yes;
@@ -519,18 +524,31 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
         auto transfer = strategy->find_movement(muted_groups());
         if (!transfer) {
             vlog(
-              clusterlog.debug,
-              "No leadership balance improvements found with total delta {}, "
-              "number of muted groups {}",
+              clusterlog.info,
+              "Leadership balancer tick: no further improvements found, "
+              "total error: {:.4}, number of muted groups: {}, "
+              "number in flight: {}, dispatched in this tick: {}",
               strategy->error(),
-              _muted.size());
+              _muted.size(),
+              _in_flight_changes.size(),
+              num_dispatched);
             if (!_timer.armed()) {
                 _timer.arm(_idle_timeout());
             }
             _probe.leader_transfer_no_improvement();
             co_return ss::stop_iteration::yes;
         }

+        vlog(
+          clusterlog.trace,
+          "dispatching transfer of group {}: {} -> {}, "
+          "current num_dispatched: {}, in_flight: {}",
+          transfer->group,
+          transfer->from,
+          transfer->to,
+          num_dispatched,
+          _in_flight_changes.size());
+
         _in_flight_changes[transfer->group] = {
           *transfer, clock_type::now() + _mute_timeout()};
         check_register_leadership_change_notification();
@@ -539,10 +557,12 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
         if (!success) {
             vlog(
               clusterlog.info,
-              "Error transferring leadership group {} from {} to {}",
+              "Error transferring leadership group {} from {} to {} "
+              "(already dispatched in this tick: {})",
               transfer->group,
               transfer->from,
-              transfer->to);
+              transfer->to,
+              num_dispatched);

             _in_flight_changes.erase(transfer->group);
             check_unregister_leadership_change_notification();
@@ -562,6 +582,7 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {

         } else {
             _probe.leader_transfer_succeeded();
+            num_dispatched += 1;
             strategy->apply_movement(*transfer);
         }

@@ -900,13 +921,6 @@ leader_balancer::index_type leader_balancer::build_index(
 }

 ss::future<bool> leader_balancer::do_transfer(reassignment transfer) {
-    vlog(
-      clusterlog.debug,
-      "Transferring leadership for group {} from {} to {}",
-      transfer.group,
-      transfer.from,
-      transfer.to);
-
     if (transfer.from.node_id == _raft0->self().id()) {
         co_return co_await do_transfer_local(transfer);
     } else {
@@ -1033,10 +1047,6 @@ ss::future<bool> leader_balancer::do_transfer_remote(reassignment transfer) {
           res.error().message());
         co_return false;
     } else if (res.value().data.success) {
-        vlog(
-          clusterlog.trace,
-          "Leadership transfer of group {} succeeded",
-          transfer.group);
         co_return true;
     } else {
         vlog(
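The gist of the leader_balancer.cc change is per-tick accounting: a num_dispatched counter, an in-flight map keyed by raft group with a mute deadline, and log lines that report those counters together with the number of muted groups. The sketch below restates that bookkeeping in Python for illustration only; it is not Redpanda code, and names such as Reassignment, find_next_movement, and mute_timeout_sec are made up for the example.

# Illustrative sketch (not Redpanda code) of the per-tick bookkeeping that
# leader_balancer::balance() gains in this commit.
import time
from dataclasses import dataclass


@dataclass
class Reassignment:
    group: int
    src: int
    dst: int


def balance_tick(find_next_movement, dispatch, allowed_change_cnt=16,
                 mute_timeout_sec=300.0):
    """Dispatch up to allowed_change_cnt transfers in one balancer tick."""
    in_flight = {}  # group id -> (reassignment, deadline to unmute the group)
    num_dispatched = 0

    for _ in range(allowed_change_cnt):
        transfer = find_next_movement()
        if transfer is None:
            # Mirrors the new info-level log: make an idle tick visible by
            # reporting the in-flight and dispatched counters.
            print(f"no further improvements found, in flight: {len(in_flight)}, "
                  f"dispatched in this tick: {num_dispatched}")
            break

        print(f"dispatching transfer of group {transfer.group}: "
              f"{transfer.src} -> {transfer.dst}")
        # Record the change before dispatching so a concurrent leadership
        # notification can find it and drop it from the in-flight set.
        in_flight[transfer.group] = (transfer, time.monotonic() + mute_timeout_sec)
        if dispatch(transfer):
            num_dispatched += 1
        else:
            # Failed dispatch: forget the in-flight entry, as the patch does.
            in_flight.pop(transfer.group, None)

    return num_dispatched


if __name__ == "__main__":
    moves = iter([Reassignment(1, 0, 1), Reassignment(2, 1, 2)])
    balance_tick(lambda: next(moves, None), dispatch=lambda t: True)
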
10 changes: 7 additions & 3 deletions tests/rptest/tests/maintenance_test.py
@@ -29,9 +29,9 @@ def __init__(self, *args, **kwargs):
         # Vary partition count relative to num_cpus. This is to ensure that
         # leadership is moved back to a node that exits maintenance.
         num_cpus = self.redpanda.get_node_cpu_count()
-        self.topics = (TopicSpec(partition_count=num_cpus * 3,
+        self.topics = (TopicSpec(partition_count=num_cpus * 5,
                                  replication_factor=3),
-                       TopicSpec(partition_count=num_cpus * 3,
+                       TopicSpec(partition_count=num_cpus * 10,
                                  replication_factor=3))
         self.admin = Admin(self.redpanda)
         self.rpk = RpkTool(self.redpanda)
@@ -131,8 +131,12 @@ def _enable_maintenance(self, node):
         """
         self.logger.debug(
             f"Checking that node {node.name} has a leadership role")
+        # In case the node is unlucky and doesn't get any leaders "naturally",
+        # we have to wait for the leadership balancer to do its job. We have to wait
+        # at least 1 minute for it to unmute just restarted nodes and perform another
+        # tick. Wait more than leader_balancer_idle_timeout (2 minutes) just to be sure.
         wait_until(lambda: self._has_leadership_role(node),
-                   timeout_sec=60,
+                   timeout_sec=150,
                    backoff_sec=10)

         self.logger.debug(
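The maintenance_test.py side raises the wait_until timeout from 60 s to 150 s because, per the new comment, a freshly restarted node may only regain leaders after the leadership balancer unmutes it and runs another tick, which can take longer than leader_balancer_idle_timeout (2 minutes). Below is a minimal sketch of that polling pattern; the wait_until here is a simplified stand-in for ducktape's helper, and has_leadership_role is a hypothetical placeholder for the test's Admin API check.

# Sketch of the poll-until-leader pattern the test relies on; the real test
# uses ducktape's wait_until and Redpanda's Admin API.
import time


def wait_until(condition, timeout_sec, backoff_sec):
    """Simplified stand-in for ducktape.utils.util.wait_until."""
    deadline = time.monotonic() + timeout_sec
    while True:
        if condition():
            return
        if time.monotonic() > deadline:
            raise TimeoutError("condition not met within timeout")
        time.sleep(backoff_sec)


def has_leadership_role(node) -> bool:
    # Hypothetical check: the real test asks the Admin API whether the node
    # currently leads any raft group.
    return True


# Wait longer than leader_balancer_idle_timeout (2 minutes) so the balancer
# has a chance to unmute the node and move some leaders back to it.
wait_until(lambda: has_leadership_role("node-1"),
           timeout_sec=150,
           backoff_sec=10)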