diff --git a/rayoff/src/rayoff.rs b/rayoff/src/rayoff.rs index 7ca3773fa46084..b27a68b8b70cfc 100644 --- a/rayoff/src/rayoff.rs +++ b/rayoff/src/rayoff.rs @@ -2,32 +2,30 @@ extern crate sys_info; use job::Job; use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::Arc; -use std::thread::{spawn, JoinHandle}; +use std::sync::{Mutex, Arc}; +use std::thread::{spawn}; +#[derive(Debug)] pub struct Pool { - senders: Vec>>, - threads: Vec>, + senders: Mutex>>>, } impl Default for Pool { fn default() -> Self { let num_threads = sys_info::cpu_num().unwrap_or(16) - 1; - let mut pool = Self { - senders: vec![], - threads: vec![], - }; + let mut senders = vec![]; (0..num_threads).for_each(|_| { let (sender, recvr): (Sender>, Receiver>) = channel(); - let t = spawn(move || { + let _ = spawn(move || { for job in recvr.iter() { job.execute() } }); - pool.senders.push(sender); - pool.threads.push(t); + senders.push(sender); }); - pool + Self { + senders: Mutex::new(senders), + } } } @@ -39,11 +37,9 @@ impl Pool { // Job must be destroyed in the frame that its created let job = unsafe { Job::new(elems, func) }; let job = Arc::new(job); - for s in &self.senders { - s.send(job.clone()).expect("send should never fail"); - } + let len = self.notify_all(job.clone()); job.execute(); - job.wait(self.senders.len() + 1); + job.wait(len + 1); } pub fn map(&self, inputs: &[A], func: F) -> Vec where @@ -58,6 +54,14 @@ impl Pool { }); outs } + fn notify_all(&self, job: Arc) -> usize { + let senders = self.senders.lock().unwrap(); + let len = senders.len(); + for s in senders.iter() { + s.send(job.clone()).expect("send should never fail"); + } + len + } } #[cfg(test)] diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 1bc0efbbc542c4..0cdf6a182e302e 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -287,8 +287,8 @@ impl Accounts { /// returns only the latest/current version of B for this slot fn scan_slot(&self, slot: Slot, func: F) -> Vec where - F: Fn(&StoredAccount) -> Option + 'static, - B: Clone + Default, + F: Fn(&StoredAccount) -> Option + Send + Sync, + B: Clone + Default + Send, { let accumulator: Vec> = self.accounts_db.scan_account_storage( slot, diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 740302481b3fff..cd8b489cd062ec 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -611,8 +611,8 @@ impl AccountsDB { // PERF: Sequentially read each storage entry in parallel pub fn scan_account_storage(&self, slot_id: Slot, scan_func: F) -> Vec where - F: Fn(&StoredAccount, AppendVecId, &mut B) -> () + 'static, - B: Default + Clone, + F: Fn(&StoredAccount, AppendVecId, &mut B) -> () + Send + Sync, + B: Default + Clone + Send, { let storage_maps: Vec> = self .storage