From a6abbf67a6dcffb5be14bbbea8e5788819715faf Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Tue, 27 Aug 2019 23:45:53 +0900 Subject: [PATCH] Add FusedStream implementations --- futures-util/src/sink/buffer.rs | 8 +++++++- futures-util/src/sink/err_into.rs | 12 +++++++++++- futures-util/src/sink/map_err.rs | 9 ++++++++- futures-util/src/sink/with_flat_map.rs | 14 +++++++++++++- futures-util/src/stream/catch_unwind.rs | 11 ++++++++--- futures-util/src/stream/chunks.rs | 8 +++++++- futures-util/src/stream/once.rs | 8 +++++++- futures-util/src/stream/repeat.rs | 10 +++++++++- futures-util/src/stream/skip_while.rs | 2 +- futures-util/src/stream/take.rs | 10 +++++++++- futures-util/src/stream/take_while.rs | 12 +++++++++++- futures-util/src/try_stream/and_then.rs | 12 +++++++++++- futures-util/src/try_stream/or_else.rs | 12 +++++++++++- futures-util/src/try_stream/try_skip_while.rs | 12 +++++++++++- 14 files changed, 124 insertions(+), 16 deletions(-) diff --git a/futures-util/src/sink/buffer.rs b/futures-util/src/sink/buffer.rs index a39ff9d749..046efcaa4b 100644 --- a/futures-util/src/sink/buffer.rs +++ b/futures-util/src/sink/buffer.rs @@ -1,4 +1,4 @@ -use futures_core::stream::Stream; +use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; use pin_utils::{unsafe_pinned, unsafe_unpinned}; @@ -78,6 +78,12 @@ impl Stream for Buffer where S: Sink + Stream { } } +impl FusedStream for Buffer where S: Sink + FusedStream { + fn is_terminated(&self) -> bool { + self.sink.is_terminated() + } +} + impl, Item> Sink for Buffer { type Error = Si::Error; diff --git a/futures-util/src/sink/err_into.rs b/futures-util/src/sink/err_into.rs index b1b99a2bf7..d93c8aa393 100644 --- a/futures-util/src/sink/err_into.rs +++ b/futures-util/src/sink/err_into.rs @@ -1,6 +1,6 @@ use crate::sink::{SinkExt, SinkMapErr}; use core::pin::Pin; -use futures_core::stream::Stream; +use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; use futures_sink::{Sink}; use pin_utils::unsafe_pinned; @@ -57,6 +57,7 @@ impl Sink for SinkErrInto delegate_sink!(sink, Item); } +// Forwarding impl of Stream from the underlying sink impl Stream for SinkErrInto where S: Sink + Stream, S::Error: Into @@ -70,3 +71,12 @@ impl Stream for SinkErrInto self.sink().poll_next(cx) } } + +impl FusedStream for SinkErrInto + where S: Sink + FusedStream, + S::Error: Into +{ + fn is_terminated(&self) -> bool { + self.sink.is_terminated() + } +} diff --git a/futures-util/src/sink/map_err.rs b/futures-util/src/sink/map_err.rs index a7b102ca41..bd2ed7ae83 100644 --- a/futures-util/src/sink/map_err.rs +++ b/futures-util/src/sink/map_err.rs @@ -1,5 +1,5 @@ use core::pin::Pin; -use futures_core::stream::Stream; +use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; use futures_sink::{Sink}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; @@ -85,6 +85,7 @@ impl Sink for SinkMapErr } } +// Forwarding impl of Stream from the underlying sink impl Stream for SinkMapErr { type Item = S::Item; @@ -95,3 +96,9 @@ impl Stream for SinkMapErr { self.sink().poll_next(cx) } } + +impl FusedStream for SinkMapErr { + fn is_terminated(&self) -> bool { + self.sink.is_terminated() + } +} diff --git a/futures-util/src/sink/with_flat_map.rs b/futures-util/src/sink/with_flat_map.rs index 1aa186785a..0b2726e07d 100644 --- a/futures-util/src/sink/with_flat_map.rs +++ b/futures-util/src/sink/with_flat_map.rs @@ -1,7 +1,7 @@ use core::fmt; use core::marker::PhantomData; use core::pin::Pin; -use futures_core::stream::Stream; +use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; use pin_utils::{unsafe_pinned, unsafe_unpinned}; @@ -113,6 +113,7 @@ where } } +// Forwarding impl of Stream from the underlying sink impl Stream for WithFlatMap where S: Stream + Sink, @@ -128,6 +129,17 @@ where } } +impl FusedStream for WithFlatMap +where + S: FusedStream + Sink, + F: FnMut(U) -> St, + St: Stream>, +{ + fn is_terminated(&self) -> bool { + self.sink.is_terminated() + } +} + impl Sink for WithFlatMap where Si: Sink, diff --git a/futures-util/src/stream/catch_unwind.rs b/futures-util/src/stream/catch_unwind.rs index 696698841b..0486c51c3a 100644 --- a/futures-util/src/stream/catch_unwind.rs +++ b/futures-util/src/stream/catch_unwind.rs @@ -1,4 +1,4 @@ -use futures_core::stream::Stream; +use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::any::Any; @@ -22,8 +22,7 @@ impl CatchUnwind { } } -impl Stream for CatchUnwind -{ +impl Stream for CatchUnwind { type Item = Result>; fn poll_next( @@ -47,3 +46,9 @@ impl Stream for CatchUnwind } } } + +impl FusedStream for CatchUnwind { + fn is_terminated(&self) -> bool { + self.caught_unwind || self.stream.is_terminated() + } +} diff --git a/futures-util/src/stream/chunks.rs b/futures-util/src/stream/chunks.rs index bf75bb24bf..1c8b518df3 100644 --- a/futures-util/src/stream/chunks.rs +++ b/futures-util/src/stream/chunks.rs @@ -1,5 +1,5 @@ use crate::stream::Fuse; -use futures_core::stream::Stream; +use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; @@ -107,6 +107,12 @@ impl Stream for Chunks { } } +impl FusedStream for Chunks { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() && self.items.is_empty() + } +} + // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for Chunks diff --git a/futures-util/src/stream/once.rs b/futures-util/src/stream/once.rs index 36f72cf1ec..d9d1512b52 100644 --- a/futures-util/src/stream/once.rs +++ b/futures-util/src/stream/once.rs @@ -1,6 +1,6 @@ use core::pin::Pin; use futures_core::future::Future; -use futures_core::stream::Stream; +use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; use pin_utils::unsafe_pinned; @@ -51,3 +51,9 @@ impl Stream for Once { Poll::Ready(Some(val)) } } + +impl FusedStream for Once { + fn is_terminated(&self) -> bool { + self.future.is_none() + } +} diff --git a/futures-util/src/stream/repeat.rs b/futures-util/src/stream/repeat.rs index 41b6bc5ad3..d869b5dffb 100644 --- a/futures-util/src/stream/repeat.rs +++ b/futures-util/src/stream/repeat.rs @@ -1,5 +1,5 @@ use core::pin::Pin; -use futures_core::stream::Stream; +use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; /// Stream for the [`repeat`] function. @@ -40,3 +40,11 @@ impl Stream for Repeat Poll::Ready(Some(self.item.clone())) } } + +impl FusedStream for Repeat + where T: Clone, +{ + fn is_terminated(&self) -> bool { + false + } +} diff --git a/futures-util/src/stream/skip_while.rs b/futures-util/src/stream/skip_while.rs index a3bb320a2e..f18f6c422b 100644 --- a/futures-util/src/stream/skip_while.rs +++ b/futures-util/src/stream/skip_while.rs @@ -95,7 +95,7 @@ impl FusedStream for SkipWhile Fut: Future, { fn is_terminated(&self) -> bool { - self.stream.is_terminated() + self.pending_item.is_none() && self.stream.is_terminated() } } diff --git a/futures-util/src/stream/take.rs b/futures-util/src/stream/take.rs index df2750836b..cd9113e531 100644 --- a/futures-util/src/stream/take.rs +++ b/futures-util/src/stream/take.rs @@ -1,5 +1,5 @@ use core::pin::Pin; -use futures_core::stream::Stream; +use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; @@ -81,6 +81,14 @@ impl Stream for Take } } +impl FusedStream for Take + where St: FusedStream, +{ + fn is_terminated(&self) -> bool { + self.remaining == 0 || self.stream.is_terminated() + } +} + // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for Take diff --git a/futures-util/src/stream/take_while.rs b/futures-util/src/stream/take_while.rs index 1f0ba32ca1..e6225ca3b3 100644 --- a/futures-util/src/stream/take_while.rs +++ b/futures-util/src/stream/take_while.rs @@ -1,7 +1,7 @@ use core::fmt; use core::pin::Pin; use futures_core::future::Future; -use futures_core::stream::Stream; +use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; @@ -129,6 +129,16 @@ impl Stream for TakeWhile } } +impl FusedStream for TakeWhile + where St: FusedStream, + F: FnMut(&St::Item) -> Fut, + Fut: Future, +{ + fn is_terminated(&self) -> bool { + self.done_taking || self.pending_item.is_none() && self.stream.is_terminated() + } +} + // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for TakeWhile diff --git a/futures-util/src/try_stream/and_then.rs b/futures-util/src/try_stream/and_then.rs index c500028b69..d92b9a93e9 100644 --- a/futures-util/src/try_stream/and_then.rs +++ b/futures-util/src/try_stream/and_then.rs @@ -1,7 +1,7 @@ use core::fmt; use core::pin::Pin; use futures_core::future::TryFuture; -use futures_core::stream::{Stream, TryStream}; +use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; @@ -104,6 +104,16 @@ impl Stream for AndThen } } +impl FusedStream for AndThen + where St: TryStream + FusedStream, + F: FnMut(St::Ok) -> Fut, + Fut: TryFuture, +{ + fn is_terminated(&self) -> bool { + self.future.is_none() && self.stream.is_terminated() + } +} + // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for AndThen diff --git a/futures-util/src/try_stream/or_else.rs b/futures-util/src/try_stream/or_else.rs index 0fe256572d..4a18fe967e 100644 --- a/futures-util/src/try_stream/or_else.rs +++ b/futures-util/src/try_stream/or_else.rs @@ -1,7 +1,7 @@ use core::fmt; use core::pin::Pin; use futures_core::future::TryFuture; -use futures_core::stream::{Stream, TryStream}; +use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; @@ -105,6 +105,16 @@ impl Stream for OrElse } } +impl FusedStream for OrElse + where St: TryStream + FusedStream, + F: FnMut(St::Error) -> Fut, + Fut: TryFuture, +{ + fn is_terminated(&self) -> bool { + self.future.is_none() && self.stream.is_terminated() + } +} + // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for OrElse diff --git a/futures-util/src/try_stream/try_skip_while.rs b/futures-util/src/try_stream/try_skip_while.rs index 6d446fcc2b..0c0355a92b 100644 --- a/futures-util/src/try_stream/try_skip_while.rs +++ b/futures-util/src/try_stream/try_skip_while.rs @@ -1,7 +1,7 @@ use core::fmt; use core::pin::Pin; use futures_core::future::TryFuture; -use futures_core::stream::{Stream, TryStream}; +use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; @@ -133,6 +133,16 @@ impl Stream for TrySkipWhile } } +impl FusedStream for TrySkipWhile + where St: TryStream + FusedStream, + F: FnMut(&St::Ok) -> Fut, + Fut: TryFuture, +{ + fn is_terminated(&self) -> bool { + self.pending_item.is_none() && self.stream.is_terminated() + } +} + // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for TrySkipWhile