From acc7ade1d97121e50c01c36449ecf08cf3a3528a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20L=C3=B6thberg?= Date: Wed, 16 Aug 2023 11:37:12 +0200 Subject: [PATCH] Make remaining core channel spinlock opt-in MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Johannes Löthberg --- src/async.rs | 1 + src/lib.rs | 68 +++++++++++++++++++++++++++++++++++++-------------- src/select.rs | 1 + 3 files changed, 52 insertions(+), 18 deletions(-) diff --git a/src/async.rs b/src/async.rs index fdd6bad..fae44d4 100644 --- a/src/async.rs +++ b/src/async.rs @@ -10,6 +10,7 @@ use std::{ use crate::*; use futures_core::{stream::{Stream, FusedStream}, future::FusedFuture}; use futures_sink::Sink; +use spin1::Mutex as Spinlock; struct AsyncSignal { waker: Spinlock, diff --git a/src/lib.rs b/src/lib.rs index f388399..c9bb3ee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,6 +47,7 @@ use std::{ fmt, }; +#[cfg(feature = "spin")] use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard}; use crate::signal::{Signal, SyncSignal}; @@ -256,47 +257,78 @@ enum TryRecvTimeoutError { } // TODO: Investigate some sort of invalidation flag for timeouts +#[cfg(feature = "spin")] struct Hook(Option>>, S); +#[cfg(not(feature = "spin"))] +struct Hook(Option>>, S); + +#[cfg(feature = "spin")] impl Hook { - pub fn slot(msg: Option, signal: S) -> Arc where S: Sized { + pub fn slot(msg: Option, signal: S) -> Arc + where + S: Sized, + { Arc::new(Self(Some(Spinlock::new(msg)), signal)) } - pub fn trigger(signal: S) -> Arc where S: Sized { - Arc::new(Self(None, signal)) + fn lock(&self) -> Option>> { + self.0.as_ref().map(|s| s.lock()) } +} - pub fn signal(&self) -> &S { - &self.1 +#[cfg(not(feature = "spin"))] +impl Hook { + pub fn slot(msg: Option, signal: S) -> Arc + where + S: Sized, + { + Arc::new(Self(Some(Mutex::new(msg)), signal)) } - pub fn fire_nothing(&self) -> bool { - self.signal().fire() + fn lock(&self) -> Option>> { + self.0.as_ref().map(|s| s.lock().unwrap()) } +} +impl Hook { pub fn fire_recv(&self) -> (T, &S) { - let msg = self.0.as_ref().unwrap().lock().take().unwrap(); + let msg = self.lock().unwrap().take().unwrap(); (msg, self.signal()) } pub fn fire_send(&self, msg: T) -> (Option, &S) { - let ret = match &self.0 { - Some(hook) => { - *hook.lock() = Some(msg); + let ret = match self.lock() { + Some(mut lock) => { + *lock = Some(msg); None - }, + } None => Some(msg), }; (ret, self.signal()) } pub fn is_empty(&self) -> bool { - self.0.as_ref().map(|s| s.lock().is_none()).unwrap_or(true) + self.lock().map(|s| s.is_none()).unwrap_or(true) } pub fn try_take(&self) -> Option { - self.0.as_ref().and_then(|s| s.lock().take()) + self.lock().unwrap().take() + } + + pub fn trigger(signal: S) -> Arc + where + S: Sized, + { + Arc::new(Self(None, signal)) + } + + pub fn signal(&self) -> &S { + &self.1 + } + + pub fn fire_nothing(&self) -> bool { + self.signal().fire() } } @@ -304,7 +336,7 @@ impl Hook { pub fn wait_recv(&self, abort: &AtomicBool) -> Option { loop { let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg - let msg = self.0.as_ref().unwrap().lock().take(); + let msg = self.lock().unwrap().take(); if let Some(msg) = msg { break Some(msg); } else if disconnected { @@ -319,7 +351,7 @@ impl Hook { pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result { loop { let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg - let msg = self.0.as_ref().unwrap().lock().take(); + let msg = self.lock().unwrap().take(); if let Some(msg) = msg { break Ok(msg); } else if disconnected { @@ -335,7 +367,7 @@ impl Hook { pub fn wait_send(&self, abort: &AtomicBool) { loop { let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg - if disconnected || self.0.as_ref().unwrap().lock().is_none() { + if disconnected || self.lock().unwrap().is_none() { break; } @@ -347,7 +379,7 @@ impl Hook { pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> { loop { let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg - if self.0.as_ref().unwrap().lock().is_none() { + if self.lock().unwrap().is_none() { break Ok(()); } else if disconnected { break Err(false); diff --git a/src/select.rs b/src/select.rs index bf6bc9c..cef15aa 100644 --- a/src/select.rs +++ b/src/select.rs @@ -1,6 +1,7 @@ //! Types that permit waiting upon multiple blocking operations using the [`Selector`] interface. use crate::*; +use spin1::Mutex as Spinlock; use std::{any::Any, marker::PhantomData}; #[cfg(feature = "eventual-fairness")]