From 3a4aef17b2c70d255affa51eb473efcf703896e4 Mon Sep 17 00:00:00 2001
From: Weijia Jiang
Date: Thu, 7 Dec 2023 18:45:07 +0800
Subject: [PATCH] runtime: reduce the lock contention in task spawn (#6001)

---
 benches/Cargo.toml                            |   1 +
 tokio/src/runtime/builder.rs                  |   1 -
 tokio/src/runtime/id.rs                       |   8 +-
 .../runtime/scheduler/current_thread/mod.rs   |   6 +-
 .../runtime/scheduler/multi_thread/worker.rs  |  13 +-
 .../scheduler/multi_thread_alt/worker.rs      |   6 +-
 tokio/src/runtime/task/id.rs                  |  19 ++-
 tokio/src/runtime/task/list.rs                | 110 +++++++------
 tokio/src/runtime/task/mod.rs                 |  14 ++
 tokio/src/runtime/tests/task.rs               |   5 +-
 tokio/src/util/linked_list.rs                 |  78 ---------
 tokio/src/util/mod.rs                         |   4 +
 tokio/src/util/sharded_list.rs                | 149 ++++++++++++++++++
 13 files changed, 272 insertions(+), 142 deletions(-)
 create mode 100644 tokio/src/util/sharded_list.rs

diff --git a/benches/Cargo.toml b/benches/Cargo.toml
index e0b162b422e..1eea2e04489 100644
--- a/benches/Cargo.toml
+++ b/benches/Cargo.toml
@@ -12,6 +12,7 @@ tokio = { version = "1.5.0", path = "../tokio", features = ["full"] }
 criterion = "0.5.1"
 rand = "0.8"
 rand_chacha = "0.3"
+num_cpus = "1.16.0"
 
 [dev-dependencies]
 tokio-util = { version = "0.7.0", path = "../tokio-util", features = ["full"] }
diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs
index 37d7cfe8f0b..78e6bf50d62 100644
--- a/tokio/src/runtime/builder.rs
+++ b/tokio/src/runtime/builder.rs
@@ -1279,7 +1279,6 @@ cfg_rt_multi_thread! {
             use crate::runtime::scheduler::MultiThreadAlt;
 
             let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
-
             let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
 
             // Create the blocking pool
diff --git a/tokio/src/runtime/id.rs b/tokio/src/runtime/id.rs
index 58551d49989..8c6df7fcefb 100644
--- a/tokio/src/runtime/id.rs
+++ b/tokio/src/runtime/id.rs
@@ -1,5 +1,5 @@
 use std::fmt;
-use std::num::NonZeroU64;
+use std::num::{NonZeroU32, NonZeroU64};
 
 /// An opaque ID that uniquely identifies a runtime relative to all other currently
 /// running runtimes.
@@ -39,6 +39,12 @@ impl From<NonZeroU64> for Id {
     }
 }
 
+impl From<NonZeroU32> for Id {
+    fn from(value: NonZeroU32) -> Self {
+        Id(value.into())
+    }
+}
+
 impl fmt::Display for Id {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         self.0.fmt(f)
diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs
index bc5b65ad329..3ae3d7accfc 100644
--- a/tokio/src/runtime/scheduler/current_thread/mod.rs
+++ b/tokio/src/runtime/scheduler/current_thread/mod.rs
@@ -132,7 +132,7 @@ impl CurrentThread {
         let handle = Arc::new(Handle {
             shared: Shared {
                 inject: Inject::new(),
-                owned: OwnedTasks::new(),
+                owned: OwnedTasks::new(1),
                 woken: AtomicBool::new(false),
                 config,
                 scheduler_metrics: SchedulerMetrics::new(),
@@ -248,7 +248,7 @@ fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
     // Drain the OwnedTasks collection. This call also closes the
     // collection, ensuring that no tasks are ever pushed after this
     // call returns.
-    handle.shared.owned.close_and_shutdown_all();
+    handle.shared.owned.close_and_shutdown_all(0);
 
     // Drain local queue
     // We already shut down every task, so we just need to drop the task.
@@ -614,7 +614,7 @@ impl Schedule for Arc<Handle> {
                 // If `None`, the runtime is shutting down, so there is no need to signal shutdown
                 if let Some(core) = core.as_mut() {
                     core.unhandled_panic = true;
-                    self.shared.owned.close_and_shutdown_all();
+                    self.shared.owned.close_and_shutdown_all(0);
                 }
             }
             _ => unreachable!("runtime core not set in CURRENT thread-local"),
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index dbd88867006..313e2ea68f7 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -287,7 +287,7 @@ pub(super) fn create(
         remotes: remotes.into_boxed_slice(),
         inject,
         idle,
-        owned: OwnedTasks::new(),
+        owned: OwnedTasks::new(size),
         synced: Mutex::new(Synced {
             idle: idle_synced,
             inject: inject_synced,
@@ -548,7 +548,6 @@ impl Context {
         }
 
         core.pre_shutdown(&self.worker);
-
         // Signal shutdown
         self.worker.handle.shutdown_core(core);
         Err(())
@@ -955,8 +954,16 @@ impl Core {
     /// Signals all tasks to shut down, and waits for them to complete. Must run
     /// before we enter the single-threaded phase of shutdown processing.
     fn pre_shutdown(&mut self, worker: &Worker) {
+        // Start from a random inner list
+        let start = self
+            .rand
+            .fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
         // Signal to all tasks to shut down.
-        worker.handle.shared.owned.close_and_shutdown_all();
+        worker
+            .handle
+            .shared
+            .owned
+            .close_and_shutdown_all(start as usize);
 
         self.stats
            .submit(&worker.handle.shared.worker_metrics[worker.index]);
diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
index 66865f59753..8d16418a80b 100644
--- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
@@ -307,7 +307,7 @@ pub(super) fn create(
         remotes: remotes.into_boxed_slice(),
         inject,
         idle,
-        owned: OwnedTasks::new(),
+        owned: OwnedTasks::new(num_cores),
         synced: Mutex::new(Synced {
             assigned_cores: (0..num_workers).map(|_| None).collect(),
             shutdown_cores: Vec::with_capacity(num_cores),
@@ -1460,7 +1460,9 @@ impl Shared {
     }
 
     pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
-        self.owned.close_and_shutdown_all();
+        // Start from a random inner list
+        let start = core.rand.fastrand_n(self.owned.get_shard_size() as u32);
+        self.owned.close_and_shutdown_all(start as usize);
 
         core.stats.submit(&self.worker_metrics[core.index]);
diff --git a/tokio/src/runtime/task/id.rs b/tokio/src/runtime/task/id.rs
index 2b0d95c0243..82c8a7e7e90 100644
--- a/tokio/src/runtime/task/id.rs
+++ b/tokio/src/runtime/task/id.rs
@@ -24,7 +24,7 @@ use std::fmt;
 #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
 #[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
 #[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
-pub struct Id(u64);
+pub struct Id(pub(crate) u64);
 
 /// Returns the [`Id`] of the currently running task.
 ///
@@ -74,11 +74,22 @@ impl fmt::Display for Id {
 
 impl Id {
     pub(crate) fn next() -> Self {
-        use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64};
+        use crate::loom::sync::atomic::Ordering::Relaxed;
+        use crate::loom::sync::atomic::StaticAtomicU64;
 
-        static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
+        #[cfg(all(test, loom))]
+        {
+            crate::loom::lazy_static! {
+                static ref NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
+            }
+            Self(NEXT_ID.fetch_add(1, Relaxed))
+        }
 
-        Self(NEXT_ID.fetch_add(1, Relaxed))
+        #[cfg(not(all(test, loom)))]
+        {
+            static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
+            Self(NEXT_ID.fetch_add(1, Relaxed))
+        }
     }
 
     pub(crate) fn as_u64(&self) -> u64 {
diff --git a/tokio/src/runtime/task/list.rs b/tokio/src/runtime/task/list.rs
index 3a1fcce2ec4..3d2a121cf1d 100644
--- a/tokio/src/runtime/task/list.rs
+++ b/tokio/src/runtime/task/list.rs
@@ -8,10 +8,11 @@
 
 use crate::future::Future;
 use crate::loom::cell::UnsafeCell;
-use crate::loom::sync::Mutex;
 use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task};
-use crate::util::linked_list::{CountedLinkedList, Link, LinkedList};
+use crate::util::linked_list::{Link, LinkedList};
+use crate::util::sharded_list;
 
+use crate::loom::sync::atomic::{AtomicBool, Ordering};
 use std::marker::PhantomData;
 use std::num::NonZeroU64;
@@ -25,7 +26,7 @@
 // mixed up runtimes happen to have the same id.
 
 cfg_has_atomic_u64! {
-    use std::sync::atomic::{AtomicU64, Ordering};
+    use std::sync::atomic::AtomicU64;
 
     static NEXT_OWNED_TASKS_ID: AtomicU64 = AtomicU64::new(1);
 
@@ -40,7 +41,7 @@ cfg_has_atomic_u64! {
 }
 
 cfg_not_has_atomic_u64! {
-    use std::sync::atomic::{AtomicU32, Ordering};
+    use std::sync::atomic::AtomicU32;
 
     static NEXT_OWNED_TASKS_ID: AtomicU32 = AtomicU32::new(1);
 
@@ -55,30 +56,30 @@ cfg_not_has_atomic_u64! {
 }
 
 pub(crate) struct OwnedTasks<S: 'static> {
-    inner: Mutex<CountedOwnedTasksInner<S>>,
+    list: List<S>,
     pub(crate) id: NonZeroU64,
+    closed: AtomicBool,
 }
-struct CountedOwnedTasksInner<S: 'static> {
-    list: CountedLinkedList<Task<S>, <Task<S> as Link>::Target>,
-    closed: bool,
-}
+
+type List<S> = sharded_list::ShardedList<Task<S>, <Task<S> as Link>::Target>;
+
 pub(crate) struct LocalOwnedTasks<S: 'static> {
     inner: UnsafeCell<OwnedTasksInner<S>>,
     pub(crate) id: NonZeroU64,
     _not_send_or_sync: PhantomData<*const ()>,
 }
+
 struct OwnedTasksInner<S: 'static> {
     list: LinkedList<Task<S>, <Task<S> as Link>::Target>,
     closed: bool,
 }
 
 impl<S: 'static> OwnedTasks<S> {
-    pub(crate) fn new() -> Self {
+    pub(crate) fn new(num_cores: usize) -> Self {
+        let shard_size = Self::gen_shared_list_size(num_cores);
         Self {
-            inner: Mutex::new(CountedOwnedTasksInner {
-                list: CountedLinkedList::new(),
-                closed: false,
-            }),
+            list: List::new(shard_size),
+            closed: AtomicBool::new(false),
             id: get_next_id(),
         }
     }
@@ -112,16 +113,16 @@ impl<S: 'static> OwnedTasks<S> {
             task.header().set_owner_id(self.id);
         }
 
-        let mut lock = self.inner.lock();
-        if lock.closed {
-            drop(lock);
-            drop(notified);
+        let shard = self.list.lock_shard(&task);
+        // Check the closed flag while holding the shard lock to ensure that
+        // all tasks are shut down after the OwnedTasks has been closed.
+        if self.closed.load(Ordering::Acquire) {
+            drop(shard);
             task.shutdown();
-            None
-        } else {
-            lock.list.push_front(task);
-            Some(notified)
+            return None;
         }
+        shard.push(task);
+        Some(notified)
     }
 
     /// Asserts that the given task is owned by this OwnedTasks and convert it to
@@ -129,7 +130,6 @@ impl<S: 'static> OwnedTasks<S> {
     #[inline]
     pub(crate) fn assert_owner(&self, task: Notified<S>) -> LocalNotified<S> {
         debug_assert_eq!(task.header().get_owner_id(), Some(self.id));
-
         // safety: All tasks bound to this OwnedTasks are Send, so it is safe
         // to poll it on this thread no matter what thread we are on.
         LocalNotified {
@@ -140,34 +140,34 @@ impl<S: 'static> OwnedTasks<S> {
 
     /// Shuts down all tasks in the collection. This call also closes the
     /// collection, preventing new items from being added.
-    pub(crate) fn close_and_shutdown_all(&self)
+    ///
+    /// The parameter `start` determines which shard this method will start at.
+    /// Using different values for each worker thread reduces contention.
+    pub(crate) fn close_and_shutdown_all(&self, start: usize)
     where
         S: Schedule,
     {
-        // The first iteration of the loop was unrolled so it can set the
-        // closed bool.
-        let first_task = {
-            let mut lock = self.inner.lock();
-            lock.closed = true;
-            lock.list.pop_back()
-        };
-        match first_task {
-            Some(task) => task.shutdown(),
-            None => return,
+        self.closed.store(true, Ordering::Release);
+        for i in start..self.get_shard_size() + start {
+            loop {
+                let task = self.list.pop_back(i);
+                match task {
+                    Some(task) => {
+                        task.shutdown();
+                    }
+                    None => break,
+                }
+            }
         }
+    }
 
-        loop {
-            let task = match self.inner.lock().list.pop_back() {
-                Some(task) => task,
-                None => return,
-            };
-
-            task.shutdown();
-        }
+    #[inline]
+    pub(crate) fn get_shard_size(&self) -> usize {
+        self.list.shard_size()
     }
 
     pub(crate) fn active_tasks_count(&self) -> usize {
-        self.inner.lock().list.count()
+        self.list.len()
     }
 
     pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
@@ -179,11 +179,27 @@ impl<S: 'static> OwnedTasks<S> {
 
         // safety: We just checked that the provided task is not in some other
         // linked list.
-        unsafe { self.inner.lock().list.remove(task.header_ptr()) }
+        unsafe { self.list.remove(task.header_ptr()) }
     }
 
     pub(crate) fn is_empty(&self) -> bool {
-        self.inner.lock().list.is_empty()
+        self.list.is_empty()
+    }
+
+    /// Generates the size of the sharded list based on the number of worker threads.
+    ///
+    /// The sharded lock design effectively alleviates
+    /// lock contention caused by high concurrency.
+    ///
+    /// However, as the number of shards increases, the memory continuity between
+    /// nodes in the intrusive linked list diminishes. Furthermore,
+    /// the construction time of the sharded list grows with the number of shards.
+    ///
+    /// For these reasons, we cap the sharded list size at a maximum value,
+    /// denoted as `MAX_SHARED_LIST_SIZE`.
+    fn gen_shared_list_size(num_cores: usize) -> usize {
+        const MAX_SHARED_LIST_SIZE: usize = 1 << 16;
+        usize::min(MAX_SHARED_LIST_SIZE, num_cores.next_power_of_two() * 4)
     }
 }
 
@@ -192,9 +208,9 @@ cfg_taskdump! {
         /// Locks the tasks, and calls `f` on an iterator over them.
         pub(crate) fn for_each<F>(&self, f: F)
         where
-            F: FnMut(&Task<S>)
+            F: FnMut(&Task<S>),
         {
-            self.inner.lock().list.for_each(f)
+            self.list.for_each(f);
         }
     }
 }
diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs
index d7fde0fe67d..e48788567e4 100644
--- a/tokio/src/runtime/task/mod.rs
+++ b/tokio/src/runtime/task/mod.rs
@@ -208,6 +208,7 @@ cfg_taskdump! {
 
 use crate::future::Future;
 use crate::util::linked_list;
+use crate::util::sharded_list;
 
 use std::marker::PhantomData;
 use std::ptr::NonNull;
@@ -503,3 +504,16 @@ unsafe impl<S: Schedule> linked_list::Link for Task<S> {
         self::core::Trailer::addr_of_owned(Header::get_trailer(target))
     }
 }
+
+/// # Safety
+///
+/// The id of a task is never changed after creation of the task, so the return value of
+/// `get_shard_id` will not change. (The cast may throw away the upper 32 bits of the task id, but
+/// the shard id still won't change from call to call.)
+unsafe impl<S: Schedule> sharded_list::ShardedListItem for Task<S> {
+    unsafe fn get_shard_id(target: NonNull<Self::Target>) -> usize {
+        // SAFETY: The caller guarantees that `target` points at a valid task.
+        let task_id = unsafe { Header::get_id(target) };
+        task_id.0 as usize
+    }
+}
diff --git a/tokio/src/runtime/tests/task.rs b/tokio/src/runtime/tests/task.rs
index 0485bba7a00..a0604505ccc 100644
--- a/tokio/src/runtime/tests/task.rs
+++ b/tokio/src/runtime/tests/task.rs
@@ -241,7 +241,7 @@ fn with(f: impl FnOnce(Runtime)) {
     let _reset = Reset;
 
     let rt = Runtime(Arc::new(Inner {
-        owned: OwnedTasks::new(),
+        owned: OwnedTasks::new(16),
         core: Mutex::new(Core {
             queue: VecDeque::new(),
         }),
@@ -308,14 +308,13 @@ impl Runtime {
     fn shutdown(&self) {
         let mut core = self.0.core.try_lock().unwrap();
 
-        self.0.owned.close_and_shutdown_all();
+        self.0.owned.close_and_shutdown_all(0);
 
         while let Some(task) = core.queue.pop_back() {
             drop(task);
         }
 
         drop(core);
-
         assert!(self.0.owned.is_empty());
     }
 }
diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs
index cda7e3398bf..0ed2b616456 100644
--- a/tokio/src/util/linked_list.rs
+++ b/tokio/src/util/linked_list.rs
@@ -228,53 +228,6 @@ impl<L: Link> fmt::Debug for LinkedList<L, L::Target> {
     }
 }
 
-// ===== impl CountedLinkedList ====
-
-// Delegates operations to the base LinkedList implementation, and adds a counter to the elements
-// in the list.
-pub(crate) struct CountedLinkedList<L: Link, T> {
-    list: LinkedList<L, T>,
-    count: usize,
-}
-
-impl<L: Link> CountedLinkedList<L, L::Target> {
-    pub(crate) fn new() -> CountedLinkedList<L, L::Target> {
-        CountedLinkedList {
-            list: LinkedList::new(),
-            count: 0,
-        }
-    }
-
-    pub(crate) fn push_front(&mut self, val: L::Handle) {
-        self.list.push_front(val);
-        self.count += 1;
-    }
-
-    pub(crate) fn pop_back(&mut self) -> Option<L::Handle> {
-        let val = self.list.pop_back();
-        if val.is_some() {
-            self.count -= 1;
-        }
-        val
-    }
-
-    pub(crate) fn is_empty(&self) -> bool {
-        self.list.is_empty()
-    }
-
-    pub(crate) unsafe fn remove(&mut self, node: NonNull<L::Target>) -> Option<L::Handle> {
-        let val = self.list.remove(node);
-        if val.is_some() {
-            self.count -= 1;
-        }
-        val
-    }
-
-    pub(crate) fn count(&self) -> usize {
-        self.count
-    }
-}
-
 #[cfg(any(
     feature = "fs",
     feature = "rt",
@@ -342,22 +295,11 @@ cfg_io_driver_impl! {
     }
 }
 
 cfg_taskdump! {
-    impl<T: Link> CountedLinkedList<T, T::Target> {
-        pub(crate) fn for_each<F>(&mut self, f: F)
-        where
-            F: FnMut(&T::Handle),
-        {
-            self.list.for_each(f)
-        }
-    }
-
     impl<T: Link> LinkedList<T, T::Target> {
         pub(crate) fn for_each<F>(&mut self, mut f: F)
         where
             F: FnMut(&T::Handle),
         {
-            use std::mem::ManuallyDrop;
-
             let mut next = self.head;
 
             while let Some(curr) = next {
@@ -796,26 +738,6 @@ pub(crate) mod tests {
         }
     }
 
-    #[test]
-    fn count() {
-        let mut list = CountedLinkedList::<&Entry, <&Entry as Link>::Target>::new();
-        assert_eq!(0, list.count());
-
-        let a = entry(5);
-        let b = entry(7);
-        list.push_front(a.as_ref());
-        list.push_front(b.as_ref());
-        assert_eq!(2, list.count());
-
-        list.pop_back();
-        assert_eq!(1, list.count());
-
-        unsafe {
-            list.remove(ptr(&b));
-        }
-        assert_eq!(0, list.count());
-    }
-
     /// This is a fuzz test. You run it by entering `cargo fuzz run fuzz_linked_list` in CLI in `/tokio/` module.
     #[cfg(fuzzing)]
     pub fn fuzz_linked_list(ops: &[u8]) {
diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs
index 7d4cd5f9c7c..abdb70406d2 100644
--- a/tokio/src/util/mod.rs
+++ b/tokio/src/util/mod.rs
@@ -42,6 +42,10 @@ pub(crate) use wake_list::WakeList;
 ))]
 pub(crate) mod linked_list;
 
+cfg_rt! {
+    pub(crate) mod sharded_list;
+}
+
 #[cfg(any(feature = "rt", feature = "macros"))]
 pub(crate) mod rand;
diff --git a/tokio/src/util/sharded_list.rs b/tokio/src/util/sharded_list.rs
new file mode 100644
index 00000000000..c1009db94c9
--- /dev/null
+++ b/tokio/src/util/sharded_list.rs
@@ -0,0 +1,149 @@
+use std::ptr::NonNull;
+use std::sync::atomic::Ordering;
+
+use crate::loom::sync::{Mutex, MutexGuard};
+use std::sync::atomic::AtomicUsize;
+
+use super::linked_list::{Link, LinkedList};
+
+/// An intrusive linked list supporting highly concurrent updates.
+///
+/// It currently relies on `LinkedList`, so it is the caller's
+/// responsibility to ensure the list is empty before dropping it.
+///
+/// Note: Due to its inner sharded design, the order of nodes cannot be guaranteed.
+pub(crate) struct ShardedList<L, T> {
+    lists: Box<[Mutex<LinkedList<L, T>>]>,
+    count: AtomicUsize,
+    shard_mask: usize,
+}
+
+/// Determines which linked list an item should be stored in.
+///
+/// # Safety
+///
+/// Implementations must guarantee that the id of an item does not change from
+/// call to call.
+pub(crate) unsafe trait ShardedListItem: Link {
+    /// # Safety
+    /// The provided pointer must point at a valid list item.
+    unsafe fn get_shard_id(target: NonNull<Self::Target>) -> usize;
+}
+
+impl<L, T> ShardedList<L, T> {
+    /// Creates a new and empty sharded linked list with the specified size.
+    pub(crate) fn new(sharded_size: usize) -> Self {
+        assert!(sharded_size.is_power_of_two());
+
+        let shard_mask = sharded_size - 1;
+        let mut lists = Vec::with_capacity(sharded_size);
+        for _ in 0..sharded_size {
+            lists.push(Mutex::new(LinkedList::<L, T>::new()))
+        }
+        Self {
+            lists: lists.into_boxed_slice(),
+            count: AtomicUsize::new(0),
+            shard_mask,
+        }
+    }
+}
+
+/// A guard that holds the lock of a single shard in a `ShardedList`.
+pub(crate) struct ShardGuard<'a, L, T> {
+    lock: MutexGuard<'a, LinkedList<L, T>>,
+    count: &'a AtomicUsize,
+    id: usize,
+}
+
+impl<L: ShardedListItem> ShardedList<L, L::Target> {
+    /// Removes the last element from the shard specified by `shard_id` and
+    /// returns it, or `None` if the shard is empty.
+    pub(crate) fn pop_back(&self, shard_id: usize) -> Option<L::Handle> {
+        let mut lock = self.shard_inner(shard_id);
+        let node = lock.pop_back();
+        if node.is_some() {
+            self.count.fetch_sub(1, Ordering::Relaxed);
+        }
+        node
+    }
+
+    /// Removes the specified node from the list.
+    ///
+    /// # Safety
+    ///
+    /// The caller **must** ensure that exactly one of the following is true:
+    /// - `node` is currently contained by `self`,
+    /// - `node` is not contained by any list,
+    /// - `node` is currently contained by some other `GuardedLinkedList`.
+    pub(crate) unsafe fn remove(&self, node: NonNull<L::Target>) -> Option<L::Handle> {
+        let id = L::get_shard_id(node);
+        let mut lock = self.shard_inner(id);
+        // SAFETY: Since the shard id cannot change, it's not possible for this node
+        // to be in any other list of the same sharded list.
+        let node = unsafe { lock.remove(node) };
+        if node.is_some() {
+            self.count.fetch_sub(1, Ordering::Relaxed);
+        }
+        node
+    }
+
+    /// Acquires the lock of the shard that `val` belongs to, granting write
+    /// access to that shard.
+    pub(crate) fn lock_shard(&self, val: &L::Handle) -> ShardGuard<'_, L, L::Target> {
+        let id = unsafe { L::get_shard_id(L::as_raw(val)) };
+        ShardGuard {
+            lock: self.shard_inner(id),
+            count: &self.count,
+            id,
+        }
+    }
+
+    /// Gets the count of elements in this list.
+    pub(crate) fn len(&self) -> usize {
+        self.count.load(Ordering::Relaxed)
+    }
+
+    /// Returns whether the linked list does not contain any node.
+    pub(crate) fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Gets the shard size of this ShardedList.
+    ///
+    /// Used to help callers decide the `shard_id` parameter of the
+    /// `pop_back` method.
+    pub(crate) fn shard_size(&self) -> usize {
+        self.shard_mask + 1
+    }
+
+    #[inline]
+    fn shard_inner(&self, id: usize) -> MutexGuard<'_, LinkedList<L, <L as Link>::Target>> {
+        // Safety: Masking the id with `shard_mask` keeps the index in bounds,
+        // because the number of shards is a power of two.
+        unsafe { self.lists.get_unchecked(id & self.shard_mask).lock() }
+    }
+}
+
+impl<'a, L: ShardedListItem> ShardGuard<'a, L, L::Target> {
+    /// Pushes a value to this shard.
+    pub(crate) fn push(mut self, val: L::Handle) {
+        let id = unsafe { L::get_shard_id(L::as_raw(&val)) };
+        assert_eq!(id, self.id);
+        self.lock.push_front(val);
+        self.count.fetch_add(1, Ordering::Relaxed);
+    }
+}
+
+cfg_taskdump! {
+    impl<L: ShardedListItem> ShardedList<L, L::Target> {
+        pub(crate) fn for_each<F>(&self, mut f: F)
+        where
+            F: FnMut(&L::Handle),
+        {
+            let mut guards = Vec::with_capacity(self.lists.len());
+            for list in self.lists.iter() {
+                guards.push(list.lock());
+            }
+            for g in &mut guards {
+                g.for_each(&mut f);
+            }
+        }
+    }
+}
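
The core idea of the patch: `OwnedTasks` replaces its single mutex-protected list with a power-of-two number of independently locked shards, routing each task to a shard by its immutable id. The pattern is small enough to sketch with plain `std` types. The sketch below is illustrative only, not tokio code: `ShardedSet` and its methods are invented names, and a `Vec<u64>` stands in for the intrusive linked list.

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;

    /// A toy sharded collection: inserts and removals contend on one of
    /// `shards.len()` locks instead of a single global lock.
    struct ShardedSet {
        shards: Box<[Mutex<Vec<u64>>]>,
        count: AtomicUsize,
        mask: usize, // shards.len() - 1; valid because the length is a power of two
    }

    impl ShardedSet {
        fn new(num_cores: usize) -> Self {
            // Mirrors the patch's sizing heuristic: four shards per core,
            // rounded up to a power of two and capped at 2^16.
            const MAX_SHARDS: usize = 1 << 16;
            let size = usize::min(MAX_SHARDS, num_cores.next_power_of_two() * 4);
            let shards: Box<[Mutex<Vec<u64>>]> =
                (0..size).map(|_| Mutex::new(Vec::new())).collect();
            Self {
                shards,
                count: AtomicUsize::new(0),
                mask: size - 1,
            }
        }

        /// The shard is a pure function of the item's immutable id, so insert
        /// and remove always agree on which lock protects a given item.
        fn shard_of(&self, id: u64) -> &Mutex<Vec<u64>> {
            &self.shards[id as usize & self.mask]
        }

        fn insert(&self, id: u64) {
            self.shard_of(id).lock().unwrap().push(id);
            self.count.fetch_add(1, Ordering::Relaxed);
        }

        fn remove(&self, id: u64) -> bool {
            let mut shard = self.shard_of(id).lock().unwrap();
            if let Some(pos) = shard.iter().position(|&x| x == id) {
                shard.swap_remove(pos);
                self.count.fetch_sub(1, Ordering::Relaxed);
                true
            } else {
                false
            }
        }

        /// Like `ShardedList::len`, the count lives in a separate atomic so
        /// readers never need to take any of the shard locks.
        fn len(&self) -> usize {
            self.count.load(Ordering::Relaxed)
        }
    }

    fn main() {
        let set = ShardedSet::new(8);
        for id in 0..1000u64 {
            set.insert(id);
        }
        assert!(set.remove(42));
        assert_eq!(set.len(), 999);
    }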
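During shutdown, each worker drains every shard but begins at its own randomly chosen `start` (see `pre_shutdown` in multi_thread/worker.rs and `shutdown_core` in multi_thread_alt/worker.rs above), so workers shutting down concurrently tend to queue on different locks rather than all hitting shard 0 first. Continuing the illustrative sketch, a hypothetical `drain_from` helper shows the wrap-around access pattern:

    impl ShardedSet {
        /// Drains every shard, starting at `start` and wrapping around via the
        /// power-of-two mask; a different `start` per caller spreads contention.
        fn drain_from(&self, start: usize) {
            let n = self.shards.len();
            for i in start..start + n {
                let mut shard = self.shards[i & self.mask].lock().unwrap();
                while let Some(_id) = shard.pop() {
                    // A real runtime would shut the popped task down here.
                    self.count.fetch_sub(1, Ordering::Relaxed);
                }
            }
        }
    }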