Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Unify SyncMode data structures under one #14465

Merged
merged 1 commit into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions client/cli/src/arg_enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,14 @@ impl Into<sc_network::config::SyncMode> for SyncMode {
fn into(self) -> sc_network::config::SyncMode {
match self {
SyncMode::Full => sc_network::config::SyncMode::Full,
SyncMode::Fast =>
sc_network::config::SyncMode::Fast { skip_proofs: false, storage_chain_mode: false },
SyncMode::FastUnsafe =>
sc_network::config::SyncMode::Fast { skip_proofs: true, storage_chain_mode: false },
SyncMode::Fast => sc_network::config::SyncMode::LightState {
skip_proofs: false,
storage_chain_mode: false,
},
SyncMode::FastUnsafe => sc_network::config::SyncMode::LightState {
skip_proofs: true,
storage_chain_mode: false,
},
SyncMode::Warp => sc_network::config::SyncMode::Warp,
}
}
Expand Down
54 changes: 28 additions & 26 deletions client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,32 +185,43 @@ pub enum PollBlockAnnounceValidation<H> {
/// The announcement.
announce: BlockAnnounce<H>,
},
/// The announcement header should be imported.
ImportHeader {
/// Who sent the processed block announcement?
who: PeerId,
/// Was this their new best block?
is_best: bool,
/// The announcement.
announce: BlockAnnounce<H>,
},
/// The block announcement should be skipped.
Skip,
}

/// Operation mode.
#[derive(Debug, PartialEq, Eq)]
/// Sync operation mode.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum SyncMode {
// Sync headers only
Light,
// Sync headers and block bodies
/// Full block download and verification.
Full,
// Sync headers and the last finalied state
LightState { storage_chain_mode: bool, skip_proofs: bool },
// Warp sync mode.
/// Download blocks and the latest state.
LightState {
/// Skip state proof download and verification.
skip_proofs: bool,
/// Download indexed transactions for recent blocks.
storage_chain_mode: bool,
},
/// Warp sync - verify authority set transitions and the latest state.
Warp,
}

impl SyncMode {
/// Returns `true` if `self` is [`Self::Warp`].
pub fn is_warp(&self) -> bool {
matches!(self, Self::Warp)
}

/// Returns `true` if `self` is [`Self::LightState`].
pub fn light_state(&self) -> bool {
matches!(self, Self::LightState { .. })
}
}

