Skip to content

Commit

Permalink
Merge pull request #17350 from ballard26/large-alloc-lb
Browse files Browse the repository at this point in the history
Change information stored in `_topic_node_index` to avoid oversized alloc
  • Loading branch information
piyushredpanda authored May 9, 2024
2 parents e6417b5 + a5be6a6 commit 8c6bdf3
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 152 deletions.
109 changes: 11 additions & 98 deletions src/v/cluster/scheduling/leader_balancer_constraints.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,8 @@ void even_topic_distributon_constraint::update_index(const reassignment& r) {
_error += skew;

// Update _topic_node_index

auto& groups_from_node = _topic_node_index.at(topic_id).at(r.from.node_id);
auto it = std::find_if(
groups_from_node.begin(),
groups_from_node.end(),
[&r](const auto& g_info) { return r.group == g_info.group_id; });

vassert(it != groups_from_node.end(), "reassigning non-existent group");

auto moved_group_info = std::move(*it);
groups_from_node.erase(it);

_topic_node_index[topic_id][r.to.node_id].emplace_back(
moved_group_info.group_id, r.to, std::move(moved_group_info.replicas));
_topic_node_index.at(topic_id).at(r.from.node_id) -= 1;
_topic_node_index.at(topic_id).at(r.to.node_id) += 1;

// Update _si

Expand All @@ -64,85 +52,11 @@ void even_topic_distributon_constraint::update_index(const reassignment& r) {

std::optional<reassignment>
even_topic_distributon_constraint::recommended_reassignment() {
// Sort topics based on topic error here
std::vector<decltype(_topic_node_index)::const_iterator> sorted_topics;
sorted_topics.reserve(_topic_node_index.size());

for (auto it = _topic_node_index.cbegin(); it != _topic_node_index.cend();
++it) {
sorted_topics.push_back(it);
}

std::sort(
sorted_topics.begin(),
sorted_topics.end(),
[this](const auto& a, const auto& b) {
return _topic_skew[a->first] > _topic_skew[b->first];
});

// Look for a topic with the most skew
for (const auto& topic : sorted_topics) {
const auto& nodes = topic->second;

if (nodes.size() == 0) {
continue;
}

std::vector<decltype(nodes.cbegin())> nodes_sorted;
nodes_sorted.reserve(nodes.size());

for (auto it = nodes.cbegin(); it != nodes.cend(); ++it) {
nodes_sorted.push_back(it);
}

std::sort(
nodes_sorted.begin(),
nodes_sorted.end(),
[](const auto& a, const auto& b) {
return a->second.size() > b->second.size();
});

// Try to move leadership off the node with the most leadership.
for (const auto& node : nodes_sorted) {
// Don't try moving a group from a muted node.
if (mi().muted_nodes().contains(node->first)) {
continue;
}

for (const auto& g_info : node->second) {
const auto& leader = g_info.leader;
const auto& group = g_info.group_id;
const auto& replicas = g_info.replicas;

// Don't try moving any groups that are currently muted.
if (mi().muted_groups().contains(
static_cast<uint64_t>(group))) {
continue;
}

for (const auto& replica : replicas) {
// Don't try a move to a different shard on the same
// node, as it won't decrease error.
if (replica.node_id == node->first || leader == replica) {
continue;
}

// Don't try moving group to a muted node.
if (mi().muted_nodes().contains(replica.node_id)) {
continue;
}

reassignment r{group, leader, replica};

if (evaluate_internal(r) > error_jitter) {
return r;
}
}
}
}
}

return std::nullopt;
// This method is deprecated and is only used in `leader_balancer_greedy`
// which doesn't use the `even_topic_distributon_constraint`. Hence there is
// no need to implement it here. Once the greedy balancer has been removed
// this should be removed as well.
vassert(false, "not implemented");
}

void even_topic_distributon_constraint::rebuild_indexes() {
Expand All @@ -155,8 +69,7 @@ void even_topic_distributon_constraint::rebuild_indexes() {
auto topic_id = group_to_topic_id().at(group_p.first);
const auto& node_id = broker_shard.first.node_id;

_topic_node_index[topic_id][node_id].emplace_back(
group_p.first, broker_shard.first, group_p.second);
_topic_node_index[topic_id][node_id] += 1;
_topic_partition_index[topic_id] += 1;

// Some of the replicas may not have leadership. So add
Expand Down Expand Up @@ -199,7 +112,7 @@ void even_topic_distributon_constraint::calc_topic_skew() {
skew = 0;

for (const auto& node : topic.second) {
auto leaders = static_cast<double>(node.second.size());
auto leaders = static_cast<double>(node.second);

skew += pow(leaders - opt_leaders, 2);
}
Expand Down Expand Up @@ -227,7 +140,7 @@ double even_topic_distributon_constraint::adjusted_error(
double from_node_leaders = 0;
const auto from_it = topic_leaders.find(from.node_id);
if (from_it != topic_leaders.cend()) {
from_node_leaders = static_cast<double>(from_it->second.size());
from_node_leaders = static_cast<double>(from_it->second);
} else {
// If there are no leaders for the topic on the from node
// then there is nothing to move and no change to the error.
Expand All @@ -237,7 +150,7 @@ double even_topic_distributon_constraint::adjusted_error(
double to_node_leaders = 0;
const auto to_it = topic_leaders.find(to.node_id);
if (to_it != topic_leaders.cend()) {
to_node_leaders = static_cast<double>(to_it->second.size());
to_node_leaders = static_cast<double>(to_it->second);
}

// Subtract old weights
Expand Down
18 changes: 2 additions & 16 deletions src/v/cluster/scheduling/leader_balancer_constraints.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,6 @@ class even_topic_distributon_constraint final
// more than this value.
static constexpr double error_jitter = 0.000001;

// Bundles a raft group's id with its leader and its replica set.
struct group_info {
// Identifier of the raft group this record describes.
raft::group_id group_id;
// Broker/shard recorded as the group's leader.
model::broker_shard leader;
// Broker/shards recorded as the group's replicas.
std::vector<model::broker_shard> replicas;

// `gid` and `bs` are copied into the record; `r` is taken by value and
// moved into `replicas`, so callers can hand over their vector with
// std::move to avoid a copy.
group_info(
const raft::group_id& gid,
const model::broker_shard& bs,
std::vector<model::broker_shard> r)
: group_id(gid)
, leader(bs)
, replicas(std::move(r)) {}
};

using topic_id_t = model::revision_id::type;

template<typename ValueType>
Expand Down Expand Up @@ -171,8 +157,8 @@ class even_topic_distributon_constraint final
group_id_to_topic_revision_t _group_to_topic_rev;
double _error{0};

topic_map<absl::flat_hash_map<model::node_id, std::vector<group_info>>>
_topic_node_index;
// Stores the number of leaders on a given node per topic.
topic_map<absl::flat_hash_map<model::node_id, size_t>> _topic_node_index;
topic_map<size_t> _topic_partition_index;
topic_map<absl::flat_hash_set<model::node_id>> _topic_replica_index;
topic_map<double> _topic_skew;
Expand Down
43 changes: 5 additions & 38 deletions src/v/cluster/tests/leader_balancer_constraints_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ BOOST_AUTO_TEST_CASE(even_topic_distribution_empty) {
gntp_i, shard_index, muted_index);

BOOST_REQUIRE(even_topic_con.error() == 0);
BOOST_REQUIRE(!even_topic_con.recommended_reassignment());
}

BOOST_AUTO_TEST_CASE(even_topic_distribution_constraint_no_error) {
Expand Down Expand Up @@ -326,41 +325,12 @@ BOOST_AUTO_TEST_CASE(even_topic_distributon_constraint_find_reassignment) {
BOOST_REQUIRE(
topic_constraint.error() == topic_constraint.evaluate(reassignment));

auto rreassignment = topic_constraint.recommended_reassignment();
BOOST_REQUIRE(rreassignment.has_value());

index_cl.update_index(rreassignment.value());
topic_constraint.update_index(rreassignment.value());
index_cl.update_index(reassignment);
topic_constraint.update_index(reassignment);

BOOST_REQUIRE(topic_constraint.error() == 0);
}

BOOST_AUTO_TEST_CASE(even_topic_odd_partition_cnt) {
// Covers topics whose partition count is not evenly divisible by the
// number of nodes they are replicated on. In such cases it's not
// possible for every node to have equal leadership for the topic.
//
// This tests that in those cases the even topic constraint
// correctly recommends no further reassignments.

// A single topic (id 0) that owns groups 1, 2 and 3.
auto g_id_to_t_id = group_to_topic_from_spec({
{0, {1, 2, 3}},
});

// No muted entries are supplied (second argument is empty).
auto [shard_index, muted_index] = from_spec(
{
{{1, 2}, {3}},
{{3}, {1, 2}},
},
{});

auto even_topic_con = lbt::even_topic_distributon_constraint(
g_id_to_t_id, shard_index, muted_index);

// The constraint must not suggest any move for this layout.
BOOST_REQUIRE(!even_topic_con.recommended_reassignment());
}

BOOST_AUTO_TEST_CASE(even_shard_no_error_even_topic_error) {
// Here even_shard_load_constraint.error = 0, but
// even_topic_distributon_constraint.error > 0 and any move will increase
Expand Down Expand Up @@ -390,11 +360,10 @@ BOOST_AUTO_TEST_CASE(even_shard_no_error_even_topic_error) {
BOOST_REQUIRE(even_shard_con.error() == 0);
BOOST_REQUIRE(even_topic_con.error() > 0);

auto rea = even_topic_con.recommended_reassignment();
BOOST_REQUIRE(rea.has_value());
auto rea = re(1, 0, 1);

BOOST_REQUIRE(even_shard_con.evaluate(*rea) < 0);
BOOST_REQUIRE(even_topic_con.evaluate(*rea) > 0);
BOOST_REQUIRE(even_shard_con.evaluate(rea) < 0);
BOOST_REQUIRE(even_topic_con.evaluate(rea) > 0);
}

BOOST_AUTO_TEST_CASE(even_topic_no_error_even_shard_error) {
Expand Down Expand Up @@ -426,8 +395,6 @@ BOOST_AUTO_TEST_CASE(even_topic_no_error_even_shard_error) {
g_id_to_t_id, shard_index, muted_index);

BOOST_REQUIRE(even_shard_con.error() > 0);

BOOST_REQUIRE(!even_topic_con.recommended_reassignment());
BOOST_REQUIRE(even_shard_con.recommended_reassignment().has_value());
}

Expand Down

0 comments on commit 8c6bdf3

Please sign in to comment.