diff --git a/Cargo.toml b/Cargo.toml index 5b76477..cc3dc6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,11 @@ fastrand = "2.0.0" flume = "0.11.0" futures-lite = "2.0.0" waker-fn = "1.1.0" +async-std = "1.0" + +[patch.crates-io] +# use local async-lock as dependency of async-std for dev-dependencies, instead of published one +async-lock = { path = "." } [target.'cfg(target_family = "wasm")'.dev-dependencies] wasm-bindgen-test = "0.3" diff --git a/src/lib.rs b/src/lib.rs index 8a62869..462ee01 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ //! //! * [`Barrier`] - enables tasks to synchronize all together at the same time. //! * [`Mutex`] - a mutual exclusion lock. +//! * [`RefCell`] - a single thread reader-writer lock, allowing any number of readers or a single writer. //! * [`RwLock`] - a reader-writer lock, allowing any number of readers or a single writer. //! * [`Semaphore`] - limits the number of concurrent operations. //! @@ -90,12 +91,14 @@ macro_rules! const_fn { mod barrier; mod mutex; mod once_cell; +mod refcell; mod rwlock; mod semaphore; pub use barrier::{Barrier, BarrierWaitResult}; pub use mutex::{Mutex, MutexGuard, MutexGuardArc}; pub use once_cell::OnceCell; +pub use refcell::RefCell; pub use rwlock::{ RwLock, RwLockReadGuard, RwLockReadGuardArc, RwLockUpgradableReadGuard, RwLockUpgradableReadGuardArc, RwLockWriteGuard, RwLockWriteGuardArc, diff --git a/src/refcell.rs b/src/refcell.rs new file mode 100644 index 0000000..ec93e2d --- /dev/null +++ b/src/refcell.rs @@ -0,0 +1,262 @@ +//! source of inspiration : +//! + +mod raw; + +use core::{ + cell::UnsafeCell, + fmt, + future::Future, + ops::{Deref, DerefMut}, + ptr::NonNull, + task::Poll, +}; +use raw::{RawBorrow, RawBorrowMut, RawRefCell}; + +/// A single thread async mutable memory location. +/// +/// This type of lock allows multiple readers or one writer at any point in time. +/// It's can be used with [`LocalExecutor`]. +/// +/// [`LocalExecutor`]: https://docs.rs/async-executor/latest/async_executor/struct.LocalExecutor.html +pub struct RefCell { + //borrow: Cell, + raw: RawRefCell, + value: UnsafeCell, +} +impl RefCell { + /// Create a new RefCell. + pub fn new(value: T) -> RefCell { + Self { + raw: RawRefCell::new(), + //borrow: Cell::new(BorrowFlag::Available), + value: UnsafeCell::new(value), + } + } +} + +impl RefCell { + //TODO: + // pub fn into_inner(self) -> T { + // self.value.into_inner() + // } + + //pub fn replace(&self, t: T) -> T {} + //pub fn replace_with(&self, f: F) -> T + //pub fn swap(&self, other: &RefCell) + + /// Acquire a borrow on the wrapped value. + /// + /// This wait the end of the current borrow_mut. + /// + /// Returns a guard that releases the lock when dropped. + pub fn borrow(&self) -> Borrow<'_, T> { + Borrow::new(self.raw.borrow(), self.value.get()) + } + + /// Tried to acquire a borrow on the wrapped value. + /// + /// Return `None` instead of wait if a borrow_mut is in progress. + pub fn try_borrow(&self) -> Option> { + if self.raw.try_borrow() { + Some(Ref { + value: self.value.get(), + lock: &self.raw, + }) + } else { + None + } + } + + /// Mutably borrows the wrapped value. + /// `await` if the value is currently borrowed. For non `async` variant, use [`Self::try_borrow_mut`]. + /// TODO: + pub fn borrow_mut(&self) -> BorrowMut<'_, T> { + BorrowMut::new(self.raw.borrow_mut(), self.value.get()) + } + + /// Tried to acquire a borrow on the wrapped value. + /// + /// Return `None` instead of wait if a borrow_mut is in progress. + pub fn try_borrow_mut(&self) -> Option> { + if self.raw.try_borrow_mut() { + Some(RefMut { + value: self.value.get(), + lock: &self.raw, + }) + } else { + None + } + } +} + +impl fmt::Debug for RefCell { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + struct Locked; + impl fmt::Debug for Locked { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } + } + + match self.try_borrow() { + None => f.debug_struct("RefCell").field("value", &Locked).finish(), + Some(guard) => f.debug_struct("RefCell").field("value", &&*guard).finish(), + } + } +} +pub struct Borrow<'b, T: ?Sized> { + value: NonNull, + raw: RawBorrow<'b>, // &'b + //waker: Waker, +} + +impl<'x, T: ?Sized> Borrow<'x, T> { + fn new(raw: RawBorrow<'x>, value: *mut T) -> Self { + let value = unsafe { NonNull::new_unchecked(value) }; + Self { value, raw } + } +} + +impl<'b, T: ?Sized + 'b> Future for Borrow<'b, T> { + type Output = Ref<'b, T>; + + fn poll( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> Poll { + if self.raw.try_borrow() { + Poll::Ready(Ref:: { + lock: self.raw.lock, + value: self.value.as_ptr(), + }) + } else { + // set state waiting ? + self.raw.lock.borrow_wake(cx.waker().clone()); + Poll::Pending + } + } +} + +impl fmt::Debug for Borrow<'_, T> { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Borrow { .. }") + } +} +pub struct BorrowMut<'b, T: ?Sized> { + value: NonNull, + raw: RawBorrowMut<'b>, // &'b + //waker: Waker, +} + +impl<'x, T: ?Sized> BorrowMut<'x, T> { + fn new(raw: RawBorrowMut<'x>, value: *mut T) -> Self { + let value = unsafe { NonNull::new_unchecked(value) }; + Self { value, raw } + } +} + +impl<'b, T: ?Sized + 'b> Future for BorrowMut<'b, T> { + type Output = RefMut<'b, T>; + + fn poll( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> Poll { + if self.raw.try_borrow_mut() { + Poll::Ready(RefMut:: { + lock: self.raw.lock, + value: self.value.as_ptr(), + }) + } else { + // set state waiting ? + self.raw.lock.borrow_mut_wake(cx.waker().clone()); + Poll::Pending + } + } +} + +impl fmt::Debug for BorrowMut<'_, T> { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("BorrowMut { .. }") + } +} + +/// Wraps a borrowed reference to a value in a `RefCell` box. +/// A wrapper type for an immutably borrowed value from a `RefCell`. +/// +/// See the [module-level documentation](self) for more. +pub struct Ref<'a, T: ?Sized + 'a> { + lock: &'a RawRefCell, + value: *const T, +} + +impl Drop for Ref<'_, T> { + #[inline] + fn drop(&mut self) { + self.lock.borrow_unlock(); + } +} +impl fmt::Debug for Ref<'_, T> { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} +impl fmt::Display for Ref<'_, T> { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} +impl Deref for Ref<'_, T> { + type Target = T; + + #[inline] + fn deref(&self) -> &T { + unsafe { &*self.value } + } +} + +/// A wrapper type for a mutably borrowed value from a RefCell. +/// +/// See the [module-level documentation](self) for more. +pub struct RefMut<'a, T: ?Sized + 'a> { + lock: &'a RawRefCell, + value: *mut T, +} + +impl Drop for RefMut<'_, T> { + #[inline] + fn drop(&mut self) { + self.lock.borrow_mut_unlock(); + } +} +impl fmt::Debug for RefMut<'_, T> { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} +impl fmt::Display for RefMut<'_, T> { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} +impl Deref for RefMut<'_, T> { + type Target = T; + + #[inline] + fn deref(&self) -> &T { + unsafe { &*self.value } + } +} +impl DerefMut for RefMut<'_, T> { + #[inline] + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.value } + } +} diff --git a/src/refcell/raw.rs b/src/refcell/raw.rs new file mode 100644 index 0000000..3438967 --- /dev/null +++ b/src/refcell/raw.rs @@ -0,0 +1,152 @@ +use core::{ + cell::{Cell, RefCell}, + task::Waker, +}; +use std::collections::VecDeque; + +const BORROW_MUT_BIT: usize = 1; +const ONE_BORROW: usize = 2; + +pub(super) struct RawRefCell { + /// Event triggered when the last borrow is dropped. + borrow_wakes: RefCell>, + + /// Event triggered when the borrow_mut is dropped. + borrow_mut_wakes: RefCell>, + + /// Current state of the lock. + /// + /// The least significant bit (`WRITER_BIT`) is set to 1 when a writer is holding the lock or + /// trying to acquire it. + /// + /// The upper bits contain the number of currently active borrows. Each active reader + /// increments the state by `ONE_BORROW`. + state: Cell, +} + +impl RawRefCell { + pub(super) const fn new() -> Self { + Self { + borrow_wakes: RefCell::new(VecDeque::new()), + borrow_mut_wakes: RefCell::new(VecDeque::new()), + state: Cell::new(0), + } + } + + pub(super) fn borrow(&self) -> RawBorrow<'_> { + RawBorrow { lock: self } + } + + pub(super) fn borrow_wake(&self, waker: Waker) { + self.borrow_wakes.borrow_mut().push_back(waker); + } + + pub(super) fn try_borrow(&self) -> bool { + let state = self.state.get(); + + // If there's a mutable borrow holding the lock or attempting to acquire it, we cannot acquire + // a read lock here. + if state & BORROW_MUT_BIT != 0 { + return false; + } + + // Make sure the number of borrows doesn't overflow. + if state > isize::MAX as usize { + crate::abort(); + } + + // Increment the number of readers. + // TODO ? ok if self.state.update(|val| val + ONE_BORROW ) == state + ONE_BORROW; + self.state.set(state + ONE_BORROW); + + true + } + + pub(super) fn borrow_unlock(&self) { + // Decrement the number of borrows. + let state = self.state.get(); + self.state.set(state - ONE_BORROW); + + if self.state.get() == 0 { + // If this was the last reader, wake up the next borrow the "no borrows" event. + if let Some(borrow_mut_wake) = self.borrow_mut_wakes.borrow_mut().pop_front() { + borrow_mut_wake.wake(); + } + } + } + + pub(super) fn borrow_mut(&self) -> RawBorrowMut<'_> { + RawBorrowMut { lock: self } + } + + pub(super) fn borrow_mut_wake(&self, waker: Waker) { + self.borrow_mut_wakes.borrow_mut().push_back(waker); + } + + pub(super) fn try_borrow_mut(&self) -> bool { + let state = self.state.get(); + + // If there's a mutable borrow holding the lock or attempting to acquire it, we cannot acquire + // a borrow_mut lock here. + if state & BORROW_MUT_BIT != 0 { + return false; + } + + // If there's at least one simple borrow, we cannot acquire + // a borrow_mut lock here. + if state & !BORROW_MUT_BIT != 0 { + return false; + } + + // Increment the number of readers. + // TODO ? ok if self.state.update(|val| val & BORROW_MUT_BIT ) == BORROW_MUT_BIT; + self.state.set(BORROW_MUT_BIT); + + true + } + + pub(super) fn borrow_mut_unlock(&self) { + if let Some(borrow_mut_wake) = self.borrow_mut_wakes.borrow_mut().pop_front() { + // If there is a waiting borrow_mut, wake up + borrow_mut_wake.wake(); + } else { + // Only remove Borrow mut bit, if there is no other task waiting for it. + let new_state = self.state.get() & !BORROW_MUT_BIT; + self.state.set(new_state); + // else, wakeup borrow_wake + let mut borrow_wakes = self.borrow_wakes.borrow_mut(); + + borrow_wakes + .drain(0..) + .for_each(|waiting_borrow| waiting_borrow.wake()); + } + } +} + +pub(super) struct RawBorrow<'a> { + // The lock that is being acquired. + pub(super) lock: &'a RawRefCell, + // ??? // Making this type `!Unpin` enables future optimizations. + // #[pin] + // _pin: PhantomPinned +} + +impl RawBorrow<'_> { + pub fn try_borrow(&self) -> bool { + self.lock.try_borrow() + } +} + +pub(super) struct RawBorrowMut<'a> { + // The lock that is being acquired. + pub(super) lock: &'a RawRefCell, + // ??? // Making this type `!Unpin` enables future optimizations. + // #[pin] + // _pin: PhantomPinned +} + +impl RawBorrowMut<'_> { + pub fn try_borrow_mut(&self) -> bool { + self.lock.try_borrow_mut() + } +} diff --git a/tests/refcell.rs b/tests/refcell.rs new file mode 100644 index 0000000..9244557 --- /dev/null +++ b/tests/refcell.rs @@ -0,0 +1,51 @@ +use async_lock::RefCell; +use async_std::task::sleep; +use futures_lite::future; +use std::time::Duration; + +#[test] +fn test_refcell() { + let vec = RefCell::new(Vec::new()); + + // insert entry in vec + let insert_fut = async { + let mut idx = 1; + loop { + { + let mut vec = vec.borrow_mut().await; + let new_val = (((idx + 10) * 4) + idx + 3) / 3; + sleep(Duration::from_micros(500)).await; + println!("Insert new val :{new_val}"); + vec.push(new_val); + } + sleep(Duration::from_secs(2)).await; + idx += 1; + if idx == 10 { + sleep(Duration::from_secs(2)).await; + break; + } + } + + sleep(Duration::from_secs(2)).await; + vec.borrow_mut().await.clear(); + }; + + // print the content of vec + let print_fut = async { + sleep(Duration::from_micros(100)).await; + loop { + { + let vec = vec.borrow().await; + if vec.is_empty() { + return; + } + println!("Vec :"); + vec.iter() + .enumerate() + .for_each(|(idx, val)| println!("\t{idx}: {val}")); + } + sleep(Duration::from_micros(1500)).await; + } + }; + future::block_on(future::zip(insert_fut, print_fut)); +}