Skip to content

Commit

Permalink
Document ParallelIterator::try_*
Browse files Browse the repository at this point in the history
  • Loading branch information
cuviper committed Jun 12, 2018
1 parent 2e796e3 commit 900bfbc
Showing 1 changed file with 162 additions and 6 deletions.
168 changes: 162 additions & 6 deletions src/iter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@
//! [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
//! [split]: fn.split.html
//! [plumbing]: plumbing
//!
//! Note: Several of the `ParallelIterator` methods rely on a `Try` trait which
//! has been deliberately obscured from the public API. This trait is intended
//! to mirror the unstable `std::ops::Try` with implementations for `Option` and
//! `Result`, where `Some`/`Ok` values will let those iterators continue, but
//! `None`/`Err` values will exit early.

pub use either::Either;
use std::cmp::{self, Ordering};
Expand Down Expand Up @@ -366,15 +372,60 @@ pub trait ParallelIterator: Sized + Send {
self.map_with(init, op).for_each(|()| ())
}

/// TODO
/// Executes a fallible `OP` on each item produced by the iterator, in parallel.
///
/// If the `OP` returns `Result::Err` or `Option::None`, we will attempt to
/// stop processing the rest of the items in the iterator as soon as
/// possible, and we will return that terminating value. Otherwise, we will
/// return an empty `Result::Ok(())` or `Option::Some(())`. If there are
/// multiple errors in parallel, it is not specified which will be returned.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
/// use std::io::{self, Write};
///
/// // This will stop iteration early if there's any write error, like
/// // having piped output get closed on the other end.
/// (0..100).into_par_iter()
/// .try_for_each(|x| writeln!(io::stdout(), "{:?}", x))
/// .expect("expected no write errors");
/// ```
fn try_for_each<OP, R>(self, op: OP) -> R
where OP: Fn(Self::Item) -> R + Sync + Send,
R: Try<Ok = ()> + Send
{
self.map(op).try_reduce(|| (), |(), ()| R::from_ok(()))
}

/// TODO
/// Executes a fallible `OP` on the given `init` value with each item
/// produced by the iterator, in parallel.
///
/// This combines the `init` semantics of [`for_each_with()`] and the
/// failure semantics of [`try_for_each()`].
///
/// [`for_each_with()`]: #method.for_each_with
/// [`try_for_each()`]: #method.try_for_each
///
/// # Examples
///
/// ```
/// use std::sync::mpsc::channel;
/// use rayon::prelude::*;
///
/// let (sender, receiver) = channel();
///
/// (0..5).into_par_iter()
/// .try_for_each_with(sender, |s, x| s.send(x))
/// .expect("expected no send errors");
///
/// let mut res: Vec<_> = receiver.iter().collect();
///
/// res.sort();
///
/// assert_eq!(&res[..], &[0, 1, 2, 3, 4])
/// ```
fn try_for_each_with<OP, T, R>(self, init: T, op: OP) -> R
where OP: Fn(&mut T, Self::Item) -> R + Sync + Send,
T: Send + Clone,
Expand Down Expand Up @@ -699,7 +750,37 @@ pub trait ParallelIterator: Sized + Send {
})
}

/// TODO
/// Reduces the items in the iterator into one item using a fallible `op`.
/// The `identity` argument is used the same way as in [`reduce()`].
///
/// [`reduce()`]: #method.reduce
///
/// If a `Result::Err` or `Option::None` item is found, or if `op` reduces
/// to one, we will attempt to stop processing the rest of the items in the
/// iterator as soon as possible, and we will return that terminating value.
/// Otherwise, we will return the final reduced `Result::Ok(T)` or
/// `Option::Some(T)`. If there are multiple errors in parallel, it is not
/// specified which will be returned.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
///
/// // Compute the sum of squares, being careful about overflow.
/// fn sum_squares<I: IntoParallelIterator<Item = i32>>(iter: I) -> Option<i32> {
/// iter.into_par_iter()
/// .map(|i| i.checked_mul(i)) // square each item,
/// .try_reduce(|| 0, i32::checked_add) // and add them up!
/// }
/// assert_eq!(sum_squares(0..5), Some(0 + 1 + 4 + 9 + 16));
///
/// // The sum might overflow
/// assert_eq!(sum_squares(0..10_000), None);
///
/// // Or the squares might overflow before it even reaches `try_reduce`
/// assert_eq!(sum_squares(1_000_000..1_000_001), None);
/// ```
fn try_reduce<T, OP, ID>(self, identity: ID, op: OP) -> Self::Item
where OP: Fn(T, T) -> Self::Item + Sync + Send,
ID: Fn() -> T + Sync + Send,
Expand All @@ -708,7 +789,41 @@ pub trait ParallelIterator: Sized + Send {
try_reduce::try_reduce(self, identity, op)
}

