-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New Transaction Queue implementation #8074
Changes from 52 commits
c467c7d
15a52b7
d358050
34e1eb1
1b5ff18
f77a46d
b176521
d966a4b
43e8289
5e783eb
71fa691
a70f1c3
afae75b
47e8928
4b25b86
81a96e4
b538413
3368324
7a24420
fc4a0aa
e595e83
620e6be
5a6cdf5
0f2424c
a1cb18c
e902bec
fede1ea
5075cec
449facf
e2be467
c07ea86
b720077
fe74263
8312e4a
51e2f43
54965d6
43e9358
236ac24
d698145
e97e453
f079b91
41a5acc
35c7783
886224a
b4d9341
03e89ec
900b446
7dab842
8046ef9
83c92bc
eb025db
1f1694b
893131f
660b6b0
39c6bb4
006d635
a6b9324
cfc2200
ce02494
68b3f2a
72f49d4
8ab2912
901e532
7f15140
1dcc47e
a74f18a
6d7d16b
bdfdb3d
2d1da26
cca0db7
281e54d
6a304c4
383a908
7aaff4f
93f93a0
7f290e7
bed88ee
4a7940e
6dcea90
c36d39d
c92812b
be75720
357fcf3
f41b0f2
da0f845
296cc42
e199b86
285f4fd
af81bee
53f328c
5e012ae
0e2bf52
7f15c30
483b70c
b7789e2
1c757e0
d7c8b94
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -131,5 +131,5 @@ members = [ | |
"miner", | ||
"transaction-pool", | ||
"whisper", | ||
"util/rlp_compress" | ||
"util/rlp_compress", | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -120,6 +120,18 @@ impl AccountTransactions { | |
} | ||
} | ||
|
||
/// Transaction import result. | ||
pub enum ImportResult { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe In our case There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agree, but worth noting that this is a type that existed before, only moved. |
||
/// Transaction has been imported to the current queue. | ||
/// | ||
/// It's going to be propagated to peers. | ||
Current, | ||
/// Transaction has been imported to future queue. | ||
/// | ||
/// It means it won't be propagated until the gap is filled. | ||
Future | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not very clear for me why these options were selected. Maybe adopt something like |
||
} | ||
|
||
type Listener = Box<Fn(&[H256]) + Send + Sync>; | ||
|
||
/// Light transaction queue. See module docs for more details. | ||
|
@@ -142,7 +154,7 @@ impl fmt::Debug for TransactionQueue { | |
|
||
impl TransactionQueue { | ||
/// Import a pending transaction to be queued. | ||
pub fn import(&mut self, tx: PendingTransaction) -> Result<transaction::ImportResult, transaction::Error> { | ||
pub fn import(&mut self, tx: PendingTransaction) -> Result<ImportResult, transaction::Error> { | ||
let sender = tx.sender(); | ||
let hash = tx.hash(); | ||
let nonce = tx.nonce; | ||
|
@@ -158,7 +170,7 @@ impl TransactionQueue { | |
future: BTreeMap::new(), | ||
}); | ||
|
||
(transaction::ImportResult::Current, vec![hash]) | ||
(ImportResult::Current, vec![hash]) | ||
} | ||
Entry::Occupied(mut entry) => { | ||
let acct_txs = entry.get_mut(); | ||
|
@@ -180,7 +192,7 @@ impl TransactionQueue { | |
let old = ::std::mem::replace(&mut acct_txs.current[idx], tx_info); | ||
self.by_hash.remove(&old.hash); | ||
|
||
(transaction::ImportResult::Current, vec![hash]) | ||
(ImportResult::Current, vec![hash]) | ||
} | ||
Err(idx) => { | ||
let cur_len = acct_txs.current.len(); | ||
|
@@ -202,13 +214,13 @@ impl TransactionQueue { | |
acct_txs.future.insert(future_nonce, future); | ||
} | ||
|
||
(transaction::ImportResult::Current, vec![hash]) | ||
(ImportResult::Current, vec![hash]) | ||
} else if idx == cur_len && acct_txs.current.last().map_or(false, |f| f.nonce + 1.into() != nonce) { | ||
trace!(target: "txqueue", "Queued future transaction for {}, nonce={}", sender, nonce); | ||
let future_nonce = nonce; | ||
acct_txs.future.insert(future_nonce, tx_info); | ||
|
||
(transaction::ImportResult::Future, vec![]) | ||
(ImportResult::Future, vec![]) | ||
} else { | ||
trace!(target: "txqueue", "Queued current transaction for {}, nonce={}", sender, nonce); | ||
|
||
|
@@ -217,7 +229,7 @@ impl TransactionQueue { | |
let mut promoted = acct_txs.adjust_future(); | ||
promoted.insert(0, hash); | ||
|
||
(transaction::ImportResult::Current, promoted) | ||
(ImportResult::Current, promoted) | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -336,8 +336,33 @@ impl<'x> OpenBlock<'x> { | |
} | ||
|
||
/// Push transactions onto the block. | ||
pub fn push_transactions(&mut self, transactions: &[SignedTransaction]) -> Result<(), Error> { | ||
push_transactions(self, transactions) | ||
#[cfg(not(feature = "slow-blocks"))] | ||
fn push_transactions(&mut self, transactions: Vec<SignedTransaction>) -> Result<(), Error> { | ||
for t in transactions { | ||
self.push_transaction(t, None)?; | ||
} | ||
Ok(()) | ||
} | ||
|
||
/// Push transactions onto the block. | ||
#[cfg(feature = "slow-blocks")] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd love to see a comment explaining what There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added documentation in |
||
fn push_transactions(&mut self, transactions: Vec<SignedTransaction>) -> Result<(), Error> { | ||
use std::time; | ||
|
||
let slow_tx = option_env!("SLOW_TX_DURATION").and_then(|v| v.parse().ok()).unwrap_or(100); | ||
for t in transactions { | ||
let hash = t.hash(); | ||
let start = time::Instant::now(); | ||
self.push_transaction(t, None)?; | ||
let took = start.elapsed(); | ||
let took_ms = took.as_secs() * 1000 + took.subsec_nanos() as u64 / 1000000; | ||
if took > time::Duration::from_millis(slow_tx) { | ||
warn!("Heavy ({} ms) transaction in block {:?}: {:?}", took_ms, block.header().number(), hash); | ||
} | ||
debug!(target: "tx", "Transaction {:?} took: {} ms", hash, took_ms); | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Populate self from a header. | ||
|
@@ -534,10 +559,10 @@ impl IsBlock for SealedBlock { | |
} | ||
|
||
/// Enact the block given by block header, transactions and uncles | ||
pub fn enact( | ||
header: &Header, | ||
transactions: &[SignedTransaction], | ||
uncles: &[Header], | ||
fn enact( | ||
header: Header, | ||
transactions: Vec<SignedTransaction>, | ||
uncles: Vec<Header>, | ||
engine: &EthEngine, | ||
tracing: bool, | ||
db: StateDB, | ||
|
@@ -567,48 +592,19 @@ pub fn enact( | |
is_epoch_begin, | ||
)?; | ||
|
||
b.populate_from(header); | ||
b.populate_from(&header); | ||
b.push_transactions(transactions)?; | ||
|
||
for u in uncles { | ||
b.push_uncle(u.clone())?; | ||
b.push_uncle(u)?; | ||
} | ||
|
||
Ok(b.close_and_lock()) | ||
} | ||
|
||
#[inline] | ||
#[cfg(not(feature = "slow-blocks"))] | ||
fn push_transactions(block: &mut OpenBlock, transactions: &[SignedTransaction]) -> Result<(), Error> { | ||
for t in transactions { | ||
block.push_transaction(t.clone(), None)?; | ||
} | ||
Ok(()) | ||
} | ||
|
||
#[cfg(feature = "slow-blocks")] | ||
fn push_transactions(block: &mut OpenBlock, transactions: &[SignedTransaction]) -> Result<(), Error> { | ||
use std::time; | ||
|
||
let slow_tx = option_env!("SLOW_TX_DURATION").and_then(|v| v.parse().ok()).unwrap_or(100); | ||
for t in transactions { | ||
let hash = t.hash(); | ||
let start = time::Instant::now(); | ||
block.push_transaction(t.clone(), None)?; | ||
let took = start.elapsed(); | ||
let took_ms = took.as_secs() * 1000 + took.subsec_nanos() as u64 / 1000000; | ||
if took > time::Duration::from_millis(slow_tx) { | ||
warn!("Heavy ({} ms) transaction in block {:?}: {:?}", took_ms, block.header().number(), hash); | ||
} | ||
debug!(target: "tx", "Transaction {:?} took: {} ms", hash, took_ms); | ||
} | ||
Ok(()) | ||
} | ||
|
||
// TODO [ToDr] Pass `PreverifiedBlock` by move, this will avoid unecessary allocation | ||
/// Enact the block given by `block_bytes` using `engine` on the database `db` with given `parent` block header | ||
pub fn enact_verified( | ||
block: &PreverifiedBlock, | ||
block: PreverifiedBlock, | ||
engine: &EthEngine, | ||
tracing: bool, | ||
db: StateDB, | ||
|
@@ -620,9 +616,9 @@ pub fn enact_verified( | |
let view = BlockView::new(&block.bytes); | ||
|
||
enact( | ||
&block.header, | ||
&block.transactions, | ||
&view.uncles(), | ||
block.header, | ||
block.transactions, | ||
view.uncles(), | ||
engine, | ||
tracing, | ||
db, | ||
|
@@ -690,7 +686,7 @@ mod tests { | |
)?; | ||
|
||
b.populate_from(&header); | ||
b.push_transactions(&transactions)?; | ||
b.push_transactions(transactions)?; | ||
|
||
for u in &block.uncles() { | ||
b.push_uncle(u.clone())?; | ||
|
@@ -783,3 +779,4 @@ mod tests { | |
assert!(orig_db.journal_db().keys().iter().filter(|k| orig_db.journal_db().get(k.0) != db.journal_db().get(k.0)).next() == None); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs to be reimplemented