Skip to content

Commit

Permalink
TransactionPool API uses async_trait (#6528)
Browse files Browse the repository at this point in the history
This PR refactors `TransactionPool` API to use `async_trait`, replacing
the` Pin<Box<...>>` pattern. This should improve readability and
maintainability.

The change is not altering any functionality.

---------

Co-authored-by: GitHub Action <action@github.com>
  • Loading branch information
michalkucharczyk and actions-user authored Nov 19, 2024
1 parent b71bd53 commit 5721e55
Show file tree
Hide file tree
Showing 14 changed files with 268 additions and 333 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions prdoc/pr_6528.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
title: 'TransactionPool API uses async_trait'
doc:
- audience: Node Dev
description: |-
This PR refactors `TransactionPool` API to use `async_trait`, replacing the` Pin<Box<...>>` pattern. This should improve readability and maintainability.

The change is not altering any functionality.
crates:
- name: sc-rpc-spec-v2
bump: minor
- name: sc-service
bump: minor
- name: sc-transaction-pool-api
bump: major
- name: sc-transaction-pool
bump: major
- name: sc-rpc
bump: minor
1 change: 1 addition & 0 deletions substrate/bin/node/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = { workspace = true }
array-bytes = { workspace = true, default-features = true }
clap = { features = ["derive"], workspace = true }
log = { workspace = true, default-features = true }
Expand Down
48 changes: 17 additions & 31 deletions substrate/bin/node/bench/src/construct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
//! DO NOT depend on user input). Thus transaction generation should be
//! based on randomized data.

use futures::Future;
use std::{borrow::Cow, collections::HashMap, pin::Pin, sync::Arc};

use async_trait::async_trait;
use node_primitives::Block;
use node_testing::bench::{BenchDb, BlockType, DatabaseType, KeyTypes};
use sc_transaction_pool_api::{
ImportNotificationStream, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor,
TransactionSource, TransactionStatusStreamFor, TxHash,
ImportNotificationStream, PoolStatus, ReadyTransactions, TransactionFor, TransactionSource,
TransactionStatusStreamFor, TxHash,
};
use sp_consensus::{Environment, Proposer};
use sp_inherents::InherentDataProvider;
Expand Down Expand Up @@ -224,54 +224,47 @@ impl ReadyTransactions for TransactionsIterator {
fn report_invalid(&mut self, _tx: &Self::Item) {}
}

#[async_trait]
impl sc_transaction_pool_api::TransactionPool for Transactions {
type Block = Block;
type Hash = node_primitives::Hash;
type InPoolTransaction = PoolTransaction;
type Error = sc_transaction_pool_api::error::Error;

/// Returns a future that imports a bunch of unverified transactions to the pool.
fn submit_at(
/// Asynchronously imports a bunch of unverified transactions to the pool.
async fn submit_at(
&self,
_at: Self::Hash,
_source: TransactionSource,
_xts: Vec<TransactionFor<Self>>,
) -> PoolFuture<Vec<Result<node_primitives::Hash, Self::Error>>, Self::Error> {
) -> Result<Vec<Result<node_primitives::Hash, Self::Error>>, Self::Error> {
unimplemented!()
}

/// Returns a future that imports one unverified transaction to the pool.
fn submit_one(
/// Asynchronously imports one unverified transaction to the pool.
async fn submit_one(
&self,
_at: Self::Hash,
_source: TransactionSource,
_xt: TransactionFor<Self>,
) -> PoolFuture<TxHash<Self>, Self::Error> {
) -> Result<TxHash<Self>, Self::Error> {
unimplemented!()
}

fn submit_and_watch(
async fn submit_and_watch(
&self,
_at: Self::Hash,
_source: TransactionSource,
_xt: TransactionFor<Self>,
) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
unimplemented!()
}

fn ready_at(
async fn ready_at(
&self,
_at: Self::Hash,
) -> Pin<
Box<
dyn Future<
Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
> + Send,
>,
> {
let iter: Box<dyn ReadyTransactions<Item = Arc<PoolTransaction>> + Send> =
Box::new(TransactionsIterator(self.0.clone().into_iter()));
Box::pin(futures::future::ready(iter))
) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send> {
Box::new(TransactionsIterator(self.0.clone().into_iter()))
}

fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send> {
Expand Down Expand Up @@ -306,18 +299,11 @@ impl sc_transaction_pool_api::TransactionPool for Transactions {
unimplemented!()
}

fn ready_at_with_timeout(
async fn ready_at_with_timeout(
&self,
_at: Self::Hash,
_timeout: std::time::Duration,
) -> Pin<
Box<
dyn Future<
Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
> + Send
+ '_,
>,
> {
) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send> {
unimplemented!()
}
}
1 change: 1 addition & 0 deletions substrate/client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ rand = { workspace = true, default-features = true }
schnellru = { workspace = true }

[dev-dependencies]
async-trait = { workspace = true }
jsonrpsee = { workspace = true, features = ["server", "ws-client"] }
serde_json = { workspace = true, default-features = true }
tokio = { features = ["macros"], workspace = true, default-features = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use async_trait::async_trait;
use codec::Encode;
use futures::Future;
use sc_transaction_pool::BasicPool;
use sc_transaction_pool_api::{
ImportNotificationStream, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor,
TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
ImportNotificationStream, PoolStatus, ReadyTransactions, TransactionFor, TransactionPool,
TransactionSource, TransactionStatusStreamFor, TxHash,
};

use crate::hex_string;
use futures::{FutureExt, StreamExt};
use futures::StreamExt;

use sp_runtime::traits::Block as BlockT;
use std::{collections::HashMap, pin::Pin, sync::Arc};
Expand Down Expand Up @@ -77,67 +77,64 @@ impl MiddlewarePool {
}
}

#[async_trait]
impl TransactionPool for MiddlewarePool {
type Block = <BasicPool<TestApi, Block> as TransactionPool>::Block;
type Hash = <BasicPool<TestApi, Block> as TransactionPool>::Hash;
type InPoolTransaction = <BasicPool<TestApi, Block> as TransactionPool>::InPoolTransaction;
type Error = <BasicPool<TestApi, Block> as TransactionPool>::Error;

fn submit_at(
async fn submit_at(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xts: Vec<TransactionFor<Self>>,
) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
self.inner_pool.submit_at(at, source, xts)
) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
self.inner_pool.submit_at(at, source, xts).await
}

fn submit_one(
async fn submit_one(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xt: TransactionFor<Self>,
) -> PoolFuture<TxHash<Self>, Self::Error> {
self.inner_pool.submit_one(at, source, xt)
) -> Result<TxHash<Self>, Self::Error> {
self.inner_pool.submit_one(at, source, xt).await
}

fn submit_and_watch(
async fn submit_and_watch(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xt: TransactionFor<Self>,
) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
let pool = self.inner_pool.clone();
let sender = self.sender.clone();
) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
let transaction = hex_string(&xt.encode());
let sender = self.sender.clone();

async move {
let watcher = match pool.submit_and_watch(at, source, xt).await {
Ok(watcher) => watcher,
Err(err) => {
let _ = sender.send(MiddlewarePoolEvent::PoolError {
transaction: transaction.clone(),
err: err.to_string(),
});
return Err(err);
},
};

let watcher = watcher.map(move |status| {
let sender = sender.clone();
let transaction = transaction.clone();

let _ = sender.send(MiddlewarePoolEvent::TransactionStatus {
transaction,
status: status.clone(),
let watcher = match self.inner_pool.submit_and_watch(at, source, xt).await {
Ok(watcher) => watcher,
Err(err) => {
let _ = sender.send(MiddlewarePoolEvent::PoolError {
transaction: transaction.clone(),
err: err.to_string(),
});
return Err(err);
},
};

let watcher = watcher.map(move |status| {
let sender = sender.clone();
let transaction = transaction.clone();

status
let _ = sender.send(MiddlewarePoolEvent::TransactionStatus {
transaction,
status: status.clone(),
});

Ok(watcher.boxed())
}
.boxed()
status
});

Ok(watcher.boxed())
}

fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
Expand All @@ -164,17 +161,11 @@ impl TransactionPool for MiddlewarePool {
self.inner_pool.ready_transaction(hash)
}

fn ready_at(
async fn ready_at(
&self,
at: <Self::Block as BlockT>::Hash,
) -> Pin<
Box<
dyn Future<
Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
> + Send,
>,
> {
self.inner_pool.ready_at(at)
) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send> {
self.inner_pool.ready_at(at).await
}

fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send> {
Expand All @@ -185,18 +176,11 @@ impl TransactionPool for MiddlewarePool {
self.inner_pool.futures()
}

fn ready_at_with_timeout(
async fn ready_at_with_timeout(
&self,
at: <Self::Block as BlockT>::Hash,
_timeout: std::time::Duration,
) -> Pin<
Box<
dyn Future<
Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
> + Send
+ '_,
>,
> {
self.inner_pool.ready_at(at)
) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send> {
self.inner_pool.ready_at(at).await
}
}
17 changes: 9 additions & 8 deletions substrate/client/rpc/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::{
};

use codec::{Decode, Encode};
use futures::TryFutureExt;
use jsonrpsee::{core::async_trait, types::ErrorObject, Extensions, PendingSubscriptionSink};
use sc_rpc_api::check_if_safe;
use sc_transaction_pool_api::{
Expand Down Expand Up @@ -191,14 +190,16 @@ where
},
};

let submit = self.pool.submit_and_watch(best_block_hash, TX_SOURCE, dxt).map_err(|e| {
e.into_pool_error()
.map(error::Error::from)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)))
});

let pool = self.pool.clone();
let fut = async move {
let stream = match submit.await {
let submit =
pool.submit_and_watch(best_block_hash, TX_SOURCE, dxt).await.map_err(|e| {
e.into_pool_error()
.map(error::Error::from)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)))
});

let stream = match submit {
Ok(stream) => stream,
Err(err) => {
let _ = pending.reject(ErrorObject::from(err)).await;
Expand Down
16 changes: 10 additions & 6 deletions substrate/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,13 +528,17 @@ where
};

let start = std::time::Instant::now();
let import_future = self.pool.submit_one(
self.client.info().best_hash,
sc_transaction_pool_api::TransactionSource::External,
uxt,
);
let pool = self.pool.clone();
let client = self.client.clone();
Box::pin(async move {
match import_future.await {
match pool
.submit_one(
client.info().best_hash,
sc_transaction_pool_api::TransactionSource::External,
uxt,
)
.await
{
Ok(_) => {
let elapsed = start.elapsed();
debug!(target: sc_transaction_pool::LOG_TARGET, "import transaction: {elapsed:?}");
Expand Down
Loading

0 comments on commit 5721e55

Please sign in to comment.