Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove thread hop to producer thread for trx signature recovery #1859

Merged
merged 6 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docs/01_nodeos/03_plugins/producer_plugin/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ Config Options for eosio::producer_plugin:
--disable-subjective-api-billing arg (=1)
Disable subjective CPU billing for API
transactions
--producer-threads arg (=2) Number of worker threads in producer
thread pool
--snapshots-dir arg (="snapshots") the location of the snapshots directory
(absolute path or relative to
application data dir)
Expand Down
6 changes: 6 additions & 0 deletions libraries/chain/include/eosio/chain/transaction_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ class transaction_metadata {
start_recover_keys( packed_transaction_ptr trx, boost::asio::io_context& thread_pool,
const chain_id_type& chain_id, fc::microseconds time_limit,
trx_type t, uint32_t max_variable_sig_size = UINT32_MAX );
/// Synchronous counterpart of start_recover_keys(): performs signature key recovery
/// on the calling thread instead of posting the work to a thread pool.
/// Thread safe.
/// @param trx the packed transaction whose signature keys are recovered
/// @param chain_id chain id passed to the signature key recovery
/// @param time_limit time allowed for key recovery; fc::microseconds::maximum() maps to a
///        deadline of fc::time_point::maximum()
/// @param t transaction type stored in the resulting transaction_metadata
/// @param max_variable_sig_size limit passed to the variable signature size check (defaults to UINT32_MAX)
/// @returns transaction_metadata_ptr or throws
static transaction_metadata_ptr
recover_keys( packed_transaction_ptr trx,
const chain_id_type& chain_id, fc::microseconds time_limit,
trx_type t, uint32_t max_variable_sig_size = UINT32_MAX );

/// @returns constructed transaction_metadata with no key recovery (sig_cpu_usage=0, recovered_pub_keys=empty)
static transaction_metadata_ptr
Expand Down
26 changes: 17 additions & 9 deletions libraries/chain/transaction_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,23 @@ recover_keys_future transaction_metadata::start_recover_keys( packed_transaction
uint32_t max_variable_sig_size )
{
return post_async_task( thread_pool, [trx{std::move(trx)}, chain_id, time_limit, t, max_variable_sig_size]() mutable {
fc::time_point deadline = time_limit == fc::microseconds::maximum() ?
fc::time_point::maximum() : fc::time_point::now() + time_limit;
check_variable_sig_size( trx, max_variable_sig_size );
const signed_transaction& trn = trx->get_signed_transaction();
flat_set<public_key_type> recovered_pub_keys;
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, deadline, recovered_pub_keys );
return std::make_shared<transaction_metadata>( private_type(), std::move( trx ), cpu_usage, std::move( recovered_pub_keys ), t );
}
);
return recover_keys( std::move(trx), chain_id, time_limit, t, max_variable_sig_size );
});
}

/// Recovers the signature keys of `trx` on the calling thread and wraps the result in a
/// newly constructed transaction_metadata.
///
/// @param trx packed transaction to process; ownership is moved into the returned metadata
/// @param chain_id chain id forwarded to get_signature_keys()
/// @param time_limit relative time budget for key recovery; fc::microseconds::maximum()
///        selects a deadline of fc::time_point::maximum()
/// @param t transaction type forwarded to the transaction_metadata constructor
/// @param max_variable_sig_size limit forwarded to check_variable_sig_size()
/// @returns shared pointer to the constructed transaction_metadata
/// NOTE(review): presumably check_variable_sig_size() and get_signature_keys() throw on an
/// oversized signature / exceeded deadline -- confirm against their definitions.
transaction_metadata_ptr transaction_metadata::recover_keys( packed_transaction_ptr trx,
const chain_id_type& chain_id,
fc::microseconds time_limit,
trx_type t,
uint32_t max_variable_sig_size )
{
// Turn the relative time_limit into an absolute deadline. The maximum() sentinel is mapped
// directly to time_point::maximum() rather than being added to now()
// (presumably to avoid overflowing the time point -- TODO confirm).
fc::time_point deadline = time_limit == fc::microseconds::maximum() ?
fc::time_point::maximum() : fc::time_point::now() + time_limit;
// Size check runs before the key recovery below.
check_variable_sig_size( trx, max_variable_sig_size );
const signed_transaction& trn = trx->get_signed_transaction();
flat_set<public_key_type> recovered_pub_keys;
// The returned duration is stored as the cpu usage of the recovery; recovered_pub_keys is
// passed by name and then moved into the metadata (presumably filled as an out-parameter).
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, deadline, recovered_pub_keys );
return std::make_shared<transaction_metadata>( private_type(), std::move( trx ), cpu_usage, std::move( recovered_pub_keys ), t );
}

