From 9ce077465ac46a93ebaced4b47863952b3d63d33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 5 Jun 2020 23:12:00 +0200 Subject: [PATCH] Make transaction pool prune transactions only of canonical blocks (#6123) * Make tx pool aware of retracted fork blocks * Make it compile * Update client/transaction-pool/src/lib.rs Co-authored-by: Nikolay Volf * Fix doc test * Simplify the implementation * Send tree route as arc to prevent heavy clones * Switch to use `ExtrinsicHash` to make it more clear * Fix benchmark Co-authored-by: Nikolay Volf --- Cargo.lock | 1 + bin/node-template/node/src/service.rs | 30 +- bin/node/cli/src/service.rs | 28 +- client/api/src/backend.rs | 6 +- client/api/src/client.rs | 8 +- .../basic-authorship/src/basic_authorship.rs | 21 +- client/basic-authorship/src/lib.rs | 5 +- client/consensus/manual-seal/src/lib.rs | 6 +- .../manual-seal/src/seal_new_block.rs | 2 +- client/finality-grandpa/src/until_imported.rs | 2 +- client/offchain/src/lib.rs | 8 +- client/rpc/src/author/tests.rs | 8 +- client/service/src/builder.rs | 61 ++-- client/service/src/client/client.rs | 10 +- .../transaction-pool/graph/benches/basics.rs | 3 +- client/transaction-pool/graph/src/lib.rs | 6 +- client/transaction-pool/graph/src/listener.rs | 12 +- client/transaction-pool/graph/src/pool.rs | 119 ++++--- .../graph/src/validated_pool.rs | 53 +-- client/transaction-pool/src/api.rs | 66 ++-- client/transaction-pool/src/lib.rs | 131 ++++--- client/transaction-pool/src/revalidation.rs | 18 +- client/transaction-pool/src/testing/pool.rs | 330 ++++++++++++++++-- primitives/blockchain/Cargo.toml | 1 - primitives/blockchain/src/header_metadata.rs | 24 +- primitives/transaction-pool/Cargo.toml | 2 + primitives/transaction-pool/src/pool.rs | 6 +- test-utils/runtime/client/src/lib.rs | 9 +- test-utils/runtime/src/lib.rs | 10 +- .../runtime/transaction-pool/src/lib.rs | 157 ++++++--- utils/frame/rpc/system/src/lib.rs | 8 +- 31 files changed, 800 insertions(+), 351 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ad5033f139df..358f22463a4b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7746,6 +7746,7 @@ dependencies = [ "parity-scale-codec", "serde", "sp-api", + "sp-blockchain", "sp-runtime", "sp-utils", ] diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 8e57a041373a9..17606e29239bf 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -34,14 +34,22 @@ macro_rules! new_full_start { let inherent_data_providers = sp_inherents::InherentDataProviders::new(); let builder = sc_service::ServiceBuilder::new_full::< - node_template_runtime::opaque::Block, node_template_runtime::RuntimeApi, crate::service::Executor + node_template_runtime::opaque::Block, + node_template_runtime::RuntimeApi, + crate::service::Executor >($config)? .with_select_chain(|_config, backend| { Ok(sc_consensus::LongestChain::new(backend.clone())) })? - .with_transaction_pool(|config, client, _fetcher, prometheus_registry| { - let pool_api = sc_transaction_pool::FullChainApi::new(client.clone()); - Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry)) + .with_transaction_pool(|builder| { + let pool_api = sc_transaction_pool::FullChainApi::new( + builder.client().clone(), + ); + Ok(sc_transaction_pool::BasicPool::new( + builder.config().transaction_pool.clone(), + std::sync::Arc::new(pool_api), + builder.prometheus_registry(), + )) })? .with_import_queue(| _config, @@ -199,13 +207,19 @@ pub fn new_light(config: Configuration) -> Result { pub is_new_best: bool, /// Optional storage changes. pub storage_changes: Option<(StorageCollection, ChildStorageCollection)>, - /// Blocks that got retracted because of this one got imported. - pub retracted: Vec, + /// Tree route from old best to new best. + /// + /// If `None`, there was no re-org while importing. + pub tree_route: Option>, } /// Import operation wrapper diff --git a/client/api/src/client.rs b/client/api/src/client.rs index c855cd3a08328..aa6763653d488 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -16,7 +16,7 @@ //! A set of APIs supported by the client along with their primitives. -use std::{fmt, collections::HashSet}; +use std::{fmt, collections::HashSet, sync::Arc}; use sp_core::storage::StorageKey; use sp_runtime::{ traits::{Block as BlockT, NumberFor}, @@ -234,8 +234,10 @@ pub struct BlockImportNotification { pub header: Block::Header, /// Is this the new best block. pub is_new_best: bool, - /// List of retracted blocks ordered by block number. - pub retracted: Vec, + /// Tree route from old best to new best. + /// + /// If `None`, there was no re-org while importing. + pub tree_route: Option>>, } /// Summary of a finalized block. diff --git a/client/basic-authorship/src/basic_authorship.rs b/client/basic-authorship/src/basic_authorship.rs index cd241f38849a1..39ebbc89beda9 100644 --- a/client/basic-authorship/src/basic_authorship.rs +++ b/client/basic-authorship/src/basic_authorship.rs @@ -331,15 +331,14 @@ mod tests { use parking_lot::Mutex; use sp_consensus::{BlockOrigin, Proposer}; use substrate_test_runtime_client::{ - prelude::*, - runtime::{Extrinsic, Transfer}, + prelude::*, TestClientBuilder, runtime::{Extrinsic, Transfer}, TestClientBuilderExt, }; use sp_transaction_pool::{ChainEvent, MaintainedTransactionPool, TransactionSource}; use sc_transaction_pool::{BasicPool, FullChainApi}; use sp_api::Core; - use backend::Backend; use sp_blockchain::HeaderBackend; use sp_runtime::traits::NumberFor; + use sc_client_api::Backend; const SOURCE: TransactionSource = TransactionSource::External; @@ -357,7 +356,7 @@ mod tests { { ChainEvent::NewBlock { id: BlockId::Number(block_number.into()), - retracted: vec![], + tree_route: None, is_new_best: true, header, } @@ -452,8 +451,7 @@ mod tests { #[test] fn proposed_storage_changes_should_match_execute_block_storage_changes() { - let (client, backend) = substrate_test_runtime_client::TestClientBuilder::new() - .build_with_backend(); + let (client, backend) = TestClientBuilder::new().build_with_backend(); let client = Arc::new(client); let txpool = Arc::new( BasicPool::new( @@ -473,7 +471,9 @@ mod tests { futures::executor::block_on( txpool.maintain(chain_event( 0, - client.header(&BlockId::Number(0u64)).expect("header get error").expect("there should be header") + client.header(&BlockId::Number(0u64)) + .expect("header get error") + .expect("there should be header"), )) ); @@ -500,8 +500,11 @@ mod tests { backend.changes_trie_storage(), ).unwrap(); - let storage_changes = api.into_storage_changes(&state, changes_trie_state.as_ref(), genesis_hash) - .unwrap(); + let storage_changes = api.into_storage_changes( + &state, + changes_trie_state.as_ref(), + genesis_hash, + ).unwrap(); assert_eq!( proposal.storage_changes.transaction_storage_root, diff --git a/client/basic-authorship/src/lib.rs b/client/basic-authorship/src/lib.rs index 63020c0e68af7..4f53c87de3979 100644 --- a/client/basic-authorship/src/lib.rs +++ b/client/basic-authorship/src/lib.rs @@ -25,7 +25,10 @@ //! # use sp_consensus::{Environment, Proposer, RecordProof}; //! # use sp_runtime::generic::BlockId; //! # use std::{sync::Arc, time::Duration}; -//! # use substrate_test_runtime_client::{self, runtime::{Extrinsic, Transfer}, AccountKeyring}; +//! # use substrate_test_runtime_client::{ +//! # runtime::{Extrinsic, Transfer}, AccountKeyring, +//! # DefaultTestClientBuilderExt, TestClientBuilderExt, +//! # }; //! # use sc_transaction_pool::{BasicPool, FullChainApi}; //! # let client = Arc::new(substrate_test_runtime_client::new()); //! # let txpool = Arc::new(BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone())), None).0); diff --git a/client/consensus/manual-seal/src/lib.rs b/client/consensus/manual-seal/src/lib.rs index 26f493d5d220c..a5366148a7bbd 100644 --- a/client/consensus/manual-seal/src/lib.rs +++ b/client/consensus/manual-seal/src/lib.rs @@ -98,7 +98,7 @@ pub async fn run_manual_seal( inherent_data_providers: InherentDataProviders, ) where - A: txpool::ChainApi::Hash> + 'static, + A: txpool::ChainApi + 'static, B: BlockT + 'static, C: HeaderBackend + Finalizer + 'static, CB: ClientBackend + 'static, @@ -158,7 +158,7 @@ pub async fn run_instant_seal( inherent_data_providers: InherentDataProviders, ) where - A: txpool::ChainApi::Hash> + 'static, + A: txpool::ChainApi + 'static, B: BlockT + 'static, C: HeaderBackend + Finalizer + 'static, CB: ClientBackend + 'static, @@ -417,7 +417,7 @@ mod tests { id: BlockId::Number(1), header: client.header(&BlockId::Number(1)).expect("db error").expect("imported above"), is_new_best: true, - retracted: vec![], + tree_route: None, }).await; let (tx1, rx1) = futures::channel::oneshot::channel(); diff --git a/client/consensus/manual-seal/src/seal_new_block.rs b/client/consensus/manual-seal/src/seal_new_block.rs index a608c978e6edb..c5aea11ced316 100644 --- a/client/consensus/manual-seal/src/seal_new_block.rs +++ b/client/consensus/manual-seal/src/seal_new_block.rs @@ -87,7 +87,7 @@ pub async fn seal_new_block( E: Environment, >::Error: std::fmt::Display, >::Error: std::fmt::Display, - P: txpool::ChainApi::Hash>, + P: txpool::ChainApi, SC: SelectChain, { let future = async { diff --git a/client/finality-grandpa/src/until_imported.rs b/client/finality-grandpa/src/until_imported.rs index 6a39c2637eb7b..3ac94f3b062f0 100644 --- a/client/finality-grandpa/src/until_imported.rs +++ b/client/finality-grandpa/src/until_imported.rs @@ -585,7 +585,7 @@ mod tests { origin: BlockOrigin::File, header, is_new_best: false, - retracted: vec![], + tree_route: None, }).unwrap(); } } diff --git a/client/offchain/src/lib.rs b/client/offchain/src/lib.rs index 332e9f779a8e2..d6e62501b6bd8 100644 --- a/client/offchain/src/lib.rs +++ b/client/offchain/src/lib.rs @@ -166,7 +166,7 @@ mod tests { use super::*; use std::sync::Arc; use sc_network::{Multiaddr, PeerId}; - use substrate_test_runtime_client::runtime::Block; + use substrate_test_runtime_client::{TestClient, runtime::Block}; use sc_transaction_pool::{BasicPool, FullChainApi}; use sp_transaction_pool::{TransactionPool, InPoolTransaction}; use sc_client_api::ExecutorProvider; @@ -183,7 +183,9 @@ mod tests { } } - struct TestPool(BasicPool, Block>); + struct TestPool( + BasicPool, Block> + ); impl sp_transaction_pool::OffchainSubmitTransaction for TestPool { fn submit_at( @@ -200,8 +202,8 @@ mod tests { #[test] fn should_call_into_runtime_and_produce_extrinsic() { - // given let _ = env_logger::try_init(); + let client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new(TestPool(BasicPool::new( Default::default(), diff --git a/client/rpc/src/author/tests.rs b/client/rpc/src/author/tests.rs index 8c1b82028bd79..d70a2ce2aff99 100644 --- a/client/rpc/src/author/tests.rs +++ b/client/rpc/src/author/tests.rs @@ -58,11 +58,9 @@ struct TestSetup { impl Default for TestSetup { fn default() -> Self { let keystore = KeyStore::new(); - let client = Arc::new( - substrate_test_runtime_client::TestClientBuilder::new() - .set_keystore(keystore.clone()) - .build() - ); + let client_builder = substrate_test_runtime_client::TestClientBuilder::new(); + let client = Arc::new(client_builder.set_keystore(keystore.clone()).build()); + let pool = Arc::new(BasicPool::new( Default::default(), Arc::new(FullChainApi::new(client.clone())), diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index baf1c2e0cceff..c2af1a129bd5f 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -16,16 +16,17 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm}; -use crate::{start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle}; -use crate::status_sinks; -use crate::config::{Configuration, KeystoreConfig, PrometheusConfig, OffchainWorkerConfig}; -use crate::metrics::MetricsService; +use crate::{ + Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm, + start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle, + status_sinks, metrics::MetricsService, client::{Client, ClientConfig}, + config::{Configuration, KeystoreConfig, PrometheusConfig, OffchainWorkerConfig}, +}; use sc_client_api::{ - self, BlockchainEvents, backend::RemoteBackend, light::RemoteBlockchain, execution_extensions::ExtensionsFactory, - ExecutorProvider, CallExecutor, ForkBlocks, BadBlocks, CloneableSpawn, UsageProvider, + BlockchainEvents, backend::RemoteBackend, light::RemoteBlockchain, + execution_extensions::ExtensionsFactory, ExecutorProvider, CallExecutor, ForkBlocks, BadBlocks, + CloneableSpawn, UsageProvider, }; -use crate::client::{Client, ClientConfig}; use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; use sc_chain_spec::get_extension; use sp_consensus::{ @@ -55,7 +56,6 @@ use std::{ use wasm_timer::SystemTime; use sc_telemetry::{telemetry, SUBSTRATE_INFO}; use sp_transaction_pool::{MaintainedTransactionPool, ChainEvent}; -use sp_blockchain; use prometheus_endpoint::Registry; use sc_client_db::{Backend, DatabaseSettings}; use sp_core::traits::CodeExecutor; @@ -452,8 +452,29 @@ impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> { } impl - ServiceBuilder { + ServiceBuilder< + TBl, + TRtApi, + TCl, + TFchr, + TSc, + TImpQu, + TFprb, + TFpp, + TExPool, + TRpc, + Backend + > +{ + /// Returns a reference to the configuration that was stored in this builder. + pub fn config(&self) -> &Configuration { + &self.config + } + + /// Returns a reference to the optional prometheus registry that was stored in this builder. + pub fn prometheus_registry(&self) -> Option<&Registry> { + self.config.prometheus_config.as_ref().map(|config| &config.registry) + } /// Returns a reference to the client that was stored in this builder. pub fn client(&self) -> &Arc { @@ -698,20 +719,12 @@ impl pub fn with_transaction_pool( self, transaction_pool_builder: impl FnOnce( - sc_transaction_pool::txpool::Options, - Arc, - Option, - Option<&Registry>, - ) -> Result<(UExPool, Option), Error> + &Self, + ) -> Result<(UExPool, Option), Error>, ) -> Result, Error> where TSc: Clone, TFchr: Clone { - let (transaction_pool, background_task) = transaction_pool_builder( - self.config.transaction_pool.clone(), - self.client.clone(), - self.fetcher.clone(), - self.config.prometheus_config.as_ref().map(|config| &config.registry), - )?; + let (transaction_pool, background_task) = transaction_pool_builder(&self)?; if let Some(background_task) = background_task{ self.task_manager.spawn_handle().spawn("txpool-background", background_task); @@ -1032,7 +1045,7 @@ ServiceBuilder< let mut import_stream = client.import_notification_stream().map(|n| ChainEvent::NewBlock { id: BlockId::Hash(n.hash), header: n.header, - retracted: n.retracted, + tree_route: n.tree_route, is_new_best: n.is_new_best, }).fuse(); let mut finality_stream = client.finality_notification_stream() @@ -1349,7 +1362,7 @@ ServiceBuilder< _telemetry_on_connect_sinks: telemetry_connection_sinks.clone(), keystore, marker: PhantomData::, - prometheus_registry: config.prometheus_config.map(|config| config.registry) + prometheus_registry: config.prometheus_config.map(|config| config.registry), }) } } diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 77b3f065f43dd..fcbaab885134c 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -787,15 +787,15 @@ impl Client where NewBlockState::Normal }; - let retracted = if is_new_best { + let tree_route = if is_new_best { let route_from_best = sp_blockchain::tree_route( self.backend.blockchain(), info.best_hash, parent_hash, )?; - route_from_best.retracted().iter().rev().map(|e| e.hash.clone()).collect() + Some(route_from_best) } else { - Vec::default() + None }; trace!( @@ -826,7 +826,7 @@ impl Client where header: import_headers.into_post(), is_new_best, storage_changes, - retracted, + tree_route, }) } @@ -1048,7 +1048,7 @@ impl Client where origin: notify_import.origin, header: notify_import.header, is_new_best: notify_import.is_new_best, - retracted: notify_import.retracted, + tree_route: notify_import.tree_route.map(Arc::new), }; self.import_notification_sinks.lock() diff --git a/client/transaction-pool/graph/benches/basics.rs b/client/transaction-pool/graph/benches/basics.rs index ee92b60d548e6..bb10086bd4a55 100644 --- a/client/transaction-pool/graph/benches/basics.rs +++ b/client/transaction-pool/graph/benches/basics.rs @@ -51,7 +51,6 @@ fn to_tag(nonce: u64, from: AccountId) -> Tag { impl ChainApi for TestApi { type Block = Block; - type Hash = H256; type Error = sp_transaction_pool::error::Error; type ValidationFuture = Ready>; type BodyFuture = Ready>>>; @@ -107,7 +106,7 @@ impl ChainApi for TestApi { }) } - fn hash_and_length(&self, uxt: &ExtrinsicFor) -> (Self::Hash, usize) { + fn hash_and_length(&self, uxt: &ExtrinsicFor) -> (H256, usize) { let encoded = uxt.encode(); (blake2_256(&encoded).into(), encoded.len()) } diff --git a/client/transaction-pool/graph/src/lib.rs b/client/transaction-pool/graph/src/lib.rs index 04e5d0d3fbe9f..b4646c6055bf6 100644 --- a/client/transaction-pool/graph/src/lib.rs +++ b/client/transaction-pool/graph/src/lib.rs @@ -38,8 +38,6 @@ pub mod watcher; pub use self::base_pool::Transaction; pub use self::pool::{ - Pool, - Options, ChainApi, EventStream, ExtrinsicFor, - BlockHash, ExHash, NumberFor, TransactionFor, - ValidatedTransaction, + Pool, Options, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, + BlockHash, NumberFor, TransactionFor, ValidatedTransaction, }; diff --git a/client/transaction-pool/graph/src/listener.rs b/client/transaction-pool/graph/src/listener.rs index 2923f2b34a8d0..1bc3720fa6b85 100644 --- a/client/transaction-pool/graph/src/listener.rs +++ b/client/transaction-pool/graph/src/listener.rs @@ -22,14 +22,14 @@ use std::{ }; use linked_hash_map::LinkedHashMap; use serde::Serialize; -use crate::{watcher, ChainApi, BlockHash}; +use crate::{watcher, ChainApi, ExtrinsicHash, BlockHash}; use log::{debug, trace, warn}; use sp_runtime::traits; /// Extrinsic pool default listener. -pub struct Listener { - watchers: HashMap>>, - finality_watchers: LinkedHashMap, Vec>, +pub struct Listener { + watchers: HashMap>>, + finality_watchers: LinkedHashMap, Vec>, } /// Maximum number of blocks awaiting finality at any time. @@ -45,7 +45,7 @@ impl Default for Listener { } impl Listener { - fn fire(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender>) { + fn fire(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender>) { let clean = if let Some(h) = self.watchers.get_mut(hash) { fun(h); h.is_done() @@ -61,7 +61,7 @@ impl Listener { /// Creates a new watcher for given verified extrinsic. /// /// The watcher can be used to subscribe to life-cycle events of that extrinsic. - pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher> { + pub fn create_watcher(&mut self, hash: H) -> watcher::Watcher> { let sender = self.watchers.entry(hash.clone()).or_insert_with(watcher::Sender::default); sender.new_watcher(hash) } diff --git a/client/transaction-pool/graph/src/pool.rs b/client/transaction-pool/graph/src/pool.rs index 4f41e9110915e..e4d81c38ae38e 100644 --- a/client/transaction-pool/graph/src/pool.rs +++ b/client/transaction-pool/graph/src/pool.rs @@ -17,19 +17,16 @@ // along with this program. If not, see . use std::{ - hash, collections::HashMap, sync::Arc, }; -use crate::base_pool as base; -use crate::watcher::Watcher; -use serde::Serialize; +use crate::{base_pool as base, watcher::Watcher}; use futures::{Future, FutureExt}; use sp_runtime::{ generic::BlockId, - traits::{self, SaturatedConversion}, + traits::{self, SaturatedConversion, Block as BlockT}, transaction_validity::{ TransactionValidity, TransactionTag as Tag, TransactionValidityError, TransactionSource, }, @@ -44,19 +41,19 @@ pub use crate::validated_pool::ValidatedTransaction; /// Modification notification event stream type; pub type EventStream = TracingUnboundedReceiver; -/// Extrinsic hash type for a pool. -pub type ExHash = ::Hash; /// Block hash type for a pool. pub type BlockHash = <::Block as traits::Block>::Hash; +/// Extrinsic hash type for a pool. +pub type ExtrinsicHash = <::Block as traits::Block>::Hash; /// Extrinsic type for a pool. pub type ExtrinsicFor = <::Block as traits::Block>::Extrinsic; /// Block number type for the ChainApi pub type NumberFor = traits::NumberFor<::Block>; /// A type of transaction stored in the pool -pub type TransactionFor = Arc, ExtrinsicFor>>; +pub type TransactionFor = Arc, ExtrinsicFor>>; /// A type of validated transaction stored in the pool. pub type ValidatedTransactionFor = ValidatedTransaction< - ExHash, + ExtrinsicHash, ExtrinsicFor, ::Error, >; @@ -64,15 +61,15 @@ pub type ValidatedTransactionFor = ValidatedTransaction< /// Concrete extrinsic validation and query logic. pub trait ChainApi: Send + Sync { /// Block type. - type Block: traits::Block; - /// Transaction Hash type - type Hash: hash::Hash + Eq + traits::Member + Serialize; + type Block: BlockT; /// Error type. type Error: From + error::IntoPoolError; /// Validate transaction future. type ValidationFuture: Future> + Send + Unpin; /// Body future (since block body might be remote) - type BodyFuture: Future::Extrinsic>>, Self::Error>> + Unpin + Send + 'static; + type BodyFuture: Future< + Output = Result::Extrinsic>>, Self::Error> + > + Unpin + Send + 'static; /// Verify extrinsic at given block. fn validate_transaction( @@ -83,13 +80,19 @@ pub trait ChainApi: Send + Sync { ) -> Self::ValidationFuture; /// Returns a block number given the block id. - fn block_id_to_number(&self, at: &BlockId) -> Result>, Self::Error>; + fn block_id_to_number( + &self, + at: &BlockId, + ) -> Result>, Self::Error>; /// Returns a block hash given the block id. - fn block_id_to_hash(&self, at: &BlockId) -> Result>, Self::Error>; + fn block_id_to_hash( + &self, + at: &BlockId, + ) -> Result::Hash>, Self::Error>; /// Returns hash and encoding length of the extrinsic. - fn hash_and_length(&self, uxt: &ExtrinsicFor) -> (Self::Hash, usize); + fn hash_and_length(&self, uxt: &ExtrinsicFor) -> (ExtrinsicHash, usize); /// Returns a block body given the block id. fn block_body(&self, at: &BlockId) -> Self::BodyFuture; @@ -130,7 +133,6 @@ pub struct Pool { #[cfg(not(target_os = "unknown"))] impl parity_util_mem::MallocSizeOf for Pool where - B::Hash: parity_util_mem::MallocSizeOf, ExtrinsicFor: parity_util_mem::MallocSizeOf, { fn size_of(&self, ops: &mut parity_util_mem::MallocSizeOfOps) -> usize { @@ -153,7 +155,7 @@ impl Pool { source: TransactionSource, xts: T, force: bool, - ) -> Result, B::Error>>, B::Error> where + ) -> Result, B::Error>>, B::Error> where T: IntoIterator>, { let validated_pool = self.validated_pool.clone(); @@ -172,7 +174,7 @@ impl Pool { at: &BlockId, source: TransactionSource, xt: ExtrinsicFor, - ) -> Result, B::Error> { + ) -> Result, B::Error> { self.submit_at(at, source, std::iter::once(xt), false) .map(|import_result| import_result.and_then(|mut import_result| import_result .pop() @@ -187,7 +189,7 @@ impl Pool { at: &BlockId, source: TransactionSource, xt: ExtrinsicFor, - ) -> Result, BlockHash>, B::Error> { + ) -> Result, ExtrinsicHash>, B::Error> { let block_number = self.resolve_block_number(at)?; let (_, tx) = self.verify_one( at, block_number, source, xt, false @@ -198,7 +200,7 @@ impl Pool { /// Resubmit some transaction that were validated elsewhere. pub fn resubmit( &self, - revalidated_transactions: HashMap, ValidatedTransactionFor>, + revalidated_transactions: HashMap, ValidatedTransactionFor>, ) { let now = Instant::now(); @@ -215,7 +217,11 @@ impl Pool { /// Used to clear the pool from transactions that were part of recently imported block. /// The main difference from the `prune` is that we do not revalidate any transactions /// and ignore unknown passed hashes. - pub fn prune_known(&self, at: &BlockId, hashes: &[ExHash]) -> Result<(), B::Error> { + pub fn prune_known( + &self, + at: &BlockId, + hashes: &[ExtrinsicHash], + ) -> Result<(), B::Error> { // Get details of all extrinsics that are already in the pool let in_pool_tags = self.validated_pool.extrinsics_tags(hashes) .into_iter().filter_map(|x| x).flat_map(|x| x); @@ -299,7 +305,7 @@ impl Pool { &self, at: &BlockId, tags: impl IntoIterator, - known_imported_hashes: impl IntoIterator> + Clone, + known_imported_hashes: impl IntoIterator> + Clone, ) -> Result<(), B::Error> { log::debug!(target: "txpool", "Pruning at {:?}", at); // Prune all transactions that provide given tags @@ -336,7 +342,7 @@ impl Pool { } /// Returns transaction hash - pub fn hash_of(&self, xt: &ExtrinsicFor) -> ExHash { + pub fn hash_of(&self, xt: &ExtrinsicFor) -> ExtrinsicHash { self.validated_pool.api().hash_and_length(xt).0 } @@ -353,7 +359,7 @@ impl Pool { at: &BlockId, xts: impl IntoIterator)>, force: bool, - ) -> Result, ValidatedTransactionFor>, B::Error> { + ) -> Result, ValidatedTransactionFor>, B::Error> { // we need a block number to compute tx validity let block_number = self.resolve_block_number(at)?; let mut result = HashMap::new(); @@ -379,7 +385,7 @@ impl Pool { source: TransactionSource, xt: ExtrinsicFor, force: bool, - ) -> (ExHash, ValidatedTransactionFor) { + ) -> (ExtrinsicHash, ValidatedTransactionFor) { let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt); if !force && self.validated_pool.is_banned(&hash) { return ( @@ -444,9 +450,12 @@ mod tests { use futures::executor::block_on; use super::*; use sp_transaction_pool::TransactionStatus; - use sp_runtime::transaction_validity::{ValidTransaction, InvalidTransaction, TransactionSource}; + use sp_runtime::{ + traits::Hash, + transaction_validity::{ValidTransaction, InvalidTransaction, TransactionSource}, + }; use codec::Encode; - use substrate_test_runtime::{Block, Extrinsic, Transfer, H256, AccountId}; + use substrate_test_runtime::{Block, Extrinsic, Transfer, H256, AccountId, Hashing}; use assert_matches::assert_matches; use wasm_timer::Instant; use crate::base_pool::Limit; @@ -457,14 +466,13 @@ mod tests { #[derive(Clone, Debug, Default)] struct TestApi { delay: Arc>>>, - invalidate: Arc>>, - clear_requirements: Arc>>, - add_requirements: Arc>>, + invalidate: Arc>>, + clear_requirements: Arc>>, + add_requirements: Arc>>, } impl ChainApi for TestApi { type Block = Block; - type Hash = u64; type Error = error::Error; type ValidationFuture = futures::future::Ready>; type BodyFuture = futures::future::Ready>>>; @@ -518,7 +526,10 @@ mod tests { } /// Returns a block number given the block id. - fn block_id_to_number(&self, at: &BlockId) -> Result>, Self::Error> { + fn block_id_to_number( + &self, + at: &BlockId, + ) -> Result>, Self::Error> { Ok(match at { BlockId::Number(num) => Some(*num), BlockId::Hash(_) => None, @@ -526,7 +537,10 @@ mod tests { } /// Returns a block hash given the block id. - fn block_id_to_hash(&self, at: &BlockId) -> Result>, Self::Error> { + fn block_id_to_hash( + &self, + at: &BlockId, + ) -> Result::Hash>, Self::Error> { Ok(match at { BlockId::Number(num) => Some(H256::from_low_u64_be(*num)).into(), BlockId::Hash(_) => None, @@ -534,12 +548,10 @@ mod tests { } /// Hash the extrinsic. - fn hash_and_length(&self, uxt: &ExtrinsicFor) -> (Self::Hash, usize) { - let len = uxt.encode().len(); - ( - (H256::from(uxt.transfer().from.clone()).to_low_u64_be() << 5) + uxt.transfer().nonce, - len - ) + fn hash_and_length(&self, uxt: &ExtrinsicFor) -> (BlockHash, usize) { + let encoded = uxt.encode(); + let len = encoded.len(); + (Hashing::hash(&encoded), len) } fn block_body(&self, _id: &BlockId) -> Self::BodyFuture { @@ -599,19 +611,19 @@ mod tests { #[test] fn should_notify_about_pool_events() { - let stream = { + let (stream, hash0, hash1) = { // given let pool = pool(); let stream = pool.validated_pool().import_notification_stream(); // when - let _hash = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer { + let hash0 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, nonce: 0, }))).unwrap(); - let _hash = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer { + let hash1 = block_on(pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Transfer { from: AccountId::from_h256(H256::from_low_u64_be(1)), to: AccountId::from_h256(H256::from_low_u64_be(2)), amount: 5, @@ -627,13 +639,14 @@ mod tests { assert_eq!(pool.validated_pool().status().ready, 2); assert_eq!(pool.validated_pool().status().future, 1); - stream + + (stream, hash0, hash1) }; // then let mut it = futures::executor::block_on_stream(stream); - assert_eq!(it.next(), Some(32)); - assert_eq!(it.next(), Some(33)); + assert_eq!(it.next(), Some(hash0)); + assert_eq!(it.next(), Some(hash1)); assert_eq!(it.next(), None); } @@ -795,7 +808,10 @@ mod tests { // then let mut stream = futures::executor::block_on_stream(watcher.into_stream()); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); - assert_eq!(stream.next(), Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into()))); + assert_eq!( + stream.next(), + Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())), + ); } #[test] @@ -812,14 +828,19 @@ mod tests { assert_eq!(pool.validated_pool().status().future, 0); // when - block_on(pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![2u64])).unwrap(); + block_on( + pool.prune_tags(&BlockId::Number(2), vec![vec![0u8]], vec![watcher.hash().clone()]), + ).unwrap(); assert_eq!(pool.validated_pool().status().ready, 0); assert_eq!(pool.validated_pool().status().future, 0); // then let mut stream = futures::executor::block_on_stream(watcher.into_stream()); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); - assert_eq!(stream.next(), Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into()))); + assert_eq!( + stream.next(), + Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())), + ); } #[test] diff --git a/client/transaction-pool/graph/src/validated_pool.rs b/client/transaction-pool/graph/src/validated_pool.rs index 9ab45e3b263c0..d730b892e3502 100644 --- a/client/transaction-pool/graph/src/validated_pool.rs +++ b/client/transaction-pool/graph/src/validated_pool.rs @@ -22,7 +22,7 @@ use std::{ sync::Arc, }; -use crate::{base_pool as base, BlockHash}; +use crate::base_pool as base; use crate::listener::Listener; use crate::rotator::PoolRotator; use crate::watcher::Watcher; @@ -39,7 +39,9 @@ use wasm_timer::Instant; use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; use crate::base_pool::PruneStatus; -use crate::pool::{EventStream, Options, ChainApi, ExHash, ExtrinsicFor, TransactionFor}; +use crate::pool::{ + EventStream, Options, ChainApi, BlockHash, ExtrinsicHash, ExtrinsicFor, TransactionFor, +}; /// Pre-validated transaction. Validated pool only accepts transactions wrapped in this enum. #[derive(Debug)] @@ -82,7 +84,7 @@ impl ValidatedTransaction { /// A type of validated transaction stored in the pool. pub type ValidatedTransactionFor = ValidatedTransaction< - ExHash, + ExtrinsicHash, ExtrinsicFor, ::Error, >; @@ -91,19 +93,18 @@ pub type ValidatedTransactionFor = ValidatedTransaction< pub struct ValidatedPool { api: Arc, options: Options, - listener: RwLock, B>>, + listener: RwLock, B>>, pool: RwLock, + ExtrinsicHash, ExtrinsicFor, >>, - import_notification_sinks: Mutex>>>, - rotator: PoolRotator>, + import_notification_sinks: Mutex>>>, + rotator: PoolRotator>, } #[cfg(not(target_os = "unknown"))] impl parity_util_mem::MallocSizeOf for ValidatedPool where - B::Hash: parity_util_mem::MallocSizeOf, ExtrinsicFor: parity_util_mem::MallocSizeOf, { fn size_of(&self, ops: &mut parity_util_mem::MallocSizeOfOps) -> usize { @@ -127,17 +128,17 @@ impl ValidatedPool { } /// Bans given set of hashes. - pub fn ban(&self, now: &Instant, hashes: impl IntoIterator>) { + pub fn ban(&self, now: &Instant, hashes: impl IntoIterator>) { self.rotator.ban(now, hashes) } /// Returns true if transaction with given hash is currently banned from the pool. - pub fn is_banned(&self, hash: &ExHash) -> bool { + pub fn is_banned(&self, hash: &ExtrinsicHash) -> bool { self.rotator.is_banned(hash) } /// Imports a bunch of pre-validated transactions to the pool. - pub fn submit(&self, txs: T) -> Vec, B::Error>> where + pub fn submit(&self, txs: T) -> Vec, B::Error>> where T: IntoIterator> { let results = txs.into_iter() @@ -158,7 +159,7 @@ impl ValidatedPool { } /// Submit single pre-validated transaction to the pool. - fn submit_one(&self, tx: ValidatedTransactionFor) -> Result, B::Error> { + fn submit_one(&self, tx: ValidatedTransactionFor) -> Result, B::Error> { match tx { ValidatedTransaction::Valid(tx) => { let imported = self.pool.write().import(tx)?; @@ -183,7 +184,7 @@ impl ValidatedPool { } } - fn enforce_limits(&self) -> HashSet> { + fn enforce_limits(&self) -> HashSet> { let status = self.pool.read().status(); let ready_limit = &self.options.ready; let future_limit = &self.options.future; @@ -228,7 +229,7 @@ impl ValidatedPool { pub fn submit_and_watch( &self, tx: ValidatedTransactionFor, - ) -> Result, BlockHash>, B::Error> { + ) -> Result, ExtrinsicHash>, B::Error> { match tx { ValidatedTransaction::Valid(tx) => { let hash = self.api.hash_and_length(&tx.data).0; @@ -250,7 +251,7 @@ impl ValidatedPool { /// /// Removes and then submits passed transactions and all dependent transactions. /// Transactions that are missing from the pool are not submitted. - pub fn resubmit(&self, mut updated_transactions: HashMap, ValidatedTransactionFor>) { + pub fn resubmit(&self, mut updated_transactions: HashMap, ValidatedTransactionFor>) { #[derive(Debug, Clone, Copy, PartialEq)] enum Status { Future, Ready, Failed, Dropped }; @@ -369,7 +370,7 @@ impl ValidatedPool { } /// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown). - pub fn extrinsics_tags(&self, hashes: &[ExHash]) -> Vec>> { + pub fn extrinsics_tags(&self, hashes: &[ExtrinsicHash]) -> Vec>> { self.pool.read().by_hashes(&hashes) .into_iter() .map(|existing_in_pool| existing_in_pool @@ -378,7 +379,7 @@ impl ValidatedPool { } /// Get ready transaction by hash - pub fn ready_by_hash(&self, hash: &ExHash) -> Option> { + pub fn ready_by_hash(&self, hash: &ExtrinsicHash) -> Option> { self.pool.read().ready_by_hash(hash) } @@ -386,7 +387,7 @@ impl ValidatedPool { pub fn prune_tags( &self, tags: impl IntoIterator, - ) -> Result, ExtrinsicFor>, B::Error> { + ) -> Result, ExtrinsicFor>, B::Error> { // Perform tag-based pruning in the base pool let status = self.pool.write().prune_tags(tags); // Notify event listeners of all transactions @@ -408,8 +409,8 @@ impl ValidatedPool { pub fn resubmit_pruned( &self, at: &BlockId, - known_imported_hashes: impl IntoIterator> + Clone, - pruned_hashes: Vec>, + known_imported_hashes: impl IntoIterator> + Clone, + pruned_hashes: Vec>, pruned_xts: Vec>, ) -> Result<(), B::Error> { debug_assert_eq!(pruned_hashes.len(), pruned_xts.len()); @@ -440,7 +441,7 @@ impl ValidatedPool { pub fn fire_pruned( &self, at: &BlockId, - hashes: impl Iterator>, + hashes: impl Iterator>, ) -> Result<(), B::Error> { let header_hash = self.api.block_id_to_hash(at)? .ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?; @@ -473,7 +474,7 @@ impl ValidatedPool { .map(|tx| tx.hash.clone()) .collect::>() }; - let futures_to_remove: Vec> = { + let futures_to_remove: Vec> = { let p = self.pool.read(); let mut hashes = Vec::new(); for tx in p.futures() { @@ -494,7 +495,7 @@ impl ValidatedPool { /// Get rotator reference. #[cfg(test)] - pub fn rotator(&self) -> &PoolRotator> { + pub fn rotator(&self) -> &PoolRotator> { &self.rotator } @@ -507,14 +508,14 @@ impl ValidatedPool { /// /// Consumers of this stream should use the `ready` method to actually get the /// pending transactions in the right order. - pub fn import_notification_stream(&self) -> EventStream> { + pub fn import_notification_stream(&self) -> EventStream> { let (sink, stream) = tracing_unbounded("mpsc_import_notifications"); self.import_notification_sinks.lock().push(sink); stream } /// Invoked when extrinsics are broadcasted. - pub fn on_broadcasted(&self, propagated: HashMap, Vec>) { + pub fn on_broadcasted(&self, propagated: HashMap, Vec>) { let mut listener = self.listener.write(); for (hash, peers) in propagated.into_iter() { listener.broadcasted(&hash, peers); @@ -527,7 +528,7 @@ impl ValidatedPool { /// to prevent them from entering the pool right away. /// Note this is not the case for the dependent transactions - those may /// still be valid so we want to be able to re-import them. - pub fn remove_invalid(&self, hashes: &[ExHash]) -> Vec> { + pub fn remove_invalid(&self, hashes: &[ExtrinsicHash]) -> Vec> { // early exit in case there is no invalid transactions. if hashes.is_empty() { return vec![]; diff --git a/client/transaction-pool/src/api.rs b/client/transaction-pool/src/api.rs index 725fb6ec4a8bf..79315c2724b5b 100644 --- a/client/transaction-pool/src/api.rs +++ b/client/transaction-pool/src/api.rs @@ -25,9 +25,7 @@ use futures::{ }; use sc_client_api::{ - blockchain::HeaderBackend, - light::{Fetcher, RemoteCallRequest, RemoteBodyRequest}, - BlockBackend, + blockchain::HeaderBackend, light::{Fetcher, RemoteCallRequest, RemoteBodyRequest}, BlockBackend, }; use sp_runtime::{ generic::BlockId, traits::{self, Block as BlockT, BlockIdTo, Header as HeaderT, Hash as HashT}, @@ -45,10 +43,7 @@ pub struct FullChainApi { _marker: PhantomData, } -impl FullChainApi where - Block: BlockT, - Client: ProvideRuntimeApi + BlockIdTo, -{ +impl FullChainApi { /// Create new transaction pool logic. pub fn new(client: Arc) -> Self { FullChainApi { @@ -58,12 +53,13 @@ impl FullChainApi where .name_prefix("txpool-verifier") .create() .expect("Failed to spawn verifier threads, that are critical for node operation."), - _marker: Default::default() + _marker: Default::default(), } } } -impl sc_transaction_graph::ChainApi for FullChainApi where +impl sc_transaction_graph::ChainApi for FullChainApi +where Block: BlockT, Client: ProvideRuntimeApi + BlockBackend + BlockIdTo, Client: Send + Sync + 'static, @@ -71,9 +67,10 @@ impl sc_transaction_graph::ChainApi for FullChainApi: Send, { type Block = Block; - type Hash = Block::Hash; type Error = error::Error; - type ValidationFuture = Pin> + Send>>; + type ValidationFuture = Pin< + Box> + Send> + >; type BodyFuture = Ready::Extrinsic>>>>; fn block_body(&self, id: &BlockId) -> Self::BodyFuture { @@ -136,7 +133,10 @@ impl sc_transaction_graph::ChainApi for FullChainApi) -> (Self::Hash, usize) { + fn hash_and_length( + &self, + ex: &sc_transaction_graph::ExtrinsicFor, + ) -> (sc_transaction_graph::ExtrinsicHash, usize) { ex.using_encoded(|x| { ( as traits::Hash>::hash(x), x.len()) }) @@ -150,11 +150,7 @@ pub struct LightChainApi { _phantom: PhantomData, } -impl LightChainApi where - Block: BlockT, - Client: HeaderBackend, - F: Fetcher, -{ +impl LightChainApi { /// Create new transaction pool logic. pub fn new(client: Arc, fetcher: Arc) -> Self { LightChainApi { @@ -165,16 +161,23 @@ impl LightChainApi where } } -impl sc_transaction_graph::ChainApi for LightChainApi where - Block: BlockT, - Client: HeaderBackend + 'static, - F: Fetcher + 'static, +impl sc_transaction_graph::ChainApi for + LightChainApi where + Block: BlockT, + Client: HeaderBackend + 'static, + F: Fetcher + 'static, { type Block = Block; - type Hash = Block::Hash; type Error = error::Error; - type ValidationFuture = Box> + Send + Unpin>; - type BodyFuture = Pin::Extrinsic>>>> + Send>>; + type ValidationFuture = Box< + dyn Future> + Send + Unpin + >; + type BodyFuture = Pin< + Box< + dyn Future::Extrinsic>>>> + + Send + > + >; fn validate_transaction( &self, @@ -211,15 +214,24 @@ impl sc_transaction_graph::ChainApi for LightChainApi) -> error::Result>> { + fn block_id_to_number( + &self, + at: &BlockId, + ) -> error::Result>> { Ok(self.client.block_number_from_id(at)?) } - fn block_id_to_hash(&self, at: &BlockId) -> error::Result>> { + fn block_id_to_hash( + &self, + at: &BlockId, + ) -> error::Result>> { Ok(self.client.block_hash_from_id(at)?) } - fn hash_and_length(&self, ex: &sc_transaction_graph::ExtrinsicFor) -> (Self::Hash, usize) { + fn hash_and_length( + &self, + ex: &sc_transaction_graph::ExtrinsicFor, + ) -> (sc_transaction_graph::ExtrinsicHash, usize) { ex.using_encoded(|x| { (<::Hashing as HashT>::hash(x), x.len()) }) diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 05d7189a04a0d..326c5e1a751c3 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -35,7 +35,7 @@ pub use sc_transaction_graph as txpool; pub use crate::api::{FullChainApi, LightChainApi}; use std::{collections::HashMap, sync::Arc, pin::Pin}; -use futures::{prelude::*, future::ready, channel::oneshot}; +use futures::{prelude::*, future::{ready, self}, channel::oneshot}; use parking_lot::Mutex; use sp_runtime::{ @@ -47,14 +47,19 @@ use sp_transaction_pool::{ TransactionStatusStreamFor, MaintainedTransactionPool, PoolFuture, ChainEvent, TransactionSource, }; +use sc_transaction_graph::ChainApi; use wasm_timer::Instant; use prometheus_endpoint::Registry as PrometheusRegistry; use crate::metrics::MetricsLink as PrometheusMetrics; -type BoxedReadyIterator = Box>> + Send>; +type BoxedReadyIterator = Box< + dyn Iterator>> + Send +>; -type ReadyIteratorFor = BoxedReadyIterator, sc_transaction_graph::ExtrinsicFor>; +type ReadyIteratorFor = BoxedReadyIterator< + sc_transaction_graph::ExtrinsicHash, sc_transaction_graph::ExtrinsicFor +>; type PolledIterator = Pin> + Send>>; @@ -62,7 +67,7 @@ type PolledIterator = Pin where Block: BlockT, - PoolApi: sc_transaction_graph::ChainApi, + PoolApi: ChainApi, { pool: Arc>, api: Arc, @@ -116,8 +121,7 @@ impl ReadyPoll { #[cfg(not(target_os = "unknown"))] impl parity_util_mem::MallocSizeOf for BasicPool where - PoolApi: sc_transaction_graph::ChainApi, - PoolApi::Hash: parity_util_mem::MallocSizeOf, + PoolApi: ChainApi, Block: BlockT, { fn size_of(&self, ops: &mut parity_util_mem::MallocSizeOfOps) -> usize { @@ -146,7 +150,7 @@ pub enum RevalidationType { impl BasicPool where Block: BlockT, - PoolApi: sc_transaction_graph::ChainApi + 'static, + PoolApi: ChainApi + 'static, { /// Create new basic transaction pool with provided api. /// @@ -226,11 +230,13 @@ impl BasicPool impl TransactionPool for BasicPool where Block: BlockT, - PoolApi: 'static + sc_transaction_graph::ChainApi, + PoolApi: 'static + ChainApi, { type Block = PoolApi::Block; - type Hash = sc_transaction_graph::ExHash; - type InPoolTransaction = sc_transaction_graph::base_pool::Transaction, TransactionFor>; + type Hash = sc_transaction_graph::ExtrinsicHash; + type InPoolTransaction = sc_transaction_graph::base_pool::Transaction< + TxHash, TransactionFor + >; type Error = PoolApi::Error; fn submit_at( @@ -429,22 +435,51 @@ impl RevalidationStatus { } } +/// Prune the known txs for the given block. +async fn prune_known_txs_for_block>( + block_id: BlockId, + api: &Api, + pool: &sc_transaction_graph::Pool, +) { + // We don't query block if we won't prune anything + if pool.validated_pool().status().is_empty() { + return; + } + + let hashes = api.block_body(&block_id).await + .unwrap_or_else(|e| { + log::warn!("Prune known transactions: error request {:?}!", e); + None + }) + .unwrap_or_default() + .into_iter() + .map(|tx| pool.hash_of(&tx)) + .collect::>(); + + if let Err(e) = pool.prune_known(&block_id, &hashes) { + log::error!("Cannot prune known in the pool {:?}!", e); + } +} + impl MaintainedTransactionPool for BasicPool where Block: BlockT, - PoolApi: 'static + sc_transaction_graph::ChainApi, + PoolApi: 'static + ChainApi, { fn maintain(&self, event: ChainEvent) -> Pin + Send>> { match event { - ChainEvent::NewBlock { id, retracted, .. } => { - let id = id.clone(); + ChainEvent::NewBlock { id, tree_route, is_new_best, .. } => { let pool = self.pool.clone(); let api = self.api.clone(); let block_number = match api.block_id_to_number(&id) { Ok(Some(number)) => number, _ => { - log::trace!(target: "txpool", "Skipping chain event - no number for that block {:?}", id); + log::trace!( + target: "txpool", + "Skipping chain event - no number for that block {:?}", + id, + ); return Box::pin(ready(())); } }; @@ -455,40 +490,40 @@ impl MaintainedTransactionPool for BasicPool Some(20.into()), ); let revalidation_strategy = self.revalidation_strategy.clone(); - let retracted = retracted.clone(); let revalidation_queue = self.revalidation_queue.clone(); let ready_poll = self.ready_poll.clone(); async move { - // We don't query block if we won't prune anything - if !pool.validated_pool().status().is_empty() { - let hashes = api.block_body(&id).await - .unwrap_or_else(|e| { - log::warn!("Prune known transactions: error request {:?}!", e); - None - }) - .unwrap_or_default() - .into_iter() - .map(|tx| pool.hash_of(&tx)) - .collect::>(); - - if let Err(e) = pool.prune_known(&id, &hashes) { - log::error!("Cannot prune known in the pool {:?}!", e); - } + // If there is a tree route, we use this to prune known tx based on the enacted + // blocks and otherwise we only prune known txs if the block is + // the new best block. + if let Some(ref tree_route) = tree_route { + future::join_all( + tree_route + .enacted() + .iter().map(|h| + prune_known_txs_for_block( + BlockId::Hash(h.hash.clone()), + &*api, + &*pool, + ), + ), + ).await; + } else if is_new_best { + prune_known_txs_for_block(id.clone(), &*api, &*pool).await; } - let extra_pool = pool.clone(); - // After #5200 lands, this arguably might be moved to the handler of "all blocks notification". - ready_poll.lock().trigger(block_number, move || Box::new(extra_pool.validated_pool().ready())); - - if next_action.resubmit { + if let (true, Some(tree_route)) = (next_action.resubmit, tree_route) { let mut resubmit_transactions = Vec::new(); - for retracted_hash in retracted { + for retracted in tree_route.retracted() { + let hash = retracted.hash.clone(); + // notify txs awaiting finality that it has been retracted - pool.validated_pool().on_block_retracted(retracted_hash.clone()); + pool.validated_pool().on_block_retracted(hash.clone()); - let block_transactions = api.block_body(&BlockId::hash(retracted_hash.clone())).await + let block_transactions = api.block_body(&BlockId::hash(hash)) + .await .unwrap_or_else(|e| { log::warn!("Failed to fetch block body {:?}!", e); None @@ -499,23 +534,37 @@ impl MaintainedTransactionPool for BasicPool resubmit_transactions.extend(block_transactions); } + if let Err(e) = pool.submit_at( &id, // These transactions are coming from retracted blocks, we should // simply consider them external. TransactionSource::External, resubmit_transactions, - true + true, ).await { log::debug!( target: "txpool", - "[{:?}] Error re-submitting transactions: {:?}", id, e + "[{:?}] Error re-submitting transactions: {:?}", + id, + e, ) } } + let extra_pool = pool.clone(); + // After #5200 lands, this arguably might be moved to the + // handler of "all blocks notification". + ready_poll.lock().trigger( + block_number, + move || Box::new(extra_pool.validated_pool().ready()), + ); + if next_action.revalidate { - let hashes = pool.validated_pool().ready().map(|tx| tx.hash.clone()).collect(); + let hashes = pool.validated_pool() + .ready() + .map(|tx| tx.hash.clone()) + .collect(); revalidation_queue.revalidate_later(block_number, hashes).await; } diff --git a/client/transaction-pool/src/revalidation.rs b/client/transaction-pool/src/revalidation.rs index 423ff92ba4db0..05a2076c6659e 100644 --- a/client/transaction-pool/src/revalidation.rs +++ b/client/transaction-pool/src/revalidation.rs @@ -20,7 +20,7 @@ use std::{sync::Arc, pin::Pin, collections::{HashMap, HashSet, BTreeMap}}; -use sc_transaction_graph::{ChainApi, Pool, ExHash, NumberFor, ValidatedTransaction}; +use sc_transaction_graph::{ChainApi, Pool, ExtrinsicHash, NumberFor, ValidatedTransaction}; use sp_runtime::traits::{Zero, SaturatedConversion}; use sp_runtime::generic::BlockId; use sp_runtime::transaction_validity::TransactionValidityError; @@ -39,7 +39,7 @@ const BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20; /// Payload from queue to worker. struct WorkerPayload { at: NumberFor, - transactions: Vec>, + transactions: Vec>, } /// Async revalidation worker. @@ -49,8 +49,8 @@ struct RevalidationWorker { api: Arc, pool: Arc>, best_block: NumberFor, - block_ordered: BTreeMap, HashSet>>, - members: HashMap, NumberFor>, + block_ordered: BTreeMap, HashSet>>, + members: HashMap, NumberFor>, } impl Unpin for RevalidationWorker {} @@ -63,7 +63,7 @@ async fn batch_revalidate( pool: Arc>, api: Arc, at: NumberFor, - batch: impl IntoIterator>, + batch: impl IntoIterator>, ) { let mut invalid_hashes = Vec::new(); let mut revalidated = HashMap::new(); @@ -129,7 +129,7 @@ impl RevalidationWorker { } } - fn prepare_batch(&mut self) -> Vec> { + fn prepare_batch(&mut self) -> Vec> { let mut queued_exts = Vec::new(); let mut left = BACKGROUND_REVALIDATION_BATCH_SIZE; @@ -334,7 +334,7 @@ where /// If queue configured with background worker, this will return immediately. /// If queue configured without background worker, this will resolve after /// revalidation is actually done. - pub async fn revalidate_later(&self, at: NumberFor, transactions: Vec>) { + pub async fn revalidate_later(&self, at: NumberFor, transactions: Vec>) { if transactions.len() > 0 { log::debug!(target: "txpool", "Sent {} transactions to revalidation queue", transactions.len()); } @@ -359,9 +359,7 @@ mod tests { use sp_transaction_pool::TransactionSource; use substrate_test_runtime_transaction_pool::{TestApi, uxt}; use futures::executor::block_on; - use substrate_test_runtime_client::{ - AccountKeyring::*, - }; + use substrate_test_runtime_client::AccountKeyring::*; fn setup() -> (Arc, Pool) { let test_api = Arc::new(TestApi::empty()); diff --git a/client/transaction-pool/src/testing/pool.rs b/client/transaction-pool/src/testing/pool.rs index 4f30d5b6c3500..dafd829c64d11 100644 --- a/client/transaction-pool/src/testing/pool.rs +++ b/client/transaction-pool/src/testing/pool.rs @@ -25,12 +25,12 @@ use sp_runtime::{ transaction_validity::{ValidTransaction, TransactionSource, InvalidTransaction}, }; use substrate_test_runtime_client::{ - runtime::{Block, Hash, Index, Header, Extrinsic, Transfer}, - AccountKeyring::*, + runtime::{Block, Hash, Index, Header, Extrinsic, Transfer}, AccountKeyring::*, }; use substrate_test_runtime_transaction_pool::{TestApi, uxt}; use futures::{prelude::*, task::Poll}; use codec::Encode; +use std::collections::BTreeSet; fn pool() -> Pool { Pool::new(Default::default(), TestApi::with_alice_nonce(209).into()) @@ -42,7 +42,7 @@ fn maintained_pool() -> ( intervalier::BackSignalControl, ) { let (pool, background_task, notifier) = BasicPool::new_test( - std::sync::Arc::new(TestApi::with_alice_nonce(209)) + Arc::new(TestApi::with_alice_nonce(209)), ); let thread_pool = futures::executor::ThreadPool::new().unwrap(); @@ -112,6 +112,7 @@ fn prune_tags_should_work() { let pending: Vec<_> = pool.validated_pool().ready().map(|a| a.data.transfer().nonce).collect(); assert_eq!(pending, vec![209, 210]); + pool.validated_pool().api().push_block(1, Vec::new()); block_on( pool.prune_tags( &BlockId::number(1), @@ -140,6 +141,37 @@ fn should_ban_invalid_transactions() { block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt.clone())).unwrap_err(); } +#[test] +fn only_prune_on_new_best() { + let pool = maintained_pool().0; + let uxt = uxt(Alice, 209); + + let _ = block_on( + pool.submit_and_watch(&BlockId::number(1), SOURCE, uxt.clone()) + ).expect("1. Imported"); + let header = pool.api.push_block(1, vec![uxt.clone()]); + assert_eq!(pool.status().ready, 1); + + let event = ChainEvent::NewBlock { + id: BlockId::Number(1), + is_new_best: false, + header: header.clone(), + tree_route: None, + }; + block_on(pool.maintain(event)); + assert_eq!(pool.status().ready, 1); + + let header = pool.api.push_block(2, vec![uxt]); + let event = ChainEvent::NewBlock { + id: BlockId::Number(2), + is_new_best: true, + header: header.clone(), + tree_route: None, + }; + block_on(pool.maintain(event)); + assert_eq!(pool.status().ready, 0); +} + #[test] fn should_correctly_prune_transactions_providing_more_than_one_tag() { let api = Arc::new(TestApi::with_alice_nonce(209)); @@ -153,6 +185,7 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() { // remove the transaction that just got imported. api.increment_nonce(Alice.into()); + api.push_block(1, Vec::new()); block_on(pool.prune_tags(&BlockId::number(1), vec![vec![209]], vec![])).expect("1. Pruned"); assert_eq!(pool.validated_pool().status().ready, 0); // it's re-imported to future @@ -160,6 +193,7 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() { // so now let's insert another transaction that also provides the 155 api.increment_nonce(Alice.into()); + api.push_block(2, Vec::new()); let xt = uxt(Alice, 211); block_on(pool.submit_one(&BlockId::number(2), SOURCE, xt.clone())).expect("2. Imported"); assert_eq!(pool.validated_pool().status().ready, 1); @@ -169,6 +203,7 @@ fn should_correctly_prune_transactions_providing_more_than_one_tag() { // prune it and make sure the pool is empty api.increment_nonce(Alice.into()); + api.push_block(3, Vec::new()); block_on(pool.prune_tags(&BlockId::number(3), vec![vec![155]], vec![])).expect("2. Pruned"); assert_eq!(pool.validated_pool().status().ready, 0); assert_eq!(pool.validated_pool().status().future, 2); @@ -178,21 +213,26 @@ fn block_event(id: u64) -> ChainEvent { ChainEvent::NewBlock { id: BlockId::number(id), is_new_best: true, - retracted: vec![], + tree_route: None, header: header(id), } } -fn block_event_with_retracted(id: u64, retracted: Vec) -> ChainEvent { +fn block_event_with_retracted( + header: Header, + retracted_start: Hash, + api: &TestApi, +) -> ChainEvent { + let tree_route = api.tree_route(retracted_start, header.hash()).expect("Tree route exists"); + ChainEvent::NewBlock { - id: BlockId::number(id), + id: BlockId::hash(header.hash()), is_new_best: true, - retracted: retracted, - header: header(id), + tree_route: Some(Arc::new(tree_route)), + header, } } - #[test] fn should_prune_old_during_maintenance() { let xt = uxt(Alice, 209); @@ -232,17 +272,16 @@ fn should_revalidate_during_maintenance() { #[test] fn should_resubmit_from_retracted_during_maintenance() { let xt = uxt(Alice, 209); - let retracted_hash = Hash::random(); let (pool, _guard, _notifier) = maintained_pool(); block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported"); assert_eq!(pool.status().ready, 1); - pool.api.push_block(1, vec![]); - pool.api.push_fork_block(retracted_hash, vec![xt.clone()]); + let header = pool.api.push_block(1, vec![]); + let fork_header = pool.api.push_block(1, vec![]); - let event = block_event_with_retracted(1, vec![retracted_hash]); + let event = block_event_with_retracted(header, fork_header.hash(), &*pool.api); block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 1); @@ -251,18 +290,17 @@ fn should_resubmit_from_retracted_during_maintenance() { #[test] fn should_not_retain_invalid_hashes_from_retracted() { let xt = uxt(Alice, 209); - let retracted_hash = Hash::random(); let (pool, _guard, mut notifier) = maintained_pool(); block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported"); assert_eq!(pool.status().ready, 1); - pool.api.push_block(1, vec![]); - pool.api.push_fork_block(retracted_hash, vec![xt.clone()]); + let header = pool.api.push_block(1, vec![]); + let fork_header = pool.api.push_block(1, vec![xt.clone()]); pool.api.add_invalid(&xt); - let event = block_event_with_retracted(1, vec![retracted_hash]); + let event = block_event_with_retracted(header, fork_header.hash(), &*pool.api); block_on(pool.maintain(event)); block_on(notifier.next()); @@ -389,15 +427,24 @@ fn should_push_watchers_during_maintaince() { // events for hash2 are: Ready, InBlock assert_eq!( futures::executor::block_on_stream(watcher0).collect::>(), - vec![TransactionStatus::Ready, TransactionStatus::InBlock(header_hash.clone()), TransactionStatus::Finalized(header_hash.clone())], + vec![ + TransactionStatus::Ready, + TransactionStatus::InBlock(header_hash.clone()), + TransactionStatus::Finalized(header_hash.clone())], ); assert_eq!( futures::executor::block_on_stream(watcher1).collect::>(), - vec![TransactionStatus::Ready, TransactionStatus::InBlock(header_hash.clone()), TransactionStatus::Finalized(header_hash.clone())], + vec![ + TransactionStatus::Ready, + TransactionStatus::InBlock(header_hash.clone()), + TransactionStatus::Finalized(header_hash.clone())], ); assert_eq!( futures::executor::block_on_stream(watcher2).collect::>(), - vec![TransactionStatus::Ready, TransactionStatus::InBlock(header_hash.clone()), TransactionStatus::Finalized(header_hash.clone())], + vec![ + TransactionStatus::Ready, + TransactionStatus::InBlock(header_hash.clone()), + TransactionStatus::Finalized(header_hash.clone())], ); } @@ -423,12 +470,12 @@ fn finalization() { ).expect("1. Imported"); pool.api.push_block(2, vec![xt.clone()]); - let header = pool.api.chain().read().header_by_number.get(&2).cloned().unwrap(); + let header = pool.api.chain().read().block_by_number.get(&2).unwrap()[0].header().clone(); let event = ChainEvent::NewBlock { id: BlockId::Hash(header.hash()), is_new_best: true, header: header.clone(), - retracted: vec![] + tree_route: None, }; block_on(pool.maintain(event)); @@ -467,7 +514,6 @@ fn fork_aware_finalization() { let c2; let d2; - // block B1 { let watcher = block_on( @@ -481,7 +527,7 @@ fn fork_aware_finalization() { id: BlockId::Number(2), is_new_best: true, header: header.clone(), - retracted: vec![], + tree_route: None, }; b1 = header.hash(); block_on(pool.maintain(event)); @@ -492,7 +538,7 @@ fn fork_aware_finalization() { // block C2 { - let header = pool.api.push_fork_block_with_parent(b1, vec![from_dave.clone()]); + let header = pool.api.push_block_with_parent(b1, vec![from_dave.clone()]); from_dave_watcher = block_on( pool.submit_and_watch(&BlockId::number(1), SOURCE, from_dave.clone()) ).expect("1. Imported"); @@ -501,7 +547,7 @@ fn fork_aware_finalization() { id: BlockId::Hash(header.hash()), is_new_best: true, header: header.clone(), - retracted: vec![] + tree_route: None, }; c2 = header.hash(); block_on(pool.maintain(event)); @@ -514,13 +560,13 @@ fn fork_aware_finalization() { pool.submit_and_watch(&BlockId::number(1), SOURCE, from_bob.clone()) ).expect("1. Imported"); assert_eq!(pool.status().ready, 1); - let header = pool.api.push_fork_block_with_parent(c2, vec![from_bob.clone()]); + let header = pool.api.push_block_with_parent(c2, vec![from_bob.clone()]); let event = ChainEvent::NewBlock { id: BlockId::Hash(header.hash()), is_new_best: true, header: header.clone(), - retracted: vec![] + tree_route: None, }; d2 = header.hash(); block_on(pool.maintain(event)); @@ -536,14 +582,10 @@ fn fork_aware_finalization() { let header = pool.api.push_block(3, vec![from_charlie.clone()]); canon_watchers.push((watcher, header.hash())); - let event = ChainEvent::NewBlock { - id: BlockId::Number(3), - is_new_best: true, - header: header.clone(), - retracted: vec![c2, d2], - }; + let event = block_event_with_retracted(header.clone(), d2, &*pool.api); block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 2); + let event = ChainEvent::Finalized { hash: header.hash() }; block_on(pool.maintain(event)); } @@ -562,7 +604,7 @@ fn fork_aware_finalization() { id: BlockId::Hash(header.hash()), is_new_best: true, header: header.clone(), - retracted: vec![] + tree_route: None, }; d1 = header.hash(); block_on(pool.maintain(event)); @@ -581,7 +623,7 @@ fn fork_aware_finalization() { id: BlockId::Hash(header.hash()), is_new_best: true, header: header.clone(), - retracted: vec![] + tree_route: None, }; block_on(pool.maintain(event)); assert_eq!(pool.status().ready, 0); @@ -599,7 +641,7 @@ fn fork_aware_finalization() { { - let mut stream= futures::executor::block_on_stream(from_dave_watcher); + let mut stream = futures::executor::block_on_stream(from_dave_watcher); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); assert_eq!(stream.next(), Some(TransactionStatus::InBlock(c2.clone()))); assert_eq!(stream.next(), Some(TransactionStatus::Retracted(c2))); @@ -610,7 +652,7 @@ fn fork_aware_finalization() { } { - let mut stream= futures::executor::block_on_stream(from_bob_watcher); + let mut stream = futures::executor::block_on_stream(from_bob_watcher); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); assert_eq!(stream.next(), Some(TransactionStatus::InBlock(d2.clone()))); assert_eq!(stream.next(), Some(TransactionStatus::Retracted(d2))); @@ -621,6 +663,217 @@ fn fork_aware_finalization() { } } +/// This test ensures that transactions from a fork are re-submitted if +/// the forked block is not part of the retracted blocks. This happens as the +/// retracted block list only contains the route from the old best to the new +/// best, without any further forks. +/// +/// Given the following: +/// +/// -> D0 (old best, tx0) +/// / +/// C - -> D1 (tx1) +/// \ +/// -> D2 (new best) +/// +/// Retracted will contain `D0`, but we need to re-submit `tx0` and `tx1` as both +/// blocks are not part of the canonical chain. +#[test] +fn resubmit_tx_of_fork_that_is_not_part_of_retracted() { + let api = TestApi::empty(); + // starting block A1 (last finalized.) + api.push_block(1, vec![]); + + let (pool, _background, _) = BasicPool::new_test(api.into()); + + let tx0 = uxt(Alice, 1); + let tx1 = uxt(Dave, 2); + pool.api.increment_nonce(Alice.into()); + pool.api.increment_nonce(Dave.into()); + + let d0; + + // Block D0 + { + let _ = block_on( + pool.submit_and_watch(&BlockId::number(1), SOURCE, tx0.clone()) + ).expect("1. Imported"); + let header = pool.api.push_block(2, vec![tx0.clone()]); + assert_eq!(pool.status().ready, 1); + + let event = ChainEvent::NewBlock { + id: BlockId::Number(2), + is_new_best: true, + header: header.clone(), + tree_route: None, + }; + d0 = header.hash(); + block_on(pool.maintain(event)); + assert_eq!(pool.status().ready, 0); + } + + // Block D1 + { + let _ = block_on( + pool.submit_and_watch(&BlockId::number(1), SOURCE, tx1.clone()) + ).expect("1. Imported"); + let header = pool.api.push_block(2, vec![tx1.clone()]); + assert_eq!(pool.status().ready, 1); + let event = ChainEvent::NewBlock { + id: BlockId::Hash(header.hash()), + is_new_best: false, + header: header.clone(), + tree_route: None, + }; + block_on(pool.maintain(event)); + + // Only transactions from new best should be pruned + assert_eq!(pool.status().ready, 1); + } + + // Block D2 + { + let header = pool.api.push_block(2, vec![]); + let event = block_event_with_retracted(header, d0, &*pool.api); + block_on(pool.maintain(event)); + assert_eq!(pool.status().ready, 2); + } +} + +#[test] +fn resubmit_from_retracted_fork() { + let api = TestApi::empty(); + // starting block A1 (last finalized.) + api.push_block(1, vec![]); + + let (pool, _background, _) = BasicPool::new_test(api.into()); + + let tx0 = uxt(Alice, 1); + let tx1 = uxt(Dave, 2); + let tx2 = uxt(Bob, 3); + + // Transactions of the fork that will be enacted later + let tx3 = uxt(Eve, 1); + let tx4 = uxt(Ferdie, 2); + let tx5 = uxt(One, 3); + + pool.api.increment_nonce(Alice.into()); + pool.api.increment_nonce(Dave.into()); + pool.api.increment_nonce(Bob.into()); + pool.api.increment_nonce(Eve.into()); + pool.api.increment_nonce(Ferdie.into()); + pool.api.increment_nonce(One.into()); + + // Block D0 + { + let _ = block_on( + pool.submit_and_watch(&BlockId::number(1), SOURCE, tx0.clone()) + ).expect("1. Imported"); + let header = pool.api.push_block(2, vec![tx0.clone()]); + assert_eq!(pool.status().ready, 1); + + let event = ChainEvent::NewBlock { + id: BlockId::Number(2), + is_new_best: true, + header: header.clone(), + tree_route: None, + }; + block_on(pool.maintain(event)); + assert_eq!(pool.status().ready, 0); + } + + // Block E0 + { + let _ = block_on( + pool.submit_and_watch(&BlockId::number(1), SOURCE, tx1.clone()) + ).expect("1. Imported"); + let header = pool.api.push_block(3, vec![tx1.clone()]); + let event = ChainEvent::NewBlock { + id: BlockId::Hash(header.hash()), + is_new_best: true, + header: header.clone(), + tree_route: None, + }; + block_on(pool.maintain(event)); + assert_eq!(pool.status().ready, 0); + } + + // Block F0 + let f0 = { + let _ = block_on( + pool.submit_and_watch(&BlockId::number(1), SOURCE, tx2.clone()) + ).expect("1. Imported"); + let header = pool.api.push_block(4, vec![tx2.clone()]); + let event = ChainEvent::NewBlock { + id: BlockId::Hash(header.hash()), + is_new_best: true, + header: header.clone(), + tree_route: None, + }; + block_on(pool.maintain(event)); + assert_eq!(pool.status().ready, 0); + header.hash() + }; + + // Block D1 + let d1 = { + let _ = block_on( + pool.submit_and_watch(&BlockId::number(1), SOURCE, tx3.clone()) + ).expect("1. Imported"); + let header = pool.api.push_block(2, vec![tx3.clone()]); + let event = ChainEvent::NewBlock { + id: BlockId::Hash(header.hash()), + is_new_best: false, + header: header.clone(), + tree_route: None, + }; + block_on(pool.maintain(event)); + assert_eq!(pool.status().ready, 1); + header.hash() + }; + + // Block E1 + let e1 = { + let _ = block_on( + pool.submit_and_watch(&BlockId::number(1), SOURCE, tx4.clone()) + ).expect("1. Imported"); + let header = pool.api.push_block_with_parent(d1.clone(), vec![tx4.clone()]); + let event = ChainEvent::NewBlock { + id: BlockId::Hash(header.hash()), + is_new_best: false, + header: header.clone(), + tree_route: None, + }; + block_on(pool.maintain(event)); + assert_eq!(pool.status().ready, 2); + header.hash() + }; + + // Block F1 + let f1_header = { + let _ = block_on( + pool.submit_and_watch(&BlockId::number(1), SOURCE, tx5.clone()) + ).expect("1. Imported"); + let header = pool.api.push_block_with_parent(e1.clone(), vec![tx5.clone()]); + // Don't announce the block event to the pool directly, because we will + // re-org to this block. + assert_eq!(pool.status().ready, 3); + header + }; + + let ready = pool.ready().map(|t| t.data.encode()).collect::>(); + let expected_ready = vec![tx3, tx4, tx5].iter().map(Encode::encode).collect::>(); + assert_eq!(expected_ready, ready); + + let event = block_event_with_retracted(f1_header, f0, &*pool.api); + block_on(pool.maintain(event)); + + assert_eq!(pool.status().ready, 3); + let ready = pool.ready().map(|t| t.data.encode()).collect::>(); + let expected_ready = vec![tx0, tx1, tx2].iter().map(Encode::encode).collect::>(); + assert_eq!(expected_ready, ready); +} + #[test] fn ready_set_should_not_resolve_before_block_update() { let (pool, _guard, _notifier) = maintained_pool(); @@ -678,6 +931,7 @@ fn should_not_accept_old_signatures() { use std::convert::TryFrom; let client = Arc::new(substrate_test_runtime_client::new()); + let pool = Arc::new( BasicPool::new_test(Arc::new(FullChainApi::new(client))).0 ); diff --git a/primitives/blockchain/Cargo.toml b/primitives/blockchain/Cargo.toml index d76d9c72091c0..689d84d80fcf0 100644 --- a/primitives/blockchain/Cargo.toml +++ b/primitives/blockchain/Cargo.toml @@ -12,7 +12,6 @@ documentation = "https://docs.rs/sp-blockchain" [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] - [dependencies] log = "0.4.8" lru = "0.4.0" diff --git a/primitives/blockchain/src/header_metadata.rs b/primitives/blockchain/src/header_metadata.rs index a4df04b507fd1..b8d9c5c934581 100644 --- a/primitives/blockchain/src/header_metadata.rs +++ b/primitives/blockchain/src/header_metadata.rs @@ -137,7 +137,8 @@ pub fn tree_route>( from = backend.header_metadata(from.parent)?; } - // add the pivot block. and append the reversed to-branch (note that it's reverse order originals) + // add the pivot block. and append the reversed to-branch + // (note that it's reverse order originals) let pivot = from_branch.len(); from_branch.push(HashAndNumber { number: to.number, @@ -182,18 +183,24 @@ pub struct HashAndNumber { /// Tree route from C to E2. Retracted empty. Common is C, enacted [E1, E2] /// C -> E1 -> E2 /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TreeRoute { route: Vec>, pivot: usize, } impl TreeRoute { - /// Get a slice of all retracted blocks in reverse order (towards common ancestor) + /// Get a slice of all retracted blocks in reverse order (towards common ancestor). pub fn retracted(&self) -> &[HashAndNumber] { &self.route[..self.pivot] } + /// Convert into all retracted blocks in reverse order (towards common ancestor). + pub fn into_retracted(mut self) -> Vec> { + self.route.truncate(self.pivot); + self.route + } + /// Get the common ancestor block. This might be one of the two blocks of the /// route. pub fn common_block(&self) -> &HashAndNumber { @@ -213,8 +220,15 @@ pub trait HeaderMetadata { /// Error used in case the header metadata is not found. type Error; - fn header_metadata(&self, hash: Block::Hash) -> Result, Self::Error>; - fn insert_header_metadata(&self, hash: Block::Hash, header_metadata: CachedHeaderMetadata); + fn header_metadata( + &self, + hash: Block::Hash, + ) -> Result, Self::Error>; + fn insert_header_metadata( + &self, + hash: Block::Hash, + header_metadata: CachedHeaderMetadata, + ); fn remove_header_metadata(&self, hash: Block::Hash); } diff --git a/primitives/transaction-pool/Cargo.toml b/primitives/transaction-pool/Cargo.toml index dbb21f34b6e0c..b7673e8da0d5c 100644 --- a/primitives/transaction-pool/Cargo.toml +++ b/primitives/transaction-pool/Cargo.toml @@ -20,6 +20,7 @@ futures = { version = "0.3.1", optional = true } log = { version = "0.4.8", optional = true } serde = { version = "1.0.101", features = ["derive"], optional = true} sp-api = { version = "2.0.0-rc2", default-features = false, path = "../api" } +sp-blockchain = { version = "2.0.0-rc2", optional = true, path = "../blockchain" } sp-runtime = { version = "2.0.0-rc2", default-features = false, path = "../runtime" } sp-utils = { version = "2.0.0-rc2", default-features = false, path = "../utils" } @@ -32,5 +33,6 @@ std = [ "log", "serde", "sp-api/std", + "sp-blockchain", "sp-runtime/std", ] diff --git a/primitives/transaction-pool/src/pool.rs b/primitives/transaction-pool/src/pool.rs index 762ff06a9eebb..fa50ef9e41775 100644 --- a/primitives/transaction-pool/src/pool.rs +++ b/primitives/transaction-pool/src/pool.rs @@ -255,8 +255,10 @@ pub enum ChainEvent { id: BlockId, /// Header of the just imported block header: B::Header, - /// List of retracted blocks ordered by block number. - retracted: Vec, + /// Tree route from old best to new best that was calculated on import. + /// + /// If `None`, no re-org happened on import. + tree_route: Option>>, }, /// An existing block has been finalized. Finalized { diff --git a/test-utils/runtime/client/src/lib.rs b/test-utils/runtime/client/src/lib.rs index 5b0eafb4a3d64..f2e049bc0f56b 100644 --- a/test-utils/runtime/client/src/lib.rs +++ b/test-utils/runtime/client/src/lib.rs @@ -35,12 +35,9 @@ use sp_core::{sr25519, ChangesTrieConfiguration}; use sp_core::storage::{ChildInfo, Storage, StorageChild}; use substrate_test_runtime::genesismap::{GenesisConfig, additional_storage_with_genesis}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Hash as HashT, NumberFor, HashFor}; -use sc_service::client::{ - light::fetcher::{ - Fetcher, - RemoteHeaderRequest, RemoteReadRequest, RemoteReadChildRequest, - RemoteCallRequest, RemoteChangesRequest, RemoteBodyRequest, - }, +use sc_service::client::light::fetcher::{ + Fetcher, RemoteHeaderRequest, RemoteReadRequest, RemoteReadChildRequest, + RemoteCallRequest, RemoteChangesRequest, RemoteBodyRequest, }; /// A prelude to import in tests. diff --git a/test-utils/runtime/src/lib.rs b/test-utils/runtime/src/lib.rs index eaceef2defb49..bf6c7450c5086 100644 --- a/test-utils/runtime/src/lib.rs +++ b/test-utils/runtime/src/lib.rs @@ -208,6 +208,8 @@ pub type AccountSignature = sr25519::Signature; pub type AccountId = ::Signer; /// A simple hash type for all our hashing. pub type Hash = H256; +/// The hashing algorithm used. +pub type Hashing = BlakeTwo256; /// The block number type used in this runtime. pub type BlockNumber = u64; /// Index of a transaction. @@ -219,7 +221,7 @@ pub type Digest = sp_runtime::generic::Digest; /// A test block. pub type Block = sp_runtime::generic::Block; /// A test block's header. -pub type Header = sp_runtime::generic::Header; +pub type Header = sp_runtime::generic::Header; /// Run whatever tests we have. pub fn run_tests(mut input: &[u8]) -> Vec { @@ -404,7 +406,7 @@ impl frame_system::Trait for Runtime { type Index = u64; type BlockNumber = u64; type Hash = H256; - type Hashing = BlakeTwo256; + type Hashing = Hashing; type AccountId = u64; type Lookup = IdentityLookup; type Header = Header; @@ -466,7 +468,7 @@ fn code_using_trie() -> u64 { let mut root = sp_std::default::Default::default(); let _ = { let v = &pairs; - let mut t = TrieDBMut::::new(&mut mdb, &mut root); + let mut t = TrieDBMut::::new(&mut mdb, &mut root); for i in 0..v.len() { let key: &[u8]= &v[i].0; let val: &[u8] = &v[i].1; @@ -477,7 +479,7 @@ fn code_using_trie() -> u64 { t }; - if let Ok(trie) = TrieDB::::new(&mdb, &root) { + if let Ok(trie) = TrieDB::::new(&mdb, &root) { if let Ok(iter) = trie.iter() { let mut iter_pairs = Vec::new(); for pair in iter { diff --git a/test-utils/runtime/transaction-pool/src/lib.rs b/test-utils/runtime/transaction-pool/src/lib.rs index c7778a51da105..5140cb8b9258f 100644 --- a/test-utils/runtime/transaction-pool/src/lib.rs +++ b/test-utils/runtime/transaction-pool/src/lib.rs @@ -23,17 +23,18 @@ use codec::Encode; use parking_lot::RwLock; use sp_runtime::{ generic::{self, BlockId}, - traits::{BlakeTwo256, Hash as HashT}, + traits::{BlakeTwo256, Hash as HashT, Block as _, Header as _}, transaction_validity::{ TransactionValidity, ValidTransaction, TransactionValidityError, InvalidTransaction, TransactionSource, }, }; -use std::collections::{HashSet, HashMap}; +use std::collections::{HashSet, HashMap, BTreeMap}; use substrate_test_runtime_client::{ runtime::{Index, AccountId, Block, BlockNumber, Extrinsic, Hash, Header, Transfer}, AccountKeyring::{self, *}, }; +use sp_blockchain::CachedHeaderMetadata; /// Error type used by [`TestApi`]. #[derive(Debug, derive_more::From, derive_more::Display)] @@ -53,9 +54,8 @@ impl std::error::Error for Error { #[derive(Default)] pub struct ChainState { - pub block_by_number: HashMap>, - pub block_by_hash: HashMap>, - pub header_by_number: HashMap, + pub block_by_number: BTreeMap>, + pub block_by_hash: HashMap, pub nonces: HashMap, pub invalid_hashes: HashSet, } @@ -70,11 +70,7 @@ pub struct TestApi { impl TestApi { /// Test Api with Alice nonce set initially. pub fn with_alice_nonce(nonce: u64) -> Self { - let api = TestApi { - valid_modifier: RwLock::new(Box::new(|_| {})), - chain: Default::default(), - validation_requests: RwLock::new(Default::default()), - }; + let api = Self::empty(); api.chain.write().nonces.insert(Alice.into(), nonce); @@ -89,6 +85,9 @@ impl TestApi { validation_requests: RwLock::new(Default::default()), }; + // Push genesis block + api.push_block(0, Vec::new()); + api } @@ -97,46 +96,61 @@ impl TestApi { *self.valid_modifier.write() = modifier; } - /// Push block as a part of canonical chain under given number. + /// Push block under given number. + /// + /// If multiple blocks exists with the same block number, the first inserted block will be + /// interpreted as part of the canonical chain. pub fn push_block(&self, block_number: BlockNumber, xts: Vec) -> Header { - let mut chain = self.chain.write(); - chain.block_by_number.insert(block_number, xts.clone()); - let header = Header { - number: block_number, - digest: Default::default(), - extrinsics_root: Default::default(), - parent_hash: block_number + let parent_hash = { + let chain = self.chain.read(); + block_number .checked_sub(1) .and_then(|num| { - chain.header_by_number.get(&num) - .cloned().map(|h| h.hash()) - }).unwrap_or_default(), - state_root: Default::default(), + chain.block_by_number + .get(&num) + .map(|blocks| { + blocks[0].header.hash() + }) + }).unwrap_or_default() }; - chain.block_by_hash.insert(header.hash(), xts); - chain.header_by_number.insert(block_number, header.clone()); - header + + self.push_block_with_parent(parent_hash, xts) } - /// Push a block without a number. + /// Push a block using the given `parent`. /// - /// As a part of non-canonical chain. - pub fn push_fork_block(&self, block_hash: Hash, xts: Vec) { + /// Panics if `parent` does not exists. + pub fn push_block_with_parent( + &self, + parent: Hash, + xts: Vec, + ) -> Header { let mut chain = self.chain.write(); - chain.block_by_hash.insert(block_hash, xts); - } - pub fn push_fork_block_with_parent(&self, parent: Hash, xts: Vec) -> Header { - let mut chain = self.chain.write(); - let blocknum = chain.block_by_number.keys().max().expect("block_by_number shouldn't be empty"); + // `Hash::default()` is the genesis parent hash + let block_number = if parent == Hash::default() { + 0 + } else { + *chain.block_by_hash + .get(&parent) + .expect("`parent` exists") + .header() + .number() + 1 + }; + let header = Header { - number: *blocknum, + number: block_number, digest: Default::default(), - extrinsics_root: Default::default(), + extrinsics_root: Hash::random(), parent_hash: parent, state_root: Default::default(), }; - chain.block_by_hash.insert(header.hash(), xts); + + let hash = header.hash(); + let block = Block::new(header.clone(), xts); + chain.block_by_hash.insert(hash, block.clone()); + chain.block_by_number.entry(block_number).or_default().push(block); + header } @@ -170,11 +184,19 @@ impl TestApi { let mut chain = self.chain.write(); chain.nonces.entry(account).and_modify(|n| *n += 1).or_insert(1); } + + /// Calculate a tree route between the two given blocks. + pub fn tree_route( + &self, + from: Hash, + to: Hash, + ) -> Result, Error> { + sp_blockchain::tree_route(self, from, to) + } } impl sc_transaction_graph::ChainApi for TestApi { type Block = Block; - type Hash = Hash; type Error = Error; type ValidationFuture = futures::future::Ready>; type BodyFuture = futures::future::Ready>, Error>>; @@ -218,7 +240,14 @@ impl sc_transaction_graph::ChainApi for TestApi { &self, at: &BlockId, ) -> Result>, Error> { - Ok(Some(number_of(at))) + Ok(match at { + generic::BlockId::Hash(x) => self.chain + .read() + .block_by_hash + .get(x) + .map(|b| *b.header.number()), + generic::BlockId::Number(num) => Some(*num), + }) } fn block_id_to_hash( @@ -227,34 +256,60 @@ impl sc_transaction_graph::ChainApi for TestApi { ) -> Result>, Error> { Ok(match at { generic::BlockId::Hash(x) => Some(x.clone()), - generic::BlockId::Number(num) => { - self.chain.read() - .header_by_number.get(num) - .map(|h| h.hash()) - .or_else(|| Some(Default::default())) - }, + generic::BlockId::Number(num) => self.chain + .read() + .block_by_number + .get(num) + .map(|blocks| blocks[0].header().hash()), }) } fn hash_and_length( &self, ex: &sc_transaction_graph::ExtrinsicFor, - ) -> (Self::Hash, usize) { + ) -> (Hash, usize) { Self::hash_and_length_inner(ex) } fn block_body(&self, id: &BlockId) -> Self::BodyFuture { futures::future::ready(Ok(match id { - BlockId::Number(num) => self.chain.read().block_by_number.get(num).cloned(), - BlockId::Hash(hash) => self.chain.read().block_by_hash.get(hash).cloned(), + BlockId::Number(num) => self.chain + .read() + .block_by_number + .get(num) + .map(|b| b[0].extrinsics().to_vec()), + BlockId::Hash(hash) => self.chain + .read() + .block_by_hash + .get(hash) + .map(|b| b.extrinsics().to_vec()), })) } } -fn number_of(at: &BlockId) -> u64 { - match at { - generic::BlockId::Number(n) => *n as u64, - _ => 0, +impl sp_blockchain::HeaderMetadata for TestApi { + type Error = Error; + + fn header_metadata( + &self, + hash: Hash, + ) -> Result, Self::Error> { + let chain = self.chain.read(); + let block = chain.block_by_hash.get(&hash).expect("Hash exists"); + + Ok(block.header().into()) + } + + fn insert_header_metadata( + &self, + _: Hash, + _: CachedHeaderMetadata, + ) { + unimplemented!("Not implemented for tests") + } + + fn remove_header_metadata(&self, _: Hash) { + unimplemented!("Not implemented for tests") } } diff --git a/utils/frame/rpc/system/src/lib.rs b/utils/frame/rpc/system/src/lib.rs index 415f9541b60bd..a3ce1466f6fe9 100644 --- a/utils/frame/rpc/system/src/lib.rs +++ b/utils/frame/rpc/system/src/lib.rs @@ -222,16 +222,14 @@ mod tests { use super::*; use futures::executor::block_on; - use substrate_test_runtime_client::{ - runtime::Transfer, - AccountKeyring, - }; + use substrate_test_runtime_client::{runtime::Transfer, AccountKeyring}; use sc_transaction_pool::{BasicPool, FullChainApi}; #[test] fn should_return_next_nonce_for_some_account() { - // given let _ = env_logger::try_init(); + + // given let client = Arc::new(substrate_test_runtime_client::new()); let pool = Arc::new( BasicPool::new(