Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(load-balance): update selection strategy for max load disk #916

Merged
merged 2 commits into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 49 additions & 30 deletions src/meta/greedy_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ void get_min_max_set(const std::map<rpc_address, uint32_t> &node_count_map,
}
}

template <typename S>
auto select_random(const S &s, size_t n)
{
auto it = std::begin(s);
std::advance(it, n);
return it;
}

greedy_load_balancer::greedy_load_balancer(meta_service *_svc)
: server_load_balancer(_svc),
_ctrl_balancer_ignored_apps(nullptr),
Expand Down Expand Up @@ -1217,9 +1225,10 @@ bool greedy_load_balancer::get_next_move(const cluster_migration_info &cluster_i
std::multimap<uint32_t, rpc_address> app_count_multimap = utils::flip_map(app_map);
if (app_count_multimap.rbegin()->first <= app_count_multimap.begin()->first + 1 &&
(app_cluster_min_set.empty() || app_cluster_max_set.empty())) {
ddebug_f(
"do not move replicas of a balanced app if the least (most) loaded servers overall "
"do not intersect the servers hosting the least (most) replicas of the app");
ddebug_f("do not move replicas of a balanced app({}) if the least (most) loaded "
"servers overall do not intersect the servers hosting the least (most) "
"replicas of the app",
app_id);
continue;
}

Expand All @@ -1244,59 +1253,69 @@ bool greedy_load_balancer::pick_up_move(const cluster_migration_info &cluster_in
const partition_set &selected_pid,
/*out*/ move_info &move_info)
{
rpc_address max_load_node;
std::string max_load_disk;
partition_set max_load_partitions;
get_max_load_disk(
cluster_info, max_nodes, app_id, max_load_node, max_load_disk, max_load_partitions);

std::set<app_disk_info> max_load_disk_set;
get_max_load_disk_set(cluster_info, max_nodes, app_id, max_load_disk_set);
if (max_load_disk_set.empty()) {
return false;
}
auto index = rand() % max_load_disk_set.size();
auto max_load_disk = *select_random(max_load_disk_set, index);
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
ddebug_f("most load disk({}) on node({}) is picked, has {} partition",
max_load_disk.node.to_string(),
max_load_disk.disk_tag,
max_load_disk.partitions.size());
for (const auto &node_addr : min_nodes) {
gpid picked_pid;
if (pick_up_partition(
cluster_info, node_addr, max_load_partitions, selected_pid, picked_pid)) {
cluster_info, node_addr, max_load_disk.partitions, selected_pid, picked_pid)) {
move_info.pid = picked_pid;
move_info.source_node = max_load_node;
move_info.source_disk_tag = max_load_disk;
move_info.source_node = max_load_disk.node;
move_info.source_disk_tag = max_load_disk.disk_tag;
move_info.target_node = node_addr;
move_info.type = cluster_info.type == cluster_balance_type::COPY_SECONDARY
? balance_type::copy_secondary
: balance_type::copy_primary;
ddebug_f("partition[{}] will migrate from {} to {}",
picked_pid,
max_load_node.to_string(),
max_load_disk.node.to_string(),
node_addr.to_string());
return true;
}
}
ddebug_f("can not find a partition(app_id={}) from random max load disk(node={}, disk={})",
app_id,
max_load_disk.node.to_string(),
max_load_disk.disk_tag);
return false;
}

void greedy_load_balancer::get_max_load_disk(const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const int32_t app_id,
/*out*/ rpc_address &picked_node,
/*out*/ std::string &picked_disk,
/*out*/ partition_set &target_partitions)
void greedy_load_balancer::get_max_load_disk_set(const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const int32_t app_id,
/*out*/ std::set<app_disk_info> &max_load_disk_set)
{
int32_t max_load_size = 0;
// key: partition count (app_disk_info.partitions.size())
// value: app_disk_info structure
std::multimap<uint32_t, app_disk_info> app_disk_info_multimap;
for (const auto &node_addr : max_nodes) {
// key: disk_tag
// value: partition set for app(app id=app_id) in node(addr=node_addr)
std::map<std::string, partition_set> disk_partitions =
get_disk_partitions_map(cluster_info, node_addr, app_id);
for (const auto &kv : disk_partitions) {
if (kv.second.size() > max_load_size) {
picked_node = node_addr;
picked_disk = kv.first;
target_partitions = kv.second;
max_load_size = kv.second.size();
}
app_disk_info info;
info.app_id = app_id;
info.node = node_addr;
info.disk_tag = kv.first;
info.partitions = kv.second;
app_disk_info_multimap.insert(
std::pair<uint32_t, app_disk_info>(kv.second.size(), info));
}
}
ddebug_f("most load is node({}), disk_tag({}), target partition count = {}",
picked_node.to_string(),
picked_disk,
target_partitions.size());
auto range = app_disk_info_multimap.equal_range(app_disk_info_multimap.rbegin()->first);
for (auto iter = range.first; iter != range.second; ++iter) {
max_load_disk_set.insert(iter->second);
}
}

