-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove thread hop to producer thread for trx signature recovery #1859
Merged
Merged
Changes from 3 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
d1ab313
GH-1690 Remove thread hop to producer thread pool and go straight to …
heifner 3096b00
GH-1690 Remove --producer-threads option
heifner 32256ab
GH-1690 Fix GCC compiler error
heifner acf9e95
GH-1690 Remove this-> syntax
heifner 8d428eb
GH-1690 Cleanup to be more readable
heifner c5bb941
Merge remote-tracking branch 'origin/main' into GH-1690-performance
heifner File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -397,13 +397,12 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin | |
int64_t sub_bill, | ||
uint32_t prev_billed_cpu_time_us); | ||
|
||
void log_trx_results(const transaction_metadata_ptr& trx, const transaction_trace_ptr& trace, const fc::time_point& start); | ||
void log_trx_results(const transaction_metadata_ptr& trx, const transaction_trace_ptr& trace); | ||
void log_trx_results(const transaction_metadata_ptr& trx, const fc::exception_ptr& except_ptr); | ||
void log_trx_results(const packed_transaction_ptr& trx, | ||
const transaction_trace_ptr& trace, | ||
const fc::exception_ptr& except_ptr, | ||
uint32_t billed_cpu_us, | ||
const fc::time_point& start, | ||
bool is_transient); | ||
|
||
void add_greylist_accounts(const producer_plugin::greylist_params& params) { | ||
|
@@ -495,8 +494,6 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin | |
block_timing_util::producer_watermarks _producer_watermarks; | ||
pending_block_mode _pending_block_mode = pending_block_mode::speculating; | ||
unapplied_transaction_queue _unapplied_transactions; | ||
size_t _thread_pool_size = config::default_controller_thread_pool_size; | ||
named_thread_pool<struct prod> _thread_pool; | ||
std::atomic<int32_t> _max_transaction_time_ms; // modified by app thread, read by net_plugin thread pool | ||
std::atomic<uint32_t> _received_block{0}; // modified by net_plugin thread pool | ||
fc::microseconds _max_irreversible_block_age_us; | ||
|
@@ -802,13 +799,6 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin | |
const auto max_trx_time_ms = (trx_type == transaction_metadata::trx_type::read_only) ? -1 : _max_transaction_time_ms.load(); | ||
fc::microseconds max_trx_cpu_usage = max_trx_time_ms < 0 ? fc::microseconds::maximum() : fc::milliseconds(max_trx_time_ms); | ||
|
||
auto future = transaction_metadata::start_recover_keys(trx, | ||
_thread_pool.get_executor(), | ||
chain.get_chain_id(), | ||
fc::microseconds(max_trx_cpu_usage), | ||
trx_type, | ||
chain.configured_subjective_signature_length_limit()); | ||
|
||
auto is_transient = (trx_type == transaction_metadata::trx_type::read_only || trx_type == transaction_metadata::trx_type::dry_run); | ||
if (!is_transient) { | ||
next = [this, trx, next{std::move(next)}](const next_function_variant<transaction_trace_ptr>& response) { | ||
|
@@ -825,38 +815,57 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin | |
}; | ||
} | ||
|
||
boost::asio::post(_thread_pool.get_executor(), | ||
[self = this, future{std::move(future)}, api_trx, is_transient, return_failure_traces, | ||
next{std::move(next)}, trx = trx]() mutable { | ||
if (future.valid()) { | ||
future.wait(); | ||
app().executor().post(priority::low, exec_queue::read_write, | ||
[self, future{std::move(future)}, api_trx, is_transient, next{std::move(next)}, trx{std::move(trx)}, | ||
return_failure_traces]() mutable { | ||
auto start = fc::time_point::now(); | ||
auto idle_time = self->_time_tracker.add_idle_time(start); | ||
auto trx_tracker = self->_time_tracker.start_trx(is_transient, start); | ||
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time)); | ||
|
||
auto exception_handler = | ||
[self, is_transient, &next, trx{std::move(trx)}, &start](fc::exception_ptr ex) { | ||
self->log_trx_results(trx, nullptr, ex, 0, start, is_transient); | ||
next(std::move(ex)); | ||
}; | ||
try { | ||
auto result = future.get(); | ||
if (!self->process_incoming_transaction_async(result, api_trx, return_failure_traces, trx_tracker, next)) { | ||
if (self->in_producing_mode()) { | ||
self->schedule_maybe_produce_block(true); | ||
} else { | ||
self->restart_speculative_block(); | ||
} | ||
} | ||
} | ||
CATCH_AND_CALL(exception_handler); | ||
}); | ||
} | ||
}); | ||
boost::asio::post( chain.get_thread_pool(), | ||
[this, trx{trx}, &chain, time_limit{max_trx_cpu_usage}, trx_type, | ||
api_trx, is_transient, next{std::move(next)}, return_failure_traces]() mutable { | ||
|
||
transaction_metadata_ptr trx_meta; | ||
try { | ||
trx_meta = transaction_metadata::recover_keys( trx, chain.get_chain_id(), time_limit, trx_type, chain.configured_subjective_signature_length_limit() ); | ||
} catch(...) { | ||
// use read_write when read is likely fine; this maintains previous behavior of next() always being called from the main thread | ||
app().executor().post(priority::low, exec_queue::read_write, | ||
[ex_ptr = std::current_exception(), this, trx{std::move(trx)}, is_transient, next{std::move(next)}]() { | ||
auto start = fc::time_point::now(); | ||
auto idle_time = this->_time_tracker.add_idle_time(start); | ||
auto trx_tracker = this->_time_tracker.start_trx(is_transient, start); | ||
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time)); | ||
auto ex_handler = | ||
[this, is_transient, &next, &trx](fc::exception_ptr ex) { | ||
this->log_trx_results(trx, nullptr, ex, 0, is_transient); | ||
next(std::move(ex)); | ||
}; | ||
try { | ||
std::rethrow_exception(ex_ptr); | ||
} CATCH_AND_CALL(ex_handler) | ||
}); | ||
return; | ||
} | ||
|
||
app().executor().post(priority::low, exec_queue::read_write, | ||
[this, trx_meta{std::move(trx_meta)}, api_trx, is_transient, next{std::move(next)}, return_failure_traces]() mutable { | ||
auto start = fc::time_point::now(); | ||
auto idle_time = this->_time_tracker.add_idle_time(start); | ||
auto trx_tracker = this->_time_tracker.start_trx(is_transient, start); | ||
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time)); | ||
|
||
auto exception_handler = | ||
[this, is_transient, &next, &trx_meta](fc::exception_ptr ex) { | ||
this->log_trx_results(trx_meta->packed_trx(), nullptr, ex, 0, is_transient); | ||
next(std::move(ex)); | ||
}; | ||
try { | ||
if (!this->process_incoming_transaction_async(trx_meta, api_trx, return_failure_traces, trx_tracker, next)) { | ||
if (this->in_producing_mode()) { | ||
this->schedule_maybe_produce_block(true); | ||
} else { | ||
this->restart_speculative_block(); | ||
} | ||
} | ||
} | ||
CATCH_AND_CALL(exception_handler); | ||
}); | ||
} ); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be better to remove all occurences of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed |
||
} | ||
|
||
bool process_incoming_transaction_async(const transaction_metadata_ptr& trx, | ||
|
@@ -1066,8 +1075,6 @@ void producer_plugin::set_program_options( | |
"Disable subjective CPU billing for P2P transactions") | ||
("disable-subjective-api-billing", bpo::value<bool>()->default_value(true), | ||
"Disable subjective CPU billing for API transactions") | ||
("producer-threads", bpo::value<uint16_t>()->default_value(my->_thread_pool_size), | ||
"Number of worker threads in producer thread pool") | ||
("snapshots-dir", bpo::value<std::filesystem::path>()->default_value("snapshots"), | ||
"the location of the snapshots directory (absolute path or relative to application data dir)") | ||
("read-only-threads", bpo::value<uint32_t>(), | ||
|
@@ -1185,9 +1192,6 @@ void producer_plugin_impl::plugin_initialize(const boost::program_options::varia | |
ilog("Subjective CPU billing of API trxs disabled "); | ||
} | ||
|
||
_thread_pool_size = options.at("producer-threads").as<uint16_t>(); | ||
EOS_ASSERT(_thread_pool_size > 0, plugin_config_exception, "producer-threads ${num} must be greater than 0", ("num", _thread_pool_size)); | ||
|
||
if (options.count("snapshots-dir")) { | ||
auto sd = options.at("snapshots-dir").as<std::filesystem::path>(); | ||
if (sd.is_relative()) { | ||
|
@@ -1313,12 +1317,6 @@ void producer_plugin_impl::plugin_startup() { | |
try { | ||
ilog("producer plugin: plugin_startup() begin"); | ||
|
||
_thread_pool.start(_thread_pool_size, [](const fc::exception& e) { | ||
fc_elog(_log, "Exception in producer thread pool, exiting: ${e}", ("e", e.to_detail_string())); | ||
app().quit(); | ||
}); | ||
|
||
|
||
chain::controller& chain = chain_plug->chain(); | ||
EOS_ASSERT(_producers.empty() || chain.get_read_mode() != chain::db_read_mode::IRREVERSIBLE, plugin_config_exception, | ||
"node cannot have any producer-name configured because block production is impossible when read_mode is \"irreversible\""); | ||
|
@@ -1398,7 +1396,6 @@ void producer_plugin_impl::plugin_shutdown() { | |
_ro_timer.cancel(ec); | ||
app().executor().stop(); | ||
_ro_thread_pool.stop(); | ||
_thread_pool.stop(); | ||
_unapplied_transactions.clear(); | ||
|
||
app().executor().post(0, [me = shared_from_this()]() {}); // keep my pointer alive until queue is drained | ||
|
@@ -2043,22 +2040,20 @@ inline std::string get_detailed_contract_except_info(const packed_transaction_pt | |
} | ||
|
||
void producer_plugin_impl::log_trx_results(const transaction_metadata_ptr& trx, | ||
const transaction_trace_ptr& trace, | ||
const fc::time_point& start) { | ||
const transaction_trace_ptr& trace) { | ||
uint32_t billed_cpu_time_us = (trace && trace->receipt) ? trace->receipt->cpu_usage_us : 0; | ||
log_trx_results(trx->packed_trx(), trace, nullptr, billed_cpu_time_us, start, trx->is_transient()); | ||
log_trx_results(trx->packed_trx(), trace, nullptr, billed_cpu_time_us, trx->is_transient()); | ||
} | ||
|
||
void producer_plugin_impl::log_trx_results(const transaction_metadata_ptr& trx, const fc::exception_ptr& except_ptr) { | ||
uint32_t billed_cpu_time_us = trx ? trx->billed_cpu_time_us : 0; | ||
log_trx_results(trx->packed_trx(), nullptr, except_ptr, billed_cpu_time_us, fc::time_point::now(), trx->is_transient()); | ||
log_trx_results(trx->packed_trx(), nullptr, except_ptr, billed_cpu_time_us, trx->is_transient()); | ||
} | ||
|
||
void producer_plugin_impl::log_trx_results(const packed_transaction_ptr& trx, | ||
const transaction_trace_ptr& trace, | ||
const fc::exception_ptr& except_ptr, | ||
uint32_t billed_cpu_us, | ||
const fc::time_point& start, | ||
bool is_transient) { | ||
chain::controller& chain = chain_plug->chain(); | ||
|
||
|
@@ -2226,7 +2221,7 @@ producer_plugin_impl::handle_push_result(const transaction_metadata_ptr& | |
if (!disable_subjective_enforcement) // subjectively bill failure when producing since not in objective cpu account billing | ||
subjective_bill.subjective_bill_failure(first_auth, trace->elapsed, fc::time_point::now()); | ||
|
||
log_trx_results(trx, trace, start); | ||
log_trx_results(trx, trace); | ||
// this failed our configured maximum transaction time, we don't want to replay it | ||
fc_tlog(_log, "Failed ${c} trx, auth: ${a}, prev billed: ${p}us, ran: ${r}us, id: ${id}, except: ${e}", | ||
("c", e.code())("a", first_auth)("p", prev_billed_cpu_time_us)("r", end - start)("id", trx->id())("e", e)); | ||
|
@@ -2245,7 +2240,7 @@ producer_plugin_impl::handle_push_result(const transaction_metadata_ptr& | |
} else { | ||
fc_tlog(_log, "Subjective bill for success ${a}: ${b} elapsed ${t}us, time ${r}us", | ||
("a", first_auth)("b", sub_bill)("t", trace->elapsed)("r", end - start)); | ||
log_trx_results(trx, trace, start); | ||
log_trx_results(trx, trace); | ||
// if producing then trx is in objective cpu account billing | ||
if (!disable_subjective_enforcement && !in_producing_mode()) { | ||
subjective_bill.subjective_bill(trx->id(), trx->packed_trx()->expiration(), first_auth, trace->elapsed); | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This section of code is deep nested and becomes complicated. Can more comments be added and/or it be refactored a bit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cleaned it up some.