Skip to content

Commit

Permalink
rayoff library
Browse files Browse the repository at this point in the history
  • Loading branch information
aeyakovenko committed Nov 23, 2019
1 parent 221984c commit 23e5a53
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 0 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ members = [
"programs/vote_program",
"archiver",
"runtime",
"rayoff",
"sdk",
"sdk-c",
"upload-perf",
Expand Down
1 change: 1 addition & 0 deletions perf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.21.0" }
solana-budget-api = { path = "../programs/budget_api", version = "0.21.0" }
solana-logger = { path = "../logger", version = "0.21.0" }
solana-metrics = { path = "../metrics", version = "0.21.0" }
rayoff = { path = "../rayoff", version = "0.21.0" }

[lib]
name = "solana_perf"
Expand Down
13 changes: 13 additions & 0 deletions perf/benches/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ fn bench_sigverify(bencher: &mut Bencher) {
})
}

#[bench]
fn bench_sigverify_rayoff(bencher: &mut Bencher) {
let tx = test_tx();

// generate packet vector
let batches = to_packets(&vec![tx; 128]);

// verify packets
bencher.iter(|| {
let _ans = sigverify::ed25519_verify_rayoff(&batches);
})
}

#[bench]
fn bench_get_offsets(bencher: &mut Bencher) {
let tx = test_tx();
Expand Down
23 changes: 23 additions & 0 deletions perf/src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::packet::{Packet, Packets};
use crate::perf_libs;
use crate::recycler::Recycler;
use bincode::serialized_size;
use rayoff::rayoff::Pool;
use rayon::ThreadPool;
use solana_metrics::inc_new_counter_debug;
use solana_rayon_threadlimit::get_thread_count;
Expand Down Expand Up @@ -243,6 +244,28 @@ pub fn generate_offsets(
))
}

thread_local!(static RAYOFF_POOL: RefCell<Pool> = RefCell::new(Pool::new()));

pub fn ed25519_verify_rayoff(batches: &[Packets]) -> Vec<Vec<u8>> {
use rayon::prelude::*;
let count = batch_size(batches);
debug!("CPU ECDSA for {}", batch_size(batches));
let rv = batches
.iter()
.map(|batch| {
let mut items: Vec<(_, u8)> = batch.packets.iter().map(|b| (b, 0)).collect();
RAYOFF_POOL.with(|pool| {
pool.borrow()
.dispatch_mut(&mut items, |(packet, out)| *out = verify_packet(packet));
let rv: Vec<u8> = items.into_iter().map(|x| x.1).collect();
rv
})
})
.collect();
inc_new_counter_debug!("ed25519_verify_cpu", count);
rv
}

pub fn ed25519_verify_cpu(batches: &[Packets]) -> Vec<Vec<u8>> {
use rayon::prelude::*;
let count = batch_size(batches);
Expand Down
17 changes: 17 additions & 0 deletions rayoff/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "rayoff"
version = "0.21.0"
authors = ["Anatoly Yakovenko <anatoly@solana.com>"]

[dependencies]
sys-info = "0.5.8"

[dev-dependencies]
rayon = "1.1"

[lib]
crate-type = ["lib"]
name = "rayoff"

[[bench]]
name = "rayoff"
16 changes: 16 additions & 0 deletions rayoff/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
A Rust library that implements a fast threadpool.

Raw bench performance:

```
test bench_baseline ... bench: 90 ns/iter (+/- 5)
test bench_pool ... bench: 10,489 ns/iter (+/- 2,053)
test bench_rayon ... bench: 11,817 ns/iter (+/- 636)
```

sigverify performance:
```
running 3 tests
test bench_sigverify ... bench: 3,973,128 ns/iter (+/- 306,527)
test bench_sigverify_rayoff ... bench: 3,697,677 ns/iter (+/- 738,464)
```
47 changes: 47 additions & 0 deletions rayoff/benches/rayoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#![feature(test)]
extern crate rayoff;
extern crate rayon;
extern crate test;

use rayoff::rayoff::Pool;
use rayon::prelude::*;
use test::Bencher;

#[bench]
fn bench_pool(bencher: &mut Bencher) {
let pool = Pool::new();
bencher.iter(|| {
let mut array = [0usize; 100];
pool.dispatch_mut(&mut array, |val: &mut usize| *val += 1);
let expected = [1usize; 100];
for i in 0..100 {
assert_eq!(array[i], expected[i]);
}
})
}

#[bench]
fn bench_baseline(bencher: &mut Bencher) {
bencher.iter(|| {
let mut array = [0usize; 100];
for i in array.iter_mut() {
*i += 1;
}
let expected = [1usize; 100];
for i in 0..100 {
assert_eq!(array[i], expected[i]);
}
})
}

#[bench]
fn bench_rayon(bencher: &mut Bencher) {
bencher.iter(|| {
let mut array = [0usize; 100];
array.par_iter_mut().for_each(|p| *p += 1);
let expected = [1usize; 100];
for i in 0..100 {
assert_eq!(array[i], expected[i]);
}
})
}
1 change: 1 addition & 0 deletions rayoff/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod rayoff;
102 changes: 102 additions & 0 deletions rayoff/src/rayoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
extern crate sys_info;

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc};
use std::thread::{JoinHandle, yield_now, spawn};

struct Job {
func: Box<dyn Fn(*mut u64, usize, usize)>,
elems: *mut u64,
num: usize,
work_index: AtomicUsize,
done_index: AtomicUsize,
}
unsafe impl Send for Job {}
unsafe impl Sync for Job {}

pub struct Pool {
senders: Vec<Sender<Arc<Job>>>,
threads: Vec<JoinHandle<()>>,
}

impl Job {
fn execute(&self) {
loop {
let index = self.work_index.fetch_add(1, Ordering::Relaxed);
if index >= self.num {
self.done_index.fetch_add(1, Ordering::Relaxed);
break;
}
(self.func)(self.elems, self.num, index);
}
}
fn wait(&self, num: usize) {
loop {
let guard = self.done_index.load(Ordering::Relaxed);
if guard >= num {
break;
}
yield_now();
}
}
}

impl Pool {
pub fn new() -> Self {
let num_threads = sys_info::cpu_num().unwrap_or(16) - 1;
let mut pool = Self {
senders: vec![],
threads: vec![],
};
(0..num_threads).for_each(|_| {
let (sender, recvr): (Sender<Arc<Job>>, Receiver<Arc<Job>>) = channel();
let t = spawn(move || {
for job in recvr.iter() {
job.execute()
}
});
pool.senders.push(sender);
pool.threads.push(t);
});
pool
}

pub fn dispatch_mut<F, A>(&self, elems: &mut [A], func: F)
where
F: Fn(&mut A) + 'static,
{
let job = Job {
elems: elems.as_mut_ptr() as *mut u64,
num: elems.len(),
done_index: AtomicUsize::new(0),
work_index: AtomicUsize::new(0),
func: Box::new(move |ptr, num, index| {
let ptr = ptr as *mut A;
let slice = unsafe { std::slice::from_raw_parts_mut(ptr, num) };
func(&mut slice[index])
}),
};
let job = Arc::new(job);
for s in &self.senders {
s.send(job.clone()).expect("send should never fail");
}
job.execute();
job.wait(self.senders.len() + 1);
}
}

#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pool() {
let pool = Pool::new();
let mut array = [0usize; 100];
pool.dispatch_mut(&mut array, |val: &mut usize| *val += 1);
let expected = [1usize; 100];
for i in 0..100 {
assert_eq!(array[i], expected[i]);
}
}
}

0 comments on commit 23e5a53

Please sign in to comment.