Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FusedStream implementations #1831

Merged
merged 1 commit into from
Aug 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion futures-util/src/sink/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures_core::stream::Stream;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
use pin_utils::{unsafe_pinned, unsafe_unpinned};
Expand Down Expand Up @@ -78,6 +78,12 @@ impl<S, Item> Stream for Buffer<S, Item> where S: Sink<Item> + Stream {
}
}

impl<S, Item> FusedStream for Buffer<S, Item> where S: Sink<Item> + FusedStream {
fn is_terminated(&self) -> bool {
self.sink.is_terminated()
}
}

impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> {
type Error = Si::Error;

Expand Down
12 changes: 11 additions & 1 deletion futures-util/src/sink/err_into.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::sink::{SinkExt, SinkMapErr};
use core::pin::Pin;
use futures_core::stream::Stream;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
use futures_sink::{Sink};
use pin_utils::unsafe_pinned;
Expand Down Expand Up @@ -57,6 +57,7 @@ impl<Si, Item, E> Sink<Item> for SinkErrInto<Si, Item, E>
delegate_sink!(sink, Item);
}

// Forwarding impl of Stream from the underlying sink
impl<S, Item, E> Stream for SinkErrInto<S, Item, E>
where S: Sink<Item> + Stream,
S::Error: Into<E>
Expand All @@ -70,3 +71,12 @@ impl<S, Item, E> Stream for SinkErrInto<S, Item, E>
self.sink().poll_next(cx)
}
}

impl<S, Item, E> FusedStream for SinkErrInto<S, Item, E>
where S: Sink<Item> + FusedStream,
S::Error: Into<E>
{
fn is_terminated(&self) -> bool {
self.sink.is_terminated()
}
}
9 changes: 8 additions & 1 deletion futures-util/src/sink/map_err.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::pin::Pin;
use futures_core::stream::Stream;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
use futures_sink::{Sink};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
Expand Down Expand Up @@ -85,6 +85,7 @@ impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F>
}
}

// Forwarding impl of Stream from the underlying sink
impl<S: Stream, F> Stream for SinkMapErr<S, F> {
type Item = S::Item;

Expand All @@ -95,3 +96,9 @@ impl<S: Stream, F> Stream for SinkMapErr<S, F> {
self.sink().poll_next(cx)
}
}

impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> {
fn is_terminated(&self) -> bool {
self.sink.is_terminated()
}
}
14 changes: 13 additions & 1 deletion futures-util/src/sink/with_flat_map.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::fmt;
use core::marker::PhantomData;
use core::pin::Pin;
use futures_core::stream::Stream;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
use pin_utils::{unsafe_pinned, unsafe_unpinned};
Expand Down Expand Up @@ -113,6 +113,7 @@ where
}
}

// Forwarding impl of Stream from the underlying sink
impl<S, Item, U, St, F> Stream for WithFlatMap<S, Item, U, St, F>
where
S: Stream + Sink<Item>,
Expand All @@ -128,6 +129,17 @@ where
}
}

impl<S, Item, U, St, F> FusedStream for WithFlatMap<S, Item, U, St, F>
where
S: FusedStream + Sink<Item>,
F: FnMut(U) -> St,
St: Stream<Item = Result<Item, S::Error>>,
{
fn is_terminated(&self) -> bool {
self.sink.is_terminated()
}
}

impl<Si, Item, U, St, F> Sink<U> for WithFlatMap<Si, Item, U, St, F>
where
Si: Sink<Item>,
Expand Down
11 changes: 8 additions & 3 deletions futures-util/src/stream/catch_unwind.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures_core::stream::Stream;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::any::Any;
Expand All @@ -22,8 +22,7 @@ impl<St: Stream + UnwindSafe> CatchUnwind<St> {
}
}

impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St>
{
impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St> {
type Item = Result<St::Item, Box<dyn Any + Send>>;

fn poll_next(
Expand All @@ -47,3 +46,9 @@ impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St>
}
}
}

impl<St: FusedStream + UnwindSafe> FusedStream for CatchUnwind<St> {
fn is_terminated(&self) -> bool {
self.caught_unwind || self.stream.is_terminated()
}
}
8 changes: 7 additions & 1 deletion futures-util/src/stream/chunks.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::stream::Fuse;
use futures_core::stream::Stream;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
Expand Down Expand Up @@ -107,6 +107,12 @@ impl<St: Stream> Stream for Chunks<St> {
}
}

