diff --git a/Cargo.lock b/Cargo.lock index 3caf437f..b8b957fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -129,9 +129,9 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.1", "tokio", - "tower", - "tower-layer", - "tower-service", + "tower 0.4.13", + "tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "tracing", ] @@ -151,8 +151,8 @@ dependencies = [ "pin-project-lite", "rustversion", "sync_wrapper 0.1.2", - "tower-layer", - "tower-service", + "tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "tracing", ] @@ -507,7 +507,7 @@ dependencies = [ "thiserror", "tokio", "tokio-util", - "tower", + "tower 0.5.1", "tracing", ] @@ -545,7 +545,7 @@ dependencies = [ "tempfile", "thread_local", "tokio", - "tower", + "tower 0.5.1", ] [[package]] @@ -572,7 +572,7 @@ dependencies = [ "tokio", "tokio-test", "tokio-util", - "tower", + "tower 0.5.1", "tracing", ] @@ -618,7 +618,7 @@ dependencies = [ "thiserror", "tokio", "tokio-util", - "tower", + "tower 0.5.1", "tracing", ] @@ -648,7 +648,7 @@ dependencies = [ "futures", "rayon", "serde", - "tower", + "tower 0.5.1", ] [[package]] @@ -680,7 +680,7 @@ dependencies = [ "sha3", "thiserror", "tokio", - "tower", + "tower 0.5.1", ] [[package]] @@ -766,7 +766,7 @@ dependencies = [ "tokio-stream", "tokio-test", "tokio-util", - "tower", + "tower 0.5.1", "tracing", ] @@ -789,7 +789,7 @@ dependencies = [ "tokio-stream", "tokio-test", "tokio-util", - "tower", + "tower 0.5.1", "tracing", ] @@ -817,7 +817,7 @@ dependencies = [ "serde", "serde_json", "tokio", - "tower", + "tower 0.5.1", "ureq", ] @@ -878,7 +878,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", - "tower", + "tower 0.5.1", ] [[package]] @@ -972,7 +972,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tower", + "tower 0.5.1", "tracing", "tracing-subscriber", ] @@ -1471,7 +1471,7 @@ dependencies = [ "rustls-pki-types", "tokio", "tokio-rustls", - "tower-service", + "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1489,8 +1489,8 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower", - "tower-service", + "tower 0.4.13", + "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "tracing", ] @@ -2519,7 +2519,7 @@ dependencies = [ "hyper-rustls", "hyper-util", "tokio", - "tower-service", + "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2801,9 +2801,24 @@ dependencies = [ "pin-project", "pin-project-lite", "tokio", + "tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tracing", +] + +[[package]] +name = "tower" +version = "0.5.1" +source = "git+https://github.com/Boog900/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tokio", "tokio-util", - "tower-layer", - "tower-service", + "tower-layer 0.3.3 (git+https://github.com/Boog900/tower.git?rev=6c7faf0)", + "tower-service 0.3.3 (git+https://github.com/Boog900/tower.git?rev=6c7faf0)", "tracing", ] @@ -2813,12 +2828,22 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "git+https://github.com/Boog900/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf" + [[package]] name = "tower-service" version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" +[[package]] +name = "tower-service" +version = "0.3.3" +source = "git+https://github.com/Boog900/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf" + [[package]] name = "tracing" version = "0.1.40" diff --git a/Cargo.toml b/Cargo.toml index 254d3ce4..aedebf96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ thread_local = { version = "1.1.8", default-features = false } tokio-util = { version = "0.7.12", default-features = false } tokio-stream = { version = "0.1.16", default-features = false } tokio = { version = "1.40.0", default-features = false } -tower = { version = "0.4.13", default-features = false } +tower = { git = "https://github.com/Boog900/tower.git", rev = "6c7faf0", default-features = false } tracing-subscriber = { version = "0.3.18", default-features = false } tracing = { version = "0.1.40", default-features = false } diff --git a/p2p/p2p-core/Cargo.toml b/p2p/p2p-core/Cargo.toml index 8341fe99..a30590fa 100644 --- a/p2p/p2p-core/Cargo.toml +++ b/p2p/p2p-core/Cargo.toml @@ -19,7 +19,7 @@ tokio-util = { workspace = true, features = ["codec"] } tokio-stream = { workspace = true, features = ["sync"]} futures = { workspace = true, features = ["std"] } async-trait = { workspace = true } -tower = { workspace = true, features = ["util", "tracing"] } +tower = { workspace = true, features = ["util", "tracing", "make"] } cfg-if = { workspace = true } thiserror = { workspace = true } diff --git a/p2p/p2p-core/src/client/connector.rs b/p2p/p2p-core/src/client/connector.rs index 553f5a44..62418fe9 100644 --- a/p2p/p2p-core/src/client/connector.rs +++ b/p2p/p2p-core/src/client/connector.rs @@ -12,12 +12,13 @@ use std::{ use futures::{FutureExt, Stream}; use tokio::sync::OwnedSemaphorePermit; -use tower::{Service, ServiceExt}; +use tower::{MakeService, Service, ServiceExt}; +use crate::client::PeerInformation; use crate::{ client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID}, AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, PeerSyncSvc, - ProtocolRequestHandler, + ProtocolRequest, ProtocolRequestHandler, ProtocolResponse, }; /// A request to connect to a peer. @@ -32,28 +33,36 @@ pub struct ConnectRequest { } /// The connector service, this service connects to peer and returns the [`Client`]. -pub struct Connector { - handshaker: HandShaker, +pub struct Connector { + handshaker: HandShaker, } -impl - Connector +impl + Connector { /// Create a new connector from a handshaker. pub const fn new( - handshaker: HandShaker, + handshaker: HandShaker, ) -> Self { Self { handshaker } } } -impl - Service> for Connector +impl + Service> for Connector where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, PSync: PeerSyncSvc + Clone, - ProtoHdlr: ProtocolRequestHandler + Clone, + ProtoHdlrMkr: MakeService< + PeerInformation, + ProtocolRequest, + MakeError = tower::BoxError, + Service: ProtocolRequestHandler, + Future: Send + 'static, + > + Clone + + Send + + 'static, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, { diff --git a/p2p/p2p-core/src/client/handshaker.rs b/p2p/p2p-core/src/client/handshaker.rs index d6873a85..19c75c9a 100644 --- a/p2p/p2p-core/src/client/handshaker.rs +++ b/p2p/p2p-core/src/client/handshaker.rs @@ -17,7 +17,7 @@ use tokio::{ sync::{mpsc, OwnedSemaphorePermit, Semaphore}, time::{error::Elapsed, timeout}, }; -use tower::{Service, ServiceExt}; +use tower::{MakeService, Service, ServiceExt}; use tracing::{info_span, Instrument, Span}; use cuprate_pruning::{PruningError, PruningSeed}; @@ -43,7 +43,7 @@ use crate::{ services::PeerSyncRequest, AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection, CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone, - PeerSyncSvc, ProtocolRequestHandler, SharedError, + PeerSyncSvc, ProtocolRequest, ProtocolRequestHandler, SharedError, }; pub mod builder; @@ -87,7 +87,7 @@ pub struct DoHandshakeRequest { /// The peer handshaking service. #[derive(Debug, Clone)] -pub struct HandShaker { +pub struct HandShaker { /// The address book service. address_book: AdrBook, /// The core sync data service. @@ -95,7 +95,7 @@ pub struct HandShaker, } -impl - HandShaker +impl + HandShaker { /// Creates a new handshaker. const fn new( address_book: AdrBook, peer_sync_svc: PSync, core_sync_svc: CSync, - protocol_request_svc: ProtoHdlr, + protocol_request_svc_maker: ProtoHdlrMkr, broadcast_stream_maker: BrdcstStrmMkr, our_basic_node_data: BasicNodeData, connection_parent_span: Span, @@ -126,7 +126,7 @@ impl address_book, peer_sync_svc, core_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, broadcast_stream_maker, our_basic_node_data, connection_parent_span, @@ -135,14 +135,22 @@ impl } } -impl +impl Service> - for HandShaker + for HandShaker where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, PSync: PeerSyncSvc + Clone, - ProtoHdlr: ProtocolRequestHandler + Clone, + ProtoHdlrMkr: MakeService< + PeerInformation, + ProtocolRequest, + MakeError = tower::BoxError, + Service: ProtocolRequestHandler, + Future: Send + 'static, + > + Clone + + Send + + 'static, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, { @@ -159,7 +167,7 @@ where let broadcast_stream_maker = self.broadcast_stream_maker.clone(); let address_book = self.address_book.clone(); - let protocol_request_svc = self.protocol_request_svc.clone(); + let protocol_request_svc_maker = self.protocol_request_svc_maker.clone(); let core_sync_svc = self.core_sync_svc.clone(); let peer_sync_svc = self.peer_sync_svc.clone(); let our_basic_node_data = self.our_basic_node_data.clone(); @@ -177,7 +185,7 @@ where address_book, core_sync_svc, peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, connection_parent_span, ), @@ -232,7 +240,7 @@ pub async fn ping(addr: N::Addr) -> Result /// This function completes a handshake with the requested peer. #[expect(clippy::too_many_arguments)] -async fn handshake( +async fn handshake( req: DoHandshakeRequest, broadcast_stream_maker: BrdcstStrmMkr, @@ -240,7 +248,7 @@ async fn handshake Result, HandshakeError> @@ -248,7 +256,14 @@ where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, PSync: PeerSyncSvc + Clone, - ProtoHdlr: ProtocolRequestHandler, + ProtoHdlrMkr: MakeService< + PeerInformation, + ProtocolRequest, + MakeError = tower::BoxError, + Service: ProtocolRequestHandler, + Future: Send + 'static, + > + Send + + 'static, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Send + 'static, { @@ -480,6 +495,13 @@ where pruning_seed, }; + let protocol_request_handler = protocol_request_svc_maker + .as_service() + .ready() + .await? + .call(info.clone()) + .await?; + let request_handler = PeerRequestHandler { address_book_svc: address_book.clone(), our_sync_svc: core_sync_svc.clone(), diff --git a/p2p/p2p-core/src/client/handshaker/builder.rs b/p2p/p2p-core/src/client/handshaker/builder.rs index 069811df..6d534fef 100644 --- a/p2p/p2p-core/src/client/handshaker/builder.rs +++ b/p2p/p2p-core/src/client/handshaker/builder.rs @@ -1,18 +1,21 @@ use std::marker::PhantomData; use futures::{stream, Stream}; +use tower::MakeService; use tracing::Span; use cuprate_wire::BasicNodeData; use crate::{ - client::{handshaker::HandShaker, InternalPeerID}, - AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, PeerSyncSvc, ProtocolRequestHandler, + client::{handshaker::HandShaker, InternalPeerID, PeerInformation}, + AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, PeerSyncSvc, ProtocolRequest, + ProtocolRequestHandler, }; mod dummy; pub use dummy::{ DummyAddressBook, DummyCoreSyncSvc, DummyPeerSyncSvc, DummyProtocolRequestHandler, + DummyProtocolRequestHandlerMaker, }; /// A [`HandShaker`] [`Service`](tower::Service) builder. @@ -29,7 +32,7 @@ pub struct HandshakerBuilder< AdrBook = DummyAddressBook, CSync = DummyCoreSyncSvc, PSync = DummyPeerSyncSvc, - ProtoHdlr = DummyProtocolRequestHandler, + ProtoHdlrMkr = DummyProtocolRequestHandlerMaker, BrdcstStrmMkr = fn( InternalPeerID<::Addr>, ) -> stream::Pending, @@ -41,7 +44,7 @@ pub struct HandshakerBuilder< /// The peer sync service. peer_sync_svc: PSync, /// The protocol request service. - protocol_request_svc: ProtoHdlr, + protocol_request_svc_maker: ProtoHdlrMkr, /// Our [`BasicNodeData`] our_basic_node_data: BasicNodeData, /// A function that returns a stream that will give items to be broadcast by a connection. @@ -60,7 +63,7 @@ impl HandshakerBuilder { address_book: DummyAddressBook, core_sync_svc: DummyCoreSyncSvc::static_mainnet_genesis(), peer_sync_svc: DummyPeerSyncSvc, - protocol_request_svc: DummyProtocolRequestHandler, + protocol_request_svc_maker: DummyProtocolRequestHandlerMaker, our_basic_node_data, broadcast_stream_maker: |_| stream::pending(), connection_parent_span: None, @@ -90,7 +93,7 @@ impl let Self { core_sync_svc, peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -101,7 +104,7 @@ impl address_book: new_address_book, core_sync_svc, peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -132,7 +135,7 @@ impl let Self { address_book, peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -143,7 +146,7 @@ impl address_book, core_sync_svc: new_core_sync_svc, peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -168,7 +171,7 @@ impl let Self { address_book, core_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -179,7 +182,7 @@ impl address_book, core_sync_svc, peer_sync_svc: new_peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -187,19 +190,28 @@ impl } } - /// Changes the protocol request handler, which handles [`ProtocolRequest`](crate::ProtocolRequest)s to our node. + /// Changes the protocol request handler maker, which creates the service that handles [`ProtocolRequest`](crate::ProtocolRequest)s + /// to our node. /// /// ## Default Protocol Request Handler /// - /// The default protocol request handler will not respond to any protocol requests, this should not + /// The default service maker will create services that will not respond to any protocol requests, this should not /// be an issue as long as peers do not think we are ahead of them, if they do they will send requests /// for our blocks, and we won't respond which will cause them to disconnect. - pub fn with_protocol_request_handler( + pub fn with_protocol_request_handler_maker( self, - new_protocol_handler: NProtoHdlr, - ) -> HandshakerBuilder + new_protocol_request_svc_maker: NProtoHdlrMkr, + ) -> HandshakerBuilder where - NProtoHdlr: ProtocolRequestHandler + Clone, + NProtoHdlrMkr: MakeService< + PeerInformation, + ProtocolRequest, + MakeError = tower::BoxError, + Service: ProtocolRequestHandler, + Future: Send + 'static, + > + Clone + + Send + + 'static, { let Self { address_book, @@ -215,7 +227,7 @@ impl address_book, core_sync_svc, peer_sync_svc, - protocol_request_svc: new_protocol_handler, + protocol_request_svc_maker: new_protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker, connection_parent_span, @@ -242,7 +254,7 @@ impl address_book, core_sync_svc, peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, connection_parent_span, .. @@ -252,7 +264,7 @@ impl address_book, core_sync_svc, peer_sync_svc, - protocol_request_svc, + protocol_request_svc_maker, our_basic_node_data, broadcast_stream_maker: new_broadcast_stream_maker, connection_parent_span, @@ -279,7 +291,7 @@ impl self.address_book, self.peer_sync_svc, self.core_sync_svc, - self.protocol_request_svc, + self.protocol_request_svc_maker, self.broadcast_stream_maker, self.our_basic_node_data, self.connection_parent_span.unwrap_or(Span::none()), diff --git a/p2p/p2p-core/src/client/handshaker/builder/dummy.rs b/p2p/p2p-core/src/client/handshaker/builder/dummy.rs index e3c4335a..8f052300 100644 --- a/p2p/p2p-core/src/client/handshaker/builder/dummy.rs +++ b/p2p/p2p-core/src/client/handshaker/builder/dummy.rs @@ -7,12 +7,13 @@ use tower::Service; use cuprate_wire::CoreSyncData; +use crate::client::PeerInformation; use crate::{ services::{ AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse, PeerSyncRequest, PeerSyncResponse, }, - NetworkZone, ProtocolRequest, ProtocolResponse, + NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse, }; /// A dummy peer sync service, that doesn't actually keep track of peers sync states. @@ -132,6 +133,24 @@ impl Service> for DummyAddressBook { } } +/// A [`DummyProtocolRequestHandler`] maker. +#[derive(Debug, Clone)] +pub struct DummyProtocolRequestHandlerMaker; + +impl Service> for DummyProtocolRequestHandlerMaker { + type Response = DummyProtocolRequestHandler; + type Error = tower::BoxError; + type Future = Ready>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _: PeerInformation) -> Self::Future { + ready(Ok(DummyProtocolRequestHandler)) + } +} + /// A dummy protocol request handler. #[derive(Debug, Clone)] pub struct DummyProtocolRequestHandler; diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs index 2f51c6c5..2d693edb 100644 --- a/p2p/p2p/src/lib.rs +++ b/p2p/p2p/src/lib.rs @@ -10,7 +10,7 @@ use tokio::{ task::JoinSet, }; use tokio_stream::wrappers::WatchStream; -use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt}; +use tower::{buffer::Buffer, util::BoxCloneService, MakeService, Service, ServiceExt}; use tracing::{instrument, Instrument, Span}; use cuprate_async_buffer::BufferStream; @@ -18,7 +18,7 @@ use cuprate_p2p_core::{ client::Connector, client::InternalPeerID, services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest}, - CoreSyncSvc, NetworkZone, ProtocolRequestHandler, + CoreSyncSvc, NetworkZone, ProtocolRequest, ProtocolRequestHandler, }; mod block_downloader; @@ -35,6 +35,7 @@ pub use broadcast::{BroadcastRequest, BroadcastSvc}; use client_pool::ClientPoolDropGuard; pub use config::P2PConfig; use connection_maintainer::MakeConnectionRequest; +use cuprate_p2p_core::client::PeerInformation; /// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`]. /// @@ -46,14 +47,22 @@ use connection_maintainer::MakeConnectionRequest; /// - A core sync service, which keeps track of the sync state of our node #[instrument(level = "debug", name = "net", skip_all, fields(zone = N::NAME))] pub async fn initialize_network( - protocol_request_handler: PR, + protocol_request_handler_maker: PR, core_sync_svc: CS, config: P2PConfig, ) -> Result, tower::BoxError> where N: NetworkZone, N::Addr: borsh::BorshDeserialize + borsh::BorshSerialize, - PR: ProtocolRequestHandler + Clone, + PR: MakeService< + PeerInformation, + ProtocolRequest, + MakeError = tower::BoxError, + Service: ProtocolRequestHandler, + Future: Send + 'static, + > + Clone + + Send + + 'static, CS: CoreSyncSvc + Clone, { let address_book = @@ -85,7 +94,7 @@ where .with_address_book(address_book.clone()) .with_peer_sync_svc(sync_states_svc.clone()) .with_core_sync_svc(core_sync_svc) - .with_protocol_request_handler(protocol_request_handler) + .with_protocol_request_handler_maker(protocol_request_handler_maker) .with_broadcast_stream_maker(outbound_mkr) .with_connection_parent_span(Span::current()); @@ -160,7 +169,7 @@ pub struct NetworkInterface { /// The address book service. address_book: BoxCloneService, AddressBookResponse, tower::BoxError>, /// The peer's sync states service. - sync_states_svc: Buffer, PeerSyncRequest>, + sync_states_svc: Buffer, as Service>>::Future>, /// Background tasks that will be aborted when this interface is dropped. _background_tasks: Arc>, }