revamped TIER1 discovery protocol (#8085)
* added some extra fields to AccountData and renamed a bunch of them for better readability. This PR is not backward compatible, in the sense that the old binary won't accept the new AccountData and vice versa (protobuf compatibility is preserved; however, the custom validation logic will treat different fields as required). That's OK, as the broadcasted data is not interpreted yet.
* changed the identifier of AccountData from (EpochId, AccountId) to AccountKey, which simplifies a lot of the code and generally makes more sense
* changed the version ID of AccountData from a UTC timestamp to an integer counter: this avoids the bad impact of a wall-clock time shift, which can be especially dangerous since we no longer version AccountData by EpochId. The creation timestamp is still present in AccountData so that data freshness can be debugged, and an expiration policy based on the timestamp can be added in the future (see the sketch below).
* rearranged the code related to broadcasting AccountData to align with the TIER1-related code that comes after this PR
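A minimal sketch of the identification and versioning scheme described above, with stand-in types (the field names are illustrative assumptions, not the exact nearcore definitions):

// Illustrative sketch only; the real AccountData has more fields.
type PublicKey = [u8; 32]; // stand-in for near_crypto::PublicKey

struct AccountData {
    account_key: PublicKey, // identity: replaces the old (EpochId, AccountId) pair
    version: u64,           // version ID: an integer counter, immune to wall-clock shifts
    timestamp: u64,         // creation time (Unix seconds), for debugging freshness only
}

// Freshness rule: a broadcast wins iff its version counter is strictly larger.
// The timestamp is deliberately not consulted here.
fn is_newer(old: &AccountData, new: &AccountData) -> bool {
    new.version > old.version
}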
pompon0 authored Nov 22, 2022
1 parent e20a8f7 commit 824880f
Showing 22 changed files with 450 additions and 386 deletions.
38 changes: 20 additions & 18 deletions chain/client/src/client.rs
@@ -1707,7 +1707,7 @@ impl Client {
}
}

/// Collects block approvals. Returns false if block approval is invalid.
/// Collects block approvals.
///
/// We send the approval to doomslug given the epoch of the current tip iff:
/// 1. We are the block producer for the target height in the tip's epoch;
@@ -2255,30 +2255,32 @@ impl Client {
// require some tuning in the future. In particular, if we decide that connecting to
// block & chunk producers of the next epoch is too expensive, we can postpone it
// till almost the end of this epoch.
let mut accounts = HashMap::new();
let mut account_keys = AccountKeys::new();
for epoch_id in [&tip.epoch_id, &tip.next_epoch_id] {
// We assume here that calls to get_epoch_chunk_producers and get_epoch_block_producers_ordered
// are cheaper than block processing (and that they will work with both this and
// the next epoch). The caching on top of that (in the tier1_accounts_cache field) is just
// a defence in depth, based on previous experience with the expensive
// RuntimeAdapter::get_validators_info call.
accounts.extend(
self.runtime_adapter.get_epoch_chunk_producers(epoch_id)?.iter().map(|it| {
((epoch_id.clone(), it.account_id().clone()), it.public_key().clone())
}),
);
accounts.extend(
self.runtime_adapter
.get_epoch_block_producers_ordered(epoch_id, &tip.last_block_hash)?
.iter()
.map(|(it, _)| {
((epoch_id.clone(), it.account_id().clone()), it.public_key().clone())
}),
);
for cp in self.runtime_adapter.get_epoch_chunk_producers(epoch_id)? {
account_keys
.entry(cp.account_id().clone())
.or_default()
.insert(cp.public_key().clone());
}
for (bp, _) in self
.runtime_adapter
.get_epoch_block_producers_ordered(epoch_id, &tip.last_block_hash)?
{
account_keys
.entry(bp.account_id().clone())
.or_default()
.insert(bp.public_key().clone());
}
}
let accounts = Arc::new(accounts);
self.tier1_accounts_cache = Some((tip.epoch_id.clone(), accounts.clone()));
Ok(accounts)
let account_keys = Arc::new(account_keys);
self.tier1_accounts_cache = Some((tip.epoch_id.clone(), account_keys.clone()));
Ok(account_keys)
}
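For context, a self-contained sketch of the map shape that the loops above fill, with plain std types standing in for AccountId and PublicKey (the real AccountKeys alias is the one imported from crate::types in the network crate):

use std::collections::{HashMap, HashSet};

// Stand-ins for the real near-primitives / near-crypto types.
type AccountId = String;
type PublicKey = Vec<u8>;

// Assumed shape of AccountKeys: each account may expose several keys,
// e.g. a different key in the current epoch and in the next one.
type AccountKeys = HashMap<AccountId, HashSet<PublicKey>>;

fn insert_key(account_keys: &mut AccountKeys, account_id: AccountId, key: PublicKey) {
    // entry().or_default() creates the per-account key set on first use,
    // mirroring the chunk/block producer loops above.
    account_keys.entry(account_id).or_default().insert(key);
}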

/// send_network_chain_info sends ChainInfo to PeerManagerActor.
82 changes: 41 additions & 41 deletions chain/network/src/accounts_data/mod.rs
@@ -13,7 +13,7 @@
//!
//! Strategy:
//! - handling of SyncAccountsData should be throttled by PeerActor/PeerManagerActor.
//! - synchronously select interesting AccountData (i.e. those with never timestamp than any
//! - synchronously select interesting AccountData (i.e. those with newer version than any
//! previously seen for the given (account_id, epoch_id) pair).
//! - asynchronously verify signatures, until an invalid signature is encountered.
//! - if any signature is invalid, drop validation of the remaining signatures and ban the peer
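A compressed sketch of that strategy, with hypothetical types (signature checking is reduced to a boolean so the control flow stands out):

struct Data { key: u64, version: u64, sig_ok: bool }

// Step 1 is cheap and synchronous; step 2 is the expensive part that the real
// code runs asynchronously and in parallel, banning the peer on the first failure.
fn process(incoming: Vec<Data>, known_version: impl Fn(u64) -> u64) -> Result<Vec<Data>, &'static str> {
    // Step 1: keep only data strictly newer than what we already hold.
    let interesting: Vec<Data> =
        incoming.into_iter().filter(|d| d.version > known_version(d.key)).collect();
    // Step 2: verify signatures, stopping at the first invalid one.
    for d in &interesting {
        if !d.sig_ok {
            return Err("invalid signature: ban the peer");
        }
    }
    Ok(interesting)
}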
@@ -29,7 +29,7 @@ use crate::concurrency::arc_mutex::ArcMutex;
use crate::network_protocol;
use crate::network_protocol::SignedAccountData;
use crate::types::AccountKeys;
use near_primitives::types::{AccountId, EpochId};
use near_crypto::PublicKey;
use rayon::iter::ParallelBridge;
use std::collections::HashMap;
use std::sync::Arc;
@@ -49,29 +49,35 @@ pub(crate) enum Error {

#[derive(Clone)]
pub struct CacheSnapshot {
pub keys: Arc<AccountKeys>,
/// Map from account ID to account key.
/// Used only for selecting the target when routing a message to a TIER1 peer.
/// TODO(gprusak): In fact, since the account key assigned to a given account ID can change
/// between epochs, Client should send messages to a node with a specific account key,
/// rather than with a specific account ID.
pub keys_by_id: Arc<AccountKeys>,
/// Set of account keys allowed on TIER1 network.
pub keys: im::HashSet<PublicKey>,
/// Current state of knowledge about an account.
/// The map key is the public key of the account in the given epoch.
/// It will be used to verify new incoming versions of SignedAccountData
/// for this account.
pub data: im::HashMap<(EpochId, AccountId), Arc<SignedAccountData>>,
/// `data.keys()` is a subset of `keys` at all times,
/// as the cache collects data only for the accounts in `keys`,
/// and data for a particular account might not be known at a given moment.
pub data: im::HashMap<PublicKey, Arc<SignedAccountData>>,
}

impl CacheSnapshot {
fn is_new(&self, d: &SignedAccountData) -> bool {
let id = (d.epoch_id.clone(), d.account_id.clone());
self.keys.contains_key(&id)
&& match self.data.get(&id) {
Some(old) if old.timestamp >= d.timestamp => false,
self.keys.contains(&d.account_key)
&& match self.data.get(&d.account_key) {
Some(old) if old.version >= d.version => false,
_ => true,
}
}

fn try_insert(&mut self, d: Arc<SignedAccountData>) -> Option<Arc<SignedAccountData>> {
if !self.is_new(&d) {
return None;
}
let id = (d.epoch_id.clone(), d.account_id.clone());
self.data.insert(id, d.clone());
self.data.insert(d.account_key.clone(), d.clone());
Some(d)
}
}
@@ -81,27 +87,25 @@ pub(crate) struct Cache(ArcMutex<CacheSnapshot>);
impl Cache {
pub fn new() -> Self {
Self(ArcMutex::new(CacheSnapshot {
keys: Arc::new(AccountKeys::default()),
keys_by_id: Arc::new(AccountKeys::default()),
keys: im::HashSet::new(),
data: im::HashMap::new(),
}))
}

/// Updates the set of important accounts and their public keys.
/// AccountData that is no longer important is dropped.
/// Returns true iff the set of accounts actually changed.
pub fn set_keys(&self, keys: Arc<AccountKeys>) -> bool {
pub fn set_keys(&self, keys_by_id: Arc<AccountKeys>) -> bool {
self.0.update(|inner| {
// Skip further processing if the key set didn't change.
// NOTE: if T implements Eq, then Arc<T> short circuits equality for x == x.
if keys == inner.keys {
if keys_by_id == inner.keys_by_id {
return false;
}
for (k, v) in std::mem::take(&mut inner.data) {
if keys.contains_key(&k) {
inner.data.insert(k, v);
}
}
inner.keys = keys;
inner.keys_by_id = keys_by_id;
inner.keys = inner.keys_by_id.values().flatten().cloned().collect();
inner.data.retain(|k, _| inner.keys.contains(k));
true
})
}
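A standalone sketch of the pruning step above, with plain std containers standing in for the im:: persistent maps (u64 stands in for PublicKey):

use std::collections::{HashMap, HashSet};

fn set_keys(
    keys_by_id: &HashMap<String, HashSet<u64>>, // account ID -> account keys
    data: &mut HashMap<u64, String>,            // account key -> cached payload
) -> HashSet<u64> {
    // Flatten the per-account key sets into the allowed TIER1 key set...
    let keys: HashSet<u64> = keys_by_id.values().flatten().copied().collect();
    // ...and drop cached data whose key is no longer allowed.
    data.retain(|k, _| keys.contains(k));
    keys
}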
@@ -110,49 +114,45 @@ impl Cache {
/// Returns the verified new data and an optional error.
/// Note that even if an error has been returned, the partially validated output is returned
/// anyway.
fn verify(
async fn verify(
&self,
data: Vec<Arc<SignedAccountData>>,
) -> (Vec<Arc<SignedAccountData>>, Option<Error>) {
// Filter out non-interesting data, so that we never check signatures for valid non-interesting data.
// Bad peers may force us to check signatures for fake data anyway, but we will ban them after the first invalid signature.
// It locks epochs for reading for a short period.
let mut data_and_keys = HashMap::new();
let mut new_data = HashMap::new();
let inner = self.0.load();
for d in data {
// There is a limit on the amount of RAM occupied by per-account datasets.
// Broadcasting larger datasets is considered malicious behavior.
if d.payload().len() > network_protocol::MAX_ACCOUNT_DATA_SIZE_BYTES {
return (vec![], Some(Error::DataTooLarge));
}
let id = (d.epoch_id.clone(), d.account_id.clone());
// We want the communication needed for broadcasting per-account data to be minimal.
// Therefore broadcasting multiple datasets per account is considered malicious
// behavior, since all but one are obviously outdated.
if data_and_keys.contains_key(&id) {
if new_data.contains_key(&d.account_key) {
return (vec![], Some(Error::SingleAccountMultipleData));
}
// It is fine to broadcast data we already know about.
if !inner.is_new(&d) {
continue;
}
// It is fine to broadcast account data that we don't care about.
let key = match inner.keys.get(&id) {
Some(key) => key.clone(),
None => continue,
};
data_and_keys.insert(id, (d, key));
if inner.is_new(&d) {
new_data.insert(d.account_key.clone(), d);
}
}

// Verify the signatures in parallel.
// Verification will stop at the first encountered error.
let (data, ok) =
    concurrency::rayon::try_map(data_and_keys.into_values().par_bridge(), |(d, key)| {
        if d.payload().verify(&key).is_ok() {
            return Some(d);
        }
        return None;
    });
let (data, ok) = concurrency::rayon::run(move || {
    concurrency::rayon::try_map(new_data.into_values().par_bridge(), |d| {
        match d.payload().verify(&d.account_key) {
            Ok(()) => Some(d),
            Err(()) => None,
        }
    })
})
.await;
if !ok {
return (data, Some(Error::InvalidSignature));
}
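For reference, a sketch of the try_map contract assumed above: map items in parallel, collect the successes, and report whether every item succeeded. This is an assumption about the helper's shape; the real nearcore version also stops scheduling work after the first failure.

use rayon::iter::ParallelIterator;
use std::sync::atomic::{AtomicBool, Ordering};

pub fn try_map<I, T, U>(iter: I, f: impl Fn(T) -> Option<U> + Sync + Send) -> (Vec<U>, bool)
where
    I: ParallelIterator<Item = T>,
    T: Send,
    U: Send,
{
    let ok = AtomicBool::new(true);
    let res: Vec<U> = iter
        .filter_map(|v| {
            let r = f(v);
            if r.is_none() {
                // Record the failure; successes already computed are kept.
                ok.store(false, Ordering::Relaxed);
            }
            r
        })
        .collect();
    (res, ok.load(Ordering::Relaxed))
}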
@@ -168,7 +168,7 @@ impl Cache {
) -> (Vec<Arc<SignedAccountData>>, Option<Error>) {
let this = self.clone();
// Execute verification on the rayon threadpool.
let (data, err) = concurrency::rayon::run(move || this.verify(data)).await;
let (data, err) = this.verify(data).await;
// Insert the successfully verified data, even if an error has been encountered.
let inserted =
self.0.update(|inner| data.into_iter().filter_map(|d| inner.try_insert(d)).collect());
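Finally, a sketch of the rayon-to-async bridge that `verify` now relies on, modeled on what a helper like concurrency::rayon::run plausibly does (a oneshot channel back to the async caller; the real implementation may differ):

use tokio::sync::oneshot;

pub async fn run<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> T {
    let (send, recv) = oneshot::channel();
    rayon::spawn(move || {
        // The receiver may be gone if the awaiting task was cancelled.
        let _ = send.send(f());
    });
    recv.await.expect("rayon task dropped without sending a result")
}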