Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(split): update register_child with split_status #643

Merged
merged 4 commits into from
Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 63 additions & 38 deletions src/meta/meta_split_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,47 +131,74 @@ void meta_split_service::do_start_partition_split(std::shared_ptr<app_state> app
void meta_split_service::register_child_on_meta(register_child_rpc rpc)
{
const auto &request = rpc.request();
const std::string &app_name = request.app.app_name;
auto &response = rpc.response();
response.err = ERR_IO_PENDING;

zauto_write_lock(app_lock());
std::shared_ptr<app_state> app = _state->get_app(request.app.app_id);
dassert_f(app != nullptr, "app is not existed, id({})", request.app.app_id);
dassert_f(app->is_stateful, "app is stateless currently, id({})", request.app.app_id);

dsn::gpid parent_gpid = request.parent_config.pid;
dsn::gpid child_gpid = request.child_config.pid;
const partition_configuration &parent_config =
app->partitions[parent_gpid.get_partition_index()];
const partition_configuration &child_config = app->partitions[child_gpid.get_partition_index()];
config_context &parent_context = app->helpers->contexts[parent_gpid.get_partition_index()];

if (request.parent_config.ballot < parent_config.ballot) {
dwarn_f("partition({}) register child failed, request is out-dated, request ballot = {}, "
"meta ballot = {}",
parent_gpid,
request.parent_config.ballot,
parent_config.ballot);
std::shared_ptr<app_state> app = _state->get_app(app_name);
dassert_f(app != nullptr, "app({}) is not existed", app_name);
dassert_f(app->is_stateful, "app({}) is stateless currently", app_name);

const gpid &parent_gpid = request.parent_config.pid;
const gpid &child_gpid = request.child_config.pid;
const auto &parent_config = app->partitions[parent_gpid.get_partition_index()];
if (request.parent_config.ballot != parent_config.ballot) {
derror_f("app({}) partition({}) register child({}) failed, request is outdated, request "
"parent ballot = {}, local parent ballot = {}",
app_name,
parent_gpid,
child_gpid,
request.parent_config.ballot,
parent_config.ballot);
response.err = ERR_INVALID_VERSION;
response.parent_config = parent_config;
return;
}

if (child_config.ballot != invalid_ballot) {
dwarn_f(
"duplicated register child request, child({}) has already been registered, ballot = {}",
child_gpid,
child_config.ballot);
config_context &parent_context = app->helpers->contexts[parent_gpid.get_partition_index()];
if (parent_context.stage == config_status::pending_proposal ||
parent_context.stage == config_status::pending_remote_sync) {
dwarn_f("app({}) partition({}): another request is syncing with remote storage, ignore "
"this request",
app_name,
parent_gpid);
return;
}

// TODO(heyuchen): pause/cancel split check

auto iter = app->helpers->split_states.status.find(parent_gpid.get_partition_index());
if (iter == app->helpers->split_states.status.end()) {
derror_f(
"duplicated register request, app({}) child partition({}) has already been registered",
app_name,
child_gpid);
const auto &child_config = app->partitions[child_gpid.get_partition_index()];
dassert_f(child_config.ballot > 0,
"app({}) partition({}) should have been registered",
app_name,
child_gpid);
response.err = ERR_CHILD_REGISTERED;
response.parent_config = parent_config;
return;
}

if (parent_context.stage == config_status::pending_proposal ||
parent_context.stage == config_status::pending_remote_sync) {
dwarn_f("another request is syncing with remote storage, ignore this request");
if (iter->second != split_status::SPLITTING) {
derror_f(
"app({}) partition({}) register child({}) failed, current partition split_status = {}",
app_name,
parent_gpid,
child_gpid,
dsn::enum_to_string(iter->second));
response.err = ERR_INVALID_STATE;
return;
}

ddebug_f("parent({}) will register child({})", parent_gpid, child_gpid);
app->helpers->split_states.status.erase(parent_gpid.get_partition_index());
app->helpers->split_states.splitting_count--;
ddebug_f("app({}) parent({}) will register child({})", app_name, parent_gpid, child_gpid);

parent_context.stage = config_status::pending_remote_sync;
parent_context.msg = rpc.dsn_request();
parent_context.pending_sync_task = add_child_on_remote_storage(rpc, true);
Expand Down Expand Up @@ -211,19 +238,17 @@ void meta_split_service::on_add_child_on_remote_storage_reply(error_code ec,
register_child_rpc rpc,
bool create_new)
{
zauto_write_lock(app_lock());

const auto &request = rpc.request();
auto &response = rpc.response();
const std::string &app_name = request.app.app_name;

std::shared_ptr<app_state> app = _state->get_app(request.app.app_id);
dassert_f(app != nullptr, "app is not existed, id({})", request.app.app_id);
dassert_f(app->status == app_status::AS_AVAILABLE || app->status == app_status::AS_DROPPING,
"app is not available now, id({})",
request.app.app_id);
zauto_write_lock(app_lock());
std::shared_ptr<app_state> app = _state->get_app(app_name);
dassert_f(app != nullptr, "app({}) is not existed", app_name);
dassert_f(app->is_stateful, "app({}) is stateless currently", app_name);

dsn::gpid parent_gpid = request.parent_config.pid;
dsn::gpid child_gpid = request.child_config.pid;
const gpid &parent_gpid = request.parent_config.pid;
const gpid &child_gpid = request.child_config.pid;
config_context &parent_context = app->helpers->contexts[parent_gpid.get_partition_index()];

if (ec == ERR_TIMEOUT ||
Expand All @@ -241,7 +266,7 @@ void meta_split_service::on_add_child_on_remote_storage_reply(error_code ec,
std::chrono::seconds(delay));
return;
}
dassert_f(ec == ERR_OK, "we can't handle this right now, err = {}", ec.to_string());
dassert_f(ec == ERR_OK, "we can't handle this right now, err = {}", ec);

ddebug_f("parent({}) resgiter child({}) on remote storage succeed", parent_gpid, child_gpid);

Expand All @@ -257,15 +282,15 @@ void meta_split_service::on_add_child_on_remote_storage_reply(error_code ec,
child_config.secondaries = request.child_config.secondaries;
_state->update_configuration_locally(*app, update_child_request);

parent_context.pending_sync_task = nullptr;
parent_context.stage = config_status::not_pending;
if (parent_context.msg) {
response.err = ERR_OK;
response.app = *app;
response.parent_config = app->partitions[parent_gpid.get_partition_index()];
response.child_config = app->partitions[child_gpid.get_partition_index()];
parent_context.msg = nullptr;
}
parent_context.pending_sync_task = nullptr;
parent_context.stage = config_status::not_pending;
}

} // namespace replication
Expand Down
128 changes: 83 additions & 45 deletions src/meta/test/meta_split_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ class meta_split_service_test : public meta_test_base
{
meta_test_base::SetUp();
create_app(NAME, PARTITION_COUNT);
app = find_app(NAME);
}

void TearDown()
{
app.reset();
meta_test_base::TearDown();
}

error_code start_partition_split(const std::string &app_name, int new_partition_count)
Expand All @@ -56,56 +63,64 @@ class meta_split_service_test : public meta_test_base
return rpc.response().err;
}

register_child_response
register_child(ballot req_parent_ballot, ballot child_ballot, bool wait_zk = false)
error_code register_child(int32_t parent_index, ballot req_parent_ballot, bool wait_zk)
{
// mock local app info
auto app = find_app(NAME);
app->partition_count *= 2;
app->partitions.resize(app->partition_count);
app->helpers->contexts.resize(app->partition_count);
for (int i = 0; i < app->partition_count; ++i) {
app->helpers->contexts[i].config_owner = &app->partitions[i];
app->partitions[i].pid = dsn::gpid(app->app_id, i);
if (i >= app->partition_count / 2) {
app->partitions[i].ballot = invalid_ballot;
} else {
app->partitions[i].ballot = PARENT_BALLOT;
}
}
app->partitions[CHILD_INDEX].ballot = child_ballot;

// mock node state
node_state node;
node.put_partition(dsn::gpid(app->app_id, PARENT_INDEX), true);
mock_node_state(dsn::rpc_address("127.0.0.1", 10086), node);

// mock register_child_request
partition_configuration parent_config;
parent_config.ballot = req_parent_ballot;
parent_config.last_committed_decree = 5;
parent_config.max_replica_count = 3;
parent_config.pid = dsn::gpid(app->app_id, PARENT_INDEX);
parent_config.pid = gpid(app->app_id, parent_index);

dsn::partition_configuration child_config;
partition_configuration child_config;
child_config.ballot = PARENT_BALLOT + 1;
child_config.last_committed_decree = 5;
child_config.pid = dsn::gpid(app->app_id, CHILD_INDEX);
child_config.pid = gpid(app->app_id, parent_index + PARTITION_COUNT);

// mock node state
node_state node;
node.put_partition(gpid(app->app_id, PARENT_INDEX), true);
mock_node_state(NODE, node);

// register_child_request request;
auto request = dsn::make_unique<register_child_request>();
request->app.app_name = app->app_name;
request->app.app_id = app->app_id;
request->parent_config = parent_config;
request->child_config = child_config;
request->primary_address = dsn::rpc_address("127.0.0.1", 10086);
request->primary_address = NODE;

register_child_rpc rpc(std::move(request), RPC_CM_REGISTER_CHILD_REPLICA);
split_svc().register_child_on_meta(rpc);
wait_all();
if (wait_zk) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
return rpc.response();
return rpc.response().err;
}

void mock_app_partition_split_context()
{
app->partition_count = NEW_PARTITION_COUNT;
app->partitions.resize(app->partition_count);
app->helpers->contexts.resize(app->partition_count);
app->helpers->split_states.splitting_count = app->partition_count / 2;
for (int i = 0; i < app->partition_count; ++i) {
app->helpers->contexts[i].config_owner = &app->partitions[i];
app->partitions[i].pid = gpid(app->app_id, i);
if (i >= app->partition_count / 2) {
app->partitions[i].ballot = invalid_ballot;
} else {
app->partitions[i].ballot = PARENT_BALLOT;
app->helpers->contexts[i].stage = config_status::not_pending;
app->helpers->split_states.status[i] = split_status::SPLITTING;
}
}
}

void mock_child_registered()
{
app->partitions[CHILD_INDEX].ballot = PARENT_BALLOT;
app->helpers->split_states.splitting_count--;
app->helpers->split_states.status.erase(PARENT_INDEX);
}

const std::string NAME = "split_table";
Expand All @@ -114,6 +129,8 @@ class meta_split_service_test : public meta_test_base
const int32_t PARENT_BALLOT = 3;
const int32_t PARENT_INDEX = 0;
const int32_t CHILD_INDEX = 4;
const rpc_address NODE = rpc_address("127.0.0.1", 10086);
std::shared_ptr<app_state> app;
};

// start split unit tests
Expand Down Expand Up @@ -145,23 +162,44 @@ TEST_F(meta_split_service_test, start_split_test)
}
}

// TODO(heyuchen): refactor register unit tests
TEST_F(meta_split_service_test, register_child_with_wrong_ballot)
{
auto resp = register_child(PARENT_BALLOT - 1, invalid_ballot);
ASSERT_EQ(resp.err, ERR_INVALID_VERSION);
}

TEST_F(meta_split_service_test, register_child_with_child_registered)
// register child unit tests
TEST_F(meta_split_service_test, register_child_test)
{
auto resp = register_child(PARENT_BALLOT, PARENT_BALLOT + 1);
ASSERT_EQ(resp.err, ERR_CHILD_REGISTERED);
}
// Test case:
// - request is out-dated
// - child has been registered
// - TODO(heyuchen): parent partition has been paused splitting
// - parent partition is sync config to remote storage
// - register child succeed
struct register_test
{
int32_t parent_ballot;
bool mock_child_registered;
bool mock_parent_paused;
bool mock_pending;
error_code expected_err;
bool wait_zk;
} tests[] = {
{PARENT_BALLOT - 1, false, false, false, ERR_INVALID_VERSION, false},
{PARENT_BALLOT, true, false, false, ERR_CHILD_REGISTERED, false},
{PARENT_BALLOT, false, false, true, ERR_IO_PENDING, false},
{PARENT_BALLOT, false, false, false, ERR_OK, true},
};

TEST_F(meta_split_service_test, register_child_succeed)
{
auto resp = register_child(PARENT_BALLOT, invalid_ballot, true);
ASSERT_EQ(resp.err, ERR_OK);
for (auto test : tests) {
mock_app_partition_split_context();
if (test.mock_child_registered) {
mock_child_registered();
}
if (test.mock_parent_paused) {
// TODO(heyuchen): mock split paused
}
if (test.mock_pending) {
app->helpers->contexts[PARENT_INDEX].stage = config_status::pending_remote_sync;
}
ASSERT_EQ(register_child(PARENT_INDEX, test.parent_ballot, test.wait_zk),
test.expected_err);
}
}

} // namespace replication
Expand Down
5 changes: 5 additions & 0 deletions src/replica/replica_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@

#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/utility/fail_point.h>

namespace dsn {
namespace replication {

void replica::init_group_check()
{
FAIL_POINT_INJECT_F("replica_init_group_check", [](dsn::string_view) {});

_checker.only_one_thread_access();

ddebug("%s: init group check", name());
Expand All @@ -66,6 +69,8 @@ void replica::init_group_check()

void replica::broadcast_group_check()
{
FAIL_POINT_INJECT_F("replica_broadcast_group_check", [](dsn::string_view) {});

dassert(nullptr != _primary_states.group_check_task, "");

ddebug("%s: start to broadcast group check", name());
Expand Down
3 changes: 2 additions & 1 deletion src/replica/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,8 @@ bool replica::update_local_configuration(const replica_configuration &config,
FAIL_POINT_INJECT_F("replica_update_local_configuration", [=](dsn::string_view) -> bool {
auto old_status = status();
_config = config;
ddebug_replica("update status from {} to {}", enum_to_string(old_status), status());
ddebug_replica(
"update status from {} to {}", enum_to_string(old_status), enum_to_string(status()));
return true;
});

Expand Down
Loading