Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync::watch: Use Acquire/Release memory ordering instead of SeqCst #6018

Merged
merged 11 commits into from
Sep 24, 2023
33 changes: 26 additions & 7 deletions tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@
use crate::sync::notify::Notify;

use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::Relaxed;
use crate::loom::sync::atomic::Ordering::*;
uklotzde marked this conversation as resolved.
Show resolved Hide resolved
use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
use std::fmt;
use std::mem;
Expand Down Expand Up @@ -357,7 +357,7 @@ mod big_notify {
use self::state::{AtomicState, Version};
mod state {
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::SeqCst;
use crate::loom::sync::atomic::Ordering;

const CLOSED_BIT: usize = 1;

Expand All @@ -377,6 +377,11 @@ mod state {
pub(super) struct StateSnapshot(usize);

/// The state stored in an atomic integer.
///
/// The `Sender` uses `Release` ordering for storing a new state
/// and the `Receiver`s use `Acquire` ordering for loading the
/// current state. This ensures that written values are seen by
/// the `Receiver`s for a proper handover.
#[derive(Debug)]
pub(super) struct AtomicState(AtomicUsize);

Expand Down Expand Up @@ -412,18 +417,30 @@ mod state {
}

/// Load the current value of the state.
///
/// Only used by the receiver and for debugging purposes.
///
/// The receiver side (read-only) uses `Acquire` ordering for a proper handover
/// of the shared value with the sender side (single writer). The state is always
/// updated after modifying and before releasing the (exclusive) lock on the
/// shared value.
pub(super) fn load(&self) -> StateSnapshot {
StateSnapshot(self.0.load(SeqCst))
StateSnapshot(self.0.load(Ordering::Acquire))
}

/// Increment the version counter.
pub(super) fn increment_version(&self) {
self.0.fetch_add(STEP_SIZE, SeqCst);
pub(super) fn increment_version_after_updating_shared_value_while_locked(&self) {
uklotzde marked this conversation as resolved.
Show resolved Hide resolved
// Use `Release` ordering to ensure that the shared value
// has been written before updating the version. The shared
// value is still protected by an exclusive lock during this
// method.
self.0.fetch_add(STEP_SIZE, Ordering::Release);
}

/// Set the closed bit in the state.
pub(super) fn set_closed(&self) {
self.0.fetch_or(CLOSED_BIT, SeqCst);
// Relaxed ordering is sufficient here.
self.0.fetch_or(CLOSED_BIT, Ordering::Relaxed);
uklotzde marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -1046,7 +1063,9 @@ impl<T> Sender<T> {
}
};

self.shared.state.increment_version();
self.shared
.state
.increment_version_after_updating_shared_value_while_locked();

// Release the write lock.
//
Expand Down
Loading