Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Name application threads #6992

Merged
merged 36 commits into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
77f519f
Update fc to fc with set_os_thread_name
heifner Mar 26, 2019
d2c8a7e
Name bnet threads
heifner Mar 26, 2019
f488922
Name http threads
heifner Mar 26, 2019
ae9c3c9
Name mongo_db_plugin consume thread
heifner Mar 26, 2019
df67f7d
Name main application thread
heifner Mar 26, 2019
804da25
Name net_plugin server_ioc threads
heifner Mar 26, 2019
6c79f2e
Name all threads in chain controller thread pool
heifner Mar 26, 2019
9a34ba5
Name all threads in producer thread pool
heifner Mar 26, 2019
bf1bae2
Revert move of ilog message
heifner Mar 26, 2019
32540b3
Fix for tests which were destroying controller before all set_thread_…
heifner Mar 26, 2019
b4f8d70
Do not name main thread since some tests expect it to be nodeos
heifner Mar 27, 2019
0f7d853
Name bnet threads
heifner Mar 26, 2019
92ebd6a
Name http threads
heifner Mar 26, 2019
9015d30
Name mongo_db_plugin consume thread
heifner Mar 26, 2019
dc4026e
Name main application thread
heifner Mar 26, 2019
d6bf0b0
Name net_plugin server_ioc threads
heifner Mar 26, 2019
5dac704
Name all threads in chain controller thread pool
heifner Mar 26, 2019
c2a07d2
Name all threads in producer thread pool
heifner Mar 26, 2019
e0ecb3f
Revert move of ilog message
heifner Mar 26, 2019
34a43e2
Fix for tests which were destroying controller before all set_thread_…
heifner Mar 26, 2019
79f507a
Do not name main thread since some tests expect it to be nodeos
heifner Mar 27, 2019
7b3f801
Update to lastest fc with set_os_thread_name
heifner Apr 1, 2019
edb6c2a
Merge branch 'name-threads' of https://github.com/EOSIO/eos into name…
heifner Apr 1, 2019
f0e42da
Use io_context in thread_pool and set thread name before run
heifner Apr 1, 2019
b347269
Use io_context for thread pool and name threads before run
heifner Apr 1, 2019
85bbc90
Use ioc work to prevent io_context::run from exiting
heifner Apr 1, 2019
d1b6eb9
Use io_context instead of thread_pool
heifner Apr 1, 2019
d955af1
Update test to run on io_context like in producer_plugin and controller
heifner Apr 1, 2019
2c62e66
Merge branch 'develop' into name-threads
heifner Apr 2, 2019
408087a
Merge with develop
heifner Apr 2, 2019
0d7ec43
Fix merge issue
heifner Apr 2, 2019
c49af23
Add named_thread_pool to reduce duplicated code
heifner Apr 2, 2019
cb224ed
Simply by using named_thread_pool
heifner Apr 2, 2019
1c27590
Use named_thread_pool to simplify code
heifner Apr 2, 2019
0cfa5b3
Update to fc master with set_os_thread_name
heifner Apr 4, 2019
2b20aa3
Fix warning about unneeded capture
heifner Apr 4, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libraries/chain/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ add_library( eosio_chain
# global_property_object.cpp
#
# contracts/chain_initializer.cpp



transaction_metadata.cpp
thread_utils.cpp
${HEADERS}
)

Expand Down
14 changes: 8 additions & 6 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <chainbase/chainbase.hpp>
#include <fc/io/json.hpp>
#include <fc/log/logger_config.hpp>
#include <fc/scoped_exit.hpp>
#include <fc/variant_object.hpp>

