From 85c75a57e0b520a945598d5bf2038ff97300f78e Mon Sep 17 00:00:00 2001 From: amab8901 Date: Sat, 18 Feb 2023 21:00:23 +0100 Subject: [PATCH 1/2] Solve deadlock in Semaphore --- tokio/src/sync/batch_semaphore.rs | 6 ++++- tokio/src/sync/tests/semaphore_batch.rs | 30 +++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 57493f4bd2c..59a62b92d8f 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -444,6 +444,8 @@ impl Semaphore { } assert_eq!(acquired, 0); + + let mut old_waker = None; // Otherwise, register the waker & enqueue the node. node.waker.with_mut(|waker| { @@ -455,7 +457,7 @@ impl Semaphore { .map(|waker| !waker.will_wake(cx.waker())) .unwrap_or(true) { - *waker = Some(cx.waker().clone()); + old_waker = std::mem::replace(waker, Some(cx.waker().clone())); } }); @@ -468,6 +470,8 @@ impl Semaphore { waiters.queue.push_front(node); } + drop(waiters); + drop(old_waker); Pending } diff --git a/tokio/src/sync/tests/semaphore_batch.rs b/tokio/src/sync/tests/semaphore_batch.rs index d4e35aa7b2f..bc5947099a8 100644 --- a/tokio/src/sync/tests/semaphore_batch.rs +++ b/tokio/src/sync/tests/semaphore_batch.rs @@ -252,3 +252,33 @@ fn cancel_acquire_releases_permits() { assert_eq!(6, s.available_permits()); assert_ok!(s.try_acquire(6)); } + +#[test] +fn release_permits_at_drop() { + use futures::task::ArcWake; + use std::future::Future; + use std::sync::Arc; + + use crate::sync::semaphore::*; + + let sem = Arc::new(Semaphore::new(1)); + + struct ReleaseOnDrop(Option); + + impl ArcWake for ReleaseOnDrop { + fn wake_by_ref(_arc_self: &Arc) {} + } + + let mut fut = Box::pin(async { + let _permit = sem.acquire().await.unwrap(); + }); + + // Second iteration shouldn't deadlock. + for _ in 0..=1 { + let waker = futures::task::waker(Arc::new(ReleaseOnDrop( + sem.clone().try_acquire_owned().ok(), + ))); + let mut cx = std::task::Context::from_waker(&waker); + assert!(fut.as_mut().poll(&mut cx).is_pending()); + } +} \ No newline at end of file From 0922d358f7677892e392e32361b7f6a6579d1438 Mon Sep 17 00:00:00 2001 From: amab8901 Date: Sun, 19 Feb 2023 05:13:57 +0100 Subject: [PATCH 2/2] Fix fmt --- tokio/src/sync/batch_semaphore.rs | 1 - tokio/src/sync/tests/semaphore_batch.rs | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 59a62b92d8f..a7885bdf172 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -444,7 +444,6 @@ impl Semaphore { } assert_eq!(acquired, 0); - let mut old_waker = None; // Otherwise, register the waker & enqueue the node. diff --git a/tokio/src/sync/tests/semaphore_batch.rs b/tokio/src/sync/tests/semaphore_batch.rs index bc5947099a8..c9e9d05ec8f 100644 --- a/tokio/src/sync/tests/semaphore_batch.rs +++ b/tokio/src/sync/tests/semaphore_batch.rs @@ -255,11 +255,10 @@ fn cancel_acquire_releases_permits() { #[test] fn release_permits_at_drop() { + use crate::sync::semaphore::*; use futures::task::ArcWake; use std::future::Future; use std::sync::Arc; - - use crate::sync::semaphore::*; let sem = Arc::new(Semaphore::new(1)); @@ -281,4 +280,4 @@ fn release_permits_at_drop() { let mut cx = std::task::Context::from_waker(&waker); assert!(fut.as_mut().poll(&mut cx).is_pending()); } -} \ No newline at end of file +}