Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Rename future::empty to pending and add stream::pending #1689

Merged
merged 4 commits into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ impl LocalPool {
/// ```
/// use futures::executor::LocalPool;
/// use futures::task::LocalSpawnExt;
/// use futures::future::{ready, empty};
/// use futures::future::{ready, pending};
///
/// let mut pool = LocalPool::new();
/// let mut spawner = pool.spawner();
///
/// spawner.spawn_local(ready(())).unwrap();
/// spawner.spawn_local(ready(())).unwrap();
/// spawner.spawn_local(empty()).unwrap();
/// spawner.spawn_local(pending()).unwrap();
///
/// // Run the two ready tasks and return true for them.
/// pool.try_run_one(); // returns true after completing one of the ready futures
Expand Down Expand Up @@ -210,14 +210,14 @@ impl LocalPool {
/// ```
/// use futures::executor::LocalPool;
/// use futures::task::LocalSpawnExt;
/// use futures::future::{ready, empty};
/// use futures::future::{ready, pending};
///
/// let mut pool = LocalPool::new();
/// let mut spawner = pool.spawner();
///
/// spawner.spawn_local(ready(())).unwrap();
/// spawner.spawn_local(ready(())).unwrap();
/// spawner.spawn_local(empty()).unwrap();
/// spawner.spawn_local(pending()).unwrap();
///
/// // Runs the two ready task and returns.
/// // The empty task remains in the pool.
Expand Down
6 changes: 3 additions & 3 deletions futures-test/src/future/assert_unmoved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl<Fut> Drop for AssertUnmoved<Fut> {
mod tests {
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_util::future::empty;
use futures_util::future::pending;
use futures_util::task::noop_waker;
use std::pin::Pin;

Expand All @@ -72,7 +72,7 @@ mod tests {
#[test]
fn dont_panic_when_not_polled() {
// This shouldn't panic.
let future = AssertUnmoved::new(empty::<()>());
let future = AssertUnmoved::new(pending::<()>());
drop(future);
}

Expand All @@ -84,7 +84,7 @@ mod tests {
let mut cx = Context::from_waker(&waker);

// First we allocate the future on the stack and poll it.
let mut future = AssertUnmoved::new(empty::<()>());
let mut future = AssertUnmoved::new(pending::<()>());
let pinned_future = unsafe { Pin::new_unchecked(&mut future) };
assert_eq!(pinned_future.poll(&mut cx), Poll::Pending);

Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/async_await/select_mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ macro_rules! document_select_macro {
/// use futures::future;
/// use futures::select;
/// let mut a = future::ready(4);
/// let mut b = future::empty::<()>();
/// let mut b = future::pending::<()>();
///
/// let res = select! {
/// a_res = a => a_res + 1,
Expand All @@ -49,7 +49,7 @@ macro_rules! document_select_macro {
/// use futures::stream::{self, StreamExt};
/// use futures::select;
/// let mut st = stream::iter(vec![2]).fuse();
/// let mut fut = future::empty::<()>();
/// let mut fut = future::pending::<()>();
///
/// select! {
/// x = st.next() => assert_eq!(Some(2), x),
Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use futures_core::future::{BoxFuture, LocalBoxFuture};
pub use futures_core::future::FusedFuture;

// Primitive futures
mod empty;
pub use self::empty::{empty, Empty};

mod lazy;
pub use self::lazy::{lazy, Lazy};

mod pending;
pub use self::pending::{pending, Pending};

mod maybe_done;
pub use self::maybe_done::{maybe_done, MaybeDone};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use core::marker;
use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};

/// Future for the [`empty`] function.
/// Future for the [`pending`] function.
Marwes marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Empty<T> {
pub struct Pending<T> {
_data: marker::PhantomData<T>,
}

impl<T> FusedFuture for Empty<T> {
fn is_terminated(&self) -> bool { false }
impl<T> FusedFuture for Pending<T> {
fn is_terminated(&self) -> bool {
false
}
}

/// Creates a future which never resolves, representing a computation that never
Expand All @@ -26,16 +28,18 @@ impl<T> FusedFuture for Empty<T> {
/// # futures::executor::block_on(async {
/// use futures::future;
///
/// let future = future::empty();
/// let future = future::pending();
/// let () = future.await;
/// unreachable!();
/// # });
/// ```
pub fn empty<T>() -> Empty<T> {
Empty { _data: marker::PhantomData }
pub fn pending<T>() -> Pending<T> {
Pending {
_data: marker::PhantomData,
}
}

impl<T> Future for Empty<T> {
impl<T> Future for Pending<T> {
type Output = T;

fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<T> {
Expand Down
29 changes: 29 additions & 0 deletions futures-util/src/stream/pending.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use core::marker;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This module does not seem to be defined...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦‍♂️ Fixed...

use core::pin::Pin;

use futures_core::{Stream, Poll};
use futures_core::task;

/// A stream which never returns any elements.
///
/// This stream can be created with the `stream::pending` function.
Marwes marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Pending<T> {
_data: marker::PhantomData<T>,
}

/// Creates a stream which never returns any elements.
///
/// The returned stream will always return `Pending` when polled.
pub fn pending<T>() -> Pending<T> {
Pending { _data: marker::PhantomData }
}

impl<T> Stream for Pending<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Pending
}
}
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,9 @@ pub mod future {
pub use futures_core::future::BoxFuture;

pub use futures_util::future::{
empty, Empty,
lazy, Lazy,
maybe_done, MaybeDone,
pending, Pending,
poll_fn, PollFn,
ready, ok, err, Ready,
select, Select,
Expand Down
38 changes: 26 additions & 12 deletions futures/tests/futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use futures::executor::{block_on, block_on_stream};
use futures::future::{self, join, Future, FutureExt};
use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
use futures::task::Poll;
use futures_test::{assert_stream_done, assert_stream_next};
use futures_test::future::FutureTestExt;
use futures_test::task::noop_context;
use futures_test::{assert_stream_done, assert_stream_next};

#[test]
fn is_terminated() {
Expand Down Expand Up @@ -40,7 +40,11 @@ fn works_1() {
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let mut iter = block_on_stream(vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>());
let mut iter = block_on_stream(
vec![a_rx, b_rx, c_rx]
.into_iter()
.collect::<FuturesUnordered<_>>(),
);

b_tx.send(99).unwrap();
assert_eq!(Some(Ok(99)), iter.next());
Expand All @@ -61,7 +65,9 @@ fn works_2() {
let mut stream = vec![
a_rx.boxed(),
join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(),
].into_iter().collect::<FuturesUnordered<_>>();
]
.into_iter()
.collect::<FuturesUnordered<_>>();

a_tx.send(9).unwrap();
b_tx.send(10).unwrap();
Expand All @@ -78,10 +84,12 @@ fn from_iterator() {
let stream = vec![
future::ready::<i32>(1),
future::ready::<i32>(2),
future::ready::<i32>(3)
].into_iter().collect::<FuturesUnordered<_>>();
future::ready::<i32>(3),
]
.into_iter()
.collect::<FuturesUnordered<_>>();
assert_eq!(stream.len(), 3);
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1,2,3]);
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
}

#[test]
Expand All @@ -93,7 +101,9 @@ fn finished_future() {
let mut stream = vec![
Box::new(a_rx) as Box<dyn Future<Output = Result<_, _>> + Unpin>,
Box::new(future::select(b_rx, c_rx).map(|e| e.factor_first().0)) as _,
].into_iter().collect::<FuturesUnordered<_>>();
]
.into_iter()
.collect::<FuturesUnordered<_>>();

let cx = &mut noop_context();
for _ in 0..10 {
Expand All @@ -113,7 +123,9 @@ fn iter_mut_cancel() {
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>();
let mut stream = vec![a_rx, b_rx, c_rx]
.into_iter()
.collect::<FuturesUnordered<_>>();

for rx in stream.iter_mut() {
rx.close();
Expand All @@ -134,10 +146,12 @@ fn iter_mut_cancel() {
#[test]
fn iter_mut_len() {
let mut stream = vec![
future::empty::<()>(),
future::empty::<()>(),
future::empty::<()>()
].into_iter().collect::<FuturesUnordered<_>>();
future::pending::<()>(),
future::pending::<()>(),
future::pending::<()>(),
]
.into_iter()
.collect::<FuturesUnordered<_>>();

let mut iter_mut = stream.iter_mut();
assert_eq!(iter_mut.len(), 3);
Expand Down
4 changes: 2 additions & 2 deletions futures/tests/unfold.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use futures::future;
use futures::stream;

use futures_test::future::FutureTestExt;
use futures_test::{
assert_stream_pending, assert_stream_next, assert_stream_done,
assert_stream_done, assert_stream_next, assert_stream_pending,
};
use futures_test::future::FutureTestExt;

#[test]
fn unfold1() {
Expand Down