diff --git a/tokio/src/runtime/scheduler/inject.rs b/tokio/src/runtime/scheduler/inject.rs
index 4b65449e7e7..39976fcd7a2 100644
--- a/tokio/src/runtime/scheduler/inject.rs
+++ b/tokio/src/runtime/scheduler/inject.rs
@@ -1,292 +1,72 @@
 //! Inject queue used to send wakeups to a work-stealing scheduler
 
-use crate::loom::sync::atomic::AtomicUsize;
-use crate::loom::sync::{Mutex, MutexGuard};
+use crate::loom::sync::Mutex;
 use crate::runtime::task;
 
-use std::marker::PhantomData;
-use std::sync::atomic::Ordering::{Acquire, Release};
+mod pop;
+pub(crate) use pop::Pop;
 
-/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
-/// overflow queue when the local, fixed-size, array queue overflows.
-pub(crate) struct Inject<T: 'static> {
-    /// Pointers to the head and tail of the queue.
-    pointers: Mutex<Pointers>,
+mod shared;
+pub(crate) use shared::Shared;
 
-    /// Number of pending tasks in the queue. This helps prevent unnecessary
-    /// locking in the hot path.
-    len: AtomicUsize,
+mod synced;
+pub(crate) use synced::Synced;
 
-    _p: PhantomData<T>,
+cfg_rt_multi_thread! {
+    mod rt_multi_thread;
 }
 
-struct Pointers {
-    /// True if the queue is closed.
-    is_closed: bool,
-
-    /// Linked-list head.
-    head: Option<task::RawTask>,
-
-    /// Linked-list tail.
-    tail: Option<task::RawTask>,
+cfg_metrics! {
+    mod metrics;
 }
 
-pub(crate) struct Pop<'a, T: 'static> {
-    len: usize,
-    pointers: Option<MutexGuard<'a, Pointers>>,
-    _p: PhantomData<T>,
+/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
+/// overflow queue when the local, fixed-size, array queue overflows.
+pub(crate) struct Inject<T: 'static> {
+    shared: Shared<T>,
+    synced: Mutex<Synced>,
 }
 
-unsafe impl<T> Send for Inject<T> {}
-unsafe impl<T> Sync for Inject<T> {}
-
 impl<T: 'static> Inject<T> {
     pub(crate) fn new() -> Inject<T> {
+        let (shared, synced) = Shared::new();
+
         Inject {
-            pointers: Mutex::new(Pointers {
-                is_closed: false,
-                head: None,
-                tail: None,
-            }),
-            len: AtomicUsize::new(0),
-            _p: PhantomData,
+            shared,
+            synced: Mutex::new(synced),
         }
     }
 
-    pub(crate) fn is_empty(&self) -> bool {
-        self.len() == 0
-    }
-
     // Kind of annoying to have to include the cfg here
-    #[cfg(any(tokio_taskdump, all(feature = "rt-multi-thread", not(tokio_wasi))))]
+    #[cfg(tokio_taskdump)]
     pub(crate) fn is_closed(&self) -> bool {
-        self.pointers.lock().is_closed
+        let synced = self.synced.lock();
+        self.shared.is_closed(&synced)
     }
 
     /// Closes the injection queue, returns `true` if the queue is open when the
     /// transition is made.
     pub(crate) fn close(&self) -> bool {
-        let mut p = self.pointers.lock();
-
-        if p.is_closed {
-            return false;
-        }
-
-        p.is_closed = true;
-        true
-    }
-
-    pub(crate) fn len(&self) -> usize {
-        self.len.load(Acquire)
+        let mut synced = self.synced.lock();
+        self.shared.close(&mut synced)
     }
 
     /// Pushes a value into the queue.
     ///
     /// This does nothing if the queue is closed.
     pub(crate) fn push(&self, task: task::Notified<T>) {
-        // Acquire queue lock
-        let mut p = self.pointers.lock();
-
-        if p.is_closed {
-            return;
-        }
-
-        // safety: only mutated with the lock held
-        let len = unsafe { self.len.unsync_load() };
-        let task = task.into_raw();
-
-        // The next pointer should already be null
-        debug_assert!(get_next(task).is_none());
-
-        if let Some(tail) = p.tail {
-            // safety: Holding the Notified for a task guarantees exclusive
-            // access to the `queue_next` field.
-            set_next(tail, Some(task));
-        } else {
-            p.head = Some(task);
-        }
-
-        p.tail = Some(task);
-
-        self.len.store(len + 1, Release);
+        let mut synced = self.synced.lock();
+        // safety: passing correct `Synced`
+        unsafe { self.shared.push(&mut synced, task) }
     }
 
     pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
-        self.pop_n(1).next()
-    }
-
-    pub(crate) fn pop_n(&self, n: usize) -> Pop<'_, T> {
-        use std::cmp;
-
-        // Fast path, if len == 0, then there are no values
-        if self.is_empty() {
-            return Pop {
-                len: 0,
-                pointers: None,
-                _p: PhantomData,
-            };
-        }
-
-        // Lock the queue
-        let p = self.pointers.lock();
-
-        // safety: All updates to the len atomic are guarded by the mutex. As
-        // such, a non-atomic load followed by a store is safe.
-        let len = unsafe { self.len.unsync_load() };
-
-        let n = cmp::min(n, len);
-
-        // Decrement the count.
-        self.len.store(len - n, Release);
-
-        Pop {
-            len: n,
-            pointers: Some(p),
-            _p: PhantomData,
-        }
-    }
-}
-
-cfg_rt_multi_thread! {
-    impl<T: 'static> Inject<T> {
-        /// Pushes several values into the queue.
-        #[inline]
-        pub(crate) fn push_batch<I>(&self, mut iter: I)
-        where
-            I: Iterator<Item = task::Notified<T>>,
-        {
-            let first = match iter.next() {
-                Some(first) => first.into_raw(),
-                None => return,
-            };
-
-            // Link up all the tasks.
-            let mut prev = first;
-            let mut counter = 1;
-
-            // We are going to be called with an `std::iter::Chain`, and that
-            // iterator overrides `for_each` to something that is easier for the
-            // compiler to optimize than a loop.
-            iter.for_each(|next| {
-                let next = next.into_raw();
-
-                // safety: Holding the Notified for a task guarantees exclusive
-                // access to the `queue_next` field.
-                set_next(prev, Some(next));
-                prev = next;
-                counter += 1;
-            });
-
-            // Now that the tasks are linked together, insert them into the
-            // linked list.
-            self.push_batch_inner(first, prev, counter);
-        }
-
-        /// Inserts several tasks that have been linked together into the queue.
-        ///
-        /// The provided head and tail may be be the same task. In this case, a
-        /// single task is inserted.
-        #[inline]
-        fn push_batch_inner(
-            &self,
-            batch_head: task::RawTask,
-            batch_tail: task::RawTask,
-            num: usize,
-        ) {
-            debug_assert!(get_next(batch_tail).is_none());
-
-            let mut p = self.pointers.lock();
-
-            if let Some(tail) = p.tail {
-                set_next(tail, Some(batch_head));
-            } else {
-                p.head = Some(batch_head);
-            }
-
-            p.tail = Some(batch_tail);
-
-            // Increment the count.
-            //
-            // safety: All updates to the len atomic are guarded by the mutex. As
-            // such, a non-atomic load followed by a store is safe.
-            let len = unsafe { self.len.unsync_load() };
-
-            self.len.store(len + num, Release);
-        }
-    }
-}
-
-impl<T: 'static> Drop for Inject<T> {
-    fn drop(&mut self) {
-        if !std::thread::panicking() {
-            assert!(self.pop().is_none(), "queue not empty");
-        }
-    }
-}
-
-impl<'a, T: 'static> Iterator for Pop<'a, T> {
-    type Item = task::Notified<T>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        if self.len == 0 {
+        if self.shared.is_empty() {
             return None;
         }
 
-        // `pointers` is always `Some` when `len() > 0`
-        let pointers = self.pointers.as_mut().unwrap();
-        let ret = pointers.pop();
-
-        debug_assert!(ret.is_some());
-
-        self.len -= 1;
-
-        if self.len == 0 {
-            self.pointers = None;
-        }
-
-        ret
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        (self.len, Some(self.len))
-    }
-}
-
-impl<'a, T: 'static> ExactSizeIterator for Pop<'a, T> {
-    fn len(&self) -> usize {
-        self.len
-    }
-}
-
-impl<'a, T: 'static> Drop for Pop<'a, T> {
-    fn drop(&mut self) {
-        for _ in self.by_ref() {}
-    }
-}
-
-impl Pointers {
-    fn pop<T: 'static>(&mut self) -> Option<task::Notified<T>> {
-        let task = self.head?;
-
-        self.head = get_next(task);
-
-        if self.head.is_none() {
-            self.tail = None;
-        }
-
-        set_next(task, None);
-
-        // safety: a `Notified` is pushed into the queue and now it is popped!
-        Some(unsafe { task::Notified::from_raw(task) })
-    }
-}
-
-fn get_next(task: task::RawTask) -> Option<task::RawTask> {
-    unsafe { task.get_queue_next() }
-}
-
-fn set_next(task: task::RawTask, val: Option<task::RawTask>) {
-    unsafe {
-        task.set_queue_next(val);
+        let mut synced = self.synced.lock();
+        // safety: passing correct `Synced`
+        unsafe { self.shared.pop(&mut synced) }
     }
 }
diff --git a/tokio/src/runtime/scheduler/inject/metrics.rs b/tokio/src/runtime/scheduler/inject/metrics.rs
new file mode 100644
index 00000000000..76f045fdbd6
--- /dev/null
+++ b/tokio/src/runtime/scheduler/inject/metrics.rs
@@ -0,0 +1,7 @@
+use super::Inject;
+
+impl<T: 'static> Inject<T> {
+    pub(crate) fn len(&self) -> usize {
+        self.shared.len()
+    }
+}
diff --git a/tokio/src/runtime/scheduler/inject/pop.rs b/tokio/src/runtime/scheduler/inject/pop.rs
new file mode 100644
index 00000000000..4e6d5d3be3a
--- /dev/null
+++ b/tokio/src/runtime/scheduler/inject/pop.rs
@@ -0,0 +1,55 @@
+use super::Synced;
+
+use crate::runtime::task;
+
+use std::marker::PhantomData;
+
+pub(crate) struct Pop<'a, T: 'static> {
+    len: usize,
+    synced: &'a mut Synced,
+    _p: PhantomData<T>,
+}
+
+impl<'a, T: 'static> Pop<'a, T> {
+    pub(super) fn new(len: usize, synced: &'a mut Synced) -> Pop<'a, T> {
+        Pop {
+            len,
+            synced,
+            _p: PhantomData,
+        }
+    }
+}
+
+impl<'a, T: 'static> Iterator for Pop<'a, T> {
+    type Item = task::Notified<T>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.len == 0 {
+            return None;
+        }
+
+        let ret = self.synced.pop();
+
+        // Should be `Some` when `len > 0`
+        debug_assert!(ret.is_some());
+
+        self.len -= 1;
+        ret
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.len, Some(self.len))
+    }
+}
+
+impl<'a, T: 'static> ExactSizeIterator for Pop<'a, T> {
+    fn len(&self) -> usize {
+        self.len
+    }
+}
+
+impl<'a, T: 'static> Drop for Pop<'a, T> {
+    fn drop(&mut self) {
+        for _ in self.by_ref() {}
+    }
+}
diff --git a/tokio/src/runtime/scheduler/inject/rt_multi_thread.rs b/tokio/src/runtime/scheduler/inject/rt_multi_thread.rs
new file mode 100644
index 00000000000..07d1063c5d8
--- /dev/null
+++ b/tokio/src/runtime/scheduler/inject/rt_multi_thread.rs
@@ -0,0 +1,98 @@
+use super::{Shared, Synced};
+
+use crate::runtime::scheduler::Lock;
+use crate::runtime::task;
+
+use std::sync::atomic::Ordering::Release;
+
+impl<'a> Lock<Synced> for &'a mut Synced {
+    type Handle = &'a mut Synced;
+
+    fn lock(self) -> Self::Handle {
+        self
+    }
+}
+
+impl AsMut<Synced> for Synced {
+    fn as_mut(&mut self) -> &mut Synced {
+        self
+    }
+}
+
+impl<T: 'static> Shared<T> {
+    /// Pushes several values into the queue.
+    ///
+    /// # Safety
+    ///
+    /// Must be called with the same `Synced` instance returned by `Inject::new`
+    #[inline]
+    pub(crate) unsafe fn push_batch<L, I>(&self, shared: L, mut iter: I)
+    where
+        L: Lock<Synced>,
+        I: Iterator<Item = task::Notified<T>>,
+    {
+        let first = match iter.next() {
+            Some(first) => first.into_raw(),
+            None => return,
+        };
+
+        // Link up all the tasks.
+        let mut prev = first;
+        let mut counter = 1;
+
+        // We are going to be called with an `std::iter::Chain`, and that
+        // iterator overrides `for_each` to something that is easier for the
+        // compiler to optimize than a loop.
+        iter.for_each(|next| {
+            let next = next.into_raw();
+
+            // safety: Holding the Notified for a task guarantees exclusive
+            // access to the `queue_next` field.
+            unsafe { prev.set_queue_next(Some(next)) };
+            prev = next;
+            counter += 1;
+        });
+
+        // Now that the tasks are linked together, insert them into the
+        // linked list.
+        self.push_batch_inner(shared, first, prev, counter);
+    }
+
+    /// Inserts several tasks that have been linked together into the queue.
+    ///
+    /// The provided head and tail may be the same task. In this case, a
+    /// single task is inserted.
+    #[inline]
+    unsafe fn push_batch_inner<L>(
+        &self,
+        shared: L,
+        batch_head: task::RawTask,
+        batch_tail: task::RawTask,
+        num: usize,
+    ) where
+        L: Lock<Synced>,
+    {
+        debug_assert!(unsafe { batch_tail.get_queue_next().is_none() });
+
+        let mut synced = shared.lock();
+        let synced = synced.as_mut();
+
+        if let Some(tail) = synced.tail {
+            unsafe {
+                tail.set_queue_next(Some(batch_head));
+            }
+        } else {
+            synced.head = Some(batch_head);
+        }
+
+        synced.tail = Some(batch_tail);
+
+        // Increment the count.
+        //
+        // safety: All updates to the len atomic are guarded by the mutex. As
+        // such, a non-atomic load followed by a store is safe.
+        let len = self.len.unsync_load();
+
+        self.len.store(len + num, Release);
+    }
+}
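The `len` discipline that `push_batch_inner` relies on is worth spelling out: `len` lives in the lock-free `Shared` half (defined in shared.rs, next) so that `is_empty` can bail out without touching the mutex, while every write to `len` happens with the `Synced` lock held, which is why the plain `unsync_load` followed by a `Release` store is race-free. Below is a condensed sketch of the same pattern using std types; loom's `unsync_load` is replaced by a `Relaxed` load, and the `Shared` struct here is an illustrative stand-in, not tokio's.

use std::sync::atomic::{AtomicUsize, Ordering::{Acquire, Relaxed, Release}};
use std::sync::Mutex;

struct Shared {
    /// Read lock-free on the hot path; written only with `synced` held.
    len: AtomicUsize,
    synced: Mutex<Vec<u32>>,
}

impl Shared {
    /// Hot path: skip the mutex entirely when the queue is empty.
    fn is_empty(&self) -> bool {
        self.len.load(Acquire) == 0
    }

    fn push(&self, value: u32) {
        let mut queue = self.synced.lock().unwrap();
        queue.push(value);
        // The mutex serializes all writers, so a plain read followed by a
        // store cannot race; tokio uses loom's `unsync_load` to assert this.
        let len = self.len.load(Relaxed);
        self.len.store(len + 1, Release);
    }

    fn pop(&self) -> Option<u32> {
        if self.is_empty() {
            // Fast path: no lock taken. This check may race with a
            // concurrent push, but a stale `None` is acceptable here.
            return None;
        }
        let mut queue = self.synced.lock().unwrap();
        if queue.is_empty() {
            // Lost a race with another consumer between the check and the lock.
            return None;
        }
        let value = queue.remove(0);
        let len = self.len.load(Relaxed);
        self.len.store(len - 1, Release);
        Some(value)
    }
}

fn main() {
    let shared = Shared { len: AtomicUsize::new(0), synced: Mutex::new(Vec::new()) };
    shared.push(7);
    assert!(!shared.is_empty());
    assert_eq!(shared.pop(), Some(7));
    assert!(shared.is_empty());
}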
diff --git a/tokio/src/runtime/scheduler/inject/shared.rs b/tokio/src/runtime/scheduler/inject/shared.rs
new file mode 100644
index 00000000000..7fdd2839dd2
--- /dev/null
+++ b/tokio/src/runtime/scheduler/inject/shared.rs
@@ -0,0 +1,119 @@
+use super::{Pop, Synced};
+
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::runtime::task;
+
+use std::marker::PhantomData;
+use std::sync::atomic::Ordering::{Acquire, Release};
+
+pub(crate) struct Shared<T: 'static> {
+    /// Number of pending tasks in the queue. This helps prevent unnecessary
+    /// locking in the hot path.
+    pub(super) len: AtomicUsize,
+
+    _p: PhantomData<T>,
+}
+
+unsafe impl<T> Send for Shared<T> {}
+unsafe impl<T> Sync for Shared<T> {}
+
+impl<T: 'static> Shared<T> {
+    pub(crate) fn new() -> (Shared<T>, Synced) {
+        let inject = Shared {
+            len: AtomicUsize::new(0),
+            _p: PhantomData,
+        };
+
+        let synced = Synced {
+            is_closed: false,
+            head: None,
+            tail: None,
+        };
+
+        (inject, synced)
+    }
+
+    pub(crate) fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    // Kind of annoying to have to include the cfg here
+    #[cfg(any(tokio_taskdump, all(feature = "rt-multi-thread", not(tokio_wasi))))]
+    pub(crate) fn is_closed(&self, synced: &Synced) -> bool {
+        synced.is_closed
+    }
+
+    /// Closes the injection queue, returns `true` if the queue is open when the
+    /// transition is made.
+    pub(crate) fn close(&self, synced: &mut Synced) -> bool {
+        if synced.is_closed {
+            return false;
+        }
+
+        synced.is_closed = true;
+        true
+    }
+
+    pub(crate) fn len(&self) -> usize {
+        self.len.load(Acquire)
+    }
+
+    /// Pushes a value into the queue.
+    ///
+    /// This does nothing if the queue is closed.
+    ///
+    /// # Safety
+    ///
+    /// Must be called with the same `Synced` instance returned by `Inject::new`
+    pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) {
+        if synced.is_closed {
+            return;
+        }
+
+        // safety: only mutated with the lock held
+        let len = self.len.unsync_load();
+        let task = task.into_raw();
+
+        // The next pointer should already be null
+        debug_assert!(unsafe { task.get_queue_next().is_none() });
+
+        if let Some(tail) = synced.tail {
+            // safety: Holding the Notified for a task guarantees exclusive
+            // access to the `queue_next` field.
+            unsafe { tail.set_queue_next(Some(task)) };
+        } else {
+            synced.head = Some(task);
+        }
+
+        synced.tail = Some(task);
+        self.len.store(len + 1, Release);
+    }
+
+    /// Pop a value from the queue.
+    ///
+    /// # Safety
+    ///
+    /// Must be called with the same `Synced` instance returned by `Inject::new`
+    pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> {
+        self.pop_n(synced, 1).next()
+    }
+
+    /// Pop `n` values from the queue
+    ///
+    /// # Safety
+    ///
+    /// Must be called with the same `Synced` instance returned by `Inject::new`
+    pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
+        use std::cmp;
+
+        // safety: All updates to the len atomic are guarded by the mutex. As
+        // such, a non-atomic load followed by a store is safe.
+        let len = self.len.unsync_load();
+        let n = cmp::min(n, len);
+
+        // Decrement the count.
+        self.len.store(len - n, Release);
+
+        Pop::new(n, synced)
+    }
+}
diff --git a/tokio/src/runtime/scheduler/inject/synced.rs b/tokio/src/runtime/scheduler/inject/synced.rs
new file mode 100644
index 00000000000..6847f68e5db
--- /dev/null
+++ b/tokio/src/runtime/scheduler/inject/synced.rs
@@ -0,0 +1,32 @@
+use crate::runtime::task;
+
+pub(crate) struct Synced {
+    /// True if the queue is closed.
+    pub(super) is_closed: bool,
+
+    /// Linked-list head.
+    pub(super) head: Option<task::RawTask>,
+
+    /// Linked-list tail.
+    pub(super) tail: Option<task::RawTask>,
+}
+
+unsafe impl Send for Synced {}
+unsafe impl Sync for Synced {}
+
+impl Synced {
+    pub(super) fn pop<T: 'static>(&mut self) -> Option<task::Notified<T>> {
+        let task = self.head?;
+
+        self.head = unsafe { task.get_queue_next() };
+
+        if self.head.is_none() {
+            self.tail = None;
+        }
+
+        unsafe { task.set_queue_next(None) };
+
+        // safety: a `Notified` is pushed into the queue and now it is popped!
+        Some(unsafe { task::Notified::from_raw(task) })
+    }
+}
diff --git a/tokio/src/runtime/scheduler/lock.rs b/tokio/src/runtime/scheduler/lock.rs
new file mode 100644
index 00000000000..0901c2b37ca
--- /dev/null
+++ b/tokio/src/runtime/scheduler/lock.rs
@@ -0,0 +1,6 @@
+/// A lock (mutex) yielding generic data.
+pub(crate) trait Lock<T> {
+    type Handle: AsMut<T>;
+
+    fn lock(self) -> Self::Handle;
+}
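The point of the `Lock` trait just added is to let `Shared::push_batch` stay generic over who holds the mutex: the multi-threaded scheduler passes `&Handle`, which locks the worker mutex and yields a guard, while the unit tests pass `&mut Synced` directly, where "locking" is the identity because exclusive access is already proven by the borrow. A minimal, self-contained sketch of that pattern follows; the `State`, `Queue`, and `QueueGuard` types are illustrative stand-ins, not tokio types.

use std::sync::{Mutex, MutexGuard};

/// Same shape as the `Lock` trait this diff adds in `scheduler/lock.rs`.
trait Lock<T> {
    type Handle: AsMut<T>;

    fn lock(self) -> Self::Handle;
}

/// Toy synchronized state, standing in for `inject::Synced`.
struct State {
    items: Vec<u32>,
}

/// Mirrors the `impl AsMut<Synced> for Synced` above.
impl AsMut<State> for State {
    fn as_mut(&mut self) -> &mut State {
        self
    }
}

/// Exclusive access is already proven by the borrow: "locking" is the identity.
impl<'a> Lock<State> for &'a mut State {
    type Handle = &'a mut State;

    fn lock(self) -> Self::Handle {
        self
    }
}

/// A shared handle that must take a real mutex, like the worker `Handle`.
struct Queue {
    synced: Mutex<State>,
}

/// Guard wrapper, mirroring `InjectGuard` in worker.rs below.
struct QueueGuard<'a> {
    lock: MutexGuard<'a, State>,
}

impl<'a> AsMut<State> for QueueGuard<'a> {
    fn as_mut(&mut self) -> &mut State {
        &mut self.lock
    }
}

impl<'a> Lock<State> for &'a Queue {
    type Handle = QueueGuard<'a>;

    fn lock(self) -> Self::Handle {
        QueueGuard {
            lock: self.synced.lock().unwrap(),
        }
    }
}

/// Generic over both callers, like `Shared::push_batch_inner`.
fn push_all<L: Lock<State>>(lock: L, values: &[u32]) {
    let mut handle = lock.lock();
    handle.as_mut().items.extend_from_slice(values);
}

fn main() {
    // Path 1: the lock is acquired inside, as when the scheduler passes `&Handle`.
    let queue = Queue {
        synced: Mutex::new(State { items: Vec::new() }),
    };
    push_all(&queue, &[1, 2, 3]);

    // Path 2: the state is already exclusively borrowed, as in the unit tests.
    let mut state = queue.synced.lock().unwrap();
    push_all(&mut *state, &[4, 5]);
    assert_eq!(state.items, [1, 2, 3, 4, 5]);
}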
diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs
index 93390453eb2..3e3151711f5 100644
--- a/tokio/src/runtime/scheduler/mod.rs
+++ b/tokio/src/runtime/scheduler/mod.rs
@@ -5,11 +5,14 @@ cfg_rt! {
     mod defer;
     use defer::Defer;
 
-    mod inject;
+    pub(crate) mod inject;
     pub(crate) use inject::Inject;
 }
 
 cfg_rt_multi_thread! {
+    mod lock;
+    use lock::Lock;
+
     pub(crate) mod multi_thread;
     pub(crate) use multi_thread::MultiThread;
 }
diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs
index de39a93fd8b..306a622b3ed 100644
--- a/tokio/src/runtime/scheduler/multi_thread/mod.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs
@@ -6,6 +6,9 @@ use counters::Counters;
 mod handle;
 pub(crate) use handle::Handle;
 
+mod overflow;
+pub(crate) use overflow::Overflow;
+
 mod idle;
 use self::idle::Idle;
 
@@ -18,8 +21,7 @@ pub(crate) use park::{Parker, Unparker};
 pub(crate) mod queue;
 
 mod worker;
-use worker::Shared;
-pub(crate) use worker::{Context, Launch};
+pub(crate) use worker::{Context, Launch, Shared};
 
 pub(crate) use worker::block_in_place;
diff --git a/tokio/src/runtime/scheduler/multi_thread/overflow.rs b/tokio/src/runtime/scheduler/multi_thread/overflow.rs
new file mode 100644
index 00000000000..ab664811cff
--- /dev/null
+++ b/tokio/src/runtime/scheduler/multi_thread/overflow.rs
@@ -0,0 +1,26 @@
+use crate::runtime::task;
+
+#[cfg(test)]
+use std::cell::RefCell;
+
+pub(crate) trait Overflow<T: 'static> {
+    fn push(&self, task: task::Notified<T>);
+
+    fn push_batch<I>(&self, iter: I)
+    where
+        I: Iterator<Item = task::Notified<T>>;
+}
+
+#[cfg(test)]
+impl<T: 'static> Overflow<T> for RefCell<Vec<task::Notified<T>>> {
+    fn push(&self, task: task::Notified<T>) {
+        self.borrow_mut().push(task);
+    }
+
+    fn push_batch<I>(&self, iter: I)
+    where
+        I: Iterator<Item = task::Notified<T>>,
+    {
+        self.borrow_mut().extend(iter);
+    }
+}
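`Overflow` is the queue-side counterpart: `Local::push_back_or_overflow` (changed below in queue.rs) only needs somewhere to spill half the run queue when it fills, so it now takes any `O: Overflow<T>` instead of the concrete inject queue, and the tests substitute the `RefCell<Vec<_>>` impl above. A stripped-down sketch of the same decoupling; the `Task` alias and `LocalQueue` type are illustrative, not tokio's.

use std::cell::RefCell;

type Task = u32; // stand-in for task::Notified<T>

/// Spill target for a full local queue, same shape as the trait this
/// diff adds in `multi_thread/overflow.rs`.
trait Overflow {
    fn push(&self, task: Task);

    fn push_batch<I: Iterator<Item = Task>>(&self, iter: I);
}

/// Test double, mirroring the `RefCell<Vec<task::Notified<T>>>` impl.
impl Overflow for RefCell<Vec<Task>> {
    fn push(&self, task: Task) {
        self.borrow_mut().push(task);
    }

    fn push_batch<I: Iterator<Item = Task>>(&self, iter: I) {
        self.borrow_mut().extend(iter);
    }
}

/// A bounded "local queue" that spills half its contents on overflow,
/// loosely following `Local::push_back_or_overflow`.
struct LocalQueue {
    cap: usize,
    tasks: Vec<Task>,
}

impl LocalQueue {
    fn push_back_or_overflow<O: Overflow>(&mut self, task: Task, overflow: &O) {
        if self.tasks.len() < self.cap {
            self.tasks.push(task);
        } else {
            // Move half of the queue, plus the new task, to the overflow.
            let half: Vec<Task> = self.tasks.drain(..self.cap / 2).collect();
            overflow.push_batch(half.into_iter().chain(std::iter::once(task)));
        }
    }
}

fn main() {
    let overflow = RefCell::new(Vec::new());
    let mut local = LocalQueue { cap: 4, tasks: Vec::new() };

    for task in 0..5 {
        local.push_back_or_overflow(task, &overflow);
    }

    // Tasks 0 and 1 spilled along with the overflowing task 4.
    assert_eq!(*overflow.borrow(), vec![0, 1, 4]);
    assert_eq!(local.tasks, vec![2, 3]);
}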
diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs
index c61c4de609d..6444df88b8a 100644
--- a/tokio/src/runtime/scheduler/multi_thread/queue.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs
@@ -2,8 +2,7 @@
 
 use crate::loom::cell::UnsafeCell;
 use crate::loom::sync::Arc;
-use crate::runtime::scheduler::multi_thread::Stats;
-use crate::runtime::scheduler::Inject;
+use crate::runtime::scheduler::multi_thread::{Overflow, Stats};
 use crate::runtime::task;
 
 use std::mem::{self, MaybeUninit};
@@ -183,10 +182,10 @@ impl<T> Local<T> {
     /// When the queue overflows, half of the curent contents of the queue is
     /// moved to the given Injection queue. This frees up capacity for more
     /// tasks to be pushed into the local queue.
-    pub(crate) fn push_back_or_overflow(
+    pub(crate) fn push_back_or_overflow<O: Overflow<T>>(
         &mut self,
         mut task: task::Notified<T>,
-        inject: &Inject<T>,
+        overflow: &O,
         stats: &mut Stats,
     ) {
         let tail = loop {
@@ -202,12 +201,12 @@
             } else if steal != real {
                 // Concurrently stealing, this will free up capacity, so only
                 // push the task onto the inject queue
-                inject.push(task);
+                overflow.push(task);
                 return;
             } else {
                 // Push the current task and half of the queue into the
                 // inject queue.
-                match self.push_overflow(task, real, tail, inject, stats) {
+                match self.push_overflow(task, real, tail, overflow, stats) {
                     Ok(_) => return,
                     // Lost the race, try again
                     Err(v) => {
@@ -248,12 +247,12 @@
     /// workers "missed" some of the tasks during a steal, they will get
     /// another opportunity.
     #[inline(never)]
-    fn push_overflow(
+    fn push_overflow<O: Overflow<T>>(
         &mut self,
         task: task::Notified<T>,
         head: UnsignedShort,
         tail: UnsignedShort,
-        inject: &Inject<T>,
+        overflow: &O,
         stats: &mut Stats,
     ) -> Result<(), task::Notified<T>> {
         /// How many elements are we taking from the local queue.
@@ -336,7 +335,7 @@
             head: head as UnsignedLong,
             i: 0,
         };
-        inject.push_batch(batch_iter.chain(std::iter::once(task)));
+        overflow.push_batch(batch_iter.chain(std::iter::once(task)));
 
         // Add 1 to factor in the task currently being scheduled.
         stats.incr_overflow_count();
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index 7b5b09f7573..76f8d0be4c3 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -60,9 +60,9 @@ use crate::loom::sync::{Arc, Mutex};
 use crate::runtime;
 use crate::runtime::context;
 use crate::runtime::scheduler::multi_thread::{
-    idle, queue, Counters, Handle, Idle, Parker, Stats, Unparker,
+    idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, Unparker,
 };
-use crate::runtime::scheduler::{Defer, Inject};
+use crate::runtime::scheduler::{inject, Defer, Lock};
 use crate::runtime::task::OwnedTasks;
 use crate::runtime::{
     blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics,
@@ -129,7 +129,7 @@ struct Core {
 }
 
 /// State shared across all workers
-pub(super) struct Shared {
+pub(crate) struct Shared {
     /// Per-worker remote state. All other workers have access to this and is
     /// how they communicate between each other.
     remotes: Box<[Remote]>,
@@ -137,7 +137,7 @@ pub(super) struct Shared {
     /// Global task queue used for:
     ///  1. Submit work to the scheduler while **not** currently on a worker thread.
     ///  2. Submit work to the scheduler when a worker run queue is saturated
-    inject: Inject<Arc<Handle>>,
+    inject: inject::Shared<Arc<Handle>>,
 
     /// Coordinates idle workers
     idle: Idle,
@@ -172,7 +172,11 @@ pub(super) struct Shared {
 
 /// Data synchronized by the scheduler mutex
 pub(super) struct Synced {
+    /// Synchronized state for `Idle`.
     pub(super) idle: idle::Synced,
+
+    /// Synchronized state for `Inject`.
+    inject: inject::Synced,
 }
 
 /// Used to communicate with a worker from other threads.
@@ -256,14 +260,18 @@ pub(super) fn create(
     }
 
     let (idle, idle_synced) = Idle::new(size);
+    let (inject, inject_synced) = inject::Shared::new();
 
     let handle = Arc::new(Handle {
         shared: Shared {
             remotes: remotes.into_boxed_slice(),
-            inject: Inject::new(),
+            inject,
             idle,
             owned: OwnedTasks::new(),
-            synced: Mutex::new(Synced { idle: idle_synced }),
+            synced: Mutex::new(Synced {
+                idle: idle_synced,
+                inject: inject_synced,
+            }),
             shutdown_cores: Mutex::new(vec![]),
             config,
             scheduler_metrics: SchedulerMetrics::new(),
@@ -561,7 +569,7 @@ impl Context {
                     // the back of the queue and return.
                     core.run_queue.push_back_or_overflow(
                         task,
-                        self.worker.inject(),
+                        &*self.worker.handle,
                         &mut core.stats,
                     );
                     // If we hit this point, the LIFO slot should be enabled.
@@ -710,7 +718,10 @@ impl Core {
             // Update the global queue interval, if needed
             self.tune_global_queue_interval(worker);
 
-            worker.inject().pop().or_else(|| self.next_local_task())
+            worker
+                .handle
+                .next_remote_task()
+                .or_else(|| self.next_local_task())
         } else {
             let maybe_task = self.next_local_task();
 
@@ -718,6 +729,10 @@ impl Core {
                 return maybe_task;
             }
 
+            if worker.inject().is_empty() {
+                return None;
+            }
+
             // Other threads can only **remove** tasks from the current worker's
             // `run_queue`. So, we can be confident that by the time we call
             // `run_queue.push_back` below, there will be *at least* `cap`
@@ -735,7 +750,9 @@
                 cap,
             );
 
-            let mut tasks = worker.inject().pop_n(n);
+            let mut synced = worker.handle.shared.synced.lock();
+            // safety: passing in the correct `inject::Synced`.
+            let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };
 
             // Pop the first task to return immedietly
             let ret = tasks.next();
@@ -783,7 +800,7 @@
         }
 
         // Fallback on checking the global queue
-        worker.handle.shared.inject.pop()
+        worker.handle.next_remote_task()
     }
 
     fn transition_to_searching(&mut self, worker: &Worker) -> bool {
@@ -870,7 +887,8 @@
 
         if !self.is_shutdown {
             // Check if the scheduler has been shutdown
-            self.is_shutdown = worker.inject().is_closed();
+            let synced = worker.handle.shared.synced.lock();
+            self.is_shutdown = worker.inject().is_closed(&synced.inject);
         }
     }
@@ -911,7 +929,7 @@
 
 impl Worker {
     /// Returns a reference to the scheduler's injection queue.
-    fn inject(&self) -> &Inject<Arc<Handle>> {
+    fn inject(&self) -> &inject::Shared<Arc<Handle>> {
         &self.handle.shared.inject
     }
 }
@@ -946,8 +964,7 @@
             }
 
             // Otherwise, use the inject queue.
-            self.shared.inject.push(task);
-            self.shared.scheduler_metrics.inc_remote_schedule_count();
+            self.push_remote_task(task);
             self.notify_parked_remote();
         })
     }
@@ -961,7 +978,7 @@
         // flexibility and the task may go to the front of the queue.
         let should_notify = if is_yield || !core.lifo_enabled {
             core.run_queue
-                .push_back_or_overflow(task, &self.shared.inject, &mut core.stats);
+                .push_back_or_overflow(task, self, &mut core.stats);
             true
         } else {
             // Push to the LIFO slot
@@ -970,7 +987,7 @@
 
             if let Some(prev) = prev {
                 core.run_queue
-                    .push_back_or_overflow(prev, &self.shared.inject, &mut core.stats);
+                    .push_back_or_overflow(prev, self, &mut core.stats);
             }
 
             core.lifo_slot = Some(task);
@@ -986,8 +1003,32 @@
         }
     }
 
+    fn next_remote_task(&self) -> Option<Notified> {
+        if self.shared.inject.is_empty() {
+            return None;
+        }
+
+        let mut synced = self.shared.synced.lock();
+        // safety: passing in correct `inject::Synced`
+        unsafe { self.shared.inject.pop(&mut synced.inject) }
+    }
+
+    fn push_remote_task(&self, task: Notified) {
+        self.shared.scheduler_metrics.inc_remote_schedule_count();
+
+        let mut synced = self.shared.synced.lock();
+        // safety: passing in correct `inject::Synced`
+        unsafe {
+            self.shared.inject.push(&mut synced.inject, task);
+        }
+    }
+
     pub(super) fn close(&self) {
-        if self.shared.inject.close() {
+        if self
+            .shared
+            .inject
+            .close(&mut self.shared.synced.lock().inject)
+        {
             self.notify_all();
         }
     }
@@ -1055,7 +1096,7 @@
             // Drain the injection queue
             //
             // We already shut down every task, so we can simply drop the tasks.
-            while let Some(task) = self.shared.inject.pop() {
+            while let Some(task) = self.next_remote_task() {
                 drop(task);
             }
         }
@@ -1065,6 +1106,41 @@
     }
 }
 
+impl Overflow<Arc<Handle>> for Handle {
+    fn push(&self, task: task::Notified<Arc<Handle>>) {
+        self.push_remote_task(task);
+    }
+
+    fn push_batch<I>(&self, iter: I)
+    where
+        I: Iterator<Item = task::Notified<Arc<Handle>>>,
+    {
+        unsafe {
+            self.shared.inject.push_batch(self, iter);
+        }
+    }
+}
+
+pub(crate) struct InjectGuard<'a> {
+    lock: crate::loom::sync::MutexGuard<'a, Synced>,
+}
+
+impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
+    fn as_mut(&mut self) -> &mut inject::Synced {
+        &mut self.lock.inject
+    }
+}
+
+impl<'a> Lock<inject::Synced> for &'a Handle {
+    type Handle = InjectGuard<'a>;
+
+    fn lock(self) -> Self::Handle {
+        InjectGuard {
+            lock: self.shared.synced.lock(),
+        }
+    }
+}
+
 #[track_caller]
 fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
     use scheduler::Context::MultiThread;
diff --git a/tokio/src/runtime/tests/inject.rs b/tokio/src/runtime/tests/inject.rs
index 149654d19a0..ccead5e024a 100644
--- a/tokio/src/runtime/tests/inject.rs
+++ b/tokio/src/runtime/tests/inject.rs
@@ -1,38 +1,52 @@
-use crate::runtime::scheduler::Inject;
+use crate::runtime::scheduler::inject;
 
 #[test]
 fn push_and_pop() {
-    let inject = Inject::new();
+    const N: usize = 2;
 
-    for _ in 0..10 {
+    let (inject, mut synced) = inject::Shared::new();
+
+    for i in 0..N {
+        assert_eq!(inject.len(), i);
         let (task, _) = super::unowned(async {});
-        inject.push(task);
+        unsafe { inject.push(&mut synced, task) };
     }
 
-    for _ in 0..10 {
-        assert!(inject.pop().is_some());
+    for i in 0..N {
+        assert_eq!(inject.len(), N - i);
+        assert!(unsafe { inject.pop(&mut synced) }.is_some());
     }
 
-    assert!(inject.pop().is_none());
+    assert!(unsafe { inject.pop(&mut synced) }.is_none());
 }
 
 #[test]
fn push_batch_and_pop() {
-    let inject = Inject::new();
+    let (inject, mut inject_synced) = inject::Shared::new();
 
-    inject.push_batch((0..10).map(|_| super::unowned(async {}).0));
+    unsafe {
+        inject.push_batch(
+            &mut inject_synced,
+            (0..10).map(|_| super::unowned(async {}).0),
+        );
 
-    assert_eq!(5, inject.pop_n(5).count());
-    assert_eq!(5, inject.pop_n(5).count());
-    assert_eq!(0, inject.pop_n(5).count());
+        assert_eq!(5, inject.pop_n(&mut inject_synced, 5).count());
+        assert_eq!(5, inject.pop_n(&mut inject_synced, 5).count());
+        assert_eq!(0, inject.pop_n(&mut inject_synced, 5).count());
+    }
 }
 
 #[test]
 fn pop_n_drains_on_drop() {
-    let inject = Inject::new();
+    let (inject, mut inject_synced) = inject::Shared::new();
 
-    inject.push_batch((0..10).map(|_| super::unowned(async {}).0));
-    let _ = inject.pop_n(10);
+    unsafe {
+        inject.push_batch(
+            &mut inject_synced,
+            (0..10).map(|_| super::unowned(async {}).0),
+        );
+        let _ = inject.pop_n(&mut inject_synced, 10);
 
-    assert_eq!(inject.len(), 0);
+        assert_eq!(inject.len(), 0);
+    }
 }
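One subtlety the `pop_n_drains_on_drop` test above pins down: `Shared::pop_n` decrements `len` before returning the `Pop` iterator, so the iterator must remove exactly `n` tasks from the linked list even if the caller stops early, and the `Drop for Pop` impl guarantees exactly that. Here is the same contract in miniature over a plain `Vec`; the types are illustrative, not tokio's.

/// An iterator that has pre-reserved `len` items from `source` and
/// must consume them all even if dropped early, mirroring `Pop`.
struct PopN<'a> {
    len: usize,
    source: &'a mut Vec<u32>,
}

impl<'a> Iterator for PopN<'a> {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        if self.len == 0 {
            return None;
        }
        self.len -= 1;
        // Reserved up front, so the element must be present.
        Some(self.source.remove(0))
    }
}

impl<'a> Drop for PopN<'a> {
    fn drop(&mut self) {
        // Drain whatever the caller did not consume, keeping the queue
        // consistent with the already-adjusted length.
        for _ in self.by_ref() {}
    }
}

fn main() {
    let mut queue = vec![1, 2, 3, 4];
    {
        let mut popped = PopN { len: 3, source: &mut queue };
        assert_eq!(popped.next(), Some(1));
        // Dropped here with two items unconsumed...
    }
    // ...which were still removed from the queue.
    assert_eq!(queue, vec![4]);
}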
diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs
index 756f7b8424b..b60e039b9a6 100644
--- a/tokio/src/runtime/tests/loom_queue.rs
+++ b/tokio/src/runtime/tests/loom_queue.rs
@@ -1,8 +1,8 @@
 use crate::runtime::scheduler::multi_thread::{queue, Stats};
-use crate::runtime::scheduler::Inject;
 use crate::runtime::tests::NoopSchedule;
 
 use loom::thread;
+use std::cell::RefCell;
 
 fn new_stats() -> Stats {
     Stats::new(&crate::runtime::WorkerMetrics::new())
@@ -12,7 +12,7 @@
 fn basic() {
     loom::model(|| {
         let (steal, mut local) = queue::local();
-        let inject = Inject::new();
+        let inject = RefCell::new(vec![]);
         let mut stats = new_stats();
 
         let th = thread::spawn(move || {
@@ -54,9 +54,7 @@
             }
         }
 
-        while inject.pop().is_some() {
-            n += 1;
-        }
+        n += inject.borrow_mut().drain(..).count();
 
         n += th.join().unwrap();
 
@@ -68,7 +66,7 @@
 fn steal_overflow() {
     loom::model(|| {
         let (steal, mut local) = queue::local();
-        let inject = Inject::new();
+        let inject = RefCell::new(vec![]);
         let mut stats = new_stats();
 
         let th = thread::spawn(move || {
@@ -108,9 +106,7 @@
             n += 1;
         }
 
-        while inject.pop().is_some() {
-            n += 1;
-        }
+        n += inject.borrow_mut().drain(..).count();
 
         assert_eq!(7, n);
     });
@@ -139,7 +135,7 @@
 fn multi_stealer() {
     loom::model(|| {
         let (steal, mut local) = queue::local();
-        let inject = Inject::new();
+        let inject = RefCell::new(vec![]);
         let mut stats = new_stats();
 
         // Push work
@@ -161,9 +157,7 @@
             n += 1;
         }
 
-        while inject.pop().is_some() {
-            n += 1;
-        }
+        n += inject.borrow_mut().drain(..).count();
 
         n += th1.join().unwrap();
         n += th2.join().unwrap();
@@ -178,7 +172,7 @@
         let mut stats = new_stats();
         let (s1, mut l1) = queue::local();
         let (s2, mut l2) = queue::local();
-        let inject = Inject::new();
+        let inject = RefCell::new(vec![]);
 
         // Load up some tasks
         for _ in 0..4 {
@@ -207,6 +201,5 @@
         while l1.pop().is_some() {}
         while l2.pop().is_some() {}
-        while inject.pop().is_some() {}
     });
 }
diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs
index 5642d29d7eb..5df92b7a291 100644
--- a/tokio/src/runtime/tests/queue.rs
+++ b/tokio/src/runtime/tests/queue.rs
@@ -1,7 +1,7 @@
 use crate::runtime::scheduler::multi_thread::{queue, Stats};
-use crate::runtime::scheduler::Inject;
 use crate::runtime::task::{self, Schedule, Task};
 
+use std::cell::RefCell;
 use std::thread;
 use std::time::Duration;
 
@@ -29,7 +29,7 @@
 #[test]
 fn fits_256_one_at_a_time() {
     let (_, mut local) = queue::local();
-    let inject = Inject::new();
+    let inject = RefCell::new(vec![]);
     let mut stats = new_stats();
 
     for _ in 0..256 {
@@ -41,7 +41,7 @@
         assert_metrics!(stats, overflow_count == 0);
     }
 
-    assert!(inject.pop().is_none());
+    assert!(inject.borrow_mut().pop().is_none());
 
     while local.pop().is_some() {}
 }
@@ -87,7 +87,7 @@
 #[test]
 fn overflow() {
     let (_, mut local) = queue::local();
-    let inject = Inject::new();
+    let inject = RefCell::new(vec![]);
     let mut stats = new_stats();
 
     for _ in 0..257 {
@@ -101,9 +101,7 @@
 
     let mut n = 0;
 
-    while inject.pop().is_some() {
-        n += 1;
-    }
+    n += inject.borrow_mut().drain(..).count();
 
     while local.pop().is_some() {
         n += 1;
@@ -118,7 +116,7 @@
     let (steal1, mut local1) = queue::local();
     let (_, mut local2) = queue::local();
 
-    let inject = Inject::new();
+    let inject = RefCell::new(vec![]);
 
     for _ in 0..4 {
         let (task, _) = super::unowned(async {});
@@ -164,7 +162,7 @@
 
     for _ in 0..NUM_ITER {
         let (steal, mut local) = queue::local();
-        let inject = Inject::new();
+        let inject = RefCell::new(vec![]);
 
         let th = thread::spawn(move || {
            let mut stats = new_stats();
@@ -207,9 +205,7 @@
             }
         }
 
-        while inject.pop().is_some() {
-            n += 1;
-        }
+        n += inject.borrow_mut().drain(..).count();
 
         n += th.join().unwrap();
 
@@ -227,7 +223,7 @@
 
     for _ in 0..NUM_ITER {
         let (steal, mut local) = queue::local();
-        let inject = Inject::new();
+        let inject = RefCell::new(vec![]);
 
         let th = thread::spawn(move || {
             let mut stats = new_stats();
@@ -259,9 +255,7 @@
                 num_pop += 1;
             }
 
-            while inject.pop().is_some() {
-                num_pop += 1;
-            }
+            num_pop += inject.borrow_mut().drain(..).count();
         }
 
         num_pop += th.join().unwrap();
 
@@ -270,9 +264,7 @@
             num_pop += 1;
         }
 
-        while inject.pop().is_some() {
-            num_pop += 1;
-        }
+        num_pop += inject.borrow_mut().drain(..).count();
 
         assert_eq!(num_pop, NUM_TASKS);
     }