diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 789aaaf14b..3185a13d24 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -810,7 +810,7 @@ class producer_plugin_impl : public std::enable_shared_from_this 0, unsupported_feature, "read-only transactions execution not enabled on API node. Set read-only-threads > 0" ); - // Post all read only trxs to read_only queue for execution. + // Post all read only trxs to read_exclusive queue for execution. auto trx_metadata = transaction_metadata::create_no_recover_keys(trx, transaction_metadata::trx_type::read_only); app().executor().post(priority::low, exec_queue::read_exclusive, [this, trx{std::move(trx_metadata)}, next{std::move(next)}]() mutable { push_read_only_transaction(std::move(trx), std::move(next)); @@ -1276,6 +1276,7 @@ void producer_plugin_impl::plugin_initialize(const boost::program_options::varia ilog("read-only-write-window-time-us: ${ww} us, read-only-read-window-time-us: ${rw} us, effective read window time to be used: ${w} us", ("ww", _ro_write_window_time_us)("rw", _ro_read_window_time_us)("w", _ro_read_window_effective_time_us)); } + app().executor().init_read_threads(_ro_thread_pool_size); // Make sure _ro_max_trx_time_us is always set. // Make sure a read-only transaction can finish within the read @@ -2853,8 +2854,7 @@ void producer_plugin_impl::switch_to_read_window() { uint32_t pending_block_num = chain.head_block_num() + 1; _ro_read_window_start_time = fc::time_point::now(); _ro_window_deadline = _ro_read_window_start_time + _ro_read_window_effective_time_us; - app().executor().set_to_read_window( - _ro_thread_pool_size, [received_block = &_received_block, pending_block_num, ro_window_deadline = _ro_window_deadline]() { + app().executor().set_to_read_window([received_block = &_received_block, pending_block_num, ro_window_deadline = _ro_window_deadline]() { return fc::time_point::now() >= ro_window_deadline || (received_block->load() >= pending_block_num); // should_exit() }); chain.set_to_read_window(); @@ -2910,7 +2910,7 @@ bool producer_plugin_impl::read_only_execution_task(uint32_t pending_block_num) // will be executed from the main app thread because all read-only threads are idle now self->switch_to_write_window(); }); - // last thread post any exhausted back into read_only queue with slightly higher priority (low+1) so they are executed first + // last thread post any exhausted back into read_exclusive queue with slightly higher priority (low+1) so they are executed first ro_trx_t t; while (_ro_exhausted_trx_queue.pop_front(t)) { app().executor().post(priority::low + 1, exec_queue::read_exclusive, [this, trx{std::move(t.trx)}, next{std::move(t.next)}]() mutable { @@ -2928,7 +2928,7 @@ void producer_plugin_impl::repost_exhausted_transactions(const fc::time_point& d if (!_ro_exhausted_trx_queue.empty()) { chain::controller& chain = chain_plug->chain(); uint32_t pending_block_num = chain.pending_block_num(); - // post any exhausted back into read_only queue with slightly higher priority (low+1) so they are executed first + // post any exhausted back into read_exclusive queue with slightly higher priority (low+1) so they are executed first ro_trx_t t; while (!should_interrupt_start_block(deadline, pending_block_num) && _ro_exhausted_trx_queue.pop_front(t)) { app().executor().post(priority::low + 1, exec_queue::read_exclusive, [this, trx{std::move(t.trx)}, next{std::move(t.next)}]() mutable {