From 72bcf9499a82fe0dccb438c0c4f9922babd9fb0d Mon Sep 17 00:00:00 2001 From: Bryan Donlan Date: Mon, 7 Dec 2020 22:32:44 +0000 Subject: [PATCH] rt: fix deadlock in shutdown Previously, the runtime shutdown logic would first hand control over all cores to a single thread, which would sequentially shut down all tasks on the core and then wait for them to complete. This could deadlock when one task is waiting for a later core's task to complete. For example, in the newly added test, we have a `block_in_place` task that is waiting for another task to be dropped. If the latter task adds its core to the shutdown list later than the former, we end up waiting forever for the `block_in_place` task to complete. Additionally, there also was a bug wherein we'd attempt to park on the parker after shutting it down that was fixed as part of the refactors above. This change restructures the code to bring all tasks to a halt (and do any parking needed) before we collapse to a single thread to avoid this deadlock. There was also an issue in which cancelled tasks would not unpark the originating thread, due to what appears to be some sort of optimization gone wrong. This has been fixed to be much more conservative in selecting when not to unpark the source thread (this may be too conservative; please take a look at the changes to `release()`). Fixes: #2789 --- tokio/src/runtime/thread_pool/worker.rs | 60 ++++++++++++++----------- tokio/src/time/driver/mod.rs | 2 + tokio/tests/time_sleep.rs | 13 ++++++ 3 files changed, 50 insertions(+), 25 deletions(-) diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 400ddee3232..49a5f0b098c 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -78,11 +78,11 @@ pub(super) struct Shared { /// Coordinates idle workers idle: Idle, - /// Workers have have observed the shutdown signal + /// Cores that have observed the shutdown signal /// /// The core is **not** placed back in the worker to avoid it from being /// stolen by a thread that was spawned as part of `block_in_place`. - shutdown_workers: Mutex, Arc)>>, + shutdown_cores: Mutex>>, } /// Used to communicate with a worker from other threads. @@ -157,7 +157,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc, Launch) { remotes: remotes.into_boxed_slice(), inject: queue::Inject::new(), idle: Idle::new(size), - shutdown_workers: Mutex::new(vec![]), + shutdown_cores: Mutex::new(vec![]), }); let mut launch = Launch(vec![]); @@ -328,8 +328,10 @@ impl Context { } } + core.pre_shutdown(&self.worker); + // Signal shutdown - self.worker.shared.shutdown(core, self.worker.clone()); + self.worker.shared.shutdown(core); Err(()) } @@ -546,11 +548,9 @@ impl Core { } } - // Shutdown the core - fn shutdown(&mut self, worker: &Worker) { - // Take the core - let mut park = self.park.take().expect("park missing"); - + // Signals all tasks to shut down, and waits for them to complete. Must run + // before we enter the single-threaded phase of shutdown processing. + fn pre_shutdown(&mut self, worker: &Worker) { // Signal to all tasks to shut down. for header in self.tasks.iter() { header.shutdown(); @@ -564,8 +564,17 @@ impl Core { } // Wait until signalled + let park = self.park.as_mut().expect("park missing"); park.park().expect("park failed"); } + } + + // Shutdown the core + fn shutdown(&mut self) { + assert!(self.tasks.is_empty()); + + // Take the core + let mut park = self.park.take().expect("park missing"); // Drain the queue while self.next_local_task().is_some() {} @@ -630,18 +639,23 @@ impl task::Schedule for Arc { use std::ptr::NonNull; enum Immediate { + // Task has been synchronously removed from the Core owned by the + // current thread Removed(Option), - Core(bool), + // Task is owned by another thread, so we need to notify it to clean + // up the task later. + MaybeRemote, } let immediate = CURRENT.with(|maybe_cx| { let cx = match maybe_cx { Some(cx) => cx, - None => return Immediate::Core(false), + None => return Immediate::MaybeRemote, }; if !self.eq(&cx.worker) { - return Immediate::Core(cx.core.borrow().is_some()); + // Task owned by another core, so we need to notify it. + return Immediate::MaybeRemote; } let mut maybe_core = cx.core.borrow_mut(); @@ -656,15 +670,15 @@ impl task::Schedule for Arc { } } - Immediate::Core(false) + Immediate::MaybeRemote }); // Checks if we were called from within a worker, allowing for immediate // removal of a scheduled task. Else we have to go through the slower // process below where we remotely mark a task as dropped. - let worker_has_core = match immediate { + match immediate { Immediate::Removed(task) => return task, - Immediate::Core(worker_has_core) => worker_has_core, + Immediate::MaybeRemote => (), }; // Track the task to be released by the worker that owns it @@ -682,10 +696,6 @@ impl task::Schedule for Arc { self.remote().pending_drop.push(task); - if worker_has_core { - return None; - } - // The worker core has been handed off to another thread. In the // event that the scheduler is currently shutting down, the thread // that owns the task may be waiting on the release to complete @@ -799,16 +809,16 @@ impl Shared { /// its core back into its handle. /// /// If all workers have reached this point, the final cleanup is performed. - fn shutdown(&self, core: Box, worker: Arc) { - let mut workers = self.shutdown_workers.lock(); - workers.push((core, worker)); + fn shutdown(&self, core: Box) { + let mut cores = self.shutdown_cores.lock(); + cores.push(core); - if workers.len() != self.remotes.len() { + if cores.len() != self.remotes.len() { return; } - for (mut core, worker) in workers.drain(..) { - core.shutdown(&worker); + for mut core in cores.drain(..) { + core.shutdown(); } // Drain the injection queue diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 917078efbcc..9fbc0b3cf96 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -189,6 +189,8 @@ where let mut lock = self.inner.lock(); + assert!(!lock.is_shutdown); + let next_wake = lock.wheel.next_expiration_time(); lock.next_wake = next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); diff --git a/tokio/tests/time_sleep.rs b/tokio/tests/time_sleep.rs index d110ec27a8d..55e24e0a9ca 100644 --- a/tokio/tests/time_sleep.rs +++ b/tokio/tests/time_sleep.rs @@ -308,3 +308,16 @@ async fn no_out_of_bounds_close_to_max() { fn ms(n: u64) -> Duration { Duration::from_millis(n) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn hang_on_shutdown() { + let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>(); + tokio::spawn(async move { + tokio::task::block_in_place(|| sync_rx.recv().ok()); + }); + + tokio::spawn(async { + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + drop(sync_tx); + }); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; +}