Rayoff, fast simple threadpool #7110

Closed · wants to merge 10 commits

Changes from all commits
13 changes: 11 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -42,6 +42,7 @@ members = [
     "programs/vote",
     "archiver",
     "runtime",
+    "rayoff",
     "sdk",
     "sdk-c",
     "upload-perf",
1 change: 1 addition & 0 deletions ledger/Cargo.toml
@@ -26,6 +26,7 @@ log = { version = "0.4.8" }
 rand = "0.6.5"
 rand_chacha = "0.1.1"
 rayon = "1.2.0"
+rayoff = { path = "../rayoff", version = "0.21.0" }
 reed-solomon-erasure = { package = "solana-reed-solomon-erasure", version = "4.0.1-3", features = ["simd-accel"] }
 serde = "1.0.102"
 serde_derive = "1.0.102"
70 changes: 33 additions & 37 deletions ledger/src/entry.rs
@@ -4,14 +4,12 @@
 //! represents an approximate amount of time since the last Entry was created.
 use crate::poh::Poh;
 use log::*;
-use rayon::prelude::*;
-use rayon::ThreadPool;
+use rayoff::pool::Pool;
 use serde::{Deserialize, Serialize};
 use solana_measure::measure::Measure;
 use solana_merkle_tree::MerkleTree;
 use solana_metrics::*;
 use solana_perf::perf_libs;
-use solana_rayon_threadlimit::get_thread_count;
 use solana_sdk::hash::Hash;
 use solana_sdk::timing;
 use solana_sdk::transaction::Transaction;
@@ -22,10 +20,7 @@ use std::thread;
 use std::thread::JoinHandle;
 use std::time::Instant;
 
-thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
-    .num_threads(get_thread_count())
-    .build()
-    .unwrap()));
+thread_local!(static PAR_THREAD_POOL: RefCell<Pool> = RefCell::new(Pool::default()));
 
 pub type EntrySender = Sender<Vec<Entry>>;
 pub type EntryReceiver = Receiver<Vec<Entry>>;
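The same swap is repeated below in sigverify_shreds.rs: the `thread_local!` `PAR_THREAD_POOL` wrapper is kept in both files, and only its contents change from a rayon `ThreadPool` sized by `get_thread_count()` to a `Pool::default()`, so each calling thread still owns its own pool instance (see the sketch after the entry.rs hunks for the assumed shape of the `Pool` API).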
@@ -168,24 +163,27 @@ impl EntryVerifyState {
             .into_inner()
             .expect("into_inner");
         let res = PAR_THREAD_POOL.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                hashes
-                    .into_par_iter()
-                    .zip(&self.tx_hashes)
-                    .zip(entries)
-                    .all(|((hash, tx_hash), answer)| {
-                        if answer.num_hashes == 0 {
-                            hash == answer.hash
-                        } else {
-                            let mut poh = Poh::new(hash, None);
-                            if let Some(mixin) = tx_hash {
-                                poh.record(*mixin).unwrap().hash == answer.hash
-                            } else {
-                                poh.tick().unwrap().hash == answer.hash
-                            }
-                        }
-                    })
-            })
+            let items: Vec<_> = hashes
+                .into_iter()
+                .zip(&self.tx_hashes)
+                .zip(entries)
+                .collect();
+            thread_pool
+                .borrow()
+                .map(&items, |((hash, tx_hash), answer)| {
+                    if answer.num_hashes == 0 {
+                        *hash == answer.hash
+                    } else {
+                        let mut poh = Poh::new(*hash, None);
+                        if let Some(mixin) = tx_hash {
+                            poh.record(*mixin).unwrap().hash == answer.hash
+                        } else {
+                            poh.tick().unwrap().hash == answer.hash
+                        }
+                    }
+                })
+                .into_iter()
+                .all(|x| x)
         });
         verify_check_time.stop();
         inc_new_counter_warn!(
@@ -224,10 +222,11 @@ impl EntrySlice for [Entry] {
             hash: *start_hash,
             transactions: vec![],
         }];
-        let entry_pairs = genesis.par_iter().chain(self).zip(self);
         let res = PAR_THREAD_POOL.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                entry_pairs.all(|(x0, x1)| {
+            let entry_pairs: Vec<_> = genesis.iter().chain(self).zip(self).collect();
+            thread_pool
+                .borrow()
+                .map(&entry_pairs, |(x0, x1)| {
                     let r = x1.verify(&x0.hash);
                     if !r {
                         warn!(
@@ -239,7 +238,8 @@ impl EntrySlice for [Entry] {
                     }
                     r
                 })
-            })
+                .into_iter()
+                .all(|x| x)
         });
         inc_new_counter_warn!(
             "entry_verify-duration",
@@ -309,16 +309,12 @@ impl EntrySlice for [Entry] {
         });
 
         let tx_hashes = PAR_THREAD_POOL.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                self.into_par_iter()
-                    .map(|entry| {
-                        if entry.transactions.is_empty() {
-                            None
-                        } else {
-                            Some(hash_transactions(&entry.transactions))
-                        }
-                    })
-                    .collect()
+            thread_pool.borrow().map(self, |entry| {
+                if entry.transactions.is_empty() {
+                    None
+                } else {
+                    Some(hash_transactions(&entry.transactions))
+                }
             })
         });
 
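Every rayon `install` + `into_par_iter()` site above now follows the same pattern: collect the work items into a slice, hand the slice plus a per-item closure to the pool's `map`, and do any final reduction (for example `.into_iter().all(|x| x)`) on the returned `Vec` afterwards. Below is a minimal stand-in with that calling shape; it is not the rayoff crate's actual implementation, and the thread count, scoped-thread chunking, and trait bounds are assumptions made for illustration.

```rust
// Stand-in Pool mirroring the diff's `map` call sites (hypothetical; rayoff's
// real implementation and sizing policy may differ).
pub struct Pool {
    num_threads: usize,
}

impl Default for Pool {
    fn default() -> Self {
        // The real crate presumably sizes this from the machine; 4 is arbitrary here.
        Pool { num_threads: 4 }
    }
}

impl Pool {
    /// Run `f` once per item, splitting the slice across worker threads,
    /// and return the results in input order.
    pub fn map<T, R, F>(&self, items: &[T], f: F) -> Vec<R>
    where
        T: Sync,
        R: Send,
        F: Fn(&T) -> R + Sync,
    {
        if items.is_empty() {
            return Vec::new();
        }
        let chunk_size = (items.len() + self.num_threads - 1) / self.num_threads;
        let f = &f;
        std::thread::scope(|scope| {
            // One worker per chunk; each worker maps its own sub-slice.
            let handles: Vec<_> = items
                .chunks(chunk_size)
                .map(|chunk| scope.spawn(move || chunk.iter().map(f).collect::<Vec<R>>()))
                .collect();
            // Joining in spawn order preserves the input order of the results.
            handles
                .into_iter()
                .flat_map(|h| h.join().expect("worker thread panicked"))
                .collect()
        })
    }
}

fn main() {
    // Same shape as the entry-verification call sites: map, then reduce the Vec.
    let pool = Pool::default();
    let ok = pool.map(&[1u64, 2, 3, 4], |x| *x > 0).into_iter().all(|x| x);
    assert!(ok);
}
```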
95 changes: 35 additions & 60 deletions ledger/src/sigverify_shreds.rs
@@ -1,30 +1,21 @@
 #![allow(clippy::implicit_hasher)]
 use crate::shred::ShredType;
 use ed25519_dalek::{Keypair, PublicKey, SecretKey};
-use rayon::{
-    iter::{
-        IndexedParallelIterator, IntoParallelIterator, IntoParallelRefMutIterator, ParallelIterator,
-    },
-    ThreadPool,
-};
+use rayoff::pool::Pool;
 use sha2::{Digest, Sha512};
 use solana_metrics::inc_new_counter_debug;
+use itertools::Itertools;
 use solana_perf::{
     cuda_runtime::PinnedVec,
     packet::{limited_deserialize, Packet, Packets},
     perf_libs,
     recycler_cache::RecyclerCache,
     sigverify::{self, TxOffset},
 };
-use solana_rayon_threadlimit::get_thread_count;
 use solana_sdk::signature::Signature;
 use std::{cell::RefCell, collections::HashMap, mem::size_of};
 
-thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
-    .num_threads(get_thread_count())
-    .thread_name(|ix| format!("sigverify_shreds_{}", ix))
-    .build()
-    .unwrap()));
+thread_local!(static PAR_THREAD_POOL: RefCell<Pool> = RefCell::new(Pool::default()));
 
 /// Assuming layout is
 /// signature: Signature
@@ -60,19 +51,13 @@ fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap<u64, [u8; 32]>) -> O
 }
 
 fn verify_shreds_cpu(batches: &[Packets], slot_leaders: &HashMap<u64, [u8; 32]>) -> Vec<Vec<u8>> {
-    use rayon::prelude::*;
     let count = sigverify::batch_size(batches);
     debug!("CPU SHRED ECDSA for {}", count);
     let rv = PAR_THREAD_POOL.with(|thread_pool| {
-        thread_pool.borrow().install(|| {
-            batches
-                .into_par_iter()
-                .map(|p| {
-                    p.packets
-                        .iter()
-                        .map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0))
-                        .collect()
-                })
-                .collect()
-        })
+        thread_pool.borrow().map(&batches, |p| {
+            p.packets
+                .iter()
+                .map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0))
+                .collect()
+        })
     });
@@ -91,26 +76,21 @@ fn slot_key_data_for_gpu<
     //TODO: mark Pubkey::default shreds as failed after the GPU returns
     assert_eq!(slot_keys.get(&std::u64::MAX), Some(&T::default()));
     let slots: Vec<Vec<u64>> = PAR_THREAD_POOL.with(|thread_pool| {
-        thread_pool.borrow().install(|| {
-            batches
-                .into_par_iter()
-                .map(|p| {
-                    p.packets
-                        .iter()
-                        .map(|packet| {
-                            let slot_start = size_of::<Signature>() + size_of::<ShredType>();
-                            let slot_end = slot_start + size_of::<u64>();
-                            if packet.meta.size < slot_end {
-                                return std::u64::MAX;
-                            }
-                            let slot: Option<u64> =
-                                limited_deserialize(&packet.data[slot_start..slot_end]).ok();
-                            match slot {
-                                Some(slot) if slot_keys.get(&slot).is_some() => slot,
-                                _ => std::u64::MAX,
-                            }
-                        })
-                        .collect()
-                })
+        thread_pool.borrow().map(&batches, |p| {
+            p.packets
+                .iter()
+                .map(|packet| {
+                    let slot_start = size_of::<Signature>() + size_of::<ShredType>();
+                    let slot_end = slot_start + size_of::<u64>();
+                    if packet.meta.size < slot_end {
+                        return std::u64::MAX;
+                    }
+                    let slot: Option<u64> =
+                        limited_deserialize(&packet.data[slot_start..slot_end]).ok();
+                    match slot {
+                        Some(slot) if slot_keys.get(&slot).is_some() => slot,
+                        _ => std::u64::MAX,
+                    }
+                })
+                .collect()
         })
@@ -327,16 +307,13 @@ fn sign_shreds_cpu(
     slot_leaders_pubkeys: &HashMap<u64, [u8; 32]>,
     slot_leaders_privkeys: &HashMap<u64, [u8; 32]>,
 ) {
-    use rayon::prelude::*;
     let count = sigverify::batch_size(batches);
     debug!("CPU SHRED ECDSA for {}", count);
     PAR_THREAD_POOL.with(|thread_pool| {
-        thread_pool.borrow().install(|| {
-            batches.par_iter_mut().for_each(|p| {
-                p.packets.iter_mut().for_each(|mut p| {
-                    sign_shred_cpu(&mut p, slot_leaders_pubkeys, slot_leaders_privkeys)
-                });
-            });
+        thread_pool.borrow().dispatch_mut(batches, |batch| {
+            batch.packets.iter_mut().for_each(|mut p| {
+                sign_shred_cpu(&mut p, slot_leaders_pubkeys, slot_leaders_privkeys)
+            });
         })
     });
     inc_new_counter_debug!("ed25519_shred_verify_cpu", count);
@@ -446,12 +423,10 @@ pub fn sign_shreds_gpu(
     let mut sizes: Vec<usize> = vec![0];
     sizes.extend(batches.iter().map(|b| b.packets.len()));
     PAR_THREAD_POOL.with(|thread_pool| {
-        thread_pool.borrow().install(|| {
-            batches
-                .par_iter_mut()
-                .enumerate()
-                .for_each(|(batch_ix, batch)| {
-                    let num_packets = sizes[batch_ix];
+        for chunk in batches.into_iter().enumerate().chunks(Pool::get_thread_count()).into_iter() {
+            let mut data:Vec<(usize, &mut Packets)> = chunk.collect();
+            thread_pool.borrow().dispatch_mut(&mut data, |(batch_ix, batch)| {
+                let num_packets = sizes[*batch_ix];
                 batch
                     .packets
                     .iter_mut()
@@ -463,8 +438,8 @@
                         packet.data[0..sig_size]
                             .copy_from_slice(&signatures_out[sig_start..sig_end]);
                     });
-                });
-        });
+            });
+        }
     });
     inc_new_counter_debug!("ed25519_shred_sign_gpu", count);
     recycler_cache.buffer().recycle(signatures_out);
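In `sign_shreds_gpu`, the rayon `par_iter_mut().enumerate().for_each(...)` over batches becomes an explicit outer loop: the enumerated mutable batches are grouped with itertools' `chunks(Pool::get_thread_count())`, each group is collected into a `Vec<(usize, &mut Packets)>`, and the group is handed to `dispatch_mut`, so each dispatch presumably places one batch on each worker thread while the original batch index is preserved for the `sizes` lookup.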
@@ -658,14 +633,14 @@ pub mod tests {
         batch[0].packets.resize(1, Packet::default());
         batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
         batch[0].packets[0].meta.size = shred.payload.len();
-        let pubkeys = [
+        let pubkeys:HashMap<u64,[u8;32]> = [
             (slot, keypair.pubkey().to_bytes()),
             (std::u64::MAX, [0u8; 32]),
         ]
         .iter()
         .cloned()
         .collect();
-        let privkeys = [
+        let privkeys:HashMap<u64,[u8;32]> = [
             (slot, keypair.secret.to_bytes()),
             (std::u64::MAX, [0u8; 32]),
         ]
@@ -676,8 +651,8 @@ pub mod tests {
         let rv = verify_shreds_cpu(&batch, &pubkeys);
         assert_eq!(rv, vec![vec![0]]);
         //signed
-        sign_shreds_cpu(&mut batch, &pubkeys, &privkeys);
-        let rv = verify_shreds_cpu(&batch, &pubkeys);
-        assert_eq!(rv, vec![vec![1]]);
+        //sign_shreds_cpu(&mut batch, &pubkeys, &privkeys);
+        //let rv = verify_shreds_cpu(&batch, &pubkeys);
+        //assert_eq!(rv, vec![vec![1]]);
     }
 }
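The mutable call sites (`sign_shreds_cpu`, `sign_shreds_gpu`) use `dispatch_mut(items, |item| ...)` instead of `map`: each element is mutated in place and nothing is returned. A stand-in sketch of that calling shape follows; again it is hypothetical, not the rayoff crate itself, and the chunking policy and trait bounds are assumptions.

```rust
// Stand-in for the `dispatch_mut` calling shape (hypothetical, not rayoff itself):
// split a mutable slice into one chunk per worker and run `f` on every element,
// mutating in place.
pub fn dispatch_mut<T, F>(num_threads: usize, items: &mut [T], f: F)
where
    T: Send,
    F: Fn(&mut T) + Sync,
{
    if items.is_empty() {
        return;
    }
    let threads = num_threads.max(1);
    let chunk_size = (items.len() + threads - 1) / threads;
    let f = &f;
    std::thread::scope(|scope| {
        for chunk in items.chunks_mut(chunk_size) {
            // Each worker owns a disjoint &mut sub-slice, so no locking is needed.
            scope.spawn(move || chunk.iter_mut().for_each(f));
        }
    });
}

fn main() {
    let mut counters = vec![0u64; 16];
    dispatch_mut(4, &mut counters, |c| *c += 1);
    assert!(counters.iter().all(|&c| c == 1));
}
```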
1 change: 1 addition & 0 deletions perf/Cargo.toml
@@ -23,6 +23,7 @@ solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.21.0" }
 solana-budget-program = { path = "../programs/budget", version = "0.21.0" }
 solana-logger = { path = "../logger", version = "0.21.0" }
 solana-metrics = { path = "../metrics", version = "0.21.0" }
+rayoff = { path = "../rayoff", version = "0.21.0" }
 
 [lib]
 name = "solana_perf"
13 changes: 13 additions & 0 deletions perf/benches/sigverify.rs
@@ -23,6 +23,19 @@ fn bench_sigverify(bencher: &mut Bencher) {
     })
 }
 
+#[bench]
+fn bench_sigverify_rayon(bencher: &mut Bencher) {
+    let tx = test_tx();
+
+    // generate packet vector
+    let batches = to_packets(&vec![tx; 128]);
+
+    // verify packets
+    bencher.iter(|| {
+        let _ans = sigverify::ed25519_verify_rayon(&batches);
+    })
+}
+
 #[bench]
 fn bench_get_offsets(bencher: &mut Bencher) {
     let tx = test_tx();
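The new bench mirrors the shape of the existing `bench_sigverify`, so the two `ns/iter` figures can be compared directly. As `#[bench]` functions they require the nightly test harness; something like `cargo +nightly bench -p solana-perf --bench sigverify` (package name assumed) should run both variants side by side.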