Skip to content

Commit

Permalink
time: fix panic on multithreaded runtime shutdown
Browse files Browse the repository at this point in the history
All core threads share the same park, and invoke park() followed by shutdown -
in parallel.  This is quite possibly a bug in the worker (in that it invokes
shutdown when it's not done with the parker), but for now work around it by
bypassing the timer parker once shutdown is called.

Fixes: tokio-rs#2789
  • Loading branch information
Bryan Donlan committed Dec 7, 2020
1 parent 0707f4c commit 74bd064
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 7 deletions.
12 changes: 6 additions & 6 deletions tokio/src/time/driver/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ pub(crate) struct Handle {
inner: Arc<Mutex<super::Inner>>,
}

impl std::fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Handle({:?})", &*self.inner as *const _)
}
}

impl Handle {
/// Creates a new timer `Handle` from a shared `Inner` timer state.
pub(super) fn new(inner: Arc<Mutex<super::Inner>>) -> Self {
Expand Down Expand Up @@ -76,9 +82,3 @@ cfg_not_rt! {
}
}
}

impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Handle")
}
}
24 changes: 23 additions & 1 deletion tokio/src/time/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,23 @@ where

let mut lock = self.inner.lock();

if lock.is_shutdown {
drop(lock);

// In the MT runtime, we can have multiple threads using the same
// park. These threads both poll and shutdown the parker, and so we
// can end up in park_internal after shutdown. This is probably not
// correct behavior from the worker, but for now, deal with this by
// short-circuiting the timer logic and delegating to the park
// backend.

if let Some(limit) = limit {
return self.park.park_timeout(limit);
} else {
return self.park.park();
}
}

let next_wake = lock.wheel.next_expiration_time();
lock.next_wake =
next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
Expand Down Expand Up @@ -255,7 +272,12 @@ impl Handle {

let mut lock = self.lock();

assert!(now >= lock.elapsed);
assert!(
now >= lock.elapsed,
"Attempted to process at a time earlier than last process: {:016X} < {:016X}",
now,
lock.elapsed
);

while let Some(entry) = lock.wheel.poll(now) {
debug_assert!(unsafe { entry.is_pending() });
Expand Down
14 changes: 14 additions & 0 deletions tokio/tests/time_sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,17 @@ async fn no_out_of_bounds_close_to_max() {
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}

#[tokio::test(flavor = "multi_thread")]
async fn hang_on_shutdown() {
let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();
tokio::spawn(async move {
tokio::task::block_in_place(|| sync_rx.recv().ok());
});

tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
drop(sync_tx);
});
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

0 comments on commit 74bd064

Please sign in to comment.