Skip to content

Commit

Permalink
Bind threads to cores in multicore sdr.
Browse files Browse the repository at this point in the history
  • Loading branch information
porcuquine committed Oct 12, 2020
1 parent 98d945f commit a43f3a9
Show file tree
Hide file tree
Showing 8 changed files with 451 additions and 110 deletions.
10 changes: 10 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,16 @@ jobs:
steps:
- configure_environment_variables
- checkout
- run:
name: Install hwloc 1.11.9
command: |
cd /tmp
curl https://www.open-mpi.org/software/hwloc/v1.11/downloads/hwloc-1.11.9.tar.gz --location --output /tmp/hwloc-1.11.9.tar.gz
tar xzvf hwloc-1.11.9.tar.gz
cd hwloc-1.11.9
./configure
make
sudo make install
- run:
name: Install Rust
command: |
Expand Down
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ The instructions below assume you have independently installed `rust-fil-proofs`

**NOTE:** `rust-fil-proofs` can only be built for and run on 64-bit platforms; building will panic if the target architecture is not 64-bits.

Before building you will need OpenCL to be installed, on Ubuntu this can be achieved with `apt install ocl-icd-opencl-dev`. Other system dependencies such as 'gcc/clang', 'wall' and 'cmake' are also required.
Before building you will need OpenCL to be installed. On Ubuntu, this can be achieved with `apt install ocl-icd-opencl-dev`. Other system dependencies such as 'gcc/clang', 'wall' and 'cmake' are also required.

You will also need to install the hwloc library. On Ubuntu, this can be achieved with `apt install hwloc libhwloc-dev`. For other platforms, please see the [hwloc-rs Prerequisites section](https://github.com/daschl/hwloc-rs).


```
> cargo build --release --all
Expand Down Expand Up @@ -179,6 +182,18 @@ accomplished either by running the process as root, or by increasing the system
-l`. Two sector size's worth of data (for current and previous layers) must be locked -- along with 56 *
`FIL_PROOFS_PARENT_CACHE_SIZE` bytes for the parent cache.

Default parameters have been tuned to provide good performance on the AMD Ryzen Threadripper 3970x. It may be useful to
experiment with these, especially on different hardware. We have made an effort to use sensible heuristics and to ensure
reasonable behavior for a range of configurations and hardware, but actual performance or behavior of mulitcore
replication is not yet well tested except on our target. The following settings may be useful, but do expect some
failure in the search for good parameters. This might take the form of failed replication (bad proofs), errors during
replication, or even potentially crashes if parameters prove pathological. For now, this is an experimental feature, and
only the default configuration on default hardware (3970x) is known to work well.

`FIL_PROOFS_MULTICORE_SDR_PRODUCERS`: This is the number of worker threads loading node parents in parallel. The default is `3` so the producers and main thread together use a full core complex (but no more).
`FIL_PROOFS_MULTICORE_SDR_PRODUCER_STRIDE`: This is the (max) number of nodes for which a producer thread will load parents in each iteration of its loop. The default is`128`.
`FIL_PROOFS_MULTICORE_SDR_LOOKAHEAD`: This is the size of the lookahead buffer into which node parents are pre-loaded by the producer threads. The default is 800.

### GPU Usage

We can now optionally build the column hashed tree 'tree_c' using the GPU with noticeable speed-up over the CPU. To activate the GPU for this, use the environment variable
Expand Down
6 changes: 6 additions & 0 deletions storage-proofs/core/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ pub struct Settings {
pub parent_cache: String,
pub use_fil_blst: bool,
pub use_multicore_sdr: bool,
pub multicore_sdr_producers: usize,
pub multicore_sdr_producer_stride: u64,
pub multicore_sdr_lookahead: usize,
}

impl Default for Settings {
Expand All @@ -54,6 +57,9 @@ impl Default for Settings {
parent_cache: cache("filecoin-parents"),
use_fil_blst: false,
use_multicore_sdr: false,
multicore_sdr_producers: 3,
multicore_sdr_producer_stride: 128,
multicore_sdr_lookahead: 800,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions storage-proofs/porep/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ bincode = "1.1.2"
byteorder = "1.3.4"
lazy_static = "1.2"
byte-slice-cast = "0.3.5"
hwloc = "0.3.0"
libc = "0.2"

[dev-dependencies]
tempfile = "3"
Expand Down
219 changes: 219 additions & 0 deletions storage-proofs/porep/src/stacked/vanilla/cores.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
use log::*;
use std::sync::{Mutex, MutexGuard};

use anyhow::Result;
use hwloc::{ObjectType, Topology, TopologyObject, CPUBIND_THREAD};
use lazy_static::lazy_static;

use storage_proofs_core::settings;

type CoreGroup = Vec<CoreIndex>;
lazy_static! {
pub static ref TOPOLOGY: Mutex<Topology> = Mutex::new(Topology::new());
pub static ref CORE_GROUPS: Option<Vec<Mutex<CoreGroup>>> = {
let settings = settings::SETTINGS.lock().expect("settings lock failure");
let num_producers = settings.multicore_sdr_producers;
let cores_per_unit = num_producers + 1;

core_groups(cores_per_unit)
};
}

#[derive(Clone, Copy, Debug, PartialEq)]
/// `CoreIndex` is a simple wrapper type for indexes into the set of vixible cores. A `CoreIndex` should only ever be
/// created with a value known to be less than the number of visible cores.
pub struct CoreIndex(usize);

pub fn checkout_core_group() -> Option<MutexGuard<'static, CoreGroup>> {
match &*CORE_GROUPS {
Some(groups) => {
for (i, group) in groups.iter().enumerate() {
match group.try_lock() {
Ok(guard) => {
debug!("checked out core group {}", i);
return Some(guard);
}
Err(_) => debug!("core group {} locked, could not checkout", i),
}
}
None
}
None => None,
}
}

#[cfg(not(target_os = "windows"))]
pub type ThreadId = libc::pthread_t;

#[cfg(target_os = "windows")]
pub type ThreadId = winapi::winnt::HANDLE;

/// Helper method to get the thread id through libc, with current rust stable (1.5.0) its not
/// possible otherwise I think.
#[cfg(not(target_os = "windows"))]
fn get_thread_id() -> ThreadId {
unsafe { libc::pthread_self() }
}

#[cfg(target_os = "windows")]
fn get_thread_id() -> ThreadId {
unsafe { kernel32::GetCurrentThread() }
}

pub struct Cleanup {
tid: ThreadId,
prior_state: Option<hwloc::Bitmap>,
}

impl Drop for Cleanup {
fn drop(&mut self) {
match self.prior_state.take() {
Some(prior) => {
let child_topo = &TOPOLOGY;
let mut locked_topo = child_topo.lock().unwrap();
let _ = locked_topo.set_cpubind_for_thread(self.tid, prior, CPUBIND_THREAD);
}
None => (),
}
}
}

pub fn bind_core(core_index: CoreIndex) -> Result<Cleanup> {
let child_topo = &TOPOLOGY;
let tid = get_thread_id();
let mut locked_topo = child_topo.lock().unwrap();
let core = get_core_by_index(&locked_topo, core_index).map_err(|err| {
anyhow::format_err!("failed to get core at index {}: {:?}", core_index.0, err)
})?;

let cpuset = core.allowed_cpuset().ok_or_else(|| {
anyhow::format_err!("no allowed cpuset for core at index {}", core_index.0,)
})?;
debug!("allowed cpuset: {:?}", cpuset);
let mut bind_to = cpuset;

// Get only one logical processor (in case the core is SMT/hyper-threaded).
bind_to.singlify();

// Thread binding before explicit set.
let before = locked_topo.get_cpubind_for_thread(tid, CPUBIND_THREAD);

debug!("binding to {:?}", bind_to);
// Set the binding.
let result = locked_topo
.set_cpubind_for_thread(tid, bind_to, CPUBIND_THREAD)
.map_err(|err| anyhow::format_err!("failed to bind CPU: {:?}", err));

if result.is_err() {
warn!("error in bind_core, {:?}", result);
}

Ok(Cleanup {
tid,
prior_state: before,
})
}

fn get_core_by_index<'a>(topo: &'a Topology, index: CoreIndex) -> Result<&'a TopologyObject> {
let idx = index.0;

match topo.objects_with_type(&ObjectType::Core) {
Ok(all_cores) if idx < all_cores.len() => Ok(all_cores[idx]),
Ok(all_cores) => Err(anyhow::format_err!(
"idx ({}) out of range for {} cores",
idx,
all_cores.len()
)),
_e => Err(anyhow::format_err!("failed to get core by index {}", idx,)),
}
}

