Skip to content

Commit

Permalink
tokio: introduce Handle::dump and impl for current-thread runtime
Browse files Browse the repository at this point in the history
Task dumps are snapshots of runtime state. Taskdumps are collected by
instrumenting Tokio's leaves to conditionally collect backtraces, which
are then coalesced per-task into execution tree traces.

This initial implementation only supports collecting taskdumps from
within the context of a current-thread runtime, and only `yield_now()`
is instrumented.
  • Loading branch information
jswrenn committed Apr 7, 2023
1 parent 3c403d6 commit b9caa5f
Show file tree
Hide file tree
Showing 20 changed files with 751 additions and 7 deletions.
6 changes: 5 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2018"
# If you copy one of the examples into a new project, you should be using
# [dependencies] instead, and delete the **path**.
[dev-dependencies]
tokio = { version = "1.0.0", path = "../tokio", features = ["full", "tracing"] }
tokio = { version = "1.0.0", path = "../tokio", features = ["full", "tracing", "taskdump"] }
tokio-util = { version = "0.7.0", path = "../tokio-util", features = ["full"] }
tokio-stream = { version = "0.1", path = "../tokio-stream" }

Expand Down Expand Up @@ -90,3 +90,7 @@ path = "named-pipe-ready.rs"
[[example]]
name = "named-pipe-multi-client"
path = "named-pipe-multi-client.rs"

[[example]]
name = "dump"
path = "dump.rs"
34 changes: 34 additions & 0 deletions examples/dump.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! This example demonstrates tokio's experimental taskdumping functionality.
use std::hint::black_box;

#[inline(never)]
async fn a() {
black_box(b()).await
}

#[inline(never)]
async fn b() {
black_box(c()).await
}

#[inline(never)]
async fn c() {
black_box(tokio::task::yield_now()).await
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
tokio::spawn(a());
tokio::spawn(b());
tokio::spawn(c());

let handle = tokio::runtime::Handle::current();
let dump = handle.dump();

for (i, task) in dump.tasks().iter().enumerate() {
let trace = task.trace();
println!("task {i} trace:");
println!("{trace}");
}
}
2 changes: 2 additions & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ signal = [
"windows-sys/Win32_System_Console",
]
sync = []
taskdump = ["backtrace"]
test-util = ["rt", "sync", "time"]
time = []

Expand Down Expand Up @@ -114,6 +115,7 @@ socket2 = { version = "0.4.9", optional = true, features = [ "all" ] }
# Requires `--cfg tokio_unstable` to enable.
[target.'cfg(tokio_unstable)'.dependencies]
tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } # Not in full
backtrace = { version = "0.3.0", optional = true }

[target.'cfg(unix)'.dependencies]
libc = { version = "0.2.42", optional = true }
Expand Down
10 changes: 10 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,16 @@ macro_rules! cfg_not_rt_multi_thread {
}
}

macro_rules! cfg_taskdump {
($($item:item)*) => {
$(
#[cfg(all(tokio_unstable, feature = "taskdump"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "taskdump"))))]
$item
)*
};
}

