diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 80a1a090041..1b094af983d 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -157,7 +157,7 @@ wasm-bindgen-test = "0.3.0"
 mio-aio = { version = "0.7.0", features = ["tokio"] }
 
 [target.'cfg(loom)'.dev-dependencies]
-loom = { version = "0.6", features = ["futures", "checkpoint"] }
+loom = { version = "0.7", features = ["futures", "checkpoint"] }
 
 [package.metadata.docs.rs]
 all-features = true
diff --git a/tokio/src/loom/mocked.rs b/tokio/src/loom/mocked.rs
index 56dc1a06344..d40e2c1f8ea 100644
--- a/tokio/src/loom/mocked.rs
+++ b/tokio/src/loom/mocked.rs
@@ -15,6 +15,7 @@ pub(crate) mod sync {
         }
 
         #[inline]
+        #[track_caller]
         pub(crate) fn lock(&self) -> MutexGuard<'_, T> {
             self.0.lock().unwrap()
         }
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index 38462ef485d..9b76867d279 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -93,6 +93,8 @@ pub struct Builder {
     /// How many ticks before yielding to the driver for timer and I/O events?
     pub(super) event_interval: u32,
 
+    pub(super) local_queue_capacity: usize,
+
     /// When true, the multi-threade scheduler LIFO slot should not be used.
     ///
     /// This option should only be exposed as unstable.
@@ -297,6 +299,12 @@ impl Builder {
             global_queue_interval: None,
             event_interval,
 
+            #[cfg(not(loom))]
+            local_queue_capacity: 256,
+
+            #[cfg(loom)]
+            local_queue_capacity: 4,
+
             seed_generator: RngSeedGenerator::new(RngSeed::new()),
 
             #[cfg(tokio_unstable)]
@@ -1046,6 +1054,14 @@ impl Builder {
         }
     }
 
+    cfg_loom! {
+        pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self {
+            assert!(value.is_power_of_two());
+            self.local_queue_capacity = value;
+            self
+        }
+    }
+
     fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
         use crate::runtime::scheduler::{self, CurrentThread};
         use crate::runtime::{runtime::Scheduler, Config};
@@ -1074,6 +1090,7 @@ impl Builder {
                 after_unpark: self.after_unpark.clone(),
                 global_queue_interval: self.global_queue_interval,
                 event_interval: self.event_interval,
+                local_queue_capacity: self.local_queue_capacity,
                 #[cfg(tokio_unstable)]
                 unhandled_panic: self.unhandled_panic.clone(),
                 disable_lifo_slot: self.disable_lifo_slot,
@@ -1224,6 +1241,7 @@ cfg_rt_multi_thread! {
                     after_unpark: self.after_unpark.clone(),
                     global_queue_interval: self.global_queue_interval,
                     event_interval: self.event_interval,
+                    local_queue_capacity: self.local_queue_capacity,
                     #[cfg(tokio_unstable)]
                     unhandled_panic: self.unhandled_panic.clone(),
                     disable_lifo_slot: self.disable_lifo_slot,
@@ -1271,6 +1289,7 @@ cfg_rt_multi_thread! {
                     after_unpark: self.after_unpark.clone(),
                     global_queue_interval: self.global_queue_interval,
                     event_interval: self.event_interval,
+                    local_queue_capacity: self.local_queue_capacity,
                     #[cfg(tokio_unstable)]
                     unhandled_panic: self.unhandled_panic.clone(),
                     disable_lifo_slot: self.disable_lifo_slot,
diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs
index acae6ef5a55..d846a0d224a 100644
--- a/tokio/src/runtime/config.rs
+++ b/tokio/src/runtime/config.rs
@@ -1,4 +1,7 @@
-#![cfg_attr(any(not(feature = "full"), target_family = "wasm"), allow(dead_code))]
+#![cfg_attr(
+    any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
+    allow(dead_code)
+)]
 
 use crate::runtime::Callback;
 use crate::util::RngSeedGenerator;
@@ -9,6 +12,9 @@ pub(crate) struct Config {
     /// How many ticks before yielding to the driver for timer and I/O events?
     pub(crate) event_interval: u32,
 
+    /// How big to make each worker's local queue
+    pub(crate) local_queue_capacity: usize,
+
     /// Callback for a worker parking itself
     pub(crate) before_park: Option<Callback>,
 
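The builder and config changes above thread a new `local_queue_capacity` value through `Config`, defaulting to 256 (4 under loom) and exposed as a `pub(crate)`, loom-only setter. A minimal sketch of how a crate-internal loom model could exercise the new knob, mirroring the loom tests added later in this patch (illustrative only, not part of the diff):

```rust
// Illustrative sketch, not part of the patch. `local_queue_capacity` is
// `pub(crate)` and only compiled under `cfg(loom)`, so this has to live
// inside tokio's own loom test modules.
#[cfg(loom)]
fn local_queue_capacity_smoke() {
    loom::model(|| {
        let pool = crate::runtime::Builder::new_multi_thread_alt()
            .worker_threads(2)
            // Must be a power of two; the setter asserts this.
            .local_queue_capacity(2)
            .build()
            .unwrap();

        pool.spawn(async {});
        drop(pool);
    });
}
```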
diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs
index 572fdefb0da..0474c2b3e1d 100644
--- a/tokio/src/runtime/driver.rs
+++ b/tokio/src/runtime/driver.rs
@@ -2,7 +2,10 @@
 // Eventually, this file will see significant refactoring / cleanup. For now, we
 // don't need to worry much about dead code with certain feature permutations.
-#![cfg_attr(not(feature = "full"), allow(dead_code))]
+#![cfg_attr(
+    any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
+    allow(dead_code)
+)]
 
 use crate::runtime::park::{ParkThread, UnparkThread};
 
@@ -58,6 +61,10 @@ impl Driver {
         ))
     }
 
+    pub(crate) fn is_enabled(&self) -> bool {
+        self.inner.is_enabled()
+    }
+
     pub(crate) fn park(&mut self, handle: &Handle) {
         self.inner.park(handle)
     }
@@ -154,6 +161,13 @@ cfg_io_driver! {
 }
 
 impl IoStack {
+    pub(crate) fn is_enabled(&self) -> bool {
+        match self {
+            IoStack::Enabled(..) => true,
+            IoStack::Disabled(..) => false,
+        }
+    }
+
     pub(crate) fn park(&mut self, handle: &Handle) {
         match self {
             IoStack::Enabled(v) => v.park(handle),
@@ -217,6 +231,11 @@ cfg_not_io_driver! {
 
         pub(crate) fn shutdown(&mut self, _handle: &Handle) {
             self.0.shutdown();
        }
+
+        /// This is not a "real" driver, so it is not considered enabled.
+        pub(crate) fn is_enabled(&self) -> bool {
+            false
+        }
     }
 }
 
@@ -298,6 +317,13 @@ cfg_time! {
 }
 
 impl TimeDriver {
+    pub(crate) fn is_enabled(&self) -> bool {
+        match self {
+            TimeDriver::Enabled { .. } => true,
+            TimeDriver::Disabled(inner) => inner.is_enabled(),
+        }
+    }
+
     pub(crate) fn park(&mut self, handle: &Handle) {
         match self {
             TimeDriver::Enabled { driver, .. } => driver.park(handle),
diff --git a/tokio/src/runtime/scheduler/inject/shared.rs b/tokio/src/runtime/scheduler/inject/shared.rs
index 5a7b9c6a903..2d29486db73 100644
--- a/tokio/src/runtime/scheduler/inject/shared.rs
+++ b/tokio/src/runtime/scheduler/inject/shared.rs
@@ -109,6 +109,8 @@ impl Shared {
     pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
         use std::cmp;
 
+        debug_assert!(n > 0);
+
         // safety: All updates to the len atomic are guarded by the mutex. As
         // such, a non-atomic load followed by a store is safe.
         let len = self.len.unsync_load();
diff --git a/tokio/src/runtime/scheduler/inject/synced.rs b/tokio/src/runtime/scheduler/inject/synced.rs
index 6847f68e5db..45f603878de 100644
--- a/tokio/src/runtime/scheduler/inject/synced.rs
+++ b/tokio/src/runtime/scheduler/inject/synced.rs
@@ -1,3 +1,8 @@
+#![cfg_attr(
+    any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
+    allow(dead_code)
+)]
+
 use crate::runtime::task;
 
 pub(crate) struct Synced {
@@ -29,4 +34,8 @@ impl Synced {
         // safety: a `Notified` is pushed into the queue and now it is popped!
         Some(unsafe { task::Notified::from_raw(task) })
     }
+
+    pub(crate) fn is_empty(&self) -> bool {
+        self.head.is_none()
+    }
 }
diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/idle.rs b/tokio/src/runtime/scheduler/multi_thread_alt/idle.rs
index c051bc4275f..9f08a8cdfbc 100644
--- a/tokio/src/runtime/scheduler/multi_thread_alt/idle.rs
+++ b/tokio/src/runtime/scheduler/multi_thread_alt/idle.rs
@@ -60,7 +60,12 @@ impl Idle {
         (idle, synced)
     }
 
+    pub(super) fn needs_searching(&self) -> bool {
+        self.needs_searching.load(Acquire)
+    }
+
     pub(super) fn num_idle(&self, synced: &Synced) -> usize {
+        #[cfg(not(loom))]
         debug_assert_eq!(synced.available_cores.len(), self.num_idle.load(Acquire));
         synced.available_cores.len()
     }
@@ -131,13 +136,7 @@ impl Idle {
         }
 
         // We need to establish a stronger barrier than with `notify_local`
-        if self
-            .num_searching
-            .compare_exchange(0, 1, AcqRel, Acquire)
-            .is_err()
-        {
-            return;
-        }
+        self.num_searching.fetch_add(1, AcqRel);
 
         self.notify_synced(synced, shared);
     }
@@ -158,6 +157,7 @@ impl Idle {
         synced.assigned_cores[worker] = Some(core);
 
         let num_idle = synced.idle.available_cores.len();
+        #[cfg(not(loom))]
         debug_assert_eq!(num_idle, self.num_idle.load(Acquire) - 1);
 
         // Update the number of sleeping workers
@@ -221,6 +221,7 @@ impl Idle {
             let num_idle = synced.idle.available_cores.len();
             self.num_idle.store(num_idle, Release);
         } else {
+            #[cfg(not(loom))]
             debug_assert_eq!(
                 synced.idle.available_cores.len(),
                 self.num_idle.load(Acquire)
@@ -260,11 +261,11 @@ impl Idle {
         // The core should not be searching at this point
         debug_assert!(!core.is_searching);
 
-        // Check that this isn't the final worker to go idle *and*
-        // `needs_searching` is set.
-        debug_assert!(!self.needs_searching.load(Acquire) || num_active_workers(&synced.idle) > 1);
+        // Check that there are no pending tasks in the global queue
+        debug_assert!(synced.inject.is_empty());
 
         let num_idle = synced.idle.available_cores.len();
+        #[cfg(not(loom))]
         debug_assert_eq!(num_idle, self.num_idle.load(Acquire));
 
         self.idle_map.set(core.index);
@@ -314,7 +315,7 @@ impl Idle {
         }
     }
 
-    fn transition_worker_to_searching(&self, core: &mut Core) {
+    pub(super) fn transition_worker_to_searching(&self, core: &mut Core) {
         core.is_searching = true;
         self.num_searching.fetch_add(1, AcqRel);
         self.needs_searching.store(false, Release);
@@ -324,10 +325,7 @@ impl Idle {
     ///
     /// Returns `true` if this is the final searching worker. The caller
     /// **must** notify a new worker.
-    pub(super) fn transition_worker_from_searching(&self, core: &mut Core) -> bool {
-        debug_assert!(core.is_searching);
-        core.is_searching = false;
-
+    pub(super) fn transition_worker_from_searching(&self) -> bool {
         let prev = self.num_searching.fetch_sub(1, AcqRel);
         debug_assert!(prev > 0);
 
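The `Idle` changes above drop the CAS-based "claim a searching slot" early return in `notify_remote` and split the searching bookkeeping: `transition_worker_from_searching` now only decrements the shared counter, leaving `core.is_searching` to the caller. A reduced sketch of the counter protocol this relies on (names invented for illustration, not the actual tokio types):

```rust
use std::sync::atomic::{AtomicUsize, Ordering::AcqRel};

// Simplified stand-in for the `num_searching` counter in `Idle`.
struct Searching(AtomicUsize);

impl Searching {
    // A worker starts searching, or a remote notification "pre-charges" a
    // searcher, as `notify_remote` now does with `fetch_add`.
    fn start(&self) {
        self.0.fetch_add(1, AcqRel);
    }

    // A worker stops searching. Returning `true` means the caller was the
    // last searcher and must re-check the queues or notify a peer so that
    // newly submitted work is not missed.
    fn stop(&self) -> bool {
        let prev = self.0.fetch_sub(1, AcqRel);
        debug_assert!(prev > 0);
        prev == 1
    }
}
```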
diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/queue.rs b/tokio/src/runtime/scheduler/multi_thread_alt/queue.rs
index d4acc408183..e41e9fdb3a6 100644
--- a/tokio/src/runtime/scheduler/multi_thread_alt/queue.rs
+++ b/tokio/src/runtime/scheduler/multi_thread_alt/queue.rs
@@ -54,46 +54,30 @@ pub(crate) struct Inner {
     tail: AtomicUnsignedShort,
 
     /// Elements
-    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
+    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>,
+
+    mask: usize,
 }
 
 unsafe impl<T> Send for Inner<T> {}
 unsafe impl<T> Sync for Inner<T> {}
 
-#[cfg(not(loom))]
-const LOCAL_QUEUE_CAPACITY: usize = 256;
-
-// Shrink the size of the local queue when using loom. This shouldn't impact
-// logic, but allows loom to test more edge cases in a reasonable a mount of
-// time.
-#[cfg(loom)]
-const LOCAL_QUEUE_CAPACITY: usize = 4;
-
-const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;
-
-// Constructing the fixed size array directly is very awkward. The only way to
-// do it is to repeat `UnsafeCell::new(MaybeUninit::uninit())` 256 times, as
-// the contents are not Copy. The trick with defining a const doesn't work for
-// generic types.
-fn make_fixed_size<T>(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> {
-    assert_eq!(buffer.len(), LOCAL_QUEUE_CAPACITY);
-
-    // safety: We check that the length is correct.
-    unsafe { Box::from_raw(Box::into_raw(buffer).cast()) }
-}
-
 /// Create a new local run-queue
-pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
-    let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);
+pub(crate) fn local<T: 'static>(capacity: usize) -> (Steal<T>, Local<T>) {
+    assert!(capacity <= 4096);
+    assert!(capacity >= 1);
+
+    let mut buffer = Vec::with_capacity(capacity);
 
-    for _ in 0..LOCAL_QUEUE_CAPACITY {
+    for _ in 0..capacity {
         buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
     }
 
     let inner = Arc::new(Inner {
         head: AtomicUnsignedLong::new(0),
         tail: AtomicUnsignedShort::new(0),
-        buffer: make_fixed_size(buffer.into_boxed_slice()),
+        buffer: buffer.into_boxed_slice(),
+        mask: capacity - 1,
     });
 
     let local = Local {
@@ -112,7 +96,7 @@ impl Local {
     }
 
     pub(crate) fn max_capacity(&self) -> usize {
-        LOCAL_QUEUE_CAPACITY
+        self.inner.buffer.len()
     }
 
     /// Returns `true` if there are no entries in the queue
@@ -120,6 +104,10 @@ impl Local {
         self.inner.is_empty()
     }
 
+    pub(crate) fn can_steal(&self) -> bool {
+        self.remaining_slots() >= self.max_capacity() - self.max_capacity() / 2
+    }
+
     /// Pushes a batch of tasks to the back of the queue. All tasks must fit in
     /// the local queue.
     ///
@@ -128,7 +116,7 @@ impl Local {
     /// The method panics if there is not enough capacity to fit in the queue.
     pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator<Item = task::Notified<T>>) {
         let len = tasks.len();
-        assert!(len <= LOCAL_QUEUE_CAPACITY);
+        assert!(len <= self.inner.buffer.len());
 
         if len == 0 {
             // Nothing to do
@@ -136,21 +124,24 @@ impl Local {
         }
 
         let head = self.inner.head.load(Acquire);
-        let (steal, _) = unpack(head);
+        let (steal, real) = unpack(head);
 
         // safety: this is the **only** thread that updates this cell.
         let mut tail = unsafe { self.inner.tail.unsync_load() };
 
-        if tail.wrapping_sub(steal) <= (LOCAL_QUEUE_CAPACITY - len) as UnsignedShort {
+        if tail.wrapping_sub(steal) <= (self.inner.buffer.len() - len) as UnsignedShort {
             // Yes, this if condition is structured a bit weird (first block
             // does nothing, second returns an error). It is this way to match
             // `push_back_or_overflow`.
         } else {
-            panic!()
+            panic!(
+                "not enough capacity; len={}; tail={}; steal={}; real={}",
+                len, tail, steal, real
+            );
         }
 
         for task in tasks {
-            let idx = tail as usize & MASK;
+            let idx = tail as usize & self.inner.mask;
 
             self.inner.buffer[idx].with_mut(|ptr| {
                 // Write the task to the slot
@@ -188,7 +179,7 @@ impl Local {
         // safety: this is the **only** thread that updates this cell.
         let tail = unsafe { self.inner.tail.unsync_load() };
 
-        if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as UnsignedShort {
+        if tail.wrapping_sub(steal) < self.inner.buffer.len() as UnsignedShort {
             // There is capacity for the task
             break tail;
         } else if steal != real {
@@ -217,7 +208,7 @@ impl Local {
     // Second half of `push_back`
     fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
         // Map the position to a slot index.
-        let idx = tail as usize & MASK;
+        let idx = tail as usize & self.inner.mask;
 
         self.inner.buffer[idx].with_mut(|ptr| {
             // Write the task to the slot
@@ -250,15 +241,15 @@ impl Local {
         overflow: &O,
         stats: &mut Stats,
     ) -> Result<(), task::Notified<T>> {
-        /// How many elements are we taking from the local queue.
-        ///
-        /// This is one less than the number of tasks pushed to the inject
-        /// queue as we are also inserting the `task` argument.
-        const NUM_TASKS_TAKEN: UnsignedShort = (LOCAL_QUEUE_CAPACITY / 2) as UnsignedShort;
+        // How many elements are we taking from the local queue.
+        //
+        // This is one less than the number of tasks pushed to the inject
+        // queue as we are also inserting the `task` argument.
+        let num_tasks_taken: UnsignedShort = (self.inner.buffer.len() / 2) as UnsignedShort;
 
         assert_eq!(
             tail.wrapping_sub(head) as usize,
-            LOCAL_QUEUE_CAPACITY,
+            self.inner.buffer.len(),
             "queue is not full; tail = {}; head = {}",
             tail,
             head
@@ -282,8 +273,8 @@ impl Local {
             .compare_exchange(
                 prev,
                 pack(
-                    head.wrapping_add(NUM_TASKS_TAKEN),
-                    head.wrapping_add(NUM_TASKS_TAKEN),
+                    head.wrapping_add(num_tasks_taken),
+                    head.wrapping_add(num_tasks_taken),
                 ),
                 Release,
                 Relaxed,
@@ -298,19 +289,21 @@ impl Local {
 
         /// An iterator that takes elements out of the run queue.
         struct BatchTaskIter<'a, T: 'static> {
-            buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY],
+            buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>],
+            mask: usize,
             head: UnsignedLong,
             i: UnsignedLong,
+            num: UnsignedShort,
         }
 
         impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
            type Item = task::Notified<T>;
 
            #[inline]
            fn next(&mut self) -> Option<task::Notified<T>> {
-               if self.i == UnsignedLong::from(NUM_TASKS_TAKEN) {
+               if self.i == UnsignedLong::from(self.num) {
                    None
                } else {
-                   let i_idx = self.i.wrapping_add(self.head) as usize & MASK;
+                   let i_idx = self.i.wrapping_add(self.head) as usize & self.mask;
                    let slot = &self.buffer[i_idx];
 
                    // safety: Our CAS from before has assumed exclusive ownership
@@ -327,8 +320,10 @@ impl Local {
         // values again, and we are the only producer.
         let batch_iter = BatchTaskIter {
             buffer: &self.inner.buffer,
+            mask: self.inner.mask,
             head: head as UnsignedLong,
             i: 0,
+            num: num_tasks_taken,
         };
 
         overflow.push_batch(batch_iter.chain(std::iter::once(task)));
@@ -371,7 +366,7 @@ impl Local {
             .compare_exchange(head, next, AcqRel, Acquire);
 
         match res {
-            Ok(_) => break real as usize & MASK,
+            Ok(_) => break real as usize & self.inner.mask,
             Err(actual) => head = actual,
         }
     };
@@ -381,6 +376,10 @@ impl Local {
 }
 
 impl<T> Steal<T> {
+    pub(crate) fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+
     /// Steals half the tasks from self and place them into `dst`.
     pub(crate) fn steal_into(
         &self,
@@ -396,7 +395,7 @@ impl Steal {
         // from `dst` there may not be enough capacity to steal.
         let (steal, _) = unpack(dst.inner.head.load(Acquire));
 
-        if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as UnsignedShort / 2 {
+        if dst_tail.wrapping_sub(steal) > self.0.buffer.len() as UnsignedShort / 2 {
             // we *could* try to steal less here, but for simplicity, we're just
             // going to abort.
             return None;
@@ -420,7 +419,7 @@ impl Steal {
         n -= 1;
 
         let ret_pos = dst_tail.wrapping_add(n);
-        let ret_idx = ret_pos as usize & MASK;
+        let ret_idx = ret_pos as usize & dst.inner.mask;
 
         // safety: the value was written as part of `steal_into2` and not
         // exposed to stealers, so no other thread can access it.
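With the queue capacity now chosen at runtime, the old `MASK` constant becomes a per-queue `mask: capacity - 1` field, which is why the loom-only setter insists on a power of two: only then is `pos & mask` equivalent to `pos % capacity` for the wrapping `head`/`tail` counters used above. A small self-contained check of that identity (illustrative, not from the patch):

```rust
fn main() {
    let capacity: usize = 8; // any power of two
    let mask = capacity - 1;

    // The position keeps growing (and may wrap); the slot index is derived
    // by masking, as `push_back_finish` and `steal_into2` do.
    for pos in 0usize..64 {
        assert_eq!(pos & mask, pos % capacity);
    }

    // A non-power-of-two capacity breaks the identity, e.g. capacity 6:
    assert_ne!(7 & (6 - 1), 7 % 6); // 7 & 5 == 5, while 7 % 6 == 1
}
```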
@@ -481,8 +480,8 @@
             }
         };
 
-        assert!(
-            n <= LOCAL_QUEUE_CAPACITY as UnsignedShort / 2,
+        debug_assert!(
+            n <= (self.0.buffer.len() - self.0.buffer.len() / 2) as UnsignedShort,
             "actual = {}",
             n
         );
@@ -496,8 +495,8 @@
             let dst_pos = dst_tail.wrapping_add(i);
 
             // Map to slots
-            let src_idx = src_pos as usize & MASK;
-            let dst_idx = dst_pos as usize & MASK;
+            let src_idx = src_pos as usize & self.0.mask;
+            let dst_idx = dst_pos as usize & self.0.mask;
 
             // Read the task
             //
@@ -566,7 +565,7 @@
         let (steal, _) = unpack(self.head.load(Acquire));
         let tail = self.tail.load(Acquire);
 
-        LOCAL_QUEUE_CAPACITY - (tail.wrapping_sub(steal) as usize)
+        self.buffer.len() - (tail.wrapping_sub(steal) as usize)
     }
 
     fn len(&self) -> UnsignedShort {
@@ -594,8 +593,3 @@ fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
 fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
     (real as UnsignedLong) | ((steal as UnsignedLong) << (mem::size_of::<UnsignedShort>() * 8))
 }
-
-#[test]
-fn test_local_queue_capacity() {
-    assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::MAX as usize);
-}
diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
index 4d7bfa97eaf..e9bb4fd9f33 100644
--- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
@@ -190,6 +190,9 @@ pub(crate) struct Synced {
     /// stolen by a thread that was spawned as part of `block_in_place`.
     shutdown_cores: Vec<Box<Core>>,
 
+    /// The driver goes here when shutting down
+    shutdown_driver: Option<Box<Driver>>,
+
     /// Synchronized state for `Idle`.
     pub(super) idle: idle::Synced,
 
@@ -258,9 +261,14 @@ pub(super) fn create(
     seed_generator: RngSeedGenerator,
     config: Config,
 ) -> runtime::Handle {
-    // Allocate num_cores + 1 workers so that one worker can handle the I/O
-    // driver, if needed.
-    let num_workers = num_cores + 1;
+    let mut num_workers = num_cores;
+
+    // If the driver is enabled, we need an extra thread to handle polling the
+    // driver when all cores are busy.
+    if driver.is_enabled() {
+        num_workers += 1;
+    }
+
     let mut cores = Vec::with_capacity(num_cores);
     let mut remotes = Vec::with_capacity(num_cores);
     // Worker metrics are actually core based
@@ -268,7 +276,7 @@ pub(super) fn create(
 
     // Create the local queues
     for i in 0..num_cores {
-        let (steal, run_queue) = queue::local();
+        let (steal, run_queue) = queue::local(config.local_queue_capacity);
 
         let metrics = WorkerMetrics::from_config(&config);
         let stats = Stats::new(&metrics);
@@ -303,6 +311,7 @@ pub(super) fn create(
         synced: Mutex::new(Synced {
             assigned_cores: (0..num_workers).map(|_| None).collect(),
             shutdown_cores: Vec::with_capacity(num_cores),
+            shutdown_driver: None,
             idle: idle_synced,
             inject: inject_synced,
         }),
@@ -616,6 +625,13 @@ impl Worker {
         cx: &Context,
         mut synced: MutexGuard<'_, Synced>,
     ) -> NextTaskResult {
+        if cx.shared().idle.needs_searching() {
+            if let Some(mut core) = self.try_acquire_available_core(cx, &mut synced) {
+                cx.shared().idle.transition_worker_to_searching(&mut core);
+                return Ok((None, core));
+            }
+        }
+
         cx.shared()
             .idle
             .transition_worker_to_parked(&mut synced, cx.index);
@@ -642,7 +658,7 @@ impl Worker {
             return Ok((None, core));
         }
 
-        let n = core.run_queue.max_capacity() / 2;
+        let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
         let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n);
 
         Ok((maybe_task, core))
@@ -658,6 +674,7 @@ impl Worker {
         self.reset_lifo_enabled(cx);
 
         // At this point, the local queue should be empty
+        #[cfg(not(loom))]
         debug_assert!(core.run_queue.is_empty());
 
         // Update shutdown state while locked
@@ -761,7 +778,7 @@ impl Worker {
         // available slots in the queue.
         let cap = usize::min(
             core.run_queue.remaining_slots(),
-            core.run_queue.max_capacity() / 2,
+            usize::max(core.run_queue.max_capacity() / 2, 1),
         );
 
         let mut synced = cx.shared().synced.lock();
@@ -787,7 +804,7 @@ impl Worker {
             cx.shared().inject.len() / cx.shared().remotes.len() + 1
         };
 
-        let n = usize::min(n, max);
+        let n = usize::min(n, max) + 1;
 
         // safety: passing in the correct `inject::Synced`.
         let mut tasks = unsafe { cx.shared().inject.pop_n(&mut synced.inject, n) };
 
@@ -822,8 +839,13 @@ impl Worker {
         const ROUNDS: usize = 1;
 
         debug_assert!(core.lifo_slot.is_none());
+        #[cfg(not(loom))]
         debug_assert!(core.run_queue.is_empty());
 
+        if !core.run_queue.can_steal() {
+            return Ok((None, core));
+        }
+
         if !self.transition_to_searching(cx, &mut core) {
             return Ok((None, core));
         }
@@ -1135,22 +1157,15 @@ impl Worker {
     fn do_park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
         let was_searching = core.is_searching;
 
-        // Before we park, if we are searching, we need to transition away from searching
-        if self.transition_from_searching(cx, &mut core) {
-            cx.shared().idle.snapshot(&mut self.idle_snapshot);
-            // We were the last searching worker, we need to do one last check
-            if let Some(task) = self.steal_one_round(cx, &mut core, 0) {
-                cx.shared().notify_parked_local();
-
-                return Ok((Some(task), core));
-            }
-        }
-
         // Acquire the lock
         let mut synced = cx.shared().synced.lock();
 
+        // The local queue should be empty at this point
+        #[cfg(not(loom))]
+        debug_assert!(core.run_queue.is_empty());
+
         // Try one last time to get tasks
-        let n = core.run_queue.max_capacity() / 2;
+        let n = cmp::max(core.run_queue.remaining_slots() / 2, 1);
         if let Some(task) = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n) {
             return Ok((Some(task), core));
         }
@@ -1178,30 +1193,49 @@ impl Worker {
             return Ok((None, core));
         }
 
-        // Core being returned must not be in the searching state
-        debug_assert!(!core.is_searching);
-
         // Release the core
+        core.is_searching = false;
         cx.shared().idle.release_core(&mut synced, core);
 
-        if let Some(mut driver) = cx.shared().driver.take() {
-            // Drop the lock before parking on the driver
-            drop(synced);
+        drop(synced);
 
+        if was_searching {
+            if cx.shared().idle.transition_worker_from_searching() {
+                // cx.shared().idle.snapshot(&mut self.idle_snapshot);
+                // We were the last searching worker, we need to do one last check
+                for i in 0..cx.shared().remotes.len() {
+                    if !cx.shared().remotes[i].steal.is_empty() {
+                        let mut synced = cx.shared().synced.lock();
+
+                        // Try to get a core
+                        if let Some(mut core) = self.try_acquire_available_core(cx, &mut synced) {
+                            cx.shared().idle.transition_worker_to_searching(&mut core);
+                            return Ok((None, core));
+                        } else {
+                            // Fall back to the park routine
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        if let Some(mut driver) = cx.shared().take_driver() {
             // Wait for driver events
             driver.park(&cx.handle.driver);
 
             synced = cx.shared().synced.lock();
 
-            // Put the driver back
-            cx.shared().driver.set(driver);
-
             if cx.shared().inject.is_closed(&mut synced.inject) {
+                synced.shutdown_driver = Some(driver);
                 self.shutdown_clear_defer(cx);
                 self.shutdown_finalize(cx, synced);
                 return Err(());
             }
 
+            // Put the driver back
+            cx.shared().driver.set(driver);
+
             // Try to acquire an available core to schedule I/O events
             if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
                 // This may result in a task being run
@@ -1214,6 +1248,8 @@ impl Worker {
                 self.wait_for_core(cx, synced)
             }
         } else {
+            synced = cx.shared().synced.lock();
+
             // Wait for a core to be assigned to us
             self.wait_for_core(cx, synced)
         }
@@ -1233,7 +1269,8 @@ impl Worker {
             return false;
         }
 
-        cx.shared().idle.transition_worker_from_searching(core)
+        core.is_searching = false;
+        cx.shared().idle.transition_worker_from_searching()
     }
 
     fn can_transition_to_parked(&self, core: &mut Core) -> bool {
@@ -1270,10 +1307,11 @@ impl Worker {
             return;
         }
 
-        let mut driver = match cx.shared().driver.take() {
-            Some(driver) => driver,
-            None => return,
-        };
+        let driver = synced.shutdown_driver.take();
+
+        if cx.shared().driver_enabled() && driver.is_none() {
+            return;
+        }
 
         debug_assert!(cx.shared().owned.is_empty());
 
@@ -1283,7 +1321,9 @@ impl Worker {
         }
 
         // Shutdown the driver
-        driver.shutdown(&cx.handle.driver);
+        if let Some(mut driver) = driver {
+            driver.shutdown(&cx.handle.driver);
+        }
 
         // Drain the injection queue
         //
@@ -1412,6 +1452,10 @@ impl Shared {
     pub(super) fn close(&self) {
         let mut synced = self.synced.lock();
 
+        if let Some(driver) = self.driver.take() {
+            synced.shutdown_driver = Some(driver);
+        }
+
         if self.inject.close(&mut synced.inject) {
             // Set the shutdown flag on all available cores
             self.idle.shutdown(&mut synced, self);
@@ -1442,6 +1486,18 @@ impl Shared {
             self.inject.push_batch(&mut synced.inject, iter);
         }
     }
+
+    fn take_driver(&self) -> Option<Box<Driver>> {
+        if !self.driver_enabled() {
+            return None;
+        }
+
+        self.driver.take()
+    }
+
+    fn driver_enabled(&self) -> bool {
+        self.condvars.len() > self.remotes.len()
+    }
 }
 
 impl Overflow<Arc<Handle>> for Shared {
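The worker changes above stop unconditionally reserving a thread for the driver: `create()` only adds the extra worker slot when `driver.is_enabled()`, and `driver_enabled()` later recovers that fact by comparing `condvars` (one per worker) with `remotes` (one per core). A reduced sketch of that invariant (types simplified for illustration, not the real tokio structs):

```rust
// Simplified stand-ins for the real `Shared` fields.
struct Shared {
    remotes: Vec<()>,  // one entry per core
    condvars: Vec<()>, // one entry per worker
}

impl Shared {
    fn new(num_cores: usize, driver_enabled: bool) -> Self {
        // Mirrors `create()`: one extra worker slot only if a driver exists.
        let num_workers = num_cores + usize::from(driver_enabled);
        Shared {
            remotes: vec![(); num_cores],
            condvars: vec![(); num_workers],
        }
    }

    // Mirrors the new `Shared::driver_enabled` check in the patch.
    fn driver_enabled(&self) -> bool {
        self.condvars.len() > self.remotes.len()
    }
}

fn main() {
    assert!(Shared::new(2, true).driver_enabled());
    assert!(!Shared::new(2, false).driver_enabled());
}
```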
diff --git a/tokio/src/runtime/tests/loom_multi_thread_alt.rs b/tokio/src/runtime/tests/loom_multi_thread_alt.rs
index 6ab066ab6f6..1b9c3b477c6 100644
--- a/tokio/src/runtime/tests/loom_multi_thread_alt.rs
+++ b/tokio/src/runtime/tests/loom_multi_thread_alt.rs
@@ -309,6 +309,113 @@ mod group_c {
             drop(pool);
         });
     }
+
+    #[test]
+    fn fill_local_queue() {
+        const NUM_SPAWNS: usize = 3;
+        loom::model(|| {
+            // using std versions here as it is just to control shutdown.
+            let cnt = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+            let (tx, rx) = oneshot::channel();
+            let tx = AtomicOneshot::new(tx);
+
+            let pool = runtime::Builder::new_multi_thread_alt()
+                .worker_threads(2)
+                // Set the intervals to avoid tuning logic
+                .global_queue_interval(61)
+                .local_queue_capacity(1)
+                .build()
+                .unwrap();
+
+            for _ in 0..NUM_SPAWNS {
+                let cnt = cnt.clone();
+                let tx = tx.clone();
+                pool.spawn(track(async move {
+                    if NUM_SPAWNS == 1 + cnt.fetch_add(1, Relaxed) {
+                        tx.assert_send(());
+                    }
+                }));
+            }
+
+            rx.recv();
+        });
+    }
+
+    // This tests a very specific case that happened when a worker has no more
+    // available work to process because a peer is in the process of stealing
+    // (but does not finish stealing), and the worker happens to find more work
+    // from the injection queue *right* before parking.
+    #[test]
+    fn pool_concurrent_park_with_steal_with_inject() {
+        const DEPTH: usize = 4;
+
+        let mut model = loom::model::Builder::new();
+        model.expect_explicit_explore = true;
+        model.preemption_bound = Some(3);
+
+        model.check(|| {
+            let pool = runtime::Builder::new_multi_thread_alt()
+                .worker_threads(2)
+                // Set the intervals to avoid tuning logic
+                .global_queue_interval(61)
+                .local_queue_capacity(DEPTH)
+                .build()
+                .unwrap();
+
+            // Use std types to avoid adding backtracking.
+            type Flag = std::sync::Arc<std::sync::atomic::AtomicIsize>;
+            let flag: Flag = Default::default();
+            let flag1 = flag.clone();
+
+            let (tx1, rx1) = oneshot::channel();
+
+            async fn task(expect: isize, flag: Flag) {
+                if expect == flag.load(Relaxed) {
+                    flag.store(expect + 1, Relaxed);
+                } else {
+                    flag.store(-1, Relaxed);
+                    loom::skip_branch();
+                }
+            }
+
+            pool.spawn(track(async move {
+                let flag = flag1;
+                // First 2 spawned task should be stolen
+                crate::spawn(task(1, flag.clone()));
+                crate::spawn(task(2, flag.clone()));
+                crate::spawn(async move {
+                    task(0, flag.clone()).await;
+                    tx1.send(());
+                });
+
+                // One to fill the LIFO slot
+                crate::spawn(async move {});
+
+                loom::explore();
+            }));
+
+            rx1.recv();
+
+            if 1 == flag.load(Relaxed) {
+                loom::stop_exploring();
+
+                let (tx3, rx3) = oneshot::channel();
+                pool.spawn(async move {
+                    loom::skip_branch();
+                    tx3.send(());
+                });
+
+                pool.spawn(async {});
+                pool.spawn(async {});
+
+                loom::explore();
+
+                rx3.recv();
+            } else {
+                loom::skip_branch();
+            }
+        });
+    }
 }
 
 mod group_d {
diff --git a/tokio/tests/rt_threaded_alt.rs b/tokio/tests/rt_threaded_alt.rs
index b0eb4279af0..3e3ac076290 100644
--- a/tokio/tests/rt_threaded_alt.rs
+++ b/tokio/tests/rt_threaded_alt.rs
@@ -322,7 +322,7 @@ fn start_stop_callbacks_called() {
 }
 
 #[test]
-fn blocking() {
+fn blocking_task() {
     // used for notifying the main thread
     const NUM: usize = 1_000;