Skip to content

Commit

Permalink
feat: Rename future::empty to pending and add stream:Lpending
Browse files Browse the repository at this point in the history
Fixes the inconsistency between `future::empty` and `stream::empty`.
I went with `pending` over `never` since there are already a future
called `Never` in the library.

Closes rust-lang#1624
  • Loading branch information
Markus Westerlind committed Jun 27, 2019
1 parent d9ced4e commit 2e1aca4
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 36 deletions.
8 changes: 4 additions & 4 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ impl LocalPool {
/// ```
/// use futures::executor::LocalPool;
/// use futures::task::LocalSpawnExt;
/// use futures::future::{ready, empty};
/// use futures::future::{ready, pending};
///
/// let mut pool = LocalPool::new();
/// let mut spawner = pool.spawner();
///
/// spawner.spawn_local(ready(())).unwrap();
/// spawner.spawn_local(ready(())).unwrap();
/// spawner.spawn_local(empty()).unwrap();
/// spawner.spawn_local(pending()).unwrap();
///
/// // Run the two ready tasks and return true for them.
/// pool.try_run_one(); // returns true after completing one of the ready futures
Expand Down Expand Up @@ -210,14 +210,14 @@ impl LocalPool {
/// ```
/// use futures::executor::LocalPool;
/// use futures::task::LocalSpawnExt;
/// use futures::future::{ready, empty};
/// use futures::future::{ready, pending};
///
/// let mut pool = LocalPool::new();
/// let mut spawner = pool.spawner();
///
/// spawner.spawn_local(ready(())).unwrap();
/// spawner.spawn_local(ready(())).unwrap();
/// spawner.spawn_local(empty()).unwrap();
/// spawner.spawn_local(pending()).unwrap();
///
/// // Runs the two ready task and returns.
/// // The empty task remains in the pool.
Expand Down
6 changes: 3 additions & 3 deletions futures-test/src/future/assert_unmoved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl<Fut> Drop for AssertUnmoved<Fut> {
mod tests {
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_util::future::empty;
use futures_util::future::pending;
use futures_util::task::noop_waker;
use std::pin::Pin;

Expand All @@ -72,7 +72,7 @@ mod tests {
#[test]
fn dont_panic_when_not_polled() {
// This shouldn't panic.
let future = AssertUnmoved::new(empty::<()>());
let future = AssertUnmoved::new(pending::<()>());
drop(future);
}

Expand All @@ -84,7 +84,7 @@ mod tests {
let mut cx = Context::from_waker(&waker);

// First we allocate the future on the stack and poll it.
let mut future = AssertUnmoved::new(empty::<()>());
let mut future = AssertUnmoved::new(pending::<()>());
let pinned_future = unsafe { Pin::new_unchecked(&mut future) };
assert_eq!(pinned_future.poll(&mut cx), Poll::Pending);

Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/async_await/select_mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ macro_rules! document_select_macro {
/// use futures::future;
/// use futures::select;
/// let mut a = future::ready(4);
/// let mut b = future::empty::<()>();
/// let mut b = future::pending::<()>();
///
/// let res = select! {
/// a_res = a => a_res + 1,
Expand All @@ -49,7 +49,7 @@ macro_rules! document_select_macro {
/// use futures::stream::{self, StreamExt};
/// use futures::select;
/// let mut st = stream::iter(vec![2]).fuse();
/// let mut fut = future::empty::<()>();
/// let mut fut = future::pending::<()>();
///
/// select! {
/// x = st.next() => assert_eq!(Some(2), x),
Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use futures_core::future::{BoxFuture, LocalBoxFuture};
pub use futures_core::future::FusedFuture;

// Primitive futures
mod empty;
pub use self::empty::{empty, Empty};

mod lazy;
pub use self::lazy::{lazy, Lazy};

mod pending;
pub use self::pending::{pending, Pending};

mod maybe_done;
pub use self::maybe_done::{maybe_done, MaybeDone};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use core::marker;
use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};

/// Future for the [`empty`] function.
/// Future for the [`pending`] function.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Empty<T> {
pub struct Pending<T> {
_data: marker::PhantomData<T>,
}

impl<T> FusedFuture for Empty<T> {
fn is_terminated(&self) -> bool { false }
impl<T> FusedFuture for Pending<T> {
fn is_terminated(&self) -> bool {
false
}
}

/// Creates a future which never resolves, representing a computation that never
Expand All @@ -26,16 +28,18 @@ impl<T> FusedFuture for Empty<T> {
/// # futures::executor::block_on(async {
/// use futures::future;
///
/// let future = future::empty();
/// let future = future::pending();
/// let () = future.await;
/// unreachable!();
/// # });
/// ```
pub fn empty<T>() -> Empty<T> {
Empty { _data: marker::PhantomData }
pub fn pending<T>() -> Pending<T> {
Pending {
_data: marker::PhantomData,
}
}

impl<T> Future for Empty<T> {
impl<T> Future for Pending<T> {
type Output = T;

fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<T> {
Expand Down
29 changes: 29 additions & 0 deletions futures-util/src/stream/pending.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use core::marker;
use core::pin::Pin;

use futures_core::{Stream, Poll};
use futures_core::task;

/// A stream which never returns any elements.
///
/// This stream can be created with the `stream::pending` function.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Pending<T> {
_data: marker::PhantomData<T>,
}

/// Creates a stream which never returns any elements.
///
/// The returned stream will always return `Pending` when polled.
pub fn pending<T>() -> Pending<T> {
Pending { _data: marker::PhantomData }
}

impl<T> Stream for Pending<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Pending
}
}
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,9 @@ pub mod future {
pub use futures_core::future::BoxFuture;

pub use futures_util::future::{
empty, Empty,
lazy, Lazy,
maybe_done, MaybeDone,
pending, Pending,
poll_fn, PollFn,
ready, ok, err, Ready,
select, Select,
Expand Down
38 changes: 26 additions & 12 deletions futures/tests/futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use futures::executor::{block_on, block_on_stream};
use futures::future::{self, join, Future, FutureExt};
use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
use futures::task::Poll;
use futures_test::{assert_stream_done, assert_stream_next};
use futures_test::future::FutureTestExt;
use futures_test::task::noop_context;
use futures_test::{assert_stream_done, assert_stream_next};

#[test]
fn is_terminated() {
Expand Down Expand Up @@ -40,7 +40,11 @@ fn works_1() {
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let mut iter = block_on_stream(vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>());
let mut iter = block_on_stream(
vec![a_rx, b_rx, c_rx]
.into_iter()
.collect::<FuturesUnordered<_>>(),
);

b_tx.send(99).unwrap();
assert_eq!(Some(Ok(99)), iter.next());
Expand All @@ -61,7 +65,9 @@ fn works_2() {
let mut stream = vec![
a_rx.boxed(),
join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(),
].into_iter().collect::<FuturesUnordered<_>>();
]
.into_iter()
.collect::<FuturesUnordered<_>>();

a_tx.send(9).unwrap();
b_tx.send(10).unwrap();
Expand All @@ -78,10 +84,12 @@ fn from_iterator() {
let stream = vec![
future::ready::<i32>(1),
future::ready::<i32>(2),
future::ready::<i32>(3)
].into_iter().collect::<FuturesUnordered<_>>();
future::ready::<i32>(3),
]
.into_iter()
.collect::<FuturesUnordered<_>>();
assert_eq!(stream.len(), 3);
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1,2,3]);
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
}

#[test]
Expand All @@ -93,7 +101,9 @@ fn finished_future() {
let mut stream = vec![
Box::new(a_rx) as Box<dyn Future<Output = Result<_, _>> + Unpin>,
Box::new(future::select(b_rx, c_rx).map(|e| e.factor_first().0)) as _,
].into_iter().collect::<FuturesUnordered<_>>();
]
.into_iter()
.collect::<FuturesUnordered<_>>();

let cx = &mut noop_context();
for _ in 0..10 {
Expand All @@ -113,7 +123,9 @@ fn iter_mut_cancel() {
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>();
let mut stream = vec![a_rx, b_rx, c_rx]
.into_iter()
.collect::<FuturesUnordered<_>>();

for rx in stream.iter_mut() {
rx.close();
Expand All @@ -134,10 +146,12 @@ fn iter_mut_cancel() {
#[test]
fn iter_mut_len() {
let mut stream = vec![
future::empty::<()>(),
future::empty::<()>(),
future::empty::<()>()
].into_iter().collect::<FuturesUnordered<_>>();
future::pending::<()>(),
future::pending::<()>(),
future::pending::<()>(),
]
.into_iter()
.collect::<FuturesUnordered<_>>();

let mut iter_mut = stream.iter_mut();
assert_eq!(iter_mut.len(), 3);
Expand Down
4 changes: 2 additions & 2 deletions futures/tests/unfold.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use futures::future;
use futures::stream;

use futures_test::future::FutureTestExt;
use futures_test::{
assert_stream_pending, assert_stream_next, assert_stream_done,
assert_stream_done, assert_stream_next, assert_stream_pending,
};
use futures_test::future::FutureTestExt;

#[test]
fn unfold1() {
Expand Down

0 comments on commit 2e1aca4

Please sign in to comment.