diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index c5c82f90fd8..14b22cbd5ee 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -357,7 +357,7 @@ mod big_notify { use self::state::{AtomicState, Version}; mod state { use crate::loom::sync::atomic::AtomicUsize; - use crate::loom::sync::atomic::Ordering::SeqCst; + use crate::loom::sync::atomic::Ordering; const CLOSED_BIT: usize = 1; @@ -377,6 +377,11 @@ mod state { pub(super) struct StateSnapshot(usize); /// The state stored in an atomic integer. + /// + /// The `Sender` uses `Release` ordering for storing a new state + /// and the `Receiver`s use `Acquire` ordering for loading the + /// current state. This ensures that written values are seen by + /// the `Receiver`s for a proper handover. #[derive(Debug)] pub(super) struct AtomicState(AtomicUsize); @@ -406,24 +411,35 @@ mod state { impl AtomicState { /// Create a new `AtomicState` that is not closed and which has the - /// version set to `Version::initial()`. + /// version set to `Version::INITIAL`. pub(super) fn new() -> Self { AtomicState(AtomicUsize::new(Version::INITIAL.0)) } /// Load the current value of the state. + /// + /// Only used by the receiver and for debugging purposes. + /// + /// The receiver side (read-only) uses `Acquire` ordering for a proper handover + /// of the shared value with the sender side (single writer). The state is always + /// updated after modifying and before releasing the (exclusive) lock on the + /// shared value. pub(super) fn load(&self) -> StateSnapshot { - StateSnapshot(self.0.load(SeqCst)) + StateSnapshot(self.0.load(Ordering::Acquire)) } /// Increment the version counter. - pub(super) fn increment_version(&self) { - self.0.fetch_add(STEP_SIZE, SeqCst); + pub(super) fn increment_version_while_locked(&self) { + // Use `Release` ordering to ensure that the shared value + // has been written before updating the version. The shared + // value is still protected by an exclusive lock during this + // method. + self.0.fetch_add(STEP_SIZE, Ordering::Release); } /// Set the closed bit in the state. pub(super) fn set_closed(&self) { - self.0.fetch_or(CLOSED_BIT, SeqCst); + self.0.fetch_or(CLOSED_BIT, Ordering::Release); } } } @@ -1061,7 +1077,7 @@ impl Sender { } }; - self.shared.state.increment_version(); + self.shared.state.increment_version_while_locked(); // Release the write lock. //