diff --git a/filecoin-proofs/src/api/post_util.rs b/filecoin-proofs/src/api/post_util.rs
index 0960b2001..3a299c344 100644
--- a/filecoin-proofs/src/api/post_util.rs
+++ b/filecoin-proofs/src/api/post_util.rs
@@ -147,7 +147,7 @@ pub fn generate_single_vanilla_proof<Tree: 'static + MerkleTreeTrait>(
     let comm_c = replica.safe_comm_c();
     let comm_r_last = replica.safe_comm_r_last();
 
-    let mut priv_sectors = vec![fallback::PrivateSector {
+    let priv_sectors = vec![fallback::PrivateSector {
         tree,
         comm_c,
         comm_r_last,
diff --git a/filecoin-proofs/src/api/seal.rs b/filecoin-proofs/src/api/seal.rs
index a898e0e49..376162e39 100644
--- a/filecoin-proofs/src/api/seal.rs
+++ b/filecoin-proofs/src/api/seal.rs
@@ -137,13 +137,12 @@ where
         base_tree_leafs,
     );
 
-    // MT for original data is always named tree-d, and it will be
-    // referenced later in the process as such.
    let mut config = StoreConfig::new(
        cache_path.as_ref(),
        CacheKey::CommDTree.to_string(),
        default_rows_to_discard(base_tree_leafs, BINARY_ARITY),
    );
+
    let data_tree = create_base_merkle_tree::<BinaryMerkleTree<DefaultPieceHasher>>(
        Some(config.clone()),
        base_tree_leafs,
diff --git a/storage-proofs-core/src/compound_proof.rs b/storage-proofs-core/src/compound_proof.rs
index 11bfbae86..524d046d4 100644
--- a/storage-proofs-core/src/compound_proof.rs
+++ b/storage-proofs-core/src/compound_proof.rs
@@ -264,7 +264,7 @@ where
         groth_proofs
             .into_iter()
             .map(|groth_proof| {
-                let mut proof_vec = vec![];
+                let mut proof_vec = Vec::new();
                 groth_proof.write(&mut proof_vec)?;
                 let gp = groth16::Proof::<Bls12>::read(&proof_vec[..])?;
                 Ok(gp)
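The `compound_proof.rs` hunk keeps the existing write-then-read round-trip through an in-memory buffer: `Vec<u8>` implements `std::io::Write` and `&[u8]` implements `std::io::Read`, which is what lets `proof_vec` serve as both the sink for `groth_proof.write` and the source for `groth16::Proof::read`. A minimal, self-contained sketch of that pattern follows; the `Blob` type is a hypothetical stand-in for a proof, not the bellperson API:

```rust
use std::io::{Read, Result, Write};

// Hypothetical stand-in for a serializable proof; the real `groth16::Proof`
// writes and reads encoded curve points rather than raw bytes.
struct Blob([u8; 4]);

impl Blob {
    fn write<W: Write>(&self, mut w: W) -> Result<()> {
        w.write_all(&self.0)
    }

    fn read<R: Read>(mut r: R) -> Result<Self> {
        let mut buf = [0u8; 4];
        r.read_exact(&mut buf)?;
        Ok(Blob(buf))
    }
}

fn main() -> Result<()> {
    let original = Blob(*b"abcd");

    // `Vec<u8>` is a `Write` sink, exactly like `proof_vec` in the hunk above...
    let mut buf = Vec::new();
    original.write(&mut buf)?;

    // ...and a `&[u8]` slice of it is a `Read` source for parsing back.
    let round_tripped = Blob::read(&buf[..])?;
    assert_eq!(original.0, round_tripped.0);
    Ok(())
}
```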
diff --git a/storage-proofs-porep/src/stacked/vanilla/memory_handling.rs b/storage-proofs-porep/src/stacked/vanilla/memory_handling.rs
index 327e4f963..a404c29e5 100644
--- a/storage-proofs-porep/src/stacked/vanilla/memory_handling.rs
+++ b/storage-proofs-porep/src/stacked/vanilla/memory_handling.rs
@@ -3,7 +3,7 @@ use std::fs::File;
 use std::hint::spin_loop;
 use std::marker::{PhantomData, Sync};
 use std::mem::size_of;
-use std::path::{Path, PathBuf};
+use std::path::Path;
 use std::slice;
 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 
@@ -30,6 +30,16 @@ struct IncrementingCursor {
     cur_safe: AtomicUsize,
 }
 
+fn compare_and_swap(atomic: &AtomicUsize, before: usize, after: usize) -> usize {
+    match atomic.compare_exchange(before, after, Ordering::SeqCst, Ordering::SeqCst) {
+        Ok(prev) => {
+            assert_eq!(prev, before);
+            prev
+        }
+        Err(prev) => prev,
+    }
+}
+
 /// IncrementingCursor provides an atomic variable which can be incremented such that only one thread attempting the
 /// increment is selected to perform actions required to effect the transition. Unselected threads wait until the
 /// transition has completed. Transition and wait condition are both specified by closures supplied by the caller.
@@ -40,15 +50,17 @@ impl IncrementingCursor {
             cur_safe: AtomicUsize::new(val),
         }
     }
+
     fn store(&self, val: usize) {
         self.cur.store(val, Ordering::SeqCst);
         self.cur_safe.store(val, Ordering::SeqCst);
     }
+
     fn compare_and_swap(&self, before: usize, after: usize) {
-        self.cur.compare_and_swap(before, after, Ordering::SeqCst);
-        self.cur_safe
-            .compare_and_swap(before, after, Ordering::SeqCst);
+        compare_and_swap(&self.cur, before, after);
+        compare_and_swap(&self.cur_safe, before, after);
     }
+
     fn increment<F: Fn() -> bool, G: Fn()>(&self, target: usize, wait_fn: F, advance_fn: G) {
         // Check using `cur_safe`, to ensure we wait until the current cursor value is safe to use.
         // If we were to instead check `cur`, it could have been incremented but not yet safe.
@@ -56,8 +68,7 @@ impl IncrementingCursor {
         if target > cur {
             // Only one producer will successfully increment `cur`. We need this second atomic because we cannot
             // increment `cur_safe` until after the underlying resource has been advanced.
-            let instant_cur = self.cur.compare_and_swap(cur, cur + 1, Ordering::SeqCst);
-
+            let instant_cur = compare_and_swap(&self.cur, cur, cur + 1);
             if instant_cur == cur {
                 // We successfully incremented `self.cur`, so we are responsible for advancing the resource.
                 {
@@ -132,9 +143,11 @@ impl<T: FromByteSlice> CacheReader<T> {
     pub unsafe fn increment_consumer(&self) {
         self.consumer.fetch_add(1, Ordering::SeqCst);
     }
+
     pub fn store_consumer(&self, val: u64) {
         self.consumer.store(val, Ordering::SeqCst);
     }
+
     pub fn get_consumer(&self) -> u64 {
         self.consumer.load(Ordering::SeqCst)
     }
@@ -164,6 +177,7 @@ impl<T: FromByteSlice> CacheReader<T> {
         bufs[0] = buf0;
         Ok(())
     }
+
     pub fn finish_reset(&self) -> Result<()> {
         let buf1 = Self::map_buf(self.window_size as u64, self.window_size, &self.file)?;
         let bufs = unsafe { self.get_mut_bufs() };
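The free `compare_and_swap` helper exists because `AtomicUsize::compare_and_swap` has been deprecated since Rust 1.50 in favor of `compare_exchange`, which makes the outcome explicit in a `Result`: `Ok(previous)` when the swap happened, `Err(actual)` when the value did not match and nothing was written. Note that the strong `compare_exchange` is the right replacement here rather than `compare_exchange_weak`: the weak form may fail spuriously even when the value matches, and since `increment` does not retry in a loop, a spurious failure could leave no thread selected to advance the resource. A standalone sketch of the `Result` contract:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let cur = AtomicUsize::new(5);

    // The value matches `before`, so the swap happens and `Ok` carries the
    // previous value: the analogue of `instant_cur == cur` succeeding.
    assert_eq!(
        cur.compare_exchange(5, 6, Ordering::SeqCst, Ordering::SeqCst),
        Ok(5)
    );

    // The value is now 6, not 5: nothing is written, and `Err` carries the
    // actual current value, which is what the helper returns on failure.
    assert_eq!(
        cur.compare_exchange(5, 7, Ordering::SeqCst, Ordering::SeqCst),
        Err(6)
    );
    assert_eq!(cur.load(Ordering::SeqCst), 6);
}
```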
diff --git a/storage-proofs-porep/src/stacked/vanilla/proof.rs b/storage-proofs-porep/src/stacked/vanilla/proof.rs
index 84c960dca..9189d6d7c 100644
--- a/storage-proofs-porep/src/stacked/vanilla/proof.rs
+++ b/storage-proofs-porep/src/stacked/vanilla/proof.rs
@@ -103,9 +103,12 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
             let mut parents = vec![0; base_degree];
             graph.base_parents(x, &mut parents)?;
 
-            for parent in &parents {
-                columns.push(t_aux.column(*parent)?);
-            }
+            columns.extend(
+                parents
+                    .into_par_iter()
+                    .map(|parent| t_aux.column(parent))
+                    .collect::<Result<Vec<Column<Tree::Hasher>>>>()?,
+            );
 
             debug_assert!(columns.len() == base_degree);
 
@@ -116,7 +119,10 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
             let mut parents = vec![0; graph.expansion_degree()];
             graph.expanded_parents(x, &mut parents)?;
 
-            parents.iter().map(|parent| t_aux.column(*parent)).collect()
+            parents
+                .into_par_iter()
+                .map(|parent| t_aux.column(parent))
+                .collect()
         };
 
         (0..partition_count)
@@ -194,7 +200,7 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
                     graph.base_parents(challenge, &mut parents)?;
 
                     parents
-                        .into_iter()
+                        .into_par_iter()
                        .map(|parent| t_aux.domain_node_at_layer(layer, parent))
                        .collect::<Result<_>>()?
                 } else {
@@ -203,7 +209,7 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
                     let base_parents_count = graph.base_graph().degree();
 
                     parents
-                        .into_iter()
+                        .into_par_iter()
                         .enumerate()
                         .map(|(i, parent)| {
                             if i < base_parents_count {
@@ -502,25 +508,18 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
                             layers
                         ];
 
-                        rayon::scope(|s| {
-                            // capture a shadowed version of layer_data.
-                            let layer_data: &mut Vec<_> = &mut layer_data;
-
-                            // gather all layer data in parallel.
-                            s.spawn(move |_| {
-                                for (layer_index, mut layer_bytes) in
-                                    layer_data.iter_mut().enumerate()
-                                {
-                                    let store = labels.labels_for_layer(layer_index + 1);
-                                    let start = (i * nodes_count) + node_index;
-                                    let end = start + chunked_nodes_count;
-
-                                    store
-                                        .read_range_into(start, end, &mut layer_bytes)
-                                        .expect("failed to read store range");
-                                }
-                            });
-                        });
+                        // gather all layer data.
+                        for (layer_index, mut layer_bytes) in
+                            layer_data.iter_mut().enumerate()
+                        {
+                            let store = labels.labels_for_layer(layer_index + 1);
+                            let start = (i * nodes_count) + node_index;
+                            let end = start + chunked_nodes_count;
+
+                            store
+                                .read_range_into(start, end, &mut layer_bytes)
+                                .expect("failed to read store range");
+                        }
 
                         (0..chunked_nodes_count)
                             .into_par_iter()
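The `proof.rs` hunks swap sequential `into_iter` loops for rayon's `into_par_iter` when gathering parent columns. The fallible mapping still works unchanged because rayon implements `FromParallelIterator` for `Result`: a parallel iterator of `Result` items collects into a single `Result<Vec<_>>`, and any failing element fails the whole batch. A toy sketch of the pattern, with `lookup` as a hypothetical stand-in for `t_aux.column`:

```rust
use rayon::prelude::*;

// Hypothetical stand-in for `t_aux.column`: a fallible per-parent lookup.
fn lookup(parent: u32) -> Result<u64, String> {
    if parent < 100 {
        Ok(u64::from(parent) * 2)
    } else {
        Err(format!("parent {} out of range", parent))
    }
}

fn main() {
    let parents = vec![1u32, 2, 3, 4];

    // Each lookup runs on the rayon thread pool; collecting into
    // `Result<Vec<_>, _>` short-circuits once an error is observed.
    let columns: Result<Vec<u64>, String> = parents.into_par_iter().map(lookup).collect();
    assert_eq!(columns, Ok(vec![2, 4, 6, 8]));

    let bad: Result<Vec<u64>, String> = vec![1u32, 200].into_par_iter().map(lookup).collect();
    assert!(bad.is_err());
}
```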