diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index e48788567e4..0bd40cd875c 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -364,10 +364,17 @@ impl Task { } cfg_taskdump! { - pub(super) fn notify_for_tracing(&self) -> Notified { - self.as_raw().state().transition_to_notified_for_tracing(); - // SAFETY: `transition_to_notified_for_tracing` increments the refcount. - unsafe { Notified(Task::new(self.raw)) } + /// Notify the task for task dumping. + /// + /// Returns `None` if the task has already been notified. + pub(super) fn notify_for_tracing(&self) -> Option> { + if self.as_raw().state().transition_to_notified_for_tracing() { + // SAFETY: `transition_to_notified_for_tracing` increments the + // refcount. + Some(unsafe { Notified(Task::new(self.raw)) }) + } else { + None + } } } } diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 64cfb4b5db1..25c6b434a34 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -270,7 +270,11 @@ impl State { }) } - /// Transitions the state to `NOTIFIED`, unconditionally increasing the ref count. + /// Transitions the state to `NOTIFIED`, unconditionally increasing the ref + /// count. + /// + /// Returns `true` if the notified bit was transitioned from `0` to `1`; + /// otherwise `false.` #[cfg(all( tokio_unstable, tokio_taskdump, @@ -278,12 +282,16 @@ impl State { target_os = "linux", any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") ))] - pub(super) fn transition_to_notified_for_tracing(&self) { + pub(super) fn transition_to_notified_for_tracing(&self) -> bool { self.fetch_update_action(|mut snapshot| { - snapshot.set_notified(); - snapshot.ref_inc(); - ((), Some(snapshot)) - }); + if snapshot.is_notified() { + (false, None) + } else { + snapshot.set_notified(); + snapshot.ref_inc(); + (true, Some(snapshot)) + } + }) } /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle. diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index 185d682a47c..34d40a5a253 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -272,14 +272,19 @@ pub(in crate::runtime) fn trace_current_thread( injection: &Inject>, ) -> Vec { // clear the local and injection queues - local.clear(); + + let mut dequeued = Vec::new(); + + while let Some(task) = local.pop_back() { + dequeued.push(task); + } while let Some(task) = injection.pop() { - drop(task); + dequeued.push(task); } // precondition: We have drained the tasks from the injection queue. - trace_owned(owned) + trace_owned(owned, dequeued) } cfg_rt_multi_thread! { @@ -299,22 +304,24 @@ cfg_rt_multi_thread! { synced: &Mutex, injection: &Shared>, ) -> Vec { + let mut dequeued = Vec::new(); + // clear the local queue while let Some(notified) = local.pop() { - drop(notified); + dequeued.push(notified); } // clear the injection queue let mut synced = synced.lock(); while let Some(notified) = injection.pop(&mut synced.inject) { - drop(notified); + dequeued.push(notified); } drop(synced); // precondition: we have drained the tasks from the local and injection // queues. - trace_owned(owned) + trace_owned(owned, dequeued) } } @@ -324,14 +331,20 @@ cfg_rt_multi_thread! { /// /// This helper presumes exclusive access to each task. The tasks must not exist /// in any other queue. -fn trace_owned(owned: &OwnedTasks) -> Vec { - // notify each task - let mut tasks = vec![]; +fn trace_owned(owned: &OwnedTasks, dequeued: Vec>) -> Vec { + let mut tasks = dequeued; + // Notify and trace all un-notified tasks. The dequeued tasks are already + // notified and so do not need to be re-notified. owned.for_each(|task| { - // notify the task (and thus make it poll-able) and stash it - tasks.push(task.notify_for_tracing()); - // we do not poll it here since we hold a lock on `owned` and the task - // may complete and need to remove itself from `owned`. + // Notify the task (and thus make it poll-able) and stash it. This fails + // if the task is already notified. In these cases, we skip tracing the + // task. + if let Some(notified) = task.notify_for_tracing() { + tasks.push(notified); + } + // We do not poll tasks here, since we hold a lock on `owned` and the + // task may complete and need to remove itself from `owned`. Polling + // such a task here would result in a deadlock. }); tasks diff --git a/tokio/tests/dump.rs b/tokio/tests/dump.rs index 4da0c9e8e18..ecb4495b33e 100644 --- a/tokio/tests/dump.rs +++ b/tokio/tests/dump.rs @@ -147,6 +147,7 @@ mod future_completes_during_trace { async fn dump() { let handle = Handle::current(); let _dump = handle.dump().await; + tokio::task::yield_now().await; } rt.block_on(async { @@ -154,3 +155,43 @@ mod future_completes_during_trace { }); } } + +/// Regression test for #6051. +/// +/// This test ensures that tasks notified outside of a worker will not be +/// traced, since doing so will un-set their notified bit prior to them being +/// run and panic. +#[test] +fn notified_during_tracing() { + let rt = runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(3) + .build() + .unwrap(); + + let timeout = async { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + }; + + let timer = rt.spawn(async { + loop { + tokio::time::sleep(tokio::time::Duration::from_nanos(1)).await; + } + }); + + let dump = async { + loop { + let handle = Handle::current(); + let _dump = handle.dump().await; + } + }; + + rt.block_on(async { + tokio::select!( + biased; + _ = timeout => {}, + _ = timer => {}, + _ = dump => {}, + ); + }); +}