From 7ba1731e807741911b173cab6f18785d0cf48070 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Sun, 10 Mar 2024 23:04:18 +0800 Subject: [PATCH] support `no_std` --- Cargo.toml | 15 +- src/future.rs | 23 ++- src/lib.rs | 410 ++++++++++++++++++++++++++------------------------ src/tokio.rs | 19 ++- 4 files changed, 246 insertions(+), 221 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9319909..744ddca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,30 +6,29 @@ homepage = "https://github.com/al8n/wg" repository = "https://github.com/al8n/wg.git" documentation = "https://docs.rs/wg/" readme = "README.md" -version = "0.7.4" +version = "0.7.5" license = "MIT OR Apache-2.0" keywords = ["waitgroup", "async", "sync", "notify", "wake"] categories = ["asynchronous", "concurrency", "data-structures"] edition = "2021" [features] -default = [] -full = ["triomphe", "parking_lot"] +default = ["std", "parking_lot", "triomphe"] +std = ["triomphe?/default", "event-listener?/default", "futures-core?/default", "tokio?/rt"] triomphe = ["dep:triomphe"] parking_lot = ["dep:parking_lot"] future = ["event-listener", "pin-project-lite"] - tokio = ["dep:tokio", "futures-core", "pin-project-lite"] [dependencies] parking_lot = { version = "0.12", optional = true } -triomphe = { version = "0.1", optional = true } -event-listener = { version = "5", optional = true } +triomphe = { version = "0.1", optional = true, default-features = false } +event-listener = { version = "5", optional = true, default-features = false } pin-project-lite = { version = "0.2", optional = true } -tokio = { version = "1", default-features = false, optional = true, features = ["sync", "rt"] } -futures-core = { version = "0.3", optional = true } +tokio = { version = "1", optional = true, default-features = false, features = ["sync"] } +futures-core = { version = "0.3", default-features = false, optional = true } [dev-dependencies] tokio = { version = "1", features = ["full"] } diff --git a/src/future.rs b/src/future.rs index 23eab00..c3c3ec0 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,12 +1,17 @@ -use super::*; use event_listener::{Event, EventListener}; -use std::{ +use core::{ pin::Pin, sync::atomic::{AtomicUsize, Ordering}, task::{Context, Poll}, }; +#[cfg(feature = "std")] +use std::sync::Arc; + +#[cfg(not(feature = "std"))] +use alloc::sync::Arc; + #[derive(Debug)] struct AsyncInner { counter: AtomicUsize, @@ -89,8 +94,8 @@ impl Clone for AsyncWaitGroup { } } -impl std::fmt::Debug for AsyncWaitGroup { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl core::fmt::Debug for AsyncWaitGroup { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("AsyncWaitGroup") .field("counter", &self.inner.counter) .finish() @@ -199,7 +204,7 @@ impl AsyncWaitGroup { WaitGroupFuture { inner: self, notified: self.inner.event.listen(), - _pin: std::marker::PhantomPinned, + _pin: core::marker::PhantomPinned, } } @@ -233,9 +238,11 @@ impl AsyncWaitGroup { /// wg.block_wait(spawner); /// # }) /// ``` + #[cfg(feature = "std")] + #[cfg_attr(docsrs, doc(cfg(feature = "std")))] pub fn block_wait(&self, spawner: S) where - S: FnOnce(Pin + Send + 'static>>), + S: FnOnce(Pin + Send + 'static>>), { let this = self.clone(); let (tx, rx) = std::sync::mpsc::channel(); @@ -258,11 +265,11 @@ pin_project_lite::pin_project! { #[pin] notified: EventListener, #[pin] - _pin: std::marker::PhantomPinned, + _pin: core::marker::PhantomPinned, } } -impl<'a> std::future::Future for WaitGroupFuture<'a> { +impl<'a> core::future::Future for WaitGroupFuture<'a> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/lib.rs b/src/lib.rs index cd234a6..eab6fce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,10 +18,14 @@ * limitations under the License. */ #![doc = include_str!("../README.md")] +#![cfg_attr(not(feature = "std"), no_std)] #![deny(missing_docs)] #![cfg_attr(docsrs, feature(doc_cfg))] #![cfg_attr(docsrs, allow(unused_attributes))] +#[cfg(not(feature = "std"))] +extern crate alloc; + /// [`AsyncWaitGroup`](crate::future::AsyncWaitGroup) for `futures`. #[cfg(feature = "future")] #[cfg_attr(docsrs, doc(cfg(feature = "future")))] @@ -32,247 +36,255 @@ pub mod future; #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub mod tokio; -trait Mu { - type Guard<'a> - where - Self: 'a; - fn lock_me(&self) -> Self::Guard<'_>; -} - -#[cfg(feature = "parking_lot")] -impl Mu for parking_lot::Mutex { - type Guard<'a> = parking_lot::MutexGuard<'a, T> where Self: 'a; +#[cfg(feature = "std")] +#[cfg_attr(docsrs, doc(cfg(feature = "std")))] +pub use sync::*; - fn lock_me(&self) -> Self::Guard<'_> { - self.lock() +#[cfg(feature = "std")] +#[cfg_attr(docsrs, doc(cfg(feature = "std")))] +mod sync { + trait Mu { + type Guard<'a> + where + Self: 'a; + fn lock_me(&self) -> Self::Guard<'_>; } -} -#[cfg(not(feature = "parking_lot"))] -impl Mu for std::sync::Mutex { - type Guard<'a> = std::sync::MutexGuard<'a, T> where Self: 'a; + #[cfg(feature = "parking_lot")] + impl Mu for parking_lot::Mutex { + type Guard<'a> = parking_lot::MutexGuard<'a, T> where Self: 'a; - fn lock_me(&self) -> Self::Guard<'_> { - self.lock().unwrap() - } -} - -#[cfg(feature = "parking_lot")] -use parking_lot::{Condvar, Mutex}; -#[cfg(not(feature = "triomphe"))] -use std::sync::Arc; -#[cfg(not(feature = "parking_lot"))] -use std::sync::{Condvar, Mutex}; -#[cfg(feature = "triomphe")] -use triomphe::Arc; - -struct Inner { - cvar: Condvar, - count: Mutex, -} - -/// A WaitGroup waits for a collection of threads to finish. -/// The main thread calls [`add`] to set the number of -/// thread to wait for. Then each of the goroutines -/// runs and calls Done when finished. At the same time, -/// Wait can be used to block until all goroutines have finished. -/// -/// A WaitGroup must not be copied after first use. -/// -/// # Example -/// -/// ```rust -/// use wg::WaitGroup; -/// use std::sync::Arc; -/// use std::sync::atomic::{AtomicUsize, Ordering}; -/// use std::time::Duration; -/// use std::thread::{spawn, sleep}; -/// -/// let wg = WaitGroup::new(); -/// let ctr = Arc::new(AtomicUsize::new(0)); -/// -/// for _ in 0..5 { -/// let ctrx = ctr.clone(); -/// let t_wg = wg.add(1); -/// spawn(move || { -/// // mock some time consuming task -/// sleep(Duration::from_millis(50)); -/// ctrx.fetch_add(1, Ordering::Relaxed); -/// -/// // mock task is finished -/// t_wg.done(); -/// }); -/// } -/// -/// wg.wait(); -/// assert_eq!(ctr.load(Ordering::Relaxed), 5); -/// ``` -/// -/// [`wait`]: struct.WaitGroup.html#method.wait -/// [`add`]: struct.WaitGroup.html#method.add -pub struct WaitGroup { - inner: Arc, -} - -impl Default for WaitGroup { - fn default() -> Self { - Self { - inner: Arc::new(Inner { - cvar: Condvar::new(), - count: Mutex::new(0), - }), + fn lock_me(&self) -> Self::Guard<'_> { + self.lock() } } -} -impl From for WaitGroup { - fn from(count: usize) -> Self { - Self { - inner: Arc::new(Inner { - cvar: Condvar::new(), - count: Mutex::new(count), - }), - } - } -} + #[cfg(not(feature = "parking_lot"))] + impl Mu for std::sync::Mutex { + type Guard<'a> = std::sync::MutexGuard<'a, T> where Self: 'a; -impl Clone for WaitGroup { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), + fn lock_me(&self) -> Self::Guard<'_> { + self.lock().unwrap() } } -} -impl std::fmt::Debug for WaitGroup { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let count = self.inner.count.lock_me(); - f.debug_struct("WaitGroup").field("count", &*count).finish() - } -} + #[cfg(feature = "parking_lot")] + use parking_lot::{Condvar, Mutex}; + #[cfg(not(feature = "triomphe"))] + use std::sync::Arc; + #[cfg(not(feature = "parking_lot"))] + use std::sync::{Condvar, Mutex}; + #[cfg(feature = "triomphe")] + use triomphe::Arc; -impl WaitGroup { - /// Creates a new wait group and returns the single reference to it. - /// - /// # Examples - /// - /// ``` - /// use wg::WaitGroup; - /// - /// let wg = WaitGroup::new(); - /// ``` - pub fn new() -> Self { - Self::default() + struct Inner { + cvar: Condvar, + count: Mutex, } - /// Adds delta to the WaitGroup counter. - /// If the counter becomes zero, all threads blocked on [`wait`] are released. + /// A WaitGroup waits for a collection of threads to finish. + /// The main thread calls [`add`] to set the number of + /// thread to wait for. Then each of the goroutines + /// runs and calls Done when finished. At the same time, + /// Wait can be used to block until all goroutines have finished. /// - /// Note that calls with a delta that occur when the counter is zero - /// must happen before a Wait. - /// Typically this means the calls to add should execute before the statement - /// creating the thread or other event to be waited for. - /// If a `WaitGroup` is reused to [`wait`] for several independent sets of events, - /// new `add` calls must happen after all previous [`wait`] calls have returned. + /// A WaitGroup must not be copied after first use. /// /// # Example + /// /// ```rust /// use wg::WaitGroup; + /// use std::sync::Arc; + /// use std::sync::atomic::{AtomicUsize, Ordering}; + /// use std::time::Duration; + /// use std::thread::{spawn, sleep}; /// /// let wg = WaitGroup::new(); + /// let ctr = Arc::new(AtomicUsize::new(0)); /// - /// wg.add(3); - /// (0..3).for_each(|_| { - /// let t_wg = wg.clone(); - /// std::thread::spawn(move || { - /// // do some time consuming work + /// for _ in 0..5 { + /// let ctrx = ctr.clone(); + /// let t_wg = wg.add(1); + /// spawn(move || { + /// // mock some time consuming task + /// sleep(Duration::from_millis(50)); + /// ctrx.fetch_add(1, Ordering::Relaxed); + /// + /// // mock task is finished /// t_wg.done(); /// }); - /// }); + /// } /// /// wg.wait(); + /// assert_eq!(ctr.load(Ordering::Relaxed), 5); /// ``` /// - /// [`wait`]: struct.AsyncWaitGroup.html#method.wait - pub fn add(&self, num: usize) -> Self { - let mut ctr = self.inner.count.lock_me(); + /// [`wait`]: struct.WaitGroup.html#method.wait + /// [`add`]: struct.WaitGroup.html#method.add + pub struct WaitGroup { + inner: Arc, + } - *ctr += num; - Self { - inner: self.inner.clone(), + impl Default for WaitGroup { + fn default() -> Self { + Self { + inner: Arc::new(Inner { + cvar: Condvar::new(), + count: Mutex::new(0), + }), + } } } - /// done decrements the WaitGroup counter by one. - /// - /// # Example - /// - /// ```rust - /// use wg::WaitGroup; - /// use std::thread; - /// - /// let wg = WaitGroup::new(); - /// wg.add(1); - /// let t_wg = wg.clone(); - /// thread::spawn(move || { - /// // do some time consuming task - /// t_wg.done() - /// }); - /// - /// ``` - pub fn done(&self) { - let mut val = self.inner.count.lock_me(); + impl From for WaitGroup { + fn from(count: usize) -> Self { + Self { + inner: Arc::new(Inner { + cvar: Condvar::new(), + count: Mutex::new(count), + }), + } + } + } - *val = if val.eq(&1) { - self.inner.cvar.notify_all(); - 0 - } else if val.eq(&0) { - 0 - } else { - *val - 1 - }; + impl Clone for WaitGroup { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } } - /// waitings return how many jobs are waiting. - pub fn waitings(&self) -> usize { - *self.inner.count.lock_me() + impl std::fmt::Debug for WaitGroup { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let count = self.inner.count.lock_me(); + f.debug_struct("WaitGroup").field("count", &*count).finish() + } } - /// wait blocks until the WaitGroup counter is zero. - /// - /// # Example - /// - /// ```rust - /// use wg::WaitGroup; - /// use std::thread; - /// - /// let wg = WaitGroup::new(); - /// wg.add(1); - /// let t_wg = wg.clone(); - /// thread::spawn(move || { - /// // do some time consuming task - /// t_wg.done() - /// }); - /// - /// // wait other thread completes - /// wg.wait(); - /// ``` - pub fn wait(&self) { - let mut ctr = self.inner.count.lock_me(); + impl WaitGroup { + /// Creates a new wait group and returns the single reference to it. + /// + /// # Examples + /// + /// ``` + /// use wg::WaitGroup; + /// + /// let wg = WaitGroup::new(); + /// ``` + pub fn new() -> Self { + Self::default() + } + + /// Adds delta to the WaitGroup counter. + /// If the counter becomes zero, all threads blocked on [`wait`] are released. + /// + /// Note that calls with a delta that occur when the counter is zero + /// must happen before a Wait. + /// Typically this means the calls to add should execute before the statement + /// creating the thread or other event to be waited for. + /// If a `WaitGroup` is reused to [`wait`] for several independent sets of events, + /// new `add` calls must happen after all previous [`wait`] calls have returned. + /// + /// # Example + /// ```rust + /// use wg::WaitGroup; + /// + /// let wg = WaitGroup::new(); + /// + /// wg.add(3); + /// (0..3).for_each(|_| { + /// let t_wg = wg.clone(); + /// std::thread::spawn(move || { + /// // do some time consuming work + /// t_wg.done(); + /// }); + /// }); + /// + /// wg.wait(); + /// ``` + /// + /// [`wait`]: struct.AsyncWaitGroup.html#method.wait + pub fn add(&self, num: usize) -> Self { + let mut ctr = self.inner.count.lock_me(); - if ctr.eq(&0) { - return; + *ctr += num; + Self { + inner: self.inner.clone(), + } + } + + /// done decrements the WaitGroup counter by one. + /// + /// # Example + /// + /// ```rust + /// use wg::WaitGroup; + /// use std::thread; + /// + /// let wg = WaitGroup::new(); + /// wg.add(1); + /// let t_wg = wg.clone(); + /// thread::spawn(move || { + /// // do some time consuming task + /// t_wg.done() + /// }); + /// + /// ``` + pub fn done(&self) { + let mut val = self.inner.count.lock_me(); + + *val = if val.eq(&1) { + self.inner.cvar.notify_all(); + 0 + } else if val.eq(&0) { + 0 + } else { + *val - 1 + }; } - while *ctr > 0 { - #[cfg(feature = "parking_lot")] - { - self.inner.cvar.wait(&mut ctr); + /// waitings return how many jobs are waiting. + pub fn waitings(&self) -> usize { + *self.inner.count.lock_me() + } + + /// wait blocks until the WaitGroup counter is zero. + /// + /// # Example + /// + /// ```rust + /// use wg::WaitGroup; + /// use std::thread; + /// + /// let wg = WaitGroup::new(); + /// wg.add(1); + /// let t_wg = wg.clone(); + /// thread::spawn(move || { + /// // do some time consuming task + /// t_wg.done() + /// }); + /// + /// // wait other thread completes + /// wg.wait(); + /// ``` + pub fn wait(&self) { + let mut ctr = self.inner.count.lock_me(); + + if ctr.eq(&0) { + return; } - #[cfg(not(feature = "parking_lot"))] - { - ctr = self.inner.cvar.wait(ctr).unwrap(); + while *ctr > 0 { + #[cfg(feature = "parking_lot")] + { + self.inner.cvar.wait(&mut ctr); + } + + #[cfg(not(feature = "parking_lot"))] + { + ctr = self.inner.cvar.wait(ctr).unwrap(); + } } } } diff --git a/src/tokio.rs b/src/tokio.rs index 1ac0116..a4d7ecb 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -1,13 +1,18 @@ -use super::*; use ::tokio::sync::{futures::Notified, Notify}; -use std::{ +use core::{ future::Future, pin::Pin, sync::atomic::{AtomicUsize, Ordering}, task::{Context, Poll}, }; +#[cfg(feature = "std")] +use std::sync::Arc; + +#[cfg(not(feature = "std"))] +use alloc::sync::Arc; + #[derive(Debug)] struct AsyncInner { counter: AtomicUsize, @@ -89,8 +94,8 @@ impl Clone for AsyncWaitGroup { } } -impl std::fmt::Debug for AsyncWaitGroup { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl core::fmt::Debug for AsyncWaitGroup { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("AsyncWaitGroup") .field("counter", &self.inner.counter) .finish() @@ -198,7 +203,7 @@ impl AsyncWaitGroup { WaitGroupFuture { inner: self, notified: self.inner.notify.notified(), - _pin: std::marker::PhantomPinned, + _pin: core::marker::PhantomPinned, } } @@ -228,6 +233,8 @@ impl AsyncWaitGroup { /// wg.block_wait(); /// } /// ``` + #[cfg(feature = "std")] + #[cfg_attr(docsrs, doc(cfg(feature = "std")))] pub fn block_wait(&self) { let this = self.clone(); let (tx, rx) = std::sync::mpsc::channel(); @@ -249,7 +256,7 @@ pin_project_lite::pin_project! { #[pin] notified: Notified<'a>, #[pin] - _pin: std::marker::PhantomPinned, + _pin: core::marker::PhantomPinned, } }