size_t transaction_metadata::get_estimated_size() const {
Expand Down
123 changes: 60 additions & 63 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,12 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
int64_t sub_bill,
uint32_t prev_billed_cpu_time_us);

void log_trx_results(const transaction_metadata_ptr& trx, const transaction_trace_ptr& trace, const fc::time_point& start);
void log_trx_results(const transaction_metadata_ptr& trx, const transaction_trace_ptr& trace);
void log_trx_results(const transaction_metadata_ptr& trx, const fc::exception_ptr& except_ptr);
void log_trx_results(const packed_transaction_ptr& trx,
const transaction_trace_ptr& trace,
const fc::exception_ptr& except_ptr,
uint32_t billed_cpu_us,
const fc::time_point& start,
bool is_transient);

void add_greylist_accounts(const producer_plugin::greylist_params& params) {
Expand Down Expand Up @@ -495,8 +494,6 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
block_timing_util::producer_watermarks _producer_watermarks;
pending_block_mode _pending_block_mode = pending_block_mode::speculating;
unapplied_transaction_queue _unapplied_transactions;
size_t _thread_pool_size = config::default_controller_thread_pool_size;
named_thread_pool<struct prod> _thread_pool;
std::atomic<int32_t> _max_transaction_time_ms; // modified by app thread, read by net_plugin thread pool
std::atomic<uint32_t> _received_block{0}; // modified by net_plugin thread pool
fc::microseconds _max_irreversible_block_age_us;
Expand Down Expand Up @@ -798,17 +795,9 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
return;
}

chain::controller& chain = chain_plug->chain();
const auto max_trx_time_ms = (trx_type == transaction_metadata::trx_type::read_only) ? -1 : _max_transaction_time_ms.load();
fc::microseconds max_trx_cpu_usage = max_trx_time_ms < 0 ? fc::microseconds::maximum() : fc::milliseconds(max_trx_time_ms);

auto future = transaction_metadata::start_recover_keys(trx,
chain.get_thread_pool(),
chain.get_chain_id(),
fc::microseconds(max_trx_cpu_usage),
trx_type,
chain.configured_subjective_signature_length_limit());

auto is_transient = (trx_type == transaction_metadata::trx_type::read_only || trx_type == transaction_metadata::trx_type::dry_run);
if (!is_transient) {
next = [this, trx, next{std::move(next)}](const next_function_variant<transaction_trace_ptr>& response) {
Expand All @@ -825,38 +814,60 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
};
}

