Skip to content

Commit

Permalink
Add a flag to neard configuring reconnection to reliable peers on sta…
Browse files Browse the repository at this point in the history
…rtup (#8576)

This is a follow-up to #8318, in which reliable peers are persisted to storage in the ConnectionStore.

A new boolean flag `--connect-to-reliable-peers-on-startup` is added to neard. If set to `true`, the node will attempt to reconnect to known reliable peers from storage upon starting up. The default value is `true`.

Note that setting the flag to `false` skips reconnection, but does not purge the reliable peers in storage.

Closes #8580
  • Loading branch information
saketh-are authored Feb 17, 2023
1 parent 92a8d42 commit cd5bc78
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 4 deletions.
5 changes: 5 additions & 0 deletions chain/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ pub struct NetworkConfig {
pub whitelist_nodes: Vec<PeerInfo>,
pub handshake_timeout: time::Duration,

/// Whether to re-establish connection to known reliable peers from previous neard run(s).
/// See near_network::peer_manager::connection_store for details.
pub connect_to_reliable_peers_on_startup: bool,
/// Maximum time between refreshing the peer list.
pub monitor_peers_max_period: time::Duration,
/// Maximum number of active peers. Hard limit.
Expand Down Expand Up @@ -250,6 +253,7 @@ impl NetworkConfig {
.collect::<anyhow::Result<_>>()
.context("whitelist_nodes")?
},
connect_to_reliable_peers_on_startup: true,
handshake_timeout: cfg.handshake_timeout.try_into()?,
monitor_peers_max_period: cfg.monitor_peers_max_period.try_into()?,
max_num_peers: cfg.max_num_peers,
Expand Down Expand Up @@ -315,6 +319,7 @@ impl NetworkConfig {
},
whitelist_nodes: vec![],
handshake_timeout: time::Duration::seconds(5),
connect_to_reliable_peers_on_startup: true,
monitor_peers_max_period: time::Duration::seconds(100),
max_num_peers: 40,
minimum_outbound_peers: 5,
Expand Down
27 changes: 23 additions & 4 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use crate::tcp;
use crate::time;
use crate::types::{
ConnectedPeerInfo, GetNetworkInfo, HighestHeightPeerInfo, KnownProducer, NetworkInfo,
NetworkRequests, NetworkResponses, PeerManagerMessageRequest, PeerManagerMessageResponse,
PeerType, SetChainInfo,
NetworkRequests, NetworkResponses, PeerInfo, PeerManagerMessageRequest,
PeerManagerMessageResponse, PeerType, SetChainInfo,
};
use actix::fut::future::wrap_future;
use actix::{Actor as _, AsyncContext as _};
Expand Down Expand Up @@ -101,6 +101,7 @@ pub struct PeerManagerActor {
/// In particular prefer emitting a new event to polling for a state change.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum Event {
PeerManagerStarted,
ServerStarted,
RoutedMessageDropped,
AccountsAdded(Vec<AnnounceAccount>),
Expand All @@ -119,6 +120,8 @@ pub enum Event {
// actually complete. Currently this event is reported only for some message types,
// feel free to add support for more.
MessageProcessed(tcp::Tier, PeerMessage),
// Reported when a reconnect loop is spawned.
ReconnectLoopSpawned(PeerInfo),
// Reported when a handshake has been started.
HandshakeStarted(crate::peer::peer_actor::HandshakeStartedEvent),
// Reported when a handshake has been successfully completed.
Expand All @@ -135,7 +138,12 @@ impl actix::Actor for PeerManagerActor {
self.push_network_info_trigger(ctx, self.state.config.push_info_period);

// Attempt to reconnect to recent outbound connections from storage
self.bootstrap_outbound_from_recent_connections(ctx);
if self.state.config.connect_to_reliable_peers_on_startup {
tracing::debug!(target: "network", "Reconnecting to reliable peers from storage");
self.bootstrap_outbound_from_recent_connections(ctx);
} else {
tracing::debug!(target: "network", "Skipping reconnection to reliable peers");
}

// Periodically starts peer monitoring.
tracing::debug!(target: "network",
Expand Down Expand Up @@ -171,6 +179,8 @@ impl actix::Actor for PeerManagerActor {

// Periodically prints bandwidth stats for each peer.
self.report_bandwidth_stats_trigger(ctx, REPORT_BANDWIDTH_STATS_TRIGGER_INTERVAL);

self.state.config.event_sink.push(Event::PeerManagerStarted);
}

/// Try to gracefully disconnect from connected peers.
Expand Down Expand Up @@ -308,10 +318,13 @@ impl PeerManagerActor {
arbiter.spawn({
let state = state.clone();
let clock = clock.clone();
let peer_info = peer_info.clone();
async move {
state.reconnect(clock, peer_info, MAX_RECONNECT_ATTEMPTS).await;
}
});

state.config.event_sink.push(Event::ReconnectLoopSpawned(peer_info));
}
}
}
Expand Down Expand Up @@ -621,10 +634,16 @@ impl PeerManagerActor {
ctx.spawn(wrap_future({
let state = self.state.clone();
let clock = self.clock.clone();
let peer_info = conn_info.peer_info.clone();
async move {
state.reconnect(clock, conn_info.peer_info, 1).await;
state.reconnect(clock, peer_info, 1).await;
}
}));

self.state
.config
.event_sink
.push(Event::ReconnectLoopSpawned(conn_info.peer_info.clone()));
}
}

Expand Down
45 changes: 45 additions & 0 deletions chain/network/src/peer_manager/tests/tier2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,51 @@ async fn test_reconnect_after_restart_outbound_side() {
pm0.wait_for_direct_connection(id1.clone()).await;
}

#[tokio::test]
async fn test_skip_reconnect_after_restart_outbound_side() {
init_test_logger();
let mut rng = make_rng(921853233);
let rng = &mut rng;
let mut clock = time::FakeClock::default();
let chain = Arc::new(data::Chain::make(&mut clock, rng, 10));

let pm0_db = TestDB::new();
let mut pm0_cfg = chain.make_config(rng);
pm0_cfg.connect_to_reliable_peers_on_startup = false;

let pm0 = start_pm(clock.clock(), pm0_db.clone(), pm0_cfg.clone(), chain.clone()).await;
let pm1 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await;

let id1 = pm1.cfg.node_id();

tracing::info!(target:"test", "connect pm0 to pm1");
pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await;
clock.advance(STORED_CONNECTIONS_MIN_DURATION);

tracing::info!(target:"test", "check that pm0 stores the outbound connection to pm1");
pm0.update_connection_store(&clock.clock()).await;
check_recent_outbound_connections(&pm0, vec![id1.clone()]).await;

tracing::info!(target:"test", "drop pm0 and start it again with the same db");
drop(pm0);
let pm0 = start_pm(clock.clock(), pm0_db.clone(), pm0_cfg.clone(), chain.clone()).await;

tracing::info!(target:"test", "check that pm0 starts without attempting to reconnect to pm1");
let mut pm0_ev = pm0.events.clone();
pm0_ev
.recv_until(|ev| match &ev {
Event::PeerManager(PME::ReconnectLoopSpawned(_)) => {
panic!("PeerManager spawned a reconnect loop during startup");
}
Event::PeerManager(PME::PeerManagerStarted) => Some(()),
_ => None,
})
.await;

tracing::info!(target:"test", "check that pm0 has pm1 as a recent connection");
check_recent_outbound_connections(&pm0, vec![id1.clone()]).await;
}

#[tokio::test]
async fn test_reconnect_after_restart_inbound_side() {
init_test_logger();
Expand Down
9 changes: 9 additions & 0 deletions neard/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ pub(super) struct RunCmd {
/// Set the boot nodes to bootstrap network from.
#[clap(long)]
boot_nodes: Option<String>,
/// Whether to re-establish connections from the ConnectionStore on startup
#[clap(long)]
connect_to_reliable_peers_on_startup: Option<bool>,
/// Minimum number of peers to start syncing/producing blocks
#[clap(long)]
min_peers: Option<usize>,
Expand Down Expand Up @@ -402,6 +405,12 @@ impl RunCmd {
if let Some(produce_empty_blocks) = self.produce_empty_blocks {
near_config.client_config.produce_empty_blocks = produce_empty_blocks;
}
if let Some(connect_to_reliable_peers_on_startup) =
self.connect_to_reliable_peers_on_startup
{
near_config.network_config.connect_to_reliable_peers_on_startup =
connect_to_reliable_peers_on_startup;
}
if let Some(boot_nodes) = self.boot_nodes {
if !boot_nodes.is_empty() {
near_config.network_config.peer_store.boot_nodes = boot_nodes
Expand Down

0 comments on commit cd5bc78

Please sign in to comment.