diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 5e4d8612149..c87baaa14c5 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -902,11 +902,14 @@ impl Shared { // Safety: `tail` lock is still held. let waiter = unsafe { waiter.as_mut() }; - assert!(waiter.queued.swap(false, Release)); - if let Some(waker) = waiter.waker.take() { wakers.push(waker); } + + // `Release` is needed to synchronize with `Recv::drop`. + // It is critical to set this variable **after** waker + // is extracted, otherwise we may data race with `Recv::drop`. + assert!(waiter.queued.swap(false, Release)); } None => { break 'outer; @@ -1104,6 +1107,11 @@ impl Receiver { } } + // If the waiter is not already queued, enqueue it. + // Relaxed memory order suffices because we don't need + // to synchronize with `Recv::drop` here (calling + // `Receiver::recv_ref` with a waiter implies ownership + // of the corresponding `Recv`). if !(*ptr).queued.swap(true, Relaxed) { tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr)); } @@ -1401,16 +1409,23 @@ where impl<'a, T> Drop for Recv<'a, T> { fn drop(&mut self) { + // Safety: `waiter.queued` is atomic. + // Acquire ordering is required to synchronize with + // `Shared::notify_rx` before we drop the object. let queued = self .waiter .with(|ptr| unsafe { (*ptr).queued.load(Acquire) }); + // If the waiter is queued, we need to unlink it from the waiters list. + // If not, no further synchronization is required, since the waiter + // is not in the list and, as such, is not shared with any other threads. if queued { // Acquire the tail lock. This is required for safety before accessing // the waiter node. let mut tail = self.receiver.shared.tail.lock(); - // safety: tail lock is held + // Safety: tail lock is held. + // Relaxed order suffices because we hold the tail lock. let queued = self .waiter .with(|ptr| unsafe { (*ptr).queued.load(Relaxed) });