From 844dc9be2f95e2403dc50562333090af7d2d20a5 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 12 Nov 2021 09:42:00 -0800 Subject: [PATCH 1/8] loom test for #4225 Signed-off-by: Eliza Weisman --- tokio/src/sync/tests/loom_oneshot.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tokio/src/sync/tests/loom_oneshot.rs b/tokio/src/sync/tests/loom_oneshot.rs index 9729cfb73d3..1aacf0f10e5 100644 --- a/tokio/src/sync/tests/loom_oneshot.rs +++ b/tokio/src/sync/tests/loom_oneshot.rs @@ -55,6 +55,21 @@ fn changing_rx_task() { }); } +#[test] +fn recv_close() { + // reproduces https://github.com/tokio-rs/tokio/issues/4225 + loom::model(|| { + let (tx, mut rx) = oneshot::channel(); + thread::spawn(move || { + let _ = tx.send(()); + }); + thread::spawn(move || { + rx.close(); + let _ = rx.try_recv(); + }); + }) +} + // TODO: Move this into `oneshot` proper. use std::future::Future; From c8f82d45060c9e936ab49cd3838135208de0924d Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 12 Nov 2021 09:58:46 -0800 Subject: [PATCH 2/8] oneshot: prioritize closed over complete This fixes the race. Signed-off-by: Eliza Weisman --- tokio/src/sync/oneshot.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index c23fcb32ba0..dde513bcac9 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -839,13 +839,15 @@ impl Receiver { let result = if let Some(inner) = self.inner.as_ref() { let state = State::load(&inner.state, Acquire); - if state.is_complete() { + // First, check if the channel has been closed. If the channel is + // closed, return an error. + if state.is_closed() { + Err(TryRecvError::Closed) + } else if state.is_complete() { match unsafe { inner.consume_value() } { Some(value) => Ok(value), None => Err(TryRecvError::Closed), } - } else if state.is_closed() { - Err(TryRecvError::Closed) } else { // Not ready, this does not clear `inner` return Err(TryRecvError::Empty); From 5ae003c200b8d69259e3826733cb2a6a2f4d6476 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 12 Nov 2021 10:00:37 -0800 Subject: [PATCH 3/8] add a similar test for `poll_recv` Signed-off-by: Eliza Weisman --- tokio/src/sync/tests/loom_oneshot.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/tests/loom_oneshot.rs b/tokio/src/sync/tests/loom_oneshot.rs index 1aacf0f10e5..f7156a1b54e 100644 --- a/tokio/src/sync/tests/loom_oneshot.rs +++ b/tokio/src/sync/tests/loom_oneshot.rs @@ -56,18 +56,32 @@ fn changing_rx_task() { } #[test] -fn recv_close() { +fn try_recv_close() { // reproduces https://github.com/tokio-rs/tokio/issues/4225 loom::model(|| { let (tx, mut rx) = oneshot::channel(); thread::spawn(move || { let _ = tx.send(()); }); + + rx.close(); + let _ = rx.try_recv(); + }) +} + +#[test] +fn recv_closed() { + // reproduces https://github.com/tokio-rs/tokio/issues/4225 + loom::model(|| { + let (tx, mut rx) = oneshot::channel(); + thread::spawn(move || { - rx.close(); - let _ = rx.try_recv(); + tx.send(1).unwrap(); }); - }) + + rx.close(); + let _ = block_on(rx); + }); } // TODO: Move this into `oneshot` proper. From a6b60ed8010c5a185c5096b6fb5f5b7648da973b Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 12 Nov 2021 10:01:17 -0800 Subject: [PATCH 4/8] also prioritize closed in poll_recv Signed-off-by: Eliza Weisman --- tokio/src/sync/oneshot.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index dde513bcac9..6bff76f6ee5 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -493,7 +493,7 @@ impl Sender { *ptr = Some(t); }); - if !inner.complete() { + if dbg!(!inner.complete()) { unsafe { return Err(inner.consume_value().unwrap()); } @@ -887,7 +887,7 @@ impl Future for Receiver { impl Inner { fn complete(&self) -> bool { - let prev = State::set_complete(&self.state); + let prev = dbg!(State::set_complete(&self.state)); if prev.is_closed() { return false; @@ -910,15 +910,15 @@ impl Inner { // Load the state let mut state = State::load(&self.state, Acquire); - if state.is_complete() { + if state.is_closed() { + coop.made_progress(); + Ready(Err(RecvError(()))) + } else if state.is_complete() { coop.made_progress(); match unsafe { self.consume_value() } { Some(value) => Ready(Ok(value)), None => Ready(Err(RecvError(()))), } - } else if state.is_closed() { - coop.made_progress(); - Ready(Err(RecvError(()))) } else { if state.is_rx_task_set() { let will_notify = unsafe { self.rx_task.will_wake(cx) }; @@ -968,7 +968,7 @@ impl Inner { /// Called by `Receiver` to indicate that the value will never be received. fn close(&self) { - let prev = State::set_closed(&self.state); + let prev = dbg!(State::set_closed(&self.state)); if prev.is_tx_task_set() && !prev.is_complete() { unsafe { From 54b740d47cf9ac7bd306acdbbdb0074c30686a12 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 12 Nov 2021 10:15:04 -0800 Subject: [PATCH 5/8] remove dbgs Signed-off-by: Eliza Weisman --- tokio/src/sync/oneshot.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 6bff76f6ee5..d20eef89c1d 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -493,7 +493,7 @@ impl Sender { *ptr = Some(t); }); - if dbg!(!inner.complete()) { + if !inner.complete() { unsafe { return Err(inner.consume_value().unwrap()); } @@ -887,7 +887,7 @@ impl Future for Receiver { impl Inner { fn complete(&self) -> bool { - let prev = dbg!(State::set_complete(&self.state)); + let prev = State::set_complete(&self.state); if prev.is_closed() { return false; @@ -968,7 +968,7 @@ impl Inner { /// Called by `Receiver` to indicate that the value will never be received. fn close(&self) { - let prev = dbg!(State::set_closed(&self.state)); + let prev = State::set_closed(&self.state); if prev.is_tx_task_set() && !prev.is_complete() { unsafe { From 2324301490c773cb16df7d9a784d0c83c3db21ed Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 12 Nov 2021 10:22:58 -0800 Subject: [PATCH 6/8] fix test failing when the channel's closed Signed-off-by: Eliza Weisman --- tokio/src/sync/tests/loom_oneshot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/tests/loom_oneshot.rs b/tokio/src/sync/tests/loom_oneshot.rs index f7156a1b54e..c5f79720794 100644 --- a/tokio/src/sync/tests/loom_oneshot.rs +++ b/tokio/src/sync/tests/loom_oneshot.rs @@ -76,7 +76,7 @@ fn recv_closed() { let (tx, mut rx) = oneshot::channel(); thread::spawn(move || { - tx.send(1).unwrap(); + let _ = tx.send(1); }); rx.close(); From ef15fbaf88ca263e2f6f492c5a8176439126773e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 12 Nov 2021 10:23:54 -0800 Subject: [PATCH 7/8] restore prev behavior: honor completed over closed Signed-off-by: Eliza Weisman --- tokio/src/sync/oneshot.rs | 40 +++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index d20eef89c1d..244ad124375 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -839,15 +839,13 @@ impl Receiver { let result = if let Some(inner) = self.inner.as_ref() { let state = State::load(&inner.state, Acquire); - // First, check if the channel has been closed. If the channel is - // closed, return an error. - if state.is_closed() { - Err(TryRecvError::Closed) - } else if state.is_complete() { + if state.is_complete() { match unsafe { inner.consume_value() } { Some(value) => Ok(value), None => Err(TryRecvError::Closed), } + } else if state.is_closed() { + Err(TryRecvError::Closed) } else { // Not ready, this does not clear `inner` return Err(TryRecvError::Empty); @@ -910,15 +908,15 @@ impl Inner { // Load the state let mut state = State::load(&self.state, Acquire); - if state.is_closed() { - coop.made_progress(); - Ready(Err(RecvError(()))) - } else if state.is_complete() { + if state.is_complete() { coop.made_progress(); match unsafe { self.consume_value() } { Some(value) => Ready(Ok(value)), None => Ready(Err(RecvError(()))), } + } else if state.is_closed() { + coop.made_progress(); + Ready(Err(RecvError(()))) } else { if state.is_rx_task_set() { let will_notify = unsafe { self.rx_task.will_wake(cx) }; @@ -1033,11 +1031,25 @@ impl State { } fn set_complete(cell: &AtomicUsize) -> State { - // TODO: This could be `Release`, followed by an `Acquire` fence *if* - // the `RX_TASK_SET` flag is set. However, `loom` does not support - // fences yet. - let val = cell.fetch_or(VALUE_SENT, AcqRel); - State(val) + let mut state = cell.load(Ordering::Relaxed); + loop { + if State(state).is_closed() { + break; + } + // TODO: This could be `Release`, followed by an `Acquire` fence *if* + // the `RX_TASK_SET` flag is set. However, `loom` does not support + // fences yet. + match cell.compare_exchange_weak( + state, + state | VALUE_SENT, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(actual) => state = actual, + } + } + State(state) } fn is_rx_task_set(self) -> bool { From 897e71e8ef77289e404ffb74409e5f00645aa631 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 12 Nov 2021 10:34:21 -0800 Subject: [PATCH 8/8] add comments in `set_complete` Signed-off-by: Eliza Weisman --- tokio/src/sync/oneshot.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 244ad124375..3fa7e4653a6 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -1031,6 +1031,19 @@ impl State { } fn set_complete(cell: &AtomicUsize) -> State { + // This method is a compare-and-swap loop rather than a fetch-or like + // other `set_$WHATEVER` methods on `State`. This is because we must + // check if the state has been closed before setting the `VALUE_SENT` + // bit. + // + // We don't want to set both the `VALUE_SENT` bit if the `CLOSED` + // bit is already set, because `VALUE_SENT` will tell the receiver that + // it's okay to access the inner `UnsafeCell`. Immediately after calling + // `set_complete`, if the channel was closed, the sender will _also_ + // access the `UnsafeCell` to take the value back out, so if a + // `poll_recv` or `try_recv` call is occurring concurrently, both + // threads may try to access the `UnsafeCell` if we were to set the + // `VALUE_SENT` bit on a closed channel. let mut state = cell.load(Ordering::Relaxed); loop { if State(state).is_closed() {