Skip to content

Commit

Permalink
Allow pausing of Substrate sync when desired
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Aug 12, 2024
1 parent af29a24 commit 4670a2c
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 13 deletions.
8 changes: 7 additions & 1 deletion substrate/client/cli/src/params/network_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ use sc_service::{
config::{Multiaddr, MultiaddrWithPeerId},
ChainSpec, ChainType,
};
use std::{borrow::Cow, num::NonZeroUsize, path::PathBuf};
use std::{
borrow::Cow,
num::NonZeroUsize,
path::PathBuf,
sync::{atomic::AtomicBool, Arc},
};

/// Parameters used to create the network configuration.
#[derive(Debug, Clone, Args)]
Expand Down Expand Up @@ -278,6 +283,7 @@ impl NetworkParams {
yamux_window_size: None,
ipfs_server: self.ipfs_server,
sync_mode: self.sync.into(),
pause_sync: Arc::new(AtomicBool::new(false)),
network_backend: self.network_backend.into(),
}
}
Expand Down
6 changes: 5 additions & 1 deletion substrate/client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use std::{
path::{Path, PathBuf},
pin::Pin,
str::{self, FromStr},
sync::Arc,
sync::{atomic::AtomicBool, Arc},
};

/// Protocol name prefix, transmitted on the wire for legacy protocol names.
Expand Down Expand Up @@ -629,6 +629,9 @@ pub struct NetworkConfiguration {
/// Initial syncing mode.
pub sync_mode: SyncMode,

/// Whether to pause Substrate sync
pub pause_sync: Arc<AtomicBool>,

/// True if Kademlia random discovery should be enabled.
///
/// If true, the node will automatically randomly walk the DHT in order to find new peers.
Expand Down Expand Up @@ -698,6 +701,7 @@ impl NetworkConfiguration {
max_parallel_downloads: 5,
max_blocks_per_request: 64,
sync_mode: SyncMode::Full,
pause_sync: Arc::new(AtomicBool::new(false)),
enable_dht_random_walk: true,
allow_non_globals_in_dht: false,
kademlia_disjoint_query_paths: false,
Expand Down
2 changes: 2 additions & 0 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ where
N: NetworkBackend<B, <B as BlockT>::Hash>,
{
let mode = net_config.network_config.sync_mode;
let pause_sync = Arc::clone(&net_config.network_config.pause_sync);
let max_parallel_downloads = net_config.network_config.max_parallel_downloads;
let max_blocks_per_request =
if net_config.network_config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
Expand All @@ -335,6 +336,7 @@ where
};
let syncing_config = SyncingConfig {
mode,
pause_sync,
max_parallel_downloads,
max_blocks_per_request,
metrics_registry: metrics_registry.cloned(),
Expand Down
11 changes: 10 additions & 1 deletion substrate/client/network/sync/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ use sp_runtime::{
Justifications,
};
use state::{StateStrategy, StateStrategyAction};
use std::{collections::HashMap, sync::Arc};
use std::{
collections::HashMap,
sync::{atomic::AtomicBool, Arc},
};
use warp::{EncodedProof, WarpProofRequest, WarpSync, WarpSyncAction, WarpSyncConfig};

/// Corresponding `ChainSync` mode.
Expand All @@ -63,6 +66,8 @@ fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode {
pub struct SyncingConfig {
/// Syncing mode.
pub mode: SyncMode,
/// Whether to pause Substrate sync
pub pause_sync: Arc<AtomicBool>,
/// The number of parallel downloads to guard against slow peers.
pub max_parallel_downloads: u32,
/// Maximum number of blocks to request.
Expand Down Expand Up @@ -206,6 +211,7 @@ where
} else {
let chain_sync = ChainSync::new(
chain_sync_mode(config.mode),
config.pause_sync.clone(),
client.clone(),
config.max_parallel_downloads,
config.max_blocks_per_request,
Expand Down Expand Up @@ -548,6 +554,7 @@ where
);
let chain_sync = match ChainSync::new(
chain_sync_mode(self.config.mode),
self.config.pause_sync.clone(),
self.client.clone(),
self.config.max_parallel_downloads,
self.config.max_blocks_per_request,
Expand Down Expand Up @@ -576,6 +583,7 @@ where
}
let chain_sync = match ChainSync::new(
chain_sync_mode(self.config.mode),
self.config.pause_sync.clone(),
self.client.clone(),
self.config.max_parallel_downloads,
self.config.max_blocks_per_request,
Expand Down Expand Up @@ -640,6 +648,7 @@ mod test {
// Initialize syncing strategy.
let config = SyncingConfig {
mode: SyncMode::Warp,
pause_sync: Arc::new(AtomicBool::new(false)),
max_parallel_downloads: 3,
max_blocks_per_request: 64,
metrics_registry: None,
Expand Down
13 changes: 12 additions & 1 deletion substrate/client/network/sync/src/strategy/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ use sp_runtime::{
use std::{
collections::{HashMap, HashSet},
ops::Range,
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};

#[cfg(test)]
Expand Down Expand Up @@ -258,6 +261,8 @@ pub struct ChainSync<B: BlockT, Client> {
best_queued_hash: B::Hash,
/// Current mode (full/light)
mode: ChainSyncMode,
/// Whether to pause Substrate sync
pause_sync: Arc<AtomicBool>,
/// Any extra justification requests.
extra_justifications: ExtraRequests<B>,
/// A set of hashes of blocks that are being downloaded or have been
Expand Down Expand Up @@ -369,6 +374,7 @@ where
/// Create a new instance.
pub fn new(
mode: ChainSyncMode,
pause_sync: Arc<AtomicBool>,
client: Arc<Client>,
max_parallel_downloads: u32,
max_blocks_per_request: u32,
Expand All @@ -383,6 +389,7 @@ where
best_queued_number: Zero::zero(),
extra_justifications: ExtraRequests::new("justification"),
mode,
pause_sync,
queue_blocks: Default::default(),
fork_targets: Default::default(),
allowed_requests: Default::default(),
Expand Down Expand Up @@ -1520,6 +1527,10 @@ where

/// Get block requests scheduled by sync to be sent out.
fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
if self.pause_sync.load(Ordering::Acquire) {
return Vec::new();
}

if self.allowed_requests.is_empty() || self.state_sync.is_some() {
return Vec::new()
}
Expand Down
18 changes: 9 additions & 9 deletions substrate/client/network/sync/src/strategy/chain_sync/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn processes_empty_response_on_justification_request_for_unknown_block() {
let peer_id = PeerId::random();

let mut sync =
ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None, std::iter::empty())
ChainSync::new(ChainSyncMode::Full, Default::default(), client.clone(), 1, 64, None, std::iter::empty())
.unwrap();

let (a1_hash, a1_number) = {
Expand Down Expand Up @@ -96,7 +96,7 @@ fn restart_doesnt_affect_peers_downloading_finality_data() {
// we request max 8 blocks to always initiate block requests to both peers for the test to be
// deterministic
let mut sync =
ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 8, None, std::iter::empty())
ChainSync::new(ChainSyncMode::Full, Default::default(), client.clone(), 1, 8, None, std::iter::empty())
.unwrap();

let peer_id1 = PeerId::random();
Expand Down Expand Up @@ -292,7 +292,7 @@ fn do_ancestor_search_when_common_block_to_best_queued_gap_is_to_big() {
let info = client.info();

let mut sync =
ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty())
ChainSync::new(ChainSyncMode::Full, Default::default(), client.clone(), 5, 64, None, std::iter::empty())
.unwrap();

let peer_id1 = PeerId::random();
Expand Down Expand Up @@ -440,7 +440,7 @@ fn can_sync_huge_fork() {
let info = client.info();

let mut sync =
ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty())
ChainSync::new(ChainSyncMode::Full, Default::default(), client.clone(), 5, 64, None, std::iter::empty())
.unwrap();

let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone();
Expand Down Expand Up @@ -575,7 +575,7 @@ fn syncs_fork_without_duplicate_requests() {
let info = client.info();

let mut sync =
ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty())
ChainSync::new(ChainSyncMode::Full, Default::default(), client.clone(), 5, 64, None, std::iter::empty())
.unwrap();

let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone();
Expand Down Expand Up @@ -712,7 +712,7 @@ fn removes_target_fork_on_disconnect() {
let blocks = (0..3).map(|_| build_block(&mut client, None, false)).collect::<Vec<_>>();

let mut sync =
ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None, std::iter::empty())
ChainSync::new(ChainSyncMode::Full, Default::default(), client.clone(), 1, 64, None, std::iter::empty())
.unwrap();

let peer_id1 = PeerId::random();
Expand All @@ -739,7 +739,7 @@ fn can_import_response_with_missing_blocks() {
let empty_client = Arc::new(TestClientBuilder::new().build());

let mut sync =
ChainSync::new(ChainSyncMode::Full, empty_client.clone(), 1, 64, None, std::iter::empty())
ChainSync::new(ChainSyncMode::Full, Default::default(), empty_client.clone(), 1, 64, None, std::iter::empty())
.unwrap();

let peer_id1 = PeerId::random();
Expand Down Expand Up @@ -772,7 +772,7 @@ fn ancestor_search_repeat() {
fn sync_restart_removes_block_but_not_justification_requests() {
let mut client = Arc::new(TestClientBuilder::new().build());
let mut sync =
ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None, std::iter::empty())
ChainSync::new(ChainSyncMode::Full, Default::default(), client.clone(), 1, 64, None, std::iter::empty())
.unwrap();

let peers = vec![PeerId::random(), PeerId::random()];
Expand Down Expand Up @@ -916,7 +916,7 @@ fn request_across_forks() {
};

let mut sync =
ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty())
ChainSync::new(ChainSyncMode::Full, Default::default(), client.clone(), 5, 64, None, std::iter::empty())
.unwrap();

// Add the peers, all at the common ancestor 100.
Expand Down

0 comments on commit 4670a2c

Please sign in to comment.