Skip to content

Commit

Permalink
support an initial window filled with last up-to-WINDOW_SIZE blobs
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-solana committed Jul 6, 2018
1 parent 22d2c96 commit 24cdff1
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 32 deletions.
98 changes: 71 additions & 27 deletions src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::result;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
use std::time::Instant;
use streamer::WINDOW_SIZE;
use timing::duration_as_us;
use transaction::{Instruction, Plan, Transaction};

Expand Down Expand Up @@ -322,10 +323,7 @@ impl Bank {
}

/// Process an ordered list of entries.
pub fn process_entries<I>(&self, entries: I) -> Result<u64>
where
I: IntoIterator<Item = Entry>,
{
pub fn process_entries(&self, entries: Vec<Entry>) -> Result<u64> {
let mut entry_count = 0;
for entry in entries {
entry_count += 1;
Expand Down Expand Up @@ -364,7 +362,7 @@ impl Bank {
}

/// Process a full ledger.
pub fn process_ledger<I>(&self, entries: I) -> Result<u64>
pub fn process_ledger<I>(&self, entries: I) -> Result<(u64, Vec<Entry>)>
where
I: IntoIterator<Item = Entry>,
{
Expand All @@ -380,20 +378,39 @@ impl Bank {
let entry1 = entries
.next()
.expect("invalid ledger: need at least 2 entries");
let tx = &entry1.transactions[0];
let deposit = if let Instruction::NewContract(contract) = &tx.instruction {
contract.plan.final_payment()
} else {
None
}.expect("invalid ledger, needs to start with a contract");
{
let tx = &entry1.transactions[0];
let deposit = if let Instruction::NewContract(contract) = &tx.instruction {
contract.plan.final_payment()
} else {
None
}.expect("invalid ledger, needs to start with a contract");

self.apply_payment(&deposit, &mut self.balances.write().unwrap());
self.apply_payment(&deposit, &mut self.balances.write().unwrap());
}
self.register_entry_id(&entry0.id);
self.register_entry_id(&entry1.id);

let mut entry_count = 2;
entry_count += self.process_blocks(entries)?;
Ok(entry_count)
let mut tail = Vec::with_capacity(WINDOW_SIZE as usize);
let mut next = Vec::with_capacity(WINDOW_SIZE as usize);

for block in &entries.into_iter().chunks(WINDOW_SIZE as usize) {
tail = next;
next = block.collect();
entry_count += self.process_blocks(next.clone())?;
}

tail.append(&mut next);

if tail.len() < WINDOW_SIZE as usize {
tail.insert(0, entry1);
if tail.len() < WINDOW_SIZE as usize {
tail.insert(0, entry0);
}
}

Ok((entry_count, tail))
}

/// Process a Witness Signature. Any payment plans waiting on this signature
Expand Down Expand Up @@ -526,9 +543,9 @@ mod tests {
use super::*;
use bincode::serialize;
use entry::next_entry;
use entry::Entry;
use entry_writer::{self, EntryWriter};
use hash::hash;
use ledger::next_entries;
use signature::KeyPairUtil;
use std::io::{BufReader, Cursor, Seek, SeekFrom};

Expand Down Expand Up @@ -779,25 +796,52 @@ mod tests {
assert_eq!(bank.get_balance(&mint.pubkey()), 1);
}

fn create_sample_block(mint: &Mint) -> impl Iterator<Item = Entry> {
let keypair = KeyPair::new();
let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id());
next_entries(&mint.last_id(), 0, vec![tx]).into_iter()
fn create_sample_block(mint: &Mint, length: usize) -> impl Iterator<Item = Entry> {
let mut entries = Vec::with_capacity(length);
let mut hash = mint.last_id();
let mut cur_hashes = 0;
for _ in 0..length {
let keypair = KeyPair::new();
let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id());
let entry = Entry::new_mut(&mut hash, &mut cur_hashes, vec![tx], false);
entries.push(entry);
}
entries.into_iter()
}

fn create_sample_ledger() -> (impl Iterator<Item = Entry>, PublicKey) {
let mint = Mint::new(2);
fn create_sample_ledger(length: usize) -> (impl Iterator<Item = Entry>, PublicKey) {
let mint = Mint::new(1 + length as i64);
let genesis = mint.create_entries();
let block = create_sample_block(&mint);
let block = create_sample_block(&mint, length);
(genesis.into_iter().chain(block), mint.pubkey())
}

#[test]
fn test_process_ledger() {
let (ledger, pubkey) = create_sample_ledger();
let (ledger, pubkey) = create_sample_ledger(1);
let (ledger, dup) = ledger.tee();
let bank = Bank::default();
bank.process_ledger(ledger).unwrap();
let (ledger_height, tail) = bank.process_ledger(ledger).unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, 3);
assert_eq!(tail.len(), 3);
assert_eq!(tail, dup.collect_vec());
let last_entry = &tail[tail.len() - 1];
assert_eq!(bank.last_id(), last_entry.id);
}

#[test]
fn test_process_ledger_around_window_size() {
let window_size = WINDOW_SIZE as usize;
for entry_count in window_size - 1..window_size + 1 {
let (ledger, pubkey) = create_sample_ledger(entry_count);
let bank = Bank::default();
let (ledger_height, tail) = bank.process_ledger(ledger).unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, entry_count as u64 + 2);
assert!(tail.len() <= window_size);
let last_entry = &tail[tail.len() - 1];
assert_eq!(bank.last_id(), last_entry.id);
}
}

// Write the given entries to a file and then return a file iterator to them.
Expand All @@ -812,7 +856,7 @@ mod tests {

#[test]
fn test_process_ledger_from_file() {
let (ledger, pubkey) = create_sample_ledger();
let (ledger, pubkey) = create_sample_ledger(1);
let ledger = to_file_iter(ledger);

let bank = Bank::default();
Expand All @@ -824,7 +868,7 @@ mod tests {
fn test_process_ledger_from_files() {
let mint = Mint::new(2);
let genesis = to_file_iter(mint.create_entries().into_iter());
let block = to_file_iter(create_sample_block(&mint));
let block = to_file_iter(create_sample_block(&mint, 1));

let bank = Bank::default();
bank.process_ledger(genesis.chain(block)).unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ mod tests {
let server = FullNode::new_leader(
bank,
0,
None,
Some(Duration::from_millis(30)),
leader,
exit.clone(),
Expand Down
47 changes: 42 additions & 5 deletions src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

use bank::Bank;
use crdt::{Crdt, ReplicatedData, TestNode};
use entry::Entry;
use entry_writer;
use ledger::Block;
use ncp::Ncp;
use packet::BlobRecycler;
use rpu::Rpu;
use std::collections::VecDeque;
use std::fs::{File, OpenOptions};
use std::io::{sink, stdin, stdout, BufReader};
use std::io::{Read, Write};
Expand Down Expand Up @@ -50,9 +53,9 @@ impl FullNode {
};
let reader = BufReader::new(infile);
let entries = entry_writer::read_entries(reader).map(|e| e.expect("failed to parse entry"));
info!("processing ledger...");
let entry_height = bank.process_ledger(entries).expect("process_ledger");

info!("processing ledger...");
let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger");
// entry_height is the network-wide agreed height of the ledger.
// initialize it from the input ledger
info!("processed {} ledger...", entry_height);
Expand All @@ -73,6 +76,7 @@ impl FullNode {
let server = FullNode::new_validator(
bank,
entry_height,
Some(ledger_tail),
node,
network_entry_point,
exit.clone(),
Expand All @@ -99,6 +103,7 @@ impl FullNode {
let server = FullNode::new_leader(
bank,
entry_height,
Some(ledger_tail),
//Some(Duration::from_millis(1000)),
None,
node,
Expand All @@ -112,6 +117,7 @@ impl FullNode {
server
}
}

/// Create a server instance acting as a leader.
///
/// ```text
Expand Down Expand Up @@ -139,6 +145,7 @@ impl FullNode {
pub fn new_leader<W: Write + Send + 'static>(
bank: Bank,
entry_height: u64,
ledger_tail: Option<Vec<Entry>>,
tick_duration: Option<Duration>,
node: TestNode,
exit: Arc<AtomicBool>,
Expand All @@ -165,7 +172,9 @@ impl FullNode {
);
thread_hdls.extend(tpu.thread_hdls);
let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
let window = streamer::default_window();

let window = fullnode_window(ledger_tail, entry_height, &crdt, &blob_recycler);

let ncp = Ncp::new(
crdt.clone(),
window.clone(),
Expand Down Expand Up @@ -221,6 +230,7 @@ impl FullNode {
pub fn new_validator(
bank: Bank,
entry_height: u64,
ledger_tail: Option<Vec<Entry>>,
node: TestNode,
entry_point: ReplicatedData,
exit: Arc<AtomicBool>,
Expand All @@ -239,7 +249,11 @@ impl FullNode {
crdt.write()
.expect("'crdt' write lock before insert() in pub fn replicate")
.insert(&entry_point);
let window = streamer::default_window();

let blob_recycler = BlobRecycler::default();

let window = fullnode_window(ledger_tail, entry_height, &crdt, &blob_recycler);

let ncp = Ncp::new(
crdt.clone(),
window.clone(),
Expand All @@ -263,6 +277,29 @@ impl FullNode {
FullNode { thread_hdls }
}
}

fn fullnode_window(
ledger_tail: Option<Vec<Entry>>,
entry_height: u64,
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
) -> streamer::Window {
match ledger_tail {
Some(ledger_tail) => {
let mut blobs = VecDeque::new();

// convert to blobs
ledger_tail.to_blobs(&blob_recycler, &mut blobs);

// flatten deque to vec
let blobs: Vec<_> = blobs.into_iter().collect();

streamer::initialized_window(&crdt, blobs, entry_height)
}
None => streamer::default_window(),
}
}

#[cfg(test)]
mod tests {
use bank::Bank;
Expand All @@ -278,7 +315,7 @@ mod tests {
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
let v = FullNode::new_validator(bank, 0, tn, entry, exit.clone());
let v = FullNode::new_validator(bank, 0, None, tn, entry, exit.clone());
exit.store(true, Ordering::Relaxed);
for t in v.thread_hdls {
t.join().unwrap();
Expand Down
38 changes: 38 additions & 0 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,44 @@ pub fn default_window() -> Window {
Arc::new(RwLock::new(vec![None; WINDOW_SIZE as usize]))
}

/// Initialize a rebroadcast window with most recent Entry blobs
/// * `crdt` - gossip instance, used to set blob ids
/// * `blobs` - up to WINDOW_SIZE most recent blobs
/// * `entry_height` - current entry height
pub fn initialized_window(
crdt: &Arc<RwLock<Crdt>>,
blobs: Vec<SharedBlob>,
entry_height: u64,
) -> Window {
let window = default_window();

{
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());

debug!(
"initialized window entry_height:{} blobs_len:{}",
entry_height,
blobs.len()
);

// Index the blobs
let mut received = entry_height - blobs.len() as u64;
Crdt::index_blobs(crdt, &blobs, &mut received).expect("index blobs for initial window");

// populate the window, offset by implied index
for b in blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
trace!("caching {} at {}", ix, pos);
assert!(win[pos].is_none());
win[pos] = Some(b);
}
}

window
}

pub fn window(
exit: Arc<AtomicBool>,
crdt: Arc<RwLock<Crdt>>,
Expand Down
3 changes: 3 additions & 0 deletions src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ mod tests {
let server = FullNode::new_leader(
bank,
0,
None,
Some(Duration::from_millis(30)),
leader,
exit.clone(),
Expand Down Expand Up @@ -292,6 +293,7 @@ mod tests {
let server = FullNode::new_leader(
bank,
0,
None,
Some(Duration::from_millis(30)),
leader,
exit.clone(),
Expand Down Expand Up @@ -345,6 +347,7 @@ mod tests {
let server = FullNode::new_leader(
bank,
0,
None,
Some(Duration::from_millis(30)),
leader,
exit.clone(),
Expand Down

0 comments on commit 24cdff1

Please sign in to comment.