Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Share peers between syncing strategies #2814

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
796112d
Defer new peer actions in `ChainSync`
dmitry-markin Dec 21, 2023
125f9a7
Introduce `PeerPool` for per-strategy peer allocation
dmitry-markin Dec 25, 2023
be88848
Prepare `SyncingStrategy` for parallel strategies
dmitry-markin Dec 25, 2023
6ed85cd
Reserve peers for `WarpSync` strategy in `PeerPool`
dmitry-markin Dec 26, 2023
3aaa9ef
Reserve peers for `StateStrategy` in `PeerPool`
dmitry-markin Dec 26, 2023
02259bc
WIP: Reserve peers for `ChainSync` in `PeerPool`
dmitry-markin Dec 26, 2023
264723b
minor: fix compilation + rustfmt
dmitry-markin Jan 15, 2024
4bb9a7b
Apply suggestions from code review
dmitry-markin Jan 15, 2024
a1cec8c
Partially revert "Apply suggestions from code review"
dmitry-markin Jan 15, 2024
e6c2b3a
minor: docs
dmitry-markin Jan 15, 2024
1db2ad1
Apply review suggestions
dmitry-markin Jan 15, 2024
17f6bbe
Simplify peer allocation from `PeerPool` for requests
dmitry-markin Jan 19, 2024
fd53c44
Update `WarpStrategy` and `StateStrategy` tests
dmitry-markin Jan 19, 2024
fa64365
WIP: fix `ChainSync` tests
dmitry-markin Jan 19, 2024
272fbc6
Fix sync restart and `ChainSync` tests
dmitry-markin Jan 19, 2024
d8008e6
Respect `PeerPool` when making extra justification requests
dmitry-markin Jan 26, 2024
06295e2
Cancel a stale state request when restarting the sync
dmitry-markin Jan 26, 2024
4913702
Merge remote-tracking branch 'origin/master' into dm-gap-sync-strategy
dmitry-markin Jan 29, 2024
d4edc3f
Merge remote-tracking branch 'origin/master' into dm-gap-sync-strategy
dmitry-markin Jan 29, 2024
cf93e7f
Simplify strategy actions conversion into top-level `SyncingAction`
dmitry-markin Jan 29, 2024
0add17e
Add PRDoc
dmitry-markin Jan 29, 2024
9800af8
Test peers reservation in `WarpSync`
dmitry-markin Jan 29, 2024
56f92a8
Test peers reservation by `StateStrategy`
dmitry-markin Jan 29, 2024
cd6cfe3
Test `PeerPool`
dmitry-markin Jan 29, 2024
7d7f936
WIP: test peer reservation by `ChainSync`
dmitry-markin Jan 29, 2024
2605f2a
Test peer reservation in `PeerPool` by `ChainSync`
dmitry-markin Jan 30, 2024
acc7a76
Apply suggestions from code review
dmitry-markin Jan 31, 2024
da9dd82
Apply suggestions from code review
dmitry-markin Jan 31, 2024
94a07a6
Apply review suggestions
dmitry-markin Jan 31, 2024
646f0a5
Merge remote-tracking branch 'origin/master' into dm-gap-sync-strategy
dmitry-markin Feb 1, 2024
7aa59a3
minor: log message
dmitry-markin Feb 1, 2024
8d6a6d7
Rename `extra_requests.rs` -> `justification_requests.rs`, add docs
dmitry-markin Feb 1, 2024
0254680
Hide mutex behind `PeerPool` interface
dmitry-markin Feb 1, 2024
62dea92
Simplify `schedule_next_peer()`
dmitry-markin Feb 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions prdoc/pr_2814.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Share peers between syncing strategies

doc:
- audience: Node Dev
description: |
New struct `PeerPool` is added to `SyncingStrategy` to allow running syncing strategies
in parallel and sharing peers. Available peers (with no active requests) are retrieved by
calling `PeerPool::available_peers()`, and a peer is reserved for a request by a syncing
strategy using `AvailablePeer::reserve()`.

crates:
- name: sc-network-sync
1 change: 1 addition & 0 deletions substrate/client/network/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ futures-timer = "3.0.2"
libp2p = "0.51.4"
log = "0.4.17"
mockall = "0.11.3"
parking_lot = "0.12.1"
prost = "0.12"
schnellru = "0.2.1"
smallvec = "1.11.0"
Expand Down
60 changes: 24 additions & 36 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,6 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Syncing strategy.
strategy: SyncingStrategy<B, Client>,

/// Syncing configuration for strategies.
syncing_config: SyncingConfig,

/// Blockchain client.
client: Arc<Client>,

Expand Down Expand Up @@ -441,8 +438,7 @@ where
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());

// Initialize syncing strategy.
let strategy =
SyncingStrategy::new(syncing_config.clone(), client.clone(), warp_sync_config)?;
let strategy = SyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?;

