Skip to content

Commit

Permalink
Remove unneeded enum value and simplify target sampling.
Browse files Browse the repository at this point in the history
  • Loading branch information
nikurt committed Apr 11, 2023
1 parent d4bf600 commit 7493c6f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 18 deletions.
34 changes: 18 additions & 16 deletions chain/client/src/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ fn make_account_or_peer_id_or_hash(
From::AccountId(a) => To::AccountId(a),
From::PeerId(p) => To::PeerId(p),
From::Hash(h) => To::Hash(h),
From::ExternalStorage => To::ExternalStorage,
}
}

Expand Down Expand Up @@ -152,10 +151,12 @@ impl StateSync {
num_s3_requests_per_shard: u64,
) -> Self {
let inner = if state_sync_from_s3_enabled {
// `unwrap()` here is fine, because the config validation has already
// ensured those fields are present.
let mut bucket = s3::Bucket::new(
s3_bucket,
s3_region.parse::<s3::Region>().unwrap(),
s3::creds::Credentials::default().unwrap(),
s3::creds::Credentials::anonymous().unwrap(),
)
.unwrap();
// Ensure requests finish in finite amount of time.
Expand Down Expand Up @@ -460,7 +461,7 @@ impl StateSync {

let block_producers =
runtime_adapter.get_epoch_block_producers_ordered(&epoch_hash, &sync_hash)?;
let mut peers = block_producers
let peers = block_producers
.iter()
.filter_map(|(validator_stake, _slashed)| {
let account_id = validator_stake.account_id();
Expand Down Expand Up @@ -491,13 +492,13 @@ impl StateSync {
None
}
}));
Ok(self.select_peers(&mut peers, shard_id)?)
Ok(self.select_peers(peers.collect(), shard_id)?)
}

/// Avoids peers that already have outstanding requests for parts.
fn select_peers(
&mut self,
peers: &mut dyn Iterator<Item = AccountOrPeerIdOrHash>,
peers: Vec<AccountOrPeerIdOrHash>,
shard_id: ShardId,
) -> Result<Vec<AccountOrPeerIdOrHash>, near_chain::Error> {
let res = match &mut self.inner {
Expand All @@ -507,13 +508,14 @@ impl StateSync {
} => {
last_part_id_requested.retain(|_, request| !request.expired());
peers
.into_iter()
.filter(|candidate| {
// If we still have a pending request from this node - don't add another one.
!last_part_id_requested.contains_key(&(candidate.clone(), shard_id))
})
.collect::<Vec<_>>()
}
StateSyncInner::PartsFromExternal { .. } => peers.collect::<Vec<_>>(),
StateSyncInner::PartsFromExternal { .. } => peers,
};
Ok(res)
}
Expand Down Expand Up @@ -616,16 +618,17 @@ impl StateSync {
StateSyncInner::Peers { last_part_id_requested, requested_target } => {
// We'll select all the 'highest' peers + validators as candidates (excluding those that gave us timeout in the past).
// And for each one of them, we'll ask for up to 16 (MAX_STATE_PART_REQUEST) parts.
let mut possible_targets_sampler =
let possible_targets_sampler =
SamplerLimited::new(possible_targets, MAX_STATE_PART_REQUEST);

for (part_id, download) in parts_to_fetch(new_shard_sync_download) {
// For every part that needs to be requested it is selected one
// peer (target) randomly to request the part from.
// IMPORTANT: here we use 'zip' with possible_target_sampler -
// which is limited. So at any moment we'll not request more
// than possible_targets.len() * MAX_STATE_PART_REQUEST parts.
let target = possible_targets_sampler.next().unwrap();
// For every part that needs to be requested it is selected one
// peer (target) randomly to request the part from.
// IMPORTANT: here we use 'zip' with possible_target_sampler -
// which is limited. So at any moment we'll not request more
// than possible_targets.len() * MAX_STATE_PART_REQUEST parts.
for ((part_id, download), target) in
parts_to_fetch(new_shard_sync_download).zip(possible_targets_sampler)
{
sent_request_part(
target.clone(),
part_id,
Expand Down Expand Up @@ -1104,8 +1107,7 @@ fn request_part_from_external_storage(
}
}
download.state_requests_count += 1;
download.last_target =
Some(make_account_or_peer_id_or_hash(AccountOrPeerIdOrHash::ExternalStorage));
download.last_target = None;

let location = s3_location(chain_id, epoch_height, shard_id, part_id, num_parts);
let download_response = download.response.clone();
Expand Down
1 change: 0 additions & 1 deletion chain/network/src/network_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,6 @@ pub enum AccountOrPeerIdOrHash {
AccountId(AccountId),
PeerId(PeerId),
Hash(CryptoHash),
ExternalStorage,
}

pub(crate) struct RawRoutedMessage {
Expand Down
1 change: 0 additions & 1 deletion chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,6 @@ impl PeerManagerActor {
}
AccountOrPeerIdOrHash::PeerId(it) => PeerIdOrHash::PeerId(it.clone()),
AccountOrPeerIdOrHash::Hash(it) => PeerIdOrHash::Hash(*it),
AccountOrPeerIdOrHash::ExternalStorage => unreachable!(),
};

self.state.send_message_to_peer(
Expand Down

0 comments on commit 7493c6f

Please sign in to comment.