Skip to content

Commit

Permalink
GH-592 Use named_thread_pool instead of explicit thread, io_context, …
Browse files Browse the repository at this point in the history
…etc. Old versions of boost do not have an acceptor that takes a strand in constructor.
  • Loading branch information
heifner committed Jan 7, 2023
1 parent 66bdc38 commit b2baf8d
Showing 1 changed file with 13 additions and 22 deletions.
35 changes: 13 additions & 22 deletions plugins/state_history_plugin/state_history_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
#include <eosio/state_history/serialization.hpp>
#include <eosio/state_history/trace_converter.hpp>
#include <eosio/state_history_plugin/state_history_plugin.hpp>
#include <eosio/chain/thread_utils.hpp>

#include <boost/asio/bind_executor.hpp>
#include <boost/asio/ip/host_name.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/local/stream_protocol.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/signals2/connection.hpp>
Expand Down Expand Up @@ -87,11 +87,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
using acceptor_type = std::variant<std::unique_ptr<tcp::acceptor>, std::unique_ptr<unixs::acceptor>>;
std::set<acceptor_type> acceptors;

std::thread thr;
boost::asio::io_context ctx;
boost::asio::io_context::strand work_strand{ctx};
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard =
boost::asio::make_work_guard(ctx);
named_thread_pool thread_pool{"SHiP", 1}; // use of executor assumes only one thread

static void get_log_entry(state_history_log& log, uint32_t block_num, std::optional<bytes>& result) {
if (block_num < log.begin_block() || block_num >= log.end_block())
Expand Down Expand Up @@ -139,9 +135,9 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
template <typename SocketType>
struct session : session_base, std::enable_shared_from_this<session<SocketType>> {
std::shared_ptr<state_history_plugin_impl> plugin;
ws::stream<SocketType> socket_stream; // plugin strand only after creation
bool sending = false; // plugin strand only
std::vector<std::vector<char>> send_queue; // plugin strand only
ws::stream<SocketType> socket_stream; // ship thread only after creation
bool sending = false; // ship thread only
std::vector<std::vector<char>> send_queue; // ship thread only
bool need_to_send_update = false; // main thread only

uint32_t to_send_block_num = 0; // main thread only
Expand Down Expand Up @@ -216,7 +212,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl

template <typename T>
void send(T obj) {
boost::asio::post(this->plugin->work_strand, [self = this->shared_from_this(), obj = std::move(obj) ]() mutable {
boost::asio::post(this->plugin->thread_pool.get_executor(), [self = this->shared_from_this(), obj = std::move(obj) ]() mutable {
self->send_queue.emplace_back(fc::raw::pack(state_result{std::move(obj)}));
self->send();
});
Expand Down Expand Up @@ -366,7 +362,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
}
}

// called from plugin strand
// called from ship thread
template <typename F>
void callback(boost::system::error_code ec, const char* what, F f) {
if (this->plugin->stopping)
Expand Down Expand Up @@ -397,15 +393,15 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
}

void close() override {
boost::asio::post(plugin->work_strand, [p = std::weak_ptr(this->weak_from_this())]() {
boost::asio::post(plugin->thread_pool.get_executor(), [p = std::weak_ptr(this->weak_from_this())]() {
auto self = p.lock();
if (self) {
self->close_i();
}
});
}

// called from plugin strand
// called from ship thread
void close_i() {
boost::system::error_code ec;
socket_stream.next_layer().close(ec);
Expand All @@ -428,7 +424,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
FC_THROW_EXCEPTION(plugin_exception, "unable to open listen socket");
};

auto init_tcp_acceptor = [&]() { acceptors.insert(std::make_unique<tcp::acceptor>(work_strand)); };
auto init_tcp_acceptor = [&]() { acceptors.insert(std::make_unique<tcp::acceptor>(thread_pool.get_executor())); };
auto init_unix_acceptor = [&]() {
// take a sniff and see if anything is already listening at the given socket path, or if the socket path exists
// but nothing is listening
Expand All @@ -447,7 +443,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
ec = test_ec;
}
check_ec("open");
acceptors.insert(std::make_unique<unixs::acceptor>(work_strand));
acceptors.insert(std::make_unique<unixs::acceptor>(thread_pool.get_executor()));
};

// create and configure acceptors, can be both
Expand Down Expand Up @@ -483,7 +479,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl

template <typename Acceptor>
void do_accept(Acceptor& acceptor) {
auto socket = std::make_shared<typename Acceptor::protocol_type::socket>(this->ctx);
auto socket = std::make_shared<typename Acceptor::protocol_type::socket>(this->thread_pool.get_executor());
// &acceptor kept alive by self, reference into acceptors set
acceptor.async_accept(*socket, [self = shared_from_this(), socket, &acceptor](const boost::system::error_code& ec) {
if (self->stopping)
Expand Down Expand Up @@ -704,7 +700,6 @@ void state_history_plugin::plugin_startup() {
handle_sighup(); // setup logging

try {
my->thr = std::thread([ptr = my.get()] { ptr->ctx.run(); });
my->listen();
} catch (std::exception& ex) {
appbase::app().quit();
Expand All @@ -719,11 +714,7 @@ void state_history_plugin::plugin_shutdown() {
my->stopping = true;
my->trace_log->stop();
my->chain_state_log->stop();
if (my->thr.joinable()) {
my->work_guard.reset();
my->ctx.stop();
my->thr.join();
}
my->thread_pool.stop();
}

void state_history_plugin::handle_sighup() { fc::logger::update(logger_name, _log); }
Expand Down

0 comments on commit b2baf8d

Please sign in to comment.