Skip to content

Commit

Permalink
GH-891 Integrate new received block into ro_trx_queue
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Mar 27, 2023
1 parent 728f9b3 commit 9e79e82
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
named_thread_pool<struct prod> _thread_pool;

std::atomic<int32_t> _max_transaction_time_ms; // modified by app thread, read by net_plugin thread pool
std::atomic<uint32_t> _received_block{false}; // modified by net_plugin thread pool
std::atomic<uint32_t> _received_block{0}; // modified by net_plugin thread pool
fc::microseconds _max_irreversible_block_age_us;
int32_t _produce_time_offset_us = 0;
int32_t _last_block_time_offset_us = 0;
Expand Down Expand Up @@ -486,20 +486,20 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// - all threads would be idle
// - or the net_plugin received a block.
// - or we have reached the read_window_deadline
void set_exit_criteria(uint32_t num_tasks, std::atomic<bool>* received_block, fc::time_point deadline) {
void set_exit_criteria(uint32_t num_tasks, std::atomic<uint32_t>* received_block, uint32_t block_num, fc::time_point deadline) {
std::lock_guard<std::mutex> g( mtx ); // not strictly necessary with current usage from single thread
assert(num_tasks > 0 && num_waiting == 0 && received_block != nullptr);
assert(received_block && *received_block == false);
max_waiting = num_tasks;
num_waiting = 0;
received_block_ptr = received_block;
pending_block_num = block_num;
read_window_deadline = deadline;
exiting_read_window = false;
}

private:
bool should_exit() {
return exiting_read_window || fc::time_point::now() >= read_window_deadline || *received_block_ptr;
return exiting_read_window || fc::time_point::now() >= read_window_deadline || (*received_block_ptr >= pending_block_num);
}

mutable std::mutex mtx;
Expand All @@ -508,7 +508,8 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
uint32_t num_waiting{0};
uint32_t max_waiting{0};
bool exiting_read_window{false};
std::atomic<bool>* received_block_ptr{nullptr};
std::atomic<uint32_t>* received_block_ptr{nullptr};
uint32_t pending_block_num{0};
fc::time_point read_window_deadline;
};

Expand Down Expand Up @@ -2927,16 +2928,17 @@ void producer_plugin_impl::switch_to_read_window() {
return;
}

auto& chain = chain_plug->chain();
uint32_t pending_block_num = chain.head_block_num() + 1;
app().executor().set_to_read_window();
chain_plug->chain().set_db_read_only_mode();
_received_block = false;
chain.set_db_read_only_mode();
_ro_read_window_start_time = fc::time_point::now();
_ro_all_threads_exec_time_us = 0;

// start a read-only transaction execution task in each thread in the thread pool
_ro_num_active_trx_exec_tasks = _ro_thread_pool_size;
auto start_time = fc::time_point::now();
_ro_trx_queue.set_exit_criteria(_ro_thread_pool_size, &_received_block, start_time + _ro_read_window_effective_time_us);
_ro_trx_queue.set_exit_criteria(_ro_thread_pool_size, &_received_block, pending_block_num, start_time + _ro_read_window_effective_time_us);
for (auto i = 0; i < _ro_thread_pool_size; ++i ) {
_ro_trx_exec_tasks_fut.emplace_back( post_async_task( _ro_thread_pool.get_executor(), [this, start_time] () {
return read_only_trx_execution_task(start_time);
Expand Down

0 comments on commit 9e79e82

Please sign in to comment.