std::map<std::string, partition_set> greedy_load_balancer::get_disk_partitions_map(
Expand Down
31 changes: 24 additions & 7 deletions src/meta/greedy_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,25 @@ class greedy_load_balancer : public server_load_balancer
std::map<rpc_address, uint32_t> replicas_count;
};

struct app_disk_info
{
int32_t app_id;
rpc_address node;
std::string disk_tag;
partition_set partitions;
bool operator==(const app_disk_info &another) const
{
return app_id == another.app_id && node == another.node && disk_tag == another.disk_tag;
}
bool operator<(const app_disk_info &another) const
{
if (app_id < another.app_id || (app_id == another.app_id && node < another.node) ||
(app_id == another.app_id && node == another.node && disk_tag < another.disk_tag))
return true;
return false;
}
};

struct move_info
{
gpid pid;
Expand Down Expand Up @@ -249,12 +268,10 @@ class greedy_load_balancer : public server_load_balancer
const partition_set &selected_pid,
/*out*/ move_info &move_info);

void get_max_load_disk(const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const int32_t app_id,
/*out*/ rpc_address &picked_node,
/*out*/ std::string &picked_disk,
/*out*/ partition_set &target_partitions);
void get_max_load_disk_set(const cluster_migration_info &cluster_info,
const std::set<rpc_address> &max_nodes,
const int32_t app_id,
/*out*/ std::set<app_disk_info> &max_load_disk_set);

std::map<std::string, partition_set> get_disk_partitions_map(
const cluster_migration_info &cluster_info, const rpc_address &addr, const int32_t app_id);
Expand Down Expand Up @@ -302,7 +319,7 @@ class greedy_load_balancer : public server_load_balancer
FRIEND_TEST(greedy_load_balancer, get_app_migration_info);
FRIEND_TEST(greedy_load_balancer, get_node_migration_info);
FRIEND_TEST(greedy_load_balancer, get_disk_partitions_map);
FRIEND_TEST(greedy_load_balancer, get_max_load_disk);
FRIEND_TEST(greedy_load_balancer, get_max_load_disk_set);
FRIEND_TEST(greedy_load_balancer, apply_move);
FRIEND_TEST(greedy_load_balancer, pick_up_partition);
};
Expand Down
41 changes: 27 additions & 14 deletions src/meta/test/greedy_load_balancer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,41 +224,54 @@ TEST(greedy_load_balancer, get_disk_partitions_map)
ASSERT_EQ(disk_partitions[disk_tag].count(pid), 1);
}

TEST(greedy_load_balancer, get_max_load_disk)
TEST(greedy_load_balancer, get_max_load_disk_set)
{
greedy_load_balancer::cluster_migration_info cluster_info;
cluster_info.type = cluster_balance_type::COPY_SECONDARY;

rpc_address addr(1, 10086);
int32_t app_id = 1;
rpc_address addr(1, 10086);
rpc_address addr2(2, 10086);
std::map<rpc_address, partition_status::type> partition;
partition[addr] = partition_status::PS_SECONDARY;
std::map<rpc_address, partition_status::type> partition2;
partition2[addr] = partition_status::PS_SECONDARY;
partition2[addr2] = partition_status::PS_SECONDARY;
greedy_load_balancer::app_migration_info app_info;
app_info.partitions.push_back(partition);
app_info.partitions.push_back(partition2);
cluster_info.apps_info[app_id] = app_info;

greedy_load_balancer::node_migration_info node_info;
partition_set partitions;
gpid pid(app_id, 0);
partitions.insert(pid);
greedy_load_balancer::node_migration_info node_info;
std::string disk_tag = "disk1";
node_info.partitions[disk_tag] = partitions;
partition_set partitions2;
gpid pid2(app_id, 1);
partitions2.insert(pid2);
std::string disk_tag2 = "disk2";
node_info.partitions[disk_tag2] = partitions2;
cluster_info.nodes_info[addr] = node_info;

greedy_load_balancer::node_migration_info node_info2;
partition_set partitions3;
gpid pid3(app_id, 1);
partitions3.insert(pid3);
std::string disk_tag3 = "disk3";
node_info2.partitions[disk_tag3] = partitions3;
cluster_info.nodes_info[addr2] = node_info2;

greedy_load_balancer balancer(nullptr);
std::set<rpc_address> max_nodes;
max_nodes.insert(addr);
max_nodes.insert(addr2);

greedy_load_balancer balancer(nullptr);
rpc_address picked_node;
std::string picked_disk;
partition_set target_partitions;
balancer.get_max_load_disk(
cluster_info, max_nodes, app_id, picked_node, picked_disk, target_partitions);

ASSERT_EQ(picked_node, addr);
ASSERT_EQ(picked_disk, disk_tag);
ASSERT_EQ(target_partitions.size(), 1);
ASSERT_EQ(target_partitions.count(pid), 1);
std::set<greedy_load_balancer::app_disk_info> max_load_disk_set;
balancer.get_max_load_disk_set(cluster_info, max_nodes, app_id, max_load_disk_set);

ASSERT_EQ(max_load_disk_set.size(), 3);
}

TEST(greedy_load_balancer, apply_move)
Expand Down