From b24df49a9d26057b44a869c6effce494ec10dd35 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sat, 14 May 2022 20:15:53 +0200 Subject: [PATCH 1/4] chore: disable warnings in old CI (#4691) --- .circleci/config.yml | 25 +++++++ .cirrus.yml | 4 +- .github/workflows/ci.yml | 14 ++-- .github/workflows/loom.yml | 3 +- Cargo.toml | 1 - tests-build/Cargo.toml | 16 ----- tests-build/README.md | 2 - tests-build/src/lib.rs | 2 - .../tests/fail/macros_core_no_default.rs | 6 -- .../tests/fail/macros_core_no_default.stderr | 7 -- tests-build/tests/fail/macros_dead_code.rs | 8 --- .../tests/fail/macros_dead_code.stderr | 11 --- .../tests/fail/macros_invalid_input.rs | 40 ----------- .../tests/fail/macros_invalid_input.stderr | 71 ------------------- .../tests/fail/macros_type_mismatch.rs | 26 ------- .../tests/fail/macros_type_mismatch.stderr | 45 ------------ tests-build/tests/macros.rs | 27 ------- tests-build/tests/macros_clippy.rs | 7 -- .../tests/pass/forward_args_and_output.rs | 13 ---- tests-build/tests/pass/macros_main_loop.rs | 14 ---- tests-build/tests/pass/macros_main_return.rs | 6 -- tokio/src/time/driver/sleep.rs | 12 ++-- 22 files changed, 37 insertions(+), 323 deletions(-) create mode 100644 .circleci/config.yml delete mode 100644 tests-build/Cargo.toml delete mode 100644 tests-build/README.md delete mode 100644 tests-build/src/lib.rs delete mode 100644 tests-build/tests/fail/macros_core_no_default.rs delete mode 100644 tests-build/tests/fail/macros_core_no_default.stderr delete mode 100644 tests-build/tests/fail/macros_dead_code.rs delete mode 100644 tests-build/tests/fail/macros_dead_code.stderr delete mode 100644 tests-build/tests/fail/macros_invalid_input.rs delete mode 100644 tests-build/tests/fail/macros_invalid_input.stderr delete mode 100644 tests-build/tests/fail/macros_type_mismatch.rs delete mode 100644 tests-build/tests/fail/macros_type_mismatch.stderr delete mode 100644 tests-build/tests/macros.rs delete mode 100644 tests-build/tests/macros_clippy.rs delete mode 100644 tests-build/tests/pass/forward_args_and_output.rs delete mode 100644 tests-build/tests/pass/macros_main_loop.rs delete mode 100644 tests-build/tests/pass/macros_main_return.rs diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 00000000000..97670e93784 --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,25 @@ +version: 2.1 +jobs: + test-arm: + machine: + image: ubuntu-2004:202101-01 + resource_class: arm.medium + environment: + # Change to pin rust versino + RUST_STABLE: stable + steps: + - checkout + - run: + name: Install Rust + command: | + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs -o rustup.sh + chmod +x rustup.sh + ./rustup.sh -y --default-toolchain $RUST_STABLE + source "$HOME"/.cargo/env + # Only run Tokio tests + - run: cargo test --all-features -p tokio + +workflows: + ci: + jobs: + - test-arm diff --git a/.cirrus.yml b/.cirrus.yml index 4bef869c24f..390f7883fef 100644 --- a/.cirrus.yml +++ b/.cirrus.yml @@ -1,7 +1,5 @@ freebsd_instance: image: freebsd-12-2-release-amd64 -env: - RUSTFLAGS: -D warnings # Test FreeBSD in a full VM on cirrus-ci.com. Test the i686 target too, in the # same VM. The binary will be built in 32-bit mode, but will execute on a @@ -25,7 +23,7 @@ task: name: FreeBSD docs env: RUSTFLAGS: --cfg docsrs - RUSTDOCFLAGS: --cfg docsrs -Dwarnings + RUSTDOCFLAGS: --cfg docsrs setup_script: - pkg install -y bash curl - curl https://sh.rustup.rs -sSf --output rustup.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 05f6e87f330..862dab192b6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,7 +7,6 @@ on: name: CI env: - RUSTFLAGS: -Dwarnings RUST_BACKTRACE: 1 nightly: nightly-2021-10-25 minrust: 1.45.2 @@ -65,11 +64,6 @@ jobs: run: cargo hack test --each-feature working-directory: tests-integration - # Run macro build tests - - name: test tests-build --each-feature - run: cargo hack test --each-feature - working-directory: tests-build - # Build benchmarks. Run of benchmarks is done by bench.yml workflow. - name: build benches run: cargo build --benches @@ -129,7 +123,7 @@ jobs: run: cargo test --all-features working-directory: tokio env: - RUSTFLAGS: --cfg tokio_unstable -Dwarnings + RUSTFLAGS: --cfg tokio_unstable miri: name: miri @@ -213,7 +207,7 @@ jobs: - name: check --each-feature --unstable run: cargo hack check --all --each-feature -Z avoid-dev-deps env: - RUSTFLAGS: --cfg tokio_unstable -Dwarnings + RUSTFLAGS: --cfg tokio_unstable minrust: name: minrust @@ -298,7 +292,7 @@ jobs: run: cargo doc --lib --no-deps --all-features --document-private-items env: RUSTFLAGS: --cfg docsrs - RUSTDOCFLAGS: --cfg docsrs -Dwarnings + RUSTDOCFLAGS: --cfg docsrs loom-compile: name: build loom tests @@ -312,7 +306,7 @@ jobs: run: cargo test --no-run --lib --features full working-directory: tokio env: - RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings + RUSTFLAGS: --cfg loom --cfg tokio_unstable check-readme: name: Check README diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml index fde9f1114e1..088828a1b51 100644 --- a/.github/workflows/loom.yml +++ b/.github/workflows/loom.yml @@ -8,7 +8,6 @@ on: name: Loom env: - RUSTFLAGS: -Dwarnings RUST_BACKTRACE: 1 jobs: @@ -35,6 +34,6 @@ jobs: run: cargo test --lib --release --features full -- --nocapture $SCOPE working-directory: tokio env: - RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings + RUSTFLAGS: --cfg loom --cfg tokio_unstable LOOM_MAX_PREEMPTIONS: 2 SCOPE: ${{ matrix.scope }} diff --git a/Cargo.toml b/Cargo.toml index bc01f186281..0ed9192a249 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,5 @@ members = [ "benches", "examples", "stress-test", - "tests-build", "tests-integration", ] diff --git a/tests-build/Cargo.toml b/tests-build/Cargo.toml deleted file mode 100644 index 299af0cf417..00000000000 --- a/tests-build/Cargo.toml +++ /dev/null @@ -1,16 +0,0 @@ -[package] -name = "tests-build" -version = "0.1.0" -authors = ["Tokio Contributors "] -edition = "2018" -publish = false - -[features] -full = ["tokio/full"] -rt = ["tokio/rt", "tokio/macros"] - -[dependencies] -tokio = { path = "../tokio", optional = true } - -[dev-dependencies] -trybuild = "1.0" diff --git a/tests-build/README.md b/tests-build/README.md deleted file mode 100644 index f491e2bc377..00000000000 --- a/tests-build/README.md +++ /dev/null @@ -1,2 +0,0 @@ -Tests the various combination of feature flags. This is broken out to a separate -crate to work around limitations with cargo features. diff --git a/tests-build/src/lib.rs b/tests-build/src/lib.rs deleted file mode 100644 index 7b019cc7c26..00000000000 --- a/tests-build/src/lib.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[cfg(feature = "tokio")] -pub use tokio; diff --git a/tests-build/tests/fail/macros_core_no_default.rs b/tests-build/tests/fail/macros_core_no_default.rs deleted file mode 100644 index 23f8847df7d..00000000000 --- a/tests-build/tests/fail/macros_core_no_default.rs +++ /dev/null @@ -1,6 +0,0 @@ -use tests_build::tokio; - -#[tokio::main] -async fn my_fn() {} - -fn main() {} diff --git a/tests-build/tests/fail/macros_core_no_default.stderr b/tests-build/tests/fail/macros_core_no_default.stderr deleted file mode 100644 index 676acc8dbe3..00000000000 --- a/tests-build/tests/fail/macros_core_no_default.stderr +++ /dev/null @@ -1,7 +0,0 @@ -error: The default runtime flavor is `multi_thread`, but the `rt-multi-thread` feature is disabled. - --> $DIR/macros_core_no_default.rs:3:1 - | -3 | #[tokio::main] - | ^^^^^^^^^^^^^^ - | - = note: this error originates in the attribute macro `tokio::main` (in Nightly builds, run with -Z macro-backtrace for more info) diff --git a/tests-build/tests/fail/macros_dead_code.rs b/tests-build/tests/fail/macros_dead_code.rs deleted file mode 100644 index f2ada6f835d..00000000000 --- a/tests-build/tests/fail/macros_dead_code.rs +++ /dev/null @@ -1,8 +0,0 @@ -#![deny(dead_code)] - -use tests_build::tokio; - -#[tokio::main] -async fn f() {} - -fn main() {} diff --git a/tests-build/tests/fail/macros_dead_code.stderr b/tests-build/tests/fail/macros_dead_code.stderr deleted file mode 100644 index 816c294bd31..00000000000 --- a/tests-build/tests/fail/macros_dead_code.stderr +++ /dev/null @@ -1,11 +0,0 @@ -error: function is never used: `f` - --> $DIR/macros_dead_code.rs:6:10 - | -6 | async fn f() {} - | ^ - | -note: the lint level is defined here - --> $DIR/macros_dead_code.rs:1:9 - | -1 | #![deny(dead_code)] - | ^^^^^^^^^ diff --git a/tests-build/tests/fail/macros_invalid_input.rs b/tests-build/tests/fail/macros_invalid_input.rs deleted file mode 100644 index eb04eca76b6..00000000000 --- a/tests-build/tests/fail/macros_invalid_input.rs +++ /dev/null @@ -1,40 +0,0 @@ -use tests_build::tokio; - -#[tokio::main] -fn main_is_not_async() {} - -#[tokio::main(foo)] -async fn main_attr_has_unknown_args() {} - -#[tokio::main(threadpool::bar)] -async fn main_attr_has_path_args() {} - -#[tokio::test] -fn test_is_not_async() {} - -#[tokio::test(foo)] -async fn test_attr_has_args() {} - -#[tokio::test(foo = 123)] -async fn test_unexpected_attr() {} - -#[tokio::test(flavor = 123)] -async fn test_flavor_not_string() {} - -#[tokio::test(flavor = "foo")] -async fn test_unknown_flavor() {} - -#[tokio::test(flavor = "multi_thread", start_paused = false)] -async fn test_multi_thread_with_start_paused() {} - -#[tokio::test(flavor = "multi_thread", worker_threads = "foo")] -async fn test_worker_threads_not_int() {} - -#[tokio::test(flavor = "current_thread", worker_threads = 4)] -async fn test_worker_threads_and_current_thread() {} - -#[tokio::test] -#[test] -async fn test_has_second_test_attr() {} - -fn main() {} diff --git a/tests-build/tests/fail/macros_invalid_input.stderr b/tests-build/tests/fail/macros_invalid_input.stderr deleted file mode 100644 index 11337a94fe5..00000000000 --- a/tests-build/tests/fail/macros_invalid_input.stderr +++ /dev/null @@ -1,71 +0,0 @@ -error: the `async` keyword is missing from the function declaration - --> $DIR/macros_invalid_input.rs:4:1 - | -4 | fn main_is_not_async() {} - | ^^ - -error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused` - --> $DIR/macros_invalid_input.rs:6:15 - | -6 | #[tokio::main(foo)] - | ^^^ - -error: Must have specified ident - --> $DIR/macros_invalid_input.rs:9:15 - | -9 | #[tokio::main(threadpool::bar)] - | ^^^^^^^^^^^^^^^ - -error: the `async` keyword is missing from the function declaration - --> $DIR/macros_invalid_input.rs:13:1 - | -13 | fn test_is_not_async() {} - | ^^ - -error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused` - --> $DIR/macros_invalid_input.rs:15:15 - | -15 | #[tokio::test(foo)] - | ^^^ - -error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused` - --> $DIR/macros_invalid_input.rs:18:15 - | -18 | #[tokio::test(foo = 123)] - | ^^^^^^^^^ - -error: Failed to parse value of `flavor` as string. - --> $DIR/macros_invalid_input.rs:21:24 - | -21 | #[tokio::test(flavor = 123)] - | ^^^ - -error: No such runtime flavor `foo`. The runtime flavors are `current_thread` and `multi_thread`. - --> $DIR/macros_invalid_input.rs:24:24 - | -24 | #[tokio::test(flavor = "foo")] - | ^^^^^ - -error: The `start_paused` option requires the `current_thread` runtime flavor. Use `#[tokio::test(flavor = "current_thread")]` - --> $DIR/macros_invalid_input.rs:27:55 - | -27 | #[tokio::test(flavor = "multi_thread", start_paused = false)] - | ^^^^^ - -error: Failed to parse value of `worker_threads` as integer. - --> $DIR/macros_invalid_input.rs:30:57 - | -30 | #[tokio::test(flavor = "multi_thread", worker_threads = "foo")] - | ^^^^^ - -error: The `worker_threads` option requires the `multi_thread` runtime flavor. Use `#[tokio::test(flavor = "multi_thread")]` - --> $DIR/macros_invalid_input.rs:33:59 - | -33 | #[tokio::test(flavor = "current_thread", worker_threads = 4)] - | ^ - -error: second test attribute is supplied - --> $DIR/macros_invalid_input.rs:37:1 - | -37 | #[test] - | ^^^^^^^ diff --git a/tests-build/tests/fail/macros_type_mismatch.rs b/tests-build/tests/fail/macros_type_mismatch.rs deleted file mode 100644 index 0a5b9c4c727..00000000000 --- a/tests-build/tests/fail/macros_type_mismatch.rs +++ /dev/null @@ -1,26 +0,0 @@ -use tests_build::tokio; - -#[tokio::main] -async fn missing_semicolon_or_return_type() { - Ok(()) -} - -#[tokio::main] -async fn missing_return_type() { - return Ok(()); -} - -#[tokio::main] -async fn extra_semicolon() -> Result<(), ()> { - /* TODO(taiki-e): help message still wrong - help: try using a variant of the expected enum - | - 23 | Ok(Ok(());) - | - 23 | Err(Ok(());) - | - */ - Ok(()); -} - -fn main() {} diff --git a/tests-build/tests/fail/macros_type_mismatch.stderr b/tests-build/tests/fail/macros_type_mismatch.stderr deleted file mode 100644 index a8fa99bc63b..00000000000 --- a/tests-build/tests/fail/macros_type_mismatch.stderr +++ /dev/null @@ -1,45 +0,0 @@ -error[E0308]: mismatched types - --> $DIR/macros_type_mismatch.rs:5:5 - | -5 | Ok(()) - | ^^^^^^ expected `()`, found enum `Result` - | - = note: expected unit type `()` - found enum `Result<(), _>` -help: consider using a semicolon here - | -5 | Ok(()); - | + -help: try adding a return type - | -4 | async fn missing_semicolon_or_return_type() -> Result<(), _> { - | ++++++++++++++++ - -error[E0308]: mismatched types - --> $DIR/macros_type_mismatch.rs:10:5 - | -9 | async fn missing_return_type() { - | - help: try adding a return type: `-> Result<(), _>` -10 | return Ok(()); - | ^^^^^^^^^^^^^^ expected `()`, found enum `Result` - | - = note: expected unit type `()` - found enum `Result<(), _>` - -error[E0308]: mismatched types - --> $DIR/macros_type_mismatch.rs:23:5 - | -14 | async fn extra_semicolon() -> Result<(), ()> { - | -------------- expected `Result<(), ()>` because of return type -... -23 | Ok(()); - | ^^^^^^^ expected enum `Result`, found `()` - | - = note: expected enum `Result<(), ()>` - found unit type `()` -help: try using a variant of the expected enum - | -23 | Ok(Ok(());) - | -23 | Err(Ok(());) - | diff --git a/tests-build/tests/macros.rs b/tests-build/tests/macros.rs deleted file mode 100644 index 0a180dfb74f..00000000000 --- a/tests-build/tests/macros.rs +++ /dev/null @@ -1,27 +0,0 @@ -#[test] -fn compile_fail_full() { - let t = trybuild::TestCases::new(); - - #[cfg(feature = "full")] - t.pass("tests/pass/forward_args_and_output.rs"); - - #[cfg(feature = "full")] - t.pass("tests/pass/macros_main_return.rs"); - - #[cfg(feature = "full")] - t.pass("tests/pass/macros_main_loop.rs"); - - #[cfg(feature = "full")] - t.compile_fail("tests/fail/macros_invalid_input.rs"); - - #[cfg(feature = "full")] - t.compile_fail("tests/fail/macros_dead_code.rs"); - - #[cfg(feature = "full")] - t.compile_fail("tests/fail/macros_type_mismatch.rs"); - - #[cfg(all(feature = "rt", not(feature = "full")))] - t.compile_fail("tests/fail/macros_core_no_default.rs"); - - drop(t); -} diff --git a/tests-build/tests/macros_clippy.rs b/tests-build/tests/macros_clippy.rs deleted file mode 100644 index 0f3f4bb0b8b..00000000000 --- a/tests-build/tests/macros_clippy.rs +++ /dev/null @@ -1,7 +0,0 @@ -#[cfg(feature = "full")] -#[tokio::test] -async fn test_with_semicolon_without_return_type() { - #![deny(clippy::semicolon_if_nothing_returned)] - - dbg!(0); -} diff --git a/tests-build/tests/pass/forward_args_and_output.rs b/tests-build/tests/pass/forward_args_and_output.rs deleted file mode 100644 index e6ef1f88c1e..00000000000 --- a/tests-build/tests/pass/forward_args_and_output.rs +++ /dev/null @@ -1,13 +0,0 @@ -use tests_build::tokio; - -fn main() {} - -// arguments and output type is forwarded so other macros can access them - -#[tokio::test] -async fn test_fn_has_args(_x: u8) {} - -#[tokio::test] -async fn test_has_output() -> Result<(), Box> { - Ok(()) -} diff --git a/tests-build/tests/pass/macros_main_loop.rs b/tests-build/tests/pass/macros_main_loop.rs deleted file mode 100644 index d7d51982c36..00000000000 --- a/tests-build/tests/pass/macros_main_loop.rs +++ /dev/null @@ -1,14 +0,0 @@ -use tests_build::tokio; - -#[tokio::main] -async fn main() -> Result<(), ()> { - loop { - if !never() { - return Ok(()); - } - } -} - -fn never() -> bool { - std::time::Instant::now() > std::time::Instant::now() -} diff --git a/tests-build/tests/pass/macros_main_return.rs b/tests-build/tests/pass/macros_main_return.rs deleted file mode 100644 index d4d34ec26d3..00000000000 --- a/tests-build/tests/pass/macros_main_return.rs +++ /dev/null @@ -1,6 +0,0 @@ -use tests_build::tokio; - -#[tokio::main] -async fn main() -> Result<(), ()> { - return Ok(()); -} diff --git a/tokio/src/time/driver/sleep.rs b/tokio/src/time/driver/sleep.rs index 43ff694ffc6..c0571ca2ab9 100644 --- a/tokio/src/time/driver/sleep.rs +++ b/tokio/src/time/driver/sleep.rs @@ -324,8 +324,8 @@ impl Sleep { } fn reset_inner(self: Pin<&mut Self>, deadline: Instant) { - let me = self.project(); - me.entry.reset(deadline); + let mut me = self.project(); + me.entry.as_mut().reset(deadline); (*me.inner).deadline = deadline; #[cfg(all(tokio_unstable, feature = "tracing"))] @@ -349,12 +349,12 @@ impl Sleep { cfg_not_trace! { fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let me = self.project(); + let mut me = self.project(); // Keep track of task budget let coop = ready!(crate::coop::poll_proceed(cx)); - me.entry.poll_elapsed(cx).map(move |r| { + me.entry.as_mut().poll_elapsed(cx).map(move |r| { coop.made_progress(); r }) @@ -363,7 +363,7 @@ impl Sleep { cfg_trace! { fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let me = self.project(); + let mut me = self.project(); // Keep track of task budget let coop = ready!(trace_poll_op!( "poll_elapsed", @@ -371,7 +371,7 @@ impl Sleep { me.inner.resource_span.id(), )); - let result = me.entry.poll_elapsed(cx).map(move |r| { + let result = me.entry.as_mut().poll_elapsed(cx).map(move |r| { coop.made_progress(); r }); From 0105d9971f21ef07f50c9ee479551a4997d33ef1 Mon Sep 17 00:00:00 2001 From: Finomnis Date: Fri, 13 May 2022 23:26:15 +0200 Subject: [PATCH 2/4] sync: rewrite `CancellationToken` (#4652) --- tokio-util/Cargo.toml | 2 +- tokio-util/src/sync/cancellation_token.rs | 760 ++--------------- .../src/sync/cancellation_token/tree_node.rs | 373 +++++++++ .../src/sync/intrusive_double_linked_list.rs | 788 ------------------ tokio-util/src/sync/mod.rs | 2 - tokio-util/tests/sync_cancellation_token.rs | 182 +++- 6 files changed, 608 insertions(+), 1499 deletions(-) create mode 100644 tokio-util/src/sync/cancellation_token/tree_node.rs delete mode 100644 tokio-util/src/sync/intrusive_double_linked_list.rs diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 676b0e2ec94..d61ec1e33c5 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -36,7 +36,7 @@ rt = ["tokio/rt"] __docs_rs = ["futures-util"] [dependencies] -tokio = { version = "1.0.0", path = "../tokio", features = ["sync"] } +tokio = { version = "1.7.0", path = "../tokio", features = ["sync"] } bytes = "1.0.0" futures-core = "0.3.0" diff --git a/tokio-util/src/sync/cancellation_token.rs b/tokio-util/src/sync/cancellation_token.rs index f193f6bc32f..2a6ef392bd4 100644 --- a/tokio-util/src/sync/cancellation_token.rs +++ b/tokio-util/src/sync/cancellation_token.rs @@ -1,18 +1,15 @@ //! An asynchronously awaitable `CancellationToken`. //! The token allows to signal a cancellation request to one or more tasks. pub(crate) mod guard; +mod tree_node; -use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::Mutex; -use crate::sync::intrusive_double_linked_list::{LinkedList, ListNode}; - +use crate::loom::sync::Arc; use core::future::Future; use core::pin::Pin; -use core::ptr::NonNull; -use core::sync::atomic::Ordering; -use core::task::{Context, Poll, Waker}; +use core::task::{Context, Poll}; use guard::DropGuard; +use pin_project_lite::pin_project; /// A token which can be used to signal a cancellation request to one or more /// tasks. @@ -24,9 +21,9 @@ use guard::DropGuard; /// /// # Examples /// -/// ```ignore +/// ```no_run /// use tokio::select; -/// use tokio::scope::CancellationToken; +/// use tokio_util::sync::CancellationToken; /// /// #[tokio::main] /// async fn main() { @@ -55,31 +52,20 @@ use guard::DropGuard; /// } /// ``` pub struct CancellationToken { - inner: NonNull, + inner: Arc, } -// Safety: The CancellationToken is thread-safe and can be moved between threads, -// since all methods are internally synchronized. -unsafe impl Send for CancellationToken {} -unsafe impl Sync for CancellationToken {} - -/// A Future that is resolved once the corresponding [`CancellationToken`] -/// was cancelled -#[must_use = "futures do nothing unless polled"] -pub struct WaitForCancellationFuture<'a> { - /// The CancellationToken that is associated with this WaitForCancellationFuture - cancellation_token: Option<&'a CancellationToken>, - /// Node for waiting at the cancellation_token - wait_node: ListNode, - /// Whether this future was registered at the token yet as a waiter - is_registered: bool, +pin_project! { + /// A Future that is resolved once the corresponding [`CancellationToken`] + /// is cancelled. + #[must_use = "futures do nothing unless polled"] + pub struct WaitForCancellationFuture<'a> { + cancellation_token: &'a CancellationToken, + #[pin] + future: tokio::sync::futures::Notified<'a>, + } } -// Safety: Futures can be sent between threads as long as the underlying -// cancellation_token is thread-safe (Sync), -// which allows to poll/register/unregister from a different thread. -unsafe impl<'a> Send for WaitForCancellationFuture<'a> {} - // ===== impl CancellationToken ===== impl core::fmt::Debug for CancellationToken { @@ -92,43 +78,16 @@ impl core::fmt::Debug for CancellationToken { impl Clone for CancellationToken { fn clone(&self) -> Self { - // Safety: The state inside a `CancellationToken` is always valid, since - // is reference counted - let inner = self.state(); - - // Tokens are cloned by increasing their refcount - let current_state = inner.snapshot(); - inner.increment_refcount(current_state); - - CancellationToken { inner: self.inner } + tree_node::increase_handle_refcount(&self.inner); + CancellationToken { + inner: self.inner.clone(), + } } } impl Drop for CancellationToken { fn drop(&mut self) { - let token_state_pointer = self.inner; - - // Safety: The state inside a `CancellationToken` is always valid, since - // is reference counted - let inner = unsafe { &mut *self.inner.as_ptr() }; - - let mut current_state = inner.snapshot(); - - // We need to safe the parent, since the state might be released by the - // next call - let parent = inner.parent; - - // Drop our own refcount - current_state = inner.decrement_refcount(current_state); - - // If this was the last reference, unregister from the parent - if current_state.refcount == 0 { - if let Some(mut parent) = parent { - // Safety: Since we still retain a reference on the parent, it must be valid. - let parent = unsafe { parent.as_mut() }; - parent.unregister_child(token_state_pointer, current_state); - } - } + tree_node::decrease_handle_refcount(&self.inner); } } @@ -141,29 +100,11 @@ impl Default for CancellationToken { impl CancellationToken { /// Creates a new CancellationToken in the non-cancelled state. pub fn new() -> CancellationToken { - let state = Box::new(CancellationTokenState::new( - None, - StateSnapshot { - cancel_state: CancellationState::NotCancelled, - has_parent_ref: false, - refcount: 1, - }, - )); - - // Safety: We just created the Box. The pointer is guaranteed to be - // not null CancellationToken { - inner: unsafe { NonNull::new_unchecked(Box::into_raw(state)) }, + inner: Arc::new(tree_node::TreeNode::new()), } } - /// Returns a reference to the utilized `CancellationTokenState`. - fn state(&self) -> &CancellationTokenState { - // Safety: The state inside a `CancellationToken` is always valid, since - // is reference counted - unsafe { &*self.inner.as_ptr() } - } - /// Creates a `CancellationToken` which will get cancelled whenever the /// current token gets cancelled. /// @@ -172,9 +113,9 @@ impl CancellationToken { /// /// # Examples /// - /// ```ignore + /// ```no_run /// use tokio::select; - /// use tokio::scope::CancellationToken; + /// use tokio_util::sync::CancellationToken; /// /// #[tokio::main] /// async fn main() { @@ -203,56 +144,8 @@ impl CancellationToken { /// } /// ``` pub fn child_token(&self) -> CancellationToken { - let inner = self.state(); - - // Increment the refcount of this token. It will be referenced by the - // child, independent of whether the child is immediately cancelled or - // not. - let _current_state = inner.increment_refcount(inner.snapshot()); - - let mut unpacked_child_state = StateSnapshot { - has_parent_ref: true, - refcount: 1, - cancel_state: CancellationState::NotCancelled, - }; - let mut child_token_state = Box::new(CancellationTokenState::new( - Some(self.inner), - unpacked_child_state, - )); - - { - let mut guard = inner.synchronized.lock().unwrap(); - if guard.is_cancelled { - // This task was already cancelled. In this case we should not - // insert the child into the list, since it would never get removed - // from the list. - (*child_token_state.synchronized.lock().unwrap()).is_cancelled = true; - unpacked_child_state.cancel_state = CancellationState::Cancelled; - // Since it's not in the list, the parent doesn't need to retain - // a reference to it. - unpacked_child_state.has_parent_ref = false; - child_token_state - .state - .store(unpacked_child_state.pack(), Ordering::SeqCst); - } else { - if let Some(mut first_child) = guard.first_child { - child_token_state.from_parent.next_peer = Some(first_child); - // Safety: We manipulate other child task inside the Mutex - // and retain a parent reference on it. The child token can't - // get invalidated while the Mutex is held. - unsafe { - first_child.as_mut().from_parent.prev_peer = - Some((&mut *child_token_state).into()) - }; - } - guard.first_child = Some((&mut *child_token_state).into()); - } - }; - - let child_token_ptr = Box::into_raw(child_token_state); - // Safety: We just created the pointer from a `Box` CancellationToken { - inner: unsafe { NonNull::new_unchecked(child_token_ptr) }, + inner: tree_node::child_node(&self.inner), } } @@ -260,21 +153,33 @@ impl CancellationToken { /// derived from it. /// /// This will wake up all tasks which are waiting for cancellation. + /// + /// Be aware that cancellation is not an atomic operation. It is possible + /// for another thread running in parallel with a call to `cancel` to first + /// receive `true` from `is_cancelled` on one child node, and then receive + /// `false` from `is_cancelled` on another child node. However, once the + /// call to `cancel` returns, all child nodes have been fully cancelled. pub fn cancel(&self) { - self.state().cancel(); + tree_node::cancel(&self.inner); } - /// Returns `true` if the `CancellationToken` had been cancelled + /// Returns `true` if the `CancellationToken` is cancelled. pub fn is_cancelled(&self) -> bool { - self.state().is_cancelled() + tree_node::is_cancelled(&self.inner) } /// Returns a `Future` that gets fulfilled when cancellation is requested. + /// + /// The future will complete immediately if the token is already cancelled + /// when this method is called. + /// + /// # Cancel safety + /// + /// This method is cancel safe. pub fn cancelled(&self) -> WaitForCancellationFuture<'_> { WaitForCancellationFuture { - cancellation_token: Some(self), - wait_node: ListNode::new(WaitQueueEntry::new()), - is_registered: false, + cancellation_token: self, + future: self.inner.notified(), } } @@ -285,26 +190,6 @@ impl CancellationToken { pub fn drop_guard(self) -> DropGuard { DropGuard { inner: Some(self) } } - - unsafe fn register( - &self, - wait_node: &mut ListNode, - cx: &mut Context<'_>, - ) -> Poll<()> { - self.state().register(wait_node, cx) - } - - fn check_for_cancellation( - &self, - wait_node: &mut ListNode, - cx: &mut Context<'_>, - ) -> Poll<()> { - self.state().check_for_cancellation(wait_node, cx) - } - - fn unregister(&self, wait_node: &mut ListNode) { - self.state().unregister(wait_node) - } } // ===== impl WaitForCancellationFuture ===== @@ -319,560 +204,21 @@ impl<'a> Future for WaitForCancellationFuture<'a> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - // Safety: We do not move anything out of `WaitForCancellationFuture` - let mut_self: &mut WaitForCancellationFuture<'_> = unsafe { Pin::get_unchecked_mut(self) }; - - let cancellation_token = mut_self - .cancellation_token - .expect("polled WaitForCancellationFuture after completion"); - - let poll_res = if !mut_self.is_registered { - // Safety: The `ListNode` is pinned through the Future, - // and we will unregister it in `WaitForCancellationFuture::drop` - // before the Future is dropped and the memory reference is invalidated. - unsafe { cancellation_token.register(&mut mut_self.wait_node, cx) } - } else { - cancellation_token.check_for_cancellation(&mut mut_self.wait_node, cx) - }; - - if let Poll::Ready(()) = poll_res { - // The cancellation_token was signalled - mut_self.cancellation_token = None; - // A signalled Token means the Waker won't be enqueued anymore - mut_self.is_registered = false; - mut_self.wait_node.task = None; - } else { - // This `Future` and its stored `Waker` stay registered at the - // `CancellationToken` - mut_self.is_registered = true; - } - - poll_res - } -} - -impl<'a> Drop for WaitForCancellationFuture<'a> { - fn drop(&mut self) { - // If this WaitForCancellationFuture has been polled and it was added to the - // wait queue at the cancellation_token, it must be removed before dropping. - // Otherwise the cancellation_token would access invalid memory. - if let Some(token) = self.cancellation_token { - if self.is_registered { - token.unregister(&mut self.wait_node); - } - } - } -} - -/// Tracks how the future had interacted with the [`CancellationToken`] -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -enum PollState { - /// The task has never interacted with the [`CancellationToken`]. - New, - /// The task was added to the wait queue at the [`CancellationToken`]. - Waiting, - /// The task has been polled to completion. - Done, -} - -/// Tracks the WaitForCancellationFuture waiting state. -/// Access to this struct is synchronized through the mutex in the CancellationToken. -struct WaitQueueEntry { - /// The task handle of the waiting task - task: Option, - // Current polling state. This state is only updated inside the Mutex of - // the CancellationToken. - state: PollState, -} - -impl WaitQueueEntry { - /// Creates a new WaitQueueEntry - fn new() -> WaitQueueEntry { - WaitQueueEntry { - task: None, - state: PollState::New, - } - } -} - -struct SynchronizedState { - waiters: LinkedList, - first_child: Option>, - is_cancelled: bool, -} - -impl SynchronizedState { - fn new() -> Self { - Self { - waiters: LinkedList::new(), - first_child: None, - is_cancelled: false, - } - } -} - -/// Information embedded in child tokens which is synchronized through the Mutex -/// in their parent. -struct SynchronizedThroughParent { - next_peer: Option>, - prev_peer: Option>, -} - -/// Possible states of a `CancellationToken` -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -enum CancellationState { - NotCancelled = 0, - Cancelling = 1, - Cancelled = 2, -} - -impl CancellationState { - fn pack(self) -> usize { - self as usize - } - - fn unpack(value: usize) -> Self { - match value { - 0 => CancellationState::NotCancelled, - 1 => CancellationState::Cancelling, - 2 => CancellationState::Cancelled, - _ => unreachable!("Invalid value"), - } - } -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -struct StateSnapshot { - /// The amount of references to this particular CancellationToken. - /// `CancellationToken` structs hold these references to a `CancellationTokenState`. - /// Also the state is referenced by the state of each child. - refcount: usize, - /// Whether the state is still referenced by it's parent and can therefore - /// not be freed. - has_parent_ref: bool, - /// Whether the token is cancelled - cancel_state: CancellationState, -} - -impl StateSnapshot { - /// Packs the snapshot into a `usize` - fn pack(self) -> usize { - self.refcount << 3 | if self.has_parent_ref { 4 } else { 0 } | self.cancel_state.pack() - } - - /// Unpacks the snapshot from a `usize` - fn unpack(value: usize) -> Self { - let refcount = value >> 3; - let has_parent_ref = value & 4 != 0; - let cancel_state = CancellationState::unpack(value & 0x03); - - StateSnapshot { - refcount, - has_parent_ref, - cancel_state, - } - } - - /// Whether this `CancellationTokenState` is still referenced by any - /// `CancellationToken`. - fn has_refs(&self) -> bool { - self.refcount != 0 || self.has_parent_ref - } -} - -/// The maximum permitted amount of references to a CancellationToken. This -/// is derived from the intent to never use more than 32bit in the `Snapshot`. -const MAX_REFS: u32 = (std::u32::MAX - 7) >> 3; - -/// Internal state of the `CancellationToken` pair above -struct CancellationTokenState { - state: AtomicUsize, - parent: Option>, - from_parent: SynchronizedThroughParent, - synchronized: Mutex, -} - -impl CancellationTokenState { - fn new( - parent: Option>, - state: StateSnapshot, - ) -> CancellationTokenState { - CancellationTokenState { - parent, - from_parent: SynchronizedThroughParent { - prev_peer: None, - next_peer: None, - }, - state: AtomicUsize::new(state.pack()), - synchronized: Mutex::new(SynchronizedState::new()), - } - } - - /// Returns a snapshot of the current atomic state of the token - fn snapshot(&self) -> StateSnapshot { - StateSnapshot::unpack(self.state.load(Ordering::SeqCst)) - } - - fn atomic_update_state(&self, mut current_state: StateSnapshot, func: F) -> StateSnapshot - where - F: Fn(StateSnapshot) -> StateSnapshot, - { - let mut current_packed_state = current_state.pack(); + let mut this = self.project(); loop { - let next_state = func(current_state); - match self.state.compare_exchange( - current_packed_state, - next_state.pack(), - Ordering::SeqCst, - Ordering::SeqCst, - ) { - Ok(_) => { - return next_state; - } - Err(actual) => { - current_packed_state = actual; - current_state = StateSnapshot::unpack(actual); - } - } - } - } - - fn increment_refcount(&self, current_state: StateSnapshot) -> StateSnapshot { - self.atomic_update_state(current_state, |mut state: StateSnapshot| { - if state.refcount >= MAX_REFS as usize { - eprintln!("[ERROR] Maximum reference count for CancellationToken was exceeded"); - std::process::abort(); - } - state.refcount += 1; - state - }) - } - - fn decrement_refcount(&self, current_state: StateSnapshot) -> StateSnapshot { - let current_state = self.atomic_update_state(current_state, |mut state: StateSnapshot| { - state.refcount -= 1; - state - }); - - // Drop the State if it is not referenced anymore - if !current_state.has_refs() { - // Safety: `CancellationTokenState` is always stored in refcounted - // Boxes - let _ = unsafe { Box::from_raw(self as *const Self as *mut Self) }; - } - - current_state - } - - fn remove_parent_ref(&self, current_state: StateSnapshot) -> StateSnapshot { - let current_state = self.atomic_update_state(current_state, |mut state: StateSnapshot| { - state.has_parent_ref = false; - state - }); - - // Drop the State if it is not referenced anymore - if !current_state.has_refs() { - // Safety: `CancellationTokenState` is always stored in refcounted - // Boxes - let _ = unsafe { Box::from_raw(self as *const Self as *mut Self) }; - } - - current_state - } - - /// Unregisters a child from the parent token. - /// The child tokens state is not exactly known at this point in time. - /// If the parent token is cancelled, the child token gets removed from the - /// parents list, and might therefore already have been freed. If the parent - /// token is not cancelled, the child token is still valid. - fn unregister_child( - &mut self, - mut child_state: NonNull, - current_child_state: StateSnapshot, - ) { - let removed_child = { - // Remove the child toke from the parents linked list - let mut guard = self.synchronized.lock().unwrap(); - if !guard.is_cancelled { - // Safety: Since the token was not cancelled, the child must - // still be in the list and valid. - let mut child_state = unsafe { child_state.as_mut() }; - debug_assert!(child_state.snapshot().has_parent_ref); - - if guard.first_child == Some(child_state.into()) { - guard.first_child = child_state.from_parent.next_peer; - } - // Safety: If peers wouldn't be valid anymore, they would try - // to remove themselves from the list. This would require locking - // the Mutex that we currently own. - unsafe { - if let Some(mut prev_peer) = child_state.from_parent.prev_peer { - prev_peer.as_mut().from_parent.next_peer = - child_state.from_parent.next_peer; - } - if let Some(mut next_peer) = child_state.from_parent.next_peer { - next_peer.as_mut().from_parent.prev_peer = - child_state.from_parent.prev_peer; - } - } - child_state.from_parent.prev_peer = None; - child_state.from_parent.next_peer = None; - - // The child is no longer referenced by the parent, since we were able - // to remove its reference from the parents list. - true - } else { - // Do not touch the linked list anymore. If the parent is cancelled - // it will move all childs outside of the Mutex and manipulate - // the pointers there. Manipulating the pointers here too could - // lead to races. Therefore leave them just as as and let the - // parent deal with it. The parent will make sure to retain a - // reference to this state as long as it manipulates the list - // pointers. Therefore the pointers are not dangling. - false - } - }; - - if removed_child { - // If the token removed itself from the parents list, it can reset - // the parent ref status. If it is isn't able to do so, because the - // parent removed it from the list, there is no need to do this. - // The parent ref acts as as another reference count. Therefore - // removing this reference can free the object. - // Safety: The token was in the list. This means the parent wasn't - // cancelled before, and the token must still be alive. - unsafe { child_state.as_mut().remove_parent_ref(current_child_state) }; - } - - // Decrement the refcount on the parent and free it if necessary - self.decrement_refcount(self.snapshot()); - } - - fn cancel(&self) { - // Move the state of the CancellationToken from `NotCancelled` to `Cancelling` - let mut current_state = self.snapshot(); - - let state_after_cancellation = loop { - if current_state.cancel_state != CancellationState::NotCancelled { - // Another task already initiated the cancellation - return; + if this.cancellation_token.is_cancelled() { + return Poll::Ready(()); } - let mut next_state = current_state; - next_state.cancel_state = CancellationState::Cancelling; - match self.state.compare_exchange( - current_state.pack(), - next_state.pack(), - Ordering::SeqCst, - Ordering::SeqCst, - ) { - Ok(_) => break next_state, - Err(actual) => current_state = StateSnapshot::unpack(actual), + // No wakeups can be lost here because there is always a call to + // `is_cancelled` between the creation of the future and the call to + // `poll`, and the code that sets the cancelled flag does so before + // waking the `Notified`. + if this.future.as_mut().poll(cx).is_pending() { + return Poll::Pending; } - }; - - // This task cancelled the token - - // Take the task list out of the Token - // We do not want to cancel child token inside this lock. If one of the - // child tasks would have additional child tokens, we would recursively - // take locks. - - // Doing this action has an impact if the child token is dropped concurrently: - // It will try to deregister itself from the parent task, but can not find - // itself in the task list anymore. Therefore it needs to assume the parent - // has extracted the list and will process it. It may not modify the list. - // This is OK from a memory safety perspective, since the parent still - // retains a reference to the child task until it finished iterating over - // it. - - let mut first_child = { - let mut guard = self.synchronized.lock().unwrap(); - // Save the cancellation also inside the Mutex - // This allows child tokens which want to detach themselves to detect - // that this is no longer required since the parent cleared the list. - guard.is_cancelled = true; - - // Wakeup all waiters - // This happens inside the lock to make cancellation reliable - // If we would access waiters outside of the lock, the pointers - // may no longer be valid. - // Typically this shouldn't be an issue, since waking a task should - // only move it from the blocked into the ready state and not have - // further side effects. - - // Use a reverse iterator, so that the oldest waiter gets - // scheduled first - guard.waiters.reverse_drain(|waiter| { - // We are not allowed to move the `Waker` out of the list node. - // The `Future` relies on the fact that the old `Waker` stays there - // as long as the `Future` has not completed in order to perform - // the `will_wake()` check. - // Therefore `wake_by_ref` is used instead of `wake()` - if let Some(handle) = &mut waiter.task { - handle.wake_by_ref(); - } - // Mark the waiter to have been removed from the list. - waiter.state = PollState::Done; - }); - - guard.first_child.take() - }; - - while let Some(mut child) = first_child { - // Safety: We know this is a valid pointer since it is in our child pointer - // list. It can't have been freed in between, since we retain a a reference - // to each child. - let mut_child = unsafe { child.as_mut() }; - - // Get the next child and clean up list pointers - first_child = mut_child.from_parent.next_peer; - mut_child.from_parent.prev_peer = None; - mut_child.from_parent.next_peer = None; - - // Cancel the child task - mut_child.cancel(); - - // Drop the parent reference. This `CancellationToken` is not interested - // in interacting with the child anymore. - // This is ONLY allowed once we promised not to touch the state anymore - // after this interaction. - mut_child.remove_parent_ref(mut_child.snapshot()); - } - - // The cancellation has completed - // At this point in time tasks which registered a wait node can be sure - // that this wait node already had been dequeued from the list without - // needing to inspect the list. - self.atomic_update_state(state_after_cancellation, |mut state| { - state.cancel_state = CancellationState::Cancelled; - state - }); - } - /// Returns `true` if the `CancellationToken` had been cancelled - fn is_cancelled(&self) -> bool { - let current_state = self.snapshot(); - current_state.cancel_state != CancellationState::NotCancelled - } - - /// Registers a waiting task at the `CancellationToken`. - /// Safety: This method is only safe as long as the waiting waiting task - /// will properly unregister the wait node before it gets moved. - unsafe fn register( - &self, - wait_node: &mut ListNode, - cx: &mut Context<'_>, - ) -> Poll<()> { - debug_assert_eq!(PollState::New, wait_node.state); - let current_state = self.snapshot(); - - // Perform an optimistic cancellation check before. This is not strictly - // necessary since we also check for cancellation in the Mutex, but - // reduces the necessary work to be performed for tasks which already - // had been cancelled. - if current_state.cancel_state != CancellationState::NotCancelled { - return Poll::Ready(()); - } - - // So far the token is not cancelled. However it could be cancelled before - // we get the chance to store the `Waker`. Therefore we need to check - // for cancellation again inside the mutex. - let mut guard = self.synchronized.lock().unwrap(); - if guard.is_cancelled { - // Cancellation was signalled - wait_node.state = PollState::Done; - Poll::Ready(()) - } else { - // Added the task to the wait queue - wait_node.task = Some(cx.waker().clone()); - wait_node.state = PollState::Waiting; - guard.waiters.add_front(wait_node); - Poll::Pending - } - } - - fn check_for_cancellation( - &self, - wait_node: &mut ListNode, - cx: &mut Context<'_>, - ) -> Poll<()> { - debug_assert!( - wait_node.task.is_some(), - "Method can only be called after task had been registered" - ); - - let current_state = self.snapshot(); - - if current_state.cancel_state != CancellationState::NotCancelled { - // If the cancellation had been fully completed we know that our `Waker` - // is no longer registered at the `CancellationToken`. - // Otherwise the cancel call may or may not yet have iterated - // through the waiters list and removed the wait nodes. - // If it hasn't yet, we need to remove it. Otherwise an attempt to - // reuse the `wait_nodeĀ“ might get freed due to the `WaitForCancellationFuture` - // getting dropped before the cancellation had interacted with it. - if current_state.cancel_state != CancellationState::Cancelled { - self.unregister(wait_node); - } - Poll::Ready(()) - } else { - // Check if we need to swap the `Waker`. This will make the check more - // expensive, since the `Waker` is synchronized through the Mutex. - // If we don't need to perform a `Waker` update, an atomic check for - // cancellation is sufficient. - let need_waker_update = wait_node - .task - .as_ref() - .map(|waker| waker.will_wake(cx.waker())) - .unwrap_or(true); - - if need_waker_update { - let guard = self.synchronized.lock().unwrap(); - if guard.is_cancelled { - // Cancellation was signalled. Since this cancellation signal - // is set inside the Mutex, the old waiter must already have - // been removed from the waiting list - debug_assert_eq!(PollState::Done, wait_node.state); - wait_node.task = None; - Poll::Ready(()) - } else { - // The WaitForCancellationFuture is already in the queue. - // The CancellationToken can't have been cancelled, - // since this would change the is_cancelled flag inside the mutex. - // Therefore we just have to update the Waker. A follow-up - // cancellation will always use the new waker. - wait_node.task = Some(cx.waker().clone()); - Poll::Pending - } - } else { - // Do nothing. If the token gets cancelled, this task will get - // woken again and can fetch the cancellation. - Poll::Pending - } - } - } - - fn unregister(&self, wait_node: &mut ListNode) { - debug_assert!( - wait_node.task.is_some(), - "waiter can not be active without task" - ); - - let mut guard = self.synchronized.lock().unwrap(); - // WaitForCancellationFuture only needs to get removed if it has been added to - // the wait queue of the CancellationToken. - // This has happened in the PollState::Waiting case. - if let PollState::Waiting = wait_node.state { - // Safety: Due to the state, we know that the node must be part - // of the waiter list - if !unsafe { guard.waiters.remove(wait_node) } { - // Panic if the address isn't found. This can only happen if the contract was - // violated, e.g. the WaitQueueEntry got moved after the initial poll. - panic!("Future could not be removed from wait queue"); - } - wait_node.state = PollState::Done; + this.future.set(this.cancellation_token.inner.notified()); } - wait_node.task = None; } } diff --git a/tokio-util/src/sync/cancellation_token/tree_node.rs b/tokio-util/src/sync/cancellation_token/tree_node.rs new file mode 100644 index 00000000000..b6cd698e23d --- /dev/null +++ b/tokio-util/src/sync/cancellation_token/tree_node.rs @@ -0,0 +1,373 @@ +//! This mod provides the logic for the inner tree structure of the CancellationToken. +//! +//! CancellationTokens are only light handles with references to TreeNode. +//! All the logic is actually implemented in the TreeNode. +//! +//! A TreeNode is part of the cancellation tree and may have one parent and an arbitrary number of +//! children. +//! +//! A TreeNode can receive the request to perform a cancellation through a CancellationToken. +//! This cancellation request will cancel the node and all of its descendants. +//! +//! As soon as a node cannot get cancelled any more (because it was already cancelled or it has no +//! more CancellationTokens pointing to it any more), it gets removed from the tree, to keep the +//! tree as small as possible. +//! +//! # Invariants +//! +//! Those invariants shall be true at any time. +//! +//! 1. A node that has no parents and no handles can no longer be cancelled. +//! This is important during both cancellation and refcounting. +//! +//! 2. If node B *is* or *was* a child of node A, then node B was created *after* node A. +//! This is important for deadlock safety, as it is used for lock order. +//! Node B can only become the child of node A in two ways: +//! - being created with `child_node()`, in which case it is trivially true that +//! node A already existed when node B was created +//! - being moved A->C->B to A->B because node C was removed in `decrease_handle_refcount()` +//! or `cancel()`. In this case the invariant still holds, as B was younger than C, and C +//! was younger than A, therefore B is also younger than A. +//! +//! 3. If two nodes are both unlocked and node A is the parent of node B, then node B is a child of +//! node A. It is important to always restore that invariant before dropping the lock of a node. +//! +//! # Deadlock safety +//! +//! We always lock in the order of creation time. We can prove this through invariant #2. +//! Specifically, through invariant #2, we know that we always have to lock a parent +//! before its child. +//! +use crate::loom::sync::{Arc, Mutex, MutexGuard}; + +/// A node of the cancellation tree structure +/// +/// The actual data it holds is wrapped inside a mutex for synchronization. +pub(crate) struct TreeNode { + inner: Mutex, + waker: tokio::sync::Notify, +} +impl TreeNode { + pub(crate) fn new() -> Self { + Self { + inner: Mutex::new(Inner { + parent: None, + parent_idx: 0, + children: vec![], + is_cancelled: false, + num_handles: 1, + }), + waker: tokio::sync::Notify::new(), + } + } + + pub(crate) fn notified(&self) -> tokio::sync::futures::Notified<'_> { + self.waker.notified() + } +} + +/// The data contained inside a TreeNode. +/// +/// This struct exists so that the data of the node can be wrapped +/// in a Mutex. +struct Inner { + parent: Option>, + parent_idx: usize, + children: Vec>, + is_cancelled: bool, + num_handles: usize, +} + +/// Returns whether or not the node is cancelled +pub(crate) fn is_cancelled(node: &Arc) -> bool { + node.inner.lock().unwrap().is_cancelled +} + +/// Creates a child node +pub(crate) fn child_node(parent: &Arc) -> Arc { + let mut locked_parent = parent.inner.lock().unwrap(); + + // Do not register as child if we are already cancelled. + // Cancelled trees can never be uncancelled and therefore + // need no connection to parents or children any more. + if locked_parent.is_cancelled { + return Arc::new(TreeNode { + inner: Mutex::new(Inner { + parent: None, + parent_idx: 0, + children: vec![], + is_cancelled: true, + num_handles: 1, + }), + waker: tokio::sync::Notify::new(), + }); + } + + let child = Arc::new(TreeNode { + inner: Mutex::new(Inner { + parent: Some(parent.clone()), + parent_idx: locked_parent.children.len(), + children: vec![], + is_cancelled: false, + num_handles: 1, + }), + waker: tokio::sync::Notify::new(), + }); + + locked_parent.children.push(child.clone()); + + child +} + +/// Disconnects the given parent from all of its children. +/// +/// Takes a reference to [Inner] to make sure the parent is already locked. +fn disconnect_children(node: &mut Inner) { + for child in std::mem::take(&mut node.children) { + let mut locked_child = child.inner.lock().unwrap(); + locked_child.parent_idx = 0; + locked_child.parent = None; + } +} + +/// Figures out the parent of the node and locks the node and its parent atomically. +/// +/// The basic principle of preventing deadlocks in the tree is +/// that we always lock the parent first, and then the child. +/// For more info look at *deadlock safety* and *invariant #2*. +/// +/// Sadly, it's impossible to figure out the parent of a node without +/// locking it. To then achieve locking order consistency, the node +/// has to be unlocked before the parent gets locked. +/// This leaves a small window where we already assume that we know the parent, +/// but neither the parent nor the node is locked. Therefore, the parent could change. +/// +/// To prevent that this problem leaks into the rest of the code, it is abstracted +/// in this function. +/// +/// The locked child and optionally its locked parent, if a parent exists, get passed +/// to the `func` argument via (node, None) or (node, Some(parent)). +fn with_locked_node_and_parent(node: &Arc, func: F) -> Ret +where + F: FnOnce(MutexGuard<'_, Inner>, Option>) -> Ret, +{ + let mut potential_parent = { + let locked_node = node.inner.lock().unwrap(); + match locked_node.parent.clone() { + Some(parent) => parent, + // If we locked the node and its parent is `None`, we are in a valid state + // and can return. + None => return func(locked_node, None), + } + }; + + loop { + // Deadlock safety: + // + // Due to invariant #2, we know that we have to lock the parent first, and then the child. + // This is true even if the potential_parent is no longer the current parent or even its + // sibling, as the invariant still holds. + let locked_parent = potential_parent.inner.lock().unwrap(); + let locked_node = node.inner.lock().unwrap(); + + let actual_parent = match locked_node.parent.clone() { + Some(parent) => parent, + // If we locked the node and its parent is `None`, we are in a valid state + // and can return. + None => { + // Was the wrong parent, so unlock it before calling `func` + drop(locked_parent); + return func(locked_node, None); + } + }; + + // Loop until we managed to lock both the node and its parent + if Arc::ptr_eq(&actual_parent, &potential_parent) { + return func(locked_node, Some(locked_parent)); + } + + // Drop locked_parent before reassigning to potential_parent, + // as potential_parent is borrowed in it + drop(locked_node); + drop(locked_parent); + + potential_parent = actual_parent; + } +} + +/// Moves all children from `node` to `parent`. +/// +/// `parent` MUST have been a parent of the node when they both got locked, +/// otherwise there is a potential for a deadlock as invariant #2 would be violated. +/// +/// To aquire the locks for node and parent, use [with_locked_node_and_parent]. +fn move_children_to_parent(node: &mut Inner, parent: &mut Inner) { + // Pre-allocate in the parent, for performance + parent.children.reserve(node.children.len()); + + for child in std::mem::take(&mut node.children) { + { + let mut child_locked = child.inner.lock().unwrap(); + child_locked.parent = node.parent.clone(); + child_locked.parent_idx = parent.children.len(); + } + parent.children.push(child); + } +} + +/// Removes a child from the parent. +/// +/// `parent` MUST be the parent of `node`. +/// To aquire the locks for node and parent, use [with_locked_node_and_parent]. +fn remove_child(parent: &mut Inner, mut node: MutexGuard<'_, Inner>) { + // Query the position from where to remove a node + let pos = node.parent_idx; + node.parent = None; + node.parent_idx = 0; + + // Unlock node, so that only one child at a time is locked. + // Otherwise we would violate the lock order (see 'deadlock safety') as we + // don't know the creation order of the child nodes + drop(node); + + // If `node` is the last element in the list, we don't need any swapping + if parent.children.len() == pos + 1 { + parent.children.pop().unwrap(); + } else { + // If `node` is not the last element in the list, we need to + // replace it with the last element + let replacement_child = parent.children.pop().unwrap(); + replacement_child.inner.lock().unwrap().parent_idx = pos; + parent.children[pos] = replacement_child; + } + + let len = parent.children.len(); + if 4 * len <= parent.children.capacity() { + // equal to: + // parent.children.shrink_to(2 * len); + // but shrink_to was not yet stabilized in our minimal compatible version + let old_children = std::mem::replace(&mut parent.children, Vec::with_capacity(2 * len)); + parent.children.extend(old_children); + } +} + +/// Increases the reference count of handles. +pub(crate) fn increase_handle_refcount(node: &Arc) { + let mut locked_node = node.inner.lock().unwrap(); + + // Once no handles are left over, the node gets detached from the tree. + // There should never be a new handle once all handles are dropped. + assert!(locked_node.num_handles > 0); + + locked_node.num_handles += 1; +} + +/// Decreases the reference count of handles. +/// +/// Once no handle is left, we can remove the node from the +/// tree and connect its parent directly to its children. +pub(crate) fn decrease_handle_refcount(node: &Arc) { + let num_handles = { + let mut locked_node = node.inner.lock().unwrap(); + locked_node.num_handles -= 1; + locked_node.num_handles + }; + + if num_handles == 0 { + with_locked_node_and_parent(node, |mut node, parent| { + // Remove the node from the tree + match parent { + Some(mut parent) => { + // As we want to remove ourselves from the tree, + // we have to move the children to the parent, so that + // they still receive the cancellation event without us. + // Moving them does not violate invariant #1. + move_children_to_parent(&mut node, &mut parent); + + // Remove the node from the parent + remove_child(&mut parent, node); + } + None => { + // Due to invariant #1, we can assume that our + // children can no longer be cancelled through us. + // (as we now have neither a parent nor handles) + // Therefore we can disconnect them. + disconnect_children(&mut node); + } + } + }); + } +} + +/// Cancels a node and its children. +pub(crate) fn cancel(node: &Arc) { + let mut locked_node = node.inner.lock().unwrap(); + + if locked_node.is_cancelled { + return; + } + + // One by one, adopt grandchildren and then cancel and detach the child + while let Some(child) = locked_node.children.pop() { + // This can't deadlock because the mutex we are already + // holding is the parent of child. + let mut locked_child = child.inner.lock().unwrap(); + + // Detach the child from node + // No need to modify node.children, as the child already got removed with `.pop` + locked_child.parent = None; + locked_child.parent_idx = 0; + + // If child is already cancelled, detaching is enough + if locked_child.is_cancelled { + continue; + } + + // Cancel or adopt grandchildren + while let Some(grandchild) = locked_child.children.pop() { + // This can't deadlock because the two mutexes we are already + // holding is the parent and grandparent of grandchild. + let mut locked_grandchild = grandchild.inner.lock().unwrap(); + + // Detach the grandchild + locked_grandchild.parent = None; + locked_grandchild.parent_idx = 0; + + // If grandchild is already cancelled, detaching is enough + if locked_grandchild.is_cancelled { + continue; + } + + // For performance reasons, only adopt grandchildren that have children. + // Otherwise, just cancel them right away, no need for another iteration. + if locked_grandchild.children.is_empty() { + // Cancel the grandchild + locked_grandchild.is_cancelled = true; + locked_grandchild.children = Vec::new(); + drop(locked_grandchild); + grandchild.waker.notify_waiters(); + } else { + // Otherwise, adopt grandchild + locked_grandchild.parent = Some(node.clone()); + locked_grandchild.parent_idx = locked_node.children.len(); + drop(locked_grandchild); + locked_node.children.push(grandchild); + } + } + + // Cancel the child + locked_child.is_cancelled = true; + locked_child.children = Vec::new(); + drop(locked_child); + child.waker.notify_waiters(); + + // Now the child is cancelled and detached and all its children are adopted. + // Just continue until all (including adopted) children are cancelled and detached. + } + + // Cancel the node itself. + locked_node.is_cancelled = true; + locked_node.children = Vec::new(); + drop(locked_node); + node.waker.notify_waiters(); +} diff --git a/tokio-util/src/sync/intrusive_double_linked_list.rs b/tokio-util/src/sync/intrusive_double_linked_list.rs deleted file mode 100644 index 0a5ecff9a37..00000000000 --- a/tokio-util/src/sync/intrusive_double_linked_list.rs +++ /dev/null @@ -1,788 +0,0 @@ -//! An intrusive double linked list of data - -#![allow(dead_code, unreachable_pub)] - -use core::{ - marker::PhantomPinned, - ops::{Deref, DerefMut}, - ptr::NonNull, -}; - -/// A node which carries data of type `T` and is stored in an intrusive list -#[derive(Debug)] -pub struct ListNode { - /// The previous node in the list. `None` if there is no previous node. - prev: Option>>, - /// The next node in the list. `None` if there is no previous node. - next: Option>>, - /// The data which is associated to this list item - data: T, - /// Prevents `ListNode`s from being `Unpin`. They may never be moved, since - /// the list semantics require addresses to be stable. - _pin: PhantomPinned, -} - -impl ListNode { - /// Creates a new node with the associated data - pub fn new(data: T) -> ListNode { - Self { - prev: None, - next: None, - data, - _pin: PhantomPinned, - } - } -} - -impl Deref for ListNode { - type Target = T; - - fn deref(&self) -> &T { - &self.data - } -} - -impl DerefMut for ListNode { - fn deref_mut(&mut self) -> &mut T { - &mut self.data - } -} - -/// An intrusive linked list of nodes, where each node carries associated data -/// of type `T`. -#[derive(Debug)] -pub struct LinkedList { - head: Option>>, - tail: Option>>, -} - -impl LinkedList { - /// Creates an empty linked list - pub fn new() -> Self { - LinkedList:: { - head: None, - tail: None, - } - } - - /// Adds a node at the front of the linked list. - /// Safety: This function is only safe as long as `node` is guaranteed to - /// get removed from the list before it gets moved or dropped. - /// In addition to this `node` may not be added to another other list before - /// it is removed from the current one. - pub unsafe fn add_front(&mut self, node: &mut ListNode) { - node.next = self.head; - node.prev = None; - if let Some(mut head) = self.head { - head.as_mut().prev = Some(node.into()) - }; - self.head = Some(node.into()); - if self.tail.is_none() { - self.tail = Some(node.into()); - } - } - - /// Inserts a node into the list in a way that the list keeps being sorted. - /// Safety: This function is only safe as long as `node` is guaranteed to - /// get removed from the list before it gets moved or dropped. - /// In addition to this `node` may not be added to another other list before - /// it is removed from the current one. - pub unsafe fn add_sorted(&mut self, node: &mut ListNode) - where - T: PartialOrd, - { - if self.head.is_none() { - // First node in the list - self.head = Some(node.into()); - self.tail = Some(node.into()); - return; - } - - let mut prev: Option>> = None; - let mut current = self.head; - - while let Some(mut current_node) = current { - if node.data < current_node.as_ref().data { - // Need to insert before the current node - current_node.as_mut().prev = Some(node.into()); - match prev { - Some(mut prev) => { - prev.as_mut().next = Some(node.into()); - } - None => { - // We are inserting at the beginning of the list - self.head = Some(node.into()); - } - } - node.next = current; - node.prev = prev; - return; - } - prev = current; - current = current_node.as_ref().next; - } - - // We looped through the whole list and the nodes data is bigger or equal - // than everything we found up to now. - // Insert at the end. Since we checked before that the list isn't empty, - // tail always has a value. - node.prev = self.tail; - node.next = None; - self.tail.as_mut().unwrap().as_mut().next = Some(node.into()); - self.tail = Some(node.into()); - } - - /// Returns the first node in the linked list without removing it from the list - /// The function is only safe as long as valid pointers are stored inside - /// the linked list. - /// The returned pointer is only guaranteed to be valid as long as the list - /// is not mutated - pub fn peek_first(&self) -> Option<&mut ListNode> { - // Safety: When the node was inserted it was promised that it is alive - // until it gets removed from the list. - // The returned node has a pointer which constrains it to the lifetime - // of the list. This is ok, since the Node is supposed to outlive - // its insertion in the list. - unsafe { - self.head - .map(|mut node| &mut *(node.as_mut() as *mut ListNode)) - } - } - - /// Returns the last node in the linked list without removing it from the list - /// The function is only safe as long as valid pointers are stored inside - /// the linked list. - /// The returned pointer is only guaranteed to be valid as long as the list - /// is not mutated - pub fn peek_last(&self) -> Option<&mut ListNode> { - // Safety: When the node was inserted it was promised that it is alive - // until it gets removed from the list. - // The returned node has a pointer which constrains it to the lifetime - // of the list. This is ok, since the Node is supposed to outlive - // its insertion in the list. - unsafe { - self.tail - .map(|mut node| &mut *(node.as_mut() as *mut ListNode)) - } - } - - /// Removes the first node from the linked list - pub fn remove_first(&mut self) -> Option<&mut ListNode> { - #![allow(clippy::debug_assert_with_mut_call)] - - // Safety: When the node was inserted it was promised that it is alive - // until it gets removed from the list - unsafe { - let mut head = self.head?; - self.head = head.as_mut().next; - - let first_ref = head.as_mut(); - match first_ref.next { - None => { - // This was the only node in the list - debug_assert_eq!(Some(first_ref.into()), self.tail); - self.tail = None; - } - Some(mut next) => { - next.as_mut().prev = None; - } - } - - first_ref.prev = None; - first_ref.next = None; - Some(&mut *(first_ref as *mut ListNode)) - } - } - - /// Removes the last node from the linked list and returns it - pub fn remove_last(&mut self) -> Option<&mut ListNode> { - #![allow(clippy::debug_assert_with_mut_call)] - - // Safety: When the node was inserted it was promised that it is alive - // until it gets removed from the list - unsafe { - let mut tail = self.tail?; - self.tail = tail.as_mut().prev; - - let last_ref = tail.as_mut(); - match last_ref.prev { - None => { - // This was the last node in the list - debug_assert_eq!(Some(last_ref.into()), self.head); - self.head = None; - } - Some(mut prev) => { - prev.as_mut().next = None; - } - } - - last_ref.prev = None; - last_ref.next = None; - Some(&mut *(last_ref as *mut ListNode)) - } - } - - /// Returns whether the linked list does not contain any node - pub fn is_empty(&self) -> bool { - if self.head.is_some() { - return false; - } - - debug_assert!(self.tail.is_none()); - true - } - - /// Removes the given `node` from the linked list. - /// Returns whether the `node` was removed. - /// It is also only safe if it is known that the `node` is either part of this - /// list, or of no list at all. If `node` is part of another list, the - /// behavior is undefined. - pub unsafe fn remove(&mut self, node: &mut ListNode) -> bool { - #![allow(clippy::debug_assert_with_mut_call)] - - match node.prev { - None => { - // This might be the first node in the list. If it is not, the - // node is not in the list at all. Since our precondition is that - // the node must either be in this list or in no list, we check that - // the node is really in no list. - if self.head != Some(node.into()) { - debug_assert!(node.next.is_none()); - return false; - } - self.head = node.next; - } - Some(mut prev) => { - debug_assert_eq!(prev.as_ref().next, Some(node.into())); - prev.as_mut().next = node.next; - } - } - - match node.next { - None => { - // This must be the last node in our list. Otherwise the list - // is inconsistent. - debug_assert_eq!(self.tail, Some(node.into())); - self.tail = node.prev; - } - Some(mut next) => { - debug_assert_eq!(next.as_mut().prev, Some(node.into())); - next.as_mut().prev = node.prev; - } - } - - node.next = None; - node.prev = None; - - true - } - - /// Drains the list iby calling a callback on each list node - /// - /// The method does not return an iterator since stopping or deferring - /// draining the list is not permitted. If the method would push nodes to - /// an iterator we could not guarantee that the nodes do not get utilized - /// after having been removed from the list anymore. - pub fn drain(&mut self, mut func: F) - where - F: FnMut(&mut ListNode), - { - let mut current = self.head; - self.head = None; - self.tail = None; - - while let Some(mut node) = current { - // Safety: The nodes have not been removed from the list yet and must - // therefore contain valid data. The nodes can also not be added to - // the list again during iteration, since the list is mutably borrowed. - unsafe { - let node_ref = node.as_mut(); - current = node_ref.next; - - node_ref.next = None; - node_ref.prev = None; - - // Note: We do not reset the pointers from the next element in the - // list to the current one since we will iterate over the whole - // list anyway, and therefore clean up all pointers. - - func(node_ref); - } - } - } - - /// Drains the list in reverse order by calling a callback on each list node - /// - /// The method does not return an iterator since stopping or deferring - /// draining the list is not permitted. If the method would push nodes to - /// an iterator we could not guarantee that the nodes do not get utilized - /// after having been removed from the list anymore. - pub fn reverse_drain(&mut self, mut func: F) - where - F: FnMut(&mut ListNode), - { - let mut current = self.tail; - self.head = None; - self.tail = None; - - while let Some(mut node) = current { - // Safety: The nodes have not been removed from the list yet and must - // therefore contain valid data. The nodes can also not be added to - // the list again during iteration, since the list is mutably borrowed. - unsafe { - let node_ref = node.as_mut(); - current = node_ref.prev; - - node_ref.next = None; - node_ref.prev = None; - - // Note: We do not reset the pointers from the next element in the - // list to the current one since we will iterate over the whole - // list anyway, and therefore clean up all pointers. - - func(node_ref); - } - } - } -} - -#[cfg(all(test, feature = "std"))] // Tests make use of Vec at the moment -mod tests { - use super::*; - - fn collect_list(mut list: LinkedList) -> Vec { - let mut result = Vec::new(); - list.drain(|node| { - result.push(**node); - }); - result - } - - fn collect_reverse_list(mut list: LinkedList) -> Vec { - let mut result = Vec::new(); - list.reverse_drain(|node| { - result.push(**node); - }); - result - } - - unsafe fn add_nodes(list: &mut LinkedList, nodes: &mut [&mut ListNode]) { - for node in nodes.iter_mut() { - list.add_front(node); - } - } - - unsafe fn assert_clean(node: &mut ListNode) { - assert!(node.next.is_none()); - assert!(node.prev.is_none()); - } - - #[test] - fn insert_and_iterate() { - unsafe { - let mut a = ListNode::new(5); - let mut b = ListNode::new(7); - let mut c = ListNode::new(31); - - let mut setup = |list: &mut LinkedList| { - assert_eq!(true, list.is_empty()); - list.add_front(&mut c); - assert_eq!(31, **list.peek_first().unwrap()); - assert_eq!(false, list.is_empty()); - list.add_front(&mut b); - assert_eq!(7, **list.peek_first().unwrap()); - list.add_front(&mut a); - assert_eq!(5, **list.peek_first().unwrap()); - }; - - let mut list = LinkedList::new(); - setup(&mut list); - let items: Vec = collect_list(list); - assert_eq!([5, 7, 31].to_vec(), items); - - let mut list = LinkedList::new(); - setup(&mut list); - let items: Vec = collect_reverse_list(list); - assert_eq!([31, 7, 5].to_vec(), items); - } - } - - #[test] - fn add_sorted() { - unsafe { - let mut a = ListNode::new(5); - let mut b = ListNode::new(7); - let mut c = ListNode::new(31); - let mut d = ListNode::new(99); - - let mut list = LinkedList::new(); - list.add_sorted(&mut a); - let items: Vec = collect_list(list); - assert_eq!([5].to_vec(), items); - - let mut list = LinkedList::new(); - list.add_sorted(&mut a); - let items: Vec = collect_reverse_list(list); - assert_eq!([5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut d, &mut c, &mut b]); - list.add_sorted(&mut a); - let items: Vec = collect_list(list); - assert_eq!([5, 7, 31, 99].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut d, &mut c, &mut b]); - list.add_sorted(&mut a); - let items: Vec = collect_reverse_list(list); - assert_eq!([99, 31, 7, 5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut d, &mut c, &mut a]); - list.add_sorted(&mut b); - let items: Vec = collect_list(list); - assert_eq!([5, 7, 31, 99].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut d, &mut c, &mut a]); - list.add_sorted(&mut b); - let items: Vec = collect_reverse_list(list); - assert_eq!([99, 31, 7, 5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut d, &mut b, &mut a]); - list.add_sorted(&mut c); - let items: Vec = collect_list(list); - assert_eq!([5, 7, 31, 99].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut d, &mut b, &mut a]); - list.add_sorted(&mut c); - let items: Vec = collect_reverse_list(list); - assert_eq!([99, 31, 7, 5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - list.add_sorted(&mut d); - let items: Vec = collect_list(list); - assert_eq!([5, 7, 31, 99].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - list.add_sorted(&mut d); - let items: Vec = collect_reverse_list(list); - assert_eq!([99, 31, 7, 5].to_vec(), items); - } - } - - #[test] - fn drain_and_collect() { - unsafe { - let mut a = ListNode::new(5); - let mut b = ListNode::new(7); - let mut c = ListNode::new(31); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - - let taken_items: Vec = collect_list(list); - assert_eq!([5, 7, 31].to_vec(), taken_items); - } - } - - #[test] - fn peek_last() { - unsafe { - let mut a = ListNode::new(5); - let mut b = ListNode::new(7); - let mut c = ListNode::new(31); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - - let last = list.peek_last(); - assert_eq!(31, **last.unwrap()); - list.remove_last(); - - let last = list.peek_last(); - assert_eq!(7, **last.unwrap()); - list.remove_last(); - - let last = list.peek_last(); - assert_eq!(5, **last.unwrap()); - list.remove_last(); - - let last = list.peek_last(); - assert!(last.is_none()); - } - } - - #[test] - fn remove_first() { - unsafe { - // We iterate forward and backwards through the manipulated lists - // to make sure pointers in both directions are still ok. - let mut a = ListNode::new(5); - let mut b = ListNode::new(7); - let mut c = ListNode::new(31); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - let removed = list.remove_first().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_list(list); - assert_eq!([7, 31].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - let removed = list.remove_first().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_reverse_list(list); - assert_eq!([31, 7].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - let removed = list.remove_first().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_list(list); - assert_eq!([7].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - let removed = list.remove_first().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_reverse_list(list); - assert_eq!([7].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut a]); - let removed = list.remove_first().unwrap(); - assert_clean(removed); - assert!(list.is_empty()); - let items: Vec = collect_list(list); - assert!(items.is_empty()); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut a]); - let removed = list.remove_first().unwrap(); - assert_clean(removed); - assert!(list.is_empty()); - let items: Vec = collect_reverse_list(list); - assert!(items.is_empty()); - } - } - - #[test] - fn remove_last() { - unsafe { - // We iterate forward and backwards through the manipulated lists - // to make sure pointers in both directions are still ok. - let mut a = ListNode::new(5); - let mut b = ListNode::new(7); - let mut c = ListNode::new(31); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - let removed = list.remove_last().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_list(list); - assert_eq!([5, 7].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - let removed = list.remove_last().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_reverse_list(list); - assert_eq!([7, 5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - let removed = list.remove_last().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_list(list); - assert_eq!([5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - let removed = list.remove_last().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_reverse_list(list); - assert_eq!([5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut a]); - let removed = list.remove_last().unwrap(); - assert_clean(removed); - assert!(list.is_empty()); - let items: Vec = collect_list(list); - assert!(items.is_empty()); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut a]); - let removed = list.remove_last().unwrap(); - assert_clean(removed); - assert!(list.is_empty()); - let items: Vec = collect_reverse_list(list); - assert!(items.is_empty()); - } - } - - #[test] - fn remove_by_address() { - unsafe { - let mut a = ListNode::new(5); - let mut b = ListNode::new(7); - let mut c = ListNode::new(31); - - { - // Remove first - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - assert_eq!(true, list.remove(&mut a)); - assert_clean((&mut a).into()); - // a should be no longer there and can't be removed twice - assert_eq!(false, list.remove(&mut a)); - assert_eq!(Some((&mut b).into()), list.head); - assert_eq!(Some((&mut c).into()), b.next); - assert_eq!(Some((&mut b).into()), c.prev); - let items: Vec = collect_list(list); - assert_eq!([7, 31].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - assert_eq!(true, list.remove(&mut a)); - assert_clean((&mut a).into()); - // a should be no longer there and can't be removed twice - assert_eq!(false, list.remove(&mut a)); - assert_eq!(Some((&mut c).into()), b.next); - assert_eq!(Some((&mut b).into()), c.prev); - let items: Vec = collect_reverse_list(list); - assert_eq!([31, 7].to_vec(), items); - } - - { - // Remove middle - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - assert_eq!(true, list.remove(&mut b)); - assert_clean((&mut b).into()); - assert_eq!(Some((&mut c).into()), a.next); - assert_eq!(Some((&mut a).into()), c.prev); - let items: Vec = collect_list(list); - assert_eq!([5, 31].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - assert_eq!(true, list.remove(&mut b)); - assert_clean((&mut b).into()); - assert_eq!(Some((&mut c).into()), a.next); - assert_eq!(Some((&mut a).into()), c.prev); - let items: Vec = collect_reverse_list(list); - assert_eq!([31, 5].to_vec(), items); - } - - { - // Remove last - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - assert_eq!(true, list.remove(&mut c)); - assert_clean((&mut c).into()); - assert!(b.next.is_none()); - assert_eq!(Some((&mut b).into()), list.tail); - let items: Vec = collect_list(list); - assert_eq!([5, 7].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - assert_eq!(true, list.remove(&mut c)); - assert_clean((&mut c).into()); - assert!(b.next.is_none()); - assert_eq!(Some((&mut b).into()), list.tail); - let items: Vec = collect_reverse_list(list); - assert_eq!([7, 5].to_vec(), items); - } - - { - // Remove first of two - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - assert_eq!(true, list.remove(&mut a)); - assert_clean((&mut a).into()); - // a should be no longer there and can't be removed twice - assert_eq!(false, list.remove(&mut a)); - assert_eq!(Some((&mut b).into()), list.head); - assert_eq!(Some((&mut b).into()), list.tail); - assert!(b.next.is_none()); - assert!(b.prev.is_none()); - let items: Vec = collect_list(list); - assert_eq!([7].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - assert_eq!(true, list.remove(&mut a)); - assert_clean((&mut a).into()); - // a should be no longer there and can't be removed twice - assert_eq!(false, list.remove(&mut a)); - assert_eq!(Some((&mut b).into()), list.head); - assert_eq!(Some((&mut b).into()), list.tail); - assert!(b.next.is_none()); - assert!(b.prev.is_none()); - let items: Vec = collect_reverse_list(list); - assert_eq!([7].to_vec(), items); - } - - { - // Remove last of two - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - assert_eq!(true, list.remove(&mut b)); - assert_clean((&mut b).into()); - assert_eq!(Some((&mut a).into()), list.head); - assert_eq!(Some((&mut a).into()), list.tail); - assert!(a.next.is_none()); - assert!(a.prev.is_none()); - let items: Vec = collect_list(list); - assert_eq!([5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - assert_eq!(true, list.remove(&mut b)); - assert_clean((&mut b).into()); - assert_eq!(Some((&mut a).into()), list.head); - assert_eq!(Some((&mut a).into()), list.tail); - assert!(a.next.is_none()); - assert!(a.prev.is_none()); - let items: Vec = collect_reverse_list(list); - assert_eq!([5].to_vec(), items); - } - - { - // Remove last item - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut a]); - assert_eq!(true, list.remove(&mut a)); - assert_clean((&mut a).into()); - assert!(list.head.is_none()); - assert!(list.tail.is_none()); - let items: Vec = collect_list(list); - assert!(items.is_empty()); - } - - { - // Remove missing - let mut list = LinkedList::new(); - list.add_front(&mut b); - list.add_front(&mut a); - assert_eq!(false, list.remove(&mut c)); - } - } - } -} diff --git a/tokio-util/src/sync/mod.rs b/tokio-util/src/sync/mod.rs index 0b78a156cf3..bcdc99cb959 100644 --- a/tokio-util/src/sync/mod.rs +++ b/tokio-util/src/sync/mod.rs @@ -3,8 +3,6 @@ mod cancellation_token; pub use cancellation_token::{guard::DropGuard, CancellationToken, WaitForCancellationFuture}; -mod intrusive_double_linked_list; - mod mpsc; pub use mpsc::PollSender; diff --git a/tokio-util/tests/sync_cancellation_token.rs b/tokio-util/tests/sync_cancellation_token.rs index 4d86f2c46ce..28ba284b6c2 100644 --- a/tokio-util/tests/sync_cancellation_token.rs +++ b/tokio-util/tests/sync_cancellation_token.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] use tokio::pin; -use tokio_util::sync::CancellationToken; +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; use core::future::Future; use core::task::{Context, Poll}; @@ -77,6 +77,46 @@ fn cancel_child_token_through_parent() { ); } +#[test] +fn cancel_grandchild_token_through_parent_if_child_was_dropped() { + let (waker, wake_counter) = new_count_waker(); + let token = CancellationToken::new(); + + let intermediate_token = token.child_token(); + let child_token = intermediate_token.child_token(); + drop(intermediate_token); + assert!(!child_token.is_cancelled()); + + let child_fut = child_token.cancelled(); + pin!(child_fut); + let parent_fut = token.cancelled(); + pin!(parent_fut); + + assert_eq!( + Poll::Pending, + child_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + parent_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!(wake_counter, 0); + + token.cancel(); + assert_eq!(wake_counter, 2); + assert!(token.is_cancelled()); + assert!(child_token.is_cancelled()); + + assert_eq!( + Poll::Ready(()), + child_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Ready(()), + parent_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); +} + #[test] fn cancel_child_token_without_parent() { let (waker, wake_counter) = new_count_waker(); @@ -206,6 +246,134 @@ fn drop_multiple_child_tokens() { } } +#[test] +fn cancel_only_all_descendants() { + // ARRANGE + let (waker, wake_counter) = new_count_waker(); + + let parent_token = CancellationToken::new(); + let token = parent_token.child_token(); + let sibling_token = parent_token.child_token(); + let child1_token = token.child_token(); + let child2_token = token.child_token(); + let grandchild_token = child1_token.child_token(); + let grandchild2_token = child1_token.child_token(); + let grandgrandchild_token = grandchild_token.child_token(); + + assert!(!parent_token.is_cancelled()); + assert!(!token.is_cancelled()); + assert!(!sibling_token.is_cancelled()); + assert!(!child1_token.is_cancelled()); + assert!(!child2_token.is_cancelled()); + assert!(!grandchild_token.is_cancelled()); + assert!(!grandchild2_token.is_cancelled()); + assert!(!grandgrandchild_token.is_cancelled()); + + let parent_fut = parent_token.cancelled(); + let fut = token.cancelled(); + let sibling_fut = sibling_token.cancelled(); + let child1_fut = child1_token.cancelled(); + let child2_fut = child2_token.cancelled(); + let grandchild_fut = grandchild_token.cancelled(); + let grandchild2_fut = grandchild2_token.cancelled(); + let grandgrandchild_fut = grandgrandchild_token.cancelled(); + + pin!(parent_fut); + pin!(fut); + pin!(sibling_fut); + pin!(child1_fut); + pin!(child2_fut); + pin!(grandchild_fut); + pin!(grandchild2_fut); + pin!(grandgrandchild_fut); + + assert_eq!( + Poll::Pending, + parent_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + sibling_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + child1_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + child2_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + grandchild_fut + .as_mut() + .poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + grandchild2_fut + .as_mut() + .poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + grandgrandchild_fut + .as_mut() + .poll(&mut Context::from_waker(&waker)) + ); + assert_eq!(wake_counter, 0); + + // ACT + token.cancel(); + + // ASSERT + assert_eq!(wake_counter, 6); + assert!(!parent_token.is_cancelled()); + assert!(token.is_cancelled()); + assert!(!sibling_token.is_cancelled()); + assert!(child1_token.is_cancelled()); + assert!(child2_token.is_cancelled()); + assert!(grandchild_token.is_cancelled()); + assert!(grandchild2_token.is_cancelled()); + assert!(grandgrandchild_token.is_cancelled()); + + assert_eq!( + Poll::Ready(()), + fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Ready(()), + child1_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Ready(()), + child2_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Ready(()), + grandchild_fut + .as_mut() + .poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Ready(()), + grandchild2_fut + .as_mut() + .poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Ready(()), + grandgrandchild_fut + .as_mut() + .poll(&mut Context::from_waker(&waker)) + ); + assert_eq!(wake_counter, 6); +} + #[test] fn drop_parent_before_child_tokens() { let token = CancellationToken::new(); @@ -218,3 +386,15 @@ fn drop_parent_before_child_tokens() { drop(child1); drop(child2); } + +#[test] +fn derives_send_sync() { + fn assert_send() {} + fn assert_sync() {} + + assert_send::(); + assert_sync::(); + + assert_send::>(); + assert_sync::>(); +} From 2659adf5fe19de84adb9dc96362c886c0bea7aa8 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sat, 14 May 2022 17:15:15 +0200 Subject: [PATCH 3/4] chore: prepare tokio-util 0.6.10 (#4691) --- tokio-util/CHANGELOG.md | 6 ++++++ tokio-util/Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/tokio-util/CHANGELOG.md b/tokio-util/CHANGELOG.md index 3dd100c3032..5801fc2fbc3 100644 --- a/tokio-util/CHANGELOG.md +++ b/tokio-util/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.6.10 (May 14, 2021) + +This is a backport for a memory leak in `CancellationToken`. ([#4652]) + +[#4652]: https://github.com/tokio-rs/tokio/pull/4652 + # 0.6.9 (October 29, 2021) ### Added diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index d61ec1e33c5..939b2a9c4f9 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -6,7 +6,7 @@ name = "tokio-util" # - Cargo.toml # - Update CHANGELOG.md. # - Create "tokio-util-0.6.x" git tag. -version = "0.6.9" +version = "0.6.10" edition = "2018" authors = ["Tokio Contributors "] license = "MIT" From cdb132d333c9f8d20f6f4daef0bd72a67ce83f5b Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sat, 14 May 2022 20:59:52 +0200 Subject: [PATCH 4/4] Revert "chore: disable warnings in old CI (#4691)" This reverts commit b24df49a9d26057b44a869c6effce494ec10dd35 so we can merge the CHANGELOG changes in #4691 into master. --- .circleci/config.yml | 25 ------- .cirrus.yml | 4 +- .github/workflows/ci.yml | 14 ++-- .github/workflows/loom.yml | 3 +- Cargo.toml | 1 + tests-build/Cargo.toml | 16 +++++ tests-build/README.md | 2 + tests-build/src/lib.rs | 2 + .../tests/fail/macros_core_no_default.rs | 6 ++ .../tests/fail/macros_core_no_default.stderr | 7 ++ tests-build/tests/fail/macros_dead_code.rs | 8 +++ .../tests/fail/macros_dead_code.stderr | 11 +++ .../tests/fail/macros_invalid_input.rs | 40 +++++++++++ .../tests/fail/macros_invalid_input.stderr | 71 +++++++++++++++++++ .../tests/fail/macros_type_mismatch.rs | 26 +++++++ .../tests/fail/macros_type_mismatch.stderr | 45 ++++++++++++ tests-build/tests/macros.rs | 27 +++++++ tests-build/tests/macros_clippy.rs | 7 ++ .../tests/pass/forward_args_and_output.rs | 13 ++++ tests-build/tests/pass/macros_main_loop.rs | 14 ++++ tests-build/tests/pass/macros_main_return.rs | 6 ++ tokio/src/time/driver/sleep.rs | 12 ++-- 22 files changed, 323 insertions(+), 37 deletions(-) delete mode 100644 .circleci/config.yml create mode 100644 tests-build/Cargo.toml create mode 100644 tests-build/README.md create mode 100644 tests-build/src/lib.rs create mode 100644 tests-build/tests/fail/macros_core_no_default.rs create mode 100644 tests-build/tests/fail/macros_core_no_default.stderr create mode 100644 tests-build/tests/fail/macros_dead_code.rs create mode 100644 tests-build/tests/fail/macros_dead_code.stderr create mode 100644 tests-build/tests/fail/macros_invalid_input.rs create mode 100644 tests-build/tests/fail/macros_invalid_input.stderr create mode 100644 tests-build/tests/fail/macros_type_mismatch.rs create mode 100644 tests-build/tests/fail/macros_type_mismatch.stderr create mode 100644 tests-build/tests/macros.rs create mode 100644 tests-build/tests/macros_clippy.rs create mode 100644 tests-build/tests/pass/forward_args_and_output.rs create mode 100644 tests-build/tests/pass/macros_main_loop.rs create mode 100644 tests-build/tests/pass/macros_main_return.rs diff --git a/.circleci/config.yml b/.circleci/config.yml deleted file mode 100644 index 97670e93784..00000000000 --- a/.circleci/config.yml +++ /dev/null @@ -1,25 +0,0 @@ -version: 2.1 -jobs: - test-arm: - machine: - image: ubuntu-2004:202101-01 - resource_class: arm.medium - environment: - # Change to pin rust versino - RUST_STABLE: stable - steps: - - checkout - - run: - name: Install Rust - command: | - curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs -o rustup.sh - chmod +x rustup.sh - ./rustup.sh -y --default-toolchain $RUST_STABLE - source "$HOME"/.cargo/env - # Only run Tokio tests - - run: cargo test --all-features -p tokio - -workflows: - ci: - jobs: - - test-arm diff --git a/.cirrus.yml b/.cirrus.yml index 390f7883fef..4bef869c24f 100644 --- a/.cirrus.yml +++ b/.cirrus.yml @@ -1,5 +1,7 @@ freebsd_instance: image: freebsd-12-2-release-amd64 +env: + RUSTFLAGS: -D warnings # Test FreeBSD in a full VM on cirrus-ci.com. Test the i686 target too, in the # same VM. The binary will be built in 32-bit mode, but will execute on a @@ -23,7 +25,7 @@ task: name: FreeBSD docs env: RUSTFLAGS: --cfg docsrs - RUSTDOCFLAGS: --cfg docsrs + RUSTDOCFLAGS: --cfg docsrs -Dwarnings setup_script: - pkg install -y bash curl - curl https://sh.rustup.rs -sSf --output rustup.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 862dab192b6..05f6e87f330 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,6 +7,7 @@ on: name: CI env: + RUSTFLAGS: -Dwarnings RUST_BACKTRACE: 1 nightly: nightly-2021-10-25 minrust: 1.45.2 @@ -64,6 +65,11 @@ jobs: run: cargo hack test --each-feature working-directory: tests-integration + # Run macro build tests + - name: test tests-build --each-feature + run: cargo hack test --each-feature + working-directory: tests-build + # Build benchmarks. Run of benchmarks is done by bench.yml workflow. - name: build benches run: cargo build --benches @@ -123,7 +129,7 @@ jobs: run: cargo test --all-features working-directory: tokio env: - RUSTFLAGS: --cfg tokio_unstable + RUSTFLAGS: --cfg tokio_unstable -Dwarnings miri: name: miri @@ -207,7 +213,7 @@ jobs: - name: check --each-feature --unstable run: cargo hack check --all --each-feature -Z avoid-dev-deps env: - RUSTFLAGS: --cfg tokio_unstable + RUSTFLAGS: --cfg tokio_unstable -Dwarnings minrust: name: minrust @@ -292,7 +298,7 @@ jobs: run: cargo doc --lib --no-deps --all-features --document-private-items env: RUSTFLAGS: --cfg docsrs - RUSTDOCFLAGS: --cfg docsrs + RUSTDOCFLAGS: --cfg docsrs -Dwarnings loom-compile: name: build loom tests @@ -306,7 +312,7 @@ jobs: run: cargo test --no-run --lib --features full working-directory: tokio env: - RUSTFLAGS: --cfg loom --cfg tokio_unstable + RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings check-readme: name: Check README diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml index 088828a1b51..fde9f1114e1 100644 --- a/.github/workflows/loom.yml +++ b/.github/workflows/loom.yml @@ -8,6 +8,7 @@ on: name: Loom env: + RUSTFLAGS: -Dwarnings RUST_BACKTRACE: 1 jobs: @@ -34,6 +35,6 @@ jobs: run: cargo test --lib --release --features full -- --nocapture $SCOPE working-directory: tokio env: - RUSTFLAGS: --cfg loom --cfg tokio_unstable + RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings LOOM_MAX_PREEMPTIONS: 2 SCOPE: ${{ matrix.scope }} diff --git a/Cargo.toml b/Cargo.toml index 0ed9192a249..bc01f186281 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,5 +11,6 @@ members = [ "benches", "examples", "stress-test", + "tests-build", "tests-integration", ] diff --git a/tests-build/Cargo.toml b/tests-build/Cargo.toml new file mode 100644 index 00000000000..299af0cf417 --- /dev/null +++ b/tests-build/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "tests-build" +version = "0.1.0" +authors = ["Tokio Contributors "] +edition = "2018" +publish = false + +[features] +full = ["tokio/full"] +rt = ["tokio/rt", "tokio/macros"] + +[dependencies] +tokio = { path = "../tokio", optional = true } + +[dev-dependencies] +trybuild = "1.0" diff --git a/tests-build/README.md b/tests-build/README.md new file mode 100644 index 00000000000..f491e2bc377 --- /dev/null +++ b/tests-build/README.md @@ -0,0 +1,2 @@ +Tests the various combination of feature flags. This is broken out to a separate +crate to work around limitations with cargo features. diff --git a/tests-build/src/lib.rs b/tests-build/src/lib.rs new file mode 100644 index 00000000000..7b019cc7c26 --- /dev/null +++ b/tests-build/src/lib.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "tokio")] +pub use tokio; diff --git a/tests-build/tests/fail/macros_core_no_default.rs b/tests-build/tests/fail/macros_core_no_default.rs new file mode 100644 index 00000000000..23f8847df7d --- /dev/null +++ b/tests-build/tests/fail/macros_core_no_default.rs @@ -0,0 +1,6 @@ +use tests_build::tokio; + +#[tokio::main] +async fn my_fn() {} + +fn main() {} diff --git a/tests-build/tests/fail/macros_core_no_default.stderr b/tests-build/tests/fail/macros_core_no_default.stderr new file mode 100644 index 00000000000..676acc8dbe3 --- /dev/null +++ b/tests-build/tests/fail/macros_core_no_default.stderr @@ -0,0 +1,7 @@ +error: The default runtime flavor is `multi_thread`, but the `rt-multi-thread` feature is disabled. + --> $DIR/macros_core_no_default.rs:3:1 + | +3 | #[tokio::main] + | ^^^^^^^^^^^^^^ + | + = note: this error originates in the attribute macro `tokio::main` (in Nightly builds, run with -Z macro-backtrace for more info) diff --git a/tests-build/tests/fail/macros_dead_code.rs b/tests-build/tests/fail/macros_dead_code.rs new file mode 100644 index 00000000000..f2ada6f835d --- /dev/null +++ b/tests-build/tests/fail/macros_dead_code.rs @@ -0,0 +1,8 @@ +#![deny(dead_code)] + +use tests_build::tokio; + +#[tokio::main] +async fn f() {} + +fn main() {} diff --git a/tests-build/tests/fail/macros_dead_code.stderr b/tests-build/tests/fail/macros_dead_code.stderr new file mode 100644 index 00000000000..816c294bd31 --- /dev/null +++ b/tests-build/tests/fail/macros_dead_code.stderr @@ -0,0 +1,11 @@ +error: function is never used: `f` + --> $DIR/macros_dead_code.rs:6:10 + | +6 | async fn f() {} + | ^ + | +note: the lint level is defined here + --> $DIR/macros_dead_code.rs:1:9 + | +1 | #![deny(dead_code)] + | ^^^^^^^^^ diff --git a/tests-build/tests/fail/macros_invalid_input.rs b/tests-build/tests/fail/macros_invalid_input.rs new file mode 100644 index 00000000000..eb04eca76b6 --- /dev/null +++ b/tests-build/tests/fail/macros_invalid_input.rs @@ -0,0 +1,40 @@ +use tests_build::tokio; + +#[tokio::main] +fn main_is_not_async() {} + +#[tokio::main(foo)] +async fn main_attr_has_unknown_args() {} + +#[tokio::main(threadpool::bar)] +async fn main_attr_has_path_args() {} + +#[tokio::test] +fn test_is_not_async() {} + +#[tokio::test(foo)] +async fn test_attr_has_args() {} + +#[tokio::test(foo = 123)] +async fn test_unexpected_attr() {} + +#[tokio::test(flavor = 123)] +async fn test_flavor_not_string() {} + +#[tokio::test(flavor = "foo")] +async fn test_unknown_flavor() {} + +#[tokio::test(flavor = "multi_thread", start_paused = false)] +async fn test_multi_thread_with_start_paused() {} + +#[tokio::test(flavor = "multi_thread", worker_threads = "foo")] +async fn test_worker_threads_not_int() {} + +#[tokio::test(flavor = "current_thread", worker_threads = 4)] +async fn test_worker_threads_and_current_thread() {} + +#[tokio::test] +#[test] +async fn test_has_second_test_attr() {} + +fn main() {} diff --git a/tests-build/tests/fail/macros_invalid_input.stderr b/tests-build/tests/fail/macros_invalid_input.stderr new file mode 100644 index 00000000000..11337a94fe5 --- /dev/null +++ b/tests-build/tests/fail/macros_invalid_input.stderr @@ -0,0 +1,71 @@ +error: the `async` keyword is missing from the function declaration + --> $DIR/macros_invalid_input.rs:4:1 + | +4 | fn main_is_not_async() {} + | ^^ + +error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused` + --> $DIR/macros_invalid_input.rs:6:15 + | +6 | #[tokio::main(foo)] + | ^^^ + +error: Must have specified ident + --> $DIR/macros_invalid_input.rs:9:15 + | +9 | #[tokio::main(threadpool::bar)] + | ^^^^^^^^^^^^^^^ + +error: the `async` keyword is missing from the function declaration + --> $DIR/macros_invalid_input.rs:13:1 + | +13 | fn test_is_not_async() {} + | ^^ + +error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused` + --> $DIR/macros_invalid_input.rs:15:15 + | +15 | #[tokio::test(foo)] + | ^^^ + +error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused` + --> $DIR/macros_invalid_input.rs:18:15 + | +18 | #[tokio::test(foo = 123)] + | ^^^^^^^^^ + +error: Failed to parse value of `flavor` as string. + --> $DIR/macros_invalid_input.rs:21:24 + | +21 | #[tokio::test(flavor = 123)] + | ^^^ + +error: No such runtime flavor `foo`. The runtime flavors are `current_thread` and `multi_thread`. + --> $DIR/macros_invalid_input.rs:24:24 + | +24 | #[tokio::test(flavor = "foo")] + | ^^^^^ + +error: The `start_paused` option requires the `current_thread` runtime flavor. Use `#[tokio::test(flavor = "current_thread")]` + --> $DIR/macros_invalid_input.rs:27:55 + | +27 | #[tokio::test(flavor = "multi_thread", start_paused = false)] + | ^^^^^ + +error: Failed to parse value of `worker_threads` as integer. + --> $DIR/macros_invalid_input.rs:30:57 + | +30 | #[tokio::test(flavor = "multi_thread", worker_threads = "foo")] + | ^^^^^ + +error: The `worker_threads` option requires the `multi_thread` runtime flavor. Use `#[tokio::test(flavor = "multi_thread")]` + --> $DIR/macros_invalid_input.rs:33:59 + | +33 | #[tokio::test(flavor = "current_thread", worker_threads = 4)] + | ^ + +error: second test attribute is supplied + --> $DIR/macros_invalid_input.rs:37:1 + | +37 | #[test] + | ^^^^^^^ diff --git a/tests-build/tests/fail/macros_type_mismatch.rs b/tests-build/tests/fail/macros_type_mismatch.rs new file mode 100644 index 00000000000..0a5b9c4c727 --- /dev/null +++ b/tests-build/tests/fail/macros_type_mismatch.rs @@ -0,0 +1,26 @@ +use tests_build::tokio; + +#[tokio::main] +async fn missing_semicolon_or_return_type() { + Ok(()) +} + +#[tokio::main] +async fn missing_return_type() { + return Ok(()); +} + +#[tokio::main] +async fn extra_semicolon() -> Result<(), ()> { + /* TODO(taiki-e): help message still wrong + help: try using a variant of the expected enum + | + 23 | Ok(Ok(());) + | + 23 | Err(Ok(());) + | + */ + Ok(()); +} + +fn main() {} diff --git a/tests-build/tests/fail/macros_type_mismatch.stderr b/tests-build/tests/fail/macros_type_mismatch.stderr new file mode 100644 index 00000000000..a8fa99bc63b --- /dev/null +++ b/tests-build/tests/fail/macros_type_mismatch.stderr @@ -0,0 +1,45 @@ +error[E0308]: mismatched types + --> $DIR/macros_type_mismatch.rs:5:5 + | +5 | Ok(()) + | ^^^^^^ expected `()`, found enum `Result` + | + = note: expected unit type `()` + found enum `Result<(), _>` +help: consider using a semicolon here + | +5 | Ok(()); + | + +help: try adding a return type + | +4 | async fn missing_semicolon_or_return_type() -> Result<(), _> { + | ++++++++++++++++ + +error[E0308]: mismatched types + --> $DIR/macros_type_mismatch.rs:10:5 + | +9 | async fn missing_return_type() { + | - help: try adding a return type: `-> Result<(), _>` +10 | return Ok(()); + | ^^^^^^^^^^^^^^ expected `()`, found enum `Result` + | + = note: expected unit type `()` + found enum `Result<(), _>` + +error[E0308]: mismatched types + --> $DIR/macros_type_mismatch.rs:23:5 + | +14 | async fn extra_semicolon() -> Result<(), ()> { + | -------------- expected `Result<(), ()>` because of return type +... +23 | Ok(()); + | ^^^^^^^ expected enum `Result`, found `()` + | + = note: expected enum `Result<(), ()>` + found unit type `()` +help: try using a variant of the expected enum + | +23 | Ok(Ok(());) + | +23 | Err(Ok(());) + | diff --git a/tests-build/tests/macros.rs b/tests-build/tests/macros.rs new file mode 100644 index 00000000000..0a180dfb74f --- /dev/null +++ b/tests-build/tests/macros.rs @@ -0,0 +1,27 @@ +#[test] +fn compile_fail_full() { + let t = trybuild::TestCases::new(); + + #[cfg(feature = "full")] + t.pass("tests/pass/forward_args_and_output.rs"); + + #[cfg(feature = "full")] + t.pass("tests/pass/macros_main_return.rs"); + + #[cfg(feature = "full")] + t.pass("tests/pass/macros_main_loop.rs"); + + #[cfg(feature = "full")] + t.compile_fail("tests/fail/macros_invalid_input.rs"); + + #[cfg(feature = "full")] + t.compile_fail("tests/fail/macros_dead_code.rs"); + + #[cfg(feature = "full")] + t.compile_fail("tests/fail/macros_type_mismatch.rs"); + + #[cfg(all(feature = "rt", not(feature = "full")))] + t.compile_fail("tests/fail/macros_core_no_default.rs"); + + drop(t); +} diff --git a/tests-build/tests/macros_clippy.rs b/tests-build/tests/macros_clippy.rs new file mode 100644 index 00000000000..0f3f4bb0b8b --- /dev/null +++ b/tests-build/tests/macros_clippy.rs @@ -0,0 +1,7 @@ +#[cfg(feature = "full")] +#[tokio::test] +async fn test_with_semicolon_without_return_type() { + #![deny(clippy::semicolon_if_nothing_returned)] + + dbg!(0); +} diff --git a/tests-build/tests/pass/forward_args_and_output.rs b/tests-build/tests/pass/forward_args_and_output.rs new file mode 100644 index 00000000000..e6ef1f88c1e --- /dev/null +++ b/tests-build/tests/pass/forward_args_and_output.rs @@ -0,0 +1,13 @@ +use tests_build::tokio; + +fn main() {} + +// arguments and output type is forwarded so other macros can access them + +#[tokio::test] +async fn test_fn_has_args(_x: u8) {} + +#[tokio::test] +async fn test_has_output() -> Result<(), Box> { + Ok(()) +} diff --git a/tests-build/tests/pass/macros_main_loop.rs b/tests-build/tests/pass/macros_main_loop.rs new file mode 100644 index 00000000000..d7d51982c36 --- /dev/null +++ b/tests-build/tests/pass/macros_main_loop.rs @@ -0,0 +1,14 @@ +use tests_build::tokio; + +#[tokio::main] +async fn main() -> Result<(), ()> { + loop { + if !never() { + return Ok(()); + } + } +} + +fn never() -> bool { + std::time::Instant::now() > std::time::Instant::now() +} diff --git a/tests-build/tests/pass/macros_main_return.rs b/tests-build/tests/pass/macros_main_return.rs new file mode 100644 index 00000000000..d4d34ec26d3 --- /dev/null +++ b/tests-build/tests/pass/macros_main_return.rs @@ -0,0 +1,6 @@ +use tests_build::tokio; + +#[tokio::main] +async fn main() -> Result<(), ()> { + return Ok(()); +} diff --git a/tokio/src/time/driver/sleep.rs b/tokio/src/time/driver/sleep.rs index c0571ca2ab9..43ff694ffc6 100644 --- a/tokio/src/time/driver/sleep.rs +++ b/tokio/src/time/driver/sleep.rs @@ -324,8 +324,8 @@ impl Sleep { } fn reset_inner(self: Pin<&mut Self>, deadline: Instant) { - let mut me = self.project(); - me.entry.as_mut().reset(deadline); + let me = self.project(); + me.entry.reset(deadline); (*me.inner).deadline = deadline; #[cfg(all(tokio_unstable, feature = "tracing"))] @@ -349,12 +349,12 @@ impl Sleep { cfg_not_trace! { fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let mut me = self.project(); + let me = self.project(); // Keep track of task budget let coop = ready!(crate::coop::poll_proceed(cx)); - me.entry.as_mut().poll_elapsed(cx).map(move |r| { + me.entry.poll_elapsed(cx).map(move |r| { coop.made_progress(); r }) @@ -363,7 +363,7 @@ impl Sleep { cfg_trace! { fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let mut me = self.project(); + let me = self.project(); // Keep track of task budget let coop = ready!(trace_poll_op!( "poll_elapsed", @@ -371,7 +371,7 @@ impl Sleep { me.inner.resource_span.id(), )); - let result = me.entry.as_mut().poll_elapsed(cx).map(move |r| { + let result = me.entry.poll_elapsed(cx).map(move |r| { coop.made_progress(); r });