Skip to content

Commit

Permalink
Improve multicore SDR (#1501)
Browse files Browse the repository at this point in the history
* refactor(storage-proofs-porep): use slice indexing instead of fake pointers

* fix(storage-proofs-porep): select optimal parents in multicore sdr

Based on #1477
Thanks to @qy3u for finding

* fix(storage-proofs-porep): actually clear base_parents_missing
  • Loading branch information
dignifiedquire authored Sep 9, 2021
1 parent e4aed82 commit ff7daa9
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 18 deletions.
43 changes: 25 additions & 18 deletions storage-proofs-porep/src/stacked/vanilla/create_label/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use generic_array::{
use log::{debug, info};
use mapr::MmapMut;
use merkletree::store::{DiskStore, Store, StoreConfig};
use storage_proofs_core::api_version::ApiVersion;
use storage_proofs_core::{
cache_key::CacheKey,
drgraph::{Graph, BASE_DEGREE},
Expand Down Expand Up @@ -54,14 +55,16 @@ const SHA256_INITIAL_DIGEST: [u32; 8] = [
];

#[inline]
#[allow(clippy::too_many_arguments)]
fn fill_buffer(
cur_node: u64,
parents_cache: &CacheReader<u32>,
mut cur_parent: &[u32], // parents for this node
cur_parent: &[u32], // parents for this node
layer_labels: &UnsafeSlice<'_, u32>,
exp_labels: Option<&UnsafeSlice<'_, u32>>, // None for layer0
buf: &mut [u8],
base_parent_missing: &mut BitMask,
api_version: ApiVersion,
) {
let cur_node_swap = cur_node.to_be_bytes(); // Note switch to big endian
buf[36..44].copy_from_slice(&cur_node_swap); // update buf with current node
Expand All @@ -74,50 +77,46 @@ fn fill_buffer(
compress256!(cur_node_ptr, buf, 1);

// Fill in the base parents
// Node 5 (prev node) will always be missing, and there tend to be
// frequent close references.
let (predecessor_index, other_drg_parents) = match api_version {
ApiVersion::V1_0_0 => (BASE_DEGREE - 1, (0..BASE_DEGREE - 1)),
ApiVersion::V1_1_0 => (0, (1..BASE_DEGREE)),
};

if cur_node > MIN_BASE_PARENT_NODE {
// Mark base parent 5 as missing
// base_parent_missing.set_all(0x20);
base_parent_missing.set(5);
// Mark base parent predecessor as missing
base_parent_missing.clear();
base_parent_missing.set(predecessor_index);

// Skip the last base parent - it always points to the preceding node,
// which we know is not ready and will be filled in the main loop
for k in 0..BASE_DEGREE - 1 {
for k in other_drg_parents {
unsafe {
if cur_parent[0] as u64 >= parents_cache.get_consumer() {
if cur_parent[k] as u64 >= parents_cache.get_consumer() {
// Node is not ready
base_parent_missing.set(k);
} else {
let parent_data = {
let offset = cur_parent[0] as usize * NODE_WORDS;
let offset = cur_parent[k] as usize * NODE_WORDS;
&layer_labels.as_slice()[offset..offset + NODE_WORDS]
};
let a = SHA_BLOCK_SIZE + (NODE_SIZE * k);
buf[a..a + NODE_SIZE].copy_from_slice(parent_data.as_byte_slice());
};

// Advance pointer for the last base parent
cur_parent = &cur_parent[1..];
}
}
// Advance pointer for the last base parent
cur_parent = &cur_parent[1..];
} else {
base_parent_missing.set_upto(BASE_DEGREE as u8);
cur_parent = &cur_parent[BASE_DEGREE..];
}

if let Some(exp_labels) = exp_labels {
// Read from each of the expander parent nodes
for k in BASE_DEGREE..DEGREE {
for (k, parent) in cur_parent.iter().enumerate().take(DEGREE).skip(BASE_DEGREE) {
let parent_data = unsafe {
let offset = cur_parent[0] as usize * NODE_WORDS;
let offset = *parent as usize * NODE_WORDS;
&exp_labels.as_slice()[offset..offset + NODE_WORDS]
};
let a = SHA_BLOCK_SIZE + (NODE_SIZE * k);
buf[a..a + NODE_SIZE].copy_from_slice(parent_data.as_byte_slice());
cur_parent = &cur_parent[1..];
}
}
}
Expand Down Expand Up @@ -149,6 +148,7 @@ fn create_label_runner(
lookahead: u64,
ring_buf: &RingBuf,
base_parent_missing: &UnsafeSlice<'_, BitMask>,
api_version: ApiVersion,
) {
info!("created label runner");
// Label data bytes per node
Expand Down Expand Up @@ -187,6 +187,7 @@ fn create_label_runner(
exp_labels,
buf,
bpm,
api_version,
);
}

Expand All @@ -200,6 +201,7 @@ fn create_label_runner(
}
}

#[allow(clippy::too_many_arguments)]
fn create_layer_labels(
parents_cache: &CacheReader<u32>,
replica_id: &[u8],
Expand All @@ -208,6 +210,7 @@ fn create_layer_labels(
num_nodes: u64,
cur_layer: u32,
core_group: Arc<Option<MutexGuard<'_, Vec<CoreIndex>>>>,
api_version: ApiVersion,
) {
info!("Creating labels for layer {}", cur_layer);
// num_producers is the number of producer threads
Expand Down Expand Up @@ -284,6 +287,7 @@ fn create_layer_labels(
lookahead as u64,
ring_buf,
base_parent_missing,
api_version,
)
}));
}
Expand Down Expand Up @@ -344,6 +348,7 @@ fn create_layer_labels(
cur_node_ptr = &mut cur_node_ptr[8..];
// Grab the current slot of the ring_buf
let buf = unsafe { ring_buf.slot_mut(cur_slot) };

// Fill in the base parents
for k in 0..BASE_DEGREE {
let bpm = unsafe { base_parent_missing.get(cur_slot) };
Expand Down Expand Up @@ -499,6 +504,7 @@ pub fn create_labels_for_encoding<Tree: 'static + MerkleTreeTrait, T: AsRef<[u8]
node_count,
layer as u32,
core_group.clone(),
graph.api_version(),
);

// Cache reset happens in two parts.
Expand Down Expand Up @@ -589,6 +595,7 @@ pub fn create_labels_for_decoding<Tree: 'static + MerkleTreeTrait, T: AsRef<[u8]
node_count,
layer as u32,
core_group.clone(),
graph.api_version(),
);

// Cache reset happens in two parts.
Expand Down
5 changes: 5 additions & 0 deletions storage-proofs-porep/src/stacked/vanilla/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ where
// round 7 (37)
hasher.finish_with(parents[0])
}

/// Returns the current `ApiVersion`.
pub fn api_version(&self) -> ApiVersion {
self.api_version
}
}

impl<H, G> ParameterSetMetadata for StackedGraph<H, G>
Expand Down
5 changes: 5 additions & 0 deletions storage-proofs-porep/src/stacked/vanilla/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ impl BitMask {
pub fn get(self, i: usize) -> bool {
self.0 & (1 << i) != 0
}

#[inline]
pub fn clear(&mut self) {
self.0 = 0;
}
}

#[derive(Debug)]
Expand Down

0 comments on commit ff7daa9

Please sign in to comment.