From ce2f56c4f88085d9fbcb4e3d8268d178ca425bcc Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 31 May 2023 18:03:01 -0700 Subject: [PATCH 1/2] rt: move Inject to `runtime::scheduler` Previously, `Inject` was defined in `runtime::task`. This was because it used some internal fns as part of the intrusive linked-list implementation. In the future, we want to remove the mutex from Inject and move it to the scheduler proper (to reduce mutex ops). To set this up, this commit moves `Inject` to `runtime::scheduler`. To make this work, we have to `pub(crate)` `task::RawTask` and use that as the interface to access the next / previous pointers. --- tokio/src/runtime/scheduler/current_thread.rs | 5 +-- .../src/runtime/{task => scheduler}/inject.rs | 17 +++++----- tokio/src/runtime/scheduler/mod.rs | 3 ++ .../runtime/scheduler/multi_thread/queue.rs | 3 +- .../runtime/scheduler/multi_thread/worker.rs | 3 +- tokio/src/runtime/task/mod.rs | 31 ++++++++----------- tokio/src/runtime/task/raw.rs | 23 +++++++++++++- tokio/src/runtime/task/trace/mod.rs | 3 +- tokio/src/runtime/tests/inject.rs | 2 +- tokio/src/runtime/tests/queue.rs | 3 +- 10 files changed, 57 insertions(+), 36 deletions(-) rename tokio/src/runtime/{task => scheduler}/inject.rs (94%) diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index eb20b7ba027..c7e46887e02 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -2,8 +2,9 @@ use crate::future::poll_fn; use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::Arc; use crate::runtime::driver::{self, Driver}; -use crate::runtime::task::{self, Inject, JoinHandle, OwnedTasks, Schedule, Task}; -use crate::runtime::{blocking, context, scheduler, Config}; +use crate::runtime::scheduler::{self, Inject}; +use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; +use crate::runtime::{blocking, context, Config}; use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::sync::notify::Notify; use crate::util::atomic_cell::AtomicCell; diff --git a/tokio/src/runtime/task/inject.rs b/tokio/src/runtime/scheduler/inject.rs similarity index 94% rename from tokio/src/runtime/task/inject.rs rename to tokio/src/runtime/scheduler/inject.rs index 6b7eee07328..4b65449e7e7 100644 --- a/tokio/src/runtime/task/inject.rs +++ b/tokio/src/runtime/scheduler/inject.rs @@ -5,7 +5,6 @@ use crate::loom::sync::{Mutex, MutexGuard}; use crate::runtime::task; use std::marker::PhantomData; -use std::ptr::NonNull; use std::sync::atomic::Ordering::{Acquire, Release}; /// Growable, MPMC queue used to inject new tasks into the scheduler and as an @@ -26,10 +25,10 @@ struct Pointers { is_closed: bool, /// Linked-list head. - head: Option>, + head: Option, /// Linked-list tail. - tail: Option>, + tail: Option, } pub(crate) struct Pop<'a, T: 'static> { @@ -190,8 +189,8 @@ cfg_rt_multi_thread! { #[inline] fn push_batch_inner( &self, - batch_head: NonNull, - batch_tail: NonNull, + batch_head: task::RawTask, + batch_tail: task::RawTask, num: usize, ) { debug_assert!(get_next(batch_tail).is_none()); @@ -282,12 +281,12 @@ impl Pointers { } } -fn get_next(header: NonNull) -> Option> { - unsafe { header.as_ref().queue_next.with(|ptr| *ptr) } +fn get_next(task: task::RawTask) -> Option { + unsafe { task.get_queue_next() } } -fn set_next(header: NonNull, val: Option>) { +fn set_next(task: task::RawTask, val: Option) { unsafe { - header.as_ref().set_next(val); + task.set_queue_next(val); } } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 4d423f6e4d4..e3926d140f5 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -1,6 +1,9 @@ cfg_rt! { pub(crate) mod current_thread; pub(crate) use current_thread::CurrentThread; + + mod inject; + pub(crate) use inject::Inject; } cfg_rt_multi_thread! { diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs index dd132fb9a6d..f52aac79772 100644 --- a/tokio/src/runtime/scheduler/multi_thread/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs @@ -2,7 +2,8 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::Arc; -use crate::runtime::task::{self, Inject}; +use crate::runtime::scheduler::Inject; +use crate::runtime::task; use crate::runtime::MetricsBatch; use std::mem::{self, MaybeUninit}; diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index bedaff39e4f..975f0a129c0 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -60,7 +60,8 @@ use crate::loom::sync::{Arc, Mutex}; use crate::runtime; use crate::runtime::context; use crate::runtime::scheduler::multi_thread::{queue, Counters, Handle, Idle, Parker, Unparker}; -use crate::runtime::task::{Inject, OwnedTasks}; +use crate::runtime::scheduler::Inject; +use crate::runtime::task::OwnedTasks; use crate::runtime::{ blocking, coop, driver, scheduler, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics, }; diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 8da59cb0601..932552fb915 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -182,9 +182,6 @@ mod id; #[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] pub use id::{id, try_id, Id}; -mod inject; -pub(super) use self::inject::Inject; - #[cfg(feature = "rt")] mod abort; mod join; @@ -198,7 +195,7 @@ mod list; pub(crate) use self::list::{LocalOwnedTasks, OwnedTasks}; mod raw; -use self::raw::RawTask; +pub(crate) use self::raw::RawTask; mod state; use self::state::State; @@ -335,13 +332,17 @@ cfg_rt! { } impl Task { - unsafe fn from_raw(ptr: NonNull
) -> Task { + unsafe fn new(raw: RawTask) -> Task { Task { - raw: RawTask::from_raw(ptr), + raw, _p: PhantomData, } } + unsafe fn from_raw(ptr: NonNull
) -> Task { + Task::new(RawTask::from_raw(ptr)) + } + #[cfg(all( tokio_unstable, tokio_taskdump, @@ -369,22 +370,16 @@ impl Notified { } impl Notified { - unsafe fn from_raw(ptr: NonNull
) -> Notified { - Notified(Task::from_raw(ptr)) - } -} - -impl Task { - fn into_raw(self) -> NonNull
{ - let ret = self.raw.header_ptr(); - mem::forget(self); - ret + pub(crate) unsafe fn from_raw(ptr: RawTask) -> Notified { + Notified(Task::new(ptr)) } } impl Notified { - fn into_raw(self) -> NonNull
{ - self.0.into_raw() + pub(crate) fn into_raw(self) -> RawTask { + let raw = self.0.raw; + mem::forget(self); + raw } } diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs index 72bfa3f46c1..8078859285d 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -6,7 +6,7 @@ use std::ptr::NonNull; use std::task::{Poll, Waker}; /// Raw task handle -pub(in crate::runtime) struct RawTask { +pub(crate) struct RawTask { ptr: NonNull
, } @@ -240,6 +240,27 @@ impl RawTask { pub(super) fn ref_inc(self) { self.header().state.ref_inc(); } + + /// Get the queue-next pointer + /// + /// This is for usage by the injection queue + /// + /// Safety: make sure only one queue uses this and access is synchronized. + pub(crate) unsafe fn get_queue_next(self) -> Option { + self.header() + .queue_next + .with(|ptr| *ptr) + .map(|p| RawTask::from_raw(p)) + } + + /// Sets the queue-next pointer + /// + /// This is for usage by the injection queue + /// + /// Safety: make sure only one queue uses this and access is synchronized. + pub(crate) unsafe fn set_queue_next(self, val: Option) { + self.header().set_next(val.map(|task| task.ptr)); + } } impl Clone for RawTask { diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index 2f1d7dba4f0..3da1772d2c9 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -1,6 +1,5 @@ use crate::loom::sync::Arc; -use crate::runtime::scheduler::current_thread; -use crate::runtime::task::Inject; +use crate::runtime::scheduler::{current_thread, Inject}; use backtrace::BacktraceFrame; use std::cell::Cell; use std::collections::VecDeque; diff --git a/tokio/src/runtime/tests/inject.rs b/tokio/src/runtime/tests/inject.rs index 92431855485..149654d19a0 100644 --- a/tokio/src/runtime/tests/inject.rs +++ b/tokio/src/runtime/tests/inject.rs @@ -1,4 +1,4 @@ -use crate::runtime::task::Inject; +use crate::runtime::scheduler::Inject; #[test] fn push_and_pop() { diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 09f249e9ea4..f0fb8f0d235 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -1,5 +1,6 @@ use crate::runtime::scheduler::multi_thread::queue; -use crate::runtime::task::{self, Inject, Schedule, Task}; +use crate::runtime::scheduler::Inject; +use crate::runtime::task::{self, Schedule, Task}; use crate::runtime::MetricsBatch; use std::thread; From 6b1f7b797b2ab08c901d44f1edcbc83fb4833b0a Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 31 May 2023 18:12:16 -0700 Subject: [PATCH 2/2] fix loom tests --- tokio/src/runtime/tests/loom_queue.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs index 58c17ad65c2..c9aa111f6f2 100644 --- a/tokio/src/runtime/tests/loom_queue.rs +++ b/tokio/src/runtime/tests/loom_queue.rs @@ -1,5 +1,5 @@ use crate::runtime::scheduler::multi_thread::queue; -use crate::runtime::task::Inject; +use crate::runtime::scheduler::Inject; use crate::runtime::tests::NoopSchedule; use crate::runtime::MetricsBatch;