Skip to content

Commit

Permalink
rt: unstable EWMA poll time metric (#5927)
Browse files Browse the repository at this point in the history
Because the runtime uses this value as a tuning heuristic, it can be
useful to get its value. This patch exposes the value as an unstable
metric.
  • Loading branch information
carllerche authored Aug 10, 2023
1 parent dd23f08 commit 6cb106c
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 8 deletions.
3 changes: 2 additions & 1 deletion tokio/src/runtime/metrics/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ impl MetricsBatch {
}
}

pub(crate) fn submit(&mut self, worker: &WorkerMetrics) {
pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
worker.mean_poll_time.store(mean_poll_time, Relaxed);
worker.park_count.store(self.park_count, Relaxed);
worker.noop_count.store(self.noop_count, Relaxed);
worker.steal_count.store(self.steal_count, Relaxed);
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/metrics/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl MetricsBatch {
Self {}
}

pub(crate) fn submit(&mut self, _to: &WorkerMetrics) {}
pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {}
pub(crate) fn about_to_park(&mut self) {}
pub(crate) fn inc_local_schedule_count(&mut self) {}
pub(crate) fn start_processing_scheduled_tasks(&mut self) {}
Expand Down
41 changes: 41 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,47 @@ impl RuntimeMetrics {
.unwrap_or_default()
}

/// Returns the mean duration of task polls, in nanoseconds.
///
/// This is an exponentially weighted moving average. Currently, this metric
/// is only provided by the multi-threaded runtime.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.worker_mean_poll_time(0);
/// println!("worker 0 has a mean poll time of {:?}", n);
/// }
/// ```
#[track_caller]
pub fn worker_mean_poll_time(&self, worker: usize) -> Duration {
let nanos = self
.handle
.inner
.worker_metrics(worker)
.mean_poll_time
.load(Relaxed);
Duration::from_nanos(nanos)
}

/// Returns the number of tasks currently scheduled in the blocking
/// thread pool, spawned using `spawn_blocking`.
///
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/metrics/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub(crate) struct WorkerMetrics {
/// Number of tasks the worker polled.
pub(crate) poll_count: AtomicU64,

/// EWMA task poll time, in nanoseconds.
pub(crate) mean_poll_time: AtomicU64,

/// Amount of time the worker spent doing work vs. parking.
pub(crate) busy_duration_total: AtomicU64,

Expand Down Expand Up @@ -62,6 +65,7 @@ impl WorkerMetrics {
steal_count: AtomicU64::new(0),
steal_operations: AtomicU64::new(0),
poll_count: AtomicU64::new(0),
mean_poll_time: AtomicU64::new(0),
overflow_count: AtomicU64::new(0),
busy_duration_total: AtomicU64::new(0),
local_schedule_count: AtomicU64::new(0),
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ impl Core {
}

fn submit_metrics(&mut self, handle: &Handle) {
self.metrics.submit(&handle.shared.worker_metrics);
self.metrics.submit(&handle.shared.worker_metrics, 0);
}
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl Stats {
}

pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
self.batch.submit(to);
self.batch.submit(to, self.task_poll_time_ewma as u64);
}

pub(crate) fn about_to_park(&mut self) {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread_alt/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl Stats {
}

pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
self.batch.submit(to);
self.batch.submit(to, self.task_poll_time_ewma as u64);
}

pub(crate) fn about_to_park(&mut self) {
Expand Down
19 changes: 16 additions & 3 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,18 +215,25 @@ fn worker_steal_count() {
}

#[test]
fn worker_poll_count() {
fn worker_poll_count_and_time() {
const N: u64 = 5;

async fn task() {
// Sync sleep
std::thread::sleep(std::time::Duration::from_micros(10));
}

let rt = current_thread();
let metrics = rt.metrics();
rt.block_on(async {
for _ in 0..N {
tokio::spawn(async {}).await.unwrap();
tokio::spawn(task()).await.unwrap();
}
});
drop(rt);
assert_eq!(N, metrics.worker_poll_count(0));
// Not currently supported for current-thread runtime
assert_eq!(Duration::default(), metrics.worker_mean_poll_time(0));

// Does not populate the histogram
assert!(!metrics.poll_count_histogram_enabled());
Expand All @@ -238,7 +245,7 @@ fn worker_poll_count() {
let metrics = rt.metrics();
rt.block_on(async {
for _ in 0..N {
tokio::spawn(async {}).await.unwrap();
tokio::spawn(task()).await.unwrap();
}
});
drop(rt);
Expand All @@ -249,6 +256,12 @@ fn worker_poll_count() {

assert_eq!(N, n);

let n: Duration = (0..metrics.num_workers())
.map(|i| metrics.worker_mean_poll_time(i))
.sum();

assert!(n > Duration::default());

// Does not populate the histogram
assert!(!metrics.poll_count_histogram_enabled());
for n in 0..metrics.num_workers() {
Expand Down

0 comments on commit 6cb106c

Please sign in to comment.