Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Pause Substrate sync #52

Merged
merged 3 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
array-bytes = "4.1"
atomic = "0.5.3"
chrono = "0.4.10"
clap = { version = "4.2.5", features = ["derive", "string"] }
fdlimit = "0.2.1"
Expand Down
12 changes: 8 additions & 4 deletions client/cli/src/arg_enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,14 @@ impl Into<sc_network::config::SyncMode> for SyncMode {
fn into(self) -> sc_network::config::SyncMode {
match self {
SyncMode::Full => sc_network::config::SyncMode::Full,
SyncMode::Fast =>
sc_network::config::SyncMode::Fast { skip_proofs: false, storage_chain_mode: false },
SyncMode::FastUnsafe =>
sc_network::config::SyncMode::Fast { skip_proofs: true, storage_chain_mode: false },
SyncMode::Fast => sc_network::config::SyncMode::LightState {
skip_proofs: false,
storage_chain_mode: false,
},
SyncMode::FastUnsafe => sc_network::config::SyncMode::LightState {
skip_proofs: true,
storage_chain_mode: false,
},
SyncMode::Warp => sc_network::config::SyncMode::Warp,
}
}
Expand Down
5 changes: 3 additions & 2 deletions client/cli/src/params/network_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{arg_enums::SyncMode, params::node_key_params::NodeKeyParams};
use atomic::Atomic;
use clap::Args;
use sc_network::{
config::{
Expand All @@ -28,7 +29,7 @@ use sc_service::{
config::{Multiaddr, MultiaddrWithPeerId},
ChainSpec, ChainType,
};
use std::{borrow::Cow, path::PathBuf};
use std::{borrow::Cow, path::PathBuf, sync::Arc};

/// Parameters used to create the network configuration.
#[derive(Debug, Clone, Args)]
Expand Down Expand Up @@ -243,7 +244,7 @@ impl NetworkParams {
kademlia_disjoint_query_paths: self.kademlia_disjoint_query_paths,
yamux_window_size: None,
ipfs_server: self.ipfs_server,
sync_mode: self.sync.into(),
sync_mode: Arc::new(Atomic::new(self.sync.into())),
force_synced: self.force_synced || is_dev,
}
}
Expand Down
1 change: 1 addition & 0 deletions client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ array-bytes = "4.1"
async-channel = "1.8.0"
async-trait = "0.1"
asynchronous-codec = "0.6"
atomic = "0.5.3"
bytes = "1"
codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] }
either = "1.5.3"
Expand Down
56 changes: 30 additions & 26 deletions client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,32 +191,45 @@ pub enum PollBlockAnnounceValidation<H> {
/// The announcement.
announce: BlockAnnounce<H>,
},
/// The announcement header should be imported.
ImportHeader {
/// Who sent the processed block announcement?
who: PeerId,
/// Was this their new best block?
is_best: bool,
/// The announcement.
announce: BlockAnnounce<H>,
},
/// The block announcement should be skipped.
Skip,
}

/// Operation mode.
#[derive(Debug, PartialEq, Eq)]
/// Sync operation mode.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum SyncMode {
// Sync headers only
Light,
// Sync headers and block bodies
/// Full block download and verification.
Full,
// Sync headers and the last finalied state
LightState { storage_chain_mode: bool, skip_proofs: bool },
// Warp sync mode.
/// Download blocks and the latest state.
LightState {
/// Skip state proof download and verification.
skip_proofs: bool,
/// Download indexed transactions for recent blocks.
storage_chain_mode: bool,
},
/// Warp sync - verify authority set transitions and the latest state.
Warp,
/// Sync is paused.
Paused,
}

impl SyncMode {
/// Returns `true` if `self` is [`Self::Warp`].
pub fn is_warp(&self) -> bool {
matches!(self, Self::Warp)
}

/// Returns `true` if `self` is [`Self::LightState`].
pub fn light_state(&self) -> bool {
matches!(self, Self::LightState { .. })
}
}