Expand Down Expand Up @@ -132,7 +133,7 @@ struct controller_impl {
optional<fc::microseconds> subjective_cpu_leeway;
bool trusted_producer_light_validation = false;
uint32_t snapshot_head_block = 0;
boost::asio::thread_pool thread_pool;
named_thread_pool thread_pool;

typedef pair<scope_name,action_name> handler_key;
map< account_name, map<handler_key, apply_handler> > apply_handlers;
Expand Down Expand Up @@ -184,7 +185,7 @@ struct controller_impl {
conf( cfg ),
chain_id( cfg.genesis.compute_chain_id() ),
read_mode( cfg.read_mode ),
thread_pool( cfg.thread_pool_size )
thread_pool( "chain", cfg.thread_pool_size )
{

#define SET_APP_HANDLER( receiver, contract, action) \
Expand Down Expand Up @@ -403,6 +404,7 @@ struct controller_impl {
}

~controller_impl() {
thread_pool.stop();
pending.reset();
}

Expand Down Expand Up @@ -1195,7 +1197,7 @@ struct controller_impl {
auto& pt = receipt.trx.get<packed_transaction>();
auto mtrx = std::make_shared<transaction_metadata>( std::make_shared<packed_transaction>( pt ) );
if( !self.skip_auth_check() ) {
transaction_metadata::start_recover_keys( mtrx, thread_pool, chain_id, microseconds::maximum() );
transaction_metadata::start_recover_keys( mtrx, thread_pool.get_executor(), chain_id, microseconds::maximum() );
}
packed_transactions.emplace_back( std::move( mtrx ) );
}
Expand Down Expand Up @@ -1273,7 +1275,7 @@ struct controller_impl {
auto prev = fork_db.get_block( b->previous );
EOS_ASSERT( prev, unlinkable_block_exception, "unlinkable block ${id}", ("id", id)("previous", b->previous) );

return async_thread_pool( thread_pool, [b, prev]() {
return async_thread_pool( thread_pool.get_executor(), [b, prev]() {
const bool skip_validate_signee = false;
return std::make_shared<block_state>( *prev, move( b ), skip_validate_signee );
} );
Expand Down Expand Up @@ -1780,8 +1782,8 @@ void controller::abort_block() {
my->abort_block();
}

boost::asio::thread_pool& controller::get_thread_pool() {
return my->thread_pool;
boost::asio::io_context& controller::get_thread_pool() {
return my->thread_pool.get_executor();
}

std::future<block_state_ptr> controller::create_block_state_future( const signed_block_ptr& b ) {
Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ namespace eosio { namespace chain {
std::future<block_state_ptr> create_block_state_future( const signed_block_ptr& b );
void push_block( std::future<block_state_ptr>& block_state_future );

boost::asio::thread_pool& get_thread_pool();
boost::asio::io_context& get_thread_pool();

const chainbase::database& db()const;

Expand Down
31 changes: 30 additions & 1 deletion libraries/chain/include/eosio/chain/thread_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,45 @@
*/
#pragma once

#include <fc/optional.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/post.hpp>
#include <future>
#include <memory>

namespace eosio { namespace chain {

/**
* Wrapper class for boost asio thread pool and io_context run.
* Also names threads so that tools like htop can see thread name.
*/
class named_thread_pool {
public:
// name_prefix is name appended with -## of thread.
// short name_prefix (6 chars or under) is recommended as console_appender uses 9 chars for thread name
named_thread_pool( std::string name_prefix, size_t num_threads );

// calls stop()
~named_thread_pool();

boost::asio::io_context& get_executor() { return _ioc; }

// destroy work guard, stop io_context, join thread_pool, and stop thread_pool
void stop();

private:
using ioc_work_t = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;

boost::asio::thread_pool _thread_pool;
boost::asio::io_context _ioc;
fc::optional<ioc_work_t> _ioc_work;
};


// async on thread_pool and return future
template<typename F>
auto async_thread_pool( boost::asio::thread_pool& thread_pool, F&& f ) {
auto async_thread_pool( boost::asio::io_context& thread_pool, F&& f ) {
auto task = std::make_shared<std::packaged_task<decltype( f() )()>>( std::forward<F>( f ) );
boost::asio::post( thread_pool, [task]() { (*task)(); } );
return task->get_future();
Expand Down
4 changes: 1 addition & 3 deletions libraries/chain/include/eosio/chain/transaction_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,11 @@ class transaction_metadata {

// must be called from main application thread
static signing_keys_future_type
start_recover_keys( const transaction_metadata_ptr& mtrx, boost::asio::thread_pool& thread_pool,
start_recover_keys( const transaction_metadata_ptr& mtrx, boost::asio::io_context& thread_pool,
const chain_id_type& chain_id, fc::microseconds time_limit );

// start_recover_keys must be called first
recovery_keys_type recover_keys( const chain_id_type& chain_id );


};

} } // eosio::chain
40 changes: 40 additions & 0 deletions libraries/chain/thread_utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* @file
* @copyright defined in eos/LICENSE.txt
*/

#include <eosio/chain/thread_utils.hpp>
#include <fc/log/logger_config.hpp>

namespace eosio { namespace chain {


//
// named_thread_pool
//
named_thread_pool::named_thread_pool( std::string name_prefix, size_t num_threads )
: _thread_pool( num_threads )
{
_ioc_work.emplace( boost::asio::make_work_guard( _ioc ) );
for( size_t i = 0; i < num_threads; ++i ) {
boost::asio::post( _thread_pool, [&ioc = _ioc, name_prefix, i]() {
std::string tn = name_prefix + "-" + std::to_string( i );
fc::set_os_thread_name( tn );
ioc.run();
} );
}
}

named_thread_pool::~named_thread_pool() {
stop();
}

void named_thread_pool::stop() {
_ioc_work.reset();
_ioc.stop();
_thread_pool.join();
_thread_pool.stop();
}


} } // eosio::chain
2 changes: 1 addition & 1 deletion libraries/chain/transaction_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ recovery_keys_type transaction_metadata::recover_keys( const chain_id_type& chai
}

signing_keys_future_type transaction_metadata::start_recover_keys( const transaction_metadata_ptr& mtrx,
boost::asio::thread_pool& thread_pool,
boost::asio::io_context& thread_pool,
const chain_id_type& chain_id,
fc::microseconds time_limit )
{
Expand Down
2 changes: 1 addition & 1 deletion libraries/fc
9 changes: 8 additions & 1 deletion plugins/bnet_plugin/bnet_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include <eosio/chain_plugin/chain_plugin.hpp>

#include <fc/io/json.hpp>
#include <fc/log/logger_config.hpp>

#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
Expand Down Expand Up @@ -1398,7 +1399,13 @@ namespace eosio {

my->_socket_threads.reserve( my->_num_threads );
for( auto i = 0; i < my->_num_threads; ++i ) {
my->_socket_threads.emplace_back( [&ioc]{ wlog( "start thread" ); ioc.run(); wlog( "end thread" ); } );
my->_socket_threads.emplace_back( [&ioc, i]{
std::string tn = "bnet-" + std::to_string( i );
fc::set_os_thread_name( tn );
wlog( "start thread" );
ioc.run();
wlog( "end thread" );
} );
}

for( const auto& peer : my->_connect_to_peers ) {
Expand Down
2 changes: 0 additions & 2 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,6 @@ void chain_plugin::plugin_shutdown() {
my->irreversible_block_connection.reset();
my->accepted_transaction_connection.reset();
my->applied_transaction_connection.reset();
my->chain->get_thread_pool().stop();
my->chain->get_thread_pool().join();
my->chain.reset();
}

Expand Down
32 changes: 10 additions & 22 deletions plugins/http_plugin/http_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <eosio/http_plugin/http_plugin.hpp>
#include <eosio/http_plugin/local_endpoint.hpp>
#include <eosio/chain/exceptions.hpp>
#include <eosio/chain/thread_utils.hpp>

#include <fc/network/ip.hpp>
#include <fc/log/logger_config.hpp>
Expand Down Expand Up @@ -123,7 +124,6 @@ namespace eosio {
using websocket_local_server_type = websocketpp::server<detail::asio_local_with_stub_log>;
using websocket_server_tls_type = websocketpp::server<detail::asio_with_stub_log<websocketpp::transport::asio::tls_socket::endpoint>>;
using ssl_context_ptr = websocketpp::lib::shared_ptr<websocketpp::lib::asio::ssl::context>;
using io_work_t = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;

static bool verbose_http_errors = false;

Expand All @@ -140,9 +140,7 @@ namespace eosio {
websocket_server_type server;

uint16_t thread_pool_size = 2;
optional<boost::asio::thread_pool> thread_pool;
std::shared_ptr<boost::asio::io_context> server_ioc;
optional<io_work_t> server_ioc_work;
optional<eosio::chain::named_thread_pool> thread_pool;
std::atomic<int64_t> bytes_in_flight{0};
size_t max_bytes_in_flight = 0;

Expand Down Expand Up @@ -301,12 +299,12 @@ namespace eosio {
con->defer_http_response();
bytes_in_flight += body.size();
app().post( appbase::priority::low,
[ioc = this->server_ioc, &bytes_in_flight = this->bytes_in_flight, handler_itr,
[&ioc = thread_pool->get_executor(), &bytes_in_flight = this->bytes_in_flight, handler_itr,
resource{std::move( resource )}, body{std::move( body )}, con]() {
try {
handler_itr->second( resource, body,
[ioc{std::move(ioc)}, &bytes_in_flight, con]( int code, fc::variant response_body ) {
boost::asio::post( *ioc, [ioc, response_body{std::move( response_body )}, &bytes_in_flight, con, code]() mutable {
[&ioc, &bytes_in_flight, con]( int code, fc::variant response_body ) {
boost::asio::post( ioc, [response_body{std::move( response_body )}, &bytes_in_flight, con, code]() mutable {
std::string json = fc::json::to_string( response_body );
response_body.clear();
const size_t json_size = json.size();
Expand Down Expand Up @@ -340,11 +338,11 @@ namespace eosio {
void create_server_for_endpoint(const tcp::endpoint& ep, websocketpp::server<detail::asio_with_stub_log<T>>& ws) {
try {
ws.clear_access_channels(websocketpp::log::alevel::all);
ws.init_asio(&(*server_ioc));
ws.init_asio( &thread_pool->get_executor() );
ws.set_reuse_addr(true);
ws.set_max_http_body_size(max_body_size);
// capture server_ioc shared_ptr in http handler to keep it alive while in use
ws.set_http_handler([&, ioc = this->server_ioc](connection_hdl hdl) {
ws.set_http_handler([&](connection_hdl hdl) {
handle_http_request<detail::asio_with_stub_log<T>>(ws.get_con_from_hdl(hdl));
});
} catch ( const fc::exception& e ){
Expand Down Expand Up @@ -518,12 +516,7 @@ namespace eosio {

void http_plugin::plugin_startup() {

my->thread_pool.emplace( my->thread_pool_size );
my->server_ioc = std::make_shared<boost::asio::io_context>();
my->server_ioc_work.emplace( boost::asio::make_work_guard(*my->server_ioc) );
for( uint16_t i = 0; i < my->thread_pool_size; ++i ) {
boost::asio::post( *my->thread_pool, [ioc = my->server_ioc]() { ioc->run(); } );
}
my->thread_pool.emplace( "http", my->thread_pool_size );

if(my->listen_endpoint) {
try {
Expand All @@ -547,10 +540,10 @@ namespace eosio {
if(my->unix_endpoint) {
try {
my->unix_server.clear_access_channels(websocketpp::log::alevel::all);
my->unix_server.init_asio(&(*my->server_ioc));
my->unix_server.init_asio( &my->thread_pool->get_executor() );
my->unix_server.set_max_http_body_size(my->max_body_size);
my->unix_server.listen(*my->unix_endpoint);
my->unix_server.set_http_handler([&, ioc = my->server_ioc](connection_hdl hdl) {
my->unix_server.set_http_handler([&, &ioc = my->thread_pool->get_executor()](connection_hdl hdl) {
my->handle_http_request<detail::asio_local_with_stub_log>( my->unix_server.get_con_from_hdl(hdl));
});
my->unix_server.start_accept();
Expand Down Expand Up @@ -610,12 +603,7 @@ namespace eosio {
if(my->unix_server.is_listening())
my->unix_server.stop_listening();

if( my->server_ioc_work )
my->server_ioc_work->reset();
if( my->server_ioc )
my->server_ioc->stop();
if( my->thread_pool ) {
my->thread_pool->join();
my->thread_pool->stop();
}
}
Expand Down
6 changes: 5 additions & 1 deletion plugins/mongo_db_plugin/mongo_db_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <eosio/chain/types.hpp>

#include <fc/io/json.hpp>
#include <fc/log/logger_config.hpp>
#include <fc/utf8.hpp>
#include <fc/variant.hpp>

Expand Down Expand Up @@ -1527,7 +1528,10 @@ void mongo_db_plugin_impl::init() {

ilog("starting db plugin thread");

consume_thread = std::thread([this] { consume_blocks(); });
consume_thread = std::thread( [this] {
fc::set_os_thread_name( "mongodb" );
consume_blocks();
} );

startup = false;
}
Expand Down
Loading