From 24cdff1810b30c0442eef697c370ce0934c3f331 Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Mon, 2 Jul 2018 10:07:32 -0700 Subject: [PATCH] support an initial window filled with last up-to-WINDOW_SIZE blobs --- src/bank.rs | 98 +++++++++++++++++++++++++++++++++------------- src/drone.rs | 1 + src/fullnode.rs | 47 +++++++++++++++++++--- src/streamer.rs | 38 ++++++++++++++++++ src/thin_client.rs | 3 ++ 5 files changed, 155 insertions(+), 32 deletions(-) diff --git a/src/bank.rs b/src/bank.rs index c3f0fec8aa164f..e1be328dd2d817 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -19,6 +19,7 @@ use std::result; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::RwLock; use std::time::Instant; +use streamer::WINDOW_SIZE; use timing::duration_as_us; use transaction::{Instruction, Plan, Transaction}; @@ -322,10 +323,7 @@ impl Bank { } /// Process an ordered list of entries. - pub fn process_entries(&self, entries: I) -> Result - where - I: IntoIterator, - { + pub fn process_entries(&self, entries: Vec) -> Result { let mut entry_count = 0; for entry in entries { entry_count += 1; @@ -364,7 +362,7 @@ impl Bank { } /// Process a full ledger. - pub fn process_ledger(&self, entries: I) -> Result + pub fn process_ledger(&self, entries: I) -> Result<(u64, Vec)> where I: IntoIterator, { @@ -380,20 +378,39 @@ impl Bank { let entry1 = entries .next() .expect("invalid ledger: need at least 2 entries"); - let tx = &entry1.transactions[0]; - let deposit = if let Instruction::NewContract(contract) = &tx.instruction { - contract.plan.final_payment() - } else { - None - }.expect("invalid ledger, needs to start with a contract"); + { + let tx = &entry1.transactions[0]; + let deposit = if let Instruction::NewContract(contract) = &tx.instruction { + contract.plan.final_payment() + } else { + None + }.expect("invalid ledger, needs to start with a contract"); - self.apply_payment(&deposit, &mut self.balances.write().unwrap()); + self.apply_payment(&deposit, &mut self.balances.write().unwrap()); + } self.register_entry_id(&entry0.id); self.register_entry_id(&entry1.id); let mut entry_count = 2; - entry_count += self.process_blocks(entries)?; - Ok(entry_count) + let mut tail = Vec::with_capacity(WINDOW_SIZE as usize); + let mut next = Vec::with_capacity(WINDOW_SIZE as usize); + + for block in &entries.into_iter().chunks(WINDOW_SIZE as usize) { + tail = next; + next = block.collect(); + entry_count += self.process_blocks(next.clone())?; + } + + tail.append(&mut next); + + if tail.len() < WINDOW_SIZE as usize { + tail.insert(0, entry1); + if tail.len() < WINDOW_SIZE as usize { + tail.insert(0, entry0); + } + } + + Ok((entry_count, tail)) } /// Process a Witness Signature. Any payment plans waiting on this signature @@ -526,9 +543,9 @@ mod tests { use super::*; use bincode::serialize; use entry::next_entry; + use entry::Entry; use entry_writer::{self, EntryWriter}; use hash::hash; - use ledger::next_entries; use signature::KeyPairUtil; use std::io::{BufReader, Cursor, Seek, SeekFrom}; @@ -779,25 +796,52 @@ mod tests { assert_eq!(bank.get_balance(&mint.pubkey()), 1); } - fn create_sample_block(mint: &Mint) -> impl Iterator { - let keypair = KeyPair::new(); - let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id()); - next_entries(&mint.last_id(), 0, vec![tx]).into_iter() + fn create_sample_block(mint: &Mint, length: usize) -> impl Iterator { + let mut entries = Vec::with_capacity(length); + let mut hash = mint.last_id(); + let mut cur_hashes = 0; + for _ in 0..length { + let keypair = KeyPair::new(); + let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id()); + let entry = Entry::new_mut(&mut hash, &mut cur_hashes, vec![tx], false); + entries.push(entry); + } + entries.into_iter() } - - fn create_sample_ledger() -> (impl Iterator, PublicKey) { - let mint = Mint::new(2); + fn create_sample_ledger(length: usize) -> (impl Iterator, PublicKey) { + let mint = Mint::new(1 + length as i64); let genesis = mint.create_entries(); - let block = create_sample_block(&mint); + let block = create_sample_block(&mint, length); (genesis.into_iter().chain(block), mint.pubkey()) } #[test] fn test_process_ledger() { - let (ledger, pubkey) = create_sample_ledger(); + let (ledger, pubkey) = create_sample_ledger(1); + let (ledger, dup) = ledger.tee(); let bank = Bank::default(); - bank.process_ledger(ledger).unwrap(); + let (ledger_height, tail) = bank.process_ledger(ledger).unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); + assert_eq!(ledger_height, 3); + assert_eq!(tail.len(), 3); + assert_eq!(tail, dup.collect_vec()); + let last_entry = &tail[tail.len() - 1]; + assert_eq!(bank.last_id(), last_entry.id); + } + + #[test] + fn test_process_ledger_around_window_size() { + let window_size = WINDOW_SIZE as usize; + for entry_count in window_size - 1..window_size + 1 { + let (ledger, pubkey) = create_sample_ledger(entry_count); + let bank = Bank::default(); + let (ledger_height, tail) = bank.process_ledger(ledger).unwrap(); + assert_eq!(bank.get_balance(&pubkey), 1); + assert_eq!(ledger_height, entry_count as u64 + 2); + assert!(tail.len() <= window_size); + let last_entry = &tail[tail.len() - 1]; + assert_eq!(bank.last_id(), last_entry.id); + } } // Write the given entries to a file and then return a file iterator to them. @@ -812,7 +856,7 @@ mod tests { #[test] fn test_process_ledger_from_file() { - let (ledger, pubkey) = create_sample_ledger(); + let (ledger, pubkey) = create_sample_ledger(1); let ledger = to_file_iter(ledger); let bank = Bank::default(); @@ -824,7 +868,7 @@ mod tests { fn test_process_ledger_from_files() { let mint = Mint::new(2); let genesis = to_file_iter(mint.create_entries().into_iter()); - let block = to_file_iter(create_sample_block(&mint)); + let block = to_file_iter(create_sample_block(&mint, 1)); let bank = Bank::default(); bank.process_ledger(genesis.chain(block)).unwrap(); diff --git a/src/drone.rs b/src/drone.rs index be5d1e12ddc7f9..bb46554cfb2246 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -251,6 +251,7 @@ mod tests { let server = FullNode::new_leader( bank, 0, + None, Some(Duration::from_millis(30)), leader, exit.clone(), diff --git a/src/fullnode.rs b/src/fullnode.rs index 2c1bdb811d886b..7b718b3f8b4ae6 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -2,10 +2,13 @@ use bank::Bank; use crdt::{Crdt, ReplicatedData, TestNode}; +use entry::Entry; use entry_writer; +use ledger::Block; use ncp::Ncp; use packet::BlobRecycler; use rpu::Rpu; +use std::collections::VecDeque; use std::fs::{File, OpenOptions}; use std::io::{sink, stdin, stdout, BufReader}; use std::io::{Read, Write}; @@ -50,9 +53,9 @@ impl FullNode { }; let reader = BufReader::new(infile); let entries = entry_writer::read_entries(reader).map(|e| e.expect("failed to parse entry")); - info!("processing ledger..."); - let entry_height = bank.process_ledger(entries).expect("process_ledger"); + info!("processing ledger..."); + let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger"); // entry_height is the network-wide agreed height of the ledger. // initialize it from the input ledger info!("processed {} ledger...", entry_height); @@ -73,6 +76,7 @@ impl FullNode { let server = FullNode::new_validator( bank, entry_height, + Some(ledger_tail), node, network_entry_point, exit.clone(), @@ -99,6 +103,7 @@ impl FullNode { let server = FullNode::new_leader( bank, entry_height, + Some(ledger_tail), //Some(Duration::from_millis(1000)), None, node, @@ -112,6 +117,7 @@ impl FullNode { server } } + /// Create a server instance acting as a leader. /// /// ```text @@ -139,6 +145,7 @@ impl FullNode { pub fn new_leader( bank: Bank, entry_height: u64, + ledger_tail: Option>, tick_duration: Option, node: TestNode, exit: Arc, @@ -165,7 +172,9 @@ impl FullNode { ); thread_hdls.extend(tpu.thread_hdls); let crdt = Arc::new(RwLock::new(Crdt::new(node.data))); - let window = streamer::default_window(); + + let window = fullnode_window(ledger_tail, entry_height, &crdt, &blob_recycler); + let ncp = Ncp::new( crdt.clone(), window.clone(), @@ -221,6 +230,7 @@ impl FullNode { pub fn new_validator( bank: Bank, entry_height: u64, + ledger_tail: Option>, node: TestNode, entry_point: ReplicatedData, exit: Arc, @@ -239,7 +249,11 @@ impl FullNode { crdt.write() .expect("'crdt' write lock before insert() in pub fn replicate") .insert(&entry_point); - let window = streamer::default_window(); + + let blob_recycler = BlobRecycler::default(); + + let window = fullnode_window(ledger_tail, entry_height, &crdt, &blob_recycler); + let ncp = Ncp::new( crdt.clone(), window.clone(), @@ -263,6 +277,29 @@ impl FullNode { FullNode { thread_hdls } } } + +fn fullnode_window( + ledger_tail: Option>, + entry_height: u64, + crdt: &Arc>, + blob_recycler: &BlobRecycler, +) -> streamer::Window { + match ledger_tail { + Some(ledger_tail) => { + let mut blobs = VecDeque::new(); + + // convert to blobs + ledger_tail.to_blobs(&blob_recycler, &mut blobs); + + // flatten deque to vec + let blobs: Vec<_> = blobs.into_iter().collect(); + + streamer::initialized_window(&crdt, blobs, entry_height) + } + None => streamer::default_window(), + } +} + #[cfg(test)] mod tests { use bank::Bank; @@ -278,7 +315,7 @@ mod tests { let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); let entry = tn.data.clone(); - let v = FullNode::new_validator(bank, 0, tn, entry, exit.clone()); + let v = FullNode::new_validator(bank, 0, None, tn, entry, exit.clone()); exit.store(true, Ordering::Relaxed); for t in v.thread_hdls { t.join().unwrap(); diff --git a/src/streamer.rs b/src/streamer.rs index 91674657fdb5bd..9b1818d34d7bb9 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -458,6 +458,44 @@ pub fn default_window() -> Window { Arc::new(RwLock::new(vec![None; WINDOW_SIZE as usize])) } +/// Initialize a rebroadcast window with most recent Entry blobs +/// * `crdt` - gossip instance, used to set blob ids +/// * `blobs` - up to WINDOW_SIZE most recent blobs +/// * `entry_height` - current entry height +pub fn initialized_window( + crdt: &Arc>, + blobs: Vec, + entry_height: u64, +) -> Window { + let window = default_window(); + + { + let mut win = window.write().unwrap(); + assert!(blobs.len() <= win.len()); + + debug!( + "initialized window entry_height:{} blobs_len:{}", + entry_height, + blobs.len() + ); + + // Index the blobs + let mut received = entry_height - blobs.len() as u64; + Crdt::index_blobs(crdt, &blobs, &mut received).expect("index blobs for initial window"); + + // populate the window, offset by implied index + for b in blobs { + let ix = b.read().unwrap().get_index().expect("blob index"); + let pos = (ix % WINDOW_SIZE) as usize; + trace!("caching {} at {}", ix, pos); + assert!(win[pos].is_none()); + win[pos] = Some(b); + } + } + + window +} + pub fn window( exit: Arc, crdt: Arc>, diff --git a/src/thin_client.rs b/src/thin_client.rs index cd1866f0c991c0..e121c6b3f90bed 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -251,6 +251,7 @@ mod tests { let server = FullNode::new_leader( bank, 0, + None, Some(Duration::from_millis(30)), leader, exit.clone(), @@ -292,6 +293,7 @@ mod tests { let server = FullNode::new_leader( bank, 0, + None, Some(Duration::from_millis(30)), leader, exit.clone(), @@ -345,6 +347,7 @@ mod tests { let server = FullNode::new_leader( bank, 0, + None, Some(Duration::from_millis(30)), leader, exit.clone(),