GH-1639 Modify exec_pri_queue to manage the 3 priority queues instead of three_queue_executor.

Simplifies logic and fixes issues with the previous approach.
Also adds some poll() calls to make sure the queues have the latest tasks to execute.
heifner committed Sep 20, 2023
1 parent e6b8493 commit 8d221a7
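The core of the change is visible in the diff below: instead of the executor owning three separate `appbase::exec_pri_queue` members, a single `exec_pri_queue` now manages all three priority queues, selected by an `exec_queue` argument (the enum leaves this header but is still referenced, so it presumably moves alongside `exec_pri_queue`). Here is a minimal sketch of the consolidated interface this commit appears to rely on, inferred purely from the call sites in the diff; the exact signatures, the meaning of the trailing bool argument, and the namespace layout are assumptions, not the actual appbase implementation:

```cpp
// Hypothetical sketch of the consolidated exec_pri_queue interface, inferred
// from the call sites in the diff below; not the actual appbase code.
#include <cstddef>
#include <cstdint>
#include <functional>

namespace appbase {

enum class exec_queue { read_only, read_write, read_exclusive };

class exec_pri_queue {
public:
   // Wrap a function with (priority, queue, order) so one queue object can
   // route the task to the right internal priority queue when it is posted.
   template <typename Function>
   auto wrap(int priority, exec_queue q, std::size_t order, Function&& f);

   // Pop and run the highest-priority ready task across the given queue(s);
   // return true if more tasks remain. Overloads inferred from the diff.
   bool execute_highest(exec_queue lhs, exec_queue rhs);
   bool execute_highest_locked(exec_queue q);
   bool execute_highest_locked(exec_queue lhs, exec_queue rhs, bool blocking); // third arg semantics assumed

   std::size_t size(exec_queue q) const;
   bool empty(exec_queue q) const;

   void enable_locking(uint32_t num_threads, std::function<bool()> should_exit);
   void disable_locking();
   void stop();   // reject further tasks
   void clear();  // drop all queued tasks
};

} // namespace appbase
```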
Showing 5 changed files with 326 additions and 183 deletions.
91 changes: 25 additions & 66 deletions libraries/custom_appbase/include/eosio/chain/application.hpp
@@ -22,23 +22,7 @@ enum class exec_window {
            // the main app thread is active.
 };

-enum class exec_queue {
-   read_only,      // the queue storing tasks which are safe to execute
-                   // in parallel with other read-only & read_exclusive tasks in the read-only
-                   // thread pool as well as on the main app thread.
-                   // Multi-thread safe as long as nothing is executed from the read_write queue.
-   read_write,     // the queue storing tasks which can be only executed
-                   // on the app thread while read-only tasks are
-                   // not being executed in read-only threads. Single threaded.
-   read_exclusive  // the queue storing tasks which should only be executed
-                   // in parallel with other read_exclusive or read_only tasks in the
-                   // read-only thread pool. Should never be executed on the main thread.
-                   // If no read-only thread pool is available this queue grows unbounded
-                   // as tasks will never execute. User is responsible for not queueing
-                   // read_exclusive tasks if no read-only thread pool is available.
-};
-
-class three_queue_executor {
+class priority_queue_executor {
 public:

    // Trade off on returning to appbase exec() loop as the overhead of poll/run can be measurable for small running tasks.
@@ -47,43 +31,32 @@

    template <typename Func>
    auto post( int priority, exec_queue q, Func&& func ) {
-      if ( q == exec_queue::read_write )
-         return boost::asio::post(io_serv_, read_write_queue_.wrap(priority, --order_, std::forward<Func>(func)));
-      else if ( q == exec_queue::read_only )
-         return boost::asio::post( io_serv_, read_only_queue_.wrap( priority, --order_, std::forward<Func>( func)));
-      else
-         return boost::asio::post( io_serv_, read_exclusive_queue_.wrap( priority, --order_, std::forward<Func>( func)));
+      return boost::asio::post( io_serv_, pri_queue_.wrap( priority, q, --order_, std::forward<Func>(func)));
    }

    // Legacy and deprecated. To be removed after cleaning up its uses in base appbase
    template <typename Func>
    auto post( int priority, Func&& func ) {
       // safer to use read_write queue for unknown type of operation since operations
       // from read_write queue are not executed in parallel with read-only operations
-      return boost::asio::post(io_serv_, read_write_queue_.wrap(priority, --order_, std::forward<Func>(func)));
+      return boost::asio::post(io_serv_, pri_queue_.wrap(priority, exec_queue::read_write, --order_, std::forward<Func>(func)));
    }

    boost::asio::io_service& get_io_service() { return io_serv_; }

-   // called from main thread
+   // called from main thread, highest read_only and read_write
    bool execute_highest() {
       // execute for at least minimum runtime
       const auto end = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(minimum_runtime_ms);

       bool more = false;
       while (true) {
          if ( exec_window_ == exec_window::write ) {
-            // During write window only main thread is accessing anything in two_queue_executor, no locking required
-            if( !read_write_queue_.empty() && (read_only_queue_.empty() || *read_only_queue_.top() < *read_write_queue_.top()) ) {
-               // read_write_queue_'s top function's priority greater than read_only_queue_'s top function's, or read_only_queue_ empty
-               read_write_queue_.execute_highest();
-            } else if( !read_only_queue_.empty() ) {
-               read_only_queue_.execute_highest();
-            }
-            more = !read_only_queue_.empty() || !read_write_queue_.empty();
+            // During write window only main thread is accessing anything in priority_queue_executor, no locking required
+            more = pri_queue_.execute_highest(exec_queue::read_write, exec_queue::read_only);
          } else {
-            // When in read window, multiple threads including main app thread are accessing two_queue_executor, locking required
-            more = read_only_queue_.execute_highest_locked(false);
+            // When in read window, multiple threads including main app thread are accessing priority_queue_executor, locking required
+            more = pri_queue_.execute_highest_locked(exec_queue::read_only);
          }
          if (!more || std::chrono::high_resolution_clock::now() > end)
             break;
@@ -97,14 +70,8 @@ class three_queue_executor {

       bool more = false;
       while (true) {
-         std::optional<bool> exec_read_only_queue = read_only_queue_.compare_queues_locked(read_exclusive_queue_);
-         if (!exec_read_only_queue) {
-            more = read_only_queue_.execute_highest_locked( true );
-         } else if (*exec_read_only_queue) {
-            more = read_only_queue_.execute_highest_locked( false );
-         } else {
-            more = read_exclusive_queue_.execute_highest_locked( false );
-         }
+         get_io_service().poll(); // schedule any queued
+         more = pri_queue_.execute_highest_locked(exec_queue::read_only, exec_queue::read_exclusive, true);
          if (!more || std::chrono::high_resolution_clock::now() > end)
             break;
       }
@@ -114,34 +81,25 @@ class three_queue_executor {
    template <typename Function>
    boost::asio::executor_binder<Function, appbase::exec_pri_queue::executor>
    wrap(int priority, exec_queue q, Function&& func ) {
-      if ( q == exec_queue::read_write )
-         return read_write_queue_.wrap(priority, --order_, std::forward<Function>(func));
-      else
-         return read_only_queue_.wrap( priority, --order_, std::forward<Function>( func));
+      return pri_queue_.wrap(priority, q, --order_, std::forward<Function>(func));
    }

    void stop() {
-      read_only_queue_.stop();
-      read_write_queue_.stop();
-      read_exclusive_queue_.stop();
+      pri_queue_.stop();
    }

    void clear() {
-      read_only_queue_.clear();
-      read_write_queue_.clear();
-      read_exclusive_queue_.clear();
+      pri_queue_.clear();
    }

    void set_to_read_window(uint32_t num_threads, std::function<bool()> should_exit) {
       exec_window_ = exec_window::read;
-      read_only_queue_.enable_locking(num_threads, should_exit);
-      read_exclusive_queue_.enable_locking(num_threads, std::move(should_exit));
+      pri_queue_.enable_locking(num_threads, std::move(should_exit));
    }

    void set_to_write_window() {
       exec_window_ = exec_window::write;
-      read_only_queue_.disable_locking();
-      read_exclusive_queue_.disable_locking();
+      pri_queue_.disable_locking();
    }

    bool is_read_window() const {
@@ -152,21 +110,22 @@
       return exec_window_ == exec_window::write;
    }

-   auto& read_only_queue() { return read_only_queue_; }
-   auto& read_write_queue() { return read_write_queue_; }
-   auto& read_exclusive_queue() { return read_exclusive_queue_; }
+   size_t read_only_queue_size() { return pri_queue_.size(exec_queue::read_only); }
+   size_t read_write_queue_size() { return pri_queue_.size(exec_queue::read_write); }
+   size_t read_exclusive_queue_size() { return pri_queue_.size(exec_queue::read_exclusive); }
+   bool read_only_queue_empty() { return pri_queue_.empty(exec_queue::read_only); }
+   bool read_write_queue_empty() { return pri_queue_.empty(exec_queue::read_write); }
+   bool read_exclusive_queue_empty() { return pri_queue_.empty(exec_queue::read_exclusive); }

    // members are ordered taking into account that the last one is destructed first
 private:
    boost::asio::io_service io_serv_;
-   appbase::exec_pri_queue read_only_queue_;
-   appbase::exec_pri_queue read_write_queue_;
-   appbase::exec_pri_queue read_exclusive_queue_;
-   std::atomic<std::size_t> order_ { std::numeric_limits<size_t>::max() }; // to maintain FIFO ordering in both queues within priority
-   exec_window exec_window_ { exec_window::write };
+   appbase::exec_pri_queue pri_queue_;
+   std::atomic<std::size_t> order_{ std::numeric_limits<size_t>::max() }; // to maintain FIFO ordering in all queues within priority
+   exec_window exec_window_{ exec_window::write };
 };

-using application = application_t<three_queue_executor>;
+using application = application_t<priority_queue_executor>;
 }

 #include <appbase/application_instance.hpp>
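For context on the `order_` member the executor keeps: it starts at the maximum `size_t` and is decremented on every `post`/`wrap`, so earlier tasks carry larger order values and win ties at equal priority. That is what the "maintain FIFO ordering in all queues within priority" comment refers to. Below is a self-contained illustration of the tie-break, assuming a max-heap-style comparison where the "greatest" task is popped first; the comparator is illustrative, not the actual appbase code:

```cpp
// Illustrative only: why a decrementing order counter yields FIFO execution
// within a priority level when the queue pops the "greatest" task first.
#include <cstddef>
#include <iostream>
#include <limits>

struct queued_task {
   int priority;      // higher value runs first
   std::size_t order; // taken from --order_, so earlier posts get larger values
};

// Max-heap style comparison: lower priority is "less"; at equal priority the
// later post (smaller order) is "less", so the earlier post is popped first.
bool operator<(const queued_task& a, const queued_task& b) {
   return a.priority == b.priority ? a.order < b.order
                                   : a.priority < b.priority;
}

int main() {
   std::size_t order = std::numeric_limits<std::size_t>::max(); // mirrors order_'s initializer
   queued_task first { 5, --order };  // posted first at priority 5
   queued_task second{ 5, --order };  // posted second at the same priority
   std::cout << std::boolalpha << (second < first) << '\n'; // true: first runs first (FIFO)
   return 0;
}
```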