From 6967ab5b4b7efd095e70d51c9f32d10d4da45259 Mon Sep 17 00:00:00 2001 From: Noah Kennedy Date: Tue, 28 Feb 2023 16:41:10 -0600 Subject: [PATCH 1/4] metrics: add a new metric for budget exhaustion yields This change adds a new runtime metric which counts the number of times that tasks exhaust their budgets and are forced to yield back to the runtime. This metric is useful for debugging performance issues in many types of applications. --- tokio/src/runtime/coop.rs | 4 ++++ tokio/src/runtime/metrics/runtime.rs | 15 +++++++++++++ tokio/src/runtime/metrics/scheduler.rs | 7 ++++++ tokio/tests/rt_metrics.rs | 31 ++++++++++++++++++++++++++ 4 files changed, 57 insertions(+) diff --git a/tokio/src/runtime/coop.rs b/tokio/src/runtime/coop.rs index 0ba137ab67a..19575f084b5 100644 --- a/tokio/src/runtime/coop.rs +++ b/tokio/src/runtime/coop.rs @@ -177,6 +177,10 @@ cfg_coop! { cell.set(budget); Poll::Ready(restore) } else { + if let Ok(handle) = context::try_current() { + handle.scheduler_metrics().inc_budget_forced_yield_count(); + } + cx.waker().wake_by_ref(); Poll::Pending } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index d29cb3d48ff..fe576bb0c6d 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -124,6 +124,21 @@ impl RuntimeMetrics { .load(Relaxed) } + /// Returns the number of times that tasks have been forced to yield back to the scheduler + /// after exhausting their task budgets. + /// + /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + pub fn budget_forced_yield_count(&self) -> u64 { + self.handle + .inner + .scheduler_metrics() + .budget_forced_yield_count + .load(Relaxed) + } + /// Returns the total number of times the given worker thread has parked. 
/// /// The worker park count starts at zero when the runtime is created and diff --git a/tokio/src/runtime/metrics/scheduler.rs b/tokio/src/runtime/metrics/scheduler.rs index d1ba3b64420..d9f8edfaabc 100644 --- a/tokio/src/runtime/metrics/scheduler.rs +++ b/tokio/src/runtime/metrics/scheduler.rs @@ -11,12 +11,14 @@ use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed}; pub(crate) struct SchedulerMetrics { /// Number of tasks that are scheduled from outside the runtime. pub(super) remote_schedule_count: AtomicU64, + pub(super) budget_forced_yield_count: AtomicU64, } impl SchedulerMetrics { pub(crate) fn new() -> SchedulerMetrics { SchedulerMetrics { remote_schedule_count: AtomicU64::new(0), + budget_forced_yield_count: AtomicU64::new(0), } } @@ -24,4 +26,9 @@ impl SchedulerMetrics { pub(crate) fn inc_remote_schedule_count(&self) { self.remote_schedule_count.fetch_add(1, Relaxed); } + + /// Increment the number of tasks forced to yield due to budget exhaustion + pub(crate) fn inc_budget_forced_yield_count(&self) { + self.budget_forced_yield_count.fetch_add(1, Relaxed); + } } diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index fdb2fb5f551..f41568a7f78 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -1,9 +1,13 @@ #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", tokio_unstable, not(tokio_wasi)))] +use std::future::Future; use std::sync::{Arc, Mutex}; +use std::task::Poll; +use tokio::macros::support::poll_fn; use tokio::runtime::Runtime; +use tokio::task::consume_budget; use tokio::time::{self, Duration}; #[test] @@ -433,6 +437,33 @@ fn worker_local_queue_depth() { }); } +#[test] +fn budget_exhaustion_yield() { + let rt = current_thread(); + let metrics = rt.metrics(); + + assert_eq!(0, metrics.budget_forced_yield_count()); + + let mut did_yield = false; + + // block on a task which consumes budget until it yields + rt.block_on(poll_fn(|cx| loop { + if did_yield { + return Poll::Ready(()); + } + + let fut = 
consume_budget(); + tokio::pin!(fut); + + if fut.poll(cx).is_pending() { + did_yield = true; + return Poll::Pending; + } + })); + + assert_eq!(1, rt.metrics().budget_forced_yield_count()); +} + #[cfg(any(target_os = "linux", target_os = "macos"))] #[test] fn io_driver_fd_count() { From 0d5c4c4158103930bb0d4b939504fb2b47d5e1ab Mon Sep 17 00:00:00 2001 From: Noah Kennedy Date: Tue, 28 Feb 2023 17:39:56 -0600 Subject: [PATCH 2/4] coop: fix compilation when `cfg_metrics` is not enabled --- tokio/src/runtime/coop.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/coop.rs b/tokio/src/runtime/coop.rs index 19575f084b5..e3c73f6f3f0 100644 --- a/tokio/src/runtime/coop.rs +++ b/tokio/src/runtime/coop.rs @@ -177,9 +177,7 @@ cfg_coop! { cell.set(budget); Poll::Ready(restore) } else { - if let Ok(handle) = context::try_current() { - handle.scheduler_metrics().inc_budget_forced_yield_count(); - } + inc_budget_forced_yield_count(); cx.waker().wake_by_ref(); Poll::Pending @@ -187,6 +185,20 @@ cfg_coop! { }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained())))) } + cfg_metrics! { + #[inline(always)] + fn inc_budget_forced_yield_count() { + if let Ok(handle) = context::try_current() { + handle.scheduler_metrics().inc_budget_forced_yield_count(); + } + } + } + + cfg_not_metrics! { + #[inline(always)] + fn inc_budget_forced_yield_count() {} + } + impl Budget { /// Decrements the budget. Returns `true` if successful. Decrementing fails /// when there is not enough remaining budget. From 57ef8ae236cea34ed57c1f3355f979a0dc0b86e5 Mon Sep 17 00:00:00 2001 From: Noah Kennedy Date: Tue, 28 Feb 2023 17:46:11 -0600 Subject: [PATCH 3/4] coop: fix compilation when `cfg_rt` is not enabled --- tokio/src/runtime/coop.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/tokio/src/runtime/coop.rs b/tokio/src/runtime/coop.rs index e3c73f6f3f0..d2c8a7c77d7 100644 --- a/tokio/src/runtime/coop.rs +++ b/tokio/src/runtime/coop.rs @@ -185,16 +185,23 @@ cfg_coop! 
{ }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained())))) } - cfg_metrics! { - #[inline(always)] - fn inc_budget_forced_yield_count() { - if let Ok(handle) = context::try_current() { - handle.scheduler_metrics().inc_budget_forced_yield_count(); + cfg_rt! { + cfg_metrics! { + #[inline(always)] + fn inc_budget_forced_yield_count() { + if let Ok(handle) = context::try_current() { + handle.scheduler_metrics().inc_budget_forced_yield_count(); + } } } + + cfg_not_metrics! { + #[inline(always)] + fn inc_budget_forced_yield_count() {} + } } - cfg_not_metrics! { + cfg_not_rt! { #[inline(always)] fn inc_budget_forced_yield_count() {} } From 94ef05b9bb42b33f18c9bfbe323a59489fd092f6 Mon Sep 17 00:00:00 2001 From: Noah Kennedy Date: Wed, 1 Mar 2023 13:52:53 -0600 Subject: [PATCH 4/4] move the increment to when budget hits zero --- tokio/src/runtime/coop.rs | 28 ++++++++++++++++++------ tokio/tests/rt_metrics.rs | 45 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 7 deletions(-) diff --git a/tokio/src/runtime/coop.rs b/tokio/src/runtime/coop.rs index d2c8a7c77d7..03839d208e2 100644 --- a/tokio/src/runtime/coop.rs +++ b/tokio/src/runtime/coop.rs @@ -36,6 +36,11 @@ use crate::runtime::context; #[derive(Debug, Copy, Clone)] pub(crate) struct Budget(Option); +pub(crate) struct BudgetDecrement { + success: bool, + hit_zero: bool, +} + impl Budget { /// Budget assigned to a task on each poll. /// @@ -172,13 +177,19 @@ cfg_coop! { context::budget(|cell| { let mut budget = cell.get(); - if budget.decrement() { + let decrement = budget.decrement(); + + if decrement.success { let restore = RestoreOnPending(Cell::new(cell.get())); cell.set(budget); + + // avoid double counting + if decrement.hit_zero { + inc_budget_forced_yield_count(); + } + Poll::Ready(restore) } else { - inc_budget_forced_yield_count(); - cx.waker().wake_by_ref(); Poll::Pending } @@ -209,16 +220,19 @@ cfg_coop! { impl Budget { /// Decrements the budget. 
Returns `true` if successful. Decrementing fails /// when there is not enough remaining budget. - fn decrement(&mut self) -> bool { + fn decrement(&mut self) -> BudgetDecrement { if let Some(num) = &mut self.0 { if *num > 0 { *num -= 1; - true + + let hit_zero = *num == 0; + + BudgetDecrement { success: true, hit_zero } } else { - false + BudgetDecrement { success: false, hit_zero: false } } } else { - true + BudgetDecrement { success: true, hit_zero: false } } } diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index f41568a7f78..d238808c8ed 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -464,6 +464,51 @@ fn budget_exhaustion_yield() { assert_eq!(1, rt.metrics().budget_forced_yield_count()); } +#[test] +fn budget_exhaustion_yield_with_joins() { + let rt = current_thread(); + let metrics = rt.metrics(); + + assert_eq!(0, metrics.budget_forced_yield_count()); + + let mut did_yield_1 = false; + let mut did_yield_2 = false; + + // block on a task which consumes budget until it yields + rt.block_on(async { + tokio::join!( + poll_fn(|cx| loop { + if did_yield_1 { + return Poll::Ready(()); + } + + let fut = consume_budget(); + tokio::pin!(fut); + + if fut.poll(cx).is_pending() { + did_yield_1 = true; + return Poll::Pending; + } + }), + poll_fn(|cx| loop { + if did_yield_2 { + return Poll::Ready(()); + } + + let fut = consume_budget(); + tokio::pin!(fut); + + if fut.poll(cx).is_pending() { + did_yield_2 = true; + return Poll::Pending; + } + }) + ) + }); + + assert_eq!(1, rt.metrics().budget_forced_yield_count()); +} + #[cfg(any(target_os = "linux", target_os = "macos"))] #[test] fn io_driver_fd_count() {