Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(dup): add metrics for duplication #393

Merged
merged 14 commits into from
Feb 19, 2020
9 changes: 3 additions & 6 deletions src/dist/replication/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ replication_options::replication_options()
delay_for_fd_timeout_on_start = false;
empty_write_disabled = false;
allow_non_idempotent_write = false;
duplication_disabled = true;
duplication_enabled = false;

prepare_timeout_ms_for_secondaries = 1000;
prepare_timeout_ms_for_potential_secondaries = 3000;
Expand Down Expand Up @@ -271,11 +271,8 @@ void replication_options::initialize()
allow_non_idempotent_write,
"whether to allow non-idempotent write, default is false");

duplication_disabled = dsn_config_get_value_bool(
"replication", "duplication_disabled", duplication_disabled, "is duplication disabled");
if (allow_non_idempotent_write && !duplication_disabled) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么不要这个约束了?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

暂时不考虑这个约束,一方面因为我们线上默认开启 allow_non_idempotent_write,一方面是开启热备份的表可以在接入层面对业务进行要求,不一定要写死在程序里。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那现在开热备的表能够同时进行非幂等吗?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

目前不行,如果禁止的话,可能也不会依赖配置来禁止非幂等的写,毕竟一个集群可能有的表热备份,有的表不热备份。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯,好,那目前有什么措施保证在热备的表没有进行非幂等操作呢?如果这个由业务控制而我们代码上没有限制,感觉还是有些不安全

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

目前还没有,初步的想法是在 pegasus 那边改,遇到 INCR 和 CHECK_AND_SET 就写一个 empty write,然后返回错误。但是还没实现。HBase 是支持热备份 INCR 的,就是复制的过程中,把 INCR 转为 PUT,但是这个流程 pegasus 这边很难写。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯,好,记个TODO吧,这个最好还是代码上限制一下,靠业务的自觉性太不安全了

dassert(false, "duplication and non-idempotent write cannot be enabled together");
}
duplication_enabled = dsn_config_get_value_bool(
"replication", "duplication_enabled", duplication_enabled, "is duplication enabled");

