diff --git a/src/compile_fail/must_use.rs b/src/compile_fail/must_use.rs index ab5b75e57..c74031502 100644 --- a/src/compile_fail/must_use.rs +++ b/src/compile_fail/must_use.rs @@ -27,6 +27,8 @@ must_use! { flatten /** v.par_iter().flatten(); */ fold /** v.par_iter().fold(|| 0, |x, _| x); */ fold_with /** v.par_iter().fold_with(0, |x, _| x); */ + try_fold /** v.par_iter().try_fold(|| 0, |x, _| Some(x)); */ + try_fold_with /** v.par_iter().try_fold_with(0, |x, _| Some(x)); */ inspect /** v.par_iter().inspect(|_| {}); */ interleave /** v.par_iter().interleave(&v); */ interleave_shortest /** v.par_iter().interleave_shortest(&v); */ diff --git a/src/iter/fold.rs b/src/iter/fold.rs index 1e5d84a02..d60cfb5bb 100644 --- a/src/iter/fold.rs +++ b/src/iter/fold.rs @@ -140,7 +140,7 @@ impl<'r, C, ID, F, T> Folder for FoldFolder<'r, C, ID, F> pub fn fold_with(base: I, item: U, fold_op: F) -> FoldWith where I: ParallelIterator, - F: Fn(U, I::Item) -> U + Sync, + F: Fn(U, I::Item) -> U + Sync + Send, U: Send + Clone { FoldWith { diff --git a/src/iter/mod.rs b/src/iter/mod.rs index dace49562..76d6ff297 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -66,12 +66,19 @@ //! [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html //! [split]: fn.split.html //! [plumbing]: plumbing +//! +//! Note: Several of the `ParallelIterator` methods rely on a `Try` trait which +//! has been deliberately obscured from the public API. This trait is intended +//! to mirror the unstable `std::ops::Try` with implementations for `Option` and +//! `Result`, where `Some`/`Ok` values will let those iterators continue, but +//! `None`/`Err` values will exit early. pub use either::Either; use std::cmp::{self, Ordering}; use std::iter::{Sum, Product}; use std::ops::Fn; use self::plumbing::*; +use self::private::Try; // There is a method to the madness here: // @@ -109,7 +116,11 @@ pub mod plumbing; mod for_each; mod fold; pub use self::fold::{Fold, FoldWith}; +mod try_fold; +pub use self::try_fold::{TryFold, TryFoldWith}; mod reduce; +mod try_reduce; +mod try_reduce_with; mod skip; pub use self::skip::Skip; mod splitter; @@ -364,6 +375,69 @@ pub trait ParallelIterator: Sized + Send { self.map_with(init, op).for_each(|()| ()) } + /// Executes a fallible `OP` on each item produced by the iterator, in parallel. + /// + /// If the `OP` returns `Result::Err` or `Option::None`, we will attempt to + /// stop processing the rest of the items in the iterator as soon as + /// possible, and we will return that terminating value. Otherwise, we will + /// return an empty `Result::Ok(())` or `Option::Some(())`. If there are + /// multiple errors in parallel, it is not specified which will be returned. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// use std::io::{self, Write}; + /// + /// // This will stop iteration early if there's any write error, like + /// // having piped output get closed on the other end. + /// (0..100).into_par_iter() + /// .try_for_each(|x| writeln!(io::stdout(), "{:?}", x)) + /// .expect("expected no write errors"); + /// ``` + fn try_for_each(self, op: OP) -> R + where OP: Fn(Self::Item) -> R + Sync + Send, + R: Try + Send + { + self.map(op).try_reduce(|| (), |(), ()| R::from_ok(())) + } + + /// Executes a fallible `OP` on the given `init` value with each item + /// produced by the iterator, in parallel. + /// + /// This combines the `init` semantics of [`for_each_with()`] and the + /// failure semantics of [`try_for_each()`]. + /// + /// [`for_each_with()`]: #method.for_each_with + /// [`try_for_each()`]: #method.try_for_each + /// + /// # Examples + /// + /// ``` + /// use std::sync::mpsc::channel; + /// use rayon::prelude::*; + /// + /// let (sender, receiver) = channel(); + /// + /// (0..5).into_par_iter() + /// .try_for_each_with(sender, |s, x| s.send(x)) + /// .expect("expected no send errors"); + /// + /// let mut res: Vec<_> = receiver.iter().collect(); + /// + /// res.sort(); + /// + /// assert_eq!(&res[..], &[0, 1, 2, 3, 4]) + /// ``` + fn try_for_each_with(self, init: T, op: OP) -> R + where OP: Fn(&mut T, Self::Item) -> R + Sync + Send, + T: Send + Clone, + R: Try + Send + { + self.map_with(init, op) + .try_reduce(|| (), |(), ()| R::from_ok(())) + } + /// Counts the number of items in this parallel iterator. /// /// # Examples @@ -679,6 +753,87 @@ pub trait ParallelIterator: Sized + Send { }) } + /// Reduces the items in the iterator into one item using a fallible `op`. + /// The `identity` argument is used the same way as in [`reduce()`]. + /// + /// [`reduce()`]: #method.reduce + /// + /// If a `Result::Err` or `Option::None` item is found, or if `op` reduces + /// to one, we will attempt to stop processing the rest of the items in the + /// iterator as soon as possible, and we will return that terminating value. + /// Otherwise, we will return the final reduced `Result::Ok(T)` or + /// `Option::Some(T)`. If there are multiple errors in parallel, it is not + /// specified which will be returned. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// // Compute the sum of squares, being careful about overflow. + /// fn sum_squares>(iter: I) -> Option { + /// iter.into_par_iter() + /// .map(|i| i.checked_mul(i)) // square each item, + /// .try_reduce(|| 0, i32::checked_add) // and add them up! + /// } + /// assert_eq!(sum_squares(0..5), Some(0 + 1 + 4 + 9 + 16)); + /// + /// // The sum might overflow + /// assert_eq!(sum_squares(0..10_000), None); + /// + /// // Or the squares might overflow before it even reaches `try_reduce` + /// assert_eq!(sum_squares(1_000_000..1_000_001), None); + /// ``` + fn try_reduce(self, identity: ID, op: OP) -> Self::Item + where OP: Fn(T, T) -> Self::Item + Sync + Send, + ID: Fn() -> T + Sync + Send, + Self::Item: Try + { + try_reduce::try_reduce(self, identity, op) + } + + /// Reduces the items in the iterator into one item using a fallible `op`. + /// + /// Like [`reduce_with()`], if the iterator is empty, `None` is returned; + /// otherwise, `Some` is returned. Beyond that, it behaves like + /// [`try_reduce()`] for handling `Err`/`None`. + /// + /// [`reduce_with()`]: #method.reduce_with + /// [`try_reduce()`]: #method.try_reduce + /// + /// For instance, with `Option` items, the return value may be: + /// - `None`, the iterator was empty + /// - `Some(None)`, we stopped after encountering `None`. + /// - `Some(Some(x))`, the entire iterator reduced to `x`. + /// + /// With `Result` items, the nesting is more obvious: + /// - `None`, the iterator was empty + /// - `Some(Err(e))`, we stopped after encountering an error `e`. + /// - `Some(Ok(x))`, the entire iterator reduced to `x`. + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let files = ["/dev/null", "/does/not/exist"]; + /// + /// // Find the biggest file + /// files.into_par_iter() + /// .map(|path| std::fs::metadata(path).map(|m| (path, m.len()))) + /// .try_reduce_with(|a, b| { + /// Ok(if a.1 >= b.1 { a } else { b }) + /// }) + /// .expect("Some value, since the iterator is not empty") + /// .expect_err("not found"); + /// ``` + fn try_reduce_with(self, op: OP) -> Option + where OP: Fn(T, T) -> Self::Item + Sync + Send, + Self::Item: Try, + { + try_reduce_with::try_reduce_with(self, op) + } + /// Parallel fold is similar to sequential fold except that the /// sequence of items may be subdivided before it is /// folded. Consider a list of numbers like `22 3 77 89 46`. If @@ -846,6 +1001,65 @@ pub trait ParallelIterator: Sized + Send { fold::fold_with(self, init, fold_op) } + /// Perform a fallible parallel fold. + /// + /// This is a variation of [`fold()`] for operations which can fail with + /// `Option::None` or `Result::Err`. The first such failure stops + /// processing the local set of items, without affecting other folds in the + /// iterator's subdivisions. + /// + /// Often, `try_fold()` will be followed by [`try_reduce()`] + /// for a final reduction and global short-circuiting effect. + /// + /// [`fold()`]: #method.fold + /// [`try_reduce()`]: #method.try_reduce + /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let bytes = 0..22_u8; + /// let sum = bytes.into_par_iter() + /// .try_fold(|| 0_u32, |a: u32, b: u8| a.checked_add(b as u32)) + /// .try_reduce(|| 0, u32::checked_add); + /// + /// assert_eq!(sum, Some((0..22).sum())); // compare to sequential + /// ``` + fn try_fold(self, identity: ID, fold_op: F) -> TryFold + where F: Fn(T, Self::Item) -> R + Sync + Send, + ID: Fn() -> T + Sync + Send, + R: Try + Send + { + try_fold::try_fold(self, identity, fold_op) + } + + /// Perform a fallible parallel fold with a cloneable `init` value. + /// + /// This combines the `init` semantics of [`fold_with()`] and the failure + /// semantics of [`try_fold()`]. + /// + /// [`fold_with()`]: #method.fold_with + /// [`try_fold()`]: #method.try_fold + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let bytes = 0..22_u8; + /// let sum = bytes.into_par_iter() + /// .try_fold_with(0_u32, |a: u32, b: u8| a.checked_add(b as u32)) + /// .try_reduce(|| 0, u32::checked_add); + /// + /// assert_eq!(sum, Some((0..22).sum())); // compare to sequential + /// ``` + fn try_fold_with(self, init: T, fold_op: F) -> TryFoldWith + where F: Fn(T, Self::Item) -> R + Sync + Send, + R: Try + Send, + T: Clone + Send + { + try_fold::try_fold_with(self, init, fold_op) + } + /// Sums up the items in the iterator. /// /// Note that the order in items will be reduced is not specified, @@ -2114,3 +2328,42 @@ pub trait ParallelExtend /// ``` fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator; } + +/// We hide the `Try` trait in a private module, as it's only meant to be a +/// stable clone of the standard library's `Try` trait, as yet unstable. +mod private { + /// Clone of `std::ops::Try`. + /// + /// Implementing this trait is not permitted outside of `rayon`. + pub trait Try { + private_decl!{} + + type Ok; + type Error; + fn into_result(self) -> Result; + fn from_ok(v: Self::Ok) -> Self; + fn from_error(v: Self::Error) -> Self; + } + + impl Try for Option { + private_impl!{} + + type Ok = T; + type Error = (); + + fn into_result(self) -> Result { self.ok_or(()) } + fn from_ok(v: T) -> Self { Some(v) } + fn from_error(_: ()) -> Self { None } + } + + impl Try for Result { + private_impl!{} + + type Ok = T; + type Error = E; + + fn into_result(self) -> Result { self } + fn from_ok(v: T) -> Self { Ok(v) } + fn from_error(v: E) -> Self { Err(v) } + } +} diff --git a/src/iter/try_fold.rs b/src/iter/try_fold.rs new file mode 100644 index 000000000..10134614b --- /dev/null +++ b/src/iter/try_fold.rs @@ -0,0 +1,259 @@ +use super::plumbing::*; +use super::*; + +use std::fmt::{self, Debug}; +use std::marker::PhantomData; +use super::private::Try; + +pub fn try_fold(base: I, identity: ID, fold_op: F) -> TryFold + where I: ParallelIterator, + F: Fn(U::Ok, I::Item) -> U + Sync + Send, + ID: Fn() -> U::Ok + Sync + Send, + U: Try + Send +{ + TryFold { + base: base, + identity: identity, + fold_op: fold_op, + marker: PhantomData, + } +} + +/// `TryFold` is an iterator that applies a function over an iterator producing a single value. +/// This struct is created by the [`try_fold()`] method on [`ParallelIterator`] +/// +/// [`try_fold()`]: trait.ParallelIterator.html#method.try_fold +/// [`ParallelIterator`]: trait.ParallelIterator.html +#[must_use = "iterator adaptors are lazy and do nothing unless consumed"] +#[derive(Clone)] +pub struct TryFold { + base: I, + identity: ID, + fold_op: F, + marker: PhantomData, +} + +impl Debug for TryFold { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("TryFold") + .field("base", &self.base) + .finish() + } +} + +impl ParallelIterator for TryFold + where I: ParallelIterator, + F: Fn(U::Ok, I::Item) -> U + Sync + Send, + ID: Fn() -> U::Ok + Sync + Send, + U: Try + Send +{ + type Item = U; + + fn drive_unindexed(self, consumer: C) -> C::Result + where C: UnindexedConsumer + { + let consumer1 = TryFoldConsumer { + base: consumer, + identity: &self.identity, + fold_op: &self.fold_op, + marker: PhantomData, + }; + self.base.drive_unindexed(consumer1) + } +} + +struct TryFoldConsumer<'c, U, C, ID: 'c, F: 'c> { + base: C, + identity: &'c ID, + fold_op: &'c F, + marker: PhantomData, +} + +impl<'r, U, T, C, ID, F> Consumer for TryFoldConsumer<'r, U, C, ID, F> + where C: Consumer, + F: Fn(U::Ok, T) -> U + Sync, + ID: Fn() -> U::Ok + Sync, + U: Try + Send +{ + type Folder = TryFoldFolder<'r, C::Folder, U, F>; + type Reducer = C::Reducer; + type Result = C::Result; + + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { + let (left, right, reducer) = self.base.split_at(index); + (TryFoldConsumer { base: left, ..self }, TryFoldConsumer { base: right, ..self }, reducer) + } + + fn into_folder(self) -> Self::Folder { + TryFoldFolder { + base: self.base.into_folder(), + result: Ok((self.identity)()), + fold_op: self.fold_op, + } + } + + fn full(&self) -> bool { + self.base.full() + } +} + +impl<'r, U, T, C, ID, F> UnindexedConsumer for TryFoldConsumer<'r, U, C, ID, F> + where C: UnindexedConsumer, + F: Fn(U::Ok, T) -> U + Sync, + ID: Fn() -> U::Ok + Sync, + U: Try + Send +{ + fn split_off_left(&self) -> Self { + TryFoldConsumer { base: self.base.split_off_left(), ..*self } + } + + fn to_reducer(&self) -> Self::Reducer { + self.base.to_reducer() + } +} + +struct TryFoldFolder<'r, C, U: Try, F: 'r> { + base: C, + fold_op: &'r F, + result: Result, +} + +impl<'r, C, U, F, T> Folder for TryFoldFolder<'r, C, U, F> + where C: Folder, + F: Fn(U::Ok, T) -> U + Sync, + U: Try +{ + type Result = C::Result; + + fn consume(self, item: T) -> Self { + let fold_op = self.fold_op; + let result = self.result.and_then(|acc| { + fold_op(acc, item).into_result() + }); + TryFoldFolder { + result: result, + ..self + } + } + + fn complete(self) -> C::Result { + let item = match self.result { + Ok(ok) => U::from_ok(ok), + Err(error) => U::from_error(error), + }; + self.base.consume(item).complete() + } + + fn full(&self) -> bool { + self.result.is_err() || self.base.full() + } +} + +// /////////////////////////////////////////////////////////////////////////// + +pub fn try_fold_with(base: I, item: U::Ok, fold_op: F) -> TryFoldWith + where I: ParallelIterator, + F: Fn(U::Ok, I::Item) -> U + Sync, + U: Try + Send, + U::Ok: Clone + Send +{ + TryFoldWith { + base: base, + item: item, + fold_op: fold_op, + } +} + +/// `TryFoldWith` is an iterator that applies a function over an iterator producing a single value. +/// This struct is created by the [`try_fold_with()`] method on [`ParallelIterator`] +/// +/// [`try_fold_with()`]: trait.ParallelIterator.html#method.try_fold_with +/// [`ParallelIterator`]: trait.ParallelIterator.html +#[must_use = "iterator adaptors are lazy and do nothing unless consumed"] +#[derive(Clone)] +pub struct TryFoldWith { + base: I, + item: U::Ok, + fold_op: F, +} + +impl Debug for TryFoldWith + where U::Ok: Debug +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("TryFoldWith") + .field("base", &self.base) + .field("item", &self.item) + .finish() + } +} + +impl ParallelIterator for TryFoldWith + where I: ParallelIterator, + F: Fn(U::Ok, I::Item) -> U + Sync + Send, + U: Try + Send, + U::Ok: Clone + Send +{ + type Item = U; + + fn drive_unindexed(self, consumer: C) -> C::Result + where C: UnindexedConsumer + { + let consumer1 = TryFoldWithConsumer { + base: consumer, + item: self.item, + fold_op: &self.fold_op, + }; + self.base.drive_unindexed(consumer1) + } +} + +struct TryFoldWithConsumer<'c, C, U: Try, F: 'c> { + base: C, + item: U::Ok, + fold_op: &'c F, +} + +impl<'r, U, T, C, F> Consumer for TryFoldWithConsumer<'r, C, U, F> + where C: Consumer, + F: Fn(U::Ok, T) -> U + Sync, + U: Try + Send, + U::Ok: Clone + Send +{ + type Folder = TryFoldFolder<'r, C::Folder, U, F>; + type Reducer = C::Reducer; + type Result = C::Result; + + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { + let (left, right, reducer) = self.base.split_at(index); + (TryFoldWithConsumer { base: left, item: self.item.clone(), ..self }, + TryFoldWithConsumer { base: right, ..self }, reducer) + } + + fn into_folder(self) -> Self::Folder { + TryFoldFolder { + base: self.base.into_folder(), + result: Ok(self.item), + fold_op: self.fold_op, + } + } + + fn full(&self) -> bool { + self.base.full() + } +} + +impl<'r, U, T, C, F> UnindexedConsumer for TryFoldWithConsumer<'r, C, U, F> + where C: UnindexedConsumer, + F: Fn(U::Ok, T) -> U + Sync, + U: Try + Send, + U::Ok: Clone + Send +{ + fn split_off_left(&self) -> Self { + TryFoldWithConsumer { base: self.base.split_off_left(), item: self.item.clone(), ..*self } + } + + fn to_reducer(&self) -> Self::Reducer { + self.base.to_reducer() + } +} diff --git a/src/iter/try_reduce.rs b/src/iter/try_reduce.rs new file mode 100644 index 000000000..6d9513dc5 --- /dev/null +++ b/src/iter/try_reduce.rs @@ -0,0 +1,124 @@ +use super::ParallelIterator; +use super::plumbing::*; + +use super::private::Try; +use std::sync::atomic::{AtomicBool, Ordering}; + +pub fn try_reduce(pi: PI, identity: ID, reduce_op: R) -> T + where PI: ParallelIterator, + R: Fn(T::Ok, T::Ok) -> T + Sync, + ID: Fn() -> T::Ok + Sync, + T: Try + Send +{ + let full = AtomicBool::new(false); + let consumer = TryReduceConsumer { + identity: &identity, + reduce_op: &reduce_op, + full: &full, + }; + pi.drive_unindexed(consumer) +} + +struct TryReduceConsumer<'r, R: 'r, ID: 'r> { + identity: &'r ID, + reduce_op: &'r R, + full: &'r AtomicBool, +} + +impl<'r, R, ID> Copy for TryReduceConsumer<'r, R, ID> {} + +impl<'r, R, ID> Clone for TryReduceConsumer<'r, R, ID> { + fn clone(&self) -> Self { + *self + } +} + +impl<'r, R, ID, T> Consumer for TryReduceConsumer<'r, R, ID> + where R: Fn(T::Ok, T::Ok) -> T + Sync, + ID: Fn() -> T::Ok + Sync, + T: Try + Send +{ + type Folder = TryReduceFolder<'r, R, T>; + type Reducer = Self; + type Result = T; + + fn split_at(self, _index: usize) -> (Self, Self, Self) { + (self, self, self) + } + + fn into_folder(self) -> Self::Folder { + TryReduceFolder { + reduce_op: self.reduce_op, + result: Ok((self.identity)()), + full: self.full, + } + } + + fn full(&self) -> bool { + self.full.load(Ordering::Relaxed) + } +} + +impl<'r, R, ID, T> UnindexedConsumer for TryReduceConsumer<'r, R, ID> + where R: Fn(T::Ok, T::Ok) -> T + Sync, + ID: Fn() -> T::Ok + Sync, + T: Try + Send +{ + fn split_off_left(&self) -> Self { + *self + } + + fn to_reducer(&self) -> Self::Reducer { + *self + } +} + +impl<'r, R, ID, T> Reducer for TryReduceConsumer<'r, R, ID> + where R: Fn(T::Ok, T::Ok) -> T + Sync, + T: Try +{ + fn reduce(self, left: T, right: T) -> T { + match (left.into_result(), right.into_result()) { + (Ok(left), Ok(right)) => (self.reduce_op)(left, right), + (Err(e), _) | (_, Err(e)) => T::from_error(e), + } + } +} + +struct TryReduceFolder<'r, R: 'r, T: Try> { + reduce_op: &'r R, + result: Result, + full: &'r AtomicBool, +} + +impl<'r, R, T> Folder for TryReduceFolder<'r, R, T> + where R: Fn(T::Ok, T::Ok) -> T, + T: Try +{ + type Result = T; + + fn consume(self, item: T) -> Self { + let reduce_op = self.reduce_op; + let result = self.result.and_then(|left| { + reduce_op(left, item.into_result()?).into_result() + }); + if result.is_err() { + self.full.store(true, Ordering::Relaxed) + } + TryReduceFolder { + result: result, + ..self + } + } + + fn complete(self) -> T { + match self.result { + Ok(ok) => T::from_ok(ok), + Err(error) => T::from_error(error), + } + } + + fn full(&self) -> bool { + self.full.load(Ordering::Relaxed) + } +} diff --git a/src/iter/try_reduce_with.rs b/src/iter/try_reduce_with.rs new file mode 100644 index 000000000..fae0c32c4 --- /dev/null +++ b/src/iter/try_reduce_with.rs @@ -0,0 +1,130 @@ +use super::ParallelIterator; +use super::plumbing::*; + +use super::private::Try; +use std::sync::atomic::{AtomicBool, Ordering}; + +pub fn try_reduce_with(pi: PI, reduce_op: R) -> Option + where PI: ParallelIterator, + R: Fn(T::Ok, T::Ok) -> T + Sync, + T: Try + Send +{ + let full = AtomicBool::new(false); + let consumer = TryReduceWithConsumer { + reduce_op: &reduce_op, + full: &full, + }; + pi.drive_unindexed(consumer) +} + +struct TryReduceWithConsumer<'r, R: 'r> { + reduce_op: &'r R, + full: &'r AtomicBool, +} + +impl<'r, R> Copy for TryReduceWithConsumer<'r, R> {} + +impl<'r, R> Clone for TryReduceWithConsumer<'r, R> { + fn clone(&self) -> Self { + *self + } +} + +impl<'r, R, T> Consumer for TryReduceWithConsumer<'r, R> + where R: Fn(T::Ok, T::Ok) -> T + Sync, + T: Try + Send +{ + type Folder = TryReduceWithFolder<'r, R, T>; + type Reducer = Self; + type Result = Option; + + fn split_at(self, _index: usize) -> (Self, Self, Self) { + (self, self, self) + } + + fn into_folder(self) -> Self::Folder { + TryReduceWithFolder { + reduce_op: self.reduce_op, + opt_result: None, + full: self.full, + } + } + + fn full(&self) -> bool { + self.full.load(Ordering::Relaxed) + } +} + +impl<'r, R, T> UnindexedConsumer for TryReduceWithConsumer<'r, R> + where R: Fn(T::Ok, T::Ok) -> T + Sync, + T: Try + Send +{ + fn split_off_left(&self) -> Self { + *self + } + + fn to_reducer(&self) -> Self::Reducer { + *self + } +} + +impl<'r, R, T> Reducer> for TryReduceWithConsumer<'r, R> + where R: Fn(T::Ok, T::Ok) -> T + Sync, + T: Try +{ + fn reduce(self, left: Option, right: Option) -> Option { + let reduce_op = self.reduce_op; + match (left, right) { + (None, x) | (x, None) => x, + (Some(a), Some(b)) => { + match (a.into_result(), b.into_result()) { + (Ok(a), Ok(b)) => Some(reduce_op(a, b)), + (Err(e), _) | (_, Err(e)) => Some(T::from_error(e)), + } + } + } + } +} + +struct TryReduceWithFolder<'r, R: 'r, T: Try> { + reduce_op: &'r R, + opt_result: Option>, + full: &'r AtomicBool, +} + +impl<'r, R, T> Folder for TryReduceWithFolder<'r, R, T> + where R: Fn(T::Ok, T::Ok) -> T, + T: Try +{ + type Result = Option; + + fn consume(self, item: T) -> Self { + let reduce_op = self.reduce_op; + let result = match self.opt_result { + None => item.into_result(), + Some(Ok(a)) => match item.into_result() { + Ok(b) => reduce_op(a, b).into_result(), + Err(e) => Err(e), + }, + Some(Err(e)) => Err(e), + }; + if result.is_err() { + self.full.store(true, Ordering::Relaxed) + } + TryReduceWithFolder { + opt_result: Some(result), + ..self + } + } + + fn complete(self) -> Option { + self.opt_result.map(|result| match result { + Ok(ok) => T::from_ok(ok), + Err(error) => T::from_error(error), + }) + } + + fn full(&self) -> bool { + self.full.load(Ordering::Relaxed) + } +} diff --git a/tests/clones.rs b/tests/clones.rs index 7b54b91e0..f7054d0c6 100644 --- a/tests/clones.rs +++ b/tests/clones.rs @@ -114,6 +114,8 @@ fn clone_adaptors() { check(v.par_iter().flatten()); check(v.par_iter().with_max_len(1).fold(|| 0, |x, _| x)); check(v.par_iter().with_max_len(1).fold_with(0, |x, _| x)); + check(v.par_iter().with_max_len(1).try_fold(|| 0, |_, &x| x)); + check(v.par_iter().with_max_len(1).try_fold_with(0, |_, &x| x)); check(v.par_iter().inspect(|_| ())); check(v.par_iter().update(|_| ())); check(v.par_iter().interleave(&v)); diff --git a/tests/debug.rs b/tests/debug.rs index d5fe3611b..cf499fbda 100644 --- a/tests/debug.rs +++ b/tests/debug.rs @@ -125,6 +125,8 @@ fn debug_adaptors() { check(v.par_iter().map(Some).flatten()); check(v.par_iter().fold(|| 0, |x, _| x)); check(v.par_iter().fold_with(0, |x, _| x)); + check(v.par_iter().try_fold(|| 0, |x, _| Some(x))); + check(v.par_iter().try_fold_with(0, |x, _| Some(x))); check(v.par_iter().inspect(|_| ())); check(v.par_iter().update(|_| ())); check(v.par_iter().interleave(&v));