boost::asio::post(_thread_pool.get_executor(),
[self = this, future{std::move(future)}, api_trx, is_transient, return_failure_traces,
next{std::move(next)}, trx = trx]() mutable {
if (future.valid()) {
future.wait();
app().executor().post(priority::low, exec_queue::read_write,
[self, future{std::move(future)}, api_trx, is_transient, next{std::move(next)}, trx{std::move(trx)},
return_failure_traces]() mutable {
auto start = fc::time_point::now();
auto idle_time = self->_time_tracker.add_idle_time(start);
auto trx_tracker = self->_time_tracker.start_trx(is_transient, start);
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));

auto exception_handler =
[self, is_transient, &next, trx{std::move(trx)}, &start](fc::exception_ptr ex) {
self->log_trx_results(trx, nullptr, ex, 0, start, is_transient);
next(std::move(ex));
};
try {
auto result = future.get();
if (!self->process_incoming_transaction_async(result, api_trx, return_failure_traces, trx_tracker, next)) {
if (self->in_producing_mode()) {
self->schedule_maybe_produce_block(true);
} else {
self->restart_speculative_block();
}
}
}
CATCH_AND_CALL(exception_handler);
});
}
});
boost::asio::post(
chain_plug->chain().get_thread_pool(), // use chain thread pool for key recovery
[this, trx{trx}, time_limit{max_trx_cpu_usage}, trx_type, is_transient, next{std::move(next)}, api_trx, return_failure_traces]() mutable {

chain::controller& chain = chain_plug->chain();
transaction_metadata_ptr trx_meta;
try {
trx_meta = transaction_metadata::recover_keys(trx, chain.get_chain_id(), time_limit, trx_type,
chain.configured_subjective_signature_length_limit());
} catch (...) {
// use read_write when read is likely fine; maintains previous behavior of next() always being called from the main thread
app().executor().post(
priority::low, exec_queue::read_write,
[this, ex_ptr{std::current_exception()}, trx{std::move(trx)}, is_transient, next{std::move(next)}]() {
auto start = fc::time_point::now();
auto idle_time = _time_tracker.add_idle_time(start);
auto trx_tracker = _time_tracker.start_trx(is_transient, start);
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));
auto ex_handler = [this, is_transient, &next, &trx](fc::exception_ptr ex) {
log_trx_results(trx, nullptr, ex, 0, is_transient);
next(std::move(ex));
};
try {
std::rethrow_exception(ex_ptr);
} CATCH_AND_CALL(ex_handler)
});
return;
}

// key recovery complete, continue execution on the main thread
app().executor().post(
priority::low, exec_queue::read_write,
[this, trx_meta{std::move(trx_meta)}, is_transient, next{std::move(next)}, api_trx, return_failure_traces]() {
auto start = fc::time_point::now();
auto idle_time = _time_tracker.add_idle_time(start);
auto trx_tracker = _time_tracker.start_trx(is_transient, start);
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));

auto exception_handler = [this, is_transient, &next, &trx_meta](fc::exception_ptr ex) {
log_trx_results(trx_meta->packed_trx(), nullptr, ex, 0, is_transient);
next(std::move(ex));
};
try {
if (!process_incoming_transaction_async(trx_meta, api_trx, return_failure_traces, trx_tracker, next)) {
if (in_producing_mode()) {
schedule_maybe_produce_block(true);
} else {
restart_speculative_block();
}
}
}
CATCH_AND_CALL(exception_handler);
});
});
}

