From 67d2853e244ee9d510d832b4a681fee8073da7d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Drago=C8=99=20Tiselice?= Date: Sat, 14 Sep 2024 18:29:00 +0300 Subject: [PATCH] Added heartbeat skips. --- src/lib.rs | 111 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 84 insertions(+), 27 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3afd89e..540e909 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -182,6 +182,7 @@ pub struct Scope<'s> { context: Arc, worker_index: usize, job_queue: ThreadJobQueue<'s>, + join_count: u8, } impl<'s> Scope<'s> { @@ -194,6 +195,7 @@ impl<'s> Scope<'s> { context: thread_pool.context.clone(), worker_index, job_queue: ThreadJobQueue::Current(JobQueue::default()), + join_count: 0, } } @@ -206,6 +208,7 @@ impl<'s> Scope<'s> { context, worker_index, job_queue: ThreadJobQueue::Worker(job_queue), + join_count: 0, } } @@ -273,38 +276,13 @@ impl<'s> Scope<'s> { self.context.heartbeats[self.worker_index].store(false, Ordering::Relaxed); } - /// Runs `a` and `b` potentially in parallel on separate threads and - /// returns the results. 
- /// - /// # Examples - /// - /// ``` - /// # use spice::ThreadPool; - /// let mut tp = ThreadPool::new().unwrap(); - /// let mut s = tp.scope(); - /// - /// let mut vals = [0; 2]; - /// let (left, right) = vals.split_at_mut(1); - /// - /// s.join(|_|left[0] = 1, |_| right[0] = 1); - /// - /// assert_eq!(vals, [1; 2]); - /// ``` - pub fn join(&mut self, a: A, b: B) -> (RA, RB) + fn join_inner(&mut self, a: A, b: B) -> (RA, RB) where A: FnOnce(&mut Scope<'_>) -> RA + Send, B: FnOnce(&mut Scope<'_>) -> RB + Send, RA: Send, RB: Send, { - let a = move |scope: &mut Scope<'_>| { - if scope.context.heartbeats[scope.worker_index].load(Ordering::Relaxed) { - scope.heartbeat(); - } - - a(scope) - }; - let stack = JobStack::new(a); let job = Job::new(&stack); @@ -340,6 +318,85 @@ impl<'s> Scope<'s> { (ra, rb) } } + + /// Runs `a` and `b` potentially in parallel on separate threads and + /// returns the results. + /// + /// This variant checks for a heartbeat only once every 16 calls, skipping the + /// other 15 for improved performance. + /// + /// # Examples + /// + /// ``` + /// # use spice::ThreadPool; + /// let mut tp = ThreadPool::new().unwrap(); + /// let mut s = tp.scope(); + /// + /// let mut vals = [0; 2]; + /// let (left, right) = vals.split_at_mut(1); + /// + /// s.join(|_|left[0] = 1, |_| right[0] = 1); + /// + /// assert_eq!(vals, [1; 2]); + /// ``` + pub fn join(&mut self, a: A, b: B) -> (RA, RB) + where + A: FnOnce(&mut Scope<'_>) -> RA + Send, + B: FnOnce(&mut Scope<'_>) -> RB + Send, + RA: Send, + RB: Send, + { + self.join_with_heartbeat_every::<16, _, _, _, _>(a, b) + } + + /// Runs `a` and `b` potentially in parallel on separate threads and + /// returns the results. + /// + /// This variant checks for a heartbeat only once every `TIMES` calls, skipping + /// the other `TIMES - 1` for improved performance. `TIMES` must be non-zero. 
+ /// + /// # Examples + /// + /// ``` + /// # use spice::ThreadPool; + /// let mut tp = ThreadPool::new().unwrap(); + /// let mut s = tp.scope(); + /// + /// let mut vals = [0; 2]; + /// let (left, right) = vals.split_at_mut(1); + /// + /// // Skip checking 7/8 calls to join_with_heartbeat_every. + /// s.join_with_heartbeat_every::<8, _, _, _, _>(|_|left[0] = 1, |_| right[0] = 1); + /// + /// assert_eq!(vals, [1; 2]); + /// ``` + pub fn join_with_heartbeat_every( + &mut self, + a: A, + b: B, + ) -> (RA, RB) + where + A: FnOnce(&mut Scope<'_>) -> RA + Send, + B: FnOnce(&mut Scope<'_>) -> RB + Send, + RA: Send, + RB: Send, + { + self.join_count = self.join_count.wrapping_add(1) % TIMES; + + if self.join_count == 0 { + let a = move |scope: &mut Scope<'_>| { + if scope.context.heartbeats[scope.worker_index].load(Ordering::Relaxed) { + scope.heartbeat(); + } + + a(scope) + }; + + self.join_inner(a, b) + } else { + self.join_inner(a, b) + } + } } /// `ThreadPool` configuration. @@ -599,7 +656,7 @@ mod tests { _ => { let (_, tail) = slice.split_at_mut(1); - s.join( + s.join_with_heartbeat_every::<1, _, _, _, _>( |_| { thread::sleep(Duration::from_micros(100));