Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: Add a metric that counts currently active tasks #5628

19 changes: 19 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,25 @@ impl RuntimeMetrics {
self.handle.inner.num_blocking_threads()
}

/// Returns the number of active tasks in the runtime.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.active_tasks_count();
/// println!("Runtime has {} active tasks", n);
/// }
/// ```
pub fn active_tasks_count(&self) -> usize {
self.handle.inner.active_tasks_count()
}

/// Returns the number of idle threads, which have spawned by the runtime
/// for `spawn_blocking` calls.
///
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,10 @@ cfg_metrics! {
pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ cfg_rt! {
}
}

pub(crate) fn active_tasks_count(&self) -> usize {
match self {
Handle::CurrentThread(handle) => handle.active_tasks_count(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(handle) => handle.active_tasks_count(),
}
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match self {
Handle::CurrentThread(handle) => handle.scheduler_metrics(),
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ cfg_metrics! {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ impl<S: 'static> OwnedTasks<S> {
}
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.inner.lock().list.count()
}

pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
let task_id = task.header().get_owner_id();
if task_id == 0 {
Expand Down
35 changes: 35 additions & 0 deletions tokio/src/util/linked_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use core::fmt;
use core::marker::{PhantomData, PhantomPinned};
use core::mem::ManuallyDrop;
use core::ptr::{self, NonNull};
use std::sync::atomic::{AtomicUsize, Ordering};

/// An intrusive linked list.
///
Expand All @@ -25,6 +26,9 @@ pub(crate) struct LinkedList<L, T> {

/// Node type marker.
_marker: PhantomData<*const L>,

/// Tracks the length of the linked list
_count: AtomicUsize,
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
}

unsafe impl<L: Link> Send for LinkedList<L, L::Target> where L::Target: Send {}
Expand Down Expand Up @@ -117,6 +121,7 @@ impl<L, T> LinkedList<L, T> {
head: None,
tail: None,
_marker: PhantomData,
_count: AtomicUsize::new(0),
}
}
}
Expand All @@ -142,6 +147,7 @@ impl<L: Link> LinkedList<L, L::Target> {
self.tail = Some(ptr);
}
}
self._count.fetch_add(1, Ordering::Relaxed);
}

/// Removes the last element from a list and returns it, or None if it is
Expand All @@ -160,6 +166,8 @@ impl<L: Link> LinkedList<L, L::Target> {
L::pointers(last).as_mut().set_prev(None);
L::pointers(last).as_mut().set_next(None);

self._count.fetch_sub(1, Ordering::Relaxed);

Some(L::from_raw(last))
}
}
Expand All @@ -174,6 +182,11 @@ impl<L: Link> LinkedList<L, L::Target> {
true
}

// Returns the number of elements contained in the LinkedList
pub(crate) fn count(&self) -> usize {
self._count.load(Ordering::Relaxed)
}

/// Removes the specified node from the list
///
/// # Safety
Expand Down Expand Up @@ -215,6 +228,8 @@ impl<L: Link> LinkedList<L, L::Target> {
L::pointers(node).as_mut().set_next(None);
L::pointers(node).as_mut().set_prev(None);

self._count.fetch_sub(1, Ordering::Relaxed);

Some(L::from_raw(node))
}
}
Expand Down Expand Up @@ -719,6 +734,26 @@ pub(crate) mod tests {
}
}

#[test]
fn count() {
let mut list = LinkedList::<&Entry, <&Entry as Link>::Target>::new();
assert_eq!(0, list.count());

let a = entry(5);
let b = entry(7);
list.push_front(a.as_ref());
list.push_front(b.as_ref());
assert_eq!(2, list.count());

list.pop_back();
assert_eq!(1, list.count());

unsafe {
list.remove(ptr(&b));
}
assert_eq!(0, list.count());
}

/// This is a fuzz test. You run it by entering `cargo fuzz run fuzz_linked_list` in CLI in `/tokio/` module.
#[cfg(fuzzing)]
pub fn fuzz_linked_list(ops: &[u8]) {
Expand Down
17 changes: 17 additions & 0 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,23 @@ fn blocking_queue_depth() {
assert_eq!(0, rt.metrics().blocking_queue_depth());
}

#[test]
fn active_tasks_count() {
let rt = current_thread();
let metrics = rt.metrics();
assert_eq!(0, metrics.active_tasks_count());
rt.spawn(async move {
assert_eq!(1, metrics.active_tasks_count());
});

let rt = threaded();
let metrics = rt.metrics();
assert_eq!(0, metrics.active_tasks_count());
rt.spawn(async move {
assert_eq!(1, metrics.active_tasks_count());
});
}

#[test]
fn remote_schedule_count() {
use std::thread;
Expand Down