Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

updated process_verified_transactions to utilize existential type #224

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 147 additions & 14 deletions src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ use event::Event;
use hash::Hash;
use mint::Mint;
use plan::{Payment, Plan, Witness};
use rayon::iter::ParallelIterator;
use rayon::prelude::*;
use signature::{KeyPair, PublicKey, Signature};
use std::collections::hash_map::Entry::Occupied;
use std::collections::{HashMap, HashSet, VecDeque};
use std::result;
use std::sync::RwLock;
use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering};
use std::sync::RwLock;
use transaction::Transaction;

pub const MAX_ENTRY_IDS: usize = 1024 * 4;
Expand Down Expand Up @@ -224,22 +225,23 @@ impl Bank {
}

/// Process a batch of verified transactions.
pub fn process_verified_transactions(&self, trs: Vec<Transaction>) -> Vec<Result<Transaction>> {
pub fn process_verified_transactions<'a>(
&'a self,
trs: Vec<Transaction>,
) -> impl ParallelIterator<Item = Result<Transaction>> + 'a {
// Run all debits first to filter out any transactions that can't be processed
// in parallel deterministically.
let results: Vec<_> = trs.into_par_iter()
.map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr))
.collect(); // Calling collect() here forces all debits to complete before moving on.

results
.into_par_iter()
.map(|result| {
result.map(|tr| {
self.process_verified_transaction_credits(&tr);
tr
})
// returns a parallel iterator because process_verified_events immediately maps
results.into_par_iter().map(move |result| {
result.map(|tr| {
self.process_verified_transaction_credits(&tr);
tr
})
.collect()
})
}

fn partition_events(events: Vec<Event>) -> (Vec<Transaction>, Vec<Event>) {
Expand All @@ -254,10 +256,41 @@ impl Bank {
(trs, rest)
}

pub fn process_verified_transactions_old(&self, trs: Vec<Transaction>) -> Vec<Result<Transaction>> {
// Run all debits first to filter out any transactions that can't be processed
// in parallel deterministically.
let results: Vec<_> = trs.into_par_iter()
.map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr))
.collect(); // Calling collect() here forces all debits to complete before moving on.

results
.into_par_iter()
.map(|result| {
result.map(|tr| {
self.process_verified_transaction_credits(&tr);
tr
})
})
.collect()
}

pub fn process_verified_events_old(&self, events: Vec<Event>) -> Vec<Result<Event>> {
let (trs, rest) = Self::partition_events(events);
let mut results: Vec<_> = self.process_verified_transactions_old(trs)
.into_iter()
.map(|x| x.map(Event::Transaction))
.collect();

for event in rest {
results.push(self.process_verified_event(event));
}

results
}

pub fn process_verified_events(&self, events: Vec<Event>) -> Vec<Result<Event>> {
let (trs, rest) = Self::partition_events(events);
let mut results: Vec<_> = self.process_verified_transactions(trs)
.into_iter()
.map(|x| x.map(Event::Transaction))
.collect();

Expand Down Expand Up @@ -582,7 +615,7 @@ mod tests {
let tr0 = Transaction::new(&mint.keypair(), keypair.pubkey(), 2, mint.last_id());
let tr1 = Transaction::new(&keypair, mint.pubkey(), 1, mint.last_id());
let trs = vec![tr0, tr1];
let results = bank.process_verified_transactions(trs);
let results: Vec<Result<Transaction>> = bank.process_verified_transactions(trs).collect();
assert!(results[1].is_err());

// Assert bad transactions aren't counted.
Expand All @@ -600,8 +633,70 @@ mod bench {
use signature::KeyPairUtil;

#[bench]
fn process_verified_event_bench(bencher: &mut Bencher) {
let mint = Mint::new(100_000_000);
fn process_verified_transactions_old_bench(bencher: &mut Bencher) {
extern crate untrusted;

use signature::{GenKeys, KeyPair};
use event::Event;
use untrusted::Input;

let tokens_per_user = 1_000;

bencher.iter(|| {
let mint = Mint::new(1_000_000_000);
let num_accounts = 100_000;
let mint_keypair = mint.keypair();
let rnd = GenKeys::new(mint.keypair().public_key_bytes());
let users = rnd.gen_n_keys(num_accounts, tokens_per_user);
let bank = Bank::new(&mint);

let events: Vec<_> = users
.into_par_iter()
.map(|(pkcs8, tokens)| {
let last_id = mint.last_id();
let rando = KeyPair::from_pkcs8(Input::from(&pkcs8)).unwrap();
let tr = Transaction::new(&mint_keypair, rando.pubkey(), tokens, last_id);
Event::Transaction(tr)
})
.collect();
bank.process_verified_events_old(events);
})
}

#[bench]
fn process_verified_transactions_new_bench(bencher: &mut Bencher) {
extern crate untrusted;

use signature::{GenKeys, KeyPair};
use event::Event;
use untrusted::Input;

let tokens_per_user = 1_000;

bencher.iter(|| {
let mint = Mint::new(1_000_000_000);
let num_accounts = 100_000;
let mint_keypair = mint.keypair();
let rnd = GenKeys::new(mint.keypair().public_key_bytes());
let users = rnd.gen_n_keys(num_accounts, tokens_per_user);
let bank = Bank::new(&mint);

let events: Vec<_> = users
.into_par_iter()
.map(|(pkcs8, tokens)| {
let last_id = mint.last_id();
let rando = KeyPair::from_pkcs8(Input::from(&pkcs8)).unwrap();
let tr = Transaction::new(&mint_keypair, rando.pubkey(), tokens, last_id);
Event::Transaction(tr)
})
.collect();
bank.process_verified_events(events);
})
}

#[bench]
fn process_verified_event_new_bench(bencher: &mut Bencher) {
let mint = Mint::new(1_000_000_000);
let bank = Bank::new(&mint);
// Create transactions between unrelated parties.
let transactions: Vec<_> = (0..4096)
Expand Down Expand Up @@ -632,6 +727,44 @@ mod bench {

assert!(
bank.process_verified_transactions(transactions.clone())
.all(|x| x.is_ok())
);
});
}

#[bench]
fn process_verified_event_old_bench(bencher: &mut Bencher) {
let mint = Mint::new(1_000_000_000);
let bank = Bank::new(&mint);
// Create transactions between unrelated parties.
let transactions: Vec<_> = (0..4096)
.into_par_iter()
.map(|i| {
// Seed the 'from' account.
let rando0 = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, mint.last_id());
bank.process_verified_transaction(&tr).unwrap();

// Seed the 'to' account and a cell for its signature.
let last_id = hash(&serialize(&i).unwrap()); // Unique hash
bank.register_entry_id(&last_id);

let rando1 = KeyPair::new();
let tr = Transaction::new(&rando0, rando1.pubkey(), 1, last_id);
bank.process_verified_transaction(&tr).unwrap();

// Finally, return a transaction that's unique
Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
})
.collect();
bencher.iter(|| {
// Since benchmarker runs this multiple times, we need to clear the signatures.
for sigs in bank.last_ids.read().unwrap().iter() {
sigs.1.write().unwrap().clear();
}

assert!(
bank.process_verified_transactions_old(transactions.clone())
.iter()
.all(|x| x.is_ok())
);
Expand Down