diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index 2cbe2973980..ac859de67c1 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -2,8 +2,8 @@ use crate::future::poll_fn; use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::Arc; use crate::runtime::driver::{self, Driver}; -use crate::runtime::scheduler::{self, Defer}; -use crate::runtime::task::{self, Inject, JoinHandle, OwnedTasks, Schedule, Task}; +use crate::runtime::scheduler::{self, Defer, Inject}; +use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; use crate::runtime::{blocking, context, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::sync::notify::Notify; use crate::util::atomic_cell::AtomicCell; diff --git a/tokio/src/runtime/task/inject.rs b/tokio/src/runtime/scheduler/inject.rs similarity index 94% rename from tokio/src/runtime/task/inject.rs rename to tokio/src/runtime/scheduler/inject.rs index 6b7eee07328..4b65449e7e7 100644 --- a/tokio/src/runtime/task/inject.rs +++ b/tokio/src/runtime/scheduler/inject.rs @@ -5,7 +5,6 @@ use crate::loom::sync::{Mutex, MutexGuard}; use crate::runtime::task; use std::marker::PhantomData; -use std::ptr::NonNull; use std::sync::atomic::Ordering::{Acquire, Release}; /// Growable, MPMC queue used to inject new tasks into the scheduler and as an @@ -26,10 +25,10 @@ struct Pointers { is_closed: bool, /// Linked-list head. - head: Option>, + head: Option, /// Linked-list tail. - tail: Option>, + tail: Option, } pub(crate) struct Pop<'a, T: 'static> { @@ -190,8 +189,8 @@ cfg_rt_multi_thread! { #[inline] fn push_batch_inner( &self, - batch_head: NonNull, - batch_tail: NonNull, + batch_head: task::RawTask, + batch_tail: task::RawTask, num: usize, ) { debug_assert!(get_next(batch_tail).is_none()); @@ -282,12 +281,12 @@ impl Pointers { } } -fn get_next(header: NonNull) -> Option> { - unsafe { header.as_ref().queue_next.with(|ptr| *ptr) } +fn get_next(task: task::RawTask) -> Option { + unsafe { task.get_queue_next() } } -fn set_next(header: NonNull, val: Option>) { +fn set_next(task: task::RawTask, val: Option) { unsafe { - header.as_ref().set_next(val); + task.set_queue_next(val); } } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 118e555d298..93390453eb2 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -4,6 +4,9 @@ cfg_rt! { mod defer; use defer::Defer; + + mod inject; + pub(crate) use inject::Inject; } cfg_rt_multi_thread! { diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs index 2a0ae833850..c61c4de609d 100644 --- a/tokio/src/runtime/scheduler/multi_thread/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs @@ -3,7 +3,8 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::Arc; use crate::runtime::scheduler::multi_thread::Stats; -use crate::runtime::task::{self, Inject}; +use crate::runtime::scheduler::Inject; +use crate::runtime::task; use std::mem::{self, MaybeUninit}; use std::ptr; diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 7b87a9c0d9f..7b5b09f7573 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -62,8 +62,8 @@ use crate::runtime::context; use crate::runtime::scheduler::multi_thread::{ idle, queue, Counters, Handle, Idle, Parker, Stats, Unparker, }; -use crate::runtime::scheduler::Defer; -use crate::runtime::task::{Inject, OwnedTasks}; +use crate::runtime::scheduler::{Defer, Inject}; +use crate::runtime::task::OwnedTasks; use crate::runtime::{ blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics, }; diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 8da59cb0601..932552fb915 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -182,9 +182,6 @@ mod id; #[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] pub use id::{id, try_id, Id}; -mod inject; -pub(super) use self::inject::Inject; - #[cfg(feature = "rt")] mod abort; mod join; @@ -198,7 +195,7 @@ mod list; pub(crate) use self::list::{LocalOwnedTasks, OwnedTasks}; mod raw; -use self::raw::RawTask; +pub(crate) use self::raw::RawTask; mod state; use self::state::State; @@ -335,13 +332,17 @@ cfg_rt! { } impl Task { - unsafe fn from_raw(ptr: NonNull
) -> Task { + unsafe fn new(raw: RawTask) -> Task { Task { - raw: RawTask::from_raw(ptr), + raw, _p: PhantomData, } } + unsafe fn from_raw(ptr: NonNull
) -> Task { + Task::new(RawTask::from_raw(ptr)) + } + #[cfg(all( tokio_unstable, tokio_taskdump, @@ -369,22 +370,16 @@ impl Notified { } impl Notified { - unsafe fn from_raw(ptr: NonNull
) -> Notified { - Notified(Task::from_raw(ptr)) - } -} - -impl Task { - fn into_raw(self) -> NonNull
{ - let ret = self.raw.header_ptr(); - mem::forget(self); - ret + pub(crate) unsafe fn from_raw(ptr: RawTask) -> Notified { + Notified(Task::new(ptr)) } } impl Notified { - fn into_raw(self) -> NonNull
{ - self.0.into_raw() + pub(crate) fn into_raw(self) -> RawTask { + let raw = self.0.raw; + mem::forget(self); + raw } } diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs index 72bfa3f46c1..8078859285d 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -6,7 +6,7 @@ use std::ptr::NonNull; use std::task::{Poll, Waker}; /// Raw task handle -pub(in crate::runtime) struct RawTask { +pub(crate) struct RawTask { ptr: NonNull
, } @@ -240,6 +240,27 @@ impl RawTask { pub(super) fn ref_inc(self) { self.header().state.ref_inc(); } + + /// Get the queue-next pointer + /// + /// This is for usage by the injection queue + /// + /// Safety: make sure only one queue uses this and access is synchronized. + pub(crate) unsafe fn get_queue_next(self) -> Option { + self.header() + .queue_next + .with(|ptr| *ptr) + .map(|p| RawTask::from_raw(p)) + } + + /// Sets the queue-next pointer + /// + /// This is for usage by the injection queue + /// + /// Safety: make sure only one queue uses this and access is synchronized. + pub(crate) unsafe fn set_queue_next(self, val: Option) { + self.header().set_next(val.map(|task| task.ptr)); + } } impl Clone for RawTask { diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index 99abc754852..6299d75d581 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -1,7 +1,6 @@ use crate::loom::sync::Arc; use crate::runtime::context; -use crate::runtime::scheduler::{self, current_thread}; -use crate::runtime::task::Inject; +use crate::runtime::scheduler::{self, current_thread, Inject}; use backtrace::BacktraceFrame; use std::cell::Cell; diff --git a/tokio/src/runtime/tests/inject.rs b/tokio/src/runtime/tests/inject.rs index 92431855485..149654d19a0 100644 --- a/tokio/src/runtime/tests/inject.rs +++ b/tokio/src/runtime/tests/inject.rs @@ -1,4 +1,4 @@ -use crate::runtime::task::Inject; +use crate::runtime::scheduler::Inject; #[test] fn push_and_pop() { diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs index eb85807a6b0..756f7b8424b 100644 --- a/tokio/src/runtime/tests/loom_queue.rs +++ b/tokio/src/runtime/tests/loom_queue.rs @@ -1,5 +1,5 @@ use crate::runtime::scheduler::multi_thread::{queue, Stats}; -use crate::runtime::task::Inject; +use crate::runtime::scheduler::Inject; use crate::runtime::tests::NoopSchedule; use loom::thread; diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 2e42f1bbb9e..5642d29d7eb 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -1,5 +1,6 @@ use crate::runtime::scheduler::multi_thread::{queue, Stats}; -use crate::runtime::task::{self, Inject, Schedule, Task}; +use crate::runtime::scheduler::Inject; +use crate::runtime::task::{self, Schedule, Task}; use std::thread; use std::time::Duration;