Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor: use latest latency tool to add point for write process #951

Merged
merged 11 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/dsn/tool-api/aio_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@

namespace dsn {

// Forward declaration of the latency tracer. The visible use in this header is
// the std::shared_ptr<dsn::utils::latency_tracer> member aio_task::_tracer, which
// does not need the complete class definition at the point of declaration.
namespace utils {
class latency_tracer;
}

enum aio_type
{
AIO_Invalid,
Expand Down Expand Up @@ -102,6 +106,7 @@ class aio_task : public task

std::vector<dsn_file_buffer_t> _unmerged_write_buffers;
blob _merged_write_buffer_holder;
std::shared_ptr<dsn::utils::latency_tracer> _tracer;

protected:
void clear_non_trivial_on_task_end() override { _cb = nullptr; }
Expand Down
5 changes: 5 additions & 0 deletions include/dsn/utils/latency_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ class latency_tracer
// |-->stageC2[rpc]-->....
void add_sub_tracer(const std::shared_ptr<latency_tracer> &tracer);

void add_sub_tracer(const std::string &name);

std::shared_ptr<latency_tracer> sub_tracer(const std::string &name);

void set_name(const std::string &name) { _name = name; }
Expand All @@ -152,6 +154,8 @@ class latency_tracer

uint64_t last_time() const { return _last_time; }

const std::string &last_stage_name() const { return _last_stage; }

private:
// report the trace point duration to monitor system
static void report_trace_point(const std::string &name, uint64_t span);
Expand All @@ -165,6 +169,7 @@ class latency_tracer
uint64_t _threshold;
uint64_t _start_time;
uint64_t _last_time;
std::string _last_stage;

dsn::task_code _task_code;
bool _enable_trace;
Expand Down
3 changes: 3 additions & 0 deletions src/aio/aio_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "runtime/task/task_engine.h"
#include <dsn/tool-api/file_io.h>
#include <dsn/utility/error_code.h>
#include <dsn/utils/latency_tracer.h>

namespace dsn {

Expand All @@ -24,6 +25,8 @@ aio_task::aio_task(dsn::task_code code, aio_handler &&cb, int hash, service_node
set_error_code(ERR_IO_PENDING);

_aio_ctx = file::prepare_aio_context(this);

_tracer = std::make_shared<dsn::utils::latency_tracer>(true, "aio_task", 0, code);
}

void aio_task::collapse()
Expand Down
5 changes: 5 additions & 0 deletions src/aio/native_linux_aio_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <dsn/c/api_utilities.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/fail_point.h>
#include <dsn/utils/latency_tracer.h>

namespace dsn {

Expand Down Expand Up @@ -134,12 +135,14 @@ void native_linux_aio_provider::submit_aio_task(aio_task *aio_tsk)
return;
}

ADD_POINT(aio_tsk->_tracer);
tasking::enqueue(
aio_tsk->code(), aio_tsk->tracker(), [=]() { aio_internal(aio_tsk); }, aio_tsk->hash());
}

error_code native_linux_aio_provider::aio_internal(aio_task *aio_tsk)
{
ADD_POINT(aio_tsk->_tracer);
aio_context *aio_ctx = aio_tsk->get_aio_context();
error_code err = ERR_UNKNOWN;
uint32_t processed_bytes = 0;
Expand All @@ -154,6 +157,8 @@ error_code native_linux_aio_provider::aio_internal(aio_task *aio_tsk)
return err;
}

ADD_CUSTOM_POINT(aio_tsk->_tracer, "completed");

complete_io(aio_tsk, err, processed_bytes);
return err;
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/consensus.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ struct prepare_ack
4:i64 decree;
5:i64 last_committed_decree_in_app;
6:i64 last_committed_decree_in_prepare_list;
7:optional i64 receive_timestamp;
8:optional i64 response_timestamp;
}

enum learn_type
Expand Down
8 changes: 8 additions & 0 deletions src/replica/log_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,14 @@ aio_task_ptr log_file::commit_log_blocks(log_appender &pending,
hash);
}

