From 0473cda9c4d6260d50e99009e1628fd7f97e4a41 Mon Sep 17 00:00:00 2001 From: Wim Looman Date: Fri, 3 Aug 2018 19:18:06 +0200 Subject: [PATCH] Initial testing utilities crate --- .travis.yml | 2 + Cargo.toml | 1 + futures-test/Cargo.toml | 26 ++++ futures-test/LICENSE-APACHE | 1 + futures-test/LICENSE-MIT | 1 + futures-test/src/assert.rs | 125 ++++++++++++++++++ futures-test/src/future/delay.rs | 44 ++++++ futures-test/src/future/mod.rs | 65 +++++++++ futures-test/src/lib.rs | 31 +++++ futures-test/src/task/context.rs | 65 +++++++++ futures-test/src/task/mod.rs | 20 +++ futures-test/src/task/spawn/mod.rs | 11 ++ futures-test/src/task/spawn/noop.rs | 55 ++++++++ futures-test/src/task/spawn/panic.rs | 55 ++++++++ futures-test/src/task/spawn/record.rs | 57 ++++++++ futures-test/src/task/wake/counter.rs | 58 ++++++++ futures-test/src/task/wake/mod.rs | 11 ++ futures-test/src/task/wake/noop.rs | 76 +++++++++++ futures-test/src/task/wake/panic.rs | 80 +++++++++++ futures/Cargo.toml | 1 + futures/tests/abortable.rs | 24 ++-- futures/tests/basic_combinators.rs | 4 +- futures/tests/eager_drop.rs | 4 +- futures/tests/fuse.rs | 10 +- futures/tests/futures_ordered.rs | 24 ++-- futures/tests/futures_unordered.rs | 15 +-- futures/tests/oneshot.rs | 4 +- futures/tests/support/assert.rs | 38 ------ .../tests/support/counter_waker_context.rs | 33 ----- futures/tests/support/delayed.rs | 35 ----- futures/tests/support/mod.rs | 44 ------ futures/tests/support/noop_waker_context.rs | 19 --- futures/tests/support/panic_executor.rs | 10 -- futures/tests/support/panic_waker_context.rs | 21 --- futures/tests/support/run_in_background.rs | 16 --- futures/tests/unfold.rs | 31 +++-- 36 files changed, 835 insertions(+), 282 deletions(-) create mode 100644 futures-test/Cargo.toml create mode 120000 futures-test/LICENSE-APACHE create mode 120000 futures-test/LICENSE-MIT create mode 100644 futures-test/src/assert.rs create mode 100644 futures-test/src/future/delay.rs create mode 100644 futures-test/src/future/mod.rs create mode 100644 futures-test/src/lib.rs create mode 100644 futures-test/src/task/context.rs create mode 100644 futures-test/src/task/mod.rs create mode 100644 futures-test/src/task/spawn/mod.rs create mode 100644 futures-test/src/task/spawn/noop.rs create mode 100644 futures-test/src/task/spawn/panic.rs create mode 100644 futures-test/src/task/spawn/record.rs create mode 100644 futures-test/src/task/wake/counter.rs create mode 100644 futures-test/src/task/wake/mod.rs create mode 100644 futures-test/src/task/wake/noop.rs create mode 100644 futures-test/src/task/wake/panic.rs delete mode 100644 futures/tests/support/assert.rs delete mode 100644 futures/tests/support/counter_waker_context.rs delete mode 100644 futures/tests/support/delayed.rs delete mode 100644 futures/tests/support/mod.rs delete mode 100644 futures/tests/support/noop_waker_context.rs delete mode 100644 futures/tests/support/panic_executor.rs delete mode 100644 futures/tests/support/panic_waker_context.rs delete mode 100644 futures/tests/support/run_in_background.rs diff --git a/.travis.yml b/.travis.yml index 68289e3aab..edb1a91433 100644 --- a/.travis.yml +++ b/.travis.yml @@ -52,6 +52,7 @@ matrix: - cargo build --manifest-path futures-io/Cargo.toml --all-features - cargo build --manifest-path futures-sink/Cargo.toml --all-features - cargo build --manifest-path futures-util/Cargo.toml --all-features + - cargo build --manifest-path futures-test/Cargo.toml --all-features - name: cargo build --all-features (with minimal versions) rust: nightly @@ -81,6 +82,7 @@ matrix: - RUSTDOCFLAGS=-Dwarnings cargo doc --all --exclude futures-preview --exclude futures-executor-preview + --exclude futures-test-preview - cargo doc script: diff --git a/Cargo.toml b/Cargo.toml index fa26b657f8..f02526a563 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,4 +7,5 @@ members = [ "futures-io", "futures-sink", "futures-util", + "futures-test", ] diff --git a/futures-test/Cargo.toml b/futures-test/Cargo.toml new file mode 100644 index 0000000000..e108573e86 --- /dev/null +++ b/futures-test/Cargo.toml @@ -0,0 +1,26 @@ +cargo-features = ["edition"] + +[package] +name = "futures-test-preview" +edition = "2018" +version = "0.3.0-alpha.3" +authors = [] +license = "MIT OR Apache-2.0" +repository = "https://github.com/rust-lang-nursery/futures-rs" +homepage = "https://rust-lang-nursery.github.io/futures-rs" +documentation = "https://rust-lang-nursery.github.io/futures-doc/0.3.0-alpha.3/futures_test" +description = """ +Common utilities for testing components built off futures-rs. +""" + +[lib] +name = "futures_test" + +[dependencies] +futures-core-preview = { version = "0.3.0-alpha.2", path = "../futures-core", default-features = false } +futures-util-preview = { version = "0.3.0-alpha.2", path = "../futures-util", default-features = false } +futures-executor-preview = { version = "0.3.0-alpha.2", path = "../futures-executor", default-features = false } +pin-utils = { version = "0.1.0-alpha.1", default-features = false } + +[dev-dependencies] +futures-preview = { version = "0.3.0-alpha.2", path = "../futures", default-features = false, features = ["std"] } diff --git a/futures-test/LICENSE-APACHE b/futures-test/LICENSE-APACHE new file mode 120000 index 0000000000..965b606f33 --- /dev/null +++ b/futures-test/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/futures-test/LICENSE-MIT b/futures-test/LICENSE-MIT new file mode 120000 index 0000000000..76219eb72e --- /dev/null +++ b/futures-test/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/futures-test/src/assert.rs b/futures-test/src/assert.rs new file mode 100644 index 0000000000..46d9a33b8b --- /dev/null +++ b/futures-test/src/assert.rs @@ -0,0 +1,125 @@ +use futures_core::stream::Stream; +use std::marker::Unpin; + +#[doc(hidden)] +pub fn assert_is_unpin_stream(_: &mut S) {} + +/// Assert that the next poll to the provided stream will return +/// [`Poll::Pending`][futures_core::task::Poll::Pending]. +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, futures_api, pin)] +/// use futures::stream; +/// use futures_test::future::FutureTestExt; +/// use futures_test::{ +/// assert_stream_pending, assert_stream_next, assert_stream_done, +/// }; +/// use pin_utils::pin_mut; +/// +/// let mut stream = stream::once((async { 5 }).delay()); +/// pin_mut!(stream); +/// +/// assert_stream_pending!(stream); +/// assert_stream_next!(stream, 5); +/// assert_stream_done!(stream); +/// ``` +#[macro_export] +macro_rules! assert_stream_pending { + ($stream:expr) => {{ + let mut stream = &mut $stream; + $crate::assert::assert_is_unpin_stream(stream); + let stream = $crate::std_reexport::mem::PinMut::new(stream); + let cx = &mut $crate::task::no_spawn_context(); + let poll = $crate::futures_core_reexport::stream::Stream::poll_next( + stream, cx, + ); + if poll.is_ready() { + panic!("assertion failed: stream is not pending"); + } + }}; +} + +/// Assert that the next poll to the provided stream will return +/// [`Poll::Ready`][futures_core::task::Poll::Ready] with the provided item. +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, futures_api, pin)] +/// use futures::stream; +/// use futures_test::future::FutureTestExt; +/// use futures_test::{ +/// assert_stream_pending, assert_stream_next, assert_stream_done, +/// }; +/// use pin_utils::pin_mut; +/// +/// let mut stream = stream::once((async { 5 }).delay()); +/// pin_mut!(stream); +/// +/// assert_stream_pending!(stream); +/// assert_stream_next!(stream, 5); +/// assert_stream_done!(stream); +/// ``` +#[macro_export] +macro_rules! assert_stream_next { + ($stream:expr, $item:expr) => {{ + let mut stream = &mut $stream; + $crate::assert::assert_is_unpin_stream(stream); + let stream = $crate::std_reexport::mem::PinMut::new(stream); + let cx = &mut $crate::task::no_spawn_context(); + match $crate::futures_core_reexport::stream::Stream::poll_next(stream, cx) { + $crate::futures_core_reexport::task::Poll::Ready(Some(x)) => { + assert_eq!(x, $item); + } + $crate::futures_core_reexport::task::Poll::Ready(None) => { + panic!("assertion failed: expected stream to provide item but stream is at its end"); + } + $crate::futures_core_reexport::task::Poll::Pending => { + panic!("assertion failed: expected stream to provide item but stream wasn't ready"); + } + } + }} +} + +/// Assert that the next poll to the provided stream will return an empty +/// [`Poll::Ready`][futures_core::task::Poll::Ready] signalling the +/// completion of the stream. +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, futures_api, pin)] +/// use futures::stream; +/// use futures_test::future::FutureTestExt; +/// use futures_test::{ +/// assert_stream_pending, assert_stream_next, assert_stream_done, +/// }; +/// use pin_utils::pin_mut; +/// +/// let mut stream = stream::once((async { 5 }).delay()); +/// pin_mut!(stream); +/// +/// assert_stream_pending!(stream); +/// assert_stream_next!(stream, 5); +/// assert_stream_done!(stream); +/// ``` +#[macro_export] +macro_rules! assert_stream_done { + ($stream:expr) => {{ + let mut stream = &mut $stream; + $crate::assert::assert_is_unpin_stream(stream); + let stream = $crate::std_reexport::mem::PinMut::new(stream); + let cx = &mut $crate::task::no_spawn_context(); + match $crate::futures_core_reexport::stream::Stream::poll_next(stream, cx) { + $crate::futures_core_reexport::task::Poll::Ready(Some(_)) => { + panic!("assertion failed: expected stream to be done but had more elements"); + } + $crate::futures_core_reexport::task::Poll::Ready(None) => {} + $crate::futures_core_reexport::task::Poll::Pending => { + panic!("assertion failed: expected stream to be done but was pending"); + } + } + }} +} diff --git a/futures-test/src/future/delay.rs b/futures-test/src/future/delay.rs new file mode 100644 index 0000000000..986b84e072 --- /dev/null +++ b/futures-test/src/future/delay.rs @@ -0,0 +1,44 @@ +use futures_core::future::Future; +use futures_core::task::{self, Poll}; +use std::mem::PinMut; + +/// Combinator that guarantees one [`Poll::Pending`] before polling its inner +/// future. +/// +/// This is created by the [`FutureTestExt::delay`][super::FutureTestExt::delay] +/// method. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct Delayed { + future: Fut, + polled_before: bool, +} + +impl Delayed { + unsafe_pinned!(future: Fut); + unsafe_unpinned!(polled_before: bool); + + pub(super) fn new(future: Fut) -> Self { + Self { + future, + polled_before: false, + } + } +} + +impl Future for Delayed { + type Output = Fut::Output; + + fn poll( + mut self: PinMut, + cx: &mut task::Context, + ) -> Poll { + if *self.polled_before() { + self.future().poll(cx) + } else { + *self.polled_before() = true; + cx.waker().wake(); + Poll::Pending + } + } +} diff --git a/futures-test/src/future/mod.rs b/futures-test/src/future/mod.rs new file mode 100644 index 0000000000..325b07cedf --- /dev/null +++ b/futures-test/src/future/mod.rs @@ -0,0 +1,65 @@ +//! Additional combinators for testing futures. + +mod delay; + +use self::delay::Delayed; +use futures_core::future::Future; +use futures_executor; +use std::thread; + +/// Additional combinators for testing futures. +pub trait FutureTestExt: Future { + /// Introduces one [`Poll::Pending`][futures_core::task::Poll::Pending] + /// before polling the given future + /// + /// # Examples + /// + /// ``` + /// #![feature(async_await, futures_api, pin)] + /// use futures::task::Poll; + /// use futures::future::FutureExt; + /// use futures_test::task; + /// use futures_test::future::FutureTestExt; + /// use pin_utils::pin_mut; + /// + /// let future = (async { 5 }).delay(); + /// pin_mut!(future); + /// + /// let cx = &mut task::no_spawn_context(); + /// + /// assert_eq!(future.poll_unpin(cx), Poll::Pending); + /// assert_eq!(future.poll_unpin(cx), Poll::Ready(5)); + /// ``` + fn delay(self) -> Delayed + where + Self: Sized, + { + delay::Delayed::new(self) + } + + /// Runs this future on a dedicated executor running in a background thread. + /// + /// # Examples + /// + /// ``` + /// #![feature(async_await, futures_api, pin)] + /// use futures::channel::oneshot; + /// use futures::executor::block_on; + /// use futures_test::future::FutureTestExt; + /// + /// let (tx, rx) = oneshot::channel::(); + /// + /// (async { tx.send(5).unwrap() }).run_in_background(); + /// + /// assert_eq!(block_on(rx), Ok(5)); + /// ``` + fn run_in_background(self) + where + Self: Sized + Send + 'static, + Self::Output: Send, + { + thread::spawn(|| futures_executor::block_on(self)); + } +} + +impl FutureTestExt for Fut where Fut: Future {} diff --git a/futures-test/src/lib.rs b/futures-test/src/lib.rs new file mode 100644 index 0000000000..7e9b6f0b59 --- /dev/null +++ b/futures-test/src/lib.rs @@ -0,0 +1,31 @@ +//! Utilities to make testing [`Future`s][futures_core::Future] easier + +#![feature( + arbitrary_self_types, + async_await, + await_macro, + futures_api, + pin, +)] +#![warn(missing_docs, missing_debug_implementations)] +#![deny(bare_trait_objects)] +#![doc( + html_root_url = "https://rust-lang-nursery.github.io/futures-doc/0.3.0-alpha.3/futures_test" +)] + +#[doc(hidden)] +pub use std as std_reexport; + +#[doc(hidden)] +pub extern crate futures_core as futures_core_reexport; + +#[macro_use] +extern crate pin_utils; + +#[macro_use] +#[doc(hidden)] +pub mod assert; + +pub mod task; + +pub mod future; diff --git a/futures-test/src/task/context.rs b/futures-test/src/task/context.rs new file mode 100644 index 0000000000..8c5b35e00c --- /dev/null +++ b/futures-test/src/task/context.rs @@ -0,0 +1,65 @@ +use crate::task::{spawn, wake}; +use futures_core::task::Context; + +/// Create a new [`task::Context`][futures_core::task::Context] where both +/// the [`waker`][futures_core::task::Context::waker] and +/// [`spawner`][futures_core::task::Context::spawner] will both panic if used. +/// +/// # Examples +/// +/// ```should_panic +/// #![feature(futures_api)] +/// use futures_test::task; +/// +/// let cx = task::panic_context(); +/// cx.waker().wake(); // Will panic +/// ``` +pub fn panic_context() -> Context<'static> { + Context::new(wake::Panic::local_waker_ref(), spawn::Panic::spawn_mut()) +} + +/// Create a new [`task::Context`][futures_core::task::Context] where the +/// [`waker`][futures_core::task::Context::waker] will ignore any calls to +/// `wake` while the [`spawner`][futures_core::task::Context::spawner] will +/// panic if used. +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, futures_api, pin)] +/// use futures::future::Future; +/// use futures::task::Poll; +/// use futures_test::task::no_spawn_context; +/// use pin_utils::pin_mut; +/// +/// let mut future = async { 5 }; +/// pin_mut!(future); +/// +/// assert_eq!(future.poll(&mut no_spawn_context()), Poll::Ready(5)); +/// ``` +pub fn no_spawn_context() -> Context<'static> { + Context::new(wake::Noop::local_waker_ref(), spawn::Panic::spawn_mut()) +} + +/// Create a new [`task::Context`][futures_core::task::Context] where the +/// [`waker`][futures_core::task::Context::waker] and +/// [`spawner`][futures_core::task::Context::spawner] will both ignore any +/// uses. +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, futures_api, pin)] +/// use futures::future::Future; +/// use futures::task::Poll; +/// use futures_test::task::noop_context; +/// use pin_utils::pin_mut; +/// +/// let mut future = async { 5 }; +/// pin_mut!(future); +/// +/// assert_eq!(future.poll(&mut noop_context()), Poll::Ready(5)); +/// ``` +pub fn noop_context() -> Context<'static> { + Context::new(wake::Noop::local_waker_ref(), spawn::Noop::spawn_mut()) +} diff --git a/futures-test/src/task/mod.rs b/futures-test/src/task/mod.rs new file mode 100644 index 0000000000..7ea04a32d6 --- /dev/null +++ b/futures-test/src/task/mod.rs @@ -0,0 +1,20 @@ +//! Task related utilities. +//! +//! In the majority of use cases you can use the functions exported below to +//! create a [`Context`][futures_core::task::Context] appropriate to use in your +//! tests. +//! +//! For more complex test cases you can take a `Context` from one of these +//! functions and then use the +//! [`Context::with_waker`][futures_core::task::Context::with_waker] and +//! [`Context::with_spawner`][futures_core::task::Context::with_spawner] +//! methods to change the implementations used. See the examples on +//! the provided implementations in [`wake`] and +//! [`spawn`] for more details. + +mod context; + +pub mod spawn; +pub mod wake; + +pub use self::context::{no_spawn_context, noop_context, panic_context}; diff --git a/futures-test/src/task/spawn/mod.rs b/futures-test/src/task/spawn/mod.rs new file mode 100644 index 0000000000..3d801743ae --- /dev/null +++ b/futures-test/src/task/spawn/mod.rs @@ -0,0 +1,11 @@ +//! Implementations of [`Spawn`][futures_core::task::Spawn] with various +//! behaviour for test purposes. + +mod noop; +pub use self::noop::Noop; + +mod panic; +pub use self::panic::Panic; + +mod record; +pub use self::record::Record; diff --git a/futures-test/src/task/spawn/noop.rs b/futures-test/src/task/spawn/noop.rs new file mode 100644 index 0000000000..53787b7080 --- /dev/null +++ b/futures-test/src/task/spawn/noop.rs @@ -0,0 +1,55 @@ +use futures_core::future::FutureObj; +use futures_core::task::{Spawn, SpawnObjError}; +use std::cell::UnsafeCell; + +/// An implementation of [`Spawn`][futures_core::task::Spawn] that +/// discards spawned futures when used. +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, futures_api)] +/// use futures::task::SpawnExt; +/// use futures_test::task::{panic_context, spawn}; +/// +/// let mut cx = panic_context(); +/// let mut spawn = spawn::Noop::new(); +/// let cx = &mut cx.with_spawner(&mut spawn); +/// +/// cx.spawner().spawn(async { }); +/// ``` +#[derive(Debug)] +pub struct Noop { + _reserved: (), +} + +impl Noop { + /// Create a new instance + pub fn new() -> Self { + Self { _reserved: () } + } + + /// Get a thread local reference to a singleton instance of [`Noop`] as a + /// [`Spawn`]. + pub fn spawn_mut() -> &'static mut dyn Spawn { + thread_local! { + static INSTANCE: UnsafeCell = UnsafeCell::new(Noop { _reserved: () }); + } + INSTANCE.with(|i| unsafe { &mut *i.get() }) + } +} + +impl Spawn for Noop { + fn spawn_obj( + &mut self, + _future: FutureObj<'static, ()>, + ) -> Result<(), SpawnObjError> { + Ok(()) + } +} + +impl Default for Noop { + fn default() -> Self { + Self::new() + } +} diff --git a/futures-test/src/task/spawn/panic.rs b/futures-test/src/task/spawn/panic.rs new file mode 100644 index 0000000000..3f3ffc1357 --- /dev/null +++ b/futures-test/src/task/spawn/panic.rs @@ -0,0 +1,55 @@ +use futures_core::future::FutureObj; +use futures_core::task::{Spawn, SpawnObjError}; +use std::cell::UnsafeCell; + +/// An implementation of [`Spawn`][futures_core::task::Spawn] that panics +/// when used. +/// +/// # Examples +/// +/// ```should_panic +/// #![feature(async_await, futures_api)] +/// use futures::task::SpawnExt; +/// use futures_test::task::{noop_context, spawn}; +/// +/// let mut cx = noop_context(); +/// let mut spawn = spawn::Panic::new(); +/// let cx = &mut cx.with_spawner(&mut spawn); +/// +/// cx.spawner().spawn(async { }); // Will panic +/// ``` +#[derive(Debug)] +pub struct Panic { + _reserved: (), +} + +impl Panic { + /// Create a new instance + pub fn new() -> Self { + Self { _reserved: () } + } + + /// Get a thread local reference to a singleton instance of [`Panic`] as a + /// [`Spawn`]. + pub fn spawn_mut() -> &'static mut dyn Spawn { + thread_local! { + static INSTANCE: UnsafeCell = UnsafeCell::new(Panic { _reserved: () }); + } + INSTANCE.with(|i| unsafe { &mut *i.get() }) + } +} + +impl Spawn for Panic { + fn spawn_obj( + &mut self, + _future: FutureObj<'static, ()>, + ) -> Result<(), SpawnObjError> { + panic!("should not spawn") + } +} + +impl Default for Panic { + fn default() -> Self { + Self::new() + } +} diff --git a/futures-test/src/task/spawn/record.rs b/futures-test/src/task/spawn/record.rs new file mode 100644 index 0000000000..04028637ce --- /dev/null +++ b/futures-test/src/task/spawn/record.rs @@ -0,0 +1,57 @@ +use futures_core::future::FutureObj; +use futures_core::task::{Spawn, SpawnObjError}; + +/// An implementation of [`Spawn`][futures_core::task::Spawn] that records +/// any [`Future`][futures_core::future::Future]s spawned on it. +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, futures_api)] +/// use futures::task::SpawnExt; +/// use futures_test::task::{panic_context, spawn}; +/// +/// let mut recorder = spawn::Record::new(); +/// +/// { +/// let mut cx = panic_context(); +/// let cx = &mut cx.with_spawner(&mut recorder); +/// cx.spawner().spawn(async { }); +/// } +/// +/// assert_eq!(recorder.spawned().len(), 1); +/// ``` +#[derive(Debug)] +pub struct Record { + spawned: Vec>, +} + +impl Record { + /// Create a new instance + pub fn new() -> Self { + Self { + spawned: Vec::new(), + } + } + + /// Inspect any futures that were spawned onto this [`Spawn`]. + pub fn spawned(&self) -> &[FutureObj<'static, ()>] { + &self.spawned + } +} + +impl Spawn for Record { + fn spawn_obj( + &mut self, + future: FutureObj<'static, ()>, + ) -> Result<(), SpawnObjError> { + self.spawned.push(future); + Ok(()) + } +} + +impl Default for Record { + fn default() -> Self { + Self::new() + } +} diff --git a/futures-test/src/task/wake/counter.rs b/futures-test/src/task/wake/counter.rs new file mode 100644 index 0000000000..9311915cbc --- /dev/null +++ b/futures-test/src/task/wake/counter.rs @@ -0,0 +1,58 @@ +use futures_core::task::{local_waker, LocalWaker, Wake}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +/// An implementation of [`Wake`][futures_core::task::Wake] that tracks how many +/// times it has been woken. +/// +/// # Examples +/// +/// ``` +/// #![feature(futures_api)] +/// use futures_test::task::{panic_context, wake}; +/// +/// let (wake_counter, local_waker) = wake::Counter::new(); +/// let mut cx = panic_context(); +/// let cx = &mut cx.with_waker(&local_waker); +/// +/// assert_eq!(wake_counter.count(), 0); +/// +/// cx.waker().wake(); +/// cx.waker().wake(); +/// +/// assert_eq!(wake_counter.count(), 2); +/// ``` +#[derive(Debug)] +pub struct Counter { + count: AtomicUsize, +} + +impl Counter { + /// Create a new instance with an associated [`LocalWaker`] + pub fn new() -> (Arc, LocalWaker) { + let arc = Arc::new(Self { + count: AtomicUsize::new(0), + }); + let local_waker = unsafe { local_waker(arc.clone()) }; + (arc, local_waker) + } + + /// Get the number of times this [`Counter`] has been woken + pub fn count(&self) -> usize { + self.count.load(Ordering::SeqCst) + } +} + +impl Default for Counter { + fn default() -> Self { + Self { + count: AtomicUsize::new(0), + } + } +} + +impl Wake for Counter { + fn wake(arc_self: &Arc) { + arc_self.count.fetch_add(1, Ordering::SeqCst); + } +} diff --git a/futures-test/src/task/wake/mod.rs b/futures-test/src/task/wake/mod.rs new file mode 100644 index 0000000000..718a3c92f7 --- /dev/null +++ b/futures-test/src/task/wake/mod.rs @@ -0,0 +1,11 @@ +//! Implementations of [`Wake`][futures_core::task::Wake] with various behaviour +//! for test purposes. + +mod counter; +pub use self::counter::Counter; + +mod noop; +pub use self::noop::Noop; + +mod panic; +pub use self::panic::Panic; diff --git a/futures-test/src/task/wake/noop.rs b/futures-test/src/task/wake/noop.rs new file mode 100644 index 0000000000..de08f464dd --- /dev/null +++ b/futures-test/src/task/wake/noop.rs @@ -0,0 +1,76 @@ +use futures_core::task::{LocalWaker, UnsafeWake, Wake, Waker}; +use std::cell::UnsafeCell; +use std::ptr::NonNull; +use std::sync::Arc; + +/// An implementation of [`Wake`][futures_core::task::Wake] that does nothing +/// when woken. +/// +/// # Examples +/// +/// ``` +/// #![feature(futures_api)] +/// use futures_test::task::{panic_context, wake}; +/// +/// let mut cx = panic_context(); +/// let cx = &mut cx.with_waker(wake::Noop::local_waker_ref()); +/// +/// cx.waker().wake(); +/// ``` +#[derive(Debug)] +pub struct Noop { + _reserved: (), +} + +impl Noop { + /// Create a new instance + pub fn new() -> Self { + Self { _reserved: () } + } + + fn unsafe_wake() -> NonNull { + static mut INSTANCE: Noop = Noop { _reserved: () }; + unsafe { NonNull::new_unchecked(&mut INSTANCE as *mut dyn UnsafeWake) } + } + + /// Create a new [`Waker`] referencing a singleton instance of [`Noop`]. + pub fn waker() -> Waker { + unsafe { Waker::new(Self::unsafe_wake()) } + } + + /// Create a new [`LocalWaker`] referencing a singleton instance of + /// [`Noop`]. + pub fn local_waker() -> LocalWaker { + unsafe { LocalWaker::new(Self::unsafe_wake()) } + } + + /// Get a thread local reference to a [`LocalWaker`] referencing a singleton + /// instance of [`Noop`]. + pub fn local_waker_ref() -> &'static LocalWaker { + thread_local! { + static LOCAL_WAKER_INSTANCE: UnsafeCell = + UnsafeCell::new(Noop::local_waker()); + } + LOCAL_WAKER_INSTANCE.with(|l| unsafe { &mut *l.get() }) + } +} + +impl Default for Noop { + fn default() -> Self { + Self::new() + } +} + +impl Wake for Noop { + fn wake(_arc_self: &Arc) {} +} + +unsafe impl UnsafeWake for Noop { + unsafe fn clone_raw(&self) -> Waker { + Noop::waker() + } + + unsafe fn drop_raw(&self) {} + + unsafe fn wake(&self) {} +} diff --git a/futures-test/src/task/wake/panic.rs b/futures-test/src/task/wake/panic.rs new file mode 100644 index 0000000000..ad5f1b1662 --- /dev/null +++ b/futures-test/src/task/wake/panic.rs @@ -0,0 +1,80 @@ +use futures_core::task::{LocalWaker, UnsafeWake, Wake, Waker}; +use std::cell::UnsafeCell; +use std::ptr::NonNull; +use std::sync::Arc; + +/// An implementation of [`Wake`][futures_core::task::Wake] that panics when +/// woken. +/// +/// # Examples +/// +/// ```should_panic +/// #![feature(futures_api)] +/// use futures_test::task::{noop_context, wake}; +/// +/// let mut cx = noop_context(); +/// let cx = &mut cx.with_waker(wake::Panic::local_waker_ref()); +/// +/// cx.waker().wake(); // Will panic +/// ``` +#[derive(Debug)] +pub struct Panic { + _reserved: (), +} + +impl Panic { + /// Create a new instance + pub fn new() -> Self { + Self { _reserved: () } + } + + fn unsafe_wake() -> NonNull { + static mut INSTANCE: Panic = Panic { _reserved: () }; + unsafe { NonNull::new_unchecked(&mut INSTANCE as *mut dyn UnsafeWake) } + } + + /// Create a new [`Waker`] referencing a singleton instance of [`Panic`]. + pub fn waker() -> Waker { + unsafe { Waker::new(Self::unsafe_wake()) } + } + + /// Create a new [`LocalWaker`] referencing a singleton instance of + /// [`Panic`]. + pub fn local_waker() -> LocalWaker { + unsafe { LocalWaker::new(Self::unsafe_wake()) } + } + + /// Get a thread local reference to a [`LocalWaker`] referencing a singleton + /// instance of [`Panic`]. + pub fn local_waker_ref() -> &'static LocalWaker { + thread_local! { + static LOCAL_WAKER_INSTANCE: UnsafeCell = + UnsafeCell::new(Panic::local_waker()); + } + LOCAL_WAKER_INSTANCE.with(|l| unsafe { &mut *l.get() }) + } +} + +impl Default for Panic { + fn default() -> Self { + Self::new() + } +} + +impl Wake for Panic { + fn wake(_arc_self: &Arc) { + panic!("should not be woken") + } +} + +unsafe impl UnsafeWake for Panic { + unsafe fn clone_raw(&self) -> Waker { + Panic::waker() + } + + unsafe fn drop_raw(&self) {} + + unsafe fn wake(&self) { + panic!("should not be woken") + } +} diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 7c3952f532..32c33ec7fc 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -34,6 +34,7 @@ futures-util-preview = { path = "../futures-util", version = "0.3.0-alpha.2", de [dev-dependencies] pin-utils = "0.1.0-alpha.1" +futures-test-preview = { path = "../futures-test", version = "0.3.0-alpha.3", default-features = false } [features] nightly = ["futures-util-preview/nightly"] diff --git a/futures/tests/abortable.rs b/futures/tests/abortable.rs index fc9368694b..010281732d 100644 --- a/futures/tests/abortable.rs +++ b/futures/tests/abortable.rs @@ -1,13 +1,10 @@ #![feature(pin, arbitrary_self_types, futures_api)] -use futures::FutureExt; use futures::channel::oneshot; use futures::executor::block_on; -use futures::future::{abortable, Aborted}; +use futures::future::{abortable, Aborted, FutureExt}; use futures::task::Poll; - -mod support; -use self::support::with_counter_waker_context; +use futures_test::task::{panic_context, wake}; #[test] fn abortable_works() { @@ -23,14 +20,15 @@ fn abortable_awakens() { let (_tx, a_rx) = oneshot::channel::<()>(); let (mut abortable_rx, abort_handle) = abortable(a_rx); - with_counter_waker_context(|cx, counter| { - assert_eq!(0, counter.get()); - assert_eq!(Poll::Pending, abortable_rx.poll_unpin(cx)); - assert_eq!(0, counter.get()); - abort_handle.abort(); - assert_eq!(1, counter.get()); - assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(cx)); - }) + let (wake_counter, local_waker) = wake::Counter::new(); + let mut cx = panic_context(); + let cx = &mut cx.with_waker(&local_waker); + assert_eq!(0, wake_counter.count()); + assert_eq!(Poll::Pending, abortable_rx.poll_unpin(cx)); + assert_eq!(0, wake_counter.count()); + abort_handle.abort(); + assert_eq!(1, wake_counter.count()); + assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(cx)); } #[test] diff --git a/futures/tests/basic_combinators.rs b/futures/tests/basic_combinators.rs index cff77a4ab4..e370a05ad6 100644 --- a/futures/tests/basic_combinators.rs +++ b/futures/tests/basic_combinators.rs @@ -1,11 +1,9 @@ #![feature(pin, arbitrary_self_types, futures_api)] use futures::future::{self, FutureExt, TryFutureExt}; +use futures_test::future::FutureTestExt; use std::sync::mpsc; -mod support; -use self::support::RunInBackgroundExt; - #[test] fn basic_future_combinators() { let (tx1, rx) = mpsc::channel(); diff --git a/futures/tests/eager_drop.rs b/futures/tests/eager_drop.rs index 2b97b7b2fa..7f9bff7d47 100644 --- a/futures/tests/eager_drop.rs +++ b/futures/tests/eager_drop.rs @@ -3,13 +3,11 @@ use futures::channel::oneshot; use futures::future::{self, Future, FutureExt, TryFutureExt}; use futures::task::{self, Poll}; +use futures_test::future::FutureTestExt; use pin_utils::unsafe_pinned; use std::mem::PinMut; use std::sync::mpsc; -mod support; -use self::support::RunInBackgroundExt; - #[test] fn map_ok() { // The closure given to `map_ok` should have been dropped by the time `map` diff --git a/futures/tests/fuse.rs b/futures/tests/fuse.rs index 67c672e877..1c7d8e5e80 100644 --- a/futures/tests/fuse.rs +++ b/futures/tests/fuse.rs @@ -1,14 +1,12 @@ #![feature(pin, arbitrary_self_types, futures_api)] use futures::future::{self, FutureExt}; - -mod support; +use futures_test::task::panic_context; #[test] fn fuse() { let mut future = future::ready::(2).fuse(); - support::with_panic_waker_context(|cx| { - assert!(future.poll_unpin(cx).is_ready()); - assert!(future.poll_unpin(cx).is_pending()); - }) + let cx = &mut panic_context(); + assert!(future.poll_unpin(cx).is_ready()); + assert!(future.poll_unpin(cx).is_pending()); } diff --git a/futures/tests/futures_ordered.rs b/futures/tests/futures_ordered.rs index a98a32dfe2..8389febbbc 100644 --- a/futures/tests/futures_ordered.rs +++ b/futures/tests/futures_ordered.rs @@ -4,8 +4,7 @@ use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; use futures::future::{self, FutureExt, FutureObj}; use futures::stream::{StreamExt, futures_ordered, FuturesOrdered}; - -mod support; +use futures_test::task::no_spawn_context; #[test] fn works_1() { @@ -16,9 +15,7 @@ fn works_1() { let mut stream = futures_ordered(vec![a_rx, b_rx, c_rx]); b_tx.send(99).unwrap(); - support::with_noop_waker_context(|cx| { - assert!(stream.poll_next_unpin(cx).is_pending()); - }); + assert!(stream.poll_next_unpin(&mut no_spawn_context()).is_pending()); a_tx.send(33).unwrap(); c_tx.send(33).unwrap(); @@ -41,14 +38,13 @@ fn works_2() { FutureObj::new(Box::new(b_rx.join(c_rx).map(|(a, b)| Ok(a? + b?)))), ]); - support::with_noop_waker_context(|cx| { - a_tx.send(33).unwrap(); - b_tx.send(33).unwrap(); - assert!(stream.poll_next_unpin(cx).is_ready()); - assert!(stream.poll_next_unpin(cx).is_pending()); - c_tx.send(33).unwrap(); - assert!(stream.poll_next_unpin(cx).is_ready()); - }) + let cx = &mut no_spawn_context(); + a_tx.send(33).unwrap(); + b_tx.send(33).unwrap(); + assert!(stream.poll_next_unpin(cx).is_ready()); + assert!(stream.poll_next_unpin(cx).is_pending()); + c_tx.send(33).unwrap(); + assert!(stream.poll_next_unpin(cx).is_ready()); } #[test] @@ -74,7 +70,7 @@ fn queue_never_unblocked() { Box::new(b_rx.select(c_rx).then(|res| Ok(Box::new(res) as Box))) as _, ]); - support::with_noop_waker_context(f)(|cx| { + with_no_spawn_context(|cx| { for _ in 0..10 { assert!(stream.poll_next(cx).unwrap().is_pending()); } diff --git a/futures/tests/futures_unordered.rs b/futures/tests/futures_unordered.rs index de958c94f2..1d3fb89b95 100644 --- a/futures/tests/futures_unordered.rs +++ b/futures/tests/futures_unordered.rs @@ -5,10 +5,9 @@ use futures::executor::{block_on, block_on_stream}; use futures::future::{self, FutureExt, FutureObj}; use futures::stream::{StreamExt, futures_unordered, FuturesUnordered}; use futures::task::Poll; +use futures_test::task::no_spawn_context; use std::boxed::Box; -mod support; - #[test] fn works_1() { let (a_tx, a_rx) = oneshot::channel::(); @@ -40,12 +39,12 @@ fn works_2() { a_tx.send(9).unwrap(); b_tx.send(10).unwrap(); - support::with_noop_waker_context(|cx| { - assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(Ok(9)))); - c_tx.send(20).unwrap(); - assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(Ok(30)))); - assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(None)); - }) + + let cx = &mut no_spawn_context(); + assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(Ok(9)))); + c_tx.send(20).unwrap(); + assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(Ok(30)))); + assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(None)); } #[test] diff --git a/futures/tests/oneshot.rs b/futures/tests/oneshot.rs index b74a7c1a58..956680f6da 100644 --- a/futures/tests/oneshot.rs +++ b/futures/tests/oneshot.rs @@ -2,12 +2,10 @@ use futures::channel::oneshot; use futures::future::{FutureExt, TryFutureExt}; +use futures_test::future::FutureTestExt; use std::sync::mpsc; use std::thread; -mod support; -use self::support::RunInBackgroundExt; - #[test] fn oneshot_send1() { let (tx1, rx1) = oneshot::channel::(); diff --git a/futures/tests/support/assert.rs b/futures/tests/support/assert.rs deleted file mode 100644 index d72a6fbebc..0000000000 --- a/futures/tests/support/assert.rs +++ /dev/null @@ -1,38 +0,0 @@ -use futures::stream::Stream; -use futures::task::Poll; -use std::fmt; -use std::mem::PinMut; - -use super::{with_noop_waker_context, with_panic_waker_context}; - -pub fn assert_stream_pending(stream: PinMut) { - with_noop_waker_context(|cx| { - match stream.poll_next(cx) { - Poll::Ready(_) => panic!("stream is not pending"), - Poll::Pending => {}, - } - }) -} - -pub fn assert_stream_next(stream: PinMut, item: S::Item) - where S::Item: Eq + fmt::Debug -{ - with_panic_waker_context(|cx| { - match stream.poll_next(cx) { - Poll::Ready(Some(x)) => assert_eq!(x, item), - Poll::Ready(None) => panic!("stream is at its end"), - Poll::Pending => panic!("stream wasn't ready"), - } - }) -} - -pub fn assert_stream_done(stream: PinMut) -{ - with_panic_waker_context(|cx| { - match stream.poll_next(cx) { - Poll::Ready(Some(_)) => panic!("stream had more elements"), - Poll::Ready(None) => {}, - Poll::Pending => panic!("stream wasn't ready"), - } - }) -} diff --git a/futures/tests/support/counter_waker_context.rs b/futures/tests/support/counter_waker_context.rs deleted file mode 100644 index 1a875acc17..0000000000 --- a/futures/tests/support/counter_waker_context.rs +++ /dev/null @@ -1,33 +0,0 @@ -use super::panic_executor::PanicExecutor; -use futures::task::{self, Wake}; -use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; - -pub struct CounterWaker(AtomicUsize); - -impl CounterWaker { - pub fn get(&self) -> usize { - self.0.load(Ordering::SeqCst) - } - - pub fn set(&self, x: usize) { - self.0.store(x, Ordering::SeqCst) - } -} - -pub fn with_counter_waker_context(f: F) -> R - where F: FnOnce(&mut task::Context, &Arc) -> R -{ - impl Wake for CounterWaker { - fn wake(arc_self: &Arc) { - arc_self.0.fetch_add(1, Ordering::SeqCst); - } - } - - let counter_arc = Arc::new(CounterWaker(AtomicUsize::new(0))); - let counter_waker = unsafe { task::local_waker_ref(&counter_arc) }; - let exec = &mut PanicExecutor; - - let cx = &mut task::Context::new(&counter_waker, exec); - f(cx, &counter_arc) -} diff --git a/futures/tests/support/delayed.rs b/futures/tests/support/delayed.rs deleted file mode 100644 index 82c7ed962e..0000000000 --- a/futures/tests/support/delayed.rs +++ /dev/null @@ -1,35 +0,0 @@ -use futures::future::Future; -use futures::task::{self, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; -use std::mem::PinMut; - -pub struct Delayed { - future: F, - polled_before: bool -} - -impl Delayed { - unsafe_pinned!(future: F); - unsafe_unpinned!(polled_before: bool); -} - -impl Future for Delayed { - type Output = F::Output; - - fn poll(mut self: PinMut, cx: &mut task::Context) -> Poll { - if *self.polled_before() { - self.future().poll(cx) - } else { - *self.polled_before() = true; - cx.waker().wake(); - Poll::Pending - } - } -} - -/// Introduces one `Poll::Pending` before polling the given future -pub fn delayed(future: F) -> Delayed - where F: Future, -{ - Delayed { future, polled_before: false } -} diff --git a/futures/tests/support/mod.rs b/futures/tests/support/mod.rs deleted file mode 100644 index 1a501c089e..0000000000 --- a/futures/tests/support/mod.rs +++ /dev/null @@ -1,44 +0,0 @@ -#![allow(dead_code)] - -pub mod assert; - -mod delayed; -pub use self::delayed::{delayed, Delayed}; - -mod run_in_background; -pub use self::run_in_background::RunInBackgroundExt; - -mod counter_waker_context; -pub use self::counter_waker_context::with_counter_waker_context; - -mod noop_waker_context; -pub use self::noop_waker_context::with_noop_waker_context; - -mod panic_executor; - -mod panic_waker_context; -pub use self::panic_waker_context::with_panic_waker_context; - - -// pub fn f_ok(a: i32) -> FutureResult { Ok(a).into_future() } -// pub fn f_err(a: u32) -> FutureResult { Err(a).into_future() } -// pub fn r_ok(a: i32) -> Result { Ok(a) } -// pub fn r_err(a: u32) -> Result { Err(a) } - -// pub fn assert_done(f: F, result: Result) -// where T: Future, -// T::Item: Eq + fmt::Debug, -// T::Error: Eq + fmt::Debug, -// F: FnOnce() -> T, -// { -// assert_eq!(block_on(f()), result); -// } - -// pub fn assert_empty T>(mut f: F) -// where T::Error: Debug -// { -// panic_waker_cx(|cx| { -// assert!(f().poll(cx).unwrap().is_pending()) -// }) -// } - diff --git a/futures/tests/support/noop_waker_context.rs b/futures/tests/support/noop_waker_context.rs deleted file mode 100644 index 44cfed035f..0000000000 --- a/futures/tests/support/noop_waker_context.rs +++ /dev/null @@ -1,19 +0,0 @@ -use super::panic_executor::PanicExecutor; -use futures::task::{self, Wake}; -use std::sync::Arc; - -pub fn with_noop_waker_context(f: F) -> R - where F: FnOnce(&mut task::Context) -> R -{ - struct NoopWake; - - impl Wake for NoopWake { - fn wake(_: &Arc) {} - } - - let noop_waker = unsafe { task::local_waker(Arc::new(NoopWake)) }; - let exec = &mut PanicExecutor; - - let cx = &mut task::Context::new(&noop_waker, exec); - f(cx) -} diff --git a/futures/tests/support/panic_executor.rs b/futures/tests/support/panic_executor.rs deleted file mode 100644 index 9af821409f..0000000000 --- a/futures/tests/support/panic_executor.rs +++ /dev/null @@ -1,10 +0,0 @@ -use futures::future::FutureObj; -use futures::task::{Spawn, SpawnObjError}; - -pub struct PanicExecutor; - -impl Spawn for PanicExecutor { - fn spawn_obj(&mut self, _: FutureObj<'static, ()>) -> Result<(), SpawnObjError> { - panic!("should not spawn") - } -} diff --git a/futures/tests/support/panic_waker_context.rs b/futures/tests/support/panic_waker_context.rs deleted file mode 100644 index 134c536499..0000000000 --- a/futures/tests/support/panic_waker_context.rs +++ /dev/null @@ -1,21 +0,0 @@ -use super::panic_executor::PanicExecutor; -use futures::task::{self, Wake}; -use std::sync::Arc; - -pub fn with_panic_waker_context(f: F) -> R - where F: FnOnce(&mut task::Context) -> R -{ - struct PanicWake; - - impl Wake for PanicWake { - fn wake(_: &Arc) { - panic!("should not be woken"); - } - } - - let panic_waker = unsafe { task::local_waker(Arc::new(PanicWake)) }; - let exec = &mut PanicExecutor; - - let cx = &mut task::Context::new(&panic_waker, exec); - f(cx) -} diff --git a/futures/tests/support/run_in_background.rs b/futures/tests/support/run_in_background.rs deleted file mode 100644 index 16634b1f2e..0000000000 --- a/futures/tests/support/run_in_background.rs +++ /dev/null @@ -1,16 +0,0 @@ -use futures::executor::block_on; -use futures::future::Future; -use std::thread; - -pub trait RunInBackgroundExt { - fn run_in_background(self); -} - -impl RunInBackgroundExt for F - where F: Future + Sized + Send + 'static, - F::Output: Send, -{ - fn run_in_background(self) { - thread::spawn(|| block_on(self)); - } -} diff --git a/futures/tests/unfold.rs b/futures/tests/unfold.rs index d7f15d8799..ed6fb3732a 100644 --- a/futures/tests/unfold.rs +++ b/futures/tests/unfold.rs @@ -2,37 +2,36 @@ use futures::future; use futures::stream; -use pin_utils::pin_mut; -mod support; -use self::support::assert::*; +use futures_test::{ + assert_stream_pending, assert_stream_next, assert_stream_done, +}; +use futures_test::future::FutureTestExt; #[test] fn unfold1() { - let stream = stream::unfold(0, |state| { + let mut stream = stream::unfold(0, |state| { if state <= 2 { - support::delayed(future::ready(Some((state * 2, state + 1)))) + future::ready(Some((state * 2, state + 1))).delay() } else { - support::delayed(future::ready(None)) + future::ready(None).delay() } }); - pin_mut!(stream); - // Creates the future with the closure // Not ready (delayed future) - assert_stream_pending(stream.reborrow()); + assert_stream_pending!(stream); // Future is ready, yields the item - assert_stream_next(stream.reborrow(), 0); + assert_stream_next!(stream, 0); // Repeat - assert_stream_pending(stream.reborrow()); - assert_stream_next(stream.reborrow(), 2); + assert_stream_pending!(stream); + assert_stream_next!(stream, 2); - assert_stream_pending(stream.reborrow()); - assert_stream_next(stream.reborrow(), 4); + assert_stream_pending!(stream); + assert_stream_next!(stream, 4); // No more items - assert_stream_pending(stream.reborrow()); - assert_stream_done(stream.reborrow()); + assert_stream_pending!(stream); + assert_stream_done!(stream); }