Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Future/Stream bounds to FusedFuture/FusedStream #1779

Merged
merged 1 commit into from
Aug 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions futures-core/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ pub type BoxFuture<'a, T> = Pin<alloc::boxed::Box<dyn Future<Output = T> + Send
/// `BoxFuture`, but without the `Send` requirement.
pub type LocalBoxFuture<'a, T> = Pin<alloc::boxed::Box<dyn Future<Output = T> + 'a>>;

/// A `Future` or `TryFuture` which tracks whether or not the underlying future
/// A future which tracks whether or not the underlying future
/// should no longer be polled.
///
/// `is_terminated` will return `true` if a future should no longer be polled.
/// Usually, this state occurs after `poll` (or `try_poll`) returned
/// `Poll::Ready`. However, `is_terminated` may also return `true` if a future
/// has become inactive and can no longer make progress and should be ignored
/// or dropped rather than being `poll`ed again.
pub trait FusedFuture {
pub trait FusedFuture: Future {
/// Returns `true` if the underlying future should no longer be polled.
fn is_terminated(&self) -> bool;
}

impl<F: FusedFuture + ?Sized> FusedFuture for &mut F {
impl<F: FusedFuture + ?Sized + Unpin> FusedFuture for &mut F {
fn is_terminated(&self) -> bool {
<F as FusedFuture>::is_terminated(&**self)
}
Expand Down Expand Up @@ -92,7 +92,7 @@ mod if_alloc {
use alloc::boxed::Box;
use super::*;

impl<F: FusedFuture + ?Sized> FusedFuture for Box<F> {
impl<F: FusedFuture + ?Sized + Unpin> FusedFuture for Box<F> {
fn is_terminated(&self) -> bool {
<F as FusedFuture>::is_terminated(&**self)
}
Expand Down
8 changes: 4 additions & 4 deletions futures-core/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,20 @@ where
}
}

/// A `Stream` or `TryStream` which tracks whether or not the underlying stream
/// A stream which tracks whether or not the underlying stream
/// should no longer be polled.
///
/// `is_terminated` will return `true` if a future should no longer be polled.
/// Usually, this state occurs after `poll_next` (or `try_poll_next`) returned
/// `Poll::Ready(None)`. However, `is_terminated` may also return `true` if a
/// stream has become inactive and can no longer make progress and should be
/// ignored or dropped rather than being polled again.
pub trait FusedStream {
pub trait FusedStream: Stream {
/// Returns `true` if the stream should no longer be polled.
fn is_terminated(&self) -> bool;
}

impl<F: ?Sized + FusedStream> FusedStream for &mut F {
impl<F: ?Sized + FusedStream + Unpin> FusedStream for &mut F {
fn is_terminated(&self) -> bool {
<F as FusedStream>::is_terminated(&**self)
}
Expand Down Expand Up @@ -190,7 +190,7 @@ mod if_alloc {
}
}

impl<S: ?Sized + FusedStream> FusedStream for Box<S> {
impl<S: ?Sized + FusedStream + Unpin> FusedStream for Box<S> {
fn is_terminated(&self) -> bool {
<S as FusedStream>::is_terminated(&**self)
}
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/future/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ where
impl<A, B> FusedFuture for Either<A, B>
where
A: FusedFuture,
B: FusedFuture,
B: FusedFuture<Output = A::Output>,
{
fn is_terminated(&self) -> bool {
match self {
Expand Down Expand Up @@ -99,7 +99,7 @@ where
impl<A, B> FusedStream for Either<A, B>
where
A: FusedStream,
B: FusedStream,
B: FusedStream<Item = A::Item>,
{
fn is_terminated(&self) -> bool {
match self {
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/future/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ impl<Fut: Future, F: FnOnce(&Fut::Output)> Inspect<Fut, F> {

impl<Fut: Future + Unpin, F> Unpin for Inspect<Fut, F> {}

impl<Fut: Future + FusedFuture, F> FusedFuture for Inspect<Fut, F> {
impl<Fut, F> FusedFuture for Inspect<Fut, F>
where Fut: FusedFuture,
F: FnOnce(&Fut::Output),
{
fn is_terminated(&self) -> bool { self.future.is_terminated() }
}

Expand Down
6 changes: 4 additions & 2 deletions futures-util/src/future/lazy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ pub fn lazy<F, R>(f: F) -> Lazy<F>
Lazy { f: Some(f) }
}

impl<F> FusedFuture for Lazy<F> {
impl<F, R> FusedFuture for Lazy<F>
where F: FnOnce(&mut Context<'_>) -> R,
{
fn is_terminated(&self) -> bool { self.f.is_none() }
}

impl<R, F> Future for Lazy<F>
impl<F, R> Future for Lazy<F>
where F: FnOnce(&mut Context<'_>) -> R,
{
type Output = R;
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/future/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ impl<Fut, F> Map<Fut, F> {

impl<Fut: Unpin, F> Unpin for Map<Fut, F> {}

impl<Fut, F> FusedFuture for Map<Fut, F> {
impl<Fut, F, T> FusedFuture for Map<Fut, F>
where Fut: Future,
F: FnOnce(Fut::Output) -> T,
{
fn is_terminated(&self) -> bool { self.f.is_none() }
}

Expand Down
9 changes: 7 additions & 2 deletions futures-util/src/future/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,19 @@ where
}
}

impl<Fut: Future> FusedFuture for Shared<Fut> {
impl<Fut> FusedFuture for Shared<Fut>
where
Fut: Future,
Fut::Output: Clone,
{
fn is_terminated(&self) -> bool {
self.inner.is_none()
}
}

impl<Fut: Future> Future for Shared<Fut>
impl<Fut> Future for Shared<Fut>
where
Fut: Future,
Fut::Output: Clone,
{
type Output = Fut::Output;
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/future/then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ impl<Fut1, Fut2, F> Then<Fut1, Fut2, F>
}
}

impl<Fut1, Fut2, F> FusedFuture for Then<Fut1, Fut2, F> {
impl<Fut1, Fut2, F> FusedFuture for Then<Fut1, Fut2, F>
where Fut1: Future,
Fut2: Future,
F: FnOnce(Fut1::Output) -> Fut2,
{
fn is_terminated(&self) -> bool { self.chain.is_terminated() }
}

Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/stream/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ where St1: Stream,
}
}

impl<St1, St2: FusedStream> FusedStream for Chain<St1, St2> {
impl<St1, St2> FusedStream for Chain<St1, St2>
where St1: Stream,
St2: FusedStream<Item=St1::Item>,
{
fn is_terminated(&self) -> bool {
self.first.is_none() && self.second.is_terminated()
}
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/stream/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ impl<St: Stream, C: Default> Collect<St, C> {
}
}

impl<St: FusedStream, C> FusedFuture for Collect<St, C> {
impl<St, C> FusedFuture for Collect<St, C>
where St: FusedStream,
C: Default + Extend<St:: Item>
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
Expand Down
3 changes: 2 additions & 1 deletion futures-util/src/stream/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ where

impl<St> FusedStream for Flatten<St>
where
St: Stream + FusedStream,
St: FusedStream,
St::Item: Stream,
{
fn is_terminated(&self) -> bool {
self.next.is_none() && self.stream.is_terminated()
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/stream/fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ where St: Stream,
}
}

impl<St, Fut, T, F> FusedFuture for Fold<St, Fut, T, F> {
impl<St, Fut, T, F> FusedFuture for Fold<St, Fut, T, F>
where St: Stream,
F: FnMut(T, St::Item) -> Fut,
Fut: Future<Output = T>,
{
fn is_terminated(&self) -> bool {
self.accum.is_none() && self.future.is_none()
}
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/stream/for_each.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ where St: Stream,
}
}

impl<St: FusedStream, Fut, F> FusedFuture for ForEach<St, Fut, F> {
impl<St, Fut, F> FusedFuture for ForEach<St, Fut, F>
where St: FusedStream,
F: FnMut(St::Item) -> Fut,
Fut: Future<Output = ()>,
{
fn is_terminated(&self) -> bool {
self.future.is_none() && self.stream.is_terminated()
}
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/stream/for_each_concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ where St: Stream,
}
}

impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F> {
impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F>
where St: Stream,
F: FnMut(St::Item) -> Fut,
Fut: Future<Output = ()>,
{
fn is_terminated(&self) -> bool {
self.stream.is_none() && self.futures.is_empty()
}
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/stream/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ where
}
}

impl<St: TryStream, Si: Sink<St::Ok> + Unpin> FusedFuture for Forward<St, Si> {
impl<St, Si, Item, E> FusedFuture for Forward<St, Si>
where
Si: Sink<Item, Error = E>,
St: Stream<Item = Result<Item, E>>,
{
fn is_terminated(&self) -> bool {
self.sink.is_none()
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl<St> Fuse<St> {
}
}

impl<S> FusedStream for Fuse<S> {
impl<S: Stream> FusedStream for Fuse<S> {
fn is_terminated(&self) -> bool {
self.done
}
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/stream/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ impl<St, F> Inspect<St, F>
}
}

impl<St: Stream + FusedStream, F> FusedStream for Inspect<St, F> {
impl<St, F> FusedStream for Inspect<St, F>
where St: FusedStream,
F: FnMut(&St::Item),
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/into_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<St: Stream + Unpin> StreamFuture<St> {
}
}

impl<St> FusedFuture for StreamFuture<St> {
impl<St: Stream + Unpin> FusedFuture for StreamFuture<St> {
fn is_terminated(&self) -> bool {
self.stream.is_none()
}
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/stream/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ impl<St, T, F> Map<St, F>
}
}

impl<St: FusedStream, F> FusedStream for Map<St, F> {
impl<St, F, T> FusedStream for Map<St, F>
where St: FusedStream,
F: FnMut(St::Item) -> T,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/next.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
}
}

impl<St: ?Sized + FusedStream> FusedFuture for Next<'_, St> {
impl<St: ?Sized + FusedStream + Unpin> FusedFuture for Next<'_, St> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/stream/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ impl<St1, St2> Select<St1, St2> {
}
}

impl<St1, St2> FusedStream for Select<St1, St2> {
impl<St1, St2> FusedStream for Select<St1, St2>
where St1: Stream,
St2: Stream<Item = St1::Item>
{
fn is_terminated(&self) -> bool {
self.stream1.is_terminated() && self.stream2.is_terminated()
}
Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/stream/select_next_some.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::pin::Pin;
use futures_core::stream::{Stream, FusedStream};
use futures_core::stream::FusedStream;
use futures_core::future::{Future, FusedFuture};
use futures_core::task::{Context, Poll};
use crate::stream::StreamExt;
Expand All @@ -18,13 +18,13 @@ impl<'a, St: ?Sized> SelectNextSome<'a, St> {
}
}

impl<St: ?Sized + FusedStream> FusedFuture for SelectNextSome<'_, St> {
impl<St: ?Sized + FusedStream + Unpin> FusedFuture for SelectNextSome<'_, St> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}

impl<St: ?Sized + Stream + FusedStream + Unpin> Future for SelectNextSome<'_, St> {
impl<St: ?Sized + FusedStream + Unpin> Future for SelectNextSome<'_, St> {
type Output = St::Item;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/stream/skip_while.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ impl<St, Fut, F> SkipWhile<St, Fut, F>
}
}

impl<St: Stream + FusedStream, Fut, F> FusedStream for SkipWhile<St, Fut, F> {
impl<St, Fut, F> FusedStream for SkipWhile<St, Fut, F>
where St: FusedStream,
F: FnMut(&St::Item) -> Fut,
Fut: Future<Output = bool>,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/stream/then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ impl<St, Fut, F> Then<St, Fut, F>
}
}

impl<St: FusedStream, Fut, F> FusedStream for Then<St, Fut, F> {
impl<St, Fut, F> FusedStream for Then<St, Fut, F>
where St: FusedStream,
F: FnMut(St::Item) -> Fut,
Fut: Future,
{
fn is_terminated(&self) -> bool {
self.future.is_none() && self.stream.is_terminated()
}
Expand Down
17 changes: 10 additions & 7 deletions futures-util/src/stream/unfold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ use pin_utils::{unsafe_pinned, unsafe_unpinned};
/// assert_eq!(result, vec![0, 2, 4]);
/// # });
/// ```
pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut>
pub fn unfold<T, F, Fut, Item>(init: T, f: F) -> Unfold<T, F, Fut>
where F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(It, T)>>,
Fut: Future<Output = Option<(Item, T)>>,
{
Unfold {
f,
Expand Down Expand Up @@ -90,22 +90,25 @@ impl<T, F, Fut> Unfold<T, F, Fut> {
unsafe_pinned!(fut: Option<Fut>);
}

impl<T, F, Fut> FusedStream for Unfold<T, F, Fut> {
impl<T, F, Fut, Item> FusedStream for Unfold<T, F, Fut>
where F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
{
fn is_terminated(&self) -> bool {
self.state.is_none() && self.fut.is_none()
}
}

impl<T, F, Fut, It> Stream for Unfold<T, F, Fut>
impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
where F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(It, T)>>,
Fut: Future<Output = Option<(Item, T)>>,
{
type Item = It;
type Item = Item;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<It>> {
) -> Poll<Option<Self::Item>> {
if let Some(state) = self.as_mut().state().take() {
let fut = (self.as_mut().f())(state);
self.as_mut().fut().set(Some(fut));
Expand Down
Loading