diff --git a/benches/rt_multi_threaded.rs b/benches/rt_multi_threaded.rs
index 88553e4ab50..d5f11c5667d 100644
--- a/benches/rt_multi_threaded.rs
+++ b/benches/rt_multi_threaded.rs
@@ -6,12 +6,14 @@ use tokio::runtime::{self, Runtime};
 use tokio::sync::oneshot;
 
 use bencher::{benchmark_group, benchmark_main, Bencher};
-use std::sync::atomic::AtomicUsize;
 use std::sync::atomic::Ordering::Relaxed;
+use std::sync::atomic::{AtomicBool, AtomicUsize};
 use std::sync::{mpsc, Arc};
+use std::time::{Duration, Instant};
 
 const NUM_WORKERS: usize = 4;
 const NUM_SPAWN: usize = 10_000;
+const STALL_DUR: Duration = Duration::from_micros(10);
 
 fn spawn_many_local(b: &mut Bencher) {
     let rt = rt();
@@ -57,19 +59,60 @@ fn spawn_many_remote_idle(b: &mut Bencher) {
     });
 }
 
-fn spawn_many_remote_busy(b: &mut Bencher) {
+fn spawn_many_remote_busy1(b: &mut Bencher) {
     let rt = rt();
     let rt_handle = rt.handle();
     let mut handles = Vec::with_capacity(NUM_SPAWN);
+    let flag = Arc::new(AtomicBool::new(true));
 
     // Spawn some tasks to keep the runtimes busy
     for _ in 0..(2 * NUM_WORKERS) {
-        rt.spawn(async {
-            loop {
+        let flag = flag.clone();
+        rt.spawn(async move {
+            while flag.load(Relaxed) {
                 tokio::task::yield_now().await;
-                std::thread::sleep(std::time::Duration::from_micros(10));
+                stall();
+            }
+        });
+    }
+
+    b.iter(|| {
+        for _ in 0..NUM_SPAWN {
+            handles.push(rt_handle.spawn(async {}));
+        }
+
+        rt.block_on(async {
+            for handle in handles.drain(..) {
+                handle.await.unwrap();
             }
         });
+    });
+
+    flag.store(false, Relaxed);
+}
+
+fn spawn_many_remote_busy2(b: &mut Bencher) {
+    const NUM_SPAWN: usize = 1_000;
+
+    let rt = rt();
+    let rt_handle = rt.handle();
+    let mut handles = Vec::with_capacity(NUM_SPAWN);
+    let flag = Arc::new(AtomicBool::new(true));
+
+    // Spawn some tasks to keep the runtimes busy
+    for _ in 0..(NUM_WORKERS) {
+        let flag = flag.clone();
+        fn iter(flag: Arc<AtomicBool>) {
+            tokio::spawn(async {
+                if flag.load(Relaxed) {
+                    stall();
+                    iter(flag);
+                }
+            });
+        }
+        rt.spawn(async {
+            iter(flag);
+        });
     }
 
     b.iter(|| {
@@ -83,6 +126,8 @@ fn spawn_many_remote_busy(b: &mut Bencher) {
             }
         });
     });
+
+    flag.store(false, Relaxed);
 }
 
 fn yield_many(b: &mut Bencher) {
@@ -193,11 +238,19 @@ fn rt() -> Runtime {
         .unwrap()
 }
 
+fn stall() {
+    let now = Instant::now();
+    while now.elapsed() < STALL_DUR {
+        std::thread::yield_now();
+    }
+}
+
 benchmark_group!(
     scheduler,
     spawn_many_local,
     spawn_many_remote_idle,
-    spawn_many_remote_busy,
+    spawn_many_remote_busy1,
+    spawn_many_remote_busy2,
     ping_pong,
     yield_many,
     chained_spawn,
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index dda21a3ae27..641e7f728e1 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -82,7 +82,7 @@ pub struct Builder {
     pub(super) keep_alive: Option<Duration>,
 
     /// How many ticks before pulling a task from the global/remote queue?
-    pub(super) global_queue_interval: u32,
+    pub(super) global_queue_interval: Option<u32>,
 
     /// How many ticks before yielding to the driver for timer and I/O events?
     pub(super) event_interval: u32,
@@ -211,7 +211,7 @@ impl Builder {
         #[cfg(not(loom))]
         const EVENT_INTERVAL: u32 = 61;
 
-        Builder::new(Kind::CurrentThread, 31, EVENT_INTERVAL)
+        Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
     }
 
     cfg_not_wasi! {
@@ -222,7 +222,7 @@ impl Builder {
         #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
         pub fn new_multi_thread() -> Builder {
             // The number `61` is fairly arbitrary. I believe this value was copied from golang.
-            Builder::new(Kind::MultiThread, 61, 61)
+            Builder::new(Kind::MultiThread, 61)
         }
     }
 
@@ -230,7 +230,7 @@ impl Builder {
     /// values.
     ///
     /// Configuration methods can be chained on the return value.
-    pub(crate) fn new(kind: Kind, global_queue_interval: u32, event_interval: u32) -> Builder {
+    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
         Builder {
             kind,
@@ -266,7 +266,7 @@ impl Builder {
 
             // Defaults for these values depend on the scheduler kind, so we get them
             // as parameters.
-            global_queue_interval,
+            global_queue_interval: None,
             event_interval,
 
             seed_generator: RngSeedGenerator::new(RngSeed::new()),
@@ -716,7 +716,7 @@ impl Builder {
     /// # }
     /// ```
     pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
-        self.global_queue_interval = val;
+        self.global_queue_interval = Some(val);
         self
     }
 
diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs
index 4af5eee472f..c42e4fe5a80 100644
--- a/tokio/src/runtime/config.rs
+++ b/tokio/src/runtime/config.rs
@@ -4,7 +4,7 @@ use crate::util::RngSeedGenerator;
 pub(crate) struct Config {
     /// How many ticks before pulling a task from the global/remote queue?
-    pub(crate) global_queue_interval: u32,
+    pub(crate) global_queue_interval: Option<u32>,
 
     /// How many ticks before yielding to the driver for timer and I/O events?
     pub(crate) event_interval: u32,
diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs
index 296792d7fb8..1bb4e261f73 100644
--- a/tokio/src/runtime/metrics/batch.rs
+++ b/tokio/src/runtime/metrics/batch.rs
@@ -34,7 +34,7 @@ pub(crate) struct MetricsBatch {
     busy_duration_total: u64,
 
     /// Instant at which work last resumed (continued after park).
-    last_resume_time: Instant,
+    processing_scheduled_tasks_started_at: Instant,
 
     /// If `Some`, tracks poll times in nanoseconds
     poll_timer: Option<PollTimer>,
@@ -62,7 +62,7 @@ impl MetricsBatch {
             local_schedule_count: 0,
             overflow_count: 0,
             busy_duration_total: 0,
-            last_resume_time: now,
+            processing_scheduled_tasks_started_at: now,
             poll_timer: worker_metrics
                 .poll_count_histogram
                 .as_ref()
@@ -106,11 +106,20 @@ impl MetricsBatch {
         } else {
             self.poll_count_on_last_park = self.poll_count;
         }
+    }
+
+    /// Start processing a batch of tasks
+    pub(crate) fn start_processing_scheduled_tasks(&mut self) {
+        self.processing_scheduled_tasks_started_at = Instant::now();
+    }
 
-        let busy_duration = self.last_resume_time.elapsed();
+    /// Stop processing a batch of tasks
+    pub(crate) fn end_processing_scheduled_tasks(&mut self) {
+        let busy_duration = self.processing_scheduled_tasks_started_at.elapsed();
         self.busy_duration_total += duration_as_u64(busy_duration);
     }
 
+    /// Start polling an individual task
     pub(crate) fn start_poll(&mut self) {
         self.poll_count += 1;
 
@@ -119,6 +128,7 @@ impl MetricsBatch {
         }
     }
 
+    /// Stop polling an individual task
     pub(crate) fn end_poll(&mut self) {
         if let Some(poll_timer) = &mut self.poll_timer {
             let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed());
@@ -126,10 +136,6 @@ impl MetricsBatch {
         }
     }
 
-    pub(crate) fn returned_from_park(&mut self) {
-        self.last_resume_time = Instant::now();
-    }
-
     pub(crate) fn inc_local_schedule_count(&mut self) {
         self.local_schedule_count += 1;
     }
diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs
index 4d5e7b5b152..8f8345c08b4 100644
--- a/tokio/src/runtime/metrics/mock.rs
+++ b/tokio/src/runtime/metrics/mock.rs
@@ -39,8 +39,9 @@ impl MetricsBatch {
     pub(crate) fn submit(&mut self, _to: &WorkerMetrics) {}
 
     pub(crate) fn about_to_park(&mut self) {}
-    pub(crate) fn returned_from_park(&mut self) {}
     pub(crate) fn inc_local_schedule_count(&mut self) {}
+    pub(crate) fn start_processing_scheduled_tasks(&mut self) {}
+    pub(crate) fn end_processing_scheduled_tasks(&mut self) {}
     pub(crate) fn start_poll(&mut self) {}
     pub(crate) fn end_poll(&mut self) {}
 }
diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs
index 8b0dfaaf3d0..54679dce314 100644
--- a/tokio/src/runtime/scheduler/current_thread.rs
+++ b/tokio/src/runtime/scheduler/current_thread.rs
@@ -59,6 +59,9 @@ struct Core {
     /// Metrics batch
     metrics: MetricsBatch,
 
+    /// How often to check the global queue
+    global_queue_interval: u32,
+
     /// True if a task panicked without being handled and the runtime is
     /// configured to shutdown on unhandled panic.
     unhandled_panic: bool,
@@ -100,6 +103,11 @@ type Notified = task::Notified<Arc<Handle>>;
 /// Initial queue capacity.
 const INITIAL_CAPACITY: usize = 64;
 
+/// Used if none is specified. This is a temporary constant and will be removed
+/// as we unify tuning logic between the multi-thread and current-thread
+/// schedulers.
+const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 61;
+
 // Tracks the current CurrentThread.
 scoped_thread_local!(static CURRENT: Context);
 
@@ -113,6 +121,11 @@ impl CurrentThread {
     ) -> (CurrentThread, Arc<Handle>) {
         let worker_metrics = WorkerMetrics::from_config(&config);
 
+        // Get the configured global queue interval, or use the default.
+        let global_queue_interval = config
+            .global_queue_interval
+            .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);
+
         let handle = Arc::new(Handle {
             shared: Shared {
                 inject: Inject::new(),
@@ -132,6 +145,7 @@ impl CurrentThread {
             tick: 0,
             driver: Some(driver),
             metrics: MetricsBatch::new(&handle.shared.worker_metrics),
+            global_queue_interval,
             unhandled_panic: false,
         })));
 
@@ -255,7 +269,7 @@ impl Core {
     }
 
     fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
-        if self.tick % handle.shared.config.global_queue_interval == 0 {
+        if self.tick % self.global_queue_interval == 0 {
             handle
                 .next_remote_task()
                 .or_else(|| self.next_local_task(handle))
@@ -344,7 +358,6 @@ impl Context {
             });
 
             core = c;
-            core.metrics.returned_from_park();
         }
 
         if let Some(f) = &handle.shared.config.after_unpark {
@@ -603,6 +616,8 @@ impl CoreGuard<'_> {
 
             pin!(future);
 
+            core.metrics.start_processing_scheduled_tasks();
+
             'outer: loop {
                 let handle = &context.handle;
 
@@ -631,12 +646,16 @@ impl CoreGuard<'_> {
                     let task = match entry {
                         Some(entry) => entry,
                         None => {
+                            core.metrics.end_processing_scheduled_tasks();
+
                             core = if did_defer_tasks() {
                                 context.park_yield(core, handle)
                             } else {
                                 context.park(core, handle)
                             };
 
+                            core.metrics.start_processing_scheduled_tasks();
+
                             // Try polling the `block_on` future next
                             continue 'outer;
                         }
@@ -651,9 +670,13 @@ impl CoreGuard<'_> {
                     core = c;
                 }
 
+                core.metrics.end_processing_scheduled_tasks();
+
                 // Yield to the driver, this drives the timer and pulls any
                 // pending I/O events.
                 core = context.park_yield(core, handle);
+
+                core.metrics.start_processing_scheduled_tasks();
             }
         });
diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs
index 0a1c63de9b1..9b225335bcf 100644
--- a/tokio/src/runtime/scheduler/multi_thread/mod.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs
@@ -9,6 +9,9 @@ pub(crate) use handle::Handle;
 mod idle;
 use self::idle::Idle;
 
+mod stats;
+pub(crate) use stats::Stats;
+
 mod park;
 pub(crate) use park::{Parker, Unparker};
 
diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs
index dd132fb9a6d..2a0ae833850 100644
--- a/tokio/src/runtime/scheduler/multi_thread/queue.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs
@@ -2,8 +2,8 @@
 use crate::loom::cell::UnsafeCell;
 use crate::loom::sync::Arc;
+use crate::runtime::scheduler::multi_thread::Stats;
 use crate::runtime::task::{self, Inject};
-use crate::runtime::MetricsBatch;
 
 use std::mem::{self, MaybeUninit};
 use std::ptr;
@@ -186,7 +186,7 @@ impl<T> Local<T> {
         &mut self,
         mut task: task::Notified<T>,
         inject: &Inject<T>,
-        metrics: &mut MetricsBatch,
+        stats: &mut Stats,
     ) {
         let tail = loop {
             let head = self.inner.head.load(Acquire);
@@ -206,7 +206,7 @@ impl<T> Local<T> {
             } else {
                 // Push the current task and half of the queue into the
                 // inject queue.
-                match self.push_overflow(task, real, tail, inject, metrics) {
+                match self.push_overflow(task, real, tail, inject, stats) {
                     Ok(_) => return,
                     // Lost the race, try again
                     Err(v) => {
@@ -253,7 +253,7 @@ impl<T> Local<T> {
         head: UnsignedShort,
         tail: UnsignedShort,
         inject: &Inject<T>,
-        metrics: &mut MetricsBatch,
+        stats: &mut Stats,
     ) -> Result<(), task::Notified<T>> {
         /// How many elements are we taking from the local queue.
         ///
@@ -338,7 +338,7 @@ impl<T> Local<T> {
         inject.push_batch(batch_iter.chain(std::iter::once(task)));
 
         // Add 1 to factor in the task currently being scheduled.
-        metrics.incr_overflow_count();
+        stats.incr_overflow_count();
 
         Ok(())
     }
@@ -394,7 +394,7 @@ impl<T> Steal<T> {
     pub(crate) fn steal_into(
         &self,
         dst: &mut Local<T>,
-        dst_metrics: &mut MetricsBatch,
+        dst_stats: &mut Stats,
     ) -> Option<task::Notified<T>> {
         // Safety: the caller is the only thread that mutates `dst.tail` and
         // holds a mutable reference.
@@ -420,8 +420,8 @@ impl<T> Steal<T> {
             return None;
         }
 
-        dst_metrics.incr_steal_count(n as u16);
-        dst_metrics.incr_steal_operations();
+        dst_stats.incr_steal_count(n as u16);
+        dst_stats.incr_steal_operations();
 
         // We are returning a task here
         n -= 1;
diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs
new file mode 100644
index 00000000000..00b0c9ea309
--- /dev/null
+++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs
@@ -0,0 +1,138 @@
+use crate::runtime::{Config, MetricsBatch, WorkerMetrics};
+
+use std::cmp;
+use std::time::{Duration, Instant};
+
+/// Per-worker statistics. This is used for both tuning the scheduler and
+/// reporting runtime-level metrics/stats.
+pub(crate) struct Stats {
+    /// The metrics batch used to report runtime-level metrics/stats to the
+    /// user.
+    batch: MetricsBatch,
+
+    /// Instant at which work last resumed (continued after park).
+    ///
+    /// This duplicates the value stored in `MetricsBatch`. We will unify
+    /// `Stats` and `MetricsBatch` when we stabilize metrics.
+    processing_scheduled_tasks_started_at: Instant,
+
+    /// Number of tasks polled in the batch of scheduled tasks
+    tasks_polled_in_batch: usize,
+
+    /// Exponentially-weighted moving average of time spent polling a scheduled
+    /// task.
+    ///
+    /// Tracked in nanoseconds, stored as an f64 since that is what we use for
+    /// the EWMA calculations.
+    task_poll_time_ewma: f64,
+}
+
+/// How to weigh each individual poll time, value is plucked from thin air.
+const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1;
+
+/// Ideally, we wouldn't go above this, value is plucked from thin air.
+const TARGET_GLOBAL_QUEUE_INTERVAL: f64 = Duration::from_micros(200).as_nanos() as f64;
+
+/// Max value for the global queue interval. This is 2x the previous default.
+const MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 127;
+
+/// This is the previous default.
+const TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 61;
+
+impl Stats {
+    pub(crate) fn new(worker_metrics: &WorkerMetrics) -> Stats {
+        // Seed the value with what we hope to see.
+        let task_poll_time_ewma =
+            TARGET_GLOBAL_QUEUE_INTERVAL / TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL as f64;
+
+        Stats {
+            batch: MetricsBatch::new(worker_metrics),
+            processing_scheduled_tasks_started_at: Instant::now(),
+            tasks_polled_in_batch: 0,
+            task_poll_time_ewma,
+        }
+    }
+
+    pub(crate) fn tuned_global_queue_interval(&self, config: &Config) -> u32 {
+        // If an interval is explicitly set, don't tune.
+        if let Some(configured) = config.global_queue_interval {
+            return configured;
+        }
+
+        // As of Rust 1.45, casts from f64 -> u32 are saturating, which is fine here.
+        let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32;
+
+        cmp::max(
+            1,
+            cmp::min(
+                MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL,
+                tasks_per_interval,
+            ),
+        )
+    }
+
+    pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
+        self.batch.submit(to);
+    }
+
+    pub(crate) fn about_to_park(&mut self) {
+        self.batch.about_to_park();
+    }
+
+    pub(crate) fn inc_local_schedule_count(&mut self) {
+        self.batch.inc_local_schedule_count();
+    }
+
+    pub(crate) fn start_processing_scheduled_tasks(&mut self) {
+        self.batch.start_processing_scheduled_tasks();
+
+        self.processing_scheduled_tasks_started_at = Instant::now();
+        self.tasks_polled_in_batch = 0;
+    }
+
+    pub(crate) fn end_processing_scheduled_tasks(&mut self) {
+        self.batch.end_processing_scheduled_tasks();
+
+        // Update the EWMA task poll time
+        if self.tasks_polled_in_batch > 0 {
+            let now = Instant::now();
+
+            // If we "overflow" this conversion, we have bigger problems than
+            // slightly off stats.
+            let elapsed = (now - self.processing_scheduled_tasks_started_at).as_nanos() as f64;
+            let num_polls = self.tasks_polled_in_batch as f64;
+
+            // Calculate the mean poll duration for a single task in the batch
+            let mean_poll_duration = elapsed / num_polls;
+
+            // Compute the alpha weighted by the number of tasks polled this batch.
+            let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls);
+
+            // Now compute the new weighted average task poll time.
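+            // This folds the whole batch into one step of the standard EWMA
+            // recurrence: ewma' = w * mean_poll_duration + (1 - w) * ewma,
+            // with w = 1 - (1 - TASK_POLL_TIME_EWMA_ALPHA)^num_polls.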
+            self.task_poll_time_ewma = weighted_alpha * mean_poll_duration
+                + (1.0 - weighted_alpha) * self.task_poll_time_ewma;
+        }
+    }
+
+    pub(crate) fn start_poll(&mut self) {
+        self.batch.start_poll();
+
+        self.tasks_polled_in_batch += 1;
+    }
+
+    pub(crate) fn end_poll(&mut self) {
+        self.batch.end_poll();
+    }
+
+    pub(crate) fn incr_steal_count(&mut self, by: u16) {
+        self.batch.incr_steal_count(by);
+    }
+
+    pub(crate) fn incr_steal_operations(&mut self) {
+        self.batch.incr_steal_operations();
+    }
+
+    pub(crate) fn incr_overflow_count(&mut self) {
+        self.batch.incr_overflow_count();
+    }
+}
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index e2bbb643db7..3ce07825a00 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -59,10 +59,12 @@ use crate::loom::sync::{Arc, Mutex};
 use crate::runtime;
 use crate::runtime::context;
-use crate::runtime::scheduler::multi_thread::{queue, Counters, Handle, Idle, Parker, Unparker};
+use crate::runtime::scheduler::multi_thread::{
+    queue, Counters, Handle, Idle, Parker, Stats, Unparker,
+};
 use crate::runtime::task::{Inject, OwnedTasks};
 use crate::runtime::{
-    blocking, coop, driver, scheduler, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics,
+    blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics,
 };
 use crate::util::atomic_cell::AtomicCell;
 use crate::util::rand::{FastRand, RngSeedGenerator};
@@ -114,8 +116,11 @@ struct Core {
     /// borrow checker happy.
     park: Option<Parker>,
 
-    /// Batching metrics so they can be submitted to RuntimeMetrics.
-    metrics: MetricsBatch,
+    /// Per-worker runtime stats
+    stats: Stats,
+
+    /// How often to check the global queue
+    global_queue_interval: u32,
 
     /// Fast random number generator.
     rand: FastRand,
@@ -220,6 +225,7 @@ pub(super) fn create(
         let park = park.clone();
         let unpark = park.unpark();
         let metrics = WorkerMetrics::from_config(&config);
+        let stats = Stats::new(&metrics);
 
         cores.push(Box::new(Core {
             tick: 0,
@@ -229,7 +235,8 @@ pub(super) fn create(
             is_searching: false,
             is_shutdown: false,
             park: Some(park),
-            metrics: MetricsBatch::new(&metrics),
+            global_queue_interval: stats.tuned_global_queue_interval(&config),
+            stats,
             rand: FastRand::new(config.seed_generator.next_seed()),
         }));
 
@@ -437,6 +444,10 @@ impl Context {
         // a task that had the LIFO slot disabled.
         self.reset_lifo_enabled(&mut core);
 
+        // Start in the "processing tasks" state, as polling tasks from the
+        // local queue will be one of the first things we do.
+        core.stats.start_processing_scheduled_tasks();
+
         while !core.is_shutdown {
             self.assert_lifo_enabled_is_correct(&core);
 
@@ -452,9 +463,14 @@ impl Context {
                 continue;
             }
 
+            // We consumed all work in the queues and will start searching for work.
+            core.stats.end_processing_scheduled_tasks();
+
             // There is no more **local** work to process, try to steal work
             // from other workers.
             if let Some(task) = core.steal_work(&self.worker) {
+                // Found work, switch back to processing
+                core.stats.start_processing_scheduled_tasks();
                 core = self.run_task(task, core)?;
             } else {
                 // Wait for work
@@ -483,7 +499,7 @@ impl Context {
         self.assert_lifo_enabled_is_correct(&core);
 
         // Make the core available to the runtime context
-        core.metrics.start_poll();
+        core.stats.start_poll();
         *self.core.borrow_mut() = Some(core);
 
         // Run the task
@@ -511,24 +527,27 @@
                 // was called, turning the poll into a "blocking op". In this
                 // case, we don't want to measure the poll time as it doesn't
                 // really count as an async poll anymore.
-                core.metrics.end_poll();
+                // core.stats.end_poll();
 
                 // Check for a task in the LIFO slot
                 let task = match core.lifo_slot.take() {
                     Some(task) => task,
                     None => {
                         self.reset_lifo_enabled(&mut core);
+                        core.stats.end_poll();
                         return Ok(core);
                     }
                 };
 
                 if !coop::has_budget_remaining() {
+                    core.stats.end_poll();
+
                     // Not enough budget left to run the LIFO task, push it to
                     // the back of the queue and return.
                     core.run_queue.push_back_or_overflow(
                         task,
                         self.worker.inject(),
-                        &mut core.metrics,
+                        &mut core.stats,
                     );
                     // If we hit this point, the LIFO slot should be enabled.
                     // There is no need to reset it.
@@ -553,7 +572,6 @@ impl Context {
                 }
 
                 // Run the LIFO task, then loop
-                core.metrics.start_poll();
                 *self.core.borrow_mut() = Some(core);
                 let task = self.worker.handle.shared.owned.assert_owner(task);
                 task.run();
@@ -576,12 +594,16 @@ impl Context {
         if core.tick % self.worker.handle.shared.config.event_interval == 0 {
             super::counters::inc_num_maintenance();
 
+            core.stats.end_processing_scheduled_tasks();
+
             // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
             // to run without actually putting the thread to sleep.
             core = self.park_timeout(core, Some(Duration::from_millis(0)));
 
             // Run regularly scheduled maintenance
             core.maintenance(&self.worker);
+
+            core.stats.start_processing_scheduled_tasks();
         }
 
         core
@@ -605,9 +627,8 @@ impl Context {
 
         if core.transition_to_parked(&self.worker) {
             while !core.is_shutdown {
-                core.metrics.about_to_park();
+                core.stats.about_to_park();
                 core = self.park_timeout(core, None);
-                core.metrics.returned_from_park();
 
                 // Run regularly scheduled maintenance
                 core.maintenance(&self.worker);
@@ -666,7 +687,10 @@ impl Core {
     /// Return the next notified task available to this worker.
     fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
-        if self.tick % worker.handle.shared.config.global_queue_interval == 0 {
+        if self.tick % self.global_queue_interval == 0 {
+            // Update the global queue interval, if needed
+            self.tune_global_queue_interval(worker);
+
             worker.inject().pop().or_else(|| self.next_local_task())
         } else {
             let maybe_task = self.next_local_task();
@@ -733,7 +757,7 @@ impl Core {
             let target = &worker.handle.shared.remotes[i];
             if let Some(task) = target
                 .steal
-                .steal_into(&mut self.run_queue, &mut self.metrics)
+                .steal_into(&mut self.run_queue, &mut self.stats)
             {
                 return Some(task);
             }
@@ -813,7 +837,7 @@ impl Core {
     /// Runs maintenance work such as checking the pool's state.
     fn maintenance(&mut self, worker: &Worker) {
-        self.metrics
+        self.stats
             .submit(&worker.handle.shared.worker_metrics[worker.index]);
 
         if !self.is_shutdown {
@@ -828,7 +852,7 @@ impl Core {
         // Signal to all tasks to shut down.
         worker.handle.shared.owned.close_and_shutdown_all();
 
-        self.metrics
+        self.stats
             .submit(&worker.handle.shared.worker_metrics[worker.index]);
     }
 
@@ -842,6 +866,17 @@ impl Core {
             park.shutdown(&handle.driver);
         }
     }
+
+    fn tune_global_queue_interval(&mut self, worker: &Worker) {
+        let next = self
+            .stats
+            .tuned_global_queue_interval(&worker.handle.shared.config);
+
+        // Smooth out jitter
+        if self.global_queue_interval.abs_diff(next) > 2 {
+            self.global_queue_interval = next;
+        }
+    }
 }
 
 impl Worker {
@@ -888,7 +923,7 @@ impl Handle {
     }
 
     fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
-        core.metrics.inc_local_schedule_count();
+        core.stats.inc_local_schedule_count();
 
         // Spawning from the worker thread. If scheduling a "yield" then the
If scheduling a "yield" then the // task must always be pushed to the back of the queue, enabling other @@ -896,7 +931,7 @@ impl Handle { // flexibility and the task may go to the front of the queue. let should_notify = if is_yield || !core.lifo_enabled { core.run_queue - .push_back_or_overflow(task, &self.shared.inject, &mut core.metrics); + .push_back_or_overflow(task, &self.shared.inject, &mut core.stats); true } else { // Push to the LIFO slot @@ -905,7 +940,7 @@ impl Handle { if let Some(prev) = prev { core.run_queue - .push_back_or_overflow(prev, &self.shared.inject, &mut core.metrics); + .push_back_or_overflow(prev, &self.shared.inject, &mut core.stats); } core.lifo_slot = Some(task); diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 09f249e9ea4..2e42f1bbb9e 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -1,18 +1,17 @@ -use crate::runtime::scheduler::multi_thread::queue; +use crate::runtime::scheduler::multi_thread::{queue, Stats}; use crate::runtime::task::{self, Inject, Schedule, Task}; -use crate::runtime::MetricsBatch; use std::thread; use std::time::Duration; #[allow(unused)] macro_rules! assert_metrics { - ($metrics:ident, $field:ident == $v:expr) => {{ + ($stats:ident, $field:ident == $v:expr) => {{ use crate::runtime::WorkerMetrics; use std::sync::atomic::Ordering::Relaxed; let worker = WorkerMetrics::new(); - $metrics.submit(&worker); + $stats.submit(&worker); let expect = $v; let actual = worker.$field.load(Relaxed); @@ -21,24 +20,24 @@ macro_rules! assert_metrics { }}; } -fn metrics_batch() -> MetricsBatch { +fn new_stats() -> Stats { use crate::runtime::WorkerMetrics; - MetricsBatch::new(&WorkerMetrics::new()) + Stats::new(&WorkerMetrics::new()) } #[test] fn fits_256_one_at_a_time() { let (_, mut local) = queue::local(); let inject = Inject::new(); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); for _ in 0..256 { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } cfg_metrics! { - assert_metrics!(metrics, overflow_count == 0); + assert_metrics!(stats, overflow_count == 0); } assert!(inject.pop().is_none()); @@ -88,15 +87,15 @@ fn fits_256_all_in_chunks() { fn overflow() { let (_, mut local) = queue::local(); let inject = Inject::new(); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); for _ in 0..257 { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } cfg_metrics! { - assert_metrics!(metrics, overflow_count == 1); + assert_metrics!(stats, overflow_count == 1); } let mut n = 0; @@ -114,7 +113,7 @@ fn overflow() { #[test] fn steal_batch() { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (steal1, mut local1) = queue::local(); let (_, mut local2) = queue::local(); @@ -122,13 +121,13 @@ fn steal_batch() { for _ in 0..4 { let (task, _) = super::unowned(async {}); - local1.push_back_or_overflow(task, &inject, &mut metrics); + local1.push_back_or_overflow(task, &inject, &mut stats); } - assert!(steal1.steal_into(&mut local2, &mut metrics).is_some()); + assert!(steal1.steal_into(&mut local2, &mut stats).is_some()); cfg_metrics! 
-        assert_metrics!(metrics, steal_count == 2);
+        assert_metrics!(stats, steal_count == 2);
     }
 
     for _ in 0..1 {
@@ -160,19 +159,19 @@ fn stress1() {
     const NUM_PUSH: usize = normal_or_miri(500, 10);
     const NUM_POP: usize = normal_or_miri(250, 10);
 
-    let mut metrics = metrics_batch();
+    let mut stats = new_stats();
 
     for _ in 0..NUM_ITER {
         let (steal, mut local) = queue::local();
         let inject = Inject::new();
 
         let th = thread::spawn(move || {
-            let mut metrics = metrics_batch();
+            let mut stats = new_stats();
             let (_, mut local) = queue::local();
             let mut n = 0;
 
             for _ in 0..NUM_STEAL {
-                if steal.steal_into(&mut local, &mut metrics).is_some() {
+                if steal.steal_into(&mut local, &mut stats).is_some() {
                     n += 1;
                 }
 
@@ -184,7 +183,7 @@ fn stress1() {
             }
 
             cfg_metrics! {
-                assert_metrics!(metrics, steal_count == n as _);
+                assert_metrics!(stats, steal_count == n as _);
             }
 
             n
@@ -195,7 +194,7 @@ fn stress1() {
        for _ in 0..NUM_LOCAL {
             for _ in 0..NUM_PUSH {
                 let (task, _) = super::unowned(async {});
-                local.push_back_or_overflow(task, &inject, &mut metrics);
+                local.push_back_or_overflow(task, &inject, &mut stats);
             }
 
             for _ in 0..NUM_POP {
@@ -223,14 +222,14 @@ fn stress2() {
     const NUM_TASKS: usize = normal_or_miri(1_000_000, 50);
     const NUM_STEAL: usize = normal_or_miri(1_000, 10);
 
-    let mut metrics = metrics_batch();
+    let mut stats = new_stats();
 
     for _ in 0..NUM_ITER {
         let (steal, mut local) = queue::local();
         let inject = Inject::new();
 
         let th = thread::spawn(move || {
-            let mut stats = metrics_batch();
+            let mut stats = new_stats();
             let (_, mut local) = queue::local();
             let mut n = 0;
 
@@ -253,7 +252,7 @@ fn stress2() {
 
         for i in 0..NUM_TASKS {
            let (task, _) = super::unowned(async {});
-            local.push_back_or_overflow(task, &inject, &mut metrics);
+            local.push_back_or_overflow(task, &inject, &mut stats);
 
             if i % 128 == 0 && local.pop().is_some() {
                 num_pop += 1;
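
The tuning policy the patch adds boils down to a few lines of arithmetic. The sketch below restates it outside the runtime so it can be run and inspected on its own; the constants are copied from `stats.rs`, while the shortened names (`ALPHA`, `TARGET_INTERVAL_NANOS`, `MAX_INTERVAL`) and the two free functions are illustrative renamings, not tokio APIs:

```rust
use std::time::Duration;

/// Weight of one poll in the EWMA (TASK_POLL_TIME_EWMA_ALPHA in the patch).
const ALPHA: f64 = 0.1;
/// Target time between global queue checks (TARGET_GLOBAL_QUEUE_INTERVAL).
const TARGET_INTERVAL_NANOS: f64 = Duration::from_micros(200).as_nanos() as f64;
/// Upper clamp for the interval: 2x the previous hard-coded default of 61.
const MAX_INTERVAL: u32 = 127;

/// Batched EWMA update: equivalent to folding `num_polls` single-sample
/// updates, each using the batch's mean poll time as the sample.
fn update_ewma(ewma: f64, batch_elapsed_nanos: f64, num_polls: u32) -> f64 {
    let num_polls = num_polls as f64;
    let mean_poll = batch_elapsed_nanos / num_polls;
    let w = 1.0 - (1.0 - ALPHA).powf(num_polls);
    w * mean_poll + (1.0 - w) * ewma
}

/// How many average-length polls fit in the target window, clamped to
/// [1, MAX_INTERVAL].
fn tuned_interval(ewma: f64) -> u32 {
    ((TARGET_INTERVAL_NANOS / ewma) as u32).clamp(1, MAX_INTERVAL)
}

fn main() {
    // Seed as the patch does: assume 61 polls fit in the 200us target.
    let mut ewma = TARGET_INTERVAL_NANOS / 61.0;
    let mut interval = tuned_interval(ewma); // ~61

    // A batch of 10 slow polls (50us each, 500us total) drags the EWMA up,
    // which pushes the interval down so the injection queue is checked more
    // often while long-running tasks dominate.
    ewma = update_ewma(ewma, 500_000.0, 10);
    let next = tuned_interval(ewma);

    // Workers only adopt the new value when it moves by more than 2 ticks,
    // smoothing out jitter (see `tune_global_queue_interval` in worker.rs).
    if interval.abs_diff(next) > 2 {
        interval = next;
    }

    println!("ewma = {ewma:.0}ns/poll, interval = {interval}");
}
```

The net effect is that a worker aims to consult the injection queue roughly once per 200µs of local work: slow tasks shrink the interval so remote tasks are not starved behind them, fast tasks grow it (up to the 127 cap) so the check overhead stays negligible, and an explicitly configured `global_queue_interval` bypasses the tuning entirely.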