Skip to content

Commit

Permalink
Rework sig processing threads and add perf for process/verify
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed May 8, 2018
1 parent 6f3ec8d commit a04b569
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 50 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,4 @@ libc = "^0.2.1"
getopts = "^0.2"
isatty = "0.1"
futures = "0.1"
rand = "0.4.2"
158 changes: 108 additions & 50 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use recorder::Signal;
use result::Result;
use serde_json;
use signature::PublicKey;
use std::cmp::max;
use std::collections::VecDeque;
use std::io::sink;
use std::io::{Cursor, Write};
Expand All @@ -30,6 +29,9 @@ use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;
use transaction::Transaction;
use timing;
use std::time::Instant;
use rand::{thread_rng, Rng};

pub struct AccountantSkel {
acc: Mutex<Accountant>,
Expand Down Expand Up @@ -256,41 +258,64 @@ impl AccountantSkel {
}
}

fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<Vec<SharedPackets>> {
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
debug!("got msgs");
let mut len = msgs.read().unwrap().packets.len();
let mut batch = vec![msgs];
while let Ok(more) = recvr.try_recv() {
debug!("got more msgs");
trace!("got more msgs");
len += more.read().unwrap().packets.len();
batch.push(more);

if len > 100_000 {
break;
}
}
info!("batch len {}", batch.len());
Ok(batch)
debug!("batch len {}", batch.len());
Ok((batch, len))
}

fn verify_batch(batch: Vec<SharedPackets>) -> Vec<Vec<(SharedPackets, Vec<u8>)>> {
let chunk_size = max(1, (batch.len() + 3) / 4);
let batches: Vec<_> = batch.chunks(chunk_size).map(|x| x.to_vec()).collect();
batches
.into_par_iter()
.map(|batch| {
let r = ecdsa::ed25519_verify(&batch);
batch.into_iter().zip(r).collect()
})
.collect()
fn verify_batch(
batch: Vec<SharedPackets>,
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> Result<()> {
let r = ecdsa::ed25519_verify(&batch);
let res = batch.into_iter().zip(r).collect();
sendr.lock().unwrap().send(res)?;
// TODO: fix error handling here?
Ok(())
}

fn verifier(
recvr: &streamer::PacketReceiver,
sendr: &Sender<Vec<(SharedPackets, Vec<u8>)>>,
recvr: &Arc<Mutex<streamer::PacketReceiver>>,
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> Result<()> {
let batch = Self::recv_batch(recvr)?;
let verified_batches = Self::verify_batch(batch);
debug!("verified batches: {}", verified_batches.len());
for xs in verified_batches {
sendr.send(xs)?;
}
let (batch, len) = Self::recv_batch(&recvr.lock().unwrap())?;
let now = Instant::now();
let batch_len = batch.len();
let rand_id = thread_rng().gen_range(0, 100);
info!(
"@{:?} verifier: verifying: {} id: {}",
timing::timestamp(),
batch.len(),
rand_id
);

Self::verify_batch(batch, sendr).unwrap();

let total_time_ms = timing::duration_as_ms(&now.elapsed());
let total_time_s = timing::duration_as_s(&now.elapsed());
info!(
"@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}",
timing::timestamp(),
batch_len,
total_time_ms,
rand_id,
len,
(len as f32 / total_time_s)
);
Ok(())
}

Expand Down Expand Up @@ -335,8 +360,6 @@ impl AccountantSkel {
}
}

debug!("processing verified");

// Let validators know they should not attempt to process additional
// transactions in parallel.
self.historian_input.lock().unwrap().send(Signal::Tick)?;
Expand Down Expand Up @@ -387,16 +410,25 @@ impl AccountantSkel {
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let recv_start = Instant::now();
let mms = verified_receiver.recv_timeout(timer)?;
debug!("got some messages: {}", mms.len());
let mut reqs_len = 0;
let mms_len = mms.len();
info!(
"@{:?} process start stalled for: {:?}ms batches: {}",
timing::timestamp(),
timing::duration_as_ms(&recv_start.elapsed()),
mms.len(),
);
let proc_start = Instant::now();
for (msgs, vers) in mms {
let reqs = Self::deserialize_packets(&msgs.read().unwrap());
reqs_len += reqs.len();
let req_vers = reqs.into_iter()
.zip(vers)
.filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
.filter(|x| {
let v = x.0.verify();
trace!("v:{} x:{:?}", v, x);
v
})
.collect();
Expand All @@ -421,7 +453,16 @@ impl AccountantSkel {
}
packet_recycler.recycle(msgs);
}
debug!("done responding");
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
info!(
"@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
timing::timestamp(),
mms_len,
total_time_ms,
reqs_len,
(reqs_len as f32) / (total_time_s)
);
Ok(())
}
/// Process verified blobs, already in order
Expand Down Expand Up @@ -486,13 +527,21 @@ impl AccountantSkel {
);
let (verified_sender, verified_receiver) = channel();

let exit_ = exit.clone();
let t_verifier = spawn(move || loop {
let e = Self::verifier(&packet_receiver, &verified_sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
let mut verify_threads = Vec::new();
let shared_verified_sender = Arc::new(Mutex::new(verified_sender));
let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver));
for _ in 0..4 {
let exit_ = exit.clone();
let recv = shared_packet_receiver.clone();
let sender = shared_verified_sender.clone();
let thread = spawn(move || loop {
let e = Self::verifier(&recv, &sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
verify_threads.push(thread);
}

let (broadcast_sender, broadcast_receiver) = channel();

Expand Down Expand Up @@ -528,16 +577,18 @@ impl AccountantSkel {
}
}
});
Ok(vec![

let mut threads = vec![
t_receiver,
t_responder,
t_server,
t_verifier,
t_sync,
t_gossip,
t_listen,
t_broadcast,
])
];
threads.extend(verify_threads.into_iter());
Ok(threads)
}

/// This service receives messages from a leader in the network and processes the transactions
Expand Down Expand Up @@ -639,15 +690,21 @@ impl AccountantSkel {
);
let (verified_sender, verified_receiver) = channel();

let exit_ = exit.clone();
let t_verifier = spawn(move || loop {
let e = Self::verifier(&packet_receiver, &verified_sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
trace!("verifier exiting");
break;
}
});

let mut verify_threads = Vec::new();
let shared_verified_sender = Arc::new(Mutex::new(verified_sender));
let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver));
for _ in 0..4 {
let exit_ = exit.clone();
let recv = shared_packet_receiver.clone();
let sender = shared_verified_sender.clone();
let thread = spawn(move || loop {
let e = Self::verifier(&recv, &sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
verify_threads.push(thread);
}
let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone());

let skel = obj.clone();
Expand All @@ -667,7 +724,7 @@ impl AccountantSkel {
}
});

