diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 57493f4bd2c..a7885bdf172 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -444,6 +444,7 @@ impl Semaphore { } assert_eq!(acquired, 0); + let mut old_waker = None; // Otherwise, register the waker & enqueue the node. node.waker.with_mut(|waker| { @@ -455,7 +456,7 @@ impl Semaphore { .map(|waker| !waker.will_wake(cx.waker())) .unwrap_or(true) { - *waker = Some(cx.waker().clone()); + old_waker = std::mem::replace(waker, Some(cx.waker().clone())); } }); @@ -468,6 +469,8 @@ impl Semaphore { waiters.queue.push_front(node); } + drop(waiters); + drop(old_waker); Pending } diff --git a/tokio/src/sync/tests/semaphore_batch.rs b/tokio/src/sync/tests/semaphore_batch.rs index d4e35aa7b2f..c9e9d05ec8f 100644 --- a/tokio/src/sync/tests/semaphore_batch.rs +++ b/tokio/src/sync/tests/semaphore_batch.rs @@ -252,3 +252,32 @@ fn cancel_acquire_releases_permits() { assert_eq!(6, s.available_permits()); assert_ok!(s.try_acquire(6)); } + +#[test] +fn release_permits_at_drop() { + use crate::sync::semaphore::*; + use futures::task::ArcWake; + use std::future::Future; + use std::sync::Arc; + + let sem = Arc::new(Semaphore::new(1)); + + struct ReleaseOnDrop(Option); + + impl ArcWake for ReleaseOnDrop { + fn wake_by_ref(_arc_self: &Arc) {} + } + + let mut fut = Box::pin(async { + let _permit = sem.acquire().await.unwrap(); + }); + + // Second iteration shouldn't deadlock. + for _ in 0..=1 { + let waker = futures::task::waker(Arc::new(ReleaseOnDrop( + sem.clone().try_acquire_owned().ok(), + ))); + let mut cx = std::task::Context::from_waker(&waker); + assert!(fut.as_mut().poll(&mut cx).is_pending()); + } +}