From ac78c9082206d558e8b4958e6730bc3d38b4f365 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Tue, 5 Nov 2019 12:17:12 +0100 Subject: [PATCH] Header-only sync for old forks (#3942) * Header-only sync for old forks * Simplified blocks-count * Update core/consensus/common/src/block_import.rs Co-Authored-By: Marcio Diaz --- core/client/db/src/lib.rs | 43 +++++++++------ core/client/src/client.rs | 67 +++++++++++++---------- core/consensus/common/src/block_import.rs | 6 ++ core/consensus/common/src/import_queue.rs | 11 +++- core/finality-grandpa/src/light_import.rs | 4 ++ core/finality-grandpa/src/tests.rs | 1 + core/network/src/protocol/sync.rs | 50 ++++++++++------- core/network/src/test/block_import.rs | 4 +- core/network/src/test/mod.rs | 25 +++++---- core/network/src/test/sync.rs | 43 ++++++++++++++- core/test-client/src/lib.rs | 6 ++ 11 files changed, 179 insertions(+), 81 deletions(-) diff --git a/core/client/db/src/lib.rs b/core/client/db/src/lib.rs index 8bd0001981611..1b42cfaab8f97 100644 --- a/core/client/db/src/lib.rs +++ b/core/client/db/src/lib.rs @@ -894,6 +894,12 @@ impl> Backend { inmem } + /// Returns total numbet of blocks (headers) in the block DB. + #[cfg(feature = "test-helpers")] + pub fn blocks_count(&self) -> u64 { + self.blockchain.db.iter(columns::HEADER).count() as u64 + } + /// Read (from storage or cache) changes trie config. /// /// Currently changes tries configuration is set up once (at genesis) and could not @@ -1115,7 +1121,7 @@ impl> Backend { ); transaction.put(columns::HEADER, &lookup_key, &pending_block.header.encode()); - if let Some(body) = pending_block.body { + if let Some(body) = &pending_block.body { transaction.put(columns::BODY, &lookup_key, &body.encode()); } if let Some(justification) = pending_block.justification { @@ -1127,21 +1133,26 @@ impl> Backend { transaction.put(columns::META, meta_keys::GENESIS_HASH, hash.as_ref()); } - let mut changeset: state_db::ChangeSet> = state_db::ChangeSet::default(); - for (key, (val, rc)) in operation.db_updates.drain() { - if rc > 0 { - changeset.inserted.push((key, val.to_vec())); - } else if rc < 0 { - changeset.deleted.push(key); + let finalized = if pending_block.body.is_some() { + let mut changeset: state_db::ChangeSet> = state_db::ChangeSet::default(); + for (key, (val, rc)) in operation.db_updates.drain() { + if rc > 0 { + changeset.inserted.push((key, val.to_vec())); + } else if rc < 0 { + changeset.deleted.push(key); + } } - } - let number_u64 = number.saturated_into::(); - let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset) - .map_err(|e: state_db::Error| client::error::Error::from(format!("State database error: {:?}", e)))?; - apply_state_commit(&mut transaction, commit); - - // Check if need to finalize. Genesis is always finalized instantly. - let finalized = number_u64 == 0 || pending_block.leaf_state.is_final(); + let number_u64 = number.saturated_into::(); + let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset) + .map_err(|e: state_db::Error| client::error::Error::from(format!("State database error: {:?}", e)))?; + apply_state_commit(&mut transaction, commit); + + // Check if need to finalize. Genesis is always finalized instantly. + let finalized = number_u64 == 0 || pending_block.leaf_state.is_final(); + finalized + } else { + false + }; let header = &pending_block.header; let is_best = pending_block.leaf_state.is_best(); @@ -1581,7 +1592,7 @@ mod tests { }; let mut op = backend.begin_operation().unwrap(); backend.begin_state_operation(&mut op, block_id).unwrap(); - op.set_block_data(header, None, None, NewBlockState::Best).unwrap(); + op.set_block_data(header, Some(Vec::new()), None, NewBlockState::Best).unwrap(); op.update_changes_trie((changes_trie_update, ChangesTrieCacheAction::Clear)).unwrap(); backend.commit_operation(op).unwrap(); diff --git a/core/client/src/client.rs b/core/client/src/client.rs index 71d6e4f01d637..8c18636b28f8e 100644 --- a/core/client/src/client.rs +++ b/core/client/src/client.rs @@ -927,22 +927,39 @@ impl Client where BlockOrigin::Genesis | BlockOrigin::NetworkInitialSync | BlockOrigin::File => false, }; - self.backend.begin_state_operation(&mut operation.op, BlockId::Hash(parent_hash))?; + let storage_changes = match &body { + Some(body) => { + self.backend.begin_state_operation(&mut operation.op, BlockId::Hash(parent_hash))?; + + // ensure parent block is finalized to maintain invariant that + // finality is called sequentially. + if finalized { + self.apply_finality_with_block_hash(operation, parent_hash, None, info.best_hash, make_notifications)?; + } - // ensure parent block is finalized to maintain invariant that - // finality is called sequentially. - if finalized { - self.apply_finality_with_block_hash(operation, parent_hash, None, info.best_hash, make_notifications)?; - } + // FIXME #1232: correct path logic for when to execute this function + let (storage_update, changes_update, storage_changes) = self.block_execution( + &operation.op, + &import_headers, + origin, + hash, + &body, + )?; - // FIXME #1232: correct path logic for when to execute this function - let (storage_update, changes_update, storage_changes) = self.block_execution( - &operation.op, - &import_headers, - origin, - hash, - body.clone(), - )?; + operation.op.update_cache(new_cache); + if let Some(storage_update) = storage_update { + operation.op.update_db_storage(storage_update)?; + } + if let Some(storage_changes) = storage_changes.clone() { + operation.op.update_storage(storage_changes.0, storage_changes.1)?; + } + if let Some(Some(changes_update)) = changes_update { + operation.op.update_changes_trie(changes_update)?; + } + storage_changes + }, + None => Default::default() + }; let is_new_best = finalized || match fork_choice { ForkChoiceStrategy::LongestChain => import_headers.post().number() > &info.best_number, @@ -977,17 +994,6 @@ impl Client where leaf_state, )?; - operation.op.update_cache(new_cache); - if let Some(storage_update) = storage_update { - operation.op.update_db_storage(storage_update)?; - } - if let Some(storage_changes) = storage_changes.clone() { - operation.op.update_storage(storage_changes.0, storage_changes.1)?; - } - if let Some(Some(changes_update)) = changes_update { - operation.op.update_changes_trie(changes_update)?; - } - operation.op.insert_aux(aux)?; if make_notifications { @@ -1014,7 +1020,7 @@ impl Client where import_headers: &PrePostHeader, origin: BlockOrigin, hash: Block::Hash, - body: Option>, + body: &[Block::Extrinsic], ) -> error::Result<( Option>, Option>>, @@ -1052,7 +1058,7 @@ impl Client where let encoded_block = ::encode_from( import_headers.pre(), - &body.unwrap_or_default() + body, ); let (_, storage_update, changes_update) = self.executor @@ -1523,7 +1529,7 @@ impl<'a, B, E, Block, RA> consensus::BlockImport for &'a Client, ) -> Result { - let BlockCheckParams { hash, number, parent_hash } = block; + let BlockCheckParams { hash, number, parent_hash, header_only } = block; if let Some(h) = self.fork_blocks.as_ref().and_then(|x| x.get(&number)) { if &hash != h { @@ -1541,7 +1547,9 @@ impl<'a, B, E, Block, RA> consensus::BlockImport for &'a Client {}, - BlockStatus::Unknown | BlockStatus::InChainPruned => return Ok(ImportResult::UnknownParent), + BlockStatus::Unknown => return Ok(ImportResult::UnknownParent), + BlockStatus::InChainPruned if header_only => {}, + BlockStatus::InChainPruned => return Ok(ImportResult::MissingState), BlockStatus::KnownBad => return Ok(ImportResult::KnownBad), } @@ -1553,7 +1561,6 @@ impl<'a, B, E, Block, RA> consensus::BlockImport for &'a Client return Ok(ImportResult::KnownBad), } - Ok(ImportResult::imported(false)) } } diff --git a/core/consensus/common/src/block_import.rs b/core/consensus/common/src/block_import.rs index 4342ee38df10a..fa28cc319d764 100644 --- a/core/consensus/common/src/block_import.rs +++ b/core/consensus/common/src/block_import.rs @@ -35,11 +35,15 @@ pub enum ImportResult { KnownBad, /// Block parent is not in the chain. UnknownParent, + /// Parent state is missing. + MissingState, } /// Auxiliary data associated with an imported block result. #[derive(Debug, Default, PartialEq, Eq)] pub struct ImportedAux { + /// Only the header has been imported. Block body verification was skipped. + pub header_only: bool, /// Clear all pending justification requests. pub clear_justification_requests: bool, /// Request a justification for the given block. @@ -98,6 +102,8 @@ pub struct BlockCheckParams { pub number: NumberFor, /// Parent hash of the block that we verify. pub parent_hash: Block::Hash, + /// Don't check state availability + pub header_only: bool, } /// Data required to import a Block. diff --git a/core/consensus/common/src/import_queue.rs b/core/consensus/common/src/import_queue.rs index dc1678fcf189d..08581e2cf9419 100644 --- a/core/consensus/common/src/import_queue.rs +++ b/core/consensus/common/src/import_queue.rs @@ -203,6 +203,10 @@ pub fn import_single_block>( Ok(BlockImportResult::ImportedKnown(number)) }, Ok(ImportResult::Imported(aux)) => Ok(BlockImportResult::ImportedUnknown(number, aux, peer.clone())), + Ok(ImportResult::MissingState) => { + debug!(target: "sync", "Parent state is missing for {}: {:?}, parent: {:?}", number, hash, parent_hash); + Err(BlockImportError::UnknownParent) + }, Ok(ImportResult::UnknownParent) => { debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent_hash); Err(BlockImportError::UnknownParent) @@ -217,7 +221,12 @@ pub fn import_single_block>( } } }; - match import_error(import_handle.check_block(BlockCheckParams { hash, number, parent_hash }))? { + match import_error(import_handle.check_block(BlockCheckParams { + hash, + number, + parent_hash, + header_only: block.body.is_none(), + }))? { BlockImportResult::ImportedUnknown { .. } => (), r => return Ok(r), // Any other successful result means that the block is already imported. } diff --git a/core/finality-grandpa/src/light_import.rs b/core/finality-grandpa/src/light_import.rs index 30af3a06d3f76..2f4c9b6b25437 100644 --- a/core/finality-grandpa/src/light_import.rs +++ b/core/finality-grandpa/src/light_import.rs @@ -680,6 +680,7 @@ pub mod tests { bad_justification: false, needs_finality_proof: false, is_new_best: true, + header_only: false, })); } @@ -692,6 +693,7 @@ pub mod tests { bad_justification: false, needs_finality_proof: false, is_new_best: true, + header_only: false, })); } @@ -705,6 +707,7 @@ pub mod tests { bad_justification: false, needs_finality_proof: true, is_new_best: true, + header_only: false, })); } @@ -721,6 +724,7 @@ pub mod tests { bad_justification: false, needs_finality_proof: true, is_new_best: false, + header_only: false, }, )); } diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index 2339379a609d4..10ebe9ea0cc5d 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -984,6 +984,7 @@ fn allows_reimporting_change_blocks() { bad_justification: false, needs_finality_proof: false, is_new_best: true, + header_only: false, }), ); diff --git a/core/network/src/protocol/sync.rs b/core/network/src/protocol/sync.rs index 34bc68f933686..8c30c2ce873f9 100644 --- a/core/network/src/protocol/sync.rs +++ b/core/network/src/protocol/sync.rs @@ -158,6 +158,7 @@ pub struct PeerInfo { } struct ForkTarget { + header_only: bool, number: NumberFor, parent_hash: Option, peers: HashSet, @@ -480,13 +481,7 @@ impl ChainSync { return; } - let block_status = self.client.block_status(&BlockId::Number(number - One::one())) - .unwrap_or(BlockStatus::Unknown); - if block_status == BlockStatus::InChainPruned { - trace!(target: "sync", "Refusing to sync ancient block {:?}", hash); - return; - } - + trace!(target: "sync", "Downloading requested old fork {:?}", hash); self.is_idle = false; for peer_id in &peers { if let Some(peer) = self.peers.get_mut(peer_id) { @@ -507,6 +502,7 @@ impl ChainSync { number, peers: Default::default(), parent_hash: None, + header_only: true, }) .peers.extend(peers); } @@ -571,7 +567,7 @@ impl ChainSync { let major_sync = self.status().state == SyncState::Downloading; let blocks = &mut self.blocks; let attrs = &self.required_block_attributes; - let fork_targets = &self.fork_targets; + let fork_targets = &mut self.fork_targets; let mut have_requests = false; let last_finalized = self.client.info().chain.finalized_number; let best_queued = self.best_queued_number; @@ -662,10 +658,10 @@ impl ChainSync { }).collect() } PeerSyncState::AncestorSearch(num, state) => { - let block_hash_match = match (blocks.get(0), self.client.block_hash(*num)) { + let matching_hash = match (blocks.get(0), self.client.block_hash(*num)) { (Some(block), Ok(maybe_our_block_hash)) => { trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", num, block.hash, who); - maybe_our_block_hash.map_or(false, |x| x == block.hash) + maybe_our_block_hash.filter(|x| x == &block.hash) }, (None, _) => { debug!(target: "sync", "Invalid response when searching for ancestor from {}", who); @@ -676,27 +672,34 @@ impl ChainSync { return Err(BadPeer(who, ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE)) } }; - if block_hash_match && peer.common_number < *num { + if matching_hash.is_some() && peer.common_number < *num { peer.common_number = *num; } - if !block_hash_match && num.is_zero() { + if matching_hash.is_none() && num.is_zero() { trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); return Err(BadPeer(who, GENESIS_MISMATCH_REPUTATION_CHANGE)) } - if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *num, block_hash_match) { + if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *num, matching_hash.is_some()) { peer.state = PeerSyncState::AncestorSearch(next_num, next_state); return Ok(OnBlockData::Request(who, ancestry_request::(next_num))) } else { // Ancestry search is complete. Check if peer is on a stale fork unknown to us and // add it to sync targets if necessary. - trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={}", + trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})", self.best_queued_hash, self.best_queued_number, peer.best_hash, peer.best_number, - peer.common_number + matching_hash, + peer.common_number, ); - if peer.common_number < peer.best_number && peer.best_number < self.best_queued_number { + let client = &self.client; + if peer.common_number < peer.best_number + && peer.best_number < self.best_queued_number + && matching_hash.and_then( + |h| client.block_status(&BlockId::Hash(h)).ok() + ).unwrap_or(BlockStatus::Unknown) != BlockStatus::InChainPruned + { trace!(target: "sync", "Added fork target {} for {}" , peer.best_hash, who); self.fork_targets .entry(peer.best_hash.clone()) @@ -704,6 +707,7 @@ impl ChainSync { number: peer.best_number, parent_hash: None, peers: Default::default(), + header_only: false, }) .peers.insert(who); } @@ -1085,6 +1089,7 @@ impl ChainSync { number, parent_hash: Some(header.parent_hash().clone()), peers: Default::default(), + header_only: false, }) .peers.insert(who); } @@ -1250,28 +1255,35 @@ fn peer_block_request( /// Get pending fork sync targets for a peer. fn fork_sync_request( id: &PeerId, - targets: &HashMap>, + targets: &mut HashMap>, best_num: NumberFor, finalized: NumberFor, attributes: &message::BlockAttributes, check_block: impl Fn(&B::Hash) -> BlockStatus, ) -> Option<(B::Hash, BlockRequest)> { + targets.retain(|hash, r| if r.number > finalized { + true + } else { + trace!(target: "sync", "Removed expired fork sync request {:?} (#{})", hash, r.number); + false + }); for (hash, r) in targets { if !r.peers.contains(id) { continue } if r.number <= best_num { - trace!(target: "sync", "Downloading requested fork {:?} from {}", hash, id); let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block); let mut count = (r.number - finalized).saturated_into::(); // up to the last finalized block if parent_status != BlockStatus::Unknown { // request only single block count = 1; } + let attributes = if r.header_only { BlockAttributes::HEADER } else { attributes.clone() }; + trace!(target: "sync", "Downloading requested fork {:?} from {}, {} blocks", hash, id, count); return Some((hash.clone(), message::generic::BlockRequest { id: 0, - fields: attributes.clone(), + fields: attributes, from: message::FromBlock::Hash(hash.clone()), to: None, direction: message::Direction::Descending, diff --git a/core/network/src/test/block_import.rs b/core/network/src/test/block_import.rs index f2830548a501a..4a4d7a5d4aa05 100644 --- a/core/network/src/test/block_import.rs +++ b/core/network/src/test/block_import.rs @@ -37,7 +37,7 @@ fn prepare_good_block() -> (TestClient, Hash, u64, PeerId, IncomingBlock) (client, hash, number, peer_id.clone(), IncomingBlock { hash, header, - body: None, + body: Some(Vec::new()), justification, origin: Some(peer_id.clone()) }) @@ -53,7 +53,7 @@ fn import_single_good_block_works() { match import_single_block(&mut test_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) { Ok(BlockImportResult::ImportedUnknown(ref num, ref aux, ref org)) if *num == number && *aux == expected_aux && *org == Some(peer_id) => {} - _ => panic!() + r @ _ => panic!("{:?}", r) } } diff --git a/core/network/src/test/mod.rs b/core/network/src/test/mod.rs index 0c50179f10ace..67c06bb2d4a05 100644 --- a/core/network/src/test/mod.rs +++ b/core/network/src/test/mod.rs @@ -374,16 +374,10 @@ impl> Peer { } } - /// Count the current number of known blocks. Note that: - /// 1. this might be expensive as it creates an in-memory-copy of the chain - /// to count the blocks, thus if you have a different way of testing this - /// (e.g. `info.best_hash`) - use that. - /// 2. This is not always increasing nor accurate, as the - /// orphaned and proven-to-never-finalized blocks may be pruned at any time. - /// Therefore, this number can drop again. - pub fn blocks_count(&self) -> usize { + /// Count the total number of imported blocks. + pub fn blocks_count(&self) -> u64 { self.backend.as_ref().map( - |backend| backend.as_in_memory().blockchain().blocks_count() + |backend| backend.blocks_count() ).unwrap_or(0) } } @@ -519,9 +513,16 @@ pub trait TestNetFactory: Sized { net } - /// Add a full peer. fn add_full_peer(&mut self, config: &ProtocolConfig) { - let test_client_builder = TestClientBuilder::with_default_backend(); + self.add_full_peer_with_states(config, None) + } + + /// Add a full peer. + fn add_full_peer_with_states(&mut self, config: &ProtocolConfig, keep_blocks: Option) { + let test_client_builder = match keep_blocks { + Some(keep_blocks) => TestClientBuilder::with_pruning_window(keep_blocks), + None => TestClientBuilder::with_default_backend(), + }; let backend = test_client_builder.backend(); let (c, longest_chain) = test_client_builder.build_with_longest_chain(); let client = Arc::new(c); @@ -679,7 +680,7 @@ pub trait TestNetFactory: Sized { if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 { return Async::NotReady } - match (highest, peer.client.info().chain.best_number) { + match (highest, peer.client.info().chain.best_hash) { (None, b) => highest = Some(b), (Some(ref a), ref b) if a == b => {}, (Some(_), _) => return Async::NotReady, diff --git a/core/network/src/test/sync.rs b/core/network/src/test/sync.rs index b1b2b9d407262..dd9185373f062 100644 --- a/core/network/src/test/sync.rs +++ b/core/network/src/test/sync.rs @@ -236,7 +236,14 @@ fn sync_no_common_longer_chain_fails() { let mut net = TestNet::new(3); net.peer(0).push_blocks(20, true); net.peer(1).push_blocks(20, false); - net.block_until_sync(&mut runtime); + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + if net.peer(0).is_major_syncing() { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + })).unwrap(); let peer1 = &net.peers()[1]; assert!(!net.peers()[0].blockchain_canon_equals(peer1)); } @@ -592,3 +599,37 @@ fn can_sync_explicit_forks() { Ok(Async::Ready(())) })).unwrap(); } + +#[test] +fn syncs_header_only_forks() { + let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); + let mut net = TestNet::new(0); + let config = ProtocolConfig::default(); + net.add_full_peer_with_states(&config, None); + net.add_full_peer_with_states(&config, Some(3)); + net.peer(0).push_blocks(2, false); + net.peer(1).push_blocks(2, false); + + net.peer(0).push_blocks(2, true); + let small_hash = net.peer(0).client().info().chain.best_hash; + let small_number = net.peer(0).client().info().chain.best_number; + net.peer(1).push_blocks(4, false); + + net.block_until_sync(&mut runtime); + // Peer 1 won't sync the small fork because common block state is missing + assert_eq!(9, net.peer(0).blocks_count()); + assert_eq!(7, net.peer(1).blocks_count()); + + // Request explicit header-only sync request for the ancient fork. + let first_peer_id = net.peer(0).id(); + net.peer(1).set_sync_fork_request(vec![first_peer_id], small_hash, small_number); + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + if net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none() { + return Ok(Async::NotReady) + } + Ok(Async::Ready(())) + })).unwrap(); +} + diff --git a/core/test-client/src/lib.rs b/core/test-client/src/lib.rs index dbe4431456a74..a075caec3143d 100644 --- a/core/test-client/src/lib.rs +++ b/core/test-client/src/lib.rs @@ -98,6 +98,12 @@ impl TestClientBuilder< pub fn backend(&self) -> Arc> { self.backend.clone() } + + /// Create new `TestClientBuilder` with default backend and pruning window size + pub fn with_pruning_window(keep_blocks: u32) -> Self { + let backend = Arc::new(Backend::new_test(keep_blocks, 0)); + Self::with_backend(backend) + } } impl TestClientBuilder {