Skip to content

Commit

Permalink
Add Future/Stream bounds to FusedFuture/FusedStream
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e authored and cramertj committed Aug 6, 2019
1 parent ab7743d commit 22e2942
Show file tree
Hide file tree
Showing 46 changed files with 181 additions and 65 deletions.
8 changes: 4 additions & 4 deletions futures-core/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ pub type BoxFuture<'a, T> = Pin<alloc::boxed::Box<dyn Future<Output = T> + Send
#[cfg(feature = "alloc")]
pub type LocalBoxFuture<'a, T> = Pin<alloc::boxed::Box<dyn Future<Output = T> + 'a>>;

/// A `Future` or `TryFuture` which tracks whether or not the underlying future
/// A future which tracks whether or not the underlying future
/// should no longer be polled.
///
/// `is_terminated` will return `true` if a future should no longer be polled.
/// Usually, this state occurs after `poll` (or `try_poll`) returned
/// `Poll::Ready`. However, `is_terminated` may also return `true` if a future
/// has become inactive and can no longer make progress and should be ignored
/// or dropped rather than being `poll`ed again.
pub trait FusedFuture {
pub trait FusedFuture: Future {
/// Returns `true` if the underlying future should no longer be polled.
fn is_terminated(&self) -> bool;
}

impl<F: FusedFuture + ?Sized> FusedFuture for &mut F {
impl<F: FusedFuture + ?Sized + Unpin> FusedFuture for &mut F {
fn is_terminated(&self) -> bool {
<F as FusedFuture>::is_terminated(&**self)
}
Expand Down Expand Up @@ -92,7 +92,7 @@ mod if_alloc {
use alloc::boxed::Box;
use super::*;

impl<F: FusedFuture + ?Sized> FusedFuture for Box<F> {
impl<F: FusedFuture + ?Sized + Unpin> FusedFuture for Box<F> {
fn is_terminated(&self) -> bool {
<F as FusedFuture>::is_terminated(&**self)
}
Expand Down
8 changes: 4 additions & 4 deletions futures-core/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,20 @@ where
}
}

/// A `Stream` or `TryStream` which tracks whether or not the underlying stream
/// A stream which tracks whether or not the underlying stream
/// should no longer be polled.
///
/// `is_terminated` will return `true` if a future should no longer be polled.
/// Usually, this state occurs after `poll_next` (or `try_poll_next`) returned
/// `Poll::Ready(None)`. However, `is_terminated` may also return `true` if a
/// stream has become inactive and can no longer make progress and should be
/// ignored or dropped rather than being polled again.
pub trait FusedStream {
pub trait FusedStream: Stream {
/// Returns `true` if the stream should no longer be polled.
fn is_terminated(&self) -> bool;
}

impl<F: ?Sized + FusedStream> FusedStream for &mut F {
impl<F: ?Sized + FusedStream + Unpin> FusedStream for &mut F {
fn is_terminated(&self) -> bool {
<F as FusedStream>::is_terminated(&**self)
}
Expand Down Expand Up @@ -194,7 +194,7 @@ mod if_alloc {
}
}

impl<S: ?Sized + FusedStream> FusedStream for Box<S> {
impl<S: ?Sized + FusedStream + Unpin> FusedStream for Box<S> {
fn is_terminated(&self) -> bool {
<S as FusedStream>::is_terminated(&**self)
}
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/future/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ where
impl<A, B> FusedFuture for Either<A, B>
where
A: FusedFuture,
B: FusedFuture,
B: FusedFuture<Output = A::Output>,
{
fn is_terminated(&self) -> bool {
match self {
Expand Down Expand Up @@ -99,7 +99,7 @@ where
impl<A, B> FusedStream for Either<A, B>
where
A: FusedStream,
B: FusedStream,
B: FusedStream<Item = A::Item>,
{
fn is_terminated(&self) -> bool {
match self {
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/future/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ impl<Fut: Future, F: FnOnce(&Fut::Output)> Inspect<Fut, F> {

impl<Fut: Future + Unpin, F> Unpin for Inspect<Fut, F> {}

impl<Fut: Future + FusedFuture, F> FusedFuture for Inspect<Fut, F> {
impl<Fut, F> FusedFuture for Inspect<Fut, F>
where Fut: FusedFuture,
F: FnOnce(&Fut::Output),
{
fn is_terminated(&self) -> bool { self.future.is_terminated() }
}

Expand Down
6 changes: 4 additions & 2 deletions futures-util/src/future/lazy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ pub fn lazy<F, R>(f: F) -> Lazy<F>
Lazy { f: Some(f) }
}

impl<F> FusedFuture for Lazy<F> {
impl<F, R> FusedFuture for Lazy<F>
where F: FnOnce(&mut Context<'_>) -> R,
{
fn is_terminated(&self) -> bool { self.f.is_none() }
}

impl<R, F> Future for Lazy<F>
impl<F, R> Future for Lazy<F>
where F: FnOnce(&mut Context<'_>) -> R,
{
type Output = R;
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/future/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ impl<Fut, F> Map<Fut, F> {

impl<Fut: Unpin, F> Unpin for Map<Fut, F> {}

impl<Fut, F> FusedFuture for Map<Fut, F> {
impl<Fut, F, T> FusedFuture for Map<Fut, F>
where Fut: Future,
F: FnOnce(Fut::Output) -> T,
{
fn is_terminated(&self) -> bool { self.f.is_none() }
}

Expand Down
9 changes: 7 additions & 2 deletions futures-util/src/future/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,19 @@ where
}
}

impl<Fut: Future> FusedFuture for Shared<Fut> {
impl<Fut> FusedFuture for Shared<Fut>
where
Fut: Future,
Fut::Output: Clone,
{
fn is_terminated(&self) -> bool {
self.inner.is_none()
}
}

impl<Fut: Future> Future for Shared<Fut>
impl<Fut> Future for Shared<Fut>
where
Fut: Future,
Fut::Output: Clone,
{
type Output = Fut::Output;
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/future/then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ impl<Fut1, Fut2, F> Then<Fut1, Fut2, F>
}
}

impl<Fut1, Fut2, F> FusedFuture for Then<Fut1, Fut2, F> {
impl<Fut1, Fut2, F> FusedFuture for Then<Fut1, Fut2, F>
where Fut1: Future,
Fut2: Future,
F: FnOnce(Fut1::Output) -> Fut2,
{
fn is_terminated(&self) -> bool { self.chain.is_terminated() }
}

Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/stream/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ where St1: Stream,
}
}

impl<St1, St2: FusedStream> FusedStream for Chain<St1, St2> {
impl<St1, St2> FusedStream for Chain<St1, St2>
where St1: Stream,
St2: FusedStream<Item=St1::Item>,
{
fn is_terminated(&self) -> bool {
self.first.is_none() && self.second.is_terminated()
}
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/stream/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ impl<St: Stream, C: Default> Collect<St, C> {
}
}

impl<St: FusedStream, C> FusedFuture for Collect<St, C> {
impl<St, C> FusedFuture for Collect<St, C>
where St: FusedStream,
C: Default + Extend<St:: Item>
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
Expand Down
3 changes: 2 additions & 1 deletion futures-util/src/stream/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ where

impl<St> FusedStream for Flatten<St>
where
St: Stream + FusedStream,
St: FusedStream,
St::Item: Stream,
{
fn is_terminated(&self) -> bool {
self.next.is_none() && self.stream.is_terminated()
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/stream/fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ where St: Stream,
}
}

impl<St, Fut, T, F> FusedFuture for Fold<St, Fut, T, F> {
impl<St, Fut, T, F> FusedFuture for Fold<St, Fut, T, F>
where St: Stream,
F: FnMut(T, St::Item) -> Fut,
Fut: Future<Output = T>,
{
fn is_terminated(&self) -> bool {
self.accum.is_none() && self.future.is_none()
}
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/stream/for_each.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ where St: Stream,
}
}

impl<St: FusedStream, Fut, F> FusedFuture for ForEach<St, Fut, F> {
impl<St, Fut, F> FusedFuture for ForEach<St, Fut, F>
where St: FusedStream,
F: FnMut(St::Item) -> Fut,
Fut: Future<Output = ()>,
{
fn is_terminated(&self) -> bool {
self.future.is_none() && self.stream.is_terminated()
}
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/stream/for_each_concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ where St: Stream,
}
}

impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F> {
impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F>
where St: Stream,
F: FnMut(St::Item) -> Fut,
Fut: Future<Output = ()>,
{
fn is_terminated(&self) -> bool {
self.stream.is_none() && self.futures.is_empty()
}
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/stream/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ where
}
}

impl<St: TryStream, Si: Sink<St::Ok> + Unpin> FusedFuture for Forward<St, Si> {
impl<St, Si, Item, E> FusedFuture for Forward<St, Si>
where
Si: Sink<Item, Error = E>,
St: Stream<Item = Result<Item, E>>,
{
fn is_terminated(&self) -> bool {
self.sink.is_none()
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl<St> Fuse<St> {
}
}

impl<S> FusedStream for Fuse<S> {
impl<S: Stream> FusedStream for Fuse<S> {
fn is_terminated(&self) -> bool {
self.done
}
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/stream/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ impl<St, F> Inspect<St, F>
}
}

impl<St: Stream + FusedStream, F> FusedStream for Inspect<St, F> {
impl<St, F> FusedStream for Inspect<St, F>
where St: FusedStream,
F: FnMut(&St::Item),
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/into_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<St: Stream + Unpin> StreamFuture<St> {
}
}

impl<St> FusedFuture for StreamFuture<St> {
impl<St: Stream + Unpin> FusedFuture for StreamFuture<St> {
fn is_terminated(&self) -> bool {
self.stream.is_none()
}
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/stream/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ impl<St, T, F> Map<St, F>
}
}

impl<St: FusedStream, F> FusedStream for Map<St, F> {
impl<St, F, T> FusedStream for Map<St, F>
where St: FusedStream,
F: FnMut(St::Item) -> T,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/next.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
}
}

impl<St: ?Sized + FusedStream> FusedFuture for Next<'_, St> {
impl<St: ?Sized + FusedStream + Unpin> FusedFuture for Next<'_, St> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/stream/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ impl<St1, St2> Select<St1, St2> {
}
}

impl<St1, St2> FusedStream for Select<St1, St2> {
impl<St1, St2> FusedStream for Select<St1, St2>
where St1: Stream,
St2: Stream<Item = St1::Item>
{
fn is_terminated(&self) -> bool {
self.stream1.is_terminated() && self.stream2.is_terminated()
}
Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/stream/select_next_some.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::pin::Pin;
use futures_core::stream::{Stream, FusedStream};
use futures_core::stream::FusedStream;
use futures_core::future::{Future, FusedFuture};
use futures_core::task::{Context, Poll};
use crate::stream::StreamExt;
Expand All @@ -18,13 +18,13 @@ impl<'a, St: ?Sized> SelectNextSome<'a, St> {
}
}

impl<St: ?Sized + FusedStream> FusedFuture for SelectNextSome<'_, St> {
impl<St: ?Sized + FusedStream + Unpin> FusedFuture for SelectNextSome<'_, St> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}

impl<St: ?Sized + Stream + FusedStream + Unpin> Future for SelectNextSome<'_, St> {
impl<St: ?Sized + FusedStream + Unpin> Future for SelectNextSome<'_, St> {
type Output = St::Item;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/stream/skip_while.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ impl<St, Fut, F> SkipWhile<St, Fut, F>
}
}

impl<St: Stream + FusedStream, Fut, F> FusedStream for SkipWhile<St, Fut, F> {
impl<St, Fut, F> FusedStream for SkipWhile<St, Fut, F>
where St: FusedStream,
F: FnMut(&St::Item) -> Fut,
Fut: Future<Output = bool>,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/stream/then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ impl<St, Fut, F> Then<St, Fut, F>
}
}

impl<St: FusedStream, Fut, F> FusedStream for Then<St, Fut, F> {
impl<St, Fut, F> FusedStream for Then<St, Fut, F>
where St: FusedStream,
F: FnMut(St::Item) -> Fut,
Fut: Future,
{
fn is_terminated(&self) -> bool {
self.future.is_none() && self.stream.is_terminated()
}
Expand Down
17 changes: 10 additions & 7 deletions futures-util/src/stream/unfold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ use pin_utils::{unsafe_pinned, unsafe_unpinned};
/// assert_eq!(result, vec![0, 2, 4]);
/// # });
/// ```
pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut>
pub fn unfold<T, F, Fut, Item>(init: T, f: F) -> Unfold<T, F, Fut>
where F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(It, T)>>,
Fut: Future<Output = Option<(Item, T)>>,
{
Unfold {
f,
Expand Down Expand Up @@ -90,22 +90,25 @@ impl<T, F, Fut> Unfold<T, F, Fut> {
unsafe_pinned!(fut: Option<Fut>);
}

impl<T, F, Fut> FusedStream for Unfold<T, F, Fut> {
impl<T, F, Fut, Item> FusedStream for Unfold<T, F, Fut>
where F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
{
fn is_terminated(&self) -> bool {
self.state.is_none() && self.fut.is_none()
}
}

impl<T, F, Fut, It> Stream for Unfold<T, F, Fut>
impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
where F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(It, T)>>,
Fut: Future<Output = Option<(Item, T)>>,
{
type Item = It;
type Item = Item;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<It>> {
) -> Poll<Option<Self::Item>> {
if let Some(state) = self.as_mut().state().take() {
let fut = (self.as_mut().f())(state);
self.as_mut().fut().set(Some(fut));
Expand Down
Loading

0 comments on commit 22e2942

Please sign in to comment.