Skip to content

Commit

Permalink
Add deadlock detection
Browse files Browse the repository at this point in the history
  • Loading branch information
Zoxc committed Mar 3, 2020
1 parent 8b2ff87 commit 27911f7
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 16 deletions.
29 changes: 26 additions & 3 deletions rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
//! succeed.
#![doc(html_root_url = "https://docs.rs/rayon-core/1.7")]
#![deny(missing_debug_implementations)]
#![deny(missing_docs)]
#![deny(unreachable_pub)]

use std::any::Any;
use std::env;
Expand Down Expand Up @@ -56,6 +53,7 @@ pub mod tlv;

pub use self::join::{join, join_context};
pub use self::registry::ThreadBuilder;
pub use self::registry::{mark_blocked, mark_unblocked, Registry};
pub use self::scope::{scope, Scope};
pub use self::scope::{scope_fifo, ScopeFifo};
pub use self::spawn::{spawn, spawn_fifo};
Expand Down Expand Up @@ -134,6 +132,9 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
/// The stack size for the created worker threads
stack_size: Option<usize>,

/// Closure invoked on deadlock.
deadlock_handler: Option<Box<DeadlockHandler>>,

/// Closure invoked on worker thread start.
start_handler: Option<Box<StartHandler>>,

Expand Down Expand Up @@ -161,6 +162,9 @@ pub struct Configuration {
/// may be invoked multiple times in parallel.
type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;

/// The type for a closure that gets invoked when the Rayon thread pool deadlocks.
///
/// The closure takes no arguments and returns nothing. According to the
/// deadlock-detection notes in `sleep/README.md`, it is called while the
/// lock protecting `SleepData` is held, so it must not call `mark_blocked`
/// or `mark_unblocked` itself; the deadlock is expected to be resolved from
/// some non-Rayon thread.
type DeadlockHandler = dyn Fn() + Send + Sync;

/// The type for a closure that gets invoked when a thread starts. The
/// closure is passed the index of the thread on which it is invoked.
/// Note that this same closure may be invoked multiple times in parallel.
Expand All @@ -181,6 +185,7 @@ impl Default for ThreadPoolBuilder {
stack_size: None,
start_handler: None,
exit_handler: None,
deadlock_handler: None,
spawn_handler: DefaultSpawn,
breadth_first: false,
}
Expand Down Expand Up @@ -365,6 +370,7 @@ impl<S> ThreadPoolBuilder<S> {
stack_size: self.stack_size,
start_handler: self.start_handler,
exit_handler: self.exit_handler,
deadlock_handler: self.deadlock_handler,
breadth_first: self.breadth_first,
}
}
Expand Down Expand Up @@ -523,6 +529,20 @@ impl<S> ThreadPoolBuilder<S> {
self.breadth_first
}

/// Takes the current deadlock callback, leaving `None`.
///
/// Ownership of the boxed handler (if one was set) moves to the caller;
/// afterwards the builder's `deadlock_handler` field is `None`.
fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
self.deadlock_handler.take()
}

/// Set a callback to be invoked on current deadlock.
pub fn deadlock_handler<H>(mut self, deadlock_handler: H) -> Self
where
H: Fn() + Send + Sync + 'static,
{
self.deadlock_handler = Some(Box::new(deadlock_handler));
self
}

/// Takes the current thread start callback, leaving `None`.
fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
self.start_handler.take()
Expand Down Expand Up @@ -676,6 +696,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
ref get_thread_name,
ref panic_handler,
ref stack_size,
ref deadlock_handler,
ref start_handler,
ref exit_handler,
spawn_handler: _,
Expand All @@ -692,6 +713,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
}
let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder);
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);

Expand All @@ -700,6 +722,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
.field("get_thread_name", &get_thread_name)
.field("panic_handler", &panic_handler)
.field("stack_size", &stack_size)
.field("deadlock_handler", &deadlock_handler)
.field("start_handler", &start_handler)
.field("exit_handler", &exit_handler)
.field("breadth_first", &breadth_first)
Expand Down
35 changes: 30 additions & 5 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use crate::sleep::Sleep;
use crate::unwind;
use crate::util::leak;
use crate::{
ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
DeadlockHandler, ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError,
ThreadPoolBuilder,
};
use crossbeam_deque::{Steal, Stealer, Worker};
use crossbeam_queue::SegQueue;
Expand Down Expand Up @@ -131,11 +132,12 @@ where
}
}

