From df96c16a4e5a1e4c4bd5e1944833da7af46adcdc Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 24 May 2023 15:38:12 -0700 Subject: [PATCH 01/22] rt(threaded): basic self-tuning of injection queue Each multi-threaded runtime worker prioritizes pulling tasks off of its local queue. Every so often, it checks the injection (global) queue for work submitted there. Previously, "every so often," was a constant "number of tasks polled" value. Tokio sets a default of 61, but allows users to configure this value. If workers are under load with tasks that are slow to poll, the injection queue can be starved. To prevent starvation in this case, this commit implements some basic self-tuning. The multi-threaded scheduler tracks the mean task poll time using an exponentially-weighted moving average. It then uses this value to pick an interval at which to check the injection queue. This commit is a first pass at adding self-tuning to the scheduler. There are other values in the scheduler that could benefit from self-tuning (e.g. the maintenance interval). Additionally, the current-thread scheduler could also benfit from self-tuning. However, we have reached the point where we should start investigating ways to unify logic in both schedulers. Adding self-tuning to the current-thread scheduler will be punted until after this unification. --- benches/rt_multi_threaded.rs | 65 ++++++++- tokio/src/runtime/builder.rs | 12 +- tokio/src/runtime/config.rs | 2 +- tokio/src/runtime/metrics/batch.rs | 20 ++- tokio/src/runtime/metrics/mock.rs | 3 +- tokio/src/runtime/scheduler/current_thread.rs | 27 +++- .../src/runtime/scheduler/multi_thread/mod.rs | 3 + .../runtime/scheduler/multi_thread/queue.rs | 16 +- .../runtime/scheduler/multi_thread/stats.rs | 138 ++++++++++++++++++ .../runtime/scheduler/multi_thread/worker.rs | 76 +++++++--- tokio/src/runtime/tests/queue.rs | 47 +++--- 11 files changed, 330 insertions(+), 79 deletions(-) create mode 100644 tokio/src/runtime/scheduler/multi_thread/stats.rs diff --git a/benches/rt_multi_threaded.rs b/benches/rt_multi_threaded.rs index 88553e4ab50..d5f11c5667d 100644 --- a/benches/rt_multi_threaded.rs +++ b/benches/rt_multi_threaded.rs @@ -6,12 +6,14 @@ use tokio::runtime::{self, Runtime}; use tokio::sync::oneshot; use bencher::{benchmark_group, benchmark_main, Bencher}; -use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::{mpsc, Arc}; +use std::time::{Duration, Instant}; const NUM_WORKERS: usize = 4; const NUM_SPAWN: usize = 10_000; +const STALL_DUR: Duration = Duration::from_micros(10); fn spawn_many_local(b: &mut Bencher) { let rt = rt(); @@ -57,19 +59,60 @@ fn spawn_many_remote_idle(b: &mut Bencher) { }); } -fn spawn_many_remote_busy(b: &mut Bencher) { +fn spawn_many_remote_busy1(b: &mut Bencher) { let rt = rt(); let rt_handle = rt.handle(); let mut handles = Vec::with_capacity(NUM_SPAWN); + let flag = Arc::new(AtomicBool::new(true)); // Spawn some tasks to keep the runtimes busy for _ in 0..(2 * NUM_WORKERS) { - rt.spawn(async { - loop { + let flag = flag.clone(); + rt.spawn(async move { + while flag.load(Relaxed) { tokio::task::yield_now().await; - std::thread::sleep(std::time::Duration::from_micros(10)); + stall(); + } + }); + } + + b.iter(|| { + for _ in 0..NUM_SPAWN { + handles.push(rt_handle.spawn(async {})); + } + + rt.block_on(async { + for handle in handles.drain(..) { + handle.await.unwrap(); } }); + }); + + flag.store(false, Relaxed); +} + +fn spawn_many_remote_busy2(b: &mut Bencher) { + const NUM_SPAWN: usize = 1_000; + + let rt = rt(); + let rt_handle = rt.handle(); + let mut handles = Vec::with_capacity(NUM_SPAWN); + let flag = Arc::new(AtomicBool::new(true)); + + // Spawn some tasks to keep the runtimes busy + for _ in 0..(NUM_WORKERS) { + let flag = flag.clone(); + fn iter(flag: Arc) { + tokio::spawn(async { + if flag.load(Relaxed) { + stall(); + iter(flag); + } + }); + } + rt.spawn(async { + iter(flag); + }); } b.iter(|| { @@ -83,6 +126,8 @@ fn spawn_many_remote_busy(b: &mut Bencher) { } }); }); + + flag.store(false, Relaxed); } fn yield_many(b: &mut Bencher) { @@ -193,11 +238,19 @@ fn rt() -> Runtime { .unwrap() } +fn stall() { + let now = Instant::now(); + while now.elapsed() < STALL_DUR { + std::thread::yield_now(); + } +} + benchmark_group!( scheduler, spawn_many_local, spawn_many_remote_idle, - spawn_many_remote_busy, + spawn_many_remote_busy1, + spawn_many_remote_busy2, ping_pong, yield_many, chained_spawn, diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index dda21a3ae27..641e7f728e1 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -82,7 +82,7 @@ pub struct Builder { pub(super) keep_alive: Option, /// How many ticks before pulling a task from the global/remote queue? - pub(super) global_queue_interval: u32, + pub(super) global_queue_interval: Option, /// How many ticks before yielding to the driver for timer and I/O events? pub(super) event_interval: u32, @@ -211,7 +211,7 @@ impl Builder { #[cfg(not(loom))] const EVENT_INTERVAL: u32 = 61; - Builder::new(Kind::CurrentThread, 31, EVENT_INTERVAL) + Builder::new(Kind::CurrentThread, EVENT_INTERVAL) } cfg_not_wasi! { @@ -222,7 +222,7 @@ impl Builder { #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] pub fn new_multi_thread() -> Builder { // The number `61` is fairly arbitrary. I believe this value was copied from golang. - Builder::new(Kind::MultiThread, 61, 61) + Builder::new(Kind::MultiThread, 61) } } @@ -230,7 +230,7 @@ impl Builder { /// values. /// /// Configuration methods can be chained on the return value. - pub(crate) fn new(kind: Kind, global_queue_interval: u32, event_interval: u32) -> Builder { + pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder { Builder { kind, @@ -266,7 +266,7 @@ impl Builder { // Defaults for these values depend on the scheduler kind, so we get them // as parameters. - global_queue_interval, + global_queue_interval: None, event_interval, seed_generator: RngSeedGenerator::new(RngSeed::new()), @@ -716,7 +716,7 @@ impl Builder { /// # } /// ``` pub fn global_queue_interval(&mut self, val: u32) -> &mut Self { - self.global_queue_interval = val; + self.global_queue_interval = Some(val); self } diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs index 4af5eee472f..c42e4fe5a80 100644 --- a/tokio/src/runtime/config.rs +++ b/tokio/src/runtime/config.rs @@ -4,7 +4,7 @@ use crate::util::RngSeedGenerator; pub(crate) struct Config { /// How many ticks before pulling a task from the global/remote queue? - pub(crate) global_queue_interval: u32, + pub(crate) global_queue_interval: Option, /// How many ticks before yielding to the driver for timer and I/O events? pub(crate) event_interval: u32, diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index 296792d7fb8..1bb4e261f73 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -34,7 +34,7 @@ pub(crate) struct MetricsBatch { busy_duration_total: u64, /// Instant at which work last resumed (continued after park). - last_resume_time: Instant, + processing_scheduled_tasks_started_at: Instant, /// If `Some`, tracks poll times in nanoseconds poll_timer: Option, @@ -62,7 +62,7 @@ impl MetricsBatch { local_schedule_count: 0, overflow_count: 0, busy_duration_total: 0, - last_resume_time: now, + processing_scheduled_tasks_started_at: now, poll_timer: worker_metrics .poll_count_histogram .as_ref() @@ -106,11 +106,20 @@ impl MetricsBatch { } else { self.poll_count_on_last_park = self.poll_count; } + } + + /// Start processing a batch of tasks + pub(crate) fn start_processing_scheduled_tasks(&mut self) { + self.processing_scheduled_tasks_started_at = Instant::now(); + } - let busy_duration = self.last_resume_time.elapsed(); + /// Stop processing a batch of tasks + pub(crate) fn end_processing_scheduled_tasks(&mut self) { + let busy_duration = self.processing_scheduled_tasks_started_at.elapsed(); self.busy_duration_total += duration_as_u64(busy_duration); } + /// Start polling an individual task pub(crate) fn start_poll(&mut self) { self.poll_count += 1; @@ -119,6 +128,7 @@ impl MetricsBatch { } } + /// Stop polling an individual task pub(crate) fn end_poll(&mut self) { if let Some(poll_timer) = &mut self.poll_timer { let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed()); @@ -126,10 +136,6 @@ impl MetricsBatch { } } - pub(crate) fn returned_from_park(&mut self) { - self.last_resume_time = Instant::now(); - } - pub(crate) fn inc_local_schedule_count(&mut self) { self.local_schedule_count += 1; } diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index 4d5e7b5b152..8f8345c08b4 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -39,8 +39,9 @@ impl MetricsBatch { pub(crate) fn submit(&mut self, _to: &WorkerMetrics) {} pub(crate) fn about_to_park(&mut self) {} - pub(crate) fn returned_from_park(&mut self) {} pub(crate) fn inc_local_schedule_count(&mut self) {} + pub(crate) fn start_processing_scheduled_tasks(&mut self) {} + pub(crate) fn end_processing_scheduled_tasks(&mut self) {} pub(crate) fn start_poll(&mut self) {} pub(crate) fn end_poll(&mut self) {} } diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index 8b0dfaaf3d0..54679dce314 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -59,6 +59,9 @@ struct Core { /// Metrics batch metrics: MetricsBatch, + /// How often to check the global queue + global_queue_interval: u32, + /// True if a task panicked without being handled and the runtime is /// configured to shutdown on unhandled panic. unhandled_panic: bool, @@ -100,6 +103,11 @@ type Notified = task::Notified>; /// Initial queue capacity. const INITIAL_CAPACITY: usize = 64; +/// Used if none is specified. This is a temporary constant and will be removed +/// as we unify tuning logic between the multi-thread and current-thread +/// schedulers. +const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 61; + // Tracks the current CurrentThread. scoped_thread_local!(static CURRENT: Context); @@ -113,6 +121,11 @@ impl CurrentThread { ) -> (CurrentThread, Arc) { let worker_metrics = WorkerMetrics::from_config(&config); + // Get the configured global queue interval, or use the default. + let global_queue_interval = config + .global_queue_interval + .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL); + let handle = Arc::new(Handle { shared: Shared { inject: Inject::new(), @@ -132,6 +145,7 @@ impl CurrentThread { tick: 0, driver: Some(driver), metrics: MetricsBatch::new(&handle.shared.worker_metrics), + global_queue_interval, unhandled_panic: false, }))); @@ -255,7 +269,7 @@ impl Core { } fn next_task(&mut self, handle: &Handle) -> Option { - if self.tick % handle.shared.config.global_queue_interval == 0 { + if self.tick % self.global_queue_interval == 0 { handle .next_remote_task() .or_else(|| self.next_local_task(handle)) @@ -344,7 +358,6 @@ impl Context { }); core = c; - core.metrics.returned_from_park(); } if let Some(f) = &handle.shared.config.after_unpark { @@ -603,6 +616,8 @@ impl CoreGuard<'_> { pin!(future); + core.metrics.start_processing_scheduled_tasks(); + 'outer: loop { let handle = &context.handle; @@ -631,12 +646,16 @@ impl CoreGuard<'_> { let task = match entry { Some(entry) => entry, None => { + core.metrics.end_processing_scheduled_tasks(); + core = if did_defer_tasks() { context.park_yield(core, handle) } else { context.park(core, handle) }; + core.metrics.start_processing_scheduled_tasks(); + // Try polling the `block_on` future next continue 'outer; } @@ -651,9 +670,13 @@ impl CoreGuard<'_> { core = c; } + core.metrics.end_processing_scheduled_tasks(); + // Yield to the driver, this drives the timer and pulls any // pending I/O events. core = context.park_yield(core, handle); + + core.metrics.start_processing_scheduled_tasks(); } }); diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index 0a1c63de9b1..9b225335bcf 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -9,6 +9,9 @@ pub(crate) use handle::Handle; mod idle; use self::idle::Idle; +mod stats; +pub(crate) use stats::Stats; + mod park; pub(crate) use park::{Parker, Unparker}; diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs index dd132fb9a6d..2a0ae833850 100644 --- a/tokio/src/runtime/scheduler/multi_thread/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs @@ -2,8 +2,8 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::Arc; +use crate::runtime::scheduler::multi_thread::Stats; use crate::runtime::task::{self, Inject}; -use crate::runtime::MetricsBatch; use std::mem::{self, MaybeUninit}; use std::ptr; @@ -186,7 +186,7 @@ impl Local { &mut self, mut task: task::Notified, inject: &Inject, - metrics: &mut MetricsBatch, + stats: &mut Stats, ) { let tail = loop { let head = self.inner.head.load(Acquire); @@ -206,7 +206,7 @@ impl Local { } else { // Push the current task and half of the queue into the // inject queue. - match self.push_overflow(task, real, tail, inject, metrics) { + match self.push_overflow(task, real, tail, inject, stats) { Ok(_) => return, // Lost the race, try again Err(v) => { @@ -253,7 +253,7 @@ impl Local { head: UnsignedShort, tail: UnsignedShort, inject: &Inject, - metrics: &mut MetricsBatch, + stats: &mut Stats, ) -> Result<(), task::Notified> { /// How many elements are we taking from the local queue. /// @@ -338,7 +338,7 @@ impl Local { inject.push_batch(batch_iter.chain(std::iter::once(task))); // Add 1 to factor in the task currently being scheduled. - metrics.incr_overflow_count(); + stats.incr_overflow_count(); Ok(()) } @@ -394,7 +394,7 @@ impl Steal { pub(crate) fn steal_into( &self, dst: &mut Local, - dst_metrics: &mut MetricsBatch, + dst_stats: &mut Stats, ) -> Option> { // Safety: the caller is the only thread that mutates `dst.tail` and // holds a mutable reference. @@ -420,8 +420,8 @@ impl Steal { return None; } - dst_metrics.incr_steal_count(n as u16); - dst_metrics.incr_steal_operations(); + dst_stats.incr_steal_count(n as u16); + dst_stats.incr_steal_operations(); // We are returning a task here n -= 1; diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs new file mode 100644 index 00000000000..00b0c9ea309 --- /dev/null +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -0,0 +1,138 @@ +use crate::runtime::{Config, MetricsBatch, WorkerMetrics}; + +use std::cmp; +use std::time::{Duration, Instant}; + +/// Per-worker statistics. This is used for both tuning the scheduler and +/// reporting runtime-level metrics/stats. +pub(crate) struct Stats { + /// The metrics batch used to report runtime-level metrics/stats to the + /// user. + batch: MetricsBatch, + + /// Instant at which work last resumed (continued after park). + /// + /// This duplicates the value stored in `MetricsBatch`. We will unify + /// `Stats` and `MetricsBatch` when we stabilize metrics. + processing_scheduled_tasks_started_at: Instant, + + /// Number of tasks polled in the batch of scheduled tasks + tasks_polled_in_batch: usize, + + /// Exponentially-weighted moving average of time spent polling scheduled a + /// task. + /// + /// Tracked in nanoseconds, stored as a f64 since that is what we use with + /// the EWMA calculations + task_poll_time_ewma: f64, +} + +// How to weigh each individual poll time, value is plucked from thin air. +const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1; + +/// Ideally, we wouldn't go above this, value is plucked from thin air. +const TARGET_GLOBAL_QUEUE_INTERVAL: f64 = Duration::from_micros(200).as_nanos() as f64; + +/// Max value for the global queue interval. This is 2x the previous default +const MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 127; + +/// This is the previous default +const TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 61; + +impl Stats { + pub(crate) fn new(worker_metrics: &WorkerMetrics) -> Stats { + // Seed the value with what we hope to see. + let task_poll_time_ewma = + TARGET_GLOBAL_QUEUE_INTERVAL / TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL as f64; + + Stats { + batch: MetricsBatch::new(worker_metrics), + processing_scheduled_tasks_started_at: Instant::now(), + tasks_polled_in_batch: 0, + task_poll_time_ewma: task_poll_time_ewma, + } + } + + pub(crate) fn tuned_global_queue_interval(&self, config: &Config) -> u32 { + // If an interval is explicitly set, don't tune. + if let Some(configured) = config.global_queue_interval { + return configured; + } + + // As of Rust 1.45, casts from f64 -> u32 are saturating, which is fine here. + let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32; + + cmp::max( + 1, + cmp::min( + MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL, + tasks_per_interval, + ), + ) + } + + pub(crate) fn submit(&mut self, to: &WorkerMetrics) { + self.batch.submit(to); + } + + pub(crate) fn about_to_park(&mut self) { + self.batch.about_to_park(); + } + + pub(crate) fn inc_local_schedule_count(&mut self) { + self.batch.inc_local_schedule_count(); + } + + pub(crate) fn start_processing_scheduled_tasks(&mut self) { + self.batch.start_processing_scheduled_tasks(); + + self.processing_scheduled_tasks_started_at = Instant::now(); + self.tasks_polled_in_batch = 0; + } + + pub(crate) fn end_processing_scheduled_tasks(&mut self) { + self.batch.end_processing_scheduled_tasks(); + + // Update the EWMA task poll time + if self.tasks_polled_in_batch > 0 { + let now = Instant::now(); + + // If we "overflow" this conversion, we have bigger problems than + // slightly off stats. + let elapsed = (now - self.processing_scheduled_tasks_started_at).as_nanos() as f64; + let num_polls = self.tasks_polled_in_batch as f64; + + // Calculate the mean poll duration for a single task in the batch + let mean_poll_duration = elapsed / num_polls; + + // Compute the alpha weighted by the number of tasks polled this batch. + let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls); + + // Now compute the new weighted average task poll time. + self.task_poll_time_ewma = weighted_alpha * mean_poll_duration + + (1.0 - weighted_alpha) * self.task_poll_time_ewma; + } + } + + pub(crate) fn start_poll(&mut self) { + self.batch.start_poll(); + + self.tasks_polled_in_batch += 1; + } + + pub(crate) fn end_poll(&mut self) { + self.batch.end_poll(); + } + + pub(crate) fn incr_steal_count(&mut self, by: u16) { + self.batch.incr_steal_count(by); + } + + pub(crate) fn incr_steal_operations(&mut self) { + self.batch.incr_steal_operations(); + } + + pub(crate) fn incr_overflow_count(&mut self) { + self.batch.incr_overflow_count(); + } +} diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index e2bbb643db7..3e7e91538f0 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -59,10 +59,12 @@ use crate::loom::sync::{Arc, Mutex}; use crate::runtime; use crate::runtime::context; -use crate::runtime::scheduler::multi_thread::{queue, Counters, Handle, Idle, Parker, Unparker}; +use crate::runtime::scheduler::multi_thread::{ + queue, Counters, Handle, Idle, Parker, Stats, Unparker, +}; use crate::runtime::task::{Inject, OwnedTasks}; use crate::runtime::{ - blocking, coop, driver, scheduler, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics, + blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics, }; use crate::util::atomic_cell::AtomicCell; use crate::util::rand::{FastRand, RngSeedGenerator}; @@ -114,8 +116,11 @@ struct Core { /// borrow checker happy. park: Option, - /// Batching metrics so they can be submitted to RuntimeMetrics. - metrics: MetricsBatch, + /// Per-worker runtime stats + stats: Stats, + + /// How often to check the global queue + global_queue_interval: u32, /// Fast random number generator. rand: FastRand, @@ -220,6 +225,7 @@ pub(super) fn create( let park = park.clone(); let unpark = park.unpark(); let metrics = WorkerMetrics::from_config(&config); + let stats = Stats::new(&metrics); cores.push(Box::new(Core { tick: 0, @@ -229,7 +235,8 @@ pub(super) fn create( is_searching: false, is_shutdown: false, park: Some(park), - metrics: MetricsBatch::new(&metrics), + global_queue_interval: stats.tuned_global_queue_interval(&config), + stats, rand: FastRand::new(config.seed_generator.next_seed()), })); @@ -437,6 +444,10 @@ impl Context { // a task that had the LIFO slot disabled. self.reset_lifo_enabled(&mut core); + // Start as "processing" tasks as polling tasks from the local queue + // will be one of the first things we do. + core.stats.start_processing_scheduled_tasks(); + while !core.is_shutdown { self.assert_lifo_enabled_is_correct(&core); @@ -452,9 +463,14 @@ impl Context { continue; } + // We consumed all work in the queues and will start searching for work. + core.stats.end_processing_scheduled_tasks(); + // There is no more **local** work to process, try to steal work // from other workers. if let Some(task) = core.steal_work(&self.worker) { + // Found work, switch back to processing + core.stats.start_processing_scheduled_tasks(); core = self.run_task(task, core)?; } else { // Wait for work @@ -483,7 +499,7 @@ impl Context { self.assert_lifo_enabled_is_correct(&core); // Make the core available to the runtime context - core.metrics.start_poll(); + core.stats.start_poll(); *self.core.borrow_mut() = Some(core); // Run the task @@ -506,29 +522,25 @@ impl Context { } }; - // If task poll times is enabled, measure the poll time. Note - // that, if the `core` is stolen, this means `block_in_place` - // was called, turning the poll into a "blocking op". In this - // case, we don't want to measure the poll time as it doesn't - // really count as an async poll anymore. - core.metrics.end_poll(); - // Check for a task in the LIFO slot let task = match core.lifo_slot.take() { Some(task) => task, None => { self.reset_lifo_enabled(&mut core); + core.stats.end_poll(); return Ok(core); } }; if !coop::has_budget_remaining() { + core.stats.end_poll(); + // Not enough budget left to run the LIFO task, push it to // the back of the queue and return. core.run_queue.push_back_or_overflow( task, self.worker.inject(), - &mut core.metrics, + &mut core.stats, ); // If we hit this point, the LIFO slot should be enabled. // There is no need to reset it. @@ -553,7 +565,6 @@ impl Context { } // Run the LIFO task, then loop - core.metrics.start_poll(); *self.core.borrow_mut() = Some(core); let task = self.worker.handle.shared.owned.assert_owner(task); task.run(); @@ -576,12 +587,16 @@ impl Context { if core.tick % self.worker.handle.shared.config.event_interval == 0 { super::counters::inc_num_maintenance(); + core.stats.end_processing_scheduled_tasks(); + // Call `park` with a 0 timeout. This enables the I/O driver, timer, ... // to run without actually putting the thread to sleep. core = self.park_timeout(core, Some(Duration::from_millis(0))); // Run regularly scheduled maintenance core.maintenance(&self.worker); + + core.stats.start_processing_scheduled_tasks(); } core @@ -605,9 +620,8 @@ impl Context { if core.transition_to_parked(&self.worker) { while !core.is_shutdown { - core.metrics.about_to_park(); + core.stats.about_to_park(); core = self.park_timeout(core, None); - core.metrics.returned_from_park(); // Run regularly scheduled maintenance core.maintenance(&self.worker); @@ -666,7 +680,10 @@ impl Core { /// Return the next notified task available to this worker. fn next_task(&mut self, worker: &Worker) -> Option { - if self.tick % worker.handle.shared.config.global_queue_interval == 0 { + if self.tick % self.global_queue_interval == 0 { + // Update the global queue interval, if needed + self.tune_global_queue_interval(worker); + worker.inject().pop().or_else(|| self.next_local_task()) } else { let maybe_task = self.next_local_task(); @@ -733,7 +750,7 @@ impl Core { let target = &worker.handle.shared.remotes[i]; if let Some(task) = target .steal - .steal_into(&mut self.run_queue, &mut self.metrics) + .steal_into(&mut self.run_queue, &mut self.stats) { return Some(task); } @@ -813,7 +830,7 @@ impl Core { /// Runs maintenance work such as checking the pool's state. fn maintenance(&mut self, worker: &Worker) { - self.metrics + self.stats .submit(&worker.handle.shared.worker_metrics[worker.index]); if !self.is_shutdown { @@ -828,7 +845,7 @@ impl Core { // Signal to all tasks to shut down. worker.handle.shared.owned.close_and_shutdown_all(); - self.metrics + self.stats .submit(&worker.handle.shared.worker_metrics[worker.index]); } @@ -842,6 +859,17 @@ impl Core { park.shutdown(&handle.driver); } + + fn tune_global_queue_interval(&mut self, worker: &Worker) { + let next = self + .stats + .tuned_global_queue_interval(&worker.handle.shared.config); + + // Smooth out jitter + if self.global_queue_interval.abs_diff(next) > 2 { + self.global_queue_interval = next; + } + } } impl Worker { @@ -888,7 +916,7 @@ impl Handle { } fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) { - core.metrics.inc_local_schedule_count(); + core.stats.inc_local_schedule_count(); // Spawning from the worker thread. If scheduling a "yield" then the // task must always be pushed to the back of the queue, enabling other @@ -896,7 +924,7 @@ impl Handle { // flexibility and the task may go to the front of the queue. let should_notify = if is_yield || !core.lifo_enabled { core.run_queue - .push_back_or_overflow(task, &self.shared.inject, &mut core.metrics); + .push_back_or_overflow(task, &self.shared.inject, &mut core.stats); true } else { // Push to the LIFO slot @@ -905,7 +933,7 @@ impl Handle { if let Some(prev) = prev { core.run_queue - .push_back_or_overflow(prev, &self.shared.inject, &mut core.metrics); + .push_back_or_overflow(prev, &self.shared.inject, &mut core.stats); } core.lifo_slot = Some(task); diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 09f249e9ea4..2e42f1bbb9e 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -1,18 +1,17 @@ -use crate::runtime::scheduler::multi_thread::queue; +use crate::runtime::scheduler::multi_thread::{queue, Stats}; use crate::runtime::task::{self, Inject, Schedule, Task}; -use crate::runtime::MetricsBatch; use std::thread; use std::time::Duration; #[allow(unused)] macro_rules! assert_metrics { - ($metrics:ident, $field:ident == $v:expr) => {{ + ($stats:ident, $field:ident == $v:expr) => {{ use crate::runtime::WorkerMetrics; use std::sync::atomic::Ordering::Relaxed; let worker = WorkerMetrics::new(); - $metrics.submit(&worker); + $stats.submit(&worker); let expect = $v; let actual = worker.$field.load(Relaxed); @@ -21,24 +20,24 @@ macro_rules! assert_metrics { }}; } -fn metrics_batch() -> MetricsBatch { +fn new_stats() -> Stats { use crate::runtime::WorkerMetrics; - MetricsBatch::new(&WorkerMetrics::new()) + Stats::new(&WorkerMetrics::new()) } #[test] fn fits_256_one_at_a_time() { let (_, mut local) = queue::local(); let inject = Inject::new(); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); for _ in 0..256 { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } cfg_metrics! { - assert_metrics!(metrics, overflow_count == 0); + assert_metrics!(stats, overflow_count == 0); } assert!(inject.pop().is_none()); @@ -88,15 +87,15 @@ fn fits_256_all_in_chunks() { fn overflow() { let (_, mut local) = queue::local(); let inject = Inject::new(); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); for _ in 0..257 { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } cfg_metrics! { - assert_metrics!(metrics, overflow_count == 1); + assert_metrics!(stats, overflow_count == 1); } let mut n = 0; @@ -114,7 +113,7 @@ fn overflow() { #[test] fn steal_batch() { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (steal1, mut local1) = queue::local(); let (_, mut local2) = queue::local(); @@ -122,13 +121,13 @@ fn steal_batch() { for _ in 0..4 { let (task, _) = super::unowned(async {}); - local1.push_back_or_overflow(task, &inject, &mut metrics); + local1.push_back_or_overflow(task, &inject, &mut stats); } - assert!(steal1.steal_into(&mut local2, &mut metrics).is_some()); + assert!(steal1.steal_into(&mut local2, &mut stats).is_some()); cfg_metrics! { - assert_metrics!(metrics, steal_count == 2); + assert_metrics!(stats, steal_count == 2); } for _ in 0..1 { @@ -160,19 +159,19 @@ fn stress1() { const NUM_PUSH: usize = normal_or_miri(500, 10); const NUM_POP: usize = normal_or_miri(250, 10); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); for _ in 0..NUM_ITER { let (steal, mut local) = queue::local(); let inject = Inject::new(); let th = thread::spawn(move || { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..NUM_STEAL { - if steal.steal_into(&mut local, &mut metrics).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -184,7 +183,7 @@ fn stress1() { } cfg_metrics! { - assert_metrics!(metrics, steal_count == n as _); + assert_metrics!(stats, steal_count == n as _); } n @@ -195,7 +194,7 @@ fn stress1() { for _ in 0..NUM_LOCAL { for _ in 0..NUM_PUSH { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } for _ in 0..NUM_POP { @@ -223,14 +222,14 @@ fn stress2() { const NUM_TASKS: usize = normal_or_miri(1_000_000, 50); const NUM_STEAL: usize = normal_or_miri(1_000, 10); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); for _ in 0..NUM_ITER { let (steal, mut local) = queue::local(); let inject = Inject::new(); let th = thread::spawn(move || { - let mut stats = metrics_batch(); + let mut stats = new_stats(); let (_, mut local) = queue::local(); let mut n = 0; @@ -253,7 +252,7 @@ fn stress2() { for i in 0..NUM_TASKS { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); if i % 128 == 0 && local.pop().is_some() { num_pop += 1; From b7af66157f3f9de0ba8085707c15865ce4848966 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 25 May 2023 08:18:15 -0700 Subject: [PATCH 02/22] fix loom tests --- tokio/src/runtime/tests/loom_queue.rs | 47 +++++++++++++-------------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs index 58c17ad65c2..eb85807a6b0 100644 --- a/tokio/src/runtime/tests/loom_queue.rs +++ b/tokio/src/runtime/tests/loom_queue.rs @@ -1,12 +1,11 @@ -use crate::runtime::scheduler::multi_thread::queue; +use crate::runtime::scheduler::multi_thread::{queue, Stats}; use crate::runtime::task::Inject; use crate::runtime::tests::NoopSchedule; -use crate::runtime::MetricsBatch; use loom::thread; -fn metrics_batch() -> MetricsBatch { - MetricsBatch::new(&crate::runtime::WorkerMetrics::new()) +fn new_stats() -> Stats { + Stats::new(&crate::runtime::WorkerMetrics::new()) } #[test] @@ -14,15 +13,15 @@ fn basic() { loom::model(|| { let (steal, mut local) = queue::local(); let inject = Inject::new(); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let th = thread::spawn(move || { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..3 { - if steal.steal_into(&mut local, &mut metrics).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -39,7 +38,7 @@ fn basic() { for _ in 0..2 { for _ in 0..2 { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } if local.pop().is_some() { @@ -48,7 +47,7 @@ fn basic() { // Push another task let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); while local.pop().is_some() { n += 1; @@ -70,14 +69,14 @@ fn steal_overflow() { loom::model(|| { let (steal, mut local) = queue::local(); let inject = Inject::new(); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let th = thread::spawn(move || { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (_, mut local) = queue::local(); let mut n = 0; - if steal.steal_into(&mut local, &mut metrics).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -92,7 +91,7 @@ fn steal_overflow() { // push a task, pop a task let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); if local.pop().is_some() { n += 1; @@ -100,7 +99,7 @@ fn steal_overflow() { for _ in 0..6 { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } n += th.join().unwrap(); @@ -122,10 +121,10 @@ fn multi_stealer() { const NUM_TASKS: usize = 5; fn steal_tasks(steal: queue::Steal) -> usize { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (_, mut local) = queue::local(); - if steal.steal_into(&mut local, &mut metrics).is_none() { + if steal.steal_into(&mut local, &mut stats).is_none() { return 0; } @@ -141,12 +140,12 @@ fn multi_stealer() { loom::model(|| { let (steal, mut local) = queue::local(); let inject = Inject::new(); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); // Push work for _ in 0..NUM_TASKS { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } let th1 = { @@ -176,7 +175,7 @@ fn multi_stealer() { #[test] fn chained_steal() { loom::model(|| { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (s1, mut l1) = queue::local(); let (s2, mut l2) = queue::local(); let inject = Inject::new(); @@ -184,17 +183,17 @@ fn chained_steal() { // Load up some tasks for _ in 0..4 { let (task, _) = super::unowned(async {}); - l1.push_back_or_overflow(task, &inject, &mut metrics); + l1.push_back_or_overflow(task, &inject, &mut stats); let (task, _) = super::unowned(async {}); - l2.push_back_or_overflow(task, &inject, &mut metrics); + l2.push_back_or_overflow(task, &inject, &mut stats); } // Spawn a task to steal from **our** queue let th = thread::spawn(move || { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (_, mut local) = queue::local(); - s1.steal_into(&mut local, &mut metrics); + s1.steal_into(&mut local, &mut stats); while local.pop().is_some() {} }); @@ -202,7 +201,7 @@ fn chained_steal() { // Drain our tasks, then attempt to steal while l1.pop().is_some() {} - s2.steal_into(&mut l1, &mut metrics); + s2.steal_into(&mut l1, &mut stats); th.join().unwrap(); From e93894bb2a5f45ee6602e146e16b6f373ec5b48a Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 25 May 2023 08:21:07 -0700 Subject: [PATCH 03/22] fix build on MSRV --- tokio/src/runtime/scheduler/multi_thread/worker.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 3e7e91538f0..19ab3580acb 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -866,7 +866,9 @@ impl Core { .tuned_global_queue_interval(&worker.handle.shared.config); // Smooth out jitter - if self.global_queue_interval.abs_diff(next) > 2 { + // + // `u32::abs_diff` is not available on Tokio's MSRV. + if abs_diff(self.global_queue_interval, next) > 2 { self.global_queue_interval = next; } } @@ -1047,3 +1049,11 @@ cfg_metrics! { } } } + +fn abs_diff(a: u32, b: u32) -> u32 { + if a > b { + a - b + } else { + b - a + } +} From 987d081041997ce3d8c4ea378de455e926cb1871 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 25 May 2023 08:25:19 -0700 Subject: [PATCH 04/22] fix clippy --- tokio/src/runtime/scheduler/multi_thread/stats.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index 00b0c9ea309..5eacb12d381 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -49,7 +49,7 @@ impl Stats { batch: MetricsBatch::new(worker_metrics), processing_scheduled_tasks_started_at: Instant::now(), tasks_polled_in_batch: 0, - task_poll_time_ewma: task_poll_time_ewma, + task_poll_time_ewma, } } From 6db32b24ce40c465809ec9348fb5985ab3725cda Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 25 May 2023 08:48:50 -0700 Subject: [PATCH 05/22] avoid tuning in loom tests --- tokio/src/runtime/tests/loom_pool.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tokio/src/runtime/tests/loom_pool.rs b/tokio/src/runtime/tests/loom_pool.rs index b3ecd431240..470a424668e 100644 --- a/tokio/src/runtime/tests/loom_pool.rs +++ b/tokio/src/runtime/tests/loom_pool.rs @@ -349,6 +349,8 @@ mod group_d { fn mk_pool(num_threads: usize) -> Runtime { runtime::Builder::new_multi_thread() .worker_threads(num_threads) + // Set the intervals to avoid tuning logic + .event_interval(2) .build() .unwrap() } From 2e597456ee7b2e6ad6b5fcbddb828cda3eee8a34 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 25 May 2023 10:20:18 -0700 Subject: [PATCH 06/22] add a test --- tokio/tests/rt_threaded.rs | 77 +++++++++++++++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index feb2f9f8cca..aa114b5ffc2 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -10,10 +10,11 @@ use tokio_test::{assert_err, assert_ok}; use futures::future::poll_fn; use std::future::Future; use std::pin::Pin; -use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::{mpsc, Arc, Mutex}; use std::task::{Context, Poll, Waker}; +use std::time::Duration; macro_rules! cfg_metrics { ($($t:tt)*) => { @@ -588,6 +589,80 @@ async fn test_block_in_place4() { tokio::task::block_in_place(|| {}); } +#[test] +fn test_tuning() { + let rt = runtime::Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + + fn iter(flag: Arc, counter: Arc, stall: bool) { + if flag.load(Relaxed) { + if stall { + std::thread::sleep(Duration::from_micros(5)); + } + + counter.fetch_add(1, Relaxed); + tokio::spawn(async move { iter(flag, counter, stall) }); + } + } + + let flag = Arc::new(AtomicBool::new(true)); + let counter = Arc::new(AtomicUsize::new(61)); + let interval = Arc::new(AtomicUsize::new(61)); + + { + let flag = flag.clone(); + let counter = counter.clone(); + rt.spawn(async move { iter(flag, counter, true) }); + } + + // Now, hammer the injection queue until the interval drops. + while interval.load(Relaxed) > 8 { + let counter = counter.clone(); + let interval = interval.clone(); + + rt.spawn(async move { + let prev = counter.swap(0, Relaxed); + interval.store(prev, Relaxed); + }); + } + + flag.store(false, Relaxed); + + let w = Arc::downgrade(&interval); + drop(interval); + + while w.strong_count() > 0 { + std::thread::sleep(Duration::from_millis(100)); + } + + // Now, run it again with a faster task + let flag = Arc::new(AtomicBool::new(true)); + // Set it high, we know it shouldn't ever really be this high + let counter = Arc::new(AtomicUsize::new(10_000)); + let interval = Arc::new(AtomicUsize::new(10_000)); + + { + let flag = flag.clone(); + let counter = counter.clone(); + rt.spawn(async move { iter(flag, counter, false) }); + } + + // Now, hammer the injection queue until the interval reaches the expected range. + while interval.load(Relaxed) > 1_000 || interval.load(Relaxed) <= 32 { + let counter = counter.clone(); + let interval = interval.clone(); + + rt.spawn(async move { + let prev = counter.swap(0, Relaxed); + interval.store(prev, Relaxed); + }); + } + + flag.store(false, Relaxed); +} + fn rt() -> runtime::Runtime { runtime::Runtime::new().unwrap() } From 9ff9218639939b2b548525f45452679136dd2fbc Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 25 May 2023 10:22:30 -0700 Subject: [PATCH 07/22] try increasing loom scope --- .github/workflows/loom.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml index a51829927de..417c3b470fb 100644 --- a/.github/workflows/loom.yml +++ b/.github/workflows/loom.yml @@ -28,13 +28,13 @@ jobs: - scope: --skip loom_pool max_preemptions: 2 - scope: loom_pool::group_a - max_preemptions: 1 + max_preemptions: 2 - scope: loom_pool::group_b max_preemptions: 2 - scope: loom_pool::group_c - max_preemptions: 1 + max_preemptions: 2 - scope: loom_pool::group_d - max_preemptions: 1 + max_preemptions: 2 - scope: time::driver max_preemptions: 2 steps: From 073412bde63034dbcd0c019f5cac96589e553547 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 25 May 2023 13:15:30 -0700 Subject: [PATCH 08/22] Revert "try increasing loom scope" This reverts commit 9ff9218639939b2b548525f45452679136dd2fbc. --- .github/workflows/loom.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/loom.yml b/.github/workflows/loom.yml index 417c3b470fb..a51829927de 100644 --- a/.github/workflows/loom.yml +++ b/.github/workflows/loom.yml @@ -28,13 +28,13 @@ jobs: - scope: --skip loom_pool max_preemptions: 2 - scope: loom_pool::group_a - max_preemptions: 2 + max_preemptions: 1 - scope: loom_pool::group_b max_preemptions: 2 - scope: loom_pool::group_c - max_preemptions: 2 + max_preemptions: 1 - scope: loom_pool::group_d - max_preemptions: 2 + max_preemptions: 1 - scope: time::driver max_preemptions: 2 steps: From d07794e1b9155cfe7f421ef0e0c38363af38c34a Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 25 May 2023 13:54:15 -0700 Subject: [PATCH 09/22] disable tuning test when running cross tests --- .github/workflows/ci.yml | 6 ++++-- tokio/tests/rt_threaded.rs | 29 +++++++++++++++++++++-------- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9701134c6ed..5c6fd6e9e6e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -345,11 +345,13 @@ jobs: matrix: include: - target: i686-unknown-linux-gnu - rustflags: --cfg tokio_taskdump + rustflags: --cfg tokio_cross --cfg tokio_taskdump - target: arm-unknown-linux-gnueabihf + rustflags: --cfg tokio_cross - target: armv7-unknown-linux-gnueabihf + rustflags: --cfg tokio_cross - target: aarch64-unknown-linux-gnu - rustflags: --cfg tokio_taskdump + rustflags: --cfg tokio_cross --cfg tokio_taskdump # Run a platform without AtomicU64 and no const Mutex::new - target: arm-unknown-linux-gnueabihf diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index aa114b5ffc2..07b137028c6 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -590,6 +590,7 @@ async fn test_block_in_place4() { } #[test] +#[cfg(not(tokio_cross))] fn test_tuning() { let rt = runtime::Builder::new_multi_thread() .worker_threads(1) @@ -618,14 +619,20 @@ fn test_tuning() { } // Now, hammer the injection queue until the interval drops. + let mut i = 0; while interval.load(Relaxed) > 8 { + i += 1; let counter = counter.clone(); let interval = interval.clone(); - rt.spawn(async move { - let prev = counter.swap(0, Relaxed); - interval.store(prev, Relaxed); - }); + if i <= 5_000 { + rt.spawn(async move { + let prev = counter.swap(0, Relaxed); + interval.store(prev, Relaxed); + }); + } + + std::thread::yield_now(); } flag.store(false, Relaxed); @@ -650,14 +657,20 @@ fn test_tuning() { } // Now, hammer the injection queue until the interval reaches the expected range. + let mut i = 0; while interval.load(Relaxed) > 1_000 || interval.load(Relaxed) <= 32 { + i += 1; let counter = counter.clone(); let interval = interval.clone(); - rt.spawn(async move { - let prev = counter.swap(0, Relaxed); - interval.store(prev, Relaxed); - }); + if i <= 5_000 { + rt.spawn(async move { + let prev = counter.swap(0, Relaxed); + interval.store(prev, Relaxed); + }); + } + + std::thread::yield_now(); } flag.store(false, Relaxed); From 266a11d6f05e65abdd51f0f42e8ef311249c9955 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 25 May 2023 14:03:38 -0700 Subject: [PATCH 10/22] fix warnings --- tokio/tests/rt_threaded.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 07b137028c6..d76edee2c5e 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -11,10 +11,9 @@ use futures::future::poll_fn; use std::future::Future; use std::pin::Pin; use std::sync::atomic::Ordering::Relaxed; -use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::sync::atomic::{AtomicUsize}; use std::sync::{mpsc, Arc, Mutex}; use std::task::{Context, Poll, Waker}; -use std::time::Duration; macro_rules! cfg_metrics { ($($t:tt)*) => { @@ -592,6 +591,9 @@ async fn test_block_in_place4() { #[test] #[cfg(not(tokio_cross))] fn test_tuning() { + use std::sync::atomic::AtomicBool; + use std::time::Duration; + let rt = runtime::Builder::new_multi_thread() .worker_threads(1) .build() From d91022d55417941828ef3448eba8ed68965d3921 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 25 May 2023 14:09:11 -0700 Subject: [PATCH 11/22] fmt --- tokio/tests/rt_threaded.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index d76edee2c5e..8d6093535bc 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -10,8 +10,8 @@ use tokio_test::{assert_err, assert_ok}; use futures::future::poll_fn; use std::future::Future; use std::pin::Pin; +use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; -use std::sync::atomic::{AtomicUsize}; use std::sync::{mpsc, Arc, Mutex}; use std::task::{Context, Poll, Waker}; From 034096bccf6e47a222dae9d4aa33a2868be6a57f Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 25 May 2023 14:59:58 -0700 Subject: [PATCH 12/22] tweak test --- tokio/tests/rt_threaded.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 8d6093535bc..18795bf0187 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -623,18 +623,19 @@ fn test_tuning() { // Now, hammer the injection queue until the interval drops. let mut i = 0; while interval.load(Relaxed) > 8 { - i += 1; let counter = counter.clone(); let interval = interval.clone(); if i <= 5_000 { + i += 1; rt.spawn(async move { let prev = counter.swap(0, Relaxed); interval.store(prev, Relaxed); }); + std::thread::yield_now(); + } else { + std::thread::sleep(Duration::from_micros(500)); } - - std::thread::yield_now(); } flag.store(false, Relaxed); @@ -643,7 +644,7 @@ fn test_tuning() { drop(interval); while w.strong_count() > 0 { - std::thread::sleep(Duration::from_millis(100)); + std::thread::sleep(Duration::from_micros(500)); } // Now, run it again with a faster task @@ -661,18 +662,20 @@ fn test_tuning() { // Now, hammer the injection queue until the interval reaches the expected range. let mut i = 0; while interval.load(Relaxed) > 1_000 || interval.load(Relaxed) <= 32 { - i += 1; let counter = counter.clone(); let interval = interval.clone(); if i <= 5_000 { + i += 1; rt.spawn(async move { let prev = counter.swap(0, Relaxed); interval.store(prev, Relaxed); }); - } - std::thread::yield_now(); + std::thread::yield_now(); + } else { + std::thread::sleep(Duration::from_micros(500)); + } } flag.store(false, Relaxed); From 0dc2b8f717e52d73ed716d32872f8e9790b0ae65 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 26 May 2023 08:05:40 -0700 Subject: [PATCH 13/22] try again --- .github/workflows/ci.yml | 2 +- tokio/tests/rt_threaded.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5c6fd6e9e6e..f372bbf8dab 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -355,7 +355,7 @@ jobs: # Run a platform without AtomicU64 and no const Mutex::new - target: arm-unknown-linux-gnueabihf - rustflags: --cfg tokio_no_const_mutex_new + rustflags: --cfg tokio_cross --cfg tokio_no_const_mutex_new steps: - uses: actions/checkout@v3 - name: Install Rust stable diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 18795bf0187..df168ec3869 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -35,6 +35,7 @@ fn single_thread() { } #[test] +#[cfg(not(tokio_cross))] fn many_oneshot_futures() { // used for notifying the main thread const NUM: usize = 1_000; From e3ab179e13fea10a8b858f72e7b2d77418bb3b02 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 26 May 2023 08:24:45 -0700 Subject: [PATCH 14/22] try again From 4350c471f63de10f4b8bd6a0d2fd17fb20cabaa2 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 26 May 2023 08:50:50 -0700 Subject: [PATCH 15/22] fix constant to match what it was before --- tokio/src/runtime/scheduler/current_thread.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index 54679dce314..fef3a7f9f03 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -106,7 +106,7 @@ const INITIAL_CAPACITY: usize = 64; /// Used if none is specified. This is a temporary constant and will be removed /// as we unify tuning logic between the multi-thread and current-thread /// schedulers. -const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 61; +const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31; // Tracks the current CurrentThread. scoped_thread_local!(static CURRENT: Context); From 0bb5765339fe263ab2975b6543c49a39a305462b Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 26 May 2023 13:49:44 -0700 Subject: [PATCH 16/22] try again From eb4a7d314d01c1cc43bfe592ffdf602a0086f719 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 26 May 2023 13:56:04 -0700 Subject: [PATCH 17/22] try again From b9eee6aa20308e3db74b42dcdfda40af14edf382 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 26 May 2023 14:31:08 -0700 Subject: [PATCH 18/22] tweak ci --- .github/workflows/ci.yml | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f372bbf8dab..bf8009407fc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -345,17 +345,15 @@ jobs: matrix: include: - target: i686-unknown-linux-gnu - rustflags: --cfg tokio_cross --cfg tokio_taskdump + rustflags: --cfg tokio_taskdump - target: arm-unknown-linux-gnueabihf - rustflags: --cfg tokio_cross - target: armv7-unknown-linux-gnueabihf - rustflags: --cfg tokio_cross - target: aarch64-unknown-linux-gnu - rustflags: --cfg tokio_cross --cfg tokio_taskdump + rustflags: --cfg tokio_taskdump # Run a platform without AtomicU64 and no const Mutex::new - target: arm-unknown-linux-gnueabihf - rustflags: --cfg tokio_cross --cfg tokio_no_const_mutex_new + rustflags: --cfg tokio_no_const_mutex_new steps: - uses: actions/checkout@v3 - name: Install Rust stable @@ -368,14 +366,14 @@ jobs: # First run with all features (including parking_lot) - run: cross test -p tokio --all-features --target ${{ matrix.target }} --tests env: - RUSTFLAGS: --cfg tokio_unstable -Dwarnings --cfg tokio_no_ipv6 ${{ matrix.rustflags }} + RUSTFLAGS: --cfg tokio_unstable -Dwarnings --cfg tokio_no_ipv6 --cfg tokio_cross ${{ matrix.rustflags }} # Now run without parking_lot - name: Remove `parking_lot` from `full` feature run: sed -i '0,/parking_lot/{/parking_lot/d;}' tokio/Cargo.toml # The `tokio_no_parking_lot` cfg is here to ensure the `sed` above does not silently break. - run: cross test -p tokio --features full,test-util --target ${{ matrix.target }} --tests env: - RUSTFLAGS: --cfg tokio_unstable -Dwarnings --cfg tokio_no_ipv6 --cfg tokio_no_parking_lot ${{ matrix.rustflags }} + RUSTFLAGS: --cfg tokio_unstable -Dwarnings --cfg tokio_no_ipv6 --cfg tokio_no_parking_lot --cfg tokio_cross ${{ matrix.rustflags }} # See https://github.com/tokio-rs/tokio/issues/5187 no-atomic-u64: From 1bb42a9ca92976a6315c0e4f34382adf5b314d5f Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 30 May 2023 12:50:10 -0700 Subject: [PATCH 19/22] apply feedback --- benches/rt_multi_threaded.rs | 4 ++++ tokio/src/runtime/builder.rs | 6 ++++++ tokio/src/runtime/scheduler/multi_thread/stats.rs | 2 +- tokio/src/runtime/scheduler/multi_thread/worker.rs | 10 +++++++--- tokio/tests/rt_threaded.rs | 5 +++++ 5 files changed, 23 insertions(+), 4 deletions(-) diff --git a/benches/rt_multi_threaded.rs b/benches/rt_multi_threaded.rs index d5f11c5667d..689c334b6d7 100644 --- a/benches/rt_multi_threaded.rs +++ b/benches/rt_multi_threaded.rs @@ -59,6 +59,8 @@ fn spawn_many_remote_idle(b: &mut Bencher) { }); } +// The runtime is busy with tasks that consume CPU time and yield. Yielding is a +// lower notification priority than spawning / regular notification. fn spawn_many_remote_busy1(b: &mut Bencher) { let rt = rt(); let rt_handle = rt.handle(); @@ -91,6 +93,8 @@ fn spawn_many_remote_busy1(b: &mut Bencher) { flag.store(false, Relaxed); } +// The runtime is busy with tasks that consume CPU time and spawn new high-CPU +// tasks. Spawning goes via a higher notification priority than yielding. fn spawn_many_remote_busy2(b: &mut Bencher) { const NUM_SPAWN: usize = 1_000; diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 641e7f728e1..af9e0e172f3 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -82,6 +82,12 @@ pub struct Builder { pub(super) keep_alive: Option, /// How many ticks before pulling a task from the global/remote queue? + /// + /// When `None`, the value is unspecified and behavior details are left to + /// the scheduler. Each scheduler flavor could choose to either pick its own + /// default value or use some other strategy to decide when to poll from the + /// global queue. For example, the multi-threaded scheduler uses a + /// self-tuning strategy based on mean task poll times. pub(super) global_queue_interval: Option, /// How many ticks before yielding to the driver for timer and I/O events? diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index 5eacb12d381..9acc6237fa3 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -27,7 +27,7 @@ pub(crate) struct Stats { task_poll_time_ewma: f64, } -// How to weigh each individual poll time, value is plucked from thin air. +/// How to weigh each individual poll time, value is plucked from thin air. const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1; /// Ideally, we wouldn't go above this, value is plucked from thin air. diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index cfe1350d90f..d6ee3e38fe0 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -497,8 +497,13 @@ impl Context { self.assert_lifo_enabled_is_correct(&core); - // Make the core available to the runtime context + // Measure the poll start time. Note that we may end up polling other + // tasks under this measurement. In this case, the tasks came from the + // LIFO slot and are considered part of the current task for scheduling + // purposes. These tasks inherent the "parent"'s limits. core.stats.start_poll(); + + // Make the core available to the runtime context *self.core.borrow_mut() = Some(core); // Run the task @@ -865,8 +870,6 @@ impl Core { .tuned_global_queue_interval(&worker.handle.shared.config); // Smooth out jitter - // - // `u32::abs_diff` is not available on Tokio's MSRV. if abs_diff(self.global_queue_interval, next) > 2 { self.global_queue_interval = next; } @@ -1059,6 +1062,7 @@ cfg_metrics! { } } +// `u32::abs_diff` is not available on Tokio's MSRV. fn abs_diff(a: u32, b: u32) -> u32 { if a > b { a - b diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index df168ec3869..b9cafdbffe8 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -589,6 +589,11 @@ async fn test_block_in_place4() { tokio::task::block_in_place(|| {}); } +// Testing the tuning logic is tricky as it is inherently timing based, and more +// of a heuristic than an exact behavior. This test checks that the interval +// changes over time based on load factors. There are no assertions, completion +// is sufficient. If there is a regression, this test will hang. In theory, we +// could add limits, but that would be likely to fail on CI. #[test] #[cfg(not(tokio_cross))] fn test_tuning() { From c5d5fe909b18f9c80c3491dd69bec9176979f9f5 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 30 May 2023 13:42:09 -0700 Subject: [PATCH 20/22] tweak tuning --- .../runtime/scheduler/multi_thread/stats.rs | 4 ++- .../runtime/scheduler/multi_thread/worker.rs | 2 ++ tokio/tests/rt_threaded.rs | 33 +++++++++++++++++-- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index 9acc6237fa3..f01daaa1bff 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -63,7 +63,9 @@ impl Stats { let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32; cmp::max( - 1, + // We don't want to return less than 2 as that would result in the + // global queue always getting checked first. + 2, cmp::min( MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL, tasks_per_interval, diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index d6ee3e38fe0..02d7fce6546 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -869,6 +869,8 @@ impl Core { .stats .tuned_global_queue_interval(&worker.handle.shared.config); + debug_assert!(next > 1); + // Smooth out jitter if abs_diff(self.global_queue_interval, next) > 2 { self.global_queue_interval = next; diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index b9cafdbffe8..85ecdf4271d 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -628,7 +628,23 @@ fn test_tuning() { // Now, hammer the injection queue until the interval drops. let mut i = 0; - while interval.load(Relaxed) > 8 { + let mut n = 0; + loop { + let curr = interval.load(Relaxed); + + if curr <= 8 { + n += 1; + } else { + n = 0; + } + + // Make sure we get a few good rounds. Jitter in the tuning could result + // in one "good" value without being representative of reaching a good + // state. + if n == 3 { + break; + } + let counter = counter.clone(); let interval = interval.clone(); @@ -667,7 +683,20 @@ fn test_tuning() { // Now, hammer the injection queue until the interval reaches the expected range. let mut i = 0; - while interval.load(Relaxed) > 1_000 || interval.load(Relaxed) <= 32 { + let mut n = 0; + loop { + let curr = interval.load(Relaxed); + + if curr <= 1_000 && curr > 32 { + n += 1; + } else { + n = 0; + } + + if n == 3 { + break; + } + let counter = counter.clone(); let interval = interval.clone(); From 5d4c19fb225dfe0738e785b020be1d1c36f67262 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 30 May 2023 19:58:24 -0700 Subject: [PATCH 21/22] try again --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 021382f1a5e..7aa75b7855d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -393,7 +393,7 @@ jobs: - uses: taiki-e/setup-cross-toolchain-action@v1 with: target: i686-unknown-linux-gnu - - run: cargo test -Zbuild-std --target target-specs/i686-unknown-linux-gnu.json -p tokio --all-features + - run: cargo test -Zbuild-std --target target-specs/i686-unknown-linux-gnu.json -p tokio --all-features -- --test-threads 1 --nocapture env: RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings --cfg tokio_no_atomic_u64 # https://github.com/tokio-rs/tokio/pull/5356 From da2a1531640876fcb5a7c9618f4aef4130abe796 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 31 May 2023 08:15:00 -0700 Subject: [PATCH 22/22] try again