Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: add function to get cluster migration info #866

Merged
merged 8 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 154 additions & 7 deletions src/meta/greedy_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,44 @@ namespace replication {
DSN_DEFINE_bool("meta_server", balance_cluster, false, "whether to enable cluster balancer");
DSN_TAG_VARIABLE(balance_cluster, FT_MUTABLE);

uint32_t get_count(const node_state &ns, cluster_balance_type type, int32_t app_id)
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
{
unsigned count = 0;
switch (type) {
case cluster_balance_type::Secondary:
if (app_id > 0) {
count = ns.partition_count(app_id) - ns.primary_count(app_id);
} else {
count = ns.partition_count() - ns.primary_count();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
count = ns.partition_count() - ns.primary_count();
count = app_id > 0 : ns.partition_count(app_id) - ns.primary_count(app_id) : ns.partition_count() - ns.primary_count();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's too complex to use ternary expression. I don't think it's a good idea

}
break;
case cluster_balance_type::Primary:
if (app_id > 0) {
count = ns.primary_count(app_id);
} else {
count = ns.primary_count();
}
break;
default:
break;
}
return (uint32_t)count;
}

uint32_t get_skew(const std::map<rpc_address, uint32_t> &count_map)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_diff or get_range?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

skew is more accurate

{
uint32_t min = UINT_MAX, max = 0;
for (const auto &kv : count_map) {
if (kv.second < min) {
min = kv.second;
}
if (kv.second > max) {
max = kv.second;
}
}
return max - min;
}

greedy_load_balancer::greedy_load_balancer(meta_service *_svc)
: simple_load_balancer(_svc),
_ctrl_balancer_ignored_apps(nullptr),
Expand Down Expand Up @@ -941,17 +979,16 @@ void greedy_load_balancer::balance_cluster()
}
}

bool need_continue = cluster_replica_balance(cluster_balance_type::Secondary);
bool need_continue = cluster_replica_balance(t_global_view, cluster_balance_type::Secondary);
if (!need_continue) {
return;
}

// TODO(zlw): copy primary
}

