From c11a81b60ba6129d32dc0df371622dcc1db59528 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Fri, 25 Oct 2019 17:52:47 +0200 Subject: [PATCH 1/9] Add utility type Registry to the sync module --- benches/mutex.rs | 42 +++++++++ benches/task.rs | 11 +++ src/sync/channel.rs | 202 ++----------------------------------------- src/sync/mod.rs | 3 + src/sync/mutex.rs | 105 +++++++--------------- src/sync/registry.rs | 198 ++++++++++++++++++++++++++++++++++++++++++ src/sync/rwlock.rs | 2 +- tests/channel.rs | 2 + 8 files changed, 294 insertions(+), 271 deletions(-) create mode 100644 benches/mutex.rs create mode 100644 benches/task.rs create mode 100644 src/sync/registry.rs diff --git a/benches/mutex.rs b/benches/mutex.rs new file mode 100644 index 000000000..b159ba127 --- /dev/null +++ b/benches/mutex.rs @@ -0,0 +1,42 @@ +#![feature(test)] + +extern crate test; + +use std::sync::Arc; + +use async_std::sync::Mutex; +use async_std::task; +use test::Bencher; + +#[bench] +fn create(b: &mut Bencher) { + b.iter(|| Mutex::new(())); +} + +#[bench] +fn contention(b: &mut Bencher) { + b.iter(|| task::block_on(run(10, 1000))); +} + +#[bench] +fn no_contention(b: &mut Bencher) { + b.iter(|| task::block_on(run(1, 10000))); +} + +async fn run(task: usize, iter: usize) { + let m = Arc::new(Mutex::new(())); + let mut tasks = Vec::new(); + + for _ in 0..task { + let m = m.clone(); + tasks.push(task::spawn(async move { + for _ in 0..iter { + let _ = m.lock().await; + } + })); + } + + for t in tasks { + t.await; + } +} diff --git a/benches/task.rs b/benches/task.rs new file mode 100644 index 000000000..7eafee7ea --- /dev/null +++ b/benches/task.rs @@ -0,0 +1,11 @@ +#![feature(test)] + +extern crate test; + +use async_std::task; +use test::{black_box, Bencher}; + +#[bench] +fn block_on(b: &mut Bencher) { + b.iter(|| task::block_on(async {})); +} diff --git a/src/sync/channel.rs b/src/sync/channel.rs index 6751417af..27ce0a1f2 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -4,17 +4,17 @@ use std::future::Future; use std::isize; use std::marker::PhantomData; use std::mem; -use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::process; use std::ptr; -use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{self, AtomicUsize, Ordering}; use std::sync::Arc; -use std::task::{Context, Poll, Waker}; +use std::task::{Context, Poll}; use crossbeam_utils::{Backoff, CachePadded}; -use futures_core::stream::Stream; -use slab::Slab; + +use crate::stream::Stream; +use crate::sync::Registry; /// Creates a bounded multi-producer multi-consumer channel. /// @@ -938,195 +938,3 @@ enum PopError { /// The channel is empty and disconnected. Disconnected, } - -/// A list of blocked channel operations. -struct Blocked { - /// A list of registered channel operations. - /// - /// Each entry has a waker associated with the task that is executing the operation. If the - /// waker is set to `None`, that means the task has been woken up but hasn't removed itself - /// from the registry yet. - entries: Slab>, - - /// The number of wakers in the entry list. - waker_count: usize, -} - -/// A registry of blocked channel operations. -/// -/// Blocked operations register themselves in a registry. Successful operations on the opposite -/// side of the channel wake blocked operations in the registry. -struct Registry { - /// A list of blocked channel operations. - blocked: Spinlock, - - /// Set to `true` if there are no wakers in the registry. - /// - /// Note that this either means there are no entries in the registry, or that all entries have - /// been notified. - is_empty: AtomicBool, -} - -impl Registry { - /// Creates a new registry. - fn new() -> Registry { - Registry { - blocked: Spinlock::new(Blocked { - entries: Slab::new(), - waker_count: 0, - }), - is_empty: AtomicBool::new(true), - } - } - - /// Registers a blocked channel operation and returns a key associated with it. - fn register(&self, cx: &Context<'_>) -> usize { - let mut blocked = self.blocked.lock(); - - // Insert a new entry into the list of blocked tasks. - let w = cx.waker().clone(); - let key = blocked.entries.insert(Some(w)); - - blocked.waker_count += 1; - if blocked.waker_count == 1 { - self.is_empty.store(false, Ordering::SeqCst); - } - - key - } - - /// Re-registers a blocked channel operation by filling in its waker. - fn reregister(&self, key: usize, cx: &Context<'_>) { - let mut blocked = self.blocked.lock(); - - let was_none = blocked.entries[key].is_none(); - let w = cx.waker().clone(); - blocked.entries[key] = Some(w); - - if was_none { - blocked.waker_count += 1; - if blocked.waker_count == 1 { - self.is_empty.store(false, Ordering::SeqCst); - } - } - } - - /// Unregisters a channel operation. - /// - /// If `completed` is `true`, the operation will be removed from the registry. If `completed` - /// is `false`, that means the operation was cancelled so another one will be notified. - fn unregister(&self, key: usize, completed: bool) { - let mut blocked = self.blocked.lock(); - let mut removed = false; - - match blocked.entries.remove(key) { - Some(_) => removed = true, - None => { - if !completed { - // This operation was cancelled. Notify another one. - if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() { - if let Some(w) = opt_waker.take() { - w.wake(); - removed = true; - } - } - } - } - } - - if removed { - blocked.waker_count -= 1; - if blocked.waker_count == 0 { - self.is_empty.store(true, Ordering::SeqCst); - } - } - } - - /// Notifies one blocked channel operation. - #[inline] - fn notify_one(&self) { - if !self.is_empty.load(Ordering::SeqCst) { - let mut blocked = self.blocked.lock(); - - if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() { - // If there is no waker in this entry, that means it was already woken. - if let Some(w) = opt_waker.take() { - w.wake(); - - blocked.waker_count -= 1; - if blocked.waker_count == 0 { - self.is_empty.store(true, Ordering::SeqCst); - } - } - } - } - } - - /// Notifies all blocked channel operations. - #[inline] - fn notify_all(&self) { - if !self.is_empty.load(Ordering::SeqCst) { - let mut blocked = self.blocked.lock(); - - for (_, opt_waker) in blocked.entries.iter_mut() { - // If there is no waker in this entry, that means it was already woken. - if let Some(w) = opt_waker.take() { - w.wake(); - } - } - - blocked.waker_count = 0; - self.is_empty.store(true, Ordering::SeqCst); - } - } -} - -/// A simple spinlock. -struct Spinlock { - flag: AtomicBool, - value: UnsafeCell, -} - -impl Spinlock { - /// Returns a new spinlock initialized with `value`. - fn new(value: T) -> Spinlock { - Spinlock { - flag: AtomicBool::new(false), - value: UnsafeCell::new(value), - } - } - - /// Locks the spinlock. - fn lock(&self) -> SpinlockGuard<'_, T> { - let backoff = Backoff::new(); - while self.flag.swap(true, Ordering::Acquire) { - backoff.snooze(); - } - SpinlockGuard { parent: self } - } -} - -/// A guard holding a spinlock locked. -struct SpinlockGuard<'a, T> { - parent: &'a Spinlock, -} - -impl<'a, T> Drop for SpinlockGuard<'a, T> { - fn drop(&mut self) { - self.parent.flag.store(false, Ordering::Release); - } -} - -impl<'a, T> Deref for SpinlockGuard<'a, T> { - type Target = T; - - fn deref(&self) -> &T { - unsafe { &*self.parent.value.get() } - } -} - -impl<'a, T> DerefMut for SpinlockGuard<'a, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.parent.value.get() } - } -} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3ad2776ca..402f6b318 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -45,3 +45,6 @@ cfg_unstable! { mod barrier; mod channel; } + +pub(crate) mod registry; +pub(crate) use registry::Registry; diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index cd7a3577f..b56e0fb7b 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -2,18 +2,11 @@ use std::cell::UnsafeCell; use std::fmt; use std::ops::{Deref, DerefMut}; use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use slab::Slab; +use std::sync::atomic::{AtomicBool, Ordering}; use crate::future::Future; -use crate::task::{Context, Poll, Waker}; - -/// Set if the mutex is locked. -const LOCK: usize = 1; - -/// Set if there are tasks blocked on the mutex. -const BLOCKED: usize = 1 << 1; +use crate::sync::Registry; +use crate::task::{Context, Poll}; /// A mutual exclusion primitive for protecting shared data. /// @@ -49,8 +42,8 @@ const BLOCKED: usize = 1 << 1; /// # }) /// ``` pub struct Mutex { - state: AtomicUsize, - blocked: std::sync::Mutex>>, + locked: AtomicBool, + registry: Registry, value: UnsafeCell, } @@ -69,8 +62,8 @@ impl Mutex { /// ``` pub fn new(t: T) -> Mutex { Mutex { - state: AtomicUsize::new(0), - blocked: std::sync::Mutex::new(Slab::new()), + locked: AtomicBool::new(false), + registry: Registry::new(), value: UnsafeCell::new(t), } } @@ -105,75 +98,48 @@ impl Mutex { pub struct LockFuture<'a, T> { mutex: &'a Mutex, opt_key: Option, - acquired: bool, } impl<'a, T> Future for LockFuture<'a, T> { type Output = MutexGuard<'a, T>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.mutex.try_lock() { - Some(guard) => { - self.acquired = true; - Poll::Ready(guard) - } + let poll = match self.mutex.try_lock() { + Some(guard) => Poll::Ready(guard), None => { - let mut blocked = self.mutex.blocked.lock().unwrap(); - // Register the current task. match self.opt_key { - None => { - // Insert a new entry into the list of blocked tasks. - let w = cx.waker().clone(); - let key = blocked.insert(Some(w)); - self.opt_key = Some(key); - - if blocked.len() == 1 { - self.mutex.state.fetch_or(BLOCKED, Ordering::Relaxed); - } - } - Some(key) => { - // There is already an entry in the list of blocked tasks. Just - // reset the waker if it was removed. - if blocked[key].is_none() { - let w = cx.waker().clone(); - blocked[key] = Some(w); - } - } + None => self.opt_key = Some(self.mutex.registry.register(cx)), + Some(key) => self.mutex.registry.reregister(key, cx), } // Try locking again because it's possible the mutex got unlocked just // before the current task was registered as a blocked task. match self.mutex.try_lock() { - Some(guard) => { - self.acquired = true; - Poll::Ready(guard) - } + Some(guard) => Poll::Ready(guard), None => Poll::Pending, } } + }; + + if poll.is_ready() { + // If the current task was registered, unregister now. + if let Some(key) = self.opt_key.take() { + // `true` means the operation is completed. + self.mutex.registry.unregister(key, true); + } } + + poll } } impl Drop for LockFuture<'_, T> { fn drop(&mut self) { + // If the current task was registered, unregister now. if let Some(key) = self.opt_key { - let mut blocked = self.mutex.blocked.lock().unwrap(); - let opt_waker = blocked.remove(key); - - if opt_waker.is_none() && !self.acquired { - // We were awoken but didn't acquire the lock. Wake up another task. - if let Some((_, opt_waker)) = blocked.iter_mut().next() { - if let Some(w) = opt_waker.take() { - w.wake(); - } - } - } - - if blocked.is_empty() { - self.mutex.state.fetch_and(!BLOCKED, Ordering::Relaxed); - } + // `false` means the operation is canceled. + self.mutex.registry.unregister(key, false); } } } @@ -181,7 +147,6 @@ impl Mutex { LockFuture { mutex: self, opt_key: None, - acquired: false, } .await } @@ -220,7 +185,7 @@ impl Mutex { /// # }) /// ``` pub fn try_lock(&self) -> Option> { - if self.state.fetch_or(LOCK, Ordering::Acquire) & LOCK == 0 { + if !self.locked.swap(true, Ordering::Acquire) { Some(MutexGuard(self)) } else { None @@ -303,19 +268,13 @@ unsafe impl Sync for MutexGuard<'_, T> {} impl Drop for MutexGuard<'_, T> { fn drop(&mut self) { - let state = self.0.state.fetch_and(!LOCK, Ordering::AcqRel); + // Use `AcqRel` to: + // 1. Release changes made to the value inside the mutex. + // 2. Acquire changes made to the registry. + self.0.locked.swap(false, Ordering::AcqRel); - // If there are any blocked tasks, wake one of them up. - if state & BLOCKED != 0 { - let mut blocked = self.0.blocked.lock().unwrap(); - - if let Some((_, opt_waker)) = blocked.iter_mut().next() { - // If there is no waker in this entry, that means it was already woken. - if let Some(w) = opt_waker.take() { - w.wake(); - } - } - } + // Notify one blocked `lock()` operation. + self.0.registry.notify_one(); } } diff --git a/src/sync/registry.rs b/src/sync/registry.rs new file mode 100644 index 000000000..157cef452 --- /dev/null +++ b/src/sync/registry.rs @@ -0,0 +1,198 @@ +//! A common utility for building synchronization primitives. +//! +//! When an async operation is blocked, it needs to register itself somewhere so that it can be +//! notified later on. Additionally, operations may be cancellable and we need to make sure +//! notifications are not lost if an operation gets cancelled just after picking up a notification. +//! +//! The `Registry` type helps with registering and notifying such async operations. + +use std::cell::UnsafeCell; +use std::ops::{Deref, DerefMut}; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use crossbeam_utils::Backoff; +use slab::Slab; + +use crate::task::{Context, Waker}; + +/// Set when the entry list is locked. +const LOCKED: usize = 1 << 0; + +/// Set when there are tasks for `notify_one()` to wake. +const NOTIFY_ONE: usize = 1 << 1; + +/// Set when there are tasks for `notify_all()` to wake. +const NOTIFY_ALL: usize = 1 << 2; + +/// A list of blocked operations. +struct Blocked { + /// A list of registered operations. + /// + /// Each entry has a waker associated with the task that is executing the operation. If the + /// waker is set to `None`, that means the task has been woken up but hasn't removed itself + /// from the registry yet. + entries: Slab>, + + /// The number of entries that are `None`. + none_count: usize, +} + +/// A registry of blocked async operations. +pub struct Registry { + /// Holds three bits: `LOCKED`, `NOTIFY_ONE`, and `NOTIFY_ALL`. + flag: AtomicUsize, + + /// A list of registered blocked operations. + blocked: UnsafeCell, +} + +impl Registry { + /// Creates a new registry. + #[inline] + pub fn new() -> Registry { + Registry { + flag: AtomicUsize::new(0), + blocked: UnsafeCell::new(Blocked { + entries: Slab::new(), + none_count: 0, + }), + } + } + + /// Registers a blocked operation and returns a key associated with it. + pub fn register(&self, cx: &Context<'_>) -> usize { + let w = cx.waker().clone(); + self.lock().entries.insert(Some(w)) + } + + /// Re-registers a blocked operation by filling in its waker. + pub fn reregister(&self, key: usize, cx: &Context<'_>) { + let mut blocked = self.lock(); + + match &mut blocked.entries[key] { + None => { + // Fill in the waker. + let w = cx.waker().clone(); + blocked.entries[key] = Some(w); + blocked.none_count -= 1; + } + Some(w) => { + // Replace the waker if the existing one is different. + if !w.will_wake(cx.waker()) { + *w = cx.waker().clone(); + } + } + } + } + + /// Unregisters an operation. + /// + /// If `completed` is `true`, the operation will be removed from the registry. If `completed` + /// is `false`, that means the operation was canceled so another one will be notified instead. + pub fn unregister(&self, key: usize, completed: bool) { + let mut blocked = self.lock(); + + // Remove the operation and check if it has been notified. + if blocked.entries.remove(key).is_none() { + blocked.none_count -= 1; + + // If the operation was notified but also canceled... + if !completed { + // Notify another operation. + if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() { + if let Some(w) = opt_waker.take() { + w.wake(); + blocked.none_count += 1; + } + } + } + } + } + + /// Notifies one blocked operation. + #[inline] + pub fn notify_one(&self) { + // Use `SeqCst` ordering to synchronize with `Lock::drop()`. + if self.flag.load(Ordering::SeqCst) & NOTIFY_ONE != 0 { + self.notify(false); + } + } + + /// Notifies all blocked operations. + // TODO: Delete this attribute when `crate::sync::channel` is stabilized. + #[cfg(feature = "unstable")] + #[inline] + pub fn notify_all(&self) { + // Use `SeqCst` ordering to synchronize with `Lock::drop()`. + if self.flag.load(Ordering::SeqCst) & NOTIFY_ALL != 0 { + self.notify(true); + } + } + + /// Notifies registered operations, either one or all of them. + fn notify(&self, all: bool) { + let mut blocked = &mut *self.lock(); + + for (_, opt_waker) in blocked.entries.iter_mut() { + // If there is no waker in this entry, that means it was already woken. + if let Some(w) = opt_waker.take() { + w.wake(); + blocked.none_count += 1; + } + if !all { + break; + } + } + } + + /// Locks the list of entries. + #[cold] + fn lock(&self) -> Lock<'_> { + let backoff = Backoff::new(); + while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 { + backoff.snooze(); + } + Lock { registry: self } + } +} + +/// Guard holding a registry locked. +struct Lock<'a> { + registry: &'a Registry, +} + +impl Drop for Lock<'_> { + #[inline] + fn drop(&mut self) { + let mut flag = 0; + + // If there is at least one entry and all are `Some`, then `notify_one()` has work to do. + if !self.entries.is_empty() && self.none_count == 0 { + flag |= NOTIFY_ONE; + } + + // If there is at least one `Some` entry, then `notify_all()` has work to do. + if self.entries.len() - self.none_count > 0 { + flag |= NOTIFY_ALL; + } + + // Use `SeqCst` ordering to synchronize with `Registry::lock_to_notify()`. + self.registry.flag.store(flag, Ordering::SeqCst); + } +} + +impl Deref for Lock<'_> { + type Target = Blocked; + + #[inline] + fn deref(&self) -> &Blocked { + unsafe { &*self.registry.blocked.get() } + } +} + +impl DerefMut for Lock<'_> { + #[inline] + fn deref_mut(&mut self) -> &mut Blocked { + unsafe { &mut *self.registry.blocked.get() } + } +} diff --git a/src/sync/rwlock.rs b/src/sync/rwlock.rs index ed1d2185b..20619e716 100644 --- a/src/sync/rwlock.rs +++ b/src/sync/rwlock.rs @@ -10,7 +10,7 @@ use crate::future::Future; use crate::task::{Context, Poll, Waker}; /// Set if a write lock is held. -const WRITE_LOCK: usize = 1; +const WRITE_LOCK: usize = 1 << 0; /// Set if there are read operations blocked on the lock. const BLOCKED_READS: usize = 1 << 1; diff --git a/tests/channel.rs b/tests/channel.rs index 91622b0d0..72e9c8174 100644 --- a/tests/channel.rs +++ b/tests/channel.rs @@ -1,3 +1,5 @@ +#![cfg(feature = "unstable")] + use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; From 86ba88bafe3498ec9d8f1e7f54f05b6021f20516 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Fri, 25 Oct 2019 18:42:16 +0200 Subject: [PATCH 2/9] Remove unused import --- benches/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benches/task.rs b/benches/task.rs index 7eafee7ea..b31447130 100644 --- a/benches/task.rs +++ b/benches/task.rs @@ -3,7 +3,7 @@ extern crate test; use async_std::task; -use test::{black_box, Bencher}; +use test::Bencher; #[bench] fn block_on(b: &mut Bencher) { From 7462e0bd841f68f2b1ef13afb865b5f43be0e597 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Fri, 25 Oct 2019 22:38:59 +0200 Subject: [PATCH 3/9] Split unregister into complete and cancel --- src/sync/channel.rs | 15 +++++---------- src/sync/mutex.rs | 6 ++---- src/sync/registry.rs | 28 ++++++++++++++-------------- 3 files changed, 21 insertions(+), 28 deletions(-) diff --git a/src/sync/channel.rs b/src/sync/channel.rs index 27ce0a1f2..19402973b 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -169,8 +169,7 @@ impl Sender { if poll.is_ready() { // If the current task was registered, unregister now. if let Some(key) = self.opt_key.take() { - // `true` means the send operation is completed. - self.sender.channel.sends.unregister(key, true); + self.sender.channel.sends.complete(key); } } @@ -182,8 +181,7 @@ impl Sender { fn drop(&mut self) { // If the current task was registered, unregister now. if let Some(key) = self.opt_key { - // `false` means the send operation is cancelled. - self.sender.channel.sends.unregister(key, false); + self.sender.channel.sends.cancel(key); } } } @@ -390,8 +388,7 @@ impl Receiver { fn drop(&mut self) { // If the current task was registered, unregister now. if let Some(key) = self.opt_key { - // `false` means the receive operation is cancelled. - self.channel.recvs.unregister(key, false); + self.channel.recvs.cancel(key); } } } @@ -486,8 +483,7 @@ impl Drop for Receiver { fn drop(&mut self) { // If the current task was registered as blocked on this stream, unregister now. if let Some(key) = self.opt_key { - // `false` means the last request for a stream item is cancelled. - self.channel.streams.unregister(key, false); + self.channel.streams.cancel(key); } // Decrement the receiver count and disconnect the channel if it drops down to zero. @@ -561,8 +557,7 @@ fn poll_recv( if poll.is_ready() { // If the current task was registered, unregister now. if let Some(key) = opt_key.take() { - // `true` means the receive operation is completed. - registry.unregister(key, true); + registry.complete(key); } } diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index b56e0fb7b..df1588fa1 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -125,8 +125,7 @@ impl Mutex { if poll.is_ready() { // If the current task was registered, unregister now. if let Some(key) = self.opt_key.take() { - // `true` means the operation is completed. - self.mutex.registry.unregister(key, true); + self.mutex.registry.complete(key); } } @@ -138,8 +137,7 @@ impl Mutex { fn drop(&mut self) { // If the current task was registered, unregister now. if let Some(key) = self.opt_key { - // `false` means the operation is canceled. - self.mutex.registry.unregister(key, false); + self.mutex.registry.cancel(key); } } } diff --git a/src/sync/registry.rs b/src/sync/registry.rs index 157cef452..a3f8861ba 100644 --- a/src/sync/registry.rs +++ b/src/sync/registry.rs @@ -85,25 +85,25 @@ impl Registry { } } - /// Unregisters an operation. - /// - /// If `completed` is `true`, the operation will be removed from the registry. If `completed` - /// is `false`, that means the operation was canceled so another one will be notified instead. - pub fn unregister(&self, key: usize, completed: bool) { + /// Unregisters a completed operation. + pub fn complete(&self, key: usize) { let mut blocked = self.lock(); + if blocked.entries.remove(key).is_none() { + blocked.none_count -= 1; + } + } - // Remove the operation and check if it has been notified. + /// Unregisters a cancelled operation. + pub fn cancel(&self, key: usize) { + let mut blocked = self.lock(); if blocked.entries.remove(key).is_none() { blocked.none_count -= 1; - // If the operation was notified but also canceled... - if !completed { - // Notify another operation. - if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() { - if let Some(w) = opt_waker.take() { - w.wake(); - blocked.none_count += 1; - } + // The operation was cancelled and notified so notify another operation instead. + if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() { + if let Some(w) = opt_waker.take() { + w.wake(); + blocked.none_count += 1; } } } From 232219e793e4de3b1e108bc9fa8959f8743f064c Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sun, 27 Oct 2019 18:39:37 +0100 Subject: [PATCH 4/9] Refactoring and renaming --- src/sync/channel.rs | 136 ++++++++++++++----------- src/sync/mod.rs | 4 +- src/sync/mutex.rs | 47 +++++---- src/sync/{registry.rs => waker_map.rs} | 112 +++++++++----------- 4 files changed, 149 insertions(+), 150 deletions(-) rename src/sync/{registry.rs => waker_map.rs} (52%) diff --git a/src/sync/channel.rs b/src/sync/channel.rs index 19402973b..a3dcc5ce9 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -14,7 +14,7 @@ use std::task::{Context, Poll}; use crossbeam_utils::{Backoff, CachePadded}; use crate::stream::Stream; -use crate::sync::Registry; +use crate::sync::WakerMap; /// Creates a bounded multi-producer multi-consumer channel. /// @@ -128,7 +128,7 @@ impl Sender { /// ``` pub async fn send(&self, msg: T) { struct SendFuture<'a, T> { - sender: &'a Sender, + channel: &'a Channel, msg: Option, opt_key: Option, } @@ -142,23 +142,23 @@ impl Sender { let msg = self.msg.take().unwrap(); // Try sending the message. - let poll = match self.sender.channel.push(msg) { + let poll = match self.channel.try_send(msg) { Ok(()) => Poll::Ready(()), - Err(PushError::Disconnected(msg)) => { + Err(TrySendError::Disconnected(msg)) => { self.msg = Some(msg); Poll::Pending } - Err(PushError::Full(msg)) => { - // Register the current task. + Err(TrySendError::Full(msg)) => { + // Insert this send operation. match self.opt_key { - None => self.opt_key = Some(self.sender.channel.sends.register(cx)), - Some(key) => self.sender.channel.sends.reregister(key, cx), + None => self.opt_key = Some(self.channel.send_wakers.insert(cx)), + Some(key) => self.channel.send_wakers.update(key, cx), } // Try sending the message again. - match self.sender.channel.push(msg) { + match self.channel.try_send(msg) { Ok(()) => Poll::Ready(()), - Err(PushError::Disconnected(msg)) | Err(PushError::Full(msg)) => { + Err(TrySendError::Disconnected(msg)) | Err(TrySendError::Full(msg)) => { self.msg = Some(msg); Poll::Pending } @@ -167,9 +167,9 @@ impl Sender { }; if poll.is_ready() { - // If the current task was registered, unregister now. + // If the current task is in the map, remove it. if let Some(key) = self.opt_key.take() { - self.sender.channel.sends.complete(key); + self.channel.send_wakers.remove(key); } } @@ -179,15 +179,17 @@ impl Sender { impl Drop for SendFuture<'_, T> { fn drop(&mut self) { - // If the current task was registered, unregister now. + // If the current task is still in the map, that means it is being cancelled now. + // Wake up another task instead. if let Some(key) = self.opt_key { - self.sender.channel.sends.cancel(key); + self.channel.send_wakers.remove(key); + self.channel.send_wakers.notify_one(); } } } SendFuture { - sender: self, + channel: &self.channel, msg: Some(msg), opt_key: None, } @@ -338,7 +340,7 @@ pub struct Receiver { /// The inner channel. channel: Arc>, - /// The registration key for this receiver in the `channel.streams` registry. + /// The key for this receiver in the `channel.stream_wakers` map. opt_key: Option, } @@ -380,15 +382,22 @@ impl Receiver { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - poll_recv(&self.channel, &self.channel.recvs, &mut self.opt_key, cx) + poll_recv( + &self.channel, + &self.channel.recv_wakers, + &mut self.opt_key, + cx, + ) } } impl Drop for RecvFuture<'_, T> { fn drop(&mut self) { - // If the current task was registered, unregister now. + // If the current task is still in the map, that means it is being cancelled now. + // Wake up another task instead. if let Some(key) = self.opt_key { - self.channel.recvs.cancel(key); + self.channel.recv_wakers.remove(key); + self.channel.recv_wakers.notify_one(); } } } @@ -481,9 +490,11 @@ impl Receiver { impl Drop for Receiver { fn drop(&mut self) { - // If the current task was registered as blocked on this stream, unregister now. + // If the current task is still in the stream map, that means it is being cancelled now. + // Wake up another stream task instead. if let Some(key) = self.opt_key { - self.channel.streams.cancel(key); + self.channel.stream_wakers.remove(key); + self.channel.stream_wakers.notify_one(); } // Decrement the receiver count and disconnect the channel if it drops down to zero. @@ -514,7 +525,12 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; - poll_recv(&this.channel, &this.channel.streams, &mut this.opt_key, cx) + poll_recv( + &this.channel, + &this.channel.stream_wakers, + &mut this.opt_key, + cx, + ) } } @@ -526,38 +542,38 @@ impl fmt::Debug for Receiver { /// Polls a receive operation on a channel. /// -/// If the receive operation is blocked, the current task will be registered in `registry` and its -/// registration key will then be stored in `opt_key`. +/// If the receive operation is blocked, the current task will be inserted into `wakers` and its +/// associated key will then be stored in `opt_key`. fn poll_recv( channel: &Channel, - registry: &Registry, + wakers: &WakerMap, opt_key: &mut Option, cx: &mut Context<'_>, ) -> Poll> { // Try receiving a message. - let poll = match channel.pop() { + let poll = match channel.try_recv() { Ok(msg) => Poll::Ready(Some(msg)), - Err(PopError::Disconnected) => Poll::Ready(None), - Err(PopError::Empty) => { - // Register the current task. + Err(TryRecvError::Disconnected) => Poll::Ready(None), + Err(TryRecvError::Empty) => { + // Insert this receive operation. match *opt_key { - None => *opt_key = Some(registry.register(cx)), - Some(key) => registry.reregister(key, cx), + None => *opt_key = Some(wakers.insert(cx)), + Some(key) => wakers.update(key, cx), } // Try receiving a message again. - match channel.pop() { + match channel.try_recv() { Ok(msg) => Poll::Ready(Some(msg)), - Err(PopError::Disconnected) => Poll::Ready(None), - Err(PopError::Empty) => Poll::Pending, + Err(TryRecvError::Disconnected) => Poll::Ready(None), + Err(TryRecvError::Empty) => Poll::Pending, } } }; if poll.is_ready() { - // If the current task was registered, unregister now. + // If the current task is in the map, remove it. if let Some(key) = opt_key.take() { - registry.complete(key); + wakers.remove(key); } } @@ -607,13 +623,13 @@ struct Channel { mark_bit: usize, /// Send operations waiting while the channel is full. - sends: Registry, + send_wakers: WakerMap, /// Receive operations waiting while the channel is empty and not disconnected. - recvs: Registry, + recv_wakers: WakerMap, /// Streams waiting while the channel is empty and not disconnected. - streams: Registry, + stream_wakers: WakerMap, /// The number of currently active `Sender`s. sender_count: AtomicUsize, @@ -667,17 +683,17 @@ impl Channel { mark_bit, head: CachePadded::new(AtomicUsize::new(head)), tail: CachePadded::new(AtomicUsize::new(tail)), - sends: Registry::new(), - recvs: Registry::new(), - streams: Registry::new(), + send_wakers: WakerMap::new(), + recv_wakers: WakerMap::new(), + stream_wakers: WakerMap::new(), sender_count: AtomicUsize::new(1), receiver_count: AtomicUsize::new(1), _marker: PhantomData, } } - /// Attempts to push a message. - fn push(&self, msg: T) -> Result<(), PushError> { + /// Attempts to send a message. + fn try_send(&self, msg: T) -> Result<(), TrySendError> { let backoff = Backoff::new(); let mut tail = self.tail.load(Ordering::Relaxed); @@ -716,10 +732,10 @@ impl Channel { slot.stamp.store(stamp, Ordering::Release); // Wake a blocked receive operation. - self.recvs.notify_one(); + self.recv_wakers.notify_one(); // Wake all blocked streams. - self.streams.notify_all(); + self.stream_wakers.notify_all(); return Ok(()); } @@ -738,9 +754,9 @@ impl Channel { // Check if the channel is disconnected. if tail & self.mark_bit != 0 { - return Err(PushError::Disconnected(msg)); + return Err(TrySendError::Disconnected(msg)); } else { - return Err(PushError::Full(msg)); + return Err(TrySendError::Full(msg)); } } @@ -754,8 +770,8 @@ impl Channel { } } - /// Attempts to pop a message. - fn pop(&self) -> Result { + /// Attempts to receive a message. + fn try_recv(&self) -> Result { let backoff = Backoff::new(); let mut head = self.head.load(Ordering::Relaxed); @@ -794,7 +810,7 @@ impl Channel { slot.stamp.store(stamp, Ordering::Release); // Wake a blocked send operation. - self.sends.notify_one(); + self.send_wakers.notify_one(); return Ok(msg); } @@ -811,10 +827,10 @@ impl Channel { if (tail & !self.mark_bit) == head { // If the channel is disconnected... if tail & self.mark_bit != 0 { - return Err(PopError::Disconnected); + return Err(TryRecvError::Disconnected); } else { // Otherwise, the receive operation is not ready. - return Err(PopError::Empty); + return Err(TryRecvError::Empty); } } @@ -883,9 +899,9 @@ impl Channel { if tail & self.mark_bit == 0 { // Notify everyone blocked on this channel. - self.sends.notify_all(); - self.recvs.notify_all(); - self.streams.notify_all(); + self.send_wakers.notify_all(); + self.recv_wakers.notify_all(); + self.stream_wakers.notify_all(); } } } @@ -916,8 +932,8 @@ impl Drop for Channel { } } -/// An error returned from the `push()` method. -enum PushError { +/// An error returned from the `try_send()` method. +enum TrySendError { /// The channel is full but not disconnected. Full(T), @@ -925,8 +941,8 @@ enum PushError { Disconnected(T), } -/// An error returned from the `pop()` method. -enum PopError { +/// An error returned from the `try_recv()` method. +enum TryRecvError { /// The channel is empty but not disconnected. Empty, diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 402f6b318..d193a3861 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -46,5 +46,5 @@ cfg_unstable! { mod channel; } -pub(crate) mod registry; -pub(crate) use registry::Registry; +pub(crate) mod waker_map; +pub(crate) use waker_map::WakerMap; diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index df1588fa1..b0bc52a77 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -5,7 +5,7 @@ use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use crate::future::Future; -use crate::sync::Registry; +use crate::sync::WakerMap; use crate::task::{Context, Poll}; /// A mutual exclusion primitive for protecting shared data. @@ -43,7 +43,7 @@ use crate::task::{Context, Poll}; /// ``` pub struct Mutex { locked: AtomicBool, - registry: Registry, + wakers: WakerMap, value: UnsafeCell, } @@ -63,7 +63,7 @@ impl Mutex { pub fn new(t: T) -> Mutex { Mutex { locked: AtomicBool::new(false), - registry: Registry::new(), + wakers: WakerMap::new(), value: UnsafeCell::new(t), } } @@ -107,14 +107,14 @@ impl Mutex { let poll = match self.mutex.try_lock() { Some(guard) => Poll::Ready(guard), None => { - // Register the current task. + // Insert this lock operation. match self.opt_key { - None => self.opt_key = Some(self.mutex.registry.register(cx)), - Some(key) => self.mutex.registry.reregister(key, cx), + None => self.opt_key = Some(self.mutex.wakers.insert(cx)), + Some(key) => self.mutex.wakers.update(key, cx), } // Try locking again because it's possible the mutex got unlocked just - // before the current task was registered as a blocked task. + // before the current task was inserted into the waker map. match self.mutex.try_lock() { Some(guard) => Poll::Ready(guard), None => Poll::Pending, @@ -123,9 +123,9 @@ impl Mutex { }; if poll.is_ready() { - // If the current task was registered, unregister now. + // If the current task is in the map, remove it. if let Some(key) = self.opt_key.take() { - self.mutex.registry.complete(key); + self.mutex.wakers.remove(key); } } @@ -135,9 +135,11 @@ impl Mutex { impl Drop for LockFuture<'_, T> { fn drop(&mut self) { - // If the current task was registered, unregister now. + // If the current task is still in the map, that means it is being cancelled now. + // Wake up another task instead. if let Some(key) = self.opt_key { - self.mutex.registry.cancel(key); + self.mutex.wakers.remove(key); + self.mutex.wakers.notify_one(); } } } @@ -229,18 +231,15 @@ impl Mutex { impl fmt::Debug for Mutex { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.try_lock() { - None => { - struct LockedPlaceholder; - impl fmt::Debug for LockedPlaceholder { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("") - } - } - f.debug_struct("Mutex") - .field("data", &LockedPlaceholder) - .finish() + struct Locked; + impl fmt::Debug for Locked { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") } + } + + match self.try_lock() { + None => f.debug_struct("Mutex").field("data", &Locked).finish(), Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(), } } @@ -268,11 +267,11 @@ impl Drop for MutexGuard<'_, T> { fn drop(&mut self) { // Use `AcqRel` to: // 1. Release changes made to the value inside the mutex. - // 2. Acquire changes made to the registry. + // 2. Acquire changes made to the waker map. self.0.locked.swap(false, Ordering::AcqRel); // Notify one blocked `lock()` operation. - self.0.registry.notify_one(); + self.0.wakers.notify_one(); } } diff --git a/src/sync/registry.rs b/src/sync/waker_map.rs similarity index 52% rename from src/sync/registry.rs rename to src/sync/waker_map.rs index a3f8861ba..eb976f760 100644 --- a/src/sync/registry.rs +++ b/src/sync/waker_map.rs @@ -1,10 +1,8 @@ //! A common utility for building synchronization primitives. //! //! When an async operation is blocked, it needs to register itself somewhere so that it can be -//! notified later on. Additionally, operations may be cancellable and we need to make sure -//! notifications are not lost if an operation gets cancelled just after picking up a notification. -//! -//! The `Registry` type helps with registering and notifying such async operations. +//! notified later on. The `WakerMap` type helps with keeping track of such async operations and +//! notifying them when they may make progress. use std::cell::UnsafeCell; use std::ops::{Deref, DerefMut}; @@ -24,57 +22,59 @@ const NOTIFY_ONE: usize = 1 << 1; /// Set when there are tasks for `notify_all()` to wake. const NOTIFY_ALL: usize = 1 << 2; -/// A list of blocked operations. -struct Blocked { - /// A list of registered operations. +/// Inner representation of `WakerMap`. +struct Inner { + /// A list of entries in the map. + /// + /// Each entry has an optional waker associated with the task that is executing the operation. + /// If the waker is set to `None`, that means the task has been woken up but hasn't removed + /// itself from the `WakerMap` yet. /// - /// Each entry has a waker associated with the task that is executing the operation. If the - /// waker is set to `None`, that means the task has been woken up but hasn't removed itself - /// from the registry yet. + /// The key of each entry is its index in the `Slab`. entries: Slab>, - /// The number of entries that are `None`. + /// The number of entries that have the waker set to `None`. none_count: usize, } -/// A registry of blocked async operations. -pub struct Registry { +/// A map holding wakers. +pub struct WakerMap { /// Holds three bits: `LOCKED`, `NOTIFY_ONE`, and `NOTIFY_ALL`. flag: AtomicUsize, - /// A list of registered blocked operations. - blocked: UnsafeCell, + /// A map holding wakers. + inner: UnsafeCell, } -impl Registry { - /// Creates a new registry. +impl WakerMap { + /// Creates a new `WakerMap`. #[inline] - pub fn new() -> Registry { - Registry { + pub fn new() -> WakerMap { + WakerMap { flag: AtomicUsize::new(0), - blocked: UnsafeCell::new(Blocked { + inner: UnsafeCell::new(Inner { entries: Slab::new(), none_count: 0, }), } } - /// Registers a blocked operation and returns a key associated with it. - pub fn register(&self, cx: &Context<'_>) -> usize { + /// Inserts a waker for a blocked operation and returns a key associated with it. + pub fn insert(&self, cx: &Context<'_>) -> usize { let w = cx.waker().clone(); self.lock().entries.insert(Some(w)) } - /// Re-registers a blocked operation by filling in its waker. - pub fn reregister(&self, key: usize, cx: &Context<'_>) { - let mut blocked = self.lock(); + /// Updates the waker of a previously inserted entry. + pub fn update(&self, key: usize, cx: &Context<'_>) { + let mut inner = self.lock(); - match &mut blocked.entries[key] { + match &mut inner.entries[key] { None => { // Fill in the waker. let w = cx.waker().clone(); - blocked.entries[key] = Some(w); - blocked.none_count -= 1; + inner.entries[key] = Some(w); + inner.none_count -= 1; } Some(w) => { // Replace the waker if the existing one is different. @@ -85,27 +85,11 @@ impl Registry { } } - /// Unregisters a completed operation. - pub fn complete(&self, key: usize) { - let mut blocked = self.lock(); - if blocked.entries.remove(key).is_none() { - blocked.none_count -= 1; - } - } - - /// Unregisters a cancelled operation. - pub fn cancel(&self, key: usize) { - let mut blocked = self.lock(); - if blocked.entries.remove(key).is_none() { - blocked.none_count -= 1; - - // The operation was cancelled and notified so notify another operation instead. - if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() { - if let Some(w) = opt_waker.take() { - w.wake(); - blocked.none_count += 1; - } - } + /// Removes a waker. + pub fn remove(&self, key: usize) { + let mut inner = self.lock(); + if inner.entries.remove(key).is_none() { + inner.none_count -= 1; } } @@ -119,7 +103,7 @@ impl Registry { } /// Notifies all blocked operations. - // TODO: Delete this attribute when `crate::sync::channel` is stabilized. + // TODO: Delete this attribute when `crate::sync::channel()` is stabilized. #[cfg(feature = "unstable")] #[inline] pub fn notify_all(&self) { @@ -129,15 +113,15 @@ impl Registry { } } - /// Notifies registered operations, either one or all of them. + /// Notifies blocked operations, either one or all of them. fn notify(&self, all: bool) { - let mut blocked = &mut *self.lock(); + let mut inner = &mut *self.lock(); - for (_, opt_waker) in blocked.entries.iter_mut() { + for (_, opt_waker) in inner.entries.iter_mut() { // If there is no waker in this entry, that means it was already woken. if let Some(w) = opt_waker.take() { w.wake(); - blocked.none_count += 1; + inner.none_count += 1; } if !all { break; @@ -152,13 +136,13 @@ impl Registry { while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 { backoff.snooze(); } - Lock { registry: self } + Lock { waker_map: self } } } -/// Guard holding a registry locked. +/// A guard holding a `WakerMap` locked. struct Lock<'a> { - registry: &'a Registry, + waker_map: &'a WakerMap, } impl Drop for Lock<'_> { @@ -176,23 +160,23 @@ impl Drop for Lock<'_> { flag |= NOTIFY_ALL; } - // Use `SeqCst` ordering to synchronize with `Registry::lock_to_notify()`. - self.registry.flag.store(flag, Ordering::SeqCst); + // Use `SeqCst` ordering to synchronize with `WakerMap::lock_to_notify()`. + self.waker_map.flag.store(flag, Ordering::SeqCst); } } impl Deref for Lock<'_> { - type Target = Blocked; + type Target = Inner; #[inline] - fn deref(&self) -> &Blocked { - unsafe { &*self.registry.blocked.get() } + fn deref(&self) -> &Inner { + unsafe { &*self.waker_map.inner.get() } } } impl DerefMut for Lock<'_> { #[inline] - fn deref_mut(&mut self) -> &mut Blocked { - unsafe { &mut *self.registry.blocked.get() } + fn deref_mut(&mut self) -> &mut Inner { + unsafe { &mut *self.waker_map.inner.get() } } } From bbe8184b9570048ba13512caa057fa4f56f519b8 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sun, 27 Oct 2019 19:24:20 +0100 Subject: [PATCH 5/9] Split remove() into complete() and cancel() --- src/sync/channel.rs | 15 +++++---------- src/sync/mutex.rs | 6 ++---- src/sync/waker_map.rs | 21 +++++++++++++++++++-- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/src/sync/channel.rs b/src/sync/channel.rs index a3dcc5ce9..4222fb098 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -169,7 +169,7 @@ impl Sender { if poll.is_ready() { // If the current task is in the map, remove it. if let Some(key) = self.opt_key.take() { - self.channel.send_wakers.remove(key); + self.channel.send_wakers.complete(key); } } @@ -182,8 +182,7 @@ impl Sender { // If the current task is still in the map, that means it is being cancelled now. // Wake up another task instead. if let Some(key) = self.opt_key { - self.channel.send_wakers.remove(key); - self.channel.send_wakers.notify_one(); + self.channel.send_wakers.cancel(key); } } } @@ -394,10 +393,8 @@ impl Receiver { impl Drop for RecvFuture<'_, T> { fn drop(&mut self) { // If the current task is still in the map, that means it is being cancelled now. - // Wake up another task instead. if let Some(key) = self.opt_key { - self.channel.recv_wakers.remove(key); - self.channel.recv_wakers.notify_one(); + self.channel.recv_wakers.cancel(key); } } } @@ -491,10 +488,8 @@ impl Receiver { impl Drop for Receiver { fn drop(&mut self) { // If the current task is still in the stream map, that means it is being cancelled now. - // Wake up another stream task instead. if let Some(key) = self.opt_key { - self.channel.stream_wakers.remove(key); - self.channel.stream_wakers.notify_one(); + self.channel.stream_wakers.cancel(key); } // Decrement the receiver count and disconnect the channel if it drops down to zero. @@ -573,7 +568,7 @@ fn poll_recv( if poll.is_ready() { // If the current task is in the map, remove it. if let Some(key) = opt_key.take() { - wakers.remove(key); + wakers.complete(key); } } diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index b0bc52a77..a79ed32a3 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -125,7 +125,7 @@ impl Mutex { if poll.is_ready() { // If the current task is in the map, remove it. if let Some(key) = self.opt_key.take() { - self.mutex.wakers.remove(key); + self.mutex.wakers.complete(key); } } @@ -136,10 +136,8 @@ impl Mutex { impl Drop for LockFuture<'_, T> { fn drop(&mut self) { // If the current task is still in the map, that means it is being cancelled now. - // Wake up another task instead. if let Some(key) = self.opt_key { - self.mutex.wakers.remove(key); - self.mutex.wakers.notify_one(); + self.mutex.wakers.cancel(key); } } } diff --git a/src/sync/waker_map.rs b/src/sync/waker_map.rs index eb976f760..b0268fd81 100644 --- a/src/sync/waker_map.rs +++ b/src/sync/waker_map.rs @@ -85,14 +85,31 @@ impl WakerMap { } } - /// Removes a waker. - pub fn remove(&self, key: usize) { + /// Removes the waker of a completed operation. + pub fn complete(&self, key: usize) { let mut inner = self.lock(); if inner.entries.remove(key).is_none() { inner.none_count -= 1; } } + /// Removes the waker of a cancelled operation. + pub fn cancel(&self, key: usize) { + let mut inner = self.lock(); + if inner.entries.remove(key).is_none() { + inner.none_count -= 1; + + // The operation was cancelled and notified so notify another operation instead. + if let Some((_, opt_waker)) = inner.entries.iter_mut().next() { + // If there is no waker in this entry, that means it was already woken. + if let Some(w) = opt_waker.take() { + w.wake(); + inner.none_count += 1; + } + } + } + } + /// Notifies one blocked operation. #[inline] pub fn notify_one(&self) { From 5b5c1030c6227ca7fa17cedf6dd55a5cb3469bc3 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Wed, 30 Oct 2019 10:52:06 +0100 Subject: [PATCH 6/9] Rename to WakerSet --- src/sync/channel.rs | 28 +++++++++---------- src/sync/mod.rs | 4 +-- src/sync/mutex.rs | 14 +++++----- src/sync/{waker_map.rs => waker_set.rs} | 36 ++++++++++++------------- 4 files changed, 41 insertions(+), 41 deletions(-) rename src/sync/{waker_map.rs => waker_set.rs} (88%) diff --git a/src/sync/channel.rs b/src/sync/channel.rs index 4222fb098..403bee742 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -14,7 +14,7 @@ use std::task::{Context, Poll}; use crossbeam_utils::{Backoff, CachePadded}; use crate::stream::Stream; -use crate::sync::WakerMap; +use crate::sync::WakerSet; /// Creates a bounded multi-producer multi-consumer channel. /// @@ -167,7 +167,7 @@ impl Sender { }; if poll.is_ready() { - // If the current task is in the map, remove it. + // If the current task is in the set, remove it. if let Some(key) = self.opt_key.take() { self.channel.send_wakers.complete(key); } @@ -179,7 +179,7 @@ impl Sender { impl Drop for SendFuture<'_, T> { fn drop(&mut self) { - // If the current task is still in the map, that means it is being cancelled now. + // If the current task is still in the set, that means it is being cancelled now. // Wake up another task instead. if let Some(key) = self.opt_key { self.channel.send_wakers.cancel(key); @@ -339,7 +339,7 @@ pub struct Receiver { /// The inner channel. channel: Arc>, - /// The key for this receiver in the `channel.stream_wakers` map. + /// The key for this receiver in the `channel.stream_wakers` set. opt_key: Option, } @@ -392,7 +392,7 @@ impl Receiver { impl Drop for RecvFuture<'_, T> { fn drop(&mut self) { - // If the current task is still in the map, that means it is being cancelled now. + // If the current task is still in the set, that means it is being cancelled now. if let Some(key) = self.opt_key { self.channel.recv_wakers.cancel(key); } @@ -487,7 +487,7 @@ impl Receiver { impl Drop for Receiver { fn drop(&mut self) { - // If the current task is still in the stream map, that means it is being cancelled now. + // If the current task is still in the stream set, that means it is being cancelled now. if let Some(key) = self.opt_key { self.channel.stream_wakers.cancel(key); } @@ -541,7 +541,7 @@ impl fmt::Debug for Receiver { /// associated key will then be stored in `opt_key`. fn poll_recv( channel: &Channel, - wakers: &WakerMap, + wakers: &WakerSet, opt_key: &mut Option, cx: &mut Context<'_>, ) -> Poll> { @@ -566,7 +566,7 @@ fn poll_recv( }; if poll.is_ready() { - // If the current task is in the map, remove it. + // If the current task is in the set, remove it. if let Some(key) = opt_key.take() { wakers.complete(key); } @@ -618,13 +618,13 @@ struct Channel { mark_bit: usize, /// Send operations waiting while the channel is full. - send_wakers: WakerMap, + send_wakers: WakerSet, /// Receive operations waiting while the channel is empty and not disconnected. - recv_wakers: WakerMap, + recv_wakers: WakerSet, /// Streams waiting while the channel is empty and not disconnected. - stream_wakers: WakerMap, + stream_wakers: WakerSet, /// The number of currently active `Sender`s. sender_count: AtomicUsize, @@ -678,9 +678,9 @@ impl Channel { mark_bit, head: CachePadded::new(AtomicUsize::new(head)), tail: CachePadded::new(AtomicUsize::new(tail)), - send_wakers: WakerMap::new(), - recv_wakers: WakerMap::new(), - stream_wakers: WakerMap::new(), + send_wakers: WakerSet::new(), + recv_wakers: WakerSet::new(), + stream_wakers: WakerSet::new(), sender_count: AtomicUsize::new(1), receiver_count: AtomicUsize::new(1), _marker: PhantomData, diff --git a/src/sync/mod.rs b/src/sync/mod.rs index d193a3861..733ee9b1a 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -46,5 +46,5 @@ cfg_unstable! { mod channel; } -pub(crate) mod waker_map; -pub(crate) use waker_map::WakerMap; +pub(crate) mod waker_set; +pub(crate) use waker_set::WakerSet; diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index a79ed32a3..acc5abb7c 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -5,7 +5,7 @@ use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use crate::future::Future; -use crate::sync::WakerMap; +use crate::sync::WakerSet; use crate::task::{Context, Poll}; /// A mutual exclusion primitive for protecting shared data. @@ -43,7 +43,7 @@ use crate::task::{Context, Poll}; /// ``` pub struct Mutex { locked: AtomicBool, - wakers: WakerMap, + wakers: WakerSet, value: UnsafeCell, } @@ -63,7 +63,7 @@ impl Mutex { pub fn new(t: T) -> Mutex { Mutex { locked: AtomicBool::new(false), - wakers: WakerMap::new(), + wakers: WakerSet::new(), value: UnsafeCell::new(t), } } @@ -114,7 +114,7 @@ impl Mutex { } // Try locking again because it's possible the mutex got unlocked just - // before the current task was inserted into the waker map. + // before the current task was inserted into the waker set. match self.mutex.try_lock() { Some(guard) => Poll::Ready(guard), None => Poll::Pending, @@ -123,7 +123,7 @@ impl Mutex { }; if poll.is_ready() { - // If the current task is in the map, remove it. + // If the current task is in the set, remove it. if let Some(key) = self.opt_key.take() { self.mutex.wakers.complete(key); } @@ -135,7 +135,7 @@ impl Mutex { impl Drop for LockFuture<'_, T> { fn drop(&mut self) { - // If the current task is still in the map, that means it is being cancelled now. + // If the current task is still in the set, that means it is being cancelled now. if let Some(key) = self.opt_key { self.mutex.wakers.cancel(key); } @@ -265,7 +265,7 @@ impl Drop for MutexGuard<'_, T> { fn drop(&mut self) { // Use `AcqRel` to: // 1. Release changes made to the value inside the mutex. - // 2. Acquire changes made to the waker map. + // 2. Acquire changes made to the waker set. self.0.locked.swap(false, Ordering::AcqRel); // Notify one blocked `lock()` operation. diff --git a/src/sync/waker_map.rs b/src/sync/waker_set.rs similarity index 88% rename from src/sync/waker_map.rs rename to src/sync/waker_set.rs index b0268fd81..68251085f 100644 --- a/src/sync/waker_map.rs +++ b/src/sync/waker_set.rs @@ -1,7 +1,7 @@ //! A common utility for building synchronization primitives. //! //! When an async operation is blocked, it needs to register itself somewhere so that it can be -//! notified later on. The `WakerMap` type helps with keeping track of such async operations and +//! notified later on. The `WakerSet` type helps with keeping track of such async operations and //! notifying them when they may make progress. use std::cell::UnsafeCell; @@ -22,13 +22,13 @@ const NOTIFY_ONE: usize = 1 << 1; /// Set when there are tasks for `notify_all()` to wake. const NOTIFY_ALL: usize = 1 << 2; -/// Inner representation of `WakerMap`. +/// Inner representation of `WakerSet`. struct Inner { - /// A list of entries in the map. + /// A list of entries in the set. /// /// Each entry has an optional waker associated with the task that is executing the operation. /// If the waker is set to `None`, that means the task has been woken up but hasn't removed - /// itself from the `WakerMap` yet. + /// itself from the `WakerSet` yet. /// /// The key of each entry is its index in the `Slab`. entries: Slab>, @@ -37,20 +37,20 @@ struct Inner { none_count: usize, } -/// A map holding wakers. -pub struct WakerMap { +/// A set holding wakers. +pub struct WakerSet { /// Holds three bits: `LOCKED`, `NOTIFY_ONE`, and `NOTIFY_ALL`. flag: AtomicUsize, - /// A map holding wakers. + /// A set holding wakers. inner: UnsafeCell, } -impl WakerMap { - /// Creates a new `WakerMap`. +impl WakerSet { + /// Creates a new `WakerSet`. #[inline] - pub fn new() -> WakerMap { - WakerMap { + pub fn new() -> WakerSet { + WakerSet { flag: AtomicUsize::new(0), inner: UnsafeCell::new(Inner { entries: Slab::new(), @@ -153,13 +153,13 @@ impl WakerMap { while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 { backoff.snooze(); } - Lock { waker_map: self } + Lock { waker_set: self } } } -/// A guard holding a `WakerMap` locked. +/// A guard holding a `WakerSet` locked. struct Lock<'a> { - waker_map: &'a WakerMap, + waker_set: &'a WakerSet, } impl Drop for Lock<'_> { @@ -177,8 +177,8 @@ impl Drop for Lock<'_> { flag |= NOTIFY_ALL; } - // Use `SeqCst` ordering to synchronize with `WakerMap::lock_to_notify()`. - self.waker_map.flag.store(flag, Ordering::SeqCst); + // Use `SeqCst` ordering to synchronize with `WakerSet::lock_to_notify()`. + self.waker_set.flag.store(flag, Ordering::SeqCst); } } @@ -187,13 +187,13 @@ impl Deref for Lock<'_> { #[inline] fn deref(&self) -> &Inner { - unsafe { &*self.waker_map.inner.get() } + unsafe { &*self.waker_set.inner.get() } } } impl DerefMut for Lock<'_> { #[inline] fn deref_mut(&mut self) -> &mut Inner { - unsafe { &mut *self.waker_map.inner.get() } + unsafe { &mut *self.waker_set.inner.get() } } } From 4e55884f0f59d1a7d72ec4193481d6efbb9621bd Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Wed, 30 Oct 2019 11:13:44 +0100 Subject: [PATCH 7/9] Ignore clippy warning --- src/sync/rwlock.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sync/rwlock.rs b/src/sync/rwlock.rs index 20619e716..a0d0f07a8 100644 --- a/src/sync/rwlock.rs +++ b/src/sync/rwlock.rs @@ -10,6 +10,7 @@ use crate::future::Future; use crate::task::{Context, Poll, Waker}; /// Set if a write lock is held. +#[allow(clippy::identity_op)] const WRITE_LOCK: usize = 1 << 0; /// Set if there are read operations blocked on the lock. From a83070c801eaa3fadde43eb00b103c831723017e Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Wed, 30 Oct 2019 11:31:31 +0100 Subject: [PATCH 8/9] Ignore another clippy warning --- src/sync/waker_set.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sync/waker_set.rs b/src/sync/waker_set.rs index 68251085f..8fd1b6210 100644 --- a/src/sync/waker_set.rs +++ b/src/sync/waker_set.rs @@ -14,6 +14,7 @@ use slab::Slab; use crate::task::{Context, Waker}; /// Set when the entry list is locked. +#[allow(clippy::identity_op)] const LOCKED: usize = 1 << 0; /// Set when there are tasks for `notify_one()` to wake. From 1709eacf85ffa70585c893eb832ff92e6122d1cc Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Thu, 31 Oct 2019 01:34:32 +0100 Subject: [PATCH 9/9] Use stronger SeqCst ordering --- src/sync/mutex.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index acc5abb7c..fcd030d8f 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -183,7 +183,7 @@ impl Mutex { /// # }) /// ``` pub fn try_lock(&self) -> Option> { - if !self.locked.swap(true, Ordering::Acquire) { + if !self.locked.swap(true, Ordering::SeqCst) { Some(MutexGuard(self)) } else { None @@ -263,10 +263,8 @@ unsafe impl Sync for MutexGuard<'_, T> {} impl Drop for MutexGuard<'_, T> { fn drop(&mut self) { - // Use `AcqRel` to: - // 1. Release changes made to the value inside the mutex. - // 2. Acquire changes made to the waker set. - self.0.locked.swap(false, Ordering::AcqRel); + // Use `SeqCst` ordering to synchronize with `WakerSet::insert()` and `WakerSet::update()`. + self.0.locked.store(false, Ordering::SeqCst); // Notify one blocked `lock()` operation. self.0.wakers.notify_one();