Skip to content

Commit

Permalink
rt(alt): fix a number of concurrency bugs (#5907)
Browse files Browse the repository at this point in the history
Expands loom coverage and fixes a number of bugs.

Closes #5888
  • Loading branch information
carllerche committed Aug 4, 2023
1 parent dbda204 commit 8832e93
Show file tree
Hide file tree
Showing 12 changed files with 330 additions and 112 deletions.
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ wasm-bindgen-test = "0.3.0"
mio-aio = { version = "0.7.0", features = ["tokio"] }

[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.6", features = ["futures", "checkpoint"] }
loom = { version = "0.7", features = ["futures", "checkpoint"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
1 change: 1 addition & 0 deletions tokio/src/loom/mocked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub(crate) mod sync {
}

#[inline]
#[track_caller]
pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
self.0.lock().unwrap()
}
Expand Down
19 changes: 19 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ pub struct Builder {
/// How many ticks before yielding to the driver for timer and I/O events?
pub(super) event_interval: u32,

pub(super) local_queue_capacity: usize,

/// When true, the multi-threade scheduler LIFO slot should not be used.
///
/// This option should only be exposed as unstable.
Expand Down Expand Up @@ -297,6 +299,12 @@ impl Builder {
global_queue_interval: None,
event_interval,

#[cfg(not(loom))]
local_queue_capacity: 256,

#[cfg(loom)]
local_queue_capacity: 4,

seed_generator: RngSeedGenerator::new(RngSeed::new()),

#[cfg(tokio_unstable)]
Expand Down Expand Up @@ -1046,6 +1054,14 @@ impl Builder {
}
}

cfg_loom! {
pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self {
assert!(value.is_power_of_two());
self.local_queue_capacity = value;
self
}
}

fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::scheduler::{self, CurrentThread};
use crate::runtime::{runtime::Scheduler, Config};
Expand Down Expand Up @@ -1074,6 +1090,7 @@ impl Builder {
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
local_queue_capacity: self.local_queue_capacity,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
Expand Down Expand Up @@ -1224,6 +1241,7 @@ cfg_rt_multi_thread! {
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
local_queue_capacity: self.local_queue_capacity,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
Expand Down Expand Up @@ -1271,6 +1289,7 @@ cfg_rt_multi_thread! {
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
local_queue_capacity: self.local_queue_capacity,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
Expand Down
8 changes: 7 additions & 1 deletion tokio/src/runtime/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#![cfg_attr(any(not(feature = "full"), target_family = "wasm"), allow(dead_code))]
#![cfg_attr(
any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
allow(dead_code)
)]
use crate::runtime::Callback;
use crate::util::RngSeedGenerator;

Expand All @@ -9,6 +12,9 @@ pub(crate) struct Config {
/// How many ticks before yielding to the driver for timer and I/O events?
pub(crate) event_interval: u32,

/// How big to make each worker's local queue
pub(crate) local_queue_capacity: usize,

/// Callback for a worker parking itself
pub(crate) before_park: Option<Callback>,

Expand Down
28 changes: 27 additions & 1 deletion tokio/src/runtime/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

// Eventually, this file will see significant refactoring / cleanup. For now, we
// don't need to worry much about dead code with certain feature permutations.
#![cfg_attr(not(feature = "full"), allow(dead_code))]
#![cfg_attr(
any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
allow(dead_code)
)]

use crate::runtime::park::{ParkThread, UnparkThread};

Expand Down Expand Up @@ -58,6 +61,10 @@ impl Driver {
))
}

pub(crate) fn is_enabled(&self) -> bool {
self.inner.is_enabled()
}

pub(crate) fn park(&mut self, handle: &Handle) {
self.inner.park(handle)
}
Expand Down Expand Up @@ -154,6 +161,13 @@ cfg_io_driver! {
}

impl IoStack {
pub(crate) fn is_enabled(&self) -> bool {
match self {
IoStack::Enabled(..) => true,
IoStack::Disabled(..) => false,
}
}

pub(crate) fn park(&mut self, handle: &Handle) {
match self {
IoStack::Enabled(v) => v.park(handle),
Expand Down Expand Up @@ -217,6 +231,11 @@ cfg_not_io_driver! {
pub(crate) fn shutdown(&mut self, _handle: &Handle) {
self.0.shutdown();
}

/// This is not a "real" driver, so it is not considered enabled.
pub(crate) fn is_enabled(&self) -> bool {
false
}
}
}

Expand Down Expand Up @@ -298,6 +317,13 @@ cfg_time! {
}

impl TimeDriver {
pub(crate) fn is_enabled(&self) -> bool {
match self {
TimeDriver::Enabled { .. } => true,
TimeDriver::Disabled(inner) => inner.is_enabled(),
}
}

pub(crate) fn park(&mut self, handle: &Handle) {
match self {
TimeDriver::Enabled { driver, .. } => driver.park(handle),
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/scheduler/inject/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ impl<T: 'static> Shared<T> {
pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
use std::cmp;

debug_assert!(n > 0);

// safety: All updates to the len atomic are guarded by the mutex. As
// such, a non-atomic load followed by a store is safe.
let len = self.len.unsync_load();
Expand Down
9 changes: 9 additions & 0 deletions tokio/src/runtime/scheduler/inject/synced.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
#![cfg_attr(
any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
allow(dead_code)
)]

use crate::runtime::task;

pub(crate) struct Synced {
Expand Down Expand Up @@ -29,4 +34,8 @@ impl Synced {
// safety: a `Notified` is pushed into the queue and now it is popped!
Some(unsafe { task::Notified::from_raw(task) })
}

pub(crate) fn is_empty(&self) -> bool {
self.head.is_none()
}
}
28 changes: 13 additions & 15 deletions tokio/src/runtime/scheduler/multi_thread_alt/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ impl Idle {
(idle, synced)
}

pub(super) fn needs_searching(&self) -> bool {
self.needs_searching.load(Acquire)
}

pub(super) fn num_idle(&self, synced: &Synced) -> usize {
#[cfg(not(loom))]
debug_assert_eq!(synced.available_cores.len(), self.num_idle.load(Acquire));
synced.available_cores.len()
}
Expand Down Expand Up @@ -131,13 +136,7 @@ impl Idle {
}

// We need to establish a stronger barrier than with `notify_local`
if self
.num_searching
.compare_exchange(0, 1, AcqRel, Acquire)
.is_err()
{
return;
}
self.num_searching.fetch_add(1, AcqRel);

self.notify_synced(synced, shared);
}
Expand All @@ -158,6 +157,7 @@ impl Idle {
synced.assigned_cores[worker] = Some(core);

let num_idle = synced.idle.available_cores.len();
#[cfg(not(loom))]
debug_assert_eq!(num_idle, self.num_idle.load(Acquire) - 1);

// Update the number of sleeping workers
Expand Down Expand Up @@ -221,6 +221,7 @@ impl Idle {
let num_idle = synced.idle.available_cores.len();
self.num_idle.store(num_idle, Release);
} else {
#[cfg(not(loom))]
debug_assert_eq!(
synced.idle.available_cores.len(),
self.num_idle.load(Acquire)
Expand Down Expand Up @@ -260,11 +261,11 @@ impl Idle {
// The core should not be searching at this point
debug_assert!(!core.is_searching);

// Check that this isn't the final worker to go idle *and*
// `needs_searching` is set.
debug_assert!(!self.needs_searching.load(Acquire) || num_active_workers(&synced.idle) > 1);
// Check that there are no pending tasks in the global queue
debug_assert!(synced.inject.is_empty());

let num_idle = synced.idle.available_cores.len();
#[cfg(not(loom))]
debug_assert_eq!(num_idle, self.num_idle.load(Acquire));

self.idle_map.set(core.index);
Expand Down Expand Up @@ -314,7 +315,7 @@ impl Idle {
}
}

fn transition_worker_to_searching(&self, core: &mut Core) {
pub(super) fn transition_worker_to_searching(&self, core: &mut Core) {
core.is_searching = true;
self.num_searching.fetch_add(1, AcqRel);
self.needs_searching.store(false, Release);
Expand All @@ -324,10 +325,7 @@ impl Idle {
///
/// Returns `true` if this is the final searching worker. The caller
/// **must** notify a new worker.
pub(super) fn transition_worker_from_searching(&self, core: &mut Core) -> bool {
debug_assert!(core.is_searching);
core.is_searching = false;

pub(super) fn transition_worker_from_searching(&self) -> bool {
let prev = self.num_searching.fetch_sub(1, AcqRel);
debug_assert!(prev > 0);

Expand Down
Loading

0 comments on commit 8832e93

Please sign in to comment.