impl Default for SyncMode {
fn default() -> Self {
Self::Full
}
}
#[derive(Debug)]
pub struct Metrics {
pub queued_blocks: u32,
Expand Down Expand Up @@ -376,12 +387,6 @@ pub trait ChainSync<Block: BlockT>: Send {
response: BlockResponse<Block>,
) -> Result<OnBlockData<Block>, BadPeer>;

/// Procss received block data.
fn process_block_response_data(
&mut self,
blocks_to_import: Result<OnBlockData<Block>, BadPeer>,
);

/// Handle a response from the remote to a justification request that we made.
///
/// `request` must be the original request that triggered `response`.
Expand Down Expand Up @@ -421,9 +426,6 @@ pub trait ChainSync<Block: BlockT>: Send {
/// [`ChainSync::push_block_announce_validation`].
///
/// This should be polled until it returns [`Poll::Pending`].
///
/// If [`PollBlockAnnounceValidation::ImportHeader`] is returned, then the caller MUST try to
/// import passed header (call `on_block_data`). The network request isn't sent in this case.
fn poll_block_announce_validation(
&mut self,
cx: &mut std::task::Context<'_>,
Expand Down
36 changes: 1 addition & 35 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use zeroize::Zeroize;

pub use sc_network_common::{
role::{Role, Roles},
sync::warp::WarpSyncProvider,
sync::{warp::WarpSyncProvider, SyncMode},
ExHashT,
};
use sc_utils::mpsc::TracingUnboundedSender;
Expand Down Expand Up @@ -277,40 +277,6 @@ impl NonReservedPeerMode {
}
}

/// Sync operation mode.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum SyncMode {
/// Full block download and verification.
Full,
/// Download blocks and the latest state.
Fast {
/// Skip state proof download and verification.
skip_proofs: bool,
/// Download indexed transactions for recent blocks.
storage_chain_mode: bool,
},
/// Warp sync - verify authority set transitions and the latest state.
Warp,
}

impl SyncMode {
/// Returns if `self` is [`Self::Warp`].
pub fn is_warp(&self) -> bool {
matches!(self, Self::Warp)
}

/// Returns if `self` is [`Self::Fast`].
pub fn is_fast(&self) -> bool {
matches!(self, Self::Fast { .. })
}
}

impl Default for SyncMode {
fn default() -> Self {
Self::Full
}
}

/// The configuration of a node's secret key, describing the type of key
/// and how it is obtained. A node's identity keypair is the result of
/// the evaluation of the node key configuration.
Expand Down
59 changes: 6 additions & 53 deletions client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,16 @@ use prometheus_endpoint::{
use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::import_queue::ImportQueueService;
use sc_network::{
config::{
FullNetworkConfiguration, NonDefaultSetConfig, ProtocolId, SyncMode as SyncOperationMode,
},
config::{FullNetworkConfiguration, NonDefaultSetConfig, ProtocolId},
utils::LruHashSet,
NotificationsSink, ProtocolName, ReputationChange,
};
use sc_network_common::{
role::Roles,
sync::{
message::{
generic::{BlockData, BlockResponse},
BlockAnnounce, BlockAnnouncesHandshake, BlockState,
},
message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
warp::WarpSyncParams,
BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, PollBlockAnnounceValidation, SyncEvent,
SyncMode,
},
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
Expand Down Expand Up @@ -290,12 +284,7 @@ where
warp_sync_protocol_name: Option<ProtocolName>,
rx: sc_utils::mpsc::TracingUnboundedReceiver<sc_network::SyncEvent<B>>,
) -> Result<(Self, SyncingService<B>, NonDefaultSetConfig), ClientError> {
let mode = match net_config.network_config.sync_mode {
SyncOperationMode::Full => SyncMode::Full,
SyncOperationMode::Fast { skip_proofs, storage_chain_mode } =>
SyncMode::LightState { skip_proofs, storage_chain_mode },
SyncOperationMode::Warp => SyncMode::Warp,
};
let mode = net_config.network_config.sync_mode;
let max_parallel_downloads = net_config.network_config.max_parallel_downloads;
let max_blocks_per_request = if net_config.network_config.max_blocks_per_request >
crate::MAX_BLOCKS_IN_RESPONSE as u32
Expand Down Expand Up @@ -469,8 +458,8 @@ where
&mut self,
validation_result: PollBlockAnnounceValidation<B::Header>,
) {
let (header, _is_best, who) = match validation_result {
PollBlockAnnounceValidation::Skip => return,
match validation_result {
PollBlockAnnounceValidation::Skip => {},
PollBlockAnnounceValidation::Nothing { is_best: _, who, announce } => {
self.update_peer_info(&who);

Expand All @@ -479,19 +468,6 @@ where
self.block_announce_data_cache.put(announce.header.hash(), data);
}
}

return
},
PollBlockAnnounceValidation::ImportHeader { announce, is_best, who } => {
self.update_peer_info(&who);

if let Some(data) = announce.data {
if !data.is_empty() {
self.block_announce_data_cache.put(announce.header.hash(), data);
}
}

(announce.header, is_best, who)
},
PollBlockAnnounceValidation::Failure { who, disconnect } => {
if disconnect {
Expand All @@ -500,31 +476,8 @@ where
}

self.network_service.report_peer(who, rep::BAD_BLOCK_ANNOUNCEMENT);
return
},
};

// to import header from announced block let's construct response to request that normally
// would have been sent over network (but it is not in our case)
let blocks_to_import = self.chain_sync.on_block_data(
&who,
None,
BlockResponse {
id: 0,
blocks: vec![BlockData {
hash: header.hash(),
header: Some(header),
body: None,
indexed_body: None,
receipt: None,
message_queue: None,
justification: None,
justifications: None,
}],
},
);

self.chain_sync.process_block_response_data(blocks_to_import);
}
}

/// Push a block announce validation.
Expand Down
27 changes: 0 additions & 27 deletions client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,19 +1003,6 @@ where
Ok(self.validate_and_queue_blocks(new_blocks, gap))
}

fn process_block_response_data(&mut self, blocks_to_import: Result<OnBlockData<B>, BadPeer>) {
match blocks_to_import {
Ok(OnBlockData::Import(origin, blocks)) => self.import_blocks(origin, blocks),
Ok(OnBlockData::Request(peer, req)) => self.send_block_request(peer, req),
Ok(OnBlockData::Continue) => {},
Err(BadPeer(id, repu)) => {
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(id, repu);
},
}
}

