Merge pull request #892 from AntelopeIO/gh-822-4.0
[4.0] Cleanup and prevent thread starvation while in the read window.
greg7mdp authored Mar 27, 2023
2 parents f07e688 + f331ff9 commit 8de2986
Showing 3 changed files with 125 additions and 26 deletions.
@@ -204,9 +204,12 @@ class producer_plugin : public appbase::plugin<producer_plugin> {
// thread-safe, called when a new block is received
void received_block();

const std::set<account_name>& producer_accounts() const;

static void set_test_mode(bool m) { test_mode_ = m; }
private:
inline static bool test_mode_{false}; // to be moved into appbase (application_base)

std::shared_ptr<class producer_plugin_impl> my;
};

122 changes: 97 additions & 25 deletions plugins/producer_plugin/producer_plugin.cpp
@@ -431,11 +431,87 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
packed_transaction_ptr trx;
next_func_t next;
};

// The queue storing read-only transactions to be executed by read-only threads
struct ro_trx_queue_t {
std::mutex mtx;
std::deque<ro_trx_t> queue;
class ro_trx_queue_t {
public:
void push_back(ro_trx_t&& trx) {
std::unique_lock<std::mutex> g( mtx );
queue.push_back(std::move(trx));
if (num_waiting)
cond.notify_one();
}

void push_front(ro_trx_t&& trx) {
std::unique_lock<std::mutex> g( mtx );
queue.push_front(std::move(trx));
if (num_waiting)
cond.notify_one();
}

bool empty() const {
std::lock_guard<std::mutex> g( mtx );
return queue.empty();
}

// May block if the queue is empty and not all other threads are already waiting.
// Returns true if a transaction was dequeued and should be executed, or false
// if the conditions to stop processing transactions are met.
bool pop_front(ro_trx_t& trx) {
std::unique_lock<std::mutex> g( mtx );

++num_waiting;
cond.wait(g, [this]() {
bool _should_exit = should_exit();
bool _queue_empty = queue.empty();
if (_queue_empty || _should_exit) {
if (((_queue_empty && num_waiting == max_waiting) || _should_exit) && !exiting_read_window) {
cond.notify_all();
exiting_read_window = true;
}
return _should_exit || exiting_read_window; // same as calling should_exit(), but faster
}
return true;
});
--num_waiting;
if (should_exit())
return false;

trx = std::move(queue.front());
queue.pop_front();
return true;
}

// We exit the read window when any of the following holds:
// - all threads would be idle,
// - the net_plugin received a block, or
// - we have reached the read_window_deadline.
void set_exit_criteria(uint32_t num_tasks, std::atomic<bool>* received_block, fc::time_point deadline) {
std::lock_guard<std::mutex> g( mtx ); // not strictly necessary with the current usage from a single thread
assert(num_tasks > 0 && num_waiting == 0 && received_block != nullptr);
assert(received_block && *received_block == false);
max_waiting = num_tasks;
num_waiting = 0;
received_block_ptr = received_block;
read_window_deadline = deadline;
exiting_read_window = false;
}

private:
bool should_exit() {
return exiting_read_window || fc::time_point::now() >= read_window_deadline || *received_block_ptr;
}

mutable std::mutex mtx;
std::condition_variable cond;
std::deque<ro_trx_t> queue;
uint32_t num_waiting{0};
uint32_t max_waiting{0};
bool exiting_read_window{false};
std::atomic<bool>* received_block_ptr{nullptr};
fc::time_point read_window_deadline;
};
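
The exit handshake inside pop_front() is the heart of the starvation fix: when the queue drains, each worker blocks on the condition variable instead of spinning, and the last worker to go idle sets exiting_read_window and wakes all the others so every task returns promptly. Below is a minimal standalone sketch of that coordination pattern; every name in it (demo_queue, done, etc.) is hypothetical, and it omits the deadline and received-block checks that the real should_exit() also folds in.

#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>

// Illustrative sketch only; not part of the plugin.
class demo_queue {
public:
   void push(int v) {
      { std::lock_guard<std::mutex> g(mtx); queue.push_back(v); }
      cond.notify_one();
   }

   // Returns false once the queue is drained and every worker is idle.
   bool pop(int& out) {
      std::unique_lock<std::mutex> g(mtx);
      ++num_waiting;
      cond.wait(g, [this] {
         if (queue.empty()) {
            if (num_waiting == max_waiting && !done) {
               done = true;        // last idle waiter closes the window
               cond.notify_all();  // release every other blocked worker
            }
            return done;
         }
         return true;              // work is available
      });
      --num_waiting;
      if (done)
         return false;
      out = queue.front();
      queue.pop_front();
      return true;
   }

   // Must be called before each window, while no worker is running
   // (the analogue of set_exit_criteria() above).
   void reset(uint32_t workers) {
      std::lock_guard<std::mutex> g(mtx);
      max_waiting = workers;
      num_waiting = 0;
      done        = false;
   }

private:
   std::mutex              mtx;
   std::condition_variable cond;
   std::deque<int>         queue;
   uint32_t                num_waiting{0};
   uint32_t                max_waiting{0};
   bool                    done{false};
};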

uint16_t _ro_thread_pool_size{ 0 };
static constexpr uint16_t _ro_max_eos_vm_oc_threads_allowed{ 8 }; // Due to the difficulty of determining the total virtual memory size on a 5-level paging system, set a hard limit
named_thread_pool<struct read> _ro_thread_pool;
@@ -455,7 +531,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
void start_write_window();
void switch_to_write_window();
void switch_to_read_window();
bool read_only_trx_execution_task();
bool read_only_trx_execution_task(fc::time_point start);
bool process_read_only_transaction(const packed_transaction_ptr& trx,
const next_function<transaction_trace_ptr>& next,
const fc::time_point& read_window_start_time);
@@ -659,8 +735,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// Parallel read-only trx execution is enabled.
// Store the transaction in the read-only trx queue so that it is
// executed in the read window
std::lock_guard<std::mutex> g( _ro_trx_queue.mtx );
_ro_trx_queue.queue.push_back({trx, std::move(next)});
_ro_trx_queue.push_back({trx, std::move(next)});
return;
}

@@ -1093,8 +1168,9 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
my->_ro_thread_pool_size = options.at( "read-only-threads" ).as<uint16_t>();
// only initialize other read-only options when read-only thread pool is enabled
if ( my->_ro_thread_pool_size > 0 ) {
EOS_ASSERT( my->_producers.empty(), plugin_config_exception, "--read-only-threads not allowed on producer node" );

if (!test_mode_)
EOS_ASSERT( my->_producers.empty(), plugin_config_exception, "--read-only-threads not allowed on producer node" );

#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
if (chain.is_eos_vm_oc_enabled()) {
// EOS VM OC requires 4.2TB Virtual for each executing thread. Make sure the memory
@@ -2840,7 +2916,7 @@ void producer_plugin_impl::switch_to_read_window() {

// we are in the write window, so no read-only trx threads are processing transactions.
// _ro_trx_queue is not being accessed; no need to lock.
if ( _ro_trx_queue.queue.empty() ) { // no read-only trxs to process. stay in write window
if ( _ro_trx_queue.empty() ) { // no read-only trxs to process. stay in write window
start_write_window(); // restart write window timer for next round
return;
}
@@ -2853,9 +2929,11 @@

// start a read-only transaction execution task in each thread in the thread pool
_ro_num_active_trx_exec_tasks = _ro_thread_pool_size;
auto start_time = fc::time_point::now();
_ro_trx_queue.set_exit_criteria(_ro_thread_pool_size, &_received_block, start_time + _ro_read_window_effective_time_us);
for (auto i = 0; i < _ro_thread_pool_size; ++i ) {
_ro_trx_exec_tasks_fut.emplace_back( post_async_task( _ro_thread_pool.get_executor(), [self = this] () {
return self->read_only_trx_execution_task();
_ro_trx_exec_tasks_fut.emplace_back( post_async_task( _ro_thread_pool.get_executor(), [this, start_time] () {
return read_only_trx_execution_task(start_time);
}) );
}

@@ -2880,27 +2958,21 @@
}

// Called from a read-only trx thread. Runs in parallel with the app thread and other read-only trx threads
bool producer_plugin_impl::read_only_trx_execution_task() {
auto start = fc::time_point::now();
auto read_window_deadline = start + _ro_read_window_effective_time_us;
bool producer_plugin_impl::read_only_trx_execution_task(fc::time_point start) {
// We have 4 ways to break out of the while loop:
// 1. The read window deadline has passed
// 2. The net_plugin receives a block
// 3. There are no more transactions in the read-only trx queue
// 4. A transaction's execution is exhausted (it ran out of window time and is retried in the next window)
while ( fc::time_point::now() < read_window_deadline && !_received_block ) {
std::unique_lock<std::mutex> lck( _ro_trx_queue.mtx );
if ( _ro_trx_queue.queue.empty() ) {
break;
}
auto trx = _ro_trx_queue.queue.front();
_ro_trx_queue.queue.pop_front();
lck.unlock();

ro_trx_t trx;
while ( _ro_trx_queue.pop_front(trx) ) {
// If the queue is empty, pop_front() waits on the condition variable, and returns false
// if and only if all tasks must exit (i.e. the queue is empty and all tasks are idle, or
// we have reached the end of the read window, or the net_plugin received a block)

auto retry = process_read_only_transaction( trx.trx, trx.next, start );
if ( retry ) {
lck.lock();
_ro_trx_queue.queue.push_front(trx);
_ro_trx_queue.push_front(std::move(trx));
// Do not schedule new execution
break;
}
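To illustrate the retry contract in isolation: a transaction that would overrun the window deadline goes back to the front of the queue, so it is first in line for the next read window, and the worker stops scheduling new work. The sketch below is self-contained and hypothetical (task, cost, and the millisecond values are made up for illustration), independent of the plugin code.

#include <chrono>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical model of the retry contract: a task that cannot finish inside
// the window is pushed back to the *front* of the queue so it is retried
// first in the next read window.
struct task {
   int id;
   std::chrono::milliseconds cost; // simulated execution time
};

int main() {
   std::mutex       mtx;
   std::deque<task> q{ {1, std::chrono::milliseconds(5)},
                       {2, std::chrono::milliseconds(50)} };
   const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(20);

   std::vector<std::thread> workers;
   for (int t = 0; t < 2; ++t) {
      workers.emplace_back([&] {
         for (;;) {
            task tk;
            {
               std::lock_guard<std::mutex> g(mtx);
               if (q.empty())
                  return;            // queue drained; this window's work is done
               tk = q.front();
               q.pop_front();
            }
            if (std::chrono::steady_clock::now() + tk.cost > deadline) {
               std::lock_guard<std::mutex> g(mtx);
               q.push_front(tk);     // first in line for the next window
               return;               // do not schedule further work this window
            }
            std::this_thread::sleep_for(tk.cost); // "execute" the task
         }
      });
   }
   for (auto& w : workers)
      w.join();
   // anything left in q carries over to the next read window
}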
24 changes: 24 additions & 0 deletions plugins/producer_plugin/test/test_read_only_trx.cpp
@@ -94,6 +94,7 @@ void test_trxs_common(std::vector<const char*>& specific_args) {

auto[prod_plug, chain_plug] = plugin_fut.get();
auto chain_id = chain_plug->get_chain_id();
prod_plug->set_test_mode(true);

std::atomic<size_t> next_calls = 0;
std::atomic<size_t> num_posts = 0;
Expand Down Expand Up @@ -150,4 +151,27 @@ BOOST_AUTO_TEST_CASE(no_read_only_threads) {
test_trxs_common(specific_args);
}

// test read-only trxs on 1 thread (with --read-only-threads)
BOOST_AUTO_TEST_CASE(with_1_read_only_threads) {
std::vector<const char*> specific_args = { "-p", "eosio", "-e",
"--read-only-threads=1",
"--max-transaction-time=10",
"--read-only-write-window-time-us=100000",
"--read-only-read-window-time-us=40000",
"--disable-subjective-billing=true" };
test_trxs_common(specific_args);
}

// test read-only trxs on 16 separate threads (with --read-only-threads)
BOOST_AUTO_TEST_CASE(with_16_read_only_threads) {
std::vector<const char*> specific_args = { "-p", "eosio", "-e",
"--read-only-threads=16",
"--max-transaction-time=10",
"--read-only-write-window-time-us=100000",
"--read-only-read-window-time-us=40000",
"--disable-subjective-billing=true" };
test_trxs_common(specific_args);
}


BOOST_AUTO_TEST_SUITE_END()