fn core_groups(cores_per_unit: usize) -> Option<Vec<Mutex<Vec<CoreIndex>>>> {
let topo = TOPOLOGY.lock().unwrap();

let core_depth = match topo.depth_or_below_for_type(&ObjectType::Core) {
Ok(depth) => depth,
Err(_) => return None,
};
let all_cores = topo.objects_with_type(&ObjectType::Core).unwrap();
let core_count = all_cores.len();

let mut cache_depth = core_depth;
let mut cache_count = 0;

while cache_depth > 0 {
let objs = topo.objects_at_depth(cache_depth);
let obj_count = objs.len();
if obj_count < core_count {
cache_count = obj_count;
break;
}

cache_depth -= 1;
}

assert_eq!(0, core_count % cache_count);
let mut group_size = core_count / cache_count;
let mut group_count = cache_count;

if cache_count <= 1 {
// If there are not more than one shared caches, there is no benefit in trying to group cores by cache.
// In that case, prefer more groups so we can still bind cores and also get some parallelism.
// Create as many full groups as possible. The last group may not be full.
group_count = core_count / cores_per_unit;
group_size = cores_per_unit;

info!(
"found only {} shared cache(s), heuristically grouping cores into {} groups",
cache_count, group_count
);
} else {
debug!(
"Cores: {}, Shared Caches: {}, cores per cache (group_size): {}",
core_count, cache_count, group_size
);
}

let core_groups = (0..group_count)
.map(|i| {
(0..group_size)
.map(|j| {
let core_index = i * group_size + j;
assert!(core_index < core_count);
CoreIndex(core_index)
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();

Some(
core_groups
.iter()
.map(|group| Mutex::new(group.clone()))
.collect::<Vec<_>>(),
)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_cores() {
core_groups(2);
}

#[test]
fn test_checkout_cores() {
let checkout1 = checkout_core_group();
dbg!(&checkout1);
let checkout2 = checkout_core_group();
dbg!(&checkout2);

// This test might fail if run on a machine with fewer than four cores.
match (checkout1, checkout2) {
(Some(c1), Some(c2)) => assert!(*c1 != *c2),
_ => panic!("failed to get two checkouts"),
}
}
}
Loading

0 comments on commit a43f3a9

Please sign in to comment.