fn on_block_justification(
&mut self,
who: PeerId,
Expand Down Expand Up @@ -1499,7 +1486,6 @@ where
match self.mode {
SyncMode::Full =>
BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
SyncMode::Light => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
SyncMode::LightState { storage_chain_mode: false, .. } | SyncMode::Warp =>
BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
SyncMode::LightState { storage_chain_mode: true, .. } =>
Expand All @@ -1512,7 +1498,6 @@ where
fn skip_execution(&self) -> bool {
match self.mode {
SyncMode::Full => false,
SyncMode::Light => true,
SyncMode::LightState { .. } => true,
SyncMode::Warp => true,
}
Expand Down Expand Up @@ -1759,18 +1744,6 @@ where
return PollBlockAnnounceValidation::Nothing { is_best, who, announce }
}

let requires_additional_data = self.mode != SyncMode::Light || !known_parent;
if !requires_additional_data {
trace!(
target: "sync",
"Importing new header announced from {}: {} {:?}",
who,
hash,
announce.header,
);
return PollBlockAnnounceValidation::ImportHeader { is_best, announce, who }
}

if self.status().state == SyncState::Idle {
trace!(
target: "sync",
Expand Down
1 change: 0 additions & 1 deletion client/network/sync/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ mockall::mock! {
request: Option<BlockRequest<Block>>,
response: BlockResponse<Block>,
) -> Result<OnBlockData<Block>, BadPeer>;
fn process_block_response_data(&mut self, blocks_to_import: Result<OnBlockData<Block>, BadPeer>);
fn on_block_justification(
&mut self,
who: PeerId,
Expand Down
2 changes: 1 addition & 1 deletion client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ where
*genesis_extra_storage = storage;
}

if matches!(config.sync_mode, SyncMode::Fast { .. } | SyncMode::Warp) {
if matches!(config.sync_mode, SyncMode::LightState { .. } | SyncMode::Warp) {
test_client_builder = test_client_builder.set_no_genesis();
}
let backend = test_client_builder.backend();
Expand Down
4 changes: 2 additions & 2 deletions client/network/test/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,7 @@ async fn syncs_state() {
let mut config_two = FullPeerConfig::default();
config_two.extra_storage = Some(genesis_storage);
config_two.sync_mode =
SyncMode::Fast { skip_proofs: *skip_proofs, storage_chain_mode: false };
SyncMode::LightState { skip_proofs: *skip_proofs, storage_chain_mode: false };
net.add_full_peer_with_config(config_two);
let hashes = net.peer(0).push_blocks(64, false);
// Wait for peer 1 to sync header chain.
Expand Down Expand Up @@ -1175,7 +1175,7 @@ async fn syncs_indexed_blocks() {
net.add_full_peer_with_config(FullPeerConfig { storage_chain: true, ..Default::default() });
net.add_full_peer_with_config(FullPeerConfig {
storage_chain: true,
sync_mode: SyncMode::Fast { skip_proofs: false, storage_chain_mode: true },
sync_mode: SyncMode::LightState { skip_proofs: false, storage_chain_mode: true },
..Default::default()
});
net.peer(0).generate_blocks_at(
Expand Down
5 changes: 3 additions & 2 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ where
wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
no_genesis: matches!(
config.network.sync_mode,
SyncMode::Fast { .. } | SyncMode::Warp { .. }
SyncMode::LightState { .. } | SyncMode::Warp { .. }
),
wasm_runtime_substitutes,
},
Expand Down Expand Up @@ -794,7 +794,8 @@ where

if client.requires_full_sync() {
match config.network.sync_mode {
SyncMode::Fast { .. } => return Err("Fast sync doesn't work for archive nodes".into()),
SyncMode::LightState { .. } =>
return Err("Fast sync doesn't work for archive nodes".into()),
SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
SyncMode::Full => {},
}
Expand Down
2 changes: 1 addition & 1 deletion client/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl Configuration {
/// Returns true if the genesis state writting will be skipped while initializing the genesis
/// block.
pub fn no_genesis(&self) -> bool {
matches!(self.network.sync_mode, SyncMode::Fast { .. } | SyncMode::Warp { .. })
matches!(self.network.sync_mode, SyncMode::LightState { .. } | SyncMode::Warp { .. })
}

/// Returns the database config for creating the backend.
Expand Down
Loading