Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix mpsc::SyncSender spinning behavior #106701

Merged
merged 3 commits into from
Jan 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions library/std/src/sync/mpmc/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl<T> Channel<T> {
return true;
}
Err(_) => {
backoff.spin();
backoff.spin_light();
tail = self.tail.load(Ordering::Relaxed);
}
}
Expand All @@ -182,11 +182,11 @@ impl<T> Channel<T> {
return false;
}

backoff.spin();
backoff.spin_light();
tail = self.tail.load(Ordering::Relaxed);
} else {
// Snooze because we need to wait for the stamp to get updated.
backoff.snooze();
backoff.spin_heavy();
tail = self.tail.load(Ordering::Relaxed);
}
}
Expand Down Expand Up @@ -251,7 +251,7 @@ impl<T> Channel<T> {
return true;
}
Err(_) => {
backoff.spin();
backoff.spin_light();
head = self.head.load(Ordering::Relaxed);
}
}
Expand All @@ -273,11 +273,11 @@ impl<T> Channel<T> {
}
}

backoff.spin();
backoff.spin_light();
head = self.head.load(Ordering::Relaxed);
} else {
// Snooze because we need to wait for the stamp to get updated.
backoff.snooze();
backoff.spin_heavy();
head = self.head.load(Ordering::Relaxed);
}
}
Expand Down Expand Up @@ -330,7 +330,7 @@ impl<T> Channel<T> {
if backoff.is_completed() {
break;
} else {
backoff.spin();
backoff.spin_light();
}
}

Expand Down
16 changes: 8 additions & 8 deletions library/std/src/sync/mpmc/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<T> Slot<T> {
fn wait_write(&self) {
let backoff = Backoff::new();
while self.state.load(Ordering::Acquire) & WRITE == 0 {
backoff.snooze();
backoff.spin_heavy();
}
}
}
Expand Down Expand Up @@ -82,7 +82,7 @@ impl<T> Block<T> {
if !next.is_null() {
return next;
}
backoff.snooze();
backoff.spin_heavy();
}
}

Expand Down Expand Up @@ -191,7 +191,7 @@ impl<T> Channel<T> {

// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
backoff.snooze();
backoff.spin_heavy();
tail = self.tail.index.load(Ordering::Acquire);
block = self.tail.block.load(Ordering::Acquire);
continue;
Expand Down Expand Up @@ -247,7 +247,7 @@ impl<T> Channel<T> {
return true;
},
Err(_) => {
backoff.spin();
backoff.spin_light();
tail = self.tail.index.load(Ordering::Acquire);
block = self.tail.block.load(Ordering::Acquire);
}
Expand Down Expand Up @@ -286,7 +286,7 @@ impl<T> Channel<T> {

// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
backoff.snooze();
backoff.spin_heavy();
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
continue;
Expand Down Expand Up @@ -320,7 +320,7 @@ impl<T> Channel<T> {
// The block can be null here only if the first message is being sent into the channel.
// In that case, just wait until it gets initialized.
if block.is_null() {
backoff.snooze();
backoff.spin_heavy();
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
continue;
Expand Down Expand Up @@ -351,7 +351,7 @@ impl<T> Channel<T> {
return true;
},
Err(_) => {
backoff.spin();
backoff.spin_light();
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
}
Expand Down Expand Up @@ -542,7 +542,7 @@ impl<T> Channel<T> {
// New updates to tail will be rejected by MARK_BIT and aborted unless it's
// at boundary. We need to wait for the updates take affect otherwise there
// can be memory leaks.
backoff.snooze();
backoff.spin_heavy();
tail = self.tail.index.load(Ordering::Acquire);
}

Expand Down
2 changes: 1 addition & 1 deletion library/std/src/sync/mpmc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ mod zero;
use crate::fmt;
use crate::panic::{RefUnwindSafe, UnwindSafe};
use crate::time::{Duration, Instant};
use error::*;
pub use error::*;

/// Creates a channel of unbounded capacity.
///
Expand Down
31 changes: 15 additions & 16 deletions library/std/src/sync/mpmc/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,8 @@ impl<T> DerefMut for CachePadded<T> {
}

const SPIN_LIMIT: u32 = 6;
const YIELD_LIMIT: u32 = 10;

/// Performs exponential backoff in spin loops.
/// Performs quadratic backoff in spin loops.
pub struct Backoff {
step: Cell<u32>,
}
Expand All @@ -104,25 +103,27 @@ impl Backoff {
Backoff { step: Cell::new(0) }
}

/// Backs off in a lock-free loop.
/// Backs off using lightweight spinning.
///
/// This method should be used when we need to retry an operation because another thread made
/// progress.
/// This method should be used for:
/// - Retrying an operation because another thread made progress. i.e. on CAS failure.
/// - Waiting for an operation to complete by spinning optimistically for a few iterations
/// before falling back to parking the thread (see `Backoff::is_completed`).
#[inline]
pub fn spin(&self) {
pub fn spin_light(&self) {
let step = self.step.get().min(SPIN_LIMIT);
for _ in 0..step.pow(2) {
crate::hint::spin_loop();
}

if self.step.get() <= SPIN_LIMIT {
self.step.set(self.step.get() + 1);
}
self.step.set(self.step.get() + 1);
}

/// Backs off in a blocking loop.
/// Backs off using heavyweight spinning.
///
/// This method should be used in blocking loops where parking the thread is not an option.
#[inline]
pub fn snooze(&self) {
pub fn spin_heavy(&self) {
if self.step.get() <= SPIN_LIMIT {
for _ in 0..self.step.get().pow(2) {
crate::hint::spin_loop()
Expand All @@ -131,14 +132,12 @@ impl Backoff {
crate::thread::yield_now();
}

if self.step.get() <= YIELD_LIMIT {
self.step.set(self.step.get() + 1);
}
self.step.set(self.step.get() + 1);
}

/// Returns `true` if quadratic backoff has completed and blocking the thread is advised.
/// Returns `true` if quadratic backoff has completed and parking the thread is advised.
#[inline]
pub fn is_completed(&self) -> bool {
self.step.get() > YIELD_LIMIT
self.step.get() > SPIN_LIMIT
}
}
2 changes: 1 addition & 1 deletion library/std/src/sync/mpmc/zero.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl<T> Packet<T> {
fn wait_ready(&self) {
let backoff = Backoff::new();
while !self.ready.load(Ordering::Acquire) {
backoff.snooze();
backoff.spin_heavy();
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions library/std/src/sync/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,15 @@ impl<T> SyncSender<T> {
pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
self.inner.try_send(t)
}

// Attempts to send for a value on this receiver, returning an error if the
// corresponding channel has hung up, or if it waits more than `timeout`.
//
// This method is currently private and only used for tests.
#[allow(unused)]
fn send_timeout(&self, t: T, timeout: Duration) -> Result<(), mpmc::SendTimeoutError<T>> {
self.inner.send_timeout(t, timeout)
}
}

#[stable(feature = "rust1", since = "1.0.0")]
Expand Down
8 changes: 8 additions & 0 deletions library/std/src/sync/mpsc/sync_tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::*;
use crate::env;
use crate::sync::mpmc::SendTimeoutError;
use crate::thread;
use crate::time::Duration;

Expand Down Expand Up @@ -41,6 +42,13 @@ fn recv_timeout() {
assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
}

#[test]
fn send_timeout() {
let (tx, _rx) = sync_channel::<i32>(1);
assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Ok(()));
assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Err(SendTimeoutError::Timeout(1)));
}

#[test]
fn smoke_threads() {
let (tx, rx) = sync_channel::<i32>(0);
Expand Down