From 5686a22d42d371cafabb357718d80a8d793b9c0f Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Fri, 25 Dec 2020 15:48:54 +0100 Subject: [PATCH] chore: create stream_ext module --- tokio-stream/src/lib.rs | 918 +----------------- tokio-stream/src/stream_ext.rs | 917 +++++++++++++++++ tokio-stream/src/{ => stream_ext}/all.rs | 0 tokio-stream/src/{ => stream_ext}/any.rs | 0 tokio-stream/src/{ => stream_ext}/chain.rs | 3 +- tokio-stream/src/{ => stream_ext}/collect.rs | 0 tokio-stream/src/{ => stream_ext}/filter.rs | 0 .../src/{ => stream_ext}/filter_map.rs | 0 tokio-stream/src/{ => stream_ext}/fold.rs | 0 tokio-stream/src/{ => stream_ext}/fuse.rs | 0 tokio-stream/src/{ => stream_ext}/map.rs | 0 tokio-stream/src/{ => stream_ext}/merge.rs | 3 +- tokio-stream/src/{ => stream_ext}/next.rs | 0 tokio-stream/src/{ => stream_ext}/skip.rs | 0 .../src/{ => stream_ext}/skip_while.rs | 0 tokio-stream/src/{ => stream_ext}/take.rs | 0 .../src/{ => stream_ext}/take_while.rs | 0 tokio-stream/src/{ => stream_ext}/throttle.rs | 0 tokio-stream/src/{ => stream_ext}/timeout.rs | 3 +- tokio-stream/src/{ => stream_ext}/try_next.rs | 3 +- 20 files changed, 927 insertions(+), 920 deletions(-) create mode 100644 tokio-stream/src/stream_ext.rs rename tokio-stream/src/{ => stream_ext}/all.rs (100%) rename tokio-stream/src/{ => stream_ext}/any.rs (100%) rename tokio-stream/src/{ => stream_ext}/chain.rs (95%) rename tokio-stream/src/{ => stream_ext}/collect.rs (100%) rename tokio-stream/src/{ => stream_ext}/filter.rs (100%) rename tokio-stream/src/{ => stream_ext}/filter_map.rs (100%) rename tokio-stream/src/{ => stream_ext}/fold.rs (100%) rename tokio-stream/src/{ => stream_ext}/fuse.rs (100%) rename tokio-stream/src/{ => stream_ext}/map.rs (100%) rename tokio-stream/src/{ => stream_ext}/merge.rs (97%) rename tokio-stream/src/{ => stream_ext}/next.rs (100%) rename tokio-stream/src/{ => stream_ext}/skip.rs (100%) rename tokio-stream/src/{ => stream_ext}/skip_while.rs (100%) rename tokio-stream/src/{ => stream_ext}/take.rs (100%) rename tokio-stream/src/{ => stream_ext}/take_while.rs (100%) rename tokio-stream/src/{ => stream_ext}/throttle.rs (100%) rename tokio-stream/src/{ => stream_ext}/timeout.rs (97%) rename tokio-stream/src/{ => stream_ext}/try_next.rs (95%) diff --git a/tokio-stream/src/lib.rs b/tokio-stream/src/lib.rs index dbe865054f0..3c6d84ee469 100644 --- a/tokio-stream/src/lib.rs +++ b/tokio-stream/src/lib.rs @@ -81,46 +81,15 @@ #[macro_use] mod macros; -mod all; -use all::AllFuture; - -mod any; -use any::AnyFuture; - -mod chain; -use chain::Chain; - -mod collect; -use collect::Collect; -pub use collect::FromStream; +mod stream_ext; +pub use stream_ext::{collect::FromStream, StreamExt}; mod empty; pub use empty::{empty, Empty}; -mod filter; -use filter::Filter; - -mod filter_map; -use filter_map::FilterMap; - -mod fold; -use fold::FoldFuture; - -mod fuse; -use fuse::Fuse; - mod iter; pub use iter::{iter, Iter}; -mod map; -use map::Map; - -mod merge; -use merge::Merge; - -mod next; -use next::Next; - mod once; pub use once::{once, Once}; @@ -130,888 +99,5 @@ pub use pending::{pending, Pending}; mod stream_map; pub use stream_map::StreamMap; -mod skip; -use skip::Skip; - -mod skip_while; -use skip_while::SkipWhile; - -mod try_next; -use try_next::TryNext; - -mod take; -use take::Take; - -mod take_while; -use take_while::TakeWhile; - -cfg_time! { - mod timeout; - use timeout::Timeout; - use tokio::time::Duration; - mod throttle; - use crate::throttle::{throttle, Throttle}; -} - #[doc(no_inline)] pub use futures_core::Stream; - -/// An extension trait for the [`Stream`] trait that provides a variety of -/// convenient combinator functions. -/// -/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found -/// in the [futures] crate, however both Tokio and futures provide separate -/// `StreamExt` utility traits, and some utilities are only available on one of -/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt` -/// trait in the futures crate. -/// -/// If you need utilities from both `StreamExt` traits, you should prefer to -/// import one of them, and use the other through the fully qualified call -/// syntax. For example: -/// ``` -/// // import one of the traits: -/// use futures::stream::StreamExt; -/// # #[tokio::main(flavor = "current_thread")] -/// # async fn main() { -/// -/// let a = tokio_stream::iter(vec![1, 3, 5]); -/// let b = tokio_stream::iter(vec![2, 4, 6]); -/// -/// // use the fully qualified call syntax for the other trait: -/// let merged = tokio_stream::StreamExt::merge(a, b); -/// -/// // use normal call notation for futures::stream::StreamExt::collect -/// let output: Vec<_> = merged.collect().await; -/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]); -/// # } -/// ``` -/// -/// [`Stream`]: crate::Stream -/// [futures]: https://docs.rs/futures -/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html -pub trait StreamExt: Stream { - /// Consumes and returns the next value in the stream or `None` if the - /// stream is finished. - /// - /// Equivalent to: - /// - /// ```ignore - /// async fn next(&mut self) -> Option; - /// ``` - /// - /// Note that because `next` doesn't take ownership over the stream, - /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a - /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can - /// be done by boxing the stream using [`Box::pin`] or - /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` - /// crate. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let mut stream = stream::iter(1..=3); - /// - /// assert_eq!(stream.next().await, Some(1)); - /// assert_eq!(stream.next().await, Some(2)); - /// assert_eq!(stream.next().await, Some(3)); - /// assert_eq!(stream.next().await, None); - /// # } - /// ``` - fn next(&mut self) -> Next<'_, Self> - where - Self: Unpin, - { - Next::new(self) - } - - /// Consumes and returns the next item in the stream. If an error is - /// encountered before the next item, the error is returned instead. - /// - /// Equivalent to: - /// - /// ```ignore - /// async fn try_next(&mut self) -> Result, E>; - /// ``` - /// - /// This is similar to the [`next`](StreamExt::next) combinator, - /// but returns a [`Result, E>`](Result) rather than - /// an [`Option>`](Option), making for easy use - /// with the [`?`](std::ops::Try) operator. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]); - /// - /// assert_eq!(stream.try_next().await, Ok(Some(1))); - /// assert_eq!(stream.try_next().await, Ok(Some(2))); - /// assert_eq!(stream.try_next().await, Err("nope")); - /// # } - /// ``` - fn try_next(&mut self) -> TryNext<'_, Self> - where - Self: Stream> + Unpin, - { - TryNext::new(self) - } - - /// Maps this stream's items to a different type, returning a new stream of - /// the resulting type. - /// - /// The provided closure is executed over all elements of this stream as - /// they are made available. It is executed inline with calls to - /// [`poll_next`](Stream::poll_next). - /// - /// Note that this function consumes the stream passed into it and returns a - /// wrapped version of it, similar to the existing `map` methods in the - /// standard library. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let stream = stream::iter(1..=3); - /// let mut stream = stream.map(|x| x + 3); - /// - /// assert_eq!(stream.next().await, Some(4)); - /// assert_eq!(stream.next().await, Some(5)); - /// assert_eq!(stream.next().await, Some(6)); - /// # } - /// ``` - fn map(self, f: F) -> Map - where - F: FnMut(Self::Item) -> T, - Self: Sized, - { - Map::new(self, f) - } - - /// Combine two streams into one by interleaving the output of both as it - /// is produced. - /// - /// Values are produced from the merged stream in the order they arrive from - /// the two source streams. If both source streams provide values - /// simultaneously, the merge stream alternates between them. This provides - /// some level of fairness. You should not chain calls to `merge`, as this - /// will break the fairness of the merging. - /// - /// The merged stream completes once **both** source streams complete. When - /// one source stream completes before the other, the merge stream - /// exclusively polls the remaining stream. - /// - /// For merging multiple streams, consider using [`StreamMap`] instead. - /// - /// [`StreamMap`]: crate::StreamMap - /// - /// # Examples - /// - /// ``` - /// use tokio_stream::{StreamExt, Stream}; - /// use tokio::sync::mpsc; - /// use tokio::time; - /// - /// use std::time::Duration; - /// use std::pin::Pin; - /// - /// # /* - /// #[tokio::main] - /// # */ - /// # #[tokio::main(flavor = "current_thread")] - /// async fn main() { - /// # time::pause(); - /// let (tx1, mut rx1) = mpsc::channel::(10); - /// let (tx2, mut rx2) = mpsc::channel::(10); - /// - /// // Convert the channels to a `Stream`. - /// let rx1 = Box::pin(async_stream::stream! { - /// while let Some(item) = rx1.recv().await { - /// yield item; - /// } - /// }) as Pin + Send>>; - /// - /// let rx2 = Box::pin(async_stream::stream! { - /// while let Some(item) = rx2.recv().await { - /// yield item; - /// } - /// }) as Pin + Send>>; - /// - /// let mut rx = rx1.merge(rx2); - /// - /// tokio::spawn(async move { - /// // Send some values immediately - /// tx1.send(1).await.unwrap(); - /// tx1.send(2).await.unwrap(); - /// - /// // Let the other task send values - /// time::sleep(Duration::from_millis(20)).await; - /// - /// tx1.send(4).await.unwrap(); - /// }); - /// - /// tokio::spawn(async move { - /// // Wait for the first task to send values - /// time::sleep(Duration::from_millis(5)).await; - /// - /// tx2.send(3).await.unwrap(); - /// - /// time::sleep(Duration::from_millis(25)).await; - /// - /// // Send the final value - /// tx2.send(5).await.unwrap(); - /// }); - /// - /// assert_eq!(1, rx.next().await.unwrap()); - /// assert_eq!(2, rx.next().await.unwrap()); - /// assert_eq!(3, rx.next().await.unwrap()); - /// assert_eq!(4, rx.next().await.unwrap()); - /// assert_eq!(5, rx.next().await.unwrap()); - /// - /// // The merged stream is consumed - /// assert!(rx.next().await.is_none()); - /// } - /// ``` - fn merge(self, other: U) -> Merge - where - U: Stream, - Self: Sized, - { - Merge::new(self, other) - } - - /// Filters the values produced by this stream according to the provided - /// predicate. - /// - /// As values of this stream are made available, the provided predicate `f` - /// will be run against them. If the predicate - /// resolves to `true`, then the stream will yield the value, but if the - /// predicate resolves to `false`, then the value - /// will be discarded and the next value will be produced. - /// - /// Note that this function consumes the stream passed into it and returns a - /// wrapped version of it, similar to [`Iterator::filter`] method in the - /// standard library. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let stream = stream::iter(1..=8); - /// let mut evens = stream.filter(|x| x % 2 == 0); - /// - /// assert_eq!(Some(2), evens.next().await); - /// assert_eq!(Some(4), evens.next().await); - /// assert_eq!(Some(6), evens.next().await); - /// assert_eq!(Some(8), evens.next().await); - /// assert_eq!(None, evens.next().await); - /// # } - /// ``` - fn filter(self, f: F) -> Filter - where - F: FnMut(&Self::Item) -> bool, - Self: Sized, - { - Filter::new(self, f) - } - - /// Filters the values produced by this stream while simultaneously mapping - /// them to a different type according to the provided closure. - /// - /// As values of this stream are made available, the provided function will - /// be run on them. If the predicate `f` resolves to - /// [`Some(item)`](Some) then the stream will yield the value `item`, but if - /// it resolves to [`None`], then the value will be skipped. - /// - /// Note that this function consumes the stream passed into it and returns a - /// wrapped version of it, similar to [`Iterator::filter_map`] method in the - /// standard library. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let stream = stream::iter(1..=8); - /// let mut evens = stream.filter_map(|x| { - /// if x % 2 == 0 { Some(x + 1) } else { None } - /// }); - /// - /// assert_eq!(Some(3), evens.next().await); - /// assert_eq!(Some(5), evens.next().await); - /// assert_eq!(Some(7), evens.next().await); - /// assert_eq!(Some(9), evens.next().await); - /// assert_eq!(None, evens.next().await); - /// # } - /// ``` - fn filter_map(self, f: F) -> FilterMap - where - F: FnMut(Self::Item) -> Option, - Self: Sized, - { - FilterMap::new(self, f) - } - - /// Creates a stream which ends after the first `None`. - /// - /// After a stream returns `None`, behavior is undefined. Future calls to - /// `poll_next` may or may not return `Some(T)` again or they may panic. - /// `fuse()` adapts a stream, ensuring that after `None` is given, it will - /// return `None` forever. - /// - /// # Examples - /// - /// ``` - /// use tokio_stream::{Stream, StreamExt}; - /// - /// use std::pin::Pin; - /// use std::task::{Context, Poll}; - /// - /// // a stream which alternates between Some and None - /// struct Alternate { - /// state: i32, - /// } - /// - /// impl Stream for Alternate { - /// type Item = i32; - /// - /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - /// let val = self.state; - /// self.state = self.state + 1; - /// - /// // if it's even, Some(i32), else None - /// if val % 2 == 0 { - /// Poll::Ready(Some(val)) - /// } else { - /// Poll::Ready(None) - /// } - /// } - /// } - /// - /// #[tokio::main] - /// async fn main() { - /// let mut stream = Alternate { state: 0 }; - /// - /// // the stream goes back and forth - /// assert_eq!(stream.next().await, Some(0)); - /// assert_eq!(stream.next().await, None); - /// assert_eq!(stream.next().await, Some(2)); - /// assert_eq!(stream.next().await, None); - /// - /// // however, once it is fused - /// let mut stream = stream.fuse(); - /// - /// assert_eq!(stream.next().await, Some(4)); - /// assert_eq!(stream.next().await, None); - /// - /// // it will always return `None` after the first time. - /// assert_eq!(stream.next().await, None); - /// assert_eq!(stream.next().await, None); - /// assert_eq!(stream.next().await, None); - /// } - /// ``` - fn fuse(self) -> Fuse - where - Self: Sized, - { - Fuse::new(self) - } - - /// Creates a new stream of at most `n` items of the underlying stream. - /// - /// Once `n` items have been yielded from this stream then it will always - /// return that the stream is done. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let mut stream = stream::iter(1..=10).take(3); - /// - /// assert_eq!(Some(1), stream.next().await); - /// assert_eq!(Some(2), stream.next().await); - /// assert_eq!(Some(3), stream.next().await); - /// assert_eq!(None, stream.next().await); - /// # } - /// ``` - fn take(self, n: usize) -> Take - where - Self: Sized, - { - Take::new(self, n) - } - - /// Take elements from this stream while the provided predicate - /// resolves to `true`. - /// - /// This function, like `Iterator::take_while`, will take elements from the - /// stream until the predicate `f` resolves to `false`. Once one element - /// returns false it will always return that the stream is done. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3); - /// - /// assert_eq!(Some(1), stream.next().await); - /// assert_eq!(Some(2), stream.next().await); - /// assert_eq!(Some(3), stream.next().await); - /// assert_eq!(None, stream.next().await); - /// # } - /// ``` - fn take_while(self, f: F) -> TakeWhile - where - F: FnMut(&Self::Item) -> bool, - Self: Sized, - { - TakeWhile::new(self, f) - } - - /// Creates a new stream that will skip the `n` first items of the - /// underlying stream. - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let mut stream = stream::iter(1..=10).skip(7); - /// - /// assert_eq!(Some(8), stream.next().await); - /// assert_eq!(Some(9), stream.next().await); - /// assert_eq!(Some(10), stream.next().await); - /// assert_eq!(None, stream.next().await); - /// # } - /// ``` - fn skip(self, n: usize) -> Skip - where - Self: Sized, - { - Skip::new(self, n) - } - - /// Skip elements from the underlying stream while the provided predicate - /// resolves to `true`. - /// - /// This function, like [`Iterator::skip_while`], will ignore elemets from the - /// stream until the predicate `f` resolves to `false`. Once one element - /// returns false, the rest of the elements will be yielded. - /// - /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while() - /// - /// # Examples - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3); - /// - /// assert_eq!(Some(3), stream.next().await); - /// assert_eq!(Some(4), stream.next().await); - /// assert_eq!(Some(1), stream.next().await); - /// assert_eq!(None, stream.next().await); - /// # } - /// ``` - fn skip_while(self, f: F) -> SkipWhile - where - F: FnMut(&Self::Item) -> bool, - Self: Sized, - { - SkipWhile::new(self, f) - } - - /// Tests if every element of the stream matches a predicate. - /// - /// Equivalent to: - /// - /// ```ignore - /// async fn all(&mut self, f: F) -> bool; - /// ``` - /// - /// `all()` takes a closure that returns `true` or `false`. It applies - /// this closure to each element of the stream, and if they all return - /// `true`, then so does `all`. If any of them return `false`, it - /// returns `false`. An empty stream returns `true`. - /// - /// `all()` is short-circuiting; in other words, it will stop processing - /// as soon as it finds a `false`, given that no matter what else happens, - /// the result will also be `false`. - /// - /// An empty stream returns `true`. - /// - /// # Examples - /// - /// Basic usage: - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let a = [1, 2, 3]; - /// - /// assert!(stream::iter(&a).all(|&x| x > 0).await); - /// - /// assert!(!stream::iter(&a).all(|&x| x > 2).await); - /// # } - /// ``` - /// - /// Stopping at the first `false`: - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let a = [1, 2, 3]; - /// - /// let mut iter = stream::iter(&a); - /// - /// assert!(!iter.all(|&x| x != 2).await); - /// - /// // we can still use `iter`, as there are more elements. - /// assert_eq!(iter.next().await, Some(&3)); - /// # } - /// ``` - fn all(&mut self, f: F) -> AllFuture<'_, Self, F> - where - Self: Unpin, - F: FnMut(Self::Item) -> bool, - { - AllFuture::new(self, f) - } - - /// Tests if any element of the stream matches a predicate. - /// - /// Equivalent to: - /// - /// ```ignore - /// async fn any(&mut self, f: F) -> bool; - /// ``` - /// - /// `any()` takes a closure that returns `true` or `false`. It applies - /// this closure to each element of the stream, and if any of them return - /// `true`, then so does `any()`. If they all return `false`, it - /// returns `false`. - /// - /// `any()` is short-circuiting; in other words, it will stop processing - /// as soon as it finds a `true`, given that no matter what else happens, - /// the result will also be `true`. - /// - /// An empty stream returns `false`. - /// - /// Basic usage: - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let a = [1, 2, 3]; - /// - /// assert!(stream::iter(&a).any(|&x| x > 0).await); - /// - /// assert!(!stream::iter(&a).any(|&x| x > 5).await); - /// # } - /// ``` - /// - /// Stopping at the first `true`: - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// let a = [1, 2, 3]; - /// - /// let mut iter = stream::iter(&a); - /// - /// assert!(iter.any(|&x| x != 2).await); - /// - /// // we can still use `iter`, as there are more elements. - /// assert_eq!(iter.next().await, Some(&2)); - /// # } - /// ``` - fn any(&mut self, f: F) -> AnyFuture<'_, Self, F> - where - Self: Unpin, - F: FnMut(Self::Item) -> bool, - { - AnyFuture::new(self, f) - } - - /// Combine two streams into one by first returning all values from the - /// first stream then all values from the second stream. - /// - /// As long as `self` still has values to emit, no values from `other` are - /// emitted, even if some are ready. - /// - /// # Examples - /// - /// ``` - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// #[tokio::main] - /// async fn main() { - /// let one = stream::iter(vec![1, 2, 3]); - /// let two = stream::iter(vec![4, 5, 6]); - /// - /// let mut stream = one.chain(two); - /// - /// assert_eq!(stream.next().await, Some(1)); - /// assert_eq!(stream.next().await, Some(2)); - /// assert_eq!(stream.next().await, Some(3)); - /// assert_eq!(stream.next().await, Some(4)); - /// assert_eq!(stream.next().await, Some(5)); - /// assert_eq!(stream.next().await, Some(6)); - /// assert_eq!(stream.next().await, None); - /// } - /// ``` - fn chain(self, other: U) -> Chain - where - U: Stream, - Self: Sized, - { - Chain::new(self, other) - } - - /// A combinator that applies a function to every element in a stream - /// producing a single, final value. - /// - /// Equivalent to: - /// - /// ```ignore - /// async fn fold(self, init: B, f: F) -> B; - /// ``` - /// - /// # Examples - /// Basic usage: - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, *}; - /// - /// let s = stream::iter(vec![1u8, 2, 3]); - /// let sum = s.fold(0, |acc, x| acc + x).await; - /// - /// assert_eq!(sum, 6); - /// # } - /// ``` - fn fold(self, init: B, f: F) -> FoldFuture - where - Self: Sized, - F: FnMut(B, Self::Item) -> B, - { - FoldFuture::new(self, init, f) - } - - /// Drain stream pushing all emitted values into a collection. - /// - /// Equivalent to: - /// - /// ```ignore - /// async fn collect(self) -> T; - /// ``` - /// - /// `collect` streams all values, awaiting as needed. Values are pushed into - /// a collection. A number of different target collection types are - /// supported, including [`Vec`](std::vec::Vec), - /// [`String`](std::string::String), and [`Bytes`]. - /// - /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html - /// - /// # `Result` - /// - /// `collect()` can also be used with streams of type `Result` where - /// `T: FromStream<_>`. In this case, `collect()` will stream as long as - /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered, - /// streaming is terminated and `collect()` returns the `Err`. - /// - /// # Notes - /// - /// `FromStream` is currently a sealed trait. Stabilization is pending - /// enhancements to the Rust language. - /// - /// # Examples - /// - /// Basic usage: - /// - /// ``` - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// #[tokio::main] - /// async fn main() { - /// let doubled: Vec = - /// stream::iter(vec![1, 2, 3]) - /// .map(|x| x * 2) - /// .collect() - /// .await; - /// - /// assert_eq!(vec![2, 4, 6], doubled); - /// } - /// ``` - /// - /// Collecting a stream of `Result` values - /// - /// ``` - /// use tokio_stream::{self as stream, StreamExt}; - /// - /// #[tokio::main] - /// async fn main() { - /// // A stream containing only `Ok` values will be collected - /// let values: Result, &str> = - /// stream::iter(vec![Ok(1), Ok(2), Ok(3)]) - /// .collect() - /// .await; - /// - /// assert_eq!(Ok(vec![1, 2, 3]), values); - /// - /// // A stream containing `Err` values will return the first error. - /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")]; - /// - /// let values: Result, &str> = - /// stream::iter(results) - /// .collect() - /// .await; - /// - /// assert_eq!(Err("no"), values); - /// } - /// ``` - fn collect(self) -> Collect - where - T: FromStream, - Self: Sized, - { - Collect::new(self) - } - - /// Applies a per-item timeout to the passed stream. - /// - /// `timeout()` takes a `Duration` that represents the maximum amount of - /// time each element of the stream has to complete before timing out. - /// - /// If the wrapped stream yields a value before the deadline is reached, the - /// value is returned. Otherwise, an error is returned. The caller may decide - /// to continue consuming the stream and will eventually get the next source - /// stream value once it becomes available. - /// - /// # Notes - /// - /// This function consumes the stream passed into it and returns a - /// wrapped version of it. - /// - /// Polling the returned stream will continue to poll the inner stream even - /// if one or more items time out. - /// - /// # Examples - /// - /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): - /// - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; - /// use std::time::Duration; - /// # let int_stream = stream::iter(1..=3); - /// - /// let int_stream = int_stream.timeout(Duration::from_secs(1)); - /// tokio::pin!(int_stream); - /// - /// // When no items time out, we get the 3 elements in succession: - /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); - /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); - /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); - /// assert_eq!(int_stream.try_next().await, Ok(None)); - /// - /// // If the second item times out, we get an error and continue polling the stream: - /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); - /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); - /// assert!(int_stream.try_next().await.is_err()); - /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); - /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); - /// assert_eq!(int_stream.try_next().await, Ok(None)); - /// - /// // If we want to stop consuming the source stream the first time an - /// // element times out, we can use the `take_while` operator: - /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); - /// let mut int_stream = int_stream.take_while(Result::is_ok); - /// - /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); - /// assert_eq!(int_stream.try_next().await, Ok(None)); - /// # } - /// ``` - #[cfg(all(feature = "time"))] - #[cfg_attr(docsrs, doc(cfg(feature = "time")))] - fn timeout(self, duration: Duration) -> Timeout - where - Self: Sized, - { - Timeout::new(self, duration) - } - - /// Slows down a stream by enforcing a delay between items. - /// - /// # Example - /// - /// Create a throttled stream. - /// ```rust,no_run - /// use std::time::Duration; - /// use tokio_stream::StreamExt; - /// - /// # async fn dox() { - /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2)); - /// tokio::pin!(item_stream); - /// - /// loop { - /// // The string will be produced at most every 2 seconds - /// println!("{:?}", item_stream.next().await); - /// } - /// # } - /// ``` - #[cfg(all(feature = "time"))] - #[cfg_attr(docsrs, doc(cfg(feature = "time")))] - fn throttle(self, duration: Duration) -> Throttle - where - Self: Sized, - { - throttle(duration, self) - } -} - -impl StreamExt for St where St: Stream {} - -/// Merge the size hints from two streams. -fn merge_size_hints( - (left_low, left_high): (usize, Option), - (right_low, right_hign): (usize, Option), -) -> (usize, Option) { - let low = left_low.saturating_add(right_low); - let high = match (left_high, right_hign) { - (Some(h1), Some(h2)) => h1.checked_add(h2), - _ => None, - }; - (low, high) -} diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs new file mode 100644 index 00000000000..51532ee1cb6 --- /dev/null +++ b/tokio-stream/src/stream_ext.rs @@ -0,0 +1,917 @@ +use futures_core::Stream; + +mod all; +use all::AllFuture; + +mod any; +use any::AnyFuture; + +mod chain; +use chain::Chain; + +pub(crate) mod collect; +use collect::{Collect, FromStream}; + +mod filter; +use filter::Filter; + +mod filter_map; +use filter_map::FilterMap; + +mod fold; +use fold::FoldFuture; + +mod fuse; +use fuse::Fuse; + +mod map; +use map::Map; + +mod merge; +use merge::Merge; + +mod next; +use next::Next; + +mod skip; +use skip::Skip; + +mod skip_while; +use skip_while::SkipWhile; + +mod try_next; +use try_next::TryNext; + +mod take; +use take::Take; + +mod take_while; +use take_while::TakeWhile; + +cfg_time! { + mod timeout; + use timeout::Timeout; + use tokio::time::Duration; + mod throttle; + use throttle::{throttle, Throttle}; +} + +/// An extension trait for the [`Stream`] trait that provides a variety of +/// convenient combinator functions. +/// +/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found +/// in the [futures] crate, however both Tokio and futures provide separate +/// `StreamExt` utility traits, and some utilities are only available on one of +/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt` +/// trait in the futures crate. +/// +/// If you need utilities from both `StreamExt` traits, you should prefer to +/// import one of them, and use the other through the fully qualified call +/// syntax. For example: +/// ``` +/// // import one of the traits: +/// use futures::stream::StreamExt; +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() { +/// +/// let a = tokio_stream::iter(vec![1, 3, 5]); +/// let b = tokio_stream::iter(vec![2, 4, 6]); +/// +/// // use the fully qualified call syntax for the other trait: +/// let merged = tokio_stream::StreamExt::merge(a, b); +/// +/// // use normal call notation for futures::stream::StreamExt::collect +/// let output: Vec<_> = merged.collect().await; +/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]); +/// # } +/// ``` +/// +/// [`Stream`]: crate::Stream +/// [futures]: https://docs.rs/futures +/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html +pub trait StreamExt: Stream { + /// Consumes and returns the next value in the stream or `None` if the + /// stream is finished. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn next(&mut self) -> Option; + /// ``` + /// + /// Note that because `next` doesn't take ownership over the stream, + /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a + /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can + /// be done by boxing the stream using [`Box::pin`] or + /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils` + /// crate. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=3); + /// + /// assert_eq!(stream.next().await, Some(1)); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, Some(3)); + /// assert_eq!(stream.next().await, None); + /// # } + /// ``` + fn next(&mut self) -> Next<'_, Self> + where + Self: Unpin, + { + Next::new(self) + } + + /// Consumes and returns the next item in the stream. If an error is + /// encountered before the next item, the error is returned instead. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn try_next(&mut self) -> Result, E>; + /// ``` + /// + /// This is similar to the [`next`](StreamExt::next) combinator, + /// but returns a [`Result, E>`](Result) rather than + /// an [`Option>`](Option), making for easy use + /// with the [`?`](std::ops::Try) operator. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]); + /// + /// assert_eq!(stream.try_next().await, Ok(Some(1))); + /// assert_eq!(stream.try_next().await, Ok(Some(2))); + /// assert_eq!(stream.try_next().await, Err("nope")); + /// # } + /// ``` + fn try_next(&mut self) -> TryNext<'_, Self> + where + Self: Stream> + Unpin, + { + TryNext::new(self) + } + + /// Maps this stream's items to a different type, returning a new stream of + /// the resulting type. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available. It is executed inline with calls to + /// [`poll_next`](Stream::poll_next). + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=3); + /// let mut stream = stream.map(|x| x + 3); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// # } + /// ``` + fn map(self, f: F) -> Map + where + F: FnMut(Self::Item) -> T, + Self: Sized, + { + Map::new(self, f) + } + + /// Combine two streams into one by interleaving the output of both as it + /// is produced. + /// + /// Values are produced from the merged stream in the order they arrive from + /// the two source streams. If both source streams provide values + /// simultaneously, the merge stream alternates between them. This provides + /// some level of fairness. You should not chain calls to `merge`, as this + /// will break the fairness of the merging. + /// + /// The merged stream completes once **both** source streams complete. When + /// one source stream completes before the other, the merge stream + /// exclusively polls the remaining stream. + /// + /// For merging multiple streams, consider using [`StreamMap`] instead. + /// + /// [`StreamMap`]: crate::StreamMap + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{StreamExt, Stream}; + /// use tokio::sync::mpsc; + /// use tokio::time; + /// + /// use std::time::Duration; + /// use std::pin::Pin; + /// + /// # /* + /// #[tokio::main] + /// # */ + /// # #[tokio::main(flavor = "current_thread")] + /// async fn main() { + /// # time::pause(); + /// let (tx1, mut rx1) = mpsc::channel::(10); + /// let (tx2, mut rx2) = mpsc::channel::(10); + /// + /// // Convert the channels to a `Stream`. + /// let rx1 = Box::pin(async_stream::stream! { + /// while let Some(item) = rx1.recv().await { + /// yield item; + /// } + /// }) as Pin + Send>>; + /// + /// let rx2 = Box::pin(async_stream::stream! { + /// while let Some(item) = rx2.recv().await { + /// yield item; + /// } + /// }) as Pin + Send>>; + /// + /// let mut rx = rx1.merge(rx2); + /// + /// tokio::spawn(async move { + /// // Send some values immediately + /// tx1.send(1).await.unwrap(); + /// tx1.send(2).await.unwrap(); + /// + /// // Let the other task send values + /// time::sleep(Duration::from_millis(20)).await; + /// + /// tx1.send(4).await.unwrap(); + /// }); + /// + /// tokio::spawn(async move { + /// // Wait for the first task to send values + /// time::sleep(Duration::from_millis(5)).await; + /// + /// tx2.send(3).await.unwrap(); + /// + /// time::sleep(Duration::from_millis(25)).await; + /// + /// // Send the final value + /// tx2.send(5).await.unwrap(); + /// }); + /// + /// assert_eq!(1, rx.next().await.unwrap()); + /// assert_eq!(2, rx.next().await.unwrap()); + /// assert_eq!(3, rx.next().await.unwrap()); + /// assert_eq!(4, rx.next().await.unwrap()); + /// assert_eq!(5, rx.next().await.unwrap()); + /// + /// // The merged stream is consumed + /// assert!(rx.next().await.is_none()); + /// } + /// ``` + fn merge(self, other: U) -> Merge + where + U: Stream, + Self: Sized, + { + Merge::new(self, other) + } + + /// Filters the values produced by this stream according to the provided + /// predicate. + /// + /// As values of this stream are made available, the provided predicate `f` + /// will be run against them. If the predicate + /// resolves to `true`, then the stream will yield the value, but if the + /// predicate resolves to `false`, then the value + /// will be discarded and the next value will be produced. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to [`Iterator::filter`] method in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=8); + /// let mut evens = stream.filter(|x| x % 2 == 0); + /// + /// assert_eq!(Some(2), evens.next().await); + /// assert_eq!(Some(4), evens.next().await); + /// assert_eq!(Some(6), evens.next().await); + /// assert_eq!(Some(8), evens.next().await); + /// assert_eq!(None, evens.next().await); + /// # } + /// ``` + fn filter(self, f: F) -> Filter + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + Filter::new(self, f) + } + + /// Filters the values produced by this stream while simultaneously mapping + /// them to a different type according to the provided closure. + /// + /// As values of this stream are made available, the provided function will + /// be run on them. If the predicate `f` resolves to + /// [`Some(item)`](Some) then the stream will yield the value `item`, but if + /// it resolves to [`None`], then the value will be skipped. + /// + /// Note that this function consumes the stream passed into it and returns a + /// wrapped version of it, similar to [`Iterator::filter_map`] method in the + /// standard library. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let stream = stream::iter(1..=8); + /// let mut evens = stream.filter_map(|x| { + /// if x % 2 == 0 { Some(x + 1) } else { None } + /// }); + /// + /// assert_eq!(Some(3), evens.next().await); + /// assert_eq!(Some(5), evens.next().await); + /// assert_eq!(Some(7), evens.next().await); + /// assert_eq!(Some(9), evens.next().await); + /// assert_eq!(None, evens.next().await); + /// # } + /// ``` + fn filter_map(self, f: F) -> FilterMap + where + F: FnMut(Self::Item) -> Option, + Self: Sized, + { + FilterMap::new(self, f) + } + + /// Creates a stream which ends after the first `None`. + /// + /// After a stream returns `None`, behavior is undefined. Future calls to + /// `poll_next` may or may not return `Some(T)` again or they may panic. + /// `fuse()` adapts a stream, ensuring that after `None` is given, it will + /// return `None` forever. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{Stream, StreamExt}; + /// + /// use std::pin::Pin; + /// use std::task::{Context, Poll}; + /// + /// // a stream which alternates between Some and None + /// struct Alternate { + /// state: i32, + /// } + /// + /// impl Stream for Alternate { + /// type Item = i32; + /// + /// fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + /// let val = self.state; + /// self.state = self.state + 1; + /// + /// // if it's even, Some(i32), else None + /// if val % 2 == 0 { + /// Poll::Ready(Some(val)) + /// } else { + /// Poll::Ready(None) + /// } + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// let mut stream = Alternate { state: 0 }; + /// + /// // the stream goes back and forth + /// assert_eq!(stream.next().await, Some(0)); + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, None); + /// + /// // however, once it is fused + /// let mut stream = stream.fuse(); + /// + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, None); + /// + /// // it will always return `None` after the first time. + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, None); + /// assert_eq!(stream.next().await, None); + /// } + /// ``` + fn fuse(self) -> Fuse + where + Self: Sized, + { + Fuse::new(self) + } + + /// Creates a new stream of at most `n` items of the underlying stream. + /// + /// Once `n` items have been yielded from this stream then it will always + /// return that the stream is done. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).take(3); + /// + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(Some(2), stream.next().await); + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn take(self, n: usize) -> Take + where + Self: Sized, + { + Take::new(self, n) + } + + /// Take elements from this stream while the provided predicate + /// resolves to `true`. + /// + /// This function, like `Iterator::take_while`, will take elements from the + /// stream until the predicate `f` resolves to `false`. Once one element + /// returns false it will always return that the stream is done. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3); + /// + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(Some(2), stream.next().await); + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn take_while(self, f: F) -> TakeWhile + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + TakeWhile::new(self, f) + } + + /// Creates a new stream that will skip the `n` first items of the + /// underlying stream. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let mut stream = stream::iter(1..=10).skip(7); + /// + /// assert_eq!(Some(8), stream.next().await); + /// assert_eq!(Some(9), stream.next().await); + /// assert_eq!(Some(10), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn skip(self, n: usize) -> Skip + where + Self: Sized, + { + Skip::new(self, n) + } + + /// Skip elements from the underlying stream while the provided predicate + /// resolves to `true`. + /// + /// This function, like [`Iterator::skip_while`], will ignore elemets from the + /// stream until the predicate `f` resolves to `false`. Once one element + /// returns false, the rest of the elements will be yielded. + /// + /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while() + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3); + /// + /// assert_eq!(Some(3), stream.next().await); + /// assert_eq!(Some(4), stream.next().await); + /// assert_eq!(Some(1), stream.next().await); + /// assert_eq!(None, stream.next().await); + /// # } + /// ``` + fn skip_while(self, f: F) -> SkipWhile + where + F: FnMut(&Self::Item) -> bool, + Self: Sized, + { + SkipWhile::new(self, f) + } + + /// Tests if every element of the stream matches a predicate. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn all(&mut self, f: F) -> bool; + /// ``` + /// + /// `all()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if they all return + /// `true`, then so does `all`. If any of them return `false`, it + /// returns `false`. An empty stream returns `true`. + /// + /// `all()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `false`, given that no matter what else happens, + /// the result will also be `false`. + /// + /// An empty stream returns `true`. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// assert!(stream::iter(&a).all(|&x| x > 0).await); + /// + /// assert!(!stream::iter(&a).all(|&x| x > 2).await); + /// # } + /// ``` + /// + /// Stopping at the first `false`: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// let mut iter = stream::iter(&a); + /// + /// assert!(!iter.all(|&x| x != 2).await); + /// + /// // we can still use `iter`, as there are more elements. + /// assert_eq!(iter.next().await, Some(&3)); + /// # } + /// ``` + fn all(&mut self, f: F) -> AllFuture<'_, Self, F> + where + Self: Unpin, + F: FnMut(Self::Item) -> bool, + { + AllFuture::new(self, f) + } + + /// Tests if any element of the stream matches a predicate. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn any(&mut self, f: F) -> bool; + /// ``` + /// + /// `any()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if any of them return + /// `true`, then so does `any()`. If they all return `false`, it + /// returns `false`. + /// + /// `any()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `true`, given that no matter what else happens, + /// the result will also be `true`. + /// + /// An empty stream returns `false`. + /// + /// Basic usage: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// assert!(stream::iter(&a).any(|&x| x > 0).await); + /// + /// assert!(!stream::iter(&a).any(|&x| x > 5).await); + /// # } + /// ``` + /// + /// Stopping at the first `true`: + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// let a = [1, 2, 3]; + /// + /// let mut iter = stream::iter(&a); + /// + /// assert!(iter.any(|&x| x != 2).await); + /// + /// // we can still use `iter`, as there are more elements. + /// assert_eq!(iter.next().await, Some(&2)); + /// # } + /// ``` + fn any(&mut self, f: F) -> AnyFuture<'_, Self, F> + where + Self: Unpin, + F: FnMut(Self::Item) -> bool, + { + AnyFuture::new(self, f) + } + + /// Combine two streams into one by first returning all values from the + /// first stream then all values from the second stream. + /// + /// As long as `self` still has values to emit, no values from `other` are + /// emitted, even if some are ready. + /// + /// # Examples + /// + /// ``` + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// let one = stream::iter(vec![1, 2, 3]); + /// let two = stream::iter(vec![4, 5, 6]); + /// + /// let mut stream = one.chain(two); + /// + /// assert_eq!(stream.next().await, Some(1)); + /// assert_eq!(stream.next().await, Some(2)); + /// assert_eq!(stream.next().await, Some(3)); + /// assert_eq!(stream.next().await, Some(4)); + /// assert_eq!(stream.next().await, Some(5)); + /// assert_eq!(stream.next().await, Some(6)); + /// assert_eq!(stream.next().await, None); + /// } + /// ``` + fn chain(self, other: U) -> Chain + where + U: Stream, + Self: Sized, + { + Chain::new(self, other) + } + + /// A combinator that applies a function to every element in a stream + /// producing a single, final value. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn fold(self, init: B, f: F) -> B; + /// ``` + /// + /// # Examples + /// Basic usage: + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, *}; + /// + /// let s = stream::iter(vec![1u8, 2, 3]); + /// let sum = s.fold(0, |acc, x| acc + x).await; + /// + /// assert_eq!(sum, 6); + /// # } + /// ``` + fn fold(self, init: B, f: F) -> FoldFuture + where + Self: Sized, + F: FnMut(B, Self::Item) -> B, + { + FoldFuture::new(self, init, f) + } + + /// Drain stream pushing all emitted values into a collection. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn collect(self) -> T; + /// ``` + /// + /// `collect` streams all values, awaiting as needed. Values are pushed into + /// a collection. A number of different target collection types are + /// supported, including [`Vec`](std::vec::Vec), + /// [`String`](std::string::String), and [`Bytes`]. + /// + /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html + /// + /// # `Result` + /// + /// `collect()` can also be used with streams of type `Result` where + /// `T: FromStream<_>`. In this case, `collect()` will stream as long as + /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered, + /// streaming is terminated and `collect()` returns the `Err`. + /// + /// # Notes + /// + /// `FromStream` is currently a sealed trait. Stabilization is pending + /// enhancements to the Rust language. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// let doubled: Vec = + /// stream::iter(vec![1, 2, 3]) + /// .map(|x| x * 2) + /// .collect() + /// .await; + /// + /// assert_eq!(vec![2, 4, 6], doubled); + /// } + /// ``` + /// + /// Collecting a stream of `Result` values + /// + /// ``` + /// use tokio_stream::{self as stream, StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// // A stream containing only `Ok` values will be collected + /// let values: Result, &str> = + /// stream::iter(vec![Ok(1), Ok(2), Ok(3)]) + /// .collect() + /// .await; + /// + /// assert_eq!(Ok(vec![1, 2, 3]), values); + /// + /// // A stream containing `Err` values will return the first error. + /// let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")]; + /// + /// let values: Result, &str> = + /// stream::iter(results) + /// .collect() + /// .await; + /// + /// assert_eq!(Err("no"), values); + /// } + /// ``` + fn collect(self) -> Collect + where + T: FromStream, + Self: Sized, + { + Collect::new(self) + } + + /// Applies a per-item timeout to the passed stream. + /// + /// `timeout()` takes a `Duration` that represents the maximum amount of + /// time each element of the stream has to complete before timing out. + /// + /// If the wrapped stream yields a value before the deadline is reached, the + /// value is returned. Otherwise, an error is returned. The caller may decide + /// to continue consuming the stream and will eventually get the next source + /// stream value once it becomes available. + /// + /// # Notes + /// + /// This function consumes the stream passed into it and returns a + /// wrapped version of it. + /// + /// Polling the returned stream will continue to poll the inner stream even + /// if one or more items time out. + /// + /// # Examples + /// + /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt}; + /// use std::time::Duration; + /// # let int_stream = stream::iter(1..=3); + /// + /// let int_stream = int_stream.timeout(Duration::from_secs(1)); + /// tokio::pin!(int_stream); + /// + /// // When no items time out, we get the 3 elements in succession: + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If the second item times out, we get an error and continue polling the stream: + /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert!(int_stream.try_next().await.is_err()); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If we want to stop consuming the source stream the first time an + /// // element times out, we can use the `take_while` operator: + /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// let mut int_stream = int_stream.take_while(Result::is_ok); + /// + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn timeout(self, duration: Duration) -> Timeout + where + Self: Sized, + { + Timeout::new(self, duration) + } + + /// Slows down a stream by enforcing a delay between items. + /// + /// # Example + /// + /// Create a throttled stream. + /// ```rust,no_run + /// use std::time::Duration; + /// use tokio_stream::StreamExt; + /// + /// # async fn dox() { + /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2)); + /// tokio::pin!(item_stream); + /// + /// loop { + /// // The string will be produced at most every 2 seconds + /// println!("{:?}", item_stream.next().await); + /// } + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn throttle(self, duration: Duration) -> Throttle + where + Self: Sized, + { + throttle(duration, self) + } +} + +impl StreamExt for St where St: Stream {} + +/// Merge the size hints from two streams. +fn merge_size_hints( + (left_low, left_high): (usize, Option), + (right_low, right_hign): (usize, Option), +) -> (usize, Option) { + let low = left_low.saturating_add(right_low); + let high = match (left_high, right_hign) { + (Some(h1), Some(h2)) => h1.checked_add(h2), + _ => None, + }; + (low, high) +} diff --git a/tokio-stream/src/all.rs b/tokio-stream/src/stream_ext/all.rs similarity index 100% rename from tokio-stream/src/all.rs rename to tokio-stream/src/stream_ext/all.rs diff --git a/tokio-stream/src/any.rs b/tokio-stream/src/stream_ext/any.rs similarity index 100% rename from tokio-stream/src/any.rs rename to tokio-stream/src/stream_ext/any.rs diff --git a/tokio-stream/src/chain.rs b/tokio-stream/src/stream_ext/chain.rs similarity index 95% rename from tokio-stream/src/chain.rs rename to tokio-stream/src/stream_ext/chain.rs index cfdef83d73c..bd64f33ce4e 100644 --- a/tokio-stream/src/chain.rs +++ b/tokio-stream/src/stream_ext/chain.rs @@ -1,4 +1,5 @@ -use crate::{Fuse, Stream}; +use crate::stream_ext::Fuse; +use crate::Stream; use core::pin::Pin; use core::task::{Context, Poll}; diff --git a/tokio-stream/src/collect.rs b/tokio-stream/src/stream_ext/collect.rs similarity index 100% rename from tokio-stream/src/collect.rs rename to tokio-stream/src/stream_ext/collect.rs diff --git a/tokio-stream/src/filter.rs b/tokio-stream/src/stream_ext/filter.rs similarity index 100% rename from tokio-stream/src/filter.rs rename to tokio-stream/src/stream_ext/filter.rs diff --git a/tokio-stream/src/filter_map.rs b/tokio-stream/src/stream_ext/filter_map.rs similarity index 100% rename from tokio-stream/src/filter_map.rs rename to tokio-stream/src/stream_ext/filter_map.rs diff --git a/tokio-stream/src/fold.rs b/tokio-stream/src/stream_ext/fold.rs similarity index 100% rename from tokio-stream/src/fold.rs rename to tokio-stream/src/stream_ext/fold.rs diff --git a/tokio-stream/src/fuse.rs b/tokio-stream/src/stream_ext/fuse.rs similarity index 100% rename from tokio-stream/src/fuse.rs rename to tokio-stream/src/stream_ext/fuse.rs diff --git a/tokio-stream/src/map.rs b/tokio-stream/src/stream_ext/map.rs similarity index 100% rename from tokio-stream/src/map.rs rename to tokio-stream/src/stream_ext/map.rs diff --git a/tokio-stream/src/merge.rs b/tokio-stream/src/stream_ext/merge.rs similarity index 97% rename from tokio-stream/src/merge.rs rename to tokio-stream/src/stream_ext/merge.rs index ea0ace0e21e..9d5123c85a3 100644 --- a/tokio-stream/src/merge.rs +++ b/tokio-stream/src/stream_ext/merge.rs @@ -1,4 +1,5 @@ -use crate::{Fuse, Stream}; +use crate::stream_ext::Fuse; +use crate::Stream; use core::pin::Pin; use core::task::{Context, Poll}; diff --git a/tokio-stream/src/next.rs b/tokio-stream/src/stream_ext/next.rs similarity index 100% rename from tokio-stream/src/next.rs rename to tokio-stream/src/stream_ext/next.rs diff --git a/tokio-stream/src/skip.rs b/tokio-stream/src/stream_ext/skip.rs similarity index 100% rename from tokio-stream/src/skip.rs rename to tokio-stream/src/stream_ext/skip.rs diff --git a/tokio-stream/src/skip_while.rs b/tokio-stream/src/stream_ext/skip_while.rs similarity index 100% rename from tokio-stream/src/skip_while.rs rename to tokio-stream/src/stream_ext/skip_while.rs diff --git a/tokio-stream/src/take.rs b/tokio-stream/src/stream_ext/take.rs similarity index 100% rename from tokio-stream/src/take.rs rename to tokio-stream/src/stream_ext/take.rs diff --git a/tokio-stream/src/take_while.rs b/tokio-stream/src/stream_ext/take_while.rs similarity index 100% rename from tokio-stream/src/take_while.rs rename to tokio-stream/src/stream_ext/take_while.rs diff --git a/tokio-stream/src/throttle.rs b/tokio-stream/src/stream_ext/throttle.rs similarity index 100% rename from tokio-stream/src/throttle.rs rename to tokio-stream/src/stream_ext/throttle.rs diff --git a/tokio-stream/src/timeout.rs b/tokio-stream/src/stream_ext/timeout.rs similarity index 97% rename from tokio-stream/src/timeout.rs rename to tokio-stream/src/stream_ext/timeout.rs index 9ebbaa2332c..de17dc0091f 100644 --- a/tokio-stream/src/timeout.rs +++ b/tokio-stream/src/stream_ext/timeout.rs @@ -1,4 +1,5 @@ -use crate::{Fuse, Stream}; +use crate::stream_ext::Fuse; +use crate::Stream; use tokio::time::{Instant, Sleep}; use core::future::Future; diff --git a/tokio-stream/src/try_next.rs b/tokio-stream/src/stream_ext/try_next.rs similarity index 95% rename from tokio-stream/src/try_next.rs rename to tokio-stream/src/stream_ext/try_next.rs index e91e8a429b5..af27d87d8e9 100644 --- a/tokio-stream/src/try_next.rs +++ b/tokio-stream/src/stream_ext/try_next.rs @@ -1,4 +1,5 @@ -use crate::{Next, Stream}; +use crate::stream_ext::Next; +use crate::Stream; use core::future::Future; use core::marker::PhantomPinned;