From 3bf1bc226571869e4a5c357d4f6e40ad0a28f3ff Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 15 Dec 2022 10:48:57 +0100 Subject: [PATCH] feat!: Detect possible deadlocks when instantiating a parallel iterator. Deadlocks can happen if the producer for results doesn't start as there is no free thread on the rayon pool, and the only way for it to become free is if the iterator produces results. We now offer a `busy_timeout` in the relevant variants of the `Parallelism` enumeration to allow controlling how long we will wait until we abort with an error. --- examples/crash.rs | 34 ---------------------------------- src/core/dir_entry_iter.rs | 37 +++++++++++++++++++++++++++++-------- src/core/read_dir_iter.rs | 24 ++++++++++++++++++++---- src/lib.rs | 38 ++++++++++++++++++++++++++++++-------- tests/detect_deadlock.rs | 26 ++++++++++++++++++++++++++ tests/integration.rs | 8 +++++++- 6 files changed, 112 insertions(+), 55 deletions(-) delete mode 100644 examples/crash.rs create mode 100644 tests/detect_deadlock.rs diff --git a/examples/crash.rs b/examples/crash.rs deleted file mode 100644 index f4684e2..0000000 --- a/examples/crash.rs +++ /dev/null @@ -1,34 +0,0 @@ -extern crate jwalk; - -use jwalk::{WalkDir, Parallelism}; -use rayon::prelude::*; - -fn main() { - let rounds = vec![0, 1]; - - rayon::ThreadPoolBuilder::new() - .num_threads(1) - .build_global() - .expect("Failed to initialize worker thread pool"); - - let jwalk_pool = std::sync::Arc::new( - rayon::ThreadPoolBuilder::new() - .num_threads(1) - .build() - .unwrap(), - ); - - // Does finish if jwalk uses own pool with 1 thread - rounds.par_iter().for_each(|round| { - eprintln!("Round {round}…"); - for _entry in WalkDir::new(".").parallelism(Parallelism::RayonExistingPool(jwalk_pool.clone())) {} - eprintln!("Round {round} completed"); - }); - - // Does not finish if jwalk uses shared pool with 1 thread - rounds.par_iter().for_each(|round| { - eprintln!("Round {round}…"); - for _entry in WalkDir::new(".") {} - eprintln!("Round {round} completed"); - }); -} \ No newline at end of file diff --git a/src/core/dir_entry_iter.rs b/src/core/dir_entry_iter.rs index 5a3cedd..b948e3c 100644 --- a/src/core/dir_entry_iter.rs +++ b/src/core/dir_entry_iter.rs @@ -9,7 +9,7 @@ use crate::Result; pub struct DirEntryIter { min_depth: usize, // iterator yielding next ReadDir results when needed - read_dir_iter: Peekable>, + read_dir_iter: Option>>, // stack of ReadDir results, track location in filesystem traversal read_dir_results_stack: Vec>>>, } @@ -34,28 +34,33 @@ impl DirEntryIter { .collect(); // 2. Init new read_dir_iter from those specs - let read_dir_iter = ReadDirIter::new(read_dir_specs, parallelism, core_read_dir_callback); + let read_dir_iter = + ReadDirIter::try_new(read_dir_specs, parallelism, core_read_dir_callback) + .map(|iter| iter.peekable()); // 3. Return DirEntryIter that will return initial root entries and then // fill and process read_dir_iter until complete DirEntryIter { min_depth, - read_dir_iter: read_dir_iter.peekable(), + read_dir_iter, read_dir_results_stack: vec![root_entry_results.into_iter()], } + .into() } - fn push_next_read_dir_results(&mut self) -> Result<()> { + fn push_next_read_dir_results( + iter: &mut Peekable>, + results: &mut Vec>>>, + ) -> Result<()> { // Push next read dir results or return error if read failed - let read_dir_result = self.read_dir_iter.next().unwrap(); + let read_dir_result = iter.next().unwrap(); let read_dir = match read_dir_result { Ok(read_dir) => read_dir, Err(err) => return Err(err), }; let ReadDir { results_list, .. } = read_dir; - - self.read_dir_results_stack.push(results_list.into_iter()); + results.push(results_list.into_iter()); Ok(()) } @@ -82,7 +87,23 @@ impl Iterator for DirEntryIter { // 2.2 If dir_entry has a read_children_path means we need to read a new // directory and push those results onto read_dir_results_stack if dir_entry.read_children_path.is_some() { - if let Err(err) = self.push_next_read_dir_results() { + let iter = match self.read_dir_iter + .as_mut() + .ok_or_else(|| { + Error::from_io( + 0, + std::io::Error::new( + std::io::ErrorKind::Other, + "rayon thread-pool too busy or dependency loop detected - aborting before possibility of deadlock", + ), + ) + }) { + Ok(iter) => iter, + Err(err) => return Some(Err(err)), + }; + if let Err(err) = + Self::push_next_read_dir_results(iter, &mut self.read_dir_results_stack) + { dir_entry.read_children_error = Some(err); } } diff --git a/src/core/read_dir_iter.rs b/src/core/read_dir_iter.rs index 5cfd434..8ae4d91 100644 --- a/src/core/read_dir_iter.rs +++ b/src/core/read_dir_iter.rs @@ -23,11 +23,11 @@ pub enum ReadDirIter { } impl ReadDirIter { - pub(crate) fn new( + pub(crate) fn try_new( read_dir_specs: Vec>, parallelism: Parallelism, core_read_dir_callback: Arc>, - ) -> Self { + ) -> Option { if let Parallelism::Serial = parallelism { ReadDirIter::Walk { read_dir_spec_stack: read_dir_specs, @@ -53,7 +53,20 @@ impl ReadDirIter { core_read_dir_callback, }; - parallelism.install(move || { + let (startup_tx, startup_rx) = parallelism + .timeout() + .map(|duration| { + let (tx, rx) = crossbeam::channel::unbounded(); + (Some(tx), Some((rx, duration))) + }) + .unwrap_or((None, None)); + parallelism.spawn(move || { + if let Some(tx) = startup_tx { + if tx.send(()).is_err() { + // rayon didn't install this function in time so the listener exited. Do the same. + return; + } + } read_dir_spec_iter.par_bridge().for_each_with( run_context, |run_context, ordered_read_dir_spec| { @@ -61,11 +74,14 @@ impl ReadDirIter { }, ); }); - + if startup_rx.map_or(false, |(rx, duration)| rx.recv_timeout(duration).is_err()) { + return None; + } ReadDirIter::ParWalk { read_dir_result_iter, } } + .into() } } diff --git a/src/lib.rs b/src/lib.rs index f421e6e..697a7f5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -174,12 +174,24 @@ type ProcessReadDirFunction = dyn Fn(Option, &Path, &mut ), + RayonExistingPool { + /// The pool to spawn our work onto. + pool: Arc, + /// Similar to [`Parallelism::RayonDefaultPool::busy_timeout`]. + busy_timeout: std::time::Duration, + }, /// Run in new rayon thread pool with # threads RayonNewPool(usize), } @@ -209,7 +221,9 @@ impl WalkDirGeneric { max_depth: ::std::usize::MAX, skip_hidden: true, follow_links: false, - parallelism: Parallelism::RayonDefaultPool, + parallelism: Parallelism::RayonDefaultPool { + busy_timeout: std::time::Duration::from_secs(1), + }, root_read_dir_state: C::ReadDirState::default(), process_read_dir: None, }, @@ -485,13 +499,13 @@ impl Clone for WalkDirOptions { } impl Parallelism { - pub(crate) fn install(&self, op: OP) + pub(crate) fn spawn(&self, op: OP) where OP: FnOnce() + Send + 'static, { match self { Parallelism::Serial => op(), - Parallelism::RayonDefaultPool => rayon::spawn(op), + Parallelism::RayonDefaultPool { .. } => rayon::spawn(op), Parallelism::RayonNewPool(num_threads) => { let mut thread_pool = ThreadPoolBuilder::new(); if *num_threads > 0 { @@ -503,7 +517,15 @@ impl Parallelism { rayon::spawn(op); } } - Parallelism::RayonExistingPool(thread_pool) => thread_pool.spawn(op), + Parallelism::RayonExistingPool { pool, .. } => pool.spawn(op), + } + } + + pub(crate) fn timeout(&self) -> Option { + match self { + Parallelism::Serial | Parallelism::RayonNewPool(_) => None, + Parallelism::RayonDefaultPool { busy_timeout } + | Parallelism::RayonExistingPool { busy_timeout, .. } => Some(*busy_timeout), } } } diff --git a/tests/detect_deadlock.rs b/tests/detect_deadlock.rs new file mode 100644 index 0000000..afd0fe1 --- /dev/null +++ b/tests/detect_deadlock.rs @@ -0,0 +1,26 @@ +use jwalk::WalkDir; +use rayon::prelude::*; + +#[test] +fn works() { + rayon::ThreadPoolBuilder::new() + .num_threads(1) + .build_global() + .expect("Failed to initialize worker thread pool"); + // Does not finish if jwalk uses shared pool with 1 thread, but we can detect this issue and signal this with an error. + (0..=1) + .collect::>() + .par_iter() + .for_each(|_round| { + for entry in WalkDir::new(".").parallelism(jwalk::Parallelism::RayonDefaultPool { + busy_timeout: std::time::Duration::from_millis(10), + }) { + match entry { + Ok(_) => panic!("Must detect deadlock"), + Err(err) + if err.io_error().expect("is IO").kind() == std::io::ErrorKind::Other => {} + Err(err) => panic!("Unexpected error: {:?}", err), + } + } + }); +} diff --git a/tests/integration.rs b/tests/integration.rs index a8b7da7..cc191bd 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -802,6 +802,9 @@ fn local_paths(walk_dir: WalkDir) -> Vec { .into_iter() .map(|each_result| { let each_entry = each_result.unwrap(); + if let Some(err) = each_entry.read_children_error.as_ref() { + panic!("should not encounter any child errors :{:?}", err); + } let path = each_entry.path().to_path_buf(); let path = path.strip_prefix(&root).unwrap().to_path_buf(); let mut path_string = path.to_str().unwrap().to_string(); @@ -889,7 +892,10 @@ fn walk_rayon_no_lockup() { .unwrap(), ); let _: Vec<_> = WalkDir::new(PathBuf::from(env!("CARGO_MANIFEST_DIR"))) - .parallelism(Parallelism::RayonExistingPool(pool)) + .parallelism(Parallelism::RayonExistingPool { + pool, + busy_timeout: std::time::Duration::from_millis(500), + }) .process_read_dir(|_, _, _, dir_entry_results| { for dir_entry_result in dir_entry_results { let _ = dir_entry_result