Skip to content

Commit

Permalink
Added heartbeat skips.
Browse files Browse the repository at this point in the history
  • Loading branch information
dragostis committed Sep 14, 2024
1 parent 0fd33b2 commit 30eb199
Showing 1 changed file with 87 additions and 28 deletions.
115 changes: 87 additions & 28 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ pub struct Scope<'s> {
context: Arc<Context>,
worker_index: usize,
job_queue: ThreadJobQueue<'s>,
join_count: u8,
}

impl<'s> Scope<'s> {
Expand All @@ -194,6 +195,7 @@ impl<'s> Scope<'s> {
context: thread_pool.context.clone(),
worker_index,
job_queue: ThreadJobQueue::Current(JobQueue::default()),
join_count: 0,
}
}

Expand All @@ -206,6 +208,7 @@ impl<'s> Scope<'s> {
context,
worker_index,
job_queue: ThreadJobQueue::Worker(job_queue),
join_count: 0,
}
}

Expand Down Expand Up @@ -273,38 +276,13 @@ impl<'s> Scope<'s> {
self.context.heartbeats[self.worker_index].store(false, Ordering::Relaxed);
}

/// Runs `a` and `b` potentially in parallel on separate threads and
/// returns the results.
///
/// # Examples
///
/// ```
/// # use spice::ThreadPool;
/// let mut tp = ThreadPool::new().unwrap();
/// let mut s = tp.scope();
///
/// let mut vals = [0; 2];
/// let (left, right) = vals.split_at_mut(1);
///
/// s.join(|_|left[0] = 1, |_| right[0] = 1);
///
/// assert_eq!(vals, [1; 2]);
/// ```
pub fn join<A, B, RA, RB>(&mut self, a: A, b: B) -> (RA, RB)
fn join_inner<A, B, RA, RB>(&mut self, a: A, b: B) -> (RA, RB)
where
A: FnOnce(&mut Scope<'_>) -> RA + Send,
B: FnOnce(&mut Scope<'_>) -> RB + Send,
RA: Send,
RB: Send,
{
let a = move |scope: &mut Scope<'_>| {
if scope.context.heartbeats[scope.worker_index].load(Ordering::Relaxed) {
scope.heartbeat();
}

a(scope)
};

let stack = JobStack::new(a);
let job = Job::new(&stack);

Expand Down Expand Up @@ -340,6 +318,85 @@ impl<'s> Scope<'s> {
(ra, rb)
}
}

/// Runs `a` and `b` potentially in parallel on separate threads and
/// returns the results.
///
/// This variant skips checking for a heartbeat every 16 calls for improved
/// performance.
///
/// # Examples
///
/// ```
/// # use spice::ThreadPool;
/// let mut tp = ThreadPool::new().unwrap();
/// let mut s = tp.scope();
///
/// let mut vals = [0; 2];
/// let (left, right) = vals.split_at_mut(1);
///
/// s.join(|_|left[0] = 1, |_| right[0] = 1);
///
/// assert_eq!(vals, [1; 2]);
/// ```
pub fn join<A, B, RA, RB>(&mut self, a: A, b: B) -> (RA, RB)
where
A: FnOnce(&mut Scope<'_>) -> RA + Send,
B: FnOnce(&mut Scope<'_>) -> RB + Send,
RA: Send,
RB: Send,
{
self.join_with_heartbeat_every::<16, _, _, _, _>(a, b)
}

/// Runs `a` and `b` potentially in parallel on separate threads and
/// returns the results.
///
/// This variant skips checking for a heartbeat every `TIMES - 1` calls for
/// improved performance.
///
/// # Examples
///
/// ```
/// # use spice::ThreadPool;
/// let mut tp = ThreadPool::new().unwrap();
/// let mut s = tp.scope();
///
/// let mut vals = [0; 2];
/// let (left, right) = vals.split_at_mut(1);
///
/// // Skip checking 7/8 calls to join_with_heartbeat_every.
/// s.join_with_heartbeat_every::<8, _, _, _, _>(|_|left[0] = 1, |_| right[0] = 1);
///
/// assert_eq!(vals, [1; 2]);
/// ```
pub fn join_with_heartbeat_every<const TIMES: u8, A, B, RA, RB>(
&mut self,
a: A,
b: B,
) -> (RA, RB)
where
A: FnOnce(&mut Scope<'_>) -> RA + Send,
B: FnOnce(&mut Scope<'_>) -> RB + Send,
RA: Send,
RB: Send,
{
self.join_count = self.join_count.wrapping_add(1) % TIMES;

if self.join_count == 0 {
let a = move |scope: &mut Scope<'_>| {
if scope.context.heartbeats[scope.worker_index].load(Ordering::Relaxed) {
scope.heartbeat();
}

a(scope)
};

self.join_inner(a, b)
} else {
self.join_inner(a, b)
}
}
}

/// `ThreadPool` configuration.
Expand Down Expand Up @@ -597,16 +654,18 @@ mod tests {
0 => (),
1 => slice[0] += 1,
_ => {
let (_, tail) = slice.split_at_mut(1);
let (head, tail) = slice.split_at_mut(1);

s.join(
s.join_with_heartbeat_every::<1, _, _, _, _>(
|_| {
thread::sleep(Duration::from_micros(100));

if thread::current().id() != id {
threads_crossed.store(true, Ordering::Relaxed);
panic!("panicked across threads");
}

head[0] += 1;
},
|s| increment(s, tail, id),
);
Expand Down

0 comments on commit 30eb199

Please sign in to comment.