From 19f37defaba0cad14d49a5ba0e0a1cff55c6904e Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 26 Jun 2023 21:16:44 +0300 Subject: [PATCH 1/3] Unify `SyncMode` data structures under one --- client/cli/src/arg_enums.rs | 12 ++++--- client/network/common/src/sync.rs | 54 ++++++++++++++-------------- client/network/src/config.rs | 36 +------------------ client/network/sync/src/engine.rs | 59 ++++--------------------------- client/network/sync/src/lib.rs | 27 -------------- client/network/sync/src/mock.rs | 1 - client/network/test/src/lib.rs | 2 +- client/network/test/src/sync.rs | 4 +-- client/service/src/builder.rs | 5 +-- client/service/src/config.rs | 2 +- 10 files changed, 50 insertions(+), 152 deletions(-) diff --git a/client/cli/src/arg_enums.rs b/client/cli/src/arg_enums.rs index 89cb4500803b9..982979605a32d 100644 --- a/client/cli/src/arg_enums.rs +++ b/client/cli/src/arg_enums.rs @@ -258,10 +258,14 @@ impl Into for SyncMode { fn into(self) -> sc_network::config::SyncMode { match self { SyncMode::Full => sc_network::config::SyncMode::Full, - SyncMode::Fast => - sc_network::config::SyncMode::Fast { skip_proofs: false, storage_chain_mode: false }, - SyncMode::FastUnsafe => - sc_network::config::SyncMode::Fast { skip_proofs: true, storage_chain_mode: false }, + SyncMode::Fast => sc_network::config::SyncMode::LightState { + skip_proofs: false, + storage_chain_mode: false, + }, + SyncMode::FastUnsafe => sc_network::config::SyncMode::LightState { + skip_proofs: true, + storage_chain_mode: false, + }, SyncMode::Warp => sc_network::config::SyncMode::Warp, } } diff --git a/client/network/common/src/sync.rs b/client/network/common/src/sync.rs index 8f88cad83f9b7..1c8bcfce39270 100644 --- a/client/network/common/src/sync.rs +++ b/client/network/common/src/sync.rs @@ -191,32 +191,43 @@ pub enum PollBlockAnnounceValidation { /// The announcement. announce: BlockAnnounce, }, - /// The announcement header should be imported. - ImportHeader { - /// Who sent the processed block announcement? - who: PeerId, - /// Was this their new best block? - is_best: bool, - /// The announcement. - announce: BlockAnnounce, - }, /// The block announcement should be skipped. Skip, } -/// Operation mode. -#[derive(Debug, PartialEq, Eq)] +/// Sync operation mode. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum SyncMode { - // Sync headers only - Light, - // Sync headers and block bodies + /// Full block download and verification. Full, - // Sync headers and the last finalied state - LightState { storage_chain_mode: bool, skip_proofs: bool }, - // Warp sync mode. + /// Download blocks and the latest state. + LightState { + /// Skip state proof download and verification. + skip_proofs: bool, + /// Download indexed transactions for recent blocks. + storage_chain_mode: bool, + }, + /// Warp sync - verify authority set transitions and the latest state. Warp, } +impl SyncMode { + /// Returns `true` if `self` is [`Self::Warp`]. + pub fn is_warp(&self) -> bool { + matches!(self, Self::Warp) + } + + /// Returns `true` if `self` is [`Self::LightState`]. + pub fn light_state(&self) -> bool { + matches!(self, Self::LightState { .. }) + } +} + +impl Default for SyncMode { + fn default() -> Self { + Self::Full + } +} #[derive(Debug)] pub struct Metrics { pub queued_blocks: u32, @@ -361,12 +372,6 @@ pub trait ChainSync: Send { response: BlockResponse, ) -> Result, BadPeer>; - /// Procss received block data. - fn process_block_response_data( - &mut self, - blocks_to_import: Result, BadPeer>, - ); - /// Handle a response from the remote to a justification request that we made. /// /// `request` must be the original request that triggered `response`. @@ -406,9 +411,6 @@ pub trait ChainSync: Send { /// [`ChainSync::push_block_announce_validation`]. /// /// This should be polled until it returns [`Poll::Pending`]. - /// - /// If [`PollBlockAnnounceValidation::ImportHeader`] is returned, then the caller MUST try to - /// import passed header (call `on_block_data`). The network request isn't sent in this case. fn poll_block_announce_validation( &mut self, cx: &mut std::task::Context<'_>, diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 68fad74cbe0eb..6855fbae24326 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -37,7 +37,7 @@ use zeroize::Zeroize; pub use sc_network_common::{ role::{Role, Roles}, - sync::warp::WarpSyncProvider, + sync::{warp::WarpSyncProvider, SyncMode}, ExHashT, }; use sc_utils::mpsc::TracingUnboundedSender; @@ -275,40 +275,6 @@ impl NonReservedPeerMode { } } -/// Sync operation mode. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub enum SyncMode { - /// Full block download and verification. - Full, - /// Download blocks and the latest state. - Fast { - /// Skip state proof download and verification. - skip_proofs: bool, - /// Download indexed transactions for recent blocks. - storage_chain_mode: bool, - }, - /// Warp sync - verify authority set transitions and the latest state. - Warp, -} - -impl SyncMode { - /// Returns if `self` is [`Self::Warp`]. - pub fn is_warp(&self) -> bool { - matches!(self, Self::Warp) - } - - /// Returns if `self` is [`Self::Fast`]. - pub fn is_fast(&self) -> bool { - matches!(self, Self::Fast { .. }) - } -} - -impl Default for SyncMode { - fn default() -> Self { - Self::Full - } -} - /// The configuration of a node's secret key, describing the type of key /// and how it is obtained. A node's identity keypair is the result of /// the evaluation of the node key configuration. diff --git a/client/network/sync/src/engine.rs b/client/network/sync/src/engine.rs index bd208be5ff774..d12a67211c296 100644 --- a/client/network/sync/src/engine.rs +++ b/client/network/sync/src/engine.rs @@ -37,22 +37,16 @@ use prometheus_endpoint::{ use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider}; use sc_consensus::import_queue::ImportQueueService; use sc_network::{ - config::{ - FullNetworkConfiguration, NonDefaultSetConfig, ProtocolId, SyncMode as SyncOperationMode, - }, + config::{FullNetworkConfiguration, NonDefaultSetConfig, ProtocolId}, utils::LruHashSet, NotificationsSink, ProtocolName, ReputationChange, }; use sc_network_common::{ role::Roles, sync::{ - message::{ - generic::{BlockData, BlockResponse}, - BlockAnnounce, BlockAnnouncesHandshake, BlockState, - }, + message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState}, warp::WarpSyncParams, BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, PollBlockAnnounceValidation, SyncEvent, - SyncMode, }, }; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; @@ -297,12 +291,7 @@ where rx: sc_utils::mpsc::TracingUnboundedReceiver>, force_synced: bool, ) -> Result<(Self, SyncingService, NonDefaultSetConfig), ClientError> { - let mode = match net_config.network_config.sync_mode { - SyncOperationMode::Full => SyncMode::Full, - SyncOperationMode::Fast { skip_proofs, storage_chain_mode } => - SyncMode::LightState { skip_proofs, storage_chain_mode }, - SyncOperationMode::Warp => SyncMode::Warp, - }; + let mode = net_config.network_config.sync_mode; let max_parallel_downloads = net_config.network_config.max_parallel_downloads; let max_blocks_per_request = if net_config.network_config.max_blocks_per_request > crate::MAX_BLOCKS_IN_RESPONSE as u32 @@ -488,8 +477,8 @@ where &mut self, validation_result: PollBlockAnnounceValidation, ) { - let (header, _is_best, who) = match validation_result { - PollBlockAnnounceValidation::Skip => return, + match validation_result { + PollBlockAnnounceValidation::Skip => {}, PollBlockAnnounceValidation::Nothing { is_best: _, who, announce } => { self.update_peer_info(&who); @@ -498,19 +487,6 @@ where self.block_announce_data_cache.put(announce.header.hash(), data); } } - - return - }, - PollBlockAnnounceValidation::ImportHeader { announce, is_best, who } => { - self.update_peer_info(&who); - - if let Some(data) = announce.data { - if !data.is_empty() { - self.block_announce_data_cache.put(announce.header.hash(), data); - } - } - - (announce.header, is_best, who) }, PollBlockAnnounceValidation::Failure { who, disconnect } => { if disconnect { @@ -519,31 +495,8 @@ where } self.network_service.report_peer(who, rep::BAD_BLOCK_ANNOUNCEMENT); - return }, - }; - - // to import header from announced block let's construct response to request that normally - // would have been sent over network (but it is not in our case) - let blocks_to_import = self.chain_sync.on_block_data( - &who, - None, - BlockResponse { - id: 0, - blocks: vec![BlockData { - hash: header.hash(), - header: Some(header), - body: None, - indexed_body: None, - receipt: None, - message_queue: None, - justification: None, - justifications: None, - }], - }, - ); - - self.chain_sync.process_block_response_data(blocks_to_import); + } } /// Push a block announce validation. diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index d5686c8d614c3..6cbb144bd1c82 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -1015,19 +1015,6 @@ where Ok(self.validate_and_queue_blocks(new_blocks, gap)) } - fn process_block_response_data(&mut self, blocks_to_import: Result, BadPeer>) { - match blocks_to_import { - Ok(OnBlockData::Import(origin, blocks)) => self.import_blocks(origin, blocks), - Ok(OnBlockData::Request(peer, req)) => self.send_block_request(peer, req), - Ok(OnBlockData::Continue) => {}, - Err(BadPeer(id, repu)) => { - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); - self.network_service.report_peer(id, repu); - }, - } - } - fn on_block_justification( &mut self, who: PeerId, @@ -1438,7 +1425,6 @@ where match self.mode { SyncMode::Full => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY, - SyncMode::Light => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION, SyncMode::LightState { storage_chain_mode: false, .. } | SyncMode::Warp => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY, SyncMode::LightState { storage_chain_mode: true, .. } => @@ -1451,7 +1437,6 @@ where fn skip_execution(&self) -> bool { match self.mode { SyncMode::Full => false, - SyncMode::Light => true, SyncMode::LightState { .. } => true, SyncMode::Warp => true, } @@ -1703,18 +1688,6 @@ where return PollBlockAnnounceValidation::Nothing { is_best, who, announce } } - let requires_additional_data = self.mode != SyncMode::Light || !known_parent; - if !requires_additional_data { - trace!( - target: "sync", - "Importing new header announced from {}: {} {:?}", - who, - hash, - announce.header, - ); - return PollBlockAnnounceValidation::ImportHeader { is_best, announce, who } - } - if self.status().state == SyncState::Idle { trace!( target: "sync", diff --git a/client/network/sync/src/mock.rs b/client/network/sync/src/mock.rs index c041c81263e5e..6ba087c8cf45a 100644 --- a/client/network/sync/src/mock.rs +++ b/client/network/sync/src/mock.rs @@ -62,7 +62,6 @@ mockall::mock! { request: Option>, response: BlockResponse, ) -> Result, BadPeer>; - fn process_block_response_data(&mut self, blocks_to_import: Result, BadPeer>); fn on_block_justification( &mut self, who: PeerId, diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 08869da2619f9..ee98e5e04736b 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -770,7 +770,7 @@ where *genesis_extra_storage = storage; } - if matches!(config.sync_mode, SyncMode::Fast { .. } | SyncMode::Warp) { + if matches!(config.sync_mode, SyncMode::LightState { .. } | SyncMode::Warp) { test_client_builder = test_client_builder.set_no_genesis(); } let backend = test_client_builder.backend(); diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 81707445dc9d3..7c6e341c30fa5 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1132,7 +1132,7 @@ async fn syncs_state() { let mut config_two = FullPeerConfig::default(); config_two.extra_storage = Some(genesis_storage); config_two.sync_mode = - SyncMode::Fast { skip_proofs: *skip_proofs, storage_chain_mode: false }; + SyncMode::LightState { skip_proofs: *skip_proofs, storage_chain_mode: false }; net.add_full_peer_with_config(config_two); let hashes = net.peer(0).push_blocks(64, false); // Wait for peer 1 to sync header chain. @@ -1175,7 +1175,7 @@ async fn syncs_indexed_blocks() { net.add_full_peer_with_config(FullPeerConfig { storage_chain: true, ..Default::default() }); net.add_full_peer_with_config(FullPeerConfig { storage_chain: true, - sync_mode: SyncMode::Fast { skip_proofs: false, storage_chain_mode: true }, + sync_mode: SyncMode::LightState { skip_proofs: false, storage_chain_mode: true }, ..Default::default() }); net.peer(0).generate_blocks_at( diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 5a0136bf5edcc..d9f4569cd5851 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -227,7 +227,7 @@ where wasm_runtime_overrides: config.wasm_runtime_overrides.clone(), no_genesis: matches!( config.network.sync_mode, - SyncMode::Fast { .. } | SyncMode::Warp { .. } + SyncMode::LightState { .. } | SyncMode::Warp { .. } ), wasm_runtime_substitutes, }, @@ -799,7 +799,8 @@ where if client.requires_full_sync() { match config.network.sync_mode { - SyncMode::Fast { .. } => return Err("Fast sync doesn't work for archive nodes".into()), + SyncMode::LightState { .. } => + return Err("Fast sync doesn't work for archive nodes".into()), SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()), SyncMode::Full => {}, } diff --git a/client/service/src/config.rs b/client/service/src/config.rs index c0fb2dc9c4c70..52e17c95e6783 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -234,7 +234,7 @@ impl Configuration { /// Returns true if the genesis state writting will be skipped while initializing the genesis /// block. pub fn no_genesis(&self) -> bool { - matches!(self.network.sync_mode, SyncMode::Fast { .. } | SyncMode::Warp { .. }) + matches!(self.network.sync_mode, SyncMode::LightState { .. } | SyncMode::Warp { .. }) } /// Returns the database config for creating the backend. From 2e289a1e0e4808503d896f573af32b2a2e501e55 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 26 Jun 2023 21:08:19 +0300 Subject: [PATCH 2/3] Make syncing mode swappable on the fly --- Cargo.lock | 10 +++ client/cli/Cargo.toml | 1 + client/cli/src/params/network_params.rs | 5 +- client/network/Cargo.toml | 1 + client/network/src/config.rs | 8 ++- client/network/sync/Cargo.toml | 1 + client/network/sync/src/engine.rs | 2 +- client/network/sync/src/lib.rs | 82 +++++++++++++------------ client/network/test/Cargo.toml | 1 + client/network/test/src/lib.rs | 10 ++- client/network/test/src/sync.rs | 15 +++-- client/service/src/builder.rs | 8 +-- client/service/src/config.rs | 6 +- 13 files changed, 92 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5affebc76fdeb..78a6f8080e7aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -726,6 +726,12 @@ dependencies = [ "pin-project-lite 0.2.9", ] +[[package]] +name = "atomic" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" + [[package]] name = "atomic-waker" version = "1.1.1" @@ -9014,6 +9020,7 @@ name = "sc-cli" version = "0.10.0-dev" dependencies = [ "array-bytes 4.2.0", + "atomic", "chrono", "clap 4.3.2", "fdlimit", @@ -9590,6 +9597,7 @@ dependencies = [ "async-channel", "async-trait", "asynchronous-codec", + "atomic", "bytes", "either", "fnv", @@ -9759,6 +9767,7 @@ dependencies = [ "array-bytes 4.2.0", "async-channel", "async-trait", + "atomic", "fork-tree", "futures", "futures-timer", @@ -9796,6 +9805,7 @@ name = "sc-network-test" version = "0.8.0" dependencies = [ "async-trait", + "atomic", "futures", "futures-timer", "libp2p", diff --git a/client/cli/Cargo.toml b/client/cli/Cargo.toml index 08e3f18d3961a..032b2d4ba143e 100644 --- a/client/cli/Cargo.toml +++ b/client/cli/Cargo.toml @@ -14,6 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] array-bytes = "4.1" +atomic = "0.5.3" chrono = "0.4.10" clap = { version = "4.2.5", features = ["derive", "string"] } fdlimit = "0.2.1" diff --git a/client/cli/src/params/network_params.rs b/client/cli/src/params/network_params.rs index c65b4058c3ac0..6523fc99c5366 100644 --- a/client/cli/src/params/network_params.rs +++ b/client/cli/src/params/network_params.rs @@ -17,6 +17,7 @@ // along with this program. If not, see . use crate::{arg_enums::SyncMode, params::node_key_params::NodeKeyParams}; +use atomic::Atomic; use clap::Args; use sc_network::{ config::{ @@ -28,7 +29,7 @@ use sc_service::{ config::{Multiaddr, MultiaddrWithPeerId}, ChainSpec, ChainType, }; -use std::{borrow::Cow, path::PathBuf}; +use std::{borrow::Cow, path::PathBuf, sync::Arc}; /// Parameters used to create the network configuration. #[derive(Debug, Clone, Args)] @@ -243,7 +244,7 @@ impl NetworkParams { kademlia_disjoint_query_paths: self.kademlia_disjoint_query_paths, yamux_window_size: None, ipfs_server: self.ipfs_server, - sync_mode: self.sync.into(), + sync_mode: Arc::new(Atomic::new(self.sync.into())), force_synced: self.force_synced || is_dev, } } diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index be8112327eb80..0b55923af4add 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -18,6 +18,7 @@ array-bytes = "4.1" async-channel = "1.8.0" async-trait = "0.1" asynchronous-codec = "0.6" +atomic = "0.5.3" bytes = "1" codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] } either = "1.5.3" diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 6855fbae24326..e64e075727b38 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -43,6 +43,7 @@ pub use sc_network_common::{ use sc_utils::mpsc::TracingUnboundedSender; use sp_runtime::traits::Block as BlockT; +use atomic::Atomic; use std::{ error::Error, fmt, fs, @@ -53,6 +54,7 @@ use std::{ path::{Path, PathBuf}, pin::Pin, str::{self, FromStr}, + sync::Arc, }; pub use libp2p::{ @@ -556,8 +558,8 @@ pub struct NetworkConfiguration { /// Maximum number of blocks per request. pub max_blocks_per_request: u32, - /// Initial syncing mode. - pub sync_mode: SyncMode, + /// Syncing mode. + pub sync_mode: Arc>, /// True if Kademlia random discovery should be enabled. /// @@ -623,7 +625,7 @@ impl NetworkConfiguration { transport: TransportConfig::Normal { enable_mdns: false, allow_private_ip: true }, max_parallel_downloads: 5, max_blocks_per_request: 64, - sync_mode: SyncMode::Full, + sync_mode: Arc::new(Atomic::new(SyncMode::Full)), enable_dht_random_walk: true, allow_non_globals_in_dht: false, kademlia_disjoint_query_paths: false, diff --git a/client/network/sync/Cargo.toml b/client/network/sync/Cargo.toml index d603c5c263479..944610b62bb5b 100644 --- a/client/network/sync/Cargo.toml +++ b/client/network/sync/Cargo.toml @@ -19,6 +19,7 @@ prost-build = "0.11" array-bytes = "4.1" async-channel = "1.8.0" async-trait = "0.1.58" +atomic = "0.5.3" codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] } futures = "0.3.21" futures-timer = "3.0.2" diff --git a/client/network/sync/src/engine.rs b/client/network/sync/src/engine.rs index d12a67211c296..f1febefa9919f 100644 --- a/client/network/sync/src/engine.rs +++ b/client/network/sync/src/engine.rs @@ -291,7 +291,7 @@ where rx: sc_utils::mpsc::TracingUnboundedReceiver>, force_synced: bool, ) -> Result<(Self, SyncingService, NonDefaultSetConfig), ClientError> { - let mode = net_config.network_config.sync_mode; + let mode = Arc::clone(&net_config.network_config.sync_mode); let max_parallel_downloads = net_config.network_config.max_parallel_downloads; let max_blocks_per_request = if net_config.network_config.max_blocks_per_request > crate::MAX_BLOCKS_IN_RESPONSE as u32 diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 6cbb144bd1c82..ea20fd19e35f3 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -36,6 +36,7 @@ use crate::{ warp::{WarpProofImportResult, WarpSync}, }; +use atomic::Atomic; use codec::Encode; use extra_requests::ExtraRequests; use futures::{ @@ -89,7 +90,7 @@ use std::{ iter, ops::Range, pin::Pin, - sync::Arc, + sync::{atomic::Ordering, Arc}, }; pub use service::chain_sync::SyncingService; @@ -298,7 +299,7 @@ pub struct ChainSync { /// The best block hash in our queue of blocks to import best_queued_hash: B::Hash, /// Current mode (full/light) - mode: SyncMode, + mode: Arc>, /// Any extra justification requests. extra_justifications: ExtraRequests, /// A set of hashes of blocks that are being downloaded or have been @@ -518,20 +519,21 @@ where SyncState::Idle }; - let warp_sync_progress = match (&self.warp_sync, &self.mode, &self.gap_sync) { - (_, _, Some(gap_sync)) => Some(WarpSyncProgress { - phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number), - total_bytes: 0, - }), - (None, SyncMode::Warp, _) => Some(WarpSyncProgress { - phase: WarpSyncPhase::AwaitingPeers { - required_peers: MIN_PEERS_TO_START_WARP_SYNC, - }, - total_bytes: 0, - }), - (Some(sync), _, _) => Some(sync.progress()), - _ => None, - }; + let warp_sync_progress = + match (&self.warp_sync, self.mode.load(Ordering::Acquire), &self.gap_sync) { + (_, _, Some(gap_sync)) => Some(WarpSyncProgress { + phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number), + total_bytes: 0, + }), + (None, SyncMode::Warp, _) => Some(WarpSyncProgress { + phase: WarpSyncPhase::AwaitingPeers { + required_peers: MIN_PEERS_TO_START_WARP_SYNC, + }, + total_bytes: 0, + }), + (Some(sync), _, _) => Some(sync.progress()), + _ => None, + }; SyncStatus { state: sync_state, @@ -653,7 +655,7 @@ where }, ); - if let SyncMode::Warp = self.mode { + if self.mode.load(Ordering::Acquire) == SyncMode::Warp { if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none() { log::debug!(target: "sync", "Starting warp state sync."); @@ -1083,7 +1085,7 @@ where is_descendent_of(&**client, base, block) }); - if let SyncMode::LightState { skip_proofs, .. } = &self.mode { + if let SyncMode::LightState { skip_proofs, .. } = self.mode.load(Ordering::Acquire) { if self.state_sync.is_none() && !self.peers.is_empty() && self.queue_blocks.is_empty() { // Finalized a recent block. let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect(); @@ -1102,7 +1104,7 @@ where header, None, None, - *skip_proofs, + skip_proofs, )); self.allowed_requests.set_all(); } @@ -1323,7 +1325,7 @@ where { /// Create a new instance. pub fn new( - mode: SyncMode, + mode: Arc>, client: Arc, protocol_id: ProtocolId, fork_id: &Option, @@ -1422,7 +1424,7 @@ where } fn required_block_attributes(&self) -> BlockAttributes { - match self.mode { + match self.mode.load(Ordering::Acquire) { SyncMode::Full => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY, SyncMode::LightState { storage_chain_mode: false, .. } | SyncMode::Warp => @@ -1435,7 +1437,7 @@ where } fn skip_execution(&self) -> bool { - match self.mode { + match self.mode.load(Ordering::Acquire) { SyncMode::Full => false, SyncMode::LightState { .. } => true, SyncMode::Warp => true, @@ -1748,25 +1750,27 @@ where /// state for. fn reset_sync_start_point(&mut self) -> Result<(), ClientError> { let info = self.client.info(); - if matches!(self.mode, SyncMode::LightState { .. }) && info.finalized_state.is_some() { + if matches!(self.mode.load(Ordering::Acquire), SyncMode::LightState { .. }) && + info.finalized_state.is_some() + { warn!( target: "sync", "Can't use fast sync mode with a partially synced database. Reverting to full sync mode." ); - self.mode = SyncMode::Full; + self.mode.store(SyncMode::Full, Ordering::Release); } - if matches!(self.mode, SyncMode::Warp) && info.finalized_state.is_some() { + if self.mode.load(Ordering::Acquire) == SyncMode::Warp && info.finalized_state.is_some() { warn!( target: "sync", "Can't use warp sync mode with a partially synced database. Reverting to full sync mode." ); - self.mode = SyncMode::Full; + self.mode.store(SyncMode::Full, Ordering::Release); } self.import_existing = false; self.best_queued_hash = info.best_hash; self.best_queued_number = info.best_number; - if self.mode == SyncMode::Full && + if self.mode.load(Ordering::Acquire) == SyncMode::Full && self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState { self.import_existing = true; @@ -2251,7 +2255,7 @@ where } fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { - if self.mode == SyncMode::Warp { + if self.mode.load(Ordering::Acquire) == SyncMode::Warp { return self .warp_target_block_request() .map_or_else(|| Vec::new(), |req| Vec::from([req])) @@ -2643,7 +2647,7 @@ where self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)), ); self.state_sync = None; - self.mode = SyncMode::Full; + self.mode.store(SyncMode::Full, Ordering::Release); output.extend(self.restart()); } let warp_sync_complete = self @@ -2657,7 +2661,7 @@ where self.warp_sync.as_ref().map_or(0, |s| s.progress().total_bytes / (1024 * 1024)), ); self.warp_sync = None; - self.mode = SyncMode::Full; + self.mode.store(SyncMode::Full, Ordering::Release); output.extend(self.restart()); } let gap_sync_complete = @@ -3105,7 +3109,7 @@ mod test { let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (mut sync, _) = ChainSync::new( - SyncMode::Full, + Arc::new(Atomic::new(SyncMode::Full)), client.clone(), ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), @@ -3173,7 +3177,7 @@ mod test { NetworkServiceProvider::new(); let (mut sync, _) = ChainSync::new( - SyncMode::Full, + Arc::new(Atomic::new(SyncMode::Full)), client.clone(), ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), @@ -3357,7 +3361,7 @@ mod test { NetworkServiceProvider::new(); let (mut sync, _) = ChainSync::new( - SyncMode::Full, + Arc::new(Atomic::new(SyncMode::Full)), client.clone(), ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), @@ -3485,7 +3489,7 @@ mod test { let info = client.info(); let (mut sync, _) = ChainSync::new( - SyncMode::Full, + Arc::new(Atomic::new(SyncMode::Full)), client.clone(), ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), @@ -3644,7 +3648,7 @@ mod test { let info = client.info(); let (mut sync, _) = ChainSync::new( - SyncMode::Full, + Arc::new(Atomic::new(SyncMode::Full)), client.clone(), ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), @@ -3788,7 +3792,7 @@ mod test { let info = client.info(); let (mut sync, _) = ChainSync::new( - SyncMode::Full, + Arc::new(Atomic::new(SyncMode::Full)), client.clone(), ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), @@ -3934,7 +3938,7 @@ mod test { let blocks = (0..3).map(|_| build_block(&mut client, None, false)).collect::>(); let (mut sync, _) = ChainSync::new( - SyncMode::Full, + Arc::new(Atomic::new(SyncMode::Full)), client.clone(), ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), @@ -3981,7 +3985,7 @@ mod test { let empty_client = Arc::new(TestClientBuilder::new().build()); let (mut sync, _) = ChainSync::new( - SyncMode::Full, + Arc::new(Atomic::new(SyncMode::Full)), empty_client.clone(), ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), @@ -4036,7 +4040,7 @@ mod test { let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (mut sync, _) = ChainSync::new( - SyncMode::Full, + Arc::new(Atomic::new(SyncMode::Full)), client.clone(), ProtocolId::from("test-protocol-name"), &Some(String::from("test-fork-id")), diff --git a/client/network/test/Cargo.toml b/client/network/test/Cargo.toml index 4a1adbb94f1ee..bc076c843ed22 100644 --- a/client/network/test/Cargo.toml +++ b/client/network/test/Cargo.toml @@ -15,6 +15,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] tokio = "1.22.0" async-trait = "0.1.57" +atomic = "0.5.3" futures = "0.3.21" futures-timer = "3.0.1" libp2p = "0.51.3" diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index ee98e5e04736b..8cc2671ac7579 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -28,11 +28,12 @@ use std::{ collections::HashMap, marker::PhantomData, pin::Pin, - sync::Arc, + sync::{atomic::Ordering, Arc}, task::{Context as FutureContext, Poll}, time::Duration, }; +use atomic::Atomic; use futures::{channel::oneshot, future::BoxFuture, pin_mut, prelude::*}; use libp2p::{build_multiaddr, PeerId}; use log::trace; @@ -701,7 +702,7 @@ pub struct FullPeerConfig { /// Whether the full peer should have the authority role. pub is_authority: bool, /// Syncing mode - pub sync_mode: SyncMode, + pub sync_mode: Arc>, /// Extra genesis storage. pub extra_storage: Option, /// Enable transaction indexing. @@ -770,7 +771,10 @@ where *genesis_extra_storage = storage; } - if matches!(config.sync_mode, SyncMode::LightState { .. } | SyncMode::Warp) { + if matches!( + config.sync_mode.load(Ordering::Acquire), + SyncMode::LightState { .. } | SyncMode::Warp + ) { test_client_builder = test_client_builder.set_no_genesis(); } let backend = test_client_builder.backend(); diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 7c6e341c30fa5..0e473df504360 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -1131,8 +1131,10 @@ async fn syncs_state() { net.add_full_peer_with_config(config_one); let mut config_two = FullPeerConfig::default(); config_two.extra_storage = Some(genesis_storage); - config_two.sync_mode = - SyncMode::LightState { skip_proofs: *skip_proofs, storage_chain_mode: false }; + config_two.sync_mode = Arc::new(Atomic::new(SyncMode::LightState { + skip_proofs: *skip_proofs, + storage_chain_mode: false, + })); net.add_full_peer_with_config(config_two); let hashes = net.peer(0).push_blocks(64, false); // Wait for peer 1 to sync header chain. @@ -1175,7 +1177,10 @@ async fn syncs_indexed_blocks() { net.add_full_peer_with_config(FullPeerConfig { storage_chain: true, ..Default::default() }); net.add_full_peer_with_config(FullPeerConfig { storage_chain: true, - sync_mode: SyncMode::LightState { skip_proofs: false, storage_chain_mode: true }, + sync_mode: Arc::new(Atomic::new(SyncMode::LightState { + skip_proofs: false, + storage_chain_mode: true, + })), ..Default::default() }); net.peer(0).generate_blocks_at( @@ -1228,7 +1233,7 @@ async fn warp_sync() { net.add_full_peer_with_config(Default::default()); net.add_full_peer_with_config(Default::default()); net.add_full_peer_with_config(FullPeerConfig { - sync_mode: SyncMode::Warp, + sync_mode: Arc::new(Atomic::new(SyncMode::Warp)), ..Default::default() }); let gap_end = net.peer(0).push_blocks(63, false).pop().unwrap(); @@ -1269,7 +1274,7 @@ async fn warp_sync_to_target_block() { let target_block = net.peer(0).client.header(target).unwrap().unwrap(); net.add_full_peer_with_config(FullPeerConfig { - sync_mode: SyncMode::Warp, + sync_mode: Arc::new(Atomic::new(SyncMode::Warp)), target_block: Some(target_block), ..Default::default() }); diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index d9f4569cd5851..8489817a14a35 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -76,7 +76,7 @@ use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero}; use std::{ str::FromStr, - sync::Arc, + sync::{atomic::Ordering, Arc}, time::{Duration, SystemTime}, }; @@ -226,7 +226,7 @@ where offchain_indexing_api: config.offchain_worker.indexing_enabled, wasm_runtime_overrides: config.wasm_runtime_overrides.clone(), no_genesis: matches!( - config.network.sync_mode, + config.network.sync_mode.load(Ordering::Acquire), SyncMode::LightState { .. } | SyncMode::Warp { .. } ), wasm_runtime_substitutes, @@ -793,12 +793,12 @@ where block_relay, } = params; - if warp_sync_params.is_none() && config.network.sync_mode.is_warp() { + if warp_sync_params.is_none() && config.network.sync_mode.load(Ordering::Acquire).is_warp() { return Err("Warp sync enabled, but no warp sync provider configured.".into()) } if client.requires_full_sync() { - match config.network.sync_mode { + match config.network.sync_mode.load(Ordering::Acquire) { SyncMode::LightState { .. } => return Err("Fast sync doesn't work for archive nodes".into()), SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()), diff --git a/client/service/src/config.rs b/client/service/src/config.rs index 52e17c95e6783..5ba3131df9447 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -41,6 +41,7 @@ use std::{ io, iter, net::SocketAddr, path::{Path, PathBuf}, + sync::atomic::Ordering, }; use tempfile::TempDir; @@ -234,7 +235,10 @@ impl Configuration { /// Returns true if the genesis state writting will be skipped while initializing the genesis /// block. pub fn no_genesis(&self) -> bool { - matches!(self.network.sync_mode, SyncMode::LightState { .. } | SyncMode::Warp { .. }) + matches!( + self.network.sync_mode.load(Ordering::Acquire), + SyncMode::LightState { .. } | SyncMode::Warp { .. } + ) } /// Returns the database config for creating the backend. From 5367ab29340fe230bf3347522392f050b9723dcd Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 26 Jun 2023 23:36:21 +0300 Subject: [PATCH 3/3] Allow pausing of Substrate sync when desired --- client/network/common/src/sync.rs | 2 ++ client/network/sync/src/lib.rs | 13 +++++++++---- client/service/src/builder.rs | 2 +- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/client/network/common/src/sync.rs b/client/network/common/src/sync.rs index 1c8bcfce39270..62bb16ae989f9 100644 --- a/client/network/common/src/sync.rs +++ b/client/network/common/src/sync.rs @@ -209,6 +209,8 @@ pub enum SyncMode { }, /// Warp sync - verify authority set transitions and the latest state. Warp, + /// Sync is paused. + Paused, } impl SyncMode { diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index ea20fd19e35f3..b02f41810b294 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -1433,6 +1433,7 @@ where BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::INDEXED_BODY, + SyncMode::Paused => BlockAttributes::empty(), } } @@ -1441,6 +1442,7 @@ where SyncMode::Full => false, SyncMode::LightState { .. } => true, SyncMode::Warp => true, + SyncMode::Paused => false, } } @@ -2255,10 +2257,13 @@ where } fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { - if self.mode.load(Ordering::Acquire) == SyncMode::Warp { - return self - .warp_target_block_request() - .map_or_else(|| Vec::new(), |req| Vec::from([req])) + match self.mode.load(Ordering::Acquire) { + SyncMode::Warp => + return self.warp_target_block_request().map_or_else(|| Vec::new(), |req| vec![req]), + SyncMode::Paused => return Vec::new(), + SyncMode::LightState { .. } | SyncMode::Full => { + // Continue + }, } if self.allowed_requests.is_empty() || self.state_sync.is_some() { diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 8489817a14a35..a60558e5a2123 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -802,7 +802,7 @@ where SyncMode::LightState { .. } => return Err("Fast sync doesn't work for archive nodes".into()), SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()), - SyncMode::Full => {}, + SyncMode::Full | SyncMode::Paused => {}, } }