Skip to content

Commit

Permalink
Move template tasks into Frontier. (paritytech#314)
Browse files Browse the repository at this point in the history
* Pending transactions task

* Filter pool task + simpler trait bounds

* Trim trailing whitespaces
  • Loading branch information
nanocryk authored Mar 9, 2021
1 parent d990d20 commit 03ce327
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 43 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ fc-consensus = { path = "../consensus" }
fc-db = { path = "../db" }
fc-rpc-core = { path = "../rpc-core" }
fp-rpc = { path = "../../primitives/rpc" }
fp-consensus = { path = "../../primitives/consensus" }
sp-io = { git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
sp-runtime = { git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
sp-api = { git = "https://github.com/paritytech/substrate.git", branch = "frontier" }
Expand Down
88 changes: 83 additions & 5 deletions client/rpc/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,22 @@
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::{marker::PhantomData, sync::Arc};
use std::collections::BTreeMap;
use std::{marker::PhantomData, sync::{Mutex, Arc}};
use std::collections::{HashMap, BTreeMap};
use ethereum::{
Block as EthereumBlock, Transaction as EthereumTransaction
};
use ethereum_types::{H160, H256, H64, U256, U64, H512};
use jsonrpc_core::{BoxFuture, Result, futures::future::{self, Future}};
use futures::future::TryFutureExt;
use futures::{StreamExt, future::TryFutureExt};
use sp_runtime::{
traits::{Block as BlockT, UniqueSaturatedInto, Zero, One, Saturating, BlakeTwo256},
transaction_validity::TransactionSource
transaction_validity::TransactionSource,
generic::OpaqueDigestItemId
};
use sp_api::{ProvideRuntimeApi, BlockId, Core, HeaderT};
use sp_transaction_pool::{TransactionPool, InPoolTransaction};
use sc_client_api::backend::{StorageProvider, Backend, StateBackend, AuxStore};
use sc_client_api::{client::BlockchainEvents, backend::{StorageProvider, Backend, StateBackend, AuxStore}};
use sha3::{Keccak256, Digest};
use sp_blockchain::{Error as BlockChainError, HeaderMetadata, HeaderBackend};
use sc_network::{NetworkService, ExHashT};
Expand Down Expand Up @@ -1533,3 +1534,80 @@ impl<B, C> EthFilterApiT for EthFilterApi<B, C> where
response
}
}

/// Namespace for the Frontier maintenance tasks spawned by the node service.
///
/// Carries the block and client types purely at the type level; the struct
/// itself stores no data.
pub struct EthTask<B, C>(PhantomData<(B, C)>);

impl<B, C> EthTask<B, C>
where
	C: ProvideRuntimeApi<B> + BlockchainEvents<B>,
	B: BlockT,
{
	/// Maintain the shared pending-transaction map.
	///
	/// On every block import, drops entries that were either included in the
	/// imported block (as reported by the Frontier post-runtime log) or whose
	/// `at_block` is more than `retain_threshold` blocks behind the imported
	/// block number.
	pub async fn pending_transaction_task(
		client: Arc<C>,
		pending_transactions: Arc<Mutex<HashMap<H256, PendingTransaction>>>,
		retain_threshold: u64
	) {
		let mut notifications = client.import_notification_stream();

		while let Some(notification) = notifications.next().await {
			// A poisoned lock just skips maintenance for this block; stale
			// entries will be evicted on a later import instead.
			if let Ok(mut pending) = pending_transactions.lock() {
				// Pending transactions have a finite lifespan anyway, so any
				// MultiplePostRuntimeLogs error can safely be ignored here.
				if let Ok(log) = fp_consensus::find_log(notification.header.digest()) {
					let post_hashes = log.into_hashes();
					// Keep only the transactions that were not processed
					// in the newly imported block.
					pending.retain(|&hash, _| {
						!post_hashes.transaction_hashes.contains(&hash)
					});
				}

				let block_number: u64 = UniqueSaturatedInto::<u64>::unique_saturated_into(
					*notification.header.number()
				);

				// Evict everything that exceeded its allowed lifespan.
				pending.retain(|_, tx| tx.at_block + retain_threshold > block_number);
			}
		}
	}

	/// Maintain the shared filter pool.
	///
	/// On every block import, removes filters whose `at_block` is more than
	/// `retain_threshold` blocks behind the imported block number.
	pub async fn filter_pool_task(
		client: Arc<C>,
		filter_pool: Arc<Mutex<BTreeMap<U256, FilterPoolItem>>>,
		retain_threshold: u64
	) {
		let mut notifications = client.import_notification_stream();

		while let Some(notification) = notifications.next().await {
			if let Ok(mut pool) = filter_pool.lock() {
				let block_number: u64 = UniqueSaturatedInto::<u64>::unique_saturated_into(
					*notification.header.number()
				);

				// `BTreeMap::retain` is not stable on this toolchain, so
				// collect the expired keys first, then remove them.
				let expired: Vec<U256> = pool
					.iter()
					.filter(|(_, item)| item.at_block + retain_threshold <= block_number)
					.map(|(&key, _)| key)
					.collect();

				for key in expired {
					pool.remove(&key);
				}
			}
		}
	}
}

3 changes: 2 additions & 1 deletion client/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ mod eth;
mod eth_pubsub;

pub use eth::{
EthApi, EthApiServer, EthFilterApi, EthFilterApiServer, NetApi, NetApiServer, Web3Api, Web3ApiServer
EthApi, EthApiServer, EthFilterApi, EthFilterApiServer, NetApi, NetApiServer, Web3Api, Web3ApiServer,
EthTask,
};
pub use eth_pubsub::{EthPubSubApi, EthPubSubApiServer, HexEncodedIdProvider};

Expand Down
50 changes: 13 additions & 37 deletions template/node/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
use std::{sync::{Arc, Mutex}, cell::RefCell, time::Duration, collections::{HashMap, BTreeMap}};
use fc_rpc::EthTask;
use fc_rpc_core::types::{FilterPool, PendingTransactions};
use sc_client_api::{ExecutorProvider, RemoteBackend, BlockchainEvents};
use sc_consensus_manual_seal::{self as manual_seal};
Expand Down Expand Up @@ -272,54 +273,29 @@ pub fn new_full(
})?;

