From 9f1723b15bf2e475a58816a33b8596283a8adb9c Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Tue, 19 Sep 2023 22:03:28 +0200 Subject: [PATCH 01/10] sync::watch: Use Acquire/Release memory ordering instead of SeqCst --- tokio/src/sync/watch.rs | 54 ++++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 61307fce47d..edd39ad5f74 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -114,7 +114,7 @@ use crate::sync::notify::Notify; use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::atomic::Ordering::Relaxed; +use crate::loom::sync::atomic::Ordering; use crate::loom::sync::{Arc, RwLock, RwLockReadGuard}; use std::fmt; use std::mem; @@ -247,7 +247,8 @@ struct Shared { impl fmt::Debug for Shared { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let state = self.state.load(); + // Using `Relaxed` ordering is sufficient for this purpose. + let state = self.state.load(Ordering::Relaxed); f.debug_struct("Shared") .field("value", &self.value) .field("version", &state.version()) @@ -341,7 +342,7 @@ mod big_notify { /// This function implements the case where randomness is not available. #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))] pub(super) fn notified(&self) -> Notified<'_> { - let i = self.next.fetch_add(1, Relaxed) % 8; + let i = self.next.fetch_add(1, Ordering::Relaxed) % 8; self.inner[i].notified() } @@ -357,7 +358,7 @@ mod big_notify { use self::state::{AtomicState, Version}; mod state { use crate::loom::sync::atomic::AtomicUsize; - use crate::loom::sync::atomic::Ordering::SeqCst; + use crate::loom::sync::atomic::Ordering; const CLOSED_BIT: usize = 1; @@ -377,6 +378,11 @@ mod state { pub(super) struct StateSnapshot(usize); /// The state stored in an atomic integer. + /// + /// The `Sender` uses `Release` ordering for storing a new state + /// and the `Receiver`s use `Acquire` ordering for loading the + /// current state. This ensures that written values are seen by + /// the `Receiver`s for a proper handover. #[derive(Debug)] pub(super) struct AtomicState(AtomicUsize); @@ -412,18 +418,32 @@ mod state { } /// Load the current value of the state. - pub(super) fn load(&self) -> StateSnapshot { - StateSnapshot(self.0.load(SeqCst)) + pub(super) fn load(&self, ordering: Ordering) -> StateSnapshot { + StateSnapshot(self.0.load(ordering)) + } + + /// Load the current value of the state. + /// + /// The receiver side (read-only) uses `Acquire` ordering for a proper handover + /// with the sender side (single writer). + pub(super) fn load_receiver(&self) -> StateSnapshot { + StateSnapshot(self.0.load(Ordering::Acquire)) } /// Increment the version counter. pub(super) fn increment_version(&self) { - self.0.fetch_add(STEP_SIZE, SeqCst); + // Use `Release` ordering to ensure that storing the version + // state is seen by the receiver side that uses `Acquire` for + // loading the state. + self.0.fetch_add(STEP_SIZE, Ordering::Release); } /// Set the closed bit in the state. pub(super) fn set_closed(&self) { - self.0.fetch_or(CLOSED_BIT, SeqCst); + // Use `Release` ordering to ensure that storing the version + // state is seen by the receiver side that uses `Acquire` for + // loading the state. + self.0.fetch_or(CLOSED_BIT, Ordering::Release); } } } @@ -489,7 +509,7 @@ impl Receiver { fn from_shared(version: Version, shared: Arc>) -> Self { // No synchronization necessary as this is only used as a counter and // not memory access. - shared.ref_count_rx.fetch_add(1, Relaxed); + shared.ref_count_rx.fetch_add(1, Ordering::Relaxed); Self { shared, version } } @@ -543,7 +563,7 @@ impl Receiver { // After obtaining a read-lock no concurrent writes could occur // and the loaded version matches that of the borrowed reference. - let new_version = self.shared.state.load().version(); + let new_version = self.shared.state.load_receiver().version(); let has_changed = self.version != new_version; Ref { inner, has_changed } @@ -590,7 +610,7 @@ impl Receiver { // After obtaining a read-lock no concurrent writes could occur // and the loaded version matches that of the borrowed reference. - let new_version = self.shared.state.load().version(); + let new_version = self.shared.state.load_receiver().version(); let has_changed = self.version != new_version; // Mark the shared value as seen by updating the version @@ -631,7 +651,7 @@ impl Receiver { /// ``` pub fn has_changed(&self) -> Result { // Load the version from the state - let state = self.shared.state.load(); + let state = self.shared.state.load_receiver(); if state.is_closed() { // The sender has dropped. return Err(error::RecvError(())); @@ -768,7 +788,7 @@ impl Receiver { { let inner = self.shared.value.read().unwrap(); - let new_version = self.shared.state.load().version(); + let new_version = self.shared.state.load_receiver().version(); let has_changed = self.version != new_version; self.version = new_version; @@ -814,7 +834,7 @@ fn maybe_changed( version: &mut Version, ) -> Option> { // Load the version from the state - let state = shared.state.load(); + let state = shared.state.load_receiver(); let new_version = state.version(); if *version != new_version { @@ -865,7 +885,7 @@ impl Drop for Receiver { fn drop(&mut self) { // No synchronization necessary as this is only used as a counter and // not memory access. - if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) { + if 1 == self.shared.ref_count_rx.fetch_sub(1, Ordering::Relaxed) { // This is the last `Receiver` handle, tasks waiting on `Sender::closed()` self.shared.notify_tx.notify_waiters(); } @@ -1228,7 +1248,7 @@ impl Sender { /// ``` pub fn subscribe(&self) -> Receiver { let shared = self.shared.clone(); - let version = shared.state.load().version(); + let version = shared.state.load_receiver().version(); // The CLOSED bit in the state tracks only whether the sender is // dropped, so we do not need to unset it if this reopens the channel. @@ -1254,7 +1274,7 @@ impl Sender { /// } /// ``` pub fn receiver_count(&self) -> usize { - self.shared.ref_count_rx.load(Relaxed) + self.shared.ref_count_rx.load(Ordering::Relaxed) } } From e5daae76ee276611079c3bec4507cfce4958bbaf Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Tue, 19 Sep 2023 23:09:53 +0200 Subject: [PATCH 02/10] Use wildcard import to reduce diff --- tokio/src/sync/watch.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index edd39ad5f74..4c1c8d772c3 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -114,7 +114,7 @@ use crate::sync::notify::Notify; use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::atomic::Ordering; +use crate::loom::sync::atomic::Ordering::*; use crate::loom::sync::{Arc, RwLock, RwLockReadGuard}; use std::fmt; use std::mem; @@ -248,7 +248,7 @@ struct Shared { impl fmt::Debug for Shared { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // Using `Relaxed` ordering is sufficient for this purpose. - let state = self.state.load(Ordering::Relaxed); + let state = self.state.load(Relaxed); f.debug_struct("Shared") .field("value", &self.value) .field("version", &state.version()) @@ -342,7 +342,7 @@ mod big_notify { /// This function implements the case where randomness is not available. #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))] pub(super) fn notified(&self) -> Notified<'_> { - let i = self.next.fetch_add(1, Ordering::Relaxed) % 8; + let i = self.next.fetch_add(1, Relaxed) % 8; self.inner[i].notified() } @@ -509,7 +509,7 @@ impl Receiver { fn from_shared(version: Version, shared: Arc>) -> Self { // No synchronization necessary as this is only used as a counter and // not memory access. - shared.ref_count_rx.fetch_add(1, Ordering::Relaxed); + shared.ref_count_rx.fetch_add(1, Relaxed); Self { shared, version } } @@ -885,7 +885,7 @@ impl Drop for Receiver { fn drop(&mut self) { // No synchronization necessary as this is only used as a counter and // not memory access. - if 1 == self.shared.ref_count_rx.fetch_sub(1, Ordering::Relaxed) { + if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) { // This is the last `Receiver` handle, tasks waiting on `Sender::closed()` self.shared.notify_tx.notify_waiters(); } @@ -1274,7 +1274,7 @@ impl Sender { /// } /// ``` pub fn receiver_count(&self) -> usize { - self.shared.ref_count_rx.load(Ordering::Relaxed) + self.shared.ref_count_rx.load(Relaxed) } } From fb38b9055093baeb00f0ff7a517ec7382e79a992 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Wed, 20 Sep 2023 00:47:35 +0200 Subject: [PATCH 03/10] Use a single load() method with fixed memory ordering --- tokio/src/sync/watch.rs | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 4c1c8d772c3..86b261f2c4b 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -247,8 +247,7 @@ struct Shared { impl fmt::Debug for Shared { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // Using `Relaxed` ordering is sufficient for this purpose. - let state = self.state.load(Relaxed); + let state = self.state.load(); f.debug_struct("Shared") .field("value", &self.value) .field("version", &state.version()) @@ -418,15 +417,14 @@ mod state { } /// Load the current value of the state. - pub(super) fn load(&self, ordering: Ordering) -> StateSnapshot { - StateSnapshot(self.0.load(ordering)) - } - - /// Load the current value of the state. + /// + /// Only used by the receiver and for debugging purposes. /// /// The receiver side (read-only) uses `Acquire` ordering for a proper handover - /// with the sender side (single writer). - pub(super) fn load_receiver(&self) -> StateSnapshot { + /// of the shared value with the sender side (single writer). The state is always + /// updated after modifying and before releasing the (exclusive) lock on the + /// shared value. + pub(super) fn load(&self) -> StateSnapshot { StateSnapshot(self.0.load(Ordering::Acquire)) } @@ -563,7 +561,7 @@ impl Receiver { // After obtaining a read-lock no concurrent writes could occur // and the loaded version matches that of the borrowed reference. - let new_version = self.shared.state.load_receiver().version(); + let new_version = self.shared.state.load().version(); let has_changed = self.version != new_version; Ref { inner, has_changed } @@ -610,7 +608,7 @@ impl Receiver { // After obtaining a read-lock no concurrent writes could occur // and the loaded version matches that of the borrowed reference. - let new_version = self.shared.state.load_receiver().version(); + let new_version = self.shared.state.load().version(); let has_changed = self.version != new_version; // Mark the shared value as seen by updating the version @@ -651,7 +649,7 @@ impl Receiver { /// ``` pub fn has_changed(&self) -> Result { // Load the version from the state - let state = self.shared.state.load_receiver(); + let state = self.shared.state.load(); if state.is_closed() { // The sender has dropped. return Err(error::RecvError(())); @@ -788,7 +786,7 @@ impl Receiver { { let inner = self.shared.value.read().unwrap(); - let new_version = self.shared.state.load_receiver().version(); + let new_version = self.shared.state.load().version(); let has_changed = self.version != new_version; self.version = new_version; @@ -834,7 +832,7 @@ fn maybe_changed( version: &mut Version, ) -> Option> { // Load the version from the state - let state = shared.state.load_receiver(); + let state = shared.state.load(); let new_version = state.version(); if *version != new_version { @@ -1248,7 +1246,7 @@ impl Sender { /// ``` pub fn subscribe(&self) -> Receiver { let shared = self.shared.clone(); - let version = shared.state.load_receiver().version(); + let version = shared.state.load().version(); // The CLOSED bit in the state tracks only whether the sender is // dropped, so we do not need to unset it if this reopens the channel. From 8dcfb5607fd1a96db9e4e5b44a8e2f9dc217c850 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Wed, 20 Sep 2023 00:49:21 +0200 Subject: [PATCH 04/10] Use a verbose method name and fix comments for incrementing the version --- tokio/src/sync/watch.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 86b261f2c4b..d1ed95e993b 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -429,10 +429,11 @@ mod state { } /// Increment the version counter. - pub(super) fn increment_version(&self) { - // Use `Release` ordering to ensure that storing the version - // state is seen by the receiver side that uses `Acquire` for - // loading the state. + pub(super) fn increment_version_after_updating_shared_value_while_locked(&self) { + // Use `Release` ordering to ensure that the shared value + // has been written before updating the version. The shared + // value is still protected by an exclusive lock during this + // method. self.0.fetch_add(STEP_SIZE, Ordering::Release); } @@ -1064,7 +1065,7 @@ impl Sender { } }; - self.shared.state.increment_version(); + self.shared.state.increment_version_after_updating_shared_value_while_locked(); // Release the write lock. // From ccbe55cbd09fa62aec999fbcf7609c3341cd2bcc Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Wed, 20 Sep 2023 00:51:19 +0200 Subject: [PATCH 05/10] Use Relaxed ordering for setting the CLOSED bit --- tokio/src/sync/watch.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index d1ed95e993b..eb6416be82e 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -439,10 +439,8 @@ mod state { /// Set the closed bit in the state. pub(super) fn set_closed(&self) { - // Use `Release` ordering to ensure that storing the version - // state is seen by the receiver side that uses `Acquire` for - // loading the state. - self.0.fetch_or(CLOSED_BIT, Ordering::Release); + // Relaxed ordering is sufficient here. + self.0.fetch_or(CLOSED_BIT, Ordering::Relaxed); } } } From 64f1e23ce6ddb1f9519b9462e9d9e4db1a0a19e4 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Wed, 20 Sep 2023 01:16:47 +0200 Subject: [PATCH 06/10] Fix formatting --- tokio/src/sync/watch.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index eb6416be82e..9f796774293 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -1063,7 +1063,9 @@ impl Sender { } }; - self.shared.state.increment_version_after_updating_shared_value_while_locked(); + self.shared + .state + .increment_version_after_updating_shared_value_while_locked(); // Release the write lock. // From c0b73f00fc78f666c19d39da420d0d7e1ee98975 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Wed, 20 Sep 2023 14:05:46 +0200 Subject: [PATCH 07/10] Update tokio/src/sync/watch.rs Co-authored-by: Alice Ryhl --- tokio/src/sync/watch.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 9f796774293..2391370ca05 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -439,8 +439,7 @@ mod state { /// Set the closed bit in the state. pub(super) fn set_closed(&self) { - // Relaxed ordering is sufficient here. - self.0.fetch_or(CLOSED_BIT, Ordering::Relaxed); + self.0.fetch_or(CLOSED_BIT, Ordering::Release); } } } From 2f3830d9f61d4a316988071161b76ef0203f4f2b Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Wed, 20 Sep 2023 14:08:55 +0200 Subject: [PATCH 08/10] Simplify function name --- tokio/src/sync/watch.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 2391370ca05..13f3212a577 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -429,7 +429,7 @@ mod state { } /// Increment the version counter. - pub(super) fn increment_version_after_updating_shared_value_while_locked(&self) { + pub(super) fn increment_version_while_locked(&self) { // Use `Release` ordering to ensure that the shared value // has been written before updating the version. The shared // value is still protected by an exclusive lock during this @@ -1062,9 +1062,7 @@ impl Sender { } }; - self.shared - .state - .increment_version_after_updating_shared_value_while_locked(); + self.shared.state.increment_version_while_locked(); // Release the write lock. // From ee45f0733a74f3225f864c1bf00ab59ecf60d2c8 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Thu, 21 Sep 2023 00:02:45 +0200 Subject: [PATCH 09/10] Fix comment --- tokio/src/sync/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 13f3212a577..d050adc7b67 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -411,7 +411,7 @@ mod state { impl AtomicState { /// Create a new `AtomicState` that is not closed and which has the - /// version set to `Version::initial()`. + /// version set to `Version::INITIAL`. pub(super) fn new() -> Self { AtomicState(AtomicUsize::new(Version::INITIAL.0)) } From 63399619b64211d99c5ae5d321d3e92514774177 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Sun, 24 Sep 2023 17:37:14 +0200 Subject: [PATCH 10/10] Revert wildcard import --- tokio/src/sync/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index d050adc7b67..67b6bdbd4c8 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -114,7 +114,7 @@ use crate::sync::notify::Notify; use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::atomic::Ordering::*; +use crate::loom::sync::atomic::Ordering::Relaxed; use crate::loom::sync::{Arc, RwLock, RwLockReadGuard}; use std::fmt; use std::mem;