Skip to content

Commit

Permalink
support an initial window filled with last up-to-WINDOW_SIZE blobs
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-solana committed Jul 4, 2018
1 parent 22d2c96 commit 2acf401
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 18 deletions.
51 changes: 36 additions & 15 deletions src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::result;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
use std::time::Instant;
use streamer::WINDOW_SIZE;
use timing::duration_as_us;
use transaction::{Instruction, Plan, Transaction};

Expand Down Expand Up @@ -322,10 +323,7 @@ impl Bank {
}

/// Process an ordered list of entries.
pub fn process_entries<I>(&self, entries: I) -> Result<u64>
where
I: IntoIterator<Item = Entry>,
{
pub fn process_entries(&self, entries: Vec<Entry>) -> Result<u64> {
let mut entry_count = 0;
for entry in entries {
entry_count += 1;
Expand Down Expand Up @@ -364,7 +362,7 @@ impl Bank {
}

/// Process a full ledger.
pub fn process_ledger<I>(&self, entries: I) -> Result<u64>
pub fn process_ledger<I>(&self, entries: I) -> Result<(u64, Vec<Entry>)>
where
I: IntoIterator<Item = Entry>,
{
Expand All @@ -380,20 +378,39 @@ impl Bank {
let entry1 = entries
.next()
.expect("invalid ledger: need at least 2 entries");
let tx = &entry1.transactions[0];
let deposit = if let Instruction::NewContract(contract) = &tx.instruction {
contract.plan.final_payment()
} else {
None
}.expect("invalid ledger, needs to start with a contract");
{
let tx = &entry1.transactions[0];
let deposit = if let Instruction::NewContract(contract) = &tx.instruction {
contract.plan.final_payment()
} else {
None
}.expect("invalid ledger, needs to start with a contract");

self.apply_payment(&deposit, &mut self.balances.write().unwrap());
self.apply_payment(&deposit, &mut self.balances.write().unwrap());
}
self.register_entry_id(&entry0.id);
self.register_entry_id(&entry1.id);

let mut entry_count = 2;
entry_count += self.process_blocks(entries)?;
Ok(entry_count)
let mut tail = Vec::with_capacity(WINDOW_SIZE as usize);
let mut next = Vec::with_capacity(WINDOW_SIZE as usize);

for block in &entries.into_iter().chunks(WINDOW_SIZE as usize) {
tail = next;
next = block.collect();
entry_count += self.process_blocks(next.clone())?;
}

tail.append(&mut next);

if tail.len() < WINDOW_SIZE as usize {
tail.insert(0, entry1);
if tail.len() < WINDOW_SIZE as usize {
tail.insert(0, entry0);
}
}

Ok((entry_count, tail))
}

/// Process a Witness Signature. Any payment plans waiting on this signature
Expand Down Expand Up @@ -795,9 +812,13 @@ mod tests {
#[test]
fn test_process_ledger() {
let (ledger, pubkey) = create_sample_ledger();
let (ledger, dup) = ledger.tee();
let bank = Bank::default();
bank.process_ledger(ledger).unwrap();
let (ledger_height, tail) = bank.process_ledger(ledger).unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, 3);
assert_eq!(tail.len(), 3);
assert_eq!(tail, dup.collect_vec());
}

// Write the given entries to a file and then return a file iterator to them.
Expand Down
1 change: 1 addition & 0 deletions src/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ mod tests {
let server = FullNode::new_leader(
bank,
0,
None,
Some(Duration::from_millis(30)),
leader,
exit.clone(),
Expand Down
19 changes: 16 additions & 3 deletions src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

use bank::Bank;
use crdt::{Crdt, ReplicatedData, TestNode};
use entry::Entry;
use entry_writer;
use ledger::Block;
use ncp::Ncp;
use packet::BlobRecycler;
use rpu::Rpu;
use std::collections::VecDeque;
use std::fs::{File, OpenOptions};
use std::io::{sink, stdin, stdout, BufReader};
use std::io::{Read, Write};
Expand Down Expand Up @@ -50,9 +53,9 @@ impl FullNode {
};
let reader = BufReader::new(infile);
let entries = entry_writer::read_entries(reader).map(|e| e.expect("failed to parse entry"));
info!("processing ledger...");
let entry_height = bank.process_ledger(entries).expect("process_ledger");

info!("processing ledger...");
let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger");
// entry_height is the network-wide agreed height of the ledger.
// initialize it from the input ledger
info!("processed {} ledger...", entry_height);
Expand Down Expand Up @@ -99,6 +102,7 @@ impl FullNode {
let server = FullNode::new_leader(
bank,
entry_height,
Some(ledger_tail),
//Some(Duration::from_millis(1000)),
None,
node,
Expand Down Expand Up @@ -139,6 +143,7 @@ impl FullNode {
pub fn new_leader<W: Write + Send + 'static>(
bank: Bank,
entry_height: u64,
ledger_tail: Option<Vec<Entry>>,
tick_duration: Option<Duration>,
node: TestNode,
exit: Arc<AtomicBool>,
Expand All @@ -165,7 +170,15 @@ impl FullNode {
);
thread_hdls.extend(tpu.thread_hdls);
let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
let window = streamer::default_window();
let window = match ledger_tail {
Some(ledger_tail) => {
let mut blobs = VecDeque::new();
ledger_tail.to_blobs(&blob_recycler, &mut blobs);
streamer::initialized_window(&crdt, blobs, entry_height)
}
None => streamer::default_window(),
};

let ncp = Ncp::new(
crdt.clone(),
window.clone(),
Expand Down
39 changes: 39 additions & 0 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,45 @@ pub fn default_window() -> Window {
Arc::new(RwLock::new(vec![None; WINDOW_SIZE as usize]))
}

/// Initialize a rebroadcast window with most recent Entry blobs
/// * `blobs` - up to WINDOW_SIZE most recent blobs
/// * `entry_height` - current entry height
pub fn initialized_window(
crdt: &Arc<RwLock<Crdt>>,
blobs: VecDeque<SharedBlob>,
entry_height: u64,
) -> Window {
let window = default_window();

{
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());

// flatten deque to vec
let mut blobs: Vec<_> = blobs.into_iter().collect();

debug!(
"initialized window entry_height:{} blobs_len:{}",
entry_height,
blobs.len()
);
// Index the blobs
let mut received = entry_height - blobs.len() as u64;
Crdt::index_blobs(crdt, &blobs, &mut received).unwrap();

// populate the window, offset by implied index
while let Some(b) = blobs.pop() {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
trace!("caching {} at {}", ix, pos);
assert!(win[pos].is_none());
win[pos] = Some(b);
}
}

window
}

pub fn window(
exit: Arc<AtomicBool>,
crdt: Arc<RwLock<Crdt>>,
Expand Down
3 changes: 3 additions & 0 deletions src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ mod tests {
let server = FullNode::new_leader(
bank,
0,
None,
Some(Duration::from_millis(30)),
leader,
exit.clone(),
Expand Down Expand Up @@ -292,6 +293,7 @@ mod tests {
let server = FullNode::new_leader(
bank,
0,
None,
Some(Duration::from_millis(30)),
leader,
exit.clone(),
Expand Down Expand Up @@ -345,6 +347,7 @@ mod tests {
let server = FullNode::new_leader(
bank,
0,
None,
Some(Duration::from_millis(30)),
leader,
exit.clone(),
Expand Down

0 comments on commit 2acf401

Please sign in to comment.