bool greedy_load_balancer::cluster_replica_balance(const cluster_balance_type type)
bool greedy_load_balancer::cluster_replica_balance(const meta_view *global_view,
const cluster_balance_type type)
{
bool enough_information = do_cluster_replica_balance(type);
bool enough_information = do_cluster_replica_balance(global_view, type);
if (!enough_information) {
return false;
}
Expand All @@ -963,10 +1000,120 @@ bool greedy_load_balancer::cluster_replica_balance(const cluster_balance_type ty
return true;
}

bool greedy_load_balancer::do_cluster_replica_balance(const cluster_balance_type type)
bool greedy_load_balancer::do_cluster_replica_balance(const meta_view *global_view,
const cluster_balance_type type)
{
return true;
cluster_migration_info cluster_info;
if (!get_cluster_migration_info(global_view, type, cluster_info)) {
return false;
}

/// TBD(zlw)
return true;
}

// Builds a snapshot of the cluster used to plan replica migrations.
//
// `global_view`  - view providing the node and app tables to inspect.
// `type`         - replica role (primary/secondary) the counts refer to.
// `cluster_info` - output; filled with per-app info and skew, per-node info
//                  and per-node replica counts.
//
// Returns false, meaning "do not balance now", when:
//   - the view has fewer than 3 nodes,
//   - some node fails all_replica_infos_collected(),
//   - any app is ignored by the balancer, bulk loading or splitting,
//   - get_app_migration_info() fails for an available app.
// On a false return `cluster_info` may have been partially filled and must
// not be used.
bool greedy_load_balancer::get_cluster_migration_info(const meta_view *global_view,
                                                      const cluster_balance_type type,
                                                      /*out*/ cluster_migration_info &cluster_info)
{
    const node_mapper &nodes = *global_view->nodes;
    if (nodes.size() < 3) {
        return false;
    }
    for (const auto &kv : nodes) {
        if (!all_replica_infos_collected(kv.second)) {
            return false;
        }
    }

    // Keep only available apps; a single app in a transient state aborts the
    // whole collection rather than being skipped.
    const app_mapper &all_apps = *global_view->apps;
    app_mapper apps;
    for (const auto &kv : all_apps) {
        const std::shared_ptr<app_state> &app = kv.second;
        if (is_ignored_app(app->app_id) || app->is_bulk_loading || app->splitting()) {
            return false;
        }
        if (app->status == app_status::AS_AVAILABLE) {
            apps[app->app_id] = app;
        }
    }

    for (const auto &kv : apps) {
        const std::shared_ptr<app_state> &app = kv.second;
        app_migration_info info;
        if (!get_app_migration_info(app, nodes, type, info)) {
            return false;
        }
        // The skew must be computed before `info` is moved into apps_info:
        // reading info.replicas_count after the move would see a moved-from map.
        cluster_info.apps_skew[kv.first] = get_skew(info.replicas_count);
        cluster_info.apps_info.emplace(kv.first, std::move(info));
    }

    for (const auto &kv : nodes) {
        const node_state &ns = kv.second;
        node_migration_info info;
        get_node_migration_info(ns, apps, info);
        cluster_info.nodes_info.emplace(kv.first, std::move(info));

        // A non-positive app id asks get_count() for the node-wide count.
        cluster_info.replicas_count[kv.first] = get_count(ns, type, -1);
    }

    cluster_info.type = type;
    return true;
}

// Fills `info` with the replica layout of `app`.
//
// `app`   - app whose partitions are inspected.
// `nodes` - all nodes; one replicas_count entry is written per node.
// `type`  - replica role used when counting replicas per node.
// `info`  - output: app id/name, a per-partition map of address -> status
//           (primary or secondary), and the per-node replica count.
//
// Returns false as soon as a partition is found whose number of secondaries
// differs from max_replica_count - 1 (treated as unhealthy); `info` is then
// only partially filled. Returns true otherwise.
bool greedy_load_balancer::get_app_migration_info(std::shared_ptr<app_state> app,
                                                  const node_mapper &nodes,
                                                  const cluster_balance_type type,
                                                  app_migration_info &info)
{
    info.app_id = app->app_id;
    info.app_name = app->app_name;
    info.partitions.resize(app->partitions.size());
    for (size_t i = 0; i < app->partitions.size(); ++i) {
        const auto &pc = app->partitions[i];
        // The conversion the original comparison performed implicitly is
        // spelled out here to avoid a signed/unsigned comparison.
        if (pc.secondaries.size() != static_cast<size_t>(pc.max_replica_count - 1)) {
            // partition is unhealthy
            return false;
        }

        std::map<rpc_address, partition_status::type> pstatus_map;
        pstatus_map[pc.primary] = partition_status::PS_PRIMARY;
        for (const auto &addr : pc.secondaries) {
            pstatus_map[addr] = partition_status::PS_SECONDARY;
        }
        // Hand the freshly built map over instead of copying it.
        info.partitions[i] = std::move(pstatus_map);
    }

    for (const auto &it : nodes) {
        const node_state &ns = it.second;
        info.replicas_count[ns.addr()] = get_count(ns, type, app->app_id);
    }

    return true;
}

// Fills `info` for node `ns`: its address and, for every disk tag, the set of
// partitions of `apps` that the node serves from that disk.
//
// Partitions whose config context reports no disk tag for this node
// (get_disk_tag() returns false) are skipped.
void greedy_load_balancer::get_node_migration_info(const node_state &ns,
                                                   const app_mapper &apps,
                                                   /*out*/ node_migration_info &info)
{
    info.address = ns.addr();
    for (const auto &iter : apps) {
        const std::shared_ptr<app_state> &app = iter.second;
        for (const auto &context : app->helpers->contexts) {
            std::string disk_tag;
            if (!context.get_disk_tag(ns.addr(), disk_tag)) {
                continue;
            }
            // operator[] creates an empty set the first time a disk tag is
            // seen, so no separate find/emplace branch is needed.
            info.partitions[disk_tag].insert(context.config_owner->pid);
        }
    }
}

bool greedy_load_balancer::balance(meta_view view, migration_list &list)
Expand Down
80 changes: 78 additions & 2 deletions src/meta/greedy_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ ENUM_REG(cluster_balance_type::Primary)
ENUM_REG(cluster_balance_type::Secondary)
ENUM_END(cluster_balance_type)

// Number of replicas of role `type` held by node `ns`; per-app counters are
// used when `app_id` > 0, the node-wide counters otherwise.
uint32_t get_count(const node_state &ns, cluster_balance_type type, int32_t app_id);
// Difference between the largest and the smallest value stored in `count_map`.
uint32_t get_skew(const std::map<rpc_address, uint32_t> &count_map);

class greedy_load_balancer : public simple_load_balancer
{
public:
Expand Down Expand Up @@ -150,9 +153,75 @@ class greedy_load_balancer : public simple_load_balancer

void balance_cluster();

bool cluster_replica_balance(const cluster_balance_type type);
bool cluster_replica_balance(const meta_view *global_view, const cluster_balance_type type);

bool do_cluster_replica_balance(const meta_view *global_view, const cluster_balance_type type);

// Replica layout of a single app, as collected by get_app_migration_info().
struct app_migration_info
{
    int32_t app_id;
    std::string app_name;
    // Indexed by partition index: for each partition, the addresses holding
    // a replica and the role (primary/secondary) each of them plays.
    std::vector<std::map<rpc_address, partition_status::type>> partitions;
    // Per node: number of this app's replicas of the balanced role.
    std::map<rpc_address, uint32_t> replicas_count;

    // Ordering and equality are defined by app_id only.
    bool operator<(const app_migration_info &another) const { return app_id < another.app_id; }
    bool operator==(const app_migration_info &another) const
    {
        return app_id == another.app_id;
    }

    // Returns the role `addr` plays for partition `pidx`, or PS_INACTIVE when
    // `addr` holds no replica of it or `pidx` is not a valid partition index.
    partition_status::type get_partition_status(int32_t pidx, rpc_address addr) const
    {
        if (pidx < 0 || static_cast<size_t>(pidx) >= partitions.size()) {
            return partition_status::PS_INACTIVE;
        }
        for (const auto &kv : partitions[pidx]) {
            if (kv.first == addr) {
                return kv.second;
            }
        }
        return partition_status::PS_INACTIVE;
    }
};

struct node_migration_info
{
rpc_address address;
std::map<std::string, partition_set> partitions;
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
partition_set future_partitions;
bool operator<(const node_migration_info &another) const
{
if (address < another.address)
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
return true;
return false;
}
bool operator==(const node_migration_info &another) const
{
return address == another.address;
}
};

// Cluster-wide snapshot produced by get_cluster_migration_info().
struct cluster_migration_info
{
// Replica role (primary/secondary) the counts below refer to.
cluster_balance_type type;
// App id -> skew (max - min) of that app's per-node replica counts.
std::map<int32_t, uint32_t> apps_skew;
// App id -> replica layout of that app.
std::map<int32_t, app_migration_info> apps_info;
// Node address -> per-disk partition sets of that node.
std::map<rpc_address, node_migration_info> nodes_info;
// Node address -> node-wide replica count of role `type`.
std::map<rpc_address, uint32_t> replicas_count;
};

bool do_cluster_replica_balance(const cluster_balance_type type);
// Fills `cluster_info` from `global_view` for replicas of role `type`.
// Returns false when the view is not in a state that can be balanced
// (too few nodes, incomplete replica infos, an ignored/bulk-loading/splitting
// app, or an unhealthy partition).
bool get_cluster_migration_info(const meta_view *global_view,
const cluster_balance_type type,
/*out*/ cluster_migration_info &cluster_info);

// Fills `info` with the per-partition replica roles of `app` and its replica
// count on every node in `nodes`. Returns false if a partition does not have
// max_replica_count - 1 secondaries.
bool get_app_migration_info(std::shared_ptr<app_state> app,
const node_mapper &nodes,
const cluster_balance_type type,
/*out*/ app_migration_info &info);

// Fills `info` with the address of `ns` and, per disk tag, the partitions of
// the given apps that `ns` serves.
void get_node_migration_info(const node_state &ns,
const app_mapper &all_apps,
/*out*/ node_migration_info &info);

bool all_replica_infos_collected(const node_state &ns);
// using t_global_view to get disk_tag of node's pid
Expand All @@ -178,6 +247,13 @@ class greedy_load_balancer : public simple_load_balancer
std::string clear_balancer_ignored_app_ids();

bool is_ignored_app(app_id app_id);

FRIEND_TEST(greedy_load_balancer, app_migration_info);
FRIEND_TEST(greedy_load_balancer, node_migration_info);
FRIEND_TEST(greedy_load_balancer, get_skew);
FRIEND_TEST(greedy_load_balancer, get_count);
FRIEND_TEST(greedy_load_balancer, get_app_migration_info);
FRIEND_TEST(greedy_load_balancer, get_node_migration_info);
};

inline configuration_proposal_action
Expand Down
14 changes: 12 additions & 2 deletions src/meta/meta_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,16 @@ void config_context::adjust_proposal(const rpc_address &node, const replica_info
lb_actions.track_current_learner(node, info);
}

// Looks up `node` among the replicas currently serving this partition.
// On a hit, stores that replica's disk tag in `disk_tag` and returns true;
// otherwise leaves `disk_tag` untouched and returns false.
bool config_context::get_disk_tag(const rpc_address &node, /*out*/ std::string &disk_tag) const
{
    const auto serving_it = find_from_serving(node);
    const bool found = serving_it != serving.end();
    if (found) {
        disk_tag = serving_it->disk_tag;
    }
    return found;
}

void app_state_helper::on_init_partitions()
{
config_context context;
Expand Down Expand Up @@ -546,5 +556,5 @@ partition_status::type node_state::served_as(const gpid &pid) const
return partition_status::PS_SECONDARY;
return partition_status::PS_INACTIVE;
}
}
}
} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions src/meta/meta_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ class config_context

void adjust_proposal(const dsn::rpc_address &node, const replica_info &info);

bool get_disk_tag(const rpc_address &node, /*out*/ std::string &disk_tag) const;

public:
// initialize to 4 statically.
// and will be set by load-balancer module
Expand Down
Loading