Ok(vec![
let mut threads = vec![
//replicate threads
t_blob_receiver,
t_retransmit,
Expand All @@ -679,9 +736,10 @@ impl AccountantSkel {
t_packet_receiver,
t_responder,
t_server,
t_verifier,
t_sync,
])
];
threads.extend(verify_threads.into_iter());
Ok(threads)
}
}

Expand Down
1 change: 1 addition & 0 deletions src/accountant_stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ mod tests {
let last_id = acc.get_last_id().wait().unwrap();
let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
.unwrap();
sleep(Duration::from_millis(1000));
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500);
exit.store(true, Ordering::Relaxed);
for t in threads {
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod result;
pub mod signature;
pub mod streamer;
pub mod transaction;
pub mod timing;
extern crate bincode;
extern crate byteorder;
extern crate chrono;
Expand All @@ -41,3 +42,5 @@ extern crate futures;
#[cfg(test)]
#[macro_use]
extern crate matches;

extern crate rand;
15 changes: 15 additions & 0 deletions src/timing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::Duration;

pub fn duration_as_ms(d: &Duration) -> u64 {
return (d.as_secs() * 1000) + (d.subsec_nanos() as u64 / 1_000_000);
}

pub fn duration_as_s(d: &Duration) -> f32 {
return d.as_secs() as f32 + (d.subsec_nanos() as f32 / 1_000_000_000.0);
}

pub fn timestamp() -> u64 {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
return duration_as_ms(&now);
}

0 comments on commit a04b569

Please sign in to comment.