From 036993a24b67568ba40a5f9efe69222d81a99393 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Sat, 21 Oct 2017 18:29:29 -0700 Subject: [PATCH 01/11] WIP try_fold, try_fold_with, and try_reduce --- src/iter/fold.rs | 2 +- src/iter/mod.rs | 31 +++++ src/iter/try_fold.rs | 285 +++++++++++++++++++++++++++++++++++++++++ src/iter/try_reduce.rs | 134 +++++++++++++++++++ src/lib.rs | 1 + 5 files changed, 452 insertions(+), 1 deletion(-) create mode 100644 src/iter/try_fold.rs create mode 100644 src/iter/try_reduce.rs diff --git a/src/iter/fold.rs b/src/iter/fold.rs index 1e5d84a02..d60cfb5bb 100644 --- a/src/iter/fold.rs +++ b/src/iter/fold.rs @@ -140,7 +140,7 @@ impl<'r, C, ID, F, T> Folder for FoldFolder<'r, C, ID, F> pub fn fold_with(base: I, item: U, fold_op: F) -> FoldWith where I: ParallelIterator, - F: Fn(U, I::Item) -> U + Sync, + F: Fn(U, I::Item) -> U + Sync + Send, U: Send + Clone { FoldWith { diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 6a48f344a..8bfdc67ab 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -71,6 +71,7 @@ pub use either::Either; use std::cmp::{self, Ordering}; use std::iter::{Sum, Product}; use std::ops::Fn; +use std::ops::Try; use self::plumbing::*; // There is a method to the madness here: @@ -106,7 +107,10 @@ pub mod plumbing; mod for_each; mod fold; pub use self::fold::{Fold, FoldWith}; +mod try_fold; +pub use self::try_fold::{TryFold, TryFoldWith}; mod reduce; +mod try_reduce; mod skip; pub use self::skip::Skip; mod splitter; @@ -676,6 +680,15 @@ pub trait ParallelIterator: Sized + Send { }) } + /// TODO + fn try_reduce(self, identity: ID, op: OP) -> Self::Item + where OP: Fn(T, T) -> Self::Item + Sync + Send, + ID: Fn() -> T + Sync + Send, + Self::Item: Try + { + try_reduce::try_reduce(self, identity, op) + } + /// Parallel fold is similar to sequential fold except that the /// sequence of items may be subdivided before it is /// folded. Consider a list of numbers like `22 3 77 89 46`. If @@ -843,6 +856,24 @@ pub trait ParallelIterator: Sized + Send { fold::fold_with(self, init, fold_op) } + /// TODO + fn try_fold(self, identity: ID, fold_op: F) -> TryFold + where F: Fn(T, Self::Item) -> R + Sync + Send, + ID: Fn() -> T + Sync + Send, + R: Try + Send + { + try_fold::try_fold(self, identity, fold_op) + } + + /// TODO + fn try_fold_with(self, init: T, fold_op: F) -> TryFoldWith + where F: Fn(T, Self::Item) -> R + Sync + Send, + R: Try + Send, + T: Clone + Send + { + try_fold::try_fold_with(self, init, fold_op) + } + /// Sums up the items in the iterator. /// /// Note that the order in items will be reduced is not specified, diff --git a/src/iter/try_fold.rs b/src/iter/try_fold.rs new file mode 100644 index 000000000..60962d02e --- /dev/null +++ b/src/iter/try_fold.rs @@ -0,0 +1,285 @@ +use super::plumbing::*; +use super::*; + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::fmt::{self, Debug}; +use std::marker::PhantomData; +use std::ops::Try; + +pub fn try_fold(base: I, identity: ID, fold_op: F) -> TryFold + where I: ParallelIterator, + F: Fn(U::Ok, I::Item) -> U + Sync + Send, + ID: Fn() -> U::Ok + Sync + Send, + U: Try + Send +{ + TryFold { + base: base, + identity: identity, + fold_op: fold_op, + marker: PhantomData, + } +} + +/// `TryFold` is an iterator that applies a function over an iterator producing a single value. +/// This struct is created by the [`try_fold()`] method on [`ParallelIterator`] +/// +/// [`try_fold()`]: trait.ParallelIterator.html#method.try_fold +/// [`ParallelIterator`]: trait.ParallelIterator.html +#[must_use = "iterator adaptors are lazy and do nothing unless consumed"] +#[derive(Clone)] +pub struct TryFold { + base: I, + identity: ID, + fold_op: F, + marker: PhantomData, +} + +impl Debug for TryFold { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("TryFold") + .field("base", &self.base) + .finish() + } +} + +impl ParallelIterator for TryFold + where I: ParallelIterator, + F: Fn(U::Ok, I::Item) -> U + Sync + Send, + ID: Fn() -> U::Ok + Sync + Send, + U: Try + Send +{ + type Item = U; + + fn drive_unindexed(self, consumer: C) -> C::Result + where C: UnindexedConsumer + { + let full = AtomicBool::new(false); + let consumer1 = TryFoldConsumer { + base: consumer, + fold_op: &self.fold_op, + identity: &self.identity, + full: &full, + marker: PhantomData, + }; + self.base.drive_unindexed(consumer1) + } +} + +struct TryFoldConsumer<'c, U, C, ID: 'c, F: 'c> { + base: C, + fold_op: &'c F, + identity: &'c ID, + full: &'c AtomicBool, + marker: PhantomData, +} + +impl<'r, U, T, C, ID, F> Consumer for TryFoldConsumer<'r, U, C, ID, F> + where C: Consumer, + F: Fn(U::Ok, T) -> U + Sync, + ID: Fn() -> U::Ok + Sync, + U: Try + Send +{ + type Folder = TryFoldFolder<'r, C::Folder, U, F>; + type Reducer = C::Reducer; + type Result = C::Result; + + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { + let (left, right, reducer) = self.base.split_at(index); + (TryFoldConsumer { base: left, ..self }, TryFoldConsumer { base: right, ..self }, reducer) + } + + fn into_folder(self) -> Self::Folder { + TryFoldFolder { + base: self.base.into_folder(), + result: Ok((self.identity)()), + fold_op: self.fold_op, + full: self.full, + } + } + + fn full(&self) -> bool { + self.full.load(Ordering::Relaxed) || self.base.full() + } +} + +impl<'r, U, T, C, ID, F> UnindexedConsumer for TryFoldConsumer<'r, U, C, ID, F> + where C: UnindexedConsumer, + F: Fn(U::Ok, T) -> U + Sync, + ID: Fn() -> U::Ok + Sync, + U: Try + Send +{ + fn split_off_left(&self) -> Self { + TryFoldConsumer { base: self.base.split_off_left(), ..*self } + } + + fn to_reducer(&self) -> Self::Reducer { + self.base.to_reducer() + } +} + +struct TryFoldFolder<'r, C, U: Try, F: 'r> { + base: C, + fold_op: &'r F, + full: &'r AtomicBool, + result: Result, +} + +impl<'r, C, U, F, T> Folder for TryFoldFolder<'r, C, U, F> + where C: Folder, + F: Fn(U::Ok, T) -> U + Sync, + U: Try +{ + type Result = C::Result; + + fn consume(self, item: T) -> Self { + let fold_op = self.fold_op; + let result = self.result.and_then(|acc| { + fold_op(acc, item).into_result() + }); + TryFoldFolder { + result: result, + ..self + } + } + + fn consume_iter(self, iter: I) -> Self + where I: IntoIterator + { + let fold_op = self.fold_op; + let result = self.result.and_then(|mut acc| { + for item in iter { + acc = fold_op(acc, item)?; + } + Ok(acc) + }); + TryFoldFolder { + result: result, + ..self + } + } + + fn complete(self) -> C::Result { + let item = match self.result { + Ok(ok) => U::from_ok(ok), + Err(error) => U::from_error(error), + }; + self.base.consume(item).complete() + } + + fn full(&self) -> bool { + self.result.is_err() || self.full.load(Ordering::Relaxed) || self.base.full() + } +} + +// /////////////////////////////////////////////////////////////////////////// + +pub fn try_fold_with(base: I, item: U::Ok, fold_op: F) -> TryFoldWith + where I: ParallelIterator, + F: Fn(U::Ok, I::Item) -> U + Sync, + U: Try + Send, + U::Ok: Clone + Send +{ + TryFoldWith { + base: base, + item: item, + fold_op: fold_op, + } +} + +/// `TryFoldWith` is an iterator that applies a function over an iterator producing a single value. +/// This struct is created by the [`try_fold_with()`] method on [`ParallelIterator`] +/// +/// [`try_fold_with()`]: trait.ParallelIterator.html#method.try_fold_with +/// [`ParallelIterator`]: trait.ParallelIterator.html +#[must_use = "iterator adaptors are lazy and do nothing unless consumed"] +#[derive(Clone)] +pub struct TryFoldWith { + base: I, + item: U::Ok, + fold_op: F, +} + +impl Debug for TryFoldWith + where U::Ok: Debug +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("TryFoldWith") + .field("base", &self.base) + .field("item", &self.item) + .finish() + } +} + +impl ParallelIterator for TryFoldWith + where I: ParallelIterator, + F: Fn(U::Ok, I::Item) -> U + Sync + Send, + U: Try + Send, + U::Ok: Clone + Send +{ + type Item = U; + + fn drive_unindexed(self, consumer: C) -> C::Result + where C: UnindexedConsumer + { + let full = AtomicBool::new(false); + let consumer1 = TryFoldWithConsumer { + base: consumer, + item: self.item, + fold_op: &self.fold_op, + full: &full, + }; + self.base.drive_unindexed(consumer1) + } +} + +struct TryFoldWithConsumer<'c, C, U: Try, F: 'c> { + base: C, + item: U::Ok, + fold_op: &'c F, + full: &'c AtomicBool, +} + +impl<'r, U, T, C, F> Consumer for TryFoldWithConsumer<'r, C, U, F> + where C: Consumer, + F: Fn(U::Ok, T) -> U + Sync, + U: Try + Send, + U::Ok: Clone + Send +{ + type Folder = TryFoldFolder<'r, C::Folder, U, F>; + type Reducer = C::Reducer; + type Result = C::Result; + + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { + let (left, right, reducer) = self.base.split_at(index); + (TryFoldWithConsumer { base: left, item: self.item.clone(), ..self }, + TryFoldWithConsumer { base: right, ..self }, reducer) + } + + fn into_folder(self) -> Self::Folder { + TryFoldFolder { + base: self.base.into_folder(), + result: Ok(self.item), + fold_op: self.fold_op, + full: self.full, + } + } + + fn full(&self) -> bool { + self.full.load(Ordering::Relaxed) || self.base.full() + } +} + +impl<'r, U, T, C, F> UnindexedConsumer for TryFoldWithConsumer<'r, C, U, F> + where C: UnindexedConsumer, + F: Fn(U::Ok, T) -> U + Sync, + U: Try + Send, + U::Ok: Clone + Send +{ + fn split_off_left(&self) -> Self { + TryFoldWithConsumer { base: self.base.split_off_left(), item: self.item.clone(), ..*self } + } + + fn to_reducer(&self) -> Self::Reducer { + self.base.to_reducer() + } +} diff --git a/src/iter/try_reduce.rs b/src/iter/try_reduce.rs new file mode 100644 index 000000000..6ae556133 --- /dev/null +++ b/src/iter/try_reduce.rs @@ -0,0 +1,134 @@ +use super::ParallelIterator; +use super::plumbing::*; + +use std::ops::Try; +use std::sync::atomic::{AtomicBool, Ordering}; + +pub fn try_reduce(pi: PI, identity: ID, reduce_op: R) -> T + where PI: ParallelIterator, + R: Fn(T::Ok, T::Ok) -> T + Sync, + ID: Fn() -> T::Ok + Sync, + T: Try + Send +{ + let full = AtomicBool::new(false); + let consumer = TryReduceConsumer { + identity: &identity, + reduce_op: &reduce_op, + full: &full, + }; + pi.drive_unindexed(consumer) +} + +struct TryReduceConsumer<'r, R: 'r, ID: 'r> { + identity: &'r ID, + reduce_op: &'r R, + full: &'r AtomicBool, +} + +impl<'r, R, ID> Copy for TryReduceConsumer<'r, R, ID> {} + +impl<'r, R, ID> Clone for TryReduceConsumer<'r, R, ID> { + fn clone(&self) -> Self { + *self + } +} + +impl<'r, R, ID, T> Consumer for TryReduceConsumer<'r, R, ID> + where R: Fn(T::Ok, T::Ok) -> T + Sync, + ID: Fn() -> T::Ok + Sync, + T: Try + Send +{ + type Folder = TryReduceFolder<'r, R, T>; + type Reducer = Self; + type Result = T; + + fn split_at(self, _index: usize) -> (Self, Self, Self) { + (self, self, self) + } + + fn into_folder(self) -> Self::Folder { + TryReduceFolder { + reduce_op: self.reduce_op, + result: Ok((self.identity)()), + full: self.full, + } + } + + fn full(&self) -> bool { + self.full.load(Ordering::Relaxed) + } +} + +impl<'r, R, ID, T> UnindexedConsumer for TryReduceConsumer<'r, R, ID> + where R: Fn(T::Ok, T::Ok) -> T + Sync, + ID: Fn() -> T::Ok + Sync, + T: Try + Send +{ + fn split_off_left(&self) -> Self { + *self + } + + fn to_reducer(&self) -> Self::Reducer { + *self + } +} + +impl<'r, R, ID, T> Reducer for TryReduceConsumer<'r, R, ID> + where R: Fn(T::Ok, T::Ok) -> T + Sync, + T: Try +{ + fn reduce(self, left: T, right: T) -> T { + (self.reduce_op)(left?, right?) + } +} + +struct TryReduceFolder<'r, R: 'r, T: Try> { + reduce_op: &'r R, + result: Result, + full: &'r AtomicBool, +} + +impl<'r, R, T> Folder for TryReduceFolder<'r, R, T> + where R: Fn(T::Ok, T::Ok) -> T, + T: Try +{ + type Result = T; + + fn consume(self, item: T) -> Self { + let reduce_op = self.reduce_op; + let result = self.result.and_then(|left| { + reduce_op(left, item?).into_result() + }); + TryReduceFolder { + result: result, + ..self + } + } + + fn consume_iter(self, iter: I) -> Self + where I: IntoIterator + { + let reduce_op = self.reduce_op; + let result = self.result.and_then(|mut left| { + for right in iter { + left = reduce_op(left, right?)?; + } + Ok(left) + }); + TryReduceFolder { + result: result, + ..self + } + } + + fn complete(self) -> T { + match self.result { + Ok(ok) => T::from_ok(ok), + Err(error) => T::from_error(error), + } + } + + fn full(&self) -> bool { + self.result.is_err() || self.full.load(Ordering::Relaxed) + } +} diff --git a/src/lib.rs b/src/lib.rs index 54646488a..105d1c2dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ #![doc(html_root_url = "https://docs.rs/rayon/1.0")] #![deny(missing_debug_implementations)] #![deny(missing_docs)] +#![feature(try_trait)] //! Data-parallelism library that makes it easy to convert sequential //! computations into parallel From 3b91b55069c3a4d51bfff46acfce687bae12fbb1 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Thu, 5 Apr 2018 16:26:02 -0700 Subject: [PATCH 02/11] Emulate Try for stable rustc --- src/iter/mod.rs | 41 ++++++++++++++++++++++++++++++++++++++++- src/iter/try_fold.rs | 4 ++-- src/iter/try_reduce.rs | 11 +++++++---- src/lib.rs | 1 - 4 files changed, 49 insertions(+), 8 deletions(-) diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 8bfdc67ab..ee14b165c 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -71,8 +71,8 @@ pub use either::Either; use std::cmp::{self, Ordering}; use std::iter::{Sum, Product}; use std::ops::Fn; -use std::ops::Try; use self::plumbing::*; +use self::private::Try; // There is a method to the madness here: // @@ -2142,3 +2142,42 @@ pub trait ParallelExtend /// ``` fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator; } + +/// We hide the `Try` trait in a private module, as it's only meant to be a +/// stable clone of the standard library's `Try` trait, as yet unstable. +mod private { + /// Clone of `std::ops::Try`. + /// + /// Implementing this trait is not permitted outside of `rayon`. + pub trait Try { + private_decl!{} + + type Ok; + type Error; + fn into_result(self) -> Result; + fn from_ok(v: Self::Ok) -> Self; + fn from_error(v: Self::Error) -> Self; + } + + impl Try for Option { + private_impl!{} + + type Ok = T; + type Error = (); + + fn into_result(self) -> Result { self.ok_or(()) } + fn from_ok(v: T) -> Self { Some(v) } + fn from_error(_: ()) -> Self { None } + } + + impl Try for Result { + private_impl!{} + + type Ok = T; + type Error = E; + + fn into_result(self) -> Result { self } + fn from_ok(v: T) -> Self { Ok(v) } + fn from_error(v: E) -> Self { Err(v) } + } +} diff --git a/src/iter/try_fold.rs b/src/iter/try_fold.rs index 60962d02e..6ddfefed1 100644 --- a/src/iter/try_fold.rs +++ b/src/iter/try_fold.rs @@ -4,7 +4,7 @@ use super::*; use std::sync::atomic::{AtomicBool, Ordering}; use std::fmt::{self, Debug}; use std::marker::PhantomData; -use std::ops::Try; +use super::private::Try; pub fn try_fold(base: I, identity: ID, fold_op: F) -> TryFold where I: ParallelIterator, @@ -148,7 +148,7 @@ impl<'r, C, U, F, T> Folder for TryFoldFolder<'r, C, U, F> let fold_op = self.fold_op; let result = self.result.and_then(|mut acc| { for item in iter { - acc = fold_op(acc, item)?; + acc = fold_op(acc, item).into_result()?; } Ok(acc) }); diff --git a/src/iter/try_reduce.rs b/src/iter/try_reduce.rs index 6ae556133..101512c19 100644 --- a/src/iter/try_reduce.rs +++ b/src/iter/try_reduce.rs @@ -1,7 +1,7 @@ use super::ParallelIterator; use super::plumbing::*; -use std::ops::Try; +use super::private::Try; use std::sync::atomic::{AtomicBool, Ordering}; pub fn try_reduce(pi: PI, identity: ID, reduce_op: R) -> T @@ -78,7 +78,10 @@ impl<'r, R, ID, T> Reducer for TryReduceConsumer<'r, R, ID> T: Try { fn reduce(self, left: T, right: T) -> T { - (self.reduce_op)(left?, right?) + match (left.into_result(), right.into_result()) { + (Ok(left), Ok(right)) => (self.reduce_op)(left, right), + (Err(e), _) | (_, Err(e)) => T::from_error(e), + } } } @@ -97,7 +100,7 @@ impl<'r, R, T> Folder for TryReduceFolder<'r, R, T> fn consume(self, item: T) -> Self { let reduce_op = self.reduce_op; let result = self.result.and_then(|left| { - reduce_op(left, item?).into_result() + reduce_op(left, item.into_result()?).into_result() }); TryReduceFolder { result: result, @@ -111,7 +114,7 @@ impl<'r, R, T> Folder for TryReduceFolder<'r, R, T> let reduce_op = self.reduce_op; let result = self.result.and_then(|mut left| { for right in iter { - left = reduce_op(left, right?)?; + left = reduce_op(left, right.into_result()?).into_result()?; } Ok(left) }); diff --git a/src/lib.rs b/src/lib.rs index 105d1c2dc..54646488a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,6 @@ #![doc(html_root_url = "https://docs.rs/rayon/1.0")] #![deny(missing_debug_implementations)] #![deny(missing_docs)] -#![feature(try_trait)] //! Data-parallelism library that makes it easy to convert sequential //! computations into parallel From b407051d497cbe89bac9e5014d5cd64c21322c37 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Thu, 5 Apr 2018 17:07:16 -0700 Subject: [PATCH 03/11] Add try_for_each and try_for_each_with --- src/iter/mod.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/iter/mod.rs b/src/iter/mod.rs index ee14b165c..a05cdb58a 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -365,6 +365,25 @@ pub trait ParallelIterator: Sized + Send { self.map_with(init, op).for_each(|()| ()) } + /// TODO + fn try_for_each(self, op: OP) -> R + where OP: Fn(Self::Item) -> R + Sync + Send, + R: Try + Send + { + self.try_fold_with((), move |(), x| op(x)) + .try_reduce(|| (), |(), ()| R::from_ok(())) + } + + /// TODO + fn try_for_each_with(self, init: T, op: OP) -> R + where OP: Fn(&mut T, Self::Item) -> R + Sync + Send, + T: Send + Clone, + R: Try + Send + { + self.map_with(init, op) + .try_reduce(|| (), |(), ()| R::from_ok(())) + } + /// Counts the number of items in this parallel iterator. /// /// # Examples From 2104bc6c1c2fb919726f5d7e9a0a29a72de7e655 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Thu, 5 Apr 2018 21:20:22 -0700 Subject: [PATCH 04/11] Fix try_fold/reduce fullness --- src/iter/try_fold.rs | 17 ++--------------- src/iter/try_reduce.rs | 17 ++--------------- 2 files changed, 4 insertions(+), 30 deletions(-) diff --git a/src/iter/try_fold.rs b/src/iter/try_fold.rs index 6ddfefed1..70e27ff68 100644 --- a/src/iter/try_fold.rs +++ b/src/iter/try_fold.rs @@ -136,22 +136,9 @@ impl<'r, C, U, F, T> Folder for TryFoldFolder<'r, C, U, F> let result = self.result.and_then(|acc| { fold_op(acc, item).into_result() }); - TryFoldFolder { - result: result, - ..self + if result.is_err() { + self.full.store(true, Ordering::Relaxed); } - } - - fn consume_iter(self, iter: I) -> Self - where I: IntoIterator - { - let fold_op = self.fold_op; - let result = self.result.and_then(|mut acc| { - for item in iter { - acc = fold_op(acc, item).into_result()?; - } - Ok(acc) - }); TryFoldFolder { result: result, ..self diff --git a/src/iter/try_reduce.rs b/src/iter/try_reduce.rs index 101512c19..2ca809b21 100644 --- a/src/iter/try_reduce.rs +++ b/src/iter/try_reduce.rs @@ -102,22 +102,9 @@ impl<'r, R, T> Folder for TryReduceFolder<'r, R, T> let result = self.result.and_then(|left| { reduce_op(left, item.into_result()?).into_result() }); - TryReduceFolder { - result: result, - ..self + if result.is_err() { + self.full.store(true, Ordering::Relaxed) } - } - - fn consume_iter(self, iter: I) -> Self - where I: IntoIterator - { - let reduce_op = self.reduce_op; - let result = self.result.and_then(|mut left| { - for right in iter { - left = reduce_op(left, right.into_result()?).into_result()?; - } - Ok(left) - }); TryReduceFolder { result: result, ..self From dedb06c54c8a996a07641479f3b6d6f18a15bddb Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Thu, 5 Apr 2018 23:05:31 -0700 Subject: [PATCH 05/11] Add try_reduce_with --- src/iter/mod.rs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/iter/mod.rs b/src/iter/mod.rs index a05cdb58a..3b7ab6809 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -708,6 +708,35 @@ pub trait ParallelIterator: Sized + Send { try_reduce::try_reduce(self, identity, op) } + /// TODO + fn try_reduce_with(self, op: OP) -> Option + where OP: Fn(T, T) -> Self::Item + Sync + Send, + Self::Item: Try, + ::Ok: Send, + ::Error: Send + { + let result = self.try_fold( + || None, + |opt_a, try_b| match (opt_a, try_b.into_result()) { + (Some(a), Ok(b)) => op(a, b).into_result().map(Some), + (_, res_b) => res_b.map(Some), + }, + ).try_reduce( + || None, + |opt_a, opt_b| match (opt_a, opt_b) { + (Some(a), Some(b)) => op(a, b).into_result().map(Some), + (Some(v), None) | (None, Some(v)) => Ok(Some(v)), + (None, None) => Ok(None), + }, + ); + + match result { + Ok(None) => None, + Ok(Some(v)) => Some(Self::Item::from_ok(v)), + Err(e) => Some(Self::Item::from_error(e)), + } + } + /// Parallel fold is similar to sequential fold except that the /// sequence of items may be subdivided before it is /// folded. Consider a list of numbers like `22 3 77 89 46`. If From 1257f479b82a31b4911dc6b931b1f785ec59b1eb Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Thu, 5 Apr 2018 23:51:45 -0700 Subject: [PATCH 06/11] Avoid two-layer tries, doubling short-circuit logic --- src/iter/mod.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 3b7ab6809..c2c776390 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -370,8 +370,7 @@ pub trait ParallelIterator: Sized + Send { where OP: Fn(Self::Item) -> R + Sync + Send, R: Try + Send { - self.try_fold_with((), move |(), x| op(x)) - .try_reduce(|| (), |(), ()| R::from_ok(())) + self.map(op).try_reduce(|| (), |(), ()| R::from_ok(())) } /// TODO @@ -715,13 +714,8 @@ pub trait ParallelIterator: Sized + Send { ::Ok: Send, ::Error: Send { - let result = self.try_fold( - || None, - |opt_a, try_b| match (opt_a, try_b.into_result()) { - (Some(a), Ok(b)) => op(a, b).into_result().map(Some), - (_, res_b) => res_b.map(Some), - }, - ).try_reduce( + // Map into `Result, Error>`, then reduce it. + let result = self.map(|try_b| try_b.into_result().map(Some)).try_reduce( || None, |opt_a, opt_b| match (opt_a, opt_b) { (Some(a), Some(b)) => op(a, b).into_result().map(Some), @@ -730,6 +724,7 @@ pub trait ParallelIterator: Sized + Send { }, ); + // Map `Result, Error>` back to `Option`. match result { Ok(None) => None, Ok(Some(v)) => Some(Self::Item::from_ok(v)), From 8842e9dd3c09eac2086e17e5fc400ce5bde5b9d9 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Wed, 6 Jun 2018 14:27:02 -0700 Subject: [PATCH 07/11] simplify TryReduceFolder::full --- src/iter/try_reduce.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/iter/try_reduce.rs b/src/iter/try_reduce.rs index 2ca809b21..6d9513dc5 100644 --- a/src/iter/try_reduce.rs +++ b/src/iter/try_reduce.rs @@ -119,6 +119,6 @@ impl<'r, R, T> Folder for TryReduceFolder<'r, R, T> } fn full(&self) -> bool { - self.result.is_err() || self.full.load(Ordering::Relaxed) + self.full.load(Ordering::Relaxed) } } From a4aef0524dd5059f961e92f734777a8001ab9524 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Wed, 6 Jun 2018 14:45:34 -0700 Subject: [PATCH 08/11] Remove the global short-circuit from try_fold It will now only consider the locally-folded items for short-circuit behavior. It's expected that this will commonly be followed by a `try_reduce` or the like with its own global effect. --- src/iter/try_fold.rs | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/src/iter/try_fold.rs b/src/iter/try_fold.rs index 70e27ff68..10134614b 100644 --- a/src/iter/try_fold.rs +++ b/src/iter/try_fold.rs @@ -1,7 +1,6 @@ use super::plumbing::*; use super::*; -use std::sync::atomic::{AtomicBool, Ordering}; use std::fmt::{self, Debug}; use std::marker::PhantomData; use super::private::Try; @@ -53,12 +52,10 @@ impl ParallelIterator for TryFold fn drive_unindexed(self, consumer: C) -> C::Result where C: UnindexedConsumer { - let full = AtomicBool::new(false); let consumer1 = TryFoldConsumer { base: consumer, - fold_op: &self.fold_op, identity: &self.identity, - full: &full, + fold_op: &self.fold_op, marker: PhantomData, }; self.base.drive_unindexed(consumer1) @@ -67,9 +64,8 @@ impl ParallelIterator for TryFold struct TryFoldConsumer<'c, U, C, ID: 'c, F: 'c> { base: C, - fold_op: &'c F, identity: &'c ID, - full: &'c AtomicBool, + fold_op: &'c F, marker: PhantomData, } @@ -93,12 +89,11 @@ impl<'r, U, T, C, ID, F> Consumer for TryFoldConsumer<'r, U, C, ID, F> base: self.base.into_folder(), result: Ok((self.identity)()), fold_op: self.fold_op, - full: self.full, } } fn full(&self) -> bool { - self.full.load(Ordering::Relaxed) || self.base.full() + self.base.full() } } @@ -120,7 +115,6 @@ impl<'r, U, T, C, ID, F> UnindexedConsumer for TryFoldConsumer<'r, U, C, ID, struct TryFoldFolder<'r, C, U: Try, F: 'r> { base: C, fold_op: &'r F, - full: &'r AtomicBool, result: Result, } @@ -136,9 +130,6 @@ impl<'r, C, U, F, T> Folder for TryFoldFolder<'r, C, U, F> let result = self.result.and_then(|acc| { fold_op(acc, item).into_result() }); - if result.is_err() { - self.full.store(true, Ordering::Relaxed); - } TryFoldFolder { result: result, ..self @@ -154,7 +145,7 @@ impl<'r, C, U, F, T> Folder for TryFoldFolder<'r, C, U, F> } fn full(&self) -> bool { - self.result.is_err() || self.full.load(Ordering::Relaxed) || self.base.full() + self.result.is_err() || self.base.full() } } @@ -208,12 +199,10 @@ impl ParallelIterator for TryFoldWith fn drive_unindexed(self, consumer: C) -> C::Result where C: UnindexedConsumer { - let full = AtomicBool::new(false); let consumer1 = TryFoldWithConsumer { base: consumer, item: self.item, fold_op: &self.fold_op, - full: &full, }; self.base.drive_unindexed(consumer1) } @@ -223,7 +212,6 @@ struct TryFoldWithConsumer<'c, C, U: Try, F: 'c> { base: C, item: U::Ok, fold_op: &'c F, - full: &'c AtomicBool, } impl<'r, U, T, C, F> Consumer for TryFoldWithConsumer<'r, C, U, F> @@ -247,12 +235,11 @@ impl<'r, U, T, C, F> Consumer for TryFoldWithConsumer<'r, C, U, F> base: self.base.into_folder(), result: Ok(self.item), fold_op: self.fold_op, - full: self.full, } } fn full(&self) -> bool { - self.full.load(Ordering::Relaxed) || self.base.full() + self.base.full() } } From 93d96bd3448ec5e02f7d3250ca29ad9eeb08efa7 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Tue, 12 Jun 2018 13:10:05 -0700 Subject: [PATCH 09/11] Break out try_reduce_with to avoid unwanted constraints Even though we already know `Self::Item: Send`, this doesn't apply constraints to its `Try::Ok` and `Error`. Therefore, if we map to `Result, Error>` at a high level, we have to require extra `Send` bounds to make sure that map is `Send`, like: ::Ok: Send, ::Error: Send We can avoid this by implementing a custom `Consumer`, where we only hold it as a `Result` in the low level `Folder` regardless of `Send`. --- src/iter/mod.rs | 20 +----- src/iter/try_reduce_with.rs | 130 ++++++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+), 18 deletions(-) create mode 100644 src/iter/try_reduce_with.rs diff --git a/src/iter/mod.rs b/src/iter/mod.rs index c2c776390..6c428868e 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -111,6 +111,7 @@ mod try_fold; pub use self::try_fold::{TryFold, TryFoldWith}; mod reduce; mod try_reduce; +mod try_reduce_with; mod skip; pub use self::skip::Skip; mod splitter; @@ -711,25 +712,8 @@ pub trait ParallelIterator: Sized + Send { fn try_reduce_with(self, op: OP) -> Option where OP: Fn(T, T) -> Self::Item + Sync + Send, Self::Item: Try, - ::Ok: Send, - ::Error: Send { - // Map into `Result, Error>`, then reduce it. - let result = self.map(|try_b| try_b.into_result().map(Some)).try_reduce( - || None, - |opt_a, opt_b| match (opt_a, opt_b) { - (Some(a), Some(b)) => op(a, b).into_result().map(Some), - (Some(v), None) | (None, Some(v)) => Ok(Some(v)), - (None, None) => Ok(None), - }, - ); - - // Map `Result, Error>` back to `Option`. - match result { - Ok(None) => None, - Ok(Some(v)) => Some(Self::Item::from_ok(v)), - Err(e) => Some(Self::Item::from_error(e)), - } + try_reduce_with::try_reduce_with(self, op) } /// Parallel fold is similar to sequential fold except that the diff --git a/src/iter/try_reduce_with.rs b/src/iter/try_reduce_with.rs new file mode 100644 index 000000000..fae0c32c4 --- /dev/null +++ b/src/iter/try_reduce_with.rs @@ -0,0 +1,130 @@ +use super::ParallelIterator; +use super::plumbing::*; + +use super::private::Try; +use std::sync::atomic::{AtomicBool, Ordering}; + +pub fn try_reduce_with(pi: PI, reduce_op: R) -> Option + where PI: ParallelIterator, + R: Fn(T::Ok, T::Ok) -> T + Sync, + T: Try + Send +{ + let full = AtomicBool::new(false); + let consumer = TryReduceWithConsumer { + reduce_op: &reduce_op, + full: &full, + }; + pi.drive_unindexed(consumer) +} + +struct TryReduceWithConsumer<'r, R: 'r> { + reduce_op: &'r R, + full: &'r AtomicBool, +} + +impl<'r, R> Copy for TryReduceWithConsumer<'r, R> {} + +impl<'r, R> Clone for TryReduceWithConsumer<'r, R> { + fn clone(&self) -> Self { + *self + } +} + +impl<'r, R, T> Consumer for TryReduceWithConsumer<'r, R> + where R: Fn(T::Ok, T::Ok) -> T + Sync, + T: Try + Send +{ + type Folder = TryReduceWithFolder<'r, R, T>; + type Reducer = Self; + type Result = Option; + + fn split_at(self, _index: usize) -> (Self, Self, Self) { + (self, self, self) + } + + fn into_folder(self) -> Self::Folder { + TryReduceWithFolder { + reduce_op: self.reduce_op, + opt_result: None, + full: self.full, + } + } + + fn full(&self) -> bool { + self.full.load(Ordering::Relaxed) + } +} + +impl<'r, R, T> UnindexedConsumer for TryReduceWithConsumer<'r, R> + where R: Fn(T::Ok, T::Ok) -> T + Sync, + T: Try + Send +{ + fn split_off_left(&self) -> Self { + *self + } + + fn to_reducer(&self) -> Self::Reducer { + *self + } +} + +impl<'r, R, T> Reducer> for TryReduceWithConsumer<'r, R> + where R: Fn(T::Ok, T::Ok) -> T + Sync, + T: Try +{ + fn reduce(self, left: Option, right: Option) -> Option { + let reduce_op = self.reduce_op; + match (left, right) { + (None, x) | (x, None) => x, + (Some(a), Some(b)) => { + match (a.into_result(), b.into_result()) { + (Ok(a), Ok(b)) => Some(reduce_op(a, b)), + (Err(e), _) | (_, Err(e)) => Some(T::from_error(e)), + } + } + } + } +} + +struct TryReduceWithFolder<'r, R: 'r, T: Try> { + reduce_op: &'r R, + opt_result: Option>, + full: &'r AtomicBool, +} + +impl<'r, R, T> Folder for TryReduceWithFolder<'r, R, T> + where R: Fn(T::Ok, T::Ok) -> T, + T: Try +{ + type Result = Option; + + fn consume(self, item: T) -> Self { + let reduce_op = self.reduce_op; + let result = match self.opt_result { + None => item.into_result(), + Some(Ok(a)) => match item.into_result() { + Ok(b) => reduce_op(a, b).into_result(), + Err(e) => Err(e), + }, + Some(Err(e)) => Err(e), + }; + if result.is_err() { + self.full.store(true, Ordering::Relaxed) + } + TryReduceWithFolder { + opt_result: Some(result), + ..self + } + } + + fn complete(self) -> Option { + self.opt_result.map(|result| match result { + Ok(ok) => T::from_ok(ok), + Err(error) => T::from_error(error), + }) + } + + fn full(&self) -> bool { + self.full.load(Ordering::Relaxed) + } +} From b864f484b80f090aba83b35ac40ffdd8ebf7eebe Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Tue, 12 Jun 2018 14:10:58 -0700 Subject: [PATCH 10/11] Document ParallelIterator::try_* --- src/iter/mod.rs | 168 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 162 insertions(+), 6 deletions(-) diff --git a/src/iter/mod.rs b/src/iter/mod.rs index 6c428868e..8967eade0 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -66,6 +66,12 @@ //! [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html //! [split]: fn.split.html //! [plumbing]: plumbing +//! +//! Note: Several of the `ParallelIterator` methods rely on a `Try` trait which +//! has been deliberately obscured from the public API. This trait is intended +//! to mirror the unstable `std::ops::Try` with implementations for `Option` and +//! `Result`, where `Some`/`Ok` values will let those iterators continue, but +//! `None`/`Err` values will exit early. pub use either::Either; use std::cmp::{self, Ordering}; @@ -366,7 +372,26 @@ pub trait ParallelIterator: Sized + Send { self.map_with(init, op).for_each(|()| ()) } - /// TODO + /// Executes a fallible `OP` on each item produced by the iterator, in parallel. + /// + /// If the `OP` returns `Result::Err` or `Option::None`, we will attempt to + /// stop processing the rest of the items in the iterator as soon as + /// possible, and we will return that terminating value. Otherwise, we will + /// return an empty `Result::Ok(())` or `Option::Some(())`. If there are + /// multiple errors in parallel, it is not specified which will be returned. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// use std::io::{self, Write}; + /// + /// // This will stop iteration early if there's any write error, like + /// // having piped output get closed on the other end. + /// (0..100).into_par_iter() + /// .try_for_each(|x| writeln!(io::stdout(), "{:?}", x)) + /// .expect("expected no write errors"); + /// ``` fn try_for_each(self, op: OP) -> R where OP: Fn(Self::Item) -> R + Sync + Send, R: Try + Send @@ -374,7 +399,33 @@ pub trait ParallelIterator: Sized + Send { self.map(op).try_reduce(|| (), |(), ()| R::from_ok(())) } - /// TODO + /// Executes a fallible `OP` on the given `init` value with each item + /// produced by the iterator, in parallel. + /// + /// This combines the `init` semantics of [`for_each_with()`] and the + /// failure semantics of [`try_for_each()`]. + /// + /// [`for_each_with()`]: #method.for_each_with + /// [`try_for_each()`]: #method.try_for_each + /// + /// # Examples + /// + /// ``` + /// use std::sync::mpsc::channel; + /// use rayon::prelude::*; + /// + /// let (sender, receiver) = channel(); + /// + /// (0..5).into_par_iter() + /// .try_for_each_with(sender, |s, x| s.send(x)) + /// .expect("expected no send errors"); + /// + /// let mut res: Vec<_> = receiver.iter().collect(); + /// + /// res.sort(); + /// + /// assert_eq!(&res[..], &[0, 1, 2, 3, 4]) + /// ``` fn try_for_each_with(self, init: T, op: OP) -> R where OP: Fn(&mut T, Self::Item) -> R + Sync + Send, T: Send + Clone, @@ -699,7 +750,37 @@ pub trait ParallelIterator: Sized + Send { }) } - /// TODO + /// Reduces the items in the iterator into one item using a fallible `op`. + /// The `identity` argument is used the same way as in [`reduce()`]. + /// + /// [`reduce()`]: #method.reduce + /// + /// If a `Result::Err` or `Option::None` item is found, or if `op` reduces + /// to one, we will attempt to stop processing the rest of the items in the + /// iterator as soon as possible, and we will return that terminating value. + /// Otherwise, we will return the final reduced `Result::Ok(T)` or + /// `Option::Some(T)`. If there are multiple errors in parallel, it is not + /// specified which will be returned. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// // Compute the sum of squares, being careful about overflow. + /// fn sum_squares>(iter: I) -> Option { + /// iter.into_par_iter() + /// .map(|i| i.checked_mul(i)) // square each item, + /// .try_reduce(|| 0, i32::checked_add) // and add them up! + /// } + /// assert_eq!(sum_squares(0..5), Some(0 + 1 + 4 + 9 + 16)); + /// + /// // The sum might overflow + /// assert_eq!(sum_squares(0..10_000), None); + /// + /// // Or the squares might overflow before it even reaches `try_reduce` + /// assert_eq!(sum_squares(1_000_000..1_000_001), None); + /// ``` fn try_reduce(self, identity: ID, op: OP) -> Self::Item where OP: Fn(T, T) -> Self::Item + Sync + Send, ID: Fn() -> T + Sync + Send, @@ -708,7 +789,41 @@ pub trait ParallelIterator: Sized + Send { try_reduce::try_reduce(self, identity, op) } - /// TODO + /// Reduces the items in the iterator into one item using a fallible `op`. + /// + /// Like [`reduce_with()`], if the iterator is empty, `None` is returned; + /// otherwise, `Some` is returned. Beyond that, it behaves like + /// [`try_reduce()`] for handling `Err`/`None`. + /// + /// [`reduce_with()`]: #method.reduce_with + /// [`try_reduce()`]: #method.try_reduce + /// + /// For instance, with `Option` items, the return value may be: + /// - `None`, the iterator was empty + /// - `Some(None)`, we stopped after encountering `None`. + /// - `Some(Some(x))`, the entire iterator reduced to `x`. + /// + /// With `Result` items, the nesting is more obvious: + /// - `None`, the iterator was empty + /// - `Some(Err(e))`, we stopped after encountering an error `e`. + /// - `Some(Ok(x))`, the entire iterator reduced to `x`. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let files = ["/dev/null", "/does/not/exist"]; + /// + /// // Find the biggest file + /// files.into_par_iter() + /// .map(|path| std::fs::metadata(path).map(|m| (path, m.len()))) + /// .try_reduce_with(|a, b| { + /// Ok(if a.1 >= b.1 { a } else { b }) + /// }) + /// .expect("Some value, since the iterator is not empty") + /// .expect_err("not found"); + /// ``` fn try_reduce_with(self, op: OP) -> Option where OP: Fn(T, T) -> Self::Item + Sync + Send, Self::Item: Try, @@ -883,7 +998,31 @@ pub trait ParallelIterator: Sized + Send { fold::fold_with(self, init, fold_op) } - /// TODO + /// Perform a fallible parallel fold. + /// + /// This is a variation of [`fold()`] for operations which can fail with + /// `Option::None` or `Result::Err`. The first such failure stops + /// processing the local set of items, without affecting other folds in the + /// iterator's subdivisions. + /// + /// Often, `try_fold()` will be followed by [`try_reduce()`] + /// for a final reduction and global short-circuiting effect. + /// + /// [`fold()`]: #method.fold + /// [`try_reduce()`]: #method.try_reduce + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let bytes = 0..22_u8; + /// let sum = bytes.into_par_iter() + /// .try_fold(|| 0_u32, |a: u32, b: u8| a.checked_add(b as u32)) + /// .try_reduce(|| 0, u32::checked_add); + /// + /// assert_eq!(sum, Some((0..22).sum())); // compare to sequential + /// ``` fn try_fold(self, identity: ID, fold_op: F) -> TryFold where F: Fn(T, Self::Item) -> R + Sync + Send, ID: Fn() -> T + Sync + Send, @@ -892,7 +1031,24 @@ pub trait ParallelIterator: Sized + Send { try_fold::try_fold(self, identity, fold_op) } - /// TODO + /// Perform a fallible parallel fold with a cloneable `init` value. + /// + /// This combines the `init` semantics of [`fold_with()`] and the failure + /// semantics of [`try_fold()`]. + /// + /// [`fold_with()`]: #method.fold_with + /// [`try_fold()`]: #method.try_fold + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let bytes = 0..22_u8; + /// let sum = bytes.into_par_iter() + /// .try_fold_with(0_u32, |a: u32, b: u8| a.checked_add(b as u32)) + /// .try_reduce(|| 0, u32::checked_add); + /// + /// assert_eq!(sum, Some((0..22).sum())); // compare to sequential + /// ``` fn try_fold_with(self, init: T, fold_op: F) -> TryFoldWith where F: Fn(T, Self::Item) -> R + Sync + Send, R: Try + Send, From 800c736267d6bf18287c3c9ec05c6d7b5f977ab1 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Tue, 19 Jun 2018 15:51:32 -0700 Subject: [PATCH 11/11] Add generic tests for try_fold --- src/compile_fail/must_use.rs | 2 ++ tests/clones.rs | 2 ++ tests/debug.rs | 2 ++ 3 files changed, 6 insertions(+) diff --git a/src/compile_fail/must_use.rs b/src/compile_fail/must_use.rs index ab5b75e57..c74031502 100644 --- a/src/compile_fail/must_use.rs +++ b/src/compile_fail/must_use.rs @@ -27,6 +27,8 @@ must_use! { flatten /** v.par_iter().flatten(); */ fold /** v.par_iter().fold(|| 0, |x, _| x); */ fold_with /** v.par_iter().fold_with(0, |x, _| x); */ + try_fold /** v.par_iter().try_fold(|| 0, |x, _| Some(x)); */ + try_fold_with /** v.par_iter().try_fold_with(0, |x, _| Some(x)); */ inspect /** v.par_iter().inspect(|_| {}); */ interleave /** v.par_iter().interleave(&v); */ interleave_shortest /** v.par_iter().interleave_shortest(&v); */ diff --git a/tests/clones.rs b/tests/clones.rs index 7b54b91e0..f7054d0c6 100644 --- a/tests/clones.rs +++ b/tests/clones.rs @@ -114,6 +114,8 @@ fn clone_adaptors() { check(v.par_iter().flatten()); check(v.par_iter().with_max_len(1).fold(|| 0, |x, _| x)); check(v.par_iter().with_max_len(1).fold_with(0, |x, _| x)); + check(v.par_iter().with_max_len(1).try_fold(|| 0, |_, &x| x)); + check(v.par_iter().with_max_len(1).try_fold_with(0, |_, &x| x)); check(v.par_iter().inspect(|_| ())); check(v.par_iter().update(|_| ())); check(v.par_iter().interleave(&v)); diff --git a/tests/debug.rs b/tests/debug.rs index d5fe3611b..cf499fbda 100644 --- a/tests/debug.rs +++ b/tests/debug.rs @@ -125,6 +125,8 @@ fn debug_adaptors() { check(v.par_iter().map(Some).flatten()); check(v.par_iter().fold(|| 0, |x, _| x)); check(v.par_iter().fold_with(0, |x, _| x)); + check(v.par_iter().try_fold(|| 0, |x, _| Some(x))); + check(v.par_iter().try_fold_with(0, |x, _| Some(x))); check(v.par_iter().inspect(|_| ())); check(v.par_iter().update(|_| ())); check(v.par_iter().interleave(&v));