From 25b7eff10b1380cc5a7d6f6a1d32793c57e63841 Mon Sep 17 00:00:00 2001 From: Adam Reichold Date: Wed, 6 Oct 2021 09:03:01 +0200 Subject: [PATCH 1/4] Add Parallel::with_min_len to limit splitting Splitting of parallel iterators working on `ArrayView` and `Zip` is currently not limited and continues until only one element is left which can lead to excessive overhead for algorithms formulated in terms of these iterators. Since these iterators are also not indexed, Rayon's generic `IndexedParallelIterator::with_min_len` does not apply. However, since the number of elements is known and currently checked against one to determine if another split is possible, it appears straight-forward to replace this constant by a parameter and make it available to the user via a `Parallel::with_min_len` inherent method. --- src/parallel/par.rs | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/src/parallel/par.rs b/src/parallel/par.rs index d9d592af6..7db216c53 100644 --- a/src/parallel/par.rs +++ b/src/parallel/par.rs @@ -21,11 +21,24 @@ use crate::split_at::SplitPreference; #[derive(Copy, Clone, Debug)] pub struct Parallel { iter: I, + min_len: usize, } +impl Parallel { + /// Sets the minimum number of elements desired to process in each job. This will not be split any smaller than this length, but of course a producer could already be smaller to begin with. + pub fn with_min_len(self, min_len: usize) -> Self { + Self { + min_len, + ..self + } + } +} + +const DEFAULT_MIN_LEN: usize = 1; + /// Parallel producer wrapper. #[derive(Copy, Clone, Debug)] -struct ParallelProducer(I); +struct ParallelProducer(I, usize); macro_rules! par_iter_wrapper { // thread_bounds are either Sync or Send + Sync @@ -40,6 +53,7 @@ macro_rules! par_iter_wrapper { fn into_par_iter(self) -> Self::Iter { Parallel { iter: self, + min_len: DEFAULT_MIN_LEN, } } } @@ -67,7 +81,7 @@ macro_rules! 
par_iter_wrapper { fn with_producer(self, callback: Cb) -> Cb::Output where Cb: ProducerCallback { - callback.callback(ParallelProducer(self.iter)) + callback.callback(ParallelProducer(self.iter, self.min_len)) } fn len(&self) -> usize { @@ -106,7 +120,7 @@ macro_rules! par_iter_wrapper { fn split_at(self, i: usize) -> (Self, Self) { let (a, b) = self.0.split_at(i); - (ParallelProducer(a), ParallelProducer(b)) + (ParallelProducer(a, self.1), ParallelProducer(b, self.1)) } } @@ -131,6 +145,7 @@ macro_rules! par_iter_view_wrapper { fn into_par_iter(self) -> Self::Iter { Parallel { iter: self, + min_len: DEFAULT_MIN_LEN, } } } @@ -144,7 +159,7 @@ macro_rules! par_iter_view_wrapper { fn drive_unindexed(self, consumer: C) -> C::Result where C: UnindexedConsumer { - bridge_unindexed(ParallelProducer(self.iter), consumer) + bridge_unindexed(ParallelProducer(self.iter, self.min_len), consumer) } fn opt_len(&self) -> Option { @@ -158,14 +173,14 @@ macro_rules! par_iter_view_wrapper { { type Item = <$view_name<'a, A, D> as IntoIterator>::Item; fn split(self) -> (Self, Option) { - if self.0.len() <= 1 { + if self.0.len() <= self.1 { return (self, None) } let array = self.0; let max_axis = array.max_stride_axis(); let mid = array.len_of(max_axis) / 2; let (a, b) = array.split_at(max_axis, mid); - (ParallelProducer(a), Some(ParallelProducer(b))) + (ParallelProducer(a, self.1), Some(ParallelProducer(b, self.1))) } fn fold_with(self, folder: F) -> F @@ -217,6 +232,7 @@ macro_rules! zip_impl { fn into_par_iter(self) -> Self::Iter { Parallel { iter: self, + min_len: DEFAULT_MIN_LEN, } } } @@ -233,7 +249,7 @@ macro_rules! zip_impl { fn drive_unindexed(self, consumer: Cons) -> Cons::Result where Cons: UnindexedConsumer { - bridge_unindexed(ParallelProducer(self.iter), consumer) + bridge_unindexed(ParallelProducer(self.iter, self.min_len), consumer) } fn opt_len(&self) -> Option { @@ -251,11 +267,11 @@ macro_rules! 
zip_impl { type Item = ($($p::Item ,)*); fn split(self) -> (Self, Option) { - if !self.0.can_split() { + if self.0.size() <= self.1 { return (self, None) } let (a, b) = self.0.split(); - (ParallelProducer(a), Some(ParallelProducer(b))) + (ParallelProducer(a, self.1), Some(ParallelProducer(b, self.1))) } fn fold_with(self, folder: Fold) -> Fold From f596c9e128fa35fb8d6ea4c580a891c7911646bd Mon Sep 17 00:00:00 2001 From: Adam Reichold Date: Mon, 25 Oct 2021 19:56:34 +0200 Subject: [PATCH 2/4] Limit the with_min_len method to those parallel wrappers where it would not shadow a trait method Additionally, the min_len field does not affect those parallel iterators which are indexed as the splitting positions are chosen by Rayon which has its own `with_min_len` method to achieve the same effect. --- src/parallel/par.rs | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/src/parallel/par.rs b/src/parallel/par.rs index 7db216c53..eb53f96cb 100644 --- a/src/parallel/par.rs +++ b/src/parallel/par.rs @@ -24,16 +24,6 @@ pub struct Parallel { min_len: usize, } -impl Parallel { - /// Sets the minimum number of elements desired to process in each job. This will not be split any smaller than this length, but of course a producer could already be smaller to begin with. - pub fn with_min_len(self, min_len: usize) -> Self { - Self { - min_len, - ..self - } - } -} - const DEFAULT_MIN_LEN: usize = 1; /// Parallel producer wrapper. @@ -150,7 +140,6 @@ macro_rules! par_iter_view_wrapper { } } - impl<'a, A, D> ParallelIterator for Parallel<$view_name<'a, A, D>> where D: Dimension, A: $($thread_bounds)*, @@ -167,6 +156,19 @@ macro_rules! par_iter_view_wrapper { } } + impl<'a, A, D> Parallel<$view_name<'a, A, D>> + where D: Dimension, + A: $($thread_bounds)*, + { + /// Sets the minimum number of elements desired to process in each job. 
This will not be split any smaller than this length, but of course a producer could already be smaller to begin with. + pub fn with_min_len(self, min_len: usize) -> Self { + Self { + min_len, + ..self + } + } + } + impl<'a, A, D> UnindexedProducer for ParallelProducer<$view_name<'a, A, D>> where D: Dimension, A: $($thread_bounds)*, @@ -300,6 +302,19 @@ zip_impl! { [P1 P2 P3 P4 P5 P6], } +impl Parallel> +where + D: Dimension, +{ + /// Sets the minimum number of elements desired to process in each job. This will not be split any smaller than this length, but of course a producer could already be smaller to begin with. + pub fn with_min_len(self, min_len: usize) -> Self { + Self { + min_len, + ..self + } + } +} + /// A parallel iterator (unindexed) that produces the splits of the array /// or producer `P`. pub(crate) struct ParallelSplits

{ From f71ec6b501693c9a2264757447194367c5aa30c6 Mon Sep 17 00:00:00 2001 From: Adam Reichold Date: Mon, 25 Oct 2021 20:19:44 +0200 Subject: [PATCH 3/4] Explain the relation of Parallel::with_min_len with Rayon's IndexedParallelIterator::with_min_len in the module-level docs. --- src/parallel/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/parallel/mod.rs b/src/parallel/mod.rs index dd3b89341..045aeb13a 100644 --- a/src/parallel/mod.rs +++ b/src/parallel/mod.rs @@ -32,6 +32,10 @@ //! “unindexed”. Use ndarray’s [Zip] for lock step parallel iteration of //! multiple arrays or producers at a time. //! +//! For the unindexed parallel iterators, an inherent method [`with_min_len`](Parallel::with_min_len) +//! is provided to limit the number of elements each parallel task processes in a way that is +//! similar to Rayon's [`IndexedParallelIterator::with_min_len`](rayon::prelude::IndexedParallelIterator::with_min_len). +//! //! # Examples //! //! ## Arrays and array views From 4efc563e1bb1ae4f4d820ecae9180f9cc5ebcd49 Mon Sep 17 00:00:00 2001 From: Adam Reichold Date: Mon, 25 Oct 2021 20:33:52 +0200 Subject: [PATCH 4/4] Assert non-zero parallel producer minimum length to avoid splitting off empty tasks. --- src/parallel/par.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/parallel/par.rs b/src/parallel/par.rs index eb53f96cb..cc905b5cf 100644 --- a/src/parallel/par.rs +++ b/src/parallel/par.rs @@ -160,8 +160,14 @@ macro_rules! par_iter_view_wrapper { where D: Dimension, A: $($thread_bounds)*, { - /// Sets the minimum number of elements desired to process in each job. This will not be split any smaller than this length, but of course a producer could already be smaller to begin with. + /// Sets the minimum number of elements desired to process in each job. This will not be + /// split any smaller than this length, but of course a producer could already be smaller + /// to begin with. 
+ /// + /// ***Panics*** if `min_len` is zero. pub fn with_min_len(self, min_len: usize) -> Self { + assert_ne!(min_len, 0, "Minimum number of elements must at least be one to avoid splitting off empty tasks."); + Self { min_len, ..self @@ -306,8 +312,14 @@ impl Parallel> where D: Dimension, { - /// Sets the minimum number of elements desired to process in each job. This will not be split any smaller than this length, but of course a producer could already be smaller to begin with. + /// Sets the minimum number of elements desired to process in each job. This will not be + /// split any smaller than this length, but of course a producer could already be smaller + /// to begin with. + /// + /// ***Panics*** if `min_len` is zero. pub fn with_min_len(self, min_len: usize) -> Self { + assert_ne!(min_len, 0, "Minimum number of elements must at least be one to avoid splitting off empty tasks."); + Self { min_len, ..self