diff --git a/src/parallel/mod.rs b/src/parallel/mod.rs index dd3b89341..045aeb13a 100644 --- a/src/parallel/mod.rs +++ b/src/parallel/mod.rs @@ -32,6 +32,10 @@ //! “unindexed”. Use ndarray’s [Zip] for lock step parallel iteration of //! multiple arrays or producers at a time. //! +//! For the unindexed parallel iterators, an inherent method [`with_min_len`](Parallel::with_min_len) +//! is provided to limit the number of elements each parallel task processes in a way that is +//! similar to Rayon's [`IndexedParallelIterator::with_min_len`](rayon::prelude::IndexedParallelIterator::with_min_len). +//! //! # Examples //! //! ## Arrays and array views diff --git a/src/parallel/par.rs b/src/parallel/par.rs index d9d592af6..cc905b5cf 100644 --- a/src/parallel/par.rs +++ b/src/parallel/par.rs @@ -21,11 +21,14 @@ use crate::split_at::SplitPreference; #[derive(Copy, Clone, Debug)] pub struct Parallel { iter: I, + min_len: usize, } +const DEFAULT_MIN_LEN: usize = 1; + /// Parallel producer wrapper. #[derive(Copy, Clone, Debug)] -struct ParallelProducer(I); +struct ParallelProducer(I, usize); macro_rules! par_iter_wrapper { // thread_bounds are either Sync or Send + Sync @@ -40,6 +43,7 @@ macro_rules! par_iter_wrapper { fn into_par_iter(self) -> Self::Iter { Parallel { iter: self, + min_len: DEFAULT_MIN_LEN, } } } @@ -67,7 +71,7 @@ macro_rules! par_iter_wrapper { fn with_producer(self, callback: Cb) -> Cb::Output where Cb: ProducerCallback { - callback.callback(ParallelProducer(self.iter)) + callback.callback(ParallelProducer(self.iter, self.min_len)) } fn len(&self) -> usize { @@ -106,7 +110,7 @@ macro_rules! par_iter_wrapper { fn split_at(self, i: usize) -> (Self, Self) { let (a, b) = self.0.split_at(i); - (ParallelProducer(a), ParallelProducer(b)) + (ParallelProducer(a, self.1), ParallelProducer(b, self.1)) } } @@ -131,11 +135,11 @@ macro_rules! 
par_iter_view_wrapper { fn into_par_iter(self) -> Self::Iter { Parallel { iter: self, + min_len: DEFAULT_MIN_LEN, } } } - impl<'a, A, D> ParallelIterator for Parallel<$view_name<'a, A, D>> where D: Dimension, A: $($thread_bounds)*, @@ -144,7 +148,7 @@ macro_rules! par_iter_view_wrapper { fn drive_unindexed(self, consumer: C) -> C::Result where C: UnindexedConsumer { - bridge_unindexed(ParallelProducer(self.iter), consumer) + bridge_unindexed(ParallelProducer(self.iter, self.min_len), consumer) } fn opt_len(&self) -> Option { @@ -152,20 +156,39 @@ macro_rules! par_iter_view_wrapper { } } + impl<'a, A, D> Parallel<$view_name<'a, A, D>> + where D: Dimension, + A: $($thread_bounds)*, + { + /// Sets the minimum number of elements desired to process in each job. A producer holding + /// at most `min_len` elements is not split any further, but of course a producer could + /// already be smaller to begin with. + /// + /// ***Panics*** if `min_len` is zero. + pub fn with_min_len(self, min_len: usize) -> Self { + assert_ne!(min_len, 0, "Minimum number of elements must at least be one to avoid splitting off empty tasks."); + + Self { + min_len, + ..self + } + } + } + impl<'a, A, D> UnindexedProducer for ParallelProducer<$view_name<'a, A, D>> where D: Dimension, A: $($thread_bounds)*, { type Item = <$view_name<'a, A, D> as IntoIterator>::Item; fn split(self) -> (Self, Option) { - if self.0.len() <= 1 { + if self.0.len() <= self.1 { return (self, None) } let array = self.0; let max_axis = array.max_stride_axis(); let mid = array.len_of(max_axis) / 2; let (a, b) = array.split_at(max_axis, mid); - (ParallelProducer(a), Some(ParallelProducer(b))) + (ParallelProducer(a, self.1), Some(ParallelProducer(b, self.1))) } fn fold_with(self, folder: F) -> F @@ -217,6 +240,7 @@ macro_rules! zip_impl { fn into_par_iter(self) -> Self::Iter { Parallel { iter: self, + min_len: DEFAULT_MIN_LEN, } } } @@ -233,7 +257,7 @@ macro_rules! 
zip_impl { fn drive_unindexed(self, consumer: Cons) -> Cons::Result where Cons: UnindexedConsumer { - bridge_unindexed(ParallelProducer(self.iter), consumer) + bridge_unindexed(ParallelProducer(self.iter, self.min_len), consumer) } fn opt_len(&self) -> Option { @@ -251,11 +275,11 @@ macro_rules! zip_impl { type Item = ($($p::Item ,)*); fn split(self) -> (Self, Option) { - if !self.0.can_split() { + if self.0.size() <= self.1 { return (self, None) } let (a, b) = self.0.split(); - (ParallelProducer(a), Some(ParallelProducer(b))) + (ParallelProducer(a, self.1), Some(ParallelProducer(b, self.1))) } fn fold_with(self, folder: Fold) -> Fold @@ -284,6 +308,25 @@ zip_impl! { [P1 P2 P3 P4 P5 P6], } +impl Parallel> +where + D: Dimension, +{ + /// Sets the minimum number of elements desired to process in each job. A producer holding + /// at most `min_len` elements is not split any further, but of course a producer could + /// already be smaller to begin with. + /// + /// ***Panics*** if `min_len` is zero. + pub fn with_min_len(self, min_len: usize) -> Self { + assert_ne!(min_len, 0, "Minimum number of elements must at least be one to avoid splitting off empty tasks."); + + Self { + min_len, + ..self + } + } +} + /// A parallel iterator (unindexed) that produces the splits of the array /// or producer `P`. pub(crate) struct ParallelSplits

{