From 8e5b6b74b8ca18c94ada6465db96ad7a73e82d9d Mon Sep 17 00:00:00 2001 From: Volker Mische Date: Thu, 21 Apr 2022 14:17:48 +0200 Subject: [PATCH] fix: SDR: make it possible to bind to cores if units > groups For multicore SDR it is important that the producer and conumers share the same (L3) cache. Hence we bind specific cores to threads. Prior to this change there was one multicore SDR job per group, even if the group could accompany multiple of such jobs. If there were more jobs scheduled than groups available, those additional jobs wouldn't use specific cores, but whatever the operating system decided. With this change, additional jobs are now put into the groups in case there is enough space to accompany them. Fixes #1556. --- .circleci/config.yml | 3 +- .../src/stacked/vanilla/cores.rs | 274 ++++++++++++++---- 2 files changed, 215 insertions(+), 62 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index a6c49f4b4e..5f488cb09f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -139,11 +139,12 @@ jobs: ulimit -u 20000 ulimit -n 20000 cargo test --all --verbose --release lifecycle -- --ignored --nocapture - cargo test -p storage-proofs-porep --features isolated-testing --release checkout_cores -- --test-threads=1 + cargo test -p storage-proofs-porep --features isolated-testing --release --lib stacked::vanilla::cores cargo test -p storage-proofs-porep --features isolated-testing --release test_parallel_generation_and_read_partial_range_v1_0 cargo test -p storage-proofs-porep --features isolated-testing --release test_parallel_generation_and_read_partial_range_v1_1 no_output_timeout: 30m environment: + RUST_LOG: debug RUST_TEST_THREADS: 1 FIL_PROOFS_USE_MULTICORE_SDR: true diff --git a/storage-proofs-porep/src/stacked/vanilla/cores.rs b/storage-proofs-porep/src/stacked/vanilla/cores.rs index fc486c9233..4e169457ba 100644 --- a/storage-proofs-porep/src/stacked/vanilla/cores.rs +++ b/storage-proofs-porep/src/stacked/vanilla/cores.rs @@ -1,32 +1,33 @@ +use std::convert::TryInto; use std::sync::{Mutex, MutexGuard}; use anyhow::{format_err, Result}; use hwloc::{Bitmap, ObjectType, Topology, TopologyObject, CPUBIND_THREAD}; use lazy_static::lazy_static; -use log::{debug, info, warn}; +use log::{debug, warn}; use storage_proofs_core::settings::SETTINGS; -type CoreGroup = Vec; +type CoreUnit = Vec; lazy_static! { pub static ref TOPOLOGY: Mutex = Mutex::new(Topology::new()); - pub static ref CORE_GROUPS: Option>> = { + pub static ref CORE_GROUPS: Option>> = { let num_producers = &SETTINGS.multicore_sdr_producers; let cores_per_unit = num_producers + 1; - core_groups(cores_per_unit) + core_units(cores_per_unit) }; } #[derive(Clone, Copy, Debug, PartialEq)] -/// `CoreIndex` is a simple wrapper type for indexes into the set of vixible cores. A `CoreIndex` should only ever be -/// created with a value known to be less than the number of visible cores. +/// `CoreIndex` is a simple wrapper type for indexes into the set of visible cores. A `CoreIndex` +/// should only ever be created with a value known to be less than the number of visible cores. pub struct CoreIndex(usize); -pub fn checkout_core_group() -> Option> { +pub fn checkout_core_group() -> Option> { match &*CORE_GROUPS { - Some(groups) => { - for (i, group) in groups.iter().enumerate() { - match group.try_lock() { + Some(units) => { + for (i, unit) in units.iter().enumerate() { + match unit.try_lock() { Ok(guard) => { debug!("checked out core group {}", i); return Some(guard); @@ -122,70 +123,129 @@ fn get_core_by_index(topo: &Topology, index: CoreIndex) -> Result<&TopologyObjec } } -fn core_groups(cores_per_unit: usize) -> Option>>> { +/// Group all available cores, that share a (L3) cache in a way, so that the multicore SDR can +/// operate most efficiently. +/// +/// A single multicore SDR run needs a certain amount of cores, a so-called *unit*. +/// `cores_per_unit` defines how many cores are dedicated to a single multicore SDR instance. +/// +/// On larget systems, the available cores (given by `core_count`) may be connected to separate +/// (L3) caches. All cores that belong to the same cache are called a *group*, the number of +/// groups is given by `group_count`. On smaller systems, like laptops, there usually is just a +/// single group. +/// +/// A unit is always bound to a single group. Groups may be large enough to bind multiple units. +/// Though for performance reasons it is preferred that units don't share a cache, hence the units +/// are distributed across separate groups first. Only if all groups are already bound to a unit, +/// a group will be re-used. +/// +/// Here's an example: you have a 48 core system, with 8 separate caches and you have units of size +/// 3. Your `core_count` is 48, the `group_count` is 8 and the `cores_per_unit` is 3. In every +/// group we have 6 cores available. This means that we can have two units bound to a single group. +/// You start scheduling multiple SDR multicore jobs. The first job is bound to the first group +/// which cointains cores 0, 1 and 2. The second job is then bound to the second group, which +/// contains cores 6, 7 and 8. It is *not* bound to the cores 3, 4 and 5, which belong to the first +/// group. They would fight for the same cache, which isn't ideal. Those cores will only be used +/// once all 8 groups have already a single unit bound. +/// +/// Not necessarily all cores will be used. If you e.g. have a system as in the example above, but +/// your unit is of size 4 (instead of 3), then only a single unit fits (due to its size) into a +/// single group. This would mean that the first group would only consist of cores 0, 1, 2 and 3. +/// Cores 4, 5 would be unassigned. If you schedule more than 8 multicore SDR jobs, those jobs can +/// pick any core, whicher the operating system decides to use. +fn create_core_units( + core_count: usize, + group_count: usize, + cores_per_unit: usize, +) -> Vec> { + assert_eq!(0, core_count % group_count); + // The number of cores that belong to a single group. + let group_size = core_count / group_count; + + // The number of units that can fit into single group. + let units_per_group = group_size / cores_per_unit; + + // The total number of units that can be bound to specific cores on the system. + let unit_count = group_count * units_per_group; + + debug!( + "Cores: {}, Shared Caches: {}, cores per cache (group_size): {}, cores per unit: {}", + core_count, group_count, group_size, cores_per_unit + ); + + let core_units = (0..unit_count) + .map(|i| { + (0..cores_per_unit) + .map(|j| { + // Every group gets a single unit assigned first. Only if all groups have + // already one unit, a second one will be assigned if possible. This would then + // be the second "round" of assignments. + let round = i / group_count; + // The index of the core that is bound to a unit. + let core_index = (j + i * group_size) % core_count + (round * cores_per_unit); + assert!(core_index < core_count); + core_index + }) + .collect::>() + }) + .collect::>(); + debug!("Core units: {:?}", core_units); + core_units +} + +/// Returns the number of caches that are shared between cores. +/// +/// The hwloc topology is traverse upwards starting at the given depth. As soon as there are less +/// objects than cores, we expect it to be a cache that is shared between those cores. +/// +/// When traversing upwards from the cores, the first level you reach could e.g. be a L2 cache +/// which every core has its own. But then you might reach the L3 cache, that is shared between +/// several cores. +fn get_shared_cache_count(topo: &Topology, depth: u32, core_count: usize) -> usize { + let mut depth = depth; + while depth > 0 { + let obj_count: usize = topo + .size_at_depth(depth) + .try_into() + .expect("Platform must be at lest 32-bit"); + if obj_count < core_count { + return obj_count; + } + depth -= 1; + } + 1 +} + +fn core_units(cores_per_unit: usize) -> Option>> { let topo = TOPOLOGY.lock().expect("poisoned lock"); + // At which depths the cores within one package are. If you think of the "depths" as a + // directory tree, it's the directory where all cores are stored. let core_depth = match topo.depth_or_below_for_type(&ObjectType::Core) { Ok(depth) => depth, Err(_) => return None, }; + let all_cores = topo .objects_with_type(&ObjectType::Core) .expect("objects_with_type failed"); + // The total number of physical cores, even across packages. let core_count = all_cores.len(); - let mut cache_depth = core_depth; - let mut cache_count = 1; - - while cache_depth > 0 { - let objs = topo.objects_at_depth(cache_depth); - let obj_count = objs.len(); - if obj_count < core_count { - cache_count = obj_count; - break; - } - - cache_depth -= 1; - } - - assert_eq!(0, core_count % cache_count); - let mut group_size = core_count / cache_count; - let mut group_count = cache_count; - - if cache_count <= 1 { - // If there are not more than one shared caches, there is no benefit in trying to group cores by cache. - // In that case, prefer more groups so we can still bind cores and also get some parallelism. - // Create as many full groups as possible. The last group may not be full. - group_count = core_count / cores_per_unit; - group_size = cores_per_unit; - - info!( - "found only {} shared cache(s), heuristically grouping cores into {} groups", - cache_count, group_count - ); - } else { - debug!( - "Cores: {}, Shared Caches: {}, cores per cache (group_size): {}", - core_count, cache_count, group_size - ); - } - - let core_groups = (0..group_count) - .map(|i| { - (0..group_size) - .map(|j| { - let core_index = i * group_size + j; - assert!(core_index < core_count); - CoreIndex(core_index) - }) - .collect::>() - }) - .collect::>(); + // The number of separate caches the cores are grouped into. There could e.g. be a machine with + // 48 cores. Those cores are separated into 2 packages, where each of them has 4 sepearate + // caches, where each cache contains 6 cores. Then the `group_count` would be 8. + let group_count = get_shared_cache_count(&topo, core_depth, core_count); + // The list of units the multicore SDR threads can be bound to. + let core_units = create_core_units(core_count, group_count, cores_per_unit); Some( - core_groups + core_units .iter() - .map(|group| Mutex::new(group.clone())) + .map(|unit| { + let unit_core_index = unit.iter().map(|core| CoreIndex(*core)).collect(); + Mutex::new(unit_core_index) + }) .collect::>(), ) } @@ -196,7 +256,8 @@ mod tests { #[test] fn test_cores() { - core_groups(2); + let _ = pretty_env_logger::try_init(); + core_units(2); } #[test] @@ -205,6 +266,7 @@ mod tests { // the cores we're working with may otherwise be busy and cause a // failure. fn test_checkout_cores() { + let _ = pretty_env_logger::try_init(); let checkout1 = checkout_core_group(); dbg!(&checkout1); let checkout2 = checkout_core_group(); @@ -216,4 +278,94 @@ mod tests { _ => panic!("failed to get two checkouts"), } } + + #[test] + fn test_create_core_units() { + let _ = pretty_env_logger::try_init(); + + let ci = create_core_units(18, 1, 4); + assert_eq!( + ci, + [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]] + ); + + let dc = create_core_units(32, 2, 4); + assert_eq!( + dc, + [ + [0, 1, 2, 3], + [16, 17, 18, 19], + [4, 5, 6, 7], + [20, 21, 22, 23], + [8, 9, 10, 11], + [24, 25, 26, 27], + [12, 13, 14, 15], + [28, 29, 30, 31] + ] + ); + + let amd = create_core_units(16, 4, 4); + assert_eq!( + amd, + [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]] + ); + + let amd_not_filled = create_core_units(16, 4, 3); + assert_eq!( + amd_not_filled, + [[0, 1, 2], [4, 5, 6], [8, 9, 10], [12, 13, 14]] + ); + + let amd_not_filled = create_core_units(16, 4, 3); + assert_eq!( + amd_not_filled, + [[0, 1, 2], [4, 5, 6], [8, 9, 10], [12, 13, 14]] + ); + + let intel = create_core_units(16, 2, 3); + assert_eq!(intel, [[0, 1, 2], [8, 9, 10], [3, 4, 5], [11, 12, 13]]); + + let sp = create_core_units(48, 8, 3); + assert_eq!( + sp, + [ + [0, 1, 2], + [6, 7, 8], + [12, 13, 14], + [18, 19, 20], + [24, 25, 26], + [30, 31, 32], + [36, 37, 38], + [42, 43, 44], + [3, 4, 5], + [9, 10, 11], + [15, 16, 17], + [21, 22, 23], + [27, 28, 29], + [33, 34, 35], + [39, 40, 41], + [45, 46, 47] + ] + ); + + let sp_not_filled = create_core_units(48, 8, 4); + assert_eq!( + sp_not_filled, + [ + [0, 1, 2, 3], + [6, 7, 8, 9], + [12, 13, 14, 15], + [18, 19, 20, 21], + [24, 25, 26, 27], + [30, 31, 32, 33], + [36, 37, 38, 39], + [42, 43, 44, 45] + ] + ); + + let laptop = create_core_units(4, 1, 2); + assert_eq!(laptop, [[0, 1], [2, 3]]); + let laptop_not_filled = create_core_units(4, 1, 3); + assert_eq!(laptop_not_filled, [[0, 1, 2]]); + } }