Skip to content

Commit

Permalink
Merge pull request #176 from aeyakovenko/multinode
Browse files Browse the repository at this point in the history
Multinode
  • Loading branch information
garious committed May 7, 2018
2 parents 78a73c3 + 72b40e5 commit f2697c1
Show file tree
Hide file tree
Showing 8 changed files with 349 additions and 74 deletions.
143 changes: 112 additions & 31 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use serde_json;
use signature::PublicKey;
use std::cmp::max;
use std::collections::VecDeque;
use std::io::sink;
use std::io::{Cursor, Write};
use std::mem::size_of;
use std::net::{SocketAddr, UdpSocket};
Expand Down Expand Up @@ -100,6 +101,8 @@ impl AccountantSkel {
num_events: entry.events.len() as u64,
};
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
trace!("sending {} to {}", data.len(), addr);
//TODO dont do IO here, this needs to be on a separate channel
let res = socket.send_to(&data, addr);
if res.is_err() {
eprintln!("couldn't send response: {:?}", res);
Expand Down Expand Up @@ -182,16 +185,11 @@ impl AccountantSkel {
broadcast: &streamer::BlobSender,
blob_recycler: &packet::BlobRecycler,
writer: &Arc<Mutex<W>>,
exit: Arc<AtomicBool>,
) -> Result<()> {
let mut q = VecDeque::new();
while let Ok(list) = Self::receive_all(&obj, writer) {
trace!("New blobs? {}", list.len());
Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
if exit.load(Ordering::Relaxed) {
break;
}
}
let list = Self::receive_all(&obj, writer)?;
trace!("New blobs? {}", list.len());
Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
if !q.is_empty() {
broadcast.send(q)?;
}
Expand All @@ -206,14 +204,26 @@ impl AccountantSkel {
writer: Arc<Mutex<W>>,
) -> JoinHandle<()> {
spawn(move || loop {
let e = Self::run_sync(
obj.clone(),
&broadcast,
&blob_recycler,
&writer,
exit.clone(),
);
if e.is_err() && exit.load(Ordering::Relaxed) {
let _ = Self::run_sync(obj.clone(), &broadcast, &blob_recycler, &writer);
if exit.load(Ordering::Relaxed) {
info!("sync_service exiting");
break;
}
})
}

/// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out
fn run_sync_no_broadcast(obj: SharedSkel) -> Result<()> {
Self::receive_all(&obj, &Arc::new(Mutex::new(sink())))?;
Ok(())
}

pub fn sync_no_broadcast_service(obj: SharedSkel, exit: Arc<AtomicBool>) -> JoinHandle<()> {
spawn(move || loop {
let _ = Self::run_sync_no_broadcast(obj.clone());
if exit.load(Ordering::Relaxed) {
info!("sync_no_broadcast_service exiting");
break;
}
})
Expand All @@ -228,7 +238,9 @@ impl AccountantSkel {
match msg {
Request::GetBalance { key } => {
let val = self.acc.lock().unwrap().get_balance(&key);
Some((Response::Balance { key, val }, rsp_addr))
let rsp = (Response::Balance { key, val }, rsp_addr);
info!("Response::Balance {:?}", rsp);
Some(rsp)
}
Request::Transaction(_) => unreachable!(),
Request::Subscribe { subscriptions } => {
Expand All @@ -247,10 +259,10 @@ impl AccountantSkel {
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<Vec<SharedPackets>> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
trace!("got msgs");
debug!("got msgs");
let mut batch = vec![msgs];
while let Ok(more) = recvr.try_recv() {
trace!("got more msgs");
debug!("got more msgs");
batch.push(more);
}
info!("batch len {}", batch.len());
Expand All @@ -275,6 +287,7 @@ impl AccountantSkel {
) -> Result<()> {
let batch = Self::recv_batch(recvr)?;
let verified_batches = Self::verify_batch(batch);
debug!("verified batches: {}", verified_batches.len());
for xs in verified_batches {
sendr.send(xs)?;
}
Expand Down Expand Up @@ -315,8 +328,9 @@ impl AccountantSkel {
&self,
req_vers: Vec<(Request, SocketAddr, u8)>,
) -> Result<Vec<(Response, SocketAddr)>> {
trace!("partitioning");
debug!("partitioning");
let (trs, reqs) = Self::partition_requests(req_vers);
debug!("trs: {} reqs: {}", trs.len(), reqs.len());

// Process the transactions in parallel and then log the successful ones.
for result in self.acc.lock().unwrap().process_verified_transactions(trs) {
Expand All @@ -328,15 +342,21 @@ impl AccountantSkel {
}
}

debug!("processing verified");

// Let validators know they should not attempt to process additional
// transactions in parallel.
self.historian_input.lock().unwrap().send(Signal::Tick)?;

debug!("after historian_input");

// Process the remaining requests serially.
let rsps = reqs.into_iter()
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
.collect();

debug!("returning rsps");

Ok(rsps)
}

Expand Down Expand Up @@ -377,7 +397,7 @@ impl AccountantSkel {
) -> Result<()> {
let timer = Duration::new(1, 0);
let mms = verified_receiver.recv_timeout(timer)?;
trace!("got some messages: {}", mms.len());
debug!("got some messages: {}", mms.len());
for (msgs, vers) in mms {
let reqs = Self::deserialize_packets(&msgs.read().unwrap());
let req_vers = reqs.into_iter()
Expand All @@ -389,18 +409,18 @@ impl AccountantSkel {
v
})
.collect();
trace!("process_packets");
debug!("process_packets");
let rsps = obj.process_packets(req_vers)?;
trace!("done process_packets");
debug!("done process_packets");
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
trace!("sending blobs: {}", blobs.len());
if !blobs.is_empty() {
info!("process: sending blobs: {}", blobs.len());
//don't wake up the other side if there is nothing
responder_sender.send(blobs)?;
}
packet_recycler.recycle(msgs);
}
trace!("done responding");
debug!("done responding");
Ok(())
}
/// Process verified blobs, already in order
Expand All @@ -412,6 +432,7 @@ impl AccountantSkel {
) -> Result<()> {
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
trace!("replicating blobs {}", blobs.len());
for msgs in &blobs {
let blob = msgs.read().unwrap();
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
Expand Down Expand Up @@ -541,10 +562,12 @@ impl AccountantSkel {
obj: &SharedSkel,
me: ReplicatedData,
gossip: UdpSocket,
serve: UdpSocket,
replicate: UdpSocket,
leader: ReplicatedData,
exit: Arc<AtomicBool>,
) -> Result<Vec<JoinHandle<()>>> {
//replicate pipeline
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
crdt.write().unwrap().set_leader(leader.id);
crdt.write().unwrap().insert(leader);
Expand Down Expand Up @@ -580,27 +603,84 @@ impl AccountantSkel {
//then sent to the window, which does the erasure coding reconstruction
let t_window = streamer::window(
exit.clone(),
crdt,
crdt.clone(),
blob_recycler.clone(),
blob_receiver,
window_sender,
retransmit_sender,
);

let skel = obj.clone();
let t_server = spawn(move || loop {
let s_exit = exit.clone();
let t_replicator = spawn(move || loop {
let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler);
if e.is_err() && exit.load(Ordering::Relaxed) {
if e.is_err() && s_exit.load(Ordering::Relaxed) {
break;
}
});

//serve pipeline
// make sure we are on the same interface
let mut local = serve.local_addr()?;
local.set_port(0);
let respond_socket = UdpSocket::bind(local.clone())?;

let packet_recycler = packet::PacketRecycler::default();
let blob_recycler = packet::BlobRecycler::default();
let (packet_sender, packet_receiver) = channel();
let t_packet_receiver =
streamer::receiver(serve, exit.clone(), packet_recycler.clone(), packet_sender)?;
let (responder_sender, responder_receiver) = channel();
let t_responder = streamer::responder(
respond_socket,
exit.clone(),
blob_recycler.clone(),
responder_receiver,
);
let (verified_sender, verified_receiver) = channel();

let exit_ = exit.clone();
let t_verifier = spawn(move || loop {
let e = Self::verifier(&packet_receiver, &verified_sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
trace!("verifier exiting");
break;
}
});

let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone());

let skel = obj.clone();
let s_exit = exit.clone();
let t_server = spawn(move || loop {
let e = Self::process(
&mut skel.clone(),
&verified_receiver,
&responder_sender,
&packet_recycler,
&blob_recycler,
);
if e.is_err() {
if s_exit.load(Ordering::Relaxed) {
break;
}
}
});

Ok(vec![
//replicate threads
t_blob_receiver,
t_retransmit,
t_window,
t_server,
t_replicator,
t_gossip,
t_listen,
//serve threads
t_packet_receiver,
t_responder,
t_server,
t_verifier,
t_sync,
])
}
}
Expand Down Expand Up @@ -769,7 +849,7 @@ mod tests {
tr2.data.plan = Plan::new_payment(502, bob_pubkey);
let _sig = acc_stub.transfer_signed(tr2).unwrap();

assert_eq!(acc_stub.get_balance(&bob_pubkey).wait().unwrap(), 500);
assert_eq!(acc_stub.get_balance(&bob_pubkey).unwrap(), 500);
trace!("exiting");
exit.store(true, Ordering::Relaxed);
trace!("joining threads");
Expand Down Expand Up @@ -797,7 +877,7 @@ mod tests {
fn test_replicate() {
logger::setup();
let (leader_data, leader_gossip, _, leader_serve) = test_node();
let (target1_data, target1_gossip, target1_replicate, _) = test_node();
let (target1_data, target1_gossip, target1_replicate, target1_serve) = test_node();
let (target2_data, target2_gossip, target2_replicate, _) = test_node();
let exit = Arc::new(AtomicBool::new(false));

Expand Down Expand Up @@ -851,6 +931,7 @@ mod tests {
&acc,
target1_data,
target1_gossip,
target1_serve,
target1_replicate,
leader_data,
exit.clone(),
Expand Down
Loading

0 comments on commit f2697c1

Please sign in to comment.