Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.0 -> main] Cleanup and prevent thread starvation while in the read window. #904

Merged
merged 24 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
72ea655
Cleanup and prevent thread starvation while in the read window.
greg7mdp Mar 17, 2023
81939db
Forgot one notify_one call.
greg7mdp Mar 17, 2023
7b22be3
Update comment
greg7mdp Mar 17, 2023
2a96531
Consistent exit when read_window_deadline is reached, and encapsulate…
greg7mdp Mar 20, 2023
033ca04
Small cleanups.
greg7mdp Mar 20, 2023
f7822e3
Small cleanups and comments.
greg7mdp Mar 20, 2023
8289563
Add two assert() checks.
greg7mdp Mar 20, 2023
0688dc7
Fix a couple issues - now tests pass!
greg7mdp Mar 21, 2023
da8a507
Add C++ test exercizing readonly transactions in read window.
greg7mdp Mar 21, 2023
77e96f8
simplify capture list
greg7mdp Mar 21, 2023
aad1729
No need to check for option's presence since it has a default value
greg7mdp Mar 21, 2023
5a76c1b
Deal with possible spurious wakeups from wait().
greg7mdp Mar 21, 2023
d75db84
Notify condition_variable under the lock.
greg7mdp Mar 21, 2023
30fc086
Implement code review comments.
greg7mdp Mar 22, 2023
ac61ecd
Add extra test with 1 readonly thread
greg7mdp Mar 22, 2023
1ca223c
Address PR comments
greg7mdp Mar 22, 2023
4379207
Replace option to turn off checking with a static variable.
greg7mdp Mar 22, 2023
12c9cdf
Fix test issue (uninitialized variables)
greg7mdp Mar 24, 2023
e4c138d
simplify cond wait - greatly inspired from Kevin's branch
greg7mdp Mar 24, 2023
bda60b5
Merge branch 'release/4.0' into gh-822-4.0
greg7mdp Mar 24, 2023
e05b737
Fix hang occuring sometimes in test
greg7mdp Mar 24, 2023
f331ff9
Merge branch 'gh-822-4.0' of github.com:AntelopeIO/leap into gh-822-4.0
greg7mdp Mar 24, 2023
8de2986
Merge pull request #892 from AntelopeIO/gh-822-4.0
greg7mdp Mar 27, 2023
90b1e9e
Merge remote-tracking branch 'origin/release/4.0' into gh_822_main
greg7mdp Mar 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,12 @@ class producer_plugin : public appbase::plugin<producer_plugin> {
// thread-safe, called when a new block is received
void received_block();

const std::set<account_name>& producer_accounts() const;
const std::set<account_name>& producer_accounts() const;

static void set_test_mode(bool m) { test_mode_ = m; }
private:
inline static bool test_mode_{false}; // to be moved into appbase (application_base)

std::shared_ptr<class producer_plugin_impl> my;
};

Expand Down
122 changes: 97 additions & 25 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,87 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
packed_transaction_ptr trx;
next_func_t next;
};

