Skip to content

Commit

Permalink
feat!: Detect possible deadlocks when instantiating a parallel iterator.
Browse files Browse the repository at this point in the history
Deadlocks can happen if the producer of results cannot start because there
is no free thread on the rayon pool, while the only way for a thread to become
free is for the iterator to produce results.

We now offer a `busy_timeout` in the relevant variants of the
`Parallelism` enumeration to allow controlling how long we will wait
until we abort with an error.
  • Loading branch information
Byron committed Dec 15, 2022
1 parent cc0009f commit 3bf1bc2
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 55 deletions.
34 changes: 0 additions & 34 deletions examples/crash.rs

This file was deleted.

37 changes: 29 additions & 8 deletions src/core/dir_entry_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::Result;
pub struct DirEntryIter<C: ClientState> {
min_depth: usize,
// iterator yielding next ReadDir results when needed
read_dir_iter: Peekable<ReadDirIter<C>>,
read_dir_iter: Option<Peekable<ReadDirIter<C>>>,
// stack of ReadDir results, track location in filesystem traversal
read_dir_results_stack: Vec<vec::IntoIter<Result<DirEntry<C>>>>,
}
Expand All @@ -34,28 +34,33 @@ impl<C: ClientState> DirEntryIter<C> {
.collect();

// 2. Init new read_dir_iter from those specs
let read_dir_iter = ReadDirIter::new(read_dir_specs, parallelism, core_read_dir_callback);
let read_dir_iter =
ReadDirIter::try_new(read_dir_specs, parallelism, core_read_dir_callback)
.map(|iter| iter.peekable());

// 3. Return DirEntryIter that will return initial root entries and then
// fill and process read_dir_iter until complete
DirEntryIter {
min_depth,
read_dir_iter: read_dir_iter.peekable(),
read_dir_iter,
read_dir_results_stack: vec![root_entry_results.into_iter()],
}
.into()
}

