diff --git a/CHANGELOG.md b/CHANGELOG.md index e6975cd421f..2e15974c84c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ to pay for the storage of their accounts. * `ClientConfig` can be updated while the node is running. `dyn_config.json` is no longer needed as its contents were merged into `config.json`. [#8240](https://github.com/near/nearcore/pull/8240) +* TIER2 network stabilization. Long-lasting active connections are persisted to DB and are re-established automatically if either node restarts. A new neard flag `--connect-to-reliable-peers-on-startup` is provided to toggle this behavior; it defaults to true. The PeerStore is no longer persisted to DB and is now kept in-memory. [#8579](https://github.com/near/nearcore/issues/8579), [#8580](https://github.com/near/nearcore/issues/8580). ## 1.31.0 diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index 4095562105f..8cc75d4ad3d 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -2394,7 +2394,7 @@ impl<'a> ChainStoreUpdate<'a> { | DBCol::BlockMisc | DBCol::_GCCount | DBCol::BlockHeight // block sync needs it + genesis should be accessible - | DBCol::Peers + | DBCol::_Peers | DBCol::RecentOutboundConnections | DBCol::BlockMerkleTree | DBCol::AccountAnnouncements diff --git a/chain/network/src/lib.rs b/chain/network/src/lib.rs index 1640e38fbc6..32a95d883b5 100644 --- a/chain/network/src/lib.rs +++ b/chain/network/src/lib.rs @@ -1,5 +1,4 @@ pub use crate::peer_manager::peer_manager_actor::{Event, PeerManagerActor}; -pub use crate::peer_manager::peer_store::iter_peers_from_store; mod accounts_data; mod network_protocol; diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index ed12e3537b5..f9adf7b1f87 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -834,11 +834,7 @@ impl PeerActor { } HandshakeFailureReason::InvalidTarget => { tracing::debug!(target: "network", "Peer found was not what expected. Updating peer info with {:?}", peer_info); - if let Err(err) = - self.network_state.peer_store.add_direct_peer(&self.clock, peer_info) - { - tracing::error!(target: "network", ?err, "Fail to update peer store"); - } + self.network_state.peer_store.add_direct_peer(&self.clock, peer_info); self.stop(ctx, ClosingReason::HandshakeFailed); } } @@ -1133,12 +1129,10 @@ impl PeerActor { PeerMessage::PeersResponse(peers) => { tracing::debug!(target: "network", "Received peers from {}: {} peers.", self.peer_info, peers.len()); let node_id = self.network_state.config.node_id(); - if let Err(err) = self.network_state.peer_store.add_indirect_peers( + self.network_state.peer_store.add_indirect_peers( &self.clock, peers.into_iter().filter(|peer_info| peer_info.id != node_id), - ) { - tracing::error!(target: "network", ?err, "Fail to update peer store"); - }; + ); self.network_state .config .event_sink diff --git a/chain/network/src/peer/testonly.rs b/chain/network/src/peer/testonly.rs index 51489b661d4..d6ad79c4ff1 100644 --- a/chain/network/src/peer/testonly.rs +++ b/chain/network/src/peer/testonly.rs @@ -103,8 +103,7 @@ impl PeerHandle { let network_state = Arc::new(NetworkState::new( &clock, store.clone(), - peer_store::PeerStore::new(&clock, network_cfg.peer_store.clone(), store.clone()) - .unwrap(), + peer_store::PeerStore::new(&clock, network_cfg.peer_store.clone()).unwrap(), network_cfg.verify().unwrap(), cfg.chain.genesis_id.clone(), fc.clone(), diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs index 5ac69efca67..0b58575a479 100644 --- a/chain/network/src/peer_manager/network_state/mod.rs +++ b/chain/network/src/peer_manager/network_state/mod.rs @@ -326,10 +326,8 @@ impl NetworkState { .await .map_err(|_: ReasonForBan| RegisterPeerError::InvalidEdge)?; this.tier2.insert_ready(conn.clone()).map_err(RegisterPeerError::PoolError)?; - // Best effort write to DB. - if let Err(err) = this.peer_store.peer_connected(&clock, peer_info) { - tracing::error!(target: "network", ?err, "Failed to save peer data"); - } + // Write to the peer store + this.peer_store.peer_connected(&clock, peer_info); } } Ok(()) diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index ab3f105e6ea..617bcb16749 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -208,9 +208,8 @@ impl PeerManagerActor { ) -> anyhow::Result> { let config = config.verify().context("config")?; let store = store::Store::from(store); - let peer_store = - peer_store::PeerStore::new(&clock, config.peer_store.clone(), store.clone()) - .context("PeerStore::new")?; + let peer_store = peer_store::PeerStore::new(&clock, config.peer_store.clone()) + .context("PeerStore::new")?; tracing::debug!(target: "network", len = peer_store.len(), boot_nodes = config.peer_store.boot_nodes.len(), diff --git a/chain/network/src/peer_manager/peer_store/mod.rs b/chain/network/src/peer_manager/peer_store/mod.rs index 35239512bfc..280fc1c795f 100644 --- a/chain/network/src/peer_manager/peer_store/mod.rs +++ b/chain/network/src/peer_manager/peer_store/mod.rs @@ -1,25 +1,31 @@ use crate::blacklist; use crate::network_protocol::PeerInfo; -use crate::store; use crate::time; use crate::types::{KnownPeerState, KnownPeerStatus, ReasonForBan}; use anyhow::bail; use im::hashmap::Entry; use im::{HashMap, HashSet}; use near_primitives::network::PeerId; -use near_store::db::Database; use parking_lot::Mutex; use rand::seq::IteratorRandom; use rand::thread_rng; use std::net::SocketAddr; use std::ops::Not; -use std::sync::Arc; #[cfg(test)] mod testonly; #[cfg(test)] mod tests; +/// The PeerStore is an in-memory cache of known peer states. It is used to: +/// - Store information about known peers in the network. Peers may be discovered +/// by connecting to them directly or by learning about them from other peers. +/// - Respond to requests from other peers for known peers (see PeerStore::healthy_peers). +/// - Select peers to which we may try to connect directly (see PeerStore::unconnected_peer). +/// +/// Contents of the PeerStore are not persisted to the database. Upon starting a node, +/// the PeerStore is initialized from the boot nodes in its config. + /// How often to update the KnownPeerState.last_seen in storage. const UPDATE_LAST_SEEN_INTERVAL: time::Duration = time::Duration::minutes(1); @@ -41,9 +47,6 @@ struct VerifiedPeer { } impl VerifiedPeer { - fn new(peer_id: PeerId) -> Self { - Self { peer_id, trust_level: TrustLevel::Indirect } - } fn signed(peer_id: PeerId) -> Self { Self { peer_id, trust_level: TrustLevel::Signed } } @@ -54,9 +57,6 @@ pub struct Config { /// A list of nodes to connect to on the first run of the neard server. /// Once it connects to some of them, the server will learn about other /// nodes in the network and will try to connect to them as well. - /// Sever will also store in DB the info about the nodes it learned about, - /// so that on the next run it has a larger choice of nodes to connect - /// to (rather than just the boot nodes). /// /// The recommended boot nodes are distributed together with the config.json /// file, but you can modify the boot_nodes field to contain any nodes that @@ -72,10 +72,9 @@ pub struct Config { pub ban_window: time::Duration, } -/// Known peers store, maintaining cache of known peers and connection to storage to save/load them. +/// Known peers store, maintaining cache of known peers struct Inner { config: Config, - store: store::Store, boot_nodes: HashSet, peer_states: HashMap, // This is a reverse index, from physical address to peer_id @@ -91,21 +90,16 @@ impl Inner { /// to the peer ID thus we can be sure that they control the secret key. /// /// See also [`Self::add_indirect_peers`] and [`Self::add_direct_peer`]. - fn add_signed_peer(&mut self, clock: &time::Clock, peer_info: PeerInfo) -> anyhow::Result<()> { + fn add_signed_peer(&mut self, clock: &time::Clock, peer_info: PeerInfo) { self.add_peer(clock, peer_info, TrustLevel::Signed) } /// Adds a peer into the store with given trust level. - fn add_peer( - &mut self, - clock: &time::Clock, - peer_info: PeerInfo, - trust_level: TrustLevel, - ) -> anyhow::Result<()> { + fn add_peer(&mut self, clock: &time::Clock, peer_info: PeerInfo, trust_level: TrustLevel) { if let Some(peer_addr) = peer_info.addr { match trust_level { TrustLevel::Signed => { - self.update_peer_info(clock, peer_info, peer_addr, TrustLevel::Signed)?; + self.update_peer_info(clock, peer_info, peer_addr, TrustLevel::Signed); } TrustLevel::Direct => { // If this peer already exists with a signed connection ignore this update. @@ -118,9 +112,9 @@ impl Inner { Some(verified_peer.trust_level) })(); if trust_level == Some(TrustLevel::Signed) { - return Ok(()); + return; } - self.update_peer_info(clock, peer_info, peer_addr, TrustLevel::Direct)?; + self.update_peer_info(clock, peer_info, peer_addr, TrustLevel::Direct); } TrustLevel::Indirect => { // We should only update an Indirect connection if we don't know anything about the peer @@ -128,7 +122,7 @@ impl Inner { if !self.peer_states.contains_key(&peer_info.id) && !self.addr_peers.contains_key(&peer_addr) { - self.update_peer_info(clock, peer_info, peer_addr, TrustLevel::Indirect)?; + self.update_peer_info(clock, peer_info, peer_addr, TrustLevel::Indirect); } } } @@ -139,29 +133,19 @@ impl Inner { .entry(peer_info.id.clone()) .or_insert_with(|| KnownPeerState::new(peer_info, clock.now_utc())); } - Ok(()) - } - - /// Copies the in-mem state of the peer to DB. - fn touch(&mut self, peer_id: &PeerId) -> anyhow::Result<()> { - Ok(match self.peer_states.get(peer_id) { - Some(peer_state) => self.store.set_peer_state(&peer_id, peer_state)?, - None => (), - }) } fn peer_unban(&mut self, peer_id: &PeerId) -> anyhow::Result<()> { if let Some(peer_state) = self.peer_states.get_mut(peer_id) { peer_state.status = KnownPeerStatus::NotConnected; - self.store.set_peer_state(&peer_id, peer_state)?; } else { bail!("Peer {} is missing in the peer store", peer_id); } Ok(()) } - /// Deletes peers from the internal cache and the persistent store. - fn delete_peers(&mut self, peer_ids: &[PeerId]) -> anyhow::Result<()> { + /// Deletes peers from the internal cache + fn delete_peers(&mut self, peer_ids: &[PeerId]) { for peer_id in peer_ids { if let Some(peer_state) = self.peer_states.remove(peer_id) { if let Some(addr) = peer_state.peer_info.addr { @@ -169,7 +153,6 @@ impl Inner { } } } - Ok(self.store.delete_peer_states(peer_ids)?) } /// Find a random subset of peers based on filter. @@ -193,14 +176,11 @@ impl Inner { peer_info: PeerInfo, peer_addr: SocketAddr, trust_level: TrustLevel, - ) -> anyhow::Result<()> { - let mut touch_other = None; - + ) { // If there is a peer associated with current address remove the address from it. if let Some(verified_peer) = self.addr_peers.remove(&peer_addr) { self.peer_states.entry(verified_peer.peer_id).and_modify(|peer_state| { peer_state.peer_info.addr = None; - touch_other = Some(peer_state.peer_info.id.clone()); }); } @@ -222,12 +202,6 @@ impl Inner { .entry(peer_info.id.clone()) .and_modify(|peer_state| peer_state.peer_info.addr = Some(peer_addr)) .or_insert_with(|| KnownPeerState::new(peer_info.clone(), now)); - - self.touch(&peer_info.id)?; - if let Some(touch_other) = touch_other { - self.touch(&touch_other)?; - } - Ok(()) } /// Removes peers that are not responding for expiration period. @@ -241,9 +215,7 @@ impl Inner { to_remove.push(peer_id.clone()); } } - if let Err(err) = self.delete_peers(&to_remove) { - tracing::error!(target: "network", ?err, "Failed to remove expired peers"); - } + self.delete_peers(&to_remove); } fn unban(&mut self, now: time::Utc) { @@ -266,14 +238,11 @@ impl Inner { /// Update the 'last_seen' time for all the peers that we're currently connected to. fn update_last_seen(&mut self, now: time::Utc) { - for (peer_id, peer_state) in self.peer_states.iter_mut() { + for (_peer_id, peer_state) in self.peer_states.iter_mut() { if peer_state.status == KnownPeerStatus::Connected && now > peer_state.last_seen + UPDATE_LAST_SEEN_INTERVAL { peer_state.last_seen = now; - if let Err(err) = self.store.set_peer_state(peer_id, peer_state) { - tracing::error!(target: "network", ?peer_id, ?err, "Failed to update peers last seen time."); - } } } } @@ -285,7 +254,6 @@ impl Inner { /// This function should be called periodically. pub fn update(&mut self, clock: &time::Clock) { let now = clock.now_utc(); - // TODO(gprusak): these operations could be put into a single DB write transaction. self.unban(now); self.update_last_seen(now); self.remove_expired(now); @@ -295,7 +263,7 @@ impl Inner { pub(crate) struct PeerStore(Mutex); impl PeerStore { - pub fn new(clock: &time::Clock, config: Config, store: store::Store) -> anyhow::Result { + pub fn new(clock: &time::Clock, config: Config) -> anyhow::Result { let boot_nodes: HashSet<_> = config.boot_nodes.iter().map(|p| p.id.clone()).collect(); // A mapping from `PeerId` to `KnownPeerState`. let mut peerid_2_state = HashMap::default(); @@ -326,80 +294,8 @@ impl PeerStore { .insert(peer_info.id.clone(), KnownPeerState::new(peer_info.clone(), now)); } - let mut peers_to_keep = vec![]; - let mut peers_to_delete = vec![]; - for (peer_id, peer_state) in store.list_peer_states()? { - let status = match peer_state.status { - KnownPeerStatus::Unknown => { - // We mark boot nodes as 'NotConnected', as we trust that they exist. - if boot_nodes.contains(&peer_id) { - KnownPeerStatus::NotConnected - } else { - KnownPeerStatus::Unknown - } - } - KnownPeerStatus::NotConnected => KnownPeerStatus::NotConnected, - KnownPeerStatus::Connected => KnownPeerStatus::NotConnected, - KnownPeerStatus::Banned(reason, deadline) => { - if config.connect_only_to_boot_nodes && boot_nodes.contains(&peer_id) { - // Give boot node another chance. - KnownPeerStatus::NotConnected - } else { - KnownPeerStatus::Banned(reason, deadline) - } - } - }; - - let peer_state = KnownPeerState { - peer_info: peer_state.peer_info, - first_seen: peer_state.first_seen, - last_seen: peer_state.last_seen, - status, - last_outbound_attempt: None, - }; - - let is_blacklisted = - peer_state.peer_info.addr.map_or(false, |addr| config.blacklist.contains(addr)); - if is_blacklisted { - tracing::info!(target: "network", "Removing {:?} because address is blacklisted", peer_state.peer_info); - peers_to_delete.push(peer_id); - } else { - peers_to_keep.push((peer_id, peer_state)); - } - } - - for (peer_id, peer_state) in peers_to_keep.into_iter() { - match peerid_2_state.entry(peer_id) { - // Peer is a boot node - Entry::Occupied(mut current_peer_state) => { - if peer_state.status.is_banned() { - // If it says in database, that peer should be banned, ban the peer. - current_peer_state.get_mut().status = peer_state.status; - } - } - // Peer is not a boot node - Entry::Vacant(entry) => { - if let Some(peer_addr) = peer_state.peer_info.addr { - if let Entry::Vacant(entry2) = addr_2_peer.entry(peer_addr) { - // Default case, add new entry. - entry2.insert(VerifiedPeer::new(peer_state.peer_info.id.clone())); - entry.insert(peer_state); - } - // else: There already exists a peer with a same addr, that's a boot node. - // Note: We don't load this entry into the memory, but it still stays on disk. - } - } - } - } - - let mut inner = Inner { - config, - store, - boot_nodes, - peer_states: peerid_2_state, - addr_peers: addr_2_peer, - }; - inner.delete_peers(&peers_to_delete)?; + let inner = + Inner { config, boot_nodes, peer_states: peerid_2_state, addr_peers: addr_2_peer }; Ok(PeerStore(Mutex::new(inner))) } @@ -429,23 +325,19 @@ impl PeerStore { self.0.lock().peer_states.get(peer_id).cloned() } - pub fn peer_connected(&self, clock: &time::Clock, peer_info: &PeerInfo) -> anyhow::Result<()> { + pub fn peer_connected(&self, clock: &time::Clock, peer_info: &PeerInfo) { let mut inner = self.0.lock(); - inner.add_signed_peer(clock, peer_info.clone())?; - let mut store = inner.store.clone(); + inner.add_signed_peer(clock, peer_info.clone()); let entry = inner.peer_states.get_mut(&peer_info.id).unwrap(); entry.last_seen = clock.now_utc(); entry.status = KnownPeerStatus::Connected; - Ok(store.set_peer_state(&peer_info.id, entry)?) } pub fn peer_disconnected(&self, clock: &time::Clock, peer_id: &PeerId) -> anyhow::Result<()> { let mut inner = self.0.lock(); - let mut store = inner.store.clone(); if let Some(peer_state) = inner.peer_states.get_mut(peer_id) { peer_state.last_seen = clock.now_utc(); peer_state.status = KnownPeerStatus::NotConnected; - store.set_peer_state(peer_id, peer_state)?; } else { bail!("Peer {} is missing in the peer store", peer_id); } @@ -460,7 +352,6 @@ impl PeerStore { result: Result<(), anyhow::Error>, ) -> anyhow::Result<()> { let mut inner = self.0.lock(); - let mut store = inner.store.clone(); if let Some(peer_state) = inner.peer_states.get_mut(peer_id) { if result.is_err() { @@ -470,10 +361,10 @@ impl PeerStore { peer_state.last_outbound_attempt = Some((clock.now_utc(), result.map_err(|err| err.to_string()))); peer_state.last_seen = clock.now_utc(); - store.set_peer_state(peer_id, peer_state)?; } else { bail!("Peer {} is missing in the peer store", peer_id); } + Ok(()) } @@ -485,12 +376,10 @@ impl PeerStore { ) -> anyhow::Result<()> { tracing::warn!(target: "network", "Banning peer {} for {:?}", peer_id, ban_reason); let mut inner = self.0.lock(); - let mut store = inner.store.clone(); if let Some(peer_state) = inner.peer_states.get_mut(peer_id) { let now = clock.now_utc(); peer_state.last_seen = now; peer_state.status = KnownPeerStatus::Banned(ban_reason, now); - store.set_peer_state(peer_id, peer_state)?; } else { bail!("Peer {} is missing in the peer store", peer_id); } @@ -552,11 +441,7 @@ impl PeerStore { /// are nodes there we haven’t received signatures of their peer ID. /// /// See also [`Self::add_direct_peer`] and [`Self::add_signed_peer`]. - pub fn add_indirect_peers( - &self, - clock: &time::Clock, - peers: impl Iterator, - ) -> Result<(), Box> { + pub fn add_indirect_peers(&self, clock: &time::Clock, peers: impl Iterator) { let mut inner = self.0.lock(); let mut total: usize = 0; let mut blacklisted: usize = 0; @@ -567,14 +452,13 @@ impl PeerStore { if is_blacklisted { blacklisted += 1; } else { - inner.add_peer(clock, peer_info, TrustLevel::Indirect)?; + inner.add_peer(clock, peer_info, TrustLevel::Indirect); } } if blacklisted != 0 { tracing::info!(target: "network", "Ignored {} blacklisted peers out of {} indirect peer(s)", blacklisted, total); } - Ok(()) } /// Adds a peer we’ve connected to but haven’t verified ID yet. @@ -584,7 +468,7 @@ impl PeerStore { /// confirming that identity yet. /// /// See also [`Self::add_indirect_peers`] and [`Self::add_signed_peer`]. - pub fn add_direct_peer(&self, clock: &time::Clock, peer_info: PeerInfo) -> anyhow::Result<()> { + pub fn add_direct_peer(&self, clock: &time::Clock, peer_info: PeerInfo) { self.0.lock().add_peer(clock, peer_info, TrustLevel::Direct) } @@ -592,14 +476,3 @@ impl PeerStore { self.0.lock().peer_states.clone() } } - -/// Public method used to iterate through all peers stored in the database. -pub fn iter_peers_from_store(db: Arc, f: F) -where - F: Fn((PeerId, KnownPeerState)), -{ - let store = crate::store::Store::from(db); - for x in store.list_peer_states().unwrap() { - f(x) - } -} diff --git a/chain/network/src/peer_manager/peer_store/tests.rs b/chain/network/src/peer_manager/peer_store/tests.rs index b5cabc6be59..5fefef48f4d 100644 --- a/chain/network/src/peer_manager/peer_store/tests.rs +++ b/chain/network/src/peer_manager/peer_store/tests.rs @@ -2,7 +2,6 @@ use super::*; use crate::blacklist::Blacklist; use crate::time; use near_crypto::{KeyType, SecretKey}; -use near_store::{Mode, NodeStorage, StoreOpener}; use std::collections::HashSet; use std::net::{Ipv4Addr, SocketAddrV4}; @@ -43,32 +42,16 @@ fn make_config( #[test] fn ban_store() { let clock = time::FakeClock::default(); - let (_tmp_dir, opener) = NodeStorage::test_opener(); let peer_info_a = gen_peer_info(0); let peer_info_to_ban = gen_peer_info(1); let boot_nodes = vec![peer_info_a, peer_info_to_ban.clone()]; - { - let store = store::Store::from(opener.open().unwrap()); - let peer_store = PeerStore::new( - &clock.clock(), - make_config(&boot_nodes, Blacklist::default(), false), - store, - ) - .unwrap(); - assert_eq!(peer_store.healthy_peers(3).len(), 2); - peer_store.peer_ban(&clock.clock(), &peer_info_to_ban.id, ReasonForBan::Abusive).unwrap(); - assert_eq!(peer_store.healthy_peers(3).len(), 1); - } - { - let store_new = store::Store::from(opener.open().unwrap()); - let peer_store_new = PeerStore::new( - &clock.clock(), - make_config(&boot_nodes, Blacklist::default(), false), - store_new, - ) - .unwrap(); - assert_eq!(peer_store_new.healthy_peers(3).len(), 1); - } + + let peer_store = + PeerStore::new(&clock.clock(), make_config(&boot_nodes, Blacklist::default(), false)) + .unwrap(); + assert_eq!(peer_store.healthy_peers(3).len(), 2); + peer_store.peer_ban(&clock.clock(), &peer_info_to_ban.id, ReasonForBan::Abusive).unwrap(); + assert_eq!(peer_store.healthy_peers(3).len(), 1); } #[test] @@ -77,17 +60,13 @@ fn test_unconnected_peer() { let peer_info_a = gen_peer_info(0); let peer_info_to_ban = gen_peer_info(1); let boot_nodes = vec![peer_info_a, peer_info_to_ban]; - { - let store = store::Store::from(near_store::db::TestDB::new()); - let peer_store = PeerStore::new( - &clock.clock(), - make_config(&boot_nodes, Blacklist::default(), false), - store, - ) - .unwrap(); - assert!(peer_store.unconnected_peer(|_| false, false).is_some()); - assert!(peer_store.unconnected_peer(|_| true, false).is_none()); - } + + let peer_store = + PeerStore::new(&clock.clock(), make_config(&boot_nodes, Blacklist::default(), false)) + .unwrap(); + + assert!(peer_store.unconnected_peer(|_| false, false).is_some()); + assert!(peer_store.unconnected_peer(|_| true, false).is_none()); } #[test] @@ -96,7 +75,6 @@ fn test_unknown_vs_not_connected() { let clock = time::FakeClock::default(); let peer_info_a = gen_peer_info(0); let peer_info_b = gen_peer_info(1); - let (_tmp_dir, opener) = NodeStorage::test_opener(); let peer_info_boot_node = gen_peer_info(2); let boot_nodes = vec![peer_info_boot_node.clone()]; @@ -106,114 +84,65 @@ fn test_unknown_vs_not_connected() { nodes.map(|peer| peer_store.get_peer_state(&peer.id).map(|known_state| known_state.status)) }; - let get_database_status = || { - let store = crate::store::Store::from(opener.open_in_mode(Mode::ReadOnly).unwrap()); - let peers_state: HashMap = - store.list_peer_states().unwrap().into_iter().map(|x| (x.0, x.1)).collect(); - nodes.map(|peer| peers_state.get(&peer.id).map(|known_state| known_state.status.clone())) - }; - - { - let store = store::Store::from(opener.open().unwrap()); - let peer_store = PeerStore::new( - &clock.clock(), - make_config(&boot_nodes, Blacklist::default(), false), - store, - ) - .unwrap(); - - // Check the status of the in-memory store. - // Boot node should be marked as not-connected, as we've verified it. - // TODO(mm-near) - the boot node should have been added as 'NotConnected' and not Unknown. - assert_eq!(get_in_memory_status(&peer_store), [None, None, Some(Unknown)]); - - // Add the remaining peers. - peer_store.add_direct_peer(&clock.clock(), peer_info_a.clone()).unwrap(); - peer_store.add_direct_peer(&clock.clock(), peer_info_b.clone()).unwrap(); + let peer_store = + PeerStore::new(&clock.clock(), make_config(&boot_nodes, Blacklist::default(), false)) + .unwrap(); - // Check the state in a database. - // Seems that boot node is not added to the database when 'new' is called. - assert_eq!(get_database_status(), [Some(Unknown), Some(Unknown), None]); + // Check the status of the in-memory store. + // Boot node should be marked as not-connected, as we've verified it. + // TODO(mm-near) - the boot node should have been added as 'NotConnected' and not Unknown. + assert_eq!(get_in_memory_status(&peer_store), [None, None, Some(Unknown)]); - assert_eq!( - get_in_memory_status(&peer_store), - [Some(Unknown), Some(Unknown), Some(Unknown)] - ); + // Add the remaining peers. + peer_store.add_direct_peer(&clock.clock(), peer_info_a.clone()); + peer_store.add_direct_peer(&clock.clock(), peer_info_b.clone()); - // Connect to both nodes - for peer_info in [peer_info_a.clone(), peer_info_b.clone()] { - peer_store.peer_connected(&clock.clock(), &peer_info).unwrap(); - } - assert_eq!( - get_in_memory_status(&peer_store), - [Some(Connected), Some(Connected), Some(Unknown)] - ); - assert_eq!(get_database_status(), [Some(Connected), Some(Connected), None]); - - // Disconnect from 'b' - peer_store.peer_disconnected(&clock.clock(), &peer_info_b.id).unwrap(); - - assert_eq!( - get_in_memory_status(&peer_store), - [Some(Connected), Some(NotConnected), Some(Unknown)] - ); - assert_eq!(get_database_status(), [Some(Connected), Some(NotConnected), None]); - - // if we prefer 'previously connected' peers - we should keep picking 'b'. - assert_eq!( - (0..10) - .map(|_| peer_store.unconnected_peer(|_| false, true).unwrap().id) - .collect::>(), - [peer_info_b.id.clone()].into_iter().collect() - ); - - // if we don't care, we should pick either 'b' or 'boot'. - assert_eq!( - (0..100) - .map(|_| peer_store.unconnected_peer(|_| false, false).unwrap().id) - .collect::>(), - [peer_info_b.id.clone(), peer_info_boot_node.id.clone()].into_iter().collect() - ); - - // And fail when trying to reconnect to b. - peer_store - .peer_connection_attempt( - &clock.clock(), - &peer_info_b.id, - Err(anyhow::anyhow!("b failed to connect error")), - ) - .unwrap(); + assert_eq!(get_in_memory_status(&peer_store), [Some(Unknown), Some(Unknown), Some(Unknown)]); - // It should move 'back' into Unknown state. - assert_eq!( - get_in_memory_status(&peer_store), - [Some(Connected), Some(Unknown), Some(Unknown)] - ); - assert_eq!(get_database_status(), [Some(Connected), Some(Unknown), None]); + // Connect to both nodes + for peer_info in [peer_info_a.clone(), peer_info_b.clone()] { + peer_store.peer_connected(&clock.clock(), &peer_info); } - - { - // Let's reset the store. - let store = store::Store::from(opener.open().unwrap()); - let peer_store = PeerStore::new( + assert_eq!( + get_in_memory_status(&peer_store), + [Some(Connected), Some(Connected), Some(Unknown)] + ); + + // Disconnect from 'b' + peer_store.peer_disconnected(&clock.clock(), &peer_info_b.id).unwrap(); + + assert_eq!( + get_in_memory_status(&peer_store), + [Some(Connected), Some(NotConnected), Some(Unknown)] + ); + + // if we prefer 'previously connected' peers - we should keep picking 'b'. + assert_eq!( + (0..10) + .map(|_| peer_store.unconnected_peer(|_| false, true).unwrap().id) + .collect::>(), + [peer_info_b.id.clone()].into_iter().collect() + ); + + // if we don't care, we should pick either 'b' or 'boot'. + assert_eq!( + (0..100) + .map(|_| peer_store.unconnected_peer(|_| false, false).unwrap().id) + .collect::>(), + [peer_info_b.id.clone(), peer_info_boot_node.id.clone()].into_iter().collect() + ); + + // And fail when trying to reconnect to b. + peer_store + .peer_connection_attempt( &clock.clock(), - make_config(&boot_nodes, Blacklist::default(), false), - store, + &peer_info_b.id, + Err(anyhow::anyhow!("b failed to connect error")), ) .unwrap(); - assert_eq!( - get_in_memory_status(&peer_store), - [Some(NotConnected), Some(Unknown), Some(Unknown)] - ); - assert_eq!(get_database_status(), [Some(Connected), Some(Unknown), None]); - // After restart - we should try to connect to 'a' (if we prefer previously connected nodes). - assert_eq!( - (0..10) - .map(|_| peer_store.unconnected_peer(|_| false, true).unwrap().id) - .collect::>(), - [peer_info_a.id.clone()].into_iter().collect() - ); - } + + // It should move 'back' into Unknown state. + assert_eq!(get_in_memory_status(&peer_store), [Some(Connected), Some(Unknown), Some(Unknown)]); } #[test] @@ -227,15 +156,11 @@ fn test_unconnected_peer_only_boot_nodes() { // 1 non-boot (peer_in_store) node peer that is in the store. // we should connect to peer_in_store { - let store = store::Store::from(near_store::db::TestDB::new()); - let peer_store = PeerStore::new( - &clock.clock(), - make_config(&boot_nodes, Blacklist::default(), false), - store, - ) - .unwrap(); - peer_store.add_direct_peer(&clock.clock(), peer_in_store.clone()).unwrap(); - peer_store.peer_connected(&clock.clock(), &peer_info_a).unwrap(); + let peer_store = + PeerStore::new(&clock.clock(), make_config(&boot_nodes, Blacklist::default(), false)) + .unwrap(); + peer_store.add_direct_peer(&clock.clock(), peer_in_store.clone()); + peer_store.peer_connected(&clock.clock(), &peer_info_a); assert_eq!(peer_store.unconnected_peer(|_| false, false), Some(peer_in_store.clone())); } @@ -243,29 +168,23 @@ fn test_unconnected_peer_only_boot_nodes() { // 1 non-boot (peer_in_store) node peer that is in the store. // connect to only boot nodes is enabled - we should not find any peer to connect to. { - let store = store::Store::from(near_store::db::TestDB::new()); - let peer_store = PeerStore::new( - &clock.clock(), - make_config(&boot_nodes, Default::default(), true), - store, - ) - .unwrap(); - peer_store.add_direct_peer(&clock.clock(), peer_in_store).unwrap(); - peer_store.peer_connected(&clock.clock(), &peer_info_a).unwrap(); + let peer_store = + PeerStore::new(&clock.clock(), make_config(&boot_nodes, Default::default(), true)) + .unwrap(); + peer_store.add_direct_peer(&clock.clock(), peer_in_store); + peer_store.peer_connected(&clock.clock(), &peer_info_a); assert_eq!(peer_store.unconnected_peer(|_| false, false), None); } // 1 boot node (peer_info_a) is in the store. // we should connect to it - no matter what the setting is. for connect_to_boot_nodes in [true, false] { - let store = store::Store::from(near_store::db::TestDB::new()); let peer_store = PeerStore::new( &clock.clock(), make_config(&boot_nodes, Default::default(), connect_to_boot_nodes), - store, ) .unwrap(); - peer_store.add_direct_peer(&clock.clock(), peer_info_a.clone()).unwrap(); + peer_store.add_direct_peer(&clock.clock(), peer_info_a.clone()); assert_eq!(peer_store.unconnected_peer(|_| false, false), Some(peer_info_a.clone())); } } @@ -314,19 +233,18 @@ fn check_integrity(peer_store: &PeerStore) -> bool { #[test] fn handle_peer_id_change() { let clock = time::FakeClock::default(); - let store = store::Store::from(near_store::db::TestDB::new()); let peer_store = - PeerStore::new(&clock.clock(), make_config(&[], Default::default(), false), store).unwrap(); + PeerStore::new(&clock.clock(), make_config(&[], Default::default(), false)).unwrap(); let peers_id = (0..2).map(|ix| get_peer_id(format!("node{}", ix))).collect::>(); let addr = get_addr(0); let peer_aa = get_peer_info(peers_id[0].clone(), Some(addr)); - peer_store.peer_connected(&clock.clock(), &peer_aa).unwrap(); + peer_store.peer_connected(&clock.clock(), &peer_aa); assert!(check_exist(&peer_store, &peers_id[0], Some((addr, TrustLevel::Signed)))); let peer_ba = get_peer_info(peers_id[1].clone(), Some(addr)); - peer_store.add_direct_peer(&clock.clock(), peer_ba).unwrap(); + peer_store.add_direct_peer(&clock.clock(), peer_ba); assert!(check_exist(&peer_store, &peers_id[0], None)); assert!(check_exist(&peer_store, &peers_id[1], Some((addr, TrustLevel::Direct)))); @@ -339,19 +257,18 @@ fn handle_peer_id_change() { #[test] fn dont_handle_address_change() { let clock = time::FakeClock::default(); - let store = store::Store::from(near_store::db::TestDB::new()); let peer_store = - PeerStore::new(&clock.clock(), make_config(&[], Default::default(), false), store).unwrap(); + PeerStore::new(&clock.clock(), make_config(&[], Default::default(), false)).unwrap(); let peers_id = (0..1).map(|ix| get_peer_id(format!("node{}", ix))).collect::>(); let addrs = (0..2).map(get_addr).collect::>(); let peer_aa = get_peer_info(peers_id[0].clone(), Some(addrs[0])); - peer_store.peer_connected(&clock.clock(), &peer_aa).unwrap(); + peer_store.peer_connected(&clock.clock(), &peer_aa); assert!(check_exist(&peer_store, &peers_id[0], Some((addrs[0], TrustLevel::Signed)))); let peer_ba = get_peer_info(peers_id[0].clone(), Some(addrs[1])); - peer_store.add_direct_peer(&clock.clock(), peer_ba).unwrap(); + peer_store.add_direct_peer(&clock.clock(), peer_ba); assert!(check_exist(&peer_store, &peers_id[0], Some((addrs[0], TrustLevel::Signed)))); assert!(check_integrity(&peer_store)); } @@ -359,10 +276,8 @@ fn dont_handle_address_change() { #[test] fn check_add_peers_overriding() { let clock = time::FakeClock::default(); - let store = store::Store::from(near_store::db::TestDB::new()); let peer_store = - PeerStore::new(&clock.clock(), make_config(&[], Default::default(), false), store.clone()) - .unwrap(); + PeerStore::new(&clock.clock(), make_config(&[], Default::default(), false)).unwrap(); // Five peers: A, B, C, D, X, T let peers_id = (0..6).map(|ix| get_peer_id(format!("node{}", ix))).collect::>(); @@ -371,60 +286,60 @@ fn check_add_peers_overriding() { // Create signed connection A - #A let peer_00 = get_peer_info(peers_id[0].clone(), Some(addrs[0])); - peer_store.peer_connected(&clock.clock(), &peer_00).unwrap(); + peer_store.peer_connected(&clock.clock(), &peer_00); assert!(check_exist(&peer_store, &peers_id[0], Some((addrs[0], TrustLevel::Signed)))); assert!(check_integrity(&peer_store)); // Create direct connection B - #B let peer_11 = get_peer_info(peers_id[1].clone(), Some(addrs[1])); - peer_store.add_direct_peer(&clock.clock(), peer_11.clone()).unwrap(); + peer_store.add_direct_peer(&clock.clock(), peer_11.clone()); assert!(check_exist(&peer_store, &peers_id[1], Some((addrs[1], TrustLevel::Direct)))); assert!(check_integrity(&peer_store)); // Create signed connection B - #B - peer_store.peer_connected(&clock.clock(), &peer_11).unwrap(); + peer_store.peer_connected(&clock.clock(), &peer_11); assert!(check_exist(&peer_store, &peers_id[1], Some((addrs[1], TrustLevel::Signed)))); assert!(check_integrity(&peer_store)); // Create indirect connection C - #C let peer_22 = get_peer_info(peers_id[2].clone(), Some(addrs[2])); - peer_store.add_indirect_peers(&clock.clock(), [peer_22.clone()].into_iter()).unwrap(); + peer_store.add_indirect_peers(&clock.clock(), [peer_22.clone()].into_iter()); assert!(check_exist(&peer_store, &peers_id[2], Some((addrs[2], TrustLevel::Indirect)))); assert!(check_integrity(&peer_store)); // Create signed connection C - #C - peer_store.peer_connected(&clock.clock(), &peer_22).unwrap(); + peer_store.peer_connected(&clock.clock(), &peer_22); assert!(check_exist(&peer_store, &peers_id[2], Some((addrs[2], TrustLevel::Signed)))); assert!(check_integrity(&peer_store)); // Create signed connection C - #B // This overrides C - #C and B - #B let peer_21 = get_peer_info(peers_id[2].clone(), Some(addrs[1])); - peer_store.peer_connected(&clock.clock(), &peer_21).unwrap(); + peer_store.peer_connected(&clock.clock(), &peer_21); assert!(check_exist(&peer_store, &peers_id[1], None)); assert!(check_exist(&peer_store, &peers_id[2], Some((addrs[1], TrustLevel::Signed)))); assert!(check_integrity(&peer_store)); // Create indirect connection D - #D let peer_33 = get_peer_info(peers_id[3].clone(), Some(addrs[3])); - peer_store.add_indirect_peers(&clock.clock(), [peer_33].into_iter()).unwrap(); + peer_store.add_indirect_peers(&clock.clock(), [peer_33].into_iter()); assert!(check_exist(&peer_store, &peers_id[3], Some((addrs[3], TrustLevel::Indirect)))); assert!(check_integrity(&peer_store)); // Try to create indirect connection A - #X but fails since A - #A exists let peer_04 = get_peer_info(peers_id[0].clone(), Some(addrs[4])); - peer_store.add_indirect_peers(&clock.clock(), [peer_04].into_iter()).unwrap(); + peer_store.add_indirect_peers(&clock.clock(), [peer_04].into_iter()); assert!(check_exist(&peer_store, &peers_id[0], Some((addrs[0], TrustLevel::Signed)))); assert!(check_integrity(&peer_store)); // Try to create indirect connection X - #D but fails since D - #D exists let peer_43 = get_peer_info(peers_id[4].clone(), Some(addrs[3])); - peer_store.add_indirect_peers(&clock.clock(), [peer_43.clone()].into_iter()).unwrap(); + peer_store.add_indirect_peers(&clock.clock(), [peer_43.clone()].into_iter()); assert!(check_exist(&peer_store, &peers_id[3], Some((addrs[3], TrustLevel::Indirect)))); assert!(check_integrity(&peer_store)); // Create Direct connection X - #D and succeed removing connection D - #D - peer_store.add_direct_peer(&clock.clock(), peer_43).unwrap(); + peer_store.add_direct_peer(&clock.clock(), peer_43); assert!(check_exist(&peer_store, &peers_id[4], Some((addrs[3], TrustLevel::Direct)))); // D should still exist, but without any addr assert!(check_exist(&peer_store, &peers_id[3], None)); @@ -432,15 +347,9 @@ fn check_add_peers_overriding() { // Try to create indirect connection A - #T but fails since A - #A (signed) exists let peer_05 = get_peer_info(peers_id[0].clone(), Some(addrs[5])); - peer_store.add_direct_peer(&clock.clock(), peer_05).unwrap(); + peer_store.add_direct_peer(&clock.clock(), peer_05); assert!(check_exist(&peer_store, &peers_id[0], Some((addrs[0], TrustLevel::Signed)))); assert!(check_integrity(&peer_store)); - - // Check we are able to recover from store previous signed connection - let peer_store_2 = - PeerStore::new(&clock.clock(), make_config(&[], Default::default(), false), store).unwrap(); - assert!(check_exist(&peer_store_2, &peers_id[0], Some((addrs[0], TrustLevel::Indirect)))); - assert!(check_integrity(&peer_store_2)); } #[test] @@ -455,107 +364,25 @@ fn check_ignore_blacklisted_peers() { assert_eq!(expected, got); } - let ids = (0..6).map(|ix| get_peer_id(format!("node{}", ix))).collect::>(); - let store = store::Store::from(near_store::db::TestDB::new()); - - // Populate store with three peers. - { - let peer_store = PeerStore::new( - &clock.clock(), - make_config(&[], Default::default(), false), - store.clone(), - ) - .unwrap(); - peer_store - .add_indirect_peers( - &clock.clock(), - [ - get_peer_info(ids[0].clone(), None), - get_peer_info(ids[1].clone(), Some(get_addr(1))), - get_peer_info(ids[2].clone(), Some(get_addr(2))), - ] - .into_iter(), - ) - .unwrap(); - assert_peers(&peer_store, &[&ids[0], &ids[1], &ids[2]]); - } - - // Peers without address aren’t saved but make sure the rest are read - // correctly. - { - let peer_store = PeerStore::new( - &clock.clock(), - make_config(&[], Default::default(), false), - store.clone(), - ) - .unwrap(); - assert_peers(&peer_store, &[&ids[1], &ids[2]]); - } + let ids = (0..3).map(|ix| get_peer_id(format!("node{}", ix))).collect::>(); - // Blacklist one of the existing peers and one new peer. - { - let blacklist: blacklist::Blacklist = - ["127.0.0.1:2", "127.0.0.1:5"].iter().map(|e| e.parse().unwrap()).collect(); - let peer_store = - PeerStore::new(&clock.clock(), make_config(&[], blacklist, false), store).unwrap(); - // Peer 127.0.0.1:2 is removed since it's blacklisted. - assert_peers(&peer_store, &[&ids[1]]); - - peer_store - .add_indirect_peers( - &clock.clock(), - [ - get_peer_info(ids[3].clone(), None), - get_peer_info(ids[4].clone(), Some(get_addr(4))), - get_peer_info(ids[5].clone(), Some(get_addr(5))), - ] - .into_iter(), - ) - .unwrap(); - // Peer 127.0.0.1:5 is ignored and never added. - assert_peers(&peer_store, &[&ids[1], &ids[3], &ids[4]]); - } -} - -#[test] -fn remove_blacklisted_peers_from_store() { - let clock = time::FakeClock::default(); - let (_tmp_dir, opener) = NodeStorage::test_opener(); - let (peer_ids, peer_infos): (Vec<_>, Vec<_>) = (0..3) - .map(|i| { - let id = get_peer_id(format!("node{}", i)); - let info = get_peer_info(id.clone(), Some(get_addr(i))); - (id, info) - }) - .unzip(); + let blacklist: blacklist::Blacklist = + ["127.0.0.1:1"].iter().map(|e| e.parse().unwrap()).collect(); - // Add three peers. - { - let store = store::Store::from(opener.open().unwrap()); - let peer_store = - PeerStore::new(&clock.clock(), make_config(&[], Default::default(), false), store) - .unwrap(); - peer_store.add_indirect_peers(&clock.clock(), peer_infos.clone().into_iter()).unwrap(); - } - assert_peers_in_store(&opener, &peer_ids); + let peer_store = PeerStore::new(&clock.clock(), make_config(&[], blacklist, false)).unwrap(); - // Blacklisted peers are removed from the store. - { - let store = store::Store::from(opener.open().unwrap()); - let blacklist: blacklist::Blacklist = - [blacklist::Entry::from_addr(peer_infos[2].addr.unwrap())].into_iter().collect(); - let _peer_store = - PeerStore::new(&clock.clock(), make_config(&[], blacklist, false), store).unwrap(); - } - assert_peers_in_store(&opener, &peer_ids[0..2]); -} + peer_store.add_indirect_peers( + &clock.clock(), + [ + get_peer_info(ids[0].clone(), None), + get_peer_info(ids[1].clone(), Some(get_addr(1))), + get_peer_info(ids[2].clone(), Some(get_addr(2))), + ] + .into_iter(), + ); -#[track_caller] -fn assert_peers_in_store(opener: &StoreOpener, want: &[PeerId]) { - let store = crate::store::Store::from(opener.open().unwrap()); - let got: HashSet = store.list_peer_states().unwrap().into_iter().map(|x| x.0).collect(); - let want: HashSet = want.iter().cloned().collect(); - assert_eq!(got, want); + // Peer 127.0.0.1:1 is ignored and never added. + assert_peers(&peer_store, &[&ids[0], &ids[2]]); } #[track_caller] @@ -577,7 +404,6 @@ fn assert_peers_in_cache( #[test] fn test_delete_peers() { let clock = time::FakeClock::default(); - let (_tmp_dir, opener) = NodeStorage::test_opener(); let (peer_ids, peer_infos): (Vec<_>, Vec<_>) = (0..3) .map(|i| { let id = get_peer_id(format!("node{}", i)); @@ -587,23 +413,12 @@ fn test_delete_peers() { .unzip(); let peer_addresses = peer_infos.iter().map(|info| info.addr.unwrap()).collect::>(); - { - let store = store::Store::from(opener.open().unwrap()); - let peer_store = - PeerStore::new(&clock.clock(), make_config(&[], Default::default(), false), store) - .unwrap(); - peer_store.add_indirect_peers(&clock.clock(), peer_infos.into_iter()).unwrap(); - } - assert_peers_in_store(&opener, &peer_ids); + let peer_store = + PeerStore::new(&clock.clock(), make_config(&[], Default::default(), false)).unwrap(); - { - let store = store::Store::from(opener.open().unwrap()); - let peer_store = - PeerStore::new(&clock.clock(), make_config(&[], Default::default(), false), store) - .unwrap(); - assert_peers_in_cache(&peer_store, &peer_ids, &peer_addresses); - peer_store.0.lock().delete_peers(&peer_ids).unwrap(); - assert_peers_in_cache(&peer_store, &[], &[]); - } - assert_peers_in_store(&opener, &[]); + peer_store.add_indirect_peers(&clock.clock(), peer_infos.into_iter()); + assert_peers_in_cache(&peer_store, &peer_ids, &peer_addresses); + + peer_store.0.lock().delete_peers(&peer_ids); + assert_peers_in_cache(&peer_store, &[], &[]); } diff --git a/chain/network/src/store/mod.rs b/chain/network/src/store/mod.rs index f2f47095458..b2606237794 100644 --- a/chain/network/src/store/mod.rs +++ b/chain/network/src/store/mod.rs @@ -2,7 +2,7 @@ /// All transactions should be implemented within this module, /// in particular schema::StoreUpdate is not exported. use crate::network_protocol::Edge; -use crate::types::{ConnectionInfo, KnownPeerState}; +use crate::types::ConnectionInfo; use near_primitives::network::{AnnounceAccount, PeerId}; use near_primitives::types::AccountId; use std::collections::HashSet; @@ -118,33 +118,8 @@ impl Store { } } -// PeerStore storage. +// ConnectionStore storage. impl Store { - /// Inserts (peer_id,peer_state) to Peers column. - pub fn set_peer_state( - &mut self, - peer_id: &PeerId, - peer_state: &KnownPeerState, - ) -> Result<(), Error> { - let mut update = self.0.new_update(); - update.set::(peer_id, peer_state); - self.0.commit(update).map_err(Error) - } - - /// Deletes rows with keys in from Peers column. - pub fn delete_peer_states(&mut self, peers: &[PeerId]) -> Result<(), Error> { - let mut update = self.0.new_update(); - for p in peers { - update.delete::(p); - } - self.0.commit(update).map_err(Error) - } - - /// Reads the whole Peers column. - pub fn list_peer_states(&self) -> Result, Error> { - self.0.iter::().collect::>().map_err(Error) - } - pub fn set_recent_outbound_connections( &mut self, recent_outbound_connections: &Vec, diff --git a/chain/network/src/store/schema/mod.rs b/chain/network/src/store/schema/mod.rs index 36ce53a4eed..3d3086c17a8 100644 --- a/chain/network/src/store/schema/mod.rs +++ b/chain/network/src/store/schema/mod.rs @@ -11,6 +11,8 @@ use near_store::DBCol; use std::io; use std::sync::Arc; +#[cfg(test)] +mod testonly; #[cfg(test)] mod tests; @@ -25,82 +27,6 @@ impl Format for AccountIdFormat { } } -#[derive(BorshSerialize, BorshDeserialize)] -enum KnownPeerStatus { - Unknown, - NotConnected, - Connected, - /// UNIX timestamps in nanos. - Banned(primitives::ReasonForBan, u64), -} - -impl From for KnownPeerStatus { - fn from(s: primitives::KnownPeerStatus) -> Self { - match s { - primitives::KnownPeerStatus::Unknown => Self::Unknown, - primitives::KnownPeerStatus::NotConnected => Self::NotConnected, - primitives::KnownPeerStatus::Connected => Self::Connected, - primitives::KnownPeerStatus::Banned(r, t) => { - Self::Banned(r, t.unix_timestamp_nanos() as u64) - } - } - } -} - -impl From for primitives::KnownPeerStatus { - fn from(s: KnownPeerStatus) -> primitives::KnownPeerStatus { - match s { - KnownPeerStatus::Unknown => primitives::KnownPeerStatus::Unknown, - KnownPeerStatus::NotConnected => primitives::KnownPeerStatus::NotConnected, - KnownPeerStatus::Connected => primitives::KnownPeerStatus::Connected, - KnownPeerStatus::Banned(r, t) => primitives::KnownPeerStatus::Banned( - r, - time::Utc::from_unix_timestamp_nanos(t as i128).unwrap(), - ), - } - } -} - -/// A Borsh representation of the primitives::KnownPeerState. -/// TODO: Currently primitives::KnownPeerState implements Borsh serialization -/// directly, but eventually direct serialization should be removed -/// so that the storage format doesn't leak to the business logic. -/// TODO: Currently primitives::KnownPeerState is identical -/// to the KnownPeerStateRepr, but in the following PR the -/// timestamp type (currently u64), will be replaced with time::Utc. -#[derive(BorshSerialize, BorshDeserialize)] -pub(super) struct KnownPeerStateRepr { - peer_info: primitives::PeerInfo, - status: KnownPeerStatus, - /// UNIX timestamps in nanos. - first_seen: u64, - last_seen: u64, -} - -impl BorshRepr for KnownPeerStateRepr { - type T = primitives::KnownPeerState; - fn to_repr(s: &primitives::KnownPeerState) -> Self { - Self { - peer_info: s.peer_info.clone(), - status: s.status.clone().into(), - first_seen: s.first_seen.unix_timestamp_nanos() as u64, - last_seen: s.last_seen.unix_timestamp_nanos() as u64, - } - } - - fn from_repr(s: Self) -> Result { - Ok(primitives::KnownPeerState { - peer_info: s.peer_info, - status: s.status.into(), - first_seen: time::Utc::from_unix_timestamp_nanos(s.first_seen as i128) - .map_err(invalid_data)?, - last_seen: time::Utc::from_unix_timestamp_nanos(s.last_seen as i128) - .map_err(invalid_data)?, - last_outbound_attempt: None, - }) - } -} - /// A Borsh representation of the primitives::ConnectionInfo. #[derive(BorshSerialize, BorshDeserialize)] pub(super) struct ConnectionInfoRepr { @@ -170,13 +96,6 @@ impl Column for AccountAnnouncements { type Value = Borsh; } -pub(super) struct Peers; -impl Column for Peers { - const COL: DBCol = DBCol::Peers; - type Key = Borsh; - type Value = KnownPeerStateRepr; -} - pub(super) struct RecentOutboundConnections; impl Column for RecentOutboundConnections { const COL: DBCol = DBCol::RecentOutboundConnections; @@ -313,15 +232,6 @@ impl Store { self.0.write(update.0) } - pub fn iter( - &self, - ) -> impl Iterator::T, ::T), Error>> + '_ - { - debug_assert!(!C::COL.is_rc()); - self.0 - .iter_raw_bytes(C::COL) - .map(|item| item.and_then(|(k, v)| Ok((C::Key::decode(&k)?, C::Value::decode(&v)?)))) - } pub fn get( &self, k: &::T, diff --git a/chain/network/src/store/schema/testonly.rs b/chain/network/src/store/schema/testonly.rs new file mode 100644 index 00000000000..af48754c1f1 --- /dev/null +++ b/chain/network/src/store/schema/testonly.rs @@ -0,0 +1,13 @@ +use crate::store::schema::{Column, Error, Format}; + +impl super::Store { + pub fn iter( + &self, + ) -> impl Iterator::T, ::T), Error>> + '_ + { + debug_assert!(!C::COL.is_rc()); + self.0 + .iter_raw_bytes(C::COL) + .map(|item| item.and_then(|(k, v)| Ok((C::Key::decode(&k)?, C::Value::decode(&v)?)))) + } +} diff --git a/core/store/src/columns.rs b/core/store/src/columns.rs index e718481be8e..07451d94015 100644 --- a/core/store/src/columns.rs +++ b/core/store/src/columns.rs @@ -61,10 +61,9 @@ pub enum DBCol { /// - *Rows*: (block, shard) /// - *Content type*: Vec of [near_primitives::sharding::ReceiptProof] IncomingReceipts, - /// Info about the peers that we are connected to. Mapping from peer_id to KnownPeerState. - /// - *Rows*: peer_id (PublicKey) - /// - *Content type*: [network_primitives::types::KnownPeerState] - Peers, + /// Deprecated. + #[strum(serialize = "Peers")] + _Peers, /// List of recent outbound TIER2 connections. We'll attempt to re-establish /// these connections after node restart or upon disconnection. /// - *Rows*: single row (empty row name) @@ -430,7 +429,7 @@ impl DBCol { DBCol::_TransactionResult => &[DBKeyType::OutcomeId], DBCol::OutgoingReceipts => &[DBKeyType::BlockHash, DBKeyType::ShardId], DBCol::IncomingReceipts => &[DBKeyType::BlockHash, DBKeyType::ShardId], - DBCol::Peers => &[DBKeyType::PeerId], + DBCol::_Peers => &[DBKeyType::PeerId], DBCol::RecentOutboundConnections => &[DBKeyType::Empty], DBCol::EpochInfo => &[DBKeyType::EpochId], DBCol::BlockInfo => &[DBKeyType::BlockHash], diff --git a/core/store/src/db/rocksdb.rs b/core/store/src/db/rocksdb.rs index 8876dd1ee97..a87f4335e68 100644 --- a/core/store/src/db/rocksdb.rs +++ b/core/store/src/db/rocksdb.rs @@ -620,7 +620,7 @@ fn col_name(col: DBCol) -> &'static str { DBCol::_TransactionResult => "col7", DBCol::OutgoingReceipts => "col8", DBCol::IncomingReceipts => "col9", - DBCol::Peers => "col10", + DBCol::_Peers => "col10", DBCol::EpochInfo => "col11", DBCol::BlockInfo => "col12", DBCol::Chunks => "col13", diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index bc2f5fed2c1..bf551429f6c 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -1006,7 +1006,7 @@ mod tests { use rand::Rng; // An arbitrary non-rc non-insert-only column we can write data into. - const COLUMN: DBCol = DBCol::Peers; + const COLUMN: DBCol = DBCol::RecentOutboundConnections; assert!(!COLUMN.is_rc()); assert!(!COLUMN.is_insert_only()); diff --git a/core/store/src/migrations.rs b/core/store/src/migrations.rs index 83497a6e413..d1ece60611b 100644 --- a/core/store/src/migrations.rs +++ b/core/store/src/migrations.rs @@ -304,3 +304,14 @@ pub fn migrate_33_to_34(store: &Store, mut is_node_archival: bool) -> anyhow::Re update.commit()?; Ok(()) } + +/// Migrates the database from version 35 to 36. +/// +/// This involves deleting contents of Peers column which is now +/// deprecated and no longer used. +pub fn migrate_35_to_36(store: &Store) -> anyhow::Result<()> { + let mut update = store.store_update(); + update.delete_all(DBCol::_Peers); + update.commit()?; + Ok(()) +} diff --git a/nearcore/src/migrations.rs b/nearcore/src/migrations.rs index efbe0f885df..bceceaea981 100644 --- a/nearcore/src/migrations.rs +++ b/nearcore/src/migrations.rs @@ -159,6 +159,7 @@ impl<'a> near_store::StoreMigrator for Migrator<'a> { tracing::info!(target: "migrations", "It will happen in parallel with regular block processing. ETA is 5h for RPC node and 10h for archival node."); Ok(()) } + 35 => near_store::migrations::migrate_35_to_36(store), DB_VERSION.. => unreachable!(), } } diff --git a/tools/state-viewer/src/cli.rs b/tools/state-viewer/src/cli.rs index 2022c7e960f..d1ccad7b54e 100644 --- a/tools/state-viewer/src/cli.rs +++ b/tools/state-viewer/src/cli.rs @@ -67,8 +67,6 @@ pub enum StateViewerSubCommand { /// Looks up a certain partial chunk. #[clap(alias = "partial_chunks")] PartialChunks(PartialChunksCmd), - /// Prints stored peers information from the DB. - Peers, /// Looks up a certain receipt. Receipts(ReceiptsCmd), /// Replay headers from chain. @@ -107,7 +105,6 @@ impl StateViewerSubCommand { let storage = store_opener.open_in_mode(mode).unwrap(); let store = storage.get_store(temperature); - let db = storage.into_inner(temperature); match self { StateViewerSubCommand::Apply(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::ApplyChunk(cmd) => cmd.run(home_dir, near_config, store), @@ -127,7 +124,6 @@ impl StateViewerSubCommand { StateViewerSubCommand::DumpTx(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::EpochInfo(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::PartialChunks(cmd) => cmd.run(near_config, store), - StateViewerSubCommand::Peers => peers(db), StateViewerSubCommand::Receipts(cmd) => cmd.run(near_config, store), StateViewerSubCommand::Replay(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::RocksDBStats(cmd) => cmd.run(store_opener.path()), diff --git a/tools/state-viewer/src/commands.rs b/tools/state-viewer/src/commands.rs index ea6b3f8b271..94f13b7de72 100644 --- a/tools/state-viewer/src/commands.rs +++ b/tools/state-viewer/src/commands.rs @@ -16,7 +16,6 @@ use near_chain::{ }; use near_chain_configs::GenesisChangeConfig; use near_epoch_manager::{EpochManager, EpochManagerAdapter}; -use near_network::iter_peers_from_store; use near_primitives::account::id::AccountId; use near_primitives::block::{Block, BlockHeader}; use near_primitives::hash::CryptoHash; @@ -27,7 +26,6 @@ use near_primitives::state_record::StateRecord; use near_primitives::trie_key::TrieKey; use near_primitives::types::{chunk_extra::ChunkExtra, BlockHeight, ShardId, StateRoot}; use near_primitives_core::types::Gas; -use near_store::db::Database; use near_store::test_utils::create_test_store; use near_store::TrieDBStorage; use near_store::{Store, Trie, TrieCache, TrieCachingStorage, TrieConfig}; @@ -417,12 +415,6 @@ pub(crate) fn get_receipt(receipt_id: CryptoHash, near_config: NearConfig, store println!("Receipt: {:#?}", receipt); } -pub(crate) fn peers(db: Arc) { - iter_peers_from_store(db, |(peer_id, peer_info)| { - println!("{} {:?}", peer_id, peer_info); - }) -} - pub(crate) fn check_apply_block_result( block: &Block, apply_result: &ApplyTransactionResult,