Merge pull request #1081 from adamreichold/with-min-len
Add Parallel::with_min_len to limit splitting
bluss committed Oct 27, 2021
2 parents ee5880b + 4efc563 commit 72e6798
Showing 2 changed files with 57 additions and 10 deletions.
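
The diff below threads a new min_len field from Parallel into ParallelProducer, and the unindexed split implementations stop dividing a piece once it holds no more than min_len elements. The standalone sketch that follows models only that cutoff; it is not part of the diff, the real producer halves along the axis with the largest stride, and Rayon itself decides adaptively how far to split at all.

// Simplified model of the splitting cutoff added in this commit: halve a chunk
// recursively, but stop as soon as a piece holds at most `min_len` elements.
fn split_recursively(len: usize, min_len: usize, pieces: &mut Vec<usize>) {
    if len <= min_len {
        pieces.push(len);
        return;
    }
    let mid = len / 2;
    split_recursively(mid, min_len, pieces);
    split_recursively(len - mid, min_len, pieces);
}

fn main() {
    let mut pieces = Vec::new();
    split_recursively(10_000, 1_500, &mut pieces);
    // Splitting stops as soon as a piece is no larger than the minimum,
    // so this eager model ends up with eight pieces of 1250 elements each.
    assert!(pieces.iter().all(|&len| len <= 1_500));
    println!("{} pieces: {:?}", pieces.len(), pieces);
}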
4 changes: 4 additions & 0 deletions src/parallel/mod.rs
@@ -32,6 +32,10 @@
//! “unindexed”. Use ndarray’s [Zip] for lock step parallel iteration of
//! multiple arrays or producers at a time.
//!
//! For the unindexed parallel iterators, an inherent method [`with_min_len`](Parallel::with_min_len)
//! is provided to limit the number of elements each parallel task processes in a way that is
//! similar to Rayon's [`IndexedParallelIterator::with_min_len`](rayon::prelude::IndexedParallelIterator::with_min_len).
//!
//! # Examples
//!
//! ## Arrays and array views
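
A minimal usage sketch of the method documented above, assuming ndarray is built with the rayon feature; the array shape and the chosen minimum are illustrative only.

use ndarray::Array2;
use ndarray::parallel::prelude::*;

fn main() {
    let mut a = Array2::<f64>::zeros((128, 128));
    // Visit every element in parallel, but keep each Rayon job at no fewer
    // than 1024 elements so that very small splits are avoided.
    a.view_mut()
        .into_par_iter()
        .with_min_len(1024)
        .for_each(|x| *x += 1.0);
    assert!(a.iter().all(|&x| x == 1.0));
}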
63 changes: 53 additions & 10 deletions src/parallel/par.rs
@@ -21,11 +21,14 @@ use crate::split_at::SplitPreference;
#[derive(Copy, Clone, Debug)]
pub struct Parallel<I> {
iter: I,
min_len: usize,
}

const DEFAULT_MIN_LEN: usize = 1;

/// Parallel producer wrapper.
#[derive(Copy, Clone, Debug)]
struct ParallelProducer<I>(I);
struct ParallelProducer<I>(I, usize);

macro_rules! par_iter_wrapper {
// thread_bounds are either Sync or Send + Sync
@@ -40,6 +43,7 @@ macro_rules! par_iter_wrapper {
fn into_par_iter(self) -> Self::Iter {
Parallel {
iter: self,
min_len: DEFAULT_MIN_LEN,
}
}
}
@@ -67,7 +71,7 @@
fn with_producer<Cb>(self, callback: Cb) -> Cb::Output
where Cb: ProducerCallback<Self::Item>
{
callback.callback(ParallelProducer(self.iter))
callback.callback(ParallelProducer(self.iter, self.min_len))
}

fn len(&self) -> usize {
@@ -106,7 +110,7 @@

fn split_at(self, i: usize) -> (Self, Self) {
let (a, b) = self.0.split_at(i);
(ParallelProducer(a), ParallelProducer(b))
(ParallelProducer(a, self.1), ParallelProducer(b, self.1))
}
}

@@ -131,11 +135,11 @@ macro_rules! par_iter_view_wrapper {
fn into_par_iter(self) -> Self::Iter {
Parallel {
iter: self,
min_len: DEFAULT_MIN_LEN,
}
}
}


impl<'a, A, D> ParallelIterator for Parallel<$view_name<'a, A, D>>
where D: Dimension,
A: $($thread_bounds)*,
@@ -144,28 +148,47 @@ macro_rules! par_iter_view_wrapper {
fn drive_unindexed<C>(self, consumer: C) -> C::Result
where C: UnindexedConsumer<Self::Item>
{
bridge_unindexed(ParallelProducer(self.iter), consumer)
bridge_unindexed(ParallelProducer(self.iter, self.min_len), consumer)
}

fn opt_len(&self) -> Option<usize> {
None
}
}

impl<'a, A, D> Parallel<$view_name<'a, A, D>>
where D: Dimension,
A: $($thread_bounds)*,
{
/// Sets the minimum number of elements desired to process in each job. This will not be
/// split any smaller than this length, but of course a producer could already be smaller
/// to begin with.
///
/// ***Panics*** if `min_len` is zero.
pub fn with_min_len(self, min_len: usize) -> Self {
assert_ne!(min_len, 0, "Minimum number of elements must at least be one to avoid splitting off empty tasks.");

Self {
min_len,
..self
}
}
}

impl<'a, A, D> UnindexedProducer for ParallelProducer<$view_name<'a, A, D>>
where D: Dimension,
A: $($thread_bounds)*,
{
type Item = <$view_name<'a, A, D> as IntoIterator>::Item;
fn split(self) -> (Self, Option<Self>) {
if self.0.len() <= 1 {
if self.0.len() <= self.1 {
return (self, None)
}
let array = self.0;
let max_axis = array.max_stride_axis();
let mid = array.len_of(max_axis) / 2;
let (a, b) = array.split_at(max_axis, mid);
(ParallelProducer(a), Some(ParallelProducer(b)))
(ParallelProducer(a, self.1), Some(ParallelProducer(b, self.1)))
}

fn fold_with<F>(self, folder: F) -> F
@@ -217,6 +240,7 @@ macro_rules! zip_impl {
fn into_par_iter(self) -> Self::Iter {
Parallel {
iter: self,
min_len: DEFAULT_MIN_LEN,
}
}
}
@@ -233,7 +257,7 @@
fn drive_unindexed<Cons>(self, consumer: Cons) -> Cons::Result
where Cons: UnindexedConsumer<Self::Item>
{
bridge_unindexed(ParallelProducer(self.iter), consumer)
bridge_unindexed(ParallelProducer(self.iter, self.min_len), consumer)
}

fn opt_len(&self) -> Option<usize> {
Expand All @@ -251,11 +275,11 @@ macro_rules! zip_impl {
type Item = ($($p::Item ,)*);

fn split(self) -> (Self, Option<Self>) {
if !self.0.can_split() {
if self.0.size() <= self.1 {
return (self, None)
}
let (a, b) = self.0.split();
(ParallelProducer(a), Some(ParallelProducer(b)))
(ParallelProducer(a, self.1), Some(ParallelProducer(b, self.1)))
}

fn fold_with<Fold>(self, folder: Fold) -> Fold
@@ -284,6 +308,25 @@ zip_impl! {
[P1 P2 P3 P4 P5 P6],
}

impl<D, Parts> Parallel<Zip<Parts, D>>
where
D: Dimension,
{
/// Sets the minimum number of elements desired to process in each job. This will not be
/// split any smaller than this length, but of course a producer could already be smaller
/// to begin with.
///
/// ***Panics*** if `min_len` is zero.
pub fn with_min_len(self, min_len: usize) -> Self {
assert_ne!(min_len, 0, "Minimum number of elements must at least be one to avoid splitting off empty tasks.");

Self {
min_len,
..self
}
}
}

/// A parallel iterator (unindexed) that produces the splits of the array
/// or producer `P`.
pub(crate) struct ParallelSplits<P> {
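
A second minimal sketch, this time for Zip's parallel wrapper, under the same assumption that ndarray's rayon feature is enabled; the shapes and the minimum of 4096 elements per job are illustrative only.

use ndarray::{Array2, Zip};
use ndarray::parallel::prelude::*;

fn main() {
    let a = Array2::<f64>::ones((256, 256));
    let mut b = Array2::<f64>::zeros((256, 256));
    // Lock step parallel iteration over both arrays; each Rayon job covers at
    // least 4096 element pairs unless the whole Zip is already smaller.
    Zip::from(&mut b)
        .and(&a)
        .into_par_iter()
        .with_min_len(4096)
        .for_each(|(b_elt, a_elt)| *b_elt = *a_elt * 2.0);
    assert!(b.iter().all(|&x| x == 2.0));
}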
