From 8bb6a990438f87ba8d3e175a88fbd6bd6cf07055 Mon Sep 17 00:00:00 2001 From: Huang-Ming Huang Date: Tue, 13 Jul 2021 16:36:38 -0500 Subject: [PATCH] add threaded-send option --- .../state_history_plugin.cpp | 398 ++++++++++-------- 1 file changed, 227 insertions(+), 171 deletions(-) diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index c4a98135d7e..23c5328add1 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -16,6 +16,7 @@ #include #include +#include using tcp = boost::asio::ip::tcp; using unixs = boost::asio::local::stream_protocol; @@ -82,11 +83,12 @@ struct state_history_plugin_impl : std::enable_shared_from_this; struct session_base { - virtual void send_update(const block_state_ptr& block_state) = 0; + virtual void send_update(const block_state_ptr& block_state, ::std::optional<::fc::zipkin_span>&& span) = 0; virtual void close() = 0; virtual ~session_base() = default; std::optional current_request; + }; template @@ -95,12 +97,61 @@ struct state_history_plugin_impl : std::enable_shared_from_this plugin; bool sending = false; bool sent_abi = false; - std::vector> send_queue; bool need_to_send_update = false; + std::thread thr; + std::atomic send_thread_has_exception = false; + std::exception_ptr eptr; + boost::asio::io_context ctx; + boost::asio::io_context::strand work_strand{ctx}; + boost::asio::executor_work_guard work_guard = + boost::asio::make_work_guard(ctx); + + uint32_t to_send_block_num = 0; + std::optional::const_iterator> position_it; + session(std::shared_ptr plugin) : plugin(std::move(plugin)) {} + void send_thread_fun() { + fc_ilog(_log, "send thread started"); + try { + ctx.run(); + } + catch(std::system_error& ex) { + if (ex.code() != std::errc::operation_canceled) { + fc_elog(_log, "exception thrown from send thread"); + eptr = std::current_exception(); + send_thread_has_exception = true; + } + } + catch (...) { + fc_elog(_log, "exception thrown from send thread"); + eptr = std::current_exception(); + send_thread_has_exception = true; + } + + // avoid ~session() to be executed in this thread. + app().post( priority::high, [self = this->derived_session().shared_from_this()]() { + self->catch_and_close( [self]{ + if (self->send_thread_has_exception) + std::rethrow_exception(self->eptr); + }); + } ); + } + + ~session() { + try { + if (thr.joinable()) { + work_guard.reset(); + thr.join(); + } + } + catch (std::exception& ex) { + fc_wlog(_log, "exception from join: ${what}", ("what", ex.what())); + } + } + SessionType& derived_session() { return static_cast(*this); } @@ -110,12 +161,23 @@ struct state_history_plugin_impl : std::enable_shared_from_thisbinary(true); derived_session().socket_stream->next_layer().set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024)); derived_session().socket_stream->next_layer().set_option(boost::asio::socket_base::receive_buffer_size(1024 * 1024)); - derived_session().socket_stream->async_accept([self = derived_session().shared_from_this()](boost::system::error_code ec) { - self->callback(ec, "async_accept", [self] { - self->start_read(); - self->send(state_history_plugin_abi); + derived_session().socket_stream->async_accept( + [self = derived_session().shared_from_this()](boost::system::error_code ec) { + self->callback(ec, "async_accept", [self] { + self->socket_stream->binary(false); + self->socket_stream->async_write( + boost::asio::buffer(state_history_plugin_abi, strlen(state_history_plugin_abi)), + [self](boost::system::error_code ec, size_t) { + if (ec) { + self->on_fail(ec, "async_write"); + return; + } + self->socket_stream->binary(true); + self->start_read(); + self->thr = std::thread([self] { self->send_thread_fun(); }); + }); + }); }); - }); } void start_read() { @@ -128,44 +190,28 @@ struct state_history_plugin_impl : std::enable_shared_from_this ds(d, s); state_request req; fc::raw::unpack(ds, req); - std::visit(*self, req); + std::visit(*self, std::move(req)); self->start_read(); }); }); } - void send(const char* s) { - send_queue.push_back({s, s + strlen(s)}); - send(); - } - template - void send(T obj) { - send_queue.push_back(fc::raw::pack(state_result{std::move(obj)})); - send(); - } - - void send() { - if (sending) - return; - if (send_queue.empty()) - return send_update(); - sending = true; - derived_session().socket_stream->binary(sent_abi); - sent_abi = true; - derived_session().socket_stream->async_write( // - boost::asio::buffer(send_queue[0]), - [self = derived_session().shared_from_this()](boost::system::error_code ec, size_t) { - self->callback(ec, "async_write", [self] { - self->send_queue.erase(self->send_queue.begin()); - self->sending = false; - self->send(); - }); - }); + void send(T obj, ::std::optional<::fc::zipkin_span>&& span) { + if (!send_thread_has_exception) { + boost::asio::post(work_strand, [this, data = fc::raw::pack(state_result{std::move(obj)}), token = fc_get_token(span)]() { + auto span = fc_create_span_from_token(token, "send"); + fc_add_tag(span, "buffer_size", data.size()); + this->derived_session().socket_stream->write(boost::asio::buffer(data)); + }); + callback(boost::system::error_code{}, "", [self = derived_session().shared_from_this()] { + self->send_update(::std::optional<::fc::zipkin_span>{}); + }); + } } - using result_type = void; - void operator()(get_status_request_v0&) { + void operator()(get_status_request_v0&&) { + auto request_span = fc_create_trace("get_status_request"); fc_ilog(_log, "got get_status_request_v0"); auto& chain = plugin->chain_plug->chain(); get_status_result_v0 result; @@ -181,44 +227,50 @@ struct state_history_plugin_impl : std::enable_shared_from_thischain_state_log->end_block(); } fc_ilog(_log, "pushing get_status_result_v0 to send queue"); - send(std::move(result)); + send(std::move(result), std::move(request_span)); } template std::enable_if_t> - operator()(T& req) { + operator()(T&& req) { fc_ilog(_log, "received get_blocks_request = ${req}", ("req",req) ); + auto request_span = fc_create_trace("get_blocks_request"); + to_send_block_num = req.start_block_num; for (auto& cp : req.have_positions) { if (req.start_block_num <= cp.block_num) continue; auto id = plugin->get_block_id(cp.block_num); - if (!id || *id != cp.block_id) - req.start_block_num = std::min(req.start_block_num, cp.block_num); - if (!id) { + to_send_block_num = std::min(to_send_block_num, cp.block_num); fc_dlog(_log, "block ${block_num} is not available", ("block_num", cp.block_num)); } else if (*id != cp.block_id) { + to_send_block_num = std::min(to_send_block_num, cp.block_num); fc_dlog(_log, "the id for block ${block_num} in block request have_positions does not match the existing", ("block_num", cp.block_num)); - } + } } - req.have_positions.clear(); - fc_dlog(_log, " get_blocks_request start_block_num set to ${num}", ("num", req.start_block_num)); + + fc_dlog(_log, " get_blocks_request start_block_num set to ${num}", ("num", to_send_block_num)); - current_request = req; + if (req.have_positions.size()) { + position_it = req.have_positions.begin(); + } + + current_request = std::move(req); - send_update(true); + send_update(std::move(request_span), true); } - void operator()(get_blocks_ack_request_v0& ack_req) { + void operator()(get_blocks_ack_request_v0&& ack_req) { fc_ilog(_log, "received get_blocks_ack_request_v0 = ${req}", ("req",ack_req)); if (!current_request.has_value()) { fc_dlog(_log, " no current get_blocks_request_v0, discarding the get_blocks_ack_request_v0"); return; } + auto request_span = fc_create_trace("get_blocks_ack_request"); std::visit([num_messages = ack_req.num_messages](auto& req) { req.max_messages_in_flight += num_messages; }, *current_request); - send_update(); + send_update(std::move(request_span)); } void set_result_block_header(get_blocks_result_v1&, const signed_block_ptr& block) {} @@ -237,9 +289,9 @@ struct state_history_plugin_impl : std::enable_shared_from_this std::enable_if_t || std::is_same_v> - send_update(const block_state_ptr& head_block_state, T&& result) { + send_update(const block_state_ptr& head_block_state, T&& result, ::std::optional<::fc::zipkin_span>&& span) { need_to_send_update = true; - if (!send_queue.empty() || !max_messages_in_flight() ) + if (!max_messages_in_flight() ) return; get_blocks_request_v0& block_req = std::visit([](auto& x) ->get_blocks_request_v0&{ return x; }, *current_request); @@ -247,98 +299,100 @@ struct state_history_plugin_impl : std::enable_shared_from_thisget_block_id(block_num); - - auto get_block = [&chain, block_num, head_block_state]() -> signed_block_ptr { - try { - if (head_block_state->block_num == block_num) - return head_block_state->block; - return chain.fetch_block_by_number(block_num); - } catch (...) { - return {}; - } - }; - - if (block_id) { - result.this_block = block_position{block_num, *block_id}; - auto prev_block_id = plugin->get_block_id(block_num - 1); - if (prev_block_id) - result.prev_block = block_position{block_num - 1, *prev_block_id}; - if (block_req.fetch_block) { + + if (to_send_block_num > current || to_send_block_num >= block_req.end_block_num) + return; + + auto block_id = plugin->get_block_id(to_send_block_num); + + if (block_id && position_it && (*position_it)->block_num == to_send_block_num) { + // This branch happens when the head block of nodeos is behind the head block of connecting client. + // In addition, the client told us the corresponding block id for block_num we are going to send. + // We can send the block when the block_id is different. + auto& itr = *position_it; + auto block_id_seen_by_client = itr->block_id; + ++itr; + if (itr == block_req.have_positions.end()) + position_it.reset(); + + if(block_id_seen_by_client == *block_id) { + ++to_send_block_num; + return; + } + } + + auto send_update_span = fc_create_span(span, "ship-send-update"); + fc_add_tag( send_update_span, "head_block_num", result.head.block_num ); + fc_add_tag(send_update_span, "block_num", to_send_block_num); + + auto get_block = [&chain, block_num = to_send_block_num, head_block_state]() -> signed_block_ptr { + try { + if (head_block_state->block_num == block_num) + return head_block_state->block; + return chain.fetch_block_by_number(block_num); + } catch (...) { + return {}; + } + }; + + if (block_id) { + result.this_block = block_position{to_send_block_num, *block_id}; + auto prev_block_id = plugin->get_block_id(to_send_block_num - 1); + if (prev_block_id) + result.prev_block = block_position{to_send_block_num - 1, *prev_block_id}; + if (block_req.fetch_block) { result.block = signed_block_ptr_variant{get_block()}; - } - if (block_req.fetch_traces && plugin->trace_log) { - result.traces = plugin->trace_log->get_log_entry(block_num); - } - if (block_req.fetch_deltas && plugin->chain_state_log) { - result.deltas = plugin->chain_state_log->get_log_entry(block_num); - } - set_result_block_header(result, get_block()); } - ++block_num; + if (block_req.fetch_traces && plugin->trace_log) { + result.traces = plugin->trace_log->get_log_entry(to_send_block_num); + // fc_add_tag(send_update_span, "traces_size", result.traces.data_size()); + } + if (block_req.fetch_deltas && plugin->chain_state_log) { + result.deltas = plugin->chain_state_log->get_log_entry(to_send_block_num); + // fc_add_tag(send_update_span, "deltas_size", result.deltas.data_size()); + } + set_result_block_header(result, get_block()); } - if (!result.has_value()) - return; - fc_ilog(_log, - "pushing result " - "{\"head\":{\"block_num\":${head}},\"last_irreversible\":{\"block_num\":${last_irr}},\"this_block\":{" - "\"block_num\":${this_block}}} to send queue", - ("head", result.head.block_num)("last_irr", result.last_irreversible.block_num)( - "this_block", result.this_block ? result.this_block->block_num : fc::variant())); - send(std::move(result)); + ++to_send_block_num; + + fc_ilog(_log, "pushing result {\"head\":{\"block_num\":${head}},\"last_irreversible\":{\"block_num\":${last_irr}},\"this_block\":{\"block_num\":${this_block}, \"id\": ${id}}} to send queue", + ("head", result.head.block_num)("last_irr", result.last_irreversible.block_num) + ("this_block", result.this_block ? result.this_block->block_num : fc::variant()) + ("id", block_id ? block_id->_hash[3] : 0 )); + + send(std::move(result), std::move(send_update_span)); --block_req.max_messages_in_flight; - need_to_send_update = block_req.start_block_num <= current && - block_req.start_block_num < block_req.end_block_num; - - std::visit( []( auto&& ptr ) { - if( ptr ) { - if (fc::zipkin_config::is_enabled()) { - auto id = ptr->calculate_id(); - auto blk_trace = fc_create_trace_with_id( "Block", id ); - auto blk_span = fc_create_span( blk_trace, "SHiP-Send" ); - fc_add_tag( blk_span, "block_id", id ); - fc_add_tag( blk_span, "block_num", ptr->block_num() ); - fc_add_tag( blk_span, "block_time", ptr->timestamp.to_time_point() ); - } - } - }, result.block ); - } + need_to_send_update = to_send_block_num <= current && + to_send_block_num < block_req.end_block_num; - void send_update_for_block(const block_state_ptr& head_block_state) { - std::visit( - [&head_block_state, this](const auto& req) { - (void)this; // avoid unused lambda warning - // send get_blocks_result_v1 when the request is get_blocks_request_v0 and - // send send_block_result_v2 when the request is get_blocks_request_v1. - if (head_block_state->block) { - typename std::decay_t::response_type result; - result.head = { head_block_state->block_num, head_block_state->id }; - send_update(head_block_state, std::move(result)); - } - }, - *current_request); } - void send_update(const block_state_ptr& block_state) override { - need_to_send_update = true; - if (!send_queue.empty() || !max_messages_in_flight()) + void send_update(const block_state_ptr& head_block_state, std::optional<::fc::zipkin_span>&& span) override { + if (!current_request || send_thread_has_exception) return; - - send_update_for_block(block_state); + std::visit( + [&head_block_state, this, span = std::move(span)](const auto& req) mutable { + (void)this; // avoid unused lambda warning + // send get_blocks_result_v1 when the request is get_blocks_request_v0 and + // send send_block_result_v2 when the request is get_blocks_request_v1. + if (head_block_state->block) { + typename std::decay_t::response_type result; + result.head = {head_block_state->block_num, head_block_state->id}; + if (::fc::zipkin_config::is_enabled() && !span) { + span.emplace("send-update-0", fc::zipkin_span::to_id(head_block_state->id), + "ship"_n.to_uint64_t()); + } + send_update(head_block_state, std::move(result), std::move(span)); + } + }, + *current_request); } - void send_update(bool changed = false) { - if (changed) - need_to_send_update = true; - if (!send_queue.empty() || !need_to_send_update || - !max_messages_in_flight()) - return; - auto& chain = plugin->chain_plug->chain(); - send_update_for_block(chain.head_block_state()); + void send_update(::std::optional<::fc::zipkin_span>&& span, bool changed = false) { + if (changed || need_to_send_update) { + auto& chain = plugin->chain_plug->chain(); + send_update(chain.head_block_state(), std::move(span)); + } } template @@ -359,27 +413,29 @@ struct state_history_plugin_impl : std::enable_shared_from_this void callback(boost::system::error_code ec, const char* what, F f) { - app().post( priority::medium, [=]() { - if( plugin->stopping ) + if( plugin->stopping ) return; if( ec ) return on_fail( ec, what ); + app().post( priority::high, [=]() { catch_and_close( f ); } ); } void on_fail(boost::system::error_code ec, const char* what) { - try { - fc_elog(_log,"${w}: ${m}", ("w", what)("m", ec.message())); - close(); - } catch (...) { - fc_elog(_log,"uncaught exception on close"); - } + fc_elog(_log,"${w}: ${m}", ("w", what)("m", ec.message())); + close(); } void close() override { - derived_session().socket_stream->next_layer().close(); - plugin->sessions.erase(this); + try { + work_guard.reset(); + derived_session().socket_stream->next_layer().close(); + plugin->sessions.erase(this); + } + catch (...) { + fc_elog(_log, "uncaught exception on close"); + } } }; @@ -502,44 +558,38 @@ struct state_history_plugin_impl : std::enable_shared_from_thisadd_transaction(p, t); } - void store(const block_state_ptr& block_state) { + void store(const block_state_ptr& block_state, const ::std::optional<::fc::zipkin_span>& ship_accept_span) { try { - if (trace_log) + if (trace_log) { + auto trace_log_span = fc_create_span(ship_accept_span, "store_trace_log"); trace_log->store(chain_plug->chain().db(), block_state); - if (chain_state_log) + } + if (chain_state_log) { + auto delta_log_span = fc_create_span(ship_accept_span, "store_delta_log"); chain_state_log->store(chain_plug->chain().kv_db(), block_state); + } return; } FC_LOG_AND_DROP() - // Both app().quit() and exception throwing are required. Without app().quit(), - // the exception would be caught and drop before reaching main(). The exception is - // to ensure the block won't be committed. - appbase::app().quit(); - EOS_THROW( - chain::state_history_write_exception, - "State history encountered an Error which it cannot recover from. Please resolve the error and relaunch " + elog("State history encountered an Error which it cannot recover from. Please resolve the error and relaunch " "the process"); + exit(1); } void on_accepted_block(const block_state_ptr& block_state) { - auto blk_trace = fc_create_trace_with_id("Block", block_state->id); - auto blk_span = fc_create_span(blk_trace, "SHiP-Accepted"); - fc_add_tag(blk_span, "block_id", block_state->id); - fc_add_tag(blk_span, "block_num", block_state->block_num); - fc_add_tag(blk_span, "block_time", block_state->block->timestamp.to_time_point()); - this->store(block_state); + auto ship_accept_span = fc_create_span_with_id("SHiP-Accepted", "ship"_n.to_uint64_t() , block_state->id); + + fc_add_tag(ship_accept_span, "block_id", block_state->id); + fc_add_tag(ship_accept_span, "block_num", block_state->block_num); + fc_add_tag(ship_accept_span, "block_time", block_state->block->timestamp.to_time_point()); + + this->store(block_state, ship_accept_span); + for (auto& s : sessions) { auto& p = s.second; if (p) { - if (p->current_request) { - uint32_t& req_start_block_num = - std::visit([](auto& req) -> uint32_t& { return req.start_block_num; }, *p->current_request); - if (block_state->block_num < req_start_block_num) { - req_start_block_num = block_state->block_num; - } - } - p->send_update(block_state); + p->send_update(block_state, fc_create_span(ship_accept_span, "send_update")); } } } @@ -675,10 +725,16 @@ void state_history_plugin::plugin_initialize(const variables_map& options) { void state_history_plugin::plugin_startup() { handle_sighup(); // setup logging - if (my->endpoint_address.size()) - my->listen(); - if (my->unix_path.size()) - my->unix_listen(); + + try { + if (my->endpoint_address.size()) + my->listen(); + if (my->unix_path.size()) + my->unix_listen(); + } + catch(std::exception& ex) { + appbase::app().quit(); + } } void state_history_plugin::plugin_shutdown() {