revamped TIER1 discovery protocol #8085

Merged: 7 commits, Nov 22, 2022

chain/client/src/client.rs: 20 additions & 18 deletions

```diff
@@ -1704,7 +1704,7 @@ impl Client {
         }
     }

-    /// Collects block approvals. Returns false if block approval is invalid.
+    /// Collects block approvals.
     ///
     /// We send the approval to doomslug given the epoch of the current tip iff:
     /// 1. We are the block producer for the target height in the tip's epoch;
@@ -2252,30 +2252,32 @@ impl Client {
         // require some tuning in the future. In particular, if we decide that connecting to
         // block & chunk producers of the next expoch is too expensive, we can postpone it
         // till almost the end of this epoch.
-        let mut accounts = HashMap::new();
+        let mut account_keys = AccountKeys::new();
         for epoch_id in [&tip.epoch_id, &tip.next_epoch_id] {
             // We assume here that calls to get_epoch_chunk_producers and get_epoch_block_producers_ordered
             // are cheaper than block processing (and that they will work with both this and
             // the next epoch). The caching on top of that (in tier1_accounts_cache field) is just
             // a defence in depth, based on the previous experience with expensive
             // RuntimeAdapter::get_validators_info call.
-            accounts.extend(
-                self.runtime_adapter.get_epoch_chunk_producers(epoch_id)?.iter().map(|it| {
-                    ((epoch_id.clone(), it.account_id().clone()), it.public_key().clone())
-                }),
-            );
-            accounts.extend(
-                self.runtime_adapter
-                    .get_epoch_block_producers_ordered(epoch_id, &tip.last_block_hash)?
-                    .iter()
-                    .map(|(it, _)| {
-                        ((epoch_id.clone(), it.account_id().clone()), it.public_key().clone())
-                    }),
-            );
+            for cp in self.runtime_adapter.get_epoch_chunk_producers(epoch_id)? {
+                account_keys
+                    .entry(cp.account_id().clone())
+                    .or_default()
+                    .insert(cp.public_key().clone());
+            }
+            for (bp, _) in self
+                .runtime_adapter
+                .get_epoch_block_producers_ordered(epoch_id, &tip.last_block_hash)?
+            {
+                account_keys
+                    .entry(bp.account_id().clone())
+                    .or_default()
+                    .insert(bp.public_key().clone());
+            }
         }
-        let accounts = Arc::new(accounts);
-        self.tier1_accounts_cache = Some((tip.epoch_id.clone(), accounts.clone()));
-        Ok(accounts)
+        let account_keys = Arc::new(account_keys);
+        self.tier1_accounts_cache = Some((tip.epoch_id.clone(), account_keys.clone()));
+        Ok(account_keys)
     }

     /// send_network_chain_info sends ChainInfo to PeerManagerActor.
```
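
On the client side, the accumulator changes from a flat per-epoch map to an `AccountKeys` multimap, so a single account ID can carry several keys at once (e.g. one per epoch). The `entry(..).or_default().insert(..)` idiom is what merges keys rather than overwriting them. Below is a minimal self-contained sketch of that idiom; the `String` aliases and the account/key values are hypothetical stand-ins for the real `near_primitives` and `near_crypto` types:

```rust
use std::collections::{HashMap, HashSet};

// Stand-ins for near_primitives::types::AccountId and near_crypto::PublicKey.
type AccountId = String;
type PublicKey = String;

// The diff suggests AccountKeys behaves as a multimap: account ID -> set of keys.
type AccountKeys = HashMap<AccountId, HashSet<PublicKey>>;

fn main() {
    let mut account_keys = AccountKeys::new();
    // Hypothetical producer data: the same account appears in two epochs
    // with two different keys.
    for (account, key) in [
        ("alice.near", "ed25519:key-epoch-t"),
        ("alice.near", "ed25519:key-epoch-t1"),
        ("bob.near", "ed25519:key-epoch-t"),
    ] {
        // or_default() creates an empty set on first sight of the account;
        // insert() then merges keys instead of replacing a single value.
        account_keys.entry(account.to_string()).or_default().insert(key.to_string());
    }
    assert_eq!(account_keys["alice.near"].len(), 2);
    assert_eq!(account_keys["bob.near"].len(), 1);
}
```
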
chain/network/src/accounts_data/mod.rs: 41 additions & 41 deletions

```diff
@@ -13,7 +13,7 @@
 //!
 //! Strategy:
 //! - handling of SyncAccountsData should be throttled by PeerActor/PeerManagerActor.
-//! - synchronously select interesting AccountData (i.e. those with never timestamp than any
+//! - synchronously select interesting AccountData (i.e. those with newer version than any
 //!   previously seen for the given (account_id,epoch_id) pair.
 //! - asynchronously verify signatures, until an invalid signature is encountered.
 //! - if any signature is invalid, drop validation of the remaining signature and ban the peer
@@ -29,7 +29,7 @@ use crate::concurrency::arc_mutex::ArcMutex;
 use crate::network_protocol;
 use crate::network_protocol::SignedAccountData;
 use crate::types::AccountKeys;
-use near_primitives::types::{AccountId, EpochId};
+use near_crypto::PublicKey;
 use rayon::iter::ParallelBridge;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -49,29 +49,35 @@ pub(crate) enum Error {

 #[derive(Clone)]
 pub struct CacheSnapshot {
-    pub keys: Arc<AccountKeys>,
+    /// Map from account ID to account key.
+    /// Used only for selecting target when routing a message to a TIER1 peer.
+    /// TODO(gprusak): In fact, since the account key assigned to a given account ID can change
+    /// between epochs, Client should rather send messages to node with a specific account key,
+    /// rather than with a specific account ID.
+    pub keys_by_id: Arc<AccountKeys>,
+    /// Set of account keys allowed on TIER1 network.
+    pub keys: im::HashSet<PublicKey>,
     /// Current state of knowledge about an account.
-    /// key is the public key of the account in the given epoch.
-    /// It will be used to verify new incoming versions of SignedAccountData
-    /// for this account.
-    pub data: im::HashMap<(EpochId, AccountId), Arc<SignedAccountData>>,
+    /// `data.keys()` is a subset of `keys` at all times,
+    /// as cache is collecting data only about the accounts from `keys`,
+    /// and data about the particular account might be not known at the given moment.
+    pub data: im::HashMap<PublicKey, Arc<SignedAccountData>>,
 }

 impl CacheSnapshot {
     fn is_new(&self, d: &SignedAccountData) -> bool {
-        let id = (d.epoch_id.clone(), d.account_id.clone());
-        self.keys.contains_key(&id)
-            && match self.data.get(&id) {
-                Some(old) if old.timestamp >= d.timestamp => false,
+        self.keys.contains(&d.account_key)
+            && match self.data.get(&d.account_key) {
+                Some(old) if old.version >= d.version => false,
                 _ => true,
             }
     }

     fn try_insert(&mut self, d: Arc<SignedAccountData>) -> Option<Arc<SignedAccountData>> {
         if !self.is_new(&d) {
             return None;
         }
-        let id = (d.epoch_id.clone(), d.account_id.clone());
-        self.data.insert(id, d.clone());
+        self.data.insert(d.account_key.clone(), d.clone());
         Some(d)
     }
 }
```
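
The cache is now keyed by account key rather than by `(epoch_id, account_id)`, and freshness is decided by a `version` counter instead of a timestamp. A simplified, self-contained sketch of the `is_new` gate follows; it omits the `keys` allow-set check and uses `String` as a stand-in for `PublicKey`:

```rust
use std::collections::HashMap;

// Stand-in for near_crypto::PublicKey.
type PublicKey = String;

#[derive(Clone)]
struct SignedAccountData {
    account_key: PublicKey,
    version: u64,
}

struct CacheSnapshot {
    data: HashMap<PublicKey, SignedAccountData>,
}

impl CacheSnapshot {
    // Mirrors the new `is_new`: data is interesting only if it is strictly
    // newer than whatever is already stored under the same account key.
    fn is_new(&self, d: &SignedAccountData) -> bool {
        match self.data.get(&d.account_key) {
            Some(old) if old.version >= d.version => false,
            _ => true,
        }
    }
}

fn main() {
    let mut snapshot = CacheSnapshot { data: HashMap::new() };
    let v1 = SignedAccountData { account_key: "ed25519:abc".into(), version: 1 };
    assert!(snapshot.is_new(&v1));
    snapshot.data.insert(v1.account_key.clone(), v1.clone());
    // A rebroadcast of the version we already hold is rejected too, not just
    // older data: the equal-version case falls into `old.version >= d.version`.
    assert!(!snapshot.is_new(&v1));
}
```
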
```diff
@@ -81,27 +87,25 @@ pub(crate) struct Cache(ArcMutex<CacheSnapshot>);
 impl Cache {
     pub fn new() -> Self {
         Self(ArcMutex::new(CacheSnapshot {
-            keys: Arc::new(AccountKeys::default()),
+            keys_by_id: Arc::new(AccountKeys::default()),
+            keys: im::HashSet::new(),
             data: im::HashMap::new(),
         }))
     }

     /// Updates the set of important accounts and their public keys.
     /// The AccountData which is no longer important is dropped.
     /// Returns true iff the set of accounts actually changed.
-    pub fn set_keys(&self, keys: Arc<AccountKeys>) -> bool {
+    pub fn set_keys(&self, keys_by_id: Arc<AccountKeys>) -> bool {
         self.0.update(|inner| {
             // Skip further processing if the key set didn't change.
             // NOTE: if T implements Eq, then Arc<T> short circuits equality for x == x.
-            if keys == inner.keys {
+            if keys_by_id == inner.keys_by_id {
                 return false;
             }
-            for (k, v) in std::mem::take(&mut inner.data) {
-                if keys.contains_key(&k) {
-                    inner.data.insert(k, v);
-                }
-            }
-            inner.keys = keys;
+            inner.keys_by_id = keys_by_id;
+            inner.keys = inner.keys_by_id.values().flatten().cloned().collect();
+            inner.data.retain(|k, _| inner.keys.contains(k));
             true
         })
     }
```
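
The new `set_keys` rebuilds two views in one pass: the flat allow-set is derived by flattening the per-account key sets, and stale cache entries are dropped with `retain`. A sketch of the same two steps with plain `std` collections (the real code uses the persistent `im` containers, and the values below are hypothetical):

```rust
use std::collections::{HashMap, HashSet};

fn main() {
    let keys_by_id: HashMap<&str, HashSet<&str>> = HashMap::from([
        ("alice.near", HashSet::from(["key-a1", "key-a2"])),
        ("bob.near", HashSet::from(["key-b1"])),
    ]);
    // Flatten the per-account key sets into one allow-set, as the new
    // `set_keys` does with `values().flatten().cloned().collect()`.
    let keys: HashSet<&str> = keys_by_id.values().flatten().cloned().collect();

    // Drop cached data whose key is no longer allowed, mirroring
    // `inner.data.retain(|k, _| inner.keys.contains(k))`.
    let mut data: HashMap<&str, u64> = HashMap::from([("key-a1", 7), ("stale-key", 3)]);
    data.retain(|k, _| keys.contains(k));
    assert!(data.contains_key("key-a1"));
    assert!(!data.contains_key("stale-key"));
}
```
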
```diff
@@ -110,49 +114,45 @@ impl Cache {
     /// Returns the verified new data and an optional error.
     /// Note that even if error has been returned the partially validated output is returned
     /// anyway.
-    fn verify(
+    async fn verify(
         &self,
         data: Vec<Arc<SignedAccountData>>,
     ) -> (Vec<Arc<SignedAccountData>>, Option<Error>) {
         // Filter out non-interesting data, so that we never check signatures for valid non-interesting data.
         // Bad peers may force us to check signatures for fake data anyway, but we will ban them after first invalid signature.
         // It locks epochs for reading for a short period.
-        let mut data_and_keys = HashMap::new();
+        let mut new_data = HashMap::new();
         let inner = self.0.load();
         for d in data {
             // There is a limit on the amount of RAM occupied by per-account datasets.
             // Broadcasting larger datasets is considered malicious behavior.
             if d.payload().len() > network_protocol::MAX_ACCOUNT_DATA_SIZE_BYTES {
                 return (vec![], Some(Error::DataTooLarge));
             }
-            let id = (d.epoch_id.clone(), d.account_id.clone());
             // We want the communication needed for broadcasting per-account data to be minimal.
             // Therefore broadcasting multiple datasets per account is considered malicious
             // behavior, since all but one are obviously outdated.
-            if data_and_keys.contains_key(&id) {
+            if new_data.contains_key(&d.account_key) {
                 return (vec![], Some(Error::SingleAccountMultipleData));
             }
             // It is fine to broadcast data we already know about.
-            if !inner.is_new(&d) {
-                continue;
-            }
-            // It is fine to broadcast account data that we don't care about.
-            let key = match inner.keys.get(&id) {
-                Some(key) => key.clone(),
-                None => continue,
-            };
-            data_and_keys.insert(id, (d, key));
+            if inner.is_new(&d) {
+                new_data.insert(d.account_key.clone(), d);
+            }
         }

         // Verify the signatures in parallel.
         // Verification will stop at the first encountered error.
-        let (data, ok) =
-            concurrency::rayon::try_map(data_and_keys.into_values().par_bridge(), |(d, key)| {
-                if d.payload().verify(&key).is_ok() {
-                    return Some(d);
-                }
-                return None;
-            });
+        let (data, ok) = concurrency::rayon::run(move || {
+            concurrency::rayon::try_map(new_data.into_values().par_bridge(), |d| {
+                match d.payload().verify(&d.account_key) {
+                    Ok(()) => Some(d),
+                    Err(()) => None,
+                }
+            })
+        })
+        .await;
         if !ok {
             return (data, Some(Error::InvalidSignature));
         }
```
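
`concurrency::rayon::run` and `concurrency::rayon::try_map` are nearcore-internal helpers, so the sketch below only approximates the contract the surrounding comments describe: map items on the rayon pool, stop doing useful work after the first failure, and report whether every item passed. This is an assumed reimplementation against plain `rayon`, not the actual helper:

```rust
use rayon::prelude::*;
use std::sync::atomic::{AtomicBool, Ordering};

// Assumed semantics of a fail-fast parallel map: returns the successfully
// mapped items plus a flag saying whether all items succeeded.
fn try_map<T: Send, U: Send>(
    items: Vec<T>,
    f: impl Fn(T) -> Option<U> + Sync + Send,
) -> (Vec<U>, bool) {
    let ok = AtomicBool::new(true);
    let out: Vec<U> = items
        .into_par_iter()
        .filter_map(|item| {
            if !ok.load(Ordering::Relaxed) {
                return None; // skip remaining work after the first failure
            }
            match f(item) {
                Some(v) => Some(v),
                None => {
                    // Record the failure so in-flight workers bail out early.
                    ok.store(false, Ordering::Relaxed);
                    None
                }
            }
        })
        .collect();
    (out, ok.load(Ordering::Relaxed))
}

fn main() {
    // Toy "verification": even values pass, odd values fail.
    let (verified, ok) = try_map((0..100).collect::<Vec<i32>>(), |n| (n % 2 == 0).then_some(n));
    assert!(!ok);
    assert!(verified.iter().all(|n| n % 2 == 0));
}
```

Folding the `rayon::run` offload into `verify` itself (and making it `async`) keeps the thread-pool hop an implementation detail of verification; as the final hunk shows, the caller now simply awaits `this.verify(data)`.
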
```diff
@@ -168,7 +168,7 @@ impl Cache {
     ) -> (Vec<Arc<SignedAccountData>>, Option<Error>) {
         let this = self.clone();
         // Execute verification on the rayon threadpool.
-        let (data, err) = concurrency::rayon::run(move || this.verify(data)).await;
+        let (data, err) = this.verify(data).await;
         // Insert the successfully verified data, even if an error has been encountered.
         let inserted =
             self.0.update(|inner| data.into_iter().filter_map(|d| inner.try_insert(d)).collect());
```