Skip to content

Commit

Permalink
cluster: store number of leaders per node in _topic_node_index
Browse files Browse the repository at this point in the history
  • Loading branch information
ballard26 committed Mar 22, 2024
1 parent 2b0d5a5 commit ae11e01
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 35 deletions.
25 changes: 6 additions & 19 deletions src/v/cluster/scheduling/leader_balancer_constraints.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,8 @@ void even_topic_distributon_constraint::update_index(const reassignment& r) {
_error += skew;

// Update _topic_node_index

auto& groups_from_node = _topic_node_index.at(topic_id).at(r.from.node_id);
auto it = std::find_if(
groups_from_node.begin(),
groups_from_node.end(),
[&r](const auto& g_info) { return r.group == g_info.group_id; });

vassert(it != groups_from_node.end(), "reassigning non-existent group");

auto moved_group_info = std::move(*it);
groups_from_node.erase(it);

_topic_node_index[topic_id][r.to.node_id].emplace_back(
moved_group_info.group_id, r.to, std::move(moved_group_info.replicas));
_topic_node_index.at(topic_id).at(r.from.node_id) -= 1;
_topic_node_index.at(topic_id).at(r.to.node_id) += 1;

// Update _si

Expand All @@ -77,8 +65,7 @@ void even_topic_distributon_constraint::rebuild_indexes() {
auto topic_id = group_to_topic_id().at(group_p.first);
const auto& node_id = broker_shard.first.node_id;

_topic_node_index[topic_id][node_id].emplace_back(
group_p.first, broker_shard.first, group_p.second);
_topic_node_index[topic_id][node_id] += 1;
_topic_partition_index[topic_id] += 1;

// Some of the replicas may not have leadership. So add
Expand Down Expand Up @@ -121,7 +108,7 @@ void even_topic_distributon_constraint::calc_topic_skew() {
skew = 0;

for (const auto& node : topic.second) {
auto leaders = static_cast<double>(node.second.size());
auto leaders = static_cast<double>(node.second);

skew += pow(leaders - opt_leaders, 2);
}
Expand Down Expand Up @@ -149,7 +136,7 @@ double even_topic_distributon_constraint::adjusted_error(
double from_node_leaders = 0;
const auto from_it = topic_leaders.find(from.node_id);
if (from_it != topic_leaders.cend()) {
from_node_leaders = static_cast<double>(from_it->second.size());
from_node_leaders = static_cast<double>(from_it->second);
} else {
// If there are no leaders for the topic on the from node
// then there is nothing to move and no change to the error.
Expand All @@ -159,7 +146,7 @@ double even_topic_distributon_constraint::adjusted_error(
double to_node_leaders = 0;
const auto to_it = topic_leaders.find(to.node_id);
if (to_it != topic_leaders.cend()) {
to_node_leaders = static_cast<double>(to_it->second.size());
to_node_leaders = static_cast<double>(to_it->second);
}

// Subtract old weights
Expand Down
17 changes: 1 addition & 16 deletions src/v/cluster/scheduling/leader_balancer_constraints.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,6 @@ class even_topic_distributon_constraint final
// more than this value.
static constexpr double error_jitter = 0.000001;

struct group_info {
raft::group_id group_id;
model::broker_shard leader;
std::vector<model::broker_shard> replicas;

group_info(
const raft::group_id& gid,
const model::broker_shard& bs,
std::vector<model::broker_shard> r)
: group_id(gid)
, leader(bs)
, replicas(std::move(r)) {}
};

using topic_id_t = model::revision_id::type;

template<typename ValueType>
Expand Down Expand Up @@ -171,8 +157,7 @@ class even_topic_distributon_constraint final
group_id_to_topic_revision_t _group_to_topic_rev;
double _error{0};

topic_map<absl::flat_hash_map<model::node_id, std::vector<group_info>>>
_topic_node_index;
topic_map<absl::flat_hash_map<model::node_id, size_t>> _topic_node_index;
topic_map<size_t> _topic_partition_index;
topic_map<absl::flat_hash_set<model::node_id>> _topic_replica_index;
topic_map<double> _topic_skew;
Expand Down

0 comments on commit ae11e01

Please sign in to comment.