impl Default for SyncMode {
fn default() -> Self {
Self::Full
}
}
#[derive(Debug)]
pub struct Metrics {
pub queued_blocks: u32,
Expand Down Expand Up @@ -361,12 +374,6 @@ pub trait ChainSync<Block: BlockT>: Send {
response: BlockResponse<Block>,
) -> Result<OnBlockData<Block>, BadPeer>;

/// Procss received block data.
fn process_block_response_data(
&mut self,
blocks_to_import: Result<OnBlockData<Block>, BadPeer>,
);

/// Handle a response from the remote to a justification request that we made.
///
/// `request` must be the original request that triggered `response`.
Expand Down Expand Up @@ -406,9 +413,6 @@ pub trait ChainSync<Block: BlockT>: Send {
/// [`ChainSync::push_block_announce_validation`].
///
/// This should be polled until it returns [`Poll::Pending`].
///
/// If [`PollBlockAnnounceValidation::ImportHeader`] is returned, then the caller MUST try to
/// import passed header (call `on_block_data`). The network request isn't sent in this case.
fn poll_block_announce_validation(
&mut self,
cx: &mut std::task::Context<'_>,
Expand Down
44 changes: 6 additions & 38 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ use zeroize::Zeroize;

pub use sc_network_common::{
role::{Role, Roles},
sync::warp::WarpSyncProvider,
sync::{warp::WarpSyncProvider, SyncMode},
ExHashT,
};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::Block as BlockT;

use atomic::Atomic;
use std::{
error::Error,
fmt, fs,
Expand All @@ -53,6 +54,7 @@ use std::{
path::{Path, PathBuf},
pin::Pin,
str::{self, FromStr},
sync::Arc,
};

pub use libp2p::{
Expand Down Expand Up @@ -275,40 +277,6 @@ impl NonReservedPeerMode {
}
}

/// Sync operation mode.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum SyncMode {
/// Full block download and verification.
Full,
/// Download blocks and the latest state.
Fast {
/// Skip state proof download and verification.
skip_proofs: bool,
/// Download indexed transactions for recent blocks.
storage_chain_mode: bool,
},
/// Warp sync - verify authority set transitions and the latest state.
Warp,
}

impl SyncMode {
/// Returns if `self` is [`Self::Warp`].
pub fn is_warp(&self) -> bool {
matches!(self, Self::Warp)
}

/// Returns if `self` is [`Self::Fast`].
pub fn is_fast(&self) -> bool {
matches!(self, Self::Fast { .. })
}
}

impl Default for SyncMode {
fn default() -> Self {
Self::Full
}
}

/// The configuration of a node's secret key, describing the type of key
/// and how it is obtained. A node's identity keypair is the result of
/// the evaluation of the node key configuration.
Expand Down Expand Up @@ -590,8 +558,8 @@ pub struct NetworkConfiguration {
/// Maximum number of blocks per request.
pub max_blocks_per_request: u32,

/// Initial syncing mode.
pub sync_mode: SyncMode,
/// Syncing mode.
pub sync_mode: Arc<Atomic<SyncMode>>,

/// True if Kademlia random discovery should be enabled.
///
Expand Down Expand Up @@ -657,7 +625,7 @@ impl NetworkConfiguration {
transport: TransportConfig::Normal { enable_mdns: false, allow_private_ip: true },
max_parallel_downloads: 5,
max_blocks_per_request: 64,
sync_mode: SyncMode::Full,
sync_mode: Arc::new(Atomic::new(SyncMode::Full)),
enable_dht_random_walk: true,
allow_non_globals_in_dht: false,
kademlia_disjoint_query_paths: false,
Expand Down
1 change: 1 addition & 0 deletions client/network/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ prost-build = "0.11"
array-bytes = "4.1"
async-channel = "1.8.0"
async-trait = "0.1.58"
atomic = "0.5.3"
codec = { package = "parity-scale-codec", version = "3.2.2", features = ["derive"] }
futures = "0.3.21"
futures-timer = "3.0.2"
Expand Down
59 changes: 6 additions & 53 deletions client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,16 @@ use prometheus_endpoint::{
use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::import_queue::ImportQueueService;
use sc_network::{
config::{
FullNetworkConfiguration, NonDefaultSetConfig, ProtocolId, SyncMode as SyncOperationMode,
},
config::{FullNetworkConfiguration, NonDefaultSetConfig, ProtocolId},
utils::LruHashSet,
NotificationsSink, ProtocolName, ReputationChange,
};
use sc_network_common::{
role::Roles,
sync::{
message::{
generic::{BlockData, BlockResponse},
BlockAnnounce, BlockAnnouncesHandshake, BlockState,
},
message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
warp::WarpSyncParams,
BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, PollBlockAnnounceValidation, SyncEvent,
SyncMode,
},
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
Expand Down Expand Up @@ -297,12 +291,7 @@ where
rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent<B>>,
force_synced: bool,
) -> Result<(Self, SyncingService<B>, NonDefaultSetConfig), ClientError> {
let mode = match net_config.network_config.sync_mode {
SyncOperationMode::Full => SyncMode::Full,
SyncOperationMode::Fast { skip_proofs, storage_chain_mode } =>
SyncMode::LightState { skip_proofs, storage_chain_mode },
SyncOperationMode::Warp => SyncMode::Warp,
};
let mode = Arc::clone(&net_config.network_config.sync_mode);
let max_parallel_downloads = net_config.network_config.max_parallel_downloads;
let max_blocks_per_request = if net_config.network_config.max_blocks_per_request >
crate::MAX_BLOCKS_IN_RESPONSE as u32
Expand Down Expand Up @@ -488,8 +477,8 @@ where
&mut self,
validation_result: PollBlockAnnounceValidation<B::Header>,
) {
let (header, _is_best, who) = match validation_result {
PollBlockAnnounceValidation::Skip => return,
match validation_result {
PollBlockAnnounceValidation::Skip => {},
PollBlockAnnounceValidation::Nothing { is_best: _, who, announce } => {
self.update_peer_info(&who);

Expand All @@ -498,19 +487,6 @@ where
self.block_announce_data_cache.put(announce.header.hash(), data);
}
}

return
},
PollBlockAnnounceValidation::ImportHeader { announce, is_best, who } => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder, why did you remove this option?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a part of paritytech#14465 and it was no longer used after SyncMode::Light enum variant was gone, I just pulled the strings and removed all unused code after that.

self.update_peer_info(&who);

if let Some(data) = announce.data {
if !data.is_empty() {
self.block_announce_data_cache.put(announce.header.hash(), data);
}
}

(announce.header, is_best, who)
},
PollBlockAnnounceValidation::Failure { who, disconnect } => {
if disconnect {
Expand All @@ -519,31 +495,8 @@ where
}

self.network_service.report_peer(who, rep::BAD_BLOCK_ANNOUNCEMENT);
return
},
};

// to import header from announced block let's construct response to request that normally
// would have been sent over network (but it is not in our case)
let blocks_to_import = self.chain_sync.on_block_data(
&who,
None,
BlockResponse {
id: 0,
blocks: vec![BlockData {
hash: header.hash(),
header: Some(header),
body: None,
indexed_body: None,
receipt: None,
message_queue: None,
justification: None,
justifications: None,
}],
},
);

self.chain_sync.process_block_response_data(blocks_to_import);
}
}

/// Push a block announce validation.
Expand Down
Loading
Loading