From 8b59a89c82b3081a12c4f66b53860df2f94b21ce Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 14 Jul 2019 16:12:53 +0900 Subject: [PATCH] Minor clean up StreamExt::flatten --- futures-util/src/stream/flatten.rs | 54 ++++++++++++++++++------------ 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/futures-util/src/stream/flatten.rs b/futures-util/src/stream/flatten.rs index df43f7e1ee..b46c0b13bb 100644 --- a/futures-util/src/stream/flatten.rs +++ b/futures-util/src/stream/flatten.rs @@ -9,26 +9,35 @@ use pin_utils::unsafe_pinned; #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Flatten - where St: Stream, +where + St: Stream, { stream: St, next: Option, } -impl Unpin for Flatten -where St: Stream + Unpin, - St::Item: Stream + Unpin, -{} +impl Unpin for Flatten +where + St: Stream + Unpin, + St::Item: Unpin, +{ +} -impl Flatten -where St: Stream, - St::Item: Stream, +impl Flatten +where + St: Stream, { unsafe_pinned!(stream: St); unsafe_pinned!(next: Option); +} - pub(super) fn new(stream: St) -> Flatten{ - Flatten { stream, next: None, } +impl Flatten +where + St: Stream, + St::Item: Stream, +{ + pub(super) fn new(stream: St) -> Self { + Self { stream, next: None } } /// Acquires a reference to the underlying stream that this combinator is @@ -64,22 +73,23 @@ where St: Stream, } } -impl FusedStream for Flatten { +impl FusedStream for Flatten +where + St: Stream + FusedStream, +{ fn is_terminated(&self) -> bool { self.next.is_none() && self.stream.is_terminated() } } impl Stream for Flatten - where St: Stream, - St::Item: Stream, +where + St: Stream, + St::Item: Stream, { type Item = ::Item; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if self.next.is_none() { match ready!(self.as_mut().stream().poll_next(cx)) { @@ -87,9 +97,9 @@ impl Stream for Flatten None => return Poll::Ready(None), } } - let item = ready!(self.as_mut().next().as_pin_mut().unwrap().poll_next(cx)); - if item.is_some() { - return Poll::Ready(item); + + if let Some(item) = ready!(self.as_mut().next().as_pin_mut().unwrap().poll_next(cx)) { + return Poll::Ready(Some(item)); } else { self.as_mut().next().set(None); } @@ -100,8 +110,8 @@ impl Stream for Flatten // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for Flatten - where S: Stream + Sink, - S::Item: Stream, +where + S: Stream + Sink, { type Error = S::Error;