diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 648bc76dc4b..7aa75b7855d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -366,14 +366,14 @@ jobs: # First run with all features (including parking_lot) - run: cross test -p tokio --all-features --target ${{ matrix.target }} --tests env: - RUSTFLAGS: --cfg tokio_unstable -Dwarnings --cfg tokio_no_ipv6 ${{ matrix.rustflags }} + RUSTFLAGS: --cfg tokio_unstable -Dwarnings --cfg tokio_no_ipv6 --cfg tokio_cross ${{ matrix.rustflags }} # Now run without parking_lot - name: Remove `parking_lot` from `full` feature run: sed -i '0,/parking_lot/{/parking_lot/d;}' tokio/Cargo.toml # The `tokio_no_parking_lot` cfg is here to ensure the `sed` above does not silently break. - run: cross test -p tokio --features full,test-util --target ${{ matrix.target }} --tests env: - RUSTFLAGS: --cfg tokio_unstable -Dwarnings --cfg tokio_no_ipv6 --cfg tokio_no_parking_lot ${{ matrix.rustflags }} + RUSTFLAGS: --cfg tokio_unstable -Dwarnings --cfg tokio_no_ipv6 --cfg tokio_no_parking_lot --cfg tokio_cross ${{ matrix.rustflags }} # See https://github.com/tokio-rs/tokio/issues/5187 no-atomic-u64: @@ -393,7 +393,7 @@ jobs: - uses: taiki-e/setup-cross-toolchain-action@v1 with: target: i686-unknown-linux-gnu - - run: cargo test -Zbuild-std --target target-specs/i686-unknown-linux-gnu.json -p tokio --all-features + - run: cargo test -Zbuild-std --target target-specs/i686-unknown-linux-gnu.json -p tokio --all-features -- --test-threads 1 --nocapture env: RUSTFLAGS: --cfg tokio_unstable --cfg tokio_taskdump -Dwarnings --cfg tokio_no_atomic_u64 # https://github.com/tokio-rs/tokio/pull/5356 diff --git a/benches/rt_multi_threaded.rs b/benches/rt_multi_threaded.rs index 88553e4ab50..689c334b6d7 100644 --- a/benches/rt_multi_threaded.rs +++ b/benches/rt_multi_threaded.rs @@ -6,12 +6,14 @@ use tokio::runtime::{self, Runtime}; use tokio::sync::oneshot; use bencher::{benchmark_group, benchmark_main, Bencher}; -use 
std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::{mpsc, Arc}; +use std::time::{Duration, Instant}; const NUM_WORKERS: usize = 4; const NUM_SPAWN: usize = 10_000; +const STALL_DUR: Duration = Duration::from_micros(10); fn spawn_many_local(b: &mut Bencher) { let rt = rt(); @@ -57,19 +59,64 @@ fn spawn_many_remote_idle(b: &mut Bencher) { }); } -fn spawn_many_remote_busy(b: &mut Bencher) { +// The runtime is busy with tasks that consume CPU time and yield. Yielding is a +// lower notification priority than spawning / regular notification. +fn spawn_many_remote_busy1(b: &mut Bencher) { let rt = rt(); let rt_handle = rt.handle(); let mut handles = Vec::with_capacity(NUM_SPAWN); + let flag = Arc::new(AtomicBool::new(true)); // Spawn some tasks to keep the runtimes busy for _ in 0..(2 * NUM_WORKERS) { - rt.spawn(async { - loop { + let flag = flag.clone(); + rt.spawn(async move { + while flag.load(Relaxed) { tokio::task::yield_now().await; - std::thread::sleep(std::time::Duration::from_micros(10)); + stall(); + } + }); + } + + b.iter(|| { + for _ in 0..NUM_SPAWN { + handles.push(rt_handle.spawn(async {})); + } + + rt.block_on(async { + for handle in handles.drain(..) { + handle.await.unwrap(); } }); + }); + + flag.store(false, Relaxed); +} + +// The runtime is busy with tasks that consume CPU time and spawn new high-CPU +// tasks. Spawning goes via a higher notification priority than yielding. 
+fn spawn_many_remote_busy2(b: &mut Bencher) { + const NUM_SPAWN: usize = 1_000; + + let rt = rt(); + let rt_handle = rt.handle(); + let mut handles = Vec::with_capacity(NUM_SPAWN); + let flag = Arc::new(AtomicBool::new(true)); + + // Spawn some tasks to keep the runtimes busy + for _ in 0..(NUM_WORKERS) { + let flag = flag.clone(); + fn iter(flag: Arc) { + tokio::spawn(async { + if flag.load(Relaxed) { + stall(); + iter(flag); + } + }); + } + rt.spawn(async { + iter(flag); + }); } b.iter(|| { @@ -83,6 +130,8 @@ fn spawn_many_remote_busy(b: &mut Bencher) { } }); }); + + flag.store(false, Relaxed); } fn yield_many(b: &mut Bencher) { @@ -193,11 +242,19 @@ fn rt() -> Runtime { .unwrap() } +fn stall() { + let now = Instant::now(); + while now.elapsed() < STALL_DUR { + std::thread::yield_now(); + } +} + benchmark_group!( scheduler, spawn_many_local, spawn_many_remote_idle, - spawn_many_remote_busy, + spawn_many_remote_busy1, + spawn_many_remote_busy2, ping_pong, yield_many, chained_spawn, diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index dda21a3ae27..af9e0e172f3 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -82,7 +82,13 @@ pub struct Builder { pub(super) keep_alive: Option, /// How many ticks before pulling a task from the global/remote queue? - pub(super) global_queue_interval: u32, + /// + /// When `None`, the value is unspecified and behavior details are left to + /// the scheduler. Each scheduler flavor could choose to either pick its own + /// default value or use some other strategy to decide when to poll from the + /// global queue. For example, the multi-threaded scheduler uses a + /// self-tuning strategy based on mean task poll times. + pub(super) global_queue_interval: Option, /// How many ticks before yielding to the driver for timer and I/O events? 
pub(super) event_interval: u32, @@ -211,7 +217,7 @@ impl Builder { #[cfg(not(loom))] const EVENT_INTERVAL: u32 = 61; - Builder::new(Kind::CurrentThread, 31, EVENT_INTERVAL) + Builder::new(Kind::CurrentThread, EVENT_INTERVAL) } cfg_not_wasi! { @@ -222,7 +228,7 @@ impl Builder { #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] pub fn new_multi_thread() -> Builder { // The number `61` is fairly arbitrary. I believe this value was copied from golang. - Builder::new(Kind::MultiThread, 61, 61) + Builder::new(Kind::MultiThread, 61) } } @@ -230,7 +236,7 @@ impl Builder { /// values. /// /// Configuration methods can be chained on the return value. - pub(crate) fn new(kind: Kind, global_queue_interval: u32, event_interval: u32) -> Builder { + pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder { Builder { kind, @@ -266,7 +272,7 @@ impl Builder { // Defaults for these values depend on the scheduler kind, so we get them // as parameters. - global_queue_interval, + global_queue_interval: None, event_interval, seed_generator: RngSeedGenerator::new(RngSeed::new()), @@ -716,7 +722,7 @@ impl Builder { /// # } /// ``` pub fn global_queue_interval(&mut self, val: u32) -> &mut Self { - self.global_queue_interval = val; + self.global_queue_interval = Some(val); self } diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs index 4af5eee472f..c42e4fe5a80 100644 --- a/tokio/src/runtime/config.rs +++ b/tokio/src/runtime/config.rs @@ -4,7 +4,7 @@ use crate::util::RngSeedGenerator; pub(crate) struct Config { /// How many ticks before pulling a task from the global/remote queue? - pub(crate) global_queue_interval: u32, + pub(crate) global_queue_interval: Option, /// How many ticks before yielding to the driver for timer and I/O events? 
pub(crate) event_interval: u32, diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index 296792d7fb8..1bb4e261f73 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -34,7 +34,7 @@ pub(crate) struct MetricsBatch { busy_duration_total: u64, /// Instant at which work last resumed (continued after park). - last_resume_time: Instant, + processing_scheduled_tasks_started_at: Instant, /// If `Some`, tracks poll times in nanoseconds poll_timer: Option, @@ -62,7 +62,7 @@ impl MetricsBatch { local_schedule_count: 0, overflow_count: 0, busy_duration_total: 0, - last_resume_time: now, + processing_scheduled_tasks_started_at: now, poll_timer: worker_metrics .poll_count_histogram .as_ref() @@ -106,11 +106,20 @@ impl MetricsBatch { } else { self.poll_count_on_last_park = self.poll_count; } + } + + /// Start processing a batch of tasks + pub(crate) fn start_processing_scheduled_tasks(&mut self) { + self.processing_scheduled_tasks_started_at = Instant::now(); + } - let busy_duration = self.last_resume_time.elapsed(); + /// Stop processing a batch of tasks + pub(crate) fn end_processing_scheduled_tasks(&mut self) { + let busy_duration = self.processing_scheduled_tasks_started_at.elapsed(); self.busy_duration_total += duration_as_u64(busy_duration); } + /// Start polling an individual task pub(crate) fn start_poll(&mut self) { self.poll_count += 1; @@ -119,6 +128,7 @@ impl MetricsBatch { } } + /// Stop polling an individual task pub(crate) fn end_poll(&mut self) { if let Some(poll_timer) = &mut self.poll_timer { let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed()); @@ -126,10 +136,6 @@ impl MetricsBatch { } } - pub(crate) fn returned_from_park(&mut self) { - self.last_resume_time = Instant::now(); - } - pub(crate) fn inc_local_schedule_count(&mut self) { self.local_schedule_count += 1; } diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index 
4d5e7b5b152..8f8345c08b4 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -39,8 +39,9 @@ impl MetricsBatch { pub(crate) fn submit(&mut self, _to: &WorkerMetrics) {} pub(crate) fn about_to_park(&mut self) {} - pub(crate) fn returned_from_park(&mut self) {} pub(crate) fn inc_local_schedule_count(&mut self) {} + pub(crate) fn start_processing_scheduled_tasks(&mut self) {} + pub(crate) fn end_processing_scheduled_tasks(&mut self) {} pub(crate) fn start_poll(&mut self) {} pub(crate) fn end_poll(&mut self) {} } diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index eb20b7ba027..9a0578c455a 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -59,6 +59,9 @@ struct Core { /// Metrics batch metrics: MetricsBatch, + /// How often to check the global queue + global_queue_interval: u32, + /// True if a task panicked without being handled and the runtime is /// configured to shutdown on unhandled panic. unhandled_panic: bool, @@ -102,6 +105,11 @@ type Notified = task::Notified>; /// Initial queue capacity. const INITIAL_CAPACITY: usize = 64; +/// Used if none is specified. This is a temporary constant and will be removed +/// as we unify tuning logic between the multi-thread and current-thread +/// schedulers. +const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31; + impl CurrentThread { pub(crate) fn new( driver: Driver, @@ -112,6 +120,11 @@ impl CurrentThread { ) -> (CurrentThread, Arc) { let worker_metrics = WorkerMetrics::from_config(&config); + // Get the configured global queue interval, or use the default. 
+ let global_queue_interval = config + .global_queue_interval + .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL); + let handle = Arc::new(Handle { shared: Shared { inject: Inject::new(), @@ -131,6 +144,7 @@ impl CurrentThread { tick: 0, driver: Some(driver), metrics: MetricsBatch::new(&handle.shared.worker_metrics), + global_queue_interval, unhandled_panic: false, }))); @@ -273,7 +287,7 @@ impl Core { } fn next_task(&mut self, handle: &Handle) -> Option { - if self.tick % handle.shared.config.global_queue_interval == 0 { + if self.tick % self.global_queue_interval == 0 { handle .next_remote_task() .or_else(|| self.next_local_task(handle)) @@ -362,7 +376,6 @@ impl Context { }); core = c; - core.metrics.returned_from_park(); } if let Some(f) = &handle.shared.config.after_unpark { @@ -626,6 +639,8 @@ impl CoreGuard<'_> { pin!(future); + core.metrics.start_processing_scheduled_tasks(); + 'outer: loop { let handle = &context.handle; @@ -654,12 +669,16 @@ impl CoreGuard<'_> { let task = match entry { Some(entry) => entry, None => { + core.metrics.end_processing_scheduled_tasks(); + core = if did_defer_tasks() { context.park_yield(core, handle) } else { context.park(core, handle) }; + core.metrics.start_processing_scheduled_tasks(); + // Try polling the `block_on` future next continue 'outer; } @@ -674,9 +693,13 @@ impl CoreGuard<'_> { core = c; } + core.metrics.end_processing_scheduled_tasks(); + // Yield to the driver, this drives the timer and pulls any // pending I/O events. 
core = context.park_yield(core, handle); + + core.metrics.start_processing_scheduled_tasks(); } }); diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index fed00b76459..67a7890a1de 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -9,6 +9,9 @@ pub(crate) use handle::Handle; mod idle; use self::idle::Idle; +mod stats; +pub(crate) use stats::Stats; + mod park; pub(crate) use park::{Parker, Unparker}; diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs index dd132fb9a6d..2a0ae833850 100644 --- a/tokio/src/runtime/scheduler/multi_thread/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs @@ -2,8 +2,8 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::Arc; +use crate::runtime::scheduler::multi_thread::Stats; use crate::runtime::task::{self, Inject}; -use crate::runtime::MetricsBatch; use std::mem::{self, MaybeUninit}; use std::ptr; @@ -186,7 +186,7 @@ impl Local { &mut self, mut task: task::Notified, inject: &Inject, - metrics: &mut MetricsBatch, + stats: &mut Stats, ) { let tail = loop { let head = self.inner.head.load(Acquire); @@ -206,7 +206,7 @@ impl Local { } else { // Push the current task and half of the queue into the // inject queue. - match self.push_overflow(task, real, tail, inject, metrics) { + match self.push_overflow(task, real, tail, inject, stats) { Ok(_) => return, // Lost the race, try again Err(v) => { @@ -253,7 +253,7 @@ impl Local { head: UnsignedShort, tail: UnsignedShort, inject: &Inject, - metrics: &mut MetricsBatch, + stats: &mut Stats, ) -> Result<(), task::Notified> { /// How many elements are we taking from the local queue. /// @@ -338,7 +338,7 @@ impl Local { inject.push_batch(batch_iter.chain(std::iter::once(task))); // Add 1 to factor in the task currently being scheduled. 
- metrics.incr_overflow_count(); + stats.incr_overflow_count(); Ok(()) } @@ -394,7 +394,7 @@ impl Steal { pub(crate) fn steal_into( &self, dst: &mut Local, - dst_metrics: &mut MetricsBatch, + dst_stats: &mut Stats, ) -> Option> { // Safety: the caller is the only thread that mutates `dst.tail` and // holds a mutable reference. @@ -420,8 +420,8 @@ impl Steal { return None; } - dst_metrics.incr_steal_count(n as u16); - dst_metrics.incr_steal_operations(); + dst_stats.incr_steal_count(n as u16); + dst_stats.incr_steal_operations(); // We are returning a task here n -= 1; diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs new file mode 100644 index 00000000000..f01daaa1bff --- /dev/null +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -0,0 +1,140 @@ +use crate::runtime::{Config, MetricsBatch, WorkerMetrics}; + +use std::cmp; +use std::time::{Duration, Instant}; + +/// Per-worker statistics. This is used for both tuning the scheduler and +/// reporting runtime-level metrics/stats. +pub(crate) struct Stats { + /// The metrics batch used to report runtime-level metrics/stats to the + /// user. + batch: MetricsBatch, + + /// Instant at which work last resumed (continued after park). + /// + /// This duplicates the value stored in `MetricsBatch`. We will unify + /// `Stats` and `MetricsBatch` when we stabilize metrics. + processing_scheduled_tasks_started_at: Instant, + + /// Number of tasks polled in the batch of scheduled tasks + tasks_polled_in_batch: usize, + + /// Exponentially-weighted moving average of time spent polling a scheduled + /// task. + /// + /// Tracked in nanoseconds, stored as an f64 since that is what we use with + /// the EWMA calculations + task_poll_time_ewma: f64, +} + +/// How to weigh each individual poll time, value is plucked from thin air. +const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1; + +/// Ideally, we wouldn't go above this, value is plucked from thin air. 
+const TARGET_GLOBAL_QUEUE_INTERVAL: f64 = Duration::from_micros(200).as_nanos() as f64; + +/// Max value for the global queue interval. This is 2x the previous default +const MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 127; + +/// This is the previous default +const TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 61; + +impl Stats { + pub(crate) fn new(worker_metrics: &WorkerMetrics) -> Stats { + // Seed the value with what we hope to see. + let task_poll_time_ewma = + TARGET_GLOBAL_QUEUE_INTERVAL / TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL as f64; + + Stats { + batch: MetricsBatch::new(worker_metrics), + processing_scheduled_tasks_started_at: Instant::now(), + tasks_polled_in_batch: 0, + task_poll_time_ewma, + } + } + + pub(crate) fn tuned_global_queue_interval(&self, config: &Config) -> u32 { + // If an interval is explicitly set, don't tune. + if let Some(configured) = config.global_queue_interval { + return configured; + } + + // As of Rust 1.45, casts from f64 -> u32 are saturating, which is fine here. + let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32; + + cmp::max( + // We don't want to return less than 2 as that would result in the + // global queue always getting checked first. 
+ 2, + cmp::min( + MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL, + tasks_per_interval, + ), + ) + } + + pub(crate) fn submit(&mut self, to: &WorkerMetrics) { + self.batch.submit(to); + } + + pub(crate) fn about_to_park(&mut self) { + self.batch.about_to_park(); + } + + pub(crate) fn inc_local_schedule_count(&mut self) { + self.batch.inc_local_schedule_count(); + } + + pub(crate) fn start_processing_scheduled_tasks(&mut self) { + self.batch.start_processing_scheduled_tasks(); + + self.processing_scheduled_tasks_started_at = Instant::now(); + self.tasks_polled_in_batch = 0; + } + + pub(crate) fn end_processing_scheduled_tasks(&mut self) { + self.batch.end_processing_scheduled_tasks(); + + // Update the EWMA task poll time + if self.tasks_polled_in_batch > 0 { + let now = Instant::now(); + + // If we "overflow" this conversion, we have bigger problems than + // slightly off stats. + let elapsed = (now - self.processing_scheduled_tasks_started_at).as_nanos() as f64; + let num_polls = self.tasks_polled_in_batch as f64; + + // Calculate the mean poll duration for a single task in the batch + let mean_poll_duration = elapsed / num_polls; + + // Compute the alpha weighted by the number of tasks polled this batch. + let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls); + + // Now compute the new weighted average task poll time. 
+ self.task_poll_time_ewma = weighted_alpha * mean_poll_duration + + (1.0 - weighted_alpha) * self.task_poll_time_ewma; + } + } + + pub(crate) fn start_poll(&mut self) { + self.batch.start_poll(); + + self.tasks_polled_in_batch += 1; + } + + pub(crate) fn end_poll(&mut self) { + self.batch.end_poll(); + } + + pub(crate) fn incr_steal_count(&mut self, by: u16) { + self.batch.incr_steal_count(by); + } + + pub(crate) fn incr_steal_operations(&mut self) { + self.batch.incr_steal_operations(); + } + + pub(crate) fn incr_overflow_count(&mut self) { + self.batch.incr_overflow_count(); + } +} diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index bedaff39e4f..02d7fce6546 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -59,10 +59,12 @@ use crate::loom::sync::{Arc, Mutex}; use crate::runtime; use crate::runtime::context; -use crate::runtime::scheduler::multi_thread::{queue, Counters, Handle, Idle, Parker, Unparker}; +use crate::runtime::scheduler::multi_thread::{ + queue, Counters, Handle, Idle, Parker, Stats, Unparker, +}; use crate::runtime::task::{Inject, OwnedTasks}; use crate::runtime::{ - blocking, coop, driver, scheduler, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics, + blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics, }; use crate::util::atomic_cell::AtomicCell; use crate::util::rand::{FastRand, RngSeedGenerator}; @@ -114,8 +116,11 @@ struct Core { /// borrow checker happy. park: Option, - /// Batching metrics so they can be submitted to RuntimeMetrics. - metrics: MetricsBatch, + /// Per-worker runtime stats + stats: Stats, + + /// How often to check the global queue + global_queue_interval: u32, /// Fast random number generator. 
rand: FastRand, @@ -217,6 +222,7 @@ pub(super) fn create( let park = park.clone(); let unpark = park.unpark(); let metrics = WorkerMetrics::from_config(&config); + let stats = Stats::new(&metrics); cores.push(Box::new(Core { tick: 0, @@ -226,7 +232,8 @@ pub(super) fn create( is_searching: false, is_shutdown: false, park: Some(park), - metrics: MetricsBatch::new(&metrics), + global_queue_interval: stats.tuned_global_queue_interval(&config), + stats, rand: FastRand::new(config.seed_generator.next_seed()), })); @@ -436,6 +443,10 @@ impl Context { // a task that had the LIFO slot disabled. self.reset_lifo_enabled(&mut core); + // Start as "processing" tasks as polling tasks from the local queue + // will be one of the first things we do. + core.stats.start_processing_scheduled_tasks(); + while !core.is_shutdown { self.assert_lifo_enabled_is_correct(&core); @@ -451,9 +462,14 @@ impl Context { continue; } + // We consumed all work in the queues and will start searching for work. + core.stats.end_processing_scheduled_tasks(); + // There is no more **local** work to process, try to steal work // from other workers. if let Some(task) = core.steal_work(&self.worker) { + // Found work, switch back to processing + core.stats.start_processing_scheduled_tasks(); core = self.run_task(task, core)?; } else { // Wait for work @@ -481,8 +497,13 @@ impl Context { self.assert_lifo_enabled_is_correct(&core); + // Measure the poll start time. Note that we may end up polling other + // tasks under this measurement. In this case, the tasks came from the + // LIFO slot and are considered part of the current task for scheduling + // purposes. These tasks inherit the "parent"'s limits. + core.stats.start_poll(); + // Make the core available to the runtime context - core.metrics.start_poll(); *self.core.borrow_mut() = Some(core); // Run the task @@ -505,29 +526,25 @@ impl Context { } }; - // If task poll times is enabled, measure the poll time. 
Note - // that, if the `core` is stolen, this means `block_in_place` - // was called, turning the poll into a "blocking op". In this - // case, we don't want to measure the poll time as it doesn't - // really count as an async poll anymore. - core.metrics.end_poll(); - // Check for a task in the LIFO slot let task = match core.lifo_slot.take() { Some(task) => task, None => { self.reset_lifo_enabled(&mut core); + core.stats.end_poll(); return Ok(core); } }; if !coop::has_budget_remaining() { + core.stats.end_poll(); + // Not enough budget left to run the LIFO task, push it to // the back of the queue and return. core.run_queue.push_back_or_overflow( task, self.worker.inject(), - &mut core.metrics, + &mut core.stats, ); // If we hit this point, the LIFO slot should be enabled. // There is no need to reset it. @@ -552,7 +569,6 @@ impl Context { } // Run the LIFO task, then loop - core.metrics.start_poll(); *self.core.borrow_mut() = Some(core); let task = self.worker.handle.shared.owned.assert_owner(task); task.run(); @@ -575,12 +591,16 @@ impl Context { if core.tick % self.worker.handle.shared.config.event_interval == 0 { super::counters::inc_num_maintenance(); + core.stats.end_processing_scheduled_tasks(); + // Call `park` with a 0 timeout. This enables the I/O driver, timer, ... // to run without actually putting the thread to sleep. core = self.park_timeout(core, Some(Duration::from_millis(0))); // Run regularly scheduled maintenance core.maintenance(&self.worker); + + core.stats.start_processing_scheduled_tasks(); } core @@ -604,9 +624,8 @@ impl Context { if core.transition_to_parked(&self.worker) { while !core.is_shutdown { - core.metrics.about_to_park(); + core.stats.about_to_park(); core = self.park_timeout(core, None); - core.metrics.returned_from_park(); // Run regularly scheduled maintenance core.maintenance(&self.worker); @@ -665,7 +684,10 @@ impl Core { /// Return the next notified task available to this worker. 
fn next_task(&mut self, worker: &Worker) -> Option { - if self.tick % worker.handle.shared.config.global_queue_interval == 0 { + if self.tick % self.global_queue_interval == 0 { + // Update the global queue interval, if needed + self.tune_global_queue_interval(worker); + worker.inject().pop().or_else(|| self.next_local_task()) } else { let maybe_task = self.next_local_task(); @@ -732,7 +754,7 @@ impl Core { let target = &worker.handle.shared.remotes[i]; if let Some(task) = target .steal - .steal_into(&mut self.run_queue, &mut self.metrics) + .steal_into(&mut self.run_queue, &mut self.stats) { return Some(task); } @@ -812,7 +834,7 @@ impl Core { /// Runs maintenance work such as checking the pool's state. fn maintenance(&mut self, worker: &Worker) { - self.metrics + self.stats .submit(&worker.handle.shared.worker_metrics[worker.index]); if !self.is_shutdown { @@ -827,7 +849,7 @@ impl Core { // Signal to all tasks to shut down. worker.handle.shared.owned.close_and_shutdown_all(); - self.metrics + self.stats .submit(&worker.handle.shared.worker_metrics[worker.index]); } @@ -841,6 +863,19 @@ impl Core { park.shutdown(&handle.driver); } + + fn tune_global_queue_interval(&mut self, worker: &Worker) { + let next = self + .stats + .tuned_global_queue_interval(&worker.handle.shared.config); + + debug_assert!(next > 1); + + // Smooth out jitter + if abs_diff(self.global_queue_interval, next) > 2 { + self.global_queue_interval = next; + } + } } impl Worker { @@ -887,7 +922,7 @@ impl Handle { } fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) { - core.metrics.inc_local_schedule_count(); + core.stats.inc_local_schedule_count(); // Spawning from the worker thread. If scheduling a "yield" then the // task must always be pushed to the back of the queue, enabling other @@ -895,7 +930,7 @@ impl Handle { // flexibility and the task may go to the front of the queue. 
let should_notify = if is_yield || !core.lifo_enabled { core.run_queue - .push_back_or_overflow(task, &self.shared.inject, &mut core.metrics); + .push_back_or_overflow(task, &self.shared.inject, &mut core.stats); true } else { // Push to the LIFO slot @@ -904,7 +939,7 @@ impl Handle { if let Some(prev) = prev { core.run_queue - .push_back_or_overflow(prev, &self.shared.inject, &mut core.metrics); + .push_back_or_overflow(prev, &self.shared.inject, &mut core.stats); } core.lifo_slot = Some(task); @@ -1028,3 +1063,12 @@ cfg_metrics! { } } } + +// `u32::abs_diff` is not available on Tokio's MSRV. +fn abs_diff(a: u32, b: u32) -> u32 { + if a > b { + a - b + } else { + b - a + } +} diff --git a/tokio/src/runtime/tests/loom_pool.rs b/tokio/src/runtime/tests/loom_pool.rs index e2c826f0795..fb42e1eb40b 100644 --- a/tokio/src/runtime/tests/loom_pool.rs +++ b/tokio/src/runtime/tests/loom_pool.rs @@ -347,6 +347,7 @@ mod group_d { fn mk_pool(num_threads: usize) -> Runtime { runtime::Builder::new_multi_thread() .worker_threads(num_threads) + // Set the intervals to avoid tuning logic .event_interval(2) .build() .unwrap() diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs index 58c17ad65c2..eb85807a6b0 100644 --- a/tokio/src/runtime/tests/loom_queue.rs +++ b/tokio/src/runtime/tests/loom_queue.rs @@ -1,12 +1,11 @@ -use crate::runtime::scheduler::multi_thread::queue; +use crate::runtime::scheduler::multi_thread::{queue, Stats}; use crate::runtime::task::Inject; use crate::runtime::tests::NoopSchedule; -use crate::runtime::MetricsBatch; use loom::thread; -fn metrics_batch() -> MetricsBatch { - MetricsBatch::new(&crate::runtime::WorkerMetrics::new()) +fn new_stats() -> Stats { + Stats::new(&crate::runtime::WorkerMetrics::new()) } #[test] @@ -14,15 +13,15 @@ fn basic() { loom::model(|| { let (steal, mut local) = queue::local(); let inject = Inject::new(); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let th = 
thread::spawn(move || { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..3 { - if steal.steal_into(&mut local, &mut metrics).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -39,7 +38,7 @@ fn basic() { for _ in 0..2 { for _ in 0..2 { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } if local.pop().is_some() { @@ -48,7 +47,7 @@ fn basic() { // Push another task let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); while local.pop().is_some() { n += 1; @@ -70,14 +69,14 @@ fn steal_overflow() { loom::model(|| { let (steal, mut local) = queue::local(); let inject = Inject::new(); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let th = thread::spawn(move || { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (_, mut local) = queue::local(); let mut n = 0; - if steal.steal_into(&mut local, &mut metrics).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -92,7 +91,7 @@ fn steal_overflow() { // push a task, pop a task let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); if local.pop().is_some() { n += 1; @@ -100,7 +99,7 @@ fn steal_overflow() { for _ in 0..6 { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } n += th.join().unwrap(); @@ -122,10 +121,10 @@ fn multi_stealer() { const NUM_TASKS: usize = 5; fn steal_tasks(steal: queue::Steal) -> usize { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (_, mut local) = queue::local(); - 
if steal.steal_into(&mut local, &mut metrics).is_none() { + if steal.steal_into(&mut local, &mut stats).is_none() { return 0; } @@ -141,12 +140,12 @@ fn multi_stealer() { loom::model(|| { let (steal, mut local) = queue::local(); let inject = Inject::new(); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); // Push work for _ in 0..NUM_TASKS { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } let th1 = { @@ -176,7 +175,7 @@ fn multi_stealer() { #[test] fn chained_steal() { loom::model(|| { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (s1, mut l1) = queue::local(); let (s2, mut l2) = queue::local(); let inject = Inject::new(); @@ -184,17 +183,17 @@ fn chained_steal() { // Load up some tasks for _ in 0..4 { let (task, _) = super::unowned(async {}); - l1.push_back_or_overflow(task, &inject, &mut metrics); + l1.push_back_or_overflow(task, &inject, &mut stats); let (task, _) = super::unowned(async {}); - l2.push_back_or_overflow(task, &inject, &mut metrics); + l2.push_back_or_overflow(task, &inject, &mut stats); } // Spawn a task to steal from **our** queue let th = thread::spawn(move || { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (_, mut local) = queue::local(); - s1.steal_into(&mut local, &mut metrics); + s1.steal_into(&mut local, &mut stats); while local.pop().is_some() {} }); @@ -202,7 +201,7 @@ fn chained_steal() { // Drain our tasks, then attempt to steal while l1.pop().is_some() {} - s2.steal_into(&mut l1, &mut metrics); + s2.steal_into(&mut l1, &mut stats); th.join().unwrap(); diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 09f249e9ea4..2e42f1bbb9e 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -1,18 +1,17 @@ -use crate::runtime::scheduler::multi_thread::queue; +use 
crate::runtime::scheduler::multi_thread::{queue, Stats}; use crate::runtime::task::{self, Inject, Schedule, Task}; -use crate::runtime::MetricsBatch; use std::thread; use std::time::Duration; #[allow(unused)] macro_rules! assert_metrics { - ($metrics:ident, $field:ident == $v:expr) => {{ + ($stats:ident, $field:ident == $v:expr) => {{ use crate::runtime::WorkerMetrics; use std::sync::atomic::Ordering::Relaxed; let worker = WorkerMetrics::new(); - $metrics.submit(&worker); + $stats.submit(&worker); let expect = $v; let actual = worker.$field.load(Relaxed); @@ -21,24 +20,24 @@ macro_rules! assert_metrics { }}; } -fn metrics_batch() -> MetricsBatch { +fn new_stats() -> Stats { use crate::runtime::WorkerMetrics; - MetricsBatch::new(&WorkerMetrics::new()) + Stats::new(&WorkerMetrics::new()) } #[test] fn fits_256_one_at_a_time() { let (_, mut local) = queue::local(); let inject = Inject::new(); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); for _ in 0..256 { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } cfg_metrics! { - assert_metrics!(metrics, overflow_count == 0); + assert_metrics!(stats, overflow_count == 0); } assert!(inject.pop().is_none()); @@ -88,15 +87,15 @@ fn fits_256_all_in_chunks() { fn overflow() { let (_, mut local) = queue::local(); let inject = Inject::new(); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); for _ in 0..257 { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } cfg_metrics! 
{ - assert_metrics!(metrics, overflow_count == 1); + assert_metrics!(stats, overflow_count == 1); } let mut n = 0; @@ -114,7 +113,7 @@ fn overflow() { #[test] fn steal_batch() { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (steal1, mut local1) = queue::local(); let (_, mut local2) = queue::local(); @@ -122,13 +121,13 @@ fn steal_batch() { for _ in 0..4 { let (task, _) = super::unowned(async {}); - local1.push_back_or_overflow(task, &inject, &mut metrics); + local1.push_back_or_overflow(task, &inject, &mut stats); } - assert!(steal1.steal_into(&mut local2, &mut metrics).is_some()); + assert!(steal1.steal_into(&mut local2, &mut stats).is_some()); cfg_metrics! { - assert_metrics!(metrics, steal_count == 2); + assert_metrics!(stats, steal_count == 2); } for _ in 0..1 { @@ -160,19 +159,19 @@ fn stress1() { const NUM_PUSH: usize = normal_or_miri(500, 10); const NUM_POP: usize = normal_or_miri(250, 10); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); for _ in 0..NUM_ITER { let (steal, mut local) = queue::local(); let inject = Inject::new(); let th = thread::spawn(move || { - let mut metrics = metrics_batch(); + let mut stats = new_stats(); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..NUM_STEAL { - if steal.steal_into(&mut local, &mut metrics).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -184,7 +183,7 @@ fn stress1() { } cfg_metrics! 
{ - assert_metrics!(metrics, steal_count == n as _); + assert_metrics!(stats, steal_count == n as _); } n @@ -195,7 +194,7 @@ fn stress1() { for _ in 0..NUM_LOCAL { for _ in 0..NUM_PUSH { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); } for _ in 0..NUM_POP { @@ -223,14 +222,14 @@ fn stress2() { const NUM_TASKS: usize = normal_or_miri(1_000_000, 50); const NUM_STEAL: usize = normal_or_miri(1_000, 10); - let mut metrics = metrics_batch(); + let mut stats = new_stats(); for _ in 0..NUM_ITER { let (steal, mut local) = queue::local(); let inject = Inject::new(); let th = thread::spawn(move || { - let mut stats = metrics_batch(); + let mut stats = new_stats(); let (_, mut local) = queue::local(); let mut n = 0; @@ -253,7 +252,7 @@ fn stress2() { for i in 0..NUM_TASKS { let (task, _) = super::unowned(async {}); - local.push_back_or_overflow(task, &inject, &mut metrics); + local.push_back_or_overflow(task, &inject, &mut stats); if i % 128 == 0 && local.pop().is_some() { num_pop += 1; diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index feb2f9f8cca..85ecdf4271d 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -35,6 +35,7 @@ fn single_thread() { } #[test] +#[cfg(not(tokio_cross))] fn many_oneshot_futures() { // used for notifying the main thread const NUM: usize = 1_000; @@ -588,6 +589,133 @@ async fn test_block_in_place4() { tokio::task::block_in_place(|| {}); } +// Testing the tuning logic is tricky as it is inherently timing based, and more +// of a heuristic than an exact behavior. This test checks that the interval +// changes over time based on load factors. There are no assertions, completion +// is sufficient. If there is a regression, this test will hang. In theory, we +// could add limits, but that would be likely to fail on CI. 
+#[test] +#[cfg(not(tokio_cross))] +fn test_tuning() { + use std::sync::atomic::AtomicBool; + use std::time::Duration; + + let rt = runtime::Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + + fn iter(flag: Arc, counter: Arc, stall: bool) { + if flag.load(Relaxed) { + if stall { + std::thread::sleep(Duration::from_micros(5)); + } + + counter.fetch_add(1, Relaxed); + tokio::spawn(async move { iter(flag, counter, stall) }); + } + } + + let flag = Arc::new(AtomicBool::new(true)); + let counter = Arc::new(AtomicUsize::new(61)); + let interval = Arc::new(AtomicUsize::new(61)); + + { + let flag = flag.clone(); + let counter = counter.clone(); + rt.spawn(async move { iter(flag, counter, true) }); + } + + // Now, hammer the injection queue until the interval drops. + let mut i = 0; + let mut n = 0; + loop { + let curr = interval.load(Relaxed); + + if curr <= 8 { + n += 1; + } else { + n = 0; + } + + // Make sure we get a few good rounds. Jitter in the tuning could result + // in one "good" value without being representative of reaching a good + // state. 
+ if n == 3 { + break; + } + + let counter = counter.clone(); + let interval = interval.clone(); + + if i <= 5_000 { + i += 1; + rt.spawn(async move { + let prev = counter.swap(0, Relaxed); + interval.store(prev, Relaxed); + }); + std::thread::yield_now(); + } else { + std::thread::sleep(Duration::from_micros(500)); + } + } + + flag.store(false, Relaxed); + + let w = Arc::downgrade(&interval); + drop(interval); + + while w.strong_count() > 0 { + std::thread::sleep(Duration::from_micros(500)); + } + + // Now, run it again with a faster task + let flag = Arc::new(AtomicBool::new(true)); + // Set it high, we know it shouldn't ever really be this high + let counter = Arc::new(AtomicUsize::new(10_000)); + let interval = Arc::new(AtomicUsize::new(10_000)); + + { + let flag = flag.clone(); + let counter = counter.clone(); + rt.spawn(async move { iter(flag, counter, false) }); + } + + // Now, hammer the injection queue until the interval reaches the expected range. + let mut i = 0; + let mut n = 0; + loop { + let curr = interval.load(Relaxed); + + if curr <= 1_000 && curr > 32 { + n += 1; + } else { + n = 0; + } + + if n == 3 { + break; + } + + let counter = counter.clone(); + let interval = interval.clone(); + + if i <= 5_000 { + i += 1; + rt.spawn(async move { + let prev = counter.swap(0, Relaxed); + interval.store(prev, Relaxed); + }); + + std::thread::yield_now(); + } else { + std::thread::sleep(Duration::from_micros(500)); + } + } + + flag.store(false, Relaxed); +} + fn rt() -> runtime::Runtime { runtime::Runtime::new().unwrap() }