if (utils::FLAGS_enable_latency_tracer) {
tsk->_tracer->set_parent_point_name("commit_pending_mutations");
tsk->_tracer->set_description("log");
for (const auto &mutation : pending.mutations()) {
mutation->_tracer->add_sub_tracer(tsk->_tracer);
}
}

_end_offset.fetch_add(size);
return tsk;
}
Expand Down
6 changes: 2 additions & 4 deletions src/replica/mutation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ DSN_DEFINE_uint64("replication",
abnormal_write_trace_latency_threshold,
1000 * 1000 * 1000, // 1s
"latency trace will be logged when exceed the write latency threshold");
DSN_TAG_VARIABLE(abnormal_write_trace_latency_threshold, FT_MUTABLE);

std::atomic<uint64_t> mutation::s_tid(0);

Expand All @@ -60,7 +61,7 @@ mutation::mutation()
_create_ts_ns = dsn_now_ns();
_tid = ++s_tid;
_is_sync_to_child = false;
tracer = std::make_shared<dsn::utils::latency_tracer>(
_tracer = std::make_shared<dsn::utils::latency_tracer>(
false, "mutation", FLAGS_abnormal_write_trace_latency_threshold);
}

Expand Down Expand Up @@ -145,9 +146,6 @@ void mutation::copy_from(mutation_ptr &old)

