diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml
index 0c18e0c3aa8..3fa179dd73c 100644
--- a/.github/workflows/loom.yml
+++ b/.github/workflows/loom.yml
@@ -24,13 +24,19 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        scope:
-          - --skip loom_pool
-          - loom_pool::group_a
-          - loom_pool::group_b
-          - loom_pool::group_c
-          - loom_pool::group_d
-          - time::driver
+        include:
+          - scope: --skip loom_pool
+            max_preemptions: 2
+          - scope: loom_pool::group_a
+            max_preemptions: 1
+          - scope: loom_pool::group_b
+            max_preemptions: 2
+          - scope: loom_pool::group_c
+            max_preemptions: 1
+          - scope: loom_pool::group_d
+            max_preemptions: 1
+          - scope: time::driver
+            max_preemptions: 2
     steps:
       - uses: actions/checkout@v3
       - name: Install Rust ${{ env.rust_stable }}
@@ -43,6 +49,6 @@ jobs:
         working-directory: tokio
         env:
           RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings
-          LOOM_MAX_PREEMPTIONS: 2
+          LOOM_MAX_PREEMPTIONS: ${{ matrix.max_preemptions }}
          LOOM_MAX_BRANCHES: 10000
          SCOPE: ${{ matrix.scope }}
diff --git a/benches/rt_multi_threaded.rs b/benches/rt_multi_threaded.rs
index af048d9e8cb..88553e4ab50 100644
--- a/benches/rt_multi_threaded.rs
+++ b/benches/rt_multi_threaded.rs
@@ -10,9 +10,10 @@ use std::sync::atomic::AtomicUsize;
 use std::sync::atomic::Ordering::Relaxed;
 use std::sync::{mpsc, Arc};
 
-fn spawn_many(b: &mut Bencher) {
-    const NUM_SPAWN: usize = 10_000;
+const NUM_WORKERS: usize = 4;
+const NUM_SPAWN: usize = 10_000;
 
+fn spawn_many_local(b: &mut Bencher) {
     let rt = rt();
 
     let (tx, rx) = mpsc::sync_channel(1000);
@@ -38,6 +39,52 @@ fn spawn_many(b: &mut Bencher) {
     });
 }
 
+fn spawn_many_remote_idle(b: &mut Bencher) {
+    let rt = rt();
+
+    let mut handles = Vec::with_capacity(NUM_SPAWN);
+
+    b.iter(|| {
+        for _ in 0..NUM_SPAWN {
+            handles.push(rt.spawn(async {}));
+        }
+
+        rt.block_on(async {
+            for handle in handles.drain(..) {
+                handle.await.unwrap();
+            }
+        });
+    });
+}
+
+fn spawn_many_remote_busy(b: &mut Bencher) {
+    let rt = rt();
+    let rt_handle = rt.handle();
+    let mut handles = Vec::with_capacity(NUM_SPAWN);
+
+    // Spawn some tasks to keep the runtimes busy
+    for _ in 0..(2 * NUM_WORKERS) {
+        rt.spawn(async {
+            loop {
+                tokio::task::yield_now().await;
+                std::thread::sleep(std::time::Duration::from_micros(10));
+            }
+        });
+    }
+
+    b.iter(|| {
+        for _ in 0..NUM_SPAWN {
+            handles.push(rt_handle.spawn(async {}));
+        }
+
+        rt.block_on(async {
+            for handle in handles.drain(..) {
+                handle.await.unwrap();
+            }
+        });
+    });
+}
+
 fn yield_many(b: &mut Bencher) {
     const NUM_YIELD: usize = 1_000;
     const TASKS: usize = 200;
@@ -140,12 +187,20 @@ fn chained_spawn(b: &mut Bencher) {
 
 fn rt() -> Runtime {
     runtime::Builder::new_multi_thread()
-        .worker_threads(4)
+        .worker_threads(NUM_WORKERS)
         .enable_all()
         .build()
         .unwrap()
 }
 
-benchmark_group!(scheduler, spawn_many, ping_pong, yield_many, chained_spawn,);
+benchmark_group!(
+    scheduler,
+    spawn_many_local,
+    spawn_many_remote_idle,
+    spawn_many_remote_busy,
+    ping_pong,
+    yield_many,
+    chained_spawn,
+);
 
 benchmark_main!(scheduler);
diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs
index faf56db2e91..dd132fb9a6d 100644
--- a/tokio/src/runtime/scheduler/multi_thread/queue.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs
@@ -110,6 +110,15 @@ impl<T> Local<T> {
         !self.inner.is_empty()
     }
 
+    /// How many tasks can be pushed into the queue
+    pub(crate) fn remaining_slots(&self) -> usize {
+        self.inner.remaining_slots()
+    }
+
+    pub(crate) fn max_capacity(&self) -> usize {
+        LOCAL_QUEUE_CAPACITY
+    }
+
     /// Returns false if there are any entries in the queue
     ///
     /// Separate to is_stealable so that refactors of is_stealable to "protect"
@@ -118,8 +127,62 @@ impl<T> Local<T> {
         !self.inner.is_empty()
     }
 
-    /// Pushes a task to the back of the local queue, skipping the LIFO slot.
-    pub(crate) fn push_back(
+    /// Pushes a batch of tasks to the back of the queue. All tasks must fit in
+    /// the local queue.
+    ///
+    /// # Panics
+    ///
+    /// The method panics if there is not enough capacity to fit in the queue.
+    pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator<Item = task::Notified<T>>) {
+        let len = tasks.len();
+        assert!(len <= LOCAL_QUEUE_CAPACITY);
+
+        if len == 0 {
+            // Nothing to do
+            return;
+        }
+
+        let head = self.inner.head.load(Acquire);
+        let (steal, _) = unpack(head);
+
+        // safety: this is the **only** thread that updates this cell.
+        let mut tail = unsafe { self.inner.tail.unsync_load() };
+
+        if tail.wrapping_sub(steal) <= (LOCAL_QUEUE_CAPACITY - len) as UnsignedShort {
+            // Yes, this if condition is structured a bit weirdly (the first
+            // block does nothing, the second panics). It is this way to match
+            // `push_back_or_overflow`.
+        } else {
+            panic!()
+        }
+
+        for task in tasks {
+            let idx = tail as usize & MASK;
+
+            self.inner.buffer[idx].with_mut(|ptr| {
+                // Write the task to the slot
+                //
+                // Safety: There is only one producer and the above `if`
+                // condition ensures we don't touch a cell if there is a
+                // value, thus no consumer.
+                unsafe {
+                    ptr::write((*ptr).as_mut_ptr(), task);
+                }
+            });
+
+            tail = tail.wrapping_add(1);
+        }
+
+        self.inner.tail.store(tail, Release);
+    }
+
+    /// Pushes a task to the back of the local queue. If there is not enough
+    /// capacity in the queue, this triggers the overflow operation.
+    ///
+    /// When the queue overflows, half of the current contents of the queue are
+    /// moved to the given injection queue. This frees up capacity for more
+    /// tasks to be pushed into the local queue.
+    pub(crate) fn push_back_or_overflow(
         &mut self,
         mut task: task::Notified<T>,
         inject: &Inject<T>,
@@ -153,6 +216,11 @@
             }
         };
 
+        self.push_back_finish(task, tail);
+    }
+
+    // Second half of `push_back_or_overflow`
+    fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
         // Map the position to a slot index.
         let idx = tail as usize & MASK;
 
@@ -501,6 +569,13 @@ impl<T> Drop for Local<T> {
 }
 
 impl<T> Inner<T> {
+    fn remaining_slots(&self) -> usize {
+        let (steal, _) = unpack(self.head.load(Acquire));
+        let tail = self.tail.load(Acquire);
+
+        LOCAL_QUEUE_CAPACITY - (tail.wrapping_sub(steal) as usize)
+    }
+
     fn len(&self) -> UnsignedShort {
         let (_, head) = unpack(self.head.load(Acquire));
         let tail = self.tail.load(Acquire);
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index eb0ed705ee1..4729376fc8f 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -509,8 +509,11 @@ impl Context {
                 } else {
                     // Not enough budget left to run the LIFO task, push it to
                     // the back of the queue and return.
-                    core.run_queue
-                        .push_back(task, self.worker.inject(), &mut core.metrics);
+                    core.run_queue.push_back_or_overflow(
+                        task,
+                        self.worker.inject(),
+                        &mut core.metrics,
+                    );
                     return Ok(core);
                 }
             }
@@ -612,7 +615,38 @@ impl Core {
         if self.tick % worker.handle.shared.config.global_queue_interval == 0 {
             worker.inject().pop().or_else(|| self.next_local_task())
         } else {
-            self.next_local_task().or_else(|| worker.inject().pop())
+            let maybe_task = self.next_local_task();
+
+            if maybe_task.is_some() {
+                return maybe_task;
+            }
+
+            // Other threads can only **remove** tasks from the current worker's
+            // `run_queue`. So, we can be confident that by the time we call
+            // `run_queue.push_back` below, there will be *at least* `cap`
+            // available slots in the queue.
+            let cap = usize::min(
+                self.run_queue.remaining_slots(),
+                self.run_queue.max_capacity() / 2,
+            );
+
+            // The worker is currently idle, pull a batch of work from the
+            // injection queue. We don't want to pull *all* the work so other
+            // workers can also get some.
+            let n = usize::min(
+                worker.inject().len() / worker.handle.shared.remotes.len() + 1,
+                cap,
+            );
+
+            let mut tasks = worker.inject().pop_n(n);
+
+            // Pop the first task to return immediately
+            let ret = tasks.next();
+
+            // Push the rest of them onto the run queue
+            self.run_queue.push_back(tasks);
+
+            ret
         }
     }
 
@@ -808,7 +842,7 @@ impl Handle {
             // flexibility and the task may go to the front of the queue.
             let should_notify = if is_yield || self.shared.config.disable_lifo_slot {
                 core.run_queue
-                    .push_back(task, &self.shared.inject, &mut core.metrics);
+                    .push_back_or_overflow(task, &self.shared.inject, &mut core.metrics);
                 true
             } else {
                 // Push to the LIFO slot
@@ -817,7 +851,7 @@ impl Handle {
 
             if let Some(prev) = prev {
                 core.run_queue
-                    .push_back(prev, &self.shared.inject, &mut core.metrics);
+                    .push_back_or_overflow(prev, &self.shared.inject, &mut core.metrics);
             }
 
             core.lifo_slot = Some(task);
diff --git a/tokio/src/runtime/task/inject.rs b/tokio/src/runtime/task/inject.rs
index eb17ee644fc..6b7eee07328 100644
--- a/tokio/src/runtime/task/inject.rs
+++ b/tokio/src/runtime/task/inject.rs
@@ -1,7 +1,7 @@
 //! Inject queue used to send wakeups to a work-stealing scheduler
 
 use crate::loom::sync::atomic::AtomicUsize;
-use crate::loom::sync::Mutex;
+use crate::loom::sync::{Mutex, MutexGuard};
 use crate::runtime::task;
 
 use std::marker::PhantomData;
@@ -32,6 +32,12 @@ struct Pointers {
     tail: Option<NonNull<task::Header>>,
 }
 
+pub(crate) struct Pop<'a, T: 'static> {
+    len: usize,
+    pointers: Option<MutexGuard<'a, Pointers>>,
+    _p: PhantomData<T>,
+}
+
 unsafe impl<T> Send for Inject<T> {}
 unsafe impl<T> Sync for Inject<T> {}
 
@@ -107,34 +113,38 @@ impl<T: 'static> Inject<T> {
     }
 
     pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
+        self.pop_n(1).next()
+    }
+
+    pub(crate) fn pop_n(&self, n: usize) -> Pop<'_, T> {
+        use std::cmp;
+
         // Fast path, if len == 0, then there are no values
         if self.is_empty() {
-            return None;
+            return Pop {
+                len: 0,
+                pointers: None,
+                _p: PhantomData,
+            };
         }
 
-        let mut p = self.pointers.lock();
-
-        // It is possible to hit null here if another thread popped the last
-        // task between us checking `len` and acquiring the lock.
-        let task = p.head?;
-
-        p.head = get_next(task);
+        // Lock the queue
+        let p = self.pointers.lock();
 
-        if p.head.is_none() {
-            p.tail = None;
-        }
+        // safety: All updates to the len atomic are guarded by the mutex. As
+        // such, a non-atomic load followed by a store is safe.
+        let len = unsafe { self.len.unsync_load() };
 
-        set_next(task, None);
+        let n = cmp::min(n, len);
 
         // Decrement the count.
-        //
-        // safety: All updates to the len atomic are guarded by the mutex. As
-        // such, a non-atomic load followed by a store is safe.
-        self.len
-            .store(unsafe { self.len.unsync_load() } - 1, Release);
+        self.len.store(len - n, Release);
 
-        // safety: a `Notified` is pushed into the queue and now it is popped!
-        Some(unsafe { task::Notified::from_raw(task) })
+        Pop {
+            len: n,
+            pointers: Some(p),
+            _p: PhantomData,
+        }
     }
 }
 
@@ -215,6 +225,63 @@ impl<T: 'static> Drop for Inject<T> {
     }
 }
 
+impl<'a, T: 'static> Iterator for Pop<'a, T> {
+    type Item = task::Notified<T>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.len == 0 {
+            return None;
+        }
+
+        // `pointers` is always `Some` when `len() > 0`
+        let pointers = self.pointers.as_mut().unwrap();
+        let ret = pointers.pop();
+
+        debug_assert!(ret.is_some());
+
+        self.len -= 1;
+
+        if self.len == 0 {
+            self.pointers = None;
+        }
+
+        ret
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.len, Some(self.len))
+    }
+}
+
+impl<'a, T: 'static> ExactSizeIterator for Pop<'a, T> {
+    fn len(&self) -> usize {
+        self.len
+    }
+}
+
+impl<'a, T: 'static> Drop for Pop<'a, T> {
+    fn drop(&mut self) {
+        for _ in self.by_ref() {}
+    }
+}
+
+impl Pointers {
+    fn pop(&mut self) -> Option<NonNull<task::Header>> {
+        let task = self.head?;
+
+        self.head = get_next(task);
+
+        if self.head.is_none() {
+            self.tail = None;
+        }
+
+        set_next(task, None);
+
+        // safety: a `Notified` is pushed into the queue and now it is popped!
+        Some(unsafe { task::Notified::from_raw(task) })
+    }
+}
+
 fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
     unsafe { header.as_ref().queue_next.with(|ptr| *ptr) }
 }
diff --git a/tokio/src/runtime/tests/inject.rs b/tokio/src/runtime/tests/inject.rs
new file mode 100644
index 00000000000..92431855485
--- /dev/null
+++ b/tokio/src/runtime/tests/inject.rs
@@ -0,0 +1,38 @@
+use crate::runtime::task::Inject;
+
+#[test]
+fn push_and_pop() {
+    let inject = Inject::new();
+
+    for _ in 0..10 {
+        let (task, _) = super::unowned(async {});
+        inject.push(task);
+    }
+
+    for _ in 0..10 {
+        assert!(inject.pop().is_some());
+    }
+
+    assert!(inject.pop().is_none());
+}
+
+#[test]
+fn push_batch_and_pop() {
+    let inject = Inject::new();
+
+    inject.push_batch((0..10).map(|_| super::unowned(async {}).0));
+
+    assert_eq!(5, inject.pop_n(5).count());
+    assert_eq!(5, inject.pop_n(5).count());
+    assert_eq!(0, inject.pop_n(5).count());
+}
+
+#[test]
+fn pop_n_drains_on_drop() {
+    let inject = Inject::new();
+
+    inject.push_batch((0..10).map(|_| super::unowned(async {}).0));
+    let _ = inject.pop_n(10);
+
+    assert_eq!(inject.len(), 0);
+}
diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs
index 4c3511cde66..58c17ad65c2 100644
--- a/tokio/src/runtime/tests/loom_queue.rs
+++ b/tokio/src/runtime/tests/loom_queue.rs
@@ -39,7 +39,7 @@ fn basic() {
         for _ in 0..2 {
             for _ in 0..2 {
                 let (task, _) = super::unowned(async {});
-                local.push_back(task, &inject, &mut metrics);
+                local.push_back_or_overflow(task, &inject, &mut metrics);
             }
 
             if local.pop().is_some() {
@@ -48,7 +48,7 @@
 
             // Push another task
             let (task, _) = super::unowned(async {});
-            local.push_back(task, &inject, &mut metrics);
+            local.push_back_or_overflow(task, &inject, &mut metrics);
 
             while local.pop().is_some() {
                 n += 1;
@@ -92,7 +92,7 @@ fn steal_overflow() {
 
             // push a task, pop a task
            let (task, _) = super::unowned(async {});
-            local.push_back(task, &inject, &mut metrics);
+            local.push_back_or_overflow(task, &inject, &mut metrics);
 
             if local.pop().is_some() {
                 n += 1;
@@ -100,7 +100,7 @@
 
             for _ in 0..6 {
                 let (task, _) = super::unowned(async {});
-                local.push_back(task, &inject, &mut metrics);
+                local.push_back_or_overflow(task, &inject, &mut metrics);
             }
 
             n += th.join().unwrap();
@@ -146,7 +146,7 @@ fn multi_stealer() {
         // Push work
         for _ in 0..NUM_TASKS {
            let (task, _) = super::unowned(async {});
-            local.push_back(task, &inject, &mut metrics);
+            local.push_back_or_overflow(task, &inject, &mut metrics);
         }
 
         let th1 = {
@@ -184,10 +184,10 @@ fn chained_steal() {
         // Load up some tasks
         for _ in 0..4 {
             let (task, _) = super::unowned(async {});
-            l1.push_back(task, &inject, &mut metrics);
+            l1.push_back_or_overflow(task, &inject, &mut metrics);
 
             let (task, _) = super::unowned(async {});
-            l2.push_back(task, &inject, &mut metrics);
+            l2.push_back_or_overflow(task, &inject, &mut metrics);
         }
 
         // Spawn a task to steal from **our** queue
diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs
index 4e7c2453f25..c63285aad49 100644
--- a/tokio/src/runtime/tests/mod.rs
+++ b/tokio/src/runtime/tests/mod.rs
@@ -63,6 +63,7 @@ cfg_loom! {
 }
 
 cfg_not_loom! {
+    mod inject;
     mod queue;
 
     #[cfg(not(miri))]
diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs
index ac80fa7332f..09f249e9ea4 100644
--- a/tokio/src/runtime/tests/queue.rs
+++ b/tokio/src/runtime/tests/queue.rs
@@ -27,14 +27,14 @@ fn metrics_batch() -> MetricsBatch {
 }
 
 #[test]
-fn fits_256() {
+fn fits_256_one_at_a_time() {
     let (_, mut local) = queue::local();
     let inject = Inject::new();
     let mut metrics = metrics_batch();
 
     for _ in 0..256 {
         let (task, _) = super::unowned(async {});
-        local.push_back(task, &inject, &mut metrics);
+        local.push_back_or_overflow(task, &inject, &mut metrics);
     }
 
     cfg_metrics! {
@@ -46,6 +46,44 @@
     while local.pop().is_some() {}
 }
 
+#[test]
+fn fits_256_all_at_once() {
+    let (_, mut local) = queue::local();
+
+    let mut tasks = (0..256)
+        .map(|_| super::unowned(async {}).0)
+        .collect::<Vec<_>>();
+    local.push_back(tasks.drain(..));
+
+    let mut i = 0;
+    while local.pop().is_some() {
+        i += 1;
+    }
+
+    assert_eq!(i, 256);
+}
+
+#[test]
+fn fits_256_all_in_chunks() {
+    let (_, mut local) = queue::local();
+
+    let mut tasks = (0..256)
+        .map(|_| super::unowned(async {}).0)
+        .collect::<Vec<_>>();
+
+    local.push_back(tasks.drain(..10));
+    local.push_back(tasks.drain(..100));
+    local.push_back(tasks.drain(..46));
+    local.push_back(tasks.drain(..100));
+
+    let mut i = 0;
+    while local.pop().is_some() {
+        i += 1;
+    }
+
+    assert_eq!(i, 256);
+}
+
 #[test]
 fn overflow() {
     let (_, mut local) = queue::local();
@@ -54,7 +92,7 @@
 
     for _ in 0..257 {
         let (task, _) = super::unowned(async {});
-        local.push_back(task, &inject, &mut metrics);
+        local.push_back_or_overflow(task, &inject, &mut metrics);
     }
 
     cfg_metrics! {
@@ -84,7 +122,7 @@ fn steal_batch() {
 
     for _ in 0..4 {
         let (task, _) = super::unowned(async {});
-        local1.push_back(task, &inject, &mut metrics);
+        local1.push_back_or_overflow(task, &inject, &mut metrics);
     }
 
     assert!(steal1.steal_into(&mut local2, &mut metrics).is_some());
@@ -157,7 +195,7 @@ fn stress1() {
         for _ in 0..NUM_LOCAL {
             for _ in 0..NUM_PUSH {
                 let (task, _) = super::unowned(async {});
-                local.push_back(task, &inject, &mut metrics);
+                local.push_back_or_overflow(task, &inject, &mut metrics);
             }
 
             for _ in 0..NUM_POP {
@@ -215,7 +253,7 @@ fn stress2() {
 
         for i in 0..NUM_TASKS {
             let (task, _) = super::unowned(async {});
-            local.push_back(task, &inject, &mut metrics);
+            local.push_back_or_overflow(task, &inject, &mut metrics);
 
             if i % 128 == 0 && local.pop().is_some() {
                 num_pop += 1;
diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs
index 22bd5b3d707..0fe839a2f97 100644
--- a/tokio/tests/rt_metrics.rs
+++ b/tokio/tests/rt_metrics.rs
@@ -510,10 +510,20 @@ fn injection_queue_depth() {
     // First we need to block the runtime workers
     let (tx1, rx1) = std::sync::mpsc::channel();
     let (tx2, rx2) = std::sync::mpsc::channel();
+    let (tx3, rx3) = std::sync::mpsc::channel();
+    let rx3 = Arc::new(Mutex::new(rx3));
 
     rt.spawn(async move { rx1.recv().unwrap() });
     rt.spawn(async move { rx2.recv().unwrap() });
 
+    // Spawn some more to make sure there are items
+    for _ in 0..10 {
+        let rx = rx3.clone();
+        rt.spawn(async move {
+            rx.lock().unwrap().recv().unwrap();
+        });
+    }
+
     thread::spawn(move || {
         handle.spawn(async {});
     })
@@ -522,7 +532,11 @@ fn injection_queue_depth() {
 
     let n = metrics.injection_queue_depth();
     assert!(1 <= n, "{}", n);
-    assert!(3 >= n, "{}", n);
+    assert!(15 >= n, "{}", n);
+
+    for _ in 0..10 {
+        tx3.send(()).unwrap();
+    }
 
     tx1.send(()).unwrap();
     tx2.send(()).unwrap();
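The idle path added to `Core::next_task` above returns the first task popped from the injection queue immediately and pushes the remaining tasks into the local run queue in one batch `push_back` call. As a rough, standalone illustration of the batch-size arithmetic in that hunk (not tokio code — `batch_size` and its parameters are hypothetical names standing in for the values the worker reads at runtime):

```rust
// Illustrative restatement of the batch-size computation from the
// `Core::next_task` hunk above; the helper and its parameter names are
// assumptions for the example, not part of the patch.
fn batch_size(
    remaining_slots: usize,
    max_capacity: usize,
    inject_len: usize,
    num_workers: usize,
) -> usize {
    // Never pull more than the free space in the local queue, and never
    // more than half of its total capacity.
    let cap = usize::min(remaining_slots, max_capacity / 2);

    // Take roughly an even share of the injection queue (+1 so an idle
    // worker always tries to grab at least one task), bounded by `cap`.
    usize::min(inject_len / num_workers + 1, cap)
}

fn main() {
    // Empty 256-slot local queue, 64 injected tasks, 4 workers:
    // 64 / 4 + 1 = 17 tasks are pulled in one batch.
    assert_eq!(batch_size(256, 256, 64, 4), 17);

    // Nearly full local queue: the batch is clamped to the 3 free slots.
    assert_eq!(batch_size(3, 256, 64, 4), 3);
}
```

Because only other workers can remove entries from this worker's `run_queue`, the free space can only grow between the capacity check and the batch push, which is what the new `remaining_slots()` and `max_capacity()` accessors on `Local` rely on.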