diff --git a/Cargo.lock b/Cargo.lock index 36e94ebae4ee..00cb9ae91cc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16126,6 +16126,7 @@ dependencies = [ "log", "mockall", "parity-scale-codec", + "parking_lot 0.12.1", "prost 0.12.3", "prost-build", "quickcheck", diff --git a/prdoc/pr_2814.prdoc b/prdoc/pr_2814.prdoc new file mode 100644 index 000000000000..b1a90d35d78a --- /dev/null +++ b/prdoc/pr_2814.prdoc @@ -0,0 +1,15 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Share peers between syncing strategies + +doc: + - audience: Node Dev + description: | + New struct `PeerPool` is added to `SyncingStrategy` to allow running syncing strategies + in parallel and sharing peers. Available peers (with no active requests) are retrieved by + calling `PeerPool::available_peers()`, and a peer is reserved for a request by a syncing + strategy using `AvailablePeer::reserve()`. + +crates: + - name: sc-network-sync diff --git a/substrate/client/network/sync/Cargo.toml b/substrate/client/network/sync/Cargo.toml index f81b4ee77bdf..323bccf29a9b 100644 --- a/substrate/client/network/sync/Cargo.toml +++ b/substrate/client/network/sync/Cargo.toml @@ -28,6 +28,7 @@ futures-timer = "3.0.2" libp2p = "0.51.4" log = "0.4.17" mockall = "0.11.3" +parking_lot = "0.12.1" prost = "0.12" schnellru = "0.2.1" smallvec = "1.11.0" diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 7486c091ebf1..e4d8bdcc2b3b 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -214,9 +214,6 @@ pub struct SyncingEngine { /// Syncing strategy. strategy: SyncingStrategy, - /// Syncing configuration for startegies. - syncing_config: SyncingConfig, - /// Blockchain client. client: Arc, @@ -441,8 +438,7 @@ where .map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse()); // Initialize syncing strategy. - let strategy = - SyncingStrategy::new(syncing_config.clone(), client.clone(), warp_sync_config)?; + let strategy = SyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?; let block_announce_protocol_name = block_announce_config.protocol_name().clone(); let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000); @@ -471,7 +467,6 @@ where roles, client, strategy, - syncing_config, network_service, peers: HashMap::new(), block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)), @@ -661,8 +656,15 @@ where Some(event) => self.process_notification_event(event), None => return, }, - warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused => - self.pass_warp_sync_target_block_header(warp_target_block_header), + warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused => { + if let Err(_) = self.pass_warp_sync_target_block_header(warp_target_block_header) { + error!( + target: LOG_TARGET, + "Failed to set warp sync target block header, terminating `SyncingEngine`.", + ); + return + } + }, response_event = self.pending_responses.select_next_some() => self.process_response_event(response_event), validation_result = self.block_announce_validator.select_next_some() => @@ -675,14 +677,17 @@ where // Process actions requested by a syncing strategy.
if let Err(e) = self.process_strategy_actions() { - error!("Terminating `SyncingEngine` due to fatal error: {e:?}"); + error!( + target: LOG_TARGET, + "Terminating `SyncingEngine` due to fatal error: {e:?}.", + ); return } } } fn process_strategy_actions(&mut self) -> Result<(), ClientError> { - for action in self.strategy.actions() { + for action in self.strategy.actions()? { match action { SyncingAction::SendBlockRequest { peer_id, request } => { // Sending block request implies dropping obsolete pending response as we are @@ -699,7 +704,7 @@ where removed, ) }, - SyncingAction::CancelBlockRequest { peer_id } => { + SyncingAction::CancelRequest { peer_id } => { let removed = self.pending_responses.remove(&peer_id); trace!( @@ -753,20 +758,8 @@ where number, ) }, - SyncingAction::Finished => { - let connected_peers = self.peers.iter().filter_map(|(peer_id, peer)| { - peer.info.roles.is_full().then_some(( - *peer_id, - peer.info.best_hash, - peer.info.best_number, - )) - }); - self.strategy.switch_to_next( - self.syncing_config.clone(), - self.client.clone(), - connected_peers, - )?; - }, + // Nothing to do, this is handled internally by `SyncingStrategy`. + SyncingAction::Finished => {}, } } @@ -948,23 +941,18 @@ where } } - fn pass_warp_sync_target_block_header(&mut self, header: Result) { + fn pass_warp_sync_target_block_header( + &mut self, + header: Result, + ) -> Result<(), ()> { match header { - Ok(header) => - if let SyncingStrategy::WarpSyncStrategy(warp_sync) = &mut self.strategy { - warp_sync.set_target_block(header); - } else { - error!( - target: LOG_TARGET, - "Cannot set warp sync target block: no warp sync strategy is active." - ); - debug_assert!(false); - }, + Ok(header) => self.strategy.set_warp_sync_target_block_header(header), Err(err) => { error!( target: LOG_TARGET, "Failed to get target block for warp sync. Error: {err:?}", ); + Err(()) }, } } diff --git a/substrate/client/network/sync/src/extra_requests.rs b/substrate/client/network/sync/src/justification_requests.rs similarity index 89% rename from substrate/client/network/sync/src/extra_requests.rs rename to substrate/client/network/sync/src/justification_requests.rs index cd3008d270b1..3fbcb40ed5d5 100644 --- a/substrate/client/network/sync/src/extra_requests.rs +++ b/substrate/client/network/sync/src/justification_requests.rs @@ -16,14 +16,16 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +//! Justification requests scheduling. [`ExtraRequests`] manages requesting justifications +//! from peers taking into account forks and their finalization (dropping pending requests +//! that don't make sense after one of the forks is finalized). 
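For illustration, a minimal sketch (not part of this diff) of the peer-selection pattern the reworked `Matcher` below relies on, built only from the `PeerPool` API this PR introduces; the `pick_available_peer` helper name is hypothetical:

    // Sketch only; assumes `crate::peer_pool::PeerPool` and `libp2p::PeerId`.
    fn pick_available_peer(peer_pool: &PeerPool) -> Option<PeerId> {
        // `available_peers()` yields only peers with no request in flight.
        for mut available_peer in peer_pool.lock().available_peers() {
            let peer_id = *available_peer.peer_id();
            // Reserving marks the peer as busy so that other syncing
            // strategies running in parallel skip it.
            available_peer.reserve();
            return Some(peer_id)
        }
        None
    }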
+ use crate::{ - request_metrics::Metrics, - strategy::chain_sync::{PeerSync, PeerSyncState}, - LOG_TARGET, + peer_pool::PeerPool, request_metrics::Metrics, strategy::chain_sync::PeerSync, LOG_TARGET, }; use fork_tree::ForkTree; use libp2p::PeerId; -use log::{debug, trace, warn}; +use log::{debug, error, trace, warn}; use sp_blockchain::Error as ClientError; use sp_runtime::traits::{Block as BlockT, NumberFor, Zero}; use std::{ @@ -288,6 +290,7 @@ impl<'a, B: BlockT> Matcher<'a, B> { pub(crate) fn next( &mut self, peers: &HashMap>, + peer_pool: &PeerPool, ) -> Option<(PeerId, ExtraRequest)> { if self.remaining == 0 { return None @@ -299,36 +302,41 @@ impl<'a, B: BlockT> Matcher<'a, B> { } while let Some(request) = self.extras.pending_requests.pop_front() { - for (peer, sync) in - peers.iter().filter(|(_, sync)| sync.state == PeerSyncState::Available) - { + for mut available_peer in peer_pool.lock().available_peers() { + let peer_id = available_peer.peer_id(); + let Some(sync) = peers.get(peer_id) else { + error!( + target: LOG_TARGET, + "Peer {peer_id} is available, but not known to `ChainSync`", + ); + debug_assert!(false); + continue + }; // only ask peers that have synced at least up to the block number that we're asking // the extra for if sync.best_number < request.1 { continue } - // don't request to any peers that already have pending requests - if self.extras.active_requests.contains_key(peer) { - continue - } // only ask if the same request has not failed for this peer before if self .extras .failed_requests .get(&request) - .map(|rr| rr.iter().any(|i| &i.0 == peer)) + .map(|rr| rr.iter().any(|i| &i.0 == peer_id)) .unwrap_or(false) { continue } - self.extras.active_requests.insert(*peer, request); + + available_peer.reserve(); + self.extras.active_requests.insert(*peer_id, request); trace!(target: LOG_TARGET, "Sending {} request to {:?} for {:?}", - self.extras.request_type_name, peer, request, + self.extras.request_type_name, peer_id, request, ); - return Some((*peer, request)) + return Some((*peer_id, request)) } self.extras.pending_requests.push_back(request); @@ -346,7 +354,10 @@ impl<'a, B: BlockT> Matcher<'a, B> { #[cfg(test)] mod tests { use super::*; - use crate::strategy::chain_sync::PeerSync; + use crate::{ + peer_pool::PeerPool, + strategy::chain_sync::{PeerSync, PeerSyncState}, + }; use quickcheck::{Arbitrary, Gen, QuickCheck}; use sp_blockchain::Error as ClientError; use sp_test_primitives::{Block, BlockNumber, Hash}; @@ -354,11 +365,11 @@ mod tests { #[test] fn requests_are_processed_in_order() { - fn property(mut peers: ArbitraryPeers) { + fn property(mut ap: ArbitraryPeers) { let mut requests = ExtraRequests::::new("test"); let num_peers_available = - peers.0.values().filter(|s| s.state == PeerSyncState::Available).count(); + ap.peers.values().filter(|s| s.state == PeerSyncState::Available).count(); for i in 0..num_peers_available { requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0])) @@ -368,9 +379,9 @@ mod tests { let mut m = requests.matcher(); for p in &pending { - let (peer, r) = m.next(&peers.0).unwrap(); + let (peer, r) = m.next(&ap.peers, &ap.peer_pool).unwrap(); assert_eq!(p, &r); - peers.0.get_mut(&peer).unwrap().state = + ap.peers.get_mut(&peer).unwrap().state = PeerSyncState::DownloadingJustification(r.0); } } @@ -397,19 +408,19 @@ mod tests { #[test] fn disconnecting_implies_rescheduling() { - fn property(mut peers: ArbitraryPeers) -> bool { + fn property(mut ap: ArbitraryPeers) -> bool { let mut requests = 
ExtraRequests::::new("test"); let num_peers_available = - peers.0.values().filter(|s| s.state == PeerSyncState::Available).count(); + ap.peers.values().filter(|s| s.state == PeerSyncState::Available).count(); for i in 0..num_peers_available { requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0])) } let mut m = requests.matcher(); - while let Some((peer, r)) = m.next(&peers.0) { - peers.0.get_mut(&peer).unwrap().state = + while let Some((peer, r)) = m.next(&ap.peers, &ap.peer_pool) { + ap.peers.get_mut(&peer).unwrap().state = PeerSyncState::DownloadingJustification(r.0); } @@ -433,19 +444,19 @@ mod tests { #[test] fn no_response_reschedules() { - fn property(mut peers: ArbitraryPeers) { + fn property(mut ap: ArbitraryPeers) { let mut requests = ExtraRequests::::new("test"); let num_peers_available = - peers.0.values().filter(|s| s.state == PeerSyncState::Available).count(); + ap.peers.values().filter(|s| s.state == PeerSyncState::Available).count(); for i in 0..num_peers_available { requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0])) } let mut m = requests.matcher(); - while let Some((peer, r)) = m.next(&peers.0) { - peers.0.get_mut(&peer).unwrap().state = + while let Some((peer, r)) = m.next(&ap.peers, &ap.peer_pool) { + ap.peers.get_mut(&peer).unwrap().state = PeerSyncState::DownloadingJustification(r.0); } @@ -570,16 +581,22 @@ mod tests { } #[derive(Debug, Clone)] - struct ArbitraryPeers(HashMap>); + struct ArbitraryPeers { + peers: HashMap>, + peer_pool: PeerPool, + } impl Arbitrary for ArbitraryPeers { fn arbitrary(g: &mut Gen) -> Self { let mut peers = HashMap::with_capacity(g.size()); + let peer_pool = PeerPool::default(); for _ in 0..g.size() { let ps = ArbitraryPeerSync::arbitrary(g).0; - peers.insert(ps.peer_id, ps); + let peer_id = ps.peer_id; + peers.insert(peer_id, ps); + peer_pool.add_peer(peer_id); } - ArbitraryPeers(peers) + ArbitraryPeers { peers, peer_pool } } } } diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 494e3b87aa95..b51da98e7533 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -23,8 +23,9 @@ pub use strategy::warp::{WarpSyncParams, WarpSyncPhase, WarpSyncProgress}; pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider}; mod block_announce_validator; -mod extra_requests; mod futures_stream; +mod justification_requests; +mod peer_pool; mod pending_responses; mod request_metrics; mod schema; diff --git a/substrate/client/network/sync/src/peer_pool.rs b/substrate/client/network/sync/src/peer_pool.rs new file mode 100644 index 000000000000..41db903c4b63 --- /dev/null +++ b/substrate/client/network/sync/src/peer_pool.rs @@ -0,0 +1,225 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. 
If not, see . + +//! [`PeerPool`] manages the peers available for requests by syncing strategies. + +use crate::LOG_TARGET; +use libp2p::PeerId; +use log::warn; +use parking_lot::{Mutex, MutexGuard}; +use std::{collections::HashMap, sync::Arc}; + +#[derive(Debug)] +enum PeerStatus { + Available, + Reserved, +} + +impl PeerStatus { + fn is_available(&self) -> bool { + matches!(self, PeerStatus::Available) + } +} + +#[derive(Default, Debug, Clone)] +pub struct PeerPool { + inner: Arc>, +} + +#[derive(Default, Debug)] +pub struct PeerPoolInner { + peers: HashMap, +} + +pub struct AvailablePeer<'a> { + peer_id: &'a PeerId, + status: &'a mut PeerStatus, +} + +impl<'a> AvailablePeer<'a> { + pub fn peer_id(&self) -> &'a PeerId { + self.peer_id + } + + pub fn reserve(&mut self) { + *self.status = PeerStatus::Reserved; + } +} + +impl PeerPoolInner { + pub fn available_peers<'a>(&'a mut self) -> impl Iterator + 'a { + self.peers.iter_mut().filter_map(|(peer_id, status)| { + status.is_available().then_some(AvailablePeer::<'a> { peer_id, status }) + }) + } +} + +impl PeerPool { + pub fn lock<'a>(&'a self) -> MutexGuard<'a, PeerPoolInner> { + self.inner.lock() + } + + pub fn add_peer(&self, peer_id: PeerId) { + self.inner.lock().peers.insert(peer_id, PeerStatus::Available); + } + + pub fn remove_peer(&self, peer_id: &PeerId) { + self.inner.lock().peers.remove(peer_id); + } + + pub fn try_reserve_peer(&self, peer_id: &PeerId) -> bool { + match self.inner.lock().peers.get_mut(peer_id) { + Some(peer_status) => match peer_status { + PeerStatus::Available => { + *peer_status = PeerStatus::Reserved; + true + }, + PeerStatus::Reserved => false, + }, + None => { + warn!(target: LOG_TARGET, "Trying to reserve unknown peer {peer_id}."); + false + }, + } + } + + pub fn free_peer(&self, peer_id: &PeerId) { + match self.inner.lock().peers.get_mut(peer_id) { + Some(peer_status) => match peer_status { + PeerStatus::Available => { + warn!(target: LOG_TARGET, "Trying to free available peer {peer_id}.") + }, + PeerStatus::Reserved => { + *peer_status = PeerStatus::Available; + }, + }, + None => { + warn!(target: LOG_TARGET, "Trying to free unknown peer {peer_id}."); + }, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn adding_peer() { + let peer_pool = PeerPool::default(); + assert_eq!(peer_pool.lock().available_peers().count(), 0); + + // Add peer. + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + + // Peer is available. + assert_eq!(peer_pool.lock().available_peers().count(), 1); + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + } + + #[test] + fn removing_peer() { + let peer_pool = PeerPool::default(); + assert_eq!(peer_pool.lock().available_peers().count(), 0); + + // Add peer. + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + assert_eq!(peer_pool.lock().available_peers().count(), 1); + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + + // Remove peer. + peer_pool.remove_peer(&peer_id); + assert_eq!(peer_pool.lock().available_peers().count(), 0); + } + + #[test] + fn reserving_peer_via_available_peers() { + let peer_pool = PeerPool::default(); + assert_eq!(peer_pool.lock().available_peers().count(), 0); + + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + assert_eq!(peer_pool.lock().available_peers().count(), 1); + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + + // Reserve via `available_peers()`. 
+ peer_pool.lock().available_peers().for_each(|mut available_peer| { + assert_eq!(*available_peer.peer_id(), peer_id); + available_peer.reserve(); + }); + + // Peer is reserved. + assert_eq!(peer_pool.lock().available_peers().count(), 0); + assert!(!peer_pool.try_reserve_peer(&peer_id)); + } + + #[test] + fn reserving_peer_via_try_reserve() { + let peer_pool = PeerPool::default(); + assert_eq!(peer_pool.lock().available_peers().count(), 0); + + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + assert_eq!(peer_pool.lock().available_peers().count(), 1); + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + + // Reserve via `try_reserve_peer()`. + assert!(peer_pool.try_reserve_peer(&peer_id)); + + // Peer is reserved. + assert_eq!(peer_pool.lock().available_peers().count(), 0); + assert!(!peer_pool.try_reserve_peer(&peer_id)); + } + + #[test] + fn freeing_peer() { + let peer_pool = PeerPool::default(); + assert_eq!(peer_pool.lock().available_peers().count(), 0); + + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + assert_eq!(peer_pool.lock().available_peers().count(), 1); + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + + // Reserve via `try_reserve_peer()`. + assert!(peer_pool.try_reserve_peer(&peer_id)); + assert_eq!(peer_pool.lock().available_peers().count(), 0); + assert!(!peer_pool.try_reserve_peer(&peer_id)); + + // Free peer. + peer_pool.free_peer(&peer_id); + + // Peer is available. + assert_eq!(peer_pool.lock().available_peers().count(), 1); + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + + // And can be reserved again. + assert!(peer_pool.try_reserve_peer(&peer_id)); + } + + #[test] + fn reserving_unknown_peer_fails() { + let peer_pool = PeerPool::default(); + assert_eq!(peer_pool.lock().available_peers().count(), 0); + + let peer_id = PeerId::random(); + assert!(!peer_pool.try_reserve_peer(&peer_id)); + } +} diff --git a/substrate/client/network/sync/src/strategy.rs b/substrate/client/network/sync/src/strategy.rs index dbfb4188ec3f..b5c95f8e34c3 100644 --- a/substrate/client/network/sync/src/strategy.rs +++ b/substrate/client/network/sync/src/strategy.rs @@ -25,6 +25,7 @@ pub mod state_sync; pub mod warp; use crate::{ + peer_pool::PeerPool, types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncStatus}, LOG_TARGET, }; @@ -41,11 +42,11 @@ use sc_network_common::sync::{ use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata}; use sp_consensus::BlockOrigin; use sp_runtime::{ - traits::{Block as BlockT, NumberFor}, + traits::{Block as BlockT, Header, NumberFor}, Justifications, }; use state::{StateStrategy, StateStrategyAction}; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use warp::{EncodedProof, WarpProofRequest, WarpSync, WarpSyncAction, WarpSyncConfig}; /// Corresponding `ChainSync` mode. @@ -75,12 +76,12 @@ pub struct SyncingConfig { pub enum SyncingAction { /// Send block request to peer. Always implies dropping a stale block request to the same peer. SendBlockRequest { peer_id: PeerId, request: BlockRequest }, - /// Drop stale block request. - CancelBlockRequest { peer_id: PeerId }, /// Send state request to peer. SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest }, /// Send warp proof request to peer. SendWarpProofRequest { peer_id: PeerId, request: WarpProofRequest }, + /// Drop stale request. + CancelRequest { peer_id: PeerId }, /// Peer misbehaved. 
Disconnect, report it and cancel any requests to it. DropPeer(BadPeer), /// Import blocks. @@ -92,15 +93,68 @@ pub enum SyncingAction { number: NumberFor, justifications: Justifications, }, - /// Syncing strategy has finished. + /// Strategy finished. Nothing to do, this is handled by `SyncingStrategy`. Finished, } +impl SyncingAction { + fn is_finished(&self) -> bool { + matches!(self, SyncingAction::Finished) + } +} + +impl From> for SyncingAction { + fn from(action: WarpSyncAction) -> Self { + match action { + WarpSyncAction::SendWarpProofRequest { peer_id, request } => + SyncingAction::SendWarpProofRequest { peer_id, request }, + WarpSyncAction::SendBlockRequest { peer_id, request } => + SyncingAction::SendBlockRequest { peer_id, request }, + WarpSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), + WarpSyncAction::Finished => SyncingAction::Finished, + } + } +} + +impl From> for SyncingAction { + fn from(action: StateStrategyAction) -> Self { + match action { + StateStrategyAction::SendStateRequest { peer_id, request } => + SyncingAction::SendStateRequest { peer_id, request }, + StateStrategyAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), + StateStrategyAction::ImportBlocks { origin, blocks } => + SyncingAction::ImportBlocks { origin, blocks }, + StateStrategyAction::Finished => SyncingAction::Finished, + } + } +} + +impl From> for SyncingAction { + fn from(action: ChainSyncAction) -> Self { + match action { + ChainSyncAction::SendBlockRequest { peer_id, request } => + SyncingAction::SendBlockRequest { peer_id, request }, + ChainSyncAction::SendStateRequest { peer_id, request } => + SyncingAction::SendStateRequest { peer_id, request }, + ChainSyncAction::CancelRequest { peer_id } => SyncingAction::CancelRequest { peer_id }, + ChainSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), + ChainSyncAction::ImportBlocks { origin, blocks } => + SyncingAction::ImportBlocks { origin, blocks }, + ChainSyncAction::ImportJustifications { peer_id, hash, number, justifications } => + SyncingAction::ImportJustifications { peer_id, hash, number, justifications }, + } + } +} + /// Proxy to specific syncing strategies. -pub enum SyncingStrategy { - WarpSyncStrategy(WarpSync), - StateSyncStrategy(StateStrategy), - ChainSyncStrategy(ChainSync), +pub struct SyncingStrategy { + config: SyncingConfig, + client: Arc, + warp: Option>, + state: Option>, + chain_sync: Option>, + peer_pool: PeerPool, + peer_best_blocks: HashMap)>, } impl SyncingStrategy @@ -114,7 +168,7 @@ where + Sync + 'static, { - /// Initialize a new syncing startegy. + /// Initialize a new syncing strategy. 
pub fn new( config: SyncingConfig, client: Arc, @@ -123,37 +177,58 @@ where if let SyncMode::Warp = config.mode { let warp_sync_config = warp_sync_config .expect("Warp sync configuration must be supplied in warp sync mode."); - Ok(Self::WarpSyncStrategy(WarpSync::new(client.clone(), warp_sync_config))) + let peer_pool = PeerPool::default(); + let warp_sync = WarpSync::new(client.clone(), warp_sync_config, peer_pool.clone()); + Ok(Self { + config, + client, + warp: Some(warp_sync), + state: None, + chain_sync: None, + peer_pool, + peer_best_blocks: Default::default(), + }) } else { - Ok(Self::ChainSyncStrategy(ChainSync::new( + let peer_pool = PeerPool::default(); + let chain_sync = ChainSync::new( chain_sync_mode(config.mode), client.clone(), config.max_parallel_downloads, config.max_blocks_per_request, - config.metrics_registry, - )?)) + config.metrics_registry.clone(), + peer_pool.clone(), + std::iter::empty(), + )?; + Ok(Self { + config, + client, + warp: None, + state: None, + chain_sync: Some(chain_sync), + peer_pool, + peer_best_blocks: Default::default(), + }) } } /// Notify that a new peer has connected. pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => - strategy.add_peer(peer_id, best_hash, best_number), - SyncingStrategy::StateSyncStrategy(strategy) => - strategy.add_peer(peer_id, best_hash, best_number), - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.add_peer(peer_id, best_hash, best_number), - } + self.peer_pool.add_peer(peer_id); + self.peer_best_blocks.insert(peer_id, (best_hash, best_number)); + + self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number)); + self.state.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number)); + self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number)); } /// Notify that a peer has disconnected. pub fn remove_peer(&mut self, peer_id: &PeerId) { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => strategy.remove_peer(peer_id), - SyncingStrategy::StateSyncStrategy(strategy) => strategy.remove_peer(peer_id), - SyncingStrategy::ChainSyncStrategy(strategy) => strategy.remove_peer(peer_id), - } + self.warp.as_mut().map(|s| s.remove_peer(peer_id)); + self.state.as_mut().map(|s| s.remove_peer(peer_id)); + self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id)); + + self.peer_pool.remove_peer(peer_id); + self.peer_best_blocks.remove(peer_id); } /// Submit a validated block announcement. 
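As a usage note (illustrative, not part of this diff): a strategy that targets a specific peer instead of iterating `available_peers()` is expected to pair `try_reserve_peer` with `free_peer` around the request, as `ChainSync` does below; `send_request_to` is a hypothetical stand-in for issuing a real network request:

    // Sketch only; assumes `crate::peer_pool::PeerPool` and `libp2p::PeerId`.
    fn request_from(peer_pool: &PeerPool, peer_id: PeerId) {
        // Reserve the peer so no other strategy talks to it concurrently.
        if peer_pool.try_reserve_peer(&peer_id) {
            // send_request_to(peer_id); // hypothetical request dispatch
            // Once the response is processed, hand the peer back to the pool
            // so other strategies can use it again.
            peer_pool.free_peer(&peer_id);
        }
    }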
@@ -165,14 +240,23 @@ where peer_id: PeerId, announce: &BlockAnnounce, ) -> Option<(B::Hash, NumberFor)> { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => - strategy.on_validated_block_announce(is_best, peer_id, announce), - SyncingStrategy::StateSyncStrategy(strategy) => - strategy.on_validated_block_announce(is_best, peer_id, announce), - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.on_validated_block_announce(is_best, peer_id, announce), + let new_best = if let Some(ref mut warp) = self.warp { + warp.on_validated_block_announce(is_best, peer_id, announce) + } else if let Some(ref mut state) = self.state { + state.on_validated_block_announce(is_best, peer_id, announce) + } else if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.on_validated_block_announce(is_best, peer_id, announce) + } else { + Some((announce.header.hash(), *announce.header.number())) + }; + + if let Some(new_best) = new_best { + if let Some(best) = self.peer_best_blocks.get_mut(&peer_id) { + *best = new_best; + } } + + new_best } /// Configure an explicit fork sync request in case external code has detected that there is a @@ -183,40 +267,33 @@ where hash: &B::Hash, number: NumberFor, ) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.set_sync_fork_request(peers, hash, number), + // Fork requests are only handled by `ChainSync`. + if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.set_sync_fork_request(peers.clone(), hash, number); } } /// Request extra justification. pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.request_justification(hash, number), + // Justifications can only be requested via `ChainSync`. + if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.request_justification(hash, number); } } /// Clear extra justification requests. pub fn clear_justification_requests(&mut self) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => strategy.clear_justification_requests(), + // Justification requests can only be cleared by `ChainSync`. + if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.clear_justification_requests(); } } /// Report a justification import (successful or not). pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.on_justification_import(hash, number, success), + // Only `ChainSync` is interested in justification import. + if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.on_justification_import(hash, number, success); } } @@ -227,33 +304,29 @@ where request: BlockRequest, blocks: Vec>, ) { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => - strategy.on_block_response(peer_id, request, blocks), - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.on_block_response(peer_id, request, blocks), + // Only `WarpSync` and `ChainSync` handle block responses. 
+ if let Some(ref mut warp) = self.warp { + warp.on_block_response(peer_id, request, blocks); + } else if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.on_block_response(peer_id, request, blocks); } } /// Process state response. pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(strategy) => - strategy.on_state_response(peer_id, response), - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.on_state_response(peer_id, response), + // Only `StateStrategy` and `ChainSync` handle state responses. + if let Some(ref mut state) = self.state { + state.on_state_response(peer_id, response); + } else if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.on_state_response(peer_id, response); } } /// Process warp proof response. pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => - strategy.on_warp_proof_response(peer_id, response), - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(_) => {}, + // Only `WarpSync` handles warp proof responses. + if let Some(ref mut warp) = self.warp { + warp.on_warp_proof_response(peer_id, response); } } @@ -264,226 +337,209 @@ where count: usize, results: Vec<(Result>, BlockImportError>, B::Hash)>, ) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(strategy) => - strategy.on_blocks_processed(imported, count, results), - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.on_blocks_processed(imported, count, results), + // Only `StateStrategy` and `ChainSync` are interested in block processing notifications. + + if let Some(ref mut state) = self.state { + state.on_blocks_processed(imported, count, results); + } else if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.on_blocks_processed(imported, count, results); } } /// Notify a syncing strategy that a block has been finalized. pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.on_block_finalized(hash, number), + // Only `ChainSync` is interested in block finalization notifications. + if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.on_block_finalized(hash, number); } } /// Inform sync about a new best imported block. pub fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.update_chain_info(best_hash, best_number), + // This is relevant to `ChainSync` only. + if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.update_chain_info(best_hash, best_number); } } // Are we in major sync mode? 
pub fn is_major_syncing(&self) -> bool { - match self { - SyncingStrategy::WarpSyncStrategy(_) => true, - SyncingStrategy::StateSyncStrategy(_) => true, - SyncingStrategy::ChainSyncStrategy(strategy) => - strategy.status().state.is_major_syncing(), - } + self.warp.is_some() || + self.state.is_some() || + match self.chain_sync { + Some(ref s) => s.status().state.is_major_syncing(), + None => unreachable!("At least one syncing strategy is active; qed"), + } } /// Get the number of peers known to the syncing strategy. pub fn num_peers(&self) -> usize { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => strategy.num_peers(), - SyncingStrategy::StateSyncStrategy(strategy) => strategy.num_peers(), - SyncingStrategy::ChainSyncStrategy(strategy) => strategy.num_peers(), - } + self.peer_best_blocks.len() } /// Returns the current sync status. pub fn status(&self) -> SyncStatus { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => strategy.status(), - SyncingStrategy::StateSyncStrategy(strategy) => strategy.status(), - SyncingStrategy::ChainSyncStrategy(strategy) => strategy.status(), + // This function presumes that strategies are executed serially and must be refactored + // once we have parallel strategies. + if let Some(ref warp) = self.warp { + warp.status() + } else if let Some(ref state) = self.state { + state.status() + } else if let Some(ref chain_sync) = self.chain_sync { + chain_sync.status() + } else { + unreachable!("At least one syncing strategy is always active; qed") } } /// Get the total number of downloaded blocks. pub fn num_downloaded_blocks(&self) -> usize { - match self { - SyncingStrategy::WarpSyncStrategy(_) => 0, - SyncingStrategy::StateSyncStrategy(_) => 0, - SyncingStrategy::ChainSyncStrategy(strategy) => strategy.num_downloaded_blocks(), - } + self.chain_sync + .as_ref() + .map_or(0, |chain_sync| chain_sync.num_downloaded_blocks()) } /// Get an estimate of the number of parallel sync requests. pub fn num_sync_requests(&self) -> usize { - match self { - SyncingStrategy::WarpSyncStrategy(_) => 0, - SyncingStrategy::StateSyncStrategy(_) => 0, - SyncingStrategy::ChainSyncStrategy(strategy) => strategy.num_sync_requests(), - } + self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests()) } /// Report Prometheus metrics pub fn report_metrics(&self) { - match self { - SyncingStrategy::WarpSyncStrategy(_) => {}, - SyncingStrategy::StateSyncStrategy(_) => {}, - SyncingStrategy::ChainSyncStrategy(strategy) => strategy.report_metrics(), + if let Some(ref chain_sync) = self.chain_sync { + chain_sync.report_metrics(); + } + } + + /// Let `WarpSync` know about the target block header. + pub fn set_warp_sync_target_block_header( + &mut self, + target_header: B::Header, + ) -> Result<(), ()> { + match self.warp { + Some(ref mut warp) => { + warp.set_target_block(target_header); + Ok(()) + }, + None => { + error!( + target: LOG_TARGET, + "Cannot set warp sync target block: no warp sync strategy is active."
+ ); + debug_assert!(false); + Err(()) + }, } } /// Get actions that should be performed by the owner on the strategy's behalf #[must_use] - pub fn actions(&mut self) -> Box>> { - match self { - SyncingStrategy::WarpSyncStrategy(strategy) => - Box::new(strategy.actions().map(|action| match action { - WarpSyncAction::SendWarpProofRequest { peer_id, request } => - SyncingAction::SendWarpProofRequest { peer_id, request }, - WarpSyncAction::SendBlockRequest { peer_id, request } => - SyncingAction::SendBlockRequest { peer_id, request }, - WarpSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), - WarpSyncAction::Finished => SyncingAction::Finished, - })), - SyncingStrategy::StateSyncStrategy(strategy) => - Box::new(strategy.actions().map(|action| match action { - StateStrategyAction::SendStateRequest { peer_id, request } => - SyncingAction::SendStateRequest { peer_id, request }, - StateStrategyAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), - StateStrategyAction::ImportBlocks { origin, blocks } => - SyncingAction::ImportBlocks { origin, blocks }, - StateStrategyAction::Finished => SyncingAction::Finished, - })), - SyncingStrategy::ChainSyncStrategy(strategy) => - Box::new(strategy.actions().map(|action| match action { - ChainSyncAction::SendBlockRequest { peer_id, request } => - SyncingAction::SendBlockRequest { peer_id, request }, - ChainSyncAction::CancelBlockRequest { peer_id } => - SyncingAction::CancelBlockRequest { peer_id }, - ChainSyncAction::SendStateRequest { peer_id, request } => - SyncingAction::SendStateRequest { peer_id, request }, - ChainSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), - ChainSyncAction::ImportBlocks { origin, blocks } => - SyncingAction::ImportBlocks { origin, blocks }, - ChainSyncAction::ImportJustifications { - peer_id, - hash, - number, - justifications, - } => SyncingAction::ImportJustifications { - peer_id, - hash, - number, - justifications, - }, - })), + pub fn actions(&mut self) -> Result>, ClientError> { + // This function presumes that strategies are executed serially and must be refactored once + // we have parallel strategies. + let actions: Vec<_> = if let Some(ref mut warp) = self.warp { + warp.actions().map(Into::into).collect() + } else if let Some(ref mut state) = self.state { + state.actions().map(Into::into).collect() + } else if let Some(ref mut chain_sync) = self.chain_sync { + chain_sync.actions().map(Into::into).collect() + } else { + unreachable!("At least one syncing strategy is always active; qed") + }; + + if actions.iter().any(SyncingAction::is_finished) { + self.proceed_to_next()?; } + + Ok(actions) } - /// Switch to next strategy if the active one finished. - pub fn switch_to_next( - &mut self, - config: SyncingConfig, - client: Arc, - connected_peers: impl Iterator)>, - ) -> Result<(), ClientError> { - match self { - Self::WarpSyncStrategy(warp_sync) => { - match warp_sync.take_result() { - Some(res) => { - info!( - target: LOG_TARGET, - "Warp sync is complete, continuing with state sync." - ); - let state_sync = StateStrategy::new( - client, - res.target_header, - res.target_body, - res.target_justifications, - // skip proofs, only set to `true` in `FastUnsafe` sync mode - false, - connected_peers - .map(|(peer_id, _best_hash, best_number)| (peer_id, best_number)), - ); - - *self = Self::StateSyncStrategy(state_sync); - }, - None => { - error!( - target: LOG_TARGET, - "Warp sync failed. Falling back to full sync."
- ); - let mut chain_sync = match ChainSync::new( - chain_sync_mode(config.mode), - client, - config.max_parallel_downloads, - config.max_blocks_per_request, - config.metrics_registry, - ) { - Ok(chain_sync) => chain_sync, - Err(e) => { - error!(target: LOG_TARGET, "Failed to start `ChainSync`."); - return Err(e) - }, - }; - // Let `ChainSync` know about connected peers. - connected_peers.into_iter().for_each( - |(peer_id, best_hash, best_number)| { - chain_sync.add_peer(peer_id, best_hash, best_number) - }, - ); - - *self = Self::ChainSyncStrategy(chain_sync); - }, - } - }, - Self::StateSyncStrategy(state_sync) => { - if state_sync.is_succeded() { - info!(target: LOG_TARGET, "State sync is complete, continuing with block sync."); - } else { - error!(target: LOG_TARGET, "State sync failed. Falling back to full sync."); - } - let mut chain_sync = match ChainSync::new( - chain_sync_mode(config.mode), - client, - config.max_parallel_downloads, - config.max_blocks_per_request, - config.metrics_registry, - ) { - Ok(chain_sync) => chain_sync, - Err(e) => { - error!(target: LOG_TARGET, "Failed to start `ChainSync`."); - return Err(e) - }, - }; - // Let `ChainSync` know about connected peers. - connected_peers.into_iter().for_each(|(peer_id, best_hash, best_number)| { - chain_sync.add_peer(peer_id, best_hash, best_number) - }); - - *self = Self::ChainSyncStrategy(chain_sync); - }, - Self::ChainSyncStrategy(_) => { - error!(target: LOG_TARGET, "`ChainSyncStrategy` is final startegy, cannot switch to next."); - debug_assert!(false); - }, + /// Proceed with the next strategy if the active one finished. + pub fn proceed_to_next(&mut self) -> Result<(), ClientError> { + // The strategies are switched as `WarpSync` -> `StateStrategy` -> `ChainSync`. + if let Some(ref mut warp) = self.warp { + match warp.take_result() { + Some(res) => { + info!( + target: LOG_TARGET, + "Warp sync is complete, continuing with state sync.", + ); + let state_sync = StateStrategy::new( + self.client.clone(), + res.target_header, + res.target_body, + res.target_justifications, + false, + self.peer_best_blocks + .iter() + .map(|(peer_id, (_, best_number))| (*peer_id, *best_number)), + self.peer_pool.clone(), + ); + + self.warp = None; + self.state = Some(state_sync); + + Ok(()) + }, + None => { + error!( + target: LOG_TARGET, + "Warp sync failed. Falling back to full sync.", + ); + let chain_sync = match ChainSync::new( + chain_sync_mode(self.config.mode), + self.client.clone(), + self.config.max_parallel_downloads, + self.config.max_blocks_per_request, + self.config.metrics_registry.clone(), + self.peer_pool.clone(), + self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| { + (*peer_id, *best_hash, *best_number) + }), + ) { + Ok(chain_sync) => chain_sync, + Err(e) => { + error!(target: LOG_TARGET, "Failed to start `ChainSync`."); + return Err(e) + }, + }; + + self.warp = None; + self.chain_sync = Some(chain_sync); + + Ok(()) + }, + } + } else if let Some(state) = &self.state { + if state.is_succeded() { + info!(target: LOG_TARGET, "State sync is complete, continuing with block sync."); + } else { + error!(target: LOG_TARGET, "State sync failed.
Falling back to full sync."); + } + let chain_sync = match ChainSync::new( + chain_sync_mode(self.config.mode), + self.client.clone(), + self.config.max_parallel_downloads, + self.config.max_blocks_per_request, + self.config.metrics_registry.clone(), + self.peer_pool.clone(), + self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| { + (*peer_id, *best_hash, *best_number) + }), + ) { + Ok(chain_sync) => chain_sync, + Err(e) => { + error!(target: LOG_TARGET, "Failed to start `ChainSync`."); + return Err(e); + }, + }; + + self.state = None; + self.chain_sync = Some(chain_sync); + + Ok(()) + } else { + unreachable!("Only warp & state strategies can finish; qed") } - Ok(()) } } diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index 62c260d582b5..0326f6dd1d92 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -28,9 +28,10 @@ //! the network, or whenever a block has been successfully verified, call the appropriate method in //! order to update it. +use super::PeerPool; use crate::{ blocks::BlockCollection, - extra_requests::ExtraRequests, + justification_requests::ExtraRequests, schema::v1::StateResponse, strategy::{ state_sync::{ImportResult, StateSync, StateSyncProvider}, @@ -212,10 +213,10 @@ struct GapSync { pub enum ChainSyncAction { /// Send block request to peer. Always implies dropping a stale block request to the same peer. SendBlockRequest { peer_id: PeerId, request: BlockRequest }, - /// Drop stale block request. - CancelBlockRequest { peer_id: PeerId }, /// Send state request to peer. SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest }, + /// Drop stale request. + CancelRequest { peer_id: PeerId }, /// Peer misbehaved. Disconnect, report it and cancel the block request to it. DropPeer(BadPeer), /// Import blocks. @@ -284,6 +285,8 @@ pub struct ChainSync { actions: Vec>, /// Prometheus metrics. metrics: Option, + /// Peer pool to reserve peers for requests from. + peer_pool: PeerPool, } /// All the data we have about a Peer that we are trying to sync with @@ -331,6 +334,8 @@ struct ForkTarget { /// defines what we are busy with. #[derive(Copy, Clone, Eq, PartialEq, Debug)] pub(crate) enum PeerSyncState { + /// New peer not yet handled by sync state machine. + New, /// Available for sync requests. Available, /// Searching for ancestors the Peer has in common with us. @@ -373,10 +378,25 @@ where max_parallel_downloads: u32, max_blocks_per_request: u32, metrics_registry: Option, + peer_pool: PeerPool, + initial_peers: impl Iterator)>, ) -> Result { let mut sync = Self { client, - peers: HashMap::new(), + peers: initial_peers + .map(|(peer_id, best_hash, best_number)| { + ( + peer_id, + PeerSync { + peer_id, + common_number: Zero::zero(), + best_hash, + best_number, + state: PeerSyncState::New, + }, + ) + }) + .collect(), blocks: BlockCollection::new(), best_queued_hash: Default::default(), best_queued_number: Zero::zero(), @@ -402,6 +422,7 @@ where None }, }), + peer_pool, }; sync.reset_sync_start_point()?; @@ -468,130 +489,160 @@ where /// Notify syncing state machine that a new sync peer has connected. 
pub fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { - match self.add_peer_inner(peer_id, best_hash, best_number) { - Ok(Some(request)) => - self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }), - Ok(None) => {}, - Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), - } + self.peers.insert( + peer_id, + PeerSync { + peer_id, + common_number: Zero::zero(), + best_hash, + best_number, + state: PeerSyncState::New, + }, + ); } - #[must_use] - fn add_peer_inner( + /// Process new peers assigning proper states and initiating requests. + fn handle_new_peers( &mut self, - peer_id: PeerId, - best_hash: B::Hash, - best_number: NumberFor, - ) -> Result>, BadPeer> { - // There is nothing sync can get from the node that has no blockchain data. - match self.block_status(&best_hash) { - Err(e) => { - debug!(target: LOG_TARGET, "Error reading blockchain: {e}"); - Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR)) - }, - Ok(BlockStatus::KnownBad) => { - info!( - "💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})." - ); - Err(BadPeer(peer_id, rep::BAD_BLOCK)) - }, - Ok(BlockStatus::Unknown) => { - if best_number.is_zero() { + ) -> impl Iterator), BadPeer>> + '_ { + let new_peers = self + .peers + .iter() + .filter_map(|(peer_id, peer)| match peer.state { + PeerSyncState::New => Some((*peer_id, peer.clone())), + _ => None, + }) + .collect::)>>(); + + new_peers.into_iter().filter_map(|(peer_id, peer)| { + let best_hash = peer.best_hash; + let best_number = peer.best_number; + // There is nothing sync can get from the node that has no blockchain data. + match self.block_status(&best_hash) { + Err(e) => { + debug!(target: LOG_TARGET, "Error reading blockchain: {e}"); + Some(Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))) + }, + Ok(BlockStatus::KnownBad) => { info!( - "💔 New peer {} with unknown genesis hash {} ({}).", - peer_id, best_hash, best_number, + "💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})." ); - return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH)) - } + Some(Err(BadPeer(peer_id, rep::BAD_BLOCK))) + }, + Ok(BlockStatus::Unknown) => { + if best_number.is_zero() { + info!( + "💔 New peer {} with unknown genesis hash {} ({}).", + peer_id, best_hash, best_number, + ); + return Some(Err(BadPeer(peer_id, rep::GENESIS_MISMATCH))) + } + + // If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we + // have enough to do in the import queue that it's not worth kicking off + // an ancestor search, which is what we do in the next match case below. + if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS.into() { + debug!( + target: LOG_TARGET, + "New peer {} with unknown best hash {} ({}), assuming common block.", + peer_id, + self.best_queued_hash, + self.best_queued_number + ); + // Replace `PeerSyncState::New` peer. + self.peers.insert( + peer_id, + PeerSync { + peer_id, + common_number: self.best_queued_number, + best_hash, + best_number, + state: PeerSyncState::Available, + }, + ); + return None + } + + // If we are at genesis, just start downloading. 
+ let (new_state, req) = if self.best_queued_number.is_zero() { + debug!( + target: LOG_TARGET, + "New peer {peer_id} with best hash {best_hash} ({best_number}).", + ); + + (Some(PeerSyncState::Available), None) + } else { + if self.peer_pool.try_reserve_peer(&peer_id) { + let common_best = std::cmp::min(self.best_queued_number, best_number); + + debug!( + target: LOG_TARGET, + "New peer {} with unknown best hash {} ({}), searching for common ancestor.", + peer_id, + best_hash, + best_number + ); + + ( + Some(PeerSyncState::AncestorSearch { + current: common_best, + start: self.best_queued_number, + state: AncestorSearchState::ExponentialBackoff(One::one()), + }), + Some(ancestry_request::(common_best)), + ) + } else { + trace!( + target: LOG_TARGET, + "`ChainSync` is aware of a new peer {peer_id} with unknown best \ + hash {best_hash} ({best_number}), but can't start ancestry search \ + as the peer is reserved by another syncing strategy.", + ); + + (None, None) + } + }; - // If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have - // enough to do in the import queue that it's not worth kicking off - // an ancestor search, which is what we do in the next match case below. - if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS.into() { + if let Some(state) = new_state { + self.allowed_requests.add(&peer_id); + // Replace `PeerSyncState::New` peer. + self.peers.insert( + peer_id, + PeerSync { + peer_id, + common_number: Zero::zero(), + best_hash, + best_number, + state, + }, + ); + } + + req.map(|req| Ok((peer_id, req))) + }, + Ok(BlockStatus::Queued) | + Ok(BlockStatus::InChainWithState) | + Ok(BlockStatus::InChainPruned) => { debug!( target: LOG_TARGET, - "New peer {} with unknown best hash {} ({}), assuming common block.", - peer_id, - self.best_queued_hash, - self.best_queued_number + "New peer {peer_id} with known best hash {best_hash} ({best_number}).", ); + // Replace `PeerSyncState::New` peer. self.peers.insert( peer_id, PeerSync { peer_id, - common_number: self.best_queued_number, + common_number: std::cmp::min(self.best_queued_number, best_number), best_hash, best_number, state: PeerSyncState::Available, }, ); - return Ok(None) - } - - // If we are at genesis, just start downloading. 
- let (state, req) = if self.best_queued_number.is_zero() { - debug!( - target: LOG_TARGET, - "New peer {peer_id} with best hash {best_hash} ({best_number}).", - ); - - (PeerSyncState::Available, None) - } else { - let common_best = std::cmp::min(self.best_queued_number, best_number); - - debug!( - target: LOG_TARGET, - "New peer {} with unknown best hash {} ({}), searching for common ancestor.", - peer_id, - best_hash, - best_number - ); - - ( - PeerSyncState::AncestorSearch { - current: common_best, - start: self.best_queued_number, - state: AncestorSearchState::ExponentialBackoff(One::one()), - }, - Some(ancestry_request::(common_best)), - ) - }; - - self.allowed_requests.add(&peer_id); - self.peers.insert( - peer_id, - PeerSync { - peer_id, - common_number: Zero::zero(), - best_hash, - best_number, - state, - }, - ); - - Ok(req) - }, - Ok(BlockStatus::Queued) | - Ok(BlockStatus::InChainWithState) | - Ok(BlockStatus::InChainPruned) => { - debug!( - target: LOG_TARGET, - "New peer {peer_id} with known best hash {best_hash} ({best_number}).", - ); - self.peers.insert( - peer_id, - PeerSync { - peer_id, - common_number: std::cmp::min(self.best_queued_number, best_number), - best_hash, - best_number, - state: PeerSyncState::Available, - }, - ); - self.allowed_requests.add(&peer_id); - Ok(None) - }, - } + self.allowed_requests.add(&peer_id); + None + }, + } + }) } /// Inform sync about a new best imported block. @@ -690,6 +741,7 @@ where blocks.reverse() } self.allowed_requests.add(peer_id); + self.peer_pool.free_peer(peer_id); if let Some(request) = request { match &mut peer.state { PeerSyncState::DownloadingNew(_) => { @@ -887,6 +939,7 @@ where return Ok(()) } }, + PeerSyncState::New | PeerSyncState::Available | PeerSyncState::DownloadingJustification(..) | PeerSyncState::DownloadingState => Vec::new(), @@ -943,6 +996,7 @@ where }; self.allowed_requests.add(&peer_id); + self.peer_pool.free_peer(&peer_id); if let PeerSyncState::DownloadingJustification(hash) = peer.state { peer.state = PeerSyncState::Available; @@ -1312,34 +1366,35 @@ where ); let old_peers = std::mem::take(&mut self.peers); - old_peers.into_iter().for_each(|(peer_id, mut p)| { - // peers that were downloading justifications - // should be kept in that state. - if let PeerSyncState::DownloadingJustification(_) = p.state { - // We make sure our commmon number is at least something we have. - trace!( - target: LOG_TARGET, - "Keeping peer {} after restart, updating common number from={} => to={} (our best).", - peer_id, - p.common_number, - self.best_queued_number, - ); - p.common_number = self.best_queued_number; - self.peers.insert(peer_id, p); - return + old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| { + match peer_sync.state { + PeerSyncState::New | PeerSyncState::Available => { + self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number); + }, + PeerSyncState::AncestorSearch { .. } | + PeerSyncState::DownloadingNew(_) | + PeerSyncState::DownloadingStale(_) | + PeerSyncState::DownloadingGap(_) | + PeerSyncState::DownloadingState => { + self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number); + self.actions.push(ChainSyncAction::CancelRequest { peer_id }); + self.peer_pool.free_peer(&peer_id); + }, + PeerSyncState::DownloadingJustification(_) => { + // Peers that were downloading justifications + // should be kept in that state. + // We make sure our common number is at least something we have.
+ trace!( + target: LOG_TARGET, + "Keeping peer {} after restart, updating common number from={} => to={} (our best).", + peer_id, + peer_sync.common_number, + self.best_queued_number, + ); + peer_sync.common_number = self.best_queued_number; + self.peers.insert(peer_id, peer_sync); + }, } - - // handle peers that were in other states. - let action = match self.add_peer_inner(peer_id, p.best_hash, p.best_number) { - // since the request is not a justification, remove it from pending responses - Ok(None) => ChainSyncAction::CancelBlockRequest { peer_id }, - // update the request if the new one is available - Ok(Some(request)) => ChainSyncAction::SendBlockRequest { peer_id, request }, - // this implies that we need to drop pending response from the peer - Err(bad_peer) => ChainSyncAction::DropPeer(bad_peer), - }; - - self.actions.push(action); }); } @@ -1488,11 +1543,13 @@ where /// Get justification requests scheduled by sync to be sent out. fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { let peers = &mut self.peers; + let peer_pool = &self.peer_pool; let mut matcher = self.extra_justifications.matcher(); std::iter::from_fn(move || { - if let Some((peer, request)) = matcher.next(peers) { + if let Some((peer_id, request)) = matcher.next(peers, peer_pool) { + // TODO: reserve the peer in `PeerPool`. peers - .get_mut(&peer) + .get_mut(&peer_id) .expect( "`Matcher::next` guarantees the `PeerId` comes from the given peers; qed", ) @@ -1504,7 +1561,7 @@ where direction: Direction::Ascending, max: Some(1), }; - Some((peer, req)) + Some((peer_id, req)) } else { None } @@ -1535,98 +1592,116 @@ where let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads }; let max_blocks_per_request = self.max_blocks_per_request; let gap_sync = &mut self.gap_sync; - self.peers - .iter_mut() - .filter_map(move |(&id, peer)| { - if !peer.state.is_available() || !allowed_requests.contains(&id) { - return None - } - // If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from - // the common number, the peer best number is higher than our best queued and the - // common number is smaller than the last finalized block number, we should do an - // ancestor search to find a better common block. If the queue is full we wait till - // all blocks are imported though. - if best_queued.saturating_sub(peer.common_number) > - MAX_BLOCKS_TO_LOOK_BACKWARDS.into() && - best_queued < peer.best_number && - peer.common_number < last_finalized && - queue.len() <= MAJOR_SYNC_BLOCKS.into() - { - trace!( - target: LOG_TARGET, - "Peer {:?} common block {} too far behind of our best {}. 
Starting ancestry search.", - id, - peer.common_number, - best_queued, - ); - let current = std::cmp::min(peer.best_number, best_queued); - peer.state = PeerSyncState::AncestorSearch { - current, - start: best_queued, - state: AncestorSearchState::ExponentialBackoff(One::one()), - }; - Some((id, ancestry_request::(current))) - } else if let Some((range, req)) = peer_block_request( - &id, - peer, - blocks, - attrs, - max_parallel, - max_blocks_per_request, - last_finalized, - best_queued, - ) { - peer.state = PeerSyncState::DownloadingNew(range.start); - trace!( - target: LOG_TARGET, - "New block request for {}, (best:{}, common:{}) {:?}", - id, - peer.best_number, - peer.common_number, - req, - ); - Some((id, req)) - } else if let Some((hash, req)) = fork_sync_request( - &id, - fork_targets, - best_queued, - last_finalized, - attrs, - |hash| { - if queue.contains(hash) { - BlockStatus::Queued - } else { - client.block_status(*hash).unwrap_or(BlockStatus::Unknown) - } - }, - max_blocks_per_request, - ) { - trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}"); - peer.state = PeerSyncState::DownloadingStale(hash); - Some((id, req)) - } else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| { - peer_gap_block_request( - &id, + self.peer_pool + .lock() + .available_peers() + .filter_map(|mut available_peer| { + let peer_id = available_peer.peer_id(); + if let Some(peer) = self.peers.get_mut(peer_id) { + if !allowed_requests.contains(&peer_id) { + return None + } + + // If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away + // from the common number, the peer best number is higher than our best queued + // and the common number is smaller than the last finalized block number, we + // should do an ancestor search to find a better common block. If the queue is + // full we wait till all blocks are imported though. + if best_queued.saturating_sub(peer.common_number) > + MAX_BLOCKS_TO_LOOK_BACKWARDS.into() && + best_queued < peer.best_number && + peer.common_number < last_finalized && + queue.len() <= MAJOR_SYNC_BLOCKS.into() + { + available_peer.reserve(); + trace!( + target: LOG_TARGET, + "Peer {:?} common block {} too far behind of our best {}. 
\ + Starting ancestry search.", + peer_id, + peer.common_number, + best_queued, + ); + let current = std::cmp::min(peer.best_number, best_queued); + peer.state = PeerSyncState::AncestorSearch { + current, + start: best_queued, + state: AncestorSearchState::ExponentialBackoff(One::one()), + }; + Some((*peer_id, ancestry_request::(current))) + } else if let Some((range, req)) = peer_block_request( + &peer_id, peer, - &mut sync.blocks, + blocks, attrs, - sync.target, - sync.best_queued_number, + max_parallel, max_blocks_per_request, - ) - }) { - peer.state = PeerSyncState::DownloadingGap(range.start); - trace!( + last_finalized, + best_queued, + ) { + available_peer.reserve(); + peer.state = PeerSyncState::DownloadingNew(range.start); + trace!( + target: LOG_TARGET, + "New block request for {}, (best:{}, common:{}) {:?}", + peer_id, + peer.best_number, + peer.common_number, + req, + ); + Some((*peer_id, req)) + } else if let Some((hash, req)) = fork_sync_request( + &peer_id, + fork_targets, + best_queued, + last_finalized, + attrs, + |hash| { + if queue.contains(hash) { + BlockStatus::Queued + } else { + client.block_status(*hash).unwrap_or(BlockStatus::Unknown) + } + }, + max_blocks_per_request, + ) { + available_peer.reserve(); + trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {peer_id}"); + peer.state = PeerSyncState::DownloadingStale(hash); + Some((*peer_id, req)) + } else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| { + peer_gap_block_request( + &peer_id, + peer, + &mut sync.blocks, + attrs, + sync.target, + sync.best_queued_number, + max_blocks_per_request, + ) + }) { + available_peer.reserve(); + peer.state = PeerSyncState::DownloadingGap(range.start); + trace!( + target: LOG_TARGET, + "New gap block request for {}, (best:{}, common:{}) {:?}", + peer_id, + peer.best_number, + peer.common_number, + req, + ); + Some((*peer_id, req)) + } else { + None + } + } else { + warn!( target: LOG_TARGET, - "New gap block request for {}, (best:{}, common:{}) {:?}", - id, - peer.best_number, - peer.common_number, - req, + "State inconsistency: peer {peer_id} is in the pool of connected peers, \ + but not known to `ChainSync`.", ); - Some((id, req)) - } else { + debug_assert!(false); None } }) @@ -1649,13 +1724,27 @@ where return None } - for (id, peer) in self.peers.iter_mut() { - if peer.state.is_available() && peer.common_number >= sync.target_number() { - peer.state = PeerSyncState::DownloadingState; - let request = sync.next_request(); - trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request); - self.allowed_requests.clear(); - return Some((*id, OpaqueStateRequest(Box::new(request)))) + for mut available_peer in self.peer_pool.lock().available_peers() { + let peer_id = available_peer.peer_id(); + if let Some(peer) = self.peers.get_mut(&peer_id) { + if peer.state.is_available() && peer.common_number >= sync.target_number() { + available_peer.reserve(); + peer.state = PeerSyncState::DownloadingState; + let request = sync.next_request(); + trace!( + target: LOG_TARGET, + "New StateRequest for {peer_id}: {request:?}", + ); + self.allowed_requests.clear(); + return Some((*peer_id, OpaqueStateRequest(Box::new(request)))) + } + } else { + warn!( + target: LOG_TARGET, + "State inconsistency: peer {peer_id} is in the pool of connected peers, \ + but not known to `ChainSync`.", + ); + debug_assert!(false); } } } @@ -1681,6 +1770,7 @@ where if let PeerSyncState::DownloadingState = peer.state { peer.state = PeerSyncState::Available; 
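The hunks in this region move `ChainSync` from per-peer availability flags to the shared pool: the request builders iterate `self.peer_pool.lock().available_peers()` and call `available_peer.reserve()` only once a request is actually produced, while the response handlers (next hunk) hand the peer back via `free_peer`. For orientation, the sketch below shows a minimal `PeerPool` consistent with these call sites; the `PeerStatus` enum, the `PeerPoolInner` type, and the field layout are illustrative assumptions, and only the method names (`add_peer`, `lock`, `available_peers`, `peer_id`, `reserve`, `free_peer`, `try_reserve_peer`) are taken from the diff itself.

```rust
// A minimal sketch of a `PeerPool`, assuming a `parking_lot`-backed shared
// map (suggested by the dependency added in this PR). The real
// `crate::peer_pool` module may differ in detail.
use std::{collections::HashMap, sync::Arc};

use libp2p::PeerId;
use parking_lot::{Mutex, MutexGuard};

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PeerStatus {
	Available,
	Reserved,
}

/// Shared, cloneable pool of connected peers (illustrative only).
#[derive(Default, Clone)]
pub struct PeerPool {
	inner: Arc<Mutex<PeerPoolInner>>,
}

#[derive(Default)]
pub struct PeerPoolInner {
	peers: HashMap<PeerId, PeerStatus>,
}

impl PeerPool {
	pub fn add_peer(&self, peer_id: PeerId) {
		self.inner.lock().peers.insert(peer_id, PeerStatus::Available);
	}

	pub fn remove_peer(&self, peer_id: &PeerId) {
		self.inner.lock().peers.remove(peer_id);
	}

	/// Lock the pool, e.g., to iterate over available peers.
	pub fn lock(&self) -> MutexGuard<'_, PeerPoolInner> {
		self.inner.lock()
	}

	/// Atomically reserve the peer if it is currently available.
	pub fn try_reserve_peer(&self, peer_id: &PeerId) -> bool {
		match self.inner.lock().peers.get_mut(peer_id) {
			Some(status) if *status == PeerStatus::Available => {
				*status = PeerStatus::Reserved;
				true
			},
			_ => false,
		}
	}

	/// Make the peer available again once its request has completed or failed.
	pub fn free_peer(&self, peer_id: &PeerId) {
		if let Some(status) = self.inner.lock().peers.get_mut(peer_id) {
			*status = PeerStatus::Available;
		}
	}
}

impl PeerPoolInner {
	/// Iterate over peers with no request currently in flight.
	pub fn available_peers(&mut self) -> impl Iterator<Item = AvailablePeer<'_>> {
		self.peers.iter_mut().filter_map(|(peer_id, status)| {
			(*status == PeerStatus::Available).then_some(AvailablePeer { peer_id, status })
		})
	}
}

/// Handle to an available peer; reserving it hides the peer from other strategies.
pub struct AvailablePeer<'a> {
	peer_id: &'a PeerId,
	status: &'a mut PeerStatus,
}

impl<'a> AvailablePeer<'a> {
	pub fn peer_id(&self) -> &PeerId {
		self.peer_id
	}

	pub fn reserve(&mut self) {
		*self.status = PeerStatus::Reserved;
	}
}
```

Putting `reserve()` on a guard-scoped `AvailablePeer` lets a strategy inspect a candidate peer and skip it without ever claiming it, so peers that fail the strategy-local checks stay visible to the other strategies sharing the pool.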
self.allowed_requests.set_all(); + self.peer_pool.free_peer(peer_id); } } let import_result = if let Some(sync) = &mut self.state_sync { @@ -1863,6 +1953,15 @@ where /// Get pending actions to perform. #[must_use] pub fn actions(&mut self) -> impl Iterator> { + let new_peer_actions = self + .handle_new_peers() + .map(|res| match res { + Ok((peer_id, request)) => ChainSyncAction::SendBlockRequest { peer_id, request }, + Err(bad_peer) => ChainSyncAction::DropPeer(bad_peer), + }) + .collect::>>(); + self.actions.extend(new_peer_actions.into_iter()); + let block_requests = self + .block_requests() + .into_iter() diff --git a/substrate/client/network/sync/src/strategy/chain_sync/test.rs b/substrate/client/network/sync/src/strategy/chain_sync/test.rs index c89096bc6c90..395502f8b233 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync/test.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync/test.rs @@ -37,8 +37,18 @@ fn processes_empty_response_on_justification_request_for_unknown_block() { let client = Arc::new(TestClientBuilder::new().build()); let peer_id = PeerId::random(); - - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None).unwrap(); + let peer_pool = PeerPool::default(); + + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); let (a1_hash, a1_number) = { let a1 = BlockBuilderBuilder::new(&*client) @@ -53,7 +63,10 @@ fn processes_empty_response_on_justification_request_for_unknown_block() { }; // add a new peer with the same best block + peer_pool.add_peer(peer_id); sync.add_peer(peer_id, a1_hash, a1_number); + // Transition the peer into a requestable state + let _ = sync.handle_new_peers().collect::>(); // and request a justification for the block sync.request_justification(&a1_hash, a1_number); @@ -90,8 +103,20 @@ fn processes_empty_response_on_justification_request_for_unknown_block() { #[test] fn restart_doesnt_affect_peers_downloading_finality_data() { let mut client = Arc::new(TestClientBuilder::new().build()); - - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None).unwrap(); + let peer_pool = PeerPool::default(); + + // we request max 8 blocks to always initiate block requests to both peers for the test to be + // deterministic + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 8, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); @@ -117,18 +142,28 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { let (b1_hash, b1_number) = new_blocks(50); // add 2 peers at blocks that we don't have locally + peer_pool.add_peer(peer_id1); + peer_pool.add_peer(peer_id2); + // note peer 2 is at block 10 > 8, so we always have something to request from it, even if the + // first request went to peer 1 sync.add_peer(peer_id1, Hash::random(), 42); sync.add_peer(peer_id2, Hash::random(), 10); // we wil send block requests to these peers // for these blocks we don't know about - assert!(sync .block_requests() .into_iter() .all(|(p, _)| { p == peer_id1 || p == peer_id2 })); + let actions = sync.actions().collect::>(); + assert_eq!(actions.len(), 2); + assert!(actions.iter().all(|action| match action { + ChainSyncAction::SendBlockRequest { peer_id, ..
} => + peer_id == &peer_id1 || peer_id == &peer_id2, + _ => false, + })); // add a new peer at a known block + peer_pool.add_peer(peer_id3); sync.add_peer(peer_id3, b1_hash, b1_number); + // transit the peer into a requestable state + let _ = sync.handle_new_peers().collect::>(); // we request a justification for a block we have locally sync.request_justification(&b1_hash, b1_number); @@ -146,22 +181,27 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { PeerSyncState::DownloadingJustification(b1_hash), ); - // clear old actions + // drop old actions let _ = sync.take_actions(); // we restart the sync state sync.restart(); - let actions = sync.take_actions().collect::>(); - // which should make us send out block requests to the first two peers - assert_eq!(actions.len(), 2); - assert!(actions.iter().all(|action| match action { + // which should make us cancel and send out again block requests to the first two peers + let actions = sync.actions().collect::>(); + assert_eq!(actions.len(), 4); + assert!(actions.iter().take(2).all(|action| match action { + ChainSyncAction::CancelRequest { peer_id, .. } => + peer_id == &peer_id1 || peer_id == &peer_id2, + _ => false, + })); + assert!(actions.iter().skip(2).all(|action| match action { ChainSyncAction::SendBlockRequest { peer_id, .. } => peer_id == &peer_id1 || peer_id == &peer_id2, _ => false, })); - // peer 3 should be unaffected it was downloading finality data + // peer 3 should be unaffected as it was downloading finality data assert_eq!( sync.peers.get(&peer_id3).unwrap().state, PeerSyncState::DownloadingJustification(b1_hash), @@ -262,7 +302,7 @@ fn unwrap_from_block_number(from: FromBlock) -> u64 { /// announcement from this node in its sync process. Meaning our common number didn't change. It /// is now expected that we start an ancestor search to find the common number. 
#[test] -fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() { +fn do_ancestor_search_when_common_block_to_best_queued_gap_is_to_big() { sp_tracing::try_init_simple(); let blocks = { @@ -274,8 +314,18 @@ fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() { let mut client = Arc::new(TestClientBuilder::new().build()); let info = client.info(); - - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None).unwrap(); + let peer_pool = PeerPool::default(); + + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 5, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); @@ -283,8 +333,12 @@ fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() { let best_block = blocks.last().unwrap().clone(); let max_blocks_to_request = sync.max_blocks_per_request; // Connect the node we will sync from + peer_pool.add_peer(peer_id1); + peer_pool.add_peer(peer_id2); sync.add_peer(peer_id1, best_block.hash(), *best_block.header().number()); sync.add_peer(peer_id2, info.best_hash, 0); + // Transition the peers into a requestable state + let _ = sync.handle_new_peers().collect::>(); let mut best_block_num = 0; while best_block_num < MAX_DOWNLOAD_AHEAD { @@ -420,8 +474,18 @@ fn can_sync_huge_fork() { }; let info = client.info(); - - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None).unwrap(); + let peer_pool = PeerPool::default(); + + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 5, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone(); let just = (*b"TEST", Vec::new()); @@ -432,6 +496,7 @@ fn can_sync_huge_fork() { let common_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize / 2].clone(); // Connect the node we will sync from + peer_pool.add_peer(peer_id1); sync.add_peer(peer_id1, common_block.hash(), *common_block.header().number()); send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync); @@ -553,8 +618,18 @@ fn syncs_fork_without_duplicate_requests() { }; let info = client.info(); - - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None).unwrap(); + let peer_pool = PeerPool::default(); + + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 5, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone(); let just = (*b"TEST", Vec::new()); @@ -565,6 +640,7 @@ fn syncs_fork_without_duplicate_requests() { let common_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize / 2].clone(); // Connect the node we will sync from + peer_pool.add_peer(peer_id1); sync.add_peer(peer_id1, common_block.hash(), *common_block.header().number()); send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync); @@ -688,12 +764,23 @@ fn removes_target_fork_on_disconnect() { sp_tracing::try_init_simple(); let mut client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..3).map(|_| build_block(&mut client, None, false)).collect::>(); - - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None).unwrap(); + let peer_pool = PeerPool::default(); + + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + None, + peer_pool.clone(), + std::iter::empty(), +
) + .unwrap(); let peer_id1 = PeerId::random(); let common_block = blocks[1].clone(); // Connect the node we will sync from + peer_pool.add_peer(peer_id1); sync.add_peer(peer_id1, common_block.hash(), *common_block.header().number()); // Create a "new" header and announce it @@ -713,13 +800,26 @@ fn can_import_response_with_missing_blocks() { let blocks = (0..4).map(|_| build_block(&mut client2, None, false)).collect::>(); let empty_client = Arc::new(TestClientBuilder::new().build()); - - let mut sync = ChainSync::new(ChainSyncMode::Full, empty_client.clone(), 1, 64, None).unwrap(); + let peer_pool = PeerPool::default(); + + let mut sync = ChainSync::new( + ChainSyncMode::Full, + empty_client.clone(), + 1, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); let peer_id1 = PeerId::random(); let best_block = blocks[3].clone(); + peer_pool.add_peer(peer_id1); sync.add_peer(peer_id1, best_block.hash(), *best_block.header().number()); + // Manually prepare the peer for a block request. + sync.allowed_requests.add(&peer_id1); sync.peers.get_mut(&peer_id1).unwrap().state = PeerSyncState::Available; sync.peers.get_mut(&peer_id1).unwrap().common_number = 0; @@ -745,7 +845,17 @@ fn ancestor_search_repeat() { #[test] fn sync_restart_removes_block_but_not_justification_requests() { let mut client = Arc::new(TestClientBuilder::new().build()); - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None).unwrap(); + let peer_pool = PeerPool::default(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); let peers = vec![PeerId::random(), PeerId::random()]; @@ -768,12 +878,16 @@ fn sync_restart_removes_block_but_not_justification_requests() { let (b1_hash, b1_number) = new_blocks(50); - // add new peer and request blocks from them - sync.add_peer(peers[0], Hash::random(), 42); - // we don't actually perform any requests, just keep track of peers waiting for a response let mut pending_responses = HashSet::new(); + // add new peer and request blocks from them + peer_pool.add_peer(peers[0]); + sync.add_peer(peers[0], Hash::random(), 42); + sync.handle_new_peers().for_each(|result| { + pending_responses.insert(result.unwrap().0); + }); + // we wil send block requests to these peers // for these blocks we don't know about for (peer, _request) in sync.block_requests() { @@ -782,7 +896,11 @@ fn sync_restart_removes_block_but_not_justification_requests() { } // add a new peer at a known block + peer_pool.add_peer(peers[1]); sync.add_peer(peers[1], b1_hash, b1_number); + sync.handle_new_peers().for_each(|result| { + pending_responses.insert(result.unwrap().0); + }); // we request a justification for a block we have locally sync.request_justification(&b1_hash, b1_number); @@ -810,10 +928,10 @@ fn sync_restart_removes_block_but_not_justification_requests() { // restart sync sync.restart(); - let actions = sync.take_actions().collect::>(); + let actions = sync.actions().collect::>(); for action in actions.iter() { match action { - ChainSyncAction::CancelBlockRequest { peer_id } => { + ChainSyncAction::CancelRequest { peer_id } => { pending_responses.remove(&peer_id); }, ChainSyncAction::SendBlockRequest { peer_id, .. 
} => { @@ -887,13 +1005,26 @@ fn request_across_forks() { fork_blocks }; - let mut sync = ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None).unwrap(); + let peer_pool = PeerPool::default(); + + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 5, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); // Add the peers, all at the common ancestor 100. let common_block = blocks.last().unwrap(); let peer_id1 = PeerId::random(); + peer_pool.add_peer(peer_id1); sync.add_peer(peer_id1, common_block.hash(), *common_block.header().number()); let peer_id2 = PeerId::random(); + peer_pool.add_peer(peer_id2); sync.add_peer(peer_id2, common_block.hash(), *common_block.header().number()); // Peer 1 announces 107 from fork 1, 100-107 get downloaded. @@ -965,3 +1096,368 @@ fn request_across_forks() { assert!(sync.is_known(&block.header.parent_hash())); } } + +#[test] +fn block_request_at_genesis_reserves_peer_in_peer_pool() { + let client = Arc::new(TestClientBuilder::new().build()); + let peer_pool = PeerPool::default(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); + + // Add a peer. + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + sync.add_peer(peer_id, Hash::random(), 10); + + // A request is sent to our peer. + let actions = sync.actions().collect::>(); + assert_eq!(actions.len(), 1); + assert!(actions.iter().all(|action| match action { + ChainSyncAction::SendBlockRequest { peer_id: requested_peer_id, .. } => + requested_peer_id == &peer_id, + _ => false, + })); + + // The peer is reserved in `PeerPool`. + assert_eq!(peer_pool.lock().available_peers().count(), 0); +} + +#[test] +fn block_response_frees_peer_in_peer_pool() { + let client = Arc::new(TestClientBuilder::new().build()); + let peer_pool = PeerPool::default(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); + + // Create blocks. + let blocks = { + let mut client = Arc::new(TestClientBuilder::new().build()); + (0..10).map(|_| build_block(&mut client, None, false)).collect::>() + }; + let best_block = blocks.last().unwrap().clone(); + + // Add a peer. + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + sync.add_peer(peer_id, best_block.hash(), *best_block.header().number()); + + // Transition the peers into a requestable state + let _ = sync.handle_new_peers().collect::>(); + + // A request is sent to our peer. + let request = get_block_request(&mut sync, FromBlock::Hash(best_block.hash()), 10, &peer_id); + + // The peer is reserved in `PeerPool`. + assert_eq!(peer_pool.lock().available_peers().count(), 0); + + // Receive response. + let mut response_blocks = blocks.clone(); + response_blocks.reverse(); + let response = create_block_response(response_blocks); + sync.on_block_data(&peer_id, Some(request), response).unwrap(); + + // The peer is now available again. + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); +} + +#[test] +fn new_peer_ancestry_search_reserves_peer_in_peer_pool() { + let mut client = Arc::new(TestClientBuilder::new().build()); + + // Import a block to make sure we are not at genesis, so that ancestry search kicks in.
+ let block = BlockBuilderBuilder::new(&*client) + .on_parent_block(client.chain_info().best_hash) + .with_parent_block_number(client.chain_info().best_number) + .build() + .unwrap() + .build() + .unwrap() + .block; + block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + + let peer_pool = PeerPool::default(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); + + // Check that we are not at genesis, so ancestry search will be started. + assert!(!sync.best_queued_number.is_zero()); + + // Add a peer. + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + sync.add_peer(peer_id, Hash::random(), 10); + + // Initiate ancestry search request to our peer. + let _ = sync.handle_new_peers().collect::>(); + + // Make sure we are in fact doing ancestry search. + assert!(matches!( + sync.peers.get(&peer_id).unwrap(), + PeerSync { state, .. } if matches!(state, PeerSyncState::AncestorSearch { .. }))); + + // The peer is reserved in `PeerPool`. + assert_eq!(peer_pool.lock().available_peers().count(), 0); +} + +#[test] +fn forced_ancestry_search_reserves_peer_in_peer_pool() { + let mut client = Arc::new(TestClientBuilder::new().build()); + + let peer_pool = PeerPool::default(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); + + // Add a peer that will be higher than our queued number (see below). + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + sync.add_peer(peer_id, Hash::random(), (MAX_BLOCKS_TO_LOOK_BACKWARDS + 2).into()); + + // Make sure adding peer doesn't generate ancestry request in `handle_new_peers`. + assert_eq!(sync.handle_new_peers().count(), 0); + assert!(matches!( + sync.peers.get(&peer_id).unwrap(), + PeerSync { state, .. } if matches!(state, PeerSyncState::Available))); + + // To trigger the desired branch in `ChainSync` we must be more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` + // above the common number (genesis), the peer best number should be higher than our best queued, + // and the common number must be smaller than the last finalized number. + let blocks = (0..MAX_BLOCKS_TO_LOOK_BACKWARDS + 1) + .map(|_| build_block(&mut client, None, false)) + .collect::>(); + + // Manually update best queued. + let info = client.info(); + sync.best_queued_hash = info.best_hash; + sync.best_queued_number = info.best_number; + + // Finalize block 1. + let just = (*b"TEST", Vec::new()); + client.finalize_block(blocks[0].hash(), Some(just)).unwrap(); + + // Now the desired branch in `block_requests` should be triggered. + let _ = sync.block_requests(); + + // Make sure we are in fact doing ancestry search. + assert!(matches!( + sync.peers.get(&peer_id).unwrap(), + PeerSync { state, .. } if matches!(state, PeerSyncState::AncestorSearch { .. }))); + + // The peer is reserved in `PeerPool`.
+ assert_eq!(peer_pool.lock().available_peers().count(), 0); +} + +#[test] +fn peer_block_request_reserves_peer_in_peer_pool() { + let client = Arc::new(TestClientBuilder::new().build()); + + let peer_pool = PeerPool::default(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); + + // Add a peer that will be higher than our queued number (see below). + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + sync.add_peer(peer_id, Hash::random(), 10); + + // Make sure adding peer doesn't generate fork request in `handle_new_peers`. + assert_eq!(sync.handle_new_peers().count(), 0); + assert!(matches!( + sync.peers.get(&peer_id).unwrap(), + PeerSync { state, .. } if matches!(state, PeerSyncState::Available))); + + // Now we should be downloading new blocks. + let _ = sync.block_requests(); + assert!(matches!( + sync.peers.get(&peer_id).unwrap(), + PeerSync { state, .. } if matches!(state, PeerSyncState::DownloadingNew(_)))); + + // The peer is reserved in `PeerPool`. + assert_eq!(peer_pool.lock().available_peers().count(), 0); +} + +#[test] +fn fork_sync_request_reserves_peer_in_peer_pool() { + let client = Arc::new(TestClientBuilder::new().build()); + + let peer_pool = PeerPool::default(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); + + // Add a peer that will be higher than our queued number (see below). + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + let fork_hash = Hash::random(); + let fork_number = 10; + sync.add_peer(peer_id, fork_hash, fork_number); + + // Make sure adding peer doesn't generate fork request in `handle_new_peers`. + assert_eq!(sync.handle_new_peers().count(), 0); + assert!(matches!( + sync.peers.get(&peer_id).unwrap(), + PeerSync { state, .. } if matches!(state, PeerSyncState::Available))); + + // Add a fork target. + sync.set_sync_fork_request(vec![peer_id], &fork_hash, fork_number); + + // Manually patch `best_queued` to be > `peer.best_number` to trigger fork request. + sync.best_queued_number = 20; + + // Now we should be downloading an old fork. + let _ = sync.block_requests(); + assert!(matches!( + sync.peers.get(&peer_id).unwrap(), + PeerSync { state, .. } if matches!(state, PeerSyncState::DownloadingStale(_)))); + + // The peer is reserved in `PeerPool`. + assert_eq!(peer_pool.lock().available_peers().count(), 0); +} + +#[test] +fn justification_request_reserves_peer_in_peer_pool() { + let mut client = Arc::new(TestClientBuilder::new().build()); + + let peer_pool = PeerPool::default(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); + + // Add a peer that will be higher than our queued number (see below). + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + sync.add_peer(peer_id, Hash::random(), (MAX_BLOCKS_TO_LOOK_BACKWARDS + 2).into()); + + // Make sure adding peer doesn't generate ancestry request in `handle_new_peers`. + assert_eq!(sync.handle_new_peers().count(), 0); + assert!(matches!( + sync.peers.get(&peer_id).unwrap(), + PeerSync { state, .. 
} if matches!(state, PeerSyncState::Available))); + + // Import some blocks to request justifications for. + let blocks = (0..10).map(|_| build_block(&mut client, None, false)).collect::>(); + + let best_block = blocks.last().unwrap(); + + // Request an extra justification for block 10. + sync.request_justification(&best_block.hash(), *best_block.header().number()); + + // Generate request. + let _ = sync.justification_requests(); + assert!(matches!( + sync.peers.get(&peer_id).unwrap(), + PeerSync { state, .. } if matches!(state, PeerSyncState::DownloadingJustification(_)))); + + // The peer is reserved in `PeerPool`. + assert_eq!(peer_pool.lock().available_peers().count(), 0); +} + +#[test] +fn justification_response_frees_peer_in_peer_pool() { + let mut client = Arc::new(TestClientBuilder::new().build()); + + let peer_pool = PeerPool::default(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + None, + peer_pool.clone(), + std::iter::empty(), + ) + .unwrap(); + + // Add a peer that will be higher than our queued number (see below). + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + sync.add_peer(peer_id, Hash::random(), (MAX_BLOCKS_TO_LOOK_BACKWARDS + 2).into()); + + // Make sure adding peer doesn't generate ancestry request in `handle_new_peers`. + assert_eq!(sync.handle_new_peers().count(), 0); + assert!(matches!( + sync.peers.get(&peer_id).unwrap(), + PeerSync { state, .. } if matches!(state, PeerSyncState::Available))); + + // Import some blocks to request justifications for. + let blocks = (0..10).map(|_| build_block(&mut client, None, false)).collect::>(); + + let best_block = blocks.last().unwrap(); + + // Request an extra justification for block 10. + sync.request_justification(&best_block.hash(), *best_block.header().number()); + + // Generate request. + let (_peer_id, request) = sync.justification_requests().pop().unwrap(); + assert!(matches!( + sync.peers.get(&peer_id).unwrap(), + PeerSync { state, .. } if matches!(state, PeerSyncState::DownloadingJustification(_)))); + + // The peer is reserved in `PeerPool`. + assert_eq!(peer_pool.lock().available_peers().count(), 0); + + // We receive a justification response (dummy payload doesn't matter). + sync.on_block_response(peer_id, request, Vec::new()); + + // The peer is freed in `PeerPool`. + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); +} diff --git a/substrate/client/network/sync/src/strategy/state.rs b/substrate/client/network/sync/src/strategy/state.rs index ae3f7b600559..6bb1e8b4ef03 100644 --- a/substrate/client/network/sync/src/strategy/state.rs +++ b/substrate/client/network/sync/src/strategy/state.rs @@ -19,6 +19,7 @@ //! State sync strategy. 
use crate::{ + peer_pool::PeerPool, schema::v1::StateResponse, strategy::state_sync::{ImportResult, StateSync, StateSyncProvider}, types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncState, SyncStatus}, @@ -63,12 +64,6 @@ enum PeerState { DownloadingState, } -impl PeerState { - fn is_available(&self) -> bool { - matches!(self, PeerState::Available) - } -} - struct Peer { best_number: NumberFor, state: PeerState, @@ -80,6 +75,7 @@ pub struct StateStrategy { peers: HashMap>, actions: Vec>, succeded: bool, + peer_pool: PeerPool, } impl StateStrategy { @@ -91,6 +87,7 @@ impl StateStrategy { target_justifications: Option, skip_proof: bool, initial_peers: impl Iterator)>, + peer_pool: PeerPool, ) -> Self where Client: ProofProvider + Send + Sync + 'static, @@ -111,6 +108,7 @@ impl StateStrategy { peers, actions: Vec::new(), succeded: false, + peer_pool, } } @@ -120,6 +118,7 @@ impl StateStrategy { fn new_with_provider( state_sync_provider: Box>, initial_peers: impl Iterator)>, + peer_pool: PeerPool, ) -> Self { Self { state_sync: state_sync_provider, @@ -130,6 +129,7 @@ impl StateStrategy { .collect(), actions: Vec::new(), succeded: false, + peer_pool, } } @@ -179,6 +179,7 @@ impl StateStrategy { if let Some(peer) = self.peers.get_mut(&peer_id) { peer.state = PeerState::Available; } + self.peer_pool.free_peer(&peer_id); let response: Box = response.0.downcast().map_err(|_error| { error!( @@ -305,7 +306,7 @@ impl StateStrategy { // Find a random peer that is synced as much as peer majority and is above // `min_best_number`. for (peer_id, peer) in self.peers.iter_mut() { - if peer.state.is_available() && peer.best_number >= threshold { + if peer.best_number >= threshold && self.peer_pool.try_reserve_peer(peer_id) { peer.state = new_state; return Some(*peer_id) } @@ -330,11 +331,6 @@ impl StateStrategy { } } - /// Get the number of peers known to syncing. 
- pub fn num_peers(&self) -> usize { - self.peers.len() - } - /// Get actions that should be performed by the owner on [`WarpSync`]'s behalf #[must_use] pub fn actions(&mut self) -> impl Iterator> { @@ -397,8 +393,15 @@ mod test { .block; let target_header = target_block.header().clone(); - let mut state_strategy = - StateStrategy::new(client, target_header, None, None, false, std::iter::empty()); + let mut state_strategy = StateStrategy::new( + client, + target_header, + None, + None, + false, + std::iter::empty(), + Default::default(), + ); assert!(state_strategy .schedule_next_peer(PeerState::DownloadingState, Zero::zero()) @@ -422,6 +425,8 @@ mod test { .map(|best_number| (PeerId::random(), best_number)) .collect::>(); let initial_peers = peers.iter().map(|(p, n)| (*p, *n)); + let peer_pool = PeerPool::default(); + peers.iter().for_each(|(peer_id, _)| peer_pool.add_peer(*peer_id)); let mut state_strategy = StateStrategy::new( client.clone(), @@ -430,6 +435,7 @@ mod test { None, false, initial_peers, + peer_pool, ); let peer_id = @@ -455,6 +461,8 @@ mod test { .map(|best_number| (PeerId::random(), best_number)) .collect::>(); let initial_peers = peers.iter().map(|(p, n)| (*p, *n)); + let peer_pool = PeerPool::default(); + peers.iter().for_each(|(peer_id, _)| peer_pool.add_peer(*peer_id)); let mut state_strategy = StateStrategy::new( client.clone(), @@ -463,6 +471,7 @@ mod test { None, false, initial_peers, + peer_pool, ); let peer_id = state_strategy.schedule_next_peer(PeerState::DownloadingState, 10); @@ -482,7 +491,10 @@ mod test { .unwrap() .block; - let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number)); + let initial_peers = + (1..=10).map(|best_number| (PeerId::random(), best_number)).collect::>(); + let peer_pool = PeerPool::default(); + initial_peers.iter().for_each(|(peer_id, _)| peer_pool.add_peer(*peer_id)); let mut state_strategy = StateStrategy::new( client.clone(), @@ -490,7 +502,8 @@ mod test { None, None, false, - initial_peers, + initial_peers.into_iter(), + peer_pool, ); let (_peer_id, mut opaque_request) = state_strategy.state_request().unwrap(); @@ -512,7 +525,10 @@ mod test { .unwrap() .block; - let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number)); + let initial_peers = + (1..=10).map(|best_number| (PeerId::random(), best_number)).collect::>(); + let peer_pool = PeerPool::default(); + initial_peers.iter().for_each(|(peer_id, _)| peer_pool.add_peer(*peer_id)); let mut state_strategy = StateStrategy::new( client.clone(), @@ -520,7 +536,8 @@ mod test { None, None, false, - initial_peers, + initial_peers.into_iter(), + peer_pool, ); // First request is sent. 
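The `schedule_next_peer` change above collapses "is the peer free?" and "claim the peer" into the single atomic `try_reserve_peer` call, which is what keeps peer selection race-free once several strategies share one pool (the same substitution appears in `strategy/warp.rs` further below). Under the hypothetical `PeerPool` sketch given earlier (not the actual module), the intended semantics look roughly like this:

```rust
// Illustration of the reservation semantics `schedule_next_peer` relies on in
// both `StateStrategy` and `WarpSync`; `PeerPool` is the sketch from above.
fn reservation_demo() {
	let peer_pool = PeerPool::default();
	let peer_id = libp2p::PeerId::random();
	peer_pool.add_peer(peer_id);

	// The first strategy to ask gets the peer...
	assert!(peer_pool.try_reserve_peer(&peer_id));
	// ...and a concurrent strategy polling the same pool cannot take it.
	assert!(!peer_pool.try_reserve_peer(&peer_id));

	// Once a response (or error) is processed, the peer is freed...
	peer_pool.free_peer(&peer_id);
	// ...and can be reserved again.
	assert!(peer_pool.try_reserve_peer(&peer_id));
}
```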
@@ -531,20 +548,66 @@ mod test { } #[test] - fn received_state_response_makes_peer_available_again() { + fn state_request_reserves_peer() { + let client = Arc::new(TestClientBuilder::new().set_no_genesis().build()); + let target_block = BlockBuilderBuilder::new(&*client) + .on_parent_block(client.chain_info().best_hash) + .with_parent_block_number(client.chain_info().best_number) + .build() + .unwrap() + .build() + .unwrap() + .block; + + let initial_peers = + (1..=10).map(|best_number| (PeerId::random(), best_number)).collect::>(); + let peer_pool = PeerPool::default(); + initial_peers.iter().for_each(|(peer_id, _)| peer_pool.add_peer(*peer_id)); + + let mut state_strategy = StateStrategy::new( + client.clone(), + target_block.header().clone(), + None, + None, + false, + initial_peers.into_iter(), + peer_pool.clone(), + ); + + let (peer_id, _opaque_request) = state_strategy.state_request().unwrap(); + + // The peer requested is reserved. + assert!(matches!( + state_strategy.peers.get(&peer_id).unwrap().state, + PeerState::DownloadingState, + )); + // Also in `PeerPool`. + assert!(!peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + } + + #[test] + fn state_response_makes_peer_available_again() { let mut state_sync_provider = MockStateSync::::new(); state_sync_provider.expect_import().return_once(|_| ImportResult::Continue); let peer_id = PeerId::random(); let initial_peers = std::iter::once((peer_id, 10)); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let peer_pool = PeerPool::default(); + peer_pool.add_peer(peer_id); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers, + peer_pool.clone(), + ); // Manually set the peer's state. + assert!(peer_pool.try_reserve_peer(&peer_id)); state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState; let dummy_response = OpaqueStateResponse(Box::new(StateResponse::default())); state_strategy.on_state_response(peer_id, dummy_response); - assert!(state_strategy.peers.get(&peer_id).unwrap().state.is_available()); + assert!(matches!(state_strategy.peers.get(&peer_id).unwrap().state, PeerState::Available)); + // Peer is also available in `PeerPool`. + assert!(peer_pool.try_reserve_peer(&peer_id)); } #[test] @@ -554,12 +617,19 @@ mod test { state_sync_provider.expect_import().return_once(|_| ImportResult::BadResponse); let peer_id = PeerId::random(); let initial_peers = std::iter::once((peer_id, 10)); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let peer_pool = PeerPool::default(); + peer_pool.add_peer(peer_id); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers, + peer_pool.clone(), + ); // Manually set the peer's state. + assert!(peer_pool.try_reserve_peer(&peer_id)); state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState; + + // Receiving a response drops the peer. let dummy_response = OpaqueStateResponse(Box::new(StateResponse::default())); - // Receiving response drops the peer. 
assert!(matches!( state_strategy.on_state_response_inner(peer_id, dummy_response), Err(BadPeer(id, _rep)) if id == peer_id, @@ -573,9 +643,15 @@ mod test { state_sync_provider.expect_import().return_once(|_| ImportResult::Continue); let peer_id = PeerId::random(); let initial_peers = std::iter::once((peer_id, 10)); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let peer_pool = PeerPool::default(); + peer_pool.add_peer(peer_id); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers, + peer_pool.clone(), + ); // Manually set the peer's state . + assert!(peer_pool.try_reserve_peer(&peer_id)); state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState; let dummy_response = OpaqueStateResponse(Box::new(StateResponse::default())); @@ -632,9 +708,15 @@ mod test { // Prepare `StateStrategy`. let peer_id = PeerId::random(); let initial_peers = std::iter::once((peer_id, 10)); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let peer_pool = PeerPool::default(); + peer_pool.add_peer(peer_id); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers, + peer_pool.clone(), + ); // Manually set the peer's state . + assert!(peer_pool.try_reserve_peer(&peer_id)); state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState; // Receive response. @@ -656,8 +738,11 @@ mod test { let mut state_sync_provider = MockStateSync::::new(); state_sync_provider.expect_target_hash().return_const(target_hash); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), std::iter::empty()); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + std::iter::empty(), + Default::default(), + ); // Unknown block imported. state_strategy.on_blocks_processed( @@ -679,8 +764,11 @@ mod test { let mut state_sync_provider = MockStateSync::::new(); state_sync_provider.expect_target_hash().return_const(target_hash); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), std::iter::empty()); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + std::iter::empty(), + Default::default(), + ); // Target block imported. state_strategy.on_blocks_processed( @@ -703,8 +791,11 @@ mod test { let mut state_sync_provider = MockStateSync::::new(); state_sync_provider.expect_target_hash().return_const(target_hash); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), std::iter::empty()); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + std::iter::empty(), + Default::default(), + ); // Target block import failed. state_strategy.on_blocks_processed( @@ -729,10 +820,16 @@ mod test { state_sync_provider.expect_is_complete().return_const(true); // Get enough peers for possible spurious requests. 
- let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number)); - - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let initial_peers = + (1..=10).map(|best_number| (PeerId::random(), best_number)).collect::>(); + let peer_pool = PeerPool::default(); + initial_peers.iter().for_each(|(peer_id, _)| peer_pool.add_peer(*peer_id)); + + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers.into_iter(), + peer_pool, + ); state_strategy.on_blocks_processed( 1, diff --git a/substrate/client/network/sync/src/strategy/warp.rs b/substrate/client/network/sync/src/strategy/warp.rs index 7935b5f29b68..ae87d8409d3b 100644 --- a/substrate/client/network/sync/src/strategy/warp.rs +++ b/substrate/client/network/sync/src/strategy/warp.rs @@ -21,6 +21,7 @@ pub use sp_consensus_grandpa::{AuthorityList, SetId}; use crate::{ + peer_pool::PeerPool, strategy::chain_sync::validate_blocks, types::{BadPeer, SyncState, SyncStatus}, LOG_TARGET, @@ -204,12 +205,6 @@ enum PeerState { DownloadingTargetBlock, } -impl PeerState { - fn is_available(&self) -> bool { - matches!(self, PeerState::Available) - } -} - struct Peer { best_number: NumberFor, state: PeerState, @@ -242,6 +237,7 @@ pub struct WarpSync { peers: HashMap>, actions: Vec>, result: Option>, + peer_pool: PeerPool, } impl WarpSync @@ -252,7 +248,11 @@ where /// Create a new instance. When passing a warp sync provider we will be checking for proof and /// authorities. Alternatively we can pass a target block when we want to skip downloading /// proofs, in this case we will continue polling until the target block is known. - pub fn new(client: Arc, warp_sync_config: WarpSyncConfig) -> Self { + pub fn new( + client: Arc, + warp_sync_config: WarpSyncConfig, + peer_pool: PeerPool, + ) -> Self { if client.info().finalized_state.is_some() { error!( target: LOG_TARGET, @@ -266,6 +266,7 @@ where peers: HashMap::new(), actions: vec![WarpSyncAction::Finished], result: None, + peer_pool, } } @@ -283,6 +284,7 @@ where peers: HashMap::new(), actions: Vec::new(), result: None, + peer_pool, } } @@ -355,6 +357,7 @@ where if let Some(peer) = self.peers.get_mut(peer_id) { peer.state = PeerState::Available; } + self.peer_pool.free_peer(&peer_id); let Phase::WarpProof { set_id, authorities, last_hash, warp_sync_provider } = &mut self.phase @@ -413,6 +416,7 @@ where if let Some(peer) = self.peers.get_mut(&peer_id) { peer.state = PeerState::Available; } + self.peer_pool.free_peer(&peer_id); let Phase::TargetBlock(header) = &mut self.phase else { debug!(target: LOG_TARGET, "Unexpected target block response from {peer_id}"); @@ -487,10 +491,10 @@ where targets.sort(); let median = targets[targets.len() / 2]; let threshold = std::cmp::max(median, min_best_number.unwrap_or(Zero::zero())); - // Find a random peer that is synced as much as peer majority and is above + // Find a random available peer that is synced as much as peer majority and is above // `min_best_number`. 
for (peer_id, peer) in self.peers.iter_mut() { - if peer.state.is_available() && peer.best_number >= threshold { + if peer.best_number >= threshold && self.peer_pool.try_reserve_peer(peer_id) { peer.state = new_state; return Some(*peer_id) } @@ -730,7 +734,7 @@ mod test { let client = mock_client_with_state(); let provider = MockWarpSyncProvider::::new(); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Default::default()); // Warp sync instantly finishes let actions = warp_sync.actions().collect::>(); @@ -745,7 +749,7 @@ mod test { fn warp_sync_to_target_for_db_with_finalized_state_is_noop() { let client = mock_client_with_state(); let config = WarpSyncConfig::WaitForTarget; - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Default::default()); // Warp sync instantly finishes let actions = warp_sync.actions().collect::>(); @@ -761,7 +765,7 @@ mod test { let client = mock_client_without_state(); let provider = MockWarpSyncProvider::::new(); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Default::default()); // No actions are emitted. assert_eq!(warp_sync.actions().count(), 0) @@ -771,7 +775,7 @@ mod test { fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() { let client = mock_client_without_state(); let config = WarpSyncConfig::WaitForTarget; - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Default::default()); // No actions are emitted. assert_eq!(warp_sync.actions().count(), 0) @@ -786,16 +790,21 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(Arc::new(client), config, peer_pool.clone()); // Warp sync is not started when there is not enough peers. for _ in 0..(MIN_PEERS_TO_START_WARP_SYNC - 1) { - warp_sync.add_peer(PeerId::random(), Hash::random(), 10); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), 10); assert!(matches!(warp_sync.phase, Phase::WaitingForPeers { .. })) } // Now we have enough peers and warp sync is started. - warp_sync.add_peer(PeerId::random(), Hash::random(), 10); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), 10); assert!(matches!(warp_sync.phase, Phase::WarpProof { .. 
})) } @@ -804,7 +813,7 @@ mod test { let client = mock_client_without_state(); let provider = MockWarpSyncProvider::::new(); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Default::default()); assert!(warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None).is_none()); } @@ -828,10 +837,13 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(Arc::new(client), config, peer_pool.clone()); for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None); @@ -849,10 +861,13 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(Arc::new(client), config, peer_pool.clone()); for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10)); @@ -869,11 +884,14 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(Arc::new(client), config, peer_pool.clone()); // Make sure we have enough peers to make a request. for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } // Manually set to another phase. @@ -892,11 +910,14 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(Arc::new(client), config, peer_pool.clone()); // Make sure we have enough peers to make a request. for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } assert!(matches!(warp_sync.phase, Phase::WarpProof { .. })); @@ -923,11 +944,14 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(Arc::new(client), config, peer_pool.clone()); // Make sure we have enough peers to make requests. 
for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } assert!(matches!(warp_sync.phase, Phase::WarpProof { .. })); @@ -937,6 +961,72 @@ mod test { assert!(warp_sync.warp_proof_request().is_none()); } + #[test] + fn warp_proof_request_reserves_peer() { + let client = mock_client_without_state(); + let mut provider = MockWarpSyncProvider::::new(); + provider + .expect_current_authorities() + .once() + .return_const(AuthorityList::default()); + let config = WarpSyncConfig::WithProvider(Arc::new(provider)); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(Arc::new(client), config, peer_pool.clone()); + + // Make sure we have enough peers to make a request. + for best_number in 1..11 { + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); + } + assert!(matches!(warp_sync.phase, Phase::WarpProof { .. })); + + let (peer_id, _request) = warp_sync.warp_proof_request().unwrap(); + + // The peer is reserved. + assert!(!peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + } + + #[test] + fn warp_proof_response_frees_peer() { + let client = mock_client_without_state(); + let mut provider = MockWarpSyncProvider::::new(); + provider + .expect_current_authorities() + .once() + .return_const(AuthorityList::default()); + // It doesn't matter in this test if the warp proof verification succeeds. + provider.expect_verify().return_once(|_proof, _set_id, _authorities| { + Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure"))) + }); + let config = WarpSyncConfig::WithProvider(Arc::new(provider)); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(Arc::new(client), config, peer_pool.clone()); + + // Make sure we have enough peers to make a request. + for best_number in 1..11 { + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); + } + assert!(matches!(warp_sync.phase, Phase::WarpProof { .. })); + + // Consume `SendWarpProofRequest` action. + let actions = warp_sync.actions().collect::>(); + assert_eq!(actions.len(), 1); + let WarpSyncAction::SendWarpProofRequest { peer_id: request_peer_id, .. } = actions[0] + else { + panic!("Invalid action"); + }; + // Peer we sent request to is now reserved. + assert!(!peer_pool.lock().available_peers().any(|p| *p.peer_id() == request_peer_id)); + + warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new())); + + // The peer we sent a request to and received a response from is now available. + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == request_peer_id)); + } + #[test] fn bad_warp_proof_response_drops_peer() { let client = mock_client_without_state(); @@ -950,11 +1040,14 @@ mod test { Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure"))) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(Arc::new(client), config, peer_pool.clone()); // Make sure we have enough peers to make a request. 
for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } assert!(matches!(warp_sync.phase, Phase::WarpProof { .. })); @@ -991,11 +1084,14 @@ mod test { Ok(VerificationResult::Partial(set_id, authorities, Hash::random())) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(Arc::new(client), config, peer_pool.clone()); // Make sure we have enough peers to make a request. for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } assert!(matches!(warp_sync.phase, Phase::WarpProof { .. })); @@ -1035,11 +1131,14 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(client, config, peer_pool.clone()); // Make sure we have enough peers to make a request. for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } assert!(matches!(warp_sync.phase, Phase::WarpProof { .. })); @@ -1068,11 +1167,14 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(Arc::new(client), config, peer_pool.clone()); // Make sure we have enough peers to make a request. for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } // We are not in `Phase::TargetBlock` assert!(matches!(warp_sync.phase, Phase::WarpProof { .. })); @@ -1103,11 +1205,14 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(client, config, peer_pool.clone()); // Make sure we have enough peers to make a request. for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } // Manually set `TargetBlock` phase. @@ -1135,11 +1240,14 @@ mod test { .block; let target_header = target_block.header().clone(); let config = WarpSyncConfig::WaitForTarget; - let mut warp_sync = WarpSync::new(client, config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(client, config, peer_pool.clone()); // Make sure we have enough peers to make a request. 
for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } // No actions generated so far. @@ -1173,17 +1281,15 @@ mod test { .build() .unwrap() .block; - let target_header = target_block.header().clone(); - // Warp proof is complete. - provider.expect_verify().return_once(move |_proof, set_id, authorities| { - Ok(VerificationResult::Complete(set_id, authorities, target_header)) - }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(client, config, peer_pool.clone()); // Make sure we have enough peers to make a request. for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } // Manually set `TargetBlock` phase. @@ -1211,17 +1317,15 @@ mod test { .build() .unwrap() .block; - let target_header = target_block.header().clone(); - // Warp proof is complete. - provider.expect_verify().return_once(move |_proof, set_id, authorities| { - Ok(VerificationResult::Complete(set_id, authorities, target_header)) - }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(client, config, peer_pool.clone()); // Make sure we have enough peers to make a request. for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } // Manually set `TargetBlock` phase. @@ -1265,17 +1369,15 @@ mod test { .unwrap(); let extra_block = extra_block_builder.build().unwrap().block; - let target_header = target_block.header().clone(); - // Warp proof is complete. - provider.expect_verify().return_once(move |_proof, set_id, authorities| { - Ok(VerificationResult::Complete(set_id, authorities, target_header)) - }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(client, config, peer_pool.clone()); // Make sure we have enough peers to make a request. for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } // Manually set `TargetBlock` phase. @@ -1342,17 +1444,15 @@ mod test { .unwrap(); let wrong_block = wrong_block_builder.build().unwrap().block; - let target_header = target_block.header().clone(); - // Warp proof is complete. - provider.expect_verify().return_once(move |_proof, set_id, authorities| { - Ok(VerificationResult::Complete(set_id, authorities, target_header)) - }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(client, config, peer_pool.clone()); // Make sure we have enough peers to make a request. 
for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } // Manually set `TargetBlock` phase. @@ -1395,17 +1495,15 @@ mod test { .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6])) .unwrap(); let target_block = target_block_builder.build().unwrap().block; - let target_header = target_block.header().clone(); - // Warp proof is complete. - provider.expect_verify().return_once(move |_proof, set_id, authorities| { - Ok(VerificationResult::Complete(set_id, authorities, target_header)) - }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(client, config, peer_pool.clone()); // Make sure we have enough peers to make a request. for best_number in 1..11 { - warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); } // Manually set `TargetBlock` phase. @@ -1440,4 +1538,83 @@ mod test { assert_eq!(result.target_body, body); assert_eq!(result.target_justifications, justifications); } + + #[test] + fn target_block_request_reserves_peer() { + let client = Arc::new(TestClientBuilder::new().set_no_genesis().build()); + let target_block = BlockBuilderBuilder::new(&*client) + .on_parent_block(client.chain_info().best_hash) + .with_parent_block_number(client.chain_info().best_number) + .build() + .unwrap() + .build() + .unwrap() + .block; + let target_header = target_block.header().clone(); + let config = WarpSyncConfig::WaitForTarget; + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(client, config, peer_pool.clone()); + + // Make sure we have enough peers to make a request. + for best_number in 1..11 { + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); + } + + // No actions generated so far. + assert_eq!(warp_sync.actions().count(), 0); + + warp_sync.set_target_block(target_header); + assert!(matches!(warp_sync.phase, Phase::TargetBlock(_))); + + let (peer_id, _request) = warp_sync.target_block_request().unwrap(); + + // The peer requested is reserved. + assert!(!peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + } + + #[test] + fn target_block_response_frees_peer() { + let client = Arc::new(TestClientBuilder::new().set_no_genesis().build()); + let mut provider = MockWarpSyncProvider::::new(); + provider + .expect_current_authorities() + .once() + .return_const(AuthorityList::default()); + let target_block = BlockBuilderBuilder::new(&*client) + .on_parent_block(client.chain_info().best_hash) + .with_parent_block_number(client.chain_info().best_number) + .build() + .unwrap() + .build() + .unwrap() + .block; + let config = WarpSyncConfig::WithProvider(Arc::new(provider)); + let peer_pool = PeerPool::default(); + let mut warp_sync = WarpSync::new(client, config, peer_pool.clone()); + + // Make sure we have enough peers to make a request. + for best_number in 1..11 { + let peer_id = PeerId::random(); + peer_pool.add_peer(peer_id); + warp_sync.add_peer(peer_id, Hash::random(), best_number); + } + + // Manually set `TargetBlock` phase. 
+ warp_sync.phase = Phase::TargetBlock(target_block.header().clone()); + + let (peer_id, request) = warp_sync.target_block_request().unwrap(); + + // The peer requested is reserved. + assert!(!peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + + // Dummy (empty) block response received. + let response = Vec::new(); + + let _ = warp_sync.on_block_response_inner(peer_id, request, response); + + // The peer we sent a request to and received a response from is now available. + assert!(peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id)); + } }
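
For readers following the tests above, here is a minimal, self-contained sketch of the `PeerPool` shape they exercise, inferred only from the calls visible in this diff (`PeerPool::default()`, `add_peer`, `lock().available_peers()`, `AvailablePeer::peer_id`, `AvailablePeer::reserve`). It is an illustration under stated assumptions, not the actual `sc-network-sync` implementation: the `PeerId` newtype stands in for `libp2p::PeerId`, `std::sync::Mutex` stands in for the `parking_lot` mutex this PR adds as a dependency, and the `PeerStatus` enum and field names are hypothetical.

```rust
// Hypothetical sketch only: `PeerId` stands in for `libp2p::PeerId`, and
// `std::sync::Mutex` stands in for the crate's `parking_lot` mutex.
use std::{
    collections::HashMap,
    sync::{Arc, Mutex, MutexGuard},
};

/// Stand-in for `libp2p::PeerId`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct PeerId(u64);

/// Assumed per-peer state: a peer is either free or reserved by a strategy.
#[derive(Clone, Copy, PartialEq, Eq)]
enum PeerStatus {
    Available,
    Reserved,
}

#[derive(Default)]
struct PeerPoolInner {
    peers: HashMap<PeerId, PeerStatus>,
}

/// Cloning shares the same underlying state, so several syncing strategies
/// can hold handles to one pool.
#[derive(Clone, Default)]
struct PeerPool {
    inner: Arc<Mutex<PeerPoolInner>>,
}

impl PeerPool {
    /// Register a newly connected peer as available.
    fn add_peer(&self, peer_id: PeerId) {
        self.inner.lock().unwrap().peers.insert(peer_id, PeerStatus::Available);
    }

    /// Lock the pool; `available_peers` is then called on the guard,
    /// mirroring the `peer_pool.lock().available_peers()` calls in the tests.
    fn lock(&self) -> MutexGuard<'_, PeerPoolInner> {
        self.inner.lock().unwrap()
    }
}

impl PeerPoolInner {
    /// Iterate over peers with no active request.
    fn available_peers(&mut self) -> impl Iterator<Item = AvailablePeer<'_>> {
        self.peers.iter_mut().filter_map(|(peer_id, status)| {
            (*status == PeerStatus::Available).then_some(AvailablePeer { peer_id, status })
        })
    }
}

/// Handle to a free peer; `reserve` marks it busy until a response frees it.
struct AvailablePeer<'a> {
    peer_id: &'a PeerId,
    status: &'a mut PeerStatus,
}

impl<'a> AvailablePeer<'a> {
    fn peer_id(&self) -> &PeerId {
        self.peer_id
    }

    fn reserve(&mut self) {
        *self.status = PeerStatus::Reserved;
    }
}

fn main() {
    let peer_pool = PeerPool::default();
    let peer_id = PeerId(1);
    peer_pool.add_peer(peer_id);

    // A strategy picks a free peer and reserves it before sending a request.
    if let Some(mut available) = peer_pool.lock().available_peers().next() {
        available.reserve();
    }

    // The reserved peer no longer shows up as available, which is what
    // `target_block_request_reserves_peer` asserts above.
    assert!(!peer_pool.lock().available_peers().any(|p| *p.peer_id() == peer_id));
}
```

Read this way, the two new tests pin down both halves of the pool's contract: issuing a target block request must reserve the chosen peer so no other strategy can double-book it, and consuming that peer's response must return it to the pool, as `target_block_response_frees_peer` checks.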