Skip to content

Commit

Permalink
GH-1690 Remove thread hop to producer thread pool and go straight to …
Browse files Browse the repository at this point in the history
…app thread
  • Loading branch information
heifner committed Nov 2, 2023
1 parent 87e34cb commit d1ab313
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 57 deletions.
6 changes: 6 additions & 0 deletions libraries/chain/include/eosio/chain/transaction_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ class transaction_metadata {
start_recover_keys( packed_transaction_ptr trx, boost::asio::io_context& thread_pool,
const chain_id_type& chain_id, fc::microseconds time_limit,
trx_type t, uint32_t max_variable_sig_size = UINT32_MAX );
/// Thread safe.
/// @returns transaction_metadata_ptr or throws
static transaction_metadata_ptr
recover_keys( packed_transaction_ptr trx,
const chain_id_type& chain_id, fc::microseconds time_limit,
trx_type t, uint32_t max_variable_sig_size = UINT32_MAX );

/// @returns constructed transaction_metadata with no key recovery (sig_cpu_usage=0, recovered_pub_keys=empty)
static transaction_metadata_ptr
Expand Down
26 changes: 17 additions & 9 deletions libraries/chain/transaction_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,23 @@ recover_keys_future transaction_metadata::start_recover_keys( packed_transaction
uint32_t max_variable_sig_size )
{
return post_async_task( thread_pool, [trx{std::move(trx)}, chain_id, time_limit, t, max_variable_sig_size]() mutable {
fc::time_point deadline = time_limit == fc::microseconds::maximum() ?
fc::time_point::maximum() : fc::time_point::now() + time_limit;
check_variable_sig_size( trx, max_variable_sig_size );
const signed_transaction& trn = trx->get_signed_transaction();
flat_set<public_key_type> recovered_pub_keys;
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, deadline, recovered_pub_keys );
return std::make_shared<transaction_metadata>( private_type(), std::move( trx ), cpu_usage, std::move( recovered_pub_keys ), t );
}
);
return recover_keys( std::move(trx), chain_id, time_limit, t, max_variable_sig_size );
});
}

transaction_metadata_ptr transaction_metadata::recover_keys( packed_transaction_ptr trx,
const chain_id_type& chain_id,
fc::microseconds time_limit,
trx_type t,
uint32_t max_variable_sig_size )
{
fc::time_point deadline = time_limit == fc::microseconds::maximum() ?
fc::time_point::maximum() : fc::time_point::now() + time_limit;
check_variable_sig_size( trx, max_variable_sig_size );
const signed_transaction& trn = trx->get_signed_transaction();
flat_set<public_key_type> recovered_pub_keys;
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, deadline, recovered_pub_keys );
return std::make_shared<transaction_metadata>( private_type(), std::move( trx ), cpu_usage, std::move( recovered_pub_keys ), t );
}

size_t transaction_metadata::get_estimated_size() const {
Expand Down
105 changes: 57 additions & 48 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,12 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
int64_t sub_bill,
uint32_t prev_billed_cpu_time_us);

void log_trx_results(const transaction_metadata_ptr& trx, const transaction_trace_ptr& trace, const fc::time_point& start);
void log_trx_results(const transaction_metadata_ptr& trx, const transaction_trace_ptr& trace);
void log_trx_results(const transaction_metadata_ptr& trx, const fc::exception_ptr& except_ptr);
void log_trx_results(const packed_transaction_ptr& trx,
const transaction_trace_ptr& trace,
const fc::exception_ptr& except_ptr,
uint32_t billed_cpu_us,
const fc::time_point& start,
bool is_transient);

void add_greylist_accounts(const producer_plugin::greylist_params& params) {
Expand Down Expand Up @@ -802,13 +801,6 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
const auto max_trx_time_ms = (trx_type == transaction_metadata::trx_type::read_only) ? -1 : _max_transaction_time_ms.load();
fc::microseconds max_trx_cpu_usage = max_trx_time_ms < 0 ? fc::microseconds::maximum() : fc::milliseconds(max_trx_time_ms);

auto future = transaction_metadata::start_recover_keys(trx,
_thread_pool.get_executor(),
chain.get_chain_id(),
fc::microseconds(max_trx_cpu_usage),
trx_type,
chain.configured_subjective_signature_length_limit());

auto is_transient = (trx_type == transaction_metadata::trx_type::read_only || trx_type == transaction_metadata::trx_type::dry_run);
if (!is_transient) {
next = [this, trx, next{std::move(next)}](const next_function_variant<transaction_trace_ptr>& response) {
Expand All @@ -825,38 +817,57 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
};
}

boost::asio::post(_thread_pool.get_executor(),
[self = this, future{std::move(future)}, api_trx, is_transient, return_failure_traces,
next{std::move(next)}, trx = trx]() mutable {
if (future.valid()) {
future.wait();
app().executor().post(priority::low, exec_queue::read_write,
[self, future{std::move(future)}, api_trx, is_transient, next{std::move(next)}, trx{std::move(trx)},
return_failure_traces]() mutable {
auto start = fc::time_point::now();
auto idle_time = self->_time_tracker.add_idle_time(start);
auto trx_tracker = self->_time_tracker.start_trx(is_transient, start);
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));

auto exception_handler =
[self, is_transient, &next, trx{std::move(trx)}, &start](fc::exception_ptr ex) {
self->log_trx_results(trx, nullptr, ex, 0, start, is_transient);
next(std::move(ex));
};
try {
auto result = future.get();
if (!self->process_incoming_transaction_async(result, api_trx, return_failure_traces, trx_tracker, next)) {
if (self->in_producing_mode()) {
self->schedule_maybe_produce_block(true);
} else {
self->restart_speculative_block();
}
}
}
CATCH_AND_CALL(exception_handler);
});
}
});
boost::asio::post( chain.get_thread_pool(),
[this, trx{trx}, &chain, time_limit{max_trx_cpu_usage}, trx_type,
api_trx, is_transient, next{std::move(next)}, return_failure_traces]() mutable {

transaction_metadata_ptr trx_meta;
try {
trx_meta = transaction_metadata::recover_keys( trx, chain.get_chain_id(), time_limit, trx_type, chain.configured_subjective_signature_length_limit() );
} catch(...) {
// use read_write when read is likely fine; this maintains previous behavior of next() always being called from the main thread
app().executor().post(priority::low, exec_queue::read_write,
[ex_ptr = std::current_exception(), this, trx{std::move(trx)}, is_transient, next{std::move(next)}]() {
auto start = fc::time_point::now();
auto idle_time = this->_time_tracker.add_idle_time(start);
auto trx_tracker = this->_time_tracker.start_trx(is_transient, start);
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));
auto ex_handler =
[this, is_transient, &next, &trx](fc::exception_ptr ex) {
this->log_trx_results(trx, nullptr, ex, 0, is_transient);
next(std::move(ex));
};
try {
std::rethrow_exception(ex_ptr);
} CATCH_AND_CALL(ex_handler)
});
return;
}

