Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Pooled transactions exchange protocol #1228

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
216886e
feat: Add initial new tx pool sync service
digorithm Jun 13, 2023
be9f6ce
Checkpoint: pooled transactions request handling
digorithm Jun 23, 2023
685bf10
Merge branch 'master' into rodrigo/txpool-sync
digorithm Jun 23, 2023
0359128
Sync with master and resolve conflicts
digorithm Jul 18, 2023
f1e2c91
Format with nightly
digorithm Jul 18, 2023
791433d
Format with nightly
digorithm Jul 18, 2023
4ff38fa
Sync with master and resolve conflicts
digorithm Aug 17, 2023
5950f1f
Sync with master and resolve conflicts
digorithm Aug 22, 2023
b4eb7ca
Move from requesting txs to sending txs upon connection
digorithm Sep 16, 2023
5b52638
Sync with master and resolve conflicts
digorithm Sep 16, 2023
c3afcb6
Sync with master and resolve conflicts
digorithm Sep 21, 2023
56aca1d
Sync with master and resolve conflicts
digorithm Sep 28, 2023
27ba178
Working draft. Some cleanups and TODOs left.
digorithm Oct 6, 2023
8321049
Sync with master
digorithm Oct 6, 2023
ff93950
Remove the need for a response channel when sending a one way request…
digorithm Oct 6, 2023
aa00a68
Merge branch 'master' into rodrigo/txpool-sync
digorithm Oct 6, 2023
1e1aea9
Split transactions into batches
digorithm Oct 13, 2023
1b5b6fb
Handle error and other nits
digorithm Oct 20, 2023
f79a1a5
Merge branch 'master' into rodrigo/txpool-sync
digorithm Oct 20, 2023
47f0983
Some more nits
digorithm Oct 20, 2023
1871b26
fmt
digorithm Oct 20, 2023
e3fd9d0
Update changelog
digorithm Oct 20, 2023
adce482
Implement missing trait fns
digorithm Oct 20, 2023
7c82238
More CI-related fixes/improvements
digorithm Oct 20, 2023
dbed77c
Rustfmt
digorithm Oct 20, 2023
6a3d033
Remove unused dependencies
digorithm Oct 20, 2023
6423138
Fix test
digorithm Oct 20, 2023
14c00b7
Update multiple batches test
digorithm Oct 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Description of the upcoming release here.

### Added

- [#1228](https://github.com/FuelLabs/fuel-core/pull/1228): Add a new capability of sharing pooled transactions when two nodes connect.
- [#1432](https://github.com/FuelLabs/fuel-core/pull/1432): Add a new `--api-request-timeout` argument to control TTL for GraphQL requests.
- [#1419](https://github.com/FuelLabs/fuel-core/pull/1419): Add additional "sanity" benchmarks for arithmetic op code instructions.
- [#1411](https://github.com/FuelLabs/fuel-core/pull/1411): Added WASM and `no_std` compatibility
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions bin/e2e-test-client/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async fn works_in_multinode_local_env() {
let mut rng = StdRng::seed_from_u64(line!() as u64);
let secret = SecretKey::random(&mut rng);
let pub_key = Input::owner(&secret.public_key());
let disable_block_production = false;
let Nodes {
mut producers,
mut validators,
Expand All @@ -51,6 +52,7 @@ async fn works_in_multinode_local_env() {
ProducerSetup::new(secret).with_txs(1).with_name("Alice"),
)],
[Some(ValidatorSetup::new(pub_key).with_name("Bob"))],
disable_block_production,
)
.await;

Expand Down
13 changes: 12 additions & 1 deletion crates/fuel-core/src/p2p_test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ pub async fn make_nodes(
bootstrap_setup: impl IntoIterator<Item = Option<BootstrapSetup>>,
producers_setup: impl IntoIterator<Item = Option<ProducerSetup>>,
validators_setup: impl IntoIterator<Item = Option<ValidatorSetup>>,
disable_block_production: bool,
) -> Nodes {
let producers: Vec<_> = producers_setup.into_iter().collect();

Expand Down Expand Up @@ -227,6 +228,7 @@ pub async fn make_nodes(
.then_some(name)
.unwrap_or_else(|| format!("b:{i}")),
chain_config.clone(),
disable_block_production,
);
if let Some(BootstrapSetup { pub_key, .. }) = boot {
match &mut node_config.chain_conf.consensus {
Expand All @@ -252,6 +254,7 @@ pub async fn make_nodes(
.then_some(name)
.unwrap_or_else(|| format!("p:{i}")),
chain_config.clone(),
disable_block_production,
);

let mut test_txs = Vec::with_capacity(0);
Expand Down Expand Up @@ -284,6 +287,7 @@ pub async fn make_nodes(
.then_some(name)
.unwrap_or_else(|| format!("v:{i}")),
chain_config.clone(),
disable_block_production,
);
node_config.block_production = Trigger::Never;
node_config.p2p.as_mut().unwrap().bootstrap_nodes = boots.clone();
Expand All @@ -305,10 +309,17 @@ pub async fn make_nodes(
}
}

pub fn make_config(name: String, chain_config: ChainConfig) -> Config {
pub fn make_config(
name: String,
chain_config: ChainConfig,
disable_block_production: bool,
) -> Config {
let mut node_config = Config::local_node();
node_config.chain_conf = chain_config;
node_config.utxo_validation = true;
if disable_block_production {
node_config.block_production = Trigger::Never;
}
node_config.name = name;
node_config
}
Expand Down
64 changes: 64 additions & 0 deletions crates/fuel-core/src/service/adapters/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
P2PAdapter,
},
};

use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
not_found,
Expand Down Expand Up @@ -37,6 +38,7 @@ use fuel_core_types::{
p2p::{
GossipsubMessageAcceptance,
GossipsubMessageInfo,
PeerId,
TransactionGossipData,
},
},
Expand All @@ -57,6 +59,7 @@ impl BlockImporter for BlockImporterAdapter {
}

#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
type GossipedTransaction = TransactionGossipData;

Expand All @@ -68,6 +71,20 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
}
}

async fn send_pooled_transactions(
&self,
peer_id: PeerId,
transactions: Vec<Transaction>,
) -> anyhow::Result<()> {
if let Some(service) = &self.service {
service
.send_pooled_transactions_to_peer(peer_id.into(), transactions)
.await
} else {
Ok(())
}
}

fn gossiped_transaction_events(&self) -> BoxStream<Self::GossipedTransaction> {
use tokio_stream::{
wrappers::BroadcastStream,
Expand All @@ -94,9 +111,40 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
Ok(())
}
}

fn incoming_pooled_transactions(&self) -> BoxStream<Vec<Transaction>> {
use tokio_stream::{
wrappers::BroadcastStream,
StreamExt,
};
if let Some(service) = &self.service {
Box::pin(
BroadcastStream::new(service.subscribe_to_incoming_pooled_transactions())
.filter_map(|result| result.ok()),
)
} else {
fuel_core_services::stream::IntoBoxStream::into_boxed(tokio_stream::pending())
}
}

fn new_connection(&self) -> BoxStream<PeerId> {
use tokio_stream::{
wrappers::BroadcastStream,
StreamExt,
};
if let Some(service) = &self.service {
Box::pin(
BroadcastStream::new(service.subscribe_to_connections())
.filter_map(|result| result.ok()),
)
} else {
fuel_core_services::stream::IntoBoxStream::into_boxed(tokio_stream::pending())
}
}
}

#[cfg(not(feature = "p2p"))]
#[async_trait::async_trait]
impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
type GossipedTransaction = TransactionGossipData;

Expand All @@ -118,6 +166,22 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
) -> anyhow::Result<()> {
Ok(())
}

fn new_connection(&self) -> BoxStream<PeerId> {
Box::pin(fuel_core_services::stream::pending())
}

fn incoming_pooled_transactions(&self) -> BoxStream<Vec<Transaction>> {
Box::pin(fuel_core_services::stream::pending())
}

async fn send_pooled_transactions(
&self,
_peer_id: PeerId,
_transactions: Vec<Transaction>,
) -> anyhow::Result<()> {
Ok(())
}
}

