Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into GH-592-ship-crash-spl…
Browse files Browse the repository at this point in the history
…it-ship-log
  • Loading branch information
heifner committed Feb 3, 2023
2 parents 24b5333 + 9f0679b commit 7e1ef84
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 50 deletions.
36 changes: 20 additions & 16 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <boost/asio/steady_timer.hpp>

#include <atomic>
#include <cmath>
#include <shared_mutex>

using namespace eosio::chain::plugin_interface;
Expand Down Expand Up @@ -1772,7 +1773,6 @@ namespace eosio {
if( c->is_transactions_only_connection() ) return;

uint32_t lib_num = 0;
uint32_t peer_lib = msg.last_irreversible_block_num;
uint32_t head = 0;
block_id_type head_id;
std::tie( lib_num, std::ignore, head,
Expand All @@ -1786,10 +1786,10 @@ namespace eosio {
peer_wlog(c, "Peer sent a handshake with a timestamp skewed by at least ${t}ms", ("t", network_latency_ns/1000000));
network_latency_ns = 0;
}
// number of blocks syncing node is behind from a peer node
uint32_t nblk_behind_by_net_latency = static_cast<uint32_t>(network_latency_ns / block_interval_ns);
// 2x for time it takes for message to reach back to peer node, +1 to compensate for integer division truncation
uint32_t nblk_combined_latency = 2 * nblk_behind_by_net_latency + 1;
// number of blocks syncing node is behind from a peer node, rounded to the nearest whole block
uint32_t nblk_behind_by_net_latency = std::lround( static_cast<double>(network_latency_ns) / static_cast<double>(block_interval_ns) );
// 2x for time it takes for message to reach back to peer node
uint32_t nblk_combined_latency = 2 * nblk_behind_by_net_latency;
// message in the log below is used in p2p_high_latency_test.py test
peer_dlog(c, "Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received",
("lat", network_latency_ns/1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency));
Expand All @@ -1809,8 +1809,8 @@ namespace eosio {
//-----------------------------

if (head_id == msg.head_id) {
peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 0",
("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 0, lib ${l}",
("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16))("l", lib_num) );
c->syncing = false;
notice_message note;
note.known_blocks.mode = none;
Expand All @@ -1819,18 +1819,20 @@ namespace eosio {
c->enqueue( note );
return;
}
if (head < peer_lib) {
peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 1",
("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
if (head < msg.last_irreversible_block_num) {
peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 1, head ${h}, lib ${l}",
("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16))
("h", head)("l", lib_num) );
c->syncing = false;
if (c->sent_handshake_count > 0) {
c->send_handshake();
}
return;
}
if (lib_num > msg.head_num + nblk_combined_latency) {
peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 2",
("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 2, head ${h}, lib ${l}",
("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16))
("h", head)("l", lib_num) );
if (msg.generation > 1 || c->protocol_version > proto_base) {
notice_message note;
note.known_trx.pending = lib_num;
Expand All @@ -1844,14 +1846,16 @@ namespace eosio {
}

if (head + nblk_combined_latency < msg.head_num ) {
peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 3",
("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 3, head ${h}, lib ${l}",
("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16))
("h", head)("l", lib_num) );
c->syncing = false;
verify_catchup(c, msg.head_num, msg.head_id);
return;
} else if(head >= msg.head_num + nblk_combined_latency) {
peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 4",
("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 4, head ${h}, lib ${l}",
("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16))
("h", head)("l", lib_num) );
if (msg.generation > 1 || c->protocol_version > proto_base) {
notice_message note;
note.known_trx.mode = none;
Expand Down
89 changes: 55 additions & 34 deletions plugins/state_history_plugin/state_history_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,20 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl

constexpr static uint64_t default_frame_size = 1024 * 1024;


using acceptor_type = std::variant<std::unique_ptr<tcp::acceptor>, std::unique_ptr<unixs::acceptor>>;
std::set<acceptor_type> acceptors;
template <class ACCEPTOR>
struct generic_acceptor {
using socket_type = typename ACCEPTOR::protocol_type::socket;
generic_acceptor(boost::asio::io_context& ioc) : acceptor_(ioc), socket_(ioc), error_timer_(ioc) {}
ACCEPTOR acceptor_;
socket_type socket_;
boost::asio::deadline_timer error_timer_;
};

using tcp_acceptor = generic_acceptor<tcp::acceptor>;
using unix_acceptor = generic_acceptor<unixs::acceptor>;

using acceptor_type = std::variant<std::unique_ptr<tcp_acceptor>, std::unique_ptr<unix_acceptor>>;
std::set<acceptor_type> acceptors;

named_thread_pool thread_pool{"SHiP"};

Expand Down Expand Up @@ -156,7 +167,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
FC_THROW_EXCEPTION(plugin_exception, "unable to open listen socket");
};

auto init_tcp_acceptor = [&]() { acceptors.insert(std::make_unique<tcp::acceptor>(thread_pool.get_executor())); };
auto init_tcp_acceptor = [&]() { acceptors.insert(std::make_unique<tcp_acceptor>(thread_pool.get_executor())); };
auto init_unix_acceptor = [&]() {
// take a sniff and see if anything is already listening at the given socket path, or if the socket path exists
// but nothing is listening
Expand All @@ -175,7 +186,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
ec = test_ec;
}
check_ec("open");
acceptors.insert(std::make_unique<unixs::acceptor>(thread_pool.get_executor()));
acceptors.insert(std::make_unique<unix_acceptor>(thread_pool.get_executor()));
};

// create and configure acceptors, can be both
Expand All @@ -184,24 +195,24 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl

// start it
std::for_each(acceptors.begin(), acceptors.end(), [&](const acceptor_type& acc) {
std::visit(overloaded{[&](const std::unique_ptr<tcp::acceptor>& tcp_acc) {
std::visit(overloaded{[&](const std::unique_ptr<tcp_acceptor>& tcp_acc) {
auto address = boost::asio::ip::make_address(endpoint_address);
auto endpoint = tcp::endpoint{address, endpoint_port};
tcp_acc->open(endpoint.protocol(), ec);
tcp_acc->acceptor_.open(endpoint.protocol(), ec);
check_ec("open");
tcp_acc->set_option(boost::asio::socket_base::reuse_address(true));
tcp_acc->bind(endpoint, ec);
tcp_acc->acceptor_.set_option(boost::asio::socket_base::reuse_address(true));
tcp_acc->acceptor_.bind(endpoint, ec);
check_ec("bind");
tcp_acc->listen(boost::asio::socket_base::max_listen_connections, ec);
tcp_acc->acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
check_ec("listen");
do_accept(*tcp_acc);
},
[&](const std::unique_ptr<unixs::acceptor>& unx_acc) {
unx_acc->open(unixs::acceptor::protocol_type(), ec);
[&](const std::unique_ptr<unix_acceptor>& unx_acc) {
unx_acc->acceptor_.open(unixs::acceptor::protocol_type(), ec);
check_ec("open");
unx_acc->bind(unix_path.c_str(), ec);
unx_acc->acceptor_.bind(unix_path.c_str(), ec);
check_ec("bind");
unx_acc->listen(boost::asio::socket_base::max_listen_connections, ec);
unx_acc->acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
check_ec("listen");
do_accept(*unx_acc);
}},
Expand All @@ -210,23 +221,33 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
}

template <typename Acceptor>
void do_accept(Acceptor& acceptor) {
auto socket = std::make_shared<typename Acceptor::protocol_type::socket>(thread_pool.get_executor());
void do_accept(Acceptor& acc) {
// &acc kept alive by self, reference into acceptors set
acceptor.async_accept(*socket, [self = shared_from_this(), this, socket, &acceptor](const boost::system::error_code& ec) {
if (stopping)
return;
if (ec) {
if (ec == boost::system::errc::too_many_files_open)
catch_and_log([&] { self->do_accept(acceptor); });
acc.acceptor_.async_accept(acc.socket_, [self = shared_from_this(), &acc](const boost::system::error_code& ec) {
if (self->stopping)
return;
if (ec == boost::system::errc::too_many_files_open) {
fc_elog(_log, "ship accept() error: too many files open - waiting 200ms");
acc.error_timer_.expires_from_now(boost::posix_time::milliseconds(200));
acc.error_timer_.async_wait([self = self->shared_from_this(), &acc](const boost::system::error_code& ec) {
if (!ec)
catch_and_log([&] { self->do_accept(acc); });
});
} else {
if (ec)
fc_elog(_log, "ship accept() error: ${m} - closing connection", ("m", ec.message()));
else {
// Create a session object and run it
catch_and_log([&] {
auto s = std::make_shared<session<std::shared_ptr<state_history_plugin_impl>, typename Acceptor::socket_type>>(self, std::move(acc.socket_));
s->start();
self->session_set.insert( std::move(s) );
});
}

// Accept another connection
catch_and_log([&] { self->do_accept(acc); });
}
catch_and_log([&] {
auto s = std::make_shared<session<std::shared_ptr<state_history_plugin_impl>, typename Acceptor::protocol_type::socket>>(self, std::move(*socket));
s->start();
self->session_set.insert( std::move(s) );
});
catch_and_log([&] { self->do_accept(acceptor); });
});
}

Expand Down Expand Up @@ -305,12 +326,12 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
~state_history_plugin_impl() {
std::for_each(acceptors.begin(), acceptors.end(), [&](const acceptor_type& acc) {
std::visit(overloaded{
[]( const std::unique_ptr<unixs::acceptor>& a ) {
boost::system::error_code ec;
if( const auto ep = a->local_endpoint( ec ); !ec )
::unlink( ep.path().c_str() );
},
[]( const std::unique_ptr<tcp::acceptor>& a) {}
[]( const std::unique_ptr<unix_acceptor>& a ) {
boost::system::error_code ec;
if( const auto ep = a->acceptor_.local_endpoint( ec ); !ec )
::unlink( ep.path().c_str() );
},
[]( const std::unique_ptr<tcp_acceptor>& a) {}
}, acc);
});
}
Expand Down

0 comments on commit 7e1ef84

Please sign in to comment.