fn push_next_read_dir_results(&mut self) -> Result<()> {
fn push_next_read_dir_results(
iter: &mut Peekable<ReadDirIter<C>>,
results: &mut Vec<vec::IntoIter<Result<DirEntry<C>>>>,
) -> Result<()> {
// Push next read dir results or return error if read failed
let read_dir_result = self.read_dir_iter.next().unwrap();
let read_dir_result = iter.next().unwrap();
let read_dir = match read_dir_result {
Ok(read_dir) => read_dir,
Err(err) => return Err(err),
};

let ReadDir { results_list, .. } = read_dir;

self.read_dir_results_stack.push(results_list.into_iter());
results.push(results_list.into_iter());

Ok(())
}
Expand All @@ -82,7 +87,23 @@ impl<C: ClientState> Iterator for DirEntryIter<C> {
// 2.2 If dir_entry has a read_children_path means we need to read a new
// directory and push those results onto read_dir_results_stack
if dir_entry.read_children_path.is_some() {
if let Err(err) = self.push_next_read_dir_results() {
let iter = match self.read_dir_iter
.as_mut()
.ok_or_else(|| {
Error::from_io(
0,
std::io::Error::new(
std::io::ErrorKind::Other,
"rayon thread-pool too busy or dependency loop detected - aborting before possibility of deadlock",
),
)
}) {
Ok(iter) => iter,
Err(err) => return Some(Err(err)),
};
if let Err(err) =
Self::push_next_read_dir_results(iter, &mut self.read_dir_results_stack)
{
dir_entry.read_children_error = Some(err);
}
}
Expand Down
24 changes: 20 additions & 4 deletions src/core/read_dir_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ pub enum ReadDirIter<C: ClientState> {
}

impl<C: ClientState> ReadDirIter<C> {
pub(crate) fn new(
pub(crate) fn try_new(
read_dir_specs: Vec<ReadDirSpec<C>>,
parallelism: Parallelism,
core_read_dir_callback: Arc<ReadDirCallback<C>>,
) -> Self {
) -> Option<Self> {
if let Parallelism::Serial = parallelism {
ReadDirIter::Walk {
read_dir_spec_stack: read_dir_specs,
Expand All @@ -53,19 +53,35 @@ impl<C: ClientState> ReadDirIter<C> {
core_read_dir_callback,
};

parallelism.install(move || {
let (startup_tx, startup_rx) = parallelism
.timeout()
.map(|duration| {
let (tx, rx) = crossbeam::channel::unbounded();
(Some(tx), Some((rx, duration)))
})
.unwrap_or((None, None));
parallelism.spawn(move || {
if let Some(tx) = startup_tx {
if tx.send(()).is_err() {
// rayon didn't install this function in time so the listener exited. Do the same.
return;
}
}
read_dir_spec_iter.par_bridge().for_each_with(
run_context,
|run_context, ordered_read_dir_spec| {
multi_threaded_walk_dir(ordered_read_dir_spec, run_context);
},
);
});

if startup_rx.map_or(false, |(rx, duration)| rx.recv_timeout(duration).is_err()) {
return None;
}
ReadDirIter::ParWalk {
read_dir_result_iter,
}
}
.into()
}
}

Expand Down
38 changes: 30 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,24 @@ type ProcessReadDirFunction<C> = dyn Fn(Option<usize>, &Path, &mut <C as ClientS
/// If you plan to perform lots of per file processing you might want to use Rayon to
#[derive(Clone)]
pub enum Parallelism {
/// Run on calling thread
/// Run on calling thread, similar to what happens in the `walkdir` crate.
Serial,
/// Run in default rayon thread pool
RayonDefaultPool,
/// Run in default rayon thread pool.
RayonDefaultPool {
/// Define when we consider the rayon default pool too busy to serve our iteration and abort the iteration, defaulting to 1s.
///
/// This can happen if `jwalk` is launched from within a par-iter on a pool that only has a single thread,
/// or if there are many parallel `jwalk` invocations that all use the same threadpool, rendering it too busy
/// to respond within this duration.
busy_timeout: std::time::Duration,
},
/// Run in existing rayon thread pool
RayonExistingPool(Arc<ThreadPool>),
RayonExistingPool {
/// The pool to spawn our work onto.
pool: Arc<ThreadPool>,
/// Similar to [`Parallelism::RayonDefaultPool::busy_timeout`].
busy_timeout: std::time::Duration,
},
/// Run in new rayon thread pool with # threads
RayonNewPool(usize),
}
Expand Down Expand Up @@ -209,7 +221,9 @@ impl<C: ClientState> WalkDirGeneric<C> {
max_depth: ::std::usize::MAX,
skip_hidden: true,
follow_links: false,
parallelism: Parallelism::RayonDefaultPool,
parallelism: Parallelism::RayonDefaultPool {
busy_timeout: std::time::Duration::from_secs(1),
},
root_read_dir_state: C::ReadDirState::default(),
process_read_dir: None,
},
Expand Down Expand Up @@ -485,13 +499,13 @@ impl<C: ClientState> Clone for WalkDirOptions<C> {
}

impl Parallelism {
pub(crate) fn install<OP>(&self, op: OP)
pub(crate) fn spawn<OP>(&self, op: OP)
where
OP: FnOnce() + Send + 'static,
{
match self {
Parallelism::Serial => op(),
Parallelism::RayonDefaultPool => rayon::spawn(op),
Parallelism::RayonDefaultPool { .. } => rayon::spawn(op),
Parallelism::RayonNewPool(num_threads) => {
let mut thread_pool = ThreadPoolBuilder::new();
if *num_threads > 0 {
Expand All @@ -503,7 +517,15 @@ impl Parallelism {
rayon::spawn(op);
}
}
Parallelism::RayonExistingPool(thread_pool) => thread_pool.spawn(op),
Parallelism::RayonExistingPool { pool, .. } => pool.spawn(op),
}
}

/// Returns the configured `busy_timeout` of this parallelism mode, if it has one.
///
/// Only `RayonDefaultPool` and `RayonExistingPool` carry a `busy_timeout`;
/// `Serial` and `RayonNewPool` yield `None`.
pub(crate) fn timeout(&self) -> Option<std::time::Duration> {
    let busy_timeout = match self {
        Parallelism::RayonDefaultPool { busy_timeout } => busy_timeout,
        Parallelism::RayonExistingPool { busy_timeout, .. } => busy_timeout,
        // These variants have no timeout field at all.
        Parallelism::Serial | Parallelism::RayonNewPool(_) => return None,
    };
    Some(*busy_timeout)
}
}
Expand Down
26 changes: 26 additions & 0 deletions tests/detect_deadlock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use jwalk::WalkDir;
use rayon::prelude::*;

/// Runs two `jwalk` walks from inside a parallel iterator on a global rayon pool
/// that has a single thread, and requires every yielded entry to be an IO error
/// of kind `Other` instead of a successful directory entry.
#[test]
fn works() {
    rayon::ThreadPoolBuilder::new()
        .num_threads(1)
        .build_global()
        .expect("Failed to initialize worker thread pool");

    let busy_timeout = std::time::Duration::from_millis(10);
    let rounds: Vec<usize> = (0..=1).collect();

    // Does not finish if jwalk uses shared pool with 1 thread, but we can detect this issue and signal this with an error.
    rounds.par_iter().for_each(|_round| {
        let walk = WalkDir::new(".")
            .parallelism(jwalk::Parallelism::RayonDefaultPool { busy_timeout });
        for entry in walk {
            // A successful entry means the walk actually ran, which this test forbids.
            let err = match entry {
                Ok(_) => panic!("Must detect deadlock"),
                Err(err) => err,
            };
            let kind = err.io_error().expect("is IO").kind();
            if kind != std::io::ErrorKind::Other {
                panic!("Unexpected error: {:?}", err);
            }
        }
    });
}
8 changes: 7 additions & 1 deletion tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,9 @@ fn local_paths(walk_dir: WalkDir) -> Vec<String> {
.into_iter()
.map(|each_result| {
let each_entry = each_result.unwrap();
if let Some(err) = each_entry.read_children_error.as_ref() {
panic!("should not encounter any child errors :{:?}", err);
}
let path = each_entry.path().to_path_buf();
let path = path.strip_prefix(&root).unwrap().to_path_buf();
let mut path_string = path.to_str().unwrap().to_string();
Expand Down Expand Up @@ -889,7 +892,10 @@ fn walk_rayon_no_lockup() {
.unwrap(),
);
let _: Vec<_> = WalkDir::new(PathBuf::from(env!("CARGO_MANIFEST_DIR")))
.parallelism(Parallelism::RayonExistingPool(pool))
.parallelism(Parallelism::RayonExistingPool {
pool,
busy_timeout: std::time::Duration::from_millis(500),
})
.process_read_dir(|_, _, _, dir_entry_results| {
for dir_entry_result in dir_entry_results {
let _ = dir_entry_result
Expand Down

0 comments on commit 3bf1bc2

Please sign in to comment.