From 9b49b59b146b33d7fd15f876566a7bd25c351ff3 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 24 May 2023 15:38:12 -0700 Subject: [PATCH] rt(threaded): basic self-tuning of injection queue Each multi-threaded runtime worker prioritizes pulling tasks off of its local queue. Every so often, it checks the injection (global) queue for work submitted there. Previously, "every so often," was a constant "number of tasks polled" value. Tokio sets a default of 61, but allows users to configure this value. If workers are under load with tasks that are slow to poll, the injection queue can be starved. To prevent starvation in this case, this commit implements some basic self-tuning. The multi-threaded scheduler tracks the mean task poll time using an exponentially-weighted moving average. It then uses this value to pick an interval at which to check the injection queue. This commit is a first pass at adding self-tuning to the scheduler. There are other values in the scheduler that could benefit from self-tuning (e.g. the maintenance interval). Additionally, the current-thread scheduler could also benfit from self-tuning. However, we have reached the point where we should start investigating ways to unify logic in both schedulers. Adding self-tuning to the current-thread scheduler will be punted until after this unification. --- benches/rt_multi_threaded.rs | 65 ++++++++- tokio/src/runtime/builder.rs | 12 +- tokio/src/runtime/config.rs | 2 +- tokio/src/runtime/metrics/batch.rs | 20 ++- tokio/src/runtime/metrics/mock.rs | 3 +- tokio/src/runtime/scheduler/current_thread.rs | 27 +++- .../src/runtime/scheduler/multi_thread/mod.rs | 3 + .../runtime/scheduler/multi_thread/queue.rs | 16 +- .../runtime/scheduler/multi_thread/stats.rs | 138 ++++++++++++++++++ .../runtime/scheduler/multi_thread/worker.rs | 71 ++++++--- tokio/src/runtime/tests/queue.rs | 47 +++--- 11 files changed, 331 insertions(+), 73 deletions(-) create mode 100644 tokio/src/runtime/scheduler/multi_thread/stats.rs diff --git a/benches/rt_multi_threaded.rs b/benches/rt_multi_threaded.rs index 88553e4ab50..d5f11c5667d 100644 --- a/benches/rt_multi_threaded.rs +++ b/benches/rt_multi_threaded.rs @@ -6,12 +6,14 @@ use tokio::runtime::{self, Runtime}; use tokio::sync::oneshot; use bencher::{benchmark_group, benchmark_main, Bencher}; -use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::{mpsc, Arc}; +use std::time::{Duration, Instant}; const NUM_WORKERS: usize = 4; const NUM_SPAWN: usize = 10_000; +const STALL_DUR: Duration = Duration::from_micros(10); fn spawn_many_local(b: &mut Bencher) { let rt = rt(); @@ -57,19 +59,60 @@ fn spawn_many_remote_idle(b: &mut Bencher) { }); } -fn spawn_many_remote_busy(b: &mut Bencher) { +fn spawn_many_remote_busy1(b: &mut Bencher) { let rt = rt(); let rt_handle = rt.handle(); let mut handles = Vec::with_capacity(NUM_SPAWN); + let flag = Arc::new(AtomicBool::new(true)); // Spawn some tasks to keep the runtimes busy for _ in 0..(2 * NUM_WORKERS) { - rt.spawn(async { - loop { + let flag = flag.clone(); + rt.spawn(async move { + while flag.load(Relaxed) { tokio::task::yield_now().await; - std::thread::sleep(std::time::Duration::from_micros(10)); + stall(); + } + }); + } + + b.iter(|| { + for _ in 0..NUM_SPAWN { + handles.push(rt_handle.spawn(async {})); + } + + rt.block_on(async { + for handle in handles.drain(..) { + handle.await.unwrap(); } }); + }); + + flag.store(false, Relaxed); +} + +fn spawn_many_remote_busy2(b: &mut Bencher) { + const NUM_SPAWN: usize = 1_000; + + let rt = rt(); + let rt_handle = rt.handle(); + let mut handles = Vec::with_capacity(NUM_SPAWN); + let flag = Arc::new(AtomicBool::new(true)); + + // Spawn some tasks to keep the runtimes busy + for _ in 0..(NUM_WORKERS) { + let flag = flag.clone(); + fn iter(flag: Arc) { + tokio::spawn(async { + if flag.load(Relaxed) { + stall(); + iter(flag); + } + }); + } + rt.spawn(async { + iter(flag); + }); } b.iter(|| { @@ -83,6 +126,8 @@ fn spawn_many_remote_busy(b: &mut Bencher) { } }); }); + + flag.store(false, Relaxed); } fn yield_many(b: &mut Bencher) { @@ -193,11 +238,19 @@ fn rt() -> Runtime { .unwrap() } +fn stall() { + let now = Instant::now(); + while now.elapsed() < STALL_DUR { + std::thread::yield_now(); + } +} + benchmark_group!( scheduler, spawn_many_local, spawn_many_remote_idle, - spawn_many_remote_busy, + spawn_many_remote_busy1, + spawn_many_remote_busy2, ping_pong, yield_many, chained_spawn, diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index dda21a3ae27..641e7f728e1 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -82,7 +82,7 @@ pub struct Builder { pub(super) keep_alive: Option, /// How many ticks before pulling a task from the global/remote queue? - pub(super) global_queue_interval: u32, + pub(super) global_queue_interval: Option, /// How many ticks before yielding to the driver for timer and I/O events? pub(super) event_interval: u32, @@ -211,7 +211,7 @@ impl Builder { #[cfg(not(loom))] const EVENT_INTERVAL: u32 = 61; - Builder::new(Kind::CurrentThread, 31, EVENT_INTERVAL) + Builder::new(Kind::CurrentThread, EVENT_INTERVAL) } cfg_not_wasi! { @@ -222,7 +222,7 @@ impl Builder { #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] pub fn new_multi_thread() -> Builder { // The number `61` is fairly arbitrary. I believe this value was copied from golang. - Builder::new(Kind::MultiThread, 61, 61) + Builder::new(Kind::MultiThread, 61) } } @@ -230,7 +230,7 @@ impl Builder { /// values. /// /// Configuration methods can be chained on the return value. - pub(crate) fn new(kind: Kind, global_queue_interval: u32, event_interval: u32) -> Builder { + pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder { Builder { kind, @@ -266,7 +266,7 @@ impl Builder { // Defaults for these values depend on the scheduler kind, so we get them // as parameters. - global_queue_interval, + global_queue_interval: None, event_interval, seed_generator: RngSeedGenerator::new(RngSeed::new()), @@ -716,7 +716,7 @@ impl Builder { /// # } /// ``` pub fn global_queue_interval(&mut self, val: u32) -> &mut Self { - self.global_queue_interval = val; + self.global_queue_interval = Some(val); self } diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs index 4af5eee472f..c42e4fe5a80 100644 --- a/tokio/src/runtime/config.rs +++ b/tokio/src/runtime/config.rs @@ -4,7 +4,7 @@ use crate::util::RngSeedGenerator; pub(crate) struct Config { /// How many ticks before pulling a task from the global/remote queue? - pub(crate) global_queue_interval: u32, + pub(crate) global_queue_interval: Option, /// How many ticks before yielding to the driver for timer and I/O events? pub(crate) event_interval: u32, diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index 296792d7fb8..1bb4e261f73 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -34,7 +34,7 @@ pub(crate) struct MetricsBatch { busy_duration_total: u64, /// Instant at which work last resumed (continued after park). - last_resume_time: Instant, + processing_scheduled_tasks_started_at: Instant, /// If `Some`, tracks poll times in nanoseconds poll_timer: Option, @@ -62,7 +62,7 @@ impl MetricsBatch { local_schedule_count: 0, overflow_count: 0, busy_duration_total: 0, - last_resume_time: now, + processing_scheduled_tasks_started_at: now, poll_timer: worker_metrics .poll_count_histogram .as_ref() @@ -106,11 +106,20 @@ impl MetricsBatch { } else { self.poll_count_on_last_park = self.poll_count; } + } + + /// Start processing a batch of tasks + pub(crate) fn start_processing_scheduled_tasks(&mut self) { + self.processing_scheduled_tasks_started_at = Instant::now(); + } - let busy_duration = self.last_resume_time.elapsed(); + /// Stop processing a batch of tasks + pub(crate) fn end_processing_scheduled_tasks(&mut self) { + let busy_duration = self.processing_scheduled_tasks_started_at.elapsed(); self.busy_duration_total += duration_as_u64(busy_duration); } + /// Start polling an individual task pub(crate) fn start_poll(&mut self) { self.poll_count += 1; @@ -119,6 +128,7 @@ impl MetricsBatch { } } + /// Stop polling an individual task pub(crate) fn end_poll(&mut self) { if let Some(poll_timer) = &mut self.poll_timer { let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed()); @@ -126,10 +136,6 @@ impl MetricsBatch { } } - pub(crate) fn returned_from_park(&mut self) { - self.last_resume_time = Instant::now(); - } - pub(crate) fn inc_local_schedule_count(&mut self) { self.local_schedule_count += 1; } diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index 4d5e7b5b152..8f8345c08b4 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -39,8 +39,9 @@ impl MetricsBatch { pub(crate) fn submit(&mut self, _to: &WorkerMetrics) {} pub(crate) fn about_to_park(&mut self) {} - pub(crate) fn returned_from_park(&mut self) {} pub(crate) fn inc_local_schedule_count(&mut self) {} + pub(crate) fn start_processing_scheduled_tasks(&mut self) {} + pub(crate) fn end_processing_scheduled_tasks(&mut self) {} pub(crate) fn start_poll(&mut self) {} pub(crate) fn end_poll(&mut self) {} } diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index 8b0dfaaf3d0..54679dce314 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -59,6 +59,9 @@ struct Core { /// Metrics batch metrics: MetricsBatch, + /// How often to check the global queue + global_queue_interval: u32, + /// True if a task panicked without being handled and the runtime is /// configured to shutdown on unhandled panic. unhandled_panic: bool, @@ -100,6 +103,11 @@ type Notified = task::Notified>; /// Initial queue capacity. const INITIAL_CAPACITY: usize = 64; +/// Used if none is specified. This is a temporary constant and will be removed +/// as we unify tuning logic between the multi-thread and current-thread +/// schedulers. +const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 61; + // Tracks the current CurrentThread. scoped_thread_local!(static CURRENT: Context); @@ -113,6 +121,11 @@ impl CurrentThread { ) -> (CurrentThread, Arc) { let worker_metrics = WorkerMetrics::from_config(&config); + // Get the configured global queue interval, or use the default. + let global_queue_interval = config + .global_queue_interval + .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL); + let handle = Arc::new(Handle { shared: Shared { inject: Inject::new(), @@ -132,6 +145,7 @@ impl CurrentThread { tick: 0, driver: Some(driver), metrics: MetricsBatch::new(&handle.shared.worker_metrics), + global_queue_interval, unhandled_panic: false, }))); @@ -255,7 +269,7 @@ impl Core { } fn next_task(&mut self, handle: &Handle) -> Option { - if self.tick % handle.shared.config.global_queue_interval == 0 { + if self.tick % self.global_queue_interval == 0 { handle .next_remote_task() .or_else(|| self.next_local_task(handle)) @@ -344,7 +358,6 @@ impl Context { }); core = c; - core.metrics.returned_from_park(); } if let Some(f) = &handle.shared.config.after_unpark { @@ -603,6 +616,8 @@ impl CoreGuard<'_> { pin!(future); + core.metrics.start_processing_scheduled_tasks(); + 'outer: loop { let handle = &context.handle; @@ -631,12 +646,16 @@ impl CoreGuard<'_> { let task = match entry { Some(entry) => entry, None => { + core.metrics.end_processing_scheduled_tasks(); + core = if did_defer_tasks() { context.park_yield(core, handle) } else { context.park(core, handle) }; + core.metrics.start_processing_scheduled_tasks(); + // Try polling the `block_on` future next continue 'outer; } @@ -651,9 +670,13 @@ impl CoreGuard<'_> { core = c; } + core.metrics.end_processing_scheduled_tasks(); + // Yield to the driver, this drives the timer and pulls any // pending I/O events. core = context.park_yield(core, handle); + + core.metrics.start_processing_scheduled_tasks(); } }); diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index 0a1c63de9b1..9b225335bcf 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -9,6 +9,9 @@ pub(crate) use handle::Handle; mod idle; use self::idle::Idle; +mod stats; +pub(crate) use stats::Stats; + mod park; pub(crate) use park::{Parker, Unparker}; diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs index dd132fb9a6d..2a0ae833850 100644 --- a/tokio/src/runtime/scheduler/multi_thread/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs @@ -2,8 +2,8 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::Arc; +use crate::runtime::scheduler::multi_thread::Stats; use crate::runtime::task::{self, Inject}; -use crate::runtime::MetricsBatch; use std::mem::{self, MaybeUninit}; use std::ptr; @@ -186,7 +186,7 @@ impl Local { &mut self, mut task: task::Notified, inject: &Inject, - metrics: &mut MetricsBatch, + stats: &mut Stats, ) { let tail = loop { let head = self.inner.head.load(Acquire); @@ -206,7 +206,7 @@ impl Local { } else { // Push the current task and half of the queue into the // inject queue. - match self.push_overflow(task, real, tail, inject, metrics) { + match self.push_overflow(task, real, tail, inject, stats) { Ok(_) => return, // Lost the race, try again Err(v) => { @@ -253,7 +253,7 @@ impl Local { head: UnsignedShort, tail: UnsignedShort, inject: &Inject, - metrics: &mut MetricsBatch, + stats: &mut Stats, ) -> Result<(), task::Notified> { /// How many elements are we taking from the local queue. /// @@ -338,7 +338,7 @@ impl Local { inject.push_batch(batch_iter.chain(std::iter::once(task))); // Add 1 to factor in the task currently being scheduled. - metrics.incr_overflow_count(); + stats.incr_overflow_count(); Ok(()) } @@ -394,7 +394,7 @@ impl Steal { pub(crate) fn steal_into( &self, dst: &mut Local, - dst_metrics: &mut MetricsBatch, + dst_stats: &mut Stats, ) -> Option> { // Safety: the caller is the only thread that mutates `dst.tail` and // holds a mutable reference. @@ -420,8 +420,8 @@ impl Steal { return None; } - dst_metrics.incr_steal_count(n as u16); - dst_metrics.incr_steal_operations(); + dst_stats.incr_steal_count(n as u16); + dst_stats.incr_steal_operations(); // We are returning a task here n -= 1; diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs new file mode 100644 index 00000000000..00b0c9ea309 --- /dev/null +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -0,0 +1,138 @@ +use crate::runtime::{Config, MetricsBatch, WorkerMetrics}; + +use std::cmp; +use std::time::{Duration, Instant}; + +/// Per-worker statistics. This is used for both tuning the scheduler and +/// reporting runtime-level metrics/stats. +pub(crate) struct Stats { + /// The metrics batch used to report runtime-level metrics/stats to the + /// user. + batch: MetricsBatch, + + /// Instant at which work last resumed (continued after park). + /// + /// This duplicates the value stored in `MetricsBatch`. We will unify + /// `Stats` and `MetricsBatch` when we stabilize metrics. + processing_scheduled_tasks_started_at: Instant, + + /// Number of tasks polled in the batch of scheduled tasks + tasks_polled_in_batch: usize, + + /// Exponentially-weighted moving average of time spent polling scheduled a + /// task. + /// + /// Tracked in nanoseconds, stored as a f64 since that is what we use with + /// the EWMA calculations + task_poll_time_ewma: f64, +} + +// How to weigh each individual poll time, value is plucked from thin air. +const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1; + +/// Ideally, we wouldn't go above this, value is plucked from thin air. +const TARGET_GLOBAL_QUEUE_INTERVAL: f64 = Duration::from_micros(200).as_nanos() as f64; + +/// Max value for the global queue interval. This is 2x the previous default +const MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 127; + +/// This is the previous default +const TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 61; + +impl Stats { + pub(crate) fn new(worker_metrics: &WorkerMetrics) -> Stats { + // Seed the value with what we hope to see. + let task_poll_time_ewma = + TARGET_GLOBAL_QUEUE_INTERVAL / TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL as f64; + + Stats { + batch: MetricsBatch::new(worker_metrics), + processing_scheduled_tasks_started_at: Instant::now(), + tasks_polled_in_batch: 0, + task_poll_time_ewma: task_poll_time_ewma, + } + } + + pub(crate) fn tuned_global_queue_interval(&self, config: &Config) -> u32 { + // If an interval is explicitly set, don't tune. + if let Some(configured) = config.global_queue_interval { + return configured; + } + + // As of Rust 1.45, casts from f64 -> u32 are saturating, which is fine here. + let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32; + + cmp::max( + 1, + cmp::min( + MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL, + tasks_per_interval, + ), + ) + } + + pub(crate) fn submit(&mut self, to: &WorkerMetrics) { + self.batch.submit(to); + } + + pub(crate) fn about_to_park(&mut self) { + self.batch.about_to_park(); + } + + pub(crate) fn inc_local_schedule_count(&mut self) { + self.batch.inc_local_schedule_count(); + } + + pub(crate) fn start_processing_scheduled_tasks(&mut self) { + self.batch.start_processing_scheduled_tasks(); + + self.processing_scheduled_tasks_started_at = Instant::now(); + self.tasks_polled_in_batch = 0; + } + + pub(crate) fn end_processing_scheduled_tasks(&mut self) { + self.batch.end_processing_scheduled_tasks(); + + // Update the EWMA task poll time + if self.tasks_polled_in_batch > 0 { + let now = Instant::now(); + + // If we "overflow" this conversion, we have bigger problems than + // slightly off stats. + let elapsed = (now - self.processing_scheduled_tasks_started_at).as_nanos() as f64; + let num_polls = self.tasks_polled_in_batch as f64; + + // Calculate the mean poll duration for a single task in the batch + let mean_poll_duration = elapsed / num_polls; + + // Compute the alpha weighted by the number of tasks polled this batch. + let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls); + + // Now compute the new weighted average task poll time. + self.task_poll_time_ewma = weighted_alpha * mean_poll_duration + + (1.0 - weighted_alpha) * self.task_poll_time_ewma; + } + } + + pub(crate) fn start_poll(&mut self) { + self.batch.start_poll(); + + self.tasks_polled_in_batch += 1; + } + + pub(crate) fn end_poll(&mut self) { + self.batch.end_poll(); + } + + pub(crate) fn incr_steal_count(&mut self, by: u16) { + self.batch.incr_steal_count(by); + } + + pub(crate) fn incr_steal_operations(&mut self) { + self.batch.incr_steal_operations(); + } + + pub(crate) fn incr_overflow_count(&mut self) { + self.batch.incr_overflow_count(); + } +} diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index e2bbb643db7..3ce07825a00 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -59,10 +59,12 @@ use crate::loom::sync::{Arc, Mutex}; use crate::runtime; use crate::runtime::context; -use crate::runtime::scheduler::multi_thread::{queue, Counters, Handle, Idle, Parker, Unparker}; +use crate::runtime::scheduler::multi_thread::{ + queue, Counters, Handle, Idle, Parker, Stats, Unparker, +}; use crate::runtime::task::{Inject, OwnedTasks}; use crate::runtime::{ - blocking, coop, driver, scheduler, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics, + blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics, }; use crate::util::atomic_cell::AtomicCell; use crate::util::rand::{FastRand, RngSeedGenerator}; @@ -114,8 +116,11 @@ struct Core { /// borrow checker happy. park: Option, - /// Batching metrics so they can be submitted to RuntimeMetrics. - metrics: MetricsBatch, + /// Per-worker runtime stats + stats: Stats, + + /// How often to check the global queue + global_queue_interval: u32, /// Fast random number generator. rand: FastRand, @@ -220,6 +225,7 @@ pub(super) fn create( let park = park.clone(); let unpark = park.unpark(); let metrics = WorkerMetrics::from_config(&config); + let stats = Stats::new(&metrics); cores.push(Box::new(Core { tick: 0, @@ -229,7 +235,8 @@ pub(super) fn create( is_searching: false, is_shutdown: false, park: Some(park), - metrics: MetricsBatch::new(&metrics), + global_queue_interval: stats.tuned_global_queue_interval(&config), + stats, rand: FastRand::new(config.seed_generator.next_seed()), })); @@ -437,6 +444,10 @@ impl Context { // a task that had the LIFO slot disabled. self.reset_lifo_enabled(&mut core); + // Start as "processing" tasks as polling tasks from the local queue + // will be one of the first things we do. + core.stats.start_processing_scheduled_tasks(); + while !core.is_shutdown { self.assert_lifo_enabled_is_correct(&core); @@ -452,9 +463,14 @@ impl Context { continue; } + // We consumed all work in the queues and will start searching for work. + core.stats.end_processing_scheduled_tasks(); + // There is no more **local** work to process, try to steal work // from other workers. if let Some(task) = core.steal_work(&self.worker) { + // Found work, switch back to processing + core.stats.start_processing_scheduled_tasks(); core = self.run_task(task, core)?; } else { // Wait for work @@ -483,7 +499,7 @@ impl Context { self.assert_lifo_enabled_is_correct(&core); // Make the core available to the runtime context - core.metrics.start_poll(); + core.stats.start_poll(); *self.core.borrow_mut() = Some(core); // Run the task @@ -511,24 +527,27 @@ impl Context { // was called, turning the poll into a "blocking op". In this // case, we don't want to measure the poll time as it doesn't // really count as an async poll anymore. - core.metrics.end_poll(); + // core.stats.end_poll(); // Check for a task in the LIFO slot let task = match core.lifo_slot.take() { Some(task) => task, None => { self.reset_lifo_enabled(&mut core); + core.stats.end_poll(); return Ok(core); } }; if !coop::has_budget_remaining() { + core.stats.end_poll(); + // Not enough budget left to run the LIFO task, push it to // the back of the queue and return. core.run_queue.push_back_or_overflow( task, self.worker.inject(), - &mut core.metrics, + &mut core.stats, ); // If we hit this point, the LIFO slot should be enabled. // There is no need to reset it. @@ -553,7 +572,6 @@ impl Context { } // Run the LIFO task, then loop - core.metrics.start_poll(); *self.core.borrow_mut() = Some(core); let task = self.worker.handle.shared.owned.assert_owner(task); task.run(); @@ -576,12 +594,16 @@ impl Context { if core.tick % self.worker.handle.shared.config.event_interval == 0 { super::counters::inc_num_maintenance(); + core.stats.end_processing_scheduled_tasks(); + // Call `park` with a 0 timeout. This enables the I/O driver, timer, ... // to run without actually putting the thread to sleep. core = self.park_timeout(core, Some(Duration::from_millis(0))); // Run regularly scheduled maintenance core.maintenance(&self.worker); + + core.stats.start_processing_scheduled_tasks(); } core @@ -605,9 +627,8 @@ impl Context { if core.transition_to_parked(&self.worker) { while !core.is_shutdown { - core.metrics.about_to_park(); + core.stats.about_to_park(); core = self.park_timeout(core, None); - core.metrics.returned_from_park(); // Run regularly scheduled maintenance core.maintenance(&self.worker); @@ -666,7 +687,10 @@ impl Core { /// Return the next notified task available to this worker. fn next_task(&mut self, worker: &Worker) -> Option { - if self.tick % worker.handle.shared.config.global_queue_interval == 0 { + if self.tick % self.global_queue_interval == 0 { + // Update the global queue interval, if needed + self.tune_global_queue_interval(worker); + worker.inject().pop().or_else(|| self.next_local_task()) } else { let maybe_task = self.next_local_task(); @@ -733,7 +757,7 @@ impl Core { let target = &worker.handle.shared.remotes[i]; if let Some(task) = target .steal - .steal_into(&mut self.run_queue, &mut self.metrics) + .steal_into(&mut self.run_queue, &mut self.stats) { return Some(task); } @@ -813,7 +837,7 @@ impl Core { /// Runs maintenance work such as checking the pool's state. fn maintenance(&mut self, worker: &Worker) { - self.metrics + self.stats .submit(&worker.handle.shared.worker_metrics[worker.index]); if !self.is_shutdown { @@ -828,7 +852,7 @@ impl Core { // Signal to all tasks to shut down. worker.handle.shared.owned.close_and_shutdown_all(); - self.metrics + self.stats .submit(&worker.handle.shared.worker_metrics[worker.index]); } @@ -842,6 +866,17 @@ impl Core { park.shutdown(&handle.driver); } + + fn tune_global_queue_interval(&mut self, worker: &Worker) { + let next = self + .stats + .tuned_global_queue_interval(&worker.handle.shared.config); + + // Smooth out jitter + if self.global_queue_interval.abs_diff(next) > 2 { + self.global_queue_interval = next; + } + } } impl Worker { @@ -888,7 +923,7 @@ impl Handle { } fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) { - core.metrics.inc_local_schedule_count(); + core.stats.inc_local_schedule_count(); // Spawning from the worker thread. If scheduling a "yield" then the // task must always be pushed to the back of the queue, enabling other @@ -896,7 +931,7 @@ impl Handle { // flexibility and the task may go to the front of the queue. let should_notify = if is_yield || !core.lifo_enabled { core.run_queue - .push_back_or_overflow(task, &self.shared.inject, &mut core.metrics); + .push_back_or_overflow(task, &self.shared.inject, &mut core.stats); true } else { // Push to the LIFO slot @@ -905,7 +940,7 @@ impl Handle { if let Some(prev) = prev { core.run_queue - .push_back_or_overflow(prev, &self.shared.inject, &mut core.metrics); + .push_back_or_overflow(prev, &self.shared.inject, &mut core.stats); } core.lifo_slot = Some(task); diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 09f249e9ea4..2e42f1bbb9e 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -1,18 +1,17 @@ -use crate::runtime::scheduler::multi_thread::queue; +use crate::runtime::scheduler::multi_thread::{queue, Stats}; use crate::runtime::task::{self, Inject, Schedule, Task}; -use crate::runtime::MetricsBatch; use std::thread; use std::time::Duration; #[allow(unused)] macro_rules! assert_metrics { - ($metrics:ident, $field:ident == $v:expr) => {{ + ($stats:ident, $field:ident == $v:expr) => {{ use crate::runtime::WorkerMetrics; use std::sync::atomic::Ordering::Relaxed; let worker = WorkerMetrics::new(); - $metrics.submit(&worker); + $stats.submit(&worker); let expect = $v; let actual = worker.$field.load(Relaxed); @@ -21,24 +20,24 @@ macro_rules! assert_metrics { }}; } -fn metrics_batch() -> MetricsBatch { +fn new_stats() -> Stats { use crate::runtime::WorkerMetrics; - MetricsBatch::new(&WorkerMetrics::new()) + Stats::new(&WorkerMetrics::new()) } #[test] fn fits_256_one_at_a_time() { let (_, mut local) = queue::local(); let inject = Inject::new(); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); for _ in 0..256 { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } cfg_metrics! { - assert_metrics!(metrics, overflow_count == 0); + assert_metrics!(stats, overflow_count == 0); } assert!(inject.pop().is_none()); @@ -88,15 +87,15 @@ fn fits_256_all_in_chunks() { fn overflow() { let (_, mut local) = queue::local(); let inject = Inject::new(); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); for _ in 0..257 { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } cfg_metrics! { - assert_metrics!(metrics, overflow_count == 1); + assert_metrics!(stats, overflow_count == 1); } let mut n = 0; @@ -114,7 +113,7 @@ fn overflow() { #[test] fn steal_batch() { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (steal1, mut local1) = queue::local(); let (_, mut local2) = queue::local(); @@ -122,13 +121,13 @@ fn steal_batch() { for _ in 0..4 { let (task, _) = super::unowned(async {}); - local1.push_back_or_overflow(task, &inject, &mut metrics); + local1.push_back_or_overflow(task, &inject, &mut stats); } - assert!(steal1.steal_into(&mut local2, &mut metrics).is_some()); + assert!(steal1.steal_into(&mut local2, &mut stats).is_some()); cfg_metrics! { - assert_metrics!(metrics, steal_count == 2); + assert_metrics!(stats, steal_count == 2); } for _ in 0..1 { @@ -160,19 +159,19 @@ fn stress1() { const NUM_PUSH: usize = normal_or_miri(500, 10); const NUM_POP: usize = normal_or_miri(250, 10); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); for _ in 0..NUM_ITER { let (steal, mut local) = queue::local(); let inject = Inject::new(); let th = thread::spawn(move || { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..NUM_STEAL { - if steal.steal_into(&mut local, &mut metrics).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -184,7 +183,7 @@ fn stress1() { } cfg_metrics! { - assert_metrics!(metrics, steal_count == n as _); + assert_metrics!(stats, steal_count == n as _); } n @@ -195,7 +194,7 @@ fn stress1() { for _ in 0..NUM_LOCAL { for _ in 0..NUM_PUSH { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } for _ in 0..NUM_POP { @@ -223,14 +222,14 @@ fn stress2() { const NUM_TASKS: usize = normal_or_miri(1_000_000, 50); const NUM_STEAL: usize = normal_or_miri(1_000, 10); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); for _ in 0..NUM_ITER { let (steal, mut local) = queue::local(); let inject = Inject::new(); let th = thread::spawn(move || { - let mut stats = metrics_batch(); + let mut stats = new_stats(); let (_, mut local) = queue::local(); let mut n = 0; @@ -253,7 +252,7 @@ fn stress2() { for i in 0..NUM_TASKS { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); if i % 128 == 0 && local.pop().is_some() { num_pop += 1;