Skip to content

Commit

Permalink
GH-1639 Add init_read_threads
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Sep 22, 2023
1 parent b077dc3 commit b012cce
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
EOS_ASSERT( _ro_thread_pool_size > 0, unsupported_feature,
"read-only transactions execution not enabled on API node. Set read-only-threads > 0" );

// Post all read only trxs to read_only queue for execution.
// Post all read only trxs to read_exclusive queue for execution.
auto trx_metadata = transaction_metadata::create_no_recover_keys(trx, transaction_metadata::trx_type::read_only);
app().executor().post(priority::low, exec_queue::read_exclusive, [this, trx{std::move(trx_metadata)}, next{std::move(next)}]() mutable {
push_read_only_transaction(std::move(trx), std::move(next));
Expand Down Expand Up @@ -1276,6 +1276,7 @@ void producer_plugin_impl::plugin_initialize(const boost::program_options::varia
ilog("read-only-write-window-time-us: ${ww} us, read-only-read-window-time-us: ${rw} us, effective read window time to be used: ${w} us",
("ww", _ro_write_window_time_us)("rw", _ro_read_window_time_us)("w", _ro_read_window_effective_time_us));
}
app().executor().init_read_threads(_ro_thread_pool_size);

// Make sure _ro_max_trx_time_us is always set.
// Make sure a read-only transaction can finish within the read
Expand Down Expand Up @@ -2853,8 +2854,7 @@ void producer_plugin_impl::switch_to_read_window() {
uint32_t pending_block_num = chain.head_block_num() + 1;
_ro_read_window_start_time = fc::time_point::now();
_ro_window_deadline = _ro_read_window_start_time + _ro_read_window_effective_time_us;
app().executor().set_to_read_window(
_ro_thread_pool_size, [received_block = &_received_block, pending_block_num, ro_window_deadline = _ro_window_deadline]() {
app().executor().set_to_read_window([received_block = &_received_block, pending_block_num, ro_window_deadline = _ro_window_deadline]() {
return fc::time_point::now() >= ro_window_deadline || (received_block->load() >= pending_block_num); // should_exit()
});
chain.set_to_read_window();
Expand Down Expand Up @@ -2910,7 +2910,7 @@ bool producer_plugin_impl::read_only_execution_task(uint32_t pending_block_num)
// will be executed from the main app thread because all read-only threads are idle now
self->switch_to_write_window();
});
// last thread post any exhausted back into read_only queue with slightly higher priority (low+1) so they are executed first
// last thread post any exhausted back into read_exclusive queue with slightly higher priority (low+1) so they are executed first
ro_trx_t t;
while (_ro_exhausted_trx_queue.pop_front(t)) {
app().executor().post(priority::low + 1, exec_queue::read_exclusive, [this, trx{std::move(t.trx)}, next{std::move(t.next)}]() mutable {
Expand All @@ -2928,7 +2928,7 @@ void producer_plugin_impl::repost_exhausted_transactions(const fc::time_point& d
if (!_ro_exhausted_trx_queue.empty()) {
chain::controller& chain = chain_plug->chain();
uint32_t pending_block_num = chain.pending_block_num();
// post any exhausted back into read_only queue with slightly higher priority (low+1) so they are executed first
// post any exhausted back into read_exclusive queue with slightly higher priority (low+1) so they are executed first
ro_trx_t t;
while (!should_interrupt_start_block(deadline, pending_block_num) && _ro_exhausted_trx_queue.pop_front(t)) {
app().executor().post(priority::low + 1, exec_queue::read_exclusive, [this, trx{std::move(t.trx)}, next{std::move(t.next)}]() mutable {
Expand Down

0 comments on commit b012cce

Please sign in to comment.