From 87b40ea7d8e06722e8d0792615fb5b79b80aec54 Mon Sep 17 00:00:00 2001 From: Huang-Ming Huang Date: Wed, 4 Aug 2021 10:39:33 -0500 Subject: [PATCH] move all network operations another thread --- .../state_history_plugin.cpp | 567 ++++++++++-------- 1 file changed, 313 insertions(+), 254 deletions(-) diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index fa08bb9e7ef..39eb1a255ac 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -15,6 +15,9 @@ #include #include +#include +#include + using tcp = boost::asio::ip::tcp; using unixs = boost::asio::local::stream_protocol; namespace ws = boost::beast::websocket; @@ -48,7 +51,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this trace_log; std::optional chain_state_log; - bool stopping = false; + std::atomic stopping = false; std::optional applied_transaction_connection; std::optional block_start_connection; std::optional accepted_block_connection; @@ -58,6 +61,12 @@ struct state_history_plugin_impl : std::enable_shared_from_this unix_acceptor; + std::thread thr; + boost::asio::io_context ctx; + boost::asio::io_context::strand work_strand{ctx}; + boost::asio::executor_work_guard work_guard = + boost::asio::make_work_guard(ctx); + std::optional get_block_id(uint32_t block_num) { std::optional result; @@ -78,90 +87,35 @@ struct state_history_plugin_impl : std::enable_shared_from_this; + struct session_base { - virtual void send_update(const block_state_ptr& block_state) = 0; - virtual void close() = 0; - virtual ~session_base() = default; + virtual void send_update(const block_state_ptr& block_state, ::std::optional<::fc::zipkin_span>&& span) = 0; + virtual void close() = 0; + virtual ~session_base() = default; - std::optional current_request; + std::optional current_request; }; template struct session : session_base { - std::shared_ptr plugin; - bool sending = false; - bool sent_abi = false; - std::vector> send_queue; + state_history_plugin_impl* plugin; bool need_to_send_update = false; - session(std::shared_ptr plugin) - : plugin(std::move(plugin)) {} - - SessionType& derived_session() { - return static_cast(*this); - } - - void start() { - fc_ilog(_log, "incoming connection"); - derived_session().socket_stream->binary(true); - derived_session().socket_stream->next_layer().set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024)); - derived_session().socket_stream->next_layer().set_option(boost::asio::socket_base::receive_buffer_size(1024 * 1024)); - derived_session().socket_stream->async_accept([self = derived_session().shared_from_this()](boost::system::error_code ec) { - self->callback(ec, "async_accept", [self] { - self->start_read(); - self->send(state_history_plugin_abi); - }); - }); - } - - void start_read() { - auto in_buffer = std::make_shared(); - derived_session().socket_stream->async_read( - *in_buffer, [self = derived_session().shared_from_this(), in_buffer](boost::system::error_code ec, size_t) { - self->callback(ec, "async_read", [self, in_buffer] { - auto d = boost::asio::buffer_cast(boost::beast::buffers_front(in_buffer->data())); - auto s = boost::asio::buffer_size(in_buffer->data()); - fc::datastream ds(d, s); - state_request req; - fc::raw::unpack(ds, req); - std::visit(*self, req); - self->start_read(); - }); - }); - } + uint32_t to_send_block_num = 0; + std::optional::const_iterator> position_it; - void send(const char* s) { - send_queue.push_back({s, s + strlen(s)}); - send(); - } + session(state_history_plugin_impl* plugin) + : plugin(std::move(plugin)) {} - template - void send(T obj) { - send_queue.push_back(fc::raw::pack(state_result{std::move(obj)})); - send(); + ~session() { } - void send() { - if (sending) - return; - if (send_queue.empty()) - return send_update(); - sending = true; - derived_session().socket_stream->binary(sent_abi); - sent_abi = true; - derived_session().socket_stream->async_write( // - boost::asio::buffer(send_queue[0]), - [self = derived_session().shared_from_this()](boost::system::error_code ec, size_t) { - self->callback(ec, "async_write", [self] { - self->send_queue.erase(self->send_queue.begin()); - self->sending = false; - self->send(); - }); - }); + SessionType& derived_session() { + return static_cast(*this); } - - using result_type = void; - void operator()(get_status_request_v0&) { + + void operator()(get_status_request_v0&&) { + auto request_span = fc_create_trace("get_status_request"); fc_ilog(_log, "got get_status_request_v0"); auto& chain = plugin->chain_plug->chain(); get_status_result_v0 result; @@ -176,44 +130,49 @@ struct state_history_plugin_impl : std::enable_shared_from_thischain_state_log->begin_end_block_nums(); } fc_ilog(_log, "pushing get_status_result_v0 to send queue"); - send(std::move(result)); + derived_session().send(std::move(result), fc_get_token(request_span)); } template std::enable_if_t> - operator()(T& req) { + operator()(T&& req) { fc_ilog(_log, "received get_blocks_request = ${req}", ("req",req) ); + auto request_span = fc_create_trace("get_blocks_request"); + to_send_block_num = req.start_block_num; for (auto& cp : req.have_positions) { if (req.start_block_num <= cp.block_num) continue; auto id = plugin->get_block_id(cp.block_num); - if (!id || *id != cp.block_id) - req.start_block_num = std::min(req.start_block_num, cp.block_num); - if (!id) { + to_send_block_num = std::min(to_send_block_num, cp.block_num); fc_dlog(_log, "block ${block_num} is not available", ("block_num", cp.block_num)); } else if (*id != cp.block_id) { + to_send_block_num = std::min(to_send_block_num, cp.block_num); fc_dlog(_log, "the id for block ${block_num} in block request have_positions does not match the existing", ("block_num", cp.block_num)); - } + } } - req.have_positions.clear(); - fc_dlog(_log, " get_blocks_request start_block_num set to ${num}", ("num", req.start_block_num)); - - current_request = req; - send_update(true); + fc_dlog(_log, " get_blocks_request start_block_num set to ${num}", ("num", to_send_block_num)); + + if (req.have_positions.size()) { + position_it = req.have_positions.begin(); + } + + current_request = std::move(req); + send_update(std::move(request_span), true); } - void operator()(get_blocks_ack_request_v0& ack_req) { + void operator()(get_blocks_ack_request_v0&& ack_req) { fc_ilog(_log, "received get_blocks_ack_request_v0 = ${req}", ("req",ack_req)); if (!current_request.has_value()) { fc_dlog(_log, " no current get_blocks_request_v0, discarding the get_blocks_ack_request_v0"); return; } + auto request_span = fc_create_trace("get_blocks_ack_request"); std::visit([num_messages = ack_req.num_messages](auto& req) { req.max_messages_in_flight += num_messages; }, *current_request); - send_update(); + send_update(std::move(request_span)); } void set_result_block_header(get_blocks_result_v1&, const signed_block_ptr& block) {} @@ -232,9 +191,9 @@ struct state_history_plugin_impl : std::enable_shared_from_this std::enable_if_t || std::is_same_v> - send_update(const block_state_ptr& head_block_state, T&& result) { + send_update(const block_state_ptr& head_block_state, T&& result, ::std::optional<::fc::zipkin_span>&& span) { need_to_send_update = true; - if (!send_queue.empty() || !max_messages_in_flight() ) + if (!max_messages_in_flight() ) return; get_blocks_request_v0& block_req = std::visit([](auto& x) ->get_blocks_request_v0&{ return x; }, *current_request); @@ -242,97 +201,193 @@ struct state_history_plugin_impl : std::enable_shared_from_thisget_block_id(block_num); - - auto get_block = [&chain, block_num, head_block_state]() -> signed_block_ptr { - try { - if (head_block_state->block_num == block_num) - return head_block_state->block; - return chain.fetch_block_by_number(block_num); - } catch (...) { - return {}; - } - }; - - if (block_id) { - result.this_block = block_position{block_num, *block_id}; - auto prev_block_id = plugin->get_block_id(block_num - 1); - if (prev_block_id) - result.prev_block = block_position{block_num - 1, *prev_block_id}; - if (block_req.fetch_block) { + + if (to_send_block_num > current || to_send_block_num >= block_req.end_block_num) + return; + + auto block_id = plugin->get_block_id(to_send_block_num); + + if (block_id && position_it && (*position_it)->block_num == to_send_block_num) { + // This branch happens when the head block of nodeos is behind the head block of connecting client. + // In addition, the client told us the corresponding block id for block_num we are going to send. + // We can send the block when the block_id is different. + auto& itr = *position_it; + auto block_id_seen_by_client = itr->block_id; + ++itr; + if (itr == block_req.have_positions.end()) + position_it.reset(); + + if(block_id_seen_by_client == *block_id) { + ++to_send_block_num; + return; + } + } + + auto send_update_span = fc_create_span(span, "ship-send-update"); + fc_add_tag( send_update_span, "head_block_num", result.head.block_num ); + fc_add_tag(send_update_span, "block_num", to_send_block_num); + + auto get_block = [&chain, block_num = to_send_block_num, head_block_state]() -> signed_block_ptr { + try { + if (head_block_state->block_num == block_num) + return head_block_state->block; + return chain.fetch_block_by_number(block_num); + } catch (...) { + return {}; + } + }; + + if (block_id) { + result.this_block = block_position{to_send_block_num, *block_id}; + auto prev_block_id = plugin->get_block_id(to_send_block_num - 1); + if (prev_block_id) + result.prev_block = block_position{to_send_block_num - 1, *prev_block_id}; + if (block_req.fetch_block) { result.block = signed_block_ptr_variant{get_block()}; - } - if (block_req.fetch_traces && plugin->trace_log) { - result.traces = plugin->trace_log->get_log_entry(block_num); - } - if (block_req.fetch_deltas && plugin->chain_state_log) { - result.deltas = plugin->chain_state_log->get_log_entry(block_num); - } - set_result_block_header(result, get_block()); } - ++block_num; + if (block_req.fetch_traces && plugin->trace_log) { + result.traces = plugin->trace_log->get_log_entry(to_send_block_num); + fc_add_tag(send_update_span, "traces_size", result.traces.data_size()); + } + if (block_req.fetch_deltas && plugin->chain_state_log) { + result.deltas = plugin->chain_state_log->get_log_entry(to_send_block_num); + fc_add_tag(send_update_span, "deltas_size", result.deltas.data_size()); + } + set_result_block_header(result, get_block()); } - if (!result.has_value()) - return; - fc_ilog(_log, - "pushing result " - "{\"head\":{\"block_num\":${head}},\"last_irreversible\":{\"block_num\":${last_irr}},\"this_block\":{" - "\"block_num\":${this_block}}} to send queue", - ("head", result.head.block_num)("last_irr", result.last_irreversible.block_num)( - "this_block", result.this_block ? result.this_block->block_num : fc::variant())); - send(std::move(result)); + ++to_send_block_num; + + fc_ilog(_log, "pushing result {\"head\":{\"block_num\":${head}},\"last_irreversible\":{\"block_num\":${last_irr}},\"this_block\":{\"block_num\":${this_block}, \"id\": ${id}}} to send queue", + ("head", result.head.block_num)("last_irr", result.last_irreversible.block_num) + ("this_block", result.this_block ? result.this_block->block_num : fc::variant()) + ("id", block_id ? block_id->_hash[3] : 0 )); + + derived_session().send(std::move(result), fc_get_token(send_update_span)); --block_req.max_messages_in_flight; - need_to_send_update = block_req.start_block_num <= current && - block_req.start_block_num < block_req.end_block_num; - - std::visit( []( auto&& ptr ) { - if( ptr ) { - if (fc::zipkin_config::is_enabled()) { - auto id = ptr->calculate_id(); - auto blk_trace = fc_create_trace_with_id( "Block", id ); - auto blk_span = fc_create_span( blk_trace, "SHiP-Send" ); - fc_add_tag( blk_span, "block_id", id ); - fc_add_tag( blk_span, "block_num", ptr->block_num() ); - fc_add_tag( blk_span, "block_time", ptr->timestamp.to_time_point() ); - } - } - }, result.block ); + need_to_send_update = to_send_block_num <= current && + to_send_block_num < block_req.end_block_num; + } - void send_update_for_block(const block_state_ptr& head_block_state) { + void send_update(const block_state_ptr& head_block_state, std::optional<::fc::zipkin_span>&& span) override { + if (!current_request) + return; std::visit( - [&head_block_state, this](const auto& req) { - // send get_blocks_result_v1 when the request is get_blocks_request_v0 and - // send send_block_result_v2 when the request is get_blocks_request_v1. - if (head_block_state->block) { + [&head_block_state, this, span = std::move(span)](const auto& req) mutable { + (void)this; // avoid unused lambda warning + // send get_blocks_result_v1 when the request is get_blocks_request_v0 and + // send send_block_result_v2 when the request is get_blocks_request_v1. + if (head_block_state->block) { typename std::decay_t::response_type result; - result.head = { head_block_state->block_num, head_block_state->id }; - send_update(head_block_state, std::move(result)); - } - }, - *current_request); + result.head = {head_block_state->block_num, head_block_state->id}; + if (::fc::zipkin_config::is_enabled() && !span) { + span.emplace("send-update-0", fc::zipkin_span::to_id(head_block_state->id), + "ship"_n.to_uint64_t()); + } + send_update(head_block_state, std::move(result), std::move(span)); + } + }, + *current_request); } - void send_update(const block_state_ptr& block_state) override { - need_to_send_update = true; - if (!send_queue.empty() || !max_messages_in_flight()) - return; + void send_update(::std::optional<::fc::zipkin_span>&& span, bool changed = false) { + if (changed || need_to_send_update) { + auto& chain = plugin->chain_plug->chain(); + send_update(chain.head_block_state(), std::move(span)); + } + } + }; - send_update_for_block(block_state); + template + class ws_session : public session>, + public std::enable_shared_from_this> { + public: + using socket_type = SocketType; + ws_session(state_history_plugin_impl* plugin, SocketType socket) + : session>(plugin), socket_stream(std::move(socket)) { + } + void start() { + fc_ilog(_log, "incoming connection"); + if constexpr (std::is_same_v) { + socket_stream.next_layer().set_option(boost::asio::ip::tcp::no_delay(true)); + } + socket_stream.auto_fragment(false); + socket_stream.binary(true); + socket_stream.next_layer().set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024)); + socket_stream.next_layer().set_option(boost::asio::socket_base::receive_buffer_size(1024 * 1024)); + socket_stream.async_accept([self = this->shared_from_this()](boost::system::error_code ec) { + self->callback(ec, "async_accept", [self] { + self->socket_stream.binary(false); + self->socket_stream.async_write( + boost::asio::buffer(state_history_plugin_abi, strlen(state_history_plugin_abi)), + [self](boost::system::error_code ec, size_t) { + self->callback(ec, "async_write", [self] { + self->socket_stream.binary(true); + self->start_read(); + }); + }); + }); + }); + } + + template + void send(T obj, fc::zipkin_span::token token) { + boost::asio::post(this->plugin->work_strand, [this, obj = std::move(obj), token ]() { + send_queue.emplace_back(fc::raw::pack(state_result{std::move(obj)}), token); + send(); + }); } - void send_update(bool changed = false) { - if (changed) - need_to_send_update = true; - if (!send_queue.empty() || !need_to_send_update || - !max_messages_in_flight()) + void close() override { + boost::asio::post(this->plugin->work_strand, [p = std::weak_ptr(this->weak_from_this())]() { + auto self = p.lock(); + if (self) { + self->close_i(); + } + }); + } + + private: + // all private member functions are called from internal thread + void start_read() { + auto in_buffer = std::make_shared(); + socket_stream.async_read( + *in_buffer, [self = this->shared_from_this(), in_buffer](boost::system::error_code ec, size_t) { + self->callback(ec, "async_read", [self, in_buffer] { + auto d = boost::asio::buffer_cast(boost::beast::buffers_front(in_buffer->data())); + auto s = boost::asio::buffer_size(in_buffer->data()); + fc::datastream ds(d, s); + state_request req; + fc::raw::unpack(ds, req); + app().post(priority::medium, + [self, req = std::move(req)]() mutable { std::visit(*self, std::move(req)); }); + self->start_read(); + }); + }); + } + + void send() { + if (sending) return; - auto& chain = plugin->chain_plug->chain(); - send_update_for_block(chain.head_block_state()); + if (send_queue.empty()) { + app().post(priority::medium, [self = this->shared_from_this()]() { + self->send_update(::std::optional<::fc::zipkin_span>{}); + }); + return; + } + sending = true; + + auto send_span = fc_create_span_from_token(send_queue[0].second, "send"); + fc_add_tag(send_span, "buffer_size", send_queue[0].first.size()); + socket_stream.async_write( // + boost::asio::buffer(send_queue[0].first), + [self = this->shared_from_this(), send_span = std::move(send_span)](boost::system::error_code ec, + size_t) mutable { + send_span.reset(); + self->send_queue.erase(self->send_queue.begin()); + self->sending = false; + self->callback(ec, "async_write", [self] { self->send(); }); + }); } template @@ -341,73 +396,74 @@ struct state_history_plugin_impl : std::enable_shared_from_this void callback(boost::system::error_code ec, const char* what, F f) { - app().post( priority::medium, [=]() { - if( plugin->stopping ) - return; - if( ec ) - return on_fail( ec, what ); - catch_and_close( f ); - } ); - } - - void on_fail(boost::system::error_code ec, const char* what) { - try { - fc_elog(_log,"${w}: ${m}", ("w", what)("m", ec.message())); - close(); - } catch (...) { - fc_elog(_log,"uncaught exception on close"); + if (this->plugin->stopping) + return; + if (ec) { + fc_elog(_log, "${w}: ${m}", ("w", what)("m", ec.message())); + close_i(); + return; } + catch_and_close(f); } - void close() override { - derived_session().socket_stream->next_layer().close(); - plugin->sessions.erase(this); + void close_i() { + boost::system::error_code ec; + socket_stream.next_layer().close(ec); + if (ec) fc_elog(_log, "close: ${m}", ("m", ec.message())); + this->plugin->sessions.remove(this->shared_from_this()); } - }; - - struct tcp_session : session, std::enable_shared_from_this { - tcp_session(std::shared_ptr plugin) : session(plugin) {} - void start(tcp::socket socket) { - socket_stream = std::make_unique>(std::move(socket)); - socket_stream->next_layer().set_option(boost::asio::ip::tcp::no_delay(true)); - session::start(); - } - - std::unique_ptr> socket_stream; + ws::stream socket_stream; + using send_queue_t = std::vector, fc::zipkin_span::token>>; + send_queue_t send_queue; + bool sending = false; }; - struct unix_session : session, std::enable_shared_from_this { - unix_session(std::shared_ptr plugin) : session(plugin) {} - - void start(unixs::socket socket) { - socket_stream = std::make_unique>(std::move(socket)); - session::start(); + class session_manager_t { + std::mutex mx; + boost::container::flat_set> session_set; + + public: + template + void add(state_history_plugin_impl* plugin, std::shared_ptr socket) { + auto s = std::make_shared>(plugin, std::move(*socket)); + s->start(); + std::lock_guard lock(mx); + session_set.insert(std::move(s)); } - std::unique_ptr> socket_stream; - }; + void remove(std::shared_ptr s) { + std::lock_guard lock(mx); + session_set.erase(s); + } - std::map> sessions; + template + void for_each(F&& f) { + std::lock_guard lock(mx); + for (auto& s : session_set) { + f(s); + } + } + } sessions; void listen() { boost::system::error_code ec; auto address = boost::asio::ip::make_address(endpoint_address); auto endpoint = tcp::endpoint{address, endpoint_port}; - acceptor = std::make_unique(app().get_io_service()); + acceptor = std::make_unique(this->ctx); auto check_ec = [&](const char* what) { if (!ec) @@ -455,7 +511,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this(app().get_io_service()); + unix_acceptor = std::make_unique(this->ctx); unix_acceptor->open(unixs::acceptor::protocol_type(), ec); check_ec("open"); unix_acceptor->bind(unix_path.c_str(), ec); @@ -467,8 +523,8 @@ struct state_history_plugin_impl : std::enable_shared_from_this void do_accept(Acceptor& acceptor) { - auto socket = std::make_shared(app().get_io_service()); - acceptor.async_accept(*socket, [self = shared_from_this(), socket, &acceptor, this](const boost::system::error_code& ec) { + auto socket = std::make_shared(this->ctx); + acceptor.async_accept(*socket, [this, socket, &acceptor](const boost::system::error_code& ec) { if (stopping) return; if (ec) { @@ -477,15 +533,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this) { - auto s = std::make_shared(self); - sessions[s.get()] = s; - s->start(std::move(*socket)); - } else if constexpr (std::is_same_v) { - auto s = std::make_shared(self); - sessions[s.get()] = s; - s->start(std::move(*socket)); - } + sessions.add(this, socket); }); catch_and_log([&] { do_accept(acceptor); }); }); @@ -496,12 +544,16 @@ struct state_history_plugin_impl : std::enable_shared_from_thisadd_transaction(p, t); } - void store(const block_state_ptr& block_state) { + void store(const block_state_ptr& block_state, const ::std::optional<::fc::zipkin_span>& ship_accept_span) { try { - if (trace_log) + if (trace_log) { + auto trace_log_span = fc_create_span(ship_accept_span, "store_trace_log"); trace_log->store(chain_plug->chain().db(), block_state); - if (chain_state_log) + } + if (chain_state_log) { + auto delta_log_span = fc_create_span(ship_accept_span, "store_delta_log"); chain_state_log->store(chain_plug->chain().kv_db(), block_state); + } return; } FC_LOG_AND_DROP() @@ -517,25 +569,17 @@ struct state_history_plugin_impl : std::enable_shared_from_thisid); - auto blk_span = fc_create_span(blk_trace, "SHiP-Accepted"); - fc_add_tag(blk_span, "block_id", block_state->id); - fc_add_tag(blk_span, "block_num", block_state->block_num); - fc_add_tag(blk_span, "block_time", block_state->block->timestamp.to_time_point()); - this->store(block_state); - for (auto& s : sessions) { - auto& p = s.second; - if (p) { - if (p->current_request) { - uint32_t& req_start_block_num = - std::visit([](auto& req) -> uint32_t& { return req.start_block_num; }, *p->current_request); - if (block_state->block_num < req_start_block_num) { - req_start_block_num = block_state->block_num; - } - } - p->send_update(block_state); - } - } + auto ship_accept_span = fc_create_span_with_id("SHiP-Accepted", "ship"_n.to_uint64_t() , block_state->id); + + fc_add_tag(ship_accept_span, "block_id", block_state->id); + fc_add_tag(ship_accept_span, "block_num", block_state->block_num); + fc_add_tag(ship_accept_span, "block_time", block_state->block->timestamp.to_time_point()); + + this->store(block_state, ship_accept_span); + + sessions.for_each([&block_state, &ship_accept_span](auto& s) { + s->send_update(block_state, fc_create_span(ship_accept_span, "send_update")); + }); } void on_block_start(uint32_t block_num) { @@ -610,7 +654,7 @@ void state_history_plugin::plugin_initialize(const variables_map& options) { auto dir_option = options.at("state-history-dir").as(); - eosio::state_history_config config; + static eosio::state_history_config config; config.logger = &_log; if (dir_option.is_relative()) @@ -670,21 +714,36 @@ void state_history_plugin::plugin_initialize(const variables_map& options) { void state_history_plugin::plugin_startup() { handle_sighup(); // setup logging - if (my->endpoint_address.size()) - my->listen(); - if (my->unix_path.size()) - my->unix_listen(); + + try { + my->thr = std::thread([ptr = my.get()]{ ptr->ctx.run(); }); + + if (my->endpoint_address.size()) + my->listen(); + if (my->unix_path.size()) + my->unix_listen(); + } + catch(std::exception& ex) { + appbase::app().quit(); + } } void state_history_plugin::plugin_shutdown() { - my->applied_transaction_connection.reset(); - my->accepted_block_connection.reset(); - my->block_start_connection.reset(); - while (!my->sessions.empty()) - my->sessions.begin()->second->close(); - my->stopping = true; - my->trace_log->stop(); - my->chain_state_log->stop(); + try { + my->applied_transaction_connection.reset(); + my->accepted_block_connection.reset(); + my->block_start_connection.reset(); + my->sessions.for_each([](auto& s) { s->close(); }); + my->stopping = true; + my->trace_log->stop(); + my->chain_state_log->stop(); + if (my->thr.joinable()) { + my->work_guard.reset(); + my->ctx.stop(); + my->thr.join(); + } + } + FC_CAPTURE_LOG_AND_RETHROW(("")) } void state_history_plugin::handle_sighup() {