From 0805eacf138b1285a528e5b879514c83143c8591 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Fri, 26 Jul 2019 04:59:36 +0900 Subject: [PATCH] Add TryStreamExt::try_flatten (#1731) * Add TryStreamExt::try_flatten * Minor clean up StreamExt::flatten --- futures-util/src/stream/flatten.rs | 54 +++++---- futures-util/src/try_stream/mod.rs | 49 ++++++++ futures-util/src/try_stream/try_flatten.rs | 127 +++++++++++++++++++++ futures/src/lib.rs | 2 +- 4 files changed, 209 insertions(+), 23 deletions(-) create mode 100644 futures-util/src/try_stream/try_flatten.rs diff --git a/futures-util/src/stream/flatten.rs b/futures-util/src/stream/flatten.rs index df43f7e1ee..b46c0b13bb 100644 --- a/futures-util/src/stream/flatten.rs +++ b/futures-util/src/stream/flatten.rs @@ -9,26 +9,35 @@ use pin_utils::unsafe_pinned; #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Flatten - where St: Stream, +where + St: Stream, { stream: St, next: Option, } -impl Unpin for Flatten -where St: Stream + Unpin, - St::Item: Stream + Unpin, -{} +impl Unpin for Flatten +where + St: Stream + Unpin, + St::Item: Unpin, +{ +} -impl Flatten -where St: Stream, - St::Item: Stream, +impl Flatten +where + St: Stream, { unsafe_pinned!(stream: St); unsafe_pinned!(next: Option); +} - pub(super) fn new(stream: St) -> Flatten{ - Flatten { stream, next: None, } +impl Flatten +where + St: Stream, + St::Item: Stream, +{ + pub(super) fn new(stream: St) -> Self { + Self { stream, next: None } } /// Acquires a reference to the underlying stream that this combinator is @@ -64,22 +73,23 @@ where St: Stream, } } -impl FusedStream for Flatten { +impl FusedStream for Flatten +where + St: Stream + FusedStream, +{ fn is_terminated(&self) -> bool { self.next.is_none() && self.stream.is_terminated() } } impl Stream for Flatten - where St: Stream, - St::Item: Stream, +where + St: Stream, + St::Item: Stream, { type Item = ::Item; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if self.next.is_none() { match ready!(self.as_mut().stream().poll_next(cx)) { @@ -87,9 +97,9 @@ impl Stream for Flatten None => return Poll::Ready(None), } } - let item = ready!(self.as_mut().next().as_pin_mut().unwrap().poll_next(cx)); - if item.is_some() { - return Poll::Ready(item); + + if let Some(item) = ready!(self.as_mut().next().as_pin_mut().unwrap().poll_next(cx)) { + return Poll::Ready(Some(item)); } else { self.as_mut().next().set(None); } @@ -100,8 +110,8 @@ impl Stream for Flatten // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for Flatten - where S: Stream + Sink, - S::Item: Stream, +where + S: Stream + Sink, { type Error = S::Error; diff --git a/futures-util/src/try_stream/mod.rs b/futures-util/src/try_stream/mod.rs index 434502a050..e791613ea5 100644 --- a/futures-util/src/try_stream/mod.rs +++ b/futures-util/src/try_stream/mod.rs @@ -47,6 +47,9 @@ pub use self::try_filter::TryFilter; mod try_filter_map; pub use self::try_filter_map::TryFilterMap; +mod try_flatten; +pub use self::try_flatten::TryFlatten; + mod try_collect; pub use self::try_collect::TryCollect; @@ -556,6 +559,52 @@ pub trait TryStreamExt: TryStream { TryFilterMap::new(self, f) } + /// Flattens a stream of streams into just one continuous stream. + /// + /// If this stream's elements are themselves streams then this combinator + /// will flatten out the entire stream to one long chain of elements. Any + /// errors are passed through without looking at them, but otherwise each + /// individual stream will get exhausted before moving on to the next. + /// + /// # Examples + /// + /// ``` + /// #![feature(async_await)] + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::stream::{StreamExt, TryStreamExt}; + /// use std::thread; + /// + /// let (tx1, rx1) = mpsc::unbounded(); + /// let (tx2, rx2) = mpsc::unbounded(); + /// let (tx3, rx3) = mpsc::unbounded(); + /// + /// thread::spawn(move || { + /// tx1.unbounded_send(Ok(1)).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx2.unbounded_send(Ok(2)).unwrap(); + /// tx2.unbounded_send(Err(3)).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx3.unbounded_send(Ok(rx1)).unwrap(); + /// tx3.unbounded_send(Ok(rx2)).unwrap(); + /// tx3.unbounded_send(Err(4)).unwrap(); + /// }); + /// + /// let mut stream = rx3.try_flatten(); + /// assert_eq!(stream.next().await, Some(Ok(1))); + /// assert_eq!(stream.next().await, Some(Ok(2))); + /// assert_eq!(stream.next().await, Some(Err(3))); + /// # }); + /// ``` + fn try_flatten(self) -> TryFlatten + where Self::Ok: TryStream, + ::Error: From, + Self: Sized, + { + TryFlatten::new(self) + } /// Attempt to execute an accumulating asynchronous computation over a /// stream, collecting all the values into one final result. diff --git a/futures-util/src/try_stream/try_flatten.rs b/futures-util/src/try_stream/try_flatten.rs new file mode 100644 index 0000000000..a853e60a61 --- /dev/null +++ b/futures-util/src/try_stream/try_flatten.rs @@ -0,0 +1,127 @@ +use core::pin::Pin; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_utils::unsafe_pinned; + +/// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct TryFlatten +where + St: TryStream, +{ + stream: St, + next: Option, +} + +impl Unpin for TryFlatten +where + St: TryStream + Unpin, + St::Ok: Unpin, +{ +} + +impl TryFlatten +where + St: TryStream, +{ + unsafe_pinned!(stream: St); + unsafe_pinned!(next: Option); +} + +impl TryFlatten +where + St: TryStream, + St::Ok: TryStream, + ::Error: From, +{ + pub(super) fn new(stream: St) -> Self { + Self { stream, next: None } + } + + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &St { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut St { + &mut self.stream + } + + /// Acquires a pinned mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> { + self.stream() + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> St { + self.stream + } +} + +impl FusedStream for TryFlatten +where + St: TryStream + FusedStream, +{ + fn is_terminated(&self) -> bool { + self.next.is_none() && self.stream.is_terminated() + } +} + +impl Stream for TryFlatten +where + St: TryStream, + St::Ok: TryStream, + ::Error: From, +{ + type Item = Result<::Ok, ::Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + if self.next.is_none() { + match ready!(self.as_mut().stream().try_poll_next(cx)?) { + Some(e) => self.as_mut().next().set(Some(e)), + None => return Poll::Ready(None), + } + } + + if let Some(item) = ready!(self + .as_mut() + .next() + .as_pin_mut() + .unwrap() + .try_poll_next(cx)?) + { + return Poll::Ready(Some(Ok(item))); + } else { + self.as_mut().next().set(None); + } + } + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl Sink for TryFlatten +where + S: TryStream + Sink, +{ + type Error = >::Error; + + delegate_sink!(stream, Item); +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 2a8b65c65f..675b2656bd 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -449,7 +449,7 @@ pub mod stream { TryStreamExt, AndThen, ErrInto, MapOk, MapErr, OrElse, InspectOk, InspectErr, - TryNext, TryForEach, TryFilterMap, + TryNext, TryForEach, TryFilterMap, TryFlatten, TryCollect, TryFold, TrySkipWhile, IntoStream, };