From 8d221a777c4649c5a19ba99948c61fbced25f99b Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 20 Sep 2023 10:21:29 -0500 Subject: [PATCH] GH-1639 Modify exec_pri_queue to manage the 3 priority queues instead of three_queue_executor. Simplifies logic fixes issues with previous approach. Also add some poll() calls to make sure queues have latest tasks to execute. --- .../include/eosio/chain/application.hpp | 91 ++----- .../include/eosio/chain/exec_pri_queue.hpp | 164 ++++++++---- libraries/custom_appbase/tests/CMakeLists.txt | 2 +- .../tests/custom_appbase_tests.cpp | 249 +++++++++++++----- plugins/producer_plugin/producer_plugin.cpp | 3 +- 5 files changed, 326 insertions(+), 183 deletions(-) diff --git a/libraries/custom_appbase/include/eosio/chain/application.hpp b/libraries/custom_appbase/include/eosio/chain/application.hpp index d4939fc418..e6fa906f68 100644 --- a/libraries/custom_appbase/include/eosio/chain/application.hpp +++ b/libraries/custom_appbase/include/eosio/chain/application.hpp @@ -22,23 +22,7 @@ enum class exec_window { // the main app thread is active. }; -enum class exec_queue { - read_only, // the queue storing tasks which are safe to execute - // in parallel with other read-only & read_exclusive tasks in the read-only - // thread pool as well as on the main app thread. - // Multi-thread safe as long as nothing is executed from the read_write queue. - read_write, // the queue storing tasks which can be only executed - // on the app thread while read-only tasks are - // not being executed in read-only threads. Single threaded. - read_exclusive // the queue storing tasks which should only be executed - // in parallel with other read_exclusive or read_only tasks in the - // read-only thread pool. Should never be executed on the main thread. - // If no read-only thread pool is available this queue grows unbounded - // as tasks will never execute. User is responsible for not queueing - // read_exclusive tasks if no read-only thread pool is available. -}; - -class three_queue_executor { +class priority_queue_executor { public: // Trade off on returning to appbase exec() loop as the overhead of poll/run can be measurable for small running tasks. @@ -47,12 +31,7 @@ class three_queue_executor { template auto post( int priority, exec_queue q, Func&& func ) { - if ( q == exec_queue::read_write ) - return boost::asio::post(io_serv_, read_write_queue_.wrap(priority, --order_, std::forward(func))); - else if ( q == exec_queue::read_only ) - return boost::asio::post( io_serv_, read_only_queue_.wrap( priority, --order_, std::forward( func))); - else - return boost::asio::post( io_serv_, read_exclusive_queue_.wrap( priority, --order_, std::forward( func))); + return boost::asio::post( io_serv_, pri_queue_.wrap( priority, q, --order_, std::forward(func))); } // Legacy and deprecated. To be removed after cleaning up its uses in base appbase @@ -60,12 +39,12 @@ class three_queue_executor { auto post( int priority, Func&& func ) { // safer to use read_write queue for unknown type of operation since operations // from read_write queue are not executed in parallel with read-only operations - return boost::asio::post(io_serv_, read_write_queue_.wrap(priority, --order_, std::forward(func))); + return boost::asio::post(io_serv_, pri_queue_.wrap(priority, exec_queue::read_write, --order_, std::forward(func))); } boost::asio::io_service& get_io_service() { return io_serv_; } - // called from main thread + // called from main thread, highest read_only and read_write bool execute_highest() { // execute for at least minimum runtime const auto end = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(minimum_runtime_ms); @@ -73,17 +52,11 @@ class three_queue_executor { bool more = false; while (true) { if ( exec_window_ == exec_window::write ) { - // During write window only main thread is accessing anything in two_queue_executor, no locking required - if( !read_write_queue_.empty() && (read_only_queue_.empty() || *read_only_queue_.top() < *read_write_queue_.top()) ) { - // read_write_queue_'s top function's priority greater than read_only_queue_'s top function's, or read_only_queue_ empty - read_write_queue_.execute_highest(); - } else if( !read_only_queue_.empty() ) { - read_only_queue_.execute_highest(); - } - more = !read_only_queue_.empty() || !read_write_queue_.empty(); + // During write window only main thread is accessing anything in priority_queue_executor, no locking required + more = pri_queue_.execute_highest(exec_queue::read_write, exec_queue::read_only); } else { - // When in read window, multiple threads including main app thread are accessing two_queue_executor, locking required - more = read_only_queue_.execute_highest_locked(false); + // When in read window, multiple threads including main app thread are accessing priority_queue_executor, locking required + more = pri_queue_.execute_highest_locked(exec_queue::read_only); } if (!more || std::chrono::high_resolution_clock::now() > end) break; @@ -97,14 +70,8 @@ class three_queue_executor { bool more = false; while (true) { - std::optional exec_read_only_queue = read_only_queue_.compare_queues_locked(read_exclusive_queue_); - if (!exec_read_only_queue) { - more = read_only_queue_.execute_highest_locked( true ); - } else if (*exec_read_only_queue) { - more = read_only_queue_.execute_highest_locked( false ); - } else { - more = read_exclusive_queue_.execute_highest_locked( false ); - } + get_io_service().poll(); // schedule any queued + more = pri_queue_.execute_highest_locked(exec_queue::read_only, exec_queue::read_exclusive, true); if (!more || std::chrono::high_resolution_clock::now() > end) break; } @@ -114,34 +81,25 @@ class three_queue_executor { template boost::asio::executor_binder wrap(int priority, exec_queue q, Function&& func ) { - if ( q == exec_queue::read_write ) - return read_write_queue_.wrap(priority, --order_, std::forward(func)); - else - return read_only_queue_.wrap( priority, --order_, std::forward( func)); + return pri_queue_.wrap(priority, q, --order_, std::forward(func)); } void stop() { - read_only_queue_.stop(); - read_write_queue_.stop(); - read_exclusive_queue_.stop(); + pri_queue_.stop(); } void clear() { - read_only_queue_.clear(); - read_write_queue_.clear(); - read_exclusive_queue_.clear(); + pri_queue_.clear(); } void set_to_read_window(uint32_t num_threads, std::function should_exit) { exec_window_ = exec_window::read; - read_only_queue_.enable_locking(num_threads, should_exit); - read_exclusive_queue_.enable_locking(num_threads, std::move(should_exit)); + pri_queue_.enable_locking(num_threads, std::move(should_exit)); } void set_to_write_window() { exec_window_ = exec_window::write; - read_only_queue_.disable_locking(); - read_exclusive_queue_.disable_locking(); + pri_queue_.disable_locking(); } bool is_read_window() const { @@ -152,21 +110,22 @@ class three_queue_executor { return exec_window_ == exec_window::write; } - auto& read_only_queue() { return read_only_queue_; } - auto& read_write_queue() { return read_write_queue_; } - auto& read_exclusive_queue() { return read_exclusive_queue_; } + size_t read_only_queue_size() { return pri_queue_.size(exec_queue::read_only); } + size_t read_write_queue_size() { return pri_queue_.size(exec_queue::read_write); } + size_t read_exclusive_queue_size() { return pri_queue_.size(exec_queue::read_exclusive); } + bool read_only_queue_empty() { return pri_queue_.empty(exec_queue::read_only); } + bool read_write_queue_empty() { return pri_queue_.empty(exec_queue::read_write); } + bool read_exclusive_queue_empty() { return pri_queue_.empty(exec_queue::read_exclusive); } // members are ordered taking into account that the last one is destructed first private: boost::asio::io_service io_serv_; - appbase::exec_pri_queue read_only_queue_; - appbase::exec_pri_queue read_write_queue_; - appbase::exec_pri_queue read_exclusive_queue_; - std::atomic order_ { std::numeric_limits::max() }; // to maintain FIFO ordering in both queues within priority - exec_window exec_window_ { exec_window::write }; + appbase::exec_pri_queue pri_queue_; + std::atomic order_{ std::numeric_limits::max() }; // to maintain FIFO ordering in all queues within priority + exec_window exec_window_{ exec_window::write }; }; -using application = application_t; +using application = application_t; } #include diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 679737d3a0..abec59e810 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -9,6 +9,22 @@ namespace appbase { // adapted from: https://www.boost.org/doc/libs/1_69_0/doc/html/boost_asio/example/cpp11/invocation/prioritised_handlers.cpp +enum class exec_queue { + read_only, // the queue storing tasks which are safe to execute + // in parallel with other read-only & read_exclusive tasks in the read-only + // thread pool as well as on the main app thread. + // Multi-thread safe as long as nothing is executed from the read_write queue. + read_write, // the queue storing tasks which can be only executed + // on the app thread while read-only tasks are + // not being executed in read-only threads. Single threaded. + read_exclusive // the queue storing tasks which should only be executed + // in parallel with other read_exclusive or read_only tasks in the + // read-only thread pool. Should never be executed on the main thread. + // If no read-only thread pool is available this queue grows unbounded + // as tasks will never execute. User is responsible for not queueing + // read_exclusive tasks if no read-only thread pool is available. +}; + // Locking has to be coordinated by caller, use with care. class exec_pri_queue : public boost::asio::execution_context { @@ -35,53 +51,74 @@ class exec_pri_queue : public boost::asio::execution_context // called from appbase::application_base::exec poll_one() or run_one() template - void add(int priority, size_t order, Function function) - { + void add(int priority, exec_queue q, size_t order, Function function) { + prio_queue& que = priority_que(q); std::unique_ptr handler(new queued_handler(priority, order, std::move(function))); if (lock_enabled_) { std::lock_guard g( mtx_ ); - handlers_.push( std::move( handler ) ); + que.push( std::move( handler ) ); if (num_waiting_) cond_.notify_one(); } else { - handlers_.push( std::move( handler ) ); + que.push( std::move( handler ) ); } } // only call when no lock required - void clear() - { - handlers_ = prio_queue(); + void clear() { + read_only_handlers_ = prio_queue(); + read_write_handlers_ = prio_queue(); + read_exclusive_handlers_ = prio_queue(); } // only call when no lock required - bool execute_highest() - { - if( !handlers_.empty() ) { - handlers_.top()->execute(); - handlers_.pop(); + bool execute_highest(exec_queue q) { + prio_queue& que = priority_que(q); + if( !que.empty() ) { + que.top()->execute(); + que.pop(); } - return !handlers_.empty(); + return !que.empty(); } -private: - // has to be defined before use, auto return type - auto pop() { - auto t = std::move(const_cast&>(handlers_.top())); - handlers_.pop(); - return t; + // only call when no lock required + bool execute_highest(exec_queue lhs, exec_queue rhs) { + prio_queue& lhs_que = priority_que(lhs); + prio_queue& rhs_que = priority_que(rhs); + size_t size = lhs_que.size() + rhs_que.size(); + if (size == 0) + return false; + exec_queue q = rhs; + if (!lhs_que.empty() && (rhs_que.empty() || *rhs_que.top() < *lhs_que.top())) + q = lhs; + prio_queue& que = priority_que(q); + que.top()->execute(); + que.pop(); + --size; + return size > 0; } -public: + bool execute_highest_locked(exec_queue q) { + prio_queue& que = priority_que(q); + std::unique_lock g(mtx_); + if (que.empty()) + return false; + auto t = pop(que); + g.unlock(); + t->execute(); + return true; + } - bool execute_highest_locked(bool should_block) { + bool execute_highest_locked(exec_queue lhs, exec_queue rhs, bool should_block) { + prio_queue& lhs_que = priority_que(lhs); + prio_queue& rhs_que = priority_que(rhs); std::unique_lock g(mtx_); if (should_block) { ++num_waiting_; - cond_.wait(g, [this](){ + cond_.wait(g, [&](){ bool exit = exiting_blocking_ || should_exit_(); - bool empty = handlers_.empty(); + bool empty = lhs_que.empty() && rhs_que.empty(); if (empty || exit) { if (((empty && num_waiting_ == max_waiting_) || exit) && !exiting_blocking_) { cond_.notify_all(); @@ -95,37 +132,32 @@ class exec_pri_queue : public boost::asio::execution_context if (exiting_blocking_ || should_exit_()) return false; } - if( handlers_.empty() ) + if (lhs_que.empty() && rhs_que.empty()) return false; - auto t = pop(); + exec_queue q = rhs; + if (!lhs_que.empty() && (rhs_que.empty() || *rhs_que.top() < *lhs_que.top())) + q = lhs; + auto t = pop(priority_que(q)); g.unlock(); t->execute(); return true; } // Only call when locking disabled - size_t size() const { return handlers_.size(); } + size_t size(exec_queue q) const { return priority_que(q).size(); } + size_t size() const { return read_only_handlers_.size() + read_write_handlers_.size() + read_exclusive_handlers_.size(); } // Only call when locking disabled - bool empty() const { return handlers_.empty(); } + bool empty(exec_queue q) const { return priority_que(q).empty(); } // Only call when locking disabled - const auto& top() const { return handlers_.top(); } - - // return empty optional if both queues empty. - // true if this queue has the highest priority task to execute - std::optional compare_queues_locked( const exec_pri_queue& rhs ) { - std::scoped_lock g(mtx_, rhs.mtx_); - if (empty() && rhs.empty()) - return {}; - return !empty() && (rhs.empty() || *rhs.top() <= *top()); - } + const auto& top(exec_queue q) const { return priority_que(q).top(); } class executor { public: - executor(exec_pri_queue& q, int p, size_t o) - : context_(q), priority_(p), order_(o) + executor(exec_pri_queue& q, int p, size_t o, exec_queue que) + : context_(q), que_(que), priority_(p), order_(o) { } @@ -137,19 +169,19 @@ class exec_pri_queue : public boost::asio::execution_context template void dispatch(Function f, const Allocator&) const { - context_.add(priority_, order_, std::move(f)); + context_.add(priority_, que_, order_, std::move(f)); } template void post(Function f, const Allocator&) const { - context_.add(priority_, order_, std::move(f)); + context_.add(priority_, que_, order_, std::move(f)); } template void defer(Function f, const Allocator&) const { - context_.add(priority_, order_, std::move(f)); + context_.add(priority_, que_, order_, std::move(f)); } void on_work_started() const noexcept {} @@ -157,7 +189,7 @@ class exec_pri_queue : public boost::asio::execution_context bool operator==(const executor& other) const noexcept { - return order_ == other.order_ && &context_ == &other.context_ && priority_ == other.priority_; + return order_ == other.order_ && priority_ == other.priority_ && que_ == other.que_ && &context_ == &other.context_; } bool operator!=(const executor& other) const noexcept @@ -167,15 +199,16 @@ class exec_pri_queue : public boost::asio::execution_context private: exec_pri_queue& context_; + exec_queue que_; int priority_; size_t order_; }; template boost::asio::executor_binder - wrap(int priority, size_t order, Function&& func) + wrap(int priority, exec_queue q, size_t order, Function&& func) { - return boost::asio::bind_executor( executor(*this, priority, order), std::forward(func) ); + return boost::asio::bind_executor( executor(*this, priority, order, q), std::forward(func) ); } private: @@ -201,11 +234,6 @@ class exec_pri_queue : public boost::asio::execution_context { return std::tie( a.priority_, a.order_ ) < std::tie( b.priority_, b.order_ ); } - friend bool operator<=(const queued_handler_base& a, - const queued_handler_base& b) noexcept - { - return std::tie( a.priority_, a.order_ ) <= std::tie( b.priority_, b.order_ ); - } private: int priority_; @@ -240,6 +268,37 @@ class exec_pri_queue : public boost::asio::execution_context } }; + using prio_queue = std::priority_queue, std::deque>, deref_less>; + + prio_queue& priority_que(exec_queue q) { + switch (q) { + case exec_queue::read_only: + return read_only_handlers_; + case exec_queue::read_write: + return read_write_handlers_; + case exec_queue::read_exclusive: + return read_exclusive_handlers_; + } + } + + const prio_queue& priority_que(exec_queue q) const { + switch (q) { + case exec_queue::read_only: + return read_only_handlers_; + case exec_queue::read_write: + return read_write_handlers_; + case exec_queue::read_exclusive: + return read_exclusive_handlers_; + } + } + + static std::unique_ptr pop(prio_queue& que) { + // work around std::priority_queue not having a pop() that returns value + auto t = std::move(const_cast&>(que.top())); + que.pop(); + return t; + } + bool lock_enabled_ = false; mutable std::mutex mtx_; std::condition_variable cond_; @@ -247,8 +306,9 @@ class exec_pri_queue : public boost::asio::execution_context uint32_t max_waiting_{0}; bool exiting_blocking_{false}; std::function should_exit_; // called holding mtx_ - using prio_queue = std::priority_queue, std::deque>, deref_less>; - prio_queue handlers_; + prio_queue read_only_handlers_; + prio_queue read_write_handlers_; + prio_queue read_exclusive_handlers_; }; } // appbase diff --git a/libraries/custom_appbase/tests/CMakeLists.txt b/libraries/custom_appbase/tests/CMakeLists.txt index 95f44b66d6..7df7d2a964 100644 --- a/libraries/custom_appbase/tests/CMakeLists.txt +++ b/libraries/custom_appbase/tests/CMakeLists.txt @@ -7,7 +7,7 @@ endif() file(GLOB UNIT_TESTS "*.cpp") add_executable( custom_appbase_test ${UNIT_TESTS} ) -target_link_libraries( custom_appbase_test appbase ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS} ) +target_link_libraries( custom_appbase_test appbase fc ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS} ) target_include_directories( custom_appbase_test PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include" "${CMAKE_CURRENT_SOURCE_DIR}/../../appbase/include" ) add_test( custom_appbase_test custom_appbase_test ) diff --git a/libraries/custom_appbase/tests/custom_appbase_tests.cpp b/libraries/custom_appbase/tests/custom_appbase_tests.cpp index 9ee5af40cd..97f2fcdc2b 100644 --- a/libraries/custom_appbase/tests/custom_appbase_tests.cpp +++ b/libraries/custom_appbase/tests/custom_appbase_tests.cpp @@ -1,10 +1,13 @@ #define BOOST_TEST_MODULE custom_appbase_tests #include -#include -#include #include +#include + +#include +#include + using namespace appbase; BOOST_AUTO_TEST_SUITE(custom_appbase_tests) @@ -19,7 +22,20 @@ std::thread start_app_thread(appbase::scoped_app& app) { return app_thread; } -// verify functions from both queues are executed when execution window is not explictly set +std::thread start_read_thread(appbase::scoped_app& app) { + static int num = 0; + std::thread read_thread( [&]() { + std::string name ="read-" + std::to_string(num++); + fc::set_thread_name(name); + bool more = true; + while (more) { + more = app->executor().execute_highest_read(); // blocks until all read only threads are idle + } + }); + return read_thread; +} + +// verify functions from both queues (read_only,read_write) are executed when execution window is not explicitly set BOOST_AUTO_TEST_CASE( default_exec_window ) { appbase::scoped_app app; auto app_thread = start_app_thread(app); @@ -27,28 +43,32 @@ BOOST_AUTO_TEST_CASE( default_exec_window ) { // post functions std::map rslts {}; int seq_num = 0; - app->executor().post( priority::medium, exec_queue::read_only, [&]() { rslts[0]=seq_num; ++seq_num; } ); - app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[1]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[2]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_only, [&]() { rslts[3]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts[4]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); - app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_only, [&]() { rslts[0]=seq_num; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[1]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[2]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_only, [&]() { rslts[3]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts[4]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); + app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_exclusive, [&]() { rslts[8]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_exclusive, [&]() { rslts[9]=seq_num; ++seq_num; } ); // Stop app. Use the lowest priority to make sure this function to execute the last app->executor().post( priority::lowest, exec_queue::read_only, [&]() { // read_only_queue should only contain the current lambda function, // and read_write_queue should have executed all its functions - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().size(), 1u); // pop()s after execute - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().size(), 0u ); + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 1u); // pop()s after execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 2u ); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u ); app->quit(); } ); app_thread.join(); - // both queues are cleared after execution - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().empty(), true); - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().empty(), true); + // all queues are cleared when exiting application::exec() + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true); // exactly number of both queues' functions processed BOOST_REQUIRE_EQUAL( rslts.size(), 8u ); @@ -64,8 +84,8 @@ BOOST_AUTO_TEST_CASE( default_exec_window ) { BOOST_CHECK_LT( rslts[6], rslts[7] ); } -// verify functions only from read_only queue are processed during read window -BOOST_AUTO_TEST_CASE( execute_from_read_queue ) { +// verify functions only from read_only queue are processed during read window on the main thread +BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) { appbase::scoped_app app; auto app_thread = start_app_thread(app); @@ -75,46 +95,48 @@ BOOST_AUTO_TEST_CASE( execute_from_read_queue ) { // post functions std::map rslts {}; int seq_num = 0; - app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[0]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_only, [&]() { rslts[1]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[2]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_only, [&]() { rslts[3]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts[4]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); - app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } ); - app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[7]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_only, [&]() { rslts[8]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[9]=seq_num; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[0]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_only, [&]() { rslts[1]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[2]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_only, [&]() { rslts[3]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts[4]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); + app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_exclusive, [&]() { rslts[7]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_exclusive, [&]() { rslts[8]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[9]=seq_num; ++seq_num; } ); // stop application. Use lowest at the end to make sure this executes the last app->executor().post( priority::lowest, exec_queue::read_only, [&]() { // read_queue should be empty (read window pops before execute) and write_queue should have all its functions - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().size(), 0u); // pop()s before execute - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().size(), 4u ); + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 0u); // pop()s before execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 2u); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 4u ); app->quit(); } ); app_thread.join(); - // both queues are cleared after execution - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().empty(), true); - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().empty(), true); + // all queues are cleared when exiting application::exec() + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true); // exactly number of posts processed - BOOST_REQUIRE_EQUAL( rslts.size(), 6u ); + BOOST_REQUIRE_EQUAL( rslts.size(), 4u ); - // same priority (high) of functions in read_queue executed by the post order + // same priority (high) of functions in read queues executed by the post order BOOST_CHECK_LT( rslts[1], rslts[3] ); - // higher priority posted earlier in read_queue executed earlier + // higher priority posted earlier in read queues executed earlier BOOST_CHECK_LT( rslts[3], rslts[4] ); } -// verify no functions are executed during read window if read_only queue is empty -BOOST_AUTO_TEST_CASE( execute_from_empty_read_queue ) { +// verify no functions are executed during read window if read_only & read_exclusive queue is empty +BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) { appbase::scoped_app app; auto app_thread = start_app_thread(app); - // set to run functions from read_only queue only + // set to run functions from read_only & read_exclusive queues only app->executor().set_to_read_window(1, [](){return false;}); // post functions @@ -134,22 +156,24 @@ BOOST_AUTO_TEST_CASE( execute_from_empty_read_queue ) { // Stop application. Use lowest at the end to make sure this executes the last app->executor().post( priority::lowest, exec_queue::read_only, [&]() { // read_queue should be empty (read window pops before execute) and write_queue should have all its functions - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().size(), 0u); // pop()s before execute - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().size(), 10u ); + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 0u); // pop()s before execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 10u ); app->quit(); } ); app_thread.join(); - // both queues are cleared after execution - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().empty(), true); - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().empty(), true); + // all queues are cleared when exiting application::exec() + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true); // no results BOOST_REQUIRE_EQUAL( rslts.size(), 0u ); } -// verify functions from both queues are processed in write window -BOOST_AUTO_TEST_CASE( execute_from_both_queues ) { +// verify functions from both queues (read_only, read_write) are processed in write window, but not read_exclusive +BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) { appbase::scoped_app app; auto app_thread = start_app_thread(app); @@ -159,32 +183,37 @@ BOOST_AUTO_TEST_CASE( execute_from_both_queues ) { // post functions std::map rslts {}; int seq_num = 0; - app->executor().post( priority::medium, exec_queue::read_only, [&]() { rslts[0]=seq_num; ++seq_num; } ); - app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[1]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[2]=seq_num; ++seq_num; } ); - app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[3]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts[4]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); - app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } ); - app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[8]=seq_num; ++seq_num; } ); - app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[9]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[10]=seq_num; ++seq_num; } ); - app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[11]=seq_num; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_only, [&]() { rslts[0]=seq_num; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[1]=seq_num; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[2]=seq_num; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[3]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts[4]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); + app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[8]=seq_num; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[9]=seq_num; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[10]=seq_num; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[11]=seq_num; ++seq_num; } ); + app->executor().post( priority::highest,exec_queue::read_exclusive, [&]() { rslts[12]=seq_num; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() { rslts[13]=seq_num; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_exclusive, [&]() { rslts[14]=seq_num; ++seq_num; } ); // stop application. Use lowest at the end to make sure this executes the last app->executor().post( priority::lowest, exec_queue::read_only, [&]() { // read_queue should have current function and write_queue's functions are all executed - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().size(), 1u); // pop()s after execute - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().size(), 0u ); + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 1u); // pop()s after execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 3u); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u ); app->quit(); } ); app_thread.join(); - // queues are emptied after quit - BOOST_REQUIRE_EQUAL( app->executor().read_only_queue().empty(), true); - BOOST_REQUIRE_EQUAL( app->executor().read_write_queue().empty(), true); + // queues are emptied after exec + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true); // exactly number of posts processed BOOST_REQUIRE_EQUAL( rslts.size(), 12u ); @@ -212,4 +241,98 @@ BOOST_AUTO_TEST_CASE( execute_from_both_queues ) { BOOST_CHECK_LT( rslts[6], rslts[11] ); } +// verify tasks from both queues (read_only, read_exclusive) are processed in read window +BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) { + appbase::scoped_app app; + + // set to run functions from read_only & read_exclusive queues only + app->executor().set_to_read_window(3, [](){return false;}); + + // post functions + std::vector> rslts(16); + std::atomic seq_num = 0; + app->executor().post( priority::medium, exec_queue::read_only, [&]() { rslts.at(0)=1; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_exclusive, [&]() { rslts.at(1)=2; ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_exclusive, [&]() { rslts.at(2)=3; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts.at(3)=4; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts.at(4)=5; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts.at(5)=6; ++seq_num; } ); + app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts.at(6)=7; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts.at(7)=8; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts.at(8)=9; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() { rslts.at(9)=10; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts.at(10)=11; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_exclusive, [&]() { rslts.at(11)=12; ++seq_num; } ); + app->executor().post( priority::highest,exec_queue::read_exclusive, [&]() { rslts.at(12)=13; ++seq_num; } ); + app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() { rslts.at(13)=14; ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_exclusive, [&]() { rslts.at(14)=15; ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_only, [&]() { rslts.at(15)=16; ++seq_num; } ); + + // Use lowest at the end to make sure this executes the last + app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() { + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 0u); // pop()s before execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 3u ); + } ); + + + std::optional work; + work.emplace(app->get_io_service()); + while( true ) { + app->get_io_service().poll(); + size_t s = app->executor().read_only_queue_size() + app->executor().read_exclusive_queue_size() + app->executor().read_write_queue_size(); + if (s == 17) + break; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + auto app_thread = start_app_thread(app); + constexpr size_t num_expected = 13u; // 16 - 3 read_write + + auto read_thread1 = start_read_thread(app); + auto read_thread2 = start_read_thread(app); + auto read_thread3 = start_read_thread(app); + read_thread1.join(); + read_thread2.join(); + read_thread3.join(); + + size_t num_sleeps = 0; + while (seq_num < num_expected) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + if (++num_sleeps > 10000) + break; + }; + work.reset(); + app->quit(); + app_thread.join(); + + // queues are emptied after exec + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true); + + // exactly number of posts processed + BOOST_REQUIRE_EQUAL( std::count_if(rslts.cbegin(), rslts.cend(), [](const auto& v){return v > 0; }), num_expected ); + + // all low must be processed the in order of posting + BOOST_CHECK_LT( rslts[4], rslts[15] ); + + // all medium must be processed the in order of posting + BOOST_CHECK_LT( rslts[0], rslts[1] ); + BOOST_CHECK_LT( rslts[1], rslts[11] ); + BOOST_CHECK_LT( rslts[11], rslts[14] ); + + // all functions posted after high before highest must be processed after high + BOOST_CHECK_LT( rslts[2], rslts[3] ); + BOOST_CHECK_LT( rslts[2], rslts[4] ); + BOOST_CHECK_LT( rslts[2], rslts[9] ); + + // all functions posted after highest must be processed after it + BOOST_CHECK_LT( rslts[6], rslts[8] ); + BOOST_CHECK_LT( rslts[6], rslts[9] ); + BOOST_CHECK_LT( rslts[6], rslts[11] ); + BOOST_CHECK_LT( rslts[6], rslts[12] ); + BOOST_CHECK_LT( rslts[6], rslts[14] ); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 924b61bf92..3d8ac08cd1 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -2844,7 +2844,8 @@ void producer_plugin_impl::switch_to_read_window() { _time_tracker.pause(); // we are in write window, so no read-only trx threads are processing transactions. - if (app().executor().read_only_queue().empty() && app().executor().read_exclusive_queue().empty()) { // no read-only tasks to process. stay in write window + app().get_io_service().poll(); // make sure we schedule any ready + if (app().executor().read_only_queue_empty() && app().executor().read_exclusive_queue_empty()) { // no read-only tasks to process. stay in write window start_write_window(); // restart write window timer for next round return; }