From c7d35128b66cb76709e0139d9ff7deabe8b5e70b Mon Sep 17 00:00:00 2001 From: Matilda Smeds Date: Sun, 16 Apr 2023 14:46:10 +0200 Subject: [PATCH 1/8] runtime: Add a metric that counts currently active tasks Adds a new metric for the number of active tasks for the Runtime. We count the elements on the OwnedTasks LinkedList, and access that through RuntimeMetrics. Closes: #5400 --- tokio/src/runtime/metrics/runtime.rs | 19 +++++++++++++++++++ tokio/src/runtime/scheduler/current_thread.rs | 4 ++++ tokio/src/runtime/scheduler/mod.rs | 8 ++++++++ .../runtime/scheduler/multi_thread/handle.rs | 4 ++++ tokio/src/runtime/task/list.rs | 4 ++++ tokio/src/util/linked_list.rs | 15 +++++++++++++++ tokio/tests/rt_metrics.rs | 17 +++++++++++++++++ 7 files changed, 71 insertions(+) diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 8e52fead1ef..014995c7efa 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -68,6 +68,25 @@ impl RuntimeMetrics { self.handle.inner.num_blocking_threads() } + /// Returns the number of active tasks in the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.active_tasks_count(); + /// println!("Runtime has {} active tasks", n); + /// } + /// ``` + pub fn active_tasks_count(&self) -> usize { + self.handle.inner.active_tasks_count() + } + /// Returns the number of idle threads, which have spawned by the runtime /// for `spawn_blocking` calls. /// diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index 375e47c412b..64e1265c9eb 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -430,6 +430,10 @@ cfg_metrics! { pub(crate) fn blocking_queue_depth(&self) -> usize { self.blocking_spawner.queue_depth() } + + pub(crate) fn active_tasks_count(&self) -> usize { + self.shared.owned.active_tasks_count() + } } } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index f45d8a80ba4..0ea207e1e93 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -135,6 +135,14 @@ cfg_rt! { } } + pub(crate) fn active_tasks_count(&self) -> usize { + match self { + Handle::CurrentThread(handle) => handle.active_tasks_count(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.active_tasks_count(), + } + } + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { match self { Handle::CurrentThread(handle) => handle.scheduler_metrics(), diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index 69a4620c127..77baaeb06d4 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -69,6 +69,10 @@ cfg_metrics! { self.blocking_spawner.num_idle_threads() } + pub(crate) fn active_tasks_count(&self) -> usize { + self.shared.owned.active_tasks_count() + } + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { &self.shared.scheduler_metrics } diff --git a/tokio/src/runtime/task/list.rs b/tokio/src/runtime/task/list.rs index 159c13e16e4..d29a35f9302 100644 --- a/tokio/src/runtime/task/list.rs +++ b/tokio/src/runtime/task/list.rs @@ -153,6 +153,10 @@ impl OwnedTasks { } } + pub(crate) fn active_tasks_count(&self) -> usize { + self.inner.lock().list.count() + } + pub(crate) fn remove(&self, task: &Task) -> Option> { let task_id = task.header().get_owner_id(); if task_id == 0 { diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index 6bf26818423..f42ea010510 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -174,6 +174,21 @@ impl LinkedList { true } + // Counts the elements of the LinkedList + pub(crate) fn count(&self) -> usize { + let mut count = 0; + let mut current = self.head; + + while let Some(node) = current { + unsafe { + count += 1; + current = L::pointers(node).as_ref().get_next() + } + } + + count + } + /// Removes the specified node from the list /// /// # Safety diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index d238808c8ed..76aa0587f80 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -81,6 +81,23 @@ fn blocking_queue_depth() { assert_eq!(0, rt.metrics().blocking_queue_depth()); } +#[test] +fn active_tasks_count() { + let rt = current_thread(); + let metrics = rt.metrics(); + assert_eq!(0, metrics.active_tasks_count()); + rt.spawn(async move { + assert_eq!(1, metrics.active_tasks_count()); + }); + + let rt = threaded(); + let metrics = rt.metrics(); + assert_eq!(0, metrics.active_tasks_count()); + rt.spawn(async move { + assert_eq!(1, metrics.active_tasks_count()); + }); +} + #[test] fn remote_schedule_count() { use std::thread; From 32343c277a09e964a509336f4f0e3818b12fff07 Mon Sep 17 00:00:00 2001 From: Matilda Smeds Date: Tue, 18 Apr 2023 19:31:45 +0200 Subject: [PATCH 2/8] Use atomic counter to track LinkedList length --- tokio/src/util/linked_list.rs | 44 +++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index f42ea010510..d817d54a14e 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -11,6 +11,7 @@ use core::fmt; use core::marker::{PhantomData, PhantomPinned}; use core::mem::ManuallyDrop; use core::ptr::{self, NonNull}; +use std::sync::atomic::{AtomicUsize, Ordering}; /// An intrusive linked list. /// @@ -25,6 +26,9 @@ pub(crate) struct LinkedList { /// Node type marker. _marker: PhantomData<*const L>, + + /// Tracks the length of the linked list + _count: AtomicUsize, } unsafe impl Send for LinkedList where L::Target: Send {} @@ -117,6 +121,7 @@ impl LinkedList { head: None, tail: None, _marker: PhantomData, + _count: AtomicUsize::new(0), } } } @@ -142,6 +147,7 @@ impl LinkedList { self.tail = Some(ptr); } } + self._count.fetch_add(1, Ordering::Relaxed); } /// Removes the last element from a list and returns it, or None if it is @@ -160,6 +166,8 @@ impl LinkedList { L::pointers(last).as_mut().set_prev(None); L::pointers(last).as_mut().set_next(None); + self._count.fetch_sub(1, Ordering::Relaxed); + Some(L::from_raw(last)) } } @@ -174,19 +182,9 @@ impl LinkedList { true } - // Counts the elements of the LinkedList + // Returns the number of elements contained in the LinkedList pub(crate) fn count(&self) -> usize { - let mut count = 0; - let mut current = self.head; - - while let Some(node) = current { - unsafe { - count += 1; - current = L::pointers(node).as_ref().get_next() - } - } - - count + self._count.load(Ordering::Relaxed) } /// Removes the specified node from the list @@ -230,6 +228,8 @@ impl LinkedList { L::pointers(node).as_mut().set_next(None); L::pointers(node).as_mut().set_prev(None); + self._count.fetch_sub(1, Ordering::Relaxed); + Some(L::from_raw(node)) } } @@ -734,6 +734,26 @@ pub(crate) mod tests { } } + #[test] + fn count() { + let mut list = LinkedList::<&Entry, <&Entry as Link>::Target>::new(); + assert_eq!(0, list.count()); + + let a = entry(5); + let b = entry(7); + list.push_front(a.as_ref()); + list.push_front(b.as_ref()); + assert_eq!(2, list.count()); + + list.pop_back(); + assert_eq!(1, list.count()); + + unsafe { + list.remove(ptr(&b)); + } + assert_eq!(0, list.count()); + } + /// This is a fuzz test. You run it by entering `cargo fuzz run fuzz_linked_list` in CLI in `/tokio/` module. #[cfg(fuzzing)] pub fn fuzz_linked_list(ops: &[u8]) { From e757921cd3cd7057a87ada8e605336920341368c Mon Sep 17 00:00:00 2001 From: Matilda Smeds Date: Thu, 20 Apr 2023 21:41:11 +0200 Subject: [PATCH 3/8] Add CountedLinkedList type that wraps LinkedList --- tokio/src/runtime/task/list.rs | 14 +++++--- tokio/src/util/linked_list.rs | 63 ++++++++++++++++++++++++++-------- 2 files changed, 57 insertions(+), 20 deletions(-) diff --git a/tokio/src/runtime/task/list.rs b/tokio/src/runtime/task/list.rs index d29a35f9302..a88546578ff 100644 --- a/tokio/src/runtime/task/list.rs +++ b/tokio/src/runtime/task/list.rs @@ -10,7 +10,7 @@ use crate::future::Future; use crate::loom::cell::UnsafeCell; use crate::loom::sync::Mutex; use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task}; -use crate::util::linked_list::{Link, LinkedList}; +use crate::util::linked_list::{CountedLinkedList, Link, LinkedList}; use std::marker::PhantomData; @@ -54,9 +54,13 @@ cfg_not_has_atomic_u64! { } pub(crate) struct OwnedTasks { - inner: Mutex>, + inner: Mutex>, id: u64, } +struct CountedOwnedTasksInner { + list: CountedLinkedList, as Link>::Target>, + closed: bool, +} pub(crate) struct LocalOwnedTasks { inner: UnsafeCell>, id: u64, @@ -70,8 +74,8 @@ struct OwnedTasksInner { impl OwnedTasks { pub(crate) fn new() -> Self { Self { - inner: Mutex::new(OwnedTasksInner { - list: LinkedList::new(), + inner: Mutex::new(CountedOwnedTasksInner { + list: CountedLinkedList::new(), closed: false, }), id: get_next_id(), @@ -300,4 +304,4 @@ mod tests { last_id = next_id; } } -} +} \ No newline at end of file diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index d817d54a14e..b45c5226455 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -26,9 +26,6 @@ pub(crate) struct LinkedList { /// Node type marker. _marker: PhantomData<*const L>, - - /// Tracks the length of the linked list - _count: AtomicUsize, } unsafe impl Send for LinkedList where L::Target: Send {} @@ -121,7 +118,6 @@ impl LinkedList { head: None, tail: None, _marker: PhantomData, - _count: AtomicUsize::new(0), } } } @@ -147,7 +143,6 @@ impl LinkedList { self.tail = Some(ptr); } } - self._count.fetch_add(1, Ordering::Relaxed); } /// Removes the last element from a list and returns it, or None if it is @@ -166,8 +161,6 @@ impl LinkedList { L::pointers(last).as_mut().set_prev(None); L::pointers(last).as_mut().set_next(None); - self._count.fetch_sub(1, Ordering::Relaxed); - Some(L::from_raw(last)) } } @@ -182,11 +175,6 @@ impl LinkedList { true } - // Returns the number of elements contained in the LinkedList - pub(crate) fn count(&self) -> usize { - self._count.load(Ordering::Relaxed) - } - /// Removes the specified node from the list /// /// # Safety @@ -228,8 +216,6 @@ impl LinkedList { L::pointers(node).as_mut().set_next(None); L::pointers(node).as_mut().set_prev(None); - self._count.fetch_sub(1, Ordering::Relaxed); - Some(L::from_raw(node)) } } @@ -243,6 +229,53 @@ impl fmt::Debug for LinkedList { } } +// ===== impl CountedLinkedList ==== + +// Delegates operations to the base LinkedList implementation, and adds a counter to the elements +// in the list. +pub(crate) struct CountedLinkedList { + _list: LinkedList, + _count: AtomicUsize, +} + +impl CountedLinkedList { + pub(crate) const fn new() -> CountedLinkedList { + CountedLinkedList { + _list: LinkedList::new(), + _count: AtomicUsize::new(0), + } + } + + pub(crate) fn push_front(&mut self, val: L::Handle) { + self._list.push_front(val); + self._count.fetch_add(1, Ordering::Relaxed); + } + + pub(crate) fn pop_back(&mut self) -> Option { + let val = self._list.pop_back(); + if val.is_some() { + self._count.fetch_sub(1, Ordering::Relaxed); + } + val + } + + pub(crate) fn is_empty(&self) -> bool { + self._list.is_empty() + } + + pub(crate) unsafe fn remove(&mut self, node: NonNull) -> Option { + let val = self._list.remove(node); + if val.is_some() { + self._count.fetch_sub(1, Ordering::Relaxed); + } + val + } + + pub(crate) fn count(&self) -> usize { + self._count.load(Ordering::Relaxed) + } +} + #[cfg(any( feature = "fs", feature = "rt", @@ -736,7 +769,7 @@ pub(crate) mod tests { #[test] fn count() { - let mut list = LinkedList::<&Entry, <&Entry as Link>::Target>::new(); + let mut list = CountedLinkedList::<&Entry, <&Entry as Link>::Target>::new(); assert_eq!(0, list.count()); let a = entry(5); From 11109619026d4660e08d247f8ef5d20d7ca4278f Mon Sep 17 00:00:00 2001 From: Matilda Smeds Date: Mon, 24 Apr 2023 19:13:37 +0200 Subject: [PATCH 4/8] Update tokio/src/util/linked_list.rs Co-authored-by: Alice Ryhl --- tokio/src/util/linked_list.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index b45c5226455..088d3442826 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -234,8 +234,8 @@ impl fmt::Debug for LinkedList { // Delegates operations to the base LinkedList implementation, and adds a counter to the elements // in the list. pub(crate) struct CountedLinkedList { - _list: LinkedList, - _count: AtomicUsize, + list: LinkedList, + count: AtomicUsize, } impl CountedLinkedList { From 415737176da37d9d2253089fe1a9e157acaf1f37 Mon Sep 17 00:00:00 2001 From: Matilda Smeds Date: Mon, 24 Apr 2023 19:28:53 +0200 Subject: [PATCH 5/8] rustfmt --- tokio/src/runtime/task/list.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/task/list.rs b/tokio/src/runtime/task/list.rs index a88546578ff..c520cb04ed3 100644 --- a/tokio/src/runtime/task/list.rs +++ b/tokio/src/runtime/task/list.rs @@ -304,4 +304,4 @@ mod tests { last_id = next_id; } } -} \ No newline at end of file +} From e7b00b147064bface89dacec87b2608b4b731be3 Mon Sep 17 00:00:00 2001 From: Matilda Smeds Date: Mon, 24 Apr 2023 19:32:38 +0200 Subject: [PATCH 6/8] Remove underscores from variable names --- tokio/src/util/linked_list.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index 088d3442826..035728ea910 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -241,38 +241,38 @@ pub(crate) struct CountedLinkedList { impl CountedLinkedList { pub(crate) const fn new() -> CountedLinkedList { CountedLinkedList { - _list: LinkedList::new(), - _count: AtomicUsize::new(0), + list: LinkedList::new(), + count: AtomicUsize::new(0), } } pub(crate) fn push_front(&mut self, val: L::Handle) { - self._list.push_front(val); - self._count.fetch_add(1, Ordering::Relaxed); + self.list.push_front(val); + self.count.fetch_add(1, Ordering::Relaxed); } pub(crate) fn pop_back(&mut self) -> Option { - let val = self._list.pop_back(); + let val = self.list.pop_back(); if val.is_some() { - self._count.fetch_sub(1, Ordering::Relaxed); + self.count.fetch_sub(1, Ordering::Relaxed); } val } pub(crate) fn is_empty(&self) -> bool { - self._list.is_empty() + self.list.is_empty() } pub(crate) unsafe fn remove(&mut self, node: NonNull) -> Option { - let val = self._list.remove(node); + let val = self.list.remove(node); if val.is_some() { - self._count.fetch_sub(1, Ordering::Relaxed); + self.count.fetch_sub(1, Ordering::Relaxed); } val } pub(crate) fn count(&self) -> usize { - self._count.load(Ordering::Relaxed) + self.count.load(Ordering::Relaxed) } } From 9292ed20d3b46e492d374bfe1f9ea68f576f3145 Mon Sep 17 00:00:00 2001 From: Matilda Smeds Date: Mon, 24 Apr 2023 22:13:40 +0200 Subject: [PATCH 7/8] remove const from fn def --- tokio/src/util/linked_list.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index 035728ea910..49ac74059de 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -239,7 +239,7 @@ pub(crate) struct CountedLinkedList { } impl CountedLinkedList { - pub(crate) const fn new() -> CountedLinkedList { + pub(crate) fn new() -> CountedLinkedList { CountedLinkedList { list: LinkedList::new(), count: AtomicUsize::new(0), From 041d2a507fd7a7965b5d687b6b3a63eb282f2f6b Mon Sep 17 00:00:00 2001 From: Matilda Smeds Date: Wed, 26 Apr 2023 21:18:48 +0200 Subject: [PATCH 8/8] Make counter non-atomic --- tokio/src/util/linked_list.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index 49ac74059de..74a684d1a05 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -11,7 +11,6 @@ use core::fmt; use core::marker::{PhantomData, PhantomPinned}; use core::mem::ManuallyDrop; use core::ptr::{self, NonNull}; -use std::sync::atomic::{AtomicUsize, Ordering}; /// An intrusive linked list. /// @@ -235,26 +234,26 @@ impl fmt::Debug for LinkedList { // in the list. pub(crate) struct CountedLinkedList { list: LinkedList, - count: AtomicUsize, + count: usize, } impl CountedLinkedList { pub(crate) fn new() -> CountedLinkedList { CountedLinkedList { list: LinkedList::new(), - count: AtomicUsize::new(0), + count: 0, } } pub(crate) fn push_front(&mut self, val: L::Handle) { self.list.push_front(val); - self.count.fetch_add(1, Ordering::Relaxed); + self.count += 1; } pub(crate) fn pop_back(&mut self) -> Option { let val = self.list.pop_back(); if val.is_some() { - self.count.fetch_sub(1, Ordering::Relaxed); + self.count -= 1; } val } @@ -266,13 +265,13 @@ impl CountedLinkedList { pub(crate) unsafe fn remove(&mut self, node: NonNull) -> Option { let val = self.list.remove(node); if val.is_some() { - self.count.fetch_sub(1, Ordering::Relaxed); + self.count -= 1; } val } pub(crate) fn count(&self) -> usize { - self.count.load(Ordering::Relaxed) + self.count } }