let block_announce_protocol_name = block_announce_config.protocol_name().clone();
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
Expand Down Expand Up @@ -471,7 +467,6 @@ where
roles,
client,
strategy,
syncing_config,
network_service,
peers: HashMap::new(),
block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
Expand Down Expand Up @@ -661,8 +656,15 @@ where
Some(event) => self.process_notification_event(event),
None => return,
},
warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused =>
self.pass_warp_sync_target_block_header(warp_target_block_header),
warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused => {
if let Err(_) = self.pass_warp_sync_target_block_header(warp_target_block_header) {
error!(
target: LOG_TARGET,
"Failed to set warp sync target block header, terminating `SyncingEngine`.",
);
return
}
},
response_event = self.pending_responses.select_next_some() =>
self.process_response_event(response_event),
validation_result = self.block_announce_validator.select_next_some() =>
Expand All @@ -675,14 +677,17 @@ where

// Process actions requested by a syncing strategy.
if let Err(e) = self.process_strategy_actions() {
error!("Terminating `SyncingEngine` due to fatal error: {e:?}");
error!(
target: LOG_TARGET,
"Terminating `SyncingEngine` due to fatal error: {e:?}.",
);
return
}
}
}

fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
for action in self.strategy.actions() {
for action in self.strategy.actions()? {
match action {
SyncingAction::SendBlockRequest { peer_id, request } => {
// Sending block request implies dropping obsolete pending response as we are
Expand All @@ -699,7 +704,7 @@ where
removed,
)
},
SyncingAction::CancelBlockRequest { peer_id } => {
SyncingAction::CancelRequest { peer_id } => {
let removed = self.pending_responses.remove(&peer_id);

trace!(
Expand Down Expand Up @@ -753,20 +758,8 @@ where
number,
)
},
SyncingAction::Finished => {
let connected_peers = self.peers.iter().filter_map(|(peer_id, peer)| {
peer.info.roles.is_full().then_some((
*peer_id,
peer.info.best_hash,
peer.info.best_number,
))
});
self.strategy.switch_to_next(
self.syncing_config.clone(),
self.client.clone(),
connected_peers,
)?;
},
// Nothing to do, this is handled internally by `SyncingStrategy`.
SyncingAction::Finished => {},
}
}

Expand Down Expand Up @@ -948,23 +941,18 @@ where
}
}

