diff --git a/include/dsn/dist/replication/replication_types.h b/include/dsn/dist/replication/replication_types.h index 728e5f6861..d86fb9f376 100644 --- a/include/dsn/dist/replication/replication_types.h +++ b/include/dsn/dist/replication/replication_types.h @@ -1563,7 +1563,8 @@ typedef struct _group_check_response__isset last_committed_decree_in_prepare_list(false), learner_status_(true), learner_signature(false), - node(false) + node(false), + is_split_stopped(false) { } bool pid : 1; @@ -1573,6 +1574,7 @@ typedef struct _group_check_response__isset bool learner_status_ : 1; bool learner_signature : 1; bool node : 1; + bool is_split_stopped : 1; } _group_check_response__isset; class group_check_response @@ -1586,7 +1588,8 @@ class group_check_response : last_committed_decree_in_app(0), last_committed_decree_in_prepare_list(0), learner_status_((learner_status::type)0), - learner_signature(0) + learner_signature(0), + is_split_stopped(0) { learner_status_ = (learner_status::type)0; } @@ -1599,6 +1602,7 @@ class group_check_response learner_status::type learner_status_; int64_t learner_signature; ::dsn::rpc_address node; + bool is_split_stopped; _group_check_response__isset __isset; @@ -1616,6 +1620,8 @@ class group_check_response void __set_node(const ::dsn::rpc_address &val); + void __set_is_split_stopped(const bool val); + bool operator==(const group_check_response &rhs) const { if (!(pid == rhs.pid)) @@ -1632,6 +1638,10 @@ class group_check_response return false; if (!(node == rhs.node)) return false; + if (__isset.is_split_stopped != rhs.__isset.is_split_stopped) + return false; + else if (__isset.is_split_stopped && !(is_split_stopped == rhs.is_split_stopped)) + return false; return true; } bool operator!=(const group_check_response &rhs) const { return !(*this == rhs); } diff --git a/src/common/replication_types.cpp b/src/common/replication_types.cpp index 870bffe5fb..91e6c9229b 100644 --- a/src/common/replication_types.cpp +++ b/src/common/replication_types.cpp @@ -2750,6 +2750,12 @@ void group_check_response::__set_learner_signature(const int64_t val) void group_check_response::__set_node(const ::dsn::rpc_address &val) { this->node = val; } +void group_check_response::__set_is_split_stopped(const bool val) +{ + this->is_split_stopped = val; + __isset.is_split_stopped = true; +} + uint32_t group_check_response::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -2827,6 +2833,14 @@ uint32_t group_check_response::read(::apache::thrift::protocol::TProtocol *iprot xfer += iprot->skip(ftype); } break; + case 8: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->is_split_stopped); + this->__isset.is_split_stopped = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -2875,6 +2889,11 @@ uint32_t group_check_response::write(::apache::thrift::protocol::TProtocol *opro xfer += this->node.write(oprot); xfer += oprot->writeFieldEnd(); + if (this->__isset.is_split_stopped) { + xfer += oprot->writeFieldBegin("is_split_stopped", ::apache::thrift::protocol::T_BOOL, 8); + xfer += oprot->writeBool(this->is_split_stopped); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -2890,6 +2909,7 @@ void swap(group_check_response &a, group_check_response &b) swap(a.learner_status_, b.learner_status_); swap(a.learner_signature, b.learner_signature); swap(a.node, b.node); + swap(a.is_split_stopped, b.is_split_stopped); swap(a.__isset, b.__isset); } @@ -2902,6 +2922,7 @@ group_check_response::group_check_response(const group_check_response &other73) learner_status_ = other73.learner_status_; learner_signature = other73.learner_signature; node = other73.node; + is_split_stopped = other73.is_split_stopped; __isset = other73.__isset; } group_check_response::group_check_response(group_check_response &&other74) @@ -2914,6 +2935,7 @@ group_check_response::group_check_response(group_check_response &&other74) learner_status_ = std::move(other74.learner_status_); learner_signature = std::move(other74.learner_signature); node = std::move(other74.node); + is_split_stopped = std::move(other74.is_split_stopped); __isset = std::move(other74.__isset); } group_check_response &group_check_response::operator=(const group_check_response &other75) @@ -2925,6 +2947,7 @@ group_check_response &group_check_response::operator=(const group_check_response learner_status_ = other75.learner_status_; learner_signature = other75.learner_signature; node = other75.node; + is_split_stopped = other75.is_split_stopped; __isset = other75.__isset; return *this; } @@ -2938,6 +2961,7 @@ group_check_response &group_check_response::operator=(group_check_response &&oth learner_status_ = std::move(other76.learner_status_); learner_signature = std::move(other76.learner_signature); node = std::move(other76.node); + is_split_stopped = std::move(other76.is_split_stopped); __isset = std::move(other76.__isset); return *this; } @@ -2959,6 +2983,9 @@ void group_check_response::printTo(std::ostream &out) const << "learner_signature=" << to_string(learner_signature); out << ", " << "node=" << to_string(node); + out << ", " + << "is_split_stopped="; + (__isset.is_split_stopped ? (out << to_string(is_split_stopped)) : (out << "")); out << ")"; } diff --git a/src/replica/replica_check.cpp b/src/replica/replica_check.cpp index b86bc33dac..d7d891dec5 100644 --- a/src/replica/replica_check.cpp +++ b/src/replica/replica_check.cpp @@ -234,6 +234,7 @@ void replica::on_group_check_reply(error_code err, req->config.status == partition_status::PS_POTENTIAL_SECONDARY) { handle_learning_succeeded_on_primary(req->node, resp->learner_signature); } + _split_mgr->primary_parent_handle_stop_split(req, resp); } } diff --git a/src/replica/replica_context.cpp b/src/replica/replica_context.cpp index c306b3702c..3fca53e217 100644 --- a/src/replica/replica_context.cpp +++ b/src/replica/replica_context.cpp @@ -67,9 +67,6 @@ void primary_context::cleanup(bool clean_pending_mutations) // clean up checkpoint CLEANUP_TASK_ALWAYS(checkpoint_task) - // clean up register child task - CLEANUP_TASK_ALWAYS(register_child_task) - // cleanup group bulk load for (auto &kv : group_bulk_load_pending_replies) { CLEANUP_TASK_ALWAYS(kv.second); @@ -78,11 +75,9 @@ void primary_context::cleanup(bool clean_pending_mutations) membership.ballot = 0; - caught_up_children.clear(); - - sync_send_write_request = false; - cleanup_bulk_load_states(); + + cleanup_split_states(); } bool primary_context::is_cleaned() @@ -170,6 +165,15 @@ void primary_context::cleanup_bulk_load_states() ingestion_is_empty_prepare_sent = false; } +void primary_context::cleanup_split_states() +{ + CLEANUP_TASK_ALWAYS(register_child_task) + + caught_up_children.clear(); + sync_send_write_request = false; + split_stopped_secondary.clear(); +} + bool secondary_context::cleanup(bool force) { CLEANUP_TASK(checkpoint_task, force) diff --git a/src/replica/replica_context.h b/src/replica/replica_context.h index 354d510ecb..ff01f343f9 100644 --- a/src/replica/replica_context.h +++ b/src/replica/replica_context.h @@ -102,6 +102,8 @@ class primary_context void cleanup_bulk_load_states(); + void cleanup_split_states(); + public: // membership mgr, including learners partition_configuration membership; @@ -152,6 +154,10 @@ class primary_context // primary parent register child on meta_server task dsn::task_ptr register_child_task; + // Used partition split + // secondary replica address who has paused or canceled split + std::unordered_set split_stopped_secondary; + // Used for bulk load // group bulk_load response tasks of RPC_GROUP_BULK_LOAD for each secondary replica node_tasks group_bulk_load_pending_replies; diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp index 4678258286..b0aa34fd8d 100644 --- a/src/replica/split/replica_split_manager.cpp +++ b/src/replica/split/replica_split_manager.cpp @@ -68,8 +68,9 @@ void replica_split_manager::parent_start_split( return; } - // TODO(heyuchen): if partition is primary, reset split related varieties - + if (status() == partition_status::PS_PRIMARY) { + _replica->_primary_states.cleanup_split_states(); + } _partition_version.store(_replica->_app_info.partition_count - 1); _split_status = split_status::SPLITTING; @@ -1144,6 +1145,17 @@ void replica_split_manager::trigger_primary_parent_split( return; } + if (meta_split_status == split_status::PAUSING || + meta_split_status == split_status::CANCELING) { + parent_stop_split(meta_split_status); + return; + } + + if (meta_split_status == split_status::PAUSED) { + dwarn_replica("split has been paused, ignore it"); + return; + } + // TODO(heyuchen): add other split_status check } @@ -1169,7 +1181,11 @@ void replica_split_manager::trigger_secondary_parent_split( return; } - // TODO(heyuchen): add other split_status check, response will be used in future + if (request.meta_split_status == split_status::PAUSING || + request.meta_split_status == split_status::CANCELING) { // secondary pause or cancel split + parent_stop_split(request.meta_split_status); + response.__set_is_split_stopped(true); + } } // ThreadPool: THREAD_POOL_REPLICATION @@ -1257,5 +1273,81 @@ void replica_split_manager::on_copy_mutation_reply(error_code ec, // TBD } +// ThreadPool: THREAD_POOL_REPLICATION +void replica_split_manager::parent_stop_split( + split_status::type meta_split_status) // on parent partition +{ + dassert_replica(status() == partition_status::PS_PRIMARY || + status() == partition_status::PS_SECONDARY, + "wrong partition_status({})", + enum_to_string(status())); + dassert_replica(_split_status == split_status::SPLITTING || + _split_status == split_status::NOT_SPLIT, + "wrong split_status({})", + enum_to_string(_split_status)); + + auto old_status = _split_status; + if (_split_status == split_status::SPLITTING) { + _stub->split_replica_error_handler( + _child_gpid, + std::bind(&replica_split_manager::child_handle_split_error, + std::placeholders::_1, + "stop partition split")); + parent_cleanup_split_context(); + } + _partition_version.store(_replica->_app_info.partition_count - 1); + + if (status() == partition_status::PS_PRIMARY) { + _replica->_primary_states.sync_send_write_request = false; + _replica->broadcast_group_check(); + } + ddebug_replica( + "{} split succeed, status = {}, old split_status = {}, child partition_index = {}", + meta_split_status == split_status::PAUSING ? "pause" : "cancel", + enum_to_string(status()), + enum_to_string(old_status), + get_gpid().get_partition_index() + _replica->_app_info.partition_count); +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica_split_manager::primary_parent_handle_stop_split( + const std::shared_ptr &req, + const std::shared_ptr &resp) // on primary parent partition +{ + if (!req->__isset.meta_split_status || (req->meta_split_status != split_status::PAUSING && + req->meta_split_status != split_status::CANCELING)) { + // partition is not executing split or not stopping split + return; + } + + if (!resp->__isset.is_split_stopped || !resp->is_split_stopped) { + // secondary has not stopped split + return; + } + + _replica->_primary_states.split_stopped_secondary.insert(req->node); + auto count = 0; + for (auto &iter : _replica->_primary_states.statuses) { + if (iter.second == partition_status::PS_SECONDARY && + _replica->_primary_states.split_stopped_secondary.find(iter.first) != + _replica->_primary_states.split_stopped_secondary.end()) { + ++count; + } + } + // all secondaries have already stop split succeed + if (count == _replica->_primary_states.membership.max_replica_count - 1) { + _replica->_primary_states.cleanup_split_states(); + parent_send_notify_stop_request(req->meta_split_status); + } +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica_split_manager::parent_send_notify_stop_request( + split_status::type meta_split_status) // on primary parent +{ + FAIL_POINT_INJECT_F("replica_parent_send_notify_stop_request", [](dsn::string_view) {}); + // TODO(hyc): TBD +} + } // namespace replication } // namespace dsn diff --git a/src/replica/split/replica_split_manager.h b/src/replica/split/replica_split_manager.h index 17b4ca06ae..93a5e2f4e6 100644 --- a/src/replica/split/replica_split_manager.h +++ b/src/replica/split/replica_split_manager.h @@ -154,6 +154,15 @@ class replica_split_manager : replica_base // when child copy mutation synchronously, parent replica handle child ack void on_copy_mutation_reply(dsn::error_code ec, ballot b, decree d); + // parent partition pause or cancel split + void parent_stop_split(split_status::type meta_split_status); + + // called by `on_group_check_reply` in `replica_check.cpp` + // if group all replica pause/cancel split, send notify request to meta server + void primary_parent_handle_stop_split(const std::shared_ptr &req, + const std::shared_ptr &resp); + void parent_send_notify_stop_request(split_status::type meta_split_status); + // // helper functions // diff --git a/src/replica/split/test/replica_split_test.cpp b/src/replica/split/test/replica_split_test.cpp index 6158b991de..4cdb978b6a 100644 --- a/src/replica/split/test/replica_split_test.cpp +++ b/src/replica/split/test/replica_split_test.cpp @@ -362,6 +362,10 @@ class replica_split_test : public replica_test_base req.config.ballot = INIT_BALLOT; req.config.status = partition_status::PS_SECONDARY; req.node = SECONDARY; + if (meta_split_status == split_status::PAUSING || + meta_split_status == split_status::CANCELING) { + req.__set_meta_split_status(meta_split_status); + } if (meta_split_status == split_status::NOT_SPLIT) { req.app.partition_count *= 2; } @@ -373,6 +377,39 @@ class replica_split_test : public replica_test_base return resp; } + void test_primary_parent_handle_stop_split(split_status::type meta_split_status, + bool lack_of_secondary, + bool will_all_stop) + { + _parent_replica->set_partition_status(partition_status::PS_PRIMARY); + _parent_replica->_primary_states.statuses[PRIMARY] = partition_status::PS_PRIMARY; + _parent_replica->_primary_states.statuses[SECONDARY] = partition_status::PS_SECONDARY; + _parent_replica->_primary_states.statuses[SECONDARY2] = + lack_of_secondary ? partition_status::PS_POTENTIAL_SECONDARY + : partition_status::PS_SECONDARY; + _parent_replica->_primary_states.sync_send_write_request = true; + _parent_replica->_primary_states.split_stopped_secondary.clear(); + mock_parent_primary_configuration(lack_of_secondary); + + std::shared_ptr req = std::make_shared(); + std::shared_ptr resp = std::make_shared(); + req->node = SECONDARY; + if (meta_split_status != split_status::NOT_SPLIT) { + req->__set_meta_split_status(meta_split_status); + } + + if (meta_split_status == split_status::PAUSING || + meta_split_status == split_status::CANCELING) { + resp->__set_is_split_stopped(true); + if (will_all_stop) { + _parent_replica->_primary_states.split_stopped_secondary.insert(SECONDARY2); + } + } + + _parent_split_mgr->primary_parent_handle_stop_split(req, resp); + _parent_replica->tracker()->wait_outstanding_tasks(); + } + /// helper functions void cleanup_prepare_list(mock_replica_ptr rep) { rep->_prepare_list->reset(0); } void cleanup_child_split_context() @@ -402,12 +439,23 @@ class replica_split_test : public replica_test_base { return _parent_replica->_primary_states.sync_send_write_request; } + int32_t parent_stopped_split_size() + { + return _parent_replica->_primary_states.split_stopped_secondary.size(); + } bool is_parent_not_in_split() { return _parent_split_mgr->_child_gpid.get_app_id() == 0 && _parent_split_mgr->_child_init_ballot == 0 && _parent_split_mgr->_split_status == split_status::NOT_SPLIT; } + bool primary_parent_not_in_split() + { + auto context = _parent_replica->_primary_states; + return context.caught_up_children.size() == 0 && context.register_child_task == nullptr && + context.sync_send_write_request == false && + context.split_stopped_secondary.size() == 0 && is_parent_not_in_split(); + } public: const std::string APP_NAME = "split_table"; @@ -738,7 +786,7 @@ TEST_F(replica_split_test, register_child_reply_test) test_on_register_child_reply(test.parent_partition_status, test.resp_err); ASSERT_EQ(_parent_replica->status(), partition_status::PS_PRIMARY); if (test.parent_partition_status == partition_status::PS_INACTIVE) { - // TODO(heyuchen): check primary parent finish split + ASSERT_TRUE(primary_parent_not_in_split()); ASSERT_EQ(_parent_split_mgr->get_partition_version(), test.expected_parent_partition_version); } @@ -755,26 +803,38 @@ TEST_F(replica_split_test, trigger_primary_parent_split_test) // - meta splitting with lack of secondary // - meta splitting with local not_split(See parent_start_split_tests) // - meta splitting with local splitting(See parent_start_split_tests) - // - TODO(heyuchen): add other split status + // - meta pausing with local not_split + // - meta pausing with local splitting + // - meta canceling with local not_split + // - meta canceling with local splitting + // - meta paused with local not_split + // - meta not_split with local splitting(See query_child_tests) struct primary_parent_test { bool lack_of_secondary; split_status::type meta_split_status; int32_t old_partition_version; split_status::type old_split_status; - split_status::type expected_split_status; - } tests[]{{true, - split_status::SPLITTING, - OLD_PARTITION_COUNT - 1, - split_status::NOT_SPLIT, - split_status::NOT_SPLIT}}; - + } tests[]{{true, split_status::SPLITTING, OLD_PARTITION_COUNT - 1, split_status::NOT_SPLIT}, + {false, split_status::PAUSING, -1, split_status::NOT_SPLIT}, + {false, split_status::PAUSING, OLD_PARTITION_COUNT - 1, split_status::SPLITTING}, + {false, split_status::CANCELING, OLD_PARTITION_COUNT - 1, split_status::NOT_SPLIT}, + {false, split_status::CANCELING, -1, split_status::SPLITTING}, + {false, split_status::PAUSED, OLD_PARTITION_COUNT - 1, split_status::NOT_SPLIT}}; for (const auto &test : tests) { mock_parent_primary_configuration(test.lack_of_secondary); + if (test.old_split_status == split_status::SPLITTING) { + mock_child_split_context(true, true); + mock_primary_parent_split_context(true); + } test_trigger_primary_parent_split( test.meta_split_status, test.old_split_status, test.old_partition_version); - ASSERT_EQ(parent_get_split_status(), test.expected_split_status); - // TODO(heyuchen): add other check + ASSERT_EQ(_parent_split_mgr->get_partition_version(), OLD_PARTITION_COUNT - 1); + ASSERT_FALSE(parent_sync_send_write_request()); + if (test.old_split_status == split_status::SPLITTING) { + _child_replica->tracker()->wait_outstanding_tasks(); + ASSERT_EQ(_child_replica->status(), partition_status::PS_ERROR); + } } } @@ -787,13 +847,22 @@ TEST_F(replica_split_test, secondary_handle_split_test) // - secondary parent update partition_count // - meta splitting with local not_split(See parent_start_split_tests) // - meta splitting with local splitting(See parent_start_split_tests) - // TODO(heyuchen): add more cases + // - meta pausing with local splitting + // - meta canceling with local not_split + // - meta canceling with local splitting + // - meta paused with local not_split struct trigger_secondary_parent_split_test { split_status::type meta_split_status; split_status::type local_split_status; int32_t expected_partition_version; - } tests[]{{split_status::NOT_SPLIT, split_status::SPLITTING, NEW_PARTITION_COUNT - 1}}; + } tests[]{ + {split_status::PAUSING, split_status::NOT_SPLIT, OLD_PARTITION_COUNT - 1}, + {split_status::PAUSING, split_status::SPLITTING, OLD_PARTITION_COUNT - 1}, + {split_status::CANCELING, split_status::NOT_SPLIT, OLD_PARTITION_COUNT - 1}, + {split_status::CANCELING, split_status::SPLITTING, OLD_PARTITION_COUNT - 1}, + {split_status::NOT_SPLIT, split_status::SPLITTING, NEW_PARTITION_COUNT - 1}, + }; for (auto test : tests) { if (test.local_split_status == split_status::SPLITTING) { @@ -805,6 +874,47 @@ TEST_F(replica_split_test, secondary_handle_split_test) ASSERT_EQ(resp.err, ERR_OK); ASSERT_TRUE(is_parent_not_in_split()); ASSERT_EQ(_parent_split_mgr->get_partition_version(), test.expected_partition_version); + if (test.meta_split_status == split_status::PAUSING || + test.meta_split_status == split_status::CANCELING) { + ASSERT_TRUE(resp.__isset.is_split_stopped); + ASSERT_TRUE(resp.is_split_stopped); + if (test.local_split_status == split_status::SPLITTING) { + _child_replica->tracker()->wait_outstanding_tasks(); + ASSERT_EQ(_child_replica->status(), partition_status::PS_ERROR); + } + } + } +} + +TEST_F(replica_split_test, primary_parent_handle_stop_test) +{ + fail::cfg("replica_parent_send_notify_stop_request", "return()"); + // Test cases: + // - not_splitting request + // - splitting request + // - pausing request with lack of secondary + // - canceling request with not all secondary + // - group all paused + // - group all canceled + struct primary_parent_handle_stop_test + { + split_status::type meta_split_status; + bool lack_of_secondary; + bool will_all_stop; + int32_t expected_size; + bool expected_all_stopped; + } tests[]{{split_status::NOT_SPLIT, false, false, 0, false}, + {split_status::SPLITTING, false, false, 0, false}, + {split_status::PAUSING, true, false, 1, false}, + {split_status::CANCELING, false, false, 1, false}, + {split_status::PAUSING, false, true, 0, true}, + {split_status::CANCELING, false, true, 0, true}}; + + for (auto test : tests) { + test_primary_parent_handle_stop_split( + test.meta_split_status, test.lack_of_secondary, test.will_all_stop); + ASSERT_EQ(parent_stopped_split_size(), test.expected_size); + ASSERT_EQ(primary_parent_not_in_split(), test.expected_all_stopped); } } diff --git a/src/replication.thrift b/src/replication.thrift index 04749631bc..d1dd23ce9f 100644 --- a/src/replication.thrift +++ b/src/replication.thrift @@ -197,6 +197,9 @@ struct group_check_response 5:learner_status learner_status_ = learner_status.LearningInvalid; 6:i64 learner_signature; 7:dsn.rpc_address node; + // Used for pause or cancel partition split + // if secondary pause or cancel split succeed, is_split_stopped = true + 8:optional bool is_split_stopped; } /////////////////// meta server messages ////////////////////