fix: SDR: make it possible to bind to cores if units > groups #1588

Merged · 1 commit merged on Aug 2, 2022
3 changes: 2 additions & 1 deletion .circleci/config.yml
@@ -139,11 +139,12 @@ jobs:
ulimit -u 20000
ulimit -n 20000
cargo test --all --verbose --release lifecycle -- --ignored --nocapture
cargo test -p storage-proofs-porep --features isolated-testing --release checkout_cores -- --test-threads=1
cargo test -p storage-proofs-porep --features isolated-testing --release --lib stacked::vanilla::cores
Contributor Author


In case you are wondering about this change: it still runs the original test and now two additional ones. The --test-threads=1 flag is already covered by the RUST_TEST_THREADS: 1 environment variable below (expand the source file once to see it).

cargo test -p storage-proofs-porep --features isolated-testing --release test_parallel_generation_and_read_partial_range_v1_0
cargo test -p storage-proofs-porep --features isolated-testing --release test_parallel_generation_and_read_partial_range_v1_1
no_output_timeout: 30m
environment:
RUST_LOG: debug
RUST_TEST_THREADS: 1
FIL_PROOFS_USE_MULTICORE_SDR: true

274 changes: 213 additions & 61 deletions storage-proofs-porep/src/stacked/vanilla/cores.rs
@@ -1,32 +1,33 @@
use std::convert::TryInto;
use std::sync::{Mutex, MutexGuard};

use anyhow::{format_err, Result};
use hwloc::{Bitmap, ObjectType, Topology, TopologyObject, CPUBIND_THREAD};
use lazy_static::lazy_static;
use log::{debug, info, warn};
use log::{debug, warn};
use storage_proofs_core::settings::SETTINGS;

type CoreGroup = Vec<CoreIndex>;
type CoreUnit = Vec<CoreIndex>;
lazy_static! {
pub static ref TOPOLOGY: Mutex<Topology> = Mutex::new(Topology::new());
pub static ref CORE_GROUPS: Option<Vec<Mutex<CoreGroup>>> = {
pub static ref CORE_GROUPS: Option<Vec<Mutex<CoreUnit>>> = {
let num_producers = &SETTINGS.multicore_sdr_producers;
let cores_per_unit = num_producers + 1;

core_groups(cores_per_unit)
core_units(cores_per_unit)
};
}

#[derive(Clone, Copy, Debug, PartialEq)]
/// `CoreIndex` is a simple wrapper type for indexes into the set of vixible cores. A `CoreIndex` should only ever be
/// created with a value known to be less than the number of visible cores.
/// `CoreIndex` is a simple wrapper type for indexes into the set of visible cores. A `CoreIndex`
/// should only ever be created with a value known to be less than the number of visible cores.
pub struct CoreIndex(usize);

pub fn checkout_core_group() -> Option<MutexGuard<'static, CoreGroup>> {
pub fn checkout_core_group() -> Option<MutexGuard<'static, CoreUnit>> {
match &*CORE_GROUPS {
Some(groups) => {
for (i, group) in groups.iter().enumerate() {
match group.try_lock() {
Some(units) => {
for (i, unit) in units.iter().enumerate() {
match unit.try_lock() {
Ok(guard) => {
debug!("checked out core group {}", i);
return Some(guard);
@@ -122,70 +123,129 @@ fn get_core_by_index(topo: &Topology, index: CoreIndex) -> Result<&TopologyObjec
}
}

fn core_groups(cores_per_unit: usize) -> Option<Vec<Mutex<Vec<CoreIndex>>>> {
/// Group all available cores that share a (L3) cache, so that multicore SDR can operate most
/// efficiently.
///
/// A single multicore SDR run needs a certain number of cores, a so-called *unit*.
/// `cores_per_unit` defines how many cores are dedicated to a single multicore SDR instance.
///
/// On larger systems, the available cores (given by `core_count`) may be connected to separate
/// (L3) caches. All cores that belong to the same cache are called a *group*; the number of
/// groups is given by `group_count`. On smaller systems, like laptops, there usually is just a
/// single group.
///
/// A unit is always bound to a single group. Groups may be large enough to hold multiple units.
/// For performance reasons it is preferred that units don't share a cache, hence units are
/// distributed across separate groups first. Only once every group is bound to a unit will a
/// group be re-used.
///
/// Here's an example: you have a 48-core system with 8 separate caches and units of size 3. Your
/// `core_count` is 48, the `group_count` is 8 and the `cores_per_unit` is 3. Every group has 6
/// cores available, so two units can be bound to a single group. You start scheduling multiple
/// SDR multicore jobs. The first job is bound to the first group, on cores 0, 1 and 2. The second
/// job is then bound to the second group, on cores 6, 7 and 8. It is *not* bound to cores 3, 4
/// and 5, which also belong to the first group: both jobs would fight for the same cache, which
/// isn't ideal. Those cores are only used once all 8 groups already have one unit bound.
///
/// Not all cores are necessarily used. If, for example, you have the system above but your unit
/// has size 4 (instead of 3), then only a single unit fits into each group. The first unit would
/// then consist of cores 0, 1, 2 and 3, while cores 4 and 5 of that group stay unassigned. If you
/// schedule more than 8 multicore SDR jobs, the extra jobs can run on any core, whichever the
/// operating system decides to use.
fn create_core_units(
core_count: usize,
group_count: usize,
cores_per_unit: usize,
) -> Vec<Vec<usize>> {
assert_eq!(0, core_count % group_count);
// The number of cores that belong to a single group.
let group_size = core_count / group_count;

// The number of units that can fit into a single group.
let units_per_group = group_size / cores_per_unit;

// The total number of units that can be bound to specific cores on the system.
let unit_count = group_count * units_per_group;

debug!(
"Cores: {}, Shared Caches: {}, cores per cache (group_size): {}, cores per unit: {}",
core_count, group_count, group_size, cores_per_unit
);

let core_units = (0..unit_count)
.map(|i| {
(0..cores_per_unit)
.map(|j| {
// Every group gets a single unit assigned first. Only once all groups already
// have one unit will a second one be assigned, if possible. That would then
// be the second "round" of assignments.
let round = i / group_count;
// The index of the core that is bound to a unit.
let core_index = (j + i * group_size) % core_count + (round * cores_per_unit);
assert!(core_index < core_count);
core_index
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
debug!("Core units: {:?}", core_units);
core_units
}
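
As a sketch of the index arithmetic described in the doc comment above, assuming the 48-core example (8 groups, 3 cores per unit) and a hypothetical helper name, the assignment can be reproduced by hand like this:

// Sketch (hypothetical, not part of this diff): reproduces two entries of
// `create_core_units(48, 8, 3)` using the same formula as above.
fn example_unit_assignment() {
    let (core_count, group_count, cores_per_unit) = (48, 8, 3);
    let group_size = core_count / group_count; // 6 cores share each cache
    let unit = |i: usize| -> Vec<usize> {
        (0..cores_per_unit)
            .map(|j| (j + i * group_size) % core_count + (i / group_count) * cores_per_unit)
            .collect()
    };
    // The second unit (i = 1) lands at the start of the *second* group.
    assert_eq!(unit(1), vec![6, 7, 8]);
    // The ninth unit (i = 8) wraps back into the first group, right after its first unit.
    assert_eq!(unit(8), vec![3, 4, 5]);
}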

/// Returns the number of caches that are shared between cores.
///
/// The hwloc topology is traversed upwards, starting at the given depth. As soon as there are
/// fewer objects than cores, we expect them to be caches that are shared between those cores.
///
/// When traversing upwards from the cores, the first level you reach could e.g. be an L2 cache
/// that every core has to itself. But then you might reach the L3 cache, which is shared between
/// several cores.
fn get_shared_cache_count(topo: &Topology, depth: u32, core_count: usize) -> usize {
let mut depth = depth;
while depth > 0 {
let obj_count: usize = topo
.size_at_depth(depth)
.try_into()
.expect("Platform must be at lest 32-bit");
if obj_count < core_count {
return obj_count;
}
depth -= 1;
}
1
}

fn core_units(cores_per_unit: usize) -> Option<Vec<Mutex<CoreUnit>>> {
let topo = TOPOLOGY.lock().expect("poisoned lock");

// The depth at which the cores within one package sit. If you think of the "depths" as a
// directory tree, it's the directory where all the cores are stored.
let core_depth = match topo.depth_or_below_for_type(&ObjectType::Core) {
Ok(depth) => depth,
Err(_) => return None,
};

let all_cores = topo
.objects_with_type(&ObjectType::Core)
.expect("objects_with_type failed");
// The total number of physical cores, even across packages.
let core_count = all_cores.len();

let mut cache_depth = core_depth;
let mut cache_count = 1;

while cache_depth > 0 {
let objs = topo.objects_at_depth(cache_depth);
let obj_count = objs.len();
if obj_count < core_count {
cache_count = obj_count;
break;
}

cache_depth -= 1;
}

assert_eq!(0, core_count % cache_count);
let mut group_size = core_count / cache_count;
let mut group_count = cache_count;

if cache_count <= 1 {
// If there are not more than one shared caches, there is no benefit in trying to group cores by cache.
// In that case, prefer more groups so we can still bind cores and also get some parallelism.
// Create as many full groups as possible. The last group may not be full.
group_count = core_count / cores_per_unit;
group_size = cores_per_unit;

info!(
"found only {} shared cache(s), heuristically grouping cores into {} groups",
cache_count, group_count
);
} else {
debug!(
"Cores: {}, Shared Caches: {}, cores per cache (group_size): {}",
core_count, cache_count, group_size
);
}

let core_groups = (0..group_count)
.map(|i| {
(0..group_size)
.map(|j| {
let core_index = i * group_size + j;
assert!(core_index < core_count);
CoreIndex(core_index)
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
// The number of separate caches the cores are grouped into. There could e.g. be a machine with
// 48 cores, split into 2 packages, where each package has 4 separate caches and each cache
// contains 6 cores. The `group_count` would then be 8.
let group_count = get_shared_cache_count(&topo, core_depth, core_count);

// The list of units the multicore SDR threads can be bound to.
let core_units = create_core_units(core_count, group_count, cores_per_unit);
Some(
core_groups
core_units
.iter()
.map(|group| Mutex::new(group.clone()))
.map(|unit| {
let unit_core_index = unit.iter().map(|core| CoreIndex(*core)).collect();
Mutex::new(unit_core_index)
})
.collect::<Vec<_>>(),
)
}
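
A minimal sketch of how a caller might consume a checked-out unit, using only the functions and types shown in this diff (the wrapping function is hypothetical):

// Sketch: check out one unit and inspect the cores reserved for it.
fn use_core_unit_sketch() {
    if let Some(unit) = checkout_core_group() {
        // `unit` is a MutexGuard<CoreUnit>, i.e. an exclusively held Vec<CoreIndex>.
        for core in unit.iter() {
            // Each CoreIndex wraps a physical core number; an SDR producer thread
            // would be pinned to one of these cores.
            debug!("core reserved for this SDR unit: {:?}", core);
        }
        // Dropping the guard releases the unit for the next multicore SDR job.
    } else {
        warn!("no core unit could be checked out; threads will not be pinned");
    }
}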
@@ -196,7 +256,8 @@ mod tests {

#[test]
fn test_cores() {
core_groups(2);
fil_logger::maybe_init();
core_units(2);
}

#[test]
@@ -205,6 +266,7 @@
// the cores we're working with may otherwise be busy and cause a
// failure.
fn test_checkout_cores() {
fil_logger::maybe_init();
let checkout1 = checkout_core_group();
dbg!(&checkout1);
let checkout2 = checkout_core_group();
@@ -216,4 +278,94 @@
_ => panic!("failed to get two checkouts"),
}
}

#[test]
fn test_create_core_units() {
fil_logger::maybe_init();

let ci = create_core_units(18, 1, 4);
assert_eq!(
ci,
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
);

let dc = create_core_units(32, 2, 4);
assert_eq!(
dc,
[
[0, 1, 2, 3],
[16, 17, 18, 19],
[4, 5, 6, 7],
[20, 21, 22, 23],
[8, 9, 10, 11],
[24, 25, 26, 27],
[12, 13, 14, 15],
[28, 29, 30, 31]
]
);

let amd = create_core_units(16, 4, 4);
assert_eq!(
amd,
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
);

let amd_not_filled = create_core_units(16, 4, 3);
assert_eq!(
amd_not_filled,
[[0, 1, 2], [4, 5, 6], [8, 9, 10], [12, 13, 14]]
);

let intel = create_core_units(16, 2, 3);
assert_eq!(intel, [[0, 1, 2], [8, 9, 10], [3, 4, 5], [11, 12, 13]]);

let sp = create_core_units(48, 8, 3);
assert_eq!(
sp,
[
[0, 1, 2],
[6, 7, 8],
[12, 13, 14],
[18, 19, 20],
[24, 25, 26],
[30, 31, 32],
[36, 37, 38],
[42, 43, 44],
[3, 4, 5],
[9, 10, 11],
[15, 16, 17],
[21, 22, 23],
[27, 28, 29],
[33, 34, 35],
[39, 40, 41],
[45, 46, 47]
]
);

let sp_not_filled = create_core_units(48, 8, 4);
assert_eq!(
sp_not_filled,
[
[0, 1, 2, 3],
[6, 7, 8, 9],
[12, 13, 14, 15],
[18, 19, 20, 21],
[24, 25, 26, 27],
[30, 31, 32, 33],
[36, 37, 38, 39],
[42, 43, 44, 45]
]
);

let laptop = create_core_units(4, 1, 2);
assert_eq!(laptop, [[0, 1], [2, 3]]);
let laptop_not_filled = create_core_units(4, 1, 3);
assert_eq!(laptop_not_filled, [[0, 1, 2]]);
}
}