fn pass_warp_sync_target_block_header(&mut self, header: Result<B::Header, oneshot::Canceled>) {
fn pass_warp_sync_target_block_header(
&mut self,
header: Result<B::Header, oneshot::Canceled>,
) -> Result<(), ()> {
match header {
Ok(header) =>
if let SyncingStrategy::WarpSyncStrategy(warp_sync) = &mut self.strategy {
warp_sync.set_target_block(header);
} else {
error!(
target: LOG_TARGET,
"Cannot set warp sync target block: no warp sync strategy is active."
);
debug_assert!(false);
},
Ok(header) => self.strategy.set_warp_sync_target_block_header(header),
Err(err) => {
error!(
target: LOG_TARGET,
"Failed to get target block for warp sync. Error: {err:?}",
);
Err(())
},
}
}
Expand Down
83 changes: 51 additions & 32 deletions substrate/client/network/sync/src/extra_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{
request_metrics::Metrics,
strategy::chain_sync::{PeerSync, PeerSyncState},
LOG_TARGET,
peer_pool::PeerPool, request_metrics::Metrics, strategy::chain_sync::PeerSync, LOG_TARGET,
dmitry-markin marked this conversation as resolved.
Show resolved Hide resolved
};
use fork_tree::ForkTree;
use libp2p::PeerId;
use log::{debug, trace, warn};
use log::{debug, error, trace, warn};
use parking_lot::Mutex;
use sp_blockchain::Error as ClientError;
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
use std::{
collections::{HashMap, HashSet, VecDeque},
sync::Arc,
time::{Duration, Instant},
};

Expand Down Expand Up @@ -288,6 +288,7 @@ impl<'a, B: BlockT> Matcher<'a, B> {
pub(crate) fn next(
&mut self,
peers: &HashMap<PeerId, PeerSync<B>>,
peer_pool: &Arc<Mutex<PeerPool>>,
altonen marked this conversation as resolved.
Show resolved Hide resolved
) -> Option<(PeerId, ExtraRequest<B>)> {
if self.remaining == 0 {
return None
Expand All @@ -299,36 +300,41 @@ impl<'a, B: BlockT> Matcher<'a, B> {
}

while let Some(request) = self.extras.pending_requests.pop_front() {
for (peer, sync) in
peers.iter().filter(|(_, sync)| sync.state == PeerSyncState::Available)
{
for mut available_peer in peer_pool.lock().available_peers() {
let peer_id = available_peer.peer_id();
let Some(sync) = peers.get(peer_id) else {
error!(
target: LOG_TARGET,
"Peer {peer_id} is available, but not known to `ChainSync`",
);
debug_assert!(false);
continue
};
// only ask peers that have synced at least up to the block number that we're asking
// the extra for
if sync.best_number < request.1 {
continue
}
// don't request to any peers that already have pending requests
if self.extras.active_requests.contains_key(peer) {
continue
}
// only ask if the same request has not failed for this peer before
if self
.extras
.failed_requests
.get(&request)
.map(|rr| rr.iter().any(|i| &i.0 == peer))
.map(|rr| rr.iter().any(|i| &i.0 == peer_id))
.unwrap_or(false)
{
continue
}
self.extras.active_requests.insert(*peer, request);

available_peer.reserve();
self.extras.active_requests.insert(*peer_id, request);

trace!(target: LOG_TARGET,
"Sending {} request to {:?} for {:?}",
self.extras.request_type_name, peer, request,
self.extras.request_type_name, peer_id, request,
);

return Some((*peer, request))
return Some((*peer_id, request))
}

self.extras.pending_requests.push_back(request);
Expand All @@ -346,19 +352,26 @@ impl<'a, B: BlockT> Matcher<'a, B> {
#[cfg(test)]
mod tests {
use super::*;
use crate::strategy::chain_sync::PeerSync;
use crate::{
peer_pool::PeerPool,
strategy::chain_sync::{PeerSync, PeerSyncState},
};
use parking_lot::Mutex;
use quickcheck::{Arbitrary, Gen, QuickCheck};
use sp_blockchain::Error as ClientError;
use sp_test_primitives::{Block, BlockNumber, Hash};
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

#[test]
fn requests_are_processed_in_order() {
fn property(mut peers: ArbitraryPeers) {
fn property(mut ap: ArbitraryPeers) {
let mut requests = ExtraRequests::<Block>::new("test");

let num_peers_available =
peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
ap.peers.values().filter(|s| s.state == PeerSyncState::Available).count();

for i in 0..num_peers_available {
requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
Expand All @@ -368,9 +381,9 @@ mod tests {
let mut m = requests.matcher();

for p in &pending {
let (peer, r) = m.next(&peers.0).unwrap();
let (peer, r) = m.next(&ap.peers, &ap.peer_pool).unwrap();
assert_eq!(p, &r);
peers.0.get_mut(&peer).unwrap().state =
ap.peers.get_mut(&peer).unwrap().state =
PeerSyncState::DownloadingJustification(r.0);
}
}
Expand All @@ -397,19 +410,19 @@ mod tests {

#[test]
fn disconnecting_implies_rescheduling() {
fn property(mut peers: ArbitraryPeers) -> bool {
fn property(mut ap: ArbitraryPeers) -> bool {
let mut requests = ExtraRequests::<Block>::new("test");

let num_peers_available =
peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
ap.peers.values().filter(|s| s.state == PeerSyncState::Available).count();

for i in 0..num_peers_available {
requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
}

let mut m = requests.matcher();
while let Some((peer, r)) = m.next(&peers.0) {
peers.0.get_mut(&peer).unwrap().state =
while let Some((peer, r)) = m.next(&ap.peers, &ap.peer_pool) {
ap.peers.get_mut(&peer).unwrap().state =
PeerSyncState::DownloadingJustification(r.0);
}

Expand All @@ -433,19 +446,19 @@ mod tests {

#[test]
fn no_response_reschedules() {
fn property(mut peers: ArbitraryPeers) {
fn property(mut ap: ArbitraryPeers) {
let mut requests = ExtraRequests::<Block>::new("test");

let num_peers_available =
peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
ap.peers.values().filter(|s| s.state == PeerSyncState::Available).count();

for i in 0..num_peers_available {
requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
}

let mut m = requests.matcher();
while let Some((peer, r)) = m.next(&peers.0) {
peers.0.get_mut(&peer).unwrap().state =
while let Some((peer, r)) = m.next(&ap.peers, &ap.peer_pool) {
ap.peers.get_mut(&peer).unwrap().state =
PeerSyncState::DownloadingJustification(r.0);
}

Expand Down Expand Up @@ -570,16 +583,22 @@ mod tests {
}

#[derive(Debug, Clone)]
struct ArbitraryPeers(HashMap<PeerId, PeerSync<Block>>);
struct ArbitraryPeers {
peers: HashMap<PeerId, PeerSync<Block>>,
peer_pool: Arc<Mutex<PeerPool>>,
}

impl Arbitrary for ArbitraryPeers {
fn arbitrary(g: &mut Gen) -> Self {
let mut peers = HashMap::with_capacity(g.size());
let peer_pool = Arc::new(Mutex::new(PeerPool::default()));
for _ in 0..g.size() {
let ps = ArbitraryPeerSync::arbitrary(g).0;
peers.insert(ps.peer_id, ps);
let peer_id = ps.peer_id;
peers.insert(peer_id, ps);
peer_pool.lock().add_peer(peer_id);
}
ArbitraryPeers(peers)
ArbitraryPeers { peers, peer_pool }
}
}
}
1 change: 1 addition & 0 deletions substrate/client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusPro
mod block_announce_validator;
mod extra_requests;
mod futures_stream;
mod peer_pool;
mod pending_responses;
mod request_metrics;
mod schema;
Expand Down
Loading
Loading