Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
aeyakovenko committed Apr 24, 2018
2 parents 58d1ddd + 7390d6c commit 3789405
Showing 1 changed file with 8 additions and 12 deletions.
20 changes: 8 additions & 12 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;
use transaction::Transaction;
use subscribers::Subscribers;

use subscribers;
use std::mem::size_of;
Expand Down Expand Up @@ -252,18 +253,14 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
/// Respond with a signed hash of the state
fn replicate_state(
obj: &Arc<Mutex<AccountantSkel<W>>>,
verified_receiver: &streamer::BlobReceiver,
verified_receiver: &BlobReceiver,
blob_sender: &streamer::BlobSender,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
for msgs in &blobs {
let blob = msgs.read().unwrap();
let mut entries:Vec<Entry> = Vec::new();
for i in 0..blob.meta.size/size_of::<Entry>() {
entries.push(deserialize(&blob.data[i..i+size_of::<Entry>()]).unwrap());
}
for msgs in blobs {
let entries:Vec<Entry> = b.read().unwrap().data.deserialize()?;
for e in entries {
obj.lock().unwrap().acc.process_verified_events(e.events)?;
}
Expand Down Expand Up @@ -343,7 +340,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
/// 5. respond with the hash of the state back to the leader
pub fn replicate(
obj: &Arc<Mutex<AccountantSkel<W>>>,
rsubs: subscribers::Subscribers,
rsubs: Subscribers,
exit: Arc<AtomicBool>,
) -> Result<Vec<JoinHandle<()>>> {
let read = UdpSocket::bind(rsubs.me.addr)?;
Expand All @@ -355,15 +352,15 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
let blob_recycler = packet::BlobRecycler::default();
let (blob_sender, blob_receiver) = channel();
let t_blob_receiver =
streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender.clone())?;
streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?;
let (window_sender, window_receiver) = channel();
let (retransmit_sender, retransmit_receiver) = channel();

let subs = Arc::new(RwLock::new(rsubs));
let t_retransmit = streamer::retransmitter(
write,
exit.clone(),
subs.clone(),
subs,
blob_recycler.clone(),
retransmit_receiver,
);
Expand All @@ -381,8 +378,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {

let skel = obj.clone();
let t_server = spawn(move || loop {
let e = Self::replicate_state(&skel, &window_receiver,
&blob_sender, &blob_recycler);
let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler);
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
Expand Down

0 comments on commit 3789405

Please sign in to comment.