app().executor().post(priority::low, exec_queue::read_write,
[this, trx_meta{std::move(trx_meta)}, api_trx, is_transient, next{std::move(next)}, return_failure_traces]() mutable {
auto start = fc::time_point::now();
auto idle_time = this->_time_tracker.add_idle_time(start);
auto trx_tracker = this->_time_tracker.start_trx(is_transient, start);
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));

auto exception_handler =
[this, is_transient, &next, &trx{trx_meta->packed_trx()}](fc::exception_ptr ex) {
this->log_trx_results(trx, nullptr, ex, 0, is_transient);
next(std::move(ex));
};
try {
if (!this->process_incoming_transaction_async(trx_meta, api_trx, return_failure_traces, trx_tracker, next)) {
if (this->in_producing_mode()) {
this->schedule_maybe_produce_block(true);
} else {
this->restart_speculative_block();
}
}
}
CATCH_AND_CALL(exception_handler);
});
} );
}

bool process_incoming_transaction_async(const transaction_metadata_ptr& trx,
Expand Down Expand Up @@ -2043,22 +2054,20 @@ inline std::string get_detailed_contract_except_info(const packed_transaction_pt
}

void producer_plugin_impl::log_trx_results(const transaction_metadata_ptr& trx,
const transaction_trace_ptr& trace,
const fc::time_point& start) {
const transaction_trace_ptr& trace) {
uint32_t billed_cpu_time_us = (trace && trace->receipt) ? trace->receipt->cpu_usage_us : 0;
log_trx_results(trx->packed_trx(), trace, nullptr, billed_cpu_time_us, start, trx->is_transient());
log_trx_results(trx->packed_trx(), trace, nullptr, billed_cpu_time_us, trx->is_transient());
}

void producer_plugin_impl::log_trx_results(const transaction_metadata_ptr& trx, const fc::exception_ptr& except_ptr) {
uint32_t billed_cpu_time_us = trx ? trx->billed_cpu_time_us : 0;
log_trx_results(trx->packed_trx(), nullptr, except_ptr, billed_cpu_time_us, fc::time_point::now(), trx->is_transient());
log_trx_results(trx->packed_trx(), nullptr, except_ptr, billed_cpu_time_us, trx->is_transient());
}

void producer_plugin_impl::log_trx_results(const packed_transaction_ptr& trx,
const transaction_trace_ptr& trace,
const fc::exception_ptr& except_ptr,
uint32_t billed_cpu_us,
const fc::time_point& start,
bool is_transient) {
chain::controller& chain = chain_plug->chain();

Expand Down Expand Up @@ -2226,7 +2235,7 @@ producer_plugin_impl::handle_push_result(const transaction_metadata_ptr&
if (!disable_subjective_enforcement) // subjectively bill failure when producing since not in objective cpu account billing
subjective_bill.subjective_bill_failure(first_auth, trace->elapsed, fc::time_point::now());

log_trx_results(trx, trace, start);
log_trx_results(trx, trace);
// this failed our configured maximum transaction time, we don't want to replay it
fc_tlog(_log, "Failed ${c} trx, auth: ${a}, prev billed: ${p}us, ran: ${r}us, id: ${id}, except: ${e}",
("c", e.code())("a", first_auth)("p", prev_billed_cpu_time_us)("r", end - start)("id", trx->id())("e", e));
Expand All @@ -2245,7 +2254,7 @@ producer_plugin_impl::handle_push_result(const transaction_metadata_ptr&
} else {
fc_tlog(_log, "Subjective bill for success ${a}: ${b} elapsed ${t}us, time ${r}us",
("a", first_auth)("b", sub_bill)("t", trace->elapsed)("r", end - start));
log_trx_results(trx, trace, start);
log_trx_results(trx, trace);
// if producing then trx is in objective cpu account billing
if (!disable_subjective_enforcement && !in_producing_mode()) {
subjective_bill.subjective_bill(trx->id(), trx->packed_trx()->expiration(), first_auth, trace->elapsed);
Expand Down

0 comments on commit d1ab313

Please sign in to comment.