/// TODO
/// Reduces the items in the iterator into one item using a fallible `op`.
///
/// Like [`reduce_with()`], if the iterator is empty, `None` is returned;
/// otherwise, `Some` is returned. Beyond that, it behaves like
/// [`try_reduce()`] for handling `Err`/`None`.
///
/// [`reduce_with()`]: #method.reduce_with
/// [`try_reduce()`]: #method.try_reduce
///
/// For instance, with `Option` items, the return value may be:
/// - `None`, the iterator was empty
/// - `Some(None)`, we stopped after encountering `None`.
/// - `Some(Some(x))`, the entire iterator reduced to `x`.
///
/// With `Result` items, the nesting is more obvious:
/// - `None`, the iterator was empty
/// - `Some(Err(e))`, we stopped after encountering an error `e`.
/// - `Some(Ok(x))`, the entire iterator reduced to `x`.
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
///
/// let files = ["/dev/null", "/does/not/exist"];
///
/// // Find the biggest file
/// files.into_par_iter()
/// .map(|path| std::fs::metadata(path).map(|m| (path, m.len())))
/// .try_reduce_with(|a, b| {
/// Ok(if a.1 >= b.1 { a } else { b })
/// })
/// .expect("Some value, since the iterator is not empty")
/// .expect_err("not found");
/// ```
fn try_reduce_with<T, OP>(self, op: OP) -> Option<Self::Item>
where OP: Fn(T, T) -> Self::Item + Sync + Send,
Self::Item: Try<Ok = T>,
Expand Down Expand Up @@ -883,7 +998,31 @@ pub trait ParallelIterator: Sized + Send {
fold::fold_with(self, init, fold_op)
}

/// TODO
/// Perform a fallible parallel fold.
///
/// This is a variation of [`fold()`] for operations which can fail with
/// `Option::None` or `Result::Err`. The first such failure stops
/// processing the local set of items, without affecting other folds in the
/// iterator's subdivisions.
///
/// Often, `try_fold()` will be followed by [`try_reduce()`]
/// for a final reduction and global short-circuiting effect.
///
/// [`fold()`]: #method.fold
/// [`try_reduce()`]: #method.try_reduce
///
/// # Examples
///
/// ```
/// use rayon::prelude::*;
///
/// let bytes = 0..22_u8;
/// let sum = bytes.into_par_iter()
/// .try_fold(|| 0_u32, |a: u32, b: u8| a.checked_add(b as u32))
/// .try_reduce(|| 0, u32::checked_add);
///
/// assert_eq!(sum, Some((0..22).sum())); // compare to sequential
/// ```
fn try_fold<T, R, ID, F>(self, identity: ID, fold_op: F) -> TryFold<Self, R, ID, F>
where F: Fn(T, Self::Item) -> R + Sync + Send,
ID: Fn() -> T + Sync + Send,
Expand All @@ -892,7 +1031,24 @@ pub trait ParallelIterator: Sized + Send {
try_fold::try_fold(self, identity, fold_op)
}

/// TODO
/// Perform a fallible parallel fold with a cloneable `init` value.
///
/// This combines the `init` semantics of [`fold_with()`] and the failure
/// semantics of [`try_fold()`].
///
/// [`fold_with()`]: #method.fold_with
/// [`try_fold()`]: #method.try_fold
///
/// ```
/// use rayon::prelude::*;
///
/// let bytes = 0..22_u8;
/// let sum = bytes.into_par_iter()
/// .try_fold_with(0_u32, |a: u32, b: u8| a.checked_add(b as u32))
/// .try_reduce(|| 0, u32::checked_add);
///
/// assert_eq!(sum, Some((0..22).sum())); // compare to sequential
/// ```
fn try_fold_with<F, T, R>(self, init: T, fold_op: F) -> TryFoldWith<Self, R, F>
where F: Fn(T, Self::Item) -> R + Sync + Send,
R: Try<Ok = T> + Send,
Expand Down

0 comments on commit 900bfbc

Please sign in to comment.