impl fuel_core_txpool::ports::TxPoolDb for Database {
Expand Down
2 changes: 1 addition & 1 deletion crates/services/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod p2p_service;
mod peer_manager;
mod peer_report;
pub mod ports;
mod request_response;
pub mod request_response;
pub mod service;

pub use gossipsub::config as gossipsub_config;
Expand Down
44 changes: 43 additions & 1 deletion crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,20 @@ impl<Codec: NetworkCodec> FuelP2PService<Codec> {
}
}

/// Sends a one way message to a peer.
/// It leverages the `RequestMessage` type to send the message.
/// But, unlike `send_request_msg`, it does not expect a
/// response through a response channel.
pub fn send_msg(
&mut self,
peer_id: PeerId,
message_request: RequestMessage,
) -> RequestId {
self.swarm
.behaviour_mut()
.send_request_msg(message_request, &peer_id)
}

/// Sends RequestMessage to a peer
/// If the peer is not defined it will pick one at random
pub fn send_request_msg(
Expand Down Expand Up @@ -783,7 +797,23 @@ mod tests {
#[tokio::test]
#[instrument]
async fn p2p_service_works() {
build_service_from_config(Config::default_initialized("p2p_service_works")).await;
let mut fuel_p2p_service =
build_service_from_config(Config::default_initialized("p2p_service_works"))
.await;

loop {
match fuel_p2p_service.swarm.select_next_some().await {
SwarmEvent::NewListenAddr { .. } => {
// listener address registered, we are good to go
break
}
SwarmEvent::Behaviour(_) => {}
other_event => {
tracing::error!("Unexpected event: {:?}", other_event);
panic!("Unexpected event {other_event:?}")
}
}
}
}

// Single sentry node connects to multiple reserved nodes and `max_peers_allowed` amount of non-reserved nodes.
Expand Down Expand Up @@ -1623,6 +1653,12 @@ mod tests {
}
});
}
RequestMessage::PooledTransactions(_) => {
// This test case isn't applicable here because
// we send a one way message containing the pooled
// transactions. The node doesn't respond with anything
// so we can't test the response.
}
}
}
}
Expand Down Expand Up @@ -1654,6 +1690,12 @@ mod tests {
let transactions = vec![Transactions(txs)];
let _ = node_b.send_response_msg(*request_id, OutboundResponse::Transactions(Some(Arc::new(transactions))));
}
RequestMessage::PooledTransactions(_) => {
// This test case isn't applicable here because
// we send a one way message containing the pooled
// transactions. The node doesn't respond with anything
// so we can't test the response.
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion crates/services/p2p/src/request_response/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use fuel_core_types::{
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::Transactions,
};
Expand All @@ -22,7 +23,7 @@ use tokio::sync::oneshot;
pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &[u8] = b"/fuel/req_res/0.0.1";

/// Max Size in Bytes of the Request Message
pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::<RequestMessage>();
pub const MAX_REQUEST_SIZE: usize = 32 * 1024;

// Peer receives a `RequestMessage`.
// It prepares a response in form of `OutboundResponse`
Expand All @@ -38,6 +39,7 @@ pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::<RequestMessage>(
pub enum RequestMessage {
Block(BlockHeight),
SealedHeaders(Range<u32>),
PooledTransactions(Vec<Transaction>),
Transactions(Range<u32>),
}

Expand Down
Loading
Loading