From b2baf8da7011bd2f8dd8607827b1d502cb982fa4 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Sat, 7 Jan 2023 16:55:24 -0600 Subject: [PATCH] GH-592 Use named_thread_pool instead of explicit thread, io_context, etc. Old versions of boost do not have an acceptor that takes a strand in constructor. --- .../state_history_plugin.cpp | 35 +++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index 19969ce948..ee62ec5e13 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -6,12 +6,12 @@ #include #include #include +#include #include #include #include #include -#include #include #include #include @@ -87,11 +87,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this, std::unique_ptr>; std::set acceptors; - std::thread thr; - boost::asio::io_context ctx; - boost::asio::io_context::strand work_strand{ctx}; - boost::asio::executor_work_guard work_guard = - boost::asio::make_work_guard(ctx); + named_thread_pool thread_pool{"SHiP", 1}; // use of executor assumes only one thread static void get_log_entry(state_history_log& log, uint32_t block_num, std::optional& result) { if (block_num < log.begin_block() || block_num >= log.end_block()) @@ -139,9 +135,9 @@ struct state_history_plugin_impl : std::enable_shared_from_this struct session : session_base, std::enable_shared_from_this> { std::shared_ptr plugin; - ws::stream socket_stream; // plugin strand only after creation - bool sending = false; // plugin strand only - std::vector> send_queue; // plugin strand only + ws::stream socket_stream; // ship thread only after creation + bool sending = false; // ship thread only + std::vector> send_queue; // ship thread only bool need_to_send_update = false; // main thread only uint32_t to_send_block_num = 0; // main thread only @@ -216,7 +212,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this void send(T obj) { - boost::asio::post(this->plugin->work_strand, [self = this->shared_from_this(), obj = std::move(obj) ]() mutable { + boost::asio::post(this->plugin->thread_pool.get_executor(), [self = this->shared_from_this(), obj = std::move(obj) ]() mutable { self->send_queue.emplace_back(fc::raw::pack(state_result{std::move(obj)})); self->send(); }); @@ -366,7 +362,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this void callback(boost::system::error_code ec, const char* what, F f) { if (this->plugin->stopping) @@ -397,7 +393,7 @@ struct state_history_plugin_impl : std::enable_shared_from_thiswork_strand, [p = std::weak_ptr(this->weak_from_this())]() { + boost::asio::post(plugin->thread_pool.get_executor(), [p = std::weak_ptr(this->weak_from_this())]() { auto self = p.lock(); if (self) { self->close_i(); @@ -405,7 +401,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this(work_strand)); }; + auto init_tcp_acceptor = [&]() { acceptors.insert(std::make_unique(thread_pool.get_executor())); }; auto init_unix_acceptor = [&]() { // take a sniff and see if anything is already listening at the given socket path, or if the socket path exists // but nothing is listening @@ -447,7 +443,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this(work_strand)); + acceptors.insert(std::make_unique(thread_pool.get_executor())); }; // create and configure acceptors, can be both @@ -483,7 +479,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this void do_accept(Acceptor& acceptor) { - auto socket = std::make_shared(this->ctx); + auto socket = std::make_shared(this->thread_pool.get_executor()); // &acceptor kept alive by self, reference into acceptors set acceptor.async_accept(*socket, [self = shared_from_this(), socket, &acceptor](const boost::system::error_code& ec) { if (self->stopping) @@ -704,7 +700,6 @@ void state_history_plugin::plugin_startup() { handle_sighup(); // setup logging try { - my->thr = std::thread([ptr = my.get()] { ptr->ctx.run(); }); my->listen(); } catch (std::exception& ex) { appbase::app().quit(); @@ -719,11 +714,7 @@ void state_history_plugin::plugin_shutdown() { my->stopping = true; my->trace_log->stop(); my->chain_state_log->stop(); - if (my->thr.joinable()) { - my->work_guard.reset(); - my->ctx.stop(); - my->thr.join(); - } + my->thread_pool.stop(); } void state_history_plugin::handle_sighup() { fc::logger::update(logger_name, _log); }