From 75c70eb720e4509f222642344777cc417c435726 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Fri, 27 Sep 2024 20:36:07 +0100 Subject: [PATCH 01/10] give the protocol handler access to the peer info --- Cargo.lock | 69 +++++++++++++------ Cargo.toml | 2 +- p2p/p2p-core/Cargo.toml | 2 +- p2p/p2p-core/src/client/connector.rs | 29 +++++--- p2p/p2p-core/src/client/handshaker.rs | 54 ++++++++++----- p2p/p2p-core/src/client/handshaker/builder.rs | 54 +++++++++------ .../src/client/handshaker/builder/dummy.rs | 21 +++++- p2p/p2p/src/lib.rs | 21 ++++-- 8 files changed, 174 insertions(+), 78 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3caf437fa..b8b957fef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -129,9 +129,9 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.1", "tokio", - "tower", - "tower-layer", - "tower-service", + "tower 0.4.13", + "tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "tracing", ] @@ -151,8 +151,8 @@ dependencies = [ "pin-project-lite", "rustversion", "sync_wrapper 0.1.2", - "tower-layer", - "tower-service", + "tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "tracing", ] @@ -507,7 +507,7 @@ dependencies = [ "thiserror", "tokio", "tokio-util", - "tower", + "tower 0.5.1", "tracing", ] @@ -545,7 +545,7 @@ dependencies = [ "tempfile", "thread_local", "tokio", - "tower", + "tower 0.5.1", ] [[package]] @@ -572,7 +572,7 @@ dependencies = [ "tokio", "tokio-test", "tokio-util", - "tower", + "tower 0.5.1", "tracing", ] @@ -618,7 +618,7 @@ dependencies = [ "thiserror", "tokio", "tokio-util", - "tower", + "tower 0.5.1", "tracing", ] @@ -648,7 +648,7 @@ dependencies = [ "futures", "rayon", "serde", - "tower", + "tower 0.5.1", ] [[package]] @@ -680,7 +680,7 @@ dependencies = [ "sha3", "thiserror", "tokio", - "tower", + "tower 0.5.1", ] [[package]] @@ -766,7 +766,7 @@ dependencies = [ "tokio-stream", "tokio-test", "tokio-util", - "tower", + "tower 0.5.1", "tracing", ] @@ -789,7 +789,7 @@ dependencies = [ "tokio-stream", "tokio-test", "tokio-util", - "tower", + "tower 0.5.1", "tracing", ] @@ -817,7 +817,7 @@ dependencies = [ "serde", "serde_json", "tokio", - "tower", + "tower 0.5.1", "ureq", ] @@ -878,7 +878,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", - "tower", + "tower 0.5.1", ] [[package]] @@ -972,7 +972,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tower", + "tower 0.5.1", "tracing", "tracing-subscriber", ] @@ -1471,7 +1471,7 @@ dependencies = [ "rustls-pki-types", "tokio", "tokio-rustls", - "tower-service", + "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1489,8 +1489,8 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower", - "tower-service", + "tower 0.4.13", + "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "tracing", ] @@ -2519,7 +2519,7 @@ dependencies = [ "hyper-rustls", "hyper-util", "tokio", - "tower-service", + "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2801,9 +2801,24 @@ dependencies = [ "pin-project", "pin-project-lite", "tokio", + "tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing", +] + +[[package]] +name = "tower" +version = "0.5.1" +source = "git+https://github.com/Boog900/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tokio", "tokio-util", - "tower-layer", - "tower-service", + "tower-layer 0.3.3 (git+https://github.com/Boog900/tower.git?rev=6c7faf0)", + "tower-service 0.3.3 (git+https://github.com/Boog900/tower.git?rev=6c7faf0)", "tracing", ] @@ -2813,12 +2828,22 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "git+https://github.com/Boog900/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf" + [[package]] name = "tower-service" version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" +[[package]] +name = "tower-service" +version = "0.3.3" +source = "git+https://github.com/Boog900/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf" + [[package]] name = "tracing" version = "0.1.40" diff --git a/Cargo.toml b/Cargo.toml index 254d3ce40..aedebf96f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ thread_local = { version = "1.1.8", default-features = false } tokio-util = { version = "0.7.12", default-features = false } tokio-stream = { version = "0.1.16", default-features = false } tokio = { version = "1.40.0", default-features = false } -tower = { version = "0.4.13", default-features = false } +tower = { git = "https://github.com/Boog900/tower.git", rev = "6c7faf0", default-features = false } tracing-subscriber = { version = "0.3.18", default-features = false } tracing = { version = "0.1.40", default-features = false } diff --git a/p2p/p2p-core/Cargo.toml b/p2p/p2p-core/Cargo.toml index 8341fe999..a30590fac 100644 --- a/p2p/p2p-core/Cargo.toml +++ b/p2p/p2p-core/Cargo.toml @@ -19,7 +19,7 @@ tokio-util = { workspace = true, features = ["codec"] } tokio-stream = { workspace = true, features = ["sync"]} futures = { workspace = true, features = ["std"] } async-trait = { workspace = true } -tower = { workspace = true, features = ["util", "tracing"] } +tower = { workspace = true, features = ["util", "tracing", "make"] } cfg-if = { workspace = true } thiserror = { workspace = true } diff --git a/p2p/p2p-core/src/client/connector.rs b/p2p/p2p-core/src/client/connector.rs index 553f5a44f..62418fe91 100644 --- a/p2p/p2p-core/src/client/connector.rs +++ b/p2p/p2p-core/src/client/connector.rs @@ -12,12 +12,13 @@ use std::{ use futures::{FutureExt, Stream}; use tokio::sync::OwnedSemaphorePermit; -use tower::{Service, ServiceExt}; +use tower::{MakeService, Service, ServiceExt}; +use crate::client::PeerInformation; use crate::{ client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID}, AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, PeerSyncSvc, - ProtocolRequestHandler, + ProtocolRequest, ProtocolRequestHandler, ProtocolResponse, }; /// A request to connect to a peer. @@ -32,28 +33,36 @@ pub struct ConnectRequest { } /// The connector service, this service connects to peer and returns the [`Client`]. -pub struct Connector { - handshaker: HandShaker, +pub struct Connector { + handshaker: HandShaker, } -impl - Connector +impl + Connector { /// Create a new connector from a handshaker. pub const fn new( - handshaker: HandShaker, + handshaker: HandShaker, ) -> Self { Self { handshaker } } } -impl - Service> for Connector +impl + Service> for Connector where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, PSync: PeerSyncSvc + Clone, - ProtoHdlr: ProtocolRequestHandler + Clone, + ProtoHdlrMkr: MakeService< + PeerInformation, + ProtocolRequest, + MakeError = tower::BoxError, + Service: ProtocolRequestHandler, + Future: Send + 'static, + > + Clone + + Send + + 'static, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, { diff --git a/p2p/p2p-core/src/client/handshaker.rs b/p2p/p2p-core/src/client/handshaker.rs index d6873a85f..19c75c9a3 100644 --- a/p2p/p2p-core/src/client/handshaker.rs +++ b/p2p/p2p-core/src/client/handshaker.rs @@ -17,7 +17,7 @@ use tokio::{ sync::{mpsc, OwnedSemaphorePermit, Semaphore}, time::{error::Elapsed, timeout}, }; -use tower::{Service, ServiceExt}; +use tower::{MakeService, Service, ServiceExt}; use tracing::{info_span, Instrument, Span}; use cuprate_pruning::{PruningError, PruningSeed}; @@ -43,7 +43,7 @@ use crate::{ services::PeerSyncRequest, AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection, CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone, - PeerSyncSvc, ProtocolRequestHandler, SharedError, + PeerSyncSvc, ProtocolRequest, ProtocolRequestHandler, SharedError, }; pub mod builder; @@ -87,7 +87,7 @@ pub struct DoHandshakeRequest { /// The peer handshaking service. #[derive(Debug, Clone)] -pub struct HandShaker { +pub struct HandShaker { /// The address book service. address_book: AdrBook, /// The core sync data service. @@ -95,7 +95,7 @@ pub struct HandShaker, } -impl - HandShaker +impl + HandShaker { /// Creates a new handshaker. const fn new( address_book: AdrBook, peer_sync_svc: PSync, core_sync_svc: CSync, - protocol_request_svc: ProtoHdlr, + protocol_request_svc_maker: ProtoHdlrMkr, broadcast_stream_maker: BrdcstStrmMkr, our_basic_node_data: BasicNodeData, connection_parent_span: Span, @@ -126,7 +126,7 @@ impl address_book, peer_sync_svc, core_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, broadcast_stream_maker, our_basic_node_data, connection_parent_span, @@ -135,14 +135,22 @@ impl } } -impl +impl Service> - for HandShaker + for HandShaker where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, PSync: PeerSyncSvc + Clone, - ProtoHdlr: ProtocolRequestHandler + Clone, + ProtoHdlrMkr: MakeService< + PeerInformation, + ProtocolRequest, + MakeError = tower::BoxError, + Service: ProtocolRequestHandler, + Future: Send + 'static, + > + Clone + + Send + + 'static, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, { @@ -159,7 +167,7 @@ where let broadcast_stream_maker = self.broadcast_stream_maker.clone(); let address_book = self.address_book.clone(); - let protocol_request_svc = self.protocol_request_svc.clone(); + let protocol_request_svc_maker = self.protocol_request_svc_maker.clone(); let core_sync_svc = self.core_sync_svc.clone(); let peer_sync_svc = self.peer_sync_svc.clone(); let our_basic_node_data = self.our_basic_node_data.clone(); @@ -177,7 +185,7 @@ where address_book, core_sync_svc, peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, connection_parent_span, ), @@ -232,7 +240,7 @@ pub async fn ping(addr: N::Addr) -> Result /// This function completes a handshake with the requested peer. #[expect(clippy::too_many_arguments)] -async fn handshake( +async fn handshake( req: DoHandshakeRequest, broadcast_stream_maker: BrdcstStrmMkr, @@ -240,7 +248,7 @@ async fn handshake Result, HandshakeError> @@ -248,7 +256,14 @@ where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, PSync: PeerSyncSvc + Clone, - ProtoHdlr: ProtocolRequestHandler, + ProtoHdlrMkr: MakeService< + PeerInformation, + ProtocolRequest, + MakeError = tower::BoxError, + Service: ProtocolRequestHandler, + Future: Send + 'static, + > + Send + + 'static, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Send + 'static, { @@ -480,6 +495,13 @@ where pruning_seed, }; + let protocol_request_handler = protocol_request_svc_maker + .as_service() + .ready() + .await? + .call(info.clone()) + .await?; + let request_handler = PeerRequestHandler { address_book_svc: address_book.clone(), our_sync_svc: core_sync_svc.clone(), diff --git a/p2p/p2p-core/src/client/handshaker/builder.rs b/p2p/p2p-core/src/client/handshaker/builder.rs index 069811dfa..6d534fefb 100644 --- a/p2p/p2p-core/src/client/handshaker/builder.rs +++ b/p2p/p2p-core/src/client/handshaker/builder.rs @@ -1,18 +1,21 @@ use std::marker::PhantomData; use futures::{stream, Stream}; +use tower::MakeService; use tracing::Span; use cuprate_wire::BasicNodeData; use crate::{ - client::{handshaker::HandShaker, InternalPeerID}, - AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, PeerSyncSvc, ProtocolRequestHandler, + client::{handshaker::HandShaker, InternalPeerID, PeerInformation}, + AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, PeerSyncSvc, ProtocolRequest, + ProtocolRequestHandler, }; mod dummy; pub use dummy::{ DummyAddressBook, DummyCoreSyncSvc, DummyPeerSyncSvc, DummyProtocolRequestHandler, + DummyProtocolRequestHandlerMaker, }; /// A [`HandShaker`] [`Service`](tower::Service) builder. @@ -29,7 +32,7 @@ pub struct HandshakerBuilder< AdrBook = DummyAddressBook, CSync = DummyCoreSyncSvc, PSync = DummyPeerSyncSvc, - ProtoHdlr = DummyProtocolRequestHandler, + ProtoHdlrMkr = DummyProtocolRequestHandlerMaker, BrdcstStrmMkr = fn( InternalPeerID<::Addr>, ) -> stream::Pending, @@ -41,7 +44,7 @@ pub struct HandshakerBuilder< /// The peer sync service. peer_sync_svc: PSync, /// The protocol request service. - protocol_request_svc: ProtoHdlr, + protocol_request_svc_maker: ProtoHdlrMkr, /// Our [`BasicNodeData`] our_basic_node_data: BasicNodeData, /// A function that returns a stream that will give items to be broadcast by a connection. @@ -60,7 +63,7 @@ impl HandshakerBuilder { address_book: DummyAddressBook, core_sync_svc: DummyCoreSyncSvc::static_mainnet_genesis(), peer_sync_svc: DummyPeerSyncSvc, - protocol_request_svc: DummyProtocolRequestHandler, + protocol_request_svc_maker: DummyProtocolRequestHandlerMaker, our_basic_node_data, broadcast_stream_maker: |_| stream::pending(), connection_parent_span: None, @@ -90,7 +93,7 @@ impl let Self { core_sync_svc, peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -101,7 +104,7 @@ impl address_book: new_address_book, core_sync_svc, peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -132,7 +135,7 @@ impl let Self { address_book, peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -143,7 +146,7 @@ impl address_book, core_sync_svc: new_core_sync_svc, peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -168,7 +171,7 @@ impl let Self { address_book, core_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -179,7 +182,7 @@ impl address_book, core_sync_svc, peer_sync_svc: new_peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -187,19 +190,28 @@ impl } } - /// Changes the protocol request handler, which handles [`ProtocolRequest`](crate::ProtocolRequest)s to our node. + /// Changes the protocol request handler maker, which creates the service that handles [`ProtocolRequest`](crate::ProtocolRequest)s + /// to our node. /// /// ## Default Protocol Request Handler /// - /// The default protocol request handler will not respond to any protocol requests, this should not + /// The default service maker will create services that will not respond to any protocol requests, this should not /// be an issue as long as peers do not think we are ahead of them, if they do they will send requests /// for our blocks, and we won't respond which will cause them to disconnect. - pub fn with_protocol_request_handler( + pub fn with_protocol_request_handler_maker( self, - new_protocol_handler: NProtoHdlr, - ) -> HandshakerBuilder + new_protocol_request_svc_maker: NProtoHdlrMkr, + ) -> HandshakerBuilder where - NProtoHdlr: ProtocolRequestHandler + Clone, + NProtoHdlrMkr: MakeService< + PeerInformation, + ProtocolRequest, + MakeError = tower::BoxError, + Service: ProtocolRequestHandler, + Future: Send + 'static, + > + Clone + + Send + + 'static, { let Self { address_book, @@ -215,7 +227,7 @@ impl address_book, core_sync_svc, peer_sync_svc, - protocol_request_svc: new_protocol_handler, + protocol_request_svc_maker: new_protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -242,7 +254,7 @@ impl address_book, core_sync_svc, peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, connection_parent_span, .. @@ -252,7 +264,7 @@ impl address_book, core_sync_svc, peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker: new_broadcast_stream_maker, connection_parent_span, @@ -279,7 +291,7 @@ impl self.address_book, self.peer_sync_svc, self.core_sync_svc, - self.protocol_request_svc, + self.protocol_request_svc_maker, self.broadcast_stream_maker, self.our_basic_node_data, self.connection_parent_span.unwrap_or(Span::none()), diff --git a/p2p/p2p-core/src/client/handshaker/builder/dummy.rs b/p2p/p2p-core/src/client/handshaker/builder/dummy.rs index e3c4335a0..8f0523004 100644 --- a/p2p/p2p-core/src/client/handshaker/builder/dummy.rs +++ b/p2p/p2p-core/src/client/handshaker/builder/dummy.rs @@ -7,12 +7,13 @@ use tower::Service; use cuprate_wire::CoreSyncData; +use crate::client::PeerInformation; use crate::{ services::{ AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse, PeerSyncRequest, PeerSyncResponse, }, - NetworkZone, ProtocolRequest, ProtocolResponse, + NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse, }; /// A dummy peer sync service, that doesn't actually keep track of peers sync states. @@ -132,6 +133,24 @@ impl Service> for DummyAddressBook { } } +/// A [`DummyProtocolRequestHandler`] maker. +#[derive(Debug, Clone)] +pub struct DummyProtocolRequestHandlerMaker; + +impl Service> for DummyProtocolRequestHandlerMaker { + type Response = DummyProtocolRequestHandler; + type Error = tower::BoxError; + type Future = Ready>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _: PeerInformation) -> Self::Future { + ready(Ok(DummyProtocolRequestHandler)) + } +} + /// A dummy protocol request handler. #[derive(Debug, Clone)] pub struct DummyProtocolRequestHandler; diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs index 2f51c6c55..2d693edbe 100644 --- a/p2p/p2p/src/lib.rs +++ b/p2p/p2p/src/lib.rs @@ -10,7 +10,7 @@ use tokio::{ task::JoinSet, }; use tokio_stream::wrappers::WatchStream; -use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt}; +use tower::{buffer::Buffer, util::BoxCloneService, MakeService, Service, ServiceExt}; use tracing::{instrument, Instrument, Span}; use cuprate_async_buffer::BufferStream; @@ -18,7 +18,7 @@ use cuprate_p2p_core::{ client::Connector, client::InternalPeerID, services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest}, - CoreSyncSvc, NetworkZone, ProtocolRequestHandler, + CoreSyncSvc, NetworkZone, ProtocolRequest, ProtocolRequestHandler, }; mod block_downloader; @@ -35,6 +35,7 @@ pub use broadcast::{BroadcastRequest, BroadcastSvc}; use client_pool::ClientPoolDropGuard; pub use config::P2PConfig; use connection_maintainer::MakeConnectionRequest; +use cuprate_p2p_core::client::PeerInformation; /// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`]. /// @@ -46,14 +47,22 @@ use connection_maintainer::MakeConnectionRequest; /// - A core sync service, which keeps track of the sync state of our node #[instrument(level = "debug", name = "net", skip_all, fields(zone = N::NAME))] pub async fn initialize_network( - protocol_request_handler: PR, + protocol_request_handler_maker: PR, core_sync_svc: CS, config: P2PConfig, ) -> Result, tower::BoxError> where N: NetworkZone, N::Addr: borsh::BorshDeserialize + borsh::BorshSerialize, - PR: ProtocolRequestHandler + Clone, + PR: MakeService< + PeerInformation, + ProtocolRequest, + MakeError = tower::BoxError, + Service: ProtocolRequestHandler, + Future: Send + 'static, + > + Clone + + Send + + 'static, CS: CoreSyncSvc + Clone, { let address_book = @@ -85,7 +94,7 @@ where .with_address_book(address_book.clone()) .with_peer_sync_svc(sync_states_svc.clone()) .with_core_sync_svc(core_sync_svc) - .with_protocol_request_handler(protocol_request_handler) + .with_protocol_request_handler_maker(protocol_request_handler_maker) .with_broadcast_stream_maker(outbound_mkr) .with_connection_parent_span(Span::current()); @@ -160,7 +169,7 @@ pub struct NetworkInterface { /// The address book service. address_book: BoxCloneService, AddressBookResponse, tower::BoxError>, /// The peer's sync states service. - sync_states_svc: Buffer, PeerSyncRequest>, + sync_states_svc: Buffer, as Service>>::Future>, /// Background tasks that will be aborted when this interface is dropped. _background_tasks: Arc>, } From 3f30ae32a8a01695a012bb821939b8cd63e057cc Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sun, 29 Sep 2024 00:38:44 +0100 Subject: [PATCH 02/10] add trait alias --- Cargo.lock | 10 +-- Cargo.toml | 2 +- p2p/p2p-core/src/client/connector.rs | 18 ++-- p2p/p2p-core/src/client/handshaker.rs | 27 ++---- p2p/p2p-core/src/client/handshaker/builder.rs | 12 +-- p2p/p2p-core/src/lib.rs | 90 ++++++++++--------- p2p/p2p/src/lib.rs | 22 ++--- 7 files changed, 76 insertions(+), 105 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b8b957fef..aab70fb26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2809,7 +2809,7 @@ dependencies = [ [[package]] name = "tower" version = "0.5.1" -source = "git+https://github.com/Boog900/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf" +source = "git+https://github.com/Cuprate/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf" dependencies = [ "futures-core", "futures-util", @@ -2817,8 +2817,8 @@ dependencies = [ "sync_wrapper 0.1.2", "tokio", "tokio-util", - "tower-layer 0.3.3 (git+https://github.com/Boog900/tower.git?rev=6c7faf0)", - "tower-service 0.3.3 (git+https://github.com/Boog900/tower.git?rev=6c7faf0)", + "tower-layer 0.3.3 (git+https://github.com/Cuprate/tower.git?rev=6c7faf0)", + "tower-service 0.3.3 (git+https://github.com/Cuprate/tower.git?rev=6c7faf0)", "tracing", ] @@ -2831,7 +2831,7 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-layer" version = "0.3.3" -source = "git+https://github.com/Boog900/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf" +source = "git+https://github.com/Cuprate/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf" [[package]] name = "tower-service" @@ -2842,7 +2842,7 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tower-service" version = "0.3.3" -source = "git+https://github.com/Boog900/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf" +source = "git+https://github.com/Cuprate/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf" [[package]] name = "tracing" diff --git a/Cargo.toml b/Cargo.toml index aedebf96f..9d6f1b248 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ thread_local = { version = "1.1.8", default-features = false } tokio-util = { version = "0.7.12", default-features = false } tokio-stream = { version = "0.1.16", default-features = false } tokio = { version = "1.40.0", default-features = false } -tower = { git = "https://github.com/Boog900/tower.git", rev = "6c7faf0", default-features = false } +tower = { git = "https://github.com/Cuprate/tower.git", rev = "6c7faf0", default-features = false } tracing-subscriber = { version = "0.3.18", default-features = false } tracing = { version = "0.1.40", default-features = false } diff --git a/p2p/p2p-core/src/client/connector.rs b/p2p/p2p-core/src/client/connector.rs index 62418fe91..eb348c717 100644 --- a/p2p/p2p-core/src/client/connector.rs +++ b/p2p/p2p-core/src/client/connector.rs @@ -14,11 +14,13 @@ use futures::{FutureExt, Stream}; use tokio::sync::OwnedSemaphorePermit; use tower::{MakeService, Service, ServiceExt}; -use crate::client::PeerInformation; use crate::{ - client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID}, + client::{ + handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID, + PeerInformation, + }, AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, PeerSyncSvc, - ProtocolRequest, ProtocolRequestHandler, ProtocolResponse, + ProtocolRequest, ProtocolRequestHandler, ProtocolRequestHandlerMaker, ProtocolResponse, }; /// A request to connect to a peer. @@ -54,15 +56,7 @@ where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, PSync: PeerSyncSvc + Clone, - ProtoHdlrMkr: MakeService< - PeerInformation, - ProtocolRequest, - MakeError = tower::BoxError, - Service: ProtocolRequestHandler, - Future: Send + 'static, - > + Clone - + Send - + 'static, + ProtoHdlrMkr: ProtocolRequestHandlerMaker + Clone, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, { diff --git a/p2p/p2p-core/src/client/handshaker.rs b/p2p/p2p-core/src/client/handshaker.rs index 19c75c9a3..b5e9f8213 100644 --- a/p2p/p2p-core/src/client/handshaker.rs +++ b/p2p/p2p-core/src/client/handshaker.rs @@ -17,7 +17,7 @@ use tokio::{ sync::{mpsc, OwnedSemaphorePermit, Semaphore}, time::{error::Elapsed, timeout}, }; -use tower::{MakeService, Service, ServiceExt}; +use tower::{Service, ServiceExt}; use tracing::{info_span, Instrument, Span}; use cuprate_pruning::{PruningError, PruningSeed}; @@ -43,7 +43,7 @@ use crate::{ services::PeerSyncRequest, AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection, CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone, - PeerSyncSvc, ProtocolRequest, ProtocolRequestHandler, SharedError, + PeerSyncSvc, ProtocolRequest, ProtocolRequestHandler, ProtocolRequestHandlerMaker, SharedError, }; pub mod builder; @@ -142,15 +142,7 @@ where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, PSync: PeerSyncSvc + Clone, - ProtoHdlrMkr: MakeService< - PeerInformation, - ProtocolRequest, - MakeError = tower::BoxError, - Service: ProtocolRequestHandler, - Future: Send + 'static, - > + Clone - + Send - + 'static, + ProtoHdlrMkr: ProtocolRequestHandlerMaker + Clone, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, { @@ -254,16 +246,9 @@ async fn handshake Result, HandshakeError> where AdrBook: AddressBook + Clone, - CSync: CoreSyncSvc + Clone, - PSync: PeerSyncSvc + Clone, - ProtoHdlrMkr: MakeService< - PeerInformation, - ProtocolRequest, - MakeError = tower::BoxError, - Service: ProtocolRequestHandler, - Future: Send + 'static, - > + Send - + 'static, + CSync: CoreSyncSvc+ Clone, + PSync: PeerSyncSvc+ Clone, + ProtoHdlrMkr: ProtocolRequestHandlerMaker, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Send + 'static, { diff --git a/p2p/p2p-core/src/client/handshaker/builder.rs b/p2p/p2p-core/src/client/handshaker/builder.rs index 6d534fefb..fe8c6b845 100644 --- a/p2p/p2p-core/src/client/handshaker/builder.rs +++ b/p2p/p2p-core/src/client/handshaker/builder.rs @@ -9,7 +9,7 @@ use cuprate_wire::BasicNodeData; use crate::{ client::{handshaker::HandShaker, InternalPeerID, PeerInformation}, AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, PeerSyncSvc, ProtocolRequest, - ProtocolRequestHandler, + ProtocolRequestHandler, ProtocolRequestHandlerMaker, }; mod dummy; @@ -203,15 +203,7 @@ impl new_protocol_request_svc_maker: NProtoHdlrMkr, ) -> HandshakerBuilder where - NProtoHdlrMkr: MakeService< - PeerInformation, - ProtocolRequest, - MakeError = tower::BoxError, - Service: ProtocolRequestHandler, - Future: Send + 'static, - > + Clone - + Send - + 'static, + NProtoHdlrMkr: ProtocolRequestHandlerMaker + Clone, { let Self { address_book, diff --git a/p2p/p2p-core/src/lib.rs b/p2p/p2p-core/src/lib.rs index 04e86761d..cbb2de662 100644 --- a/p2p/p2p-core/src/lib.rs +++ b/p2p/p2p-core/src/lib.rs @@ -197,23 +197,21 @@ pub trait PeerSyncSvc: PeerSyncRequest, Response = PeerSyncResponse, Error = tower::BoxError, - Future = Self::Future2, + Future: Future> + Send + 'static, > + Send + 'static { - // This allows us to put more restrictive bounds on the future without defining the future here - // explicitly. - type Future2: Future> + Send + 'static; } -impl PeerSyncSvc for T -where - T: tower::Service, Response = PeerSyncResponse, Error = tower::BoxError> - + Send - + 'static, - T::Future: Future> + Send + 'static, +impl PeerSyncSvc for T where + T: tower::Service< + PeerSyncRequest, + Response = PeerSyncResponse, + Error = tower::BoxError, + Future: Future> + Send + 'static, + > + Send + + 'static { - type Future2 = T::Future; } pub trait AddressBook: @@ -221,26 +219,21 @@ pub trait AddressBook: AddressBookRequest, Response = AddressBookResponse, Error = tower::BoxError, - Future = Self::Future2, + Future: Future> + Send + 'static, > + Send + 'static { - // This allows us to put more restrictive bounds on the future without defining the future here - // explicitly. - type Future2: Future> + Send + 'static; } -impl AddressBook for T -where +impl AddressBook for T where T: tower::Service< AddressBookRequest, Response = AddressBookResponse, Error = tower::BoxError, + Future: Future> + Send + 'static, > + Send - + 'static, - T::Future: Future> + Send + 'static, + + 'static { - type Future2 = T::Future; } pub trait CoreSyncSvc: @@ -248,26 +241,21 @@ pub trait CoreSyncSvc: CoreSyncDataRequest, Response = CoreSyncDataResponse, Error = tower::BoxError, - Future = Self::Future2, + Future: Future> + Send + 'static, > + Send + 'static { - // This allows us to put more restrictive bounds on the future without defining the future here - // explicitly. - type Future2: Future> + Send + 'static; } -impl CoreSyncSvc for T -where +impl CoreSyncSvc for T where T: tower::Service< CoreSyncDataRequest, Response = CoreSyncDataResponse, Error = tower::BoxError, + Future: Future> + Send + 'static, > + Send - + 'static, - T::Future: Future> + Send + 'static, + + 'static { - type Future2 = T::Future; } pub trait ProtocolRequestHandler: @@ -275,21 +263,43 @@ pub trait ProtocolRequestHandler: ProtocolRequest, Response = ProtocolResponse, Error = tower::BoxError, - Future = Self::Future2, + Future: Future> + Send + 'static, > + Send + 'static { - // This allows us to put more restrictive bounds on the future without defining the future here - // explicitly. - type Future2: Future> + Send + 'static; } -impl ProtocolRequestHandler for T -where - T: tower::Service - + Send - + 'static, - T::Future: Future> + Send + 'static, +impl ProtocolRequestHandler for T where + T: tower::Service< + ProtocolRequest, + Response = ProtocolResponse, + Error = tower::BoxError, + Future: Future> + Send + 'static, + > + Send + + 'static +{ +} + +pub trait ProtocolRequestHandlerMaker: + tower::MakeService< + client::PeerInformation, + ProtocolRequest, + MakeError = tower::BoxError, + Service: ProtocolRequestHandler, + Future: Send + 'static, + > + Send + + 'static +{ +} + +impl ProtocolRequestHandlerMaker for T where + T: tower::MakeService< + client::PeerInformation, + ProtocolRequest, + MakeError = tower::BoxError, + Service: ProtocolRequestHandler, + Future: Send + 'static, + > + Send + + 'static { - type Future2 = T::Future; } diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs index 2d693edbe..57c82b5ea 100644 --- a/p2p/p2p/src/lib.rs +++ b/p2p/p2p/src/lib.rs @@ -14,12 +14,7 @@ use tower::{buffer::Buffer, util::BoxCloneService, MakeService, Service, Service use tracing::{instrument, Instrument, Span}; use cuprate_async_buffer::BufferStream; -use cuprate_p2p_core::{ - client::Connector, - client::InternalPeerID, - services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest}, - CoreSyncSvc, NetworkZone, ProtocolRequest, ProtocolRequestHandler, -}; +use cuprate_p2p_core::{client::Connector, client::InternalPeerID, services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest}, CoreSyncSvc, NetworkZone, ProtocolRequest, ProtocolRequestHandler, ProtocolRequestHandlerMaker}; mod block_downloader; mod broadcast; @@ -54,15 +49,7 @@ pub async fn initialize_network( where N: NetworkZone, N::Addr: borsh::BorshDeserialize + borsh::BorshSerialize, - PR: MakeService< - PeerInformation, - ProtocolRequest, - MakeError = tower::BoxError, - Service: ProtocolRequestHandler, - Future: Send + 'static, - > + Clone - + Send - + 'static, + PR: ProtocolRequestHandlerMaker + Clone, CS: CoreSyncSvc + Clone, { let address_book = @@ -169,7 +156,10 @@ pub struct NetworkInterface { /// The address book service. address_book: BoxCloneService, AddressBookResponse, tower::BoxError>, /// The peer's sync states service. - sync_states_svc: Buffer, as Service>>::Future>, + sync_states_svc: Buffer< + PeerSyncRequest, + as Service>>::Future, + >, /// Background tasks that will be aborted when this interface is dropped. _background_tasks: Arc>, } From 2de1208c85e21d8fe2403a7689bc13e7f4513b2d Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 30 Sep 2024 17:00:24 +0100 Subject: [PATCH 03/10] clippy + fmt --- p2p/p2p-core/src/client/connector.rs | 9 +++------ p2p/p2p-core/src/client/handshaker.rs | 6 +++--- p2p/p2p-core/src/client/handshaker/builder.rs | 7 +++---- p2p/p2p/src/lib.rs | 10 +++++++--- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/p2p/p2p-core/src/client/connector.rs b/p2p/p2p-core/src/client/connector.rs index eb348c717..9e904f008 100644 --- a/p2p/p2p-core/src/client/connector.rs +++ b/p2p/p2p-core/src/client/connector.rs @@ -12,15 +12,12 @@ use std::{ use futures::{FutureExt, Stream}; use tokio::sync::OwnedSemaphorePermit; -use tower::{MakeService, Service, ServiceExt}; +use tower::{Service, ServiceExt}; use crate::{ - client::{ - handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID, - PeerInformation, - }, + client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID}, AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, PeerSyncSvc, - ProtocolRequest, ProtocolRequestHandler, ProtocolRequestHandlerMaker, ProtocolResponse, + ProtocolRequestHandlerMaker, }; /// A request to connect to a peer. diff --git a/p2p/p2p-core/src/client/handshaker.rs b/p2p/p2p-core/src/client/handshaker.rs index b5e9f8213..d6d0d02c5 100644 --- a/p2p/p2p-core/src/client/handshaker.rs +++ b/p2p/p2p-core/src/client/handshaker.rs @@ -43,7 +43,7 @@ use crate::{ services::PeerSyncRequest, AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection, CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone, - PeerSyncSvc, ProtocolRequest, ProtocolRequestHandler, ProtocolRequestHandlerMaker, SharedError, + PeerSyncSvc, ProtocolRequestHandlerMaker, SharedError, }; pub mod builder; @@ -246,8 +246,8 @@ async fn handshake Result, HandshakeError> where AdrBook: AddressBook + Clone, - CSync: CoreSyncSvc+ Clone, - PSync: PeerSyncSvc+ Clone, + CSync: CoreSyncSvc + Clone, + PSync: PeerSyncSvc + Clone, ProtoHdlrMkr: ProtocolRequestHandlerMaker, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Send + 'static, diff --git a/p2p/p2p-core/src/client/handshaker/builder.rs b/p2p/p2p-core/src/client/handshaker/builder.rs index fe8c6b845..a03fc9f27 100644 --- a/p2p/p2p-core/src/client/handshaker/builder.rs +++ b/p2p/p2p-core/src/client/handshaker/builder.rs @@ -1,15 +1,14 @@ use std::marker::PhantomData; use futures::{stream, Stream}; -use tower::MakeService; use tracing::Span; use cuprate_wire::BasicNodeData; use crate::{ - client::{handshaker::HandShaker, InternalPeerID, PeerInformation}, - AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, PeerSyncSvc, ProtocolRequest, - ProtocolRequestHandler, ProtocolRequestHandlerMaker, + client::{handshaker::HandShaker, InternalPeerID}, + AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, PeerSyncSvc, + ProtocolRequestHandlerMaker, }; mod dummy; diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs index 57c82b5ea..452ac2cd2 100644 --- a/p2p/p2p/src/lib.rs +++ b/p2p/p2p/src/lib.rs @@ -10,11 +10,16 @@ use tokio::{ task::JoinSet, }; use tokio_stream::wrappers::WatchStream; -use tower::{buffer::Buffer, util::BoxCloneService, MakeService, Service, ServiceExt}; +use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt}; use tracing::{instrument, Instrument, Span}; use cuprate_async_buffer::BufferStream; -use cuprate_p2p_core::{client::Connector, client::InternalPeerID, services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest}, CoreSyncSvc, NetworkZone, ProtocolRequest, ProtocolRequestHandler, ProtocolRequestHandlerMaker}; +use cuprate_p2p_core::{ + client::Connector, + client::InternalPeerID, + services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest}, + CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker, +}; mod block_downloader; mod broadcast; @@ -30,7 +35,6 @@ pub use broadcast::{BroadcastRequest, BroadcastSvc}; use client_pool::ClientPoolDropGuard; pub use config::P2PConfig; use connection_maintainer::MakeConnectionRequest; -use cuprate_p2p_core::client::PeerInformation; /// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`]. /// From 27d2327a0897ed1812933a57e26bbd31c47a3423 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 30 Sep 2024 17:59:30 +0100 Subject: [PATCH 04/10] update doc --- p2p/p2p-core/src/client/handshaker/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/p2p-core/src/client/handshaker/builder.rs b/p2p/p2p-core/src/client/handshaker/builder.rs index a03fc9f27..545cc1ead 100644 --- a/p2p/p2p-core/src/client/handshaker/builder.rs +++ b/p2p/p2p-core/src/client/handshaker/builder.rs @@ -20,7 +20,7 @@ pub use dummy::{ /// A [`HandShaker`] [`Service`](tower::Service) builder. /// /// This builder applies default values to make usage easier, behaviour and drawbacks of the defaults are documented -/// on the `with_*` method to change it, for example [`HandshakerBuilder::with_protocol_request_handler`]. +/// on the `with_*` method to change it, for example [`HandshakerBuilder::with_protocol_request_handler_maker`]. /// /// If you want to use any network other than [`Mainnet`](crate::Network::Mainnet) /// you will need to change the core sync service with [`HandshakerBuilder::with_core_sync_svc`], From 7d1c4a1283ea3ecd08f5025f41a10d1d198a024f Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 30 Sep 2024 18:17:07 +0100 Subject: [PATCH 05/10] simplify trait aliases --- .../src/client/handshaker/builder/dummy.rs | 2 +- p2p/p2p-core/src/lib.rs | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/p2p/p2p-core/src/client/handshaker/builder/dummy.rs b/p2p/p2p-core/src/client/handshaker/builder/dummy.rs index 8f0523004..3ebbdb656 100644 --- a/p2p/p2p-core/src/client/handshaker/builder/dummy.rs +++ b/p2p/p2p-core/src/client/handshaker/builder/dummy.rs @@ -7,8 +7,8 @@ use tower::Service; use cuprate_wire::CoreSyncData; -use crate::client::PeerInformation; use crate::{ + client::PeerInformation, services::{ AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse, PeerSyncRequest, PeerSyncResponse, diff --git a/p2p/p2p-core/src/lib.rs b/p2p/p2p-core/src/lib.rs index cbb2de662..47d49b8cd 100644 --- a/p2p/p2p-core/src/lib.rs +++ b/p2p/p2p-core/src/lib.rs @@ -66,7 +66,7 @@ cfg_if::cfg_if! { } } -use std::{fmt::Debug, future::Future, hash::Hash}; +use std::{fmt::Debug, hash::Hash}; use futures::{Sink, Stream}; @@ -197,7 +197,7 @@ pub trait PeerSyncSvc: PeerSyncRequest, Response = PeerSyncResponse, Error = tower::BoxError, - Future: Future> + Send + 'static, + Future: Send + 'static, > + Send + 'static { @@ -208,7 +208,7 @@ impl PeerSyncSvc for T where PeerSyncRequest, Response = PeerSyncResponse, Error = tower::BoxError, - Future: Future> + Send + 'static, + Future: Send + 'static, > + Send + 'static { @@ -219,7 +219,7 @@ pub trait AddressBook: AddressBookRequest, Response = AddressBookResponse, Error = tower::BoxError, - Future: Future> + Send + 'static, + Future: Send + 'static, > + Send + 'static { @@ -230,7 +230,7 @@ impl AddressBook for T where AddressBookRequest, Response = AddressBookResponse, Error = tower::BoxError, - Future: Future> + Send + 'static, + Future: Send + 'static, > + Send + 'static { @@ -241,7 +241,7 @@ pub trait CoreSyncSvc: CoreSyncDataRequest, Response = CoreSyncDataResponse, Error = tower::BoxError, - Future: Future> + Send + 'static, + Future: Send + 'static, > + Send + 'static { @@ -252,7 +252,7 @@ impl CoreSyncSvc for T where CoreSyncDataRequest, Response = CoreSyncDataResponse, Error = tower::BoxError, - Future: Future> + Send + 'static, + Future: Send + 'static, > + Send + 'static { @@ -263,7 +263,7 @@ pub trait ProtocolRequestHandler: ProtocolRequest, Response = ProtocolResponse, Error = tower::BoxError, - Future: Future> + Send + 'static, + Future: Send + 'static, > + Send + 'static { @@ -274,7 +274,7 @@ impl ProtocolRequestHandler for T where ProtocolRequest, Response = ProtocolResponse, Error = tower::BoxError, - Future: Future> + Send + 'static, + Future: Send + 'static, > + Send + 'static { From 7d3a751775a2e24b3be556961d780946a5d6675f Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 30 Sep 2024 19:26:55 +0100 Subject: [PATCH 06/10] use tower `Shared` --- p2p/p2p-core/src/client/handshaker/builder.rs | 10 ++++++--- .../src/client/handshaker/builder/dummy.rs | 21 +------------------ 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/p2p/p2p-core/src/client/handshaker/builder.rs b/p2p/p2p-core/src/client/handshaker/builder.rs index 545cc1ead..5c4a38f38 100644 --- a/p2p/p2p-core/src/client/handshaker/builder.rs +++ b/p2p/p2p-core/src/client/handshaker/builder.rs @@ -1,6 +1,8 @@ +use std::convert::Infallible; use std::marker::PhantomData; use futures::{stream, Stream}; +use tower::{make::Shared, util::MapErr}; use tracing::Span; use cuprate_wire::BasicNodeData; @@ -14,7 +16,6 @@ use crate::{ mod dummy; pub use dummy::{ DummyAddressBook, DummyCoreSyncSvc, DummyPeerSyncSvc, DummyProtocolRequestHandler, - DummyProtocolRequestHandlerMaker, }; /// A [`HandShaker`] [`Service`](tower::Service) builder. @@ -31,7 +32,7 @@ pub struct HandshakerBuilder< AdrBook = DummyAddressBook, CSync = DummyCoreSyncSvc, PSync = DummyPeerSyncSvc, - ProtoHdlrMkr = DummyProtocolRequestHandlerMaker, + ProtoHdlrMkr = MapErr, fn(Infallible) -> tower::BoxError>, BrdcstStrmMkr = fn( InternalPeerID<::Addr>, ) -> stream::Pending, @@ -62,7 +63,10 @@ impl HandshakerBuilder { address_book: DummyAddressBook, core_sync_svc: DummyCoreSyncSvc::static_mainnet_genesis(), peer_sync_svc: DummyPeerSyncSvc, - protocol_request_svc_maker: DummyProtocolRequestHandlerMaker, + protocol_request_svc_maker: MapErr::new( + Shared::new(DummyProtocolRequestHandler), + tower::BoxError::from, + ), our_basic_node_data, broadcast_stream_maker: |_| stream::pending(), connection_parent_span: None, diff --git a/p2p/p2p-core/src/client/handshaker/builder/dummy.rs b/p2p/p2p-core/src/client/handshaker/builder/dummy.rs index 3ebbdb656..e3c4335a0 100644 --- a/p2p/p2p-core/src/client/handshaker/builder/dummy.rs +++ b/p2p/p2p-core/src/client/handshaker/builder/dummy.rs @@ -8,12 +8,11 @@ use tower::Service; use cuprate_wire::CoreSyncData; use crate::{ - client::PeerInformation, services::{ AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse, PeerSyncRequest, PeerSyncResponse, }, - NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse, + NetworkZone, ProtocolRequest, ProtocolResponse, }; /// A dummy peer sync service, that doesn't actually keep track of peers sync states. @@ -133,24 +132,6 @@ impl Service> for DummyAddressBook { } } -/// A [`DummyProtocolRequestHandler`] maker. -#[derive(Debug, Clone)] -pub struct DummyProtocolRequestHandlerMaker; - -impl Service> for DummyProtocolRequestHandlerMaker { - type Response = DummyProtocolRequestHandler; - type Error = tower::BoxError; - type Future = Ready>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: PeerInformation) -> Self::Future { - ready(Ok(DummyProtocolRequestHandler)) - } -} - /// A dummy protocol request handler. #[derive(Debug, Clone)] pub struct DummyProtocolRequestHandler; From edc5f361e6e5c31c7c3e74e7d4cb324ec640b8bb Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 30 Sep 2024 19:45:10 +0100 Subject: [PATCH 07/10] clean import --- p2p/p2p-core/src/client/handshaker/builder.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/p2p/p2p-core/src/client/handshaker/builder.rs b/p2p/p2p-core/src/client/handshaker/builder.rs index 5c4a38f38..982559bf2 100644 --- a/p2p/p2p-core/src/client/handshaker/builder.rs +++ b/p2p/p2p-core/src/client/handshaker/builder.rs @@ -1,5 +1,4 @@ -use std::convert::Infallible; -use std::marker::PhantomData; +use std::{marker::PhantomData, convert::Infallible}; use futures::{stream, Stream}; use tower::{make::Shared, util::MapErr}; From 09a5d28c83fb4d483f895ff4a2de9abfdd8f072a Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 30 Sep 2024 19:50:59 +0100 Subject: [PATCH 08/10] fmt --- p2p/p2p-core/src/client/handshaker/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/p2p-core/src/client/handshaker/builder.rs b/p2p/p2p-core/src/client/handshaker/builder.rs index 982559bf2..21611638e 100644 --- a/p2p/p2p-core/src/client/handshaker/builder.rs +++ b/p2p/p2p-core/src/client/handshaker/builder.rs @@ -1,4 +1,4 @@ -use std::{marker::PhantomData, convert::Infallible}; +use std::{convert::Infallible, marker::PhantomData}; use futures::{stream, Stream}; use tower::{make::Shared, util::MapErr}; From e759b82f088192696968a629c53dbf34b8db922b Mon Sep 17 00:00:00 2001 From: Boog900 Date: Mon, 30 Sep 2024 22:16:04 +0100 Subject: [PATCH 09/10] Update Cargo.toml Co-authored-by: hinto-janai --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 9d6f1b248..7c3b7e9d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ thread_local = { version = "1.1.8", default-features = false } tokio-util = { version = "0.7.12", default-features = false } tokio-stream = { version = "0.1.16", default-features = false } tokio = { version = "1.40.0", default-features = false } -tower = { git = "https://github.com/Cuprate/tower.git", rev = "6c7faf0", default-features = false } +tower = { git = "https://github.com/Cuprate/tower.git", rev = "6c7faf0", default-features = false } // tracing-subscriber = { version = "0.3.18", default-features = false } tracing = { version = "0.1.40", default-features = false } From 2dba5f66fae06b845fe15f4b12e20031c53ed0e7 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 30 Sep 2024 23:04:30 +0100 Subject: [PATCH 10/10] fix merge --- Cargo.toml | 2 +- p2p/p2p-core/src/client/connector.rs | 17 +++-- p2p/p2p-core/src/client/handshaker.rs | 26 ++++--- p2p/p2p-core/src/client/handshaker/builder.rs | 70 +++---------------- p2p/p2p-core/src/lib.rs | 22 ------ p2p/p2p/src/lib.rs | 7 +- 6 files changed, 39 insertions(+), 105 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f1efa7d3e..31e928577 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,7 +82,7 @@ thread_local = { version = "1.1.8", default-features = false } tokio-util = { version = "0.7.12", default-features = false } tokio-stream = { version = "0.1.16", default-features = false } tokio = { version = "1.40.0", default-features = false } -tower = { git = "https://github.com/Cuprate/tower.git", rev = "6c7faf0", default-features = false } // +tower = { git = "https://github.com/Cuprate/tower.git", rev = "6c7faf0", default-features = false } # tracing-subscriber = { version = "0.3.18", default-features = false } tracing = { version = "0.1.40", default-features = false } diff --git a/p2p/p2p-core/src/client/connector.rs b/p2p/p2p-core/src/client/connector.rs index 9e904f008..abe7e137a 100644 --- a/p2p/p2p-core/src/client/connector.rs +++ b/p2p/p2p-core/src/client/connector.rs @@ -16,7 +16,7 @@ use tower::{Service, ServiceExt}; use crate::{ client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID}, - AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, PeerSyncSvc, + AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker, }; @@ -32,27 +32,26 @@ pub struct ConnectRequest { } /// The connector service, this service connects to peer and returns the [`Client`]. -pub struct Connector { - handshaker: HandShaker, +pub struct Connector { + handshaker: HandShaker, } -impl - Connector +impl + Connector { /// Create a new connector from a handshaker. pub const fn new( - handshaker: HandShaker, + handshaker: HandShaker, ) -> Self { Self { handshaker } } } -impl - Service> for Connector +impl + Service> for Connector where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, - PSync: PeerSyncSvc + Clone, ProtoHdlrMkr: ProtocolRequestHandlerMaker + Clone, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, diff --git a/p2p/p2p-core/src/client/handshaker.rs b/p2p/p2p-core/src/client/handshaker.rs index 071fade8c..66acb5b36 100644 --- a/p2p/p2p-core/src/client/handshaker.rs +++ b/p2p/p2p-core/src/client/handshaker.rs @@ -30,13 +30,20 @@ use cuprate_wire::{ AdminRequestMessage, AdminResponseMessage, BasicNodeData, BucketError, LevinCommand, Message, }; -use crate::{client::{ - connection::Connection, request_handler::PeerRequestHandler, - timeout_monitor::connection_timeout_monitor_task, Client, InternalPeerID, PeerInformation, -}, constants::{ - HANDSHAKE_TIMEOUT, MAX_EAGER_PROTOCOL_MESSAGES, MAX_PEERS_IN_PEER_LIST_MESSAGE, - PING_TIMEOUT, -}, handles::HandleBuilder, AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection, CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone, ProtocolRequestHandler, SharedError, ProtocolRequestHandlerMaker}; +use crate::{ + client::{ + connection::Connection, request_handler::PeerRequestHandler, + timeout_monitor::connection_timeout_monitor_task, Client, InternalPeerID, PeerInformation, + }, + constants::{ + HANDSHAKE_TIMEOUT, MAX_EAGER_PROTOCOL_MESSAGES, MAX_PEERS_IN_PEER_LIST_MESSAGE, + PING_TIMEOUT, + }, + handles::HandleBuilder, + AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection, + CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone, + ProtocolRequestHandlerMaker, SharedError, +}; pub mod builder; pub use builder::HandshakerBuilder; @@ -124,8 +131,7 @@ impl } impl - Service> - for HandShaker + Service> for HandShaker where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, @@ -162,7 +168,7 @@ where broadcast_stream_maker, address_book, core_sync_svc, - protocol_request_svc_maker, , + protocol_request_svc_maker, our_basic_node_data, connection_parent_span, ), diff --git a/p2p/p2p-core/src/client/handshaker/builder.rs b/p2p/p2p-core/src/client/handshaker/builder.rs index 21611638e..c1c3f3f9f 100644 --- a/p2p/p2p-core/src/client/handshaker/builder.rs +++ b/p2p/p2p-core/src/client/handshaker/builder.rs @@ -8,14 +8,11 @@ use cuprate_wire::BasicNodeData; use crate::{ client::{handshaker::HandShaker, InternalPeerID}, - AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, PeerSyncSvc, - ProtocolRequestHandlerMaker, + AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker, }; mod dummy; -pub use dummy::{ - DummyAddressBook, DummyCoreSyncSvc, DummyPeerSyncSvc, DummyProtocolRequestHandler, -}; +pub use dummy::{DummyAddressBook, DummyCoreSyncSvc, DummyProtocolRequestHandler}; /// A [`HandShaker`] [`Service`](tower::Service) builder. /// @@ -30,7 +27,6 @@ pub struct HandshakerBuilder< N: NetworkZone, AdrBook = DummyAddressBook, CSync = DummyCoreSyncSvc, - PSync = DummyPeerSyncSvc, ProtoHdlrMkr = MapErr, fn(Infallible) -> tower::BoxError>, BrdcstStrmMkr = fn( InternalPeerID<::Addr>, @@ -40,8 +36,6 @@ pub struct HandshakerBuilder< address_book: AdrBook, /// The core sync data service. core_sync_svc: CSync, - /// The peer sync service. - peer_sync_svc: PSync, /// The protocol request service. protocol_request_svc_maker: ProtoHdlrMkr, /// Our [`BasicNodeData`] @@ -61,7 +55,6 @@ impl HandshakerBuilder { Self { address_book: DummyAddressBook, core_sync_svc: DummyCoreSyncSvc::static_mainnet_genesis(), - peer_sync_svc: DummyPeerSyncSvc, protocol_request_svc_maker: MapErr::new( Shared::new(DummyProtocolRequestHandler), tower::BoxError::from, @@ -74,8 +67,8 @@ impl HandshakerBuilder { } } -impl - HandshakerBuilder +impl + HandshakerBuilder { /// Changes the address book to the provided one. /// @@ -88,13 +81,12 @@ impl pub fn with_address_book( self, new_address_book: NAdrBook, - ) -> HandshakerBuilder + ) -> HandshakerBuilder where NAdrBook: AddressBook + Clone, { let Self { core_sync_svc, - peer_sync_svc, protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, @@ -105,7 +97,6 @@ impl HandshakerBuilder { address_book: new_address_book, core_sync_svc, - peer_sync_svc, protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, @@ -130,13 +121,12 @@ impl pub fn with_core_sync_svc( self, new_core_sync_svc: NCSync, - ) -> HandshakerBuilder + ) -> HandshakerBuilder where NCSync: CoreSyncSvc + Clone, { let Self { address_book, - peer_sync_svc, protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, @@ -147,43 +137,6 @@ impl HandshakerBuilder { address_book, core_sync_svc: new_core_sync_svc, - peer_sync_svc, - protocol_request_svc_maker, - our_basic_node_data, - broadcast_stream_maker, - connection_parent_span, - _zone: PhantomData, - } - } - - /// Changes the peer sync service, which keeps track of peers sync states. - /// - /// ## Default Peer Sync Service - /// - /// The default peer sync service will be used if this method is not called. - /// - /// The default peer sync service will not keep track of peers sync states. - pub fn with_peer_sync_svc( - self, - new_peer_sync_svc: NPSync, - ) -> HandshakerBuilder - where - NPSync: PeerSyncSvc + Clone, - { - let Self { - address_book, - core_sync_svc, - protocol_request_svc_maker, - our_basic_node_data, - broadcast_stream_maker, - connection_parent_span, - .. - } = self; - - HandshakerBuilder { - address_book, - core_sync_svc, - peer_sync_svc: new_peer_sync_svc, protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, @@ -203,14 +156,13 @@ impl pub fn with_protocol_request_handler_maker( self, new_protocol_request_svc_maker: NProtoHdlrMkr, - ) -> HandshakerBuilder + ) -> HandshakerBuilder where NProtoHdlrMkr: ProtocolRequestHandlerMaker + Clone, { let Self { address_book, core_sync_svc, - peer_sync_svc, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -220,7 +172,6 @@ impl HandshakerBuilder { address_book, core_sync_svc, - peer_sync_svc, protocol_request_svc_maker: new_protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, @@ -239,7 +190,7 @@ impl pub fn with_broadcast_stream_maker( self, new_broadcast_stream_maker: NBrdcstStrmMkr, - ) -> HandshakerBuilder + ) -> HandshakerBuilder where BrdcstStrm: Stream + Send + 'static, NBrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, @@ -247,7 +198,6 @@ impl let Self { address_book, core_sync_svc, - peer_sync_svc, protocol_request_svc_maker, our_basic_node_data, connection_parent_span, @@ -257,7 +207,6 @@ impl HandshakerBuilder { address_book, core_sync_svc, - peer_sync_svc, protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker: new_broadcast_stream_maker, @@ -280,10 +229,9 @@ impl } /// Builds the [`HandShaker`]. - pub fn build(self) -> HandShaker { + pub fn build(self) -> HandShaker { HandShaker::new( self.address_book, - self.peer_sync_svc, self.core_sync_svc, self.protocol_request_svc_maker, self.broadcast_stream_maker, diff --git a/p2p/p2p-core/src/lib.rs b/p2p/p2p-core/src/lib.rs index 47d49b8cd..c9a58f524 100644 --- a/p2p/p2p-core/src/lib.rs +++ b/p2p/p2p-core/src/lib.rs @@ -192,28 +192,6 @@ pub trait NetworkZone: Clone + Copy + Send + 'static { // Below here is just helper traits, so we don't have to type out tower::Service bounds // everywhere but still get to use tower. -pub trait PeerSyncSvc: - tower::Service< - PeerSyncRequest, - Response = PeerSyncResponse, - Error = tower::BoxError, - Future: Send + 'static, - > + Send - + 'static -{ -} - -impl PeerSyncSvc for T where - T: tower::Service< - PeerSyncRequest, - Response = PeerSyncResponse, - Error = tower::BoxError, - Future: Send + 'static, - > + Send - + 'static -{ -} - pub trait AddressBook: tower::Service< AddressBookRequest, diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs index 7ae7a090a..243115898 100644 --- a/p2p/p2p/src/lib.rs +++ b/p2p/p2p/src/lib.rs @@ -10,9 +10,12 @@ use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt}; use tracing::{instrument, Instrument, Span}; use cuprate_async_buffer::BufferStream; -use cuprate_p2p_core::{client::Connector, client::InternalPeerID, services::{AddressBookRequest, AddressBookResponse}, CoreSyncSvc, NetworkZone, ProtocolRequestHandler, ProtocolRequestHandlerMaker}; - services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest}, +use cuprate_p2p_core::{ + client::Connector, + client::InternalPeerID, + services::{AddressBookRequest, AddressBookResponse}, CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker, +}; mod block_downloader; mod broadcast;