diff --git a/bin/node/bench/src/construct.rs b/bin/node/bench/src/construct.rs index 1532e02bd3ef6..ca1a1c18f9ea9 100644 --- a/bin/node/bench/src/construct.rs +++ b/bin/node/bench/src/construct.rs @@ -30,8 +30,8 @@ use std::{borrow::Cow, collections::HashMap, pin::Pin, sync::Arc}; use node_primitives::Block; use node_testing::bench::{BenchDb, BlockType, DatabaseType, KeyTypes, Profile}; use sc_transaction_pool_api::{ - ImportNotificationStream, PoolFuture, PoolStatus, TransactionFor, TransactionSource, - TransactionStatusStreamFor, TxHash, + ImportNotificationStream, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor, + TransactionSource, TransactionStatusStreamFor, TxHash, }; use sp_consensus::{Environment, Proposer}; use sp_inherents::InherentDataProvider; @@ -216,6 +216,19 @@ impl sc_transaction_pool_api::InPoolTransaction for PoolTransaction { #[derive(Clone, Debug)] pub struct Transactions(Vec>); +pub struct TransactionsIterator(std::vec::IntoIter>); + +impl Iterator for TransactionsIterator { + type Item = Arc; + + fn next(&mut self) -> Option { + self.0.next() + } +} + +impl ReadyTransactions for TransactionsIterator { + fn report_invalid(&mut self, _tx: &Self::Item) {} +} impl sc_transaction_pool_api::TransactionPool for Transactions { type Block = Block; @@ -257,16 +270,17 @@ impl sc_transaction_pool_api::TransactionPool for Transactions { _at: NumberFor, ) -> Pin< Box< - dyn Future> + Send>> - + Send, + dyn Future< + Output = Box> + Send>, + > + Send, >, > { - let iter: Box> + Send> = - Box::new(self.0.clone().into_iter()); + let iter: Box> + Send> = + Box::new(TransactionsIterator(self.0.clone().into_iter())); Box::pin(futures::future::ready(iter)) } - fn ready(&self) -> Box> + Send> { + fn ready(&self) -> Box> + Send> { unimplemented!() } diff --git a/client/basic-authorship/src/basic_authorship.rs b/client/basic-authorship/src/basic_authorship.rs index e38bb11688f8b..bbee60ae98dcf 100644 --- a/client/basic-authorship/src/basic_authorship.rs +++ b/client/basic-authorship/src/basic_authorship.rs @@ -344,7 +344,7 @@ where let mut t2 = futures_timer::Delay::new(deadline.saturating_duration_since((self.now)()) / 8).fuse(); - let pending_iterator = select! { + let mut pending_iterator = select! { res = t1 => res, _ = t2 => { log::warn!( @@ -363,7 +363,7 @@ where let mut transaction_pushed = false; let mut hit_block_size_limit = false; - for pending_tx in pending_iterator { + while let Some(pending_tx) = pending_iterator.next() { if (self.now)() > deadline { debug!( "Consensus deadline reached when pushing block transactions, \ @@ -378,6 +378,7 @@ where let block_size = block_builder.estimate_block_size(self.include_proof_in_block_size_estimation); if block_size + pending_tx_data.encoded_size() > block_size_limit { + pending_iterator.report_invalid(&pending_tx); if skipped < MAX_SKIPPED_TRANSACTIONS { skipped += 1; debug!( @@ -400,6 +401,7 @@ where debug!("[{:?}] Pushed to the block.", pending_tx_hash); }, Err(ApplyExtrinsicFailed(Validity(e))) if e.exhausted_resources() => { + pending_iterator.report_invalid(&pending_tx); if skipped < MAX_SKIPPED_TRANSACTIONS { skipped += 1; debug!( @@ -412,6 +414,7 @@ where } }, Err(e) if skipped > 0 => { + pending_iterator.report_invalid(&pending_tx); trace!( "[{:?}] Ignoring invalid transaction when skipping: {}", pending_tx_hash, @@ -419,6 +422,7 @@ where ); }, Err(e) => { + pending_iterator.report_invalid(&pending_tx); debug!("[{:?}] Invalid transaction: {}", pending_tx_hash, e); unqueue_invalid.push(pending_tx_hash); }, diff --git a/client/transaction-pool/api/src/lib.rs b/client/transaction-pool/api/src/lib.rs index a6252f1373c5d..cd8784bfc83e2 100644 --- a/client/transaction-pool/api/src/lib.rs +++ b/client/transaction-pool/api/src/lib.rs @@ -223,13 +223,14 @@ pub trait TransactionPool: Send + Sync { at: NumberFor, ) -> Pin< Box< - dyn Future> + Send>> - + Send, + dyn Future< + Output = Box> + Send>, + > + Send, >, >; /// Get an iterator for ready transactions ordered by priority. - fn ready(&self) -> Box> + Send>; + fn ready(&self) -> Box> + Send>; // *** Block production /// Remove transactions identified by given hashes (and dependent transactions) from the pool. @@ -254,6 +255,27 @@ pub trait TransactionPool: Send + Sync { fn ready_transaction(&self, hash: &TxHash) -> Option>; } +/// An iterator of ready transactions. +/// +/// The trait extends regular [`std::iter::Iterator`] trait and allows reporting +/// last-returned element as invalid. +/// +/// The implementation is then allowed, for performance reasons, to change the elements +/// returned next, by e.g. skipping elements that are known to depend on the reported +/// transaction, which yields them invalid as well. +pub trait ReadyTransactions: Iterator { + /// Report given transaction as invalid. + /// + /// This might affect subsequent elements returned by the iterator, so dependent transactions + /// are skipped for performance reasons. + fn report_invalid(&mut self, _tx: &Self::Item); +} + +/// A no-op implementation for an empty iterator. +impl ReadyTransactions for std::iter::Empty { + fn report_invalid(&mut self, _tx: &T) {} +} + /// Events that the transaction pool listens for. pub enum ChainEvent { /// New best block have been added to the chain diff --git a/client/transaction-pool/graph/Cargo.toml b/client/transaction-pool/graph/Cargo.toml deleted file mode 100644 index b49cadc51c33c..0000000000000 --- a/client/transaction-pool/graph/Cargo.toml +++ /dev/null @@ -1,39 +0,0 @@ -[package] -name = "sc-transaction-graph" -version = "4.0.0-dev" -authors = ["Parity Technologies "] -edition = "2018" -license = "GPL-3.0-or-later WITH Classpath-exception-2.0" -homepage = "https://substrate.dev" -repository = "https://github.com/paritytech/substrate/" -description = "Generic Transaction Pool" -readme = "README.md" - -[package.metadata.docs.rs] -targets = ["x86_64-unknown-linux-gnu"] - -[dependencies] -derive_more = "0.99.2" -thiserror = "1.0.21" -futures = "0.3.9" -log = "0.4.8" -parking_lot = "0.11.1" -serde = { version = "1.0.101", features = ["derive"] } -sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" } -sc-utils = { version = "4.0.0-dev", path = "../../utils" } -sp-core = { version = "4.0.0-dev", path = "../../../primitives/core" } -sp-runtime = { version = "4.0.0-dev", path = "../../../primitives/runtime" } -sp-transaction-pool = { version = "4.0.0-dev", path = "../../../primitives/transaction-pool" } -parity-util-mem = { version = "0.10.0", default-features = false, features = ["primitive-types"] } -linked-hash-map = "0.5.4" -retain_mut = "0.1.3" - -[dev-dependencies] -assert_matches = "1.3.0" -codec = { package = "parity-scale-codec", version = "2.0.0" } -substrate-test-runtime = { version = "2.0.0", path = "../../../test-utils/runtime" } -criterion = "0.3" - -[[bench]] -name = "basics" -harness = false diff --git a/client/transaction-pool/src/graph/base_pool.rs b/client/transaction-pool/src/graph/base_pool.rs index 890a87e82929d..2c8becdfb2f0b 100644 --- a/client/transaction-pool/src/graph/base_pool.rs +++ b/client/transaction-pool/src/graph/base_pool.rs @@ -36,7 +36,7 @@ use sp_runtime::{ use super::{ future::{FutureTransactions, WaitingTransaction}, - ready::ReadyTransactions, + ready::{BestIterator, ReadyTransactions}, }; /// Successful import result. @@ -355,7 +355,7 @@ impl BasePool impl Iterator>> { + pub fn ready(&self) -> BestIterator { self.ready.get() } diff --git a/client/transaction-pool/src/graph/ready.rs b/client/transaction-pool/src/graph/ready.rs index 03689aeb32e6d..99a034689ccd0 100644 --- a/client/transaction-pool/src/graph/ready.rs +++ b/client/transaction-pool/src/graph/ready.rs @@ -23,7 +23,7 @@ use std::{ sync::Arc, }; -use log::trace; +use log::{debug, trace}; use sc_transaction_pool_api::error; use serde::Serialize; use sp_runtime::{traits::Member, transaction_validity::TransactionTag as Tag}; @@ -156,11 +156,16 @@ impl ReadyTransactions { /// - transactions that are valid for a shorter time go first /// 4. Lastly we sort by the time in the queue /// - transactions that are longer in the queue go first - pub fn get(&self) -> impl Iterator>> { + /// + /// The iterator is providing a way to report transactions that the receiver considers invalid. + /// In such case the entire subgraph of transactions that depend on the reported one will be + /// skipped. + pub fn get(&self) -> BestIterator { BestIterator { all: self.ready.clone(), best: self.best.clone(), awaiting: Default::default(), + invalid: Default::default(), } } @@ -482,6 +487,7 @@ pub struct BestIterator { all: ReadOnlyTrackedMap>, awaiting: HashMap)>, best: BTreeSet>, + invalid: HashSet, } impl BestIterator { @@ -498,6 +504,34 @@ impl BestIterator { } } +impl sc_transaction_pool_api::ReadyTransactions + for BestIterator +{ + fn report_invalid(&mut self, tx: &Self::Item) { + BestIterator::report_invalid(self, tx) + } +} + +impl BestIterator { + /// Report given transaction as invalid. + /// + /// As a consequence, all values that depend on the invalid one will be skipped. + /// When given transaction is not in the pool it has no effect. + /// When invoked on a fully drained iterator it has no effect either. + pub fn report_invalid(&mut self, tx: &Arc>) { + if let Some(to_report) = self.all.read().get(&tx.hash) { + debug!( + target: "txpool", + "[{:?}] Reported as invalid. Will skip sub-chains while iterating.", + to_report.transaction.transaction.hash + ); + for hash in &to_report.unlocks { + self.invalid.insert(hash.clone()); + } + } + } +} + impl Iterator for BestIterator { type Item = Arc>; @@ -505,8 +539,19 @@ impl Iterator for BestIterator { loop { let best = self.best.iter().next_back()?.clone(); let best = self.best.take(&best)?; + let hash = &best.transaction.hash; + + // Check if the transaction was marked invalid. + if self.invalid.contains(hash) { + debug!( + target: "txpool", + "[{:?}] Skipping invalid child transaction while iterating.", + hash + ); + continue + } - let next = self.all.read().get(&best.transaction.hash).cloned(); + let next = self.all.read().get(hash).cloned(); let ready = match next { Some(ready) => ready, // The transaction is not in all, maybe it was removed in the meantime? @@ -635,10 +680,13 @@ mod tests { assert_eq!(ready.get().count(), 3); } - #[test] - fn should_return_best_transactions_in_correct_order() { - // given - let mut ready = ReadyTransactions::default(); + /// Populate the pool, with a graph that looks like so: + /// + /// tx1 -> tx2 \ + /// -> -> tx3 + /// -> tx4 -> tx5 -> tx6 + /// -> tx7 + fn populate_pool(ready: &mut ReadyTransactions>) { let mut tx1 = tx(1); tx1.requires.clear(); let mut tx2 = tx(2); @@ -649,11 +697,17 @@ mod tests { tx3.provides = vec![]; let mut tx4 = tx(4); tx4.requires = vec![tx1.provides[0].clone()]; - tx4.provides = vec![]; - let tx5 = Transaction { - data: vec![5], + tx4.provides = vec![vec![107]]; + let mut tx5 = tx(5); + tx5.requires = vec![tx4.provides[0].clone()]; + tx5.provides = vec![vec![108]]; + let mut tx6 = tx(6); + tx6.requires = vec![tx5.provides[0].clone()]; + tx6.provides = vec![]; + let tx7 = Transaction { + data: vec![7], bytes: 1, - hash: 5, + hash: 7, priority: 1, valid_till: u64::MAX, // use the max here for testing. requires: vec![tx1.provides[0].clone()], @@ -663,20 +717,30 @@ mod tests { }; // when - for tx in vec![tx1, tx2, tx3, tx4, tx5] { - import(&mut ready, tx).unwrap(); + for tx in vec![tx1, tx2, tx3, tx7, tx4, tx5, tx6] { + import(ready, tx).unwrap(); } - // then assert_eq!(ready.best.len(), 1); + } + + #[test] + fn should_return_best_transactions_in_correct_order() { + // given + let mut ready = ReadyTransactions::default(); + populate_pool(&mut ready); + // when let mut it = ready.get().map(|tx| tx.data[0]); + // then assert_eq!(it.next(), Some(1)); assert_eq!(it.next(), Some(2)); assert_eq!(it.next(), Some(3)); assert_eq!(it.next(), Some(4)); assert_eq!(it.next(), Some(5)); + assert_eq!(it.next(), Some(6)); + assert_eq!(it.next(), Some(7)); assert_eq!(it.next(), None); } @@ -725,4 +789,26 @@ mod tests { TransactionRef { transaction: Arc::new(with_priority(3, 3)), insertion_id: 2 } ); } + + #[test] + fn should_skip_invalid_transactions_while_iterating() { + // given + let mut ready = ReadyTransactions::default(); + populate_pool(&mut ready); + + // when + let mut it = ready.get(); + let data = |tx: &Arc>>| tx.data[0]; + + // then + assert_eq!(it.next().as_ref().map(data), Some(1)); + assert_eq!(it.next().as_ref().map(data), Some(2)); + assert_eq!(it.next().as_ref().map(data), Some(3)); + let tx4 = it.next(); + assert_eq!(tx4.as_ref().map(data), Some(4)); + // report 4 as invalid, which should skip 5 & 6. + it.report_invalid(&tx4.unwrap()); + assert_eq!(it.next().as_ref().map(data), Some(7)); + assert_eq!(it.next().as_ref().map(data), None); + } } diff --git a/client/transaction-pool/src/graph/validated_pool.rs b/client/transaction-pool/src/graph/validated_pool.rs index e4aad7f342b5b..dba586adc846c 100644 --- a/client/transaction-pool/src/graph/validated_pool.rs +++ b/client/transaction-pool/src/graph/validated_pool.rs @@ -25,7 +25,7 @@ use std::{ use futures::channel::mpsc::{channel, Sender}; use parking_lot::{Mutex, RwLock}; use retain_mut::RetainMut; -use sc_transaction_pool_api::{error, PoolStatus}; +use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions}; use serde::Serialize; use sp_runtime::{ generic::BlockId, @@ -630,7 +630,7 @@ impl ValidatedPool { } /// Get an iterator for ready transactions ordered by priority - pub fn ready(&self) -> impl Iterator> + Send { + pub fn ready(&self) -> impl ReadyTransactions> + Send { self.pool.read().ready() } diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 6eb5bd2f332ec..4d355df22d821 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -56,7 +56,8 @@ use std::{ use graph::{ExtrinsicHash, IsValidator}; use sc_transaction_pool_api::{ ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolFuture, PoolStatus, - TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash, + ReadyTransactions, TransactionFor, TransactionPool, TransactionSource, + TransactionStatusStreamFor, TxHash, }; use sp_core::traits::SpawnEssentialNamed; use sp_runtime::{ @@ -69,7 +70,7 @@ use crate::metrics::MetricsLink as PrometheusMetrics; use prometheus_endpoint::Registry as PrometheusRegistry; type BoxedReadyIterator = - Box>> + Send>; + Box>> + Send>; type ReadyIteratorFor = BoxedReadyIterator, graph::ExtrinsicFor>;