diff --git a/libraries/libfc/CMakeLists.txt b/libraries/libfc/CMakeLists.txt index 0ce96c2d09..67841f79c8 100644 --- a/libraries/libfc/CMakeLists.txt +++ b/libraries/libfc/CMakeLists.txt @@ -63,7 +63,6 @@ set( fc_sources src/network/http/http_client.cpp src/compress/zlib.cpp src/log/gelf_appender.cpp - src/log/zipkin.cpp ) file( GLOB_RECURSE fc_headers ${CMAKE_CURRENT_SOURCE_DIR} *.hpp *.h ) diff --git a/libraries/libfc/include/fc/log/trace.hpp b/libraries/libfc/include/fc/log/trace.hpp deleted file mode 100644 index 6a0d3de14c..0000000000 --- a/libraries/libfc/include/fc/log/trace.hpp +++ /dev/null @@ -1,44 +0,0 @@ -#pragma once - -#include -#include - -/// @param TRACE_STR const char* identifier for trace -/// @return implementation defined type RAII object that submits trace on exit of scope -#define fc_create_trace( TRACE_STR ) \ - ::fc::zipkin_config::is_enabled() ? \ - ::fc::optional_trace{ ::std::optional<::fc::zipkin_trace>( ::std::in_place, (TRACE_STR) ) } \ - : ::fc::optional_trace{}; - -/// @param TRACE_STR const char* identifier for trace -/// @param TRACE_ID fc::sha256 id to use -/// @return implementation defined type RAII object that submits trace on exit of scope -#define fc_create_trace_with_id( TRACE_STR, TRACE_ID ) \ - ::fc::zipkin_config::is_enabled() ? \ - ::fc::optional_trace{ ::std::optional<::fc::zipkin_trace>( ::std::in_place, ::fc::zipkin_span::to_id(TRACE_ID), (TRACE_STR) ) } \ - : ::fc::optional_trace{}; - -/// @param TRACE_VARNAME variable returned from fc_create_trace -/// @param SPAN_STR const char* indentifier -/// @return implementation defined type RAII object that submits span on exit of scope -#define fc_create_span( TRACE_VARNAME, SPAN_STR ) \ - ( (TRACE_VARNAME) && ::fc::zipkin_config::is_enabled() ) ? 
\ - (TRACE_VARNAME).opt->create_span( (SPAN_STR) ) \ - : ::std::optional<::fc::zipkin_span>{}; - -/// @param TRACE_TOKEN variable returned from trace.get_token() -/// @param SPAN_STR const char* indentifier -/// @return implementation defined type RAII object that submits span on exit of scope -#define fc_create_span_from_token( TRACE_TOKEN, SPAN_STR ) \ - ( (TRACE_TOKEN) && ::fc::zipkin_config::is_enabled() ) ? \ - ::fc::zipkin_trace::create_span_from_token( (TRACE_TOKEN), (SPAN_STR) ) \ - : ::std::optional<::fc::zipkin_span>{}; - -/// @param SPAN_VARNAME variable returned from fc_create_span -/// @param TAG_KEY_STR string key -/// @param TAG_VALUE string value -#define fc_add_tag( SPAN_VARNAME, TAG_KEY_STR, TAG_VALUE_STR) \ - FC_MULTILINE_MACRO_BEGIN \ - if( (SPAN_VARNAME) && ::fc::zipkin_config::is_enabled() ) \ - (SPAN_VARNAME)->add_tag( (TAG_KEY_STR), (TAG_VALUE_STR) ); \ - FC_MULTILINE_MACRO_END diff --git a/libraries/libfc/include/fc/log/zipkin.hpp b/libraries/libfc/include/fc/log/zipkin.hpp deleted file mode 100644 index 32010a1140..0000000000 --- a/libraries/libfc/include/fc/log/zipkin.hpp +++ /dev/null @@ -1,197 +0,0 @@ -#pragma once - -#include -#include -#include - -namespace fc { -/// Active Object that sends zipkin messages in JSON format -/// https://zipkin.io/zipkin-api/ -/// -/// Span contains following data -/// uint64_t traceId - unique id for trace, all children spans shared same id -/// std::string name - logical operation, should have low cardinality -/// uint64_t parentId - The parent span id, or absent if root span -/// uint64_t id - unique id for this span -/// int64_t timestamp - epoch microseconds of start of span -/// int64_t duration - microseconds of span -/// -/// Enable zipkin by calling zipkin_config::init() from main thread on startup. -/// Use macros defined in trace.hpp. 
- -class zipkin; - -class sha256; - -class zipkin_config { -public: - /// Thread safe only if init() called from main thread before spawning of any threads - static bool is_enabled() { return get_zipkin_() != nullptr; } - - /// Not thread safe, call from main thread before spawning any threads that might use zipkin. - /// @param url the url endpoint of zipkin server. e.g. http://127.0.0.1:9411/api/v2/spans - /// @param service_name the service name to include in each zipkin span - /// @param timeout_us the timeout in microseconds for each http call (9 consecutive failures and zipkin is disabled) - static void init( const std::string& url, const std::string& service_name, uint32_t timeout_us ); - - /// Thread safe only if init() called from main thread before spawning of any threads - /// @throw assert_exception if called before init() - static zipkin& get_zipkin(); - - /// Thread safe only if init() called from main thread before spawning of any threads - /// @throw assert_exception if called before init() - static void shutdown(); - - /// Starts with a random id and increments on each call, will not return 0 - static uint64_t get_next_unique_id(); - -private: - /// Provide access to initialized zipkin endpoint - /// Thread safe as long as init() called correctly - /// @return nullptr if init() not called - static zipkin* get_zipkin_() { return get().zip.get(); }; - - static zipkin_config& get(); - -private: - std::unique_ptr zip; -}; - -struct zipkin_span { - explicit zipkin_span( std::string name, uint64_t parent_id = 0 ) - : data( std::move( name ), parent_id ) {} - - explicit zipkin_span( uint64_t id, std::string name, uint64_t parent_id = 0 ) - : data( id, std::move( name ), parent_id ) {} - - zipkin_span( const zipkin_span& ) = delete; - zipkin_span& operator=( const zipkin_span& ) = delete; - zipkin_span& operator=( zipkin_span&& ) = delete; - - zipkin_span( zipkin_span&& rhs ) noexcept - : data( std::move( rhs.data ) ) { - rhs.data.id = 0; - } - - 
~zipkin_span(); - - void add_tag( const std::string& key, const std::string& var ) { - // zipkin tags are required to be strings - data.tags( key, var ); - } - - void add_tag( const std::string& key, const char* var ) { - // zipkin tags are required to be strings - data.tags( key, var ); - } - - void add_tag( const std::string& key, bool v ) { - // zipkin tags are required to be strings - data.tags( key, v ? "true" : "false" ); - } - - template - std::enable_if_t>, void> - add_tag( const std::string& key, T&& var ) { - data.tags( key, std::to_string( std::forward( var ) ) ); - } - - template - std::enable_if_t>, void> - add_tag( const std::string& key, T&& var ) { - data.tags( key, (std::string) var ); - } - - struct token { - friend zipkin_span; - friend struct zipkin_trace; - friend struct optional_trace; - constexpr explicit operator bool() const noexcept { return id != 0; } - private: - explicit token( uint64_t id ) - : id( id ) {} - uint64_t id; - }; - - token get_token() const { return token{data.id}; }; - - static uint64_t to_id( const fc::sha256& id ); - - template - static uint64_t to_id( const T& id ) { - static_assert( std::is_same_v, "expected uint64_t" ); - return id.data()[3]; - } - - struct span_data { - explicit span_data( std::string name, uint64_t parent_id = 0 ) - : id( zipkin_config::get_next_unique_id() ), parent_id( parent_id ), - start( time_point::now() ), name( std::move( name ) ) {} - - explicit span_data( uint64_t id, std::string name, uint64_t parent_id = 0 ) - : id( id ), parent_id( parent_id ), start( time_point::now() ), name( std::move( name ) ) {} - - span_data( const span_data& ) = delete; - span_data& operator=( const span_data& ) = delete; - span_data& operator=( span_data&& ) = delete; - span_data( span_data&& rhs ) = default; - - uint64_t id; - // zipkin traceId and parentId are same (when parent_id set) since only allowing trace with span children. 
- // Not currently supporting spans with children, only trace with children spans. - const uint64_t parent_id; - const fc::time_point start; - fc::time_point stop; - std::string name; - fc::mutable_variant_object tags; - }; - - span_data data; -}; - -struct zipkin_trace : public zipkin_span { - using zipkin_span::zipkin_span; - - [[nodiscard]] std::optional create_span( std::string name ) const { - return zipkin_span{std::move( name ), get_token().id}; - } - - [[nodiscard]] static std::optional - create_span_from_token( zipkin_span::token token, std::string name ) { - return zipkin_span{std::move( name ), token.id}; - } -}; - -struct optional_trace { - std::optional opt; - - constexpr explicit operator bool() const noexcept { return opt.has_value(); } - - [[nodiscard]] zipkin_span::token get_token() const { - return opt ? opt->get_token() : zipkin_span::token( 0 ); - } -}; - -class zipkin { -public: - zipkin( const std::string& url, const std::string& service_name, uint32_t timeout_us ); - - /// finishes logging all queued up spans - ~zipkin() = default; - - /// Starts with a random id and increments on each call, will not return 0 - uint64_t get_next_unique_id(); - - // finish logging all queued up spans, not restartable - void shutdown(); - - // Logs zipkin json via http on separate thread - void log( zipkin_span::span_data&& span ); - -private: - class impl; - std::unique_ptr my; -}; - -} // namespace fc - diff --git a/libraries/libfc/src/log/zipkin.cpp b/libraries/libfc/src/log/zipkin.cpp deleted file mode 100644 index eddf4d41d1..0000000000 --- a/libraries/libfc/src/log/zipkin.cpp +++ /dev/null @@ -1,208 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include -#include - -namespace fc { - -zipkin_config& zipkin_config::get() { - static zipkin_config the_one; - return the_one; -} - -void zipkin_config::init( const std::string& url, const std::string& service_name, uint32_t timeout_us ) { - get().zip = 
std::make_unique( url, service_name, timeout_us ); -} - -zipkin& zipkin_config::get_zipkin() { - if( !get().zip ) { - FC_THROW_EXCEPTION( fc::assert_exception, "uninitialized zipkin" ); - } - return *get().zip; -} - -void zipkin_config::shutdown() { - if( zipkin* z = get_zipkin_() ) { - z->shutdown(); - } -} - -uint64_t zipkin_config::get_next_unique_id() { - if( !get().zip ) { - FC_THROW_EXCEPTION( fc::assert_exception, "uninitialized zipkin" ); - } - return get().zip->get_next_unique_id(); -} - -class zipkin::impl { -public: - static constexpr uint32_t max_consecutive_errors = 9; - - const std::string zipkin_url; - const std::string service_name; - const uint32_t timeout_us; - std::mutex mtx; - uint64_t next_id = 0; - http_client http; - std::atomic consecutive_errors = 0; - std::atomic stopped = 0; - std::optional endpoint; - std::thread thread; - boost::asio::io_context ctx; - boost::asio::io_context::strand work_strand{ctx}; - boost::asio::executor_work_guard work_guard = boost::asio::make_work_guard(ctx); - - impl( std::string url, std::string service_name, uint32_t timeout_us ) - : zipkin_url( std::move(url) ) - , service_name( std::move(service_name) ) - , timeout_us( timeout_us ) { - } - - void init(); - void shutdown(); - - void log( zipkin_span::span_data&& span ); - - ~impl(); -}; - -void zipkin::impl::init() { - thread = std::thread( [this]() { - fc::set_os_thread_name( "zipkin" ); - while( true ) { - try { - ctx.run(); - break; - } FC_LOG_AND_DROP(); - } - } ); -} - -zipkin::impl::~impl() { - try { - shutdown(); - } catch (...) 
{} -} - -void zipkin::impl::shutdown() { - if( stopped ^= 1 ) return; - work_guard.reset(); // drain the queue - thread.join(); -} - -zipkin::zipkin( const std::string& url, const std::string& service_name, uint32_t timeout_us ) : - my( new impl( url, service_name, timeout_us ) ) { - my->init(); -} - -uint64_t zipkin::get_next_unique_id() { - std::scoped_lock g( my->mtx ); - if( my->next_id == 0 ) { - std::mt19937_64 engine( std::random_device{}() ); - std::uniform_int_distribution distribution(1); - my->next_id = distribution( engine ); - } - return my->next_id++; -} - -void zipkin::shutdown() { - my->shutdown(); -} - -fc::variant create_zipkin_variant( zipkin_span::span_data&& span, const std::string& service_name ) { - // https://zipkin.io/zipkin-api/ - // std::string traceId; // [a-f0-9]{16,32} unique id for trace, all children spans shared same id - // std::string name; // logical operation, should have low cardinality - // std::string parentId; // The parent span id, or absent if root span - // std::string id // a-f0-9]{16} - // int64_t timestamp // epoch microseconds of start of span - // int64_t duration // microseconds of span - - uint64_t trace_id; - if( span.parent_id != 0 ) { - trace_id = span.parent_id; - } else { - trace_id = span.id; - } - - fc::mutable_variant_object mvo; - mvo( "id", fc::to_hex( reinterpret_cast(&span.id), sizeof( span.id ) ) ); - mvo( "traceId", fc::to_hex( reinterpret_cast(&trace_id), sizeof( trace_id ) ) ); - if( span.parent_id != 0 ) { - mvo( "parentId", fc::to_hex( reinterpret_cast(&span.parent_id), sizeof( span.parent_id ) ) ); - } - mvo( "name", std::move( span.name ) ); - mvo( "timestamp", span.start.time_since_epoch().count() ); - mvo( "duration", (span.stop - span.start).count() ); - mvo( "localEndpoint", fc::variant_object( "serviceName", service_name ) ); - - mvo( "tags", std::move( span.tags ) ); - span.id = 0; // stop destructor of span from calling log again - - // /api/v2/spans takes an array of spans - fc::variants 
result; - result.emplace_back( std::move( mvo ) ); - - return result; -} - -void zipkin::log( zipkin_span::span_data&& span ) { - if( my->consecutive_errors > my->max_consecutive_errors || my->stopped ) - return; - - boost::asio::post(my->work_strand, [this, span{std::move(span)}]() mutable { - my->log( std::move( span ) ); - }); -} - -void zipkin::impl::log( zipkin_span::span_data&& span ) { - if( consecutive_errors > max_consecutive_errors ) - return; - - try { - auto deadline = fc::time_point::now() + fc::microseconds( timeout_us ); - if( !endpoint ) { - endpoint = url( zipkin_url ); - dlog( "connecting to zipkin: ${p}", ("p", *endpoint) ); - } - - http.post_sync( *endpoint, create_zipkin_variant( std::move( span ), service_name ), deadline ); - - consecutive_errors = 0; - return; - } catch( const fc::exception& e ) { - wlog( "unable to connect to zipkin: ${u}, error: ${e}", ("u", zipkin_url)("e", e.to_detail_string()) ); - } catch( const std::exception& e ) { - wlog( "unable to connect to zipkin: ${u}, error: ${e}", ("u", zipkin_url)("e", e.what()) ); - } catch( ... ) { - wlog( "unable to connect to zipkin: ${u}, error: unknown", ("u", zipkin_url) ); - } - ++consecutive_errors; -} - -uint64_t zipkin_span::to_id( const fc::sha256& id ) { - // avoid 0 since id of 0 is used as a flag - return id._hash[3] == 0 ? 1 : id._hash[3]; -} - -zipkin_span::~zipkin_span() { - if( data.id == 0 ) - return; - try { - if( zipkin_config::is_enabled() ) { - data.stop = time_point::now(); - zipkin_config::get_zipkin().log( std::move( data ) ); - } - } catch( ... 
) {} -} - -} // fc diff --git a/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp b/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp index d30a18dd9e..d9b458184a 100644 --- a/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp +++ b/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp @@ -38,7 +38,7 @@ namespace eosio { namespace chain { namespace plugin_interface { namespace incoming { namespace methods { - // synchronously push a block/trx to a single provider, block_state_ptr may be null + // synchronously push a block/trx to a single provider, block_state_ptr may be null using block_sync = method_decl&, const block_state_ptr&), first_provider_policy>; using transaction_async = method_decl), first_provider_policy>; } diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index a84bd8ee4e..09265e2371 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -50,10 +49,14 @@ namespace eosio { using connection_ptr = std::shared_ptr; using connection_wptr = std::weak_ptr; + const fc::string logger_name("net_plugin_impl"); + fc::logger logger; + std::string peer_log_format; + template void verify_strand_in_this_thread(const Strand& strand, const char* func, int line) { if( !strand.running_in_this_thread() ) { - elog( "wrong strand: ${f} : line ${n}, exiting", ("f", func)("n", line) ); + fc_elog( logger, "wrong strand: ${f} : line ${n}, exiting", ("f", func)("n", line) ); app().quit(); } } @@ -75,7 +78,7 @@ namespace eosio { member, member >, - composite_key_compare< sha256_less, std::less > + composite_key_compare< sha256_less, std::less<> > >, ordered_non_unique< tag< by_expiry >, @@ -86,32 +89,24 @@ namespace eosio { struct peer_block_state { block_id_type id; - uint32_t block_num = 0; uint32_t connection_id = 0; - bool have_block = false; // true if we have received the 
block, false if only received id notification + + uint32_t block_num() const { return block_header::num_from_id(id); } }; - struct by_peer_block_id; - struct by_block_num; + struct by_connection_id; typedef multi_index_container< eosio::peer_block_state, indexed_by< - ordered_unique< tag, - composite_key< peer_block_state, - member, - member - >, - composite_key_compare< std::less, sha256_less > - >, - ordered_non_unique< tag, + ordered_unique< tag, composite_key< peer_block_state, + const_mem_fun, member, - member + member >, - composite_key_compare< sha256_less, std::greater > - >, - ordered_non_unique< tag, member > + composite_key_compare< std::less<>, sha256_less, std::less<> > + > > > peer_block_state_index; @@ -175,7 +170,7 @@ namespace eosio { : strand( io_context ) {} void bcast_transaction(const packed_transaction_ptr& trx); - void rejected_transaction(const packed_transaction_ptr& trx, uint32_t head_blk_num); + void rejected_transaction(const packed_transaction_ptr& trx); void bcast_block( const signed_block_ptr& b, const block_id_type& id ); void rejected_block(const block_id_type& id); @@ -188,6 +183,7 @@ namespace eosio { bool add_peer_block( const block_id_type& blkid, uint32_t connection_id ); bool peer_has_block(const block_id_type& blkid, uint32_t connection_id) const; bool have_block(const block_id_type& blkid) const; + void rm_block(const block_id_type& blkid); bool add_peer_txn( const transaction_id_type id, const time_point_sec& trx_expires, uint32_t connection_id, const time_point_sec& now = time_point::now() ); @@ -283,9 +279,10 @@ namespace eosio { compat::channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription; - uint16_t thread_pool_size = 2; + uint16_t thread_pool_size = 4; eosio::chain::named_thread_pool thread_pool{ "net" }; + boost::asio::deadline_timer accept_error_timer{thread_pool.get_executor()}; private: mutable std::mutex chain_info_mtx; // protects chain_* @@ -303,8 +300,7 @@ namespace eosio { void 
start_listen_loop(); - void on_accepted_block( const block_state_ptr& bs ); - void on_pre_accepted_block( const signed_block_ptr& bs ); + void on_accepted_block_header( const block_state_ptr& bs ); void transaction_ack(const std::pair&); void on_irreversible_block( const block_state_ptr& blk ); @@ -350,11 +346,9 @@ namespace eosio { connection_ptr find_connection(const string& host)const; // must call with held mutex string connect( const string& host ); - }; - const fc::string logger_name("net_plugin_impl"); - fc::logger logger; - std::string peer_log_format; + void plugin_shutdown(); + }; // peer_[x]log must be called from thread in connection strand #define peer_dlog( PEER, FORMAT, ... ) \ @@ -633,6 +627,8 @@ namespace eosio { string log_remote_endpoint_port; string local_endpoint_ip; string local_endpoint_port; + // kept in sync with last_handshake_recv.last_irreversible_block_num, only accessed from connection strand + uint32_t peer_lib_num = 0; std::atomic trx_in_progress_size{0}; fc::time_point last_dropped_trx_msg_time; @@ -1015,6 +1011,7 @@ namespace eosio { if( has_last_req && !shutdown ) { my_impl->dispatcher->retry_fetch( self->shared_from_this() ); } + self->peer_lib_num = 0; self->peer_requested.reset(); self->sent_handshake_count = 0; if( !shutdown) my_impl->sync_master->sync_reset_lib_num( self->shared_from_this(), true ); @@ -1040,15 +1037,15 @@ namespace eosio { enqueue(note); return; } - std::unique_lock g_conn( conn_mtx ); - if( last_handshake_recv.generation >= 1 ) { - peer_dlog( this, "maybe truncating branch at = ${h}:${id}", - ("h", block_header::num_from_id(last_handshake_recv.head_id))("id", last_handshake_recv.head_id) ); - } - block_id_type lib_id = last_handshake_recv.last_irreversible_block_id; - g_conn.unlock(); - const auto lib_num = block_header::num_from_id(lib_id); + if( logger.is_enabled( fc::log_level::debug ) ) { + std::unique_lock g_conn( conn_mtx ); + if( last_handshake_recv.generation >= 1 ) { + peer_dlog( this, "maybe 
truncating branch at = ${h}:${id}", + ("h", block_header::num_from_id(last_handshake_recv.head_id))("id", last_handshake_recv.head_id) ); + } + } + const auto lib_num = peer_lib_num; if( lib_num == 0 ) return; // if last_irreversible_block_id is null (we have not received handshake or reset) app().post( priority::medium, [chain_plug = my_impl->chain_plug, c = shared_from_this(), @@ -1113,7 +1110,6 @@ namespace eosio { if( b ) { fc_dlog( logger, "fetch_block_by_id num ${n}, connection ${cid}", ("n", b->block_num())("cid", c->connection_id) ); - my_impl->dispatcher->add_peer_block( blkid, c->connection_id ); c->strand.post( [c, b{std::move(b)}]() { c->enqueue_block( b ); } ); @@ -1571,9 +1567,8 @@ namespace eosio { } if( !c ) return; if( !closing ) { - std::lock_guard g_conn( c->conn_mtx ); - if( c->last_handshake_recv.last_irreversible_block_num > sync_known_lib_num ) { - sync_known_lib_num = c->last_handshake_recv.last_irreversible_block_num; + if( c->peer_lib_num > sync_known_lib_num ) { + sync_known_lib_num = c->peer_lib_num; } } else { // Closing connection, therefore its view of LIB can no longer be considered as we will no longer be connected. 
@@ -1955,6 +1950,7 @@ namespace eosio { } } else if (msg.known_blocks.mode == last_irr_catch_up) { { + c->peer_lib_num = msg.known_trx.pending; std::lock_guard g_conn( c->conn_mtx ); c->last_handshake_recv.last_irreversible_block_num = msg.known_trx.pending; } @@ -2059,34 +2055,38 @@ namespace eosio { // thread safe bool dispatch_manager::add_peer_block( const block_id_type& blkid, uint32_t connection_id) { + uint32_t block_num = block_header::num_from_id(blkid); std::lock_guard g( blk_state_mtx ); - auto bptr = blk_state.get().find( std::make_tuple( connection_id, std::ref( blkid ))); + auto bptr = blk_state.get().find( std::make_tuple(block_num, std::ref(blkid), connection_id) ); bool added = (bptr == blk_state.end()); if( added ) { - blk_state.insert( {blkid, block_header::num_from_id( blkid ), connection_id, true} ); - } else if( !bptr->have_block ) { - blk_state.modify( bptr, []( auto& pb ) { - pb.have_block = true; - }); + blk_state.insert( {blkid, connection_id} ); } return added; } bool dispatch_manager::peer_has_block( const block_id_type& blkid, uint32_t connection_id ) const { + uint32_t block_num = block_header::num_from_id(blkid); std::lock_guard g(blk_state_mtx); - const auto blk_itr = blk_state.get().find( std::make_tuple( connection_id, std::ref( blkid ))); + const auto blk_itr = blk_state.get().find( std::make_tuple(block_num, std::ref(blkid), connection_id) ); return blk_itr != blk_state.end(); } bool dispatch_manager::have_block( const block_id_type& blkid ) const { + uint32_t block_num = block_header::num_from_id(blkid); std::lock_guard g(blk_state_mtx); - // by_peer_block_id sorts have_block by greater so have_block == true will be the first one found - const auto& index = blk_state.get(); - auto blk_itr = index.find( blkid ); - if( blk_itr != index.end() ) { - return blk_itr->have_block; - } - return false; + const auto& index = blk_state.get(); + auto blk_itr = index.find( std::make_tuple(block_num, std::ref(blkid)) ); + return blk_itr != 
index.end(); + } + + void dispatch_manager::rm_block( const block_id_type& blkid ) { + uint32_t block_num = block_header::num_from_id(blkid); + fc_dlog( logger, "rm_block ${n}, id: ${id}", ("n", block_num)("id", blkid)); + std::lock_guard g(blk_state_mtx); + auto& index = blk_state.get(); + auto p = index.equal_range( std::make_tuple(block_num, std::ref(blkid)) ); + index.erase(p.first, p.second); } bool dispatch_manager::add_peer_txn( const transaction_id_type id, const time_point_sec& trx_expires, @@ -2128,7 +2128,7 @@ namespace eosio { void dispatch_manager::expire_blocks( uint32_t lib_num ) { std::lock_guard g(blk_state_mtx); - auto& stale_blk = blk_state.get(); + auto& stale_blk = blk_state.get(); stale_blk.erase( stale_blk.lower_bound(1), stale_blk.upper_bound(lib_num) ); } @@ -2144,18 +2144,18 @@ namespace eosio { fc_dlog( logger, "socket_is_open ${s}, connecting ${c}, syncing ${ss}, connection ${cid}", ("s", cp->socket_is_open())("c", cp->connecting.load())("ss", cp->syncing.load())("cid", cp->connection_id) ); if( !cp->current() ) return true; + + if( !add_peer_block( id, cp->connection_id ) ) { + fc_dlog( logger, "not bcast block ${b} to connection ${cid}", ("b", bnum)("cid", cp->connection_id) ); + return true; + } + send_buffer_type sb = buff_factory.get_send_buffer( b ); - cp->strand.post( [this, cp, id, bnum, sb{std::move(sb)}]() { + cp->strand.post( [cp, bnum, sb{std::move(sb)}]() { cp->latest_blk_time = cp->get_time(); - std::unique_lock g_conn( cp->conn_mtx ); - bool has_block = cp->last_handshake_recv.last_irreversible_block_num >= bnum; - g_conn.unlock(); + bool has_block = cp->peer_lib_num >= bnum; if( !has_block ) { - if( !add_peer_block( id, cp->connection_id ) ) { - peer_dlog( cp, "not bcast block ${b}", ("b", bnum) ); - return; - } peer_dlog( cp, "bcast block ${b}", ("b", bnum) ); cp->enqueue_buffer( sb, no_reason ); } @@ -2185,6 +2185,7 @@ namespace eosio { fc_dlog( logger, "rejected block ${id}", ("id", id) ); } + // called from any thread 
void dispatch_manager::bcast_transaction(const packed_transaction_ptr& trx) { trx_buffer_factory buff_factory; const auto now = fc::time_point::now(); @@ -2205,7 +2206,8 @@ namespace eosio { } ); } - void dispatch_manager::rejected_transaction(const packed_transaction_ptr& trx, uint32_t head_blk_num) { + // called from any thread + void dispatch_manager::rejected_transaction(const packed_transaction_ptr& trx) { fc_dlog( logger, "not sending rejected transaction ${tid}", ("tid", trx->id()) ); // keep rejected transaction around for awhile so we don't broadcast it, don't remove from local_txns } @@ -2413,8 +2415,19 @@ namespace eosio { fc_elog( logger, "Error accepting connection: ${m}", ("m", ec.message())); // For the listed error codes below, recall start_listen_loop() switch (ec.value()) { + case EMFILE: // same as boost::system::errc::too_many_files_open + { + // no file descriptors available to accept the connection. Wait on async_timer + // and retry listening using shorter 100ms timer than SHiP or http_plugin + // as net_plugin is more critical + accept_error_timer.expires_from_now(boost::posix_time::milliseconds(100)); + accept_error_timer.async_wait([this]( const boost::system::error_code &ec) { + if (!ec) + start_listen_loop(); + }); + return; // wait for timer!! 
+ } case ECONNABORTED: - case EMFILE: case ENFILE: case ENOBUFS: case ENOMEM: @@ -2598,7 +2611,8 @@ namespace eosio { fc::raw::unpack( peek_ds, bh ); const block_id_type blk_id = bh.calculate_id(); - const uint32_t blk_num = bh.block_num(); + const uint32_t blk_num = block_header::num_from_id(blk_id); + // don't add_peer_block because we have not validated this block header yet if( my_impl->dispatcher->have_block( blk_id ) ) { peer_dlog( this, "canceling wait, already received block ${num}, id ${id}...", ("num", blk_num)("id", blk_id.str().substr(8,16)) ); @@ -2695,6 +2709,43 @@ namespace eosio { return true; } + void net_plugin_impl::plugin_shutdown() { + in_shutdown = true; + { + std::lock_guard g( connector_check_timer_mtx ); + if( connector_check_timer ) + connector_check_timer->cancel(); + } + { + std::lock_guard g( expire_timer_mtx ); + if( expire_timer ) + expire_timer->cancel(); + } + { + std::lock_guard g( keepalive_timer_mtx ); + if( keepalive_timer ) + keepalive_timer->cancel(); + } + + { + fc_ilog( logger, "close ${s} connections", ("s", connections.size()) ); + std::lock_guard g( connections_mtx ); + for( auto& con : connections ) { + fc_dlog( logger, "close: ${cid}", ("cid", con->connection_id) ); + con->close( false, true ); + } + connections.clear(); + } + + thread_pool.stop(); + + if( acceptor ) { + boost::system::error_code ec; + acceptor->cancel( ec ); + acceptor->close( ec ); + } + } + // call only from main application thread void net_plugin_impl::update_chain_info() { controller& cc = chain_plug->chain(); @@ -2772,6 +2823,7 @@ namespace eosio { peer_dlog( this, "received handshake gen ${g}, lib ${lib}, head ${head}", ("g", msg.generation)("lib", msg.last_irreversible_block_num)("head", msg.head_num) ); + peer_lib_num = msg.last_irreversible_block_num; std::unique_lock g_conn( conn_mtx ); last_handshake_recv = msg; g_conn.unlock(); @@ -3135,53 +3187,82 @@ namespace eosio { // called from connection strand void connection::handle_message( const 
block_id_type& id, signed_block_ptr ptr ) { - peer_dlog( this, "received signed_block ${num}, id ${id}", ("num", ptr->block_num())("id", id) ); + peer_dlog( this, "received signed_block ${num}, id ${id}", ("num", block_header::num_from_id(id))("id", id) ); - controller& cc = my_impl->chain_plug->chain(); - block_state_ptr bsp; - bool exception = false; - try { - if( cc.fetch_block_state_by_id( id ) ) { - my_impl->dispatcher->add_peer_block( id, connection_id ); - my_impl->sync_master->sync_recv_block( shared_from_this(), id, ptr->block_num(), false ); + // post to dispatcher strand so that we don't have multiple threads validating the block header + // the dispatcher strand will sync the add_peer_block and rm_block calls + my_impl->dispatcher->strand.post([id, c{shared_from_this()}, ptr{std::move(ptr)}, cid=connection_id]() mutable { + controller& cc = my_impl->chain_plug->chain(); + + // may have come in on a different connection and posted into dispatcher strand before this one + if( my_impl->dispatcher->have_block( id ) || cc.fetch_block_state_by_id( id ) ) { + my_impl->dispatcher->add_peer_block( id, c->connection_id ); + c->strand.post( [c, id]() { + my_impl->sync_master->sync_recv_block( c, id, block_header::num_from_id(id), false ); + }); return; } - // this may return null if block is not immediately ready to be processed - bsp = cc.create_block_state( id, ptr ); - } catch( const fc::exception& ex ) { - exception = true; - peer_elog(this, "bad block exception: #${n} ${id}...: ${m}", - ("n", ptr->block_num())("id", id.str().substr(8,16))("m",ex.to_string())); - } catch( ... 
) { - exception = true; - peer_elog(this, "bad block: #${n} ${id}...: unknown exception", - ("n", ptr->block_num())("id", id.str().substr(8,16))); - } - if( exception ) { - my_impl->sync_master->rejected_block( shared_from_this(), ptr->block_num() ); - my_impl->dispatcher->rejected_block( id ); - return; - } - bool signal_producer = !!bsp; // ready to process immediately, so signal producer to interrupt start_block - app().post(priority::medium, [ptr{std::move(ptr)}, bsp{std::move(bsp)}, id, c = shared_from_this()]() mutable { - c->process_signed_block( id, std::move(ptr), std::move(bsp) ); - }); + block_state_ptr bsp; + bool exception = false; + try { + // this may return null if block is not immediately ready to be processed + bsp = cc.create_block_state( id, ptr ); + } catch( const fc::exception& ex ) { + exception = true; + fc_elog( logger, "bad block exception connection ${cid}: #${n} ${id}...: ${m}", + ("cid", cid)("n", ptr->block_num())("id", id.str().substr(8,16))("m",ex.to_string())); + } catch( ... 
) { + exception = true; + fc_elog( logger, "bad block connection ${cid}: #${n} ${id}...: unknown exception", + ("cid", cid)("n", ptr->block_num())("id", id.str().substr(8,16))); + } + if( exception ) { + c->strand.post( [c, id, blk_num=ptr->block_num()]() { + my_impl->sync_master->rejected_block( c, blk_num ); + my_impl->dispatcher->rejected_block( id ); + }); + return; + } + + bool valid_block_header = !!bsp; + + if( valid_block_header ) { + fc_dlog( logger, "validated block header, broadcasting immediately, connection ${cid}, blk num = ${num}, id = ${id}", + ("cid", cid)("num", bsp->block_num)("id", bsp->id) ); + my_impl->dispatcher->add_peer_block( bsp->id, cid ); // no need to send back to sender + my_impl->dispatcher->bcast_block( bsp->block, bsp->id ); + } - if( signal_producer ) - my_impl->producer_plug->received_block(); + app().post(priority::medium, [ptr{std::move(ptr)}, bsp{std::move(bsp)}, id, c{std::move(c)}]() mutable { + c->process_signed_block( id, std::move(ptr), std::move(bsp) ); + }); + + if( valid_block_header ) { + // ready to process immediately, so signal producer to interrupt start_block + my_impl->producer_plug->received_block(); + } + }); } // called from application thread void connection::process_signed_block( const block_id_type& blk_id, signed_block_ptr msg, block_state_ptr bsp ) { controller& cc = my_impl->chain_plug->chain(); - uint32_t blk_num = msg->block_num(); + uint32_t blk_num = block_header::num_from_id(blk_id); // use c in this method instead of this to highlight that all methods called on c-> must be thread safe connection_ptr c = shared_from_this(); // if we have closed connection then stop processing - if( !c->socket_is_open() ) + if( !c->socket_is_open() ) { + if( bsp ) { + // valid bsp means add_peer_block already called, need to remove it since we are not going to process the block + // call on dispatch strand to serialize with the add_peer_block calls + my_impl->dispatcher->strand.post( [blk_id]() { + 
my_impl->dispatcher->rm_block( blk_id ); + } ); + } return; + } try { if( cc.fetch_block_by_id(blk_id) ) { @@ -3198,15 +3279,14 @@ namespace eosio { } fc::microseconds age( fc::time_point::now() - msg->timestamp); - fc_dlog( logger, "received signed_block: #${n} block age in secs = ${age}, connection ${cid}", - ("n", blk_num)("age", age.to_seconds())("cid", c->connection_id) ); + fc_dlog( logger, "received signed_block: #${n} block age in secs = ${age}, connection ${cid}, ${v}", + ("n", blk_num)("age", age.to_seconds())("cid", c->connection_id)("v", bsp ? "pre-validated" : "validation pending") ); - go_away_reason reason = fatal_other; + go_away_reason reason = no_reason; + bool accepted = false; try { - bool accepted = my_impl->chain_plug->accept_block(msg, blk_id, bsp); + accepted = my_impl->chain_plug->accept_block(msg, blk_id, bsp); my_impl->update_chain_info(); - if( !accepted ) return; - reason = no_reason; } catch( const unlinkable_block_exception &ex) { fc_elog(logger, "unlinkable_block_exception connection ${cid}: #${n} ${id}...: ${m}", ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string())); @@ -3218,15 +3298,18 @@ namespace eosio { } catch( const assert_exception &ex ) { fc_elog(logger, "block assert_exception connection ${cid}: #${n} ${id}...: ${m}", ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string())); + reason = fatal_other; } catch( const fc::exception &ex ) { fc_elog(logger, "bad block exception connection ${cid}: #${n} ${id}...: ${m}", ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string())); + reason = fatal_other; } catch( ... 
) { fc_elog(logger, "bad block connection ${cid}: #${n} ${id}...: unknown exception", ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))); + reason = fatal_other; } - if( reason == no_reason ) { + if( accepted ) { boost::asio::post( my_impl->thread_pool.get_executor(), [dispatcher = my_impl->dispatcher.get(), cid=c->connection_id, blk_id, msg]() { fc_dlog( logger, "accepted signed_block : #${n} ${id}...", ("n", msg->block_num())("id", blk_id.str().substr(8,16)) ); dispatcher->add_peer_block( blk_id, cid ); @@ -3236,8 +3319,18 @@ namespace eosio { sync_master->sync_recv_block( c, blk_id, blk_num, true ); }); } else { - c->strand.post( [sync_master = my_impl->sync_master.get(), dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() { - sync_master->rejected_block( c, blk_num ); + c->strand.post( [sync_master = my_impl->sync_master.get(), dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num, reason]() { + if( reason == unlinkable || reason == no_reason ) { + // unlinkable may be linkable in the future, so indicate we have not received it + // call on dispatch strand to serialize with the add_peer_block calls + my_impl->dispatcher->strand.post( [blk_id]() { + my_impl->dispatcher->rm_block( blk_id ); + } ); + } + // reason==no_reason means accept_block() return false because we are producing, don't call rejected_block which sends handshake + if( reason != no_reason ) { + sync_master->rejected_block( c, blk_num ); + } dispatcher->rejected_block( blk_id ); }); } @@ -3377,42 +3470,15 @@ namespace eosio { } // called from application thread - void net_plugin_impl::on_accepted_block(const block_state_ptr& bs) { + void net_plugin_impl::on_accepted_block_header(const block_state_ptr& bs) { update_chain_info(); - dispatcher->strand.post( [this, bs]() { - fc_dlog( logger, "signaled accepted_block, blk num = ${num}, id = ${id}", ("num", bs->block_num)("id", bs->id) ); + dispatcher->strand.post( [bs]() { + fc_dlog( logger, "signaled 
accepted_block_header, blk num = ${num}, id = ${id}", ("num", bs->block_num)("id", bs->id) ); - auto blk_trace = fc_create_trace_with_id( "Block", bs->id ); - auto blk_span = fc_create_span( blk_trace, "Accepted" ); - fc_add_tag( blk_span, "block_id", bs->id ); - fc_add_tag( blk_span, "block_num", bs->block_num ); - fc_add_tag( blk_span, "block_time", bs->block->timestamp.to_time_point() ); - - dispatcher->bcast_block( bs->block, bs->id ); + my_impl->dispatcher->bcast_block( bs->block, bs->id ); }); } - // called from application thread - void net_plugin_impl::on_pre_accepted_block(const signed_block_ptr& block) { - update_chain_info(); - controller& cc = chain_plug->chain(); - if( cc.is_trusted_producer(block->producer) ) { - dispatcher->strand.post( [this, block]() { - auto id = block->calculate_id(); - fc_dlog( logger, "signaled pre_accepted_block, blk num = ${num}, id = ${id}", ("num", block->block_num())("id", id) ); - - auto blk_trace = fc_create_trace_with_id("Block", id); - auto blk_span = fc_create_span(blk_trace, "PreAccepted"); - fc_add_tag(blk_span, "block_id", id); - fc_add_tag(blk_span, "block_num", block->block_num()); - fc_add_tag(blk_span, "block_time", block->timestamp.to_time_point()); - fc_add_tag(blk_span, "producer", block->producer.to_string()); - - dispatcher->bcast_block( block, id ); - }); - } - } - // called from application thread void net_plugin_impl::on_irreversible_block( const block_state_ptr& block) { fc_dlog( logger, "on_irreversible_block, blk num = ${num}, id = ${id}", ("num", block->block_num)("id", block->id) ); @@ -3421,14 +3487,11 @@ namespace eosio { // called from application thread void net_plugin_impl::transaction_ack(const std::pair& results) { - dispatcher->strand.post( [this, results]() { + boost::asio::post( my_impl->thread_pool.get_executor(), [dispatcher = my_impl->dispatcher.get(), results]() { const auto& id = results.second->id(); if (results.first) { fc_dlog( logger, "signaled NACK, trx-id = ${id} : ${why}", 
("id", id)( "why", results.first->to_detail_string() ) ); - - uint32_t head_blk_num = 0; - std::tie( std::ignore, head_blk_num, std::ignore, std::ignore, std::ignore, std::ignore ) = get_chain_info(); - dispatcher->rejected_transaction(results.second, head_blk_num); + dispatcher->rejected_transaction(results.second); } else { fc_dlog( logger, "signaled ACK, trx-id = ${id}", ("id", id) ); dispatcher->bcast_transaction(results.second); @@ -3687,7 +3750,7 @@ namespace eosio { if( cc.get_read_mode() == db_read_mode::IRREVERSIBLE ) { if( my->p2p_accept_transactions ) { my->p2p_accept_transactions = false; - wlog( "p2p-accept-transactions set to false due to read-mode: irreversible" ); + fc_wlog( logger, "p2p-accept-transactions set to false due to read-mode: irreversible" ); } } if( my->p2p_accept_transactions ) { @@ -3750,11 +3813,8 @@ namespace eosio { { chain::controller& cc = my->chain_plug->chain(); - cc.accepted_block.connect( [my = my]( const block_state_ptr& s ) { - my->on_accepted_block( s ); - } ); - cc.pre_accepted_block.connect( [my = my]( const signed_block_ptr& s ) { - my->on_pre_accepted_block( s ); + cc.accepted_block_header.connect( [my = my]( const block_state_ptr& s ) { + my->on_accepted_block_header( s ); } ); cc.irreversible_block.connect( [my = my]( const block_state_ptr& s ) { my->on_irreversible_block( s ); @@ -3777,7 +3837,7 @@ namespace eosio { my->acceptor->bind(listen_endpoint); my->acceptor->listen(); } catch (const std::exception& e) { - elog( "net_plugin::plugin_startup failed to bind to port ${port}, ${what}", + fc_elog( logger, "net_plugin::plugin_startup failed to bind to port ${port}, ${what}", ("port", listen_endpoint.port())("what", e.what()) ); app().quit(); return; @@ -3808,39 +3868,9 @@ namespace eosio { void net_plugin::plugin_shutdown() { try { fc_ilog( logger, "shutdown.." 
); - my->in_shutdown = true; - { - std::lock_guard g( my->connector_check_timer_mtx ); - if( my->connector_check_timer ) - my->connector_check_timer->cancel(); - }{ - std::lock_guard g( my->expire_timer_mtx ); - if( my->expire_timer ) - my->expire_timer->cancel(); - }{ - std::lock_guard g( my->keepalive_timer_mtx ); - if( my->keepalive_timer ) - my->keepalive_timer->cancel(); - } - - { - fc_ilog( logger, "close ${s} connections", ("s", my->connections.size()) ); - std::lock_guard g( my->connections_mtx ); - for( auto& con : my->connections ) { - fc_dlog( logger, "close: ${cid}", ("cid", con->connection_id) ); - con->close( false, true ); - } - my->connections.clear(); - } - - my->thread_pool.stop(); - - if( my->acceptor ) { - boost::system::error_code ec; - my->acceptor->cancel( ec ); - my->acceptor->close( ec ); - } - + + my->plugin_shutdown(); + app().post( 0, [me = my](){} ); // keep my pointer alive until queue is drained fc_ilog( logger, "exit shutdown" ); } diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index e38db88ad6..21756af9db 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -327,7 +327,7 @@ class producer_plugin_impl : public std::enable_shared_from_this bsf; @@ -481,6 +481,7 @@ class producer_plugin_impl : public std::enable_shared_from_thiscreate_block_state_future( copy_b->calculate_id(), copy_b ); validator.control->abort_block(); controller::block_report br; - BOOST_REQUIRE_EXCEPTION(validator.control->push_block( br, bsf.get(), forked_branch_callback{}, trx_meta_cache_lookup{} ), fc::exception , + BOOST_REQUIRE_EXCEPTION(validator.control->push_block( br, bsf.get(), forked_branch_callback{}, trx_meta_cache_lookup{} ), fc::exception, [] (const fc::exception &e)->bool { return e.code() == block_validate_exception::code_value && e.to_detail_string().find("invalid block transaction merkle root") != std::string::npos;