From c5cc91443e497f62bfaa457a00c54e74441b94ce Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 2 May 2018 15:54:53 -0600 Subject: [PATCH 1/2] Rename sender/receiver to input/output --- src/accountant_skel.rs | 16 ++++++++-------- src/bin/historian-demo.rs | 6 +++--- src/historian.rs | 36 ++++++++++++++++++------------------ 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 65a0ddd097a4e5..d3141f70844aef 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -105,7 +105,7 @@ impl AccountantSkel { /// Process any Entry items that have been published by the Historian. pub fn sync(&mut self) -> Hash { - while let Ok(entry) = self.historian.receiver.try_recv() { + while let Ok(entry) = self.historian.output.try_recv() { self.last_id = entry.id; self.acc.register_entry_id(&self.last_id); writeln!(self.writer, "{}", serde_json::to_string(&entry).unwrap()).unwrap(); @@ -215,14 +215,14 @@ impl AccountantSkel { for result in self.acc.process_verified_transactions(trs) { if let Ok(tr) = result { self.historian - .sender + .input .send(Signal::Event(Event::Transaction(tr)))?; } } // Let validators know they should not attempt to process additional // transactions in parallel. - self.historian.sender.send(Signal::Tick)?; + self.historian.input.send(Signal::Tick)?; // Process the remaining requests serially. let rsps = reqs.into_iter() @@ -545,9 +545,9 @@ mod tests { assert!(skel.process_packets(req_vers).is_ok()); // Collect the ledger and feed it to a new accountant. - skel.historian.sender.send(Signal::Tick).unwrap(); - drop(skel.historian.sender); - let entries: Vec = skel.historian.receiver.iter().collect(); + skel.historian.input.send(Signal::Tick).unwrap(); + drop(skel.historian.input); + let entries: Vec = skel.historian.output.iter().collect(); // Assert the user holds one token, not two. If the server only output one // entry, then the second transaction will be rejected, because it drives @@ -800,8 +800,8 @@ mod bench { let tps = txs as f64 / sec; // Ensure that all transactions were successfully logged. - drop(skel.historian.sender); - let entries: Vec = skel.historian.receiver.iter().collect(); + drop(skel.historian.input); + let entries: Vec = skel.historian.output.iter().collect(); assert_eq!(entries.len(), 1); assert_eq!(entries[0].events.len(), txs as usize); diff --git a/src/bin/historian-demo.rs b/src/bin/historian-demo.rs index 306b8ffbaa8611..fe4180c457c1f9 100644 --- a/src/bin/historian-demo.rs +++ b/src/bin/historian-demo.rs @@ -17,7 +17,7 @@ fn create_ledger(hist: &Historian, seed: &Hash) -> Result<(), SendError> let keypair = KeyPair::new(); let tr = Transaction::new(&keypair, keypair.pubkey(), 42, *seed); let signal0 = Signal::Event(Event::Transaction(tr)); - hist.sender.send(signal0)?; + hist.input.send(signal0)?; sleep(Duration::from_millis(10)); Ok(()) } @@ -26,8 +26,8 @@ fn main() { let seed = Hash::default(); let hist = Historian::new(&seed, Some(10)); create_ledger(&hist, &seed).expect("send error"); - drop(hist.sender); - let entries: Vec = hist.receiver.iter().collect(); + drop(hist.input); + let entries: Vec = hist.output.iter().collect(); for entry in &entries { println!("{:?}", entry); } diff --git a/src/historian.rs b/src/historian.rs index 412027846fab03..970a75db0b22bb 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -9,20 +9,20 @@ use std::thread::{spawn, JoinHandle}; use std::time::Instant; pub struct Historian { - pub sender: SyncSender, - pub receiver: Receiver, + pub input: SyncSender, + pub output: Receiver, pub thread_hdl: JoinHandle, } impl Historian { pub fn new(start_hash: &Hash, ms_per_tick: Option) -> Self { - let (sender, event_receiver) = sync_channel(10_000); - let (entry_sender, receiver) = sync_channel(10_000); + let (input, event_receiver) = sync_channel(10_000); + let (entry_sender, output) = sync_channel(10_000); let thread_hdl = Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender); Historian { - sender, - receiver, + input, + output, thread_hdl, } } @@ -62,21 +62,21 @@ mod tests { let zero = Hash::default(); let hist = Historian::new(&zero, None); - hist.sender.send(Signal::Tick).unwrap(); + hist.input.send(Signal::Tick).unwrap(); sleep(Duration::new(0, 1_000_000)); - hist.sender.send(Signal::Tick).unwrap(); + hist.input.send(Signal::Tick).unwrap(); sleep(Duration::new(0, 1_000_000)); - hist.sender.send(Signal::Tick).unwrap(); + hist.input.send(Signal::Tick).unwrap(); - let entry0 = hist.receiver.recv().unwrap(); - let entry1 = hist.receiver.recv().unwrap(); - let entry2 = hist.receiver.recv().unwrap(); + let entry0 = hist.output.recv().unwrap(); + let entry1 = hist.output.recv().unwrap(); + let entry2 = hist.output.recv().unwrap(); assert_eq!(entry0.num_hashes, 0); assert_eq!(entry1.num_hashes, 0); assert_eq!(entry2.num_hashes, 0); - drop(hist.sender); + drop(hist.input); assert_eq!( hist.thread_hdl.join().unwrap(), ExitReason::RecvDisconnected @@ -89,8 +89,8 @@ mod tests { fn test_historian_closed_sender() { let zero = Hash::default(); let hist = Historian::new(&zero, None); - drop(hist.receiver); - hist.sender.send(Signal::Tick).unwrap(); + drop(hist.output); + hist.input.send(Signal::Tick).unwrap(); assert_eq!( hist.thread_hdl.join().unwrap(), ExitReason::SendDisconnected @@ -102,9 +102,9 @@ mod tests { let zero = Hash::default(); let hist = Historian::new(&zero, Some(20)); sleep(Duration::from_millis(300)); - hist.sender.send(Signal::Tick).unwrap(); - drop(hist.sender); - let entries: Vec = hist.receiver.iter().collect(); + hist.input.send(Signal::Tick).unwrap(); + drop(hist.input); + let entries: Vec = hist.output.iter().collect(); assert!(entries.len() > 1); // Ensure the ID is not the seed. From 4b9f11558691fd40777706ca3831803c320b309d Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 2 May 2018 16:35:37 -0600 Subject: [PATCH 2/2] Hoist Historian input --- src/accountant_skel.rs | 42 ++++++++++++++++++++++++++------------- src/accountant_stub.rs | 5 ++++- src/bin/historian-demo.rs | 13 ++++++------ src/bin/testnode.rs | 5 ++++- src/historian.rs | 37 +++++++++++++++++----------------- 5 files changed, 62 insertions(+), 40 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index d3141f70844aef..527e57a9f0d2f6 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -21,7 +21,7 @@ use std::collections::VecDeque; use std::io::Write; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; @@ -34,6 +34,7 @@ pub struct AccountantSkel { acc: Accountant, last_id: Hash, writer: W, + historian_input: SyncSender, historian: Historian, entry_info_subscribers: Vec, } @@ -78,11 +79,18 @@ pub enum Response { impl AccountantSkel { /// Create a new AccountantSkel that wraps the given Accountant. - pub fn new(acc: Accountant, last_id: Hash, writer: W, historian: Historian) -> Self { + pub fn new( + acc: Accountant, + last_id: Hash, + writer: W, + historian_input: SyncSender, + historian: Historian, + ) -> Self { AccountantSkel { acc, last_id, writer, + historian_input, historian, entry_info_subscribers: vec![], } @@ -214,15 +222,14 @@ impl AccountantSkel { // Process the transactions in parallel and then log the successful ones. for result in self.acc.process_verified_transactions(trs) { if let Ok(tr) = result { - self.historian - .input + self.historian_input .send(Signal::Event(Event::Transaction(tr)))?; } } // Let validators know they should not attempt to process additional // transactions in parallel. - self.historian.input.send(Signal::Tick)?; + self.historian_input.send(Signal::Tick)?; // Process the remaining requests serially. let rsps = reqs.into_iter() @@ -482,6 +489,7 @@ mod tests { use std::io::sink; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::mpsc::sync_channel; use std::sync::{Arc, Mutex}; use std::thread::sleep; use std::time::Duration; @@ -530,8 +538,9 @@ mod tests { let mint = Mint::new(2); let acc = Accountant::new(&mint); let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address"); - let historian = Historian::new(&mint.last_id(), None); - let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian); + let (input, event_receiver) = sync_channel(10); + let historian = Historian::new(event_receiver, &mint.last_id(), None); + let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), input, historian); // Process a batch that includes a transaction that receives two tokens. let alice = KeyPair::new(); @@ -545,8 +554,8 @@ mod tests { assert!(skel.process_packets(req_vers).is_ok()); // Collect the ledger and feed it to a new accountant. - skel.historian.input.send(Signal::Tick).unwrap(); - drop(skel.historian.input); + skel.historian_input.send(Signal::Tick).unwrap(); + drop(skel.historian_input); let entries: Vec = skel.historian.output.iter().collect(); // Assert the user holds one token, not two. If the server only output one @@ -569,11 +578,13 @@ mod tests { let acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let historian = Historian::new(&alice.last_id(), Some(30)); + let (input, event_receiver) = sync_channel(10); + let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let acc = Arc::new(Mutex::new(AccountantSkel::new( acc, alice.last_id(), sink(), + input, historian, ))); let _threads = AccountantSkel::serve(&acc, &addr, exit.clone()).unwrap(); @@ -651,11 +662,13 @@ mod tests { let starting_balance = 10_000; let alice = Mint::new(starting_balance); let acc = Accountant::new(&alice); - let historian = Historian::new(&alice.last_id(), Some(30)); + let (input, event_receiver) = sync_channel(10); + let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let acc = Arc::new(Mutex::new(AccountantSkel::new( acc, alice.last_id(), sink(), + input, historian, ))); @@ -790,8 +803,9 @@ mod bench { .map(|tr| (Request::Transaction(tr), rsp_addr, 1_u8)) .collect(); - let historian = Historian::new(&mint.last_id(), None); - let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian); + let (input, event_receiver) = sync_channel(10); + let historian = Historian::new(event_receiver, &mint.last_id(), None); + let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), input, historian); let now = Instant::now(); assert!(skel.process_packets(req_vers).is_ok()); @@ -800,7 +814,7 @@ mod bench { let tps = txs as f64 / sec; // Ensure that all transactions were successfully logged. - drop(skel.historian.input); + drop(input); let entries: Vec = skel.historian.output.iter().collect(); assert_eq!(entries.len(), 1); assert_eq!(entries[0].events.len(), txs as usize); diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 2dd331da7b130e..d03866099cbe77 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -165,6 +165,7 @@ mod tests { use signature::{KeyPair, KeyPairUtil}; use std::io::sink; use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::mpsc::sync_channel; use std::sync::{Arc, Mutex}; use std::thread::sleep; use std::time::Duration; @@ -178,11 +179,13 @@ mod tests { let acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let historian = Historian::new(&alice.last_id(), Some(30)); + let (input, event_receiver) = sync_channel(10); + let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let acc = Arc::new(Mutex::new(AccountantSkel::new( acc, alice.last_id(), sink(), + input, historian, ))); let _threads = AccountantSkel::serve(&acc, addr, exit.clone()).unwrap(); diff --git a/src/bin/historian-demo.rs b/src/bin/historian-demo.rs index fe4180c457c1f9..010391cbab3d88 100644 --- a/src/bin/historian-demo.rs +++ b/src/bin/historian-demo.rs @@ -8,25 +8,26 @@ use solana::ledger::Block; use solana::recorder::Signal; use solana::signature::{KeyPair, KeyPairUtil}; use solana::transaction::Transaction; -use std::sync::mpsc::SendError; +use std::sync::mpsc::{sync_channel, SendError, SyncSender}; use std::thread::sleep; use std::time::Duration; -fn create_ledger(hist: &Historian, seed: &Hash) -> Result<(), SendError> { +fn create_ledger(input: &SyncSender, seed: &Hash) -> Result<(), SendError> { sleep(Duration::from_millis(15)); let keypair = KeyPair::new(); let tr = Transaction::new(&keypair, keypair.pubkey(), 42, *seed); let signal0 = Signal::Event(Event::Transaction(tr)); - hist.input.send(signal0)?; + input.send(signal0)?; sleep(Duration::from_millis(10)); Ok(()) } fn main() { + let (input, event_receiver) = sync_channel(10); let seed = Hash::default(); - let hist = Historian::new(&seed, Some(10)); - create_ledger(&hist, &seed).expect("send error"); - drop(hist.input); + let hist = Historian::new(event_receiver, &seed, Some(10)); + create_ledger(&input, &seed).expect("send error"); + drop(input); let entries: Vec = hist.output.iter().collect(); for entry in &entries { println!("{:?}", entry); diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 8b6da2c1c818f7..6a6d8ac9ea1c0e 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -15,6 +15,7 @@ use std::env; use std::io::{stdin, stdout, Read}; use std::process::exit; use std::sync::atomic::AtomicBool; +use std::sync::mpsc::sync_channel; use std::sync::{Arc, Mutex}; fn print_usage(program: &str, opts: Options) { @@ -95,12 +96,14 @@ fn main() { acc.register_entry_id(&last_id); } - let historian = Historian::new(&last_id, Some(1000)); + let (input, event_receiver) = sync_channel(10_000); + let historian = Historian::new(event_receiver, &last_id, Some(1000)); let exit = Arc::new(AtomicBool::new(false)); let skel = Arc::new(Mutex::new(AccountantSkel::new( acc, last_id, stdout(), + input, historian, ))); let threads = AccountantSkel::serve(&skel, &addr, exit.clone()).unwrap(); diff --git a/src/historian.rs b/src/historian.rs index 970a75db0b22bb..0d56b1deab9a07 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -9,22 +9,20 @@ use std::thread::{spawn, JoinHandle}; use std::time::Instant; pub struct Historian { - pub input: SyncSender, pub output: Receiver, pub thread_hdl: JoinHandle, } impl Historian { - pub fn new(start_hash: &Hash, ms_per_tick: Option) -> Self { - let (input, event_receiver) = sync_channel(10_000); + pub fn new( + event_receiver: Receiver, + start_hash: &Hash, + ms_per_tick: Option, + ) -> Self { let (entry_sender, output) = sync_channel(10_000); let thread_hdl = Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender); - Historian { - input, - output, - thread_hdl, - } + Historian { output, thread_hdl } } /// A background thread that will continue tagging received Event messages and @@ -59,14 +57,15 @@ mod tests { #[test] fn test_historian() { + let (input, event_receiver) = sync_channel(10); let zero = Hash::default(); - let hist = Historian::new(&zero, None); + let hist = Historian::new(event_receiver, &zero, None); - hist.input.send(Signal::Tick).unwrap(); + input.send(Signal::Tick).unwrap(); sleep(Duration::new(0, 1_000_000)); - hist.input.send(Signal::Tick).unwrap(); + input.send(Signal::Tick).unwrap(); sleep(Duration::new(0, 1_000_000)); - hist.input.send(Signal::Tick).unwrap(); + input.send(Signal::Tick).unwrap(); let entry0 = hist.output.recv().unwrap(); let entry1 = hist.output.recv().unwrap(); @@ -76,7 +75,7 @@ mod tests { assert_eq!(entry1.num_hashes, 0); assert_eq!(entry2.num_hashes, 0); - drop(hist.input); + drop(input); assert_eq!( hist.thread_hdl.join().unwrap(), ExitReason::RecvDisconnected @@ -87,10 +86,11 @@ mod tests { #[test] fn test_historian_closed_sender() { + let (input, event_receiver) = sync_channel(10); let zero = Hash::default(); - let hist = Historian::new(&zero, None); + let hist = Historian::new(event_receiver, &zero, None); drop(hist.output); - hist.input.send(Signal::Tick).unwrap(); + input.send(Signal::Tick).unwrap(); assert_eq!( hist.thread_hdl.join().unwrap(), ExitReason::SendDisconnected @@ -99,11 +99,12 @@ mod tests { #[test] fn test_ticking_historian() { + let (input, event_receiver) = sync_channel(10); let zero = Hash::default(); - let hist = Historian::new(&zero, Some(20)); + let hist = Historian::new(event_receiver, &zero, Some(20)); sleep(Duration::from_millis(300)); - hist.input.send(Signal::Tick).unwrap(); - drop(hist.input); + input.send(Signal::Tick).unwrap(); + drop(input); let entries: Vec = hist.output.iter().collect(); assert!(entries.len() > 1);