Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an efficient implementation based on critical-section #148

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ jobs:
run: cargo hack build --all --no-dev-deps
- run: cargo hack build --all --target thumbv7m-none-eabi --no-default-features --no-dev-deps
- run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps --features portable-atomic
- run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps --features critical-section
#- name: Install wasm-pack
# uses: taiki-e/install-action@wasm-pack
#- run: wasm-pack test --node
Expand Down Expand Up @@ -93,6 +94,7 @@ jobs:
uses: taiki-e/install-action@cargo-hack
- run: cargo hack build --all --rust-version
- run: cargo hack build --all --no-default-features --rust-version
- run: cargo hack build --all --no-default-features --rust-version --features critical-section

clippy:
runs-on: ubuntu-latest
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] }

[dependencies]
concurrent-queue = { version = "2.4.0", default-features = false }
critical-section = { version = "1.2.0", default-features = false, optional = true }
pin-project-lite = "0.2.12"
portable-atomic-util = { version = "0.2.0", default-features = false, optional = true, features = ["alloc"] }

Expand All @@ -45,6 +46,7 @@ default-features = false
optional = true

[dev-dependencies]
critical-section = { version = "1.2.0", features = ["std"] }
futures-lite = "2.0.0"
try-lock = "0.2.5"
waker-fn = "1"
Expand Down
4 changes: 2 additions & 2 deletions examples/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! This mutex exposes both blocking and async methods for acquiring a lock.

#[cfg(not(target_family = "wasm"))]
#[cfg(all(feature = "std", not(target_family = "wasm")))]
mod example {
#![allow(dead_code)]

Expand Down Expand Up @@ -171,7 +171,7 @@ mod example {
}
}

