Skip to content

Commit

Permalink
Send whole transaction pool upon subscription to gossip (#2131)
Browse files Browse the repository at this point in the history
## Linked Issues/PRs
Part of #1049

## Description
This PR introduce a new workflow when we successfully subscribed to the
TX topic in gossip we try ask for all the tx_ids in the pool's peer and
then ask the full transactions that we don't have.
This PR also build the `SharedState` of `P2PService` independently to
give more flexibility to the initialisation and avoid cyclic
dependencies.

### Detailed workflow : 

- When a peer has subscribed to our topic send inform the TxPool
- The TxPool asks the tx ids of the peer
- The TxPool filter the one it already knows and ask for the full
transactions of the others (a multi message send should be done if the
number of txs asked is above the limit (currently 10k chosen randomly)).
- We verify and add them to our txpool

### Questions for review

- What's happens if the peer doesn't answer to a request that we await ?
- I placed Option in the request/response because there was in the other
messages but why it's needed ?

## Checklist
- [x] Breaking changes are clearly marked as such in the PR description
and changelog
- [x] New behavior is reflected in tests
- [x] [The specification](https://github.com/FuelLabs/fuel-specs/)
matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [x] I have reviewed the code myself
- [x] I have created follow-up issues caused by this PR and linked them
here

---------

Co-authored-by: Green Baneling <XgreenX9999@gmail.com>
  • Loading branch information
AurelienFT and xgreenx committed Sep 19, 2024
1 parent 408c468 commit 5963a4e
Show file tree
Hide file tree
Showing 24 changed files with 1,175 additions and 99 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Added
- [2131](https://github.com/FuelLabs/fuel-core/pull/2131): Add flow in TxPool in order to ask to newly connected peers to share their transaction pool

## [Version 0.36.0]

### Added
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions bin/fuel-core/src/cli/run/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ pub struct P2PArgs {
#[clap(long = "max-headers-per-request", default_value = "100", env)]
pub max_headers_per_request: usize,

/// Max number of txs in a single txs request response
#[clap(long = "max-txs-per-request", default_value = "10000", env)]
pub max_txs_per_request: usize,

/// Addresses of the bootstrap nodes
/// They should contain PeerId within their `Multiaddr`
#[clap(long = "bootstrap-nodes", value_delimiter = ',', env)]
Expand Down Expand Up @@ -304,6 +308,7 @@ impl P2PArgs {
tcp_port: self.peering_port,
max_block_size: self.max_block_size,
max_headers_per_request: self.max_headers_per_request,
max_txs_per_request: self.max_txs_per_request,
bootstrap_nodes: self.bootstrap_nodes,
reserved_nodes: self.reserved_nodes,
reserved_nodes_only_mode: self.reserved_nodes_only_mode,
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use crate::state::{
};
#[cfg(feature = "rocksdb")]
use std::path::Path;
use fuel_core_gas_price_service::fuel_gas_price_updater::fuel_core_storage_adapter::storage::{ GasPriceMetadata};
use fuel_core_gas_price_service::fuel_gas_price_updater::fuel_core_storage_adapter::storage::GasPriceMetadata;
use crate::database::database_description::gas_price::GasPriceDatabase;

// Storages implementation
Expand Down
36 changes: 27 additions & 9 deletions crates/fuel-core/src/p2p_test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ use fuel_core_p2p::{
codecs::postcard::PostcardCodec,
network_service::FuelP2PService,
p2p_service::FuelP2PEvent,
request_response::messages::{
RequestMessage,
ResponseMessage,
},
service::to_message_acceptance,
};
use fuel_core_poa::{
Expand Down Expand Up @@ -153,16 +157,30 @@ impl Bootstrap {
}
event = bootstrap.next_event() => {
// The bootstrap node only forwards data without validating it.
if let Some(FuelP2PEvent::GossipsubMessage {
peer_id,
message_id,
..
}) = event {
bootstrap.report_message_validation_result(
&message_id,
match event {
Some(FuelP2PEvent::GossipsubMessage {
peer_id,
to_message_acceptance(&GossipsubMessageAcceptance::Accept)
)
message_id,
..
}) => {
bootstrap.report_message_validation_result(
&message_id,
peer_id,
to_message_acceptance(&GossipsubMessageAcceptance::Accept)
)
},
Some(FuelP2PEvent::InboundRequestMessage {
request_id,
request_message
}) => {
if request_message == RequestMessage::TxPoolAllTransactionsIds {
let _ = bootstrap.send_response_msg(
request_id,
ResponseMessage::TxPoolAllTransactionsIds(Some(vec![])),
);
}
}
_ => {}
}
}
}
Expand Down
30 changes: 28 additions & 2 deletions crates/fuel-core/src/service/adapters/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
use super::BlockImporterAdapter;
use super::{
BlockImporterAdapter,
TxPoolAdapter,
};
use crate::database::OnChainIterableKeyValueView;
use fuel_core_p2p::ports::{
BlockHeightImporter,
P2pDb,
TxPool,
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::Result as StorageResult;
use fuel_core_txpool::types::TxId;
use fuel_core_types::{
blockchain::{
consensus::Genesis,
SealedBlockHeader,
},
fuel_types::BlockHeight,
services::p2p::Transactions,
services::p2p::{
NetworkableTransactionPool,
Transactions,
},
};
use std::ops::Range;

Expand Down Expand Up @@ -49,3 +57,21 @@ impl BlockHeightImporter for BlockImporterAdapter {
)
}
}

impl TxPool for TxPoolAdapter {
fn get_tx_ids(&self, max_txs: usize) -> Vec<TxId> {
self.service.get_tx_ids(max_txs)
}

fn get_full_txs(&self, tx_ids: Vec<TxId>) -> Vec<Option<NetworkableTransactionPool>> {
self.service
.find(tx_ids)
.into_iter()
.map(|tx_info| {
tx_info.map(|tx| {
NetworkableTransactionPool::PoolTransaction(tx.tx().clone())
})
})
.collect()
}
}
4 changes: 1 addition & 3 deletions crates/fuel-core/src/service/adapters/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ impl PeerToPeerPort for P2PAdapter {
data: range,
} = range;
if let Some(service) = &self.service {
service
.get_transactions_from_peer(peer_id.into(), range)
.await
service.get_transactions_from_peer(peer_id, range).await
} else {
Err(anyhow::anyhow!("No P2P service available"))
}
Expand Down
57 changes: 57 additions & 0 deletions crates/fuel-core/src/service/adapters/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use fuel_core_txpool::{
GasPriceProvider,
MemoryPool,
},
types::TxId,
Result as TxPoolResult,
};
use fuel_core_types::{
Expand All @@ -52,6 +53,7 @@ use fuel_core_types::{
p2p::{
GossipsubMessageAcceptance,
GossipsubMessageInfo,
PeerId,
TransactionGossipData,
},
},
Expand All @@ -65,6 +67,7 @@ impl BlockImporter for BlockImporterAdapter {
}

#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
type GossipedTransaction = TransactionGossipData;

Expand All @@ -91,6 +94,21 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
}
}

fn subscribe_new_peers(&self) -> BoxStream<PeerId> {
use tokio_stream::{
wrappers::BroadcastStream,
StreamExt,
};
if let Some(service) = &self.service {
Box::pin(
BroadcastStream::new(service.subscribe_new_peers())
.filter_map(|result| result.ok()),
)
} else {
Box::pin(fuel_core_services::stream::pending())
}
}

fn notify_gossip_transaction_validity(
&self,
message_info: GossipsubMessageInfo,
Expand All @@ -102,9 +120,32 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
Ok(())
}
}

async fn request_tx_ids(&self, peer_id: PeerId) -> anyhow::Result<Vec<TxId>> {
if let Some(service) = &self.service {
service.get_all_transactions_ids_from_peer(peer_id).await
} else {
Ok(vec![])
}
}

async fn request_txs(
&self,
peer_id: PeerId,
tx_ids: Vec<TxId>,
) -> anyhow::Result<Vec<Option<Transaction>>> {
if let Some(service) = &self.service {
service
.get_full_transactions_from_peer(peer_id, tx_ids)
.await
} else {
Ok(vec![])
}
}
}

#[cfg(not(feature = "p2p"))]
#[async_trait::async_trait]
impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
type GossipedTransaction = TransactionGossipData;

Expand All @@ -126,6 +167,22 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
) -> anyhow::Result<()> {
Ok(())
}

