Improve multicore SDR #1501

Merged · 3 commits · Sep 9, 2021
43 changes: 25 additions & 18 deletions storage-proofs-porep/src/stacked/vanilla/create_label/multi.rs
@@ -18,6 +18,7 @@ use generic_array::{
use log::{debug, info};
use mapr::MmapMut;
use merkletree::store::{DiskStore, Store, StoreConfig};
use storage_proofs_core::api_version::ApiVersion;
use storage_proofs_core::{
cache_key::CacheKey,
drgraph::{Graph, BASE_DEGREE},
@@ -54,14 +55,16 @@ const SHA256_INITIAL_DIGEST: [u32; 8] = [
];

#[inline]
#[allow(clippy::too_many_arguments)]
fn fill_buffer(
cur_node: u64,
parents_cache: &CacheReader<u32>,
mut cur_parent: &[u32], // parents for this node
cur_parent: &[u32], // parents for this node
layer_labels: &UnsafeSlice<'_, u32>,
exp_labels: Option<&UnsafeSlice<'_, u32>>, // None for layer0
buf: &mut [u8],
base_parent_missing: &mut BitMask,
api_version: ApiVersion,
) {
let cur_node_swap = cur_node.to_be_bytes(); // Note switch to big endian
buf[36..44].copy_from_slice(&cur_node_swap); // update buf with current node
@@ -74,50 +77,46 @@ fn fill_buffer(
compress256!(cur_node_ptr, buf, 1);

// Fill in the base parents
// Node 5 (prev node) will always be missing, and there tend to be
// frequent close references.
let (predecessor_index, other_drg_parents) = match api_version {
ApiVersion::V1_0_0 => (BASE_DEGREE - 1, (0..BASE_DEGREE - 1)),
ApiVersion::V1_1_0 => (0, (1..BASE_DEGREE)),
};

if cur_node > MIN_BASE_PARENT_NODE {
// Mark base parent 5 as missing
// base_parent_missing.set_all(0x20);
base_parent_missing.set(5);
// Mark base parent predecessor as missing
base_parent_missing.clear();
base_parent_missing.set(predecessor_index);

// Skip the last base parent - it always points to the preceding node,
// which we know is not ready and will be filled in the main loop
for k in 0..BASE_DEGREE - 1 {
for k in other_drg_parents {
unsafe {
if cur_parent[0] as u64 >= parents_cache.get_consumer() {
if cur_parent[k] as u64 >= parents_cache.get_consumer() {
// Node is not ready
base_parent_missing.set(k);
} else {
let parent_data = {
let offset = cur_parent[0] as usize * NODE_WORDS;
let offset = cur_parent[k] as usize * NODE_WORDS;
&layer_labels.as_slice()[offset..offset + NODE_WORDS]
};
let a = SHA_BLOCK_SIZE + (NODE_SIZE * k);
buf[a..a + NODE_SIZE].copy_from_slice(parent_data.as_byte_slice());
};

// Advance pointer for the last base parent
cur_parent = &cur_parent[1..];
}
}
// Advance pointer for the last base parent
cur_parent = &cur_parent[1..];
} else {
base_parent_missing.set_upto(BASE_DEGREE as u8);
cur_parent = &cur_parent[BASE_DEGREE..];
}

if let Some(exp_labels) = exp_labels {
// Read from each of the expander parent nodes
for k in BASE_DEGREE..DEGREE {
for (k, parent) in cur_parent.iter().enumerate().take(DEGREE).skip(BASE_DEGREE) {
let parent_data = unsafe {
let offset = cur_parent[0] as usize * NODE_WORDS;
let offset = *parent as usize * NODE_WORDS;
&exp_labels.as_slice()[offset..offset + NODE_WORDS]
};
let a = SHA_BLOCK_SIZE + (NODE_SIZE * k);
buf[a..a + NODE_SIZE].copy_from_slice(parent_data.as_byte_slice());
cur_parent = &cur_parent[1..];
}
}
}
@@ -149,6 +148,7 @@ fn create_label_runner(
lookahead: u64,
ring_buf: &RingBuf,
base_parent_missing: &UnsafeSlice<'_, BitMask>,
api_version: ApiVersion,
) {
info!("created label runner");
// Label data bytes per node
@@ -187,6 +187,7 @@ fn create_label_runner(
exp_labels,
buf,
bpm,
api_version,
);
}

@@ -200,6 +201,7 @@ fn create_label_runner(
}
}

#[allow(clippy::too_many_arguments)]
fn create_layer_labels(
parents_cache: &CacheReader<u32>,
replica_id: &[u8],
@@ -208,6 +210,7 @@ fn create_layer_labels(
num_nodes: u64,
cur_layer: u32,
core_group: Arc<Option<MutexGuard<'_, Vec<CoreIndex>>>>,
api_version: ApiVersion,
) {
info!("Creating labels for layer {}", cur_layer);
// num_producers is the number of producer threads
@@ -284,6 +287,7 @@ fn create_layer_labels(
lookahead as u64,
ring_buf,
base_parent_missing,
api_version,
)
}));
}
@@ -344,6 +348,7 @@ fn create_layer_labels(
cur_node_ptr = &mut cur_node_ptr[8..];
// Grab the current slot of the ring_buf
let buf = unsafe { ring_buf.slot_mut(cur_slot) };

// Fill in the base parents
for k in 0..BASE_DEGREE {
let bpm = unsafe { base_parent_missing.get(cur_slot) };
@@ -499,6 +504,7 @@ pub fn create_labels_for_encoding<Tree: 'static + MerkleTreeTrait, T: AsRef<[u8]
node_count,
layer as u32,
core_group.clone(),
graph.api_version(),
);

// Cache reset happens in two parts.
@@ -589,6 +595,7 @@ pub fn create_labels_for_decoding<Tree: 'static + MerkleTreeTrait, T: AsRef<[u8]
node_count,
layer as u32,
core_group.clone(),
graph.api_version(),
);

// Cache reset happens in two parts.
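For readers skimming the diff: the version-dependent parent selection introduced above reduces to a small lookup. The sketch below is self-contained and not taken from the patch; `ApiVersion` and `BASE_DEGREE` are redefined locally (the real definitions live in `storage_proofs_core`), and only the mapping from version to predecessor index is intended to match the code in `fill_buffer`.

```rust
const BASE_DEGREE: usize = 6; // DRG base degree, as in storage_proofs_core::drgraph

#[allow(non_camel_case_types)]
#[derive(Clone, Copy, Debug)]
enum ApiVersion {
    V1_0_0,
    V1_1_0,
}

/// Which base-parent slot holds the always-missing predecessor node,
/// and which slots hold the remaining DRG parents, per API version.
fn predecessor_and_others(api_version: ApiVersion) -> (usize, std::ops::Range<usize>) {
    match api_version {
        // V1.0.0 stores the predecessor parent last ...
        ApiVersion::V1_0_0 => (BASE_DEGREE - 1, 0..BASE_DEGREE - 1),
        // ... V1.1.0 stores it first, so the other DRG parents shift by one.
        ApiVersion::V1_1_0 => (0, 1..BASE_DEGREE),
    }
}

fn main() {
    let (pred, others) = predecessor_and_others(ApiVersion::V1_1_0);
    assert_eq!(pred, 0);
    assert_eq!(others, 1..BASE_DEGREE);

    let (pred, others) = predecessor_and_others(ApiVersion::V1_0_0);
    assert_eq!(pred, 5);
    assert_eq!(others, 0..5);
}
```

Because the loop now iterates `other_drg_parents` and indexes `cur_parent[k]` directly, the old per-iteration pointer advance on `cur_parent` is no longer needed.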
5 changes: 5 additions & 0 deletions storage-proofs-porep/src/stacked/vanilla/graph.rs
@@ -260,6 +260,11 @@ where
// round 7 (37)
hasher.finish_with(parents[0])
}

/// Returns the current `ApiVersion`.
pub fn api_version(&self) -> ApiVersion {
self.api_version
}
}

impl<H, G> ParameterSetMetadata for StackedGraph<H, G>
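A compact, hypothetical sketch of how the new accessor is used: the graph carries the version, and the labelling entry points read it once and pass it down. Everything except the `api_version()` getter and the extra parameter is simplified stand-in code, not the real API.

```rust
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, Debug)]
enum ApiVersion {
    V1_0_0,
    V1_1_0,
}

// Stand-in for StackedGraph: only the stored version matters here.
struct Graph {
    api_version: ApiVersion,
}

impl Graph {
    /// Mirrors the accessor added to `StackedGraph`.
    fn api_version(&self) -> ApiVersion {
        self.api_version
    }
}

// Stand-in for the real labelling routine; only the ApiVersion plumbing is shown.
fn create_layer_labels(layer: u32, api_version: ApiVersion) {
    println!("layer {} labelled with {:?}", layer, api_version);
}

fn create_labels_for_encoding(graph: &Graph, layers: u32) {
    for layer in 1..=layers {
        // Mirrors the diff: the version is read off the graph and threaded through.
        create_layer_labels(layer, graph.api_version());
    }
}

fn main() {
    let graph = Graph { api_version: ApiVersion::V1_1_0 };
    create_labels_for_encoding(&graph, 2);
}
```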
5 changes: 5 additions & 0 deletions storage-proofs-porep/src/stacked/vanilla/utils.rs
@@ -84,6 +84,11 @@ impl BitMask {
pub fn get(self, i: usize) -> bool {
self.0 & (1 << i) != 0
}

#[inline]
pub fn clear(&mut self) {
self.0 = 0;
}
}

#[derive(Debug)]
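To make the `clear` addition concrete, here is a standalone sketch of the `BitMask` helper as `fill_buffer` uses it. The `u64` backing and the bodies of `set` and `set_upto` are assumptions inferred from usage; `get` and the new `clear` mirror the patch.

```rust
// Sketch of the missing-parent bitmask; not the exact upstream definition.
#[derive(Default, Clone, Copy, Debug)]
struct BitMask(u64);

impl BitMask {
    // Assumed body: mark parent slot `i` as missing.
    fn set(&mut self, i: usize) {
        self.0 |= 1 << i;
    }

    // Assumed body: mark all parent slots below `n` as missing.
    fn set_upto(&mut self, n: u8) {
        self.0 |= (1 << n) - 1;
    }

    fn get(self, i: usize) -> bool {
        self.0 & (1 << i) != 0
    }

    /// Added by this PR: zero the mask before the predecessor parent is marked,
    /// instead of relying on slot 5 being the only bit ever pre-set.
    fn clear(&mut self) {
        self.0 = 0;
    }
}

fn main() {
    let mut bpm = BitMask::default();
    bpm.set_upto(6); // all six base parents missing
    bpm.clear();     // reset, as fill_buffer now does before marking the predecessor
    bpm.set(0);      // mark only the V1.1.0 predecessor parent
    assert!(bpm.get(0) && !bpm.get(5));
}
```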