#[cfg(target_family = "wasm")]
#[cfg(any(target_family = "wasm", not(feature = "std")))]
mod example {
pub(super) fn entry() {
println!("This example is not supported on wasm yet.");
Expand Down
231 changes: 150 additions & 81 deletions src/std.rs → src/intrusive.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
//! libstd-based implementation of `event-listener`.
//! Intrusive linked list-based implementation of `event-listener`.
//!
//! This implementation crates an intrusive linked list of listeners.
//! This implementation crates an intrusive linked list of listeners. This list
//! is secured using either a libstd mutex or a critical section.

use crate::notify::{GenericNotify, Internal, Notification};
use crate::sync::atomic::Ordering;
use crate::sync::cell::{Cell, UnsafeCell};
use crate::sync::{Mutex, MutexGuard};
use crate::{RegisterResult, State, TaskRef};

#[cfg(feature = "critical-section")]
use core::cell::RefCell;
#[cfg(all(feature = "std", not(feature = "critical-section")))]
use core::ops::{Deref, DerefMut};

use core::marker::PhantomPinned;
use core::mem;
use core::ops::{Deref, DerefMut};
use core::pin::Pin;
use core::ptr::NonNull;

pub(super) struct List<T>(Mutex<Inner<T>>);
#[cfg(all(feature = "std", not(feature = "critical-section")))]
use crate::sync::{Mutex, MutexGuard};
#[cfg(feature = "critical-section")]
use critical_section::Mutex;

pub(super) struct List<T>(
#[cfg(all(feature = "std", not(feature = "critical-section")))] Mutex<Inner<T>>,
#[cfg(feature = "critical-section")] Mutex<RefCell<Inner<T>>>,
);

struct Inner<T> {
/// The head of the linked list.
Expand All @@ -36,27 +48,77 @@ struct Inner<T> {
impl<T> List<T> {
/// Create a new, empty event listener list.
pub(super) fn new() -> Self {
Self(Mutex::new(Inner {
let inner = Inner {
head: None,
tail: None,
next: None,
len: 0,
notified: 0,
}))
};

#[cfg(feature = "critical-section")]
let inner = RefCell::new(inner);

Self(Mutex::new(inner))
}

/// Get the total number of listeners without blocking.
#[cfg(all(feature = "std", not(feature = "critical-section")))]
pub(crate) fn try_total_listeners(&self) -> Option<usize> {
self.0.try_lock().ok().map(|list| list.len)
}

/// Get the total number of listeners without blocking.
#[cfg(feature = "critical-section")]
pub(crate) fn try_total_listeners(&self) -> Option<usize> {
Some(critical_section::with(|cs| self.0.borrow(cs).borrow().len))
}

/// Get the total number of listeners with blocking.
#[cfg(all(feature = "std", not(feature = "critical-section")))]
pub(crate) fn total_listeners(&self) -> usize {
self.0.lock().unwrap_or_else(|e| e.into_inner()).len
}

/// Get the total number of listeners with blocking.
#[cfg(all(feature = "std", feature = "critical-section"))]
notgull marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) fn total_listeners(&self) -> usize {
self.try_total_listeners().unwrap()
}
}

impl<T> crate::Inner<T> {
#[cfg(all(feature = "std", not(feature = "critical-section")))]
fn with_inner<R>(&self, f: impl FnOnce(&mut Inner<T>) -> R) -> R {
let mut list = self.lock();
f(&mut list)
}

#[cfg(feature = "critical-section")]
fn with_inner<R>(&self, f: impl FnOnce(&mut Inner<T>) -> R) -> R {
struct ListWrapper<'a, T> {
inner: &'a crate::Inner<T>,
list: &'a mut Inner<T>,
}

impl<T> Drop for ListWrapper<'_, T> {
fn drop(&mut self) {
update_notified(&self.inner.notified, self.list);
}
}

critical_section::with(move |cs| {
let mut list = self.list.0.borrow_ref_mut(cs);
let wrapper = ListWrapper {
inner: self,
list: &mut *list,
};

f(wrapper.list)
})
}

#[cfg(all(feature = "std", not(feature = "critical-section")))]
fn lock(&self) -> ListLock<'_, '_, T> {
ListLock {
inner: self,
Expand All @@ -66,37 +128,37 @@ impl<T> crate::Inner<T> {

/// Add a new listener to the list.
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
let mut inner = self.lock();

listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(inner.tail),
next: Cell::new(None),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();

{
let entry_guard = listener.link.get();
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };

// Replace the tail with the new entry.
match mem::replace(&mut inner.tail, Some(entry.into())) {
None => inner.head = Some(entry.into()),
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
};
}
self.with_inner(|inner| {
listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(inner.tail),
next: Cell::new(None),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();

{
let entry_guard = listener.link.get();
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };

// Replace the tail with the new entry.
match mem::replace(&mut inner.tail, Some(entry.into())) {
None => inner.head = Some(entry.into()),
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
};
}

// If there are no unnotified entries, this is the first one.
if inner.next.is_none() {
inner.next = inner.tail;
}
// If there are no unnotified entries, this is the first one.
if inner.next.is_none() {
inner.next = inner.tail;
}

// Bump the entry count.
inner.len += 1;
// Bump the entry count.
inner.len += 1;
});
}

/// Remove a listener from the list.
Expand All @@ -105,13 +167,13 @@ impl<T> crate::Inner<T> {
listener: Pin<&mut Option<Listener<T>>>,
propagate: bool,
) -> Option<State<T>> {
self.lock().remove(listener, propagate)
self.with_inner(|inner| inner.remove(listener, propagate))
}

/// Notifies a number of entries.
#[cold]
pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
self.lock().notify(notify)
self.with_inner(|inner| inner.notify(notify))
}

/// Register a task to be notified when the event is triggered.
Expand All @@ -123,41 +185,42 @@ impl<T> crate::Inner<T> {
mut listener: Pin<&mut Option<Listener<T>>>,
task: TaskRef<'_>,
) -> RegisterResult<T> {
let mut inner = self.lock();
let entry_guard = match listener.as_mut().as_pin_mut() {
Some(listener) => listener.link.get(),
None => return RegisterResult::NeverInserted,
};
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };

// Take out the state and check it.
match entry.state.replace(State::NotifiedTaken) {
State::Notified { tag, .. } => {
// We have been notified, remove the listener.
inner.remove(listener, false);
RegisterResult::Notified(tag)
}
self.with_inner(|inner| {
let entry_guard = match listener.as_mut().as_pin_mut() {
Some(listener) => listener.link.get(),
None => return RegisterResult::NeverInserted,
};
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };

State::Task(other_task) => {
// Only replace the task if it's different.
entry.state.set(State::Task({
if !task.will_wake(other_task.as_task_ref()) {
task.into_task()
} else {
other_task
}
}));
// Take out the state and check it.
match entry.state.replace(State::NotifiedTaken) {
State::Notified { tag, .. } => {
// We have been notified, remove the listener.
inner.remove(listener, false);
RegisterResult::Notified(tag)
}

RegisterResult::Registered
}
State::Task(other_task) => {
// Only replace the task if it's different.
entry.state.set(State::Task({
if !task.will_wake(other_task.as_task_ref()) {
task.into_task()
} else {
other_task
}
}));

RegisterResult::Registered
}

_ => {
// We have not been notified, register the task.
entry.state.set(State::Task(task.into_task()));
RegisterResult::Registered
_ => {
// We have not been notified, register the task.
entry.state.set(State::Task(task.into_task()));
RegisterResult::Registered
}
}
}
})
}
}

