From 04bd80048d1f88799273d355bab7d69b6fc1acf9 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 17 Apr 2018 19:26:19 -0700 Subject: [PATCH 1/5] state replication wip update docs docs wip update state replication update docs docs wip update --- src/accountant_skel.rs | 219 ++++++++++++++++++++++++++++++++++++++++- src/result.rs | 7 ++ src/streamer.rs | 36 +++---- src/subscribers.rs | 9 +- 4 files changed, 246 insertions(+), 25 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 41d8f90ede1292..efc1b418fbfdca 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -22,11 +22,15 @@ use std::io::Write; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; use transaction::Transaction; +use subscribers::Subscribers; + +use subscribers; +use std::mem::size_of; pub struct AccountantSkel { acc: Accountant, @@ -245,8 +249,30 @@ impl AccountantSkel { } Ok(()) } + /// Process verified blobs, already in order + /// Respond with a signed hash of the state + fn replicate_state( + obj: &Arc>>, + verified_receiver: &BlobReceiver, + blob_sender: &streamer::BlobSender, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let blobs = verified_receiver.recv_timeout(timer)?; + for msgs in blobs { + let entries:Vec = b.read().unwrap().data.deserialize()?; + for e in entries { + obj.lock().unwrap().acc.process_verified_events(e.events)?; + } + //TODO respond back to leader with hash of the state + } + blob_recycler.recycle(msgs); + Ok(()) + } + /// Create a UDP microservice that forwards messages the given AccountantSkel. + /// This service is the network leader /// Set `exit` to shutdown its threads. pub fn serve( obj: &Arc>>, @@ -279,7 +305,7 @@ impl AccountantSkel { let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::process( + let e = Self::process( &skel, &verified_receiver, &blob_sender, @@ -292,6 +318,139 @@ impl AccountantSkel { }); Ok(vec![t_receiver, t_responder, t_server, t_verifier]) } + + /// This service receives messages from a leader in the network and processes the transactions + /// on the accountant state. + /// # Arguments + /// * `obj` - The accountant state. + /// * `rsubs` - The subscribers. + /// * `exit` - The exit signal. + /// # Remarks + /// The pipeline is constructed as follows: + /// 1. receive blobs from the network, these are out of order + /// 2. verify blobs, PoH, signatures (TODO) + /// 3. reconstruct contiguous window + /// a. order the blobs + /// b. use erasure coding to reconstruct missing blobs + /// c. ask the network for missing blobs, if erasure coding is insufficient + /// d. make sure that the blobs PoH sequences connect (TODO) + /// 4. process the transaction state machine + /// 5. respond with the hash of the state back to the leader + pub fn replicate( + obj: &Arc>>, + rsubs: Subscribers, + exit: Arc, + ) -> Result>> { + let read = UdpSocket::bind(rsubs.me.addr)?; + // make sure we are on the same interface + let mut local = read.local_addr()?; + local.set_port(0); + let write = UdpSocket::bind(local)?; + + let blob_recycler = packet::BlobRecycler::default(); + let (blob_sender, blob_receiver) = channel(); + let t_blob_receiver = + streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?; + let (window_sender, window_receiver) = channel(); + let (retransmit_sender, retransmit_receiver) = channel(); + + let subs = Arc::new(RwLock::new(rsubs)); + let t_retransmit = streamer::retransmitter( + write, + exit.clone(), + subs, + blob_recycler.clone(), + retransmit_receiver, + ); + //TODO + //the packets comming out of blob_receiver need to be sent to the GPU and verified + //then sent to the window, which does the erasure coding reconstruction + let t_window = streamer::window( + exit.clone(), + subs, + blob_recycler.clone(), + blob_receiver, + window_sender, + retransmit_sender, + ); + + let skel = obj.clone(); + let t_server = spawn(move || loop { + let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler); + if e.is_err() && exit.load(Ordering::Relaxed) { + break; + } + }); + Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server]) + } + + /// This service receives messages from a leader in the network and processes the transactions + /// on the accountant state. + /// # Arguments + /// * `obj` - The accountant state. + /// * `rsubs` - The subscribers. + /// * `exit` - The exit signal. + /// # Remarks + /// The pipeline is constructed as follows: + /// 1. receive blobs from the network, these are out of order + /// 2. verify blobs, PoH, signatures (TODO) + /// 3. reconstruct contiguous window + /// a. order the blobs + /// b. use erasure coding to reconstruct missing blobs + /// c. ask the network for missing blobs, if erasure coding is insufficient + /// d. make sure that the blobs PoH sequences connect (TODO) + /// 4. process the transaction state machine + /// 5. respond with the hash of the state back to the leader + pub fn replicate( + obj: &Arc>>, + rsubs: subscribers::Subscribers, + exit: Arc, + ) -> Result>> { + let read = UdpSocket::bind(rsubs.me.addr)?; + // make sure we are on the same interface + let mut local = read.local_addr()?; + local.set_port(0); + let write = UdpSocket::bind(local)?; + + let blob_recycler = packet::BlobRecycler::default(); + let (blob_sender, blob_receiver) = channel(); + let t_blob_receiver = + streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender.clone())?; + let (window_sender, window_receiver) = channel(); + let (retransmit_sender, retransmit_receiver) = channel(); + + let subs = Arc::new(RwLock::new(rsubs)); + let t_retransmit = streamer::retransmitter( + write, + exit.clone(), + subs.clone(), + blob_recycler.clone(), + retransmit_receiver, + ); + //TODO + //the packets comming out of blob_receiver need to be sent to the GPU and verified + //then sent to the window, which does the erasure coding reconstruction + let t_window = streamer::window( + exit.clone(), + subs, + blob_recycler.clone(), + blob_receiver, + window_sender, + retransmit_sender, + ); + + let skel = obj.clone(); + let t_server = spawn(move || loop { + let e = Self::replicate_state(&skel, &window_receiver, + &blob_sender, &blob_recycler); + if e.is_err() && exit.load(Ordering::Relaxed) { + break; + } + }); + Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server]) + } +======= +>>>>>>> d620ce5... update } #[cfg(test)] @@ -319,7 +478,7 @@ mod tests { use accountant_skel::{to_packets, Request}; use bincode::serialize; use ecdsa; - use packet::{PacketRecycler, NUM_PACKETS}; + use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS}; use transaction::{memfind, test_tx}; use accountant::Accountant; @@ -339,6 +498,12 @@ mod tests { use std::time::Duration; use transaction::Transaction; + use subscribers::{Node, Subscribers}; + use streamer; + use std::sync::mpsc::channel; + use std::collections::VecDeque; + use packet::{PACKET_DATA_SIZE}; + #[test] fn test_layout() { let tr = test_tx(); @@ -443,6 +608,54 @@ mod tests { exit.store(true, Ordering::Relaxed); } + #[test] + fn test_replicate() { + let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let addr = read.local_addr().unwrap(); + let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let exit = Arc::new(AtomicBool::new(false)); + + let node_me = Node::default(); + let node_leader = Node::new([0; 8], 0, send.local_addr().unwrap()); + let subs = Subscribers::new(node_me, node_leader, &[]); + + let recv_recycler = PacketRecycler::default(); + let resp_recycler = BlobRecycler::default(); + let (s_reader, r_reader) = channel(); + let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap(); + let (s_responder, r_responder) = channel(); + let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder); + + let alice = Mint::new(10_000); + let acc = Accountant::new(&alice); + let bob_pubkey = KeyPair::new().pubkey(); + let historian = Historian::new(&alice.last_id(), Some(30)); + let acc = Arc::new(Mutex::new(AccountantSkel::new( + acc, + alice.last_id(), + sink(), + historian, + ))); + + let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); + + let mut msgs = VecDeque::new(); + for i in 0..10 { + let b = resp_recycler.allocate(); + let b_ = b.clone(); + let mut w = b.write().unwrap(); + w.data[0] = i as u8; + w.meta.size = PACKET_DATA_SIZE; + w.meta.set_addr(&addr); + msgs.push_back(b_); + } + s_responder.send(msgs).expect("send"); + + exit.store(true, Ordering::Relaxed); + t_receiver.join().expect("join"); + t_responder.join().expect("join"); + } + } #[cfg(all(feature = "unstable", test))] diff --git a/src/result.rs b/src/result.rs index 9b3c17a3695fde..01872dfbe1138c 100644 --- a/src/result.rs +++ b/src/result.rs @@ -4,6 +4,7 @@ use bincode; use serde_json; use std; use std::any::Any; +use accountant; #[derive(Debug)] pub enum Error { @@ -14,6 +15,7 @@ pub enum Error { RecvError(std::sync::mpsc::RecvError), RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), Serialize(std::boxed::Box), + AccountingError(accountant::AccountingError), SendError, Services, } @@ -30,6 +32,11 @@ impl std::convert::From for Error { Error::RecvTimeoutError(e) } } +impl std::convert::From for Error { + fn from(e: accountant::AccountingError) -> Error { + Error::AccountingError(e) + } +} impl std::convert::From> for Error { fn from(_e: std::sync::mpsc::SendError) -> Error { Error::SendError diff --git a/src/streamer.rs b/src/streamer.rs index 33882c31dcffda..0fd8c1b758204d 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -8,7 +8,7 @@ use std::sync::mpsc; use std::sync::{Arc, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; -use subscribers; +use subscribers::Subscribers; pub type PacketReceiver = mpsc::Receiver; pub type PacketSender = mpsc::Sender; @@ -106,12 +106,12 @@ pub fn blob_receiver( fn recv_window( window: &mut Vec>, - subs: &Arc>, + subs: &Arc>, recycler: &BlobRecycler, consumed: &mut usize, r: &BlobReceiver, s: &BlobSender, - cast: &BlobSender, + retransmit: &BlobSender, ) -> Result<()> { let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; @@ -120,7 +120,7 @@ fn recv_window( } { //retransmit all leader blocks - let mut castq = VecDeque::new(); + let mut retransmitq = VecDeque::new(); let rsubs = subs.read().unwrap(); for b in &dq { let p = b.read().unwrap(); @@ -141,11 +141,11 @@ fn recv_window( mnv.meta.size = sz; mnv.data[..sz].copy_from_slice(&p.data[..sz]); } - castq.push_back(nv); + retransmitq.push_back(nv); } } - if !castq.is_empty() { - cast.send(castq)?; + if !retransmitq.is_empty() { + retransmit.send(retransmitq)?; } } //send a contiguous set of blocks @@ -183,11 +183,11 @@ fn recv_window( pub fn window( exit: Arc, - subs: Arc>, + subs: Arc>, recycler: BlobRecycler, r: BlobReceiver, s: BlobSender, - cast: BlobSender, + retransmit: BlobSender, ) -> JoinHandle<()> { spawn(move || { let mut window = vec![None; NUM_BLOBS]; @@ -196,13 +196,13 @@ pub fn window( if exit.load(Ordering::Relaxed) { break; } - let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &cast); + let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &retransmit); } }) } fn retransmit( - subs: &Arc>, + subs: &Arc>, recycler: &BlobRecycler, r: &BlobReceiver, sock: &UdpSocket, @@ -237,7 +237,7 @@ fn retransmit( pub fn retransmitter( sock: UdpSocket, exit: Arc, - subs: Arc>, + subs: Arc>, recycler: BlobRecycler, r: BlobReceiver, ) -> JoinHandle<()> { @@ -442,20 +442,21 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::new([0; 8], 0, send.local_addr().unwrap()), + &[], ))); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap(); let (s_window, r_window) = channel(); - let (s_cast, r_cast) = channel(); + let (s_retransmit, r_retransmit) = channel(); let t_window = window( exit.clone(), subs, resp_recycler.clone(), r_reader, s_window, - s_cast, + s_retransmit, ); let (s_responder, r_responder) = channel(); let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); @@ -475,8 +476,8 @@ mod test { let mut num = 0; get_blobs(r_window, &mut num); assert_eq!(num, 10); - let mut q = r_cast.recv().unwrap(); - while let Ok(mut nq) = r_cast.try_recv() { + let mut q = r_retransmit.recv().unwrap(); + while let Ok(mut nq) = r_retransmit.try_recv() { q.append(&mut nq); } assert_eq!(q.len(), 10); @@ -494,9 +495,8 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::default(), + &[Node::new([0; 8], 1, read.local_addr().unwrap())] ))); - let n3 = Node::new([0; 8], 1, read.local_addr().unwrap()); - subs.write().unwrap().insert(&[n3]); let (s_retransmit, r_retransmit) = channel(); let blob_recycler = BlobRecycler::default(); let saddr = send.local_addr().unwrap(); diff --git a/src/subscribers.rs b/src/subscribers.rs index 153246d12adb31..b81a54941b599a 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -40,18 +40,19 @@ impl Node { pub struct Subscribers { data: Vec, - me: Node, + pub me: Node, pub leader: Node, } impl Subscribers { - pub fn new(me: Node, leader: Node) -> Subscribers { + pub fn new(me: Node, leader: Node, network: &[Node]) -> Subscribers { let mut h = Subscribers { data: vec![], me: me.clone(), leader: leader.clone(), }; h.insert(&[me, leader]); + h.insert(network); h } @@ -99,7 +100,7 @@ mod test { me.weight = 10; let mut leader = Node::default(); leader.weight = 11; - let mut s = Subscribers::new(me, leader); + let mut s = Subscribers::new(me, leader, &[]); assert_eq!(s.data.len(), 2); assert_eq!(s.data[0].weight, 11); assert_eq!(s.data[1].weight, 10); @@ -116,7 +117,7 @@ mod test { let s3 = UdpSocket::bind("127.0.0.1:0").expect("bind"); let n1 = Node::new([0; 8], 0, s1.local_addr().unwrap()); let n2 = Node::new([0; 8], 0, s2.local_addr().unwrap()); - let mut s = Subscribers::new(n1.clone(), n2.clone()); + let mut s = Subscribers::new(n1.clone(), n2.clone(), &[]); let n3 = Node::new([0; 8], 0, s3.local_addr().unwrap()); s.insert(&[n3]); let mut b = Blob::default(); From d715a291a09088b2ef6a0017da2bd51ad53e9638 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 24 Apr 2018 12:40:03 -0700 Subject: [PATCH 2/5] merge --- src/accountant_skel.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index efc1b418fbfdca..43a45a75bcd5cb 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -449,8 +449,6 @@ impl AccountantSkel { }); Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server]) } -======= ->>>>>>> d620ce5... update } #[cfg(test)] From b07af0293736976b5c91740f2d97da909e03d1cf Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 24 Apr 2018 13:01:33 -0700 Subject: [PATCH 3/5] update --- src/accountant_skel.rs | 102 ++++++++--------------------------------- src/ecdsa.rs | 2 +- src/hash.rs | 2 +- src/mint.rs | 2 +- src/result.rs | 4 +- src/signature.rs | 2 +- src/streamer.rs | 12 ++++- 7 files changed, 35 insertions(+), 91 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 43a45a75bcd5cb..c70fa4a4ab3129 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -25,12 +25,9 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; -use streamer; -use transaction::Transaction; +use streamer::{BlobReceiver, BlobSender}; use subscribers::Subscribers; - -use subscribers; -use std::mem::size_of; +use transaction::Transaction; pub struct AccountantSkel { acc: Accountant, @@ -226,7 +223,7 @@ impl AccountantSkel { fn process( obj: &Arc>>, verified_receiver: &Receiver)>>, - blob_sender: &streamer::BlobSender, + blob_sender: &BlobSender, packet_recycler: &packet::PacketRecycler, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { @@ -254,23 +251,26 @@ impl AccountantSkel { fn replicate_state( obj: &Arc>>, verified_receiver: &BlobReceiver, - blob_sender: &streamer::BlobSender, + blob_sender: &BlobSender, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; for msgs in blobs { - let entries:Vec = b.read().unwrap().data.deserialize()?; + let entries: Vec = { + let m = msgs.read().unwrap(); + let v = deserialize(m.data[..m.meta.len])?; + v + } for e in entries { obj.lock().unwrap().acc.process_verified_events(e.events)?; } //TODO respond back to leader with hash of the state + blob_recycler.recycle(msgs); } - blob_recycler.recycle(msgs); Ok(()) } - /// Create a UDP microservice that forwards messages the given AccountantSkel. /// This service is the network leader /// Set `exit` to shutdown its threads. @@ -383,72 +383,6 @@ impl AccountantSkel { }); Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server]) } - - /// This service receives messages from a leader in the network and processes the transactions - /// on the accountant state. - /// # Arguments - /// * `obj` - The accountant state. - /// * `rsubs` - The subscribers. - /// * `exit` - The exit signal. - /// # Remarks - /// The pipeline is constructed as follows: - /// 1. receive blobs from the network, these are out of order - /// 2. verify blobs, PoH, signatures (TODO) - /// 3. reconstruct contiguous window - /// a. order the blobs - /// b. use erasure coding to reconstruct missing blobs - /// c. ask the network for missing blobs, if erasure coding is insufficient - /// d. make sure that the blobs PoH sequences connect (TODO) - /// 4. process the transaction state machine - /// 5. respond with the hash of the state back to the leader - pub fn replicate( - obj: &Arc>>, - rsubs: subscribers::Subscribers, - exit: Arc, - ) -> Result>> { - let read = UdpSocket::bind(rsubs.me.addr)?; - // make sure we are on the same interface - let mut local = read.local_addr()?; - local.set_port(0); - let write = UdpSocket::bind(local)?; - - let blob_recycler = packet::BlobRecycler::default(); - let (blob_sender, blob_receiver) = channel(); - let t_blob_receiver = - streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender.clone())?; - let (window_sender, window_receiver) = channel(); - let (retransmit_sender, retransmit_receiver) = channel(); - - let subs = Arc::new(RwLock::new(rsubs)); - let t_retransmit = streamer::retransmitter( - write, - exit.clone(), - subs.clone(), - blob_recycler.clone(), - retransmit_receiver, - ); - //TODO - //the packets comming out of blob_receiver need to be sent to the GPU and verified - //then sent to the window, which does the erasure coding reconstruction - let t_window = streamer::window( - exit.clone(), - subs, - blob_recycler.clone(), - blob_receiver, - window_sender, - retransmit_sender, - ); - - let skel = obj.clone(); - let t_server = spawn(move || loop { - let e = Self::replicate_state(&skel, &window_receiver, - &blob_sender, &blob_recycler); - if e.is_err() && exit.load(Ordering::Relaxed) { - break; - } - }); - Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server]) - } } #[cfg(test)] @@ -496,11 +430,11 @@ mod tests { use std::time::Duration; use transaction::Transaction; - use subscribers::{Node, Subscribers}; - use streamer; - use std::sync::mpsc::channel; + use packet::PACKET_DATA_SIZE; use std::collections::VecDeque; - use packet::{PACKET_DATA_SIZE}; + use std::sync::mpsc::channel; + use streamer; + use subscribers::{Node, Subscribers}; #[test] fn test_layout() { @@ -620,9 +554,11 @@ mod tests { let recv_recycler = PacketRecycler::default(); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap(); + let t_receiver = + streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap(); let (s_responder, r_responder) = channel(); - let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder); + let t_responder = + streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder); let alice = Mint::new(10_000); let acc = Accountant::new(&alice); @@ -634,7 +570,7 @@ mod tests { sink(), historian, ))); - + let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); let mut msgs = VecDeque::new(); diff --git a/src/ecdsa.rs b/src/ecdsa.rs index c0a06646d02fa8..a1df2e93d4345e 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -135,8 +135,8 @@ mod tests { use ecdsa; use packet::{Packet, Packets, SharedPackets}; use std::sync::RwLock; - use transaction::test_tx; use transaction::Transaction; + use transaction::test_tx; fn make_packet_from_transaction(tr: Transaction) -> Packet { let tx = serialize(&Request::Transaction(tr)).unwrap(); diff --git a/src/hash.rs b/src/hash.rs index 61dd01468c9fcb..ee7598a0dc29e2 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -1,7 +1,7 @@ //! The `hash` module provides functions for creating SHA-256 hashes. -use generic_array::typenum::U32; use generic_array::GenericArray; +use generic_array::typenum::U32; use sha2::{Digest, Sha256}; pub type Hash = GenericArray; diff --git a/src/mint.rs b/src/mint.rs index f3b6c5e598d635..2f385914deab0c 100644 --- a/src/mint.rs +++ b/src/mint.rs @@ -1,7 +1,7 @@ //! The `mint` module is a library for generating the chain's genesis block. -use entry::create_entry; use entry::Entry; +use entry::create_entry; use event::Event; use hash::{hash, Hash}; use ring::rand::SystemRandom; diff --git a/src/result.rs b/src/result.rs index 01872dfbe1138c..63c9baa53b5705 100644 --- a/src/result.rs +++ b/src/result.rs @@ -1,10 +1,10 @@ //! The `result` module exposes a Result type that propagates one of many different Error types. +use accountant; use bincode; use serde_json; use std; use std::any::Any; -use accountant; #[derive(Debug)] pub enum Error { @@ -77,9 +77,9 @@ mod tests { use std::io; use std::io::Write; use std::net::SocketAddr; - use std::sync::mpsc::channel; use std::sync::mpsc::RecvError; use std::sync::mpsc::RecvTimeoutError; + use std::sync::mpsc::channel; use std::thread; fn addr_parse_error() -> Result { diff --git a/src/signature.rs b/src/signature.rs index 1b01e14ef579e8..5f3aee61ea2267 100644 --- a/src/signature.rs +++ b/src/signature.rs @@ -1,7 +1,7 @@ //! The `signature` module provides functionality for public, and private keys. -use generic_array::typenum::{U32, U64}; use generic_array::GenericArray; +use generic_array::typenum::{U32, U64}; use ring::signature::Ed25519KeyPair; use ring::{rand, signature}; use untrusted; diff --git a/src/streamer.rs b/src/streamer.rs index 0fd8c1b758204d..c081e24596d2c6 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -196,7 +196,15 @@ pub fn window( if exit.load(Ordering::Relaxed) { break; } - let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &retransmit); + let _ = recv_window( + &mut window, + &subs, + &recycler, + &mut consumed, + &r, + &s, + &retransmit, + ); } }) } @@ -495,7 +503,7 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::default(), - &[Node::new([0; 8], 1, read.local_addr().unwrap())] + &[Node::new([0; 8], 1, read.local_addr().unwrap())], ))); let (s_retransmit, r_retransmit) = channel(); let blob_recycler = BlobRecycler::default(); From 9c66a0973323b4caec964f97c2789bd821a69ef0 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 24 Apr 2018 13:03:22 -0700 Subject: [PATCH 4/5] update --- src/accountant_skel.rs | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index c70fa4a4ab3129..feb21a16cbbadc 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -25,9 +25,9 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; -use streamer::{BlobReceiver, BlobSender}; -use subscribers::Subscribers; +use streamer; use transaction::Transaction; +use subscribers::Subscribers; pub struct AccountantSkel { acc: Accountant, @@ -223,7 +223,7 @@ impl AccountantSkel { fn process( obj: &Arc>>, verified_receiver: &Receiver)>>, - blob_sender: &BlobSender, + blob_sender: &streamer::BlobSender, packet_recycler: &packet::PacketRecycler, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { @@ -251,26 +251,25 @@ impl AccountantSkel { fn replicate_state( obj: &Arc>>, verified_receiver: &BlobReceiver, - blob_sender: &BlobSender, + blob_sender: &streamer::BlobSender, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; for msgs in blobs { - let entries: Vec = { - let m = msgs.read().unwrap(); - let v = deserialize(m.data[..m.meta.len])?; - v - } + let entries:Vec = b.read().unwrap().data.deserialize()?; for e in entries { obj.lock().unwrap().acc.process_verified_events(e.events)?; } //TODO respond back to leader with hash of the state - blob_recycler.recycle(msgs); + } + for blob in blobs { + blob_recycler.recycle(blob); } Ok(()) } + /// Create a UDP microservice that forwards messages the given AccountantSkel. /// This service is the network leader /// Set `exit` to shutdown its threads. @@ -430,11 +429,11 @@ mod tests { use std::time::Duration; use transaction::Transaction; - use packet::PACKET_DATA_SIZE; - use std::collections::VecDeque; - use std::sync::mpsc::channel; - use streamer; use subscribers::{Node, Subscribers}; + use streamer; + use std::sync::mpsc::channel; + use std::collections::VecDeque; + use packet::{PACKET_DATA_SIZE}; #[test] fn test_layout() { @@ -554,11 +553,9 @@ mod tests { let recv_recycler = PacketRecycler::default(); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = - streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap(); + let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap(); let (s_responder, r_responder) = channel(); - let t_responder = - streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder); + let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder); let alice = Mint::new(10_000); let acc = Accountant::new(&alice); @@ -570,7 +567,7 @@ mod tests { sink(), historian, ))); - + let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); let mut msgs = VecDeque::new(); From 5034221bfa5dbd0fa37ba0e09797ad1b5818c4cc Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 24 Apr 2018 13:47:15 -0700 Subject: [PATCH 5/5] update --- src/accountant_skel.rs | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index feb21a16cbbadc..4b0468374df51f 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -250,21 +250,21 @@ impl AccountantSkel { /// Respond with a signed hash of the state fn replicate_state( obj: &Arc>>, - verified_receiver: &BlobReceiver, - blob_sender: &streamer::BlobSender, + verified_receiver: &streamer::BlobReceiver, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; for msgs in blobs { - let entries:Vec = b.read().unwrap().data.deserialize()?; + let entries:Vec = { + let m = msgs.read().unwrap(); + let r = deserialize(&m.data[..m.meta.size])?; + r + }; for e in entries { obj.lock().unwrap().acc.process_verified_events(e.events)?; } - //TODO respond back to leader with hash of the state - } - for blob in blobs { - blob_recycler.recycle(blob); + blob_recycler.recycle(msgs); } Ok(()) } @@ -357,7 +357,7 @@ impl AccountantSkel { let t_retransmit = streamer::retransmitter( write, exit.clone(), - subs, + subs.clone(), blob_recycler.clone(), retransmit_receiver, ); @@ -375,7 +375,7 @@ impl AccountantSkel { let skel = obj.clone(); let t_server = spawn(move || loop { - let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler); + let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } @@ -559,7 +559,6 @@ mod tests { let alice = Mint::new(10_000); let acc = Accountant::new(&alice); - let bob_pubkey = KeyPair::new().pubkey(); let historian = Historian::new(&alice.last_id(), Some(30)); let acc = Arc::new(Mutex::new(AccountantSkel::new( acc, @@ -568,7 +567,7 @@ mod tests { historian, ))); - let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); + let threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); let mut msgs = VecDeque::new(); for i in 0..10 { @@ -585,6 +584,9 @@ mod tests { exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); t_responder.join().expect("join"); + for t in threads { + t.join().expect("join"); + } } }