This repository has been archived by the owner on Nov 6, 2020. It is now read-only.
Multithreaded snapshot creation #9239
Merged
Changes from 25 commits

Commits (27):
f432170 Add Progress to Snapshot Secondary chunks creation (ngotchac)
4418342 Use half of CPUs to multithread snapshot creation (ngotchac)
422d214 Use env var to define number of threads (ngotchac)
5242cd5 info to debug logs (ngotchac)
9d44c47 Add Snapshot threads as CLI option (ngotchac)
8f3ece7 Randomize chunks per thread (ngotchac)
d278cb9 Remove randomness, add debugging (ngotchac)
4ee58d2 Add warning (ngotchac)
db4f5d5 Add tracing (ngotchac)
603d265 Use parity-common fix seek branch (ngotchac)
73b8858 Fix log (ngotchac)
4c68b1c Fix tests (ngotchac)
680cef5 Fix tests (ngotchac)
17f49d9 Merge branch 'master' into ng-multithread-snapshot-creation (ngotchac)
14eda40 Merge branch 'master' into ng-multithread-snapshot-creation (ngotchac)
cd5e93d Merge branch 'master' into ng-multithread-snapshot-creation (ngotchac)
09a9076 PR Grumbles (ngotchac)
4304c23 Merge branch 'master' into ng-multithread-snapshot-creation (ngotchac)
4e265cb PR Grumble II (ngotchac)
7670562 Merge branch 'master' into ng-multithread-snapshot-creation (ngotchac)
4c8e900 Merge branch 'master' into ng-multithread-snapshot-creation (ngotchac)
8dff8f1 Update Cargo.lock (ngotchac)
3763dd0 PR Grumbles (ngotchac)
087468a Default snapshot threads to half number of CPUs (ngotchac)
ceabd5c Fix default snapshot threads // min 1 (ngotchac)
1610154 Merge branch 'master' into ng-multithread-snapshot-creation (ngotchac)
d8c10b7 Merge branch 'master' into ng-multithread-snapshot-creation (ngotchac)
@@ -20,6 +20,7 @@
 //! https://wiki.parity.io/Warp-Sync-Snapshot-Format

 use std::collections::{HashMap, HashSet};
+use std::cmp;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY};

@@ -43,6 +44,7 @@ use trie::{Trie, TrieMut};
 use ethtrie::{TrieDB, TrieDBMut};
 use rlp::{RlpStream, Rlp};
 use bloom_journal::Bloom;
+use num_cpus;

 use self::io::SnapshotWriter;
@@ -88,6 +90,28 @@ const MAX_CHUNK_SIZE: usize = PREFERRED_CHUNK_SIZE / 4 * 5;
 const MIN_SUPPORTED_STATE_CHUNK_VERSION: u64 = 1;
 // current state chunk version.
 const STATE_CHUNK_VERSION: u64 = 2;
+/// number of snapshot subparts, must be a power of 2 in [1; 256]
+const SNAPSHOT_SUBPARTS: usize = 16;
+/// Maximum number of snapshot subparts (must be a multiple of `SNAPSHOT_SUBPARTS`)
+const MAX_SNAPSHOT_SUBPARTS: usize = 256;
+
+/// Configuration for the Snapshot service
+#[derive(Debug, Clone, PartialEq)]
+pub struct SnapshotConfiguration {
+	/// If `true`, no periodic snapshots will be created
+	pub no_periodic: bool,
+	/// Number of threads for creating snapshots
+	pub processing_threads: usize,
+}
+
+impl Default for SnapshotConfiguration {
+	fn default() -> Self {
+		SnapshotConfiguration {
+			no_periodic: false,
+			processing_threads: ::std::cmp::max(1, num_cpus::get() / 2),
+		}
+	}
+}

 /// A progress indicator for snapshots.
 #[derive(Debug, Default)]
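A quick way to see what the new default means in practice: `processing_threads` falls back to half the logical CPU count, but never below one. The sketch below restates that rule outside the diff (the helper function and the CPU counts are illustrative only, not part of the patch):

```rust
// Sketch only: mirrors the max(1, num_cpus::get() / 2) default used by
// SnapshotConfiguration, with the CPU count passed in so a few machine
// sizes can be shown without the num_cpus crate.
fn default_processing_threads(cpus: usize) -> usize {
    std::cmp::max(1, cpus / 2)
}

fn main() {
    for &cpus in &[1usize, 2, 4, 16] {
        println!("{} CPUs -> {} snapshot thread(s)", cpus, default_processing_threads(cpus));
    }
    // 1 CPUs -> 1, 2 CPUs -> 1, 4 CPUs -> 2, 16 CPUs -> 8
}
```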
@@ -130,7 +154,8 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
 	block_at: H256,
 	state_db: &HashDB<KeccakHasher>,
 	writer: W,
-	p: &Progress
+	p: &Progress,
+	processing_threads: usize,
 ) -> Result<(), Error> {
 	let start_header = chain.block_header_data(&block_at)
 		.ok_or(Error::InvalidStartingBlock(BlockId::Hash(block_at)))?;

@@ -142,17 +167,45 @@
 	let writer = Mutex::new(writer);
 	let chunker = engine.snapshot_components().ok_or(Error::SnapshotsUnsupported)?;
 	let snapshot_version = chunker.current_version();
-	let (state_hashes, block_hashes) = scope(|scope| {
+	let (state_hashes, block_hashes) = scope(|scope| -> Result<(Vec<H256>, Vec<H256>), Error> {
 		let writer = &writer;
 		let block_guard = scope.spawn(move || chunk_secondary(chunker, chain, block_at, writer, p));
-		let state_res = chunk_state(state_db, &state_root, writer, p);
-
-		state_res.and_then(|state_hashes| {
-			block_guard.join().map(|block_hashes| (state_hashes, block_hashes))
-		})
+
+		// The number of threads must be between 1 and SNAPSHOT_SUBPARTS
+		assert!(processing_threads >= 1, "Cannot use less than 1 threads for creating snapshots");
+		let num_threads: usize = cmp::min(processing_threads, SNAPSHOT_SUBPARTS);
+		info!(target: "snapshot", "Using {} threads for Snapshot creation.", num_threads);
+
+		let mut state_guards = Vec::with_capacity(num_threads as usize);
+
+		for thread_idx in 0..num_threads {
+			let state_guard = scope.spawn(move || -> Result<Vec<H256>, Error> {
+				let mut chunk_hashes = Vec::new();
+
+				for part in (thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads) {
+					debug!(target: "snapshot", "Chunking part {} in thread {}", part, thread_idx);
+					let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part))?;
+					chunk_hashes.append(&mut hashes);
+				}
+
+				Ok(chunk_hashes)
+			});
+			state_guards.push(state_guard);
+		}
+
+		let block_hashes = block_guard.join()?;
+		let mut state_hashes = Vec::new();
+
+		for guard in state_guards {
+			let part_state_hashes = guard.join()?;
+			state_hashes.extend(part_state_hashes);
+		}
+
+		debug!(target: "snapshot", "Took a snapshot of {} accounts", p.accounts.load(Ordering::SeqCst));
+		Ok((state_hashes, block_hashes))
 	})?;

-	info!("produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len());
+	info!(target: "snapshot", "produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len());

 	let manifest_data = ManifestData {
 		version: snapshot_version,
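The worker loop above hands out the 16 subparts round-robin: thread `i` takes parts `i`, `i + num_threads`, `i + 2 * num_threads`, and so on, so every subpart is chunked exactly once. A standalone sketch of that scheduling (the thread count of 3 is just an example value, not something fixed by the patch):

```rust
// Standalone sketch of the subpart scheduling used in take_snapshot:
// thread i processes parts i, i + num_threads, i + 2 * num_threads, ...
const SNAPSHOT_SUBPARTS: usize = 16;

fn main() {
    let num_threads: usize = 3; // example value; the real one comes from the snapshot-threads option
    for thread_idx in 0..num_threads {
        let parts: Vec<usize> = (thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads).collect();
        println!("thread {} -> parts {:?}", thread_idx, parts);
    }
    // thread 0 -> parts [0, 3, 6, 9, 12, 15]
    // thread 1 -> parts [1, 4, 7, 10, 13]
    // thread 2 -> parts [2, 5, 8, 11, 14]
}
```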
@@ -200,6 +253,7 @@ pub fn chunk_secondary<'a>(mut chunker: Box<SnapshotComponents>, chain: &'a Bloc
 			chain,
 			start_hash,
 			&mut chunk_sink,
+			progress,
 			PREFERRED_CHUNK_SIZE,
 		)?;
 	}
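The `progress` argument threaded through here is the shared `Progress` counter that `take_snapshot` also reads via `p.accounts.load(Ordering::SeqCst)`. As a rough, std-only sketch of that pattern (plain `std::thread` with `Arc`, not the scoped threads the actual code uses), several workers can bump one atomic counter safely:

```rust
// Hedged sketch: multiple workers update a single shared AtomicUsize,
// the same idea behind the Progress accounts counter in the snapshot code.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() {
    let accounts = Arc::new(AtomicUsize::new(0));
    let mut handles = Vec::new();

    for _ in 0..4 {
        let accounts = Arc::clone(&accounts);
        handles.push(thread::spawn(move || {
            for _ in 0..1000 {
                accounts.fetch_add(1, Ordering::SeqCst);
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    assert_eq!(accounts.load(Ordering::SeqCst), 4000);
    println!("accounts processed: {}", accounts.load(Ordering::SeqCst));
}
```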
@@ -263,10 +317,12 @@ impl<'a> StateChunker<'a> {

 /// Walk the given state database starting from the given root,
 /// creating chunks and writing them out.
+/// `part` is a number between 0 and 15, which describe which part of
+/// the tree should be chunked.
 ///
 /// Returns a list of hashes of chunks created, or any error it may
 /// have encountered.
-pub fn chunk_state<'a>(db: &HashDB<KeccakHasher>, root: &H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> {
+pub fn chunk_state<'a>(db: &HashDB<KeccakHasher>, root: &H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress, part: Option<usize>) -> Result<Vec<H256>, Error> {
 	let account_trie = TrieDB::new(db, &root)?;

 	let mut chunker = StateChunker {
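Given `SNAPSHOT_SUBPARTS = 16` and `MAX_SNAPSHOT_SUBPARTS = 256`, each `part` value selects a 16-wide range of the first byte of the hashed account key; the seek logic in the next hunk implements exactly this arithmetic. A purely illustrative sketch of the mapping (not part of the diff):

```rust
// Illustrative mapping from snapshot subpart index to the range of the
// first byte of the hashed account key that chunk_state iterates over.
const SNAPSHOT_SUBPARTS: usize = 16;
const MAX_SNAPSHOT_SUBPARTS: usize = 256;

fn main() {
    let part_offset = MAX_SNAPSHOT_SUBPARTS / SNAPSHOT_SUBPARTS; // 16
    for part in 0..SNAPSHOT_SUBPARTS {
        let seek_from = part * part_offset;
        if part < SNAPSHOT_SUBPARTS - 1 {
            // Upper bound is exclusive: iteration stops once the first key
            // byte reaches the next subpart's starting byte.
            let seek_to = (part + 1) * part_offset;
            println!("part {:>2}: 0x{:02x}..0x{:02x}", part, seek_from, seek_to);
        } else {
            // The last subpart has no upper bound and runs to the end of the trie.
            println!("part {:>2}: 0x{:02x}..end", part, seek_from);
        }
    }
}
```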
@@ -281,11 +337,33 @@ pub fn chunk_state<'a>(db: &HashDB<KeccakHasher>, root: &H256, writer: &Mutex<Sn
 	let mut used_code = HashSet::new();

 	// account_key here is the address' hash.
-	for item in account_trie.iter()? {
+	let mut account_iter = account_trie.iter()?;
+
+	let mut seek_to = None;
+
+	if let Some(part) = part {
+		assert!(part < 16, "Wrong chunk state part number (must be <16) in snapshot creation.");
+
+		let part_offset = MAX_SNAPSHOT_SUBPARTS / SNAPSHOT_SUBPARTS;
+		let mut seek_from = vec![0; 32];
+		seek_from[0] = (part * part_offset) as u8;
+		account_iter.seek(&seek_from)?;
+
+		// Set the upper-bound, except for the last part
+		if part < SNAPSHOT_SUBPARTS - 1 {
+			seek_to = Some(((part + 1) * part_offset) as u8)
+		}
+	}
+
+	for item in account_iter {
 		let (account_key, account_data) = item?;
-		let account = ::rlp::decode(&*account_data)?;
 		let account_key_hash = H256::from_slice(&account_key);

+		if seek_to.map_or(false, |seek_to| account_key[0] >= seek_to) {
+			break;
+		}
+
+		let account = ::rlp::decode(&*account_data)?;
 		let account_db = AccountDB::from_hash(db, account_key_hash);

 		let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE)?;

Inline review note on the `seek_to` upper bound (marked as resolved):
we could get rid of this: if account_key[0] > seek_to { break; }
But …
Review conversation:

We can avoid cloning:

extend expects a mutable reference :/

Hm, not sure what you mean: http://play.rust-lang.org/?gist=855f74c9c518bb8777179e8e3daee376&version=stable&mode=debug&edition=2015
https://doc.rust-lang.org/std/vec/struct.Vec.html#impl-Extend%3CT%3E

Oups, you're right, sorry.
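For reference, the point settled in this exchange: `Vec::extend` takes any `IntoIterator` by value, so the per-part hash vectors can be merged without cloning, which is what the `state_hashes.extend(part_state_hashes)` line in the diff relies on. A minimal sketch:

```rust
// Minimal demonstration of the Vec::extend behaviour discussed above:
// the per-part Vec is moved into extend, so no clone is required.
fn main() {
    let mut state_hashes: Vec<u64> = vec![0xaa, 0xbb];
    let part_state_hashes: Vec<u64> = vec![0xcc, 0xdd];

    // `extend` consumes `part_state_hashes` (Vec implements IntoIterator by value).
    state_hashes.extend(part_state_hashes);

    assert_eq!(state_hashes, vec![0xaa, 0xbb, 0xcc, 0xdd]);
    println!("{:?}", state_hashes);
}
```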