From 5a17b3b852be27957785587b050af730ba083255 Mon Sep 17 00:00:00 2001
From: Peter Jankuliak
Date: Wed, 8 Mar 2023 12:09:10 +0100
Subject: [PATCH] Fix a race condition in the async code
The race condition was happening when the Sender sent a value and was
immediatelly dropped between the calls
`self.receiver.shared.recv_sync(None)` and
`self.receiver.shared.is_disconnected()`. More information on github
issue https://github.com/zesterer/flume/issues/120
---
src/async.rs | 47 +++++++++++++++++++++++++++--------------------
1 file changed, 27 insertions(+), 20 deletions(-)
diff --git a/src/async.rs b/src/async.rs
index c9e1dd1..5293f8d 100644
--- a/src/async.rs
+++ b/src/async.rs
@@ -374,29 +374,36 @@ impl<'a, T> RecvFut<'a, T> {
stream: bool,
) -> Poll> {
if self.hook.is_some() {
- if let Ok(msg) = self.receiver.shared.recv_sync(None) {
- Poll::Ready(Ok(msg))
- } else if self.receiver.shared.is_disconnected() {
- Poll::Ready(Err(RecvError::Disconnected))
- } else {
- let hook = self.hook.as_ref().map(Arc::clone).unwrap();
- if hook.update_waker(cx.waker()) {
- // If the previous hook was awakened, we need to insert it back to the
- // queue, otherwise, it remains valid.
- wait_lock(&self.receiver.shared.chan).waiting.push_back(hook);
+ match self.receiver.shared.recv_sync(None) {
+ Ok(msg) => return Poll::Ready(Ok(msg)),
+ Err(TryRecvTimeoutError::Disconnected) => {
+ return Poll::Ready(Err(RecvError::Disconnected))
}
- // To avoid a missed wakeup, re-check disconnect status here because the channel might have
- // gotten shut down before we had a chance to push our hook
- if self.receiver.shared.is_disconnected() {
- // And now, to avoid a race condition between the first recv attempt and the disconnect check we
- // just performed, attempt to recv again just in case we missed something.
- Poll::Ready(self.receiver.shared
+ _ => (),
+ }
+
+ let hook = self.hook.as_ref().map(Arc::clone).unwrap();
+ if hook.update_waker(cx.waker()) {
+ // If the previous hook was awakened, we need to insert it back to the
+ // queue, otherwise, it remains valid.
+ wait_lock(&self.receiver.shared.chan)
+ .waiting
+ .push_back(hook);
+ }
+ // To avoid a missed wakeup, re-check disconnect status here because the channel might have
+ // gotten shut down before we had a chance to push our hook
+ if self.receiver.shared.is_disconnected() {
+ // And now, to avoid a race condition between the first recv attempt and the disconnect check we
+ // just performed, attempt to recv again just in case we missed something.
+ Poll::Ready(
+ self.receiver
+ .shared
.recv_sync(None)
.map(Ok)
- .unwrap_or(Err(RecvError::Disconnected)))
- } else {
- Poll::Pending
- }
+ .unwrap_or(Err(RecvError::Disconnected)),
+ )
+ } else {
+ Poll::Pending
}
} else {
let mut_self = self.get_mut();