Maintenance mode test fixes #21435

Merged · 3 commits · Jul 17, 2024
44 changes: 27 additions & 17 deletions src/v/cluster/scheduling/leader_balancer.cc
@@ -134,6 +134,10 @@ void leader_balancer::on_leadership_change(
// Update in flight state
if (auto it = _in_flight_changes.find(group);
it != _in_flight_changes.end()) {
vlog(
clusterlog.trace,
"transfer of group {} finished, removing from in-flight set",
group);
_in_flight_changes.erase(it);
check_unregister_leadership_change_notification();

@@ -511,6 +515,7 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
co_return ss::stop_iteration::yes;
}

size_t num_dispatched = 0;
for (size_t i = 0; i < allowed_change_cnt; i++) {
if (should_stop_balance()) {
co_return ss::stop_iteration::yes;
@@ -519,18 +524,31 @@
auto transfer = strategy->find_movement(muted_groups());
if (!transfer) {
vlog(
clusterlog.debug,
"No leadership balance improvements found with total delta {}, "
"number of muted groups {}",
clusterlog.info,
"Leadership balancer tick: no further improvements found, "
"total error: {:.4}, number of muted groups: {}, "
"number in flight: {}, dispatched in this tick: {}",
strategy->error(),
_muted.size());
_muted.size(),
_in_flight_changes.size(),
num_dispatched);
if (!_timer.armed()) {
_timer.arm(_idle_timeout());
}
_probe.leader_transfer_no_improvement();
co_return ss::stop_iteration::yes;
}

vlog(
clusterlog.trace,
"dispatching transfer of group {}: {} -> {}, "
"current num_dispatched: {}, in_flight: {}",
transfer->group,
transfer->from,
transfer->to,
num_dispatched,
_in_flight_changes.size());

_in_flight_changes[transfer->group] = {
*transfer, clock_type::now() + _mute_timeout()};
check_register_leadership_change_notification();
@@ -539,10 +557,12 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
if (!success) {
vlog(
clusterlog.info,
"Error transferring leadership group {} from {} to {}",
"Error transferring leadership group {} from {} to {} "
"(already dispatched in this tick: {})",
transfer->group,
transfer->from,
transfer->to);
transfer->to,
num_dispatched);

_in_flight_changes.erase(transfer->group);
check_unregister_leadership_change_notification();
@@ -562,6 +582,7 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {

} else {
_probe.leader_transfer_succeeded();
num_dispatched += 1;
strategy->apply_movement(*transfer);
}

@@ -900,13 +921,6 @@ leader_balancer::index_type leader_balancer::build_index(
}

ss::future<bool> leader_balancer::do_transfer(reassignment transfer) {
vlog(
clusterlog.debug,
"Transferring leadership for group {} from {} to {}",
transfer.group,
transfer.from,
transfer.to);

if (transfer.from.node_id == _raft0->self().id()) {
co_return co_await do_transfer_local(transfer);
} else {
@@ -1033,10 +1047,6 @@ ss::future<bool> leader_balancer::do_transfer_remote(reassignment transfer) {
res.error().message());
co_return false;
} else if (res.value().data.success) {
vlog(
clusterlog.trace,
"Leadership transfer of group {} succeeded",
transfer.group);
co_return true;
} else {
vlog(
10 changes: 7 additions & 3 deletions tests/rptest/tests/maintenance_test.py
@@ -29,9 +29,9 @@ def __init__(self, *args, **kwargs):
# Vary partition count relative to num_cpus. This is to ensure that
# leadership is moved back to a node that exits maintenance.
num_cpus = self.redpanda.get_node_cpu_count()
self.topics = (TopicSpec(partition_count=num_cpus * 3,
self.topics = (TopicSpec(partition_count=num_cpus * 5,
replication_factor=3),
TopicSpec(partition_count=num_cpus * 3,
TopicSpec(partition_count=num_cpus * 10,
replication_factor=3))
self.admin = Admin(self.redpanda)
self.rpk = RpkTool(self.redpanda)
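
As a rough sketch of what these multipliers imply (illustrative only; the per-node core count of 4 is an assumption, the test reads the real value via get_node_cpu_count()):

num_cpus = 4                               # assumed example value
partitions = num_cpus * 5 + num_cpus * 10  # 60 partitions across the two topics
replicas = partitions * 3                  # 180 replicas at replication_factor=3
leaders_per_node = partitions // 3         # roughly 20 leaders per broker, assuming 3 brokers
# With far more leaders than cores available on each broker, a node that exits
# maintenance should reliably get some leadership moved back to it.
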
@@ -131,8 +131,12 @@ def _enable_maintenance(self, node):
"""
self.logger.debug(
f"Checking that node {node.name} has a leadership role")
# In case the node is unlucky and doesn't get any leaders "naturally",
# we have to wait for the leadership balancer to do its job. We have to wait
# at least 1 minute for it to unmute just restarted nodes and perform another
# tick. Wait more than leader_balancer_idle_timeout (2 minutes) just to be sure.
wait_until(lambda: self._has_leadership_role(node),
timeout_sec=60,
timeout_sec=150,
backoff_sec=10)

self.logger.debug(
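
For reference, a minimal sketch of the waiting pattern the test relies on, assuming ducktape's wait_until and the default leader_balancer_idle_timeout of 2 minutes (the helper name below is illustrative, not part of the test):

from ducktape.utils.util import wait_until

def wait_for_leadership_role(has_leadership_role, node):
    # The balancer keeps a just-restarted node muted for about a minute and
    # ticks at most leader_balancer_idle_timeout (2 minutes) apart, so the
    # timeout has to exceed 2 minutes; the test settles on 150 s.
    wait_until(lambda: has_leadership_role(node),
               timeout_sec=150,
               backoff_sec=10,
               err_msg=f"{node.name} did not regain a leadership role in time")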