diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 676b0e2ec94..d61ec1e33c5 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -36,7 +36,7 @@ rt = ["tokio/rt"] __docs_rs = ["futures-util"] [dependencies] -tokio = { version = "1.0.0", path = "../tokio", features = ["sync"] } +tokio = { version = "1.7.0", path = "../tokio", features = ["sync"] } bytes = "1.0.0" futures-core = "0.3.0" diff --git a/tokio-util/src/sync/cancellation_token.rs b/tokio-util/src/sync/cancellation_token.rs index f193f6bc32f..2a6ef392bd4 100644 --- a/tokio-util/src/sync/cancellation_token.rs +++ b/tokio-util/src/sync/cancellation_token.rs @@ -1,18 +1,15 @@ //! An asynchronously awaitable `CancellationToken`. //! The token allows to signal a cancellation request to one or more tasks. pub(crate) mod guard; +mod tree_node; -use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::Mutex; -use crate::sync::intrusive_double_linked_list::{LinkedList, ListNode}; - +use crate::loom::sync::Arc; use core::future::Future; use core::pin::Pin; -use core::ptr::NonNull; -use core::sync::atomic::Ordering; -use core::task::{Context, Poll, Waker}; +use core::task::{Context, Poll}; use guard::DropGuard; +use pin_project_lite::pin_project; /// A token which can be used to signal a cancellation request to one or more /// tasks. @@ -24,9 +21,9 @@ use guard::DropGuard; /// /// # Examples /// -/// ```ignore +/// ```no_run /// use tokio::select; -/// use tokio::scope::CancellationToken; +/// use tokio_util::sync::CancellationToken; /// /// #[tokio::main] /// async fn main() { @@ -55,31 +52,20 @@ use guard::DropGuard; /// } /// ``` pub struct CancellationToken { - inner: NonNull, + inner: Arc, } -// Safety: The CancellationToken is thread-safe and can be moved between threads, -// since all methods are internally synchronized. -unsafe impl Send for CancellationToken {} -unsafe impl Sync for CancellationToken {} - -/// A Future that is resolved once the corresponding [`CancellationToken`] -/// was cancelled -#[must_use = "futures do nothing unless polled"] -pub struct WaitForCancellationFuture<'a> { - /// The CancellationToken that is associated with this WaitForCancellationFuture - cancellation_token: Option<&'a CancellationToken>, - /// Node for waiting at the cancellation_token - wait_node: ListNode, - /// Whether this future was registered at the token yet as a waiter - is_registered: bool, +pin_project! { + /// A Future that is resolved once the corresponding [`CancellationToken`] + /// is cancelled. + #[must_use = "futures do nothing unless polled"] + pub struct WaitForCancellationFuture<'a> { + cancellation_token: &'a CancellationToken, + #[pin] + future: tokio::sync::futures::Notified<'a>, + } } -// Safety: Futures can be sent between threads as long as the underlying -// cancellation_token is thread-safe (Sync), -// which allows to poll/register/unregister from a different thread. -unsafe impl<'a> Send for WaitForCancellationFuture<'a> {} - // ===== impl CancellationToken ===== impl core::fmt::Debug for CancellationToken { @@ -92,43 +78,16 @@ impl core::fmt::Debug for CancellationToken { impl Clone for CancellationToken { fn clone(&self) -> Self { - // Safety: The state inside a `CancellationToken` is always valid, since - // is reference counted - let inner = self.state(); - - // Tokens are cloned by increasing their refcount - let current_state = inner.snapshot(); - inner.increment_refcount(current_state); - - CancellationToken { inner: self.inner } + tree_node::increase_handle_refcount(&self.inner); + CancellationToken { + inner: self.inner.clone(), + } } } impl Drop for CancellationToken { fn drop(&mut self) { - let token_state_pointer = self.inner; - - // Safety: The state inside a `CancellationToken` is always valid, since - // is reference counted - let inner = unsafe { &mut *self.inner.as_ptr() }; - - let mut current_state = inner.snapshot(); - - // We need to safe the parent, since the state might be released by the - // next call - let parent = inner.parent; - - // Drop our own refcount - current_state = inner.decrement_refcount(current_state); - - // If this was the last reference, unregister from the parent - if current_state.refcount == 0 { - if let Some(mut parent) = parent { - // Safety: Since we still retain a reference on the parent, it must be valid. - let parent = unsafe { parent.as_mut() }; - parent.unregister_child(token_state_pointer, current_state); - } - } + tree_node::decrease_handle_refcount(&self.inner); } } @@ -141,29 +100,11 @@ impl Default for CancellationToken { impl CancellationToken { /// Creates a new CancellationToken in the non-cancelled state. pub fn new() -> CancellationToken { - let state = Box::new(CancellationTokenState::new( - None, - StateSnapshot { - cancel_state: CancellationState::NotCancelled, - has_parent_ref: false, - refcount: 1, - }, - )); - - // Safety: We just created the Box. The pointer is guaranteed to be - // not null CancellationToken { - inner: unsafe { NonNull::new_unchecked(Box::into_raw(state)) }, + inner: Arc::new(tree_node::TreeNode::new()), } } - /// Returns a reference to the utilized `CancellationTokenState`. - fn state(&self) -> &CancellationTokenState { - // Safety: The state inside a `CancellationToken` is always valid, since - // is reference counted - unsafe { &*self.inner.as_ptr() } - } - /// Creates a `CancellationToken` which will get cancelled whenever the /// current token gets cancelled. /// @@ -172,9 +113,9 @@ impl CancellationToken { /// /// # Examples /// - /// ```ignore + /// ```no_run /// use tokio::select; - /// use tokio::scope::CancellationToken; + /// use tokio_util::sync::CancellationToken; /// /// #[tokio::main] /// async fn main() { @@ -203,56 +144,8 @@ impl CancellationToken { /// } /// ``` pub fn child_token(&self) -> CancellationToken { - let inner = self.state(); - - // Increment the refcount of this token. It will be referenced by the - // child, independent of whether the child is immediately cancelled or - // not. - let _current_state = inner.increment_refcount(inner.snapshot()); - - let mut unpacked_child_state = StateSnapshot { - has_parent_ref: true, - refcount: 1, - cancel_state: CancellationState::NotCancelled, - }; - let mut child_token_state = Box::new(CancellationTokenState::new( - Some(self.inner), - unpacked_child_state, - )); - - { - let mut guard = inner.synchronized.lock().unwrap(); - if guard.is_cancelled { - // This task was already cancelled. In this case we should not - // insert the child into the list, since it would never get removed - // from the list. - (*child_token_state.synchronized.lock().unwrap()).is_cancelled = true; - unpacked_child_state.cancel_state = CancellationState::Cancelled; - // Since it's not in the list, the parent doesn't need to retain - // a reference to it. - unpacked_child_state.has_parent_ref = false; - child_token_state - .state - .store(unpacked_child_state.pack(), Ordering::SeqCst); - } else { - if let Some(mut first_child) = guard.first_child { - child_token_state.from_parent.next_peer = Some(first_child); - // Safety: We manipulate other child task inside the Mutex - // and retain a parent reference on it. The child token can't - // get invalidated while the Mutex is held. - unsafe { - first_child.as_mut().from_parent.prev_peer = - Some((&mut *child_token_state).into()) - }; - } - guard.first_child = Some((&mut *child_token_state).into()); - } - }; - - let child_token_ptr = Box::into_raw(child_token_state); - // Safety: We just created the pointer from a `Box` CancellationToken { - inner: unsafe { NonNull::new_unchecked(child_token_ptr) }, + inner: tree_node::child_node(&self.inner), } } @@ -260,21 +153,33 @@ impl CancellationToken { /// derived from it. /// /// This will wake up all tasks which are waiting for cancellation. + /// + /// Be aware that cancellation is not an atomic operation. It is possible + /// for another thread running in parallel with a call to `cancel` to first + /// receive `true` from `is_cancelled` on one child node, and then receive + /// `false` from `is_cancelled` on another child node. However, once the + /// call to `cancel` returns, all child nodes have been fully cancelled. pub fn cancel(&self) { - self.state().cancel(); + tree_node::cancel(&self.inner); } - /// Returns `true` if the `CancellationToken` had been cancelled + /// Returns `true` if the `CancellationToken` is cancelled. pub fn is_cancelled(&self) -> bool { - self.state().is_cancelled() + tree_node::is_cancelled(&self.inner) } /// Returns a `Future` that gets fulfilled when cancellation is requested. + /// + /// The future will complete immediately if the token is already cancelled + /// when this method is called. + /// + /// # Cancel safety + /// + /// This method is cancel safe. pub fn cancelled(&self) -> WaitForCancellationFuture<'_> { WaitForCancellationFuture { - cancellation_token: Some(self), - wait_node: ListNode::new(WaitQueueEntry::new()), - is_registered: false, + cancellation_token: self, + future: self.inner.notified(), } } @@ -285,26 +190,6 @@ impl CancellationToken { pub fn drop_guard(self) -> DropGuard { DropGuard { inner: Some(self) } } - - unsafe fn register( - &self, - wait_node: &mut ListNode, - cx: &mut Context<'_>, - ) -> Poll<()> { - self.state().register(wait_node, cx) - } - - fn check_for_cancellation( - &self, - wait_node: &mut ListNode, - cx: &mut Context<'_>, - ) -> Poll<()> { - self.state().check_for_cancellation(wait_node, cx) - } - - fn unregister(&self, wait_node: &mut ListNode) { - self.state().unregister(wait_node) - } } // ===== impl WaitForCancellationFuture ===== @@ -319,560 +204,21 @@ impl<'a> Future for WaitForCancellationFuture<'a> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - // Safety: We do not move anything out of `WaitForCancellationFuture` - let mut_self: &mut WaitForCancellationFuture<'_> = unsafe { Pin::get_unchecked_mut(self) }; - - let cancellation_token = mut_self - .cancellation_token - .expect("polled WaitForCancellationFuture after completion"); - - let poll_res = if !mut_self.is_registered { - // Safety: The `ListNode` is pinned through the Future, - // and we will unregister it in `WaitForCancellationFuture::drop` - // before the Future is dropped and the memory reference is invalidated. - unsafe { cancellation_token.register(&mut mut_self.wait_node, cx) } - } else { - cancellation_token.check_for_cancellation(&mut mut_self.wait_node, cx) - }; - - if let Poll::Ready(()) = poll_res { - // The cancellation_token was signalled - mut_self.cancellation_token = None; - // A signalled Token means the Waker won't be enqueued anymore - mut_self.is_registered = false; - mut_self.wait_node.task = None; - } else { - // This `Future` and its stored `Waker` stay registered at the - // `CancellationToken` - mut_self.is_registered = true; - } - - poll_res - } -} - -impl<'a> Drop for WaitForCancellationFuture<'a> { - fn drop(&mut self) { - // If this WaitForCancellationFuture has been polled and it was added to the - // wait queue at the cancellation_token, it must be removed before dropping. - // Otherwise the cancellation_token would access invalid memory. - if let Some(token) = self.cancellation_token { - if self.is_registered { - token.unregister(&mut self.wait_node); - } - } - } -} - -/// Tracks how the future had interacted with the [`CancellationToken`] -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -enum PollState { - /// The task has never interacted with the [`CancellationToken`]. - New, - /// The task was added to the wait queue at the [`CancellationToken`]. - Waiting, - /// The task has been polled to completion. - Done, -} - -/// Tracks the WaitForCancellationFuture waiting state. -/// Access to this struct is synchronized through the mutex in the CancellationToken. -struct WaitQueueEntry { - /// The task handle of the waiting task - task: Option, - // Current polling state. This state is only updated inside the Mutex of - // the CancellationToken. - state: PollState, -} - -impl WaitQueueEntry { - /// Creates a new WaitQueueEntry - fn new() -> WaitQueueEntry { - WaitQueueEntry { - task: None, - state: PollState::New, - } - } -} - -struct SynchronizedState { - waiters: LinkedList, - first_child: Option>, - is_cancelled: bool, -} - -impl SynchronizedState { - fn new() -> Self { - Self { - waiters: LinkedList::new(), - first_child: None, - is_cancelled: false, - } - } -} - -/// Information embedded in child tokens which is synchronized through the Mutex -/// in their parent. -struct SynchronizedThroughParent { - next_peer: Option>, - prev_peer: Option>, -} - -/// Possible states of a `CancellationToken` -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -enum CancellationState { - NotCancelled = 0, - Cancelling = 1, - Cancelled = 2, -} - -impl CancellationState { - fn pack(self) -> usize { - self as usize - } - - fn unpack(value: usize) -> Self { - match value { - 0 => CancellationState::NotCancelled, - 1 => CancellationState::Cancelling, - 2 => CancellationState::Cancelled, - _ => unreachable!("Invalid value"), - } - } -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -struct StateSnapshot { - /// The amount of references to this particular CancellationToken. - /// `CancellationToken` structs hold these references to a `CancellationTokenState`. - /// Also the state is referenced by the state of each child. - refcount: usize, - /// Whether the state is still referenced by it's parent and can therefore - /// not be freed. - has_parent_ref: bool, - /// Whether the token is cancelled - cancel_state: CancellationState, -} - -impl StateSnapshot { - /// Packs the snapshot into a `usize` - fn pack(self) -> usize { - self.refcount << 3 | if self.has_parent_ref { 4 } else { 0 } | self.cancel_state.pack() - } - - /// Unpacks the snapshot from a `usize` - fn unpack(value: usize) -> Self { - let refcount = value >> 3; - let has_parent_ref = value & 4 != 0; - let cancel_state = CancellationState::unpack(value & 0x03); - - StateSnapshot { - refcount, - has_parent_ref, - cancel_state, - } - } - - /// Whether this `CancellationTokenState` is still referenced by any - /// `CancellationToken`. - fn has_refs(&self) -> bool { - self.refcount != 0 || self.has_parent_ref - } -} - -/// The maximum permitted amount of references to a CancellationToken. This -/// is derived from the intent to never use more than 32bit in the `Snapshot`. -const MAX_REFS: u32 = (std::u32::MAX - 7) >> 3; - -/// Internal state of the `CancellationToken` pair above -struct CancellationTokenState { - state: AtomicUsize, - parent: Option>, - from_parent: SynchronizedThroughParent, - synchronized: Mutex, -} - -impl CancellationTokenState { - fn new( - parent: Option>, - state: StateSnapshot, - ) -> CancellationTokenState { - CancellationTokenState { - parent, - from_parent: SynchronizedThroughParent { - prev_peer: None, - next_peer: None, - }, - state: AtomicUsize::new(state.pack()), - synchronized: Mutex::new(SynchronizedState::new()), - } - } - - /// Returns a snapshot of the current atomic state of the token - fn snapshot(&self) -> StateSnapshot { - StateSnapshot::unpack(self.state.load(Ordering::SeqCst)) - } - - fn atomic_update_state(&self, mut current_state: StateSnapshot, func: F) -> StateSnapshot - where - F: Fn(StateSnapshot) -> StateSnapshot, - { - let mut current_packed_state = current_state.pack(); + let mut this = self.project(); loop { - let next_state = func(current_state); - match self.state.compare_exchange( - current_packed_state, - next_state.pack(), - Ordering::SeqCst, - Ordering::SeqCst, - ) { - Ok(_) => { - return next_state; - } - Err(actual) => { - current_packed_state = actual; - current_state = StateSnapshot::unpack(actual); - } - } - } - } - - fn increment_refcount(&self, current_state: StateSnapshot) -> StateSnapshot { - self.atomic_update_state(current_state, |mut state: StateSnapshot| { - if state.refcount >= MAX_REFS as usize { - eprintln!("[ERROR] Maximum reference count for CancellationToken was exceeded"); - std::process::abort(); - } - state.refcount += 1; - state - }) - } - - fn decrement_refcount(&self, current_state: StateSnapshot) -> StateSnapshot { - let current_state = self.atomic_update_state(current_state, |mut state: StateSnapshot| { - state.refcount -= 1; - state - }); - - // Drop the State if it is not referenced anymore - if !current_state.has_refs() { - // Safety: `CancellationTokenState` is always stored in refcounted - // Boxes - let _ = unsafe { Box::from_raw(self as *const Self as *mut Self) }; - } - - current_state - } - - fn remove_parent_ref(&self, current_state: StateSnapshot) -> StateSnapshot { - let current_state = self.atomic_update_state(current_state, |mut state: StateSnapshot| { - state.has_parent_ref = false; - state - }); - - // Drop the State if it is not referenced anymore - if !current_state.has_refs() { - // Safety: `CancellationTokenState` is always stored in refcounted - // Boxes - let _ = unsafe { Box::from_raw(self as *const Self as *mut Self) }; - } - - current_state - } - - /// Unregisters a child from the parent token. - /// The child tokens state is not exactly known at this point in time. - /// If the parent token is cancelled, the child token gets removed from the - /// parents list, and might therefore already have been freed. If the parent - /// token is not cancelled, the child token is still valid. - fn unregister_child( - &mut self, - mut child_state: NonNull, - current_child_state: StateSnapshot, - ) { - let removed_child = { - // Remove the child toke from the parents linked list - let mut guard = self.synchronized.lock().unwrap(); - if !guard.is_cancelled { - // Safety: Since the token was not cancelled, the child must - // still be in the list and valid. - let mut child_state = unsafe { child_state.as_mut() }; - debug_assert!(child_state.snapshot().has_parent_ref); - - if guard.first_child == Some(child_state.into()) { - guard.first_child = child_state.from_parent.next_peer; - } - // Safety: If peers wouldn't be valid anymore, they would try - // to remove themselves from the list. This would require locking - // the Mutex that we currently own. - unsafe { - if let Some(mut prev_peer) = child_state.from_parent.prev_peer { - prev_peer.as_mut().from_parent.next_peer = - child_state.from_parent.next_peer; - } - if let Some(mut next_peer) = child_state.from_parent.next_peer { - next_peer.as_mut().from_parent.prev_peer = - child_state.from_parent.prev_peer; - } - } - child_state.from_parent.prev_peer = None; - child_state.from_parent.next_peer = None; - - // The child is no longer referenced by the parent, since we were able - // to remove its reference from the parents list. - true - } else { - // Do not touch the linked list anymore. If the parent is cancelled - // it will move all childs outside of the Mutex and manipulate - // the pointers there. Manipulating the pointers here too could - // lead to races. Therefore leave them just as as and let the - // parent deal with it. The parent will make sure to retain a - // reference to this state as long as it manipulates the list - // pointers. Therefore the pointers are not dangling. - false - } - }; - - if removed_child { - // If the token removed itself from the parents list, it can reset - // the parent ref status. If it is isn't able to do so, because the - // parent removed it from the list, there is no need to do this. - // The parent ref acts as as another reference count. Therefore - // removing this reference can free the object. - // Safety: The token was in the list. This means the parent wasn't - // cancelled before, and the token must still be alive. - unsafe { child_state.as_mut().remove_parent_ref(current_child_state) }; - } - - // Decrement the refcount on the parent and free it if necessary - self.decrement_refcount(self.snapshot()); - } - - fn cancel(&self) { - // Move the state of the CancellationToken from `NotCancelled` to `Cancelling` - let mut current_state = self.snapshot(); - - let state_after_cancellation = loop { - if current_state.cancel_state != CancellationState::NotCancelled { - // Another task already initiated the cancellation - return; + if this.cancellation_token.is_cancelled() { + return Poll::Ready(()); } - let mut next_state = current_state; - next_state.cancel_state = CancellationState::Cancelling; - match self.state.compare_exchange( - current_state.pack(), - next_state.pack(), - Ordering::SeqCst, - Ordering::SeqCst, - ) { - Ok(_) => break next_state, - Err(actual) => current_state = StateSnapshot::unpack(actual), + // No wakeups can be lost here because there is always a call to + // `is_cancelled` between the creation of the future and the call to + // `poll`, and the code that sets the cancelled flag does so before + // waking the `Notified`. + if this.future.as_mut().poll(cx).is_pending() { + return Poll::Pending; } - }; - - // This task cancelled the token - - // Take the task list out of the Token - // We do not want to cancel child token inside this lock. If one of the - // child tasks would have additional child tokens, we would recursively - // take locks. - - // Doing this action has an impact if the child token is dropped concurrently: - // It will try to deregister itself from the parent task, but can not find - // itself in the task list anymore. Therefore it needs to assume the parent - // has extracted the list and will process it. It may not modify the list. - // This is OK from a memory safety perspective, since the parent still - // retains a reference to the child task until it finished iterating over - // it. - - let mut first_child = { - let mut guard = self.synchronized.lock().unwrap(); - // Save the cancellation also inside the Mutex - // This allows child tokens which want to detach themselves to detect - // that this is no longer required since the parent cleared the list. - guard.is_cancelled = true; - - // Wakeup all waiters - // This happens inside the lock to make cancellation reliable - // If we would access waiters outside of the lock, the pointers - // may no longer be valid. - // Typically this shouldn't be an issue, since waking a task should - // only move it from the blocked into the ready state and not have - // further side effects. - - // Use a reverse iterator, so that the oldest waiter gets - // scheduled first - guard.waiters.reverse_drain(|waiter| { - // We are not allowed to move the `Waker` out of the list node. - // The `Future` relies on the fact that the old `Waker` stays there - // as long as the `Future` has not completed in order to perform - // the `will_wake()` check. - // Therefore `wake_by_ref` is used instead of `wake()` - if let Some(handle) = &mut waiter.task { - handle.wake_by_ref(); - } - // Mark the waiter to have been removed from the list. - waiter.state = PollState::Done; - }); - - guard.first_child.take() - }; - - while let Some(mut child) = first_child { - // Safety: We know this is a valid pointer since it is in our child pointer - // list. It can't have been freed in between, since we retain a a reference - // to each child. - let mut_child = unsafe { child.as_mut() }; - - // Get the next child and clean up list pointers - first_child = mut_child.from_parent.next_peer; - mut_child.from_parent.prev_peer = None; - mut_child.from_parent.next_peer = None; - - // Cancel the child task - mut_child.cancel(); - - // Drop the parent reference. This `CancellationToken` is not interested - // in interacting with the child anymore. - // This is ONLY allowed once we promised not to touch the state anymore - // after this interaction. - mut_child.remove_parent_ref(mut_child.snapshot()); - } - - // The cancellation has completed - // At this point in time tasks which registered a wait node can be sure - // that this wait node already had been dequeued from the list without - // needing to inspect the list. - self.atomic_update_state(state_after_cancellation, |mut state| { - state.cancel_state = CancellationState::Cancelled; - state - }); - } - /// Returns `true` if the `CancellationToken` had been cancelled - fn is_cancelled(&self) -> bool { - let current_state = self.snapshot(); - current_state.cancel_state != CancellationState::NotCancelled - } - - /// Registers a waiting task at the `CancellationToken`. - /// Safety: This method is only safe as long as the waiting waiting task - /// will properly unregister the wait node before it gets moved. - unsafe fn register( - &self, - wait_node: &mut ListNode, - cx: &mut Context<'_>, - ) -> Poll<()> { - debug_assert_eq!(PollState::New, wait_node.state); - let current_state = self.snapshot(); - - // Perform an optimistic cancellation check before. This is not strictly - // necessary since we also check for cancellation in the Mutex, but - // reduces the necessary work to be performed for tasks which already - // had been cancelled. - if current_state.cancel_state != CancellationState::NotCancelled { - return Poll::Ready(()); - } - - // So far the token is not cancelled. However it could be cancelled before - // we get the chance to store the `Waker`. Therefore we need to check - // for cancellation again inside the mutex. - let mut guard = self.synchronized.lock().unwrap(); - if guard.is_cancelled { - // Cancellation was signalled - wait_node.state = PollState::Done; - Poll::Ready(()) - } else { - // Added the task to the wait queue - wait_node.task = Some(cx.waker().clone()); - wait_node.state = PollState::Waiting; - guard.waiters.add_front(wait_node); - Poll::Pending - } - } - - fn check_for_cancellation( - &self, - wait_node: &mut ListNode, - cx: &mut Context<'_>, - ) -> Poll<()> { - debug_assert!( - wait_node.task.is_some(), - "Method can only be called after task had been registered" - ); - - let current_state = self.snapshot(); - - if current_state.cancel_state != CancellationState::NotCancelled { - // If the cancellation had been fully completed we know that our `Waker` - // is no longer registered at the `CancellationToken`. - // Otherwise the cancel call may or may not yet have iterated - // through the waiters list and removed the wait nodes. - // If it hasn't yet, we need to remove it. Otherwise an attempt to - // reuse the `wait_nodeĀ“ might get freed due to the `WaitForCancellationFuture` - // getting dropped before the cancellation had interacted with it. - if current_state.cancel_state != CancellationState::Cancelled { - self.unregister(wait_node); - } - Poll::Ready(()) - } else { - // Check if we need to swap the `Waker`. This will make the check more - // expensive, since the `Waker` is synchronized through the Mutex. - // If we don't need to perform a `Waker` update, an atomic check for - // cancellation is sufficient. - let need_waker_update = wait_node - .task - .as_ref() - .map(|waker| waker.will_wake(cx.waker())) - .unwrap_or(true); - - if need_waker_update { - let guard = self.synchronized.lock().unwrap(); - if guard.is_cancelled { - // Cancellation was signalled. Since this cancellation signal - // is set inside the Mutex, the old waiter must already have - // been removed from the waiting list - debug_assert_eq!(PollState::Done, wait_node.state); - wait_node.task = None; - Poll::Ready(()) - } else { - // The WaitForCancellationFuture is already in the queue. - // The CancellationToken can't have been cancelled, - // since this would change the is_cancelled flag inside the mutex. - // Therefore we just have to update the Waker. A follow-up - // cancellation will always use the new waker. - wait_node.task = Some(cx.waker().clone()); - Poll::Pending - } - } else { - // Do nothing. If the token gets cancelled, this task will get - // woken again and can fetch the cancellation. - Poll::Pending - } - } - } - - fn unregister(&self, wait_node: &mut ListNode) { - debug_assert!( - wait_node.task.is_some(), - "waiter can not be active without task" - ); - - let mut guard = self.synchronized.lock().unwrap(); - // WaitForCancellationFuture only needs to get removed if it has been added to - // the wait queue of the CancellationToken. - // This has happened in the PollState::Waiting case. - if let PollState::Waiting = wait_node.state { - // Safety: Due to the state, we know that the node must be part - // of the waiter list - if !unsafe { guard.waiters.remove(wait_node) } { - // Panic if the address isn't found. This can only happen if the contract was - // violated, e.g. the WaitQueueEntry got moved after the initial poll. - panic!("Future could not be removed from wait queue"); - } - wait_node.state = PollState::Done; + this.future.set(this.cancellation_token.inner.notified()); } - wait_node.task = None; } } diff --git a/tokio-util/src/sync/cancellation_token/tree_node.rs b/tokio-util/src/sync/cancellation_token/tree_node.rs new file mode 100644 index 00000000000..b6cd698e23d --- /dev/null +++ b/tokio-util/src/sync/cancellation_token/tree_node.rs @@ -0,0 +1,373 @@ +//! This mod provides the logic for the inner tree structure of the CancellationToken. +//! +//! CancellationTokens are only light handles with references to TreeNode. +//! All the logic is actually implemented in the TreeNode. +//! +//! A TreeNode is part of the cancellation tree and may have one parent and an arbitrary number of +//! children. +//! +//! A TreeNode can receive the request to perform a cancellation through a CancellationToken. +//! This cancellation request will cancel the node and all of its descendants. +//! +//! As soon as a node cannot get cancelled any more (because it was already cancelled or it has no +//! more CancellationTokens pointing to it any more), it gets removed from the tree, to keep the +//! tree as small as possible. +//! +//! # Invariants +//! +//! Those invariants shall be true at any time. +//! +//! 1. A node that has no parents and no handles can no longer be cancelled. +//! This is important during both cancellation and refcounting. +//! +//! 2. If node B *is* or *was* a child of node A, then node B was created *after* node A. +//! This is important for deadlock safety, as it is used for lock order. +//! Node B can only become the child of node A in two ways: +//! - being created with `child_node()`, in which case it is trivially true that +//! node A already existed when node B was created +//! - being moved A->C->B to A->B because node C was removed in `decrease_handle_refcount()` +//! or `cancel()`. In this case the invariant still holds, as B was younger than C, and C +//! was younger than A, therefore B is also younger than A. +//! +//! 3. If two nodes are both unlocked and node A is the parent of node B, then node B is a child of +//! node A. It is important to always restore that invariant before dropping the lock of a node. +//! +//! # Deadlock safety +//! +//! We always lock in the order of creation time. We can prove this through invariant #2. +//! Specifically, through invariant #2, we know that we always have to lock a parent +//! before its child. +//! +use crate::loom::sync::{Arc, Mutex, MutexGuard}; + +/// A node of the cancellation tree structure +/// +/// The actual data it holds is wrapped inside a mutex for synchronization. +pub(crate) struct TreeNode { + inner: Mutex, + waker: tokio::sync::Notify, +} +impl TreeNode { + pub(crate) fn new() -> Self { + Self { + inner: Mutex::new(Inner { + parent: None, + parent_idx: 0, + children: vec![], + is_cancelled: false, + num_handles: 1, + }), + waker: tokio::sync::Notify::new(), + } + } + + pub(crate) fn notified(&self) -> tokio::sync::futures::Notified<'_> { + self.waker.notified() + } +} + +/// The data contained inside a TreeNode. +/// +/// This struct exists so that the data of the node can be wrapped +/// in a Mutex. +struct Inner { + parent: Option>, + parent_idx: usize, + children: Vec>, + is_cancelled: bool, + num_handles: usize, +} + +/// Returns whether or not the node is cancelled +pub(crate) fn is_cancelled(node: &Arc) -> bool { + node.inner.lock().unwrap().is_cancelled +} + +/// Creates a child node +pub(crate) fn child_node(parent: &Arc) -> Arc { + let mut locked_parent = parent.inner.lock().unwrap(); + + // Do not register as child if we are already cancelled. + // Cancelled trees can never be uncancelled and therefore + // need no connection to parents or children any more. + if locked_parent.is_cancelled { + return Arc::new(TreeNode { + inner: Mutex::new(Inner { + parent: None, + parent_idx: 0, + children: vec![], + is_cancelled: true, + num_handles: 1, + }), + waker: tokio::sync::Notify::new(), + }); + } + + let child = Arc::new(TreeNode { + inner: Mutex::new(Inner { + parent: Some(parent.clone()), + parent_idx: locked_parent.children.len(), + children: vec![], + is_cancelled: false, + num_handles: 1, + }), + waker: tokio::sync::Notify::new(), + }); + + locked_parent.children.push(child.clone()); + + child +} + +/// Disconnects the given parent from all of its children. +/// +/// Takes a reference to [Inner] to make sure the parent is already locked. +fn disconnect_children(node: &mut Inner) { + for child in std::mem::take(&mut node.children) { + let mut locked_child = child.inner.lock().unwrap(); + locked_child.parent_idx = 0; + locked_child.parent = None; + } +} + +/// Figures out the parent of the node and locks the node and its parent atomically. +/// +/// The basic principle of preventing deadlocks in the tree is +/// that we always lock the parent first, and then the child. +/// For more info look at *deadlock safety* and *invariant #2*. +/// +/// Sadly, it's impossible to figure out the parent of a node without +/// locking it. To then achieve locking order consistency, the node +/// has to be unlocked before the parent gets locked. +/// This leaves a small window where we already assume that we know the parent, +/// but neither the parent nor the node is locked. Therefore, the parent could change. +/// +/// To prevent that this problem leaks into the rest of the code, it is abstracted +/// in this function. +/// +/// The locked child and optionally its locked parent, if a parent exists, get passed +/// to the `func` argument via (node, None) or (node, Some(parent)). +fn with_locked_node_and_parent(node: &Arc, func: F) -> Ret +where + F: FnOnce(MutexGuard<'_, Inner>, Option>) -> Ret, +{ + let mut potential_parent = { + let locked_node = node.inner.lock().unwrap(); + match locked_node.parent.clone() { + Some(parent) => parent, + // If we locked the node and its parent is `None`, we are in a valid state + // and can return. + None => return func(locked_node, None), + } + }; + + loop { + // Deadlock safety: + // + // Due to invariant #2, we know that we have to lock the parent first, and then the child. + // This is true even if the potential_parent is no longer the current parent or even its + // sibling, as the invariant still holds. + let locked_parent = potential_parent.inner.lock().unwrap(); + let locked_node = node.inner.lock().unwrap(); + + let actual_parent = match locked_node.parent.clone() { + Some(parent) => parent, + // If we locked the node and its parent is `None`, we are in a valid state + // and can return. + None => { + // Was the wrong parent, so unlock it before calling `func` + drop(locked_parent); + return func(locked_node, None); + } + }; + + // Loop until we managed to lock both the node and its parent + if Arc::ptr_eq(&actual_parent, &potential_parent) { + return func(locked_node, Some(locked_parent)); + } + + // Drop locked_parent before reassigning to potential_parent, + // as potential_parent is borrowed in it + drop(locked_node); + drop(locked_parent); + + potential_parent = actual_parent; + } +} + +/// Moves all children from `node` to `parent`. +/// +/// `parent` MUST have been a parent of the node when they both got locked, +/// otherwise there is a potential for a deadlock as invariant #2 would be violated. +/// +/// To aquire the locks for node and parent, use [with_locked_node_and_parent]. +fn move_children_to_parent(node: &mut Inner, parent: &mut Inner) { + // Pre-allocate in the parent, for performance + parent.children.reserve(node.children.len()); + + for child in std::mem::take(&mut node.children) { + { + let mut child_locked = child.inner.lock().unwrap(); + child_locked.parent = node.parent.clone(); + child_locked.parent_idx = parent.children.len(); + } + parent.children.push(child); + } +} + +/// Removes a child from the parent. +/// +/// `parent` MUST be the parent of `node`. +/// To aquire the locks for node and parent, use [with_locked_node_and_parent]. +fn remove_child(parent: &mut Inner, mut node: MutexGuard<'_, Inner>) { + // Query the position from where to remove a node + let pos = node.parent_idx; + node.parent = None; + node.parent_idx = 0; + + // Unlock node, so that only one child at a time is locked. + // Otherwise we would violate the lock order (see 'deadlock safety') as we + // don't know the creation order of the child nodes + drop(node); + + // If `node` is the last element in the list, we don't need any swapping + if parent.children.len() == pos + 1 { + parent.children.pop().unwrap(); + } else { + // If `node` is not the last element in the list, we need to + // replace it with the last element + let replacement_child = parent.children.pop().unwrap(); + replacement_child.inner.lock().unwrap().parent_idx = pos; + parent.children[pos] = replacement_child; + } + + let len = parent.children.len(); + if 4 * len <= parent.children.capacity() { + // equal to: + // parent.children.shrink_to(2 * len); + // but shrink_to was not yet stabilized in our minimal compatible version + let old_children = std::mem::replace(&mut parent.children, Vec::with_capacity(2 * len)); + parent.children.extend(old_children); + } +} + +/// Increases the reference count of handles. +pub(crate) fn increase_handle_refcount(node: &Arc) { + let mut locked_node = node.inner.lock().unwrap(); + + // Once no handles are left over, the node gets detached from the tree. + // There should never be a new handle once all handles are dropped. + assert!(locked_node.num_handles > 0); + + locked_node.num_handles += 1; +} + +/// Decreases the reference count of handles. +/// +/// Once no handle is left, we can remove the node from the +/// tree and connect its parent directly to its children. +pub(crate) fn decrease_handle_refcount(node: &Arc) { + let num_handles = { + let mut locked_node = node.inner.lock().unwrap(); + locked_node.num_handles -= 1; + locked_node.num_handles + }; + + if num_handles == 0 { + with_locked_node_and_parent(node, |mut node, parent| { + // Remove the node from the tree + match parent { + Some(mut parent) => { + // As we want to remove ourselves from the tree, + // we have to move the children to the parent, so that + // they still receive the cancellation event without us. + // Moving them does not violate invariant #1. + move_children_to_parent(&mut node, &mut parent); + + // Remove the node from the parent + remove_child(&mut parent, node); + } + None => { + // Due to invariant #1, we can assume that our + // children can no longer be cancelled through us. + // (as we now have neither a parent nor handles) + // Therefore we can disconnect them. + disconnect_children(&mut node); + } + } + }); + } +} + +/// Cancels a node and its children. +pub(crate) fn cancel(node: &Arc) { + let mut locked_node = node.inner.lock().unwrap(); + + if locked_node.is_cancelled { + return; + } + + // One by one, adopt grandchildren and then cancel and detach the child + while let Some(child) = locked_node.children.pop() { + // This can't deadlock because the mutex we are already + // holding is the parent of child. + let mut locked_child = child.inner.lock().unwrap(); + + // Detach the child from node + // No need to modify node.children, as the child already got removed with `.pop` + locked_child.parent = None; + locked_child.parent_idx = 0; + + // If child is already cancelled, detaching is enough + if locked_child.is_cancelled { + continue; + } + + // Cancel or adopt grandchildren + while let Some(grandchild) = locked_child.children.pop() { + // This can't deadlock because the two mutexes we are already + // holding is the parent and grandparent of grandchild. + let mut locked_grandchild = grandchild.inner.lock().unwrap(); + + // Detach the grandchild + locked_grandchild.parent = None; + locked_grandchild.parent_idx = 0; + + // If grandchild is already cancelled, detaching is enough + if locked_grandchild.is_cancelled { + continue; + } + + // For performance reasons, only adopt grandchildren that have children. + // Otherwise, just cancel them right away, no need for another iteration. + if locked_grandchild.children.is_empty() { + // Cancel the grandchild + locked_grandchild.is_cancelled = true; + locked_grandchild.children = Vec::new(); + drop(locked_grandchild); + grandchild.waker.notify_waiters(); + } else { + // Otherwise, adopt grandchild + locked_grandchild.parent = Some(node.clone()); + locked_grandchild.parent_idx = locked_node.children.len(); + drop(locked_grandchild); + locked_node.children.push(grandchild); + } + } + + // Cancel the child + locked_child.is_cancelled = true; + locked_child.children = Vec::new(); + drop(locked_child); + child.waker.notify_waiters(); + + // Now the child is cancelled and detached and all its children are adopted. + // Just continue until all (including adopted) children are cancelled and detached. + } + + // Cancel the node itself. + locked_node.is_cancelled = true; + locked_node.children = Vec::new(); + drop(locked_node); + node.waker.notify_waiters(); +} diff --git a/tokio-util/src/sync/intrusive_double_linked_list.rs b/tokio-util/src/sync/intrusive_double_linked_list.rs deleted file mode 100644 index 0a5ecff9a37..00000000000 --- a/tokio-util/src/sync/intrusive_double_linked_list.rs +++ /dev/null @@ -1,788 +0,0 @@ -//! An intrusive double linked list of data - -#![allow(dead_code, unreachable_pub)] - -use core::{ - marker::PhantomPinned, - ops::{Deref, DerefMut}, - ptr::NonNull, -}; - -/// A node which carries data of type `T` and is stored in an intrusive list -#[derive(Debug)] -pub struct ListNode { - /// The previous node in the list. `None` if there is no previous node. - prev: Option>>, - /// The next node in the list. `None` if there is no previous node. - next: Option>>, - /// The data which is associated to this list item - data: T, - /// Prevents `ListNode`s from being `Unpin`. They may never be moved, since - /// the list semantics require addresses to be stable. - _pin: PhantomPinned, -} - -impl ListNode { - /// Creates a new node with the associated data - pub fn new(data: T) -> ListNode { - Self { - prev: None, - next: None, - data, - _pin: PhantomPinned, - } - } -} - -impl Deref for ListNode { - type Target = T; - - fn deref(&self) -> &T { - &self.data - } -} - -impl DerefMut for ListNode { - fn deref_mut(&mut self) -> &mut T { - &mut self.data - } -} - -/// An intrusive linked list of nodes, where each node carries associated data -/// of type `T`. -#[derive(Debug)] -pub struct LinkedList { - head: Option>>, - tail: Option>>, -} - -impl LinkedList { - /// Creates an empty linked list - pub fn new() -> Self { - LinkedList:: { - head: None, - tail: None, - } - } - - /// Adds a node at the front of the linked list. - /// Safety: This function is only safe as long as `node` is guaranteed to - /// get removed from the list before it gets moved or dropped. - /// In addition to this `node` may not be added to another other list before - /// it is removed from the current one. - pub unsafe fn add_front(&mut self, node: &mut ListNode) { - node.next = self.head; - node.prev = None; - if let Some(mut head) = self.head { - head.as_mut().prev = Some(node.into()) - }; - self.head = Some(node.into()); - if self.tail.is_none() { - self.tail = Some(node.into()); - } - } - - /// Inserts a node into the list in a way that the list keeps being sorted. - /// Safety: This function is only safe as long as `node` is guaranteed to - /// get removed from the list before it gets moved or dropped. - /// In addition to this `node` may not be added to another other list before - /// it is removed from the current one. - pub unsafe fn add_sorted(&mut self, node: &mut ListNode) - where - T: PartialOrd, - { - if self.head.is_none() { - // First node in the list - self.head = Some(node.into()); - self.tail = Some(node.into()); - return; - } - - let mut prev: Option>> = None; - let mut current = self.head; - - while let Some(mut current_node) = current { - if node.data < current_node.as_ref().data { - // Need to insert before the current node - current_node.as_mut().prev = Some(node.into()); - match prev { - Some(mut prev) => { - prev.as_mut().next = Some(node.into()); - } - None => { - // We are inserting at the beginning of the list - self.head = Some(node.into()); - } - } - node.next = current; - node.prev = prev; - return; - } - prev = current; - current = current_node.as_ref().next; - } - - // We looped through the whole list and the nodes data is bigger or equal - // than everything we found up to now. - // Insert at the end. Since we checked before that the list isn't empty, - // tail always has a value. - node.prev = self.tail; - node.next = None; - self.tail.as_mut().unwrap().as_mut().next = Some(node.into()); - self.tail = Some(node.into()); - } - - /// Returns the first node in the linked list without removing it from the list - /// The function is only safe as long as valid pointers are stored inside - /// the linked list. - /// The returned pointer is only guaranteed to be valid as long as the list - /// is not mutated - pub fn peek_first(&self) -> Option<&mut ListNode> { - // Safety: When the node was inserted it was promised that it is alive - // until it gets removed from the list. - // The returned node has a pointer which constrains it to the lifetime - // of the list. This is ok, since the Node is supposed to outlive - // its insertion in the list. - unsafe { - self.head - .map(|mut node| &mut *(node.as_mut() as *mut ListNode)) - } - } - - /// Returns the last node in the linked list without removing it from the list - /// The function is only safe as long as valid pointers are stored inside - /// the linked list. - /// The returned pointer is only guaranteed to be valid as long as the list - /// is not mutated - pub fn peek_last(&self) -> Option<&mut ListNode> { - // Safety: When the node was inserted it was promised that it is alive - // until it gets removed from the list. - // The returned node has a pointer which constrains it to the lifetime - // of the list. This is ok, since the Node is supposed to outlive - // its insertion in the list. - unsafe { - self.tail - .map(|mut node| &mut *(node.as_mut() as *mut ListNode)) - } - } - - /// Removes the first node from the linked list - pub fn remove_first(&mut self) -> Option<&mut ListNode> { - #![allow(clippy::debug_assert_with_mut_call)] - - // Safety: When the node was inserted it was promised that it is alive - // until it gets removed from the list - unsafe { - let mut head = self.head?; - self.head = head.as_mut().next; - - let first_ref = head.as_mut(); - match first_ref.next { - None => { - // This was the only node in the list - debug_assert_eq!(Some(first_ref.into()), self.tail); - self.tail = None; - } - Some(mut next) => { - next.as_mut().prev = None; - } - } - - first_ref.prev = None; - first_ref.next = None; - Some(&mut *(first_ref as *mut ListNode)) - } - } - - /// Removes the last node from the linked list and returns it - pub fn remove_last(&mut self) -> Option<&mut ListNode> { - #![allow(clippy::debug_assert_with_mut_call)] - - // Safety: When the node was inserted it was promised that it is alive - // until it gets removed from the list - unsafe { - let mut tail = self.tail?; - self.tail = tail.as_mut().prev; - - let last_ref = tail.as_mut(); - match last_ref.prev { - None => { - // This was the last node in the list - debug_assert_eq!(Some(last_ref.into()), self.head); - self.head = None; - } - Some(mut prev) => { - prev.as_mut().next = None; - } - } - - last_ref.prev = None; - last_ref.next = None; - Some(&mut *(last_ref as *mut ListNode)) - } - } - - /// Returns whether the linked list does not contain any node - pub fn is_empty(&self) -> bool { - if self.head.is_some() { - return false; - } - - debug_assert!(self.tail.is_none()); - true - } - - /// Removes the given `node` from the linked list. - /// Returns whether the `node` was removed. - /// It is also only safe if it is known that the `node` is either part of this - /// list, or of no list at all. If `node` is part of another list, the - /// behavior is undefined. - pub unsafe fn remove(&mut self, node: &mut ListNode) -> bool { - #![allow(clippy::debug_assert_with_mut_call)] - - match node.prev { - None => { - // This might be the first node in the list. If it is not, the - // node is not in the list at all. Since our precondition is that - // the node must either be in this list or in no list, we check that - // the node is really in no list. - if self.head != Some(node.into()) { - debug_assert!(node.next.is_none()); - return false; - } - self.head = node.next; - } - Some(mut prev) => { - debug_assert_eq!(prev.as_ref().next, Some(node.into())); - prev.as_mut().next = node.next; - } - } - - match node.next { - None => { - // This must be the last node in our list. Otherwise the list - // is inconsistent. - debug_assert_eq!(self.tail, Some(node.into())); - self.tail = node.prev; - } - Some(mut next) => { - debug_assert_eq!(next.as_mut().prev, Some(node.into())); - next.as_mut().prev = node.prev; - } - } - - node.next = None; - node.prev = None; - - true - } - - /// Drains the list iby calling a callback on each list node - /// - /// The method does not return an iterator since stopping or deferring - /// draining the list is not permitted. If the method would push nodes to - /// an iterator we could not guarantee that the nodes do not get utilized - /// after having been removed from the list anymore. - pub fn drain(&mut self, mut func: F) - where - F: FnMut(&mut ListNode), - { - let mut current = self.head; - self.head = None; - self.tail = None; - - while let Some(mut node) = current { - // Safety: The nodes have not been removed from the list yet and must - // therefore contain valid data. The nodes can also not be added to - // the list again during iteration, since the list is mutably borrowed. - unsafe { - let node_ref = node.as_mut(); - current = node_ref.next; - - node_ref.next = None; - node_ref.prev = None; - - // Note: We do not reset the pointers from the next element in the - // list to the current one since we will iterate over the whole - // list anyway, and therefore clean up all pointers. - - func(node_ref); - } - } - } - - /// Drains the list in reverse order by calling a callback on each list node - /// - /// The method does not return an iterator since stopping or deferring - /// draining the list is not permitted. If the method would push nodes to - /// an iterator we could not guarantee that the nodes do not get utilized - /// after having been removed from the list anymore. - pub fn reverse_drain(&mut self, mut func: F) - where - F: FnMut(&mut ListNode), - { - let mut current = self.tail; - self.head = None; - self.tail = None; - - while let Some(mut node) = current { - // Safety: The nodes have not been removed from the list yet and must - // therefore contain valid data. The nodes can also not be added to - // the list again during iteration, since the list is mutably borrowed. - unsafe { - let node_ref = node.as_mut(); - current = node_ref.prev; - - node_ref.next = None; - node_ref.prev = None; - - // Note: We do not reset the pointers from the next element in the - // list to the current one since we will iterate over the whole - // list anyway, and therefore clean up all pointers. - - func(node_ref); - } - } - } -} - -#[cfg(all(test, feature = "std"))] // Tests make use of Vec at the moment -mod tests { - use super::*; - - fn collect_list(mut list: LinkedList) -> Vec { - let mut result = Vec::new(); - list.drain(|node| { - result.push(**node); - }); - result - } - - fn collect_reverse_list(mut list: LinkedList) -> Vec { - let mut result = Vec::new(); - list.reverse_drain(|node| { - result.push(**node); - }); - result - } - - unsafe fn add_nodes(list: &mut LinkedList, nodes: &mut [&mut ListNode]) { - for node in nodes.iter_mut() { - list.add_front(node); - } - } - - unsafe fn assert_clean(node: &mut ListNode) { - assert!(node.next.is_none()); - assert!(node.prev.is_none()); - } - - #[test] - fn insert_and_iterate() { - unsafe { - let mut a = ListNode::new(5); - let mut b = ListNode::new(7); - let mut c = ListNode::new(31); - - let mut setup = |list: &mut LinkedList| { - assert_eq!(true, list.is_empty()); - list.add_front(&mut c); - assert_eq!(31, **list.peek_first().unwrap()); - assert_eq!(false, list.is_empty()); - list.add_front(&mut b); - assert_eq!(7, **list.peek_first().unwrap()); - list.add_front(&mut a); - assert_eq!(5, **list.peek_first().unwrap()); - }; - - let mut list = LinkedList::new(); - setup(&mut list); - let items: Vec = collect_list(list); - assert_eq!([5, 7, 31].to_vec(), items); - - let mut list = LinkedList::new(); - setup(&mut list); - let items: Vec = collect_reverse_list(list); - assert_eq!([31, 7, 5].to_vec(), items); - } - } - - #[test] - fn add_sorted() { - unsafe { - let mut a = ListNode::new(5); - let mut b = ListNode::new(7); - let mut c = ListNode::new(31); - let mut d = ListNode::new(99); - - let mut list = LinkedList::new(); - list.add_sorted(&mut a); - let items: Vec = collect_list(list); - assert_eq!([5].to_vec(), items); - - let mut list = LinkedList::new(); - list.add_sorted(&mut a); - let items: Vec = collect_reverse_list(list); - assert_eq!([5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut d, &mut c, &mut b]); - list.add_sorted(&mut a); - let items: Vec = collect_list(list); - assert_eq!([5, 7, 31, 99].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut d, &mut c, &mut b]); - list.add_sorted(&mut a); - let items: Vec = collect_reverse_list(list); - assert_eq!([99, 31, 7, 5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut d, &mut c, &mut a]); - list.add_sorted(&mut b); - let items: Vec = collect_list(list); - assert_eq!([5, 7, 31, 99].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut d, &mut c, &mut a]); - list.add_sorted(&mut b); - let items: Vec = collect_reverse_list(list); - assert_eq!([99, 31, 7, 5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut d, &mut b, &mut a]); - list.add_sorted(&mut c); - let items: Vec = collect_list(list); - assert_eq!([5, 7, 31, 99].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut d, &mut b, &mut a]); - list.add_sorted(&mut c); - let items: Vec = collect_reverse_list(list); - assert_eq!([99, 31, 7, 5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - list.add_sorted(&mut d); - let items: Vec = collect_list(list); - assert_eq!([5, 7, 31, 99].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - list.add_sorted(&mut d); - let items: Vec = collect_reverse_list(list); - assert_eq!([99, 31, 7, 5].to_vec(), items); - } - } - - #[test] - fn drain_and_collect() { - unsafe { - let mut a = ListNode::new(5); - let mut b = ListNode::new(7); - let mut c = ListNode::new(31); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - - let taken_items: Vec = collect_list(list); - assert_eq!([5, 7, 31].to_vec(), taken_items); - } - } - - #[test] - fn peek_last() { - unsafe { - let mut a = ListNode::new(5); - let mut b = ListNode::new(7); - let mut c = ListNode::new(31); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - - let last = list.peek_last(); - assert_eq!(31, **last.unwrap()); - list.remove_last(); - - let last = list.peek_last(); - assert_eq!(7, **last.unwrap()); - list.remove_last(); - - let last = list.peek_last(); - assert_eq!(5, **last.unwrap()); - list.remove_last(); - - let last = list.peek_last(); - assert!(last.is_none()); - } - } - - #[test] - fn remove_first() { - unsafe { - // We iterate forward and backwards through the manipulated lists - // to make sure pointers in both directions are still ok. - let mut a = ListNode::new(5); - let mut b = ListNode::new(7); - let mut c = ListNode::new(31); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - let removed = list.remove_first().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_list(list); - assert_eq!([7, 31].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - let removed = list.remove_first().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_reverse_list(list); - assert_eq!([31, 7].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - let removed = list.remove_first().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_list(list); - assert_eq!([7].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - let removed = list.remove_first().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_reverse_list(list); - assert_eq!([7].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut a]); - let removed = list.remove_first().unwrap(); - assert_clean(removed); - assert!(list.is_empty()); - let items: Vec = collect_list(list); - assert!(items.is_empty()); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut a]); - let removed = list.remove_first().unwrap(); - assert_clean(removed); - assert!(list.is_empty()); - let items: Vec = collect_reverse_list(list); - assert!(items.is_empty()); - } - } - - #[test] - fn remove_last() { - unsafe { - // We iterate forward and backwards through the manipulated lists - // to make sure pointers in both directions are still ok. - let mut a = ListNode::new(5); - let mut b = ListNode::new(7); - let mut c = ListNode::new(31); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - let removed = list.remove_last().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_list(list); - assert_eq!([5, 7].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - let removed = list.remove_last().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_reverse_list(list); - assert_eq!([7, 5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - let removed = list.remove_last().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_list(list); - assert_eq!([5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - let removed = list.remove_last().unwrap(); - assert_clean(removed); - assert!(!list.is_empty()); - let items: Vec = collect_reverse_list(list); - assert_eq!([5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut a]); - let removed = list.remove_last().unwrap(); - assert_clean(removed); - assert!(list.is_empty()); - let items: Vec = collect_list(list); - assert!(items.is_empty()); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut a]); - let removed = list.remove_last().unwrap(); - assert_clean(removed); - assert!(list.is_empty()); - let items: Vec = collect_reverse_list(list); - assert!(items.is_empty()); - } - } - - #[test] - fn remove_by_address() { - unsafe { - let mut a = ListNode::new(5); - let mut b = ListNode::new(7); - let mut c = ListNode::new(31); - - { - // Remove first - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - assert_eq!(true, list.remove(&mut a)); - assert_clean((&mut a).into()); - // a should be no longer there and can't be removed twice - assert_eq!(false, list.remove(&mut a)); - assert_eq!(Some((&mut b).into()), list.head); - assert_eq!(Some((&mut c).into()), b.next); - assert_eq!(Some((&mut b).into()), c.prev); - let items: Vec = collect_list(list); - assert_eq!([7, 31].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - assert_eq!(true, list.remove(&mut a)); - assert_clean((&mut a).into()); - // a should be no longer there and can't be removed twice - assert_eq!(false, list.remove(&mut a)); - assert_eq!(Some((&mut c).into()), b.next); - assert_eq!(Some((&mut b).into()), c.prev); - let items: Vec = collect_reverse_list(list); - assert_eq!([31, 7].to_vec(), items); - } - - { - // Remove middle - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - assert_eq!(true, list.remove(&mut b)); - assert_clean((&mut b).into()); - assert_eq!(Some((&mut c).into()), a.next); - assert_eq!(Some((&mut a).into()), c.prev); - let items: Vec = collect_list(list); - assert_eq!([5, 31].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - assert_eq!(true, list.remove(&mut b)); - assert_clean((&mut b).into()); - assert_eq!(Some((&mut c).into()), a.next); - assert_eq!(Some((&mut a).into()), c.prev); - let items: Vec = collect_reverse_list(list); - assert_eq!([31, 5].to_vec(), items); - } - - { - // Remove last - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - assert_eq!(true, list.remove(&mut c)); - assert_clean((&mut c).into()); - assert!(b.next.is_none()); - assert_eq!(Some((&mut b).into()), list.tail); - let items: Vec = collect_list(list); - assert_eq!([5, 7].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut c, &mut b, &mut a]); - assert_eq!(true, list.remove(&mut c)); - assert_clean((&mut c).into()); - assert!(b.next.is_none()); - assert_eq!(Some((&mut b).into()), list.tail); - let items: Vec = collect_reverse_list(list); - assert_eq!([7, 5].to_vec(), items); - } - - { - // Remove first of two - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - assert_eq!(true, list.remove(&mut a)); - assert_clean((&mut a).into()); - // a should be no longer there and can't be removed twice - assert_eq!(false, list.remove(&mut a)); - assert_eq!(Some((&mut b).into()), list.head); - assert_eq!(Some((&mut b).into()), list.tail); - assert!(b.next.is_none()); - assert!(b.prev.is_none()); - let items: Vec = collect_list(list); - assert_eq!([7].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - assert_eq!(true, list.remove(&mut a)); - assert_clean((&mut a).into()); - // a should be no longer there and can't be removed twice - assert_eq!(false, list.remove(&mut a)); - assert_eq!(Some((&mut b).into()), list.head); - assert_eq!(Some((&mut b).into()), list.tail); - assert!(b.next.is_none()); - assert!(b.prev.is_none()); - let items: Vec = collect_reverse_list(list); - assert_eq!([7].to_vec(), items); - } - - { - // Remove last of two - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - assert_eq!(true, list.remove(&mut b)); - assert_clean((&mut b).into()); - assert_eq!(Some((&mut a).into()), list.head); - assert_eq!(Some((&mut a).into()), list.tail); - assert!(a.next.is_none()); - assert!(a.prev.is_none()); - let items: Vec = collect_list(list); - assert_eq!([5].to_vec(), items); - - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut b, &mut a]); - assert_eq!(true, list.remove(&mut b)); - assert_clean((&mut b).into()); - assert_eq!(Some((&mut a).into()), list.head); - assert_eq!(Some((&mut a).into()), list.tail); - assert!(a.next.is_none()); - assert!(a.prev.is_none()); - let items: Vec = collect_reverse_list(list); - assert_eq!([5].to_vec(), items); - } - - { - // Remove last item - let mut list = LinkedList::new(); - add_nodes(&mut list, &mut [&mut a]); - assert_eq!(true, list.remove(&mut a)); - assert_clean((&mut a).into()); - assert!(list.head.is_none()); - assert!(list.tail.is_none()); - let items: Vec = collect_list(list); - assert!(items.is_empty()); - } - - { - // Remove missing - let mut list = LinkedList::new(); - list.add_front(&mut b); - list.add_front(&mut a); - assert_eq!(false, list.remove(&mut c)); - } - } - } -} diff --git a/tokio-util/src/sync/mod.rs b/tokio-util/src/sync/mod.rs index 0b78a156cf3..bcdc99cb959 100644 --- a/tokio-util/src/sync/mod.rs +++ b/tokio-util/src/sync/mod.rs @@ -3,8 +3,6 @@ mod cancellation_token; pub use cancellation_token::{guard::DropGuard, CancellationToken, WaitForCancellationFuture}; -mod intrusive_double_linked_list; - mod mpsc; pub use mpsc::PollSender; diff --git a/tokio-util/tests/sync_cancellation_token.rs b/tokio-util/tests/sync_cancellation_token.rs index 4d86f2c46ce..28ba284b6c2 100644 --- a/tokio-util/tests/sync_cancellation_token.rs +++ b/tokio-util/tests/sync_cancellation_token.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] use tokio::pin; -use tokio_util::sync::CancellationToken; +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; use core::future::Future; use core::task::{Context, Poll}; @@ -77,6 +77,46 @@ fn cancel_child_token_through_parent() { ); } +#[test] +fn cancel_grandchild_token_through_parent_if_child_was_dropped() { + let (waker, wake_counter) = new_count_waker(); + let token = CancellationToken::new(); + + let intermediate_token = token.child_token(); + let child_token = intermediate_token.child_token(); + drop(intermediate_token); + assert!(!child_token.is_cancelled()); + + let child_fut = child_token.cancelled(); + pin!(child_fut); + let parent_fut = token.cancelled(); + pin!(parent_fut); + + assert_eq!( + Poll::Pending, + child_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + parent_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!(wake_counter, 0); + + token.cancel(); + assert_eq!(wake_counter, 2); + assert!(token.is_cancelled()); + assert!(child_token.is_cancelled()); + + assert_eq!( + Poll::Ready(()), + child_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Ready(()), + parent_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); +} + #[test] fn cancel_child_token_without_parent() { let (waker, wake_counter) = new_count_waker(); @@ -206,6 +246,134 @@ fn drop_multiple_child_tokens() { } } +#[test] +fn cancel_only_all_descendants() { + // ARRANGE + let (waker, wake_counter) = new_count_waker(); + + let parent_token = CancellationToken::new(); + let token = parent_token.child_token(); + let sibling_token = parent_token.child_token(); + let child1_token = token.child_token(); + let child2_token = token.child_token(); + let grandchild_token = child1_token.child_token(); + let grandchild2_token = child1_token.child_token(); + let grandgrandchild_token = grandchild_token.child_token(); + + assert!(!parent_token.is_cancelled()); + assert!(!token.is_cancelled()); + assert!(!sibling_token.is_cancelled()); + assert!(!child1_token.is_cancelled()); + assert!(!child2_token.is_cancelled()); + assert!(!grandchild_token.is_cancelled()); + assert!(!grandchild2_token.is_cancelled()); + assert!(!grandgrandchild_token.is_cancelled()); + + let parent_fut = parent_token.cancelled(); + let fut = token.cancelled(); + let sibling_fut = sibling_token.cancelled(); + let child1_fut = child1_token.cancelled(); + let child2_fut = child2_token.cancelled(); + let grandchild_fut = grandchild_token.cancelled(); + let grandchild2_fut = grandchild2_token.cancelled(); + let grandgrandchild_fut = grandgrandchild_token.cancelled(); + + pin!(parent_fut); + pin!(fut); + pin!(sibling_fut); + pin!(child1_fut); + pin!(child2_fut); + pin!(grandchild_fut); + pin!(grandchild2_fut); + pin!(grandgrandchild_fut); + + assert_eq!( + Poll::Pending, + parent_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + sibling_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + child1_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + child2_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + grandchild_fut + .as_mut() + .poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + grandchild2_fut + .as_mut() + .poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Pending, + grandgrandchild_fut + .as_mut() + .poll(&mut Context::from_waker(&waker)) + ); + assert_eq!(wake_counter, 0); + + // ACT + token.cancel(); + + // ASSERT + assert_eq!(wake_counter, 6); + assert!(!parent_token.is_cancelled()); + assert!(token.is_cancelled()); + assert!(!sibling_token.is_cancelled()); + assert!(child1_token.is_cancelled()); + assert!(child2_token.is_cancelled()); + assert!(grandchild_token.is_cancelled()); + assert!(grandchild2_token.is_cancelled()); + assert!(grandgrandchild_token.is_cancelled()); + + assert_eq!( + Poll::Ready(()), + fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Ready(()), + child1_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Ready(()), + child2_fut.as_mut().poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Ready(()), + grandchild_fut + .as_mut() + .poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Ready(()), + grandchild2_fut + .as_mut() + .poll(&mut Context::from_waker(&waker)) + ); + assert_eq!( + Poll::Ready(()), + grandgrandchild_fut + .as_mut() + .poll(&mut Context::from_waker(&waker)) + ); + assert_eq!(wake_counter, 6); +} + #[test] fn drop_parent_before_child_tokens() { let token = CancellationToken::new(); @@ -218,3 +386,15 @@ fn drop_parent_before_child_tokens() { drop(child1); drop(child2); } + +#[test] +fn derives_send_sync() { + fn assert_send() {} + fn assert_sync() {} + + assert_send::(); + assert_sync::(); + + assert_send::>(); + assert_sync::>(); +}