// The queue storing read-only transactions to be executed by read-only threads
struct ro_trx_queue_t {
std::mutex mtx;
std::deque<ro_trx_t> queue;
class ro_trx_queue_t {
public:
void push_back(ro_trx_t&& trx) {
std::unique_lock<std::mutex> g( mtx );
queue.push_back(std::move(trx));
if (num_waiting)
cond.notify_one();
}

void push_front(ro_trx_t&& trx) {
std::unique_lock<std::mutex> g( mtx );
queue.push_front(std::move(trx));
if (num_waiting)
cond.notify_one();
}

bool empty() const {
std::lock_guard<std::mutex> g( mtx );
return queue.empty();
}

// may wait if the queue is empty, and not all other threads are already waiting.
// returns true if a transaction was dequeued and should be executed, or false
// if conditions are met to stop processing transactions.
bool pop_front(ro_trx_t& trx) {
std::unique_lock<std::mutex> g( mtx );

++num_waiting;
cond.wait(g, [this]() {
bool _should_exit = should_exit();
bool _queue_empty = queue.empty();
if (_queue_empty || _should_exit) {
if (((_queue_empty && num_waiting == max_waiting) || _should_exit) && !exiting_read_window) {
cond.notify_all();
exiting_read_window = true;
}
return _should_exit || exiting_read_window; // same as calling should_exit(), but faster
}
return true;
});
--num_waiting;
if (should_exit())
return false;

trx = std::move(queue.front());
queue.pop_front();
return true;
}

// We exit the read window when either:
// - all threads would be idle
// - or the net_plugin received a block.
// - or we have reached the read_window_deadline
void set_exit_criteria(uint32_t num_tasks, std::atomic<bool>* received_block, fc::time_point deadline) {
std::lock_guard<std::mutex> g( mtx ); // not strictly necessary with current usage from single thread
assert(num_tasks > 0 && num_waiting == 0 && received_block != nullptr);
assert(received_block && *received_block == false);
max_waiting = num_tasks;
num_waiting = 0;
received_block_ptr = received_block;
read_window_deadline = deadline;
exiting_read_window = false;
}

private:
bool should_exit() {
return exiting_read_window || fc::time_point::now() >= read_window_deadline || *received_block_ptr;
}

mutable std::mutex mtx;
std::condition_variable cond;
deque<ro_trx_t> queue;
uint32_t num_waiting{0};
uint32_t max_waiting{0};
bool exiting_read_window{false};
std::atomic<bool>* received_block_ptr{nullptr};
fc::time_point read_window_deadline;
};

uint16_t _ro_thread_pool_size{ 0 };
static constexpr uint16_t _ro_max_eos_vm_oc_threads_allowed{ 8 }; // Due to uncertainty to get total virtual memory size on a 5-level paging system, set a hard limit
named_thread_pool<struct read> _ro_thread_pool;
Expand All @@ -455,7 +531,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
void start_write_window();
void switch_to_write_window();
void switch_to_read_window();
bool read_only_trx_execution_task();
bool read_only_trx_execution_task(fc::time_point start);
bool process_read_only_transaction(const packed_transaction_ptr& trx,
const next_function<transaction_trace_ptr>& next,
const fc::time_point& read_window_start_time);
Expand Down Expand Up @@ -659,8 +735,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// Parallel read-only trx execution enabled.
// Store the transaction in read-only-trx queue so that it is
// executed in read window
std::lock_guard<std::mutex> g( _ro_trx_queue.mtx );
_ro_trx_queue.queue.push_back({trx, std::move(next)});
_ro_trx_queue.push_back({trx, std::move(next)});
return;
}

