Skip to content

Commit

Permalink
Merge pull request #632 from AntelopeIO/named-tp-except
Browse files Browse the repository at this point in the history
Add handling of exceptions to named_thread_pool
  • Loading branch information
heifner authored Jan 25, 2023
2 parents 7ea9f61 + 1d2b983 commit e55669c
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 81 deletions.
27 changes: 16 additions & 11 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,19 @@ struct controller_impl {
conf( cfg ),
chain_id( chain_id ),
read_mode( cfg.read_mode ),
thread_pool( "chain", cfg.thread_pool_size )
thread_pool( "chain" )
{
fork_db.open( [this]( block_timestamp_type timestamp,
const flat_set<digest_type>& cur_features,
const vector<digest_type>& new_features )
{ check_protocol_features( timestamp, cur_features, new_features ); }
);

thread_pool.start( cfg.thread_pool_size, [this]( const fc::exception& e ) {
elog( "Exception in chain thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
if( shutdown ) shutdown();
} );

set_activation_handler<builtin_protocol_feature_t::preactivate_feature>();
set_activation_handler<builtin_protocol_feature_t::replace_deferred>();
set_activation_handler<builtin_protocol_feature_t::get_sender>();
Expand Down Expand Up @@ -410,7 +415,7 @@ struct controller_impl {
std::vector<std::future<std::vector<char>>> v;
v.reserve( branch.size() );
for( auto bitr = branch.rbegin(); bitr != branch.rend(); ++bitr ) {
v.emplace_back( async_thread_pool( thread_pool.get_executor(), [b=(*bitr)->block]() { return fc::raw::pack(*b); } ) );
v.emplace_back( post_async_task( thread_pool.get_executor(), [b=(*bitr)->block]() { return fc::raw::pack(*b); } ) );
}
auto it = v.begin();

Expand Down Expand Up @@ -1835,17 +1840,17 @@ struct controller_impl {

auto& bb = std::get<building_block>(pending->_block_stage);

auto action_merkle_fut = async_thread_pool( thread_pool.get_executor(),
[ids{std::move( bb._action_receipt_digests )}]() mutable {
return merkle( std::move( ids ) );
} );
auto action_merkle_fut = post_async_task( thread_pool.get_executor(),
[ids{std::move( bb._action_receipt_digests )}]() mutable {
return merkle( std::move( ids ) );
} );
const bool calc_trx_merkle = !std::holds_alternative<checksum256_type>(bb._trx_mroot_or_receipt_digests);
std::future<checksum256_type> trx_merkle_fut;
if( calc_trx_merkle ) {
trx_merkle_fut = async_thread_pool( thread_pool.get_executor(),
[ids{std::move( std::get<digests_t>(bb._trx_mroot_or_receipt_digests) )}]() mutable {
return merkle( std::move( ids ) );
} );
trx_merkle_fut = post_async_task( thread_pool.get_executor(),
[ids{std::move( std::get<digests_t>(bb._trx_mroot_or_receipt_digests) )}]() mutable {
return merkle( std::move( ids ) );
} );
}

// Update resource limits:
Expand Down Expand Up @@ -2170,7 +2175,7 @@ struct controller_impl {
std::future<block_state_ptr> create_block_state_future( const block_id_type& id, const signed_block_ptr& b ) {
EOS_ASSERT( b, block_validate_exception, "null block" );

return async_thread_pool( thread_pool.get_executor(), [b, id, control=this]() {
return post_async_task( thread_pool.get_executor(), [b, id, control=this]() {
// no reason for a block_state if fork_db already knows about block
auto existing = control->fork_db.get_block( id );
EOS_ASSERT( !existing, fork_database_exception, "we already know about this block: ${id}", ("id", id) );
Expand Down
29 changes: 17 additions & 12 deletions libraries/chain/include/eosio/chain/thread_utils.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <fc/exception/exception.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <future>
Expand All @@ -10,25 +11,30 @@
namespace eosio { namespace chain {

/**
* Wrapper class for boost asio thread pool and io_context run.
* Wrapper class for thread pool of boost asio io_context run.
* Also names threads so that tools like htop can see thread name.
*/
class named_thread_pool {
public:
// name_prefix is name appended with -## of thread.
// short name_prefix (6 chars or under) is recommended as console_appender uses 9 chars for thread name
/// @param delay_start do not spawn threads in constructor, wait for start() call
named_thread_pool( std::string name_prefix, size_t num_threads, bool delay_start = false );
using on_except_t = std::function<void(const fc::exception& e)>;

// calls stop()
/// @param name_prefix is name appended with -## of thread.
/// A short name_prefix (6 chars or under) is recommended as console_appender uses 9 chars
/// for the thread name.
explicit named_thread_pool( std::string name_prefix );

/// calls stop()
~named_thread_pool();

boost::asio::io_context& get_executor() { return _ioc; }

/// Spawn threads, can be re-started after stop().
/// Assumes start()/stop() called from the same thread or externally protected.
/// @param num_threads is number of threads spawned
/// @param on_except is the function to call if io_context throws an exception, is called from thread pool thread.
/// if an empty function then logs and rethrows exception on thread which will terminate.
/// @throw assert_exception if already started and not stopped.
void start();
void start( size_t num_threads, on_except_t on_except );

/// destroy work guard, stop io_context, join thread_pool
void stop();
Expand All @@ -37,18 +43,17 @@ namespace eosio { namespace chain {
using ioc_work_t = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;

std::string _name_prefix;
size_t _num_threads;
std::vector<std::thread> _thread_pool;
boost::asio::io_context _ioc;
std::vector<std::thread> _thread_pool;
std::optional<ioc_work_t> _ioc_work;
};


// async on thread_pool and return future
// async on io_context and return future
template<typename F>
auto async_thread_pool( boost::asio::io_context& thread_pool, F&& f ) {
auto post_async_task( boost::asio::io_context& ioc, F&& f ) {
auto task = std::make_shared<std::packaged_task<decltype( f() )()>>( std::forward<F>( f ) );
boost::asio::post( thread_pool, [task]() { (*task)(); } );
boost::asio::post( ioc, [task]() { (*task)(); } );
return task->get_future();
}

Expand Down
47 changes: 35 additions & 12 deletions libraries/chain/thread_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,52 @@

namespace eosio { namespace chain {

named_thread_pool::named_thread_pool( std::string name_prefix, size_t num_threads, bool delay_start )
named_thread_pool::named_thread_pool( std::string name_prefix )
: _name_prefix( std::move(name_prefix) )
, _num_threads( num_threads )
, _ioc( num_threads )
, _ioc()
{
if( !delay_start ) {
start();
}
}

named_thread_pool::~named_thread_pool() {
stop();
}

void named_thread_pool::start() {
void named_thread_pool::start( size_t num_threads, on_except_t on_except ) {
FC_ASSERT( !_ioc_work, "Thread pool already started" );
_ioc_work.emplace( boost::asio::make_work_guard( _ioc ) );
_ioc.restart();
for( size_t i = 0; i < _num_threads; ++i ) {
_thread_pool.emplace_back( [&ioc = _ioc, &name_prefix = _name_prefix, i]() {
_thread_pool.reserve( num_threads );
for( size_t i = 0; i < num_threads; ++i ) {
_thread_pool.emplace_back( [&ioc = _ioc, &name_prefix = _name_prefix, on_except, i]() {
std::string tn = name_prefix + "-" + std::to_string( i );
fc::set_os_thread_name( tn );
ioc.run();
try {
fc::set_os_thread_name( tn );
ioc.run();
} catch( const fc::exception& e ) {
if( on_except ) {
on_except( e );
} else {
elog( "Exiting thread ${t} on exception: ${e}", ("t", tn)("e", e.to_detail_string()) );
throw;
}
} catch( const std::exception& e ) {
fc::std_exception_wrapper se( FC_LOG_MESSAGE( warn, "${what}: ", ("what", e.what()) ),
std::current_exception(), BOOST_CORE_TYPEID( e ).name(), e.what() );
if( on_except ) {
on_except( se );
} else {
elog( "Exiting thread ${t} on exception: ${e}", ("t", tn)("e", se.to_detail_string()) );
throw;
}
} catch( ... ) {
if( on_except ) {
fc::unhandled_exception ue( FC_LOG_MESSAGE( warn, "unknown exception" ), std::current_exception() );
on_except( ue );
} else {
elog( "Exiting thread ${t} on unknown exception", ("t", tn) );
throw;
}
}
} );
}
}
Expand All @@ -41,4 +64,4 @@ void named_thread_pool::stop() {
}


} } // eosio::chain
} } // eosio::chain
2 changes: 1 addition & 1 deletion libraries/chain/transaction_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ recover_keys_future transaction_metadata::start_recover_keys( packed_transaction
trx_type t,
uint32_t max_variable_sig_size )
{
return async_thread_pool( thread_pool, [trx{std::move(trx)}, chain_id, time_limit, t, max_variable_sig_size]() mutable {
return post_async_task( thread_pool, [trx{std::move(trx)}, chain_id, time_limit, t, max_variable_sig_size]() mutable {
fc::time_point deadline = time_limit == fc::microseconds::maximum() ?
fc::time_point::maximum() : fc::time_point::now() + time_limit;
check_variable_sig_size( trx, max_variable_sig_size );
Expand Down
3 changes: 2 additions & 1 deletion plugins/chain_plugin/test/test_account_query_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ BOOST_FIXTURE_TEST_CASE(updateauth_test_multi_threaded, TESTER) { try {
produce_block();
create_account(tester_account);

named_thread_pool thread_pool( "test", 5 );
named_thread_pool thread_pool( "test" );
thread_pool.start( 5, {} );

for( size_t i = 0; i < 100; ++i ) {
boost::asio::post( thread_pool.get_executor(), [&aq_db, tester_account, role]() {
Expand Down
11 changes: 6 additions & 5 deletions plugins/http_plugin/http_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,11 @@ class http_plugin_impl : public std::enable_shared_from_this<http_plugin_impl> {
app().post(appbase::priority::high, [this] ()
{
try {
my->plugin_state->thread_pool =
std::make_unique<eosio::chain::named_thread_pool>( "http", my->plugin_state->thread_pool_size );
my->plugin_state->thread_pool.start( my->plugin_state->thread_pool_size, [](const fc::exception& e) {
fc_elog( logger(), "Exception in http thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
app().quit();
} );

if(my->listen_endpoint) {
try {
my->create_beast_server(false);
Expand Down Expand Up @@ -465,9 +468,7 @@ class http_plugin_impl : public std::enable_shared_from_this<http_plugin_impl> {
if(my->beast_unix_server)
my->beast_unix_server->stop_listening();

if( my->plugin_state->thread_pool ) {
my->plugin_state->thread_pool->stop();
}
my->plugin_state->thread_pool.stop();

my->beast_server.reset();
my->beast_https_server.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class beast_http_listener : public std::enable_shared_from_this<beast_http_liste
beast_http_listener& operator=(const beast_http_listener&) = delete;
beast_http_listener& operator=(beast_http_listener&&) = delete;

beast_http_listener(std::shared_ptr<http_plugin_state> plugin_state) : is_listening_(false), plugin_state_(std::move(plugin_state)), acceptor_(plugin_state_->thread_pool->get_executor()), socket_(plugin_state_->thread_pool->get_executor()), accept_error_timer_(plugin_state_->thread_pool->get_executor()) {}
beast_http_listener(std::shared_ptr<http_plugin_state> plugin_state) : is_listening_(false), plugin_state_(std::move(plugin_state)), acceptor_(plugin_state_->thread_pool.get_executor()), socket_(plugin_state_->thread_pool.get_executor()), accept_error_timer_(plugin_state_->thread_pool.get_executor()) {}

virtual ~beast_http_listener() {
try {
Expand Down Expand Up @@ -99,7 +99,7 @@ class beast_http_listener : public std::enable_shared_from_this<beast_http_liste

void stop_listening() {
if(is_listening_) {
plugin_state_->thread_pool->stop();
plugin_state_->thread_pool.stop();
is_listening_ = false;
}
}
Expand Down
4 changes: 2 additions & 2 deletions plugins/http_plugin/include/eosio/http_plugin/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ struct http_plugin_state {
bool keep_alive = false;

uint16_t thread_pool_size = 2;
std::unique_ptr<eosio::chain::named_thread_pool> thread_pool;
eosio::chain::named_thread_pool thread_pool{ "http" };

fc::logger& logger;

Expand Down Expand Up @@ -162,7 +162,7 @@ auto make_http_response_handler(std::shared_ptr<http_plugin_state> plugin_state,
plugin_state->bytes_in_flight += payload_size;

// post back to an HTTP thread to allow the response handler to be called from any thread
boost::asio::post(plugin_state->thread_pool->get_executor(),
boost::asio::post(plugin_state->thread_pool.get_executor(),
[plugin_state, session_ptr, code, deadline, start, payload_size, response = std::move(response)]() {
try {
plugin_state->bytes_in_flight -= payload_size;
Expand Down
Loading

0 comments on commit e55669c

Please sign in to comment.