diff --git a/examples/dump.rs b/examples/dump.rs
index 159cc603ba0..c7358489933 100644
--- a/examples/dump.rs
+++ b/examples/dump.rs
@@ -6,7 +6,7 @@
     target_os = "linux",
     any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
 ))]
-#[tokio::main(flavor = "current_thread")]
+#[tokio::main]
 async fn main() {
     use std::hint::black_box;
 
@@ -22,21 +22,29 @@ async fn main() {
 
     #[inline(never)]
     async fn c() {
-        black_box(tokio::task::yield_now()).await
+        loop {
+            tokio::task::yield_now().await;
+        }
     }
 
-    tokio::spawn(a());
-    tokio::spawn(b());
-    tokio::spawn(c());
+    async fn dump() {
+        let handle = tokio::runtime::Handle::current();
+        let dump = handle.dump().await;
 
-    let handle = tokio::runtime::Handle::current();
-    let dump = handle.dump();
-
-    for (i, task) in dump.tasks().iter().enumerate() {
-        let trace = task.trace();
-        println!("task {i} trace:");
-        println!("{trace}");
+        for (i, task) in dump.tasks().iter().enumerate() {
+            let trace = task.trace();
+            println!("task {i} trace:");
+            println!("{trace}\n");
+        }
     }
+
+    tokio::select!(
+        biased;
+        _ = tokio::spawn(a()) => {},
+        _ = tokio::spawn(b()) => {},
+        _ = tokio::spawn(c()) => {},
+        _ = dump() => {},
+    );
 }
 
 #[cfg(not(all(
diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs
index a4dc437dbb6..43a7a8a63bd 100644
--- a/tokio/src/runtime/handle.rs
+++ b/tokio/src/runtime/handle.rs
@@ -343,15 +343,40 @@ cfg_metrics! {
 cfg_taskdump! {
     impl Handle {
         /// Capture a snapshot of this runtime's state.
-        pub fn dump(&self) -> crate::runtime::Dump {
+        pub async fn dump(&self) -> crate::runtime::Dump {
             match &self.inner {
                 scheduler::Handle::CurrentThread(handle) => handle.dump(),
                 #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
-                scheduler::Handle::MultiThread(_) =>
-                    unimplemented!("taskdumps are unsupported on the multi-thread runtime"),
+                scheduler::Handle::MultiThread(handle) => {
+                    // perform the trace in a separate thread so that the
+                    // trace itself does not appear in the taskdump.
+                    let handle = handle.clone();
+                    spawn_thread(async {
+                        let handle = handle;
+                        handle.dump().await
+                    }).await
+                },
             }
         }
     }
+
+    cfg_rt_multi_thread! {
+        /// Spawn a new thread and asynchronously await on its result.
+        async fn spawn_thread<F>(f: F) -> <F as Future>::Output
+        where
+            F: Future + Send + 'static,
+            <F as Future>::Output: Send + 'static
+        {
+            let (tx, rx) = crate::sync::oneshot::channel();
+            crate::loom::thread::spawn(|| {
+                let rt = crate::runtime::Builder::new_current_thread().build().unwrap();
+                rt.block_on(async {
+                    let _ = tx.send(f.await);
+                });
+            });
+            rx.await.unwrap()
+        }
+    }
 }
 
 /// Error returned by `try_current` when no Runtime has been started
diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs
index 77baaeb06d4..cf3cf52af34 100644
--- a/tokio/src/runtime/scheduler/multi_thread/handle.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs
@@ -95,6 +95,31 @@ cfg_metrics! {
     }
 }
 
+cfg_taskdump! {
+    impl Handle {
+        pub(crate) async fn dump(&self) -> crate::runtime::Dump {
+            let trace_status = &self.shared.trace_status;
+
+            // If a dump is in progress, block.
+            trace_status.start_trace_request(&self).await;
+
+            let result = loop {
+                if let Some(result) = trace_status.take_result() {
+                    break result;
+                } else {
+                    self.notify_all();
+                    trace_status.result_ready.notified().await;
+                }
+            };
+
+            // Allow other queued dumps to proceed.
+            trace_status.end_trace_request(&self).await;
+
+            result
+        }
+    }
+}
+
 impl fmt::Debug for Handle {
     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
         fmt.debug_struct("multi_thread::Handle { ... }").finish()
diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs
index 306a622b3ed..3db15fa8430 100644
--- a/tokio/src/runtime/scheduler/multi_thread/mod.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs
@@ -23,6 +23,10 @@ pub(crate) mod queue;
 mod worker;
 pub(crate) use worker::{Context, Launch, Shared};
 
+cfg_taskdump! {
+    pub(crate) use worker::Synced;
+}
+
 pub(crate) use worker::block_in_place;
 
 use crate::loom::sync::Arc;
diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs
index 947b6fb7f8c..150cf64e246 100644
--- a/tokio/src/runtime/scheduler/multi_thread/worker.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -74,6 +74,10 @@ use std::cell::RefCell;
 use std::task::Waker;
 use std::time::Duration;
 
+cfg_taskdump! {
+    use std::sync::Barrier;
+}
+
 /// A scheduler worker
 pub(super) struct Worker {
     /// Reference to scheduler's handle
@@ -112,6 +116,9 @@ struct Core {
     /// True if the scheduler is being shutdown
     is_shutdown: bool,
 
+    /// True if the scheduler is being traced
+    is_traced: bool,
+
     /// Parker
     ///
     /// Stored in an `Option` as the parker is added / removed to make the
@@ -137,7 +144,7 @@ pub(crate) struct Shared {
     /// Global task queue used for:
     /// 1. Submit work to the scheduler while **not** currently on a worker thread.
     /// 2. Submit work to the scheduler when a worker run queue is saturated
-    inject: inject::Shared<Arc<Handle>>,
+    pub(super) inject: inject::Shared<Arc<Handle>>,
 
     /// Coordinates idle workers
     idle: Idle,
@@ -155,6 +162,9 @@ pub(crate) struct Shared {
     #[allow(clippy::vec_box)] // we're moving an already-boxed value
     shutdown_cores: Mutex<Vec<Box<Core>>>,
 
+    /// Tracing status of the scheduler.
+    pub(super) trace_status: TraceStatus,
+
     /// Scheduler configuration options
     config: Config,
 
@@ -171,18 +181,18 @@ pub(crate) struct Shared {
 }
 
 /// Data synchronized by the scheduler mutex
-pub(super) struct Synced {
+pub(crate) struct Synced {
     /// Synchronized state for `Idle`.
     pub(super) idle: idle::Synced,
 
     /// Synchronized state for `Inject`.
-    inject: inject::Synced,
+    pub(crate) inject: inject::Synced,
 }
 
 /// Used to communicate with a worker from other threads.
 struct Remote {
     /// Steals tasks from this worker.
-    steal: queue::Steal<Arc<Handle>>,
+    pub(super) steal: queue::Steal<Arc<Handle>>,
 
     /// Unparks the associated worker thread
     unpark: Unparker,
@@ -204,6 +214,82 @@ pub(crate) struct Context {
 
 /// Starts the workers
 pub(crate) struct Launch(Vec<Arc<Worker>>);
 
+cfg_not_taskdump! {
+    pub(super) struct TraceStatus {}
+
+    impl TraceStatus {
+        fn new(_: usize) -> Self {
+            Self {}
+        }
+
+        fn trace_requested(&self) -> bool {
+            false
+        }
+    }
+}
+
+cfg_taskdump! {
+    use crate::sync::notify::Notify;
+    use crate::runtime::dump::Dump;
+    use crate::loom::sync::atomic::{AtomicBool, Ordering};
+
+    /// Tracing status of the worker.
+    pub(super) struct TraceStatus {
+        pub(super) trace_requested: AtomicBool,
+        trace_start: Barrier,
+        trace_end: Barrier,
+        pub(super) result_ready: Notify,
+        pub(super) trace_result: Mutex<Option<Dump>>,
+    }
+
+    impl TraceStatus {
+        fn new(remotes_len: usize) -> Self {
+            Self {
+                trace_requested: AtomicBool::new(false),
+                trace_start: Barrier::new(remotes_len),
+                trace_end: Barrier::new(remotes_len),
+                result_ready: Notify::new(),
+                trace_result: Mutex::new(None),
+            }
+        }
+
+        fn trace_requested(&self) -> bool {
+            self.trace_requested.load(Ordering::Relaxed)
+        }
+
+        pub(super) async fn start_trace_request(&self, handle: &Handle) {
+            while self
+                .trace_requested
+                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
+                .is_err()
+            {
+                handle.notify_all();
+                crate::task::yield_now().await;
+            }
+        }
+
+        fn stash_result(&self, dump: Dump) {
+            let _ = self.trace_result.lock().insert(dump);
+            self.result_ready.notify_one();
+        }
+
+        pub(super) fn take_result(&self) -> Option<Dump> {
+            self.trace_result.lock().take()
+        }
+
+        pub(super) async fn end_trace_request(&self, handle: &Handle) {
+            while self
+                .trace_requested
+                .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
+                .is_err()
+            {
+                handle.notify_all();
+                crate::task::yield_now().await;
+            }
+        }
+    }
+}
+
 /// Running a task may consume the core. If the core is still available when
 /// running the task completes, it is returned. Otherwise, the worker will need
 /// to stop processing.
@@ -249,6 +335,7 @@ pub(super) fn create(
             run_queue,
             is_searching: false,
             is_shutdown: false,
+            is_traced: false,
             park: Some(park),
             global_queue_interval: stats.tuned_global_queue_interval(&config),
             stats,
@@ -262,6 +349,7 @@ pub(super) fn create(
     let (idle, idle_synced) = Idle::new(size);
     let (inject, inject_synced) = inject::Shared::new();
 
+    let remotes_len = remotes.len();
     let handle = Arc::new(Handle {
         shared: Shared {
             remotes: remotes.into_boxed_slice(),
@@ -273,6 +361,7 @@ pub(super) fn create(
                 inject: inject_synced,
             }),
             shutdown_cores: Mutex::new(vec![]),
+            trace_status: TraceStatus::new(remotes_len),
             config,
             scheduler_metrics: SchedulerMetrics::new(),
             worker_metrics: worker_metrics.into_boxed_slice(),
@@ -476,6 +565,10 @@ impl Context {
         while !core.is_shutdown {
             self.assert_lifo_enabled_is_correct(&core);
 
+            if core.is_traced {
+                core = self.worker.handle.trace_core(core);
+            }
+
             // Increment the tick
             core.tick();
 
@@ -649,7 +742,7 @@ impl Context {
         }
 
         if core.transition_to_parked(&self.worker) {
-            while !core.is_shutdown {
+            while !core.is_shutdown && !core.is_traced {
                 core.stats.about_to_park();
                 core = self.park_timeout(core, None);
 
@@ -825,7 +918,7 @@ impl Core {
     /// Returns true if the transition happened, false if there is work to do first.
     fn transition_to_parked(&mut self, worker: &Worker) -> bool {
         // Workers should not park if they have work to do
-        if self.lifo_slot.is_some() || self.run_queue.has_tasks() {
+        if self.lifo_slot.is_some() || self.run_queue.has_tasks() || self.is_traced {
            return false;
         }
 
@@ -890,6 +983,11 @@ impl Core {
             let synced = worker.handle.shared.synced.lock();
             self.is_shutdown = worker.inject().is_closed(&synced.inject);
         }
+
+        if !self.is_traced {
+            // Check if the worker should be tracing.
+            self.is_traced = worker.handle.shared.trace_status.trace_requested();
+        }
     }
 
     /// Signals all tasks to shut down, and waits for them to complete. Must run
@@ -1048,7 +1146,7 @@ impl Handle {
         }
     }
 
-    fn notify_all(&self) {
+    pub(super) fn notify_all(&self) {
         for remote in &self.shared.remotes[..] {
             remote.unpark.unpark(&self.driver);
         }
     }
 
@@ -1101,6 +1199,54 @@ impl Handle {
         }
     }
 
+    cfg_not_taskdump! {
+        fn trace_core(&self, core: Box<Core>) -> Box<Core> {
+            core
+        }
+    }
+
+    cfg_taskdump! {
+        fn trace_core(&self, mut core: Box<Core>) -> Box<Core> {
+            use crate::runtime::dump;
+            use task::trace::trace_multi_thread;
+
+            core.is_traced = false;
+
+            // wait for other workers
+            let barrier = self.shared.trace_status.trace_start.wait();
+
+            if !barrier.is_leader() {
+                // wait for leader to finish tracing
+                self.shared.trace_status.trace_end.wait();
+                return core;
+            }
+
+            // trace
+
+            let owned = &self.shared.owned;
+            let mut local = self.shared.steal_all();
+            let synced = &self.shared.synced;
+            let injection = &self.shared.inject;
+
+            // safety: `trace_multi_thread` is invoked with the same `synced` that `injection`
+            // was created with.
+            let traces = unsafe { trace_multi_thread(owned, &mut local, synced, injection) }
+                .into_iter()
+                .map(dump::Task::new)
+                .collect();
+
+            let result = dump::Dump::new(traces);
+
+            // stash the result
+            self.shared.trace_status.stash_result(result);
+
+            // allow other workers to proceed
+            self.shared.trace_status.trace_end.wait();
+
+            core
+        }
+    }
+
     fn ptr_eq(&self, other: &Handle) -> bool {
         std::ptr::eq(self, other)
     }
@@ -1163,6 +1309,29 @@ cfg_metrics! {
     }
 }
 
+cfg_taskdump! {
+    impl Shared {
+        /// Steal all tasks from remotes into a single local queue.
+        pub(super) fn steal_all(&self) -> super::queue::Local<Arc<Handle>> {
+            let (_steal, mut local) = super::queue::local();
+
+            let worker_metrics = WorkerMetrics::new();
+            let mut stats = Stats::new(&worker_metrics);
+
+            for remote in self.remotes.iter() {
+                let steal = &remote.steal;
+                while !steal.is_empty() {
+                    if let Some(task) = steal.steal_into(&mut local, &mut stats) {
+                        local.push_back([task].into_iter());
+                    }
+                }
+            }
+
+            local
+        }
+    }
+}
+
 // `u32::abs_diff` is not available on Tokio's MSRV.
 fn abs_diff(a: u32, b: u32) -> u32 {
     if a > b {
diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs
index 6299d75d581..543b7eee98e 100644
--- a/tokio/src/runtime/task/trace/mod.rs
+++ b/tokio/src/runtime/task/trace/mod.rs
@@ -279,3 +279,52 @@ pub(in crate::runtime) fn trace_current_thread(
         })
         .collect()
 }
+
+cfg_rt_multi_thread! {
+    use crate::loom::sync::Mutex;
+    use crate::runtime::scheduler::multi_thread;
+    use crate::runtime::scheduler::multi_thread::Synced;
+    use crate::runtime::scheduler::inject::Shared;
+
+    /// Trace and poll all tasks of the multi-thread runtime.
+    ///
+    /// ## Safety
+    ///
+    /// Must be called with the same `synced` that `injection` was created with.
+    pub(in crate::runtime) unsafe fn trace_multi_thread(
+        owned: &OwnedTasks<Arc<multi_thread::Handle>>,
+        local: &mut multi_thread::queue::Local<Arc<multi_thread::Handle>>,
+        synced: &Mutex<Synced>,
+        injection: &Shared<Arc<multi_thread::Handle>>,
+    ) -> Vec<Trace> {
+        // clear the local queue
+        while let Some(notified) = local.pop() {
+            drop(notified);
+        }
+
+        // clear the injection queue
+        let mut synced = synced.lock();
+        while let Some(notified) = injection.pop(&mut synced.inject) {
+            drop(notified);
+        }
+
+        drop(synced);
+
+        // notify each task
+        let mut traces = vec![];
+        owned.for_each(|task| {
+            // set the notified bit
+            task.as_raw().state().transition_to_notified_for_tracing();
+
+            // trace the task
+            let ((), trace) = Trace::capture(|| task.as_raw().poll());
+            traces.push(trace);
+
+            // reschedule the task
+            let _ = task.as_raw().state().transition_to_notified_by_ref();
+            task.as_raw().schedule();
+        });
+
+        traces
+    }
+}
diff --git a/tokio/tests/dump.rs b/tokio/tests/dump.rs
new file mode 100644
index 00000000000..79051d335bb
--- /dev/null
+++ b/tokio/tests/dump.rs
@@ -0,0 +1,98 @@
+#![cfg(all(
+    tokio_unstable,
+    tokio_taskdump,
+    target_os = "linux",
+    any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
+))]
+
+use std::hint::black_box;
+use tokio::runtime::{self, Handle};
+
+#[inline(never)]
+async fn a() {
+    black_box(b()).await
+}
+
+#[inline(never)]
+async fn b() {
+    black_box(c()).await
+}
+
+#[inline(never)]
+async fn c() {
+    loop {
+        black_box(tokio::task::yield_now()).await
+    }
+}
+
+#[test]
+fn current_thread() {
+    let rt = runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+
+    async fn dump() {
+        let handle = Handle::current();
+        let dump = handle.dump().await;
+
+        let tasks: Vec<_> = dump.tasks().iter().collect();
+
+        assert_eq!(tasks.len(), 3);
+
+        for task in tasks {
+            let trace = task.trace().to_string();
+            eprintln!("\n\n{trace}\n\n");
+            assert!(trace.contains("dump::a"));
+            assert!(trace.contains("dump::b"));
+            assert!(trace.contains("dump::c"));
+            assert!(trace.contains("tokio::task::yield_now"));
+        }
+    }
+
+    rt.block_on(async {
+        tokio::select!(
+            biased;
+            _ = tokio::spawn(a()) => {},
+            _ = tokio::spawn(a()) => {},
+            _ = tokio::spawn(a()) => {},
+            _ = dump() => {},
+        );
+    });
+}
+
+#[test]
+fn multi_thread() {
+    let rt = runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+
+    async fn dump() {
+        let handle = Handle::current();
+        let dump = handle.dump().await;
+
+        let tasks: Vec<_> = dump.tasks().iter().collect();
+
+        assert_eq!(tasks.len(), 3);
+
+        for task in tasks {
+            let trace = task.trace().to_string();
+            eprintln!("\n\n{trace}\n\n");
+            assert!(trace.contains("dump::a"));
+            assert!(trace.contains("dump::b"));
+            assert!(trace.contains("dump::c"));
+            assert!(trace.contains("tokio::task::yield_now"));
+        }
+    }
+
+    rt.block_on(async {
+        tokio::select!(
+            biased;
+            _ = tokio::spawn(a()) => {},
+            _ = tokio::spawn(a()) => {},
+            _ = tokio::spawn(a()) => {},
+            _ = dump() => {},
+        );
+    });
+}
diff --git a/tokio/tests/dump_current_thread.rs b/tokio/tests/dump_current_thread.rs
deleted file mode 100644
index 29661f98fb7..00000000000
--- a/tokio/tests/dump_current_thread.rs
+++ /dev/null
@@ -1,55 +0,0 @@
-#![cfg(all(
-    tokio_unstable,
-    tokio_taskdump,
-    target_os = "linux",
-    any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
-))]
-
-use std::hint::black_box;
-use tokio::runtime;
-
-#[inline(never)]
-async fn a() {
-    black_box(b()).await
-}
-
-#[inline(never)]
-async fn b() {
-    black_box(c()).await
-}
-
-#[inline(never)]
-async fn c() {
-    black_box(tokio::task::yield_now()).await
-}
-
-#[test]
-fn test() {
-    let rt = runtime::Builder::new_current_thread()
-        .enable_all()
-        .build()
-        .unwrap();
-
-    rt.spawn(a());
-
-    let handle = rt.handle();
-
-    assert_eq!(handle.dump().tasks().iter().count(), 0);
-
-    let dump = rt.block_on(async {
-        handle.spawn(a());
-        handle.dump()
-    });
-
-    let tasks: Vec<_> = dump.tasks().iter().collect();
-
-    assert_eq!(tasks.len(), 2);
-
-    for task in tasks {
-        let trace = task.trace().to_string();
-        assert!(trace.contains("dump_current_thread::a"));
-        assert!(trace.contains("dump_current_thread::b"));
-        assert!(trace.contains("dump_current_thread::c"));
-        assert!(trace.contains("tokio::task::yield_now"));
-    }
-}
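
Usage note: with this change, `Handle::dump` becomes an `async fn` on both schedulers. The sketch below, distilled from `examples/dump.rs` and `tokio/tests/dump.rs` above, shows how a caller obtains and prints a dump. It assumes a build with `RUSTFLAGS="--cfg tokio_unstable --cfg tokio_taskdump"` on a supported Linux target.

use tokio::runtime;

fn main() {
    let rt = runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async {
        // Something to observe: a task that yields in a loop.
        tokio::spawn(async {
            loop {
                tokio::task::yield_now().await;
            }
        });

        // `dump` is now async: it pauses the workers, traces every task,
        // and resolves once the leader worker has stashed the result.
        let dump = tokio::runtime::Handle::current().dump().await;
        for (i, task) in dump.tasks().iter().enumerate() {
            println!("task {i} trace:\n{}", task.trace());
        }
    });
}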
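
The `spawn_thread` helper in `tokio/src/runtime/handle.rs` exists so the dumping future runs on a thread outside the runtime being traced; otherwise the trace would capture itself. A minimal sketch of the same pattern in terms of public APIs, with the hypothetical name `spawn_on_thread` standing in for the private helper:

use std::future::Future;
use tokio::sync::oneshot;

// Run `f` to completion on a dedicated thread (with its own
// single-threaded runtime) and await its output via a oneshot channel.
async fn spawn_on_thread<F>(f: F) -> F::Output
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    std::thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        rt.block_on(async {
            // The receiver may have been dropped; ignore send errors.
            let _ = tx.send(f.await);
        });
    });
    rx.await.unwrap()
}

#[tokio::main]
async fn main() {
    let answer = spawn_on_thread(async { 6 * 7 }).await;
    assert_eq!(answer, 42);
}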
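
`TraceStatus` serializes concurrent dump requests with a single `AtomicBool`: `start_trace_request` loops on `compare_exchange(false, true, ..)` until it wins the flag, and `end_trace_request` flips it back, letting the next queued dumper through. A condensed, std-only sketch of that claim/release idea follows; the `try_claim`/`release` names are illustrative, and the real code additionally re-notifies workers and yields to the scheduler while it spins.

use std::sync::atomic::{AtomicBool, Ordering};

// Succeeds only for the first caller; everyone else must retry later.
fn try_claim(trace_requested: &AtomicBool) -> bool {
    trace_requested
        .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
        .is_ok()
}

// Hand the flag back so a queued dump request can proceed.
fn release(trace_requested: &AtomicBool) {
    trace_requested.store(false, Ordering::Release);
}

fn main() {
    let flag = AtomicBool::new(false);
    assert!(try_claim(&flag)); // first dump request wins the flag
    assert!(!try_claim(&flag)); // a concurrent request must wait
    release(&flag);
    assert!(try_claim(&flag)); // and can proceed once released
}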
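
`trace_core` relies on the `trace_start`/`trace_end` barrier pair: every worker parks at `trace_start`, exactly one `is_leader()` caller performs the trace while the others are quiescent, and `trace_end` releases them once the result is stashed. A std-only sketch of that rendezvous (not Tokio's actual code; the thread count and printing are arbitrary):

use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
    let n = 4;
    let start = Arc::new(Barrier::new(n));
    let end = Arc::new(Barrier::new(n));

    let workers: Vec<_> = (0..n)
        .map(|i| {
            let (start, end) = (Arc::clone(&start), Arc::clone(&end));
            thread::spawn(move || {
                // Rendezvous: no worker proceeds until all have stopped here.
                let wait = start.wait();
                if wait.is_leader() {
                    // Exactly one thread is the leader; it does the work
                    // while every other worker is parked.
                    println!("worker {i} is the leader; tracing now");
                }
                // Followers block here until the leader is done.
                end.wait();
            })
        })
        .collect();

    for w in workers {
        w.join().unwrap();
    }
}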