Expand Down Expand Up @@ -1093,8 +1168,9 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
my->_ro_thread_pool_size = options.at( "read-only-threads" ).as<uint16_t>();
// only initialize other read-only options when read-only thread pool is enabled
if ( my->_ro_thread_pool_size > 0 ) {
EOS_ASSERT( my->_producers.empty(), plugin_config_exception, "--read-only-threads not allowed on producer node" );

if (!test_mode_)
EOS_ASSERT( my->_producers.empty(), plugin_config_exception, "--read-only-threads not allowed on producer node" );

#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
if (chain.is_eos_vm_oc_enabled()) {
// EOS VM OC requires 4.2TB Virtual for each executing thread. Make sure the memory
Expand Down Expand Up @@ -2840,7 +2916,7 @@ void producer_plugin_impl::switch_to_read_window() {

// we are in write window, so no read-only trx threads are processing transactions.
// _ro_trx_queue is not being accessed. No need to lock.
if ( _ro_trx_queue.queue.empty() ) { // no read-only trxs to process. stay in write window
if ( _ro_trx_queue.empty() ) { // no read-only trxs to process. stay in write window
start_write_window(); // restart write window timer for next round
return;
}
Expand All @@ -2853,9 +2929,11 @@ void producer_plugin_impl::switch_to_read_window() {

// start a read-only transaction execution task in each thread in the thread pool
_ro_num_active_trx_exec_tasks = _ro_thread_pool_size;
auto start_time = fc::time_point::now();
_ro_trx_queue.set_exit_criteria(_ro_thread_pool_size, &_received_block, start_time + _ro_read_window_effective_time_us);
for (auto i = 0; i < _ro_thread_pool_size; ++i ) {
_ro_trx_exec_tasks_fut.emplace_back( post_async_task( _ro_thread_pool.get_executor(), [self = this] () {
return self->read_only_trx_execution_task();
_ro_trx_exec_tasks_fut.emplace_back( post_async_task( _ro_thread_pool.get_executor(), [this, start_time] () {
return read_only_trx_execution_task(start_time);
}) );
}

Expand All @@ -2880,27 +2958,21 @@ void producer_plugin_impl::switch_to_read_window() {
}

// Called from a read only trx thread. Run in parallel with app and other read only trx threads
bool producer_plugin_impl::read_only_trx_execution_task() {
auto start = fc::time_point::now();
auto read_window_deadline = start + _ro_read_window_effective_time_us;
bool producer_plugin_impl::read_only_trx_execution_task(fc::time_point start) {
// We have 4 ways to break out the while loop:
// 1. pass read window deadline
// 2. Net_plugin receives a block
// 3. No more transactions in the read-only trx queue
// 4. A transaction execution is exhaused
while ( fc::time_point::now() < read_window_deadline && !_received_block ) {
std::unique_lock<std::mutex> lck( _ro_trx_queue.mtx );
if ( _ro_trx_queue.queue.empty() ) {
break;
}
auto trx = _ro_trx_queue.queue.front();
_ro_trx_queue.queue.pop_front();
lck.unlock();

ro_trx_t trx;
while ( _ro_trx_queue.pop_front(trx) ) {
// If the queue is empty, pop_front() waits on condition variable, and returns false
// when and only when all tasks must exit (i.e queue is empty and all tasks are idle, or
// we have reached the end of the read window, or net plugin received a block)

auto retry = process_read_only_transaction( trx.trx, trx.next, start );
if ( retry ) {
lck.lock();
_ro_trx_queue.queue.push_front(trx);
_ro_trx_queue.push_front(std::move(trx));
// Do not schedule new execution
break;
}
Expand Down
24 changes: 24 additions & 0 deletions plugins/producer_plugin/test/test_read_only_trx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ void test_trxs_common(std::vector<const char*>& specific_args) {

auto[prod_plug, chain_plug] = plugin_fut.get();
auto chain_id = chain_plug->get_chain_id();
prod_plug->set_test_mode(true);

std::atomic<size_t> next_calls = 0;
std::atomic<size_t> num_posts = 0;
Expand Down Expand Up @@ -150,4 +151,27 @@ BOOST_AUTO_TEST_CASE(no_read_only_threads) {
test_trxs_common(specific_args);
}

// test read-only trxs on 1 threads (with --read-only-threads)
BOOST_AUTO_TEST_CASE(with_1_read_only_threads) {
std::vector<const char*> specific_args = { "-p", "eosio", "-e",
"--read-only-threads=1",
"--max-transaction-time=10",
"--read-only-write-window-time-us=100000",
"--read-only-read-window-time-us=40000",
"--disable-subjective-billing=true" };
test_trxs_common(specific_args);
}

// test read-only trxs on 16 separate threads (with --read-only-threads)
BOOST_AUTO_TEST_CASE(with_16_read_only_threads) {
std::vector<const char*> specific_args = { "-p", "eosio", "-e",
"--read-only-threads=16",
"--max-transaction-time=10",
"--read-only-write-window-time-us=100000",
"--read-only-read-window-time-us=40000",
"--disable-subjective-billing=true" };
test_trxs_common(specific_args);
}


BOOST_AUTO_TEST_SUITE_END()