From 997c2dc8497bf2bc7a5667e1dd38949b3a4acb66 Mon Sep 17 00:00:00 2001 From: Jiashuo Date: Thu, 24 Mar 2022 15:26:30 +0800 Subject: [PATCH] fix(dup_enhancement#21): add batch replay log to support batch send mutation (#1080) --- .../dsn/dist/replication/duplication_common.h | 3 +++ src/common/duplication_common.cpp | 6 +++++ .../duplication/load_from_private_log.cpp | 25 +++++++++++++------ src/replica/duplication/mutation_batch.cpp | 10 +++++--- src/replica/duplication/mutation_batch.h | 7 ++++-- .../duplication/replica_duplicator.cpp | 12 ++++++--- src/replica/duplication/replica_duplicator.h | 2 ++ .../replica_duplicator_manager.cpp | 10 ++++++-- .../duplication/replica_duplicator_manager.h | 2 ++ .../duplication/test/mutation_batch_test.cpp | 21 ++++++++++------ src/replica/replica_stub.cpp | 2 +- src/replica/test/replica_test.cpp | 1 + 12 files changed, 74 insertions(+), 27 deletions(-) diff --git a/include/dsn/dist/replication/duplication_common.h b/include/dsn/dist/replication/duplication_common.h index 88800be8b3..ff84e13186 100644 --- a/include/dsn/dist/replication/duplication_common.h +++ b/include/dsn/dist/replication/duplication_common.h @@ -22,10 +22,13 @@ #include #include #include +#include namespace dsn { namespace replication { +DSN_DECLARE_uint32(duplicate_log_batch_bytes); + typedef rpc_holder duplication_modify_rpc; typedef rpc_holder duplication_add_rpc; typedef rpc_holder duplication_query_rpc; diff --git a/src/common/duplication_common.cpp b/src/common/duplication_common.cpp index b65c41541d..9184bcee16 100644 --- a/src/common/duplication_common.cpp +++ b/src/common/duplication_common.cpp @@ -25,6 +25,12 @@ namespace dsn { namespace replication { +DSN_DEFINE_uint32("replication", + duplicate_log_batch_bytes, + 4096, + "send mutation log batch bytes size per rpc"); +DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE); + const std::string duplication_constants::kDuplicationCheckpointRootDir /*NOLINT*/ = "duplication"; const std::string duplication_constants::kClustersSectionName /*NOLINT*/ = "pegasus.clusters"; const std::string duplication_constants::kDuplicationEnvMasterClusterKey /*NOLINT*/ = diff --git a/src/replica/duplication/load_from_private_log.cpp b/src/replica/duplication/load_from_private_log.cpp index 28b0e22e62..9046591635 100644 --- a/src/replica/duplication/load_from_private_log.cpp +++ b/src/replica/duplication/load_from_private_log.cpp @@ -169,12 +169,7 @@ void load_from_private_log::replay_log_block() }, _start_offset, _current_global_end_offset); - if (!err.is_ok()) { - if (err.code() == ERR_HANDLE_EOF && switch_to_next_log_file()) { - repeat(); - return; - } - + if (!err.is_ok() && err.code() != ERR_HANDLE_EOF) { // Error handling on loading failure: // - If block loading failed for `MAX_ALLOWED_REPEATS` times, it restarts reading the file. // - If file loading failed for `MAX_ALLOWED_FILE_REPEATS` times, which means it @@ -220,9 +215,23 @@ void load_from_private_log::replay_log_block() return; } - _start_offset = static_cast(_current_global_end_offset - _current->start_offset()); - + if (err.is_ok()) { + _start_offset = static_cast(_current_global_end_offset - _current->start_offset()); + if (_mutation_batch.bytes() < FLAGS_duplicate_log_batch_bytes) { + repeat(); + return; + } + } else if (switch_to_next_log_file()) { + // !err.is_ok() means that err.code() == ERR_HANDLE_EOF, the current file read completed and + // try next file + repeat(); + return; + } // update last_decree even for empty batch. + // case1: err.is_ok(err.code() != ERR_HANDLE_EOF), but _mutation_batch.bytes() >= + // FLAGS_duplicate_log_batch_bytes + // case2: !err.is_ok(err.code() == ERR_HANDLE_EOF) and no next file, need commit the last + // mutations() step_down_next_stage(_mutation_batch.last_decree(), _mutation_batch.move_all_mutations()); } diff --git a/src/replica/duplication/mutation_batch.cpp b/src/replica/duplication/mutation_batch.cpp index 6d1dd7c287..473ec7e1e6 100644 --- a/src/replica/duplication/mutation_batch.cpp +++ b/src/replica/duplication/mutation_batch.cpp @@ -122,6 +122,7 @@ mutation_tuple_set mutation_batch::move_all_mutations() { // free the internal space _mutation_buffer->truncate(last_decree()); + _total_bytes = 0; return std::move(_loaded_mutations); } @@ -135,15 +136,14 @@ mutation_batch::mutation_batch(replica_duplicator *r) : replica_base(r) _mutation_buffer = make_unique(&base, 0, PREPARE_LIST_NUM_ENTRIES, [this](mutation_ptr &mu) { // committer - add_mutation_if_valid(mu, _loaded_mutations, _start_decree); + add_mutation_if_valid(mu, _start_decree); }); // start duplication from confirmed_decree _mutation_buffer->reset(r->progress().confirmed_decree); } -/*extern*/ void -add_mutation_if_valid(mutation_ptr &mu, mutation_tuple_set &mutations, decree start_decree) +void mutation_batch::add_mutation_if_valid(mutation_ptr &mu, decree start_decree) { if (mu->get_decree() < start_decree) { // ignore @@ -169,7 +169,9 @@ add_mutation_if_valid(mutation_ptr &mu, mutation_tuple_set &mutations, decree st bb = blob::create_from_bytes(update.data.data(), update.data.length()); } - mutations.emplace(std::make_tuple(mu->data.header.timestamp, update.code, std::move(bb))); + _total_bytes += bb.length(); + _loaded_mutations.emplace( + std::make_tuple(mu->data.header.timestamp, update.code, std::move(bb))); } } diff --git a/src/replica/duplication/mutation_batch.h b/src/replica/duplication/mutation_batch.h index 0fa10e0b05..5eaf700446 100644 --- a/src/replica/duplication/mutation_batch.h +++ b/src/replica/duplication/mutation_batch.h @@ -53,6 +53,8 @@ class mutation_batch : replica_base error_s add(mutation_ptr mu); + void add_mutation_if_valid(mutation_ptr &, decree start_decree); + mutation_tuple_set move_all_mutations(); decree last_decree() const; @@ -64,6 +66,8 @@ class mutation_batch : replica_base size_t size() const { return _loaded_mutations.size(); } + uint64_t bytes() const { return _total_bytes; } + private: friend class replica_duplicator_test; friend class mutation_batch_test; @@ -71,12 +75,11 @@ class mutation_batch : replica_base std::unique_ptr _mutation_buffer; mutation_tuple_set _loaded_mutations; decree _start_decree{invalid_decree}; + uint64_t _total_bytes{0}; }; using mutation_batch_u_ptr = std::unique_ptr; /// Extract mutations into mutation_tuple_set if they are not WRITE_EMPTY. -extern void add_mutation_if_valid(mutation_ptr &, mutation_tuple_set &, decree start_decree); - } // namespace replication } // namespace dsn diff --git a/src/replica/duplication/replica_duplicator.cpp b/src/replica/duplication/replica_duplicator.cpp index 2f22a82d86..c6d7704491 100644 --- a/src/replica/duplication/replica_duplicator.cpp +++ b/src/replica/duplication/replica_duplicator.cpp @@ -45,11 +45,17 @@ replica_duplicator::replica_duplicator(const duplication_entry &ent, replica *r) } else { _progress.last_decree = _progress.confirmed_decree = it->second; } - ddebug_replica( - "initialize replica_duplicator [dupid:{}, meta_confirmed_decree:{}]", id(), it->second); + ddebug_replica("initialize replica_duplicator[{}] [dupid:{}, meta_confirmed_decree:{}]", + duplication_status_to_string(_status), + id(), + it->second); thread_pool(LPC_REPLICATION_LOW).task_tracker(tracker()).thread_hash(get_gpid().thread_hash()); - prepare_dup(); + if (_status == duplication_status::DS_PREPARE) { + prepare_dup(); + } else if (_status == duplication_status::DS_LOG) { + start_dup_log(); + } } void replica_duplicator::prepare_dup() diff --git a/src/replica/duplication/replica_duplicator.h b/src/replica/duplication/replica_duplicator.h index 7d72272e3f..2f5d20a3fb 100644 --- a/src/replica/duplication/replica_duplicator.h +++ b/src/replica/duplication/replica_duplicator.h @@ -130,6 +130,8 @@ class replica_duplicator : public replica_base, public pipeline::base // For metric "dup.pending_mutations_count" uint64_t get_pending_mutations_count() const; + duplication_status::type status() const { return _status; }; + private: friend class duplication_test_base; friend class replica_duplicator_test; diff --git a/src/replica/duplication/replica_duplicator_manager.cpp b/src/replica/duplication/replica_duplicator_manager.cpp index c6a9b80f3f..95d89c38cf 100644 --- a/src/replica/duplication/replica_duplicator_manager.cpp +++ b/src/replica/duplication/replica_duplicator_manager.cpp @@ -32,8 +32,14 @@ replica_duplicator_manager::get_duplication_confirms_to_update() const for (const auto &kv : _duplications) { replica_duplicator *duplicator = kv.second.get(); duplication_progress p = duplicator->progress(); - if (p.last_decree != p.confirmed_decree && p.checkpoint_has_prepared) { - dcheck_gt_replica(p.last_decree, p.confirmed_decree); + if (p.last_decree != p.confirmed_decree || + (kv.second->status() == duplication_status::DS_PREPARE && p.checkpoint_has_prepared)) { + if (p.last_decree < p.confirmed_decree) { + derror_replica("invalid decree state: p.last_decree({}) < p.confirmed_decree({})", + p.last_decree, + p.confirmed_decree); + continue; + } duplication_confirm_entry entry; entry.dupid = duplicator->id(); entry.confirmed_decree = p.last_decree; diff --git a/src/replica/duplication/replica_duplicator_manager.h b/src/replica/duplication/replica_duplicator_manager.h index 18b396bd9d..5cd00ba4b8 100644 --- a/src/replica/duplication/replica_duplicator_manager.h +++ b/src/replica/duplication/replica_duplicator_manager.h @@ -94,6 +94,8 @@ class replica_duplicator_manager : public replica_base if (_duplications.empty()) return; + dwarn_replica("remove all duplication, replica status = {}", + enum_to_string(_replica->status())); _duplications.clear(); } diff --git a/src/replica/duplication/test/mutation_batch_test.cpp b/src/replica/duplication/test/mutation_batch_test.cpp index cb8431fe89..419ae245a6 100644 --- a/src/replica/duplication/test/mutation_batch_test.cpp +++ b/src/replica/duplication/test/mutation_batch_test.cpp @@ -40,18 +40,21 @@ class mutation_batch_test : public duplication_test_base TEST_F(mutation_batch_test, add_mutation_if_valid) { + auto duplicator = create_test_duplicator(0); + mutation_batch batcher(duplicator.get()); + mutation_tuple_set result; std::string s = "hello"; mutation_ptr mu1 = create_test_mutation(1, s); - add_mutation_if_valid(mu1, result, 0); + batcher.add_mutation_if_valid(mu1, 0); + result = batcher.move_all_mutations(); mutation_tuple mt1 = *result.begin(); - result.clear(); - s = "world"; mutation_ptr mu2 = create_test_mutation(2, s); - add_mutation_if_valid(mu2, result, 0); + batcher.add_mutation_if_valid(mu2, 0); + result = batcher.move_all_mutations(); mutation_tuple mt2 = *result.begin(); ASSERT_EQ(std::get<2>(mt1).to_string(), "hello"); @@ -59,18 +62,22 @@ TEST_F(mutation_batch_test, add_mutation_if_valid) // decree 1 should be ignored mutation_ptr mu3 = create_test_mutation(1, s); - add_mutation_if_valid(mu2, result, 2); + batcher.add_mutation_if_valid(mu2, 2); + batcher.add_mutation_if_valid(mu3, 1); + result = batcher.move_all_mutations(); ASSERT_EQ(result.size(), 2); } TEST_F(mutation_batch_test, ignore_non_idempotent_write) { - mutation_tuple_set result; + auto duplicator = create_test_duplicator(0); + mutation_batch batcher(duplicator.get()); std::string s = "hello"; mutation_ptr mu = create_test_mutation(1, s); mu->data.updates[0].code = RPC_DUPLICATION_NON_IDEMPOTENT_WRITE; - add_mutation_if_valid(mu, result, 0); + batcher.add_mutation_if_valid(mu, 0); + mutation_tuple_set result = batcher.move_all_mutations(); ASSERT_EQ(result.size(), 0); } diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 321b08c34b..161324b518 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -71,7 +71,7 @@ DSN_DEFINE_bool("replication", DSN_DEFINE_uint32("replication", max_concurrent_manual_emergency_checkpointing_count, - 1, + 10, "max concurrent manual emergency checkpoint running count"); DSN_TAG_VARIABLE(max_concurrent_manual_emergency_checkpointing_count, FT_MUTABLE); diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp index b4c18103da..925a06bde7 100644 --- a/src/replica/test/replica_test.cpp +++ b/src/replica/test/replica_test.cpp @@ -365,6 +365,7 @@ TEST_F(replica_test, test_trigger_manual_emergency_checkpoint) // test exceed max concurrent count ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), ERR_OK); force_update_checkpointing(false); + FLAGS_max_concurrent_manual_emergency_checkpointing_count = 1; ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), ERR_TRY_AGAIN); ASSERT_FALSE(is_checkpointing()); _mock_replica->tracker()->wait_outstanding_tasks();