diff --git a/src/meta/greedy_load_balancer.cpp b/src/meta/greedy_load_balancer.cpp index 9c131ace96..d9b43e174d 100644 --- a/src/meta/greedy_load_balancer.cpp +++ b/src/meta/greedy_load_balancer.cpp @@ -102,6 +102,14 @@ void get_min_max_set(const std::map &node_count_map, } } +template +auto select_random(const S &s, size_t n) +{ + auto it = std::begin(s); + std::advance(it, n); + return it; +} + greedy_load_balancer::greedy_load_balancer(meta_service *_svc) : server_load_balancer(_svc), _ctrl_balancer_ignored_apps(nullptr), @@ -1217,9 +1225,10 @@ bool greedy_load_balancer::get_next_move(const cluster_migration_info &cluster_i std::multimap app_count_multimap = utils::flip_map(app_map); if (app_count_multimap.rbegin()->first <= app_count_multimap.begin()->first + 1 && (app_cluster_min_set.empty() || app_cluster_max_set.empty())) { - ddebug_f( - "do not move replicas of a balanced app if the least (most) loaded servers overall " - "do not intersect the servers hosting the least (most) replicas of the app"); + ddebug_f("do not move replicas of a balanced app({}) if the least (most) loaded " + "servers overall do not intersect the servers hosting the least (most) " + "replicas of the app", + app_id); continue; } @@ -1244,59 +1253,69 @@ bool greedy_load_balancer::pick_up_move(const cluster_migration_info &cluster_in const partition_set &selected_pid, /*out*/ move_info &move_info) { - rpc_address max_load_node; - std::string max_load_disk; - partition_set max_load_partitions; - get_max_load_disk( - cluster_info, max_nodes, app_id, max_load_node, max_load_disk, max_load_partitions); - + std::set max_load_disk_set; + get_max_load_disk_set(cluster_info, max_nodes, app_id, max_load_disk_set); + if (max_load_disk_set.empty()) { + return false; + } + auto index = rand() % max_load_disk_set.size(); + auto max_load_disk = *select_random(max_load_disk_set, index); + ddebug_f("most load disk({}) on node({}) is picked, has {} partition", + max_load_disk.node.to_string(), + max_load_disk.disk_tag, + max_load_disk.partitions.size()); for (const auto &node_addr : min_nodes) { gpid picked_pid; if (pick_up_partition( - cluster_info, node_addr, max_load_partitions, selected_pid, picked_pid)) { + cluster_info, node_addr, max_load_disk.partitions, selected_pid, picked_pid)) { move_info.pid = picked_pid; - move_info.source_node = max_load_node; - move_info.source_disk_tag = max_load_disk; + move_info.source_node = max_load_disk.node; + move_info.source_disk_tag = max_load_disk.disk_tag; move_info.target_node = node_addr; move_info.type = cluster_info.type == cluster_balance_type::COPY_SECONDARY ? balance_type::copy_secondary : balance_type::copy_primary; ddebug_f("partition[{}] will migrate from {} to {}", picked_pid, - max_load_node.to_string(), + max_load_disk.node.to_string(), node_addr.to_string()); return true; } } + ddebug_f("can not find a partition(app_id={}) from random max load disk(node={}, disk={})", + app_id, + max_load_disk.node.to_string(), + max_load_disk.disk_tag); return false; } -void greedy_load_balancer::get_max_load_disk(const cluster_migration_info &cluster_info, - const std::set &max_nodes, - const int32_t app_id, - /*out*/ rpc_address &picked_node, - /*out*/ std::string &picked_disk, - /*out*/ partition_set &target_partitions) +void greedy_load_balancer::get_max_load_disk_set(const cluster_migration_info &cluster_info, + const std::set &max_nodes, + const int32_t app_id, + /*out*/ std::set &max_load_disk_set) { - int32_t max_load_size = 0; + // key: partition count (app_disk_info.partitions.size()) + // value: app_disk_info structure + std::multimap app_disk_info_multimap; for (const auto &node_addr : max_nodes) { // key: disk_tag // value: partition set for app(app id=app_id) in node(addr=node_addr) std::map disk_partitions = get_disk_partitions_map(cluster_info, node_addr, app_id); for (const auto &kv : disk_partitions) { - if (kv.second.size() > max_load_size) { - picked_node = node_addr; - picked_disk = kv.first; - target_partitions = kv.second; - max_load_size = kv.second.size(); - } + app_disk_info info; + info.app_id = app_id; + info.node = node_addr; + info.disk_tag = kv.first; + info.partitions = kv.second; + app_disk_info_multimap.insert( + std::pair(kv.second.size(), info)); } } - ddebug_f("most load is node({}), disk_tag({}), target partition count = {}", - picked_node.to_string(), - picked_disk, - target_partitions.size()); + auto range = app_disk_info_multimap.equal_range(app_disk_info_multimap.rbegin()->first); + for (auto iter = range.first; iter != range.second; ++iter) { + max_load_disk_set.insert(iter->second); + } } std::map greedy_load_balancer::get_disk_partitions_map( diff --git a/src/meta/greedy_load_balancer.h b/src/meta/greedy_load_balancer.h index d0a2fdac36..2d46bf4869 100644 --- a/src/meta/greedy_load_balancer.h +++ b/src/meta/greedy_load_balancer.h @@ -216,6 +216,25 @@ class greedy_load_balancer : public server_load_balancer std::map replicas_count; }; + struct app_disk_info + { + int32_t app_id; + rpc_address node; + std::string disk_tag; + partition_set partitions; + bool operator==(const app_disk_info &another) const + { + return app_id == another.app_id && node == another.node && disk_tag == another.disk_tag; + } + bool operator<(const app_disk_info &another) const + { + if (app_id < another.app_id || (app_id == another.app_id && node < another.node) || + (app_id == another.app_id && node == another.node && disk_tag < another.disk_tag)) + return true; + return false; + } + }; + struct move_info { gpid pid; @@ -249,12 +268,10 @@ class greedy_load_balancer : public server_load_balancer const partition_set &selected_pid, /*out*/ move_info &move_info); - void get_max_load_disk(const cluster_migration_info &cluster_info, - const std::set &max_nodes, - const int32_t app_id, - /*out*/ rpc_address &picked_node, - /*out*/ std::string &picked_disk, - /*out*/ partition_set &target_partitions); + void get_max_load_disk_set(const cluster_migration_info &cluster_info, + const std::set &max_nodes, + const int32_t app_id, + /*out*/ std::set &max_load_disk_set); std::map get_disk_partitions_map( const cluster_migration_info &cluster_info, const rpc_address &addr, const int32_t app_id); @@ -302,7 +319,7 @@ class greedy_load_balancer : public server_load_balancer FRIEND_TEST(greedy_load_balancer, get_app_migration_info); FRIEND_TEST(greedy_load_balancer, get_node_migration_info); FRIEND_TEST(greedy_load_balancer, get_disk_partitions_map); - FRIEND_TEST(greedy_load_balancer, get_max_load_disk); + FRIEND_TEST(greedy_load_balancer, get_max_load_disk_set); FRIEND_TEST(greedy_load_balancer, apply_move); FRIEND_TEST(greedy_load_balancer, pick_up_partition); }; diff --git a/src/meta/test/greedy_load_balancer_test.cpp b/src/meta/test/greedy_load_balancer_test.cpp index 4f875a10db..5855730f21 100644 --- a/src/meta/test/greedy_load_balancer_test.cpp +++ b/src/meta/test/greedy_load_balancer_test.cpp @@ -224,41 +224,54 @@ TEST(greedy_load_balancer, get_disk_partitions_map) ASSERT_EQ(disk_partitions[disk_tag].count(pid), 1); } -TEST(greedy_load_balancer, get_max_load_disk) +TEST(greedy_load_balancer, get_max_load_disk_set) { greedy_load_balancer::cluster_migration_info cluster_info; cluster_info.type = cluster_balance_type::COPY_SECONDARY; - rpc_address addr(1, 10086); int32_t app_id = 1; + rpc_address addr(1, 10086); + rpc_address addr2(2, 10086); std::map partition; partition[addr] = partition_status::PS_SECONDARY; + std::map partition2; + partition2[addr] = partition_status::PS_SECONDARY; + partition2[addr2] = partition_status::PS_SECONDARY; greedy_load_balancer::app_migration_info app_info; app_info.partitions.push_back(partition); + app_info.partitions.push_back(partition2); cluster_info.apps_info[app_id] = app_info; + greedy_load_balancer::node_migration_info node_info; partition_set partitions; gpid pid(app_id, 0); partitions.insert(pid); - greedy_load_balancer::node_migration_info node_info; std::string disk_tag = "disk1"; node_info.partitions[disk_tag] = partitions; + partition_set partitions2; + gpid pid2(app_id, 1); + partitions2.insert(pid2); + std::string disk_tag2 = "disk2"; + node_info.partitions[disk_tag2] = partitions2; cluster_info.nodes_info[addr] = node_info; + greedy_load_balancer::node_migration_info node_info2; + partition_set partitions3; + gpid pid3(app_id, 1); + partitions3.insert(pid3); + std::string disk_tag3 = "disk3"; + node_info2.partitions[disk_tag3] = partitions3; + cluster_info.nodes_info[addr2] = node_info2; + + greedy_load_balancer balancer(nullptr); std::set max_nodes; max_nodes.insert(addr); + max_nodes.insert(addr2); - greedy_load_balancer balancer(nullptr); - rpc_address picked_node; - std::string picked_disk; - partition_set target_partitions; - balancer.get_max_load_disk( - cluster_info, max_nodes, app_id, picked_node, picked_disk, target_partitions); - - ASSERT_EQ(picked_node, addr); - ASSERT_EQ(picked_disk, disk_tag); - ASSERT_EQ(target_partitions.size(), 1); - ASSERT_EQ(target_partitions.count(pid), 1); + std::set max_load_disk_set; + balancer.get_max_load_disk_set(cluster_info, max_nodes, app_id, max_load_disk_set); + + ASSERT_EQ(max_load_disk_set.size(), 3); } TEST(greedy_load_balancer, apply_move)