Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Import target block body during warp sync #12300

Merged
merged 9 commits into from
Sep 20, 2022
2 changes: 2 additions & 0 deletions client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ pub enum OnBlockData<Block: BlockT> {
Import(BlockOrigin, Vec<IncomingBlock<Block>>),
/// A new block request needs to be made to the given peer.
Request(PeerId, BlockRequest<Block>),
/// Continue processing events.
Continue,
}

/// Result of [`ChainSync::on_block_justification`].
Expand Down
3 changes: 3 additions & 0 deletions client/network/common/src/sync/warp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ pub enum WarpSyncPhase<Block: BlockT> {
AwaitingPeers,
/// Downloading and verifying grandpa warp proofs.
DownloadingWarpProofs,
/// Downloading target block.
DownloadingTargetBlock,
/// Downloading state data.
DownloadingState,
/// Importing state.
Expand All @@ -77,6 +79,7 @@ impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
match self {
Self::AwaitingPeers => write!(f, "Waiting for peers"),
Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
Self::DownloadingState => write!(f, "Downloading state"),
Self::ImportingState => write!(f, "Importing state"),
Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
Expand Down
2 changes: 2 additions & 0 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ where
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(OnBlockData::Request(peer, req)) =>
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, peer, req),
Ok(OnBlockData::Continue) => CustomMessageOutcome::None,
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
Expand Down Expand Up @@ -974,6 +975,7 @@ where
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(OnBlockData::Request(peer, req)) =>
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, peer, req),
Ok(OnBlockData::Continue) => CustomMessageOutcome::None,
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
Expand Down
81 changes: 73 additions & 8 deletions client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ use std::{
pin::Pin,
sync::Arc,
};
use warp::TargetBlockImportResult;

mod extra_requests;

Expand Down Expand Up @@ -315,6 +316,8 @@ pub enum PeerSyncState<B: BlockT> {
DownloadingState,
/// Downloading warp proof.
DownloadingWarpProof,
/// Downloading warp sync target block.
DownloadingWarpTarget,
/// Actively downloading block history after warp sync.
DownloadingGap(NumberFor<B>),
}
Expand Down Expand Up @@ -659,10 +662,11 @@ where
}

fn block_requests(&mut self) -> Box<dyn Iterator<Item = (&PeerId, BlockRequest<B>)> + '_> {
if self.allowed_requests.is_empty() ||
self.state_sync.is_some() ||
self.mode == SyncMode::Warp
{
if self.mode == SyncMode::Warp {
return Box::new(std::iter::once(self.warp_target_block_request()).flatten())
}

if self.allowed_requests.is_empty() || self.state_sync.is_some() {
return Box::new(std::iter::empty())
}

Expand Down Expand Up @@ -824,7 +828,7 @@ where
// Only one pending state request is allowed.
return None
}
if let Some(request) = sync.next_warp_poof_request() {
if let Some(request) = sync.next_warp_proof_request() {
let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
if !targets.is_empty() {
targets.sort();
Expand Down Expand Up @@ -1031,6 +1035,40 @@ where
Vec::new()
}
},
PeerSyncState::DownloadingWarpTarget => {
peer.state = PeerSyncState::Available;
if let Some(warp_sync) = &mut self.warp_sync {
if blocks.len() == 1 {
altonen marked this conversation as resolved.
Show resolved Hide resolved
validate_blocks::<B>(&blocks, who, Some(request))?;
match warp_sync.import_target_block(
blocks.pop().expect("`blocks` len checked above."),
) {
TargetBlockImportResult::Success =>
return Ok(OnBlockData::Continue),
TargetBlockImportResult::BadResponse =>
return Err(BadPeer(*who, rep::VERIFICATION_FAIL)),
}
} else if blocks.is_empty() {
debug!(target: "sync", "Empty block response from {}", who);
return Err(BadPeer(*who, rep::NO_BLOCK))
} else {
debug!(
target: "sync",
"Too many blocks ({}) in warp target block response from {}",
blocks.len(),
who,
);
return Err(BadPeer(*who, rep::NOT_REQUESTED))
}
} else {
debug!(
target: "sync",
"Logic error: we think we are downloading warp target block from {}, but no warp sync is happening.",
who,
);
return Ok(OnBlockData::Continue)
}
},
PeerSyncState::Available |
PeerSyncState::DownloadingJustification(..) |
PeerSyncState::DownloadingState |
Expand Down Expand Up @@ -1112,12 +1150,12 @@ where
};

