From c1dddd839c7d177ce1608125305c066e88884611 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Thu, 6 Feb 2020 15:54:42 +0800 Subject: [PATCH 1/8] feat(dup): add metrics for duplication --- .../replication/common/replication_common.cpp | 5 +-- .../lib/duplication/duplication_pipeline.cpp | 1 + .../duplication/duplication_sync_timer.cpp | 4 +++ .../lib/duplication/load_from_private_log.cpp | 20 ++++++----- .../lib/duplication/replica_duplicator.cpp | 13 ++++++++ .../lib/duplication/replica_duplicator.h | 3 ++ .../replica_duplicator_manager.cpp | 9 +++++ .../duplication/replica_duplicator_manager.h | 4 +++ src/dist/replication/lib/replica_stub.cpp | 33 +++++++++++++++++++ src/dist/replication/lib/replica_stub.h | 15 +++++++-- 10 files changed, 91 insertions(+), 16 deletions(-) diff --git a/src/dist/replication/common/replication_common.cpp b/src/dist/replication/common/replication_common.cpp index 369d70dd0b..fa54b72600 100644 --- a/src/dist/replication/common/replication_common.cpp +++ b/src/dist/replication/common/replication_common.cpp @@ -49,7 +49,7 @@ replication_options::replication_options() delay_for_fd_timeout_on_start = false; empty_write_disabled = false; allow_non_idempotent_write = false; - duplication_disabled = true; + duplication_disabled = false; prepare_timeout_ms_for_secondaries = 1000; prepare_timeout_ms_for_potential_secondaries = 3000; @@ -273,9 +273,6 @@ void replication_options::initialize() duplication_disabled = dsn_config_get_value_bool( "replication", "duplication_disabled", duplication_disabled, "is duplication disabled"); - if (allow_non_idempotent_write && !duplication_disabled) { - dassert(false, "duplication and non-idempotent write cannot be enabled together"); - } prepare_timeout_ms_for_secondaries = (int)dsn_config_get_value_uint64( "replication", diff --git a/src/dist/replication/lib/duplication/duplication_pipeline.cpp b/src/dist/replication/lib/duplication/duplication_pipeline.cpp index 52bac1c7fc..50a40fe687 100644 --- a/src/dist/replication/lib/duplication/duplication_pipeline.cpp +++ b/src/dist/replication/lib/duplication/duplication_pipeline.cpp @@ -55,6 +55,7 @@ void ship_mutation::ship(mutation_tuple_set &&in) { _mutation_duplicator->duplicate(std::move(in), [this](size_t total_shipped_size) mutable { update_progress(); + _stub->_counter_dup_shipped_size_in_bytes_rate->add(total_shipped_size); step_down_next_stage(); }); } diff --git a/src/dist/replication/lib/duplication/duplication_sync_timer.cpp b/src/dist/replication/lib/duplication/duplication_sync_timer.cpp index 2de0f0a5d2..e3e701ba1b 100644 --- a/src/dist/replication/lib/duplication/duplication_sync_timer.cpp +++ b/src/dist/replication/lib/duplication/duplication_sync_timer.cpp @@ -39,12 +39,16 @@ void duplication_sync_timer::run() req->node = _stub->primary_address(); // collects confirm points from all primaries on this server + int64_t pending_muts_cnt = 0; for (const replica_ptr &r : get_all_primaries()) { auto confirmed = r->get_duplication_manager()->get_duplication_confirms_to_update(); if (!confirmed.empty()) { req->confirm_list[r->get_gpid()] = std::move(confirmed); } + pending_muts_cnt += r->get_duplication_manager()->get_pending_mutations_count(); } + dcheck_ge(pending_muts_cnt, 0); + _stub->_counter_dup_pending_mutations_count->set(pending_muts_cnt); duplication_sync_rpc rpc(std::move(req), RPC_CM_DUPLICATION_SYNC, 3_s); rpc_address meta_server_address(_stub->get_meta_server_address()); diff --git a/src/dist/replication/lib/duplication/load_from_private_log.cpp b/src/dist/replication/lib/duplication/load_from_private_log.cpp index f7acb9447b..4616a40a34 100644 --- a/src/dist/replication/lib/duplication/load_from_private_log.cpp +++ b/src/dist/replication/lib/duplication/load_from_private_log.cpp @@ -103,15 +103,17 @@ void load_from_private_log::find_log_file_to_start(std::map l void load_from_private_log::replay_log_block() { - error_s err = - mutation_log::replay_block(_current, - [this](int log_bytes_length, mutation_ptr &mu) -> bool { - auto es = _mutation_batch.add(std::move(mu)); - dassert_replica(es.is_ok(), es.description()); - return true; - }, - _start_offset, - _current_global_end_offset); + error_s err = mutation_log::replay_block( + _current, + [this](int log_bytes_length, mutation_ptr &mu) -> bool { + auto es = _mutation_batch.add(std::move(mu)); + dassert_replica(es.is_ok(), es.description()); + _stub->_counter_dup_log_read_in_bytes_rate->add(log_bytes_length); + _stub->_counter_dup_log_mutations_read_rate->increment(); + return true; + }, + _start_offset, + _current_global_end_offset); if (!err.is_ok()) { if (err.code() == ERR_HANDLE_EOF && switch_to_next_log_file()) { repeat(); diff --git a/src/dist/replication/lib/duplication/replica_duplicator.cpp b/src/dist/replication/lib/duplication/replica_duplicator.cpp index 5807985eab..4b4b937839 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator.cpp +++ b/src/dist/replication/lib/duplication/replica_duplicator.cpp @@ -129,6 +129,7 @@ error_s replica_duplicator::update_progress(const duplication_progress &p) _progress.confirmed_decree); } + decree last_confirmed_decree = _progress.confirmed_decree; _progress.confirmed_decree = std::max(_progress.confirmed_decree, p.confirmed_decree); _progress.last_decree = std::max(_progress.last_decree, p.last_decree); @@ -138,6 +139,10 @@ error_s replica_duplicator::update_progress(const duplication_progress &p) _progress.last_decree, _progress.confirmed_decree); } + if (_progress.confirmed_decree > last_confirmed_decree) { // progress advances + _stub->_counter_dup_confirmed_rate->add(_progress.confirmed_decree - last_confirmed_decree); + } + return error_s::ok(); } @@ -160,5 +165,13 @@ decree replica_duplicator::get_max_gced_decree() const return _replica->private_log()->max_gced_decree(_replica->get_gpid()); } +int64_t replica_duplicator::get_pending_mutations_count() const +{ + // it's not atomic to read last_committed_decree in not-REPLICATION thread pool, + // but enough for approximate statistic. + int64_t cnt = _replica->last_committed_decree() - progress().last_decree; + return cnt > 0 ? cnt : 0; +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/duplication/replica_duplicator.h b/src/dist/replication/lib/duplication/replica_duplicator.h index ce937469a3..b4e27c0202 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator.h +++ b/src/dist/replication/lib/duplication/replica_duplicator.h @@ -101,6 +101,9 @@ class replica_duplicator : public replica_base, public pipeline::base decree get_max_gced_decree() const; + // For metric "dup.pending_mutations_count" + int64_t get_pending_mutations_count() const; + private: friend class replica_duplicator_test; friend class duplication_sync_timer_test; diff --git a/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp b/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp index 39bc84a8cd..75ebadc7e5 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp +++ b/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp @@ -119,5 +119,14 @@ void replica_duplicator_manager::update_confirmed_decree_if_secondary(decree con } } +int64_t replica_duplicator_manager::get_pending_mutations_count() const +{ + int64_t total = 0; + for (auto &dup : _duplications) { + total += dup.second->get_pending_mutations_count(); + } + return total; +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/duplication/replica_duplicator_manager.h b/src/dist/replication/lib/duplication/replica_duplicator_manager.h index 28e06b808c..6c0eabb00b 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator_manager.h +++ b/src/dist/replication/lib/duplication/replica_duplicator_manager.h @@ -54,6 +54,10 @@ class replica_duplicator_manager : public replica_base /// \see replica_check.cpp void update_confirmed_decree_if_secondary(decree confirmed); + /// Sums up the number of pending mutations for all duplications + /// on this replica, for metric "dup.pending_mutations_count". + int64_t get_pending_mutations_count() const; + private: void sync_duplication(const duplication_entry &ent); diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index e94591571f..ff5480ab84 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -219,6 +219,39 @@ void replica_stub::install_perf_counters() COUNTER_TYPE_VOLATILE_NUMBER, "trigger emergency checkpoint count in the recent period"); + // <- Duplication Metrics -> + + _counter_dup_log_read_in_bytes_rate.init_app_counter("eon.replica_stub", + "dup.log_read_in_bytes_rate", + COUNTER_TYPE_RATE, + "reading rate of private log in bytes"); + _counter_dup_log_mutations_read_rate.init_app_counter( + "eon.replica_stub", + "dup.log_mutations_read_rate", + COUNTER_TYPE_RATE, + "reading rate of mutations from private log"); + _counter_dup_shipped_size_in_bytes_rate.init_app_counter( + "eon.replica_stub", + "dup.shipped_size_in_bytes_rate", + COUNTER_TYPE_RATE, + "shipping rate of private log in bytes"); + _counter_dup_confirmed_rate.init_app_counter("eon.replica_stub", + "dup.confirmed_rate", + COUNTER_TYPE_RATE, + "increasing rate of confirmed mutations"); + _counter_dup_pending_mutations_count.init_app_counter( + "eon.replica_stub", + "dup.pending_mutations_count", + COUNTER_TYPE_VOLATILE_NUMBER, + "number of mutations pending for duplication"); + _counter_dup_time_lag.init_app_counter( + "eon.replica_stub", + "dup.time_lag(ms)", + COUNTER_TYPE_NUMBER_PERCENTILES, + "time (in ms) lag between master and slave in the duplication"); + + // <- Cold Backup Metrics -> + _counter_cold_backup_running_count.init_app_counter("eon.replica_stub", "cold.backup.running.count", COUNTER_TYPE_NUMBER, diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index 5fa505d46f..1b18445e62 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -368,6 +368,16 @@ class replica_stub : public serverlet, public ref_counter perf_counter_wrapper _counter_shared_log_recent_write_size; perf_counter_wrapper _counter_recent_trigger_emergency_checkpoint_count; + // <- Duplication Metrics -> + // TODO(wutao1): calculate the counters independently for each remote cluster + // if we need to duplicate to multiple clusters someday. + perf_counter_wrapper _counter_dup_log_read_in_bytes_rate; + perf_counter_wrapper _counter_dup_log_mutations_read_rate; + perf_counter_wrapper _counter_dup_shipped_size_in_bytes_rate; + perf_counter_wrapper _counter_dup_confirmed_rate; + perf_counter_wrapper _counter_dup_pending_mutations_count; + perf_counter_wrapper _counter_dup_time_lag; + perf_counter_wrapper _counter_cold_backup_running_count; perf_counter_wrapper _counter_cold_backup_recent_start_count; perf_counter_wrapper _counter_cold_backup_recent_succ_count; @@ -387,6 +397,5 @@ class replica_stub : public serverlet, public ref_counter dsn::task_tracker _tracker; }; -//------------ inline impl ---------------------- -} -} // namespace +} // namespace replication +} // namespace dsn From 1c10de512bec5fe6a0459106780484c85245fce5 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Wed, 12 Feb 2020 19:41:20 +0800 Subject: [PATCH 2/8] fix name --- src/dist/replication/lib/duplication/duplication_pipeline.cpp | 2 +- .../replication/lib/duplication/duplication_sync_timer.cpp | 3 +-- src/dist/replication/lib/duplication/replica_duplicator.cpp | 4 ++-- src/dist/replication/lib/duplication/replica_duplicator.h | 2 +- src/dist/replication/lib/replica_stub.cpp | 4 ++-- src/dist/replication/lib/replica_stub.h | 2 +- 6 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/dist/replication/lib/duplication/duplication_pipeline.cpp b/src/dist/replication/lib/duplication/duplication_pipeline.cpp index 50a40fe687..f92a342df7 100644 --- a/src/dist/replication/lib/duplication/duplication_pipeline.cpp +++ b/src/dist/replication/lib/duplication/duplication_pipeline.cpp @@ -55,7 +55,7 @@ void ship_mutation::ship(mutation_tuple_set &&in) { _mutation_duplicator->duplicate(std::move(in), [this](size_t total_shipped_size) mutable { update_progress(); - _stub->_counter_dup_shipped_size_in_bytes_rate->add(total_shipped_size); + _stub->_counter_dup_shipped_bytes_rate->add(total_shipped_size); step_down_next_stage(); }); } diff --git a/src/dist/replication/lib/duplication/duplication_sync_timer.cpp b/src/dist/replication/lib/duplication/duplication_sync_timer.cpp index e3e701ba1b..cfbd142280 100644 --- a/src/dist/replication/lib/duplication/duplication_sync_timer.cpp +++ b/src/dist/replication/lib/duplication/duplication_sync_timer.cpp @@ -39,7 +39,7 @@ void duplication_sync_timer::run() req->node = _stub->primary_address(); // collects confirm points from all primaries on this server - int64_t pending_muts_cnt = 0; + uint64_t pending_muts_cnt = 0; for (const replica_ptr &r : get_all_primaries()) { auto confirmed = r->get_duplication_manager()->get_duplication_confirms_to_update(); if (!confirmed.empty()) { @@ -47,7 +47,6 @@ void duplication_sync_timer::run() } pending_muts_cnt += r->get_duplication_manager()->get_pending_mutations_count(); } - dcheck_ge(pending_muts_cnt, 0); _stub->_counter_dup_pending_mutations_count->set(pending_muts_cnt); duplication_sync_rpc rpc(std::move(req), RPC_CM_DUPLICATION_SYNC, 3_s); diff --git a/src/dist/replication/lib/duplication/replica_duplicator.cpp b/src/dist/replication/lib/duplication/replica_duplicator.cpp index 4b4b937839..70decce194 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator.cpp +++ b/src/dist/replication/lib/duplication/replica_duplicator.cpp @@ -165,11 +165,11 @@ decree replica_duplicator::get_max_gced_decree() const return _replica->private_log()->max_gced_decree(_replica->get_gpid()); } -int64_t replica_duplicator::get_pending_mutations_count() const +uint64_t replica_duplicator::get_pending_mutations_count() const { // it's not atomic to read last_committed_decree in not-REPLICATION thread pool, // but enough for approximate statistic. - int64_t cnt = _replica->last_committed_decree() - progress().last_decree; + uint64_t cnt = _replica->last_committed_decree() - progress().last_decree; return cnt > 0 ? cnt : 0; } diff --git a/src/dist/replication/lib/duplication/replica_duplicator.h b/src/dist/replication/lib/duplication/replica_duplicator.h index b4e27c0202..b7dbdccf4b 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator.h +++ b/src/dist/replication/lib/duplication/replica_duplicator.h @@ -102,7 +102,7 @@ class replica_duplicator : public replica_base, public pipeline::base decree get_max_gced_decree() const; // For metric "dup.pending_mutations_count" - int64_t get_pending_mutations_count() const; + uint64_t get_pending_mutations_count() const; private: friend class replica_duplicator_test; diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index ff5480ab84..518fd83f2c 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -230,9 +230,9 @@ void replica_stub::install_perf_counters() "dup.log_mutations_read_rate", COUNTER_TYPE_RATE, "reading rate of mutations from private log"); - _counter_dup_shipped_size_in_bytes_rate.init_app_counter( + _counter_dup_shipped_bytes_rate.init_app_counter( "eon.replica_stub", - "dup.shipped_size_in_bytes_rate", + "dup.shipped_bytes_rate", COUNTER_TYPE_RATE, "shipping rate of private log in bytes"); _counter_dup_confirmed_rate.init_app_counter("eon.replica_stub", diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index 1b18445e62..bc86d207c7 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -373,7 +373,7 @@ class replica_stub : public serverlet, public ref_counter // if we need to duplicate to multiple clusters someday. perf_counter_wrapper _counter_dup_log_read_in_bytes_rate; perf_counter_wrapper _counter_dup_log_mutations_read_rate; - perf_counter_wrapper _counter_dup_shipped_size_in_bytes_rate; + perf_counter_wrapper _counter_dup_shipped_bytes_rate; perf_counter_wrapper _counter_dup_confirmed_rate; perf_counter_wrapper _counter_dup_pending_mutations_count; perf_counter_wrapper _counter_dup_time_lag; From c819cfdfe4650b955eec1b89bcf387402063fe3b Mon Sep 17 00:00:00 2001 From: neverchanje Date: Thu, 13 Feb 2020 11:42:33 +0800 Subject: [PATCH 3/8] fix name --- .../replication/lib/duplication/load_from_private_log.cpp | 2 +- src/dist/replication/lib/replica_stub.cpp | 4 ++-- src/dist/replication/lib/replica_stub.h | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dist/replication/lib/duplication/load_from_private_log.cpp b/src/dist/replication/lib/duplication/load_from_private_log.cpp index 4616a40a34..054d65cdc1 100644 --- a/src/dist/replication/lib/duplication/load_from_private_log.cpp +++ b/src/dist/replication/lib/duplication/load_from_private_log.cpp @@ -108,7 +108,7 @@ void load_from_private_log::replay_log_block() [this](int log_bytes_length, mutation_ptr &mu) -> bool { auto es = _mutation_batch.add(std::move(mu)); dassert_replica(es.is_ok(), es.description()); - _stub->_counter_dup_log_read_in_bytes_rate->add(log_bytes_length); + _stub->_counter_dup_log_read_bytes_rate->add(log_bytes_length); _stub->_counter_dup_log_mutations_read_rate->increment(); return true; }, diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 518fd83f2c..a7b02e15de 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -221,8 +221,8 @@ void replica_stub::install_perf_counters() // <- Duplication Metrics -> - _counter_dup_log_read_in_bytes_rate.init_app_counter("eon.replica_stub", - "dup.log_read_in_bytes_rate", + _counter_dup_log_read_bytes_rate.init_app_counter("eon.replica_stub", + "dup.log_read_bytes_rate", COUNTER_TYPE_RATE, "reading rate of private log in bytes"); _counter_dup_log_mutations_read_rate.init_app_counter( diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index bc86d207c7..a4b1c24c6e 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -371,7 +371,7 @@ class replica_stub : public serverlet, public ref_counter // <- Duplication Metrics -> // TODO(wutao1): calculate the counters independently for each remote cluster // if we need to duplicate to multiple clusters someday. - perf_counter_wrapper _counter_dup_log_read_in_bytes_rate; + perf_counter_wrapper _counter_dup_log_read_bytes_rate; perf_counter_wrapper _counter_dup_log_mutations_read_rate; perf_counter_wrapper _counter_dup_shipped_bytes_rate; perf_counter_wrapper _counter_dup_confirmed_rate; From 7159ab576926d980deb5e4777ff1b960e3ed7724 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Thu, 13 Feb 2020 11:50:33 +0800 Subject: [PATCH 4/8] format --- src/dist/replication/lib/replica_stub.cpp | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index a7b02e15de..b7f41417a5 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -222,19 +222,18 @@ void replica_stub::install_perf_counters() // <- Duplication Metrics -> _counter_dup_log_read_bytes_rate.init_app_counter("eon.replica_stub", - "dup.log_read_bytes_rate", - COUNTER_TYPE_RATE, - "reading rate of private log in bytes"); + "dup.log_read_bytes_rate", + COUNTER_TYPE_RATE, + "reading rate of private log in bytes"); _counter_dup_log_mutations_read_rate.init_app_counter( "eon.replica_stub", "dup.log_mutations_read_rate", COUNTER_TYPE_RATE, "reading rate of mutations from private log"); - _counter_dup_shipped_bytes_rate.init_app_counter( - "eon.replica_stub", - "dup.shipped_bytes_rate", - COUNTER_TYPE_RATE, - "shipping rate of private log in bytes"); + _counter_dup_shipped_bytes_rate.init_app_counter("eon.replica_stub", + "dup.shipped_bytes_rate", + COUNTER_TYPE_RATE, + "shipping rate of private log in bytes"); _counter_dup_confirmed_rate.init_app_counter("eon.replica_stub", "dup.confirmed_rate", COUNTER_TYPE_RATE, From 8864e3237863812fe848ecebfd4f51f31b33b54d Mon Sep 17 00:00:00 2001 From: neverchanje Date: Fri, 14 Feb 2020 00:32:43 +0800 Subject: [PATCH 5/8] fix case601 --- src/dist/replication/test/simple_kv/case-601.act | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/dist/replication/test/simple_kv/case-601.act b/src/dist/replication/test/simple_kv/case-601.act index 643cd33150..c560de2bb9 100644 --- a/src/dist/replication/test/simple_kv/case-601.act +++ b/src/dist/replication/test/simple_kv/case-601.act @@ -3,6 +3,7 @@ set:load_balance_for_test=1,not_exit_on_log_failure=1 +# Initial ballot is 0, after adding two peers, the ballot becomes 2. # wait primary to add r3 as learner state:{{r1,pri,2,0},{r2,sec,2,0},{r3,pot,2,0}} @@ -13,17 +14,16 @@ inject:on_rpc_call:rpc_name=RPC_LEARN_COMPLETION_NOTIFY,from=r3,to=r1 client:begin_write:id=1,key=k1,timeout=0 inject:on_rpc_call:rpc_name=RPC_PREPARE,from=r1,to=r2 -# r1 will remove r2 due to prepare timeout +# r1 will first downgrade to inactive because of prepare timeout, +# then increase its ballot to 3 and upgrade again as a primary. config:{3,r1,[]} -# after r1 update config, it will broad group_check, so r3 +# after r1 update config, it will broadcast group_check, so r3 # will receive group_check as a learner wait:on_rpc_call:rpc_name=RPC_GROUP_CHECK,from=r1,to=r3 -# after r3 receive the group check, it will update its ballot: 2->3. -# r2 will become learner some while later -state:{{r1,pri,3,0},{r2,pot,3,0},{r3,pot,3,0}} +# Again r1 will fail on prepare, and increase ballot 3->4. +config:{4,r1,[r3]} # both r2 & r2 will be secondary eventually config:{5,r1,[r2,r3]} - From 1cdfc6f83d25ae6d2d1ba85341ade9a9744cb790 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Fri, 14 Feb 2020 12:09:58 +0800 Subject: [PATCH 6/8] fix name --- .../replication/lib/duplication/load_from_private_log.cpp | 2 +- src/dist/replication/lib/replica_stub.cpp | 4 ++-- src/dist/replication/lib/replica_stub.h | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dist/replication/lib/duplication/load_from_private_log.cpp b/src/dist/replication/lib/duplication/load_from_private_log.cpp index 054d65cdc1..f427841529 100644 --- a/src/dist/replication/lib/duplication/load_from_private_log.cpp +++ b/src/dist/replication/lib/duplication/load_from_private_log.cpp @@ -109,7 +109,7 @@ void load_from_private_log::replay_log_block() auto es = _mutation_batch.add(std::move(mu)); dassert_replica(es.is_ok(), es.description()); _stub->_counter_dup_log_read_bytes_rate->add(log_bytes_length); - _stub->_counter_dup_log_mutations_read_rate->increment(); + _stub->_counter_dup_log_read_mutations_rate->increment(); return true; }, _start_offset, diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index b7f41417a5..d0c38f380f 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -225,9 +225,9 @@ void replica_stub::install_perf_counters() "dup.log_read_bytes_rate", COUNTER_TYPE_RATE, "reading rate of private log in bytes"); - _counter_dup_log_mutations_read_rate.init_app_counter( + _counter_dup_log_read_mutations_rate.init_app_counter( "eon.replica_stub", - "dup.log_mutations_read_rate", + "dup.log_read_mutations_rate", COUNTER_TYPE_RATE, "reading rate of mutations from private log"); _counter_dup_shipped_bytes_rate.init_app_counter("eon.replica_stub", diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index a4b1c24c6e..e2a4cc7063 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -372,7 +372,7 @@ class replica_stub : public serverlet, public ref_counter // TODO(wutao1): calculate the counters independently for each remote cluster // if we need to duplicate to multiple clusters someday. perf_counter_wrapper _counter_dup_log_read_bytes_rate; - perf_counter_wrapper _counter_dup_log_mutations_read_rate; + perf_counter_wrapper _counter_dup_log_read_mutations_rate; perf_counter_wrapper _counter_dup_shipped_bytes_rate; perf_counter_wrapper _counter_dup_confirmed_rate; perf_counter_wrapper _counter_dup_pending_mutations_count; From c10ed8c3a58d4478fb9ffb1e2667342eb6803cc0 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Tue, 18 Feb 2020 16:54:42 +0800 Subject: [PATCH 7/8] fix review --- src/dist/replication/common/replication_common.cpp | 6 +++--- src/dist/replication/common/replication_common.h | 2 +- .../replication/lib/duplication/replica_duplicator.cpp | 8 +++++--- .../lib/duplication/replica_duplicator_manager.cpp | 2 +- src/dist/replication/lib/replica_stub.cpp | 2 +- src/dist/replication/meta_server/meta_service.cpp | 2 +- .../replication/test/meta_test/unit_test/config-test.ini | 2 +- 7 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/dist/replication/common/replication_common.cpp b/src/dist/replication/common/replication_common.cpp index fa54b72600..7a2e7ce310 100644 --- a/src/dist/replication/common/replication_common.cpp +++ b/src/dist/replication/common/replication_common.cpp @@ -49,7 +49,7 @@ replication_options::replication_options() delay_for_fd_timeout_on_start = false; empty_write_disabled = false; allow_non_idempotent_write = false; - duplication_disabled = false; + duplication_enabled = false; prepare_timeout_ms_for_secondaries = 1000; prepare_timeout_ms_for_potential_secondaries = 3000; @@ -271,8 +271,8 @@ void replication_options::initialize() allow_non_idempotent_write, "whether to allow non-idempotent write, default is false"); - duplication_disabled = dsn_config_get_value_bool( - "replication", "duplication_disabled", duplication_disabled, "is duplication disabled"); + duplication_enabled = dsn_config_get_value_bool( + "replication", "duplication_enabled", duplication_enabled, "is duplication enabled"); prepare_timeout_ms_for_secondaries = (int)dsn_config_get_value_uint64( "replication", diff --git a/src/dist/replication/common/replication_common.h b/src/dist/replication/common/replication_common.h index 148a6f149f..5107484b8b 100644 --- a/src/dist/replication/common/replication_common.h +++ b/src/dist/replication/common/replication_common.h @@ -52,7 +52,7 @@ class replication_options bool delay_for_fd_timeout_on_start; bool empty_write_disabled; bool allow_non_idempotent_write; - bool duplication_disabled; + bool duplication_enabled; int32_t prepare_timeout_ms_for_secondaries; int32_t prepare_timeout_ms_for_potential_secondaries; diff --git a/src/dist/replication/lib/duplication/replica_duplicator.cpp b/src/dist/replication/lib/duplication/replica_duplicator.cpp index 70decce194..071a4592fb 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator.cpp +++ b/src/dist/replication/lib/duplication/replica_duplicator.cpp @@ -139,7 +139,8 @@ error_s replica_duplicator::update_progress(const duplication_progress &p) _progress.last_decree, _progress.confirmed_decree); } - if (_progress.confirmed_decree > last_confirmed_decree) { // progress advances + if (_progress.confirmed_decree > last_confirmed_decree) { + // has confirmed_decree updated. _stub->_counter_dup_confirmed_rate->add(_progress.confirmed_decree - last_confirmed_decree); } @@ -169,8 +170,9 @@ uint64_t replica_duplicator::get_pending_mutations_count() const { // it's not atomic to read last_committed_decree in not-REPLICATION thread pool, // but enough for approximate statistic. - uint64_t cnt = _replica->last_committed_decree() - progress().last_decree; - return cnt > 0 ? cnt : 0; + int64_t cnt = _replica->last_committed_decree() - progress().last_decree; + // since last_committed_decree() is not atomic, `cnt` could probably be negative. + return cnt > 0 ? static_cast(cnt) : 0; } } // namespace replication diff --git a/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp b/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp index 75ebadc7e5..9139813f22 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp +++ b/src/dist/replication/lib/duplication/replica_duplicator_manager.cpp @@ -122,7 +122,7 @@ void replica_duplicator_manager::update_confirmed_decree_if_secondary(decree con int64_t replica_duplicator_manager::get_pending_mutations_count() const { int64_t total = 0; - for (auto &dup : _duplications) { + for (const auto &dup : _duplications) { total += dup.second->get_pending_mutations_count(); } return total; diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index d0c38f380f..aa2c16b0bf 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -683,7 +683,7 @@ void replica_stub::initialize_start() } #endif - if (!_options.duplication_disabled) { + if (_options.duplication_enabled) { _duplication_sync_timer = dsn::make_unique(this); _duplication_sync_timer->start(); } diff --git a/src/dist/replication/meta_server/meta_service.cpp b/src/dist/replication/meta_server/meta_service.cpp index 04d2c0a09e..100e9b2e2c 100644 --- a/src/dist/replication/meta_server/meta_service.cpp +++ b/src/dist/replication/meta_server/meta_service.cpp @@ -869,7 +869,7 @@ void meta_service::register_duplication_rpc_handlers() void meta_service::initialize_duplication_service() { - if (!_opts.duplication_disabled) { + if (!_opts.duplication_enabled) { _dup_svc = make_unique(_state.get(), this); } } diff --git a/src/dist/replication/test/meta_test/unit_test/config-test.ini b/src/dist/replication/test/meta_test/unit_test/config-test.ini index 399886b094..98edc71b0e 100644 --- a/src/dist/replication/test/meta_test/unit_test/config-test.ini +++ b/src/dist/replication/test/meta_test/unit_test/config-test.ini @@ -86,7 +86,7 @@ cold_backup_disabled = false [replication] cluster_name = master-cluster -duplication_disabled = false +duplication_enabled = true [replication.app] app_name = simple_kv.instance0 From 0a1ef53bc3be05a4516abf61949bb156db5516ad Mon Sep 17 00:00:00 2001 From: neverchanje Date: Tue, 18 Feb 2020 18:19:55 +0800 Subject: [PATCH 8/8] fix --- src/dist/replication/meta_server/meta_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dist/replication/meta_server/meta_service.cpp b/src/dist/replication/meta_server/meta_service.cpp index 100e9b2e2c..5e9b590a69 100644 --- a/src/dist/replication/meta_server/meta_service.cpp +++ b/src/dist/replication/meta_server/meta_service.cpp @@ -869,7 +869,7 @@ void meta_service::register_duplication_rpc_handlers() void meta_service::initialize_duplication_service() { - if (!_opts.duplication_enabled) { + if (_opts.duplication_enabled) { _dup_svc = make_unique(_state.get(), this); } }