-
Notifications
You must be signed in to change notification settings - Fork 592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change information stored in _topic_node_index
to avoid oversized alloc
#17350
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,20 +42,8 @@ void even_topic_distributon_constraint::update_index(const reassignment& r) { | |
_error += skew; | ||
|
||
// Update _topic_node_index | ||
|
||
auto& groups_from_node = _topic_node_index.at(topic_id).at(r.from.node_id); | ||
auto it = std::find_if( | ||
groups_from_node.begin(), | ||
groups_from_node.end(), | ||
[&r](const auto& g_info) { return r.group == g_info.group_id; }); | ||
|
||
vassert(it != groups_from_node.end(), "reassigning non-existent group"); | ||
|
||
auto moved_group_info = std::move(*it); | ||
groups_from_node.erase(it); | ||
|
||
_topic_node_index[topic_id][r.to.node_id].emplace_back( | ||
moved_group_info.group_id, r.to, std::move(moved_group_info.replicas)); | ||
_topic_node_index.at(topic_id).at(r.from.node_id) -= 1; | ||
_topic_node_index.at(topic_id).at(r.to.node_id) += 1; | ||
|
||
// Update _si | ||
|
||
|
@@ -64,85 +52,11 @@ void even_topic_distributon_constraint::update_index(const reassignment& r) { | |
|
||
std::optional<reassignment> | ||
even_topic_distributon_constraint::recommended_reassignment() { | ||
// Sort topics based on topic error here | ||
std::vector<decltype(_topic_node_index)::const_iterator> sorted_topics; | ||
sorted_topics.reserve(_topic_node_index.size()); | ||
|
||
for (auto it = _topic_node_index.cbegin(); it != _topic_node_index.cend(); | ||
++it) { | ||
sorted_topics.push_back(it); | ||
} | ||
|
||
std::sort( | ||
sorted_topics.begin(), | ||
sorted_topics.end(), | ||
[this](const auto& a, const auto& b) { | ||
return _topic_skew[a->first] > _topic_skew[b->first]; | ||
}); | ||
|
||
// Look for a topic with the most skew | ||
for (const auto& topic : sorted_topics) { | ||
const auto& nodes = topic->second; | ||
|
||
if (nodes.size() == 0) { | ||
continue; | ||
} | ||
|
||
std::vector<decltype(nodes.cbegin())> nodes_sorted; | ||
nodes_sorted.reserve(nodes.size()); | ||
|
||
for (auto it = nodes.cbegin(); it != nodes.cend(); ++it) { | ||
nodes_sorted.push_back(it); | ||
} | ||
|
||
std::sort( | ||
nodes_sorted.begin(), | ||
nodes_sorted.end(), | ||
[](const auto& a, const auto& b) { | ||
return a->second.size() > b->second.size(); | ||
}); | ||
|
||
// Try to move leadership off the node with the most leadership. | ||
for (const auto& node : nodes_sorted) { | ||
// Don't try moving a group from a muted node. | ||
if (mi().muted_nodes().contains(node->first)) { | ||
continue; | ||
} | ||
|
||
for (const auto& g_info : node->second) { | ||
const auto& leader = g_info.leader; | ||
const auto& group = g_info.group_id; | ||
const auto& replicas = g_info.replicas; | ||
|
||
// Don't try moving any groups that are currently muted. | ||
if (mi().muted_groups().contains( | ||
static_cast<uint64_t>(group))) { | ||
continue; | ||
} | ||
|
||
for (const auto& replica : replicas) { | ||
// Don't try a move to a different shard to on the same | ||
// node. As it won't decrease error | ||
if (replica.node_id == node->first || leader == replica) { | ||
continue; | ||
} | ||
|
||
// Don't try moving group to a muted node. | ||
if (mi().muted_nodes().contains(replica.node_id)) { | ||
continue; | ||
} | ||
|
||
reassignment r{group, leader, replica}; | ||
|
||
if (evaluate_internal(r) > error_jitter) { | ||
return r; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
return std::nullopt; | ||
// This method is deprecated and is ony used in `leader_balancer_greedy` | ||
// which doesn't use the `even_topic_distributon_constraint`. Hence there is | ||
// no need to implement it here. Once the greedy balancer has been removed | ||
// this should be removed as well. | ||
vassert(false, "not implemented"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you include a clear comment here about why this won't be called? Is it called on other types implementing this interface? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ballard26 this seems to have introduced a CI failure on v24.1.x: https://buildkite.com/redpanda/redpanda/builds/51764#0190c949-1a0f-42ba-a040-4e33bc85fc38 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I searched a little but didn't see a ticket for this. is there one @BenPope? |
||
} | ||
|
||
void even_topic_distributon_constraint::rebuild_indexes() { | ||
|
@@ -155,8 +69,7 @@ void even_topic_distributon_constraint::rebuild_indexes() { | |
auto topic_id = group_to_topic_id().at(group_p.first); | ||
const auto& node_id = broker_shard.first.node_id; | ||
|
||
_topic_node_index[topic_id][node_id].emplace_back( | ||
group_p.first, broker_shard.first, group_p.second); | ||
_topic_node_index[topic_id][node_id] += 1; | ||
_topic_partition_index[topic_id] += 1; | ||
|
||
// Some of the replicas may not have leadership. So add | ||
|
@@ -199,7 +112,7 @@ void even_topic_distributon_constraint::calc_topic_skew() { | |
skew = 0; | ||
|
||
for (const auto& node : topic.second) { | ||
auto leaders = static_cast<double>(node.second.size()); | ||
auto leaders = static_cast<double>(node.second); | ||
|
||
skew += pow(leaders - opt_leaders, 2); | ||
} | ||
|
@@ -227,7 +140,7 @@ double even_topic_distributon_constraint::adjusted_error( | |
double from_node_leaders = 0; | ||
const auto from_it = topic_leaders.find(from.node_id); | ||
if (from_it != topic_leaders.cend()) { | ||
from_node_leaders = static_cast<double>(from_it->second.size()); | ||
from_node_leaders = static_cast<double>(from_it->second); | ||
} else { | ||
// If there are no leaders for the topic on the from node | ||
// then there is nothing to move and no change to the error. | ||
|
@@ -237,7 +150,7 @@ double even_topic_distributon_constraint::adjusted_error( | |
double to_node_leaders = 0; | ||
const auto to_it = topic_leaders.find(to.node_id); | ||
if (to_it != topic_leaders.cend()) { | ||
to_node_leaders = static_cast<double>(to_it->second.size()); | ||
to_node_leaders = static_cast<double>(to_it->second); | ||
} | ||
|
||
// Subtract old weights | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder why this second
at
is supposed to be always successful. Isn't it possible that some nodes didn't have a leader of any partition of that topic previously?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should always be.
even_topic_distributon_constraint::update_index
is only used internally and the reassignment is validated by the callers before its called.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, we initialize it on (new) line 79. But maybe it's easier for the reader to just use
operator[]