bool process_incoming_transaction_async(const transaction_metadata_ptr& trx,
Expand Down Expand Up @@ -1066,8 +1077,6 @@ void producer_plugin::set_program_options(
"Disable subjective CPU billing for P2P transactions")
("disable-subjective-api-billing", bpo::value<bool>()->default_value(true),
"Disable subjective CPU billing for API transactions")
("producer-threads", bpo::value<uint16_t>()->default_value(my->_thread_pool_size),
"Number of worker threads in producer thread pool")
("snapshots-dir", bpo::value<std::filesystem::path>()->default_value("snapshots"),
"the location of the snapshots directory (absolute path or relative to application data dir)")
("read-only-threads", bpo::value<uint32_t>(),
Expand Down Expand Up @@ -1185,9 +1194,6 @@ void producer_plugin_impl::plugin_initialize(const boost::program_options::varia
ilog("Subjective CPU billing of API trxs disabled ");
}

_thread_pool_size = options.at("producer-threads").as<uint16_t>();
EOS_ASSERT(_thread_pool_size > 0, plugin_config_exception, "producer-threads ${num} must be greater than 0", ("num", _thread_pool_size));

if (options.count("snapshots-dir")) {
auto sd = options.at("snapshots-dir").as<std::filesystem::path>();
if (sd.is_relative()) {
Expand Down Expand Up @@ -1313,12 +1319,6 @@ void producer_plugin_impl::plugin_startup() {
try {
ilog("producer plugin: plugin_startup() begin");

_thread_pool.start(_thread_pool_size, [](const fc::exception& e) {
fc_elog(_log, "Exception in producer thread pool, exiting: ${e}", ("e", e.to_detail_string()));
app().quit();
});


chain::controller& chain = chain_plug->chain();
EOS_ASSERT(_producers.empty() || chain.get_read_mode() != chain::db_read_mode::IRREVERSIBLE, plugin_config_exception,
"node cannot have any producer-name configured because block production is impossible when read_mode is \"irreversible\"");
Expand Down Expand Up @@ -1398,7 +1398,6 @@ void producer_plugin_impl::plugin_shutdown() {
_ro_timer.cancel(ec);
app().executor().stop();
_ro_thread_pool.stop();
_thread_pool.stop();
_unapplied_transactions.clear();

app().executor().post(0, [me = shared_from_this()]() {}); // keep my pointer alive until queue is drained
Expand Down Expand Up @@ -2043,22 +2042,20 @@ inline std::string get_detailed_contract_except_info(const packed_transaction_pt
}

void producer_plugin_impl::log_trx_results(const transaction_metadata_ptr& trx,
const transaction_trace_ptr& trace,
const fc::time_point& start) {
const transaction_trace_ptr& trace) {
uint32_t billed_cpu_time_us = (trace && trace->receipt) ? trace->receipt->cpu_usage_us : 0;
log_trx_results(trx->packed_trx(), trace, nullptr, billed_cpu_time_us, start, trx->is_transient());
log_trx_results(trx->packed_trx(), trace, nullptr, billed_cpu_time_us, trx->is_transient());
}

void producer_plugin_impl::log_trx_results(const transaction_metadata_ptr& trx, const fc::exception_ptr& except_ptr) {
uint32_t billed_cpu_time_us = trx ? trx->billed_cpu_time_us : 0;
log_trx_results(trx->packed_trx(), nullptr, except_ptr, billed_cpu_time_us, fc::time_point::now(), trx->is_transient());
log_trx_results(trx->packed_trx(), nullptr, except_ptr, billed_cpu_time_us, trx->is_transient());
}

void producer_plugin_impl::log_trx_results(const packed_transaction_ptr& trx,
const transaction_trace_ptr& trace,
const fc::exception_ptr& except_ptr,
uint32_t billed_cpu_us,
const fc::time_point& start,
bool is_transient) {
chain::controller& chain = chain_plug->chain();

Expand Down Expand Up @@ -2226,7 +2223,7 @@ producer_plugin_impl::handle_push_result(const transaction_metadata_ptr&
if (!disable_subjective_enforcement) // subjectively bill failure when producing since not in objective cpu account billing
subjective_bill.subjective_bill_failure(first_auth, trace->elapsed, fc::time_point::now());

log_trx_results(trx, trace, start);
log_trx_results(trx, trace);
// this failed our configured maximum transaction time, we don't want to replay it
fc_tlog(_log, "Failed ${c} trx, auth: ${a}, prev billed: ${p}us, ran: ${r}us, id: ${id}, except: ${e}",
("c", e.code())("a", first_auth)("p", prev_billed_cpu_time_us)("r", end - start)("id", trx->id())("e", e));
Expand All @@ -2245,7 +2242,7 @@ producer_plugin_impl::handle_push_result(const transaction_metadata_ptr&
} else {
fc_tlog(_log, "Subjective bill for success ${a}: ${b} elapsed ${t}us, time ${r}us",
("a", first_auth)("b", sub_bill)("t", trace->elapsed)("r", end - start));
log_trx_results(trx, trace, start);
log_trx_results(trx, trace);
// if producing then trx is in objective cpu account billing
if (!disable_subjective_enforcement && !in_producing_mode()) {
subjective_bill.subjective_bill(trx->id(), trx->packed_trx()->expiration(), first_auth, trace->elapsed);
Expand Down
Loading