void mutation::add_client_request(task_code code, dsn::message_ex *request)
{
if (request != nullptr) {
ADD_CUSTOM_POINT(tracer, request->header->id);
}
data.updates.push_back(mutation_update());
mutation_update &update = data.updates.back();
_appro_data_bytes += 32; // approximate code size
Expand Down
2 changes: 1 addition & 1 deletion src/replica/mutation.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class mutation : public ref_counter
// used by pending mutation queue only
mutation *next;

std::shared_ptr<dsn::utils::latency_tracer> tracer;
std::shared_ptr<dsn::utils::latency_tracer> _tracer;

void set_is_sync_to_child(bool sync_to_child) { _is_sync_to_child = sync_to_child; }
bool is_sync_to_child() { return _is_sync_to_child; }
Expand Down
14 changes: 9 additions & 5 deletions src/replica/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ ::dsn::task_ptr mutation_log_shared::append(mutation_ptr &mu,

_slock.lock();

ADD_POINT(mu->tracer);
ADD_POINT(mu->_tracer);
// init pending buffer
if (nullptr == _pending_write) {
_pending_write = std::make_shared<log_appender>(mark_new_offset(0, true).second);
Expand Down Expand Up @@ -129,8 +129,10 @@ void mutation_log_shared::write_pending_mutations(bool release_lock_required)
void mutation_log_shared::commit_pending_mutations(log_file_ptr &lf,
std::shared_ptr<log_appender> &pending)
{
for (auto &mu : pending->mutations()) {
ADD_POINT(mu->tracer);
if (utils::FLAGS_enable_latency_tracer) {
for (auto &mu : pending->mutations()) {
ADD_POINT(mu->_tracer);
}
}
lf->commit_log_blocks( // forces a new line for params
*pending,
Expand All @@ -139,8 +141,10 @@ void mutation_log_shared::commit_pending_mutations(log_file_ptr &lf,
[this, lf, pending](error_code err, size_t sz) mutable {
dassert(_is_writing.load(std::memory_order_relaxed), "");

for (auto &mu : pending->mutations()) {
ADD_CUSTOM_POINT(mu->tracer, "commit_pending_completed");
if (utils::FLAGS_enable_latency_tracer) {
for (auto &mu : pending->mutations()) {
ADD_CUSTOM_POINT(mu->_tracer, "commit_pending_completed");
}
}

for (auto &block : pending->all_blocks()) {
Expand Down
2 changes: 1 addition & 1 deletion src/replica/prepare_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ error_code prepare_list::prepare(mutation_ptr &mu,
decree d = mu->data.header.decree;
dcheck_gt_replica(d, last_committed_decree());

ADD_POINT(mu->tracer);
ADD_POINT(mu->_tracer);
error_code err;
switch (status) {
case partition_status::PS_PRIMARY:
Expand Down
4 changes: 2 additions & 2 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ void replica::execute_mutation(mutation_ptr &mu)
}
break;
case partition_status::PS_PRIMARY: {
ADD_POINT(mu->tracer);
ADD_POINT(mu->_tracer);
check_state_completeness();
dassert(_app->last_committed_decree() + 1 == d,
"app commit: %" PRId64 ", mutation decree: %" PRId64 "",
Expand Down Expand Up @@ -364,7 +364,7 @@ void replica::execute_mutation(mutation_ptr &mu)
}

if (status() == partition_status::PS_PRIMARY) {
ADD_CUSTOM_POINT(mu->tracer, "completed");
ADD_CUSTOM_POINT(mu->_tracer, "completed");
mutation_ptr next = _primary_states.write_queue.check_possible_work(
static_cast<int>(_prepare_list->max_decree() - d));

Expand Down
31 changes: 19 additions & 12 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c
"invalid partition_status, status = %s",
enum_to_string(status()));

ADD_POINT(mu->tracer);
mu->_tracer->set_description("primary");
ADD_POINT(mu->_tracer);

error_code err = ERR_OK;
uint8_t count = 0;
Expand All @@ -180,7 +181,7 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c
mu->set_id(get_ballot(), mu->data.header.decree);
}

mu->tracer->set_name(fmt::format("mutation[{}]", mu->name()));
mu->_tracer->set_name(fmt::format("mutation[{}]", mu->name()));
dlog(level,
"%s: mutation %s init_prepare, mutation_tid=%" PRIu64,
name(),
Expand Down Expand Up @@ -318,7 +319,9 @@ void replica::send_prepare_message(::dsn::rpc_address addr,
bool pop_all_committed_mutations,
int64_t learn_signature)
{
ADD_CUSTOM_POINT(mu->tracer, addr.to_string());
mu->_tracer->add_sub_tracer(addr.to_string());
ADD_POINT(mu->_tracer->sub_tracer(addr.to_string()));

dsn::message_ex *msg = dsn::message_ex::create_request(
RPC_PREPARE, timeout_milliseconds, get_gpid().thread_hash());
replica_configuration rconfig;
Expand Down Expand Up @@ -361,7 +364,6 @@ void replica::do_possible_commit_on_primary(mutation_ptr &mu)
"invalid partition_status, status = %s",
enum_to_string(status()));

ADD_POINT(mu->tracer);
if (mu->is_ready_for_commit()) {
_prepare_list->commit(mu->data.header.decree, COMMIT_ALL_READY);
}
Expand All @@ -382,12 +384,12 @@ void replica::on_prepare(dsn::message_ex *request)
rconfig.split_sync_to_child = false;
}

ADD_POINT(mu->tracer);

decree decree = mu->data.header.decree;

dinfo("%s: mutation %s on_prepare", name(), mu->name());
mu->tracer->set_name(fmt::format("mutation[{}]", mu->name()));
mu->_tracer->set_name(fmt::format("mutation[{}]", mu->name()));
mu->_tracer->set_description("secondary");
ADD_POINT(mu->_tracer);

dassert(mu->data.header.pid == rconfig.pid,
"(%d.%d) VS (%d.%d)",
Expand Down Expand Up @@ -535,7 +537,7 @@ void replica::on_append_log_completed(mutation_ptr &mu, error_code err, size_t s
size,
err.to_string());

ADD_POINT(mu->tracer);
ADD_POINT(mu->_tracer);

if (err == ERR_OK) {
mu->set_logged();
Expand Down Expand Up @@ -601,8 +603,6 @@ void replica::on_prepare_reply(std::pair<mutation_ptr, partition_status::type> p
mutation_ptr mu = pr.first;
partition_status::type target_status = pr.second;

ADD_CUSTOM_POINT(mu->tracer, request->to_address.to_string());

// skip callback for old mutations
if (partition_status::PS_PRIMARY != status() || mu->data.header.ballot < get_ballot() ||
mu->get_decree() <= last_committed_decree())
Expand All @@ -627,6 +627,11 @@ void replica::on_prepare_reply(std::pair<mutation_ptr, partition_status::type> p
::dsn::unmarshall(reply, resp);
}

auto send_prepare_tracer = mu->_tracer->sub_tracer(request->to_address.to_string());
APPEND_EXTERN_POINT(send_prepare_tracer, resp.receive_timestamp, "remote_receive");
APPEND_EXTERN_POINT(send_prepare_tracer, resp.response_timestamp, "remote_reply");
ADD_CUSTOM_POINT(send_prepare_tracer, resp.err.to_string());

if (resp.err == ERR_OK) {
dinfo("%s: mutation %s on_prepare_reply from %s, appro_data_bytes = %d, "
"target_status = %s, err = %s",
Expand All @@ -637,7 +642,6 @@ void replica::on_prepare_reply(std::pair<mutation_ptr, partition_status::type> p
enum_to_string(target_status),
resp.err.to_string());
} else {
ADD_CUSTOM_POINT(mu->tracer, fmt::format("error:{}", request->to_address.to_string()));
derror("%s: mutation %s on_prepare_reply from %s, appro_data_bytes = %d, "
"target_status = %s, err = %s",
name(),
Expand Down Expand Up @@ -759,13 +763,16 @@ void replica::on_prepare_reply(std::pair<mutation_ptr, partition_status::type> p

void replica::ack_prepare_message(error_code err, mutation_ptr &mu)
{
ADD_CUSTOM_POINT(mu->tracer, name());
ADD_POINT(mu->_tracer);
prepare_ack resp;
resp.pid = get_gpid();
resp.err = err;
resp.ballot = get_ballot();
resp.decree = mu->data.header.decree;

resp.__set_receive_timestamp(mu->_tracer->start_time());
resp.__set_response_timestamp(dsn_now_ns());

// for partition_status::PS_POTENTIAL_SECONDARY ONLY
resp.last_committed_decree_in_app = _app->last_committed_decree();
resp.last_committed_decree_in_prepare_list = last_committed_decree();
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ ::dsn::error_code replication_app_base::apply_mutation(const mutation *mu)
dassert(mu->data.updates.size() > 0, "");

if (_replica->status() == partition_status::PS_PRIMARY) {
ADD_POINT(mu->tracer);
ADD_POINT(mu->_tracer);
}

bool has_ingestion_request = false;
Expand Down
17 changes: 16 additions & 1 deletion src/utils/latency_tracer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ void latency_tracer::add_point(const std::string &stage_name)
utils::auto_write_lock write(_point_lock);
_points.emplace(ts, stage_name);
_last_time = ts;
_last_stage = stage_name;
}

void latency_tracer::append_point(const std::string &stage_name, uint64_t timestamp)
Expand All @@ -161,6 +162,20 @@ void latency_tracer::append_point(const std::string &stage_name, uint64_t timest
uint64_t cur_ts = timestamp > _last_time ? timestamp : _last_time + 1;
_points.emplace(cur_ts, stage_name);
_last_time = cur_ts;
_last_stage = stage_name;
}

void latency_tracer::add_sub_tracer(const std::string &name)
{
if (!_enable_trace) {
return;
}

auto sub_tracer = std::make_shared<dsn::utils::latency_tracer>(true, name, 0);
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
sub_tracer->set_parent_point_name(_last_stage);
sub_tracer->set_description(_description);
utils::auto_write_lock write(_sub_lock);
_sub_tracers.emplace(name, sub_tracer);
}

void latency_tracer::add_sub_tracer(const std::shared_ptr<latency_tracer> &tracer)
Expand Down Expand Up @@ -229,7 +244,7 @@ void latency_tracer::dump_trace_points(/*out*/ std::string &traces)
_is_sub ? fmt::format("{}.{}", _parent_point_name, cur_point_name)
: cur_point_name;
std::string trace_log =
fmt::format("\t{}TRACE:name={:<70}, span={:>20}, total={:>20}, "
fmt::format("\t{}TRACE:name={:<110}, span={:>20}, total={:>20}, "
"ts={:<20}\n",
trace_format,
trace_name,
Expand Down