Skip to content

Commit

Permalink
taskdump: skip notified tasks during taskdumps (#6194)
Browse files Browse the repository at this point in the history
Fixes #6051
  • Loading branch information
jswrenn authored Dec 8, 2023
1 parent 3a4aef1 commit d561b58
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 23 deletions.
15 changes: 11 additions & 4 deletions tokio/src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,17 @@ impl<S: 'static> Task<S> {
}

cfg_taskdump! {
pub(super) fn notify_for_tracing(&self) -> Notified<S> {
self.as_raw().state().transition_to_notified_for_tracing();
// SAFETY: `transition_to_notified_for_tracing` increments the refcount.
unsafe { Notified(Task::new(self.raw)) }
/// Notify the task for task dumping.
///
/// Returns `None` if the task has already been notified.
pub(super) fn notify_for_tracing(&self) -> Option<Notified<S>> {
if self.as_raw().state().transition_to_notified_for_tracing() {
// SAFETY: `transition_to_notified_for_tracing` increments the
// refcount.
Some(unsafe { Notified(Task::new(self.raw)) })
} else {
None
}
}
}
}
Expand Down
20 changes: 14 additions & 6 deletions tokio/src/runtime/task/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,20 +270,28 @@ impl State {
})
}

/// Transitions the state to `NOTIFIED`, unconditionally increasing the ref count.
/// Transitions the state to `NOTIFIED` and increases the ref count, unless
/// the notified bit is already set, in which case the state is left unchanged.
///
/// Returns `true` if the notified bit was transitioned from `0` to `1`;
/// otherwise `false`.
#[cfg(all(
tokio_unstable,
tokio_taskdump,
feature = "rt",
target_os = "linux",
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
))]
pub(super) fn transition_to_notified_for_tracing(&self) {
pub(super) fn transition_to_notified_for_tracing(&self) -> bool {
self.fetch_update_action(|mut snapshot| {
snapshot.set_notified();
snapshot.ref_inc();
((), Some(snapshot))
});
if snapshot.is_notified() {
(false, None)
} else {
snapshot.set_notified();
snapshot.ref_inc();
(true, Some(snapshot))
}
})
}

/// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
Expand Down
39 changes: 26 additions & 13 deletions tokio/src/runtime/task/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,19 @@ pub(in crate::runtime) fn trace_current_thread(
injection: &Inject<Arc<current_thread::Handle>>,
) -> Vec<Trace> {
// clear the local and injection queues
local.clear();

let mut dequeued = Vec::new();

while let Some(task) = local.pop_back() {
dequeued.push(task);
}

while let Some(task) = injection.pop() {
drop(task);
dequeued.push(task);
}

// precondition: We have drained the tasks from the injection queue.
trace_owned(owned)
trace_owned(owned, dequeued)
}

cfg_rt_multi_thread! {
Expand All @@ -299,22 +304,24 @@ cfg_rt_multi_thread! {
synced: &Mutex<Synced>,
injection: &Shared<Arc<multi_thread::Handle>>,
) -> Vec<Trace> {
let mut dequeued = Vec::new();

// clear the local queue
while let Some(notified) = local.pop() {
drop(notified);
dequeued.push(notified);
}

// clear the injection queue
let mut synced = synced.lock();
while let Some(notified) = injection.pop(&mut synced.inject) {
drop(notified);
dequeued.push(notified);
}

drop(synced);

// precondition: we have drained the tasks from the local and injection
// queues.
trace_owned(owned)
trace_owned(owned, dequeued)
}
}

Expand All @@ -324,14 +331,20 @@ cfg_rt_multi_thread! {
///
/// This helper presumes exclusive access to each task. The tasks must not exist
/// in any other queue.
fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>) -> Vec<Trace> {
// notify each task
let mut tasks = vec![];
fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>, dequeued: Vec<Notified<S>>) -> Vec<Trace> {
let mut tasks = dequeued;
// Notify and trace all un-notified tasks. The dequeued tasks are already
// notified and so do not need to be re-notified.
owned.for_each(|task| {
// notify the task (and thus make it poll-able) and stash it
tasks.push(task.notify_for_tracing());
// we do not poll it here since we hold a lock on `owned` and the task
// may complete and need to remove itself from `owned`.
// Notify the task (and thus make it poll-able) and stash it. This fails
// if the task is already notified. In these cases, we skip tracing the
// task.
if let Some(notified) = task.notify_for_tracing() {
tasks.push(notified);
}
// We do not poll tasks here, since we hold a lock on `owned` and the
// task may complete and need to remove itself from `owned`. Polling
// such a task here would result in a deadlock.
});

tasks
Expand Down
41 changes: 41 additions & 0 deletions tokio/tests/dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,51 @@ mod future_completes_during_trace {
async fn dump() {
let handle = Handle::current();
let _dump = handle.dump().await;
tokio::task::yield_now().await;
}

rt.block_on(async {
let _ = tokio::join!(tokio::spawn(complete_during_trace()), dump());
});
}
}

/// Regression test for #6051.
///
/// A task that was notified outside of a worker must be skipped by the task
/// dump: tracing it would un-set its notified bit before the task gets to run,
/// which results in a panic.
#[test]
fn notified_during_tracing() {
    let rt = runtime::Builder::new_multi_thread()
        .worker_threads(3)
        .enable_all()
        .build()
        .unwrap();

    // A spawned task that sleeps in a tight loop for the whole test.
    let sleeper = rt.spawn(async {
        loop {
            tokio::time::sleep(tokio::time::Duration::from_nanos(1)).await;
        }
    });

    // Takes task dumps back to back, without ever finishing on its own.
    let dumps = async {
        loop {
            let _dump = Handle::current().dump().await;
        }
    };

    // Upper bound on the run time of the test.
    let deadline = async {
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    };

    rt.block_on(async {
        // The deadline is listed first so that, with `biased`, it takes
        // priority over the two never-ending branches.
        tokio::select! {
            biased;
            _ = deadline => {}
            _ = sleeper => {}
            _ = dumps => {}
        }
    });
}

0 comments on commit d561b58

Please sign in to comment.