impl<St: FusedStream> FusedStream for Chunks<St> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated() && self.items.is_empty()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for Chunks<S>
Expand Down
8 changes: 7 additions & 1 deletion futures-util/src/stream/once.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;

Expand Down Expand Up @@ -51,3 +51,9 @@ impl<Fut: Future> Stream for Once<Fut> {
Poll::Ready(Some(val))
}
}

impl<Fut: Future> FusedStream for Once<Fut> {
fn is_terminated(&self) -> bool {
self.future.is_none()
}
}
10 changes: 9 additions & 1 deletion futures-util/src/stream/repeat.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::pin::Pin;
use futures_core::stream::Stream;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};

/// Stream for the [`repeat`] function.
Expand Down Expand Up @@ -40,3 +40,11 @@ impl<T> Stream for Repeat<T>
Poll::Ready(Some(self.item.clone()))
}
}

impl<T> FusedStream for Repeat<T>
where T: Clone,
{
fn is_terminated(&self) -> bool {
false
}
}
2 changes: 1 addition & 1 deletion futures-util/src/stream/skip_while.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl<St, Fut, F> FusedStream for SkipWhile<St, Fut, F>
Fut: Future<Output = bool>,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
self.pending_item.is_none() && self.stream.is_terminated()
}
}

Expand Down
10 changes: 9 additions & 1 deletion futures-util/src/stream/take.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::pin::Pin;
use futures_core::stream::Stream;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
Expand Down Expand Up @@ -81,6 +81,14 @@ impl<St> Stream for Take<St>
}
}

impl<St> FusedStream for Take<St>
where St: FusedStream,
{
fn is_terminated(&self) -> bool {
self.remaining == 0 || self.stream.is_terminated()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for Take<S>
Expand Down
12 changes: 11 additions & 1 deletion futures-util/src/stream/take_while.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
Expand Down Expand Up @@ -129,6 +129,16 @@ impl<St, Fut, F> Stream for TakeWhile<St, Fut, F>
}
}

impl<St, Fut, F> FusedStream for TakeWhile<St, Fut, F>
where St: FusedStream,
F: FnMut(&St::Item) -> Fut,
Fut: Future<Output = bool>,
{
fn is_terminated(&self) -> bool {
self.done_taking || self.pending_item.is_none() && self.stream.is_terminated()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Fut, F, Item> Sink<Item> for TakeWhile<S, Fut, F>
Expand Down
12 changes: 11 additions & 1 deletion futures-util/src/try_stream/and_then.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::TryFuture;
use futures_core::stream::{Stream, TryStream};
use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
Expand Down Expand Up @@ -104,6 +104,16 @@ impl<St, Fut, F> Stream for AndThen<St, Fut, F>
}
}

impl<St, Fut, F> FusedStream for AndThen<St, Fut, F>
where St: TryStream + FusedStream,
F: FnMut(St::Ok) -> Fut,
Fut: TryFuture<Error = St::Error>,
{
fn is_terminated(&self) -> bool {
self.future.is_none() && self.stream.is_terminated()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Fut, F, Item> Sink<Item> for AndThen<S, Fut, F>
Expand Down
12 changes: 11 additions & 1 deletion futures-util/src/try_stream/or_else.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::TryFuture;
use futures_core::stream::{Stream, TryStream};
use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
Expand Down Expand Up @@ -105,6 +105,16 @@ impl<St, Fut, F> Stream for OrElse<St, Fut, F>
}
}

impl<St, Fut, F> FusedStream for OrElse<St, Fut, F>
where St: TryStream + FusedStream,
F: FnMut(St::Error) -> Fut,
Fut: TryFuture<Ok = St::Ok>,
{
fn is_terminated(&self) -> bool {
self.future.is_none() && self.stream.is_terminated()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Fut, F, Item> Sink<Item> for OrElse<S, Fut, F>
Expand Down
12 changes: 11 additions & 1 deletion futures-util/src/try_stream/try_skip_while.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::TryFuture;
use futures_core::stream::{Stream, TryStream};
use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
Expand Down Expand Up @@ -133,6 +133,16 @@ impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
}
}

impl<St, Fut, F> FusedStream for TrySkipWhile<St, Fut, F>
where St: TryStream + FusedStream,
F: FnMut(&St::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = St::Error>,
{
fn is_terminated(&self) -> bool {
self.pending_item.is_none() && self.stream.is_terminated()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Fut, F, Item, E> Sink<Item> for TrySkipWhile<S, Fut, F>
Expand Down