prepare_timeout_ms_for_secondaries = (int)dsn_config_get_value_uint64(
"replication",
Expand Down
2 changes: 1 addition & 1 deletion src/dist/replication/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class replication_options
bool delay_for_fd_timeout_on_start;
bool empty_write_disabled;
bool allow_non_idempotent_write;
bool duplication_disabled;
bool duplication_enabled;

int32_t prepare_timeout_ms_for_secondaries;
int32_t prepare_timeout_ms_for_potential_secondaries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ void ship_mutation::ship(mutation_tuple_set &&in)
{
_mutation_duplicator->duplicate(std::move(in), [this](size_t total_shipped_size) mutable {
update_progress();
_stub->_counter_dup_shipped_bytes_rate->add(total_shipped_size);
step_down_next_stage();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@ void duplication_sync_timer::run()
req->node = _stub->primary_address();

// collects confirm points from all primaries on this server
uint64_t pending_muts_cnt = 0;
for (const replica_ptr &r : get_all_primaries()) {
auto confirmed = r->get_duplication_manager()->get_duplication_confirms_to_update();
if (!confirmed.empty()) {
req->confirm_list[r->get_gpid()] = std::move(confirmed);
}
pending_muts_cnt += r->get_duplication_manager()->get_pending_mutations_count();
}
_stub->_counter_dup_pending_mutations_count->set(pending_muts_cnt);

duplication_sync_rpc rpc(std::move(req), RPC_CM_DUPLICATION_SYNC, 3_s);
rpc_address meta_server_address(_stub->get_meta_server_address());
Expand Down
20 changes: 11 additions & 9 deletions src/dist/replication/lib/duplication/load_from_private_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,17 @@ void load_from_private_log::find_log_file_to_start(std::map<int, log_file_ptr> l

void load_from_private_log::replay_log_block()
{
error_s err =
mutation_log::replay_block(_current,
[this](int log_bytes_length, mutation_ptr &mu) -> bool {
auto es = _mutation_batch.add(std::move(mu));
dassert_replica(es.is_ok(), es.description());
return true;
},
_start_offset,
_current_global_end_offset);
error_s err = mutation_log::replay_block(
_current,
[this](int log_bytes_length, mutation_ptr &mu) -> bool {
auto es = _mutation_batch.add(std::move(mu));
dassert_replica(es.is_ok(), es.description());
_stub->_counter_dup_log_read_bytes_rate->add(log_bytes_length);
_stub->_counter_dup_log_read_mutations_rate->increment();
return true;
},
_start_offset,
_current_global_end_offset);
if (!err.is_ok()) {
if (err.code() == ERR_HANDLE_EOF && switch_to_next_log_file()) {
repeat();
Expand Down
15 changes: 15 additions & 0 deletions src/dist/replication/lib/duplication/replica_duplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ error_s replica_duplicator::update_progress(const duplication_progress &p)
_progress.confirmed_decree);
}

decree last_confirmed_decree = _progress.confirmed_decree;
_progress.confirmed_decree = std::max(_progress.confirmed_decree, p.confirmed_decree);
_progress.last_decree = std::max(_progress.last_decree, p.last_decree);

Expand All @@ -138,6 +139,11 @@ error_s replica_duplicator::update_progress(const duplication_progress &p)
_progress.last_decree,
_progress.confirmed_decree);
}
if (_progress.confirmed_decree > last_confirmed_decree) {
// has confirmed_decree updated.
_stub->_counter_dup_confirmed_rate->add(_progress.confirmed_decree - last_confirmed_decree);
}

return error_s::ok();
}

Expand All @@ -160,5 +166,14 @@ decree replica_duplicator::get_max_gced_decree() const
return _replica->private_log()->max_gced_decree(_replica->get_gpid());
}

uint64_t replica_duplicator::get_pending_mutations_count() const
{
// it's not atomic to read last_committed_decree in not-REPLICATION thread pool,
// but enough for approximate statistic.
int64_t cnt = _replica->last_committed_decree() - progress().last_decree;
// since last_committed_decree() is not atomic, `cnt` could probably be negative.
return cnt > 0 ? static_cast<uint64_t>(cnt) : 0;
}

} // namespace replication
} // namespace dsn
3 changes: 3 additions & 0 deletions src/dist/replication/lib/duplication/replica_duplicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ class replica_duplicator : public replica_base, public pipeline::base

decree get_max_gced_decree() const;

// For metric "dup.pending_mutations_count"
uint64_t get_pending_mutations_count() const;

private:
friend class replica_duplicator_test;
friend class duplication_sync_timer_test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,14 @@ void replica_duplicator_manager::update_confirmed_decree_if_secondary(decree con
}
}

int64_t replica_duplicator_manager::get_pending_mutations_count() const
{
int64_t total = 0;
for (const auto &dup : _duplications) {
total += dup.second->get_pending_mutations_count();
}
return total;
}

} // namespace replication
} // namespace dsn
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class replica_duplicator_manager : public replica_base
/// \see replica_check.cpp
void update_confirmed_decree_if_secondary(decree confirmed);

/// Sums up the number of pending mutations for all duplications
/// on this replica, for metric "dup.pending_mutations_count".
int64_t get_pending_mutations_count() const;

private:
void sync_duplication(const duplication_entry &ent);

Expand Down
34 changes: 33 additions & 1 deletion src/dist/replication/lib/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,38 @@ void replica_stub::install_perf_counters()
COUNTER_TYPE_VOLATILE_NUMBER,
"trigger emergency checkpoint count in the recent period");

// <- Duplication Metrics ->

_counter_dup_log_read_bytes_rate.init_app_counter("eon.replica_stub",
"dup.log_read_bytes_rate",
COUNTER_TYPE_RATE,
"reading rate of private log in bytes");
_counter_dup_log_read_mutations_rate.init_app_counter(
"eon.replica_stub",
"dup.log_read_mutations_rate",
COUNTER_TYPE_RATE,
"reading rate of mutations from private log");
_counter_dup_shipped_bytes_rate.init_app_counter("eon.replica_stub",
"dup.shipped_bytes_rate",
COUNTER_TYPE_RATE,
"shipping rate of private log in bytes");
_counter_dup_confirmed_rate.init_app_counter("eon.replica_stub",
"dup.confirmed_rate",
COUNTER_TYPE_RATE,
"increasing rate of confirmed mutations");
_counter_dup_pending_mutations_count.init_app_counter(
"eon.replica_stub",
"dup.pending_mutations_count",
COUNTER_TYPE_VOLATILE_NUMBER,
"number of mutations pending for duplication");
_counter_dup_time_lag.init_app_counter(
"eon.replica_stub",
"dup.time_lag(ms)",
COUNTER_TYPE_NUMBER_PERCENTILES,
"time (in ms) lag between master and slave in the duplication");

