diff --git a/CHANGELOG.md b/CHANGELOG.md index 85a31d0825..2ab4b1b71f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Description of the upcoming release here. ### Added +- [#1228](https://github.com/FuelLabs/fuel-core/pull/1228): Add a new capability of sharing pooled transactions when two nodes connect. - [#1432](https://github.com/FuelLabs/fuel-core/pull/1432): Add a new `--api-request-timeout` argument to control TTL for GraphQL requests. - [#1419](https://github.com/FuelLabs/fuel-core/pull/1419): Add additional "sanity" benchmarks for arithmetic op code instructions. - [#1411](https://github.com/FuelLabs/fuel-core/pull/1411): Added WASM and `no_std` compatibility diff --git a/Cargo.lock b/Cargo.lock index 700302fe7a..8bb558933b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3162,6 +3162,7 @@ dependencies = [ "async-trait", "fuel-core-chain-config", "fuel-core-metrics", + "fuel-core-p2p", "fuel-core-services", "fuel-core-storage", "fuel-core-trace", @@ -3171,6 +3172,7 @@ dependencies = [ "itertools 0.10.5", "mockall", "parking_lot 0.12.1", + "postcard", "proptest", "rstest", "test-strategy", diff --git a/bin/e2e-test-client/tests/integration_tests.rs b/bin/e2e-test-client/tests/integration_tests.rs index 530df1a66d..b311abd804 100644 --- a/bin/e2e-test-client/tests/integration_tests.rs +++ b/bin/e2e-test-client/tests/integration_tests.rs @@ -41,6 +41,7 @@ async fn works_in_multinode_local_env() { let mut rng = StdRng::seed_from_u64(line!() as u64); let secret = SecretKey::random(&mut rng); let pub_key = Input::owner(&secret.public_key()); + let disable_block_production = false; let Nodes { mut producers, mut validators, @@ -51,6 +52,7 @@ async fn works_in_multinode_local_env() { ProducerSetup::new(secret).with_txs(1).with_name("Alice"), )], [Some(ValidatorSetup::new(pub_key).with_name("Bob"))], + disable_block_production, ) .await; diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index 92dc84f038..0d8379d49c 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -148,6 +148,7 @@ pub async fn make_nodes( bootstrap_setup: impl IntoIterator>, producers_setup: impl IntoIterator>, validators_setup: impl IntoIterator>, + disable_block_production: bool, ) -> Nodes { let producers: Vec<_> = producers_setup.into_iter().collect(); @@ -227,6 +228,7 @@ pub async fn make_nodes( .then_some(name) .unwrap_or_else(|| format!("b:{i}")), chain_config.clone(), + disable_block_production, ); if let Some(BootstrapSetup { pub_key, .. 
}) = boot { match &mut node_config.chain_conf.consensus { @@ -252,6 +254,7 @@ pub async fn make_nodes( .then_some(name) .unwrap_or_else(|| format!("p:{i}")), chain_config.clone(), + disable_block_production, ); let mut test_txs = Vec::with_capacity(0); @@ -284,6 +287,7 @@ pub async fn make_nodes( .then_some(name) .unwrap_or_else(|| format!("v:{i}")), chain_config.clone(), + disable_block_production, ); node_config.block_production = Trigger::Never; node_config.p2p.as_mut().unwrap().bootstrap_nodes = boots.clone(); @@ -305,10 +309,17 @@ pub async fn make_nodes( } } -pub fn make_config(name: String, chain_config: ChainConfig) -> Config { +pub fn make_config( + name: String, + chain_config: ChainConfig, + disable_block_production: bool, +) -> Config { let mut node_config = Config::local_node(); node_config.chain_conf = chain_config; node_config.utxo_validation = true; + if disable_block_production { + node_config.block_production = Trigger::Never; + } node_config.name = name; node_config } diff --git a/crates/fuel-core/src/service/adapters/txpool.rs b/crates/fuel-core/src/service/adapters/txpool.rs index 3dd9ba9c08..22e8d06364 100644 --- a/crates/fuel-core/src/service/adapters/txpool.rs +++ b/crates/fuel-core/src/service/adapters/txpool.rs @@ -5,6 +5,7 @@ use crate::{ P2PAdapter, }, }; + use fuel_core_services::stream::BoxStream; use fuel_core_storage::{ not_found, @@ -37,6 +38,7 @@ use fuel_core_types::{ p2p::{ GossipsubMessageAcceptance, GossipsubMessageInfo, + PeerId, TransactionGossipData, }, }, @@ -57,6 +59,7 @@ impl BlockImporter for BlockImporterAdapter { } #[cfg(feature = "p2p")] +#[async_trait::async_trait] impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { type GossipedTransaction = TransactionGossipData; @@ -68,6 +71,20 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { } } + async fn send_pooled_transactions( + &self, + peer_id: PeerId, + transactions: Vec, + ) -> anyhow::Result<()> { + if let Some(service) = &self.service { + service + .send_pooled_transactions_to_peer(peer_id.into(), transactions) + .await + } else { + Ok(()) + } + } + fn gossiped_transaction_events(&self) -> BoxStream { use tokio_stream::{ wrappers::BroadcastStream, @@ -94,9 +111,40 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { Ok(()) } } + + fn incoming_pooled_transactions(&self) -> BoxStream> { + use tokio_stream::{ + wrappers::BroadcastStream, + StreamExt, + }; + if let Some(service) = &self.service { + Box::pin( + BroadcastStream::new(service.subscribe_to_incoming_pooled_transactions()) + .filter_map(|result| result.ok()), + ) + } else { + fuel_core_services::stream::IntoBoxStream::into_boxed(tokio_stream::pending()) + } + } + + fn new_connection(&self) -> BoxStream { + use tokio_stream::{ + wrappers::BroadcastStream, + StreamExt, + }; + if let Some(service) = &self.service { + Box::pin( + BroadcastStream::new(service.subscribe_to_connections()) + .filter_map(|result| result.ok()), + ) + } else { + fuel_core_services::stream::IntoBoxStream::into_boxed(tokio_stream::pending()) + } + } } #[cfg(not(feature = "p2p"))] +#[async_trait::async_trait] impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { type GossipedTransaction = TransactionGossipData; @@ -118,6 +166,22 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { ) -> anyhow::Result<()> { Ok(()) } + + fn new_connection(&self) -> BoxStream { + Box::pin(fuel_core_services::stream::pending()) + } + + fn incoming_pooled_transactions(&self) -> BoxStream> { + Box::pin(fuel_core_services::stream::pending()) + } + + 
async fn send_pooled_transactions( + &self, + _peer_id: PeerId, + _transactions: Vec, + ) -> anyhow::Result<()> { + Ok(()) + } } impl fuel_core_txpool::ports::TxPoolDb for Database { diff --git a/crates/services/p2p/src/lib.rs b/crates/services/p2p/src/lib.rs index 83ad440dfd..af8c545182 100644 --- a/crates/services/p2p/src/lib.rs +++ b/crates/services/p2p/src/lib.rs @@ -8,7 +8,7 @@ mod p2p_service; mod peer_manager; mod peer_report; pub mod ports; -mod request_response; +pub mod request_response; pub mod service; pub use gossipsub::config as gossipsub_config; diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 0fe5a156a0..8f34440d2c 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -298,6 +298,20 @@ impl FuelP2PService { } } + /// Sends a one way message to a peer. + /// It leverages the `RequestMessage` type to send the message. + /// But, unlike `send_request_msg`, it does not expect a + /// response through a response channel. + pub fn send_msg( + &mut self, + peer_id: PeerId, + message_request: RequestMessage, + ) -> RequestId { + self.swarm + .behaviour_mut() + .send_request_msg(message_request, &peer_id) + } + /// Sends RequestMessage to a peer /// If the peer is not defined it will pick one at random pub fn send_request_msg( @@ -783,7 +797,23 @@ mod tests { #[tokio::test] #[instrument] async fn p2p_service_works() { - build_service_from_config(Config::default_initialized("p2p_service_works")).await; + let mut fuel_p2p_service = + build_service_from_config(Config::default_initialized("p2p_service_works")) + .await; + + loop { + match fuel_p2p_service.swarm.select_next_some().await { + SwarmEvent::NewListenAddr { .. } => { + // listener address registered, we are good to go + break + } + SwarmEvent::Behaviour(_) => {} + other_event => { + tracing::error!("Unexpected event: {:?}", other_event); + panic!("Unexpected event {other_event:?}") + } + } + } } // Single sentry node connects to multiple reserved nodes and `max_peers_allowed` amount of non-reserved nodes. @@ -1623,6 +1653,12 @@ mod tests { } }); } + RequestMessage::PooledTransactions(_) => { + // This test case isn't applicable here because + // we send a one way message containing the pooled + // transactions. The node doesn't respond with anything + // so we can't test the response. + } } } } @@ -1654,6 +1690,12 @@ mod tests { let transactions = vec![Transactions(txs)]; let _ = node_b.send_response_msg(*request_id, OutboundResponse::Transactions(Some(Arc::new(transactions)))); } + RequestMessage::PooledTransactions(_) => { + // This test case isn't applicable here because + // we send a one way message containing the pooled + // transactions. The node doesn't respond with anything + // so we can't test the response. 
+ } } } diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 39ea25405e..453ad80801 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -8,6 +8,7 @@ use fuel_core_types::{ SealedBlock, SealedBlockHeader, }, + fuel_tx::Transaction, fuel_types::BlockHeight, services::p2p::Transactions, }; @@ -22,7 +23,7 @@ use tokio::sync::oneshot; pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &[u8] = b"/fuel/req_res/0.0.1"; /// Max Size in Bytes of the Request Message -pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::(); +pub const MAX_REQUEST_SIZE: usize = 32 * 1024; // Peer receives a `RequestMessage`. // It prepares a response in form of `OutboundResponse` @@ -38,6 +39,7 @@ pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::( pub enum RequestMessage { Block(BlockHeight), SealedHeaders(Range), + PooledTransactions(Vec), Transactions(Range), } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 3a4a25940d..be7f87f0b8 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -104,6 +104,10 @@ enum TaskRequest { block_height_range: Range, channel: oneshot::Sender<(PeerId, Option>)>, }, + SendPooledTransactions { + to_peer: PeerId, + transactions: Vec, + }, GetTransactions { block_height_range: Range, from_peer: PeerId, @@ -141,6 +145,13 @@ pub trait TaskP2PService: Send { &mut self, message: GossipsubBroadcastRequest, ) -> anyhow::Result<()>; + + fn send_msg( + &mut self, + peer_id: PeerId, + request_msg: RequestMessage, + ) -> anyhow::Result<()>; + fn send_request_msg( &mut self, peer_id: Option, @@ -194,6 +205,16 @@ impl TaskP2PService for FuelP2PService { Ok(()) } + /// Sends a one way request message to a peer. + fn send_msg( + &mut self, + peer_id: PeerId, + request_msg: RequestMessage, + ) -> anyhow::Result<()> { + self.send_msg(peer_id, request_msg); + Ok(()) + } + fn send_request_msg( &mut self, peer_id: Option, @@ -252,6 +273,13 @@ pub trait Broadcast: Send { ) -> anyhow::Result<()>; fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()>; + + fn connection_broadcast(&self, peer_id: FuelPeerId) -> anyhow::Result<()>; + + fn incoming_pooled_transactions( + &self, + transactions: Vec, + ) -> anyhow::Result<()>; } impl Broadcast for SharedState { @@ -276,6 +304,19 @@ impl Broadcast for SharedState { self.tx_broadcast.send(transaction)?; Ok(()) } + + fn connection_broadcast(&self, peer_id: FuelPeerId) -> anyhow::Result<()> { + self.connection_broadcast.send(peer_id)?; + Ok(()) + } + + fn incoming_pooled_transactions( + &self, + transactions: Vec, + ) -> anyhow::Result<()> { + self.incoming_pooled_transactions.send(transactions)?; + Ok(()) + } } /// Orchestrates various p2p-related events between the inner `P2pService` @@ -321,6 +362,8 @@ impl Task, D, SharedState> { let (request_sender, request_receiver) = mpsc::channel(100); let (tx_broadcast, _) = broadcast::channel(100); let (block_height_broadcast, _) = broadcast::channel(100); + let (connection_broadcast, _) = broadcast::channel(100); + let (incoming_pooled_transactions, _) = broadcast::channel(100); // Hardcoded for now, but left here to be configurable in the future. 
// TODO: https://github.com/FuelLabs/fuel-core/issues/1340 @@ -348,6 +391,8 @@ impl Task, D, SharedState> { tx_broadcast, reserved_peers_broadcast, block_height_broadcast, + connection_broadcast, + incoming_pooled_transactions, }, max_headers_per_request, heartbeat_check_interval, @@ -453,6 +498,10 @@ where next_service_request = self.request_receiver.recv() => { should_continue = true; match next_service_request { + Some(TaskRequest::SendPooledTransactions { to_peer, transactions }) => { + let request_msg = RequestMessage::PooledTransactions(transactions); + let _ = self.p2p_service.send_msg(to_peer, request_msg); + } Some(TaskRequest::BroadcastTransaction(transaction)) => { let tx_id = transaction.id(&self.chain_id); let broadcast = GossipsubBroadcastRequest::NewTx(transaction); @@ -516,6 +565,13 @@ where p2p_event = self.p2p_service.next_event() => { should_continue = true; match p2p_event { + Some(FuelP2PEvent::PeerConnected (peer_id) ) => { + // Send the peer ID that connected to this node to a + // channel that the TxPoolSync service is listening to. + // This is the first step of the protocol for the initial + // pool sync between two nodes. + let _ = self.broadcast.connection_broadcast(FuelPeerId::from(peer_id.to_bytes())); + } Some(FuelP2PEvent::PeerInfoUpdated { peer_id, block_height }) => { let peer_id: Vec = peer_id.into(); let block_height_data = BlockHeightHeartbeatData { @@ -573,6 +629,11 @@ where } } } + RequestMessage::PooledTransactions(transactions) => { + // Received pooled transactions from a peer. Send those + // transactions to the txpool service. + self.broadcast.incoming_pooled_transactions(transactions)?; + } RequestMessage::SealedHeaders(range) => { let max_len = self.max_headers_per_request.try_into().expect("u32 should always fit into usize"); if range.len() > max_len { @@ -647,6 +708,10 @@ pub struct SharedState { request_sender: mpsc::Sender, /// Sender of p2p blopck height data block_height_broadcast: broadcast::Sender, + /// Sender of new incoming connections + connection_broadcast: broadcast::Sender, + /// Sender of incoming pooled Transactions + incoming_pooled_transactions: broadcast::Sender>, } impl SharedState { @@ -722,6 +787,23 @@ impl SharedState { receiver.await.map_err(|e| anyhow!("{}", e)) } + pub async fn send_pooled_transactions_to_peer( + &self, + peer_id: Vec, + transactions: Vec, + ) -> anyhow::Result<()> { + let to_peer = PeerId::from_bytes(&peer_id).expect("Valid PeerId"); + + self.request_sender + .send(TaskRequest::SendPooledTransactions { + to_peer, + transactions, + }) + .await?; + + Ok(()) + } + pub fn broadcast_vote(&self, vote: Arc) -> anyhow::Result<()> { self.request_sender .try_send(TaskRequest::BroadcastVote(vote))?; @@ -769,6 +851,16 @@ impl SharedState { self.reserved_peers_broadcast.subscribe() } + pub fn subscribe_to_connections(&self) -> broadcast::Receiver { + self.connection_broadcast.subscribe() + } + + pub fn subscribe_to_incoming_pooled_transactions( + &self, + ) -> broadcast::Receiver> { + self.incoming_pooled_transactions.subscribe() + } + pub fn report_peer( &self, peer_id: FuelPeerId, @@ -940,6 +1032,14 @@ pub mod tests { std::future::pending().boxed() } + fn send_msg( + &mut self, + _peer_id: PeerId, + _request_msg: RequestMessage, + ) -> anyhow::Result<()> { + todo!() + } + fn publish_message( &mut self, _message: GossipsubBroadcastRequest, @@ -1044,6 +1144,17 @@ pub mod tests { todo!() } + fn incoming_pooled_transactions( + &self, + _transactions: Vec, + ) -> anyhow::Result<()> { + todo!() + } + + fn 
connection_broadcast(&self, _peer_id: FuelPeerId) -> anyhow::Result<()> { + todo!() + } + fn tx_broadcast( &self, _transaction: TransactionGossipData, diff --git a/crates/services/src/lib.rs b/crates/services/src/lib.rs index ae7adf2235..63a18e6a5f 100644 --- a/crates/services/src/lib.rs +++ b/crates/services/src/lib.rs @@ -10,6 +10,7 @@ mod state; pub mod stream { #[doc(no_inline)] pub use futures::stream::{ + empty, pending, unfold, Stream, diff --git a/crates/services/txpool/Cargo.toml b/crates/services/txpool/Cargo.toml index 9c07108646..600efe2417 100644 --- a/crates/services/txpool/Cargo.toml +++ b/crates/services/txpool/Cargo.toml @@ -15,11 +15,13 @@ anyhow = { workspace = true } async-trait = { workspace = true } fuel-core-chain-config = { workspace = true } fuel-core-metrics = { workspace = true } +fuel-core-p2p = { workspace = true } fuel-core-services = { workspace = true } fuel-core-storage = { workspace = true } fuel-core-types = { workspace = true } futures = { workspace = true } parking_lot = { workspace = true } +postcard = { workspace = true, features = ["use-std"] } tokio = { workspace = true, default-features = false, features = ["sync"] } tokio-rayon = { workspace = true } tokio-stream = { workspace = true } diff --git a/crates/services/txpool/src/ports.rs b/crates/services/txpool/src/ports.rs index de51f429e9..3d040664c5 100644 --- a/crates/services/txpool/src/ports.rs +++ b/crates/services/txpool/src/ports.rs @@ -21,12 +21,14 @@ use fuel_core_types::{ GossipsubMessageAcceptance, GossipsubMessageInfo, NetworkData, + PeerId, }, txpool::TransactionStatus, }, }; use std::sync::Arc; +#[async_trait::async_trait] pub trait PeerToPeer: Send + Sync { type GossipedTransaction: NetworkData; @@ -42,6 +44,19 @@ pub trait PeerToPeer: Send + Sync { message_info: GossipsubMessageInfo, validity: GossipsubMessageAcceptance, ) -> anyhow::Result<()>; + + /// Streams new connections to the node. + fn new_connection(&self) -> BoxStream; + + /// Streams incoming pooled transactions. + fn incoming_pooled_transactions(&self) -> BoxStream>; + + /// Send pooled transactions to a peer. + async fn send_pooled_transactions( + &self, + peer_id: PeerId, + transactions: Vec, + ) -> anyhow::Result<()>; } pub trait BlockImporter: Send + Sync { diff --git a/crates/services/txpool/src/service.rs b/crates/services/txpool/src/service.rs index d93ec166e0..fee5aba21a 100644 --- a/crates/services/txpool/src/service.rs +++ b/crates/services/txpool/src/service.rs @@ -15,10 +15,16 @@ use crate::{ TxPool, }; +use fuel_core_p2p::request_response::messages::{ + RequestMessage, + MAX_REQUEST_SIZE, +}; + use fuel_core_services::{ stream::BoxStream, RunnableService, RunnableTask, + Service as _, ServiceRunner, StateWatcher, }; @@ -39,6 +45,7 @@ use fuel_core_types::{ GossipData, GossipsubMessageAcceptance, GossipsubMessageInfo, + PeerId, TransactionGossipData, }, txpool::{ @@ -71,7 +78,15 @@ use self::update_sender::{ mod update_sender; +// Main transaction pool service. It is mainly responsible for +// listening for new transactions being gossiped through the network and +// adding those to its own pool. pub type Service = ServiceRunner>; +// Sidecar service that is responsible for a request-response style of syncing +// pending transactions with other peers upon connection. In other words, +// when a new connection between peers happen, they will compare their +// pending transactions and sync them with each other. 
+pub type TxPoolSyncService = ServiceRunner>; #[derive(Clone)] pub struct TxStatusChange { @@ -141,10 +156,147 @@ impl Clone for SharedState { } } -pub struct Task { +pub struct TxPoolSyncTask { + ttl_timer: tokio::time::Interval, + peer_connections: BoxStream, + incoming_pooled_transactions: BoxStream>, + shared: SharedState, +} + +#[async_trait::async_trait] +impl RunnableService for TxPoolSyncTask +where + DB: TxPoolDb + Clone, + P2P: PeerToPeer + Send + Sync, +{ + const NAME: &'static str = "TxPoolSync"; + + type SharedData = SharedState; + type Task = TxPoolSyncTask; + type TaskParams = (); + + fn shared_data(&self) -> Self::SharedData { + self.shared.clone() + } + + async fn into_task( + mut self, + _: &StateWatcher, + _: Self::TaskParams, + ) -> anyhow::Result { + self.ttl_timer.reset(); + Ok(self) + } +} + +#[async_trait::async_trait] +impl RunnableTask for TxPoolSyncTask +where + DB: TxPoolDb, + P2P: PeerToPeer + Send + Sync, +{ + async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { + let should_continue; + + tokio::select! { + biased; + + _ = watcher.while_started() => { + should_continue = false; + } + + incoming_pooled_transactions = self.incoming_pooled_transactions.next() => { + if let Some(incoming_pooled_transactions) = incoming_pooled_transactions { + let current_height = self.shared.db.current_block_height()?; + + let mut res = Vec::new(); + + for tx in incoming_pooled_transactions.into_iter() { + let checked_tx = match check_single_tx(tx, current_height, &self.shared.config).await { + Ok(tx) => tx, + Err(e) => { + tracing::error!("Unable to insert pooled transaction coming from a newly connected peer, got an {} error", e); + continue; + } + }; + res.push(self.shared.txpool.lock().insert_inner(checked_tx)); + } + + should_continue = true; + + } else { + should_continue = false; + } + } + + new_connection = self.peer_connections.next() => { + if let Some(peer_id) = new_connection { + should_continue = true; + + let mut txs = vec![]; + for tx in self.shared.txpool.lock().txs() { + txs.push(Transaction::from(&*tx.1.tx)); + } + + + if !txs.is_empty() { + for txs in split_into_batches(txs) { + let result = self.shared.p2p.send_pooled_transactions(peer_id.clone(), txs).await; + if let Err(e) = result { + tracing::error!("Unable to send pooled transactions, got an {} error", e); + } + } + } + + } else { + should_continue = false; + } + } + } + + Ok(should_continue) + } + + async fn shutdown(self) -> anyhow::Result<()> { + // Nothing to shut down because we don't have any temporary state that should be dumped, + // and we don't spawn any sub-tasks that we need to finish or await. + // Maybe we will save and load the previous list of transactions in the future to + // avoid losing them. + + Ok(()) + } +} + +// Split transactions into batches of size less than MAX_REQUEST_SIZE. 
+fn split_into_batches(txs: Vec) -> Vec> { + let mut batches = Vec::new(); + let mut batch = Vec::new(); + let mut size = 0; + for tx in txs.into_iter() { + let m = RequestMessage::PooledTransactions(vec![tx.clone()]); + let tx_size = postcard::to_stdvec(&m).unwrap().len(); + if size + tx_size < MAX_REQUEST_SIZE { + batch.push(tx); + size += tx_size; + } else { + batches.push(batch); + batch = vec![tx]; + size = tx_size; + } + } + batches.push(batch); + batches +} + +pub struct Task +where + DB: TxPoolDb + 'static + Clone, + P2P: PeerToPeer + Send + Sync + 'static, +{ gossiped_tx_stream: BoxStream, committed_block_stream: BoxStream>, shared: SharedState, + txpool_sync_task: ServiceRunner>, ttl_timer: tokio::time::Interval, } @@ -169,6 +321,9 @@ where _: &StateWatcher, _: Self::TaskParams, ) -> anyhow::Result { + // Transaction pool sync task work as a sub service to the transaction pool task. + // So we start it here and shut it down when this task is shut down. + self.txpool_sync_task.start()?; self.ttl_timer.reset(); Ok(self) } @@ -178,7 +333,7 @@ where impl RunnableTask for Task where P2P: PeerToPeer + Send + Sync, - DB: TxPoolDb, + DB: TxPoolDb + Clone, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { let should_continue; @@ -271,10 +426,12 @@ where } async fn shutdown(self) -> anyhow::Result<()> { - // Nothing to shut down because we don't have any temporary state that should be dumped, + // Nothing other than the txpool sync task to shut down + // because we don't have any temporary state that should be dumped, // and we don't spawn any sub-tasks that we need to finish or await. // Maybe we will save and load the previous list of transactions in the future to // avoid losing them. + self.txpool_sync_task.stop_and_await().await?; Ok(()) } } @@ -431,6 +588,40 @@ pub enum TxStatusMessage { FailedStatus, } +pub fn new_txpool_syncing_service( + config: Config, + txpool: Arc>>, + p2p: Arc, + db: DB, +) -> TxPoolSyncService +where + DB: TxPoolDb + 'static + Clone, + P2P: PeerToPeer + 'static, +{ + let mut ttl_timer = tokio::time::interval(config.transaction_ttl); + ttl_timer.set_missed_tick_behavior(MissedTickBehavior::Skip); + let number_of_active_subscription = config.number_of_active_subscription; + let consensus_params = config.chain_config.consensus_parameters.clone(); + let tx_sync_task = TxPoolSyncTask { + peer_connections: p2p.new_connection(), + incoming_pooled_transactions: p2p.incoming_pooled_transactions(), + ttl_timer, + shared: SharedState { + db, + tx_status_sender: TxStatusChange::new( + number_of_active_subscription, + 2 * config.transaction_ttl, + ), + config, + txpool, + p2p, + consensus_params, + }, + }; + + TxPoolSyncService::new(tx_sync_task) +} + pub fn new_service( config: Config, db: DB, @@ -450,9 +641,18 @@ where let consensus_params = config.chain_config.consensus_parameters.clone(); let number_of_active_subscription = config.number_of_active_subscription; let txpool = Arc::new(ParkingMutex::new(TxPool::new(config.clone(), db.clone()))); + + let txpool_sync_task = new_txpool_syncing_service( + config.clone(), + txpool.clone(), + p2p.clone(), + db.clone(), + ); + let task = Task { gossiped_tx_stream, committed_block_stream, + txpool_sync_task, shared: SharedState { tx_status_sender: TxStatusChange::new( number_of_active_subscription, diff --git a/crates/services/txpool/src/service/test_helpers.rs b/crates/services/txpool/src/service/test_helpers.rs index 6c95f3c3ce..0a7fe324be 100644 --- a/crates/services/txpool/src/service/test_helpers.rs 
+++ b/crates/services/txpool/src/service/test_helpers.rs @@ -63,6 +63,7 @@ impl TestContext { mockall::mock! { pub P2P {} + #[async_trait::async_trait] impl PeerToPeer for P2P { type GossipedTransaction = GossipedTransaction; @@ -70,6 +71,16 @@ mockall::mock! { fn gossiped_transaction_events(&self) -> BoxStream; + fn new_connection(&self) -> BoxStream; + + fn incoming_pooled_transactions(&self) -> BoxStream>; + + async fn send_pooled_transactions( + &self, + peer_id: PeerId, + transactions: Vec, + ) -> anyhow::Result<()>; + fn notify_gossip_transaction_validity( &self, message_info: GossipsubMessageInfo, @@ -95,6 +106,17 @@ impl MockP2P { }); p2p.expect_broadcast_transaction() .returning(move |_| Ok(())); + p2p.expect_notify_gossip_transaction_validity() + .returning(move |_, _| Ok(())); + p2p.expect_new_connection().returning(move || { + let stream = fuel_core_services::stream::empty::(); + Box::pin(stream) + }); + p2p.expect_incoming_pooled_transactions() + .returning(move || { + let stream = fuel_core_services::stream::empty::>(); + Box::pin(stream) + }); p2p } } diff --git a/crates/services/txpool/src/txpool.rs b/crates/services/txpool/src/txpool.rs index 00d56748b5..58230ab8e7 100644 --- a/crates/services/txpool/src/txpool.rs +++ b/crates/services/txpool/src/txpool.rs @@ -87,7 +87,7 @@ where #[tracing::instrument(level = "info", skip_all, fields(tx_id = %tx.id()), ret, err)] // this is atomic operation. Return removed(pushed out/replaced) transactions - fn insert_inner( + pub fn insert_inner( &mut self, tx: Checked, ) -> anyhow::Result { diff --git a/tests/tests/lib.rs b/tests/tests/lib.rs index 9932f39ab2..c1b7df4896 100644 --- a/tests/tests/lib.rs +++ b/tests/tests/lib.rs @@ -25,5 +25,7 @@ mod trigger_integration; mod tx; #[cfg(feature = "p2p")] mod tx_gossip; +#[cfg(feature = "p2p")] +mod tx_request_response; fuel_core_trace::enable_tracing!(); diff --git a/tests/tests/poa.rs b/tests/tests/poa.rs index 40bde7ce5a..a47e13e72f 100644 --- a/tests/tests/poa.rs +++ b/tests/tests/poa.rs @@ -124,11 +124,20 @@ mod p2p { signing_key: pub_key, }; - let bootstrap_config = make_config("Bootstrap".to_string(), chain_config.clone()); + let disable_block_production = false; + let bootstrap_config = make_config( + "Bootstrap".to_string(), + chain_config.clone(), + disable_block_production, + ); let bootstrap = Bootstrap::new(&bootstrap_config).await; let make_node_config = |name: &str| { - let mut config = make_config(name.to_string(), chain_config.clone()); + let mut config = make_config( + name.to_string(), + chain_config.clone(), + disable_block_production, + ); config.block_production = Trigger::Interval { block_time: Duration::from_secs(INTERVAL), }; diff --git a/tests/tests/sync.rs b/tests/tests/sync.rs index eb806c77a8..d6bbc30016 100644 --- a/tests/tests/sync.rs +++ b/tests/tests/sync.rs @@ -28,6 +28,7 @@ async fn test_producer_getting_own_blocks_back() { // Create a producer and a validator that share the same key pair. let secret = SecretKey::random(&mut rng); let pub_key = Input::owner(&secret.public_key()); + let disable_block_production = false; let Nodes { mut producers, mut validators, @@ -38,6 +39,7 @@ async fn test_producer_getting_own_blocks_back() { ProducerSetup::new(secret).with_txs(1).with_name("Alice"), )], [Some(ValidatorSetup::new(pub_key).with_name("Bob"))], + disable_block_production, ) .await; @@ -73,6 +75,7 @@ async fn test_partition_single(num_txs: usize) { // Create a producer and two validators that share the same key pair. 
let secret = SecretKey::random(&mut rng); let pub_key = Input::owner(&secret.public_key()); + let disable_block_production = false; let Nodes { mut producers, validators, @@ -88,6 +91,7 @@ async fn test_partition_single(num_txs: usize) { Some(ValidatorSetup::new(pub_key).with_name("Bob")), Some(ValidatorSetup::new(pub_key).with_name("Carol")), ], + disable_block_production, ) .await; @@ -138,6 +142,7 @@ async fn test_partitions_larger_groups( // Create a producer and a set of validators that share the same key pair. let secret = SecretKey::random(&mut rng); let pub_key = Input::owner(&secret.public_key()); + let disable_block_production = false; let Nodes { mut producers, mut validators, @@ -152,6 +157,7 @@ async fn test_partitions_larger_groups( (0..num_validators).map(|i| { Some(ValidatorSetup::new(pub_key).with_name(format!("{pub_key}:{i}"))) }), + disable_block_production, ) .await; @@ -232,6 +238,7 @@ async fn test_multiple_producers_different_keys() { // Create a producer for each key pair and a set of validators that share // the same key pair. + let disable_block_production = false; let Nodes { mut producers, validators, @@ -252,6 +259,7 @@ async fn test_multiple_producers_different_keys() { Some(ValidatorSetup::new(*pub_key).with_name(format!("{pub_key}:{i}"))) }) }), + disable_block_production, ) .await; @@ -300,6 +308,7 @@ async fn test_multiple_producers_same_key() { let secret = SecretKey::random(&mut rng); let pub_key = Input::owner(&secret.public_key()); + let disable_block_production = false; let Nodes { mut producers, mut validators, @@ -308,6 +317,7 @@ async fn test_multiple_producers_same_key() { std::iter::repeat(Some(BootstrapSetup::new(pub_key))).take(num_producers), std::iter::repeat(Some(ProducerSetup::new(secret))).take(num_producers), std::iter::repeat(Some(ValidatorSetup::new(pub_key))).take(num_validators), + disable_block_production, ) .await; diff --git a/tests/tests/tx_gossip.rs b/tests/tests/tx_gossip.rs index 8e9f293b1b..22387da962 100644 --- a/tests/tests/tx_gossip.rs +++ b/tests/tests/tx_gossip.rs @@ -7,9 +7,10 @@ use fuel_core::p2p_test_helpers::{ }; use fuel_core_client::client::FuelClient; use fuel_core_types::{ - fuel_tx::*, + fuel_tx::input::Input, fuel_vm::*, }; + use rand::{ rngs::StdRng, SeedableRng, @@ -42,6 +43,7 @@ async fn test_tx_gossiping() { .map(|secret| Input::owner(&secret.public_key())) .collect(); + let disable_block_production = false; // Create a producer for each key pair and a set of validators that share // the same key pair. let Nodes { @@ -64,6 +66,7 @@ async fn test_tx_gossiping() { Some(ValidatorSetup::new(*pub_key).with_name(format!("{pub_key}:{i}"))) }) }), + disable_block_production, ) .await; diff --git a/tests/tests/tx_request_response.rs b/tests/tests/tx_request_response.rs new file mode 100644 index 0000000000..35a097480e --- /dev/null +++ b/tests/tests/tx_request_response.rs @@ -0,0 +1,230 @@ +use fuel_core::p2p_test_helpers::{ + make_nodes, + BootstrapSetup, + Nodes, + ProducerSetup, + ValidatorSetup, +}; +use fuel_core_client::client::FuelClient; +use fuel_core_types::{ + fuel_tx::{ + input::Input, + UniqueIdentifier, + }, + fuel_vm::*, +}; +use rand::{ + rngs::StdRng, + SeedableRng, +}; +use std::{ + collections::hash_map::DefaultHasher, + hash::{ + Hash, + Hasher, + }, + time::Duration, +}; + +// This test is set up in such a way that the transaction is not committed +// as we've disabled the block production. This is to test that the peer +// will request this transaction from the other peer upon connection. 
+#[tokio::test(flavor = "multi_thread")] +async fn test_tx_request() { + // Create a random seed based on the test parameters. + let mut hasher = DefaultHasher::new(); + let num_txs = 1; + let num_validators = 1; + let num_partitions = 1; + (num_txs, num_validators, num_partitions, line!()).hash(&mut hasher); + let mut rng = StdRng::seed_from_u64(hasher.finish()); + + // Create a set of key pairs. + let secrets: Vec<_> = (0..1).map(|_| SecretKey::random(&mut rng)).collect(); + let pub_keys: Vec<_> = secrets + .clone() + .into_iter() + .map(|secret| Input::owner(&secret.public_key())) + .collect(); + + // Creates a simple node config with no block production. + // This is used for these tests because we don't want the transactions to be + // included in a block. We want them to be included in the mempool and check + // that new connected nodes sync the mempool. + let disable_block_production = true; + // Create a producer for each key pair and a set of validators that share + // the same key pair. + let Nodes { + mut producers, + mut validators, + bootstrap_nodes: _dont_drop, + } = make_nodes( + pub_keys + .iter() + .map(|pub_key| Some(BootstrapSetup::new(*pub_key))), + secrets.clone().into_iter().enumerate().map(|(i, secret)| { + Some( + ProducerSetup::new(secret) + .with_txs(num_txs) + .with_name(format!("{}:producer", pub_keys[i])), + ) + }), + pub_keys.iter().flat_map(|pub_key| { + (0..num_validators).map(move |i| { + Some(ValidatorSetup::new(*pub_key).with_name(format!("{pub_key}:{i}"))) + }) + }), + disable_block_production, + ) + .await; + + let first_node = &mut validators[0]; + let second_node = &mut producers[0]; + + let client_one = FuelClient::from(first_node.node.bound_address); + + // Temporarily shut down the second node. + second_node.shutdown().await; + + // Insert transactions into the mempool of the first node. + first_node.test_txs = second_node.test_txs.clone(); + let _tx = first_node.insert_txs().await; + + // Sleep for 2 seconds + tokio::time::sleep(Duration::from_secs(2)).await; + + second_node.start().await; + + // Wait for nodes to share their transaction pool. + tokio::time::sleep(Duration::from_secs(2)).await; + + let client_two = FuelClient::from(second_node.node.bound_address); + + // Produce a new block with the participating nodes. + // It's expected that they will: + // 1. Connect to each other; + // 2. Share the transaction; + // 3. Produce a block with the transaction; + let _ = client_one.produce_blocks(1, None).await.unwrap(); + let _ = client_two.produce_blocks(1, None).await.unwrap(); + + let chain_id = first_node.config.chain_conf.consensus_parameters.chain_id(); + + let request = fuel_core_client::client::pagination::PaginationRequest { + cursor: None, + results: 10, + direction: fuel_core_client::client::pagination::PageDirection::Forward, + }; + + let first_node_txs = client_two.transactions(request.clone()).await.unwrap(); + let second_node_txs = client_one.transactions(request).await.unwrap(); + + let first_node_tx = first_node_txs + .results + .first() + .expect("txs should have at least one result") + .transaction + .id(&chain_id); + let second_node_tx = second_node_txs + .results + .first() + .expect("txs should have at least one result") + .transaction + .id(&chain_id); + + assert_eq!(first_node_tx, second_node_tx); +} + +// Similar to the previous test, but this time we're testing +// sending enough requests that it won't fit into a single batch. 
+#[tokio::test(flavor = "multi_thread")] +async fn test_tx_request_multiple_batches() { + // Create a random seed based on the test parameters. + let mut hasher = DefaultHasher::new(); + let num_txs = 200; + let num_validators = 1; + let num_partitions = 1; + (num_txs, num_validators, num_partitions, line!()).hash(&mut hasher); + let mut rng = StdRng::seed_from_u64(hasher.finish()); + + // Create a set of key pairs. + let secrets: Vec<_> = (0..1).map(|_| SecretKey::random(&mut rng)).collect(); + let pub_keys: Vec<_> = secrets + .clone() + .into_iter() + .map(|secret| Input::owner(&secret.public_key())) + .collect(); + + // Creates a simple node config with no block production. + // This is used for these tests because we don't want the transactions to be + // included in a block. We want them to be included in the mempool and check + // that new connected nodes sync the mempool. + let disable_block_production = true; + // Create a producer for each key pair and a set of validators that share + // the same key pair. + let Nodes { + mut producers, + mut validators, + bootstrap_nodes: _dont_drop, + } = make_nodes( + pub_keys + .iter() + .map(|pub_key| Some(BootstrapSetup::new(*pub_key))), + secrets.clone().into_iter().enumerate().map(|(i, secret)| { + Some( + ProducerSetup::new(secret) + .with_txs(num_txs) + .with_name(format!("{}:producer", pub_keys[i])), + ) + }), + pub_keys.iter().flat_map(|pub_key| { + (0..num_validators).map(move |i| { + Some(ValidatorSetup::new(*pub_key).with_name(format!("{pub_key}:{i}"))) + }) + }), + disable_block_production, + ) + .await; + + let first_node = &mut validators[0]; + let second_node = &mut producers[0]; + + let client_one = FuelClient::from(first_node.node.bound_address); + + // Temporarily shut down the second node. + second_node.shutdown().await; + + // Insert transactions into the mempool of the first node. + first_node.test_txs = second_node.test_txs.clone(); + let _tx = first_node.insert_txs().await; + + // Sleep for 2 seconds + tokio::time::sleep(Duration::from_secs(2)).await; + + // Start second node + second_node.start().await; + + // Wait for nodes to share their transaction pool. + tokio::time::sleep(Duration::from_secs(10)).await; + + let client_two = FuelClient::from(second_node.node.bound_address); + + // Produce a new block with the participating nodes. + // It's expected that they will: + // 1. Connect to each other; + // 2. Share the transaction; + // 3. Produce a block with the transaction; + let _ = client_one.produce_blocks(1, None).await.unwrap(); + let _ = client_two.produce_blocks(1, None).await.unwrap(); + + let request = fuel_core_client::client::pagination::PaginationRequest { + cursor: None, + results: 1000, + direction: fuel_core_client::client::pagination::PageDirection::Forward, + }; + + let first_node_txs = client_one.transactions(request.clone()).await.unwrap(); + let second_node_txs = client_two.transactions(request).await.unwrap(); + + assert_eq!(first_node_txs.results.len(), second_node_txs.results.len()); +}