Skip to content

Commit

Permalink
fix: use current process binding to limit thread cores (#1633)
Browse files Browse the repository at this point in the history
Use the current processes bound cores to limit the possible cores that
threads can be bound to. This allows core binding to work properly when the
lotus-worker service is limited to certain CPUs by cgroups.
  • Loading branch information
clinta authored and storojs72 committed Jan 11, 2023
1 parent c6f4348 commit 9a58f66
Showing 1 changed file with 92 additions and 13 deletions.
105 changes: 92 additions & 13 deletions storage-proofs-porep/src/stacked/vanilla/cores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ fn create_core_units(
core_count: usize,
group_count: usize,
cores_per_unit: usize,
allowed_cores: &hwloc::CpuSet,
) -> Vec<Vec<usize>> {
assert_eq!(0, core_count % group_count);
// The number of cores that belong to a single group.
Expand All @@ -176,18 +177,22 @@ fn create_core_units(
let core_units = (0..unit_count)
.map(|i| {
(0..cores_per_unit)
.map(|j| {
.filter_map(|j| {
// Every group gets a single unit assigned first. Only if all groups have
// already one unit, a second one will be assigned if possible. This would then
// be the second "round" of assignments.
let round = i / group_count;
// The index of the core that is bound to a unit.
let core_index = (j + i * group_size) % core_count + (round * cores_per_unit);
assert!(core_index < core_count);
core_index

allowed_cores
.is_set(core_index.try_into().ok()?)
.then_some(core_index)
})
.collect::<Vec<_>>()
})
.filter(|x| !x.is_empty())
.collect::<Vec<_>>();
debug!("Core units: {:?}", core_units);
core_units
Expand Down Expand Up @@ -229,6 +234,15 @@ fn core_units(cores_per_unit: usize) -> Option<Vec<Mutex<CoreUnit>>> {
let all_cores = topo
.objects_with_type(&ObjectType::Core)
.expect("objects_with_type failed");

let allowed_cores = topo
.get_cpubind(hwloc::CpuBindFlags::empty())
.unwrap_or_else(|| {
topo.object_at_root()
.allowed_cpuset()
.unwrap_or_else(hwloc::CpuSet::full)
});

// The total number of physical cores, even across packages.
let core_count = all_cores.len();

Expand All @@ -238,7 +252,8 @@ fn core_units(cores_per_unit: usize) -> Option<Vec<Mutex<CoreUnit>>> {
let group_count = get_shared_cache_count(&topo, core_depth, core_count);

// The list of units the multicore SDR threads can be bound to.
let core_units = create_core_units(core_count, group_count, cores_per_unit);
let core_units = create_core_units(core_count, group_count, cores_per_unit, &allowed_cores);
// this needs to take the all_cores vec instead of just a core count
Some(
core_units
.iter()
Expand Down Expand Up @@ -283,13 +298,13 @@ mod tests {
fn test_create_core_units() {
fil_logger::maybe_init();

let ci = create_core_units(18, 1, 4);
let ci = create_core_units(18, 1, 4, &(0..18).collect());
assert_eq!(
ci,
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
);

let dc = create_core_units(32, 2, 4);
let dc = create_core_units(32, 2, 4, &(0..32).collect());
assert_eq!(
dc,
[
Expand All @@ -304,28 +319,28 @@ mod tests {
]
);

let amd = create_core_units(16, 4, 4);
let amd = create_core_units(16, 4, 4, &(0..16).collect());
assert_eq!(
amd,
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
);

let amd_not_filled = create_core_units(16, 4, 3);
let amd_not_filled = create_core_units(16, 4, 3, &(0..16).collect());
assert_eq!(
amd_not_filled,
[[0, 1, 2], [4, 5, 6], [8, 9, 10], [12, 13, 14]]
);

let amd_not_filled = create_core_units(16, 4, 3);
let amd_not_filled = create_core_units(16, 4, 3, &(0..16).collect());
assert_eq!(
amd_not_filled,
[[0, 1, 2], [4, 5, 6], [8, 9, 10], [12, 13, 14]]
);

let intel = create_core_units(16, 2, 3);
let intel = create_core_units(16, 2, 3, &(0..16).collect());
assert_eq!(intel, [[0, 1, 2], [8, 9, 10], [3, 4, 5], [11, 12, 13]]);

let sp = create_core_units(48, 8, 3);
let sp = create_core_units(48, 8, 3, &(0..48).collect());
assert_eq!(
sp,
[
Expand All @@ -348,7 +363,7 @@ mod tests {
]
);

let sp_not_filled = create_core_units(48, 8, 4);
let sp_not_filled = create_core_units(48, 8, 4, &(0..48).collect());
assert_eq!(
sp_not_filled,
[
Expand All @@ -363,9 +378,73 @@ mod tests {
]
);

let laptop = create_core_units(4, 1, 2);
let laptop = create_core_units(4, 1, 2, &(0..4).collect());
assert_eq!(laptop, [[0, 1], [2, 3]]);
let laptop_not_filled = create_core_units(4, 1, 3);
let laptop_not_filled = create_core_units(4, 1, 3, &(0..4).collect());
assert_eq!(laptop_not_filled, [[0, 1, 2]]);

let amd_limited_0 = create_core_units(16, 4, 4, &(0..8).collect());
assert_eq!(amd_limited_0, [[0, 1, 2, 3], [4, 5, 6, 7]]);

let amd_limited_1 = create_core_units(16, 4, 4, &(8..16).collect());
assert_eq!(amd_limited_1, [[8, 9, 10, 11], [12, 13, 14, 15]]);

let sp_limited_0 = create_core_units(48, 8, 3, &(0..24).collect());
assert_eq!(
sp_limited_0,
[
[0, 1, 2],
[6, 7, 8],
[12, 13, 14],
[18, 19, 20],
[3, 4, 5],
[9, 10, 11],
[15, 16, 17],
[21, 22, 23],
]
);

let sp_limited_1 = create_core_units(48, 8, 3, &(24..48).collect());
assert_eq!(
sp_limited_1,
[
[24, 25, 26],
[30, 31, 32],
[36, 37, 38],
[42, 43, 44],
[27, 28, 29],
[33, 34, 35],
[39, 40, 41],
[45, 46, 47]
]
);

let limited_group = create_core_units(
16,
4,
4,
&vec![0, 1, 2, 4, 5, 6, 8, 9, 10, 12, 13, 14]
.into_iter()
.collect(),
);
assert_eq!(
limited_group,
[[0, 1, 2], [4, 5, 6], [8, 9, 10], [12, 13, 14],]
);

let limited_non_continuous = create_core_units(48, 8, 3, &(0..12).chain(24..36).collect());
assert_eq!(
limited_non_continuous,
[
[0, 1, 2],
[6, 7, 8],
[24, 25, 26],
[30, 31, 32],
[3, 4, 5],
[9, 10, 11],
[27, 28, 29],
[33, 34, 35],
]
);
}
}

0 comments on commit 9a58f66

Please sign in to comment.