fn subscribe_new_peers(&self) -> BoxStream<PeerId> {
Box::pin(fuel_core_services::stream::pending())
}

async fn request_tx_ids(&self, _peer_id: PeerId) -> anyhow::Result<Vec<TxId>> {
Ok(vec![])
}

async fn request_txs(
&self,
_peer_id: PeerId,
_tx_ids: Vec<TxId>,
) -> anyhow::Result<Vec<Option<Transaction>>> {
Ok(vec![])
}
}

impl fuel_core_txpool::ports::TxPoolDb for OnChainIterableKeyValueView {
Expand Down
31 changes: 21 additions & 10 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub type PoAService = fuel_core_poa::Service<
SystemTime,
>;
#[cfg(feature = "p2p")]
pub type P2PService = fuel_core_p2p::service::Service<Database>;
pub type P2PService = fuel_core_p2p::service::Service<Database, TxPoolAdapter>;
pub type TxPoolSharedState = fuel_core_txpool::service::SharedState<
P2PAdapter,
Database,
Expand Down Expand Up @@ -169,14 +169,10 @@ pub fn init_sub_services(
};

#[cfg(feature = "p2p")]
let mut network = config.p2p.clone().map(|p2p_config| {
fuel_core_p2p::service::new_service(
chain_id,
p2p_config,
database.on_chain().clone(),
importer_adapter.clone(),
)
});
let p2p_externals = config
.p2p
.clone()
.map(fuel_core_p2p::service::build_shared_state);

#[cfg(feature = "p2p")]
let p2p_adapter = {
Expand All @@ -192,7 +188,7 @@ pub fn init_sub_services(
invalid_transactions: -100.,
};
P2PAdapter::new(
network.as_ref().map(|network| network.shared.clone()),
p2p_externals.as_ref().map(|ext| ext.0.clone()),
peer_report_config,
)
};
Expand Down Expand Up @@ -229,6 +225,21 @@ pub fn init_sub_services(
);
let tx_pool_adapter = TxPoolAdapter::new(txpool.shared.clone());

#[cfg(feature = "p2p")]
let mut network = config.p2p.clone().zip(p2p_externals).map(
|(p2p_config, (shared_state, request_receiver))| {
fuel_core_p2p::service::new_service(
chain_id,
p2p_config,
shared_state,
request_receiver,
database.on_chain().clone(),
importer_adapter.clone(),
tx_pool_adapter.clone(),
)
},
);

let block_producer = fuel_core_producer::Producer {
config: config.block_producer.clone(),
view_provider: database.on_chain().clone(),
Expand Down
8 changes: 8 additions & 0 deletions crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub const MAX_RESPONSE_SIZE: usize = 18 * 1024 * 1024;
/// Maximum number of blocks per request.
pub const MAX_HEADERS_PER_REQUEST: usize = 100;

/// Maximum number of transactions ids asked per request.
pub const MAX_TXS_PER_REQUEST: usize = 10000;

#[derive(Clone, Debug)]
pub struct Config<State = Initialized> {
/// The keypair used for handshake during communication with other p2p nodes.
Expand All @@ -73,6 +76,9 @@ pub struct Config<State = Initialized> {
pub max_block_size: usize,
pub max_headers_per_request: usize,

// Maximum of txs id asked in a single request
pub max_txs_per_request: usize,

// `DiscoveryBehaviour` related fields
pub bootstrap_nodes: Vec<Multiaddr>,
pub enable_mdns: bool,
Expand Down Expand Up @@ -151,6 +157,7 @@ impl Config<NotInitialized> {
tcp_port: self.tcp_port,
max_block_size: self.max_block_size,
max_headers_per_request: self.max_headers_per_request,
max_txs_per_request: self.max_txs_per_request,
bootstrap_nodes: self.bootstrap_nodes,
enable_mdns: self.enable_mdns,
max_peers_connected: self.max_peers_connected,
Expand Down Expand Up @@ -200,6 +207,7 @@ impl Config<NotInitialized> {
tcp_port: 0,
max_block_size: MAX_RESPONSE_SIZE,
max_headers_per_request: MAX_HEADERS_PER_REQUEST,
max_txs_per_request: MAX_TXS_PER_REQUEST,
bootstrap_nodes: vec![],
enable_mdns: false,
max_peers_connected: 50,
Expand Down
Loading

0 comments on commit 5963a4e

Please sign in to comment.