Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into GH-592-ship-crash-spl…
Browse files Browse the repository at this point in the history
…it-ship-log
  • Loading branch information
heifner committed Jan 25, 2023
2 parents 3ed8ac7 + e55669c commit 93173ce
Show file tree
Hide file tree
Showing 14 changed files with 100 additions and 99 deletions.
30 changes: 16 additions & 14 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,17 +301,19 @@ struct controller_impl {
conf( cfg ),
chain_id( chain_id ),
read_mode( cfg.read_mode ),
thread_pool( "chain", cfg.thread_pool_size, [this]( const fc::exception& e ) {
elog( "Exception in chain thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
if( shutdown ) shutdown();
} )
thread_pool( "chain" )
{
fork_db.open( [this]( block_timestamp_type timestamp,
const flat_set<digest_type>& cur_features,
const vector<digest_type>& new_features )
{ check_protocol_features( timestamp, cur_features, new_features ); }
);

thread_pool.start( cfg.thread_pool_size, [this]( const fc::exception& e ) {
elog( "Exception in chain thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
if( shutdown ) shutdown();
} );

set_activation_handler<builtin_protocol_feature_t::preactivate_feature>();
set_activation_handler<builtin_protocol_feature_t::replace_deferred>();
set_activation_handler<builtin_protocol_feature_t::get_sender>();
Expand Down Expand Up @@ -413,7 +415,7 @@ struct controller_impl {
std::vector<std::future<std::vector<char>>> v;
v.reserve( branch.size() );
for( auto bitr = branch.rbegin(); bitr != branch.rend(); ++bitr ) {
v.emplace_back( async_thread_pool( thread_pool.get_executor(), [b=(*bitr)->block]() { return fc::raw::pack(*b); } ) );
v.emplace_back( post_async_task( thread_pool.get_executor(), [b=(*bitr)->block]() { return fc::raw::pack(*b); } ) );
}
auto it = v.begin();

Expand Down Expand Up @@ -1838,17 +1840,17 @@ struct controller_impl {

auto& bb = std::get<building_block>(pending->_block_stage);

auto action_merkle_fut = async_thread_pool( thread_pool.get_executor(),
[ids{std::move( bb._action_receipt_digests )}]() mutable {
return merkle( std::move( ids ) );
} );
auto action_merkle_fut = post_async_task( thread_pool.get_executor(),
[ids{std::move( bb._action_receipt_digests )}]() mutable {
return merkle( std::move( ids ) );
} );
const bool calc_trx_merkle = !std::holds_alternative<checksum256_type>(bb._trx_mroot_or_receipt_digests);
std::future<checksum256_type> trx_merkle_fut;
if( calc_trx_merkle ) {
trx_merkle_fut = async_thread_pool( thread_pool.get_executor(),
[ids{std::move( std::get<digests_t>(bb._trx_mroot_or_receipt_digests) )}]() mutable {
return merkle( std::move( ids ) );
} );
trx_merkle_fut = post_async_task( thread_pool.get_executor(),
[ids{std::move( std::get<digests_t>(bb._trx_mroot_or_receipt_digests) )}]() mutable {
return merkle( std::move( ids ) );
} );
}

// Update resource limits:
Expand Down Expand Up @@ -2173,7 +2175,7 @@ struct controller_impl {
std::future<block_state_ptr> create_block_state_future( const block_id_type& id, const signed_block_ptr& b ) {
EOS_ASSERT( b, block_validate_exception, "null block" );

return async_thread_pool( thread_pool.get_executor(), [b, id, control=this]() {
return post_async_task( thread_pool.get_executor(), [b, id, control=this]() {
// no reason for a block_state if fork_db already knows about block
auto existing = control->fork_db.get_block( id );
EOS_ASSERT( !existing, fork_database_exception, "we already know about this block: ${id}", ("id", id) );
Expand Down
16 changes: 7 additions & 9 deletions libraries/chain/include/eosio/chain/thread_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ namespace eosio { namespace chain {
/// @param name_prefix is name appended with -## of thread.
/// A short name_prefix (6 chars or under) is recommended as console_appender uses 9 chars
/// for the thread name.
/// @param num_threads is number of threads spawned in the constructor
/// @param on_except is the function to call if io_context throws an exception, is called from thread pool thread
/// @param delay_start do not spawn threads in constructor, wait for start() call
named_thread_pool( std::string name_prefix, size_t num_threads, on_except_t on_except, bool delay_start = false );
explicit named_thread_pool( std::string name_prefix );

/// calls stop()
~named_thread_pool();
Expand All @@ -33,8 +30,11 @@ namespace eosio { namespace chain {

/// Spawn threads, can be re-started after stop().
/// Assumes start()/stop() called from the same thread or externally protected.
/// @param num_threads is number of threads spawned
/// @param on_except is the function to call if io_context throws an exception, is called from thread pool thread.
/// if an empty function then logs and rethrows exception on thread which will terminate.
/// @throw assert_exception if already started and not stopped.
void start();
void start( size_t num_threads, on_except_t on_except );

/// destroy work guard, stop io_context, join thread_pool
void stop();
Expand All @@ -43,17 +43,15 @@ namespace eosio { namespace chain {
using ioc_work_t = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;

std::string _name_prefix;
size_t _num_threads;
std::vector<std::thread> _thread_pool;
boost::asio::io_context _ioc;
std::vector<std::thread> _thread_pool;
std::optional<ioc_work_t> _ioc_work;
on_except_t _on_except;
};


// async on io_context and return future
template<typename F>
auto async_thread_pool( boost::asio::io_context& ioc, F&& f ) {
auto post_async_task( boost::asio::io_context& ioc, F&& f ) {
auto task = std::make_shared<std::packaged_task<decltype( f() )()>>( std::forward<F>( f ) );
boost::asio::post( ioc, [task]() { (*task)(); } );
return task->get_future();
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/wast_to_wasm.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <vector>
#include <string>
#include <stdint.h>

namespace eosio { namespace chain {

Expand Down
19 changes: 9 additions & 10 deletions libraries/chain/thread_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,23 @@

namespace eosio { namespace chain {

named_thread_pool::named_thread_pool( std::string name_prefix, size_t num_threads, on_except_t on_except, bool delay_start )
named_thread_pool::named_thread_pool( std::string name_prefix )
: _name_prefix( std::move(name_prefix) )
, _num_threads( num_threads )
, _ioc( num_threads )
, _on_except( on_except )
, _ioc()
{
if( !delay_start ) {
start();
}
}

named_thread_pool::~named_thread_pool() {
stop();
}

void named_thread_pool::start() {
void named_thread_pool::start( size_t num_threads, on_except_t on_except ) {
FC_ASSERT( !_ioc_work, "Thread pool already started" );
_ioc_work.emplace( boost::asio::make_work_guard( _ioc ) );
_ioc.restart();
for( size_t i = 0; i < _num_threads; ++i ) {
_thread_pool.emplace_back( [&ioc = _ioc, &name_prefix = _name_prefix, on_except = _on_except, i]() {
_thread_pool.reserve( num_threads );
for( size_t i = 0; i < num_threads; ++i ) {
_thread_pool.emplace_back( [&ioc = _ioc, &name_prefix = _name_prefix, on_except, i]() {
std::string tn = name_prefix + "-" + std::to_string( i );
try {
fc::set_os_thread_name( tn );
Expand All @@ -34,6 +30,7 @@ void named_thread_pool::start() {
on_except( e );
} else {
elog( "Exiting thread ${t} on exception: ${e}", ("t", tn)("e", e.to_detail_string()) );
throw;
}
} catch( const std::exception& e ) {
fc::std_exception_wrapper se( FC_LOG_MESSAGE( warn, "${what}: ", ("what", e.what()) ),
Expand All @@ -42,13 +39,15 @@ void named_thread_pool::start() {
on_except( se );
} else {
elog( "Exiting thread ${t} on exception: ${e}", ("t", tn)("e", se.to_detail_string()) );
throw;
}
} catch( ... ) {
if( on_except ) {
fc::unhandled_exception ue( FC_LOG_MESSAGE( warn, "unknown exception" ), std::current_exception() );
on_except( ue );
} else {
elog( "Exiting thread ${t} on unknown exception", ("t", tn) );
throw;
}
}
} );
Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/transaction_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ recover_keys_future transaction_metadata::start_recover_keys( packed_transaction
trx_type t,
uint32_t max_variable_sig_size )
{
return async_thread_pool( thread_pool, [trx{std::move(trx)}, chain_id, time_limit, t, max_variable_sig_size]() mutable {
return post_async_task( thread_pool, [trx{std::move(trx)}, chain_id, time_limit, t, max_variable_sig_size]() mutable {
fc::time_point deadline = time_limit == fc::microseconds::maximum() ?
fc::time_point::maximum() : fc::time_point::now() + time_limit;
check_variable_sig_size( trx, max_variable_sig_size );
Expand Down
3 changes: 2 additions & 1 deletion plugins/chain_plugin/test/test_account_query_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ BOOST_FIXTURE_TEST_CASE(updateauth_test_multi_threaded, TESTER) { try {
produce_block();
create_account(tester_account);

named_thread_pool thread_pool( "test", 5, nullptr );
named_thread_pool thread_pool( "test" );
thread_pool.start( 5, {} );

for( size_t i = 0; i < 100; ++i ) {
boost::asio::post( thread_pool.get_executor(), [&aq_db, tester_account, role]() {
Expand Down
14 changes: 6 additions & 8 deletions plugins/http_plugin/http_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,11 @@ class http_plugin_impl : public std::enable_shared_from_this<http_plugin_impl> {
app().post(appbase::priority::high, [this] ()
{
try {
my->plugin_state->thread_pool =
std::make_unique<eosio::chain::named_thread_pool>( "http", my->plugin_state->thread_pool_size, [](const fc::exception& e) {
fc_elog( logger(), "Exception in http thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
app().quit();
} );
my->plugin_state->thread_pool.start( my->plugin_state->thread_pool_size, [](const fc::exception& e) {
fc_elog( logger(), "Exception in http thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
app().quit();
} );

if(my->listen_endpoint) {
try {
my->create_beast_server(false);
Expand Down Expand Up @@ -468,9 +468,7 @@ class http_plugin_impl : public std::enable_shared_from_this<http_plugin_impl> {
if(my->beast_unix_server)
my->beast_unix_server->stop_listening();

if( my->plugin_state->thread_pool ) {
my->plugin_state->thread_pool->stop();
}
my->plugin_state->thread_pool.stop();

my->beast_server.reset();
my->beast_https_server.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class beast_http_listener : public std::enable_shared_from_this<beast_http_liste
beast_http_listener& operator=(const beast_http_listener&) = delete;
beast_http_listener& operator=(beast_http_listener&&) = delete;

beast_http_listener(std::shared_ptr<http_plugin_state> plugin_state) : is_listening_(false), plugin_state_(std::move(plugin_state)), acceptor_(plugin_state_->thread_pool->get_executor()), socket_(plugin_state_->thread_pool->get_executor()), accept_error_timer_(plugin_state_->thread_pool->get_executor()) {}
beast_http_listener(std::shared_ptr<http_plugin_state> plugin_state) : is_listening_(false), plugin_state_(std::move(plugin_state)), acceptor_(plugin_state_->thread_pool.get_executor()), socket_(plugin_state_->thread_pool.get_executor()), accept_error_timer_(plugin_state_->thread_pool.get_executor()) {}

virtual ~beast_http_listener() {
try {
Expand Down Expand Up @@ -99,7 +99,7 @@ class beast_http_listener : public std::enable_shared_from_this<beast_http_liste

void stop_listening() {
if(is_listening_) {
plugin_state_->thread_pool->stop();
plugin_state_->thread_pool.stop();
is_listening_ = false;
}
}
Expand Down
4 changes: 2 additions & 2 deletions plugins/http_plugin/include/eosio/http_plugin/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ struct http_plugin_state {
bool keep_alive = false;

uint16_t thread_pool_size = 2;
std::unique_ptr<eosio::chain::named_thread_pool> thread_pool;
eosio::chain::named_thread_pool thread_pool{ "http" };

fc::logger& logger;

Expand Down Expand Up @@ -162,7 +162,7 @@ auto make_http_response_handler(std::shared_ptr<http_plugin_state> plugin_state,
plugin_state->bytes_in_flight += payload_size;

// post back to an HTTP thread to allow the response handler to be called from any thread
boost::asio::post(plugin_state->thread_pool->get_executor(),
boost::asio::post(plugin_state->thread_pool.get_executor(),
[plugin_state, session_ptr, code, deadline, start, payload_size, response = std::move(response)]() {
try {
plugin_state->bytes_in_flight -= payload_size;
Expand Down
47 changes: 23 additions & 24 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,9 @@ namespace eosio {

compat::channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription;

uint16_t thread_pool_size = 2;
std::optional<eosio::chain::named_thread_pool> thread_pool;
uint16_t thread_pool_size = 2;
eosio::chain::named_thread_pool thread_pool{ "net" };


private:
mutable std::mutex chain_info_mtx; // protects chain_*
Expand Down Expand Up @@ -877,11 +878,11 @@ namespace eosio {

connection::connection( const string& endpoint )
: peer_addr( endpoint ),
strand( my_impl->thread_pool->get_executor() ),
socket( new tcp::socket( my_impl->thread_pool->get_executor() ) ),
strand( my_impl->thread_pool.get_executor() ),
socket( new tcp::socket( my_impl->thread_pool.get_executor() ) ),
log_p2p_address( endpoint ),
connection_id( ++my_impl->current_connection_id ),
response_expected_timer( my_impl->thread_pool->get_executor() ),
response_expected_timer( my_impl->thread_pool.get_executor() ),
last_handshake_recv(),
last_handshake_sent()
{
Expand All @@ -890,10 +891,10 @@ namespace eosio {

connection::connection()
: peer_addr(),
strand( my_impl->thread_pool->get_executor() ),
socket( new tcp::socket( my_impl->thread_pool->get_executor() ) ),
strand( my_impl->thread_pool.get_executor() ),
socket( new tcp::socket( my_impl->thread_pool.get_executor() ) ),
connection_id( ++my_impl->current_connection_id ),
response_expected_timer( my_impl->thread_pool->get_executor() ),
response_expected_timer( my_impl->thread_pool.get_executor() ),
last_handshake_recv(),
last_handshake_sent()
{
Expand Down Expand Up @@ -996,7 +997,7 @@ namespace eosio {
self->socket->shutdown( tcp::socket::shutdown_both, ec );
self->socket->close( ec );
}
self->socket.reset( new tcp::socket( my_impl->thread_pool->get_executor() ) );
self->socket.reset( new tcp::socket( my_impl->thread_pool.get_executor() ) );
self->flush_queues();
self->connecting = false;
self->syncing = false;
Expand Down Expand Up @@ -2319,7 +2320,7 @@ namespace eosio {
string port = c->peer_address().substr( colon + 1, colon2 == string::npos ? string::npos : colon2 - (colon + 1));
c->set_connection_type( c->peer_address() );

auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool->get_executor() );
auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool.get_executor() );
connection_wptr weak_conn = c;
// Note: need to add support for IPv6 too
resolver->async_resolve( tcp::v4(), host, port, boost::asio::bind_executor( c->strand,
Expand Down Expand Up @@ -3226,7 +3227,7 @@ namespace eosio {
}

if( reason == no_reason ) {
boost::asio::post( my_impl->thread_pool->get_executor(), [dispatcher = my_impl->dispatcher.get(), cid=c->connection_id, blk_id, msg]() {
boost::asio::post( my_impl->thread_pool.get_executor(), [dispatcher = my_impl->dispatcher.get(), cid=c->connection_id, blk_id, msg]() {
fc_dlog( logger, "accepted signed_block : #${n} ${id}...", ("n", msg->block_num())("id", blk_id.str().substr(8,16)) );
dispatcher->add_peer_block( blk_id, cid );
});
Expand Down Expand Up @@ -3307,11 +3308,11 @@ namespace eosio {
void net_plugin_impl::start_monitors() {
{
std::lock_guard<std::mutex> g( connector_check_timer_mtx );
connector_check_timer.reset(new boost::asio::steady_timer( my_impl->thread_pool->get_executor() ));
connector_check_timer.reset(new boost::asio::steady_timer( my_impl->thread_pool.get_executor() ));
}
{
std::lock_guard<std::mutex> g( expire_timer_mtx );
expire_timer.reset( new boost::asio::steady_timer( my_impl->thread_pool->get_executor() ) );
expire_timer.reset( new boost::asio::steady_timer( my_impl->thread_pool.get_executor() ) );
}
start_conn_timer(connector_period, std::weak_ptr<connection>());
start_expire_timer();
Expand Down Expand Up @@ -3704,12 +3705,12 @@ namespace eosio {

my->producer_plug = app().find_plugin<producer_plugin>();

my->thread_pool.emplace( "net", my->thread_pool_size, []( const fc::exception& e ) {
fc_elog( logger, "Exception in net plugin thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
app().quit();
} );
my->thread_pool.start( my->thread_pool_size, []( const fc::exception& e ) {
fc_elog( logger, "Exception in net plugin thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
app().quit();
} );

my->dispatcher.reset( new dispatch_manager( my_impl->thread_pool->get_executor() ) );
my->dispatcher.reset( new dispatch_manager( my_impl->thread_pool.get_executor() ) );

if( !my->p2p_accept_transactions && my->p2p_address.size() ) {
fc_ilog( logger, "\n"
Expand All @@ -3723,11 +3724,11 @@ namespace eosio {
if( my->p2p_address.size() > 0 ) {
auto host = my->p2p_address.substr( 0, my->p2p_address.find( ':' ));
auto port = my->p2p_address.substr( host.size() + 1, my->p2p_address.size());
tcp::resolver resolver( my->thread_pool->get_executor() );
tcp::resolver resolver( my->thread_pool.get_executor() );
// Note: need to add support for IPv6 too?
listen_endpoint = *resolver.resolve( tcp::v4(), host, port );

my->acceptor.reset( new tcp::acceptor( my_impl->thread_pool->get_executor() ) );
my->acceptor.reset( new tcp::acceptor( my_impl->thread_pool.get_executor() ) );

if( !my->p2p_server_address.empty() ) {
my->p2p_address = my->p2p_server_address;
Expand Down Expand Up @@ -3762,7 +3763,7 @@ namespace eosio {

{
std::lock_guard<std::mutex> g( my->keepalive_timer_mtx );
my->keepalive_timer.reset( new boost::asio::steady_timer( my->thread_pool->get_executor() ) );
my->keepalive_timer.reset( new boost::asio::steady_timer( my->thread_pool.get_executor() ) );
}

my->incoming_transaction_ack_subscription = app().get_channel<compat::channels::transaction_ack>().subscribe(
Expand Down Expand Up @@ -3832,9 +3833,7 @@ namespace eosio {
my->connections.clear();
}

if( my->thread_pool ) {
my->thread_pool->stop();
}
my->thread_pool.stop();

if( my->acceptor ) {
boost::system::error_code ec;
Expand Down
Loading

0 comments on commit 93173ce

Please sign in to comment.