Rework sig processing threads and add perf for process/verify #183

Merged
merged 1 commit on May 8, 2018
1 change: 1 addition & 0 deletions Cargo.toml
@@ -68,3 +68,4 @@ libc = "^0.2.1"
getopts = "^0.2"
isatty = "0.1"
futures = "0.1"
rand = "0.4.2"
158 changes: 108 additions & 50 deletions src/accountant_skel.rs
@@ -17,7 +17,6 @@ use recorder::Signal;
use result::Result;
use serde_json;
use signature::PublicKey;
use std::cmp::max;
use std::collections::VecDeque;
use std::io::sink;
use std::io::{Cursor, Write};
@@ -30,6 +29,9 @@ use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;
use transaction::Transaction;
use timing;
use std::time::Instant;
use rand::{thread_rng, Rng};

pub struct AccountantSkel {
acc: Mutex<Accountant>,
@@ -256,41 +258,64 @@ impl AccountantSkel {
}
}

fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<Vec<SharedPackets>> {
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
debug!("got msgs");
let mut len = msgs.read().unwrap().packets.len();
let mut batch = vec![msgs];
while let Ok(more) = recvr.try_recv() {
debug!("got more msgs");
trace!("got more msgs");
len += more.read().unwrap().packets.len();
batch.push(more);

if len > 100_000 {
break;
}
}
info!("batch len {}", batch.len());
Ok(batch)
debug!("batch len {}", batch.len());
Ok((batch, len))
}

fn verify_batch(batch: Vec<SharedPackets>) -> Vec<Vec<(SharedPackets, Vec<u8>)>> {
let chunk_size = max(1, (batch.len() + 3) / 4);
let batches: Vec<_> = batch.chunks(chunk_size).map(|x| x.to_vec()).collect();
batches
.into_par_iter()
.map(|batch| {
let r = ecdsa::ed25519_verify(&batch);
batch.into_iter().zip(r).collect()
})
.collect()
fn verify_batch(
batch: Vec<SharedPackets>,
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> Result<()> {
let r = ecdsa::ed25519_verify(&batch);
let res = batch.into_iter().zip(r).collect();
sendr.lock().unwrap().send(res)?;
// TODO: fix error handling here?
Ok(())
}

fn verifier(
recvr: &streamer::PacketReceiver,
sendr: &Sender<Vec<(SharedPackets, Vec<u8>)>>,
recvr: &Arc<Mutex<streamer::PacketReceiver>>,
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> Result<()> {
let batch = Self::recv_batch(recvr)?;
let verified_batches = Self::verify_batch(batch);
debug!("verified batches: {}", verified_batches.len());
for xs in verified_batches {
sendr.send(xs)?;
}
let (batch, len) = Self::recv_batch(&recvr.lock().unwrap())?;
let now = Instant::now();
let batch_len = batch.len();
let rand_id = thread_rng().gen_range(0, 100);
info!(
"@{:?} verifier: verifying: {} id: {}",
timing::timestamp(),
batch.len(),
rand_id
);

Self::verify_batch(batch, sendr).unwrap();

let total_time_ms = timing::duration_as_ms(&now.elapsed());
let total_time_s = timing::duration_as_s(&now.elapsed());
info!(
"@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}",
timing::timestamp(),
batch_len,
total_time_ms,
rand_id,
len,
(len as f32 / total_time_s)
);
Ok(())
}
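
std::sync::mpsc::Receiver is not Clone, so to fan verification out over several threads the rework wraps the packet receiver (and the verified-packet sender) in Arc<Mutex<...>> and hands clones of the Arc to each worker. A self-contained sketch of that shape, with placeholder item types and a stub verify step standing in for ecdsa::ed25519_verify:

use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::time::Duration;

// Placeholder "verification": mark every item in the batch as passing.
fn verify(items: &[u32]) -> Vec<u8> {
    items.iter().map(|_| 1u8).collect()
}

// One verifier pass: pull a batch from the shared receiver, verify it,
// and forward (item, result) pairs through the shared sender.
fn verifier_pass(
    recvr: &Arc<Mutex<Receiver<Vec<u32>>>>,
    sendr: &Arc<Mutex<Sender<Vec<(u32, u8)>>>>,
) -> Result<(), ()> {
    let batch = recvr
        .lock()
        .unwrap()
        .recv_timeout(Duration::new(1, 0))
        .map_err(|_| ())?;
    let results = verify(&batch);
    let out: Vec<(u32, u8)> = batch.into_iter().zip(results).collect();
    sendr.lock().unwrap().send(out).map_err(|_| ())?;
    Ok(())
}

fn main() {
    let (packet_sender, packet_receiver) = channel();
    let (verified_sender, verified_receiver) = channel();
    let shared_recvr = Arc::new(Mutex::new(packet_receiver));
    let shared_sendr = Arc::new(Mutex::new(verified_sender));

    packet_sender.send(vec![1, 2, 3]).unwrap();
    verifier_pass(&shared_recvr, &shared_sendr).unwrap();
    println!("{:?}", verified_receiver.recv().unwrap());
}

The lock is held only long enough to pull one batch, so multiple workers can interleave their recv_timeout calls on the same channel.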

@@ -335,8 +360,6 @@ impl AccountantSkel {
}
}

debug!("processing verified");

// Let validators know they should not attempt to process additional
// transactions in parallel.
self.historian_input.lock().unwrap().send(Signal::Tick)?;
@@ -387,16 +410,25 @@ impl AccountantSkel {
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let recv_start = Instant::now();
let mms = verified_receiver.recv_timeout(timer)?;
debug!("got some messages: {}", mms.len());
let mut reqs_len = 0;
let mms_len = mms.len();
info!(
"@{:?} process start stalled for: {:?}ms batches: {}",
timing::timestamp(),
timing::duration_as_ms(&recv_start.elapsed()),
mms.len(),
);
let proc_start = Instant::now();
for (msgs, vers) in mms {
let reqs = Self::deserialize_packets(&msgs.read().unwrap());
reqs_len += reqs.len();
let req_vers = reqs.into_iter()
.zip(vers)
.filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
.filter(|x| {
let v = x.0.verify();
trace!("v:{} x:{:?}", v, x);
v
})
.collect();
@@ -421,7 +453,16 @@ impl AccountantSkel {
}
packet_recycler.recycle(msgs);
}
debug!("done responding");
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
info!(
"@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
timing::timestamp(),
mms_len,
total_time_ms,
reqs_len,
(reqs_len as f32) / (total_time_s)
);
Ok(())
}
/// Process verified blobs, already in order
@@ -486,13 +527,21 @@ impl AccountantSkel {
);
let (verified_sender, verified_receiver) = channel();

let exit_ = exit.clone();
let t_verifier = spawn(move || loop {
let e = Self::verifier(&packet_receiver, &verified_sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
let mut verify_threads = Vec::new();
let shared_verified_sender = Arc::new(Mutex::new(verified_sender));
let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver));
for _ in 0..4 {
let exit_ = exit.clone();
let recv = shared_packet_receiver.clone();
let sender = shared_verified_sender.clone();
let thread = spawn(move || loop {
let e = Self::verifier(&recv, &sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
verify_threads.push(thread);
}

let (broadcast_sender, broadcast_receiver) = channel();

@@ -528,16 +577,18 @@ impl AccountantSkel {
}
}
});
Ok(vec![

let mut threads = vec![
t_receiver,
t_responder,
t_server,
t_verifier,
t_sync,
t_gossip,
t_listen,
t_broadcast,
])
];
threads.extend(verify_threads.into_iter());
Ok(threads)
}
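
Both the leader and validator setup paths now build this verifier pool the same way: spawn a fixed number of workers that loop until an error coincides with the exit flag, keep the JoinHandles, and splice them into the returned thread list. A standalone sketch of that spawn-and-collect pattern (the sleep-and-fail body is only a stand-in for Self::verifier):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{spawn, JoinHandle};
use std::time::Duration;

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let mut verify_threads: Vec<JoinHandle<()>> = Vec::new();

    for _ in 0..4 {
        let exit_ = exit.clone();
        let thread = spawn(move || loop {
            // Stand-in for Self::verifier(&recv, &sender): a timed-out
            // channel read surfaces here as an Err, letting the loop
            // observe the exit flag at least once a second.
            std::thread::sleep(Duration::from_millis(10));
            let e: Result<(), ()> = Err(());
            if e.is_err() && exit_.load(Ordering::Relaxed) {
                break;
            }
        });
        verify_threads.push(thread);
    }

    // The real code gathers its other service threads first, then does
    // threads.extend(verify_threads.into_iter()) before returning them.
    let mut threads: Vec<JoinHandle<()>> = vec![];
    threads.extend(verify_threads.into_iter());

    exit.store(true, Ordering::Relaxed);
    for t in threads {
        t.join().unwrap();
    }
}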

/// This service receives messages from a leader in the network and processes the transactions
@@ -639,15 +690,21 @@ impl AccountantSkel {
);
let (verified_sender, verified_receiver) = channel();

let exit_ = exit.clone();
let t_verifier = spawn(move || loop {
let e = Self::verifier(&packet_receiver, &verified_sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
trace!("verifier exiting");
break;
}
});

let mut verify_threads = Vec::new();
let shared_verified_sender = Arc::new(Mutex::new(verified_sender));
let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver));
for _ in 0..4 {
let exit_ = exit.clone();
let recv = shared_packet_receiver.clone();
let sender = shared_verified_sender.clone();
let thread = spawn(move || loop {
let e = Self::verifier(&recv, &sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
verify_threads.push(thread);
}
let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone());

let skel = obj.clone();
@@ -667,7 +724,7 @@ impl AccountantSkel {
}
});

Ok(vec![
let mut threads = vec![
//replicate threads
t_blob_receiver,
t_retransmit,
@@ -679,9 +736,10 @@ impl AccountantSkel {
t_packet_receiver,
t_responder,
t_server,
t_verifier,
t_sync,
])
];
threads.extend(verify_threads.into_iter());
Ok(threads)
}
}

14 changes: 13 additions & 1 deletion src/accountant_stub.rs
@@ -161,6 +161,7 @@ mod tests {
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use std::time::Instant;

// TODO: Figure out why this test sometimes hangs on TravisCI.
#[test]
@@ -193,7 +194,18 @@ mod tests {
let last_id = acc.get_last_id().wait().unwrap();
let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
.unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500);
let mut balance;
let now = Instant::now();
loop {
balance = acc.get_balance(&bob_pubkey);
if balance.is_ok() {
break;
}
if now.elapsed().as_secs() > 0 {
break;
}
}
assert_eq!(balance.unwrap(), 500);
exit.store(true, Ordering::Relaxed);
for t in threads {
t.join().unwrap();
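
Because verification and processing now happen asynchronously across several threads, the test above polls get_balance instead of asserting immediately after the transfer. A small self-contained sketch of the same poll-until-ok-or-deadline idea (the probe function, the added sleep between probes, and the roughly one-second deadline are illustrative):

use std::thread::sleep;
use std::time::{Duration, Instant};

// Illustrative stand-in for acc.get_balance(&bob_pubkey): fails a couple
// of times before the "transaction" becomes visible.
fn probe(attempt: &mut u32) -> Result<i64, ()> {
    *attempt += 1;
    if *attempt < 3 {
        Err(())
    } else {
        Ok(500)
    }
}

fn main() {
    let mut attempt = 0;
    let now = Instant::now();
    let mut balance;
    loop {
        balance = probe(&mut attempt);
        if balance.is_ok() {
            break;
        }
        if now.elapsed().as_secs() > 0 {
            break; // give up after roughly one second
        }
        sleep(Duration::from_millis(100)); // avoid busy-spinning between probes
    }
    assert_eq!(balance.unwrap(), 500);
}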
3 changes: 3 additions & 0 deletions src/lib.rs
@@ -20,6 +20,7 @@ pub mod result;
pub mod signature;
pub mod streamer;
pub mod transaction;
pub mod timing;
extern crate bincode;
extern crate byteorder;
extern crate chrono;
@@ -41,3 +42,5 @@ extern crate futures;
#[cfg(test)]
#[macro_use]
extern crate matches;

extern crate rand;
15 changes: 15 additions & 0 deletions src/timing.rs
@@ -0,0 +1,15 @@
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::Duration;

pub fn duration_as_ms(d: &Duration) -> u64 {
return (d.as_secs() * 1000) + (d.subsec_nanos() as u64 / 1_000_000);
}

pub fn duration_as_s(d: &Duration) -> f32 {
return d.as_secs() as f32 + (d.subsec_nanos() as f32 / 1_000_000_000.0);
}

pub fn timestamp() -> u64 {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
return duration_as_ms(&now);
}
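
Not part of the diff, but as an illustration of how these helpers combine into the reqs/s-style log lines used in accountant_skel.rs, here is a test-module sketch that could be appended to this file (the workload is arbitrary):

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    #[test]
    fn rate_logging_sketch() {
        let start = Instant::now();
        let mut acc: u64 = 0;
        for i in 0..1_000_000u64 {
            acc = acc.wrapping_add(i);
        }
        let elapsed = start.elapsed();
        println!(
            "@{} summed to {} in {}ms ({:.0} iters/s)",
            timestamp(),
            acc,
            duration_as_ms(&elapsed),
            1_000_000f32 / duration_as_s(&elapsed)
        );
    }
}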