diff --git a/chain/network/src/config.rs b/chain/network/src/config.rs index 3b4f254333b..4e5357616dd 100644 --- a/chain/network/src/config.rs +++ b/chain/network/src/config.rs @@ -96,6 +96,9 @@ pub struct NetworkConfig { pub whitelist_nodes: Vec, pub handshake_timeout: time::Duration, + /// Whether to re-establish connection to known reliable peers from previous neard run(s). + /// See near_network::peer_manager::connection_store for details. + pub connect_to_reliable_peers_on_startup: bool, /// Maximum time between refreshing the peer list. pub monitor_peers_max_period: time::Duration, /// Maximum number of active peers. Hard limit. @@ -250,6 +253,7 @@ impl NetworkConfig { .collect::>() .context("whitelist_nodes")? }, + connect_to_reliable_peers_on_startup: true, handshake_timeout: cfg.handshake_timeout.try_into()?, monitor_peers_max_period: cfg.monitor_peers_max_period.try_into()?, max_num_peers: cfg.max_num_peers, @@ -315,6 +319,7 @@ impl NetworkConfig { }, whitelist_nodes: vec![], handshake_timeout: time::Duration::seconds(5), + connect_to_reliable_peers_on_startup: true, monitor_peers_max_period: time::Duration::seconds(100), max_num_peers: 40, minimum_outbound_peers: 5, diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index a2205578c4c..ab3f105e6ea 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -16,8 +16,8 @@ use crate::tcp; use crate::time; use crate::types::{ ConnectedPeerInfo, GetNetworkInfo, HighestHeightPeerInfo, KnownProducer, NetworkInfo, - NetworkRequests, NetworkResponses, PeerManagerMessageRequest, PeerManagerMessageResponse, - PeerType, SetChainInfo, + NetworkRequests, NetworkResponses, PeerInfo, PeerManagerMessageRequest, + PeerManagerMessageResponse, PeerType, SetChainInfo, }; use actix::fut::future::wrap_future; use actix::{Actor as _, AsyncContext as _}; @@ -101,6 +101,7 @@ pub struct PeerManagerActor { /// In particular prefer emitting a new event to polling for a state change. #[derive(Debug, PartialEq, Eq, Clone)] pub enum Event { + PeerManagerStarted, ServerStarted, RoutedMessageDropped, AccountsAdded(Vec), @@ -119,6 +120,8 @@ pub enum Event { // actually complete. Currently this event is reported only for some message types, // feel free to add support for more. MessageProcessed(tcp::Tier, PeerMessage), + // Reported when a reconnect loop is spawned. + ReconnectLoopSpawned(PeerInfo), // Reported when a handshake has been started. HandshakeStarted(crate::peer::peer_actor::HandshakeStartedEvent), // Reported when a handshake has been successfully completed. @@ -135,7 +138,12 @@ impl actix::Actor for PeerManagerActor { self.push_network_info_trigger(ctx, self.state.config.push_info_period); // Attempt to reconnect to recent outbound connections from storage - self.bootstrap_outbound_from_recent_connections(ctx); + if self.state.config.connect_to_reliable_peers_on_startup { + tracing::debug!(target: "network", "Reconnecting to reliable peers from storage"); + self.bootstrap_outbound_from_recent_connections(ctx); + } else { + tracing::debug!(target: "network", "Skipping reconnection to reliable peers"); + } // Periodically starts peer monitoring. tracing::debug!(target: "network", @@ -171,6 +179,8 @@ impl actix::Actor for PeerManagerActor { // Periodically prints bandwidth stats for each peer. self.report_bandwidth_stats_trigger(ctx, REPORT_BANDWIDTH_STATS_TRIGGER_INTERVAL); + + self.state.config.event_sink.push(Event::PeerManagerStarted); } /// Try to gracefully disconnect from connected peers. @@ -308,10 +318,13 @@ impl PeerManagerActor { arbiter.spawn({ let state = state.clone(); let clock = clock.clone(); + let peer_info = peer_info.clone(); async move { state.reconnect(clock, peer_info, MAX_RECONNECT_ATTEMPTS).await; } }); + + state.config.event_sink.push(Event::ReconnectLoopSpawned(peer_info)); } } } @@ -621,10 +634,16 @@ impl PeerManagerActor { ctx.spawn(wrap_future({ let state = self.state.clone(); let clock = self.clock.clone(); + let peer_info = conn_info.peer_info.clone(); async move { - state.reconnect(clock, conn_info.peer_info, 1).await; + state.reconnect(clock, peer_info, 1).await; } })); + + self.state + .config + .event_sink + .push(Event::ReconnectLoopSpawned(conn_info.peer_info.clone())); } } diff --git a/chain/network/src/peer_manager/tests/tier2.rs b/chain/network/src/peer_manager/tests/tier2.rs index d615264b611..ffa7af65655 100644 --- a/chain/network/src/peer_manager/tests/tier2.rs +++ b/chain/network/src/peer_manager/tests/tier2.rs @@ -133,6 +133,51 @@ async fn test_reconnect_after_restart_outbound_side() { pm0.wait_for_direct_connection(id1.clone()).await; } +#[tokio::test] +async fn test_skip_reconnect_after_restart_outbound_side() { + init_test_logger(); + let mut rng = make_rng(921853233); + let rng = &mut rng; + let mut clock = time::FakeClock::default(); + let chain = Arc::new(data::Chain::make(&mut clock, rng, 10)); + + let pm0_db = TestDB::new(); + let mut pm0_cfg = chain.make_config(rng); + pm0_cfg.connect_to_reliable_peers_on_startup = false; + + let pm0 = start_pm(clock.clock(), pm0_db.clone(), pm0_cfg.clone(), chain.clone()).await; + let pm1 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; + + let id1 = pm1.cfg.node_id(); + + tracing::info!(target:"test", "connect pm0 to pm1"); + pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; + clock.advance(STORED_CONNECTIONS_MIN_DURATION); + + tracing::info!(target:"test", "check that pm0 stores the outbound connection to pm1"); + pm0.update_connection_store(&clock.clock()).await; + check_recent_outbound_connections(&pm0, vec![id1.clone()]).await; + + tracing::info!(target:"test", "drop pm0 and start it again with the same db"); + drop(pm0); + let pm0 = start_pm(clock.clock(), pm0_db.clone(), pm0_cfg.clone(), chain.clone()).await; + + tracing::info!(target:"test", "check that pm0 starts without attempting to reconnect to pm1"); + let mut pm0_ev = pm0.events.clone(); + pm0_ev + .recv_until(|ev| match &ev { + Event::PeerManager(PME::ReconnectLoopSpawned(_)) => { + panic!("PeerManager spawned a reconnect loop during startup"); + } + Event::PeerManager(PME::PeerManagerStarted) => Some(()), + _ => None, + }) + .await; + + tracing::info!(target:"test", "check that pm0 has pm1 as a recent connection"); + check_recent_outbound_connections(&pm0, vec![id1.clone()]).await; +} + #[tokio::test] async fn test_reconnect_after_restart_inbound_side() { init_test_logger(); diff --git a/neard/src/cli.rs b/neard/src/cli.rs index 68148c19f9c..d0536633e55 100644 --- a/neard/src/cli.rs +++ b/neard/src/cli.rs @@ -347,6 +347,9 @@ pub(super) struct RunCmd { /// Set the boot nodes to bootstrap network from. #[clap(long)] boot_nodes: Option, + /// Whether to re-establish connections from the ConnectionStore on startup + #[clap(long)] + connect_to_reliable_peers_on_startup: Option, /// Minimum number of peers to start syncing/producing blocks #[clap(long)] min_peers: Option, @@ -402,6 +405,12 @@ impl RunCmd { if let Some(produce_empty_blocks) = self.produce_empty_blocks { near_config.client_config.produce_empty_blocks = produce_empty_blocks; } + if let Some(connect_to_reliable_peers_on_startup) = + self.connect_to_reliable_peers_on_startup + { + near_config.network_config.connect_to_reliable_peers_on_startup = + connect_to_reliable_peers_on_startup; + } if let Some(boot_nodes) = self.boot_nodes { if !boot_nodes.is_empty() { near_config.network_config.peer_store.boot_nodes = boot_nodes