diff --git a/Cargo.lock b/Cargo.lock index b8b734e8c2b067..31c00b02769583 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2559,6 +2559,14 @@ dependencies = [ "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rayoff" +version = "0.21.0" +dependencies = [ + "rayon 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "sys-info 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rayon" version = "1.2.0" @@ -3788,6 +3796,7 @@ dependencies = [ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "rayoff 0.21.0", "rayon 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 1cf366aaf928d1..5d808ea464c3a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ members = [ "programs/vote", "archiver", "runtime", + "rayoff", "sdk", "sdk-c", "upload-perf", diff --git a/perf/Cargo.toml b/perf/Cargo.toml index ae72195a65a504..b591ea3269518e 100644 --- a/perf/Cargo.toml +++ b/perf/Cargo.toml @@ -23,6 +23,7 @@ solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.21.0" } solana-budget-program = { path = "../programs/budget", version = "0.21.0" } solana-logger = { path = "../logger", version = "0.21.0" } solana-metrics = { path = "../metrics", version = "0.21.0" } +rayoff = { path = "../rayoff", version = "0.21.0" } [lib] name = "solana_perf" diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index 49d057044417c8..486c64d61049b3 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -23,6 +23,19 @@ fn bench_sigverify(bencher: &mut Bencher) { }) } +#[bench] +fn 
bench_sigverify_rayoff(bencher: &mut Bencher) { + let tx = test_tx(); + + // generate packet vector + let batches = to_packets(&vec![tx; 128]); + + // verify packets + bencher.iter(|| { + let _ans = sigverify::ed25519_verify_rayoff(&batches); + }) +} + #[bench] fn bench_get_offsets(bencher: &mut Bencher) { let tx = test_tx(); diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index a44c27ce158728..7af0f5904ef32c 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -9,6 +9,7 @@ use crate::packet::{Packet, Packets}; use crate::perf_libs; use crate::recycler::Recycler; use bincode::serialized_size; +use rayoff::rayoff::Pool; use rayon::ThreadPool; use solana_metrics::inc_new_counter_debug; use solana_rayon_threadlimit::get_thread_count; @@ -243,6 +244,28 @@ pub fn generate_offsets( )) } +thread_local!(static RAYOFF_POOL: RefCell = RefCell::new(Pool::new())); + +pub fn ed25519_verify_rayoff(batches: &[Packets]) -> Vec> { + use rayon::prelude::*; + let count = batch_size(batches); + debug!("CPU ECDSA for {}", batch_size(batches)); + let rv = batches + .iter() + .map(|batch| { + let mut items: Vec<(_, u8)> = batch.packets.iter().map(|b| (b, 0)).collect(); + RAYOFF_POOL.with(|pool| { + pool.borrow() + .dispatch_mut(&mut items, |(packet, out)| *out = verify_packet(packet)); + let rv: Vec = items.into_iter().map(|x| x.1).collect(); + rv + }) + }) + .collect(); + inc_new_counter_debug!("ed25519_verify_cpu", count); + rv +} + pub fn ed25519_verify_cpu(batches: &[Packets]) -> Vec> { use rayon::prelude::*; let count = batch_size(batches); diff --git a/rayoff/Cargo.toml b/rayoff/Cargo.toml new file mode 100644 index 00000000000000..0d579bf18b3fec --- /dev/null +++ b/rayoff/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "rayoff" +version = "0.21.0" +authors = ["Anatoly Yakovenko "] + +[dependencies] +sys-info = "0.5.8" + +[dev-dependencies] +rayon = "1.1" + +[lib] +crate-type = ["lib"] +name = "rayoff" + +[[bench]] +name = "rayoff" diff --git 
a/rayoff/README.md b/rayoff/README.md new file mode 100644 index 00000000000000..174a560ea83a79 --- /dev/null +++ b/rayoff/README.md @@ -0,0 +1,16 @@ +A Rust library that implements a fast threadpool. + +Raw bench performance: + +``` +test bench_baseline ... bench: 90 ns/iter (+/- 5) +test bench_pool ... bench: 10,489 ns/iter (+/- 2,053) +test bench_rayon ... bench: 11,817 ns/iter (+/- 636) +``` + +sigverify performance: +``` +running 3 tests +test bench_sigverify ... bench: 3,973,128 ns/iter (+/- 306,527) +test bench_sigverify_rayoff ... bench: 3,697,677 ns/iter (+/- 738,464) +``` diff --git a/rayoff/benches/rayoff.rs b/rayoff/benches/rayoff.rs new file mode 100644 index 00000000000000..efb94f358b0caa --- /dev/null +++ b/rayoff/benches/rayoff.rs @@ -0,0 +1,47 @@ +#![feature(test)] +extern crate rayoff; +extern crate rayon; +extern crate test; + +use rayoff::rayoff::Pool; +use rayon::prelude::*; +use test::Bencher; + +#[bench] +fn bench_rayoff(bencher: &mut Bencher) { + let pool = Pool::new(); + bencher.iter(|| { + let mut array = [0usize; 100]; + pool.dispatch_mut(&mut array, |val: &mut usize| *val += 1); + let expected = [1usize; 100]; + for i in 0..100 { + assert_eq!(array[i], expected[i]); + } + }) +} + +#[bench] +fn bench_baseline(bencher: &mut Bencher) { + bencher.iter(|| { + let mut array = [0usize; 100]; + for i in array.iter_mut() { + *i += 1; + } + let expected = [1usize; 100]; + for i in 0..100 { + assert_eq!(array[i], expected[i]); + } + }) +} + +#[bench] +fn bench_rayon(bencher: &mut Bencher) { + bencher.iter(|| { + let mut array = [0usize; 100]; + array.par_iter_mut().for_each(|p| *p += 1); + let expected = [1usize; 100]; + for i in 0..100 { + assert_eq!(array[i], expected[i]); + } + }) +} diff --git a/rayoff/src/lib.rs b/rayoff/src/lib.rs new file mode 100644 index 00000000000000..f524eb641ea5d2 --- /dev/null +++ b/rayoff/src/lib.rs @@ -0,0 +1 @@ +pub mod rayoff; diff --git a/rayoff/src/rayoff.rs b/rayoff/src/rayoff.rs new file mode 100644 index 
// rayoff/src/rayoff.rs
//
// A small persistent thread pool. `Pool::dispatch_mut` applies a callback to
// every element of a mutable slice, sharing the work between the pool's worker
// threads and the calling thread, and returns once every element was handled.
//
// The worker count is derived from `std::thread::available_parallelism`
// (Rust 1.59+), so this file needs nothing beyond the standard library.

use std::any::Any;
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// CPU count assumed when the platform cannot report its parallelism.
const FALLBACK_CPUS: usize = 16;

/// Type-erased base pointer of the slice a job operates on.
struct ElemPtr(*mut ());

// SAFETY: the pointer is only dereferenced inside the closure built by
// `Pool::dispatch_mut`, which (a) requires the element type to be `Send`,
// (b) touches each index at most once across all threads, and (c) keeps the
// slice mutably borrowed until every participant has finished.
unsafe impl Send for ElemPtr {}
unsafe impl Sync for ElemPtr {}

/// One `dispatch_mut` call, shared between the caller and all workers.
///
/// `Job` is `Send + Sync` through its fields; only `ElemPtr` needs an unsafe
/// impl, so adding a non-thread-safe field here is a compile error.
struct Job {
    /// Applies the user callback to the element at `index` behind the base pointer.
    func: Box<dyn Fn(*mut (), usize) + Send + Sync>,
    /// Base pointer of the caller's slice.
    elems: ElemPtr,
    /// Number of elements in the slice.
    num: usize,
    /// Next index to hand out; each `fetch_add` claims one element.
    work_index: AtomicUsize,
    /// Number of participants (threads) that have finished with this job.
    done_index: AtomicUsize,
    /// First panic payload raised by the callback, if any.
    panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
}

impl Job {
    /// Claims and processes elements until none are left.
    fn drain(&self) {
        loop {
            // Relaxed is enough: the read-modify-write itself guarantees that
            // every index is returned to exactly one thread.
            let index = self.work_index.fetch_add(1, Ordering::Relaxed);
            if index >= self.num {
                return;
            }
            (self.func)(self.elems.0, index);
        }
    }

    /// Runs this thread's share of the job and records that it is finished.
    ///
    /// A panic from the callback is caught and stored so that the thread
    /// still reports completion; otherwise the caller would wait forever and
    /// a worker thread would die.
    fn execute(&self) {
        let outcome = panic::catch_unwind(AssertUnwindSafe(|| self.drain()));
        if let Err(payload) = outcome {
            // Stop handing out further elements; the job has failed anyway.
            self.work_index.store(self.num, Ordering::Relaxed);
            let mut slot = self
                .panic_payload
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            if slot.is_none() {
                *slot = Some(payload);
            }
        }
        // Release: publishes this thread's element writes to whoever observes
        // the incremented counter with an Acquire load (see `wait`).
        self.done_index.fetch_add(1, Ordering::Release);
    }

    /// Spins (yielding) until `participants` threads have finished `execute`.
    fn wait(&self, participants: usize) {
        // Acquire pairs with the Release increment in `execute`.
        while self.done_index.load(Ordering::Acquire) < participants {
            thread::yield_now();
        }
    }
}

/// A fixed set of worker threads that help the caller process slices.
///
/// Workers exit on their own once the pool (and with it every channel
/// sender) is dropped; they are not joined.
pub struct Pool {
    senders: Vec<Sender<Arc<Job>>>,
    threads: Vec<JoinHandle<()>>,
}

impl Pool {
    /// Creates a pool with one worker per available CPU minus one, because
    /// the calling thread takes part in every dispatch.
    ///
    /// Falls back to assuming 16 CPUs when the parallelism cannot be queried.
    pub fn new() -> Self {
        let cpus = thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(FALLBACK_CPUS);
        Self::with_workers(cpus.saturating_sub(1))
    }

    /// Creates a pool with exactly `num_workers` worker threads.
    ///
    /// With `0` workers every dispatch runs entirely on the calling thread.
    pub fn with_workers(num_workers: usize) -> Self {
        let mut senders = Vec::with_capacity(num_workers);
        let mut threads = Vec::with_capacity(num_workers);
        for _ in 0..num_workers {
            let (sender, receiver) = channel::<Arc<Job>>();
            threads.push(thread::spawn(move || {
                // Ends when the matching sender is dropped.
                for job in receiver {
                    job.execute();
                }
            }));
            senders.push(sender);
        }
        Self { senders, threads }
    }

    /// Number of worker threads owned by this pool (the caller not included).
    pub fn num_workers(&self) -> usize {
        self.threads.len()
    }

    /// Calls `func` once for every element of `elems`, spreading the calls
    /// over the worker threads and the calling thread, and blocks until all
    /// of them are done.
    ///
    /// The order in which elements are visited and the thread that visits
    /// each one are unspecified.
    ///
    /// # Panics
    ///
    /// If `func` panics, remaining elements may be skipped and the first
    /// panic is re-raised on the calling thread after every participant has
    /// stopped touching `elems`. The pool stays usable afterwards.
    ///
    /// # Deadlocks
    ///
    /// Do not call `dispatch_mut` on the same pool from inside `func`: the
    /// nested call would wait for the worker that is currently running it.
    pub fn dispatch_mut<A, F>(&self, elems: &mut [A], func: F)
    where
        A: Send,
        F: Fn(&mut A) + Send + Sync + 'static,
    {
        let job = Arc::new(Job {
            elems: ElemPtr(elems.as_mut_ptr() as *mut ()),
            num: elems.len(),
            work_index: AtomicUsize::new(0),
            done_index: AtomicUsize::new(0),
            panic_payload: Mutex::new(None),
            func: Box::new(move |base: *mut (), index: usize| {
                // SAFETY: `base` is the start of the caller's `&mut [A]`,
                // which stays borrowed until `wait` returns below; `index` is
                // below the slice length and is handed to exactly one thread,
                // so this is the only live reference to that element.
                let elem = unsafe { &mut *(base as *mut A).add(index) };
                func(elem);
            }),
        });

        // Only wait for workers that actually received the job. Panicking on
        // a failed send would unwind while other workers still use `elems`.
        let mut participants: usize = 1; // the calling thread
        for sender in &self.senders {
            if sender.send(Arc::clone(&job)).is_ok() {
                participants += 1;
            }
        }

        job.execute();
        job.wait(participants);

        let payload = job
            .panic_payload
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();
        if let Some(payload) = payload {
            panic::resume_unwind(payload);
        }
    }
}

impl Default for Pool {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool() {
        let pool = Pool::new();
        let mut array = [0usize; 100];
        pool.dispatch_mut(&mut array, |val: &mut usize| *val += 1);
        assert!(array.iter().all(|&val| val == 1));
    }
}