From a482ad8ab870659634f597aaccca2884286dc726 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 10 Jan 2023 12:16:42 -0600 Subject: [PATCH 1/8] Add handling of exceptions to named_thread_pool --- libraries/chain/controller.cpp | 5 ++- .../include/eosio/chain/thread_utils.hpp | 17 +++++++--- libraries/chain/thread_utils.cpp | 33 ++++++++++++++++--- .../test/test_account_query_db.cpp | 2 +- plugins/http_plugin/http_plugin.cpp | 5 ++- plugins/net_plugin/net_plugin.cpp | 5 ++- plugins/producer_plugin/producer_plugin.cpp | 5 ++- .../txn_test_gen_plugin.cpp | 2 +- unittests/misc_tests.cpp | 2 +- 9 files changed, 60 insertions(+), 16 deletions(-) diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index 90bee6fdf8..2ee4cd6d58 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -301,7 +301,10 @@ struct controller_impl { conf( cfg ), chain_id( chain_id ), read_mode( cfg.read_mode ), - thread_pool( "chain", cfg.thread_pool_size ) + thread_pool( "chain", cfg.thread_pool_size, [this]( const fc::exception& e ) { + elog( "Exception in chain thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); + if( shutdown ) shutdown(); + } ) { fork_db.open( [this]( block_timestamp_type timestamp, const flat_set& cur_features, diff --git a/libraries/chain/include/eosio/chain/thread_utils.hpp b/libraries/chain/include/eosio/chain/thread_utils.hpp index 077b14a4fb..4c4b937984 100644 --- a/libraries/chain/include/eosio/chain/thread_utils.hpp +++ b/libraries/chain/include/eosio/chain/thread_utils.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -15,16 +16,21 @@ namespace eosio { namespace chain { */ class named_thread_pool { public: - // name_prefix is name appended with -## of thread. - // short name_prefix (6 chars or under) is recommended as console_appender uses 9 chars for thread name - named_thread_pool( std::string name_prefix, size_t num_threads ); + using on_except_t = std::function; - // calls stop() + /// @param name_prefix is name appended with -## of thread. + /// A short name_prefix (6 chars or under) is recommended as console_appender uses 9 chars + /// for the thread name. + /// @param num_threads is number of threads spawned in the constructor + /// @param on_except is the function to call if io_context throws an exception, is called from thread pool thread + named_thread_pool( std::string name_prefix, size_t num_threads, on_except_t on_except ); + + /// calls stop() ~named_thread_pool(); boost::asio::io_context& get_executor() { return _ioc; } - // destroy work guard, stop io_context, join thread_pool, and stop thread_pool + /// destroy work guard, stop io_context, join thread_pool, and stop thread_pool void stop(); private: @@ -33,6 +39,7 @@ namespace eosio { namespace chain { boost::asio::thread_pool _thread_pool; boost::asio::io_context _ioc; std::optional _ioc_work; + on_except_t _on_except; }; diff --git a/libraries/chain/thread_utils.cpp b/libraries/chain/thread_utils.cpp index 3ca95ef0a8..ca39705c5e 100644 --- a/libraries/chain/thread_utils.cpp +++ b/libraries/chain/thread_utils.cpp @@ -1,5 +1,6 @@ #include #include +#include namespace eosio { namespace chain { @@ -7,16 +8,40 @@ namespace eosio { namespace chain { // // named_thread_pool // -named_thread_pool::named_thread_pool( std::string name_prefix, size_t num_threads ) +named_thread_pool::named_thread_pool( std::string name_prefix, size_t num_threads, on_except_t on_except ) : _thread_pool( num_threads ) , _ioc( num_threads ) +, _on_except( on_except ) { _ioc_work.emplace( boost::asio::make_work_guard( _ioc ) ); for( size_t i = 0; i < num_threads; ++i ) { - boost::asio::post( _thread_pool, [&ioc = _ioc, name_prefix, i]() { + boost::asio::post( _thread_pool, [&ioc = _ioc, name_prefix, i, on_except]() mutable { std::string tn = name_prefix + "-" + std::to_string( i ); - fc::set_os_thread_name( tn ); - ioc.run(); + try { + fc::set_os_thread_name( tn ); + ioc.run(); + } catch( const fc::exception& e ) { + if( on_except ) { + on_except( e ); + } else { + elog( "Exiting thread ${t} on exception: ${e}", ("t", tn)("e", e.to_detail_string()) ); + } + } catch( const std::exception& e ) { + fc::std_exception_wrapper se( FC_LOG_MESSAGE( warn, "${what}: ", ("what", e.what()) ), + std::current_exception(), BOOST_CORE_TYPEID( e ).name(), e.what() ); + if( on_except ) { + on_except( se ); + } else { + elog( "Exiting thread ${t} on exception: ${e}", ("t", tn)("e", se.to_detail_string()) ); + } + } catch( ... ) { + if( on_except ) { + fc::unhandled_exception ue( FC_LOG_MESSAGE( warn, "unknown exception" ), std::current_exception() ); + on_except( ue ); + } else { + elog( "Exiting thread ${t} on unknown exception", ("t", tn) ); + } + } } ); } } diff --git a/plugins/chain_plugin/test/test_account_query_db.cpp b/plugins/chain_plugin/test/test_account_query_db.cpp index 9cd6509d4d..40c8990aae 100644 --- a/plugins/chain_plugin/test/test_account_query_db.cpp +++ b/plugins/chain_plugin/test/test_account_query_db.cpp @@ -115,7 +115,7 @@ BOOST_FIXTURE_TEST_CASE(updateauth_test_multi_threaded, TESTER) { try { produce_block(); create_account(tester_account); - named_thread_pool thread_pool( "test", 5 ); + named_thread_pool thread_pool( "test", 5, nullptr ); for( size_t i = 0; i < 100; ++i ) { boost::asio::post( thread_pool.get_executor(), [&aq_db, tester_account, role]() { diff --git a/plugins/http_plugin/http_plugin.cpp b/plugins/http_plugin/http_plugin.cpp index 81c5144f04..ebb74881ef 100644 --- a/plugins/http_plugin/http_plugin.cpp +++ b/plugins/http_plugin/http_plugin.cpp @@ -374,7 +374,10 @@ class http_plugin_impl : public std::enable_shared_from_this { { try { my->plugin_state->thread_pool = - std::make_unique( "http", my->plugin_state->thread_pool_size ); + std::make_unique( "http", my->plugin_state->thread_pool_size, [](const fc::exception& e) { + fc_elog( logger(), "Exception in http thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); + app().quit(); + } ); if(my->listen_endpoint) { try { my->create_beast_server(false); diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 5f4986567d..eaf4e312c5 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -3673,7 +3673,10 @@ namespace eosio { my->producer_plug = app().find_plugin(); - my->thread_pool.emplace( "net", my->thread_pool_size ); + my->thread_pool.emplace( "net", my->thread_pool_size, []( const fc::exception& e ) { + fc_elog( logger, "Exception in net plugin thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); + app().quit(); + } ); my->dispatcher.reset( new dispatch_manager( my_impl->thread_pool->get_executor() ) ); diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index c921630bd7..cd82803892 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -929,7 +929,10 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_ auto thread_pool_size = options.at( "producer-threads" ).as(); EOS_ASSERT( thread_pool_size > 0, plugin_config_exception, "producer-threads ${num} must be greater than 0", ("num", thread_pool_size)); - my->_thread_pool.emplace( "prod", thread_pool_size ); + my->_thread_pool.emplace( "prod", thread_pool_size, []( const fc::exception& e ) { + fc_elog( _log, "Exception in producer thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); + app().quit(); + } ); if( options.count( "snapshots-dir" )) { auto sd = options.at( "snapshots-dir" ).as(); diff --git a/plugins/txn_test_gen_plugin/txn_test_gen_plugin.cpp b/plugins/txn_test_gen_plugin/txn_test_gen_plugin.cpp index e9f57139d6..fd34cfff49 100644 --- a/plugins/txn_test_gen_plugin/txn_test_gen_plugin.cpp +++ b/plugins/txn_test_gen_plugin/txn_test_gen_plugin.cpp @@ -317,7 +317,7 @@ struct txn_test_gen_plugin_impl { batch = batch_size/2; nonce_prefix = 0; - thread_pool.emplace( "txntest", thread_pool_size ); + thread_pool.emplace( "txntest", thread_pool_size, nullptr ); // txn_test_gen_plugin is being removed timer = std::make_shared(thread_pool->get_executor()); ilog("Started transaction test plugin; generating ${p} transactions every ${m} ms by ${t} load generation threads", diff --git a/unittests/misc_tests.cpp b/unittests/misc_tests.cpp index 5e90799ae1..38f08957da 100644 --- a/unittests/misc_tests.cpp +++ b/unittests/misc_tests.cpp @@ -910,7 +910,7 @@ BOOST_AUTO_TEST_CASE(transaction_metadata_test) { try { BOOST_CHECK_EQUAL(trx.id(), ptrx->id()); BOOST_CHECK_EQUAL(trx.id(), ptrx2->id()); - named_thread_pool thread_pool( "misc", 5 ); + named_thread_pool thread_pool( "misc", 5, nullptr ); auto fut = transaction_metadata::start_recover_keys( ptrx, thread_pool.get_executor(), test.control->get_chain_id(), fc::microseconds::maximum(), transaction_metadata::trx_type::input ); auto fut2 = transaction_metadata::start_recover_keys( ptrx2, thread_pool.get_executor(), test.control->get_chain_id(), fc::microseconds::maximum(), transaction_metadata::trx_type::input ); From 3b8a5fd9406624bfcf12c8428ff48a669bf4e659 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 11 Jan 2023 09:33:19 -0600 Subject: [PATCH 2/8] Add ability to delay start of thread pool --- .../include/eosio/chain/thread_utils.hpp | 24 ++++++++----- libraries/chain/thread_utils.cpp | 36 +++++++++++-------- 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/libraries/chain/include/eosio/chain/thread_utils.hpp b/libraries/chain/include/eosio/chain/thread_utils.hpp index 4c4b937984..178822869e 100644 --- a/libraries/chain/include/eosio/chain/thread_utils.hpp +++ b/libraries/chain/include/eosio/chain/thread_utils.hpp @@ -2,16 +2,16 @@ #include #include -#include #include #include #include #include +#include namespace eosio { namespace chain { /** - * Wrapper class for boost asio thread pool and io_context run. + * Wrapper class for thread pool of boost asio io_context run. * Also names threads so that tools like htop can see thread name. */ class named_thread_pool { @@ -23,31 +23,39 @@ namespace eosio { namespace chain { /// for the thread name. /// @param num_threads is number of threads spawned in the constructor /// @param on_except is the function to call if io_context throws an exception, is called from thread pool thread - named_thread_pool( std::string name_prefix, size_t num_threads, on_except_t on_except ); + /// @param delay_start do not spawn threads in constructor, wait for start() call + named_thread_pool( std::string name_prefix, size_t num_threads, on_except_t on_except, bool delay_start = false ); /// calls stop() ~named_thread_pool(); boost::asio::io_context& get_executor() { return _ioc; } - /// destroy work guard, stop io_context, join thread_pool, and stop thread_pool + /// Spawn threads, can be re-started after stop(). + /// Assumes start()/stop() called from the same thread or externally protected. + /// @throw assert_exception if already started and not stopped. + void start(); + + /// destroy work guard, stop io_context, join thread_pool void stop(); private: using ioc_work_t = boost::asio::executor_work_guard; - boost::asio::thread_pool _thread_pool; + std::string _name_prefix; + size_t _num_threads; + std::vector _thread_pool; boost::asio::io_context _ioc; std::optional _ioc_work; on_except_t _on_except; }; - // async on thread_pool and return future + // async on io_context and return future template - auto async_thread_pool( boost::asio::io_context& thread_pool, F&& f ) { + auto async_thread_pool( boost::asio::io_context& ioc, F&& f ) { auto task = std::make_shared>( std::forward( f ) ); - boost::asio::post( thread_pool, [task]() { (*task)(); } ); + boost::asio::post( ioc, [task]() { (*task)(); } ); return task->get_future(); } diff --git a/libraries/chain/thread_utils.cpp b/libraries/chain/thread_utils.cpp index ca39705c5e..0cb1e071ed 100644 --- a/libraries/chain/thread_utils.cpp +++ b/libraries/chain/thread_utils.cpp @@ -4,18 +4,26 @@ namespace eosio { namespace chain { - -// -// named_thread_pool -// -named_thread_pool::named_thread_pool( std::string name_prefix, size_t num_threads, on_except_t on_except ) -: _thread_pool( num_threads ) +named_thread_pool::named_thread_pool( std::string name_prefix, size_t num_threads, on_except_t on_except, bool delay_start ) +: _name_prefix( std::move(name_prefix) ) +, _num_threads( num_threads ) , _ioc( num_threads ) , _on_except( on_except ) { + if( !delay_start ) { + start(); + } +} + +named_thread_pool::~named_thread_pool() { + stop(); +} + +void named_thread_pool::start() { + FC_ASSERT( !_ioc_work, "Thread pool already started" ); _ioc_work.emplace( boost::asio::make_work_guard( _ioc ) ); - for( size_t i = 0; i < num_threads; ++i ) { - boost::asio::post( _thread_pool, [&ioc = _ioc, name_prefix, i, on_except]() mutable { + for( size_t i = 0; i < _num_threads; ++i ) { + _thread_pool.emplace_back( [&ioc = _ioc, &name_prefix = _name_prefix, on_except = _on_except, i]() { std::string tn = name_prefix + "-" + std::to_string( i ); try { fc::set_os_thread_name( tn ); @@ -46,16 +54,14 @@ named_thread_pool::named_thread_pool( std::string name_prefix, size_t num_thread } } -named_thread_pool::~named_thread_pool() { - stop(); -} - void named_thread_pool::stop() { _ioc_work.reset(); _ioc.stop(); - _thread_pool.join(); - _thread_pool.stop(); + for( auto& t : _thread_pool ) { + t.join(); + } + _thread_pool.clear(); } -} } // eosio::chain \ No newline at end of file +} } // eosio::chain From c955f558adc96606494be6a0da4cc7c581ad36ac Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 11 Jan 2023 11:09:21 -0600 Subject: [PATCH 3/8] Added tests and fixed issue with ioc needing restart() --- libraries/chain/thread_utils.cpp | 1 + unittests/misc_tests.cpp | 48 ++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/libraries/chain/thread_utils.cpp b/libraries/chain/thread_utils.cpp index 0cb1e071ed..79b68b2b85 100644 --- a/libraries/chain/thread_utils.cpp +++ b/libraries/chain/thread_utils.cpp @@ -22,6 +22,7 @@ named_thread_pool::~named_thread_pool() { void named_thread_pool::start() { FC_ASSERT( !_ioc_work, "Thread pool already started" ); _ioc_work.emplace( boost::asio::make_work_guard( _ioc ) ); + _ioc.restart(); for( size_t i = 0; i < _num_threads; ++i ) { _thread_pool.emplace_back( [&ioc = _ioc, &name_prefix = _name_prefix, on_except = _on_except, i]() { std::string tn = name_prefix + "-" + std::to_string( i ); diff --git a/unittests/misc_tests.cpp b/unittests/misc_tests.cpp index 38f08957da..af28f87777 100644 --- a/unittests/misc_tests.cpp +++ b/unittests/misc_tests.cpp @@ -1199,6 +1199,54 @@ BOOST_AUTO_TEST_CASE(bad_alloc_test) { BOOST_CHECK( ptr == nullptr ); } +BOOST_AUTO_TEST_CASE(named_thread_pool_test) { + { + named_thread_pool thread_pool( "misc", 5, nullptr ); + + std::promise p; + auto f = p.get_future(); + boost::asio::post( thread_pool.get_executor(), [&p](){ + p.set_value(); + }); + BOOST_TEST( (f.wait_for( 100ms ) == std::future_status::ready) ); + } + { // delayed start + named_thread_pool thread_pool( "misc", 5, nullptr, true ); + + std::promise p; + auto f = p.get_future(); + boost::asio::post( thread_pool.get_executor(), [&p](){ + p.set_value(); + }); + BOOST_TEST( (f.wait_for( 10ms ) == std::future_status::timeout) ); + thread_pool.start(); + BOOST_TEST( (f.wait_for( 100ms ) == std::future_status::ready) ); + } + { // exception + std::promise ep; + auto ef = ep.get_future(); + named_thread_pool thread_pool( "misc", 5, [&ep](const fc::exception& e) { ep.set_value(e); } ); + + boost::asio::post( thread_pool.get_executor(), [](){ + FC_ASSERT( false, "oops throw in thread pool" ); + }); + BOOST_TEST( (ef.wait_for( 100ms ) == std::future_status::ready) ); + BOOST_TEST( ef.get().to_detail_string().find("oops throw in thread pool") != std::string::npos ); + + // we can restart, after a stop + BOOST_REQUIRE_THROW( thread_pool.start(), fc::assert_exception ); + thread_pool.stop(); + + std::promise p; + auto f = p.get_future(); + boost::asio::post( thread_pool.get_executor(), [&p](){ + p.set_value(); + }); + thread_pool.start(); + BOOST_TEST( (f.wait_for( 100ms ) == std::future_status::ready) ); + } +} + BOOST_AUTO_TEST_CASE(public_key_from_hash) { auto private_key_string = std::string("5KQwrPbwdL6PhXujxW37FSSQZ1JiwsST4cqQzDeyXtP79zkvFD3"); auto expected_public_key = std::string("EOS6MRyAjQq8ud7hVNYcfnVPJqcVpscN5So8BhtHuGYqET5GDW5CV"); From 3be3fccaf68ecd23502a39806075113b533495cc Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Sat, 21 Jan 2023 08:44:51 -0600 Subject: [PATCH 4/8] Rename async_thread_pool to post_async_task to better describe its function --- libraries/chain/controller.cpp | 20 +++++++++---------- .../include/eosio/chain/thread_utils.hpp | 2 +- libraries/chain/transaction_metadata.cpp | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index e26ceffb5c..f46c115edf 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -413,7 +413,7 @@ struct controller_impl { std::vector>> v; v.reserve( branch.size() ); for( auto bitr = branch.rbegin(); bitr != branch.rend(); ++bitr ) { - v.emplace_back( async_thread_pool( thread_pool.get_executor(), [b=(*bitr)->block]() { return fc::raw::pack(*b); } ) ); + v.emplace_back( post_async_task( thread_pool.get_executor(), [b=(*bitr)->block]() { return fc::raw::pack(*b); } ) ); } auto it = v.begin(); @@ -1838,17 +1838,17 @@ struct controller_impl { auto& bb = std::get(pending->_block_stage); - auto action_merkle_fut = async_thread_pool( thread_pool.get_executor(), - [ids{std::move( bb._action_receipt_digests )}]() mutable { - return merkle( std::move( ids ) ); - } ); + auto action_merkle_fut = post_async_task( thread_pool.get_executor(), + [ids{std::move( bb._action_receipt_digests )}]() mutable { + return merkle( std::move( ids ) ); + } ); const bool calc_trx_merkle = !std::holds_alternative(bb._trx_mroot_or_receipt_digests); std::future trx_merkle_fut; if( calc_trx_merkle ) { - trx_merkle_fut = async_thread_pool( thread_pool.get_executor(), - [ids{std::move( std::get(bb._trx_mroot_or_receipt_digests) )}]() mutable { - return merkle( std::move( ids ) ); - } ); + trx_merkle_fut = post_async_task( thread_pool.get_executor(), + [ids{std::move( std::get(bb._trx_mroot_or_receipt_digests) )}]() mutable { + return merkle( std::move( ids ) ); + } ); } // Update resource limits: @@ -2173,7 +2173,7 @@ struct controller_impl { std::future create_block_state_future( const block_id_type& id, const signed_block_ptr& b ) { EOS_ASSERT( b, block_validate_exception, "null block" ); - return async_thread_pool( thread_pool.get_executor(), [b, id, control=this]() { + return post_async_task( thread_pool.get_executor(), [b, id, control=this]() { // no reason for a block_state if fork_db already knows about block auto existing = control->fork_db.get_block( id ); EOS_ASSERT( !existing, fork_database_exception, "we already know about this block: ${id}", ("id", id) ); diff --git a/libraries/chain/include/eosio/chain/thread_utils.hpp b/libraries/chain/include/eosio/chain/thread_utils.hpp index 178822869e..373e4a37e4 100644 --- a/libraries/chain/include/eosio/chain/thread_utils.hpp +++ b/libraries/chain/include/eosio/chain/thread_utils.hpp @@ -53,7 +53,7 @@ namespace eosio { namespace chain { // async on io_context and return future template - auto async_thread_pool( boost::asio::io_context& ioc, F&& f ) { + auto post_async_task( boost::asio::io_context& ioc, F&& f ) { auto task = std::make_shared>( std::forward( f ) ); boost::asio::post( ioc, [task]() { (*task)(); } ); return task->get_future(); diff --git a/libraries/chain/transaction_metadata.cpp b/libraries/chain/transaction_metadata.cpp index 300710037d..f33e33192b 100644 --- a/libraries/chain/transaction_metadata.cpp +++ b/libraries/chain/transaction_metadata.cpp @@ -11,7 +11,7 @@ recover_keys_future transaction_metadata::start_recover_keys( packed_transaction trx_type t, uint32_t max_variable_sig_size ) { - return async_thread_pool( thread_pool, [trx{std::move(trx)}, chain_id, time_limit, t, max_variable_sig_size]() mutable { + return post_async_task( thread_pool, [trx{std::move(trx)}, chain_id, time_limit, t, max_variable_sig_size]() mutable { fc::time_point deadline = time_limit == fc::microseconds::maximum() ? fc::time_point::maximum() : fc::time_point::now() + time_limit; check_variable_sig_size( trx, max_variable_sig_size ); From aa46c518c43e205a74c8ec2ff6d33a8b45ffc5a8 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Sat, 21 Jan 2023 08:50:15 -0600 Subject: [PATCH 5/8] Cleanup from peer review comments --- libraries/chain/thread_utils.cpp | 2 +- plugins/chain_plugin/test/test_account_query_db.cpp | 2 +- unittests/misc_tests.cpp | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/libraries/chain/thread_utils.cpp b/libraries/chain/thread_utils.cpp index f144d33925..ae92578b83 100644 --- a/libraries/chain/thread_utils.cpp +++ b/libraries/chain/thread_utils.cpp @@ -24,7 +24,7 @@ void named_thread_pool::start() { _ioc_work.emplace( boost::asio::make_work_guard( _ioc ) ); _ioc.restart(); for( size_t i = 0; i < _num_threads; ++i ) { - _thread_pool.emplace_back( [&ioc = _ioc, &name_prefix = _name_prefix, on_except = _on_except, i]() { + _thread_pool.emplace_back( [&ioc = _ioc, &name_prefix = _name_prefix, &on_except = _on_except, i]() { std::string tn = name_prefix + "-" + std::to_string( i ); try { fc::set_os_thread_name( tn ); diff --git a/plugins/chain_plugin/test/test_account_query_db.cpp b/plugins/chain_plugin/test/test_account_query_db.cpp index 40c8990aae..88011431fa 100644 --- a/plugins/chain_plugin/test/test_account_query_db.cpp +++ b/plugins/chain_plugin/test/test_account_query_db.cpp @@ -115,7 +115,7 @@ BOOST_FIXTURE_TEST_CASE(updateauth_test_multi_threaded, TESTER) { try { produce_block(); create_account(tester_account); - named_thread_pool thread_pool( "test", 5, nullptr ); + named_thread_pool thread_pool( "test", 5, {} ); for( size_t i = 0; i < 100; ++i ) { boost::asio::post( thread_pool.get_executor(), [&aq_db, tester_account, role]() { diff --git a/unittests/misc_tests.cpp b/unittests/misc_tests.cpp index af28f87777..6edc523f15 100644 --- a/unittests/misc_tests.cpp +++ b/unittests/misc_tests.cpp @@ -910,7 +910,7 @@ BOOST_AUTO_TEST_CASE(transaction_metadata_test) { try { BOOST_CHECK_EQUAL(trx.id(), ptrx->id()); BOOST_CHECK_EQUAL(trx.id(), ptrx2->id()); - named_thread_pool thread_pool( "misc", 5, nullptr ); + named_thread_pool thread_pool( "misc", 5, {} ); auto fut = transaction_metadata::start_recover_keys( ptrx, thread_pool.get_executor(), test.control->get_chain_id(), fc::microseconds::maximum(), transaction_metadata::trx_type::input ); auto fut2 = transaction_metadata::start_recover_keys( ptrx2, thread_pool.get_executor(), test.control->get_chain_id(), fc::microseconds::maximum(), transaction_metadata::trx_type::input ); @@ -1201,7 +1201,7 @@ BOOST_AUTO_TEST_CASE(bad_alloc_test) { BOOST_AUTO_TEST_CASE(named_thread_pool_test) { { - named_thread_pool thread_pool( "misc", 5, nullptr ); + named_thread_pool thread_pool( "misc", 5, {} ); std::promise p; auto f = p.get_future(); @@ -1211,7 +1211,7 @@ BOOST_AUTO_TEST_CASE(named_thread_pool_test) { BOOST_TEST( (f.wait_for( 100ms ) == std::future_status::ready) ); } { // delayed start - named_thread_pool thread_pool( "misc", 5, nullptr, true ); + named_thread_pool thread_pool( "misc", 5, {}, true ); std::promise p; auto f = p.get_future(); From 009b4f6c73bbbbd8b4eedd3cab9aec3356d5f0f6 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Sat, 21 Jan 2023 09:04:41 -0600 Subject: [PATCH 6/8] Rethrow exception if no on_except provided. --- libraries/chain/include/eosio/chain/thread_utils.hpp | 3 ++- libraries/chain/thread_utils.cpp | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/libraries/chain/include/eosio/chain/thread_utils.hpp b/libraries/chain/include/eosio/chain/thread_utils.hpp index 373e4a37e4..8a778aa766 100644 --- a/libraries/chain/include/eosio/chain/thread_utils.hpp +++ b/libraries/chain/include/eosio/chain/thread_utils.hpp @@ -22,7 +22,8 @@ namespace eosio { namespace chain { /// A short name_prefix (6 chars or under) is recommended as console_appender uses 9 chars /// for the thread name. /// @param num_threads is number of threads spawned in the constructor - /// @param on_except is the function to call if io_context throws an exception, is called from thread pool thread + /// @param on_except is the function to call if io_context throws an exception, is called from thread pool thread. + /// if an empty function then logs and rethrows exception on thread which will terminate. /// @param delay_start do not spawn threads in constructor, wait for start() call named_thread_pool( std::string name_prefix, size_t num_threads, on_except_t on_except, bool delay_start = false ); diff --git a/libraries/chain/thread_utils.cpp b/libraries/chain/thread_utils.cpp index ae92578b83..3ec464edb8 100644 --- a/libraries/chain/thread_utils.cpp +++ b/libraries/chain/thread_utils.cpp @@ -34,6 +34,7 @@ void named_thread_pool::start() { on_except( e ); } else { elog( "Exiting thread ${t} on exception: ${e}", ("t", tn)("e", e.to_detail_string()) ); + throw; } } catch( const std::exception& e ) { fc::std_exception_wrapper se( FC_LOG_MESSAGE( warn, "${what}: ", ("what", e.what()) ), @@ -42,6 +43,7 @@ void named_thread_pool::start() { on_except( se ); } else { elog( "Exiting thread ${t} on exception: ${e}", ("t", tn)("e", se.to_detail_string()) ); + throw; } } catch( ... ) { if( on_except ) { @@ -49,6 +51,7 @@ void named_thread_pool::start() { on_except( ue ); } else { elog( "Exiting thread ${t} on unknown exception", ("t", tn) ); + throw; } } } ); From 76374f884f73160dac7e33f3916d6881fe347dbd Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 25 Jan 2023 07:53:41 -0600 Subject: [PATCH 7/8] Changed named_thread_pool to always delay start() --- libraries/chain/controller.cpp | 10 ++-- .../include/eosio/chain/thread_utils.hpp | 15 +++--- libraries/chain/thread_utils.cpp | 15 ++---- .../test/test_account_query_db.cpp | 3 +- plugins/http_plugin/http_plugin.cpp | 14 +++--- .../eosio/http_plugin/beast_http_listener.hpp | 4 +- .../include/eosio/http_plugin/common.hpp | 4 +- plugins/net_plugin/net_plugin.cpp | 47 +++++++++---------- plugins/producer_plugin/producer_plugin.cpp | 28 +++++------ .../state_history_plugin.cpp | 12 ++--- unittests/misc_tests.cpp | 17 ++++--- 11 files changed, 82 insertions(+), 87 deletions(-) diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index f46c115edf..36f559eea8 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -301,10 +301,7 @@ struct controller_impl { conf( cfg ), chain_id( chain_id ), read_mode( cfg.read_mode ), - thread_pool( "chain", cfg.thread_pool_size, [this]( const fc::exception& e ) { - elog( "Exception in chain thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); - if( shutdown ) shutdown(); - } ) + thread_pool( "chain" ) { fork_db.open( [this]( block_timestamp_type timestamp, const flat_set& cur_features, @@ -312,6 +309,11 @@ struct controller_impl { { check_protocol_features( timestamp, cur_features, new_features ); } ); + thread_pool.start( cfg.thread_pool_size, [this]( const fc::exception& e ) { + elog( "Exception in chain thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); + if( shutdown ) shutdown(); + } ); + set_activation_handler(); set_activation_handler(); set_activation_handler(); diff --git a/libraries/chain/include/eosio/chain/thread_utils.hpp b/libraries/chain/include/eosio/chain/thread_utils.hpp index 8a778aa766..0f04cc674a 100644 --- a/libraries/chain/include/eosio/chain/thread_utils.hpp +++ b/libraries/chain/include/eosio/chain/thread_utils.hpp @@ -21,11 +21,7 @@ namespace eosio { namespace chain { /// @param name_prefix is name appended with -## of thread. /// A short name_prefix (6 chars or under) is recommended as console_appender uses 9 chars /// for the thread name. - /// @param num_threads is number of threads spawned in the constructor - /// @param on_except is the function to call if io_context throws an exception, is called from thread pool thread. - /// if an empty function then logs and rethrows exception on thread which will terminate. - /// @param delay_start do not spawn threads in constructor, wait for start() call - named_thread_pool( std::string name_prefix, size_t num_threads, on_except_t on_except, bool delay_start = false ); + explicit named_thread_pool( std::string name_prefix ); /// calls stop() ~named_thread_pool(); @@ -34,8 +30,11 @@ namespace eosio { namespace chain { /// Spawn threads, can be re-started after stop(). /// Assumes start()/stop() called from the same thread or externally protected. + /// @param num_threads is number of threads spawned + /// @param on_except is the function to call if io_context throws an exception, is called from thread pool thread. + /// if an empty function then logs and rethrows exception on thread which will terminate. /// @throw assert_exception if already started and not stopped. - void start(); + void start( size_t num_threads, on_except_t on_except ); /// destroy work guard, stop io_context, join thread_pool void stop(); @@ -44,11 +43,9 @@ namespace eosio { namespace chain { using ioc_work_t = boost::asio::executor_work_guard; std::string _name_prefix; - size_t _num_threads; - std::vector _thread_pool; boost::asio::io_context _ioc; + std::vector _thread_pool; std::optional _ioc_work; - on_except_t _on_except; }; diff --git a/libraries/chain/thread_utils.cpp b/libraries/chain/thread_utils.cpp index 3ec464edb8..038675b264 100644 --- a/libraries/chain/thread_utils.cpp +++ b/libraries/chain/thread_utils.cpp @@ -4,27 +4,22 @@ namespace eosio { namespace chain { -named_thread_pool::named_thread_pool( std::string name_prefix, size_t num_threads, on_except_t on_except, bool delay_start ) +named_thread_pool::named_thread_pool( std::string name_prefix ) : _name_prefix( std::move(name_prefix) ) -, _num_threads( num_threads ) -, _ioc( num_threads ) -, _on_except( std::move(on_except) ) +, _ioc() { - if( !delay_start ) { - start(); - } } named_thread_pool::~named_thread_pool() { stop(); } -void named_thread_pool::start() { +void named_thread_pool::start( size_t num_threads, on_except_t on_except ) { FC_ASSERT( !_ioc_work, "Thread pool already started" ); _ioc_work.emplace( boost::asio::make_work_guard( _ioc ) ); _ioc.restart(); - for( size_t i = 0; i < _num_threads; ++i ) { - _thread_pool.emplace_back( [&ioc = _ioc, &name_prefix = _name_prefix, &on_except = _on_except, i]() { + for( size_t i = 0; i < num_threads; ++i ) { + _thread_pool.emplace_back( [&ioc = _ioc, &name_prefix = _name_prefix, on_except, i]() { std::string tn = name_prefix + "-" + std::to_string( i ); try { fc::set_os_thread_name( tn ); diff --git a/plugins/chain_plugin/test/test_account_query_db.cpp b/plugins/chain_plugin/test/test_account_query_db.cpp index 88011431fa..f16efe1482 100644 --- a/plugins/chain_plugin/test/test_account_query_db.cpp +++ b/plugins/chain_plugin/test/test_account_query_db.cpp @@ -115,7 +115,8 @@ BOOST_FIXTURE_TEST_CASE(updateauth_test_multi_threaded, TESTER) { try { produce_block(); create_account(tester_account); - named_thread_pool thread_pool( "test", 5, {} ); + named_thread_pool thread_pool( "test" ); + thread_pool.start( 5, {} ); for( size_t i = 0; i < 100; ++i ) { boost::asio::post( thread_pool.get_executor(), [&aq_db, tester_account, role]() { diff --git a/plugins/http_plugin/http_plugin.cpp b/plugins/http_plugin/http_plugin.cpp index ebb74881ef..637cdd84c8 100644 --- a/plugins/http_plugin/http_plugin.cpp +++ b/plugins/http_plugin/http_plugin.cpp @@ -373,11 +373,11 @@ class http_plugin_impl : public std::enable_shared_from_this { app().post(appbase::priority::high, [this] () { try { - my->plugin_state->thread_pool = - std::make_unique( "http", my->plugin_state->thread_pool_size, [](const fc::exception& e) { - fc_elog( logger(), "Exception in http thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); - app().quit(); - } ); + my->plugin_state->thread_pool.start( my->plugin_state->thread_pool_size, [](const fc::exception& e) { + fc_elog( logger(), "Exception in http thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); + app().quit(); + } ); + if(my->listen_endpoint) { try { my->create_beast_server(false); @@ -468,9 +468,7 @@ class http_plugin_impl : public std::enable_shared_from_this { if(my->beast_unix_server) my->beast_unix_server->stop_listening(); - if( my->plugin_state->thread_pool ) { - my->plugin_state->thread_pool->stop(); - } + my->plugin_state->thread_pool.stop(); my->beast_server.reset(); my->beast_https_server.reset(); diff --git a/plugins/http_plugin/include/eosio/http_plugin/beast_http_listener.hpp b/plugins/http_plugin/include/eosio/http_plugin/beast_http_listener.hpp index b363e0227c..0532ab1a24 100644 --- a/plugins/http_plugin/include/eosio/http_plugin/beast_http_listener.hpp +++ b/plugins/http_plugin/include/eosio/http_plugin/beast_http_listener.hpp @@ -38,7 +38,7 @@ class beast_http_listener : public std::enable_shared_from_this plugin_state) : is_listening_(false), plugin_state_(std::move(plugin_state)), acceptor_(plugin_state_->thread_pool->get_executor()), socket_(plugin_state_->thread_pool->get_executor()), accept_error_timer_(plugin_state_->thread_pool->get_executor()) {} + beast_http_listener(std::shared_ptr plugin_state) : is_listening_(false), plugin_state_(std::move(plugin_state)), acceptor_(plugin_state_->thread_pool.get_executor()), socket_(plugin_state_->thread_pool.get_executor()), accept_error_timer_(plugin_state_->thread_pool.get_executor()) {} virtual ~beast_http_listener() { try { @@ -99,7 +99,7 @@ class beast_http_listener : public std::enable_shared_from_thisthread_pool->stop(); + plugin_state_->thread_pool.stop(); is_listening_ = false; } } diff --git a/plugins/http_plugin/include/eosio/http_plugin/common.hpp b/plugins/http_plugin/include/eosio/http_plugin/common.hpp index 3be93a9f5f..5ac1272126 100644 --- a/plugins/http_plugin/include/eosio/http_plugin/common.hpp +++ b/plugins/http_plugin/include/eosio/http_plugin/common.hpp @@ -129,7 +129,7 @@ struct http_plugin_state { bool keep_alive = false; uint16_t thread_pool_size = 2; - std::unique_ptr thread_pool; + eosio::chain::named_thread_pool thread_pool{ "http" }; fc::logger& logger; @@ -162,7 +162,7 @@ auto make_http_response_handler(std::shared_ptr plugin_state, plugin_state->bytes_in_flight += payload_size; // post back to an HTTP thread to allow the response handler to be called from any thread - boost::asio::post(plugin_state->thread_pool->get_executor(), + boost::asio::post(plugin_state->thread_pool.get_executor(), [plugin_state, session_ptr, code, deadline, start, payload_size, response = std::move(response)]() { try { plugin_state->bytes_in_flight -= payload_size; diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 796836a6fa..a84bd8ee4e 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -283,8 +283,9 @@ namespace eosio { compat::channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription; - uint16_t thread_pool_size = 2; - std::optional thread_pool; + uint16_t thread_pool_size = 2; + eosio::chain::named_thread_pool thread_pool{ "net" }; + private: mutable std::mutex chain_info_mtx; // protects chain_* @@ -877,11 +878,11 @@ namespace eosio { connection::connection( const string& endpoint ) : peer_addr( endpoint ), - strand( my_impl->thread_pool->get_executor() ), - socket( new tcp::socket( my_impl->thread_pool->get_executor() ) ), + strand( my_impl->thread_pool.get_executor() ), + socket( new tcp::socket( my_impl->thread_pool.get_executor() ) ), log_p2p_address( endpoint ), connection_id( ++my_impl->current_connection_id ), - response_expected_timer( my_impl->thread_pool->get_executor() ), + response_expected_timer( my_impl->thread_pool.get_executor() ), last_handshake_recv(), last_handshake_sent() { @@ -890,10 +891,10 @@ namespace eosio { connection::connection() : peer_addr(), - strand( my_impl->thread_pool->get_executor() ), - socket( new tcp::socket( my_impl->thread_pool->get_executor() ) ), + strand( my_impl->thread_pool.get_executor() ), + socket( new tcp::socket( my_impl->thread_pool.get_executor() ) ), connection_id( ++my_impl->current_connection_id ), - response_expected_timer( my_impl->thread_pool->get_executor() ), + response_expected_timer( my_impl->thread_pool.get_executor() ), last_handshake_recv(), last_handshake_sent() { @@ -996,7 +997,7 @@ namespace eosio { self->socket->shutdown( tcp::socket::shutdown_both, ec ); self->socket->close( ec ); } - self->socket.reset( new tcp::socket( my_impl->thread_pool->get_executor() ) ); + self->socket.reset( new tcp::socket( my_impl->thread_pool.get_executor() ) ); self->flush_queues(); self->connecting = false; self->syncing = false; @@ -2319,7 +2320,7 @@ namespace eosio { string port = c->peer_address().substr( colon + 1, colon2 == string::npos ? string::npos : colon2 - (colon + 1)); c->set_connection_type( c->peer_address() ); - auto resolver = std::make_shared( my_impl->thread_pool->get_executor() ); + auto resolver = std::make_shared( my_impl->thread_pool.get_executor() ); connection_wptr weak_conn = c; // Note: need to add support for IPv6 too resolver->async_resolve( tcp::v4(), host, port, boost::asio::bind_executor( c->strand, @@ -3226,7 +3227,7 @@ namespace eosio { } if( reason == no_reason ) { - boost::asio::post( my_impl->thread_pool->get_executor(), [dispatcher = my_impl->dispatcher.get(), cid=c->connection_id, blk_id, msg]() { + boost::asio::post( my_impl->thread_pool.get_executor(), [dispatcher = my_impl->dispatcher.get(), cid=c->connection_id, blk_id, msg]() { fc_dlog( logger, "accepted signed_block : #${n} ${id}...", ("n", msg->block_num())("id", blk_id.str().substr(8,16)) ); dispatcher->add_peer_block( blk_id, cid ); }); @@ -3307,11 +3308,11 @@ namespace eosio { void net_plugin_impl::start_monitors() { { std::lock_guard g( connector_check_timer_mtx ); - connector_check_timer.reset(new boost::asio::steady_timer( my_impl->thread_pool->get_executor() )); + connector_check_timer.reset(new boost::asio::steady_timer( my_impl->thread_pool.get_executor() )); } { std::lock_guard g( expire_timer_mtx ); - expire_timer.reset( new boost::asio::steady_timer( my_impl->thread_pool->get_executor() ) ); + expire_timer.reset( new boost::asio::steady_timer( my_impl->thread_pool.get_executor() ) ); } start_conn_timer(connector_period, std::weak_ptr()); start_expire_timer(); @@ -3704,12 +3705,12 @@ namespace eosio { my->producer_plug = app().find_plugin(); - my->thread_pool.emplace( "net", my->thread_pool_size, []( const fc::exception& e ) { - fc_elog( logger, "Exception in net plugin thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); - app().quit(); - } ); + my->thread_pool.start( my->thread_pool_size, []( const fc::exception& e ) { + fc_elog( logger, "Exception in net plugin thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); + app().quit(); + } ); - my->dispatcher.reset( new dispatch_manager( my_impl->thread_pool->get_executor() ) ); + my->dispatcher.reset( new dispatch_manager( my_impl->thread_pool.get_executor() ) ); if( !my->p2p_accept_transactions && my->p2p_address.size() ) { fc_ilog( logger, "\n" @@ -3723,11 +3724,11 @@ namespace eosio { if( my->p2p_address.size() > 0 ) { auto host = my->p2p_address.substr( 0, my->p2p_address.find( ':' )); auto port = my->p2p_address.substr( host.size() + 1, my->p2p_address.size()); - tcp::resolver resolver( my->thread_pool->get_executor() ); + tcp::resolver resolver( my->thread_pool.get_executor() ); // Note: need to add support for IPv6 too? listen_endpoint = *resolver.resolve( tcp::v4(), host, port ); - my->acceptor.reset( new tcp::acceptor( my_impl->thread_pool->get_executor() ) ); + my->acceptor.reset( new tcp::acceptor( my_impl->thread_pool.get_executor() ) ); if( !my->p2p_server_address.empty() ) { my->p2p_address = my->p2p_server_address; @@ -3762,7 +3763,7 @@ namespace eosio { { std::lock_guard g( my->keepalive_timer_mtx ); - my->keepalive_timer.reset( new boost::asio::steady_timer( my->thread_pool->get_executor() ) ); + my->keepalive_timer.reset( new boost::asio::steady_timer( my->thread_pool.get_executor() ) ); } my->incoming_transaction_ack_subscription = app().get_channel().subscribe( @@ -3832,9 +3833,7 @@ namespace eosio { my->connections.clear(); } - if( my->thread_pool ) { - my->thread_pool->stop(); - } + my->thread_pool.stop(); if( my->acceptor ) { boost::system::error_code ec; diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 0ffe70b711..e38db88ad6 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -306,7 +306,8 @@ class producer_plugin_impl : public std::enable_shared_from_this _producer_watermarks; pending_block_mode _pending_block_mode = pending_block_mode::speculating; unapplied_transaction_queue _unapplied_transactions; - std::optional _thread_pool; + size_t _thread_pool_size = config::default_controller_thread_pool_size; + named_thread_pool _thread_pool{ "prod" }; std::atomic _max_transaction_time_ms; // modified by app thread, read by net_plugin thread pool std::atomic _received_block{false}; // modified by net_plugin thread pool and app thread @@ -530,7 +531,7 @@ class producer_plugin_impl : public std::enable_shared_from_thisget_executor(), + auto future = transaction_metadata::start_recover_keys( trx, _thread_pool.get_executor(), chain.get_chain_id(), fc::microseconds( max_trx_cpu_usage ), trx_type, chain.configured_subjective_signature_length_limit() ); @@ -550,7 +551,7 @@ class producer_plugin_impl : public std::enable_shared_from_thisget_executor(), [self = this, future{std::move(future)}, api_trx, return_failure_traces, + boost::asio::post(_thread_pool.get_executor(), [self = this, future{std::move(future)}, api_trx, return_failure_traces, next{std::move(next)}, trx=trx]() mutable { if( future.valid() ) { future.wait(); @@ -764,7 +765,7 @@ void producer_plugin::set_program_options( "Disable subjective CPU billing for P2P transactions") ("disable-subjective-api-billing", bpo::value()->default_value(true), "Disable subjective CPU billing for API transactions") - ("producer-threads", bpo::value()->default_value(config::default_controller_thread_pool_size), + ("producer-threads", bpo::value()->default_value(my->_thread_pool_size), "Number of worker threads in producer thread pool") ("snapshots-dir", bpo::value()->default_value("snapshots"), "the location of the snapshots directory (absolute path or relative to application data dir)") @@ -929,13 +930,9 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_ if( my->_disable_subjective_api_billing ) ilog( "Subjective CPU billing of API trxs disabled " ); } - auto thread_pool_size = options.at( "producer-threads" ).as(); - EOS_ASSERT( thread_pool_size > 0, plugin_config_exception, - "producer-threads ${num} must be greater than 0", ("num", thread_pool_size)); - my->_thread_pool.emplace( "prod", thread_pool_size, []( const fc::exception& e ) { - fc_elog( _log, "Exception in producer thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); - app().quit(); - } ); + my->_thread_pool_size = options.at( "producer-threads" ).as(); + EOS_ASSERT( my->_thread_pool_size > 0, plugin_config_exception, + "producer-threads ${num} must be greater than 0", ("num", my->_thread_pool_size)); if( options.count( "snapshots-dir" )) { auto sd = options.at( "snapshots-dir" ).as(); @@ -996,6 +993,11 @@ void producer_plugin::plugin_startup() try { ilog("producer plugin: plugin_startup() begin"); + my->_thread_pool.start( my->_thread_pool_size, []( const fc::exception& e ) { + fc_elog( _log, "Exception in producer thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); + app().quit(); + } ); + chain::controller& chain = my->chain_plug->chain(); EOS_ASSERT( my->_producers.empty() || chain.get_read_mode() != chain::db_read_mode::IRREVERSIBLE, plugin_config_exception, "node cannot have any producer-name configured because block production is impossible when read_mode is \"irreversible\"" ); @@ -1051,9 +1053,7 @@ void producer_plugin::plugin_shutdown() { edump((fc::std_exception_wrapper::from_current_exception(e).to_detail_string())); } - if( my->_thread_pool ) { - my->_thread_pool->stop(); - } + my->_thread_pool.stop(); my->_unapplied_transactions.clear(); diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index aac2bbb7ce..ea2d62766c 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -87,11 +87,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this, std::unique_ptr>; std::set acceptors; - // use of executor assumes only one thread, delayed start - named_thread_pool thread_pool{"SHiP", 1, [](const fc::exception& e) { - fc_elog( _log, "Exception in SHiP thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); - app().quit(); - }, true}; + named_thread_pool thread_pool{"SHiP"}; static void get_log_entry(state_history_log& log, uint32_t block_num, std::optional& result) { if (block_num < log.begin_block() || block_num >= log.end_block()) @@ -709,7 +705,11 @@ void state_history_plugin::plugin_startup() { fc_ilog( _log, "Done storing initial state on startup" ); } my->listen(); - my->thread_pool.start(); + // use of executor assumes only one thread + my->thread_pool.start( 1, [](const fc::exception& e) { + fc_elog( _log, "Exception in SHiP thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); + app().quit(); + } ); } catch (std::exception& ex) { appbase::app().quit(); } diff --git a/unittests/misc_tests.cpp b/unittests/misc_tests.cpp index 6edc523f15..72ee411d85 100644 --- a/unittests/misc_tests.cpp +++ b/unittests/misc_tests.cpp @@ -910,7 +910,8 @@ BOOST_AUTO_TEST_CASE(transaction_metadata_test) { try { BOOST_CHECK_EQUAL(trx.id(), ptrx->id()); BOOST_CHECK_EQUAL(trx.id(), ptrx2->id()); - named_thread_pool thread_pool( "misc", 5, {} ); + named_thread_pool thread_pool( "misc" ); + thread_pool.start( 5, {} ); auto fut = transaction_metadata::start_recover_keys( ptrx, thread_pool.get_executor(), test.control->get_chain_id(), fc::microseconds::maximum(), transaction_metadata::trx_type::input ); auto fut2 = transaction_metadata::start_recover_keys( ptrx2, thread_pool.get_executor(), test.control->get_chain_id(), fc::microseconds::maximum(), transaction_metadata::trx_type::input ); @@ -1201,7 +1202,8 @@ BOOST_AUTO_TEST_CASE(bad_alloc_test) { BOOST_AUTO_TEST_CASE(named_thread_pool_test) { { - named_thread_pool thread_pool( "misc", 5, {} ); + named_thread_pool thread_pool( "misc" ); + thread_pool.start( 5, {} ); std::promise p; auto f = p.get_future(); @@ -1211,7 +1213,7 @@ BOOST_AUTO_TEST_CASE(named_thread_pool_test) { BOOST_TEST( (f.wait_for( 100ms ) == std::future_status::ready) ); } { // delayed start - named_thread_pool thread_pool( "misc", 5, {}, true ); + named_thread_pool thread_pool( "misc" ); std::promise p; auto f = p.get_future(); @@ -1219,13 +1221,14 @@ BOOST_AUTO_TEST_CASE(named_thread_pool_test) { p.set_value(); }); BOOST_TEST( (f.wait_for( 10ms ) == std::future_status::timeout) ); - thread_pool.start(); + thread_pool.start( 5, {} ); BOOST_TEST( (f.wait_for( 100ms ) == std::future_status::ready) ); } { // exception std::promise ep; auto ef = ep.get_future(); - named_thread_pool thread_pool( "misc", 5, [&ep](const fc::exception& e) { ep.set_value(e); } ); + named_thread_pool thread_pool( "misc" ); + thread_pool.start( 5, [&ep](const fc::exception& e) { ep.set_value(e); } ); boost::asio::post( thread_pool.get_executor(), [](){ FC_ASSERT( false, "oops throw in thread pool" ); @@ -1234,7 +1237,7 @@ BOOST_AUTO_TEST_CASE(named_thread_pool_test) { BOOST_TEST( ef.get().to_detail_string().find("oops throw in thread pool") != std::string::npos ); // we can restart, after a stop - BOOST_REQUIRE_THROW( thread_pool.start(), fc::assert_exception ); + BOOST_REQUIRE_THROW( thread_pool.start( 5, [&ep](const fc::exception& e) { ep.set_value(e); } ), fc::assert_exception ); thread_pool.stop(); std::promise p; @@ -1242,7 +1245,7 @@ BOOST_AUTO_TEST_CASE(named_thread_pool_test) { boost::asio::post( thread_pool.get_executor(), [&p](){ p.set_value(); }); - thread_pool.start(); + thread_pool.start( 5, [&ep](const fc::exception& e) { ep.set_value(e); } ); BOOST_TEST( (f.wait_for( 100ms ) == std::future_status::ready) ); } } From 1d2b98363c33a42ae67da4d85ea4c2f197f92b97 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 25 Jan 2023 09:57:44 -0600 Subject: [PATCH 8/8] Add reserve() --- libraries/chain/thread_utils.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/libraries/chain/thread_utils.cpp b/libraries/chain/thread_utils.cpp index 038675b264..e65f5495b4 100644 --- a/libraries/chain/thread_utils.cpp +++ b/libraries/chain/thread_utils.cpp @@ -18,6 +18,7 @@ void named_thread_pool::start( size_t num_threads, on_except_t on_except ) { FC_ASSERT( !_ioc_work, "Thread pool already started" ); _ioc_work.emplace( boost::asio::make_work_guard( _ioc ) ); _ioc.restart(); + _thread_pool.reserve( num_threads ); for( size_t i = 0; i < num_threads; ++i ) { _thread_pool.emplace_back( [&ioc = _ioc, &name_prefix = _name_prefix, on_except, i]() { std::string tn = name_prefix + "-" + std::to_string( i );