Expand Down Expand Up @@ -274,11 +337,13 @@ impl<T> Inner<T> {
}
}

#[cfg(all(feature = "std", not(feature = "critical-section")))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo it would be a good idea to put ListLock into a separate file and mark the whole file with #[cfg(...)], this would make this easier to read.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've inlined the entire construct into with_inner.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh that's quite neat, thank you.

struct ListLock<'a, 'b, T> {
lock: MutexGuard<'a, Inner<T>>,
inner: &'b crate::Inner<T>,
}

#[cfg(all(feature = "std", not(feature = "critical-section")))]
impl<T> Deref for ListLock<'_, '_, T> {
type Target = Inner<T>;

Expand All @@ -287,25 +352,29 @@ impl<T> Deref for ListLock<'_, '_, T> {
}
}

#[cfg(all(feature = "std", not(feature = "critical-section")))]
impl<T> DerefMut for ListLock<'_, '_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.lock
}
}

#[cfg(all(feature = "std", not(feature = "critical-section")))]
impl<T> Drop for ListLock<'_, '_, T> {
fn drop(&mut self) {
let list = &mut **self;
update_notified(&self.inner.notified, &self.lock);
}
}

// Update the notified count.
let notified = if list.notified < list.len {
list.notified
} else {
usize::MAX
};
fn update_notified<T>(slot: &crate::sync::atomic::AtomicUsize, list: &Inner<T>) {
// Update the notified count.
let notified = if list.notified < list.len {
list.notified
} else {
usize::MAX
};

self.inner.notified.store(notified, Ordering::Release);
}
slot.store(notified, Ordering::Release);
}

pub(crate) struct Listener<T> {
Expand Down Expand Up @@ -358,15 +427,15 @@ mod tests {
inner.insert(listen2.as_mut());
inner.insert(listen3.as_mut());

assert_eq!(inner.lock().len, 3);
assert_eq!(inner.list.try_total_listeners(), Some(3));

// Remove one.
assert_eq!(inner.remove(listen2, false), Some(State::Created));
assert_eq!(inner.lock().len, 2);
assert_eq!(inner.list.try_total_listeners(), Some(2));

// Remove another.
assert_eq!(inner.remove(listen1, false), Some(State::Created));
assert_eq!(inner.lock().len, 1);
assert_eq!(inner.list.try_total_listeners(), Some(1));
}

#[test]
Expand Down
Loading