From 824880f5c4d651b6aac928d32f0e72918123aea7 Mon Sep 17 00:00:00 2001
From: pompon0
Date: Tue, 22 Nov 2022 09:58:54 +0100
Subject: [PATCH] revamped TIER1 discovery protocol (#8085)

* added some extra fields to AccountData and renamed a bunch of them for
  better readability. This PR is not backward compatible in the sense that
  the old binary won't accept the new AccountData and vice versa (protobuf
  compatibility is preserved; however, the custom validation logic treats
  different fields as required). That's ok, as the broadcasted data is not
  interpreted yet.

* changed the identifier of AccountData from (EpochId,AccountId) to
  AccountKey, which simplifies a lot of the code and generally makes more
  sense.

* changed the version ID of AccountData from a UTC timestamp to an integer:
  this avoids the bad impact of a wall-clock time shift, which can be
  especially dangerous since we no longer version AccountData by EpochId.
  The creation timestamp is still present in AccountData so that data
  freshness can be debugged. We can additionally implement an expiration
  policy based on the timestamp in the future.

* rearranged the code related to broadcasting AccountData to align with the
  TIER1-related code which comes after this PR.
---
 chain/client/src/client.rs                    |  38 +++---
 chain/network/src/accounts_data/mod.rs        |  82 ++++++------
 chain/network/src/accounts_data/tests.rs      |  98 ++++++----------
 chain/network/src/config.rs                   |  99 +++++++++-----
 chain/network/src/config_json.rs              |   5 +
 chain/network/src/network_protocol/mod.rs     |  17 +--
 .../src/network_protocol/network.proto        |  37 ++++--
 .../proto_conv/account_key.rs                 |  24 ++--
 .../src/network_protocol/proto_conv/crypto.rs |  22 +++-
 .../network_protocol/proto_conv/handshake.rs  |   4 +-
 .../src/network_protocol/proto_conv/net.rs    |   2 +-
 .../network/src/network_protocol/testonly.rs  |  53 ++++-----
 chain/network/src/network_protocol/tests.rs   |   8 +-
 chain/network/src/peer/peer_actor.rs          |   3 -
 .../src/peer_manager/connection/mod.rs        |   4 +-
 .../src/peer_manager/network_state/mod.rs     |   2 +
 .../src/peer_manager/network_state/tier1.rs   |  80 +++++++++++++
 .../src/peer_manager/peer_manager_actor.rs    | 108 +++++------------
 chain/network/src/peer_manager/testonly.rs    |  55 +++++----
 .../src/peer_manager/tests/accounts_data.rs   |  87 +++++---------
 chain/network/src/types.rs                    |   6 +-
 nearcore/src/config.rs                        |   2 -
 22 files changed, 450 insertions(+), 386 deletions(-)
 create mode 100644 chain/network/src/peer_manager/network_state/tier1.rs

diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs
index 80f61baeb37..8a532724fe5 100644
--- a/chain/client/src/client.rs
+++ b/chain/client/src/client.rs
@@ -1707,7 +1707,7 @@ impl Client {
         }
     }
 
-    /// Collects block approvals. Returns false if block approval is invalid.
+    /// Collects block approvals.
     ///
     /// We send the approval to doomslug given the epoch of the current tip iff:
     /// 1. We are the block producer for the target height in the tip's epoch;
@@ -2255,30 +2255,32 @@ impl Client {
         // require some tuning in the future. In particular, if we decide that connecting to
         // block & chunk producers of the next epoch is too expensive, we can postpone it
         // till almost the end of this epoch.
-        let mut accounts = HashMap::new();
+        let mut account_keys = AccountKeys::new();
         for epoch_id in [&tip.epoch_id, &tip.next_epoch_id] {
             // We assume here that calls to get_epoch_chunk_producers and get_epoch_block_producers_ordered
             // are cheaper than block processing (and that they will work with both this and
             // the next epoch). The caching on top of that (in the tier1_accounts_cache field) is just
             // a defence in depth, based on the previous experience with the expensive
             // RuntimeAdapter::get_validators_info call.
-            accounts.extend(
-                self.runtime_adapter.get_epoch_chunk_producers(epoch_id)?.iter().map(|it| {
-                    ((epoch_id.clone(), it.account_id().clone()), it.public_key().clone())
-                }),
-            );
-            accounts.extend(
-                self.runtime_adapter
-                    .get_epoch_block_producers_ordered(epoch_id, &tip.last_block_hash)?
-                    .iter()
-                    .map(|(it, _)| {
-                        ((epoch_id.clone(), it.account_id().clone()), it.public_key().clone())
-                    }),
-            );
+            for cp in self.runtime_adapter.get_epoch_chunk_producers(epoch_id)? {
+                account_keys
+                    .entry(cp.account_id().clone())
+                    .or_default()
+                    .insert(cp.public_key().clone());
+            }
+            for (bp, _) in self
+                .runtime_adapter
+                .get_epoch_block_producers_ordered(epoch_id, &tip.last_block_hash)?
+            {
+                account_keys
+                    .entry(bp.account_id().clone())
+                    .or_default()
+                    .insert(bp.public_key().clone());
+            }
         }
-        let accounts = Arc::new(accounts);
-        self.tier1_accounts_cache = Some((tip.epoch_id.clone(), accounts.clone()));
-        Ok(accounts)
+        let account_keys = Arc::new(account_keys);
+        self.tier1_accounts_cache = Some((tip.epoch_id.clone(), account_keys.clone()));
+        Ok(account_keys)
     }
 
     /// send_network_chain_info sends ChainInfo to PeerManagerActor.
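Review note: the TIER1 key set built here is now keyed by account ID alone, with a set of keys per account, instead of by (epoch_id, account_id); the `AccountKeys` alias is defined in chain/network/src/types.rs further down in this patch. A minimal self-contained sketch of the merge behavior — plain `String`s stand in for `AccountId`/`PublicKey`, and the key values are invented placeholders:

```rust
use std::collections::{HashMap, HashSet};

fn main() {
    // Mirrors: pub type AccountKeys = HashMap<AccountId, HashSet<PublicKey>>;
    let mut account_keys: HashMap<String, HashSet<String>> = HashMap::new();
    // A validator can show up in both the current and the next epoch,
    // possibly with a different key; entry().or_default().insert() merges
    // all of its keys into a single set, deduplicating identical ones.
    for (account, key) in [
        ("alice", "ed25519:key-for-epoch-N"),
        ("alice", "ed25519:key-for-epoch-N+1"),
        ("bob", "ed25519:key-for-epoch-N"),
        ("bob", "ed25519:key-for-epoch-N"), // same key across epochs collapses
    ] {
        account_keys.entry(account.to_string()).or_default().insert(key.to_string());
    }
    assert_eq!(account_keys["alice"].len(), 2);
    assert_eq!(account_keys["bob"].len(), 1);
}
```

This is also why the cache below no longer needs an epoch ID to judge freshness: a record is addressed by the account key itself.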
diff --git a/chain/network/src/accounts_data/mod.rs b/chain/network/src/accounts_data/mod.rs
index 2c2d5587213..af5c87738e2 100644
--- a/chain/network/src/accounts_data/mod.rs
+++ b/chain/network/src/accounts_data/mod.rs
@@ -13,7 +13,7 @@
 //!
 //! Strategy:
 //! - handling of SyncAccountsData should be throttled by PeerActor/PeerManagerActor.
-//! - synchronously select interesting AccountData (i.e. those with never timestamp than any
+//! - synchronously select interesting AccountData (i.e. those with a newer version than any
 //!   previously seen for the given (account_id,epoch_id) pair.
 //! - asynchronously verify signatures, until an invalid signature is encountered.
 //! - if any signature is invalid, drop validation of the remaining signature and ban the peer
@@ -29,7 +29,7 @@ use crate::concurrency::arc_mutex::ArcMutex;
 use crate::network_protocol;
 use crate::network_protocol::SignedAccountData;
 use crate::types::AccountKeys;
-use near_primitives::types::{AccountId, EpochId};
+use near_crypto::PublicKey;
 use rayon::iter::ParallelBridge;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -49,29 +49,35 @@ pub(crate) enum Error {
 
 #[derive(Clone)]
 pub struct CacheSnapshot {
-    pub keys: Arc<AccountKeys>,
+    /// Map from account ID to account keys.
+    /// Used only for selecting the target when routing a message to a TIER1 peer.
+    /// TODO(gprusak): In fact, since the account key assigned to a given account ID can change
+    /// between epochs, Client should rather send messages to a node with a specific account key,
+    /// rather than with a specific account ID.
+    pub keys_by_id: Arc<AccountKeys>,
+    /// Set of account keys allowed on the TIER1 network.
+    pub keys: im::HashSet<PublicKey>,
     /// Current state of knowledge about an account.
-    /// key is the public key of the account in the given epoch.
-    /// It will be used to verify new incoming versions of SignedAccountData
-    /// for this account.
-    pub data: im::HashMap<(EpochId, AccountId), Arc<SignedAccountData>>,
+    /// `data.keys()` is a subset of `keys` at all times,
+    /// as the cache collects data only about the accounts from `keys`,
+    /// and data about a particular account might not be known at a given moment.
+    pub data: im::HashMap<PublicKey, Arc<SignedAccountData>>,
 }
 
 impl CacheSnapshot {
     fn is_new(&self, d: &SignedAccountData) -> bool {
-        let id = (d.epoch_id.clone(), d.account_id.clone());
-        self.keys.contains_key(&id)
-            && match self.data.get(&id) {
-                Some(old) if old.timestamp >= d.timestamp => false,
+        self.keys.contains(&d.account_key)
+            && match self.data.get(&d.account_key) {
+                Some(old) if old.version >= d.version => false,
                 _ => true,
             }
     }
+
     fn try_insert(&mut self, d: Arc<SignedAccountData>) -> Option<Arc<SignedAccountData>> {
         if !self.is_new(&d) {
             return None;
         }
-        let id = (d.epoch_id.clone(), d.account_id.clone());
-        self.data.insert(id, d.clone());
+        self.data.insert(d.account_key.clone(), d.clone());
         Some(d)
     }
 }
@@ -81,7 +87,8 @@ pub(crate) struct Cache(ArcMutex<CacheSnapshot>);
 impl Cache {
     pub fn new() -> Self {
         Self(ArcMutex::new(CacheSnapshot {
-            keys: Arc::new(AccountKeys::default()),
+            keys_by_id: Arc::new(AccountKeys::default()),
+            keys: im::HashSet::new(),
             data: im::HashMap::new(),
         }))
     }
@@ -89,19 +96,16 @@ impl Cache {
     /// Updates the set of important accounts and their public keys.
     /// The AccountData which is no longer important is dropped.
     /// Returns true iff the set of accounts actually changed.
-    pub fn set_keys(&self, keys: Arc<AccountKeys>) -> bool {
+    pub fn set_keys(&self, keys_by_id: Arc<AccountKeys>) -> bool {
         self.0.update(|inner| {
             // Skip further processing if the key set didn't change.
             // NOTE: if T implements Eq, then Arc<T> short circuits equality for x == x.
-            if keys == inner.keys {
+            if keys_by_id == inner.keys_by_id {
                 return false;
             }
-            for (k, v) in std::mem::take(&mut inner.data) {
-                if keys.contains_key(&k) {
-                    inner.data.insert(k, v);
-                }
-            }
-            inner.keys = keys;
+            inner.keys_by_id = keys_by_id;
+            inner.keys = inner.keys_by_id.values().flatten().cloned().collect();
+            inner.data.retain(|k, _| inner.keys.contains(k));
             true
         })
     }
@@ -110,14 +114,14 @@ impl Cache {
     /// Returns the verified new data and an optional error.
     /// Note that even if error has been returned the partially validated output is returned
     /// anyway.
-    fn verify(
+    async fn verify(
         &self,
         data: Vec<Arc<SignedAccountData>>,
     ) -> (Vec<Arc<SignedAccountData>>, Option<Error>) {
         // Filter out non-interesting data, so that we never check signatures for valid non-interesting data.
         // Bad peers may force us to check signatures for fake data anyway, but we will ban them after first invalid signature.
         // It locks epochs for reading for a short period.
-        let mut data_and_keys = HashMap::new();
+        let mut new_data = HashMap::new();
         let inner = self.0.load();
         for d in data {
             // There is a limit on the amount of RAM occupied by per-account datasets.
             // Broadcasting larger datasets is considered malicious behavior.
             if d.payload().len() > network_protocol::MAX_ACCOUNT_DATA_SIZE_BYTES {
                 return (vec![], Some(Error::DataTooLarge));
             }
-            let id = (d.epoch_id.clone(), d.account_id.clone());
             // We want the communication needed for broadcasting per-account data to be minimal.
             // Therefore broadcasting multiple datasets per account is considered malicious
             // behavior, since all but one are obviously outdated.
-            if data_and_keys.contains_key(&id) {
+            if new_data.contains_key(&d.account_key) {
                 return (vec![], Some(Error::SingleAccountMultipleData));
             }
             // It is fine to broadcast data we already know about.
-            if !inner.is_new(&d) {
-                continue;
-            }
             // It is fine to broadcast account data that we don't care about.
-            let key = match inner.keys.get(&id) {
-                Some(key) => key.clone(),
-                None => continue,
-            };
-            data_and_keys.insert(id, (d, key));
+            if inner.is_new(&d) {
+                new_data.insert(d.account_key.clone(), d);
+            }
         }
         // Verify the signatures in parallel.
         // Verification will stop at the first encountered error.
-        let (data, ok) =
-            concurrency::rayon::try_map(data_and_keys.into_values().par_bridge(), |(d, key)| {
-                if d.payload().verify(&key).is_ok() {
-                    return Some(d);
+        let (data, ok) = concurrency::rayon::run(move || {
+            concurrency::rayon::try_map(new_data.into_values().par_bridge(), |d| {
+                match d.payload().verify(&d.account_key) {
+                    Ok(()) => Some(d),
+                    Err(()) => None,
                 }
-                return None;
-            });
+            })
+        })
+        .await;
         if !ok {
             return (data, Some(Error::InvalidSignature));
         }
@@ -168,7 +168,7 @@ impl Cache {
     ) -> (Vec<Arc<SignedAccountData>>, Option<Error>) {
         let this = self.clone();
         // Execute verification on the rayon threadpool.
-        let (data, err) = concurrency::rayon::run(move || this.verify(data)).await;
+        let (data, err) = this.verify(data).await;
         // Insert the successfully verified data, even if an error has been encountered.
         let inserted =
             self.0.update(|inner| data.into_iter().filter_map(|d| inner.try_insert(d)).collect());
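Review note: `verify()` now runs the whole signature check on the rayon threadpool via `concurrency::rayon::run`, and `try_map` stops scheduling work once one item fails. A sketch of that try-map pattern using plain rayon — the crate-local `concurrency::rayon` helpers are not shown in this patch, so their exact implementation here is an assumption:

```rust
use rayon::prelude::*;
use std::sync::atomic::{AtomicBool, Ordering};

// Map items in parallel; once any item fails, remaining workers bail out early.
// Returns the successfully mapped items and whether all items succeeded.
fn try_map<T: Send, U: Send>(
    items: Vec<T>,
    f: impl Fn(T) -> Option<U> + Sync + Send,
) -> (Vec<U>, bool) {
    let failed = AtomicBool::new(false);
    let out: Vec<U> = items
        .into_par_iter()
        .filter_map(|item| {
            if failed.load(Ordering::Relaxed) {
                return None; // another worker already hit a failure
            }
            match f(item) {
                Some(v) => Some(v),
                None => {
                    failed.store(true, Ordering::Relaxed);
                    None
                }
            }
        })
        .collect();
    let ok = !failed.load(Ordering::Relaxed);
    (out, ok)
}

fn main() {
    // "Verify" even numbers; 7 plays the role of an invalid signature.
    let (verified, ok) = try_map(vec![2, 4, 7, 8], |x| (x % 2 == 0).then_some(x));
    assert!(!ok); // matches the Error::InvalidSignature path above
    assert!(verified.iter().all(|x| x % 2 == 0)); // partial output is still returned
}
```

Wrapping the parallel map in an async-friendly `run()` matters because signature checks are CPU-bound and would otherwise stall the async executor.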
diff --git a/chain/network/src/accounts_data/tests.rs b/chain/network/src/accounts_data/tests.rs
index 32d38a41e3c..389c6787d42 100644
--- a/chain/network/src/accounts_data/tests.rs
+++ b/chain/network/src/accounts_data/tests.rs
@@ -3,28 +3,20 @@ use crate::network_protocol::testonly as data;
 use crate::network_protocol::SignedAccountData;
 use crate::testonly::{assert_is_superset, make_rng, AsSet as _, Rng};
 use crate::time;
-use crate::types::AccountKeys;
-use near_primitives::types::EpochId;
-use near_primitives::validator_signer::{InMemoryValidatorSigner, ValidatorSigner as _};
+use near_primitives::validator_signer::InMemoryValidatorSigner;
 use pretty_assertions::assert_eq;
 use std::sync::Arc;
 
-struct Signer {
-    epoch_id: EpochId,
-    signer: InMemoryValidatorSigner,
-}
-
-impl Signer {
-    fn make_account_data(&self, rng: &mut Rng, timestamp: time::Utc) -> SignedAccountData {
-        data::make_account_data(
-            rng,
-            timestamp,
-            self.epoch_id.clone(),
-            self.signer.validator_id().clone(),
-        )
-        .sign(&self.signer)
+fn make_account_data(
+    rng: &mut Rng,
+    clock: &time::Clock,
+    version: u64,
+    signer: &InMemoryValidatorSigner,
+) -> SignedAccountData {
+    let peer_id = data::make_peer_id(rng);
+    data::make_account_data(rng, version, clock.now_utc(), signer.public_key(), peer_id)
+        .sign(signer)
         .unwrap()
-    }
 }
 
@@ -36,27 +28,8 @@ fn unwrap<'a, T: std::hash::Hash + std::cmp::Eq, E: std::fmt::Debug>(
     &v.0
 }
 
-fn make_signers(rng: &mut Rng, n: usize) -> Vec<Signer> {
-    (0..n)
-        .map(|_| Signer {
-            epoch_id: data::make_epoch_id(rng),
-            signer: data::make_validator_signer(rng),
-        })
-        .collect()
-}
-
-fn make_account_keys(signers: &[Signer]) -> Arc<AccountKeys> {
-    Arc::new(
-        signers
-            .iter()
-            .map(|s| {
-                (
-                    (s.epoch_id.clone(), s.signer.validator_id().clone()),
-                    s.signer.public_key().clone(),
-                )
-            })
-            .collect(),
-    )
+fn make_signers(rng: &mut Rng, n: usize) -> Vec<InMemoryValidatorSigner> {
+    (0..n).map(|_| data::make_validator_signer(rng)).collect()
 }
 
 #[tokio::test]
 async fn happy_path() {
     let mut rng = make_rng(2947294234);
     let rng = &mut rng;
     let clock = time::FakeClock::default();
-    let now = clock.now_utc();
 
     let signers: Vec<_> = make_signers(rng, 7);
-    let e0 = make_account_keys(&signers[0..5]);
-    let e1 = make_account_keys(&signers[2..7]);
+    let e0 = Arc::new(data::make_account_keys(&signers[0..5]));
+    let e1 = Arc::new(data::make_account_keys(&signers[2..7]));
 
     let cache = Arc::new(Cache::new());
     assert_eq!(cache.load().data.values().count(), 0); // initially empty
     // initial set of keys
     assert!(cache.set_keys(e0.clone()));
     assert_eq!(cache.load().data.values().count(), 0); // empty after initial set_keys.
 
     // initial insert
-    let a0 = Arc::new(signers[0].make_account_data(rng, now));
-    let a1 = Arc::new(signers[1].make_account_data(rng, now));
+    let a0 = Arc::new(make_account_data(rng, &clock.clock(), 1, &signers[0]));
+    let a1 = Arc::new(make_account_data(rng, &clock.clock(), 1, &signers[1]));
     let res = cache.clone().insert(vec![a0.clone(), a1.clone()]).await;
     assert_eq!([&a0, &a1].as_set(), unwrap(&res).as_set());
     assert_eq!([&a0, &a1].as_set(), cache.load().data.values().collect());
 
     // entries of various types
-    let a0new = Arc::new(signers[0].make_account_data(rng, now + time::Duration::seconds(1)));
-    let a1old = Arc::new(signers[1].make_account_data(rng, now - time::Duration::seconds(1)));
-    let a2 = Arc::new(signers[2].make_account_data(rng, now));
-    let a5 = Arc::new(signers[5].make_account_data(rng, now));
+    let a0new = Arc::new(make_account_data(rng, &clock.clock(), 2, &signers[0]));
+    let a1old = Arc::new(make_account_data(rng, &clock.clock(), 0, &signers[1]));
+    let a2 = Arc::new(make_account_data(rng, &clock.clock(), 1, &signers[2]));
+    let a5 = Arc::new(make_account_data(rng, &clock.clock(), 1, &signers[5]));
     let res = cache
         .clone()
         .insert(vec![
@@ -123,16 +95,16 @@ async fn data_too_large() {
     let mut rng = make_rng(2947294234);
     let rng = &mut rng;
     let clock = time::FakeClock::default();
-    let now = clock.now_utc();
 
     let signers = make_signers(rng, 3);
-    let e = make_account_keys(&signers);
+    let e = Arc::new(data::make_account_keys(&signers));
 
     let cache = Arc::new(Cache::new());
     cache.set_keys(e);
-    let a0 = Arc::new(signers[0].make_account_data(rng, now));
-    let a1 = Arc::new(signers[1].make_account_data(rng, now));
-    let mut a2_too_large: SignedAccountData = signers[2].make_account_data(rng, now);
+    let a0 = Arc::new(make_account_data(rng, &clock.clock(), 1, &signers[0]));
+    let a1 = Arc::new(make_account_data(rng, &clock.clock(), 1, &signers[1]));
+    let mut a2_too_large: SignedAccountData =
+        make_account_data(rng, &clock.clock(), 1, &signers[2]);
     *a2_too_large.payload_mut() =
         (0..crate::network_protocol::MAX_ACCOUNT_DATA_SIZE_BYTES + 1).map(|_| 17).collect();
     let a2_too_large = Arc::new(a2_too_large);
@@ -158,16 +130,15 @@ async fn invalid_signature() {
     let mut rng = make_rng(2947294234);
     let rng = &mut rng;
     let clock = time::FakeClock::default();
-    let now = clock.now_utc();
 
     let signers = make_signers(rng, 3);
-    let e = make_account_keys(&signers);
+    let e = Arc::new(data::make_account_keys(&signers));
 
     let cache = Arc::new(Cache::new());
     cache.set_keys(e);
-    let a0 = Arc::new(signers[0].make_account_data(rng, now));
-    let mut a1 = signers[1].make_account_data(rng, now);
-    let mut a2_invalid_sig = signers[2].make_account_data(rng, now);
+    let a0 = Arc::new(make_account_data(rng, &clock.clock(), 1, &signers[0]));
+    let mut a1 = make_account_data(rng, &clock.clock(), 1, &signers[1]);
+    let mut a2_invalid_sig = make_account_data(rng, &clock.clock(), 1, &signers[2]);
     *a2_invalid_sig.signature_mut() = a1.signature_mut().clone();
     let a1 = Arc::new(a1);
     let a2_invalid_sig = Arc::new(a2_invalid_sig);
@@ -193,17 +164,16 @@ async fn single_account_multiple_data() {
     let mut rng = make_rng(2947294234);
     let rng = &mut rng;
     let clock = time::FakeClock::default();
-    let now = clock.now_utc();
 
     let signers = make_signers(rng, 3);
-    let e = make_account_keys(&signers);
+    let e = Arc::new(data::make_account_keys(&signers));
 
     let cache = Arc::new(Cache::new());
     cache.set_keys(e);
 
-    let a0 = Arc::new(signers[0].make_account_data(rng, now));
-    let a1 = Arc::new(signers[1].make_account_data(rng, now));
-    let a2old = Arc::new(signers[2].make_account_data(rng, now));
-    let a2new = Arc::new(signers[2].make_account_data(rng, now + time::Duration::seconds(1)));
+    let a0 = Arc::new(make_account_data(rng, &clock.clock(), 1, &signers[0]));
+    let a1 = Arc::new(make_account_data(rng, &clock.clock(), 1, &signers[1]));
+    let a2old = Arc::new(make_account_data(rng, &clock.clock(), 1, &signers[2]));
+    let a2new = Arc::new(make_account_data(rng, &clock.clock(), 2, &signers[2]));
 
     // 2 entries for the same (epoch_id,account_id) => SingleAccountMultipleData
     let res =
diff --git a/chain/network/src/config.rs b/chain/network/src/config.rs
index e7bbe5af08c..570d6352453 100644
--- a/chain/network/src/config.rs
+++ b/chain/network/src/config.rs
@@ -24,26 +24,39 @@ pub const MAX_ROUTES_TO_STORE: usize = 5;
 /// Maximum number of PeerAddrs in the ValidatorConfig::endpoints field.
 pub const MAX_PEER_ADDRS: usize = 10;
 
-/// ValidatorEndpoints are the endpoints that peers should connect to, to send messages to this
-/// validator. Validator will sign the endpoints and broadcast them to the network.
-/// For a static setup (a static IP, or a list of relay nodes with static IPs) use PublicAddrs.
-/// For a dynamic setup (with a single dynamic/ephemeral IP), use TrustedStunServers.
+/// Address of the format "<domain/ip>:<port>" of STUN servers.
+// TODO(gprusak): turn into a proper struct implementing Display and FromStr.
+pub type StunServerAddr = String;
+
+/// ValidatorProxies are nodes with a public IP (aka proxies) that this validator trusts to be
+/// honest and willing to forward traffic to this validator. Whenever this node is a TIER1
+/// validator (i.e. whenever it is a block producer/chunk producer/approver for the given epoch),
+/// it will connect to all the proxies in this config and advertise a signed list of the proxies
+/// that it has established a connection to.
+///
+/// Once other TIER1 nodes learn the list of proxies, they will maintain a connection to a random
+/// proxy on this list. This way a message from any TIER1 node to this node will require at most 2
+/// hops.
+///
+/// neard supports 2 modes for configuring proxy addresses:
+/// * [recommended] `Static` list of proxies (public SocketAddr + PeerId), supports up to 10
+///   proxies. It is a totally valid setup for a TIER1 validator to be its own (perhaps only)
+///   proxy: to achieve that, add an entry with the public address of this node to the Static
+///   list.
+/// * [discouraged] `Dynamic` proxy - in case you want this validator to be its own and only
+///   proxy, instead of adding the public address explicitly to the `Static` list, you can
+///   specify a STUN server address (or a couple of them) which will be used to dynamically
+///   resolve the public IP of this validator. Note that in this case the validator trusts the
+///   STUN servers to correctly resolve the public IP.
 #[derive(Clone)]
-pub enum ValidatorEndpoints {
-    /// Single public address of this validator, or a list of public addresses of trusted nodes
-    /// willing to route messages to this validator. Validator will connect to the listed relay
-    /// nodes on startup.
-    PublicAddrs(Vec<PeerAddr>),
-    /// Addresses of the format "<domain/ip>:<port>" of STUN servers.
-    /// The IP of the validator will be determined dynamically by querying all the STUN servers on
-    /// the list.
-    TrustedStunServers(Vec<String>),
+pub enum ValidatorProxies {
+    Static(Vec<PeerAddr>),
+    Dynamic(Vec<StunServerAddr>),
 }
 
 #[derive(Clone)]
 pub struct ValidatorConfig {
     pub signer: Arc<dyn ValidatorSigner>,
-    pub endpoints: ValidatorEndpoints,
+    pub proxies: ValidatorProxies,
 }
 
 impl ValidatorConfig {
@@ -53,8 +66,13 @@ impl ValidatorConfig {
 }
 
 #[derive(Clone)]
-pub struct Features {
-    pub enable_tier1: bool,
+pub struct Features {}
+
+#[derive(Clone)]
+pub struct Tier1 {
+    /// Interval between broadcasts of the list of the validator's proxies.
+    /// Before the broadcast, the validator tries to establish all the missing connections to
+    /// proxies.
+    pub advertise_proxies_interval: time::Duration,
 }
 
 /// Validated configuration for the peer-to-peer manager.
@@ -117,8 +135,8 @@ pub struct NetworkConfig {
     pub accounts_data_broadcast_rate_limit: demux::RateLimit,
     /// Maximal rate at which RoutingTableUpdate can be sent out.
     pub routing_table_update_rate_limit: demux::RateLimit,
-    /// features
-    pub features: Features,
+    /// Config of the TIER1 network.
+    pub tier1: Option<Tier1>,
 
     // Whether to ignore tombstones some time after startup.
     //
@@ -139,7 +157,6 @@ impl NetworkConfig {
         node_key: SecretKey,
         validator_signer: Option<Arc<dyn ValidatorSigner>>,
         archive: bool,
-        features: Features,
     ) -> anyhow::Result<Self> {
         if cfg.public_addrs.len() > MAX_PEER_ADDRS {
             anyhow::bail!(
@@ -150,14 +167,34 @@ impl NetworkConfig {
         if cfg.public_addrs.len() > 0 && cfg.trusted_stun_servers.len() > 0 {
             anyhow::bail!("you cannot specify both public_addrs and trusted_stun_servers");
         }
+        for proxy in &cfg.public_addrs {
+            let ip = proxy.addr.ip();
+            if cfg.allow_private_ip_in_public_addrs {
+                if ip.is_unspecified() {
+                    anyhow::bail!("public_addrs: {ip} is not a valid IP. If you wanted to specify a loopback IP, use 127.0.0.1 instead.");
+                }
+            } else {
+                // TODO(gprusak): use !ip.is_global() instead, once it is stable.
+                if ip.is_loopback()
+                    || ip.is_unspecified()
+                    || match ip {
+                        std::net::IpAddr::V4(ip) => ip.is_private(),
+                        // TODO(gprusak): use ip.is_unique_local() once stable.
+                        std::net::IpAddr::V6(_) => false,
+                    }
+                {
+                    anyhow::bail!("public_addrs: {ip} is not a public IP.");
+                }
+            }
+        }
         let this = Self {
             node_key,
             validator: validator_signer.map(|signer| ValidatorConfig {
                 signer,
-                endpoints: if cfg.public_addrs.len() > 0 {
-                    ValidatorEndpoints::PublicAddrs(cfg.public_addrs)
+                proxies: if cfg.public_addrs.len() > 0 {
+                    ValidatorProxies::Static(cfg.public_addrs)
                 } else {
-                    ValidatorEndpoints::TrustedStunServers(cfg.trusted_stun_servers)
+                    ValidatorProxies::Dynamic(cfg.trusted_stun_servers)
                 },
             }),
             node_addr: match cfg.addr.as_str() {
@@ -219,7 +256,7 @@ impl NetworkConfig {
             archive,
             accounts_data_broadcast_rate_limit: demux::RateLimit { qps: 0.1, burst: 1 },
             routing_table_update_rate_limit: demux::RateLimit { qps: 0.5, burst: 1 },
-            features,
+            tier1: Some(Tier1 { advertise_proxies_interval: time::Duration::minutes(15) }),
            inbound_disabled: cfg.experimental.inbound_disabled,
             skip_tombstones: if cfg.experimental.skip_sending_tombstones_seconds > 0 {
                 Some(time::Duration::seconds(cfg.experimental.skip_sending_tombstones_seconds))
@@ -246,7 +283,7 @@ impl NetworkConfig {
                 KeyType::ED25519,
                 seed,
             )),
-            endpoints: ValidatorEndpoints::PublicAddrs(vec![PeerAddr {
+            proxies: ValidatorProxies::Static(vec![PeerAddr {
                 addr: node_addr,
                 peer_id: PeerId::new(node_key.public_key()),
             }]),
@@ -284,7 +321,11 @@ impl NetworkConfig {
             archive: false,
             accounts_data_broadcast_rate_limit: demux::RateLimit { qps: 100., burst: 1000000 },
             routing_table_update_rate_limit: demux::RateLimit { qps: 100., burst: 1000000 },
-            features: Features { enable_tier1: true },
+            tier1: Some(Tier1 {
+                // Interval is very large, so that it doesn't happen spontaneously in tests.
+                // It should rather be triggered manually in tests.
+                advertise_proxies_interval: time::Duration::hours(1000),
+            }),
             skip_tombstones: None,
             event_sink: Sink::null(),
         }
@@ -362,7 +403,6 @@ mod test {
     use crate::network_protocol::AccountData;
     use crate::testonly::make_rng;
     use crate::time;
-    use near_primitives::validator_signer::ValidatorSigner;
 
     #[test]
     fn test_network_config() {
@@ -395,15 +435,16 @@ mod test {
         let signer = data::make_validator_signer(&mut rng);
 
         let ad = AccountData {
-            peers: (0..config::MAX_PEER_ADDRS)
+            proxies: (0..config::MAX_PEER_ADDRS)
                 .map(|_| {
                     // Using IPv6 gives maximal size of the resulting config.
                     let ip = data::make_ipv6(&mut rng);
                     data::make_peer_addr(&mut rng, ip)
                 })
                 .collect(),
-            account_id: signer.validator_id().clone(),
-            epoch_id: data::make_epoch_id(&mut rng),
+            account_key: signer.public_key(),
+            peer_id: data::make_peer_id(&mut rng),
+            version: 0,
             timestamp: clock.now_utc(),
         };
         let sad = ad.sign(&signer).unwrap();
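Review note: a compilable sketch of the two proxy-configuration modes defined above, with stand-in types (`peer_id` as a `String` rather than a near `PeerId`; the address and key values are invented):

```rust
use std::net::SocketAddr;

type StunServerAddr = String;

#[derive(Clone, Debug)]
struct PeerAddr {
    addr: SocketAddr,
    peer_id: String, // stand-in for near_primitives::network::PeerId
}

#[derive(Clone, Debug)]
enum ValidatorProxies {
    Static(Vec<PeerAddr>),
    Dynamic(Vec<StunServerAddr>),
}

fn main() {
    // Recommended: fixed public endpoints; a validator with a public IP can
    // simply list itself here.
    let recommended = ValidatorProxies::Static(vec![PeerAddr {
        addr: "203.0.113.7:24567".parse().unwrap(),
        peer_id: "ed25519:<node key>".to_string(),
    }]);
    // Discouraged: resolve this node's own public IP via trusted STUN servers.
    let discouraged = ValidatorProxies::Dynamic(vec!["stun.example.org:3478".to_string()]);
    println!("{recommended:?} vs {discouraged:?}");
}
```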
diff --git a/chain/network/src/config_json.rs b/chain/network/src/config_json.rs
index 1155e3ce450..a86c446a576 100644
--- a/chain/network/src/config_json.rs
+++ b/chain/network/src/config_json.rs
@@ -158,6 +158,10 @@ pub struct Config {
     /// This setup is not reliable in presence of byzantine peers.
     #[serde(default)]
     pub public_addrs: Vec<PeerAddr>,
+    /// For local tests only (localnet). Allows specifying IPs from the private range
+    /// (which are not visible from the public internet) in the public_addrs field.
+    #[serde(default)]
+    pub allow_private_ip_in_public_addrs: bool,
     /// List of endpoints of trusted [STUN servers](https://datatracker.ietf.org/doc/html/rfc8489).
     ///
     /// Used only if this node is a validator and public_ips is empty (see
@@ -222,6 +226,7 @@ impl Default for Config {
             monitor_peers_max_period: default_monitor_peers_max_period(),
             peer_expiration_duration: default_peer_expiration_duration(),
             public_addrs: vec![],
+            allow_private_ip_in_public_addrs: false,
             trusted_stun_servers: vec![],
             experimental: Default::default(),
         }
diff --git a/chain/network/src/network_protocol/mod.rs b/chain/network/src/network_protocol/mod.rs
index 9b2f5442919..ad1455863ae 100644
--- a/chain/network/src/network_protocol/mod.rs
+++ b/chain/network/src/network_protocol/mod.rs
@@ -37,7 +37,7 @@ use near_primitives::sharding::{
 };
 use near_primitives::syncing::{ShardStateSyncResponse, ShardStateSyncResponseV1};
 use near_primitives::transaction::SignedTransaction;
-use near_primitives::types::{AccountId, EpochId};
+use near_primitives::types::AccountId;
 use near_primitives::types::{BlockHeight, ShardId};
 use near_primitives::validator_signer::ValidatorSigner;
 use near_primitives::views::FinalExecutionOutcomeView;
@@ -92,9 +92,10 @@ impl std::str::FromStr for PeerAddr {
 
 #[derive(PartialEq, Eq, Debug, Hash)]
 pub struct AccountData {
-    pub peers: Vec<PeerAddr>,
-    pub account_id: AccountId,
-    pub epoch_id: EpochId,
+    pub peer_id: PeerId,
+    pub proxies: Vec<PeerAddr>,
+    pub account_key: PublicKey,
+    pub version: u64,
     pub timestamp: time::Utc,
 }
 
@@ -115,9 +116,9 @@ impl AccountData {
     /// constitute a cleaner never-panicking interface.
     pub fn sign(self, signer: &dyn ValidatorSigner) -> anyhow::Result<SignedAccountData> {
         assert_eq!(
-            &self.account_id,
-            signer.validator_id(),
-            "AccountData.account_id doesn't match the signer's account_id"
+            self.account_key,
+            signer.public_key(),
+            "AccountData.account_key doesn't match the signer's account_key"
         );
         let payload = proto::AccountKeyPayload::from(&self).write_to_bytes().unwrap();
         if payload.len() > MAX_ACCOUNT_DATA_SIZE_BYTES {
@@ -135,7 +136,7 @@ impl AccountData {
     }
 }
 
-#[derive(PartialEq, Eq, Debug, Hash)]
+#[derive(Clone, PartialEq, Eq, Debug, Hash)]
 pub struct AccountKeySignedPayload {
     payload: Vec<u8>,
     signature: near_crypto::Signature,
diff --git a/chain/network/src/network_protocol/network.proto b/chain/network/src/network_protocol/network.proto
index 6a077bf644b..488ca198f13 100644
--- a/chain/network/src/network_protocol/network.proto
+++ b/chain/network/src/network_protocol/network.proto
@@ -189,19 +189,32 @@ message PeerAddr {
 }
 
 message AccountData {
-  string account_id = 1; // required
-  // Either address of the node handling the account (if it has a public IP),
-  // or a list of peers known to be connected to the validator.
+  reserved 1,3;
+
+  // PeerId of the node owning the account_key.
+  // Used to route the message over TIER1.
+  // TODO(gprusak): it should be possible to add support for routing
+  // messages to an account_id directly (for TIER1), instead of routing
+  // to a specific peer_id. Then this field won't be necessary.
+  // Unless we use it instead of AnnounceAccount.
+  PublicKey peer_id = 5; // required.
+
+  PublicKey account_key = 6; // required.
+
+  // List of nodes which
+  // - are trusted by the validator and
+  // - are connected to the validator directly
+  // - are willing to proxy traffic to the validator.
+  // It may include the validator node itself, if it has a public IP.
   // If empty, the validator explicitly declares that it has no public IP
-  // and the P2P routing should be used instead (discouraged, might be disallowed in the future).
-  repeated PeerAddr peers = 2;
-
-  // Epoch for which this data is valid.
-  // This AccountData should be signed with the account key assigned to
-  // <account_id> for epoch <epoch_id>.
-  CryptoHash epoch_id = 3;
-  // If there are multiple signed AccountData messages for the same
-  // account_id for the same epoch, the one with the most recent timestamp is valid.
+  // and the TIER2 routing should be used instead (discouraged, might be disallowed in the future).
+  repeated PeerAddr proxies = 2;
+
+  // Version of the AccountData. A node can override a previous version
+  // by broadcasting a newer version.
+  uint64 version = 7;
+  // Time of creation of this AccountData.
+  // TODO(gprusak): consider expiring the AccountData based on this field.
   google.protobuf.Timestamp timestamp = 4;
 }
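Review note: a sketch of the freshness rule this message implies — the integer `version` alone decides which record for a given account_key wins, while `timestamp` is informational, so a shifted wall clock cannot make stale data look fresh. The struct below is a stripped-down stand-in, not the real `AccountData`; it mirrors `is_new()` in accounts_data and the per-connection dedup elsewhere in this patch:

```rust
// Stand-in with just the fields relevant to the comparison.
struct AccountData {
    version: u64,
    timestamp_unix: i64, // kept for debugging / possible future expiry only
}

// Higher version wins; on an equal or lower version the old record is kept.
fn keep<'a>(old: &'a AccountData, new: &'a AccountData) -> &'a AccountData {
    if new.version > old.version {
        new
    } else {
        old
    }
}

fn main() {
    let a = AccountData { version: 5, timestamp_unix: 1_669_100_000 };
    // Signed later by wall clock, but with a lower version: it loses.
    let b = AccountData { version: 4, timestamp_unix: 1_669_200_000 };
    assert_eq!(keep(&a, &b).version, 5);
}
```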
diff --git a/chain/network/src/network_protocol/proto_conv/account_key.rs b/chain/network/src/network_protocol/proto_conv/account_key.rs
index d04f1749e9c..77787795c98 100644
--- a/chain/network/src/network_protocol/proto_conv/account_key.rs
+++ b/chain/network/src/network_protocol/proto_conv/account_key.rs
@@ -4,20 +4,18 @@ use super::*;
 use crate::network_protocol::proto;
 use crate::network_protocol::proto::account_key_payload::Payload_type as ProtoPT;
 use crate::network_protocol::{AccountData, AccountKeySignedPayload, SignedAccountData};
-use near_primitives::account::id::ParseAccountError;
-use near_primitives::types::EpochId;
 use protobuf::{Message as _, MessageField as MF};
 
 #[derive(thiserror::Error, Debug)]
 pub enum ParseAccountDataError {
     #[error("bad payload type")]
     BadPayloadType,
-    #[error("account_id: {0}")]
-    AccountId(ParseAccountError),
+    #[error("peer_id: {0}")]
+    PeerId(ParseRequiredError<ParsePublicKeyError>),
+    #[error("account_key: {0}")]
+    AccountKey(ParseRequiredError<ParsePublicKeyError>),
     #[error("peers: {0}")]
     Peers(ParseVecError<ParsePeerAddrError>),
-    #[error("epoch_id: {0}")]
-    EpochId(ParseRequiredError<ParseCryptoHashError>),
     #[error("timestamp: {0}")]
     Timestamp(ParseRequiredError<ParseTimestampError>),
 }
 
@@ -29,9 +27,10 @@ impl From<&AccountData> for proto::AccountKeyPayload {
     fn from(x: &AccountData) -> Self {
         Self {
             payload_type: Some(ProtoPT::AccountData(proto::AccountData {
-                account_id: x.account_id.to_string(),
-                peers: x.peers.iter().map(Into::into).collect(),
-                epoch_id: MF::some((&x.epoch_id.0).into()),
+                peer_id: MF::some((&x.peer_id).into()),
+                account_key: MF::some((&x.account_key).into()),
+                proxies: x.proxies.iter().map(Into::into).collect(),
+                version: x.version,
                 timestamp: MF::some(utc_to_proto(&x.timestamp)),
                 ..Default::default()
             })),
@@ -49,9 +48,10 @@ impl TryFrom<&proto::AccountKeyPayload> for AccountData {
             _ => return Err(Self::Error::BadPayloadType),
         };
         Ok(Self {
-            account_id: x.account_id.clone().try_into().map_err(Self::Error::AccountId)?,
-            peers: try_from_slice(&x.peers).map_err(Self::Error::Peers)?,
-            epoch_id: EpochId(try_from_required(&x.epoch_id).map_err(Self::Error::EpochId)?),
+            peer_id: try_from_required(&x.peer_id).map_err(Self::Error::PeerId)?,
+            account_key: try_from_required(&x.account_key).map_err(Self::Error::AccountKey)?,
+            proxies: try_from_slice(&x.proxies).map_err(Self::Error::Peers)?,
+            version: x.version,
             timestamp: map_from_required(&x.timestamp, utc_from_proto)
                 .map_err(Self::Error::Timestamp)?,
         })
diff --git a/chain/network/src/network_protocol/proto_conv/crypto.rs b/chain/network/src/network_protocol/proto_conv/crypto.rs
index 81731bb648f..cbadf92b1b3 100644
--- a/chain/network/src/network_protocol/proto_conv/crypto.rs
+++ b/chain/network/src/network_protocol/proto_conv/crypto.rs
@@ -1,6 +1,7 @@
 /// Conversion functions for messages representing crypto primitives.
 use crate::network_protocol::proto;
 use borsh::{BorshDeserialize as _, BorshSerialize as _};
+use near_crypto::PublicKey;
 use near_primitives::hash::CryptoHash;
 use near_primitives::network::PeerId;
 
@@ -25,18 +26,31 @@ impl TryFrom<&proto::CryptoHash> for CryptoHash {
 
 //////////////////////////////////////////
 
-pub type ParsePeerIdError = borsh::maybestd::io::Error;
+pub type ParsePublicKeyError = borsh::maybestd::io::Error;
+
+impl From<&PublicKey> for proto::PublicKey {
+    fn from(x: &PublicKey) -> Self {
+        Self { borsh: x.try_to_vec().unwrap(), ..Self::default() }
+    }
+}
+
+impl TryFrom<&proto::PublicKey> for PublicKey {
+    type Error = ParsePublicKeyError;
+    fn try_from(p: &proto::PublicKey) -> Result<Self, Self::Error> {
+        Self::try_from_slice(&p.borsh)
+    }
+}
 
 impl From<&PeerId> for proto::PublicKey {
     fn from(x: &PeerId) -> Self {
-        Self { borsh: x.try_to_vec().unwrap(), ..Self::default() }
+        x.public_key().into()
     }
 }
 
 impl TryFrom<&proto::PublicKey> for PeerId {
-    type Error = ParsePeerIdError;
+    type Error = ParsePublicKeyError;
     fn try_from(p: &proto::PublicKey) -> Result<Self, Self::Error> {
-        Self::try_from_slice(&p.borsh)
+        Ok(PeerId::new(PublicKey::try_from(p)?))
     }
 }
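Review note: the proto `PublicKey` message is just a container for borsh bytes, so the conversions above amount to a borsh round-trip plus a `PeerId` wrapper. A sketch of the round-trip with a plain `Vec<u8>` standing in for the generated `proto::PublicKey`; it assumes the borsh 0.x `try_to_vec`/`try_from_slice` API used by this tree:

```rust
use borsh::{BorshDeserialize as _, BorshSerialize as _};
use near_crypto::{KeyType, PublicKey, SecretKey};

fn main() {
    let key: PublicKey = SecretKey::from_random(KeyType::ED25519).public_key();
    // From<&PublicKey> for proto::PublicKey: serialize to the `borsh` bytes field.
    let wire: Vec<u8> = key.try_to_vec().unwrap();
    // TryFrom<&proto::PublicKey> for PublicKey: deserialize the borsh bytes.
    let back = PublicKey::try_from_slice(&wire).unwrap();
    assert_eq!(key, back);
    // PeerId is a thin wrapper over PublicKey, hence
    // Ok(PeerId::new(PublicKey::try_from(p)?)) in the impl above.
}
```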
diff --git a/chain/network/src/network_protocol/proto_conv/handshake.rs b/chain/network/src/network_protocol/proto_conv/handshake.rs
index a8484929346..95b1bd83427 100644
--- a/chain/network/src/network_protocol/proto_conv/handshake.rs
+++ b/chain/network/src/network_protocol/proto_conv/handshake.rs
@@ -66,9 +66,9 @@ impl TryFrom<&proto::PeerChainInfo> for PeerChainInfoV2 {
 #[derive(thiserror::Error, Debug)]
 pub enum ParseHandshakeError {
     #[error("sender_peer_id {0}")]
-    SenderPeerId(ParseRequiredError<ParsePeerIdError>),
+    SenderPeerId(ParseRequiredError<ParsePublicKeyError>),
     #[error("target_peer_id {0}")]
-    TargetPeerId(ParseRequiredError<ParsePeerIdError>),
+    TargetPeerId(ParseRequiredError<ParsePublicKeyError>),
     #[error("sender_listen_port {0}")]
     SenderListenPort(std::num::TryFromIntError),
     #[error("sender_chain_info {0}")]
diff --git a/chain/network/src/network_protocol/proto_conv/net.rs b/chain/network/src/network_protocol/proto_conv/net.rs
index ae9a2e62fb6..8fc16aef16b 100644
--- a/chain/network/src/network_protocol/proto_conv/net.rs
+++ b/chain/network/src/network_protocol/proto_conv/net.rs
@@ -50,7 +50,7 @@ pub enum ParsePeerAddrError {
     #[error("addr: {0}")]
     Addr(ParseRequiredError<ParseSocketAddrError>),
     #[error("peer_id: {0}")]
-    PeerId(ParseRequiredError<ParsePeerIdError>),
+    PeerId(ParseRequiredError<ParsePublicKeyError>),
 }
 
 impl From<&PeerAddr> for proto::PeerAddr {
diff --git a/chain/network/src/network_protocol/testonly.rs b/chain/network/src/network_protocol/testonly.rs
index b9014692464..7f1518d75e2 100644
--- a/chain/network/src/network_protocol/testonly.rs
+++ b/chain/network/src/network_protocol/testonly.rs
@@ -229,15 +229,19 @@ impl ChunkSet {
     }
 }
 
-pub fn make_epoch_id<R: Rng>(rng: &mut R) -> EpochId {
-    EpochId(CryptoHash::hash_bytes(&rng.gen::<[u8; 19]>()))
+pub fn make_account_keys(signers: &[InMemoryValidatorSigner]) -> AccountKeys {
+    let mut account_keys = AccountKeys::new();
+    for s in signers {
+        account_keys.entry(s.validator_id().clone()).or_default().insert(s.public_key());
+    }
+    account_keys
 }
 
 pub struct Chain {
     pub genesis_id: GenesisId,
     pub blocks: Vec<Block>,
     pub chunks: HashMap<ChunkHash, ShardChunk>,
-    pub tier1_accounts: Vec<(EpochId, InMemoryValidatorSigner)>,
+    pub tier1_accounts: Vec<InMemoryValidatorSigner>,
 }
 
 impl Chain {
@@ -256,9 +260,7 @@ impl Chain {
                 hash: Default::default(),
             },
             blocks,
-            tier1_accounts: (0..10)
-                .map(|_| (make_epoch_id(rng), make_validator_signer(rng)))
-                .collect(),
+            tier1_accounts: (0..10).map(|_| make_validator_signer(rng)).collect(),
             chunks: chunks.chunks,
         }
     }
@@ -272,12 +274,7 @@ impl Chain {
     }
 
     pub fn get_tier1_accounts(&self) -> AccountKeys {
-        self.tier1_accounts
-            .iter()
-            .map(|(epoch_id, v)| {
-                ((epoch_id.clone(), v.validator_id().clone()), v.public_key().clone())
-            })
-            .collect()
+        make_account_keys(&self.tier1_accounts)
     }
 
     pub fn get_chain_info(&self) -> ChainInfo {
@@ -318,16 +315,12 @@ impl Chain {
     ) -> Vec<Arc<SignedAccountData>> {
         self.tier1_accounts
             .iter()
-            .map(|(epoch_id, v)| {
+            .map(|v| {
+                let peer_id = make_peer_id(rng);
                 Arc::new(
-                    make_account_data(
-                        rng,
-                        clock.now_utc(),
-                        epoch_id.clone(),
-                        v.validator_id().clone(),
-                    )
-                    .sign(v)
-                    .unwrap(),
+                    make_account_data(rng, 1, clock.now_utc(), v.public_key(), peer_id)
+                        .sign(v)
+                        .unwrap(),
                 )
             })
             .collect()
@@ -377,12 +370,13 @@ pub fn make_peer_addr(rng: &mut impl Rng, ip: net::IpAddr) -> PeerAddr {
 
 pub fn make_account_data(
     rng: &mut impl Rng,
+    version: u64,
     timestamp: time::Utc,
-    epoch_id: EpochId,
-    account_id: AccountId,
+    account_key: PublicKey,
+    peer_id: PeerId,
 ) -> AccountData {
     AccountData {
-        peers: vec![
+        proxies: vec![
             // Can't inline make_ipv4/ipv6 calls, because 2-phase borrow
             // doesn't work.
             {
@@ -398,18 +392,17 @@ pub fn make_account_data(
                 make_peer_addr(rng, ip)
             },
        ],
-        account_id,
-        epoch_id,
+        peer_id,
+        account_key,
+        version,
         timestamp,
     }
 }
 
 pub fn make_signed_account_data(rng: &mut impl Rng, clock: &time::Clock) -> SignedAccountData {
     let signer = make_validator_signer(rng);
-    let epoch_id = make_epoch_id(rng);
-    make_account_data(rng, clock.now_utc(), epoch_id, signer.validator_id().clone())
-        .sign(&signer)
-        .unwrap()
+    let peer_id = make_peer_id(rng);
+    make_account_data(rng, 1, clock.now_utc(), signer.public_key(), peer_id).sign(&signer).unwrap()
 }
 
 // Accessors for creating malformed SignedAccountData
diff --git a/chain/network/src/network_protocol/tests.rs b/chain/network/src/network_protocol/tests.rs
index 2382ffd9986..d6c4d7a192e 100644
--- a/chain/network/src/network_protocol/tests.rs
+++ b/chain/network/src/network_protocol/tests.rs
@@ -7,6 +7,7 @@ use crate::types::{HandshakeFailureReason, PeerMessage};
 use crate::types::{PartialEncodedChunkRequestMsg, PartialEncodedChunkResponseMsg};
 use anyhow::{bail, Context as _};
 use itertools::Itertools as _;
+use rand::Rng as _;
 
 #[test]
 fn deduplicate_edges() {
@@ -39,14 +40,15 @@ fn bad_account_data_size() {
     let signer = data::make_validator_signer(&mut rng);
 
     let ad = AccountData {
-        peers: (0..1000)
+        proxies: (0..1000)
             .map(|_| {
                 let ip = data::make_ipv6(&mut rng);
                 data::make_peer_addr(&mut rng, ip)
             })
             .collect(),
-        account_id: signer.validator_id().clone(),
-        epoch_id: data::make_epoch_id(&mut rng),
+        account_key: signer.public_key(),
+        peer_id: data::make_peer_id(&mut rng),
+        version: rng.gen(),
         timestamp: clock.now_utc(),
     };
     assert!(ad.sign(&signer).is_err());
diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs
index ca44c8fea19..b007697e448 100644
--- a/chain/network/src/peer/peer_actor.rs
+++ b/chain/network/src/peer/peer_actor.rs
@@ -110,9 +110,6 @@ pub(crate) struct PeerActor {
     peer_addr: SocketAddr,
     /// Peer type.
     peer_type: PeerType,
-    /// OUTBOUND-ONLY: Handshake specification. For outbound connections it is initialized
-    /// in constructor and then can change as HandshakeFailure and LastEdge messages
-    /// are received. For inbound connections, handshake is stateless.
     /// Framed wrapper to send messages through the TCP connection.
     framed: stream::FramedStream,
diff --git a/chain/network/src/peer_manager/connection/mod.rs b/chain/network/src/peer_manager/connection/mod.rs
index 7e894e70e6e..ff57a948529 100644
--- a/chain/network/src/peer_manager/connection/mod.rs
+++ b/chain/network/src/peer_manager/connection/mod.rs
@@ -159,9 +159,9 @@ impl Connection {
         let res = ds.iter().map(|_| ()).collect();
         let mut sum = HashMap::<_, Arc<SignedAccountData>>::new();
         for d in ds.into_iter().flatten() {
-            match sum.entry((d.epoch_id.clone(), d.account_id.clone())) {
+            match sum.entry(d.account_key.clone()) {
                 Entry::Occupied(mut x) => {
-                    if x.get().timestamp < d.timestamp {
+                    if x.get().version < d.version {
                         x.insert(d);
                     }
                 }
diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs
index 1de33f4a32c..2e7a72a10d3 100644
--- a/chain/network/src/peer_manager/network_state/mod.rs
+++ b/chain/network/src/peer_manager/network_state/mod.rs
@@ -31,6 +31,8 @@ use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
 use std::sync::Arc;
 use tracing::{debug, trace, Instrument};
 
+mod tier1;
+
 /// Limit number of pending Peer actors to avoid OOM.
 pub(crate) const LIMIT_PENDING_PEERS: usize = 60;
 
diff --git a/chain/network/src/peer_manager/network_state/tier1.rs b/chain/network/src/peer_manager/network_state/tier1.rs
new file mode 100644
index 00000000000..f9ecbe6e486
--- /dev/null
+++ b/chain/network/src/peer_manager/network_state/tier1.rs
@@ -0,0 +1,80 @@
+use crate::accounts_data;
+use crate::config;
+use crate::network_protocol::{AccountData, PeerMessage, SignedAccountData, SyncAccountsData};
+use crate::peer_manager::peer_manager_actor::Event;
+use crate::time;
+use std::sync::Arc;
+
+impl super::NetworkState {
+    // Returns the ValidatorConfig of this node iff it belongs to TIER1 according to `accounts_data`.
+    pub fn tier1_validator_config(
+        &self,
+        accounts_data: &accounts_data::CacheSnapshot,
+    ) -> Option<&config::ValidatorConfig> {
+        if self.config.tier1.is_none() {
+            return None;
+        }
+        self.config
+            .validator
+            .as_ref()
+            .filter(|cfg| accounts_data.keys.contains(&cfg.signer.public_key()))
+    }
+
+    /// Tries to connect to ALL trusted proxies from the config, then broadcasts AccountData with
+    /// the set of proxies it managed to connect to. This way other TIER1 nodes can just connect
+    /// to ANY proxy of this node.
+    pub async fn tier1_advertise_proxies(
+        self: &Arc<Self>,
+        clock: &time::Clock,
+    ) -> Vec<Arc<SignedAccountData>> {
+        let accounts_data = self.accounts_data.load();
+        let Some(vc) = self.tier1_validator_config(&accounts_data) else {
+            return vec![];
+        };
+        // TODO(gprusak): for now we just blindly broadcast the static list of proxies; however,
+        // here we should try to connect to the TIER1 proxies before broadcasting them.
+        let my_proxies = match &vc.proxies {
+            config::ValidatorProxies::Dynamic(_) => vec![],
+            config::ValidatorProxies::Static(proxies) => proxies.clone(),
+        };
+        let now = clock.now_utc();
+        let version =
+            self.accounts_data.load().data.get(&vc.signer.public_key()).map_or(0, |d| d.version)
+                + 1;
+        // This unwrap is safe, because we signed a sample payload during
+        // config validation. See config::Config::new().
+        let my_data = Arc::new(
+            AccountData {
+                peer_id: self.config.node_id(),
+                account_key: vc.signer.public_key(),
+                proxies: my_proxies.clone(),
+                timestamp: now,
+                version,
+            }
+            .sign(vc.signer.as_ref())
+            .unwrap(),
+        );
+        let (new_data, err) = self.accounts_data.insert(vec![my_data]).await;
+        // Inserting the node's own AccountData should never fail.
+        if let Some(err) = err {
+            panic!("inserting node's own AccountData to self.state.accounts_data: {err}");
+        }
+        if new_data.is_empty() {
+            // If new_data is empty, it means that accounts_data contains an entry newer than `version`.
+            // This means that this node has been restarted and forgot what the latest published
+            // `version` of its accounts_data was, AND it just learned about it from a peer.
+            // TODO(gprusak): for better resiliency, consider persisting the latest version in storage.
+            // TODO(gprusak): consider broadcasting a new version immediately after learning about
+            // the conflicting version.
+            tracing::info!("received a conflicting version of AccountData (expected iff the node has just been restarted)");
+            return vec![];
+        }
+        self.tier2.broadcast_message(Arc::new(PeerMessage::SyncAccountsData(SyncAccountsData {
+            incremental: true,
+            requesting_full_sync: true,
+            accounts_data: new_data.clone(),
+        })));
+        self.config.event_sink.push(Event::Tier1AdvertiseProxies(new_data.clone()));
+        new_data
+    }
+}
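Review note: the version bump above derives the next version from whatever this node currently knows about its own key. A sketch of just that rule — the map is keyed by strings instead of `PublicKey`, and `next_version` is a hypothetical helper name, not part of this patch:

```rust
use std::collections::HashMap;

// Start from the highest version seen locally for this node's own key, plus one.
fn next_version(known: &HashMap<String, u64>, my_key: &str) -> u64 {
    known.get(my_key).copied().unwrap_or(0) + 1
}

fn main() {
    let mut known = HashMap::new();
    // Fresh start: nothing known yet, so the node publishes version 1.
    assert_eq!(next_version(&known, "my-key"), 1);
    // After a restart the node may re-learn its own older AccountData from a
    // peer (the new_data.is_empty() branch above); the next broadcast then
    // overrides it.
    known.insert("my-key".to_string(), 7);
    assert_eq!(next_version(&known, "my-key"), 8);
}
```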
diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs
index 2e61acab01a..f23a404e81b 100644
--- a/chain/network/src/peer_manager/peer_manager_actor.rs
+++ b/chain/network/src/peer_manager/peer_manager_actor.rs
@@ -2,8 +2,8 @@ use crate::client;
 use crate::config;
 use crate::debug::{DebugStatus, GetDebugStatus};
 use crate::network_protocol::{
-    AccountData, AccountOrPeerIdOrHash, Edge, PeerMessage, Ping, Pong, RawRoutedMessage,
-    RoutedMessageBody, StateResponseInfo, SyncAccountsData,
+    AccountOrPeerIdOrHash, Edge, PeerMessage, Ping, Pong, RawRoutedMessage, RoutedMessageBody,
+    SignedAccountData, StateResponseInfo,
 };
 use crate::peer::peer_actor::PeerActor;
 use crate::peer_manager::connection;
@@ -122,6 +122,8 @@ pub enum Event {
     // actually complete. Currently this event is reported only for some message types,
     // feel free to add support for more.
     MessageProcessed(PeerMessage),
+    // Reported every time a new list of proxies has been constructed.
+    Tier1AdvertiseProxies(Vec<Arc<SignedAccountData>>),
     // Reported when a handshake has been started.
     HandshakeStarted(crate::peer::peer_actor::HandshakeStartedEvent),
     // Reported when a handshake has been successfully completed.
@@ -261,6 +263,20 @@ impl PeerManagerActor {
             client,
             whitelist_nodes,
         ));
+        if let Some(cfg) = state.config.tier1.clone() {
+            // Connect to TIER1 proxies and broadcast the list of those connections periodically.
+            arbiter.spawn({
+                let clock = clock.clone();
+                let state = state.clone();
+                let mut interval = time::Interval::new(clock.now(), cfg.advertise_proxies_interval);
+                async move {
+                    loop {
+                        interval.tick(&clock).await;
+                        state.tier1_advertise_proxies(&clock).await;
+                    }
+                }
+            });
+        }
         Ok(Self::start_in_arbiter(&arbiter, move |_ctx| Self {
             my_peer_id: my_peer_id.clone(),
             started_connect_attempts: false,
@@ -944,93 +960,51 @@ impl Handler<WithSpanContext<GetDebugStatus>> for PeerManagerActor {
     }
 }
 
-impl Handler<WithSpanContext<SetChainInfo>> for PeerManagerActor {
+impl actix::Handler<WithSpanContext<SetChainInfo>> for PeerManagerActor {
     type Result = ();
     fn handle(&mut self, msg: WithSpanContext<SetChainInfo>, ctx: &mut Self::Context) {
         let (_span, info) = handler_trace_span!(target: "network", msg);
         let _timer =
             metrics::PEER_MANAGER_MESSAGES_TIME.with_label_values(&["SetChainInfo"]).start_timer();
-        let now = self.clock.now_utc();
         let SetChainInfo(info) = info;
         let state = self.state.clone();
         // We set state.chain_info and call accounts_data.set_keys
         // synchronously, therefore, assuming actix in-order delivery,
         // there will be no race condition between subsequent SetChainInfo
         // calls.
-        // TODO(gprusak): if we could make handle() async, then we could
-        // just require the caller to await for completion before calling
-        // SetChainInfo again. Alternatively we could have an async mutex
-        // on the handler.
        state.chain_info.store(Arc::new(Some(info.clone())));
 
-        // If enable_tier1 is false, we skip set_keys() call.
+        // If tier1 is not enabled, we skip the set_keys() call.
         // This way self.state.accounts_data is always empty, hence no data
         // will be collected or broadcasted.
-        if !state.config.features.enable_tier1 {
+        if state.config.tier1.is_none() {
+            state.config.event_sink.push(Event::SetChainInfo);
             return;
         }
         // If the key set didn't change, early exit.
         if !state.accounts_data.set_keys(info.tier1_accounts.clone()) {
+            state.config.event_sink.push(Event::SetChainInfo);
            return;
         }
-        ctx.spawn(wrap_future(async move {
-            // If the set of keys has changed, and the node is a validator,
-            // we should try to sign data and broadcast it. However, this is
-            // also a trigger for a full sync, so a dedicated broadcast is
-            // not required.
-            //
-            // TODO(gprusak): For dynamic self-IP-discovery, add a STUN daemon which
-            // will add new AccountData and trigger an incremental broadcast.
-            if let Some(vc) = &state.config.validator {
-                let my_account_id = vc.signer.validator_id();
-                let my_public_key = vc.signer.public_key();
-                // TODO(gprusak): STUN servers should be queried periocally by a daemon
-                // so that the my_peers list is always resolved.
-                // Note that currently we will broadcast an empty list.
-                // It won't help us to connect the the validator BUT it
-                // will indicate that a validator is misconfigured, which
-                // is could be useful for debugging. Consider keeping this
-                // behavior for situations when the IPs are not known.
-                let my_peers = match &vc.endpoints {
-                    config::ValidatorEndpoints::TrustedStunServers(_) => vec![],
-                    config::ValidatorEndpoints::PublicAddrs(peer_addrs) => peer_addrs.clone(),
-                };
-                let my_data = info.tier1_accounts.iter().filter_map(|((epoch_id,account_id),key)| {
-                    if account_id != my_account_id {
-                        return None;
-                    }
-                    if key != &my_public_key {
-                        warn!(target: "network", "node's account_id found in TIER1 accounts, but the public keys do not match");
-                        return None;
-                    }
-                    // This unwrap is safe, because we did signed a sample payload during
-                    // config validation. See config::Config::new().
-                    Some(Arc::new(AccountData {
-                        epoch_id: epoch_id.clone(),
-                        account_id: my_account_id.clone(),
-                        timestamp: now,
-                        peers: my_peers.clone(),
-                    }.sign(vc.signer.as_ref()).unwrap()))
-                }).collect();
-                // Inserting node's own AccountData should never fail.
-                // We ignore the new data, because we trigger a full sync anyway.
-                if let (_, Some(err)) = state.accounts_data.insert(my_data).await {
-                    panic!("inserting node's own AccountData to self.state.accounts_data: {err}");
-                }
+        let clock = self.clock.clone();
+        ctx.spawn(wrap_future(
+            async move {
+                // This node might have become a TIER1 node due to the change of the key set.
+                // If so, we should recompute and readvertise the list of proxies.
+                // This is mostly important in case a node is its own proxy. In all other cases
+                // (when proxies are different nodes) the update of the key set happens asynchronously
+                // and this node won't be able to connect to proxies until it happens (and only the
+                // connected proxies are included in the advertisement). We run tier1_advertise_proxies
+                // periodically in the background anyway to cover those cases.
+                //
+                // The set of tier1 accounts has changed, so we might be missing some accounts_data
+                // that our peers know about. tier1_advertise_proxies() has a side effect
+                // of asking the TIER2 peers for a full sync of the accounts_data.
+                state.tier1_advertise_proxies(&clock).await;
+                state.config.event_sink.push(Event::SetChainInfo);
             }
-            // The set of tier1 accounts has changed.
-            // We might miss some data, so we start a full sync with the connected peers.
-            // TODO(gprusak): add a daemon which does a periodic full sync in case some messages
-            // are lost (at a frequency which makes the additional network load negligible).
-            state.tier2.broadcast_message(Arc::new(PeerMessage::SyncAccountsData(
-                SyncAccountsData {
-                    incremental: false,
-                    requesting_full_sync: true,
-                    accounts_data: state.accounts_data.load().data.values().cloned().collect(),
-                },
-            )));
-            state.config.event_sink.push(Event::SetChainInfo);
-        }.in_current_span()));
+            .in_current_span(),
+        ));
     }
 }
diff --git a/chain/network/src/peer_manager/testonly.rs b/chain/network/src/peer_manager/testonly.rs
index 4928e1e769a..95af4eb7257 100644
--- a/chain/network/src/peer_manager/testonly.rs
+++ b/chain/network/src/peer_manager/testonly.rs
@@ -2,7 +2,7 @@ use crate::broadcast;
 use crate::config;
 use crate::network_protocol::testonly as data;
 use crate::network_protocol::{
-    EdgeState, Encoding, PeerAddr, PeerInfo, PeerMessage, SignedAccountData, SyncAccountsData,
+    EdgeState, Encoding, PeerInfo, PeerMessage, SignedAccountData, SyncAccountsData,
 };
 use crate::peer;
 use crate::peer::peer_actor::ClosingReason;
@@ -15,12 +15,13 @@ use crate::testonly::actix::ActixSystem;
 use crate::testonly::fake_client;
 use crate::time;
 use crate::types::{
-    ChainInfo, KnownPeerStatus, NetworkRequests, PeerManagerMessageRequest, SetChainInfo,
+    AccountKeys, ChainInfo, KnownPeerStatus, NetworkRequests, PeerManagerMessageRequest,
+    SetChainInfo,
 };
 use crate::PeerManagerActor;
 use near_o11y::WithSpanContextExt;
 use near_primitives::network::{AnnounceAccount, PeerId};
-use near_primitives::types::{AccountId, EpochId};
+use near_primitives::types::AccountId;
 use std::collections::HashSet;
 use std::future::Future;
 use std::pin::Pin;
@@ -62,21 +63,16 @@ pub fn unwrap_sync_accounts_data_processed(ev: Event) -> Option<SyncAccountsData> {
 
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct NormalAccountData {
-    pub epoch_id: EpochId,
-    pub account_id: AccountId,
-    pub peers: Vec<PeerAddr>,
-}
-
-impl From<&Arc<SignedAccountData>> for NormalAccountData {
-    fn from(d: &Arc<SignedAccountData>) -> Self {
-        Self {
-            epoch_id: d.epoch_id.clone(),
-            account_id: d.account_id.clone(),
-            peers: d.peers.clone(),
-        }
+pub(crate) fn make_chain_info(chain: &data::Chain, validators: &[&ActorHandler]) -> ChainInfo {
+    // Construct ChainInfo with tier1_accounts set to `validators`.
+    let mut chain_info = chain.get_chain_info();
+    let mut account_keys = AccountKeys::new();
+    for pm in validators {
+        let s = &pm.cfg.validator.as_ref().unwrap().signer;
+        account_keys.entry(s.validator_id().clone()).or_default().insert(s.public_key());
     }
+    chain_info.tier1_accounts = Arc::new(account_keys);
+    chain_info
 }
 
 pub(crate) struct RawConnection {
@@ -295,9 +291,10 @@ impl ActorHandler {
         self.with_state(move |s| async move { s.fix_local_edges(&clock, timeout).await }).await
     }
 
-    pub async fn set_chain_info(&mut self, chain_info: ChainInfo) {
+    pub async fn set_chain_info(&self, chain_info: ChainInfo) {
+        let mut events = self.events.from_now();
         self.actix.addr.send(SetChainInfo(chain_info).with_span_context()).await.unwrap();
-        self.events
+        events
             .recv_until(|ev| match ev {
                 Event::PeerManager(PME::SetChainInfo) => Some(()),
                 _ => None,
             })
             .await;
     }
 
+    pub async fn tier1_advertise_proxies(
+        &self,
+        clock: &time::Clock,
+    ) -> Vec<Arc<SignedAccountData>> {
+        let clock = clock.clone();
+        self.with_state(move |s| async move { s.tier1_advertise_proxies(&clock).await }).await
+    }
+
     pub async fn announce_account(&self, aa: AnnounceAccount) {
         self.actix
             .addr
@@ -317,19 +322,21 @@ impl ActorHandler {
     }
 
     // Awaits until the accounts_data state matches `want`.
-    pub async fn wait_for_accounts_data(&mut self, want: &HashSet<NormalAccountData>) {
+    pub async fn wait_for_accounts_data(&self, want: &HashSet<Arc<SignedAccountData>>) {
+        let mut events = self.events.from_now();
         loop {
-            let got: HashSet<_> = self
-                .with_state(|s| async move {
-                    s.accounts_data.load().data.values().map(|d| d.into()).collect()
+            let got = self
+                .with_state(move |s| async move {
+                    s.accounts_data.load().data.values().cloned().collect::<HashSet<_>>()
                 })
                 .await;
+            tracing::info!(target:"dupa","got = {:?}",got);
             if &got == want {
                 break;
            }
             // It is important that we wait for the next PeerMessage::SyncAccountsData to get
             // PROCESSED, not just RECEIVED. Otherwise we would get a race condition.
-            self.events.recv_until(unwrap_sync_accounts_data_processed).await;
+            events.recv_until(unwrap_sync_accounts_data_processed).await;
         }
     }
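Review note: the test helpers above switch from reusing `self.events` to capturing `self.events.from_now()` before triggering an action. The point is to subscribe first, then act, so events emitted during the action cannot be missed; the exact semantics of `from_now()` are internal to the test harness, so std mpsc stands in for it here:

```rust
use std::sync::mpsc;

fn main() {
    let (tx, events) = mpsc::channel::<&'static str>();
    // Capture the stream first (analogous to events.from_now())...
    // ...then trigger the action that emits events.
    tx.send("SyncAccountsData processed").unwrap();
    // The waiter observes every event emitted after the capture point,
    // which is what wait_for_accounts_data() relies on.
    assert_eq!(events.recv().unwrap(), "SyncAccountsData processed");
}
```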
diff --git a/chain/network/src/peer_manager/tests/accounts_data.rs b/chain/network/src/peer_manager/tests/accounts_data.rs
index 47addce7001..e19d179512b 100644
--- a/chain/network/src/peer_manager/tests/accounts_data.rs
+++ b/chain/network/src/peer_manager/tests/accounts_data.rs
@@ -1,11 +1,10 @@
 use crate::concurrency::demux;
-use crate::config;
 use crate::network_protocol::testonly as data;
-use crate::network_protocol::{PeerAddr, SyncAccountsData};
+use crate::network_protocol::SyncAccountsData;
 use crate::peer;
 use crate::peer_manager;
 use crate::peer_manager::peer_manager_actor::Event as PME;
-use crate::peer_manager::testonly::NormalAccountData;
+use crate::peer_manager::testonly;
 use crate::testonly::{make_rng, AsSet as _};
 use crate::time;
 use crate::types::PeerMessage;
@@ -13,17 +12,9 @@ use itertools::Itertools;
 use near_o11y::testonly::init_test_logger;
 use pretty_assertions::assert_eq;
 use rand::seq::SliceRandom as _;
+use std::collections::HashSet;
 use std::sync::Arc;
 
-fn peer_addrs(vc: &config::ValidatorConfig) -> Vec<PeerAddr> {
-    match &vc.endpoints {
-        config::ValidatorEndpoints::PublicAddrs(peer_addrs) => peer_addrs.clone(),
-        config::ValidatorEndpoints::TrustedStunServers(_) => {
-            panic!("tests only support PublicAddrs in validator config")
-        }
-    }
-}
-
 #[tokio::test]
 async fn broadcast() {
     init_test_logger();
@@ -34,7 +25,7 @@ async fn broadcast() {
     let clock = clock.clock();
     let clock = &clock;
 
-    let mut pm = peer_manager::testonly::start(
+    let pm = peer_manager::testonly::start(
         clock.clone(),
         near_store::db::TestDB::new(),
         chain.make_config(rng),
@@ -56,7 +47,6 @@ async fn broadcast() {
     };
 
     let data = chain.make_tier1_data(rng, clock);
-
     tracing::info!(target:"test", "Connect peer, expect initial sync to be empty.");
     let mut peer1 =
         pm.start_inbound(chain.clone(), chain.make_config(rng)).await.handshake(clock).await;
@@ -69,15 +59,15 @@ async fn broadcast() {
         incremental: true,
         requesting_full_sync: false,
     };
-    let want = msg.accounts_data.clone();
+    let want: HashSet<_> = msg.accounts_data.iter().cloned().collect();
     peer1.send(PeerMessage::SyncAccountsData(msg)).await;
-    pm.wait_for_accounts_data(&want.iter().map(|d| d.into()).collect()).await;
+    pm.wait_for_accounts_data(&want).await;
 
     tracing::info!(target:"test", "Connect another peer and perform initial full sync.");
     let mut peer2 =
         pm.start_inbound(chain.clone(), chain.make_config(rng)).await.handshake(clock).await;
     let got2 = peer2.events.recv_until(take_full_sync).await;
-    assert_eq!(got2.accounts_data.as_set(), want.as_set());
+    assert_eq!(got2.accounts_data.as_set(), want.iter().collect());
 
     tracing::info!(target:"test", "Send a mix of new and old data. Only new data should be broadcasted.");
     let msg = SyncAccountsData {
@@ -135,35 +125,24 @@ async fn gradual_epoch_change() {
     pms[0].connect_to(&pm1).await;
     pms[1].connect_to(&pm2).await;
 
-    // Validator configs.
-    let vs: Vec<_> = pms.iter().map(|pm| pm.cfg.validator.clone().unwrap()).collect();
-
     // For every order of nodes.
     for ids in (0..pms.len()).permutations(pms.len()) {
-        // Construct ChainInfo for a new epoch,
-        // with tier1_accounts containing all validators.
-        let e = data::make_epoch_id(rng);
-        let mut chain_info = chain.get_chain_info();
-        chain_info.tier1_accounts = Arc::new(
-            vs.iter()
-                .map(|v| ((e.clone(), v.signer.validator_id().clone()), v.signer.public_key()))
-                .collect(),
-        );
+        tracing::info!(target:"test", "permutation {ids:?}");
+        clock.advance(time::Duration::hours(1));
+        let chain_info = testonly::make_chain_info(&chain, &pms.iter().collect::<Vec<_>>()[..]);
 
+        let mut want = HashSet::new();
         // Advance epoch in the given order.
         for id in ids {
             pms[id].set_chain_info(chain_info.clone()).await;
+            // In this test each node is its own proxy, so it can immediately
+            // connect to itself (to verify the public addr) and advertise it.
+            // If some other node B were a proxy for a node A, then first both
+            // A and B would have to update their chain_info, and only then A
+            // would be able to connect to B and advertise B as its proxy afterwards.
+            want.extend(pms[id].tier1_advertise_proxies(&clock.clock()).await);
         }
-        // Wait for data to arrive.
-        let want = vs
-            .iter()
-            .map(|v| NormalAccountData {
-                epoch_id: e.clone(),
-                account_id: v.signer.validator_id().clone(),
-                peers: peer_addrs(v),
-            })
-            .collect();
+        // Wait for data to arrive.
         for pm in &mut pms {
             pm.wait_for_accounts_data(&want).await;
         }
@@ -226,38 +205,24 @@ async fn rate_limiting() {
         }
     }
 
-    // Validator configs.
-    let vs: Vec<_> = pms.iter().map(|pm| pm.cfg.validator.clone().unwrap()).collect();
+    // Construct ChainInfo with tier1_accounts containing all validators.
+    let chain_info = testonly::make_chain_info(&chain, &pms.iter().collect::<Vec<_>>()[..]);
+
+    clock.advance(time::Duration::hours(1));
 
-    // Construct ChainInfo for a new epoch,
-    // with tier1_accounts containing all validators.
-    let e = data::make_epoch_id(rng);
-    let mut chain_info = chain.get_chain_info();
-    chain_info.tier1_accounts = Arc::new(
-        vs.iter()
-            .map(|v| ((e.clone(), v.signer.validator_id().clone()), v.signer.public_key()))
-            .collect(),
-    );
+    // Capture the event streams now, so that we can compute
+    // the total number of SyncAccountsData messages exchanged in the process.
+    let events: Vec<_> = pms.iter().map(|pm| pm.events.from_now()).collect();
 
     tracing::info!(target:"test","Advance epoch in random order.");
     pms.shuffle(rng);
+    let mut want = HashSet::new();
     for pm in &mut pms {
         pm.set_chain_info(chain_info.clone()).await;
+        want.extend(pm.tier1_advertise_proxies(&clock.clock()).await);
     }
 
-    // Capture the event streams at the start, so that we can compute
-    // the total number of SyncAccountsData messages exchanged in the process.
-    let events: Vec<_> = pms.iter().map(|pm| pm.events.clone()).collect();
-
     tracing::info!(target:"test","Wait for data to arrive.");
-    let want = vs
-        .iter()
-        .map(|v| NormalAccountData {
-            epoch_id: e.clone(),
-            account_id: v.signer.validator_id().clone(),
-            peers: peer_addrs(&v),
-        })
-        .collect();
     for pm in &mut pms {
         pm.wait_for_accounts_data(&want).await;
     }
diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs
index 6c91619cabe..2dc9ad74933 100644
--- a/chain/network/src/types.rs
+++ b/chain/network/src/types.rs
@@ -16,9 +16,9 @@ use near_primitives::network::{AnnounceAccount, PeerId};
 use near_primitives::sharding::PartialEncodedChunkWithArcReceipts;
 use near_primitives::transaction::SignedTransaction;
 use near_primitives::types::BlockHeight;
-use near_primitives::types::{AccountId, EpochId, ShardId};
+use near_primitives::types::{AccountId, ShardId};
 use once_cell::sync::OnceCell;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
 use std::net::SocketAddr;
 use std::sync::Arc;
@@ -126,7 +126,7 @@ impl KnownPeerStatus {
 /// Set of account keys.
 /// This is information which chain pushes to network to implement tier1.
 /// See ChainInfo.
-pub type AccountKeys = HashMap<(EpochId, AccountId), PublicKey>;
+pub type AccountKeys = HashMap<AccountId, HashSet<PublicKey>>;
 
 /// Network-relevant data about the chain.
 // TODO(gprusak): it is more like node info, or sth.
diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs
index b964d2dc744..c755a10a4ef 100644
--- a/nearcore/src/config.rs
+++ b/nearcore/src/config.rs
@@ -612,8 +612,6 @@ impl NearConfig {
                 network_key_pair.secret_key,
                 validator_signer.clone(),
                 config.archive,
-                // Enable tier1 (currently tier1 discovery only).
-                near_network::config::Features { enable_tier1: true },
             )?,
             telemetry_config: config.telemetry,
             #[cfg(feature = "json_rpc")]