pub(super) struct Registry {
pub struct Registry {
thread_infos: Vec<ThreadInfo>,
sleep: Sleep,
injected_jobs: SegQueue<JobRef>,
panic_handler: Option<Box<PanicHandler>>,
deadlock_handler: Option<Box<DeadlockHandler>>,
start_handler: Option<Box<StartHandler>>,
exit_handler: Option<Box<ExitHandler>>,

Expand Down Expand Up @@ -235,10 +237,11 @@ impl Registry {

let registry = Arc::new(Registry {
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
sleep: Sleep::new(),
sleep: Sleep::new(n_threads),
injected_jobs: SegQueue::new(),
terminate_latch: CountLatch::new(),
panic_handler: builder.take_panic_handler(),
deadlock_handler: builder.take_deadlock_handler(),
start_handler: builder.take_start_handler(),
exit_handler: builder.take_exit_handler(),
});
Expand All @@ -265,7 +268,7 @@ impl Registry {
Ok(registry.clone())
}

pub(super) fn current() -> Arc<Registry> {
pub fn current() -> Arc<Registry> {
unsafe {
let worker_thread = WorkerThread::current();
if worker_thread.is_null() {
Expand Down Expand Up @@ -512,6 +515,24 @@ impl Registry {
}
}

/// Mark the calling Rayon worker thread as blocked. This triggers the deadlock
/// handler if no other worker thread is active.
///
/// The registry is taken from the calling thread's `WorkerThread`, so this
/// must be called on the worker thread that is about to block.
///
/// # Panics
///
/// Panics if the calling thread has no current `WorkerThread` (the pointer
/// returned by `WorkerThread::current()` is null). It also panics if a
/// deadlock is detected and the registry has no deadlock handler installed.
#[inline]
pub fn mark_blocked() {
    let worker_thread = WorkerThread::current();
    assert!(
        !worker_thread.is_null(),
        "mark_blocked() must be called from a Rayon worker thread"
    );
    // SAFETY: the pointer was just checked to be non-null. This relies on
    // `WorkerThread::current()` returning a pointer to the calling thread's
    // live `WorkerThread` (defined outside this excerpt -- TODO confirm).
    let registry = unsafe { &(*worker_thread).registry };
    registry.sleep.mark_blocked(&registry.deadlock_handler)
}

/// Mark a previously blocked Rayon worker thread as unblocked.
///
/// Unlike `mark_blocked`, this does not look up the calling thread's worker
/// state; the pool to update is identified by the `registry` argument.
/// NOTE(review): presumably this lets whichever thread performs the wake-up
/// call it, even if that thread is outside the pool -- confirm against callers.
#[inline]
pub fn mark_unblocked(registry: &Registry) {
registry.sleep.mark_unblocked()
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(super) struct RegistryId {
addr: usize,
Expand Down Expand Up @@ -666,7 +687,11 @@ impl WorkerThread {
yields = self.registry.sleep.work_found(self.index, yields);
self.execute(job);
} else {
yields = self.registry.sleep.no_work_found(self.index, yields);
yields = self.registry.sleep.no_work_found(
self.index,
yields,
&self.registry.deadlock_handler,
);
}
}

Expand Down
33 changes: 33 additions & 0 deletions rayon-core/src/sleep/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -386,3 +386,36 @@ some of them were hit hard:
- 8-10% overhead on nbody-parreduce
- 35% overhead on increment-all
- 245% overhead on join-recursively

# Deadlock detection

This module tracks a number of variables in order to detect deadlocks due to user code blocking.
These variables are stored in the `SleepData` struct which itself is kept behind a mutex.
It contains the following fields:
- `worker_count` - The number of threads in the thread pool.
- `active_threads` - The number of threads in the thread pool which are running
and aren't blocked in user code or sleeping.
- `blocked_threads` - The number of threads which are blocked in user code.
This doesn't include threads blocked by Rayon.

User code can indicate blocking by calling `mark_blocked` before blocking and
calling `mark_unblocked` before unblocking a thread.
This will adjust `active_threads` and `blocked_threads` accordingly.

When we tickle the thread pool in `Sleep::tickle_cold`, we set `active_threads` to
`worker_count` - `blocked_threads` since we wake up all Rayon threads, but not threads blocked
by user code.

A deadlock is detected by checking if `active_threads` is 0 and `blocked_threads` is above 0.
If we ignored `blocked_threads`, we would report a spurious deadlock
immediately when creating the thread pool.
We would also report one whenever the thread pool ran out of work.
It is not possible for Rayon itself to deadlock.
Deadlocks can only be caused by user code blocking, so this condition doesn't miss any deadlocks.

We check for the deadlock condition when a thread is marked as blocked in `mark_blocked`
and when a thread falls asleep in `Sleep::sleep`.
If a deadlock is detected, we call the user-provided deadlock handler while we hold the
lock to `SleepData`. This means the deadlock handler must not call `mark_blocked` or
`mark_unblocked`. The user is expected to handle the deadlock in some non-Rayon thread.
Once the deadlock handler returns, the thread which called the deadlock handler will go to sleep.
81 changes: 73 additions & 8 deletions rayon-core/src/sleep/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,38 @@
//! for an overview.
use crate::log::Event::*;
use crate::DeadlockHandler;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::thread;
use std::usize;

/// Counters used for deadlock detection.
struct SleepData {
    /// The number of threads in the thread pool.
    worker_count: usize,

    /// The number of threads in the thread pool which are running and
    /// aren't blocked in user code or sleeping.
    active_threads: usize,

    /// The number of threads which are blocked in user code.
    /// This doesn't include threads blocked by this module.
    blocked_threads: usize,
}

impl SleepData {
    /// Checks if the conditions for a deadlock hold and, if so, calls the
    /// deadlock handler.
    ///
    /// A deadlock is reported when no thread is active while at least one
    /// thread is blocked in user code. Nothing happens otherwise, regardless
    /// of whether a handler is present.
    ///
    /// # Panics
    ///
    /// Panics if a deadlock is detected and `deadlock_handler` is `None`.
    #[inline]
    pub fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
        if self.active_threads == 0 && self.blocked_threads > 0 {
            // Reaching this point without a handler means threads were marked
            // as blocked on a pool that was never given a way to react to the
            // deadlock; state that invariant instead of a bare `unwrap`.
            let handler = deadlock_handler
                .as_ref()
                .expect("deadlock detected but no deadlock handler was installed");
            handler();
        }
    }
}

pub(super) struct Sleep {
state: AtomicUsize,
data: Mutex<()>,
data: Mutex<SleepData>,
tickle: Condvar,
}

Expand All @@ -20,14 +44,42 @@ const ROUNDS_UNTIL_SLEEPY: usize = 32;
const ROUNDS_UNTIL_ASLEEP: usize = 64;

impl Sleep {
pub(super) fn new() -> Sleep {
pub(super) fn new(worker_count: usize) -> Sleep {
Sleep {
state: AtomicUsize::new(AWAKE),
data: Mutex::new(()),
data: Mutex::new(SleepData {
worker_count,
active_threads: worker_count,
blocked_threads: 0,
}),
tickle: Condvar::new(),
}
}

/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
/// if no other worker thread is active.
///
/// Moves one thread from the active count to the blocked count under the
/// `SleepData` lock, then runs the deadlock check while still holding that
/// lock.
///
/// # Panics
///
/// Panics if the lock is poisoned, or if a deadlock is detected and
/// `deadlock_handler` is `None`.
#[inline]
pub fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
    let mut data = self.data.lock().unwrap();
    debug_assert!(data.active_threads > 0);
    debug_assert!(data.blocked_threads < data.worker_count);
    data.active_threads -= 1;
    data.blocked_threads += 1;

    // The guard is still alive here, so the handler runs with the lock held.
    data.deadlock_check(deadlock_handler);
}

/// Mark a previously blocked Rayon worker thread as unblocked.
///
/// Under the `SleepData` lock, moves one thread from the blocked count back
/// to the active count. No deadlock check is performed here.
#[inline]
pub fn mark_unblocked(&self) {
    let mut counters = self.data.lock().unwrap();
    debug_assert!(counters.active_threads < counters.worker_count);
    debug_assert!(counters.blocked_threads > 0);
    counters.blocked_threads -= 1;
    counters.active_threads += 1;
}

/// Reports whether the `SLEEPING` bit is set in the given `state` word.
fn anyone_sleeping(&self, state: usize) -> bool {
    let sleeping_bit = state & SLEEPING;
    sleeping_bit != 0
}
Expand Down Expand Up @@ -61,7 +113,12 @@ impl Sleep {
}

#[inline]
pub(super) fn no_work_found(&self, worker_index: usize, yields: usize) -> usize {
pub(super) fn no_work_found(
&self,
worker_index: usize,
yields: usize,
deadlock_handler: &Option<Box<DeadlockHandler>>,
) -> usize {
log!(DidNotFindWork {
worker: worker_index,
yields: yields,
Expand All @@ -88,7 +145,7 @@ impl Sleep {
}
} else {
debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
self.sleep(worker_index);
self.sleep(worker_index, deadlock_handler);
0
}
}
Expand Down Expand Up @@ -122,7 +179,10 @@ impl Sleep {
old_state: old_state,
});
if self.anyone_sleeping(old_state) {
let _data = self.data.lock().unwrap();
let mut data = self.data.lock().unwrap();
// Set the active threads to the number of workers,
// excluding threads blocked by the user since we won't wake those up
data.active_threads = data.worker_count - data.blocked_threads;
self.tickle.notify_all();
}
}
Expand Down Expand Up @@ -188,7 +248,7 @@ impl Sleep {
self.worker_is_sleepy(state, worker_index)
}

fn sleep(&self, worker_index: usize) {
fn sleep(&self, worker_index: usize, deadlock_handler: &Option<Box<DeadlockHandler>>) {
loop {
// Acquire here suffices. If we observe that the current worker is still
// sleepy, then in fact we know that no writes have occurred, and anyhow
Expand Down Expand Up @@ -235,7 +295,7 @@ impl Sleep {
// reason for the `compare_exchange` to fail is if an
// awaken comes, in which case the next cycle around
// the loop will just return.
let data = self.data.lock().unwrap();
let mut data = self.data.lock().unwrap();

// This must be SeqCst on success because we want to
// ensure:
Expand Down Expand Up @@ -264,6 +324,11 @@ impl Sleep {
log!(FellAsleep {
worker: worker_index
});

// Decrement the number of active threads and check for a deadlock
data.active_threads -= 1;
data.deadlock_check(deadlock_handler);

let _ = self.tickle.wait(data).unwrap();
log!(GotAwoken {
worker: worker_index
Expand Down

0 comments on commit 27911f7

Please sign in to comment.