// Spawn Frontier EthFilterApi maintenance task.
if filter_pool.is_some() {
if let Some(filter_pool) = filter_pool {
// Each filter is allowed to stay in the pool for 100 blocks.
const FILTER_RETAIN_THRESHOLD: u64 = 100;
task_manager.spawn_essential_handle().spawn(
"frontier-filter-pool",
client.import_notification_stream().for_each(move |notification| {
if let Ok(locked) = &mut filter_pool.clone().unwrap().lock() {
let imported_number: u64 = notification.header.number as u64;
for (k, v) in locked.clone().iter() {
let lifespan_limit = v.at_block + FILTER_RETAIN_THRESHOLD;
if lifespan_limit <= imported_number {
locked.remove(&k);
}
}
}
futures::future::ready(())
})
EthTask::filter_pool_task(
Arc::clone(&client),
filter_pool,
FILTER_RETAIN_THRESHOLD,
)
);
}

// Spawn Frontier pending transactions maintenance task (as essential, otherwise we leak).
if pending_transactions.is_some() {
if let Some(pending_transactions) = pending_transactions {
const TRANSACTION_RETAIN_THRESHOLD: u64 = 5;
task_manager.spawn_essential_handle().spawn(
"frontier-pending-transactions",
client.import_notification_stream().for_each(move |notification| {
if let Ok(locked) = &mut pending_transactions.clone().unwrap().lock() {
// As pending transactions have a finite lifespan anyway
// we can ignore MultiplePostRuntimeLogs error checks.
let log = fp_consensus::find_log(&notification.header.digest).ok();
let post_hashes = log.map(|log| log.into_hashes());

if let Some(post_hashes) = post_hashes {
// Retain all pending transactions that were not
// processed in the current block.
locked.retain(|&k, _| !post_hashes.transaction_hashes.contains(&k));
}

let imported_number: u64 = notification.header.number as u64;

locked.retain(|_, v| {
// Drop all the transactions that exceeded the given lifespan.
let lifespan_limit = v.at_block + TRANSACTION_RETAIN_THRESHOLD;
lifespan_limit > imported_number
});
}
futures::future::ready(())
})
EthTask::pending_transaction_task(
Arc::clone(&client),
pending_transactions,
TRANSACTION_RETAIN_THRESHOLD,
)
);
}

Expand Down

0 comments on commit 03ce327

Please sign in to comment.