diff --git a/Cargo.toml b/Cargo.toml index fbb788f..013912d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,8 @@ categories = ["asynchronous", "concurrency"] exclude = ["/.*"] [dependencies] -event-listener = { version = "3.0.0", default-features = false } -event-listener-strategy = { version = "0.3.0", default-features = false } +event-listener = { version = "4.0.0", default-features = false } +event-listener-strategy = { version = "0.4.0", default-features = false } pin-project-lite = "0.2.11" [features] diff --git a/src/barrier.rs b/src/barrier.rs index f488929..519838c 100644 --- a/src/barrier.rs +++ b/src/barrier.rs @@ -82,7 +82,7 @@ impl Barrier { BarrierWait::_new(BarrierWaitInner { barrier: self, lock: Some(self.state.lock()), - evl: EventListener::new(&self.event), + evl: EventListener::new(), state: WaitState::Initial, }) } @@ -200,7 +200,7 @@ impl EventListenerFuture for BarrierWaitInner<'_> { if state.count < this.barrier.n { // We need to wait for the event. - this.evl.as_mut().listen(); + this.evl.as_mut().listen(&this.barrier.event); *this.state = WaitState::Waiting { local_gen }; } else { // We are the last one. @@ -233,7 +233,7 @@ impl EventListenerFuture for BarrierWaitInner<'_> { if *local_gen == state.generation_id && state.count < this.barrier.n { // We need to wait for the event again. - this.evl.as_mut().listen(); + this.evl.as_mut().listen(&this.barrier.event); *this.state = WaitState::Waiting { local_gen: *local_gen, }; diff --git a/src/mutex.rs b/src/mutex.rs index 5b7fcbd..25890e5 100644 --- a/src/mutex.rs +++ b/src/mutex.rs @@ -478,10 +478,7 @@ impl>> AcquireSlow { #[cold] fn new(mutex: B) -> Self { // Create a new instance of the listener. - let listener = { - let mutex = Borrow::>::borrow(&mutex); - EventListener::new(&mutex.lock_ops) - }; + let listener = { EventListener::new() }; AcquireSlow { mutex: Some(mutex), @@ -532,7 +529,7 @@ impl>> EventListenerFuture for AcquireSlow loop { // Start listening for events. if !this.listener.is_listening() { - this.listener.as_mut().listen(); + this.listener.as_mut().listen(&mutex.lock_ops); // Try locking if nobody is being starved. match mutex @@ -596,7 +593,7 @@ impl>> EventListenerFuture for AcquireSlow loop { if !this.listener.is_listening() { // Start listening for events. - this.listener.as_mut().listen(); + this.listener.as_mut().listen(&mutex.lock_ops); // Try locking if nobody else is being starved. match mutex diff --git a/src/once_cell.rs b/src/once_cell.rs index dc3d69e..4aaeb9f 100644 --- a/src/once_cell.rs +++ b/src/once_cell.rs @@ -274,9 +274,9 @@ impl OnceCell { } // Slow path: wait for the value to be initialized. - let listener = EventListener::new(&self.passive_waiters); + let listener = EventListener::new(); pin!(listener); - listener.as_mut().listen(); + listener.as_mut().listen(&self.passive_waiters); // Try again. if let Some(value) = self.get() { @@ -329,9 +329,9 @@ impl OnceCell { } // Slow path: wait for the value to be initialized. - let listener = EventListener::new(&self.passive_waiters); + let listener = EventListener::new(); pin!(listener); - listener.as_mut().listen(); + listener.as_mut().listen(&self.passive_waiters); // Try again. if let Some(value) = self.get() { @@ -591,7 +591,7 @@ impl OnceCell { strategy: &mut impl for<'a> Strategy<'a>, ) -> Result<(), E> { // The event listener we're currently waiting on. - let event_listener = EventListener::new(&self.active_initializers); + let event_listener = EventListener::new(); pin!(event_listener); let mut closure = Some(closure); @@ -614,7 +614,7 @@ impl OnceCell { if event_listener.is_listening() { strategy.wait(event_listener.as_mut()).await; } else { - event_listener.as_mut().listen(); + event_listener.as_mut().listen(&self.active_initializers); } } State::Uninitialized => { diff --git a/src/rwlock/raw.rs b/src/rwlock/raw.rs index df08edb..88583c8 100644 --- a/src/rwlock/raw.rs +++ b/src/rwlock/raw.rs @@ -86,7 +86,7 @@ impl RawRwLock { RawRead { lock: self, state: self.state.load(Ordering::Acquire), - listener: EventListener::new(&self.no_writer), + listener: EventListener::new(), } } @@ -161,7 +161,7 @@ impl RawRwLock { pub(super) fn write(&self) -> RawWrite<'_> { RawWrite { lock: self, - no_readers: EventListener::new(&self.no_readers), + no_readers: EventListener::new(), state: WriteState::Acquiring { lock: self.mutex.lock(), }, @@ -193,7 +193,7 @@ impl RawRwLock { RawUpgrade { lock: Some(self), - listener: EventListener::new(&self.no_readers), + listener: EventListener::new(), } } @@ -328,7 +328,7 @@ impl<'a> EventListenerFuture for RawRead<'a> { } else { // Start listening for "no writer" events. let load_ordering = if !this.listener.is_listening() { - this.listener.as_mut().listen(); + this.listener.as_mut().listen(&this.lock.no_writer); // Make sure there really is no writer. Ordering::SeqCst @@ -473,7 +473,7 @@ impl<'a> EventListenerFuture for RawWrite<'a> { } // Start waiting for the readers to finish. - this.no_readers.as_mut().listen(); + this.no_readers.as_mut().listen(&this.lock.no_readers); this.state.as_mut().set(WriteState::WaitingReaders); } @@ -494,7 +494,7 @@ impl<'a> EventListenerFuture for RawWrite<'a> { // Wait for the readers to finish. if !this.no_readers.is_listening() { // Register a listener. - this.no_readers.as_mut().listen(); + this.no_readers.as_mut().listen(&this.lock.no_readers); } else { // Wait for the readers to finish. ready!(strategy.poll(this.no_readers.as_mut(), cx)); @@ -559,7 +559,7 @@ impl<'a> EventListenerFuture for RawUpgrade<'a> { // If there are readers, wait for them to finish. if !this.listener.is_listening() { // Start listening for "no readers" events. - this.listener.as_mut().listen(); + this.listener.as_mut().listen(&lock.no_readers); } else { // Wait for the readers to finish. ready!(strategy.poll(this.listener.as_mut(), cx)); diff --git a/src/semaphore.rs b/src/semaphore.rs index af41cae..9015d05 100644 --- a/src/semaphore.rs +++ b/src/semaphore.rs @@ -88,7 +88,7 @@ impl Semaphore { pub fn acquire(&self) -> Acquire<'_> { Acquire::_new(AcquireInner { semaphore: self, - listener: EventListener::new(&self.event), + listener: EventListener::new(), }) } @@ -176,7 +176,7 @@ impl Semaphore { pub fn acquire_arc(self: &Arc) -> AcquireArc { AcquireArc { semaphore: self.clone(), - listener: EventListener::new(&self.event), + listener: EventListener::new(), } } @@ -245,7 +245,7 @@ impl<'a> EventListenerFuture for AcquireInner<'a> { None => { // Wait on the listener. if !this.listener.is_listening() { - this.listener.as_mut().listen(); + this.listener.as_mut().listen(&this.semaphore.event); } else { ready!(strategy.poll(this.listener.as_mut(), cx)); } @@ -285,7 +285,7 @@ impl Future for AcquireArc { None => { // Wait on the listener. if !this.listener.is_listening() { - this.listener.as_mut().listen(); + this.listener.as_mut().listen(&this.semaphore.event); } else { ready!(this.listener.as_mut().poll(cx)); } diff --git a/tests/rwlock.rs b/tests/rwlock.rs index 78c8ee5..ab6fc37 100644 --- a/tests/rwlock.rs +++ b/tests/rwlock.rs @@ -117,7 +117,7 @@ fn contention() { let tx = tx.clone(); let rw = rw.clone(); - spawn(async move { + let _spawned = spawn(async move { for _ in 0..M { if fastrand::u32(..N) == 0 { drop(rw.write().await); @@ -151,7 +151,7 @@ fn contention_arc() { let tx = tx.clone(); let rw = rw.clone(); - spawn(async move { + let _spawned = spawn(async move { for _ in 0..M { if fastrand::u32(..N) == 0 { drop(rw.write_arc().await); @@ -177,7 +177,7 @@ fn writer_and_readers() { let (tx, rx) = async_channel::unbounded(); // Spawn a writer task. - spawn({ + let _spawned = spawn({ let lock = lock.clone(); async move { let mut lock = lock.write().await; @@ -223,7 +223,7 @@ fn writer_and_readers_arc() { let (tx, rx) = async_channel::unbounded(); // Spawn a writer task. - spawn({ + let _spawned = spawn({ let lock = lock.clone(); async move { let mut lock = lock.write_arc().await;