match import_result {
state::ImportResult::Import(hash, header, state) => {
state::ImportResult::Import(hash, header, state, body) => {
let origin = BlockOrigin::NetworkInitialSync;
let block = IncomingBlock {
hash,
header: Some(header),
body: None,
body,
indexed_body: None,
justifications: None,
origin: None,
Expand Down Expand Up @@ -1400,7 +1438,7 @@ where
hash,
);
self.state_sync =
Some(StateSync::new(self.client.clone(), header, *skip_proofs));
Some(StateSync::new(self.client.clone(), header, None, *skip_proofs));
self.allowed_requests.set_all();
}
}
Expand Down Expand Up @@ -2163,6 +2201,33 @@ where
})
.collect()
}

/// Generate block request for downloading of the target block body during warp sync.
fn warp_target_block_request(&mut self) -> Option<(&PeerId, BlockRequest<B>)> {
if let Some(sync) = &self.warp_sync {
if self.allowed_requests.is_empty() ||
sync.is_complete() ||
self.peers
.iter()
.any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpTarget)
{
// Only one pending warp target block request is allowed.
return None
}
if let Some((target_number, request)) = sync.next_target_block_request() {
// Find a random peer that has a block with the target number.
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.best_number >= target_number {
trace!(target: "sync", "New warp target block request for {}", id);
peer.state = PeerSyncState::DownloadingWarpTarget;
self.allowed_requests.clear();
return Some((id, request))
}
}
}
}
None
}
}

// This is purely during a backwards compatible transitionary period and should be removed
Expand Down
18 changes: 13 additions & 5 deletions client/network/sync/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct StateSync<B: BlockT, Client> {
target_block: B::Hash,
target_header: B::Header,
target_root: B::Hash,
target_body: Option<Vec<B::Extrinsic>>,
last_key: SmallVec<[Vec<u8>; 2]>,
state: HashMap<Vec<u8>, (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>)>,
complete: bool,
Expand All @@ -46,7 +47,7 @@ pub struct StateSync<B: BlockT, Client> {
/// Import state chunk result.
pub enum ImportResult<B: BlockT> {
/// State is complete and ready for import.
Import(B::Hash, B::Header, ImportedState<B>),
Import(B::Hash, B::Header, ImportedState<B>, Option<Vec<B::Extrinsic>>),
/// Continue downloading.
Continue,
/// Bad state chunk.
Expand All @@ -59,12 +60,18 @@ where
Client: ProofProvider<B> + Send + Sync + 'static,
{
/// Create a new instance.
pub fn new(client: Arc<Client>, target: B::Header, skip_proof: bool) -> Self {
pub fn new(
client: Arc<Client>,
target_header: B::Header,
target_body: Option<Vec<B::Extrinsic>>,
skip_proof: bool,
) -> Self {
Self {
client,
target_block: target.hash(),
target_root: *target.state_root(),
target_header: target,
target_block: target_header.hash(),
target_root: *target_header.state_root(),
target_header,
target_body,
last_key: SmallVec::default(),
state: HashMap::default(),
complete: false,
Expand Down Expand Up @@ -213,6 +220,7 @@ where
block: self.target_block,
state: std::mem::take(&mut self.state).into(),
},
self.target_body.clone(),
)
} else {
ImportResult::Continue
Expand Down
Loading