diff --git a/examples/crash.rs b/examples/crash.rs deleted file mode 100644 index f4684e2..0000000 --- a/examples/crash.rs +++ /dev/null @@ -1,34 +0,0 @@ -extern crate jwalk; - -use jwalk::{WalkDir, Parallelism}; -use rayon::prelude::*; - -fn main() { - let rounds = vec![0, 1]; - - rayon::ThreadPoolBuilder::new() - .num_threads(1) - .build_global() - .expect("Failed to initialize worker thread pool"); - - let jwalk_pool = std::sync::Arc::new( - rayon::ThreadPoolBuilder::new() - .num_threads(1) - .build() - .unwrap(), - ); - - // Does finish if jwalk uses own pool with 1 thread - rounds.par_iter().for_each(|round| { - eprintln!("Round {round}…"); - for _entry in WalkDir::new(".").parallelism(Parallelism::RayonExistingPool(jwalk_pool.clone())) {} - eprintln!("Round {round} completed"); - }); - - // Does not finish if jwalk uses shared pool with 1 thread - rounds.par_iter().for_each(|round| { - eprintln!("Round {round}…"); - for _entry in WalkDir::new(".") {} - eprintln!("Round {round} completed"); - }); -} \ No newline at end of file diff --git a/src/core/dir_entry_iter.rs b/src/core/dir_entry_iter.rs index 5a3cedd..b948e3c 100644 --- a/src/core/dir_entry_iter.rs +++ b/src/core/dir_entry_iter.rs @@ -9,7 +9,7 @@ use crate::Result; pub struct DirEntryIter { min_depth: usize, // iterator yielding next ReadDir results when needed - read_dir_iter: Peekable>, + read_dir_iter: Option>>, // stack of ReadDir results, track location in filesystem traversal read_dir_results_stack: Vec>>>, } @@ -34,28 +34,33 @@ impl DirEntryIter { .collect(); // 2. Init new read_dir_iter from those specs - let read_dir_iter = ReadDirIter::new(read_dir_specs, parallelism, core_read_dir_callback); + let read_dir_iter = + ReadDirIter::try_new(read_dir_specs, parallelism, core_read_dir_callback) + .map(|iter| iter.peekable()); // 3. Return DirEntryIter that will return initial root entries and then // fill and process read_dir_iter until complete DirEntryIter { min_depth, - read_dir_iter: read_dir_iter.peekable(), + read_dir_iter, read_dir_results_stack: vec![root_entry_results.into_iter()], } + .into() } - fn push_next_read_dir_results(&mut self) -> Result<()> { + fn push_next_read_dir_results( + iter: &mut Peekable>, + results: &mut Vec>>>, + ) -> Result<()> { // Push next read dir results or return error if read failed - let read_dir_result = self.read_dir_iter.next().unwrap(); + let read_dir_result = iter.next().unwrap(); let read_dir = match read_dir_result { Ok(read_dir) => read_dir, Err(err) => return Err(err), }; let ReadDir { results_list, .. } = read_dir; - - self.read_dir_results_stack.push(results_list.into_iter()); + results.push(results_list.into_iter()); Ok(()) } @@ -82,7 +87,23 @@ impl Iterator for DirEntryIter { // 2.2 If dir_entry has a read_children_path means we need to read a new // directory and push those results onto read_dir_results_stack if dir_entry.read_children_path.is_some() { - if let Err(err) = self.push_next_read_dir_results() { + let iter = match self.read_dir_iter + .as_mut() + .ok_or_else(|| { + Error::from_io( + 0, + std::io::Error::new( + std::io::ErrorKind::Other, + "rayon thread-pool too busy or dependency loop detected - aborting before possibility of deadlock", + ), + ) + }) { + Ok(iter) => iter, + Err(err) => return Some(Err(err)), + }; + if let Err(err) = + Self::push_next_read_dir_results(iter, &mut self.read_dir_results_stack) + { dir_entry.read_children_error = Some(err); } } diff --git a/src/core/read_dir_iter.rs b/src/core/read_dir_iter.rs index 5cfd434..8ae4d91 100644 --- a/src/core/read_dir_iter.rs +++ b/src/core/read_dir_iter.rs @@ -23,11 +23,11 @@ pub enum ReadDirIter { } impl ReadDirIter { - pub(crate) fn new( + pub(crate) fn try_new( read_dir_specs: Vec>, parallelism: Parallelism, core_read_dir_callback: Arc>, - ) -> Self { + ) -> Option { if let Parallelism::Serial = parallelism { ReadDirIter::Walk { read_dir_spec_stack: read_dir_specs, @@ -53,7 +53,20 @@ impl ReadDirIter { core_read_dir_callback, }; - parallelism.install(move || { + let (startup_tx, startup_rx) = parallelism + .timeout() + .map(|duration| { + let (tx, rx) = crossbeam::channel::unbounded(); + (Some(tx), Some((rx, duration))) + }) + .unwrap_or((None, None)); + parallelism.spawn(move || { + if let Some(tx) = startup_tx { + if tx.send(()).is_err() { + // rayon didn't install this function in time so the listener exited. Do the same. + return; + } + } read_dir_spec_iter.par_bridge().for_each_with( run_context, |run_context, ordered_read_dir_spec| { @@ -61,11 +74,14 @@ impl ReadDirIter { }, ); }); - + if startup_rx.map_or(false, |(rx, duration)| rx.recv_timeout(duration).is_err()) { + return None; + } ReadDirIter::ParWalk { read_dir_result_iter, } } + .into() } } diff --git a/src/lib.rs b/src/lib.rs index f421e6e..697a7f5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -174,12 +174,24 @@ type ProcessReadDirFunction = dyn Fn(Option, &Path, &mut ), + RayonExistingPool { + /// The pool to spawn our work onto. + pool: Arc, + /// Similar to [`Parallelism::RayonDefaultPool::busy_timeout`]. + busy_timeout: std::time::Duration, + }, /// Run in new rayon thread pool with # threads RayonNewPool(usize), } @@ -209,7 +221,9 @@ impl WalkDirGeneric { max_depth: ::std::usize::MAX, skip_hidden: true, follow_links: false, - parallelism: Parallelism::RayonDefaultPool, + parallelism: Parallelism::RayonDefaultPool { + busy_timeout: std::time::Duration::from_secs(1), + }, root_read_dir_state: C::ReadDirState::default(), process_read_dir: None, }, @@ -485,13 +499,13 @@ impl Clone for WalkDirOptions { } impl Parallelism { - pub(crate) fn install(&self, op: OP) + pub(crate) fn spawn(&self, op: OP) where OP: FnOnce() + Send + 'static, { match self { Parallelism::Serial => op(), - Parallelism::RayonDefaultPool => rayon::spawn(op), + Parallelism::RayonDefaultPool { .. } => rayon::spawn(op), Parallelism::RayonNewPool(num_threads) => { let mut thread_pool = ThreadPoolBuilder::new(); if *num_threads > 0 { @@ -503,7 +517,15 @@ impl Parallelism { rayon::spawn(op); } } - Parallelism::RayonExistingPool(thread_pool) => thread_pool.spawn(op), + Parallelism::RayonExistingPool { pool, .. } => pool.spawn(op), + } + } + + pub(crate) fn timeout(&self) -> Option { + match self { + Parallelism::Serial | Parallelism::RayonNewPool(_) => None, + Parallelism::RayonDefaultPool { busy_timeout } + | Parallelism::RayonExistingPool { busy_timeout, .. } => Some(*busy_timeout), } } } diff --git a/tests/detect_deadlock.rs b/tests/detect_deadlock.rs new file mode 100644 index 0000000..afd0fe1 --- /dev/null +++ b/tests/detect_deadlock.rs @@ -0,0 +1,26 @@ +use jwalk::WalkDir; +use rayon::prelude::*; + +#[test] +fn works() { + rayon::ThreadPoolBuilder::new() + .num_threads(1) + .build_global() + .expect("Failed to initialize worker thread pool"); + // Does not finish if jwalk uses shared pool with 1 thread, but we can detect this issue and signal this with an error. + (0..=1) + .collect::>() + .par_iter() + .for_each(|_round| { + for entry in WalkDir::new(".").parallelism(jwalk::Parallelism::RayonDefaultPool { + busy_timeout: std::time::Duration::from_millis(10), + }) { + match entry { + Ok(_) => panic!("Must detect deadlock"), + Err(err) + if err.io_error().expect("is IO").kind() == std::io::ErrorKind::Other => {} + Err(err) => panic!("Unexpected error: {:?}", err), + } + } + }); +} diff --git a/tests/integration.rs b/tests/integration.rs index a8b7da7..cc191bd 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -802,6 +802,9 @@ fn local_paths(walk_dir: WalkDir) -> Vec { .into_iter() .map(|each_result| { let each_entry = each_result.unwrap(); + if let Some(err) = each_entry.read_children_error.as_ref() { + panic!("should not encounter any child errors :{:?}", err); + } let path = each_entry.path().to_path_buf(); let path = path.strip_prefix(&root).unwrap().to_path_buf(); let mut path_string = path.to_str().unwrap().to_string(); @@ -889,7 +892,10 @@ fn walk_rayon_no_lockup() { .unwrap(), ); let _: Vec<_> = WalkDir::new(PathBuf::from(env!("CARGO_MANIFEST_DIR"))) - .parallelism(Parallelism::RayonExistingPool(pool)) + .parallelism(Parallelism::RayonExistingPool { + pool, + busy_timeout: std::time::Duration::from_millis(500), + }) .process_read_dir(|_, _, _, dir_entry_results| { for dir_entry_result in dir_entry_results { let _ = dir_entry_result