macro_rules! cfg_test_util {
($($item:item)*) => {
$(
Expand Down
18 changes: 18 additions & 0 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ cfg_rt! {
use std::time::Duration;
}

cfg_taskdump! {
use crate::runtime::task::trace;
}

struct Context {
/// Uniquely identifies the current thread
#[cfg(feature = "rt")]
Expand Down Expand Up @@ -45,6 +49,9 @@ struct Context {
/// Tracks the amount of "work" a task may still do before yielding back to
/// the sheduler
budget: Cell<coop::Budget>,

#[cfg(all(tokio_unstable, feature = "taskdump"))]
trace: trace::Context,
}

tokio_thread_local! {
Expand Down Expand Up @@ -75,6 +82,9 @@ tokio_thread_local! {
rng: FastRand::new(RngSeed::new()),

budget: Cell::new(coop::Budget::unconstrained()),

#[cfg(all(tokio_unstable, feature = "taskdump"))]
trace: trace::Context::new(),
}
}
}
Expand Down Expand Up @@ -380,6 +390,14 @@ cfg_rt! {
}
}

cfg_taskdump! {
/// SAFETY: Callers of this function must ensure that trace frames always
/// form a valid linked list.
pub(crate) unsafe fn with_trace<R>(f: impl FnOnce(&trace::Context) -> R) -> R {
CONTEXT.with(|c| f(&c.trace))
}
}

// Forces the current "entered" state to be cleared while the closure
// is executed.
//
Expand Down
69 changes: 69 additions & 0 deletions tokio/src/runtime/dump.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//! Snapshots of runtime state.
use std::fmt;

/// A snapshot of a runtime's state.
#[derive(Debug)]

pub struct Dump {
tasks: Tasks,
}

/// Snapshots of tasks.
#[derive(Debug)]

pub struct Tasks {
tasks: Vec<Task>,
}

/// A snapshot of a task.
#[derive(Debug)]

pub struct Task {
trace: Trace,
}

/// An execution trace of a task's last poll.
#[derive(Debug)]
pub struct Trace {
inner: super::task::trace::Trace,
}

impl Dump {
pub(crate) fn new(tasks: Vec<Task>) -> Self {
Self {
tasks: Tasks { tasks },
}
}

/// Tasks in this snapshot.
pub fn tasks(&self) -> &Tasks {
&self.tasks
}
}

impl Tasks {
/// Iterate over tasks.
pub fn iter(&self) -> impl Iterator<Item = &Task> {
self.tasks.iter()
}
}

impl Task {
pub(crate) fn new(trace: super::task::trace::Trace) -> Self {
Self {
trace: Trace { inner: trace },
}
}

/// A trace of this task's state.
pub fn trace(&self) -> &Trace {
&self.trace
}
}

impl fmt::Display for Trace {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}
16 changes: 16 additions & 0 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ impl Handle {
F::Output: Send + 'static,
{
let id = crate::runtime::task::Id::next();
#[cfg(all(tokio_unstable, feature = "taskdump"))]
let future = super::task::trace::Trace::root(future);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", _name, id.as_u64());
self.inner.spawn(future, id)
Expand Down Expand Up @@ -321,6 +323,20 @@ cfg_metrics! {
}
}

cfg_taskdump! {
impl Handle {
/// Capture a snapshot of this runtime's state.
pub fn dump(&self) -> crate::runtime::Dump {
match &self.inner {
scheduler::Handle::CurrentThread(handle) => handle.dump(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) =>
unimplemented!("taskdumps are unsupported on the multi-thread runtime"),
}
}
}
}

/// Error returned by `try_current` when no Runtime has been started
#[derive(Debug)]
pub struct TryCurrentError {
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ cfg_rt! {
mod defer;
pub(crate) use defer::Defer;

cfg_taskdump! {
pub mod dump;
pub use dump::Dump;
}

mod handle;
pub use handle::{EnterGuard, Handle, TryCurrentError};

Expand Down
40 changes: 40 additions & 0 deletions tokio/src/runtime/scheduler/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,46 @@ impl Handle {
handle
}

/// Capture a snapshot of this runtime's state.
#[cfg(all(tokio_unstable, feature = "taskdump"))]
pub(crate) fn dump(&self) -> crate::runtime::Dump {
use crate::runtime::{dump, task::trace::Trace};

let mut snapshots = vec![];

// todo: how to make this work outside of a runtime context?
CURRENT.with(|maybe_context| {
// drain the local queue
let Some(context) = maybe_context else { return };
let mut maybe_core = context.core.borrow_mut();
let Some(core) = maybe_core.as_mut() else { return };
let local = &mut core.tasks;
let _ = local.drain(..);

// drain the injection queue
if let Some(injection) = self.shared.queue.lock().as_mut() {
let _ = injection.drain(..);
}

// notify each task
let mut tasks = vec![];
self.shared.owned.for_each(|task| {
// set the notified bit
let _ = task.as_raw().state().transition_to_notified_for_tracing();
// store the raw tasks into a vec
tasks.push(task.as_raw());
});

// trace each task
for task in tasks {
let ((), trace) = Trace::capture(|| task.poll());
snapshots.push(dump::Task::new(trace));
}
});

dump::Dump::new(snapshots)
}

fn pop(&self) -> Option<task::Notified<Arc<Handle>>> {
match self.shared.queue.lock().as_mut() {
Some(queue) => queue.pop_front(),
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,18 @@ impl<S: 'static> OwnedTasks<S> {
}
}

cfg_taskdump! {
impl<S: 'static> OwnedTasks<S> {
/// Locks the tasks, and calls `f` on an iterator over them.
pub(crate) fn for_each<F>(&self, f: F)
where
F: FnMut(&Task<S>)
{
self.inner.lock().list.for_each(f)
}
}
}

impl<S: 'static> LocalOwnedTasks<S> {
pub(crate) fn new() -> Self {
Self {
Expand Down
9 changes: 9 additions & 0 deletions tokio/src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ use self::state::State;

mod waker;

cfg_taskdump! {
pub(crate) mod trace;
}

use crate::future::Future;
use crate::util::linked_list;

Expand Down Expand Up @@ -340,6 +344,11 @@ impl<S: 'static> Task<S> {
}
}

#[cfg(all(tokio_unstable, feature = "taskdump"))]
pub(crate) fn as_raw(&self) -> RawTask {
self.raw
}

fn header(&self) -> &Header {
self.raw.header()
}
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/runtime/task/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::ptr::NonNull;
use std::task::{Poll, Waker};

/// Raw task handle
pub(super) struct RawTask {
pub(crate) struct RawTask {
ptr: NonNull<Header>,
}

Expand Down Expand Up @@ -190,12 +190,12 @@ impl RawTask {
}

/// Returns a reference to the task's state.
pub(super) fn state(&self) -> &State {
pub(crate) fn state(&self) -> &State {
&self.header().state
}

/// Safety: mutual exclusion is required to call this function.
pub(super) fn poll(self) {
pub(crate) fn poll(self) {
let vtable = self.header().vtable;
unsafe { (vtable.poll)(self.ptr) }
}
Expand Down
14 changes: 12 additions & 2 deletions tokio/src/runtime/task/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::fmt;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::usize;

pub(super) struct State {
pub(crate) struct State {
val: AtomicUsize,
}

Expand Down Expand Up @@ -88,7 +88,7 @@ pub(super) enum TransitionToNotifiedByVal {
}

#[must_use]
pub(super) enum TransitionToNotifiedByRef {
pub(crate) enum TransitionToNotifiedByRef {
DoNothing,
Submit,
}
Expand Down Expand Up @@ -270,6 +270,16 @@ impl State {
})
}

/// Transitions the state to `NOTIFIED`, unconditionally increasing the ref count.
#[cfg(all(tokio_unstable, feature = "taskdump"))]
pub(crate) fn transition_to_notified_for_tracing(&self) {
self.fetch_update_action(|mut snapshot| {
snapshot.set_notified();
snapshot.ref_inc();
((), Some(snapshot))
});
}

/// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
///
/// Returns `true` if the task needs to be submitted to the pool for
Expand Down
Loading

0 comments on commit b9caa5f

Please sign in to comment.