// <- Cold Backup Metrics ->

levy5307 marked this conversation as resolved.
Show resolved Hide resolved
_counter_cold_backup_running_count.init_app_counter("eon.replica_stub",
"cold.backup.running.count",
COUNTER_TYPE_NUMBER,
Expand Down Expand Up @@ -651,7 +683,7 @@ void replica_stub::initialize_start()
}
#endif

if (!_options.duplication_disabled) {
if (_options.duplication_enabled) {
_duplication_sync_timer = dsn::make_unique<duplication_sync_timer>(this);
_duplication_sync_timer->start();
}
Expand Down
15 changes: 12 additions & 3 deletions src/dist/replication/lib/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,16 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
perf_counter_wrapper _counter_shared_log_recent_write_size;
perf_counter_wrapper _counter_recent_trigger_emergency_checkpoint_count;

// <- Duplication Metrics ->
// TODO(wutao1): calculate the counters independently for each remote cluster
// if we need to duplicate to multiple clusters someday.
perf_counter_wrapper _counter_dup_log_read_bytes_rate;
perf_counter_wrapper _counter_dup_log_read_mutations_rate;
perf_counter_wrapper _counter_dup_shipped_bytes_rate;
perf_counter_wrapper _counter_dup_confirmed_rate;
perf_counter_wrapper _counter_dup_pending_mutations_count;
perf_counter_wrapper _counter_dup_time_lag;

perf_counter_wrapper _counter_cold_backup_running_count;
perf_counter_wrapper _counter_cold_backup_recent_start_count;
perf_counter_wrapper _counter_cold_backup_recent_succ_count;
Expand All @@ -387,6 +397,5 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter

dsn::task_tracker _tracker;
};
//------------ inline impl ----------------------
}
} // namespace
} // namespace replication
} // namespace dsn
2 changes: 1 addition & 1 deletion src/dist/replication/meta_server/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ void meta_service::register_duplication_rpc_handlers()

void meta_service::initialize_duplication_service()
{
if (!_opts.duplication_disabled) {
if (_opts.duplication_enabled) {
_dup_svc = make_unique<meta_duplication_service>(_state.get(), this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ cold_backup_disabled = false

[replication]
cluster_name = master-cluster
duplication_disabled = false
duplication_enabled = true

[replication.app]
app_name = simple_kv.instance0
Expand Down
12 changes: 6 additions & 6 deletions src/dist/replication/test/simple_kv/case-601.act
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

set:load_balance_for_test=1,not_exit_on_log_failure=1

# Initial ballot is 0, after adding two peers, the ballot becomes 2.
# wait primary to add r3 as learner
state:{{r1,pri,2,0},{r2,sec,2,0},{r3,pot,2,0}}

Expand All @@ -13,17 +14,16 @@ inject:on_rpc_call:rpc_name=RPC_LEARN_COMPLETION_NOTIFY,from=r3,to=r1
client:begin_write:id=1,key=k1,timeout=0
inject:on_rpc_call:rpc_name=RPC_PREPARE,from=r1,to=r2

# r1 will remove r2 due to prepare timeout
# r1 will first downgrade to inactive because of prepare timeout,
# then increase its ballot to 3 and upgrade again as a primary.
config:{3,r1,[]}

# after r1 update config, it will broad group_check, so r3
# after r1 update config, it will broadcast group_check, so r3
# will receive group_check as a learner
wait:on_rpc_call:rpc_name=RPC_GROUP_CHECK,from=r1,to=r3

# after r3 receive the group check, it will update its ballot: 2->3.
# r2 will become learner some while later
state:{{r1,pri,3,0},{r2,pot,3,0},{r3,pot,3,0}}
# Again r1 will fail on prepare, and increase ballot 3->4.
config:{4,r1,[r3]}

# both r2 & r2 will be secondary eventually
config:{5,r1,[r2,r3]}