diff --git a/libraries/custom_appbase/include/eosio/chain/application.hpp b/libraries/custom_appbase/include/eosio/chain/application.hpp index 1d510a996d..6026df97dc 100644 --- a/libraries/custom_appbase/include/eosio/chain/application.hpp +++ b/libraries/custom_appbase/include/eosio/chain/application.hpp @@ -30,14 +30,31 @@ class priority_queue_executor { static constexpr uint16_t minimum_runtime_ms = 3; // inform how many read_threads will be calling read_only/read_exclusive queues - // Currently only used to assert if exec_queue::read_exclusive is used without any read threads + // expected to only be called at program startup, not thread safe, not safe to call after startup void init_read_threads(size_t num_read_threads) { pri_queue_.init_read_threads(num_read_threads); } + // not thread safe, see init_read_threads comment + size_t get_read_threads() const { + return pri_queue_.get_read_threads(); + } + + // assume application is started on the main thread + std::thread::id get_main_thread_id() const { + return main_thread_id_; + } + template - auto post( int priority, exec_queue q, Func&& func ) { - return boost::asio::post( io_serv_, pri_queue_.wrap( priority, q, --order_, std::forward(func))); + void post( int priority, exec_queue q, Func&& func ) { + if (q == exec_queue::read_exclusive) { + // no reason to post to io_service which then places this in the read_exclusive_handlers queue. + // read_exclusive tasks are run exclusively by read threads by pulling off the read_exclusive handlers queue. + pri_queue_.add(priority, q, --order_, std::forward(func)); + } else { + // post to io_service as the main thread may be blocked on io_service.run_one() in application::exec() + boost::asio::post(io_serv_, pri_queue_.wrap(priority, q, --order_, std::forward(func))); + } } // Legacy and deprecated. To be removed after cleaning up its uses in base appbase @@ -125,6 +142,7 @@ class priority_queue_executor { // members are ordered taking into account that the last one is destructed first private: + std::thread::id main_thread_id_{ std::this_thread::get_id() }; boost::asio::io_service io_serv_; appbase::exec_pri_queue pri_queue_; std::atomic order_{ std::numeric_limits::max() }; // to maintain FIFO ordering in all queues within priority diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 15f7664eab..6ef03f4b73 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -31,10 +31,17 @@ class exec_pri_queue : public boost::asio::execution_context public: // inform how many read_threads will be calling read_only/read_exclusive queues + // expected to only be called at program startup, not thread safe, not safe to call when lock_enabled_ void init_read_threads(size_t num_read_threads) { + assert(!lock_enabled_); num_read_threads_ = num_read_threads; } + // not strictly thread safe, see init_read_threads comment + size_t get_read_threads() const { + return num_read_threads_; + } + void stop() { std::lock_guard g( mtx_ ); exiting_blocking_ = true; @@ -60,7 +67,7 @@ class exec_pri_queue : public boost::asio::execution_context assert( num_read_threads_ > 0 || q != exec_queue::read_exclusive); prio_queue& que = priority_que(q); std::unique_ptr handler(new queued_handler(priority, order, std::move(function))); - if (lock_enabled_) { + if (lock_enabled_ || q == exec_queue::read_exclusive) { // called directly from any thread for read_exclusive std::lock_guard g( mtx_ ); que.push( std::move( handler ) ); if (num_waiting_) diff --git a/libraries/custom_appbase/tests/custom_appbase_tests.cpp b/libraries/custom_appbase/tests/custom_appbase_tests.cpp index f574a89b9f..3f01740bdd 100644 --- a/libraries/custom_appbase/tests/custom_appbase_tests.cpp +++ b/libraries/custom_appbase/tests/custom_appbase_tests.cpp @@ -12,17 +12,61 @@ using namespace appbase; BOOST_AUTO_TEST_SUITE(custom_appbase_tests) -std::thread start_app_thread(appbase::scoped_app& app) { - const char* argv[] = { boost::unit_test::framework::current_test_case().p_name->c_str() }; - BOOST_CHECK(app->initialize(sizeof(argv) / sizeof(char*), const_cast(argv))); - app->startup(); - std::thread app_thread( [&]() { - app->exec(); - } ); - return app_thread; -} +class scoped_app_thread { +public: + explicit scoped_app_thread(bool delay_exec = false) { + app_thread_ = start_app_thread(); + if (!delay_exec) { + start_exec(); + } + } + ~scoped_app_thread() { // destroy app instance so next instance gets a clean one + application::reset_app_singleton(); + } + + scoped_app_thread(const scoped_app_thread&) = delete; + scoped_app_thread& operator=(const scoped_app_thread&) = delete; + + appbase::application* operator->() { + return app_; + } + const appbase::application* operator->() const { + return app_; + } + + void start_exec() { + start_exec_.set_value(); + } + + void join() { + app_thread_.join(); + } + +private: + std::thread start_app_thread() { + std::promise start_complete; + std::thread app_thread( [&]() { + assert(application::null_app_singleton()); + app_ = &appbase::app(); + const char* argv[] = { boost::unit_test::framework::current_test_case().p_name->c_str() }; + BOOST_CHECK(app_->initialize(sizeof(argv) / sizeof(char*), const_cast(argv))); + app_->startup(); + start_complete.set_value(); + start_exec_.get_future().get(); + app_->exec(); + } ); + start_complete.get_future().get(); + return app_thread; + } + +private: + std::thread app_thread_; + std::promise start_exec_; + appbase::application* app_; +}; + -std::thread start_read_thread(appbase::scoped_app& app) { +std::thread start_read_thread(scoped_app_thread& app) { static int num = 0; std::thread read_thread( [&]() { std::string name ="read-" + std::to_string(num++); @@ -37,9 +81,8 @@ std::thread start_read_thread(appbase::scoped_app& app) { // verify functions from both queues (read_only,read_write) are executed when execution window is not explicitly set BOOST_AUTO_TEST_CASE( default_exec_window ) { - appbase::scoped_app app; - auto app_thread = start_app_thread(app); - + scoped_app_thread app; + // post functions std::map rslts {}; int seq_num = 0; @@ -51,19 +94,17 @@ BOOST_AUTO_TEST_CASE( default_exec_window ) { app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } ); app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } ); - app->executor().post( priority::high, exec_queue::read_exclusive, [&]() { rslts[8]=seq_num; ++seq_num; } ); - app->executor().post( priority::low, exec_queue::read_exclusive, [&]() { rslts[9]=seq_num; ++seq_num; } ); // Stop app. Use the lowest priority to make sure this function to execute the last app->executor().post( priority::lowest, exec_queue::read_only, [&]() { // read_only_queue should only contain the current lambda function, // and read_write_queue should have executed all its functions BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 1u); // pop()s after execute - BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 2u ); + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u ); BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u ); app->quit(); } ); - app_thread.join(); + app.join(); // all queues are cleared when exiting application::exec() BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); @@ -86,9 +127,8 @@ BOOST_AUTO_TEST_CASE( default_exec_window ) { // verify functions only from read_only queue are processed during read window on the main thread BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) { - appbase::scoped_app app; - auto app_thread = start_app_thread(app); - + scoped_app_thread app; + // set to run functions from read_only queue only app->executor().init_read_threads(1); app->executor().set_to_read_window([](){return false;}); @@ -115,7 +155,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) { BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 4u ); app->quit(); } ); - app_thread.join(); + app.join(); // all queues are cleared when exiting application::exec() BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); @@ -134,9 +174,8 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) { // verify no functions are executed during read window if read_only & read_exclusive queue is empty BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) { - appbase::scoped_app app; - auto app_thread = start_app_thread(app); - + scoped_app_thread app; + // set to run functions from read_only & read_exclusive queues only app->executor().init_read_threads(1); app->executor().set_to_read_window([](){return false;}); @@ -163,7 +202,7 @@ BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) { BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 10u ); app->quit(); } ); - app_thread.join(); + app.join(); // all queues are cleared when exiting application::exec() BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); @@ -176,8 +215,7 @@ BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) { // verify functions from both queues (read_only, read_write) are processed in write window, but not read_exclusive BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) { - appbase::scoped_app app; - auto app_thread = start_app_thread(app); + scoped_app_thread app; // set to run functions from both queues app->executor().is_write_window(); @@ -197,20 +235,17 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) { app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[9]=seq_num; ++seq_num; } ); app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[10]=seq_num; ++seq_num; } ); app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[11]=seq_num; ++seq_num; } ); - app->executor().post( priority::highest,exec_queue::read_exclusive, [&]() { rslts[12]=seq_num; ++seq_num; } ); - app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() { rslts[13]=seq_num; ++seq_num; } ); - app->executor().post( priority::medium, exec_queue::read_exclusive, [&]() { rslts[14]=seq_num; ++seq_num; } ); // stop application. Use lowest at the end to make sure this executes the last app->executor().post( priority::lowest, exec_queue::read_only, [&]() { // read_queue should have current function and write_queue's functions are all executed BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 1u); // pop()s after execute - BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 3u); + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u); BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u ); app->quit(); } ); - app_thread.join(); + app.join(); // queues are emptied after exec BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); @@ -245,7 +280,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) { // verify tasks from both queues (read_only, read_exclusive) are processed in read window BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) { - appbase::scoped_app app; + scoped_app_thread app(true); app->executor().init_read_threads(3); // set to run functions from read_only & read_exclusive queues only @@ -289,7 +324,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } - auto app_thread = start_app_thread(app); + app.start_exec(); constexpr int num_expected = 13; // 16 - 3 read_write auto read_thread1 = start_read_thread(app); @@ -307,7 +342,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) { }; work.reset(); app->quit(); - app_thread.join(); + app.join(); // queues are emptied after exec BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); @@ -340,10 +375,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) { // verify tasks from both queues (read_only, read_exclusive) are processed in read window BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) { - appbase::scoped_app app; - - auto app_thread = start_app_thread(app); - std::thread::id app_thread_id = app_thread.get_id(); + scoped_app_thread app; // set to run functions from read_only & read_exclusive queues only app->executor().init_read_threads(3); @@ -400,7 +432,7 @@ BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) { }; app->quit(); - app_thread.join(); + app.join(); // exactly number of posts processed BOOST_REQUIRE_EQUAL( std::count_if(rslts.cbegin(), rslts.cend(), [](const auto& v){ return v != std::thread::id(); }), num_expected ); @@ -408,7 +440,7 @@ BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) { const auto run_on_1 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread1_id; }); const auto run_on_2 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread2_id; }); const auto run_on_3 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread3_id; }); - const auto run_on_main = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == app_thread_id; }); + const auto run_on_main = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == app->executor().get_main_thread_id(); }); BOOST_REQUIRE_EQUAL(run_on_1+run_on_2+run_on_3+run_on_main, num_expected); BOOST_CHECK(run_on_1 > 0); diff --git a/plugins/chain_api_plugin/chain_api_plugin.cpp b/plugins/chain_api_plugin/chain_api_plugin.cpp index e862199ae0..ee9fadfdc4 100644 --- a/plugins/chain_api_plugin/chain_api_plugin.cpp +++ b/plugins/chain_api_plugin/chain_api_plugin.cpp @@ -150,7 +150,6 @@ void chain_api_plugin::plugin_startup() { CHAIN_RO_CALL(get_required_keys, 200, http_params_types::params_required), CHAIN_RO_CALL(get_transaction_id, 200, http_params_types::params_required), // transaction related APIs will be posted to read_write queue after keys are recovered, they are safe to run in parallel until they post to the read_write queue - CHAIN_RO_CALL_ASYNC(send_read_only_transaction, chain_apis::read_only::send_read_only_transaction_results, 200, http_params_types::params_required), CHAIN_RO_CALL_ASYNC(compute_transaction, chain_apis::read_only::compute_transaction_results, 200, http_params_types::params_required), CHAIN_RW_CALL_ASYNC(push_transaction, chain_apis::read_write::push_transaction_results, 202, http_params_types::params_required), CHAIN_RW_CALL_ASYNC(push_transactions, chain_apis::read_write::push_transactions_results, 202, http_params_types::params_required), @@ -170,6 +169,8 @@ void chain_api_plugin::plugin_startup() { } _http_plugin.add_async_api({ + // chain_plugin send_read_only_transaction will post to read_exclusive queue + CHAIN_RO_CALL_ASYNC(send_read_only_transaction, chain_apis::read_only::send_read_only_transaction_results, 200, http_params_types::params_required), CHAIN_RO_CALL_WITH_400(get_raw_block, 200, http_params_types::params_required), CHAIN_RO_CALL_WITH_400(get_block_header, 200, http_params_types::params_required) }); diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp index e4beb113da..369dfc34ee 100644 --- a/plugins/chain_plugin/chain_plugin.cpp +++ b/plugins/chain_plugin/chain_plugin.cpp @@ -2197,6 +2197,7 @@ void read_write::push_transactions(const read_write::push_transactions_params& p } CATCH_AND_CALL(next); } +// called from read-exclusive thread for read-only template void api_base::send_transaction_gen(API &api, send_transaction_params_t params, next_function next) { try { @@ -2567,12 +2568,19 @@ void read_only::compute_transaction(compute_transaction_params params, next_func } void read_only::send_read_only_transaction(send_read_only_transaction_params params, next_function next) { + static bool read_only_enabled = app().executor().get_read_threads() > 0; + EOS_ASSERT( read_only_enabled, unsupported_feature, + "read-only transactions execution not enabled on API node. Set read-only-threads > 0" ); + send_transaction_params_t gen_params { .return_failure_trace = false, .retry_trx = false, .retry_trx_num_blocks = std::nullopt, .trx_type = transaction_metadata::trx_type::read_only, .transaction = std::move(params.transaction) }; - return send_transaction_gen(*this, std::move(gen_params), std::move(next)); + // run read-only trx exclusively on read-only threads + app().executor().post(priority::low, exec_queue::read_exclusive, [this, gen_params{std::move(gen_params)}, next{std::move(next)}]() mutable { + send_transaction_gen(*this, std::move(gen_params), std::move(next)); + }); } read_only::get_transaction_id_result read_only::get_transaction_id( const read_only::get_transaction_id_params& params, const fc::time_point& ) const { diff --git a/plugins/http_plugin/include/eosio/http_plugin/common.hpp b/plugins/http_plugin/include/eosio/http_plugin/common.hpp index 5f12134086..100bee41f8 100644 --- a/plugins/http_plugin/include/eosio/http_plugin/common.hpp +++ b/plugins/http_plugin/include/eosio/http_plugin/common.hpp @@ -154,18 +154,18 @@ struct http_plugin_state { */ inline auto make_http_response_handler(http_plugin_state& plugin_state, detail::abstract_conn_ptr session_ptr, http_content_type content_type) { return [&plugin_state, - session_ptr{std::move(session_ptr)}, content_type](int code, std::optional response) { + session_ptr{std::move(session_ptr)}, content_type](int code, std::optional response) mutable { auto payload_size = detail::in_flight_sizeof(response); - if(auto error_str = session_ptr->verify_max_bytes_in_flight(payload_size); !error_str.empty()) { - session_ptr->send_busy_response(std::move(error_str)); - return; - } - plugin_state.bytes_in_flight += payload_size; // post back to an HTTP thread to allow the response handler to be called from any thread - boost::asio::post(plugin_state.thread_pool.get_executor(), - [&plugin_state, session_ptr, code, payload_size, response = std::move(response), content_type]() { + boost::asio::dispatch(plugin_state.thread_pool.get_executor(), + [&plugin_state, session_ptr{std::move(session_ptr)}, code, payload_size, response = std::move(response), content_type]() { + if(auto error_str = session_ptr->verify_max_bytes_in_flight(0); !error_str.empty()) { + session_ptr->send_busy_response(std::move(error_str)); + return; + } + try { plugin_state.bytes_in_flight -= payload_size; if (response.has_value()) { diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 2ac61560f3..b53090a7a6 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -774,14 +774,12 @@ class producer_plugin_impl : public std::enable_shared_from_this next) { if (trx_type == transaction_metadata::trx_type::read_only) { - EOS_ASSERT( _ro_thread_pool_size > 0, unsupported_feature, - "read-only transactions execution not enabled on API node. Set read-only-threads > 0" ); + assert(_ro_thread_pool_size > 0); // enforced by chain_plugin + assert(app().executor().get_main_thread_id() != std::this_thread::get_id()); // should only be called from read only threads // Post all read only trxs to read_exclusive queue for execution. auto trx_metadata = transaction_metadata::create_no_recover_keys(trx, transaction_metadata::trx_type::read_only); - app().executor().post(priority::low, exec_queue::read_exclusive, [this, trx{std::move(trx_metadata)}, next{std::move(next)}]() mutable { - push_read_only_transaction(std::move(trx), std::move(next)); - }); + push_read_only_transaction(std::move(trx_metadata), std::move(next)); return; } @@ -2673,11 +2671,8 @@ void producer_plugin::log_failed_transaction(const transaction_id_type& trx_i // Called from only one read_only thread void producer_plugin_impl::switch_to_write_window() { - if (_log.is_enabled(fc::log_level::debug)) { - auto now = fc::time_point::now(); - fc_dlog(_log, "Read-only threads ${n}, read window ${r}us, total all threads ${t}us", - ("n", _ro_thread_pool_size)("r", now - _ro_read_window_start_time)("t", _ro_all_threads_exec_time_us.load())); - } + fc_dlog(_log, "Read-only threads ${n}, read window ${r}us, total all threads ${t}us", + ("n", _ro_thread_pool_size)("r", fc::time_point::now() - _ro_read_window_start_time)("t", _ro_all_threads_exec_time_us.load())); chain::controller& chain = chain_plug->chain(); @@ -2732,6 +2727,8 @@ void producer_plugin_impl::switch_to_read_window() { start_write_window(); // restart write window timer for next round return; } + fc_dlog(_log, "Read only queue size ${s1}, read exclusive size ${s2}", + ("s1", app().executor().read_only_queue_size())("s2", app().executor().read_exclusive_queue_size())); uint32_t pending_block_num = chain.head_block_num() + 1; _ro_read_window_start_time = fc::time_point::now();