From f02e5d04110100d8764238ea61d44fee67431f0d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 11 May 2023 20:46:29 +0200 Subject: [PATCH 1/8] Introduce `Stream` wrapper around `Negotiated` --- swarm/src/lib.rs | 4 ++- swarm/src/stream.rs | 59 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 swarm/src/stream.rs diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index a32beda411b..a35e8d7b47f 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -58,6 +58,7 @@ mod connection; mod executor; mod registry; +mod stream; mod stream_protocol; #[cfg(test)] mod test; @@ -125,6 +126,7 @@ pub use handler::{ #[cfg(feature = "macros")] pub use libp2p_swarm_derive::NetworkBehaviour; pub use registry::{AddAddressResult, AddressRecord, AddressScore}; +pub use stream::Stream; pub use stream_protocol::{InvalidProtocol, StreamProtocol}; use crate::handler::UpgradeInfoSend; @@ -1379,7 +1381,7 @@ where /// /// Note: This stream is infinite and it is guaranteed that /// [`Stream::poll_next`] will never return `Poll::Ready(None)`. -impl Stream for Swarm +impl futures::Stream for Swarm where TBehaviour: NetworkBehaviour, { diff --git a/swarm/src/stream.rs b/swarm/src/stream.rs new file mode 100644 index 00000000000..3c4c52afc33 --- /dev/null +++ b/swarm/src/stream.rs @@ -0,0 +1,59 @@ +use futures::{AsyncRead, AsyncWrite}; +use libp2p_core::muxing::SubstreamBox; +use libp2p_core::Negotiated; +use std::io::{IoSlice, IoSliceMut}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[derive(Debug)] +pub struct Stream(Negotiated); + +impl Stream { + pub(crate) fn new(stream: Negotiated) -> Self { + Self(stream) + } +} + +impl AsyncRead for Stream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_read(cx, buf) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_read_vectored(cx, bufs) + } +} + +impl AsyncWrite for Stream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_write(cx, buf) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_write_vectored(cx, bufs) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_close(cx) + } +} From 5130c7f99a88f74fddde5dec4ee2fd7b973c1572 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 11 May 2023 20:51:46 +0200 Subject: [PATCH 2/8] Use `Stream` everywhere --- swarm/src/connection.rs | 8 +++----- swarm/src/upgrade.rs | 22 +++++++++++----------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 32a24161393..59d719ebc9b 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -35,11 +35,9 @@ use crate::handler::{ UpgradeInfoSend, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper}; -use crate::{ - ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError, SubstreamProtocol, -}; +use crate::{ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol, StreamUpgradeError, SubstreamProtocol}; use futures::stream::FuturesUnordered; -use futures::FutureExt; +use futures::{FutureExt, TryFutureExt}; use futures::StreamExt; use futures_timer::Delay; use instant::Instant; @@ -504,7 +502,7 @@ where Self { user_data: Some(user_data), timeout, - upgrade: upgrade::apply_outbound(substream, SendWrapper(upgrade), effective_version), + upgrade: upgrade::apply_outbound(substream, SendWrapper(upgrade), effective_version).map_ok(Stream::new), } } } diff --git a/swarm/src/upgrade.rs b/swarm/src/upgrade.rs index b584dfae9fd..53b627458c9 100644 --- a/swarm/src/upgrade.rs +++ b/swarm/src/upgrade.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::NegotiatedSubstream; +use crate::Stream; use futures::prelude::*; use libp2p_core::upgrade; @@ -66,12 +66,12 @@ pub trait OutboundUpgradeSend: UpgradeInfoSend { type Future: Future> + Send + 'static; /// Equivalent to [`OutboundUpgrade::upgrade_outbound`](upgrade::OutboundUpgrade::upgrade_outbound). - fn upgrade_outbound(self, socket: NegotiatedSubstream, info: Self::Info) -> Self::Future; + fn upgrade_outbound(self, socket: Stream, info: Self::Info) -> Self::Future; } impl OutboundUpgradeSend for T where - T: upgrade::OutboundUpgrade + UpgradeInfoSend, + T: upgrade::OutboundUpgrade + UpgradeInfoSend, TInfo: AsRef + Clone + Send + 'static, T::Output: Send + 'static, T::Error: Send + 'static, @@ -81,7 +81,7 @@ where type Error = T::Error; type Future = T::Future; - fn upgrade_outbound(self, socket: NegotiatedSubstream, info: TInfo) -> Self::Future { + fn upgrade_outbound(self, socket: Stream, info: TInfo) -> Self::Future { upgrade::OutboundUpgrade::upgrade_outbound(self, socket, info) } } @@ -100,12 +100,12 @@ pub trait InboundUpgradeSend: UpgradeInfoSend { type Future: Future> + Send + 'static; /// Equivalent to [`InboundUpgrade::upgrade_inbound`](upgrade::InboundUpgrade::upgrade_inbound). - fn upgrade_inbound(self, socket: NegotiatedSubstream, info: Self::Info) -> Self::Future; + fn upgrade_inbound(self, socket: Stream, info: Self::Info) -> Self::Future; } impl InboundUpgradeSend for T where - T: upgrade::InboundUpgrade + UpgradeInfoSend, + T: upgrade::InboundUpgrade + UpgradeInfoSend, TInfo: AsRef + Clone + Send + 'static, T::Output: Send + 'static, T::Error: Send + 'static, @@ -115,7 +115,7 @@ where type Error = T::Error; type Future = T::Future; - fn upgrade_inbound(self, socket: NegotiatedSubstream, info: TInfo) -> Self::Future { + fn upgrade_inbound(self, socket: Stream, info: TInfo) -> Self::Future { upgrade::InboundUpgrade::upgrade_inbound(self, socket, info) } } @@ -137,22 +137,22 @@ impl upgrade::UpgradeInfo for SendWrapper { } } -impl upgrade::OutboundUpgrade for SendWrapper { +impl upgrade::OutboundUpgrade for SendWrapper { type Output = T::Output; type Error = T::Error; type Future = T::Future; - fn upgrade_outbound(self, socket: NegotiatedSubstream, info: T::Info) -> Self::Future { + fn upgrade_outbound(self, socket: Stream, info: T::Info) -> Self::Future { OutboundUpgradeSend::upgrade_outbound(self.0, socket, info) } } -impl upgrade::InboundUpgrade for SendWrapper { +impl upgrade::InboundUpgrade for SendWrapper { type Output = T::Output; type Error = T::Error; type Future = T::Future; - fn upgrade_inbound(self, socket: NegotiatedSubstream, info: T::Info) -> Self::Future { + fn upgrade_inbound(self, socket: Stream, info: T::Info) -> Self::Future { InboundUpgradeSend::upgrade_inbound(self.0, socket, info) } } From cae9ac5cb7df907985456e7fcf00be5f43dc3f07 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 11 May 2023 21:40:37 +0200 Subject: [PATCH 3/8] Alias `NegotiatedSubstream` to `Stream` --- Cargo.lock | 1 + swarm/Cargo.toml | 1 + swarm/src/connection.rs | 121 ++++++++++++++++++++++--------------- swarm/src/handler/multi.rs | 6 +- swarm/src/lib.rs | 5 +- 5 files changed, 78 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e850e031a0..3bb82a6a6aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2989,6 +2989,7 @@ dependencies = [ "libp2p-swarm-test", "libp2p-yamux", "log", + "multistream-select", "once_cell", "quickcheck-ext", "rand 0.8.5", diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 699e0c74b83..483c65f8dfd 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -26,6 +26,7 @@ void = "1" wasm-bindgen-futures = { version = "0.4.34", optional = true } getrandom = { version = "0.2.9", features = ["js"], optional = true } # Explicit dependency to be used in `wasm-bindgen` feature once_cell = "1.17.1" +multistream-select = { workspace = true } [target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies] async-std = { version = "1.6.2", optional = true } diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 59d719ebc9b..6dc2b9fc38b 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -34,10 +34,14 @@ use crate::handler::{ FullyNegotiatedOutbound, ListenUpgradeError, ProtocolSupport, ProtocolsAdded, ProtocolsChange, UpgradeInfoSend, }; -use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper}; -use crate::{ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol, StreamUpgradeError, SubstreamProtocol}; +use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; +use crate::{ + ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol, StreamUpgradeError, + SubstreamProtocol, +}; +use futures::future::BoxFuture; use futures::stream::FuturesUnordered; -use futures::{FutureExt, TryFutureExt}; +use futures::FutureExt; use futures::StreamExt; use futures_timer::Delay; use instant::Instant; @@ -45,9 +49,7 @@ use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt, SubstreamBox}; use libp2p_core::upgrade; -use libp2p_core::upgrade::{ - InboundUpgradeApply, NegotiationError, OutboundUpgradeApply, ProtocolError, -}; +use libp2p_core::upgrade::{NegotiationError, ProtocolError}; use libp2p_core::Endpoint; use libp2p_identity::PeerId; use std::collections::HashSet; @@ -120,14 +122,16 @@ where negotiating_in: FuturesUnordered< SubstreamUpgrade< THandler::InboundOpenInfo, - InboundUpgradeApply>, + ::Output, + ::Error, >, >, /// Futures that upgrade outgoing substreams. negotiating_out: FuturesUnordered< SubstreamUpgrade< THandler::OutboundOpenInfo, - OutboundUpgradeApply>, + ::Output, + ::Error, >, >, /// The currently planned connection & handler shutdown. @@ -468,24 +472,23 @@ impl<'a> IncomingInfo<'a> { } } -struct SubstreamUpgrade { +struct SubstreamUpgrade { user_data: Option, timeout: Delay, - upgrade: Upgrade, + upgrade: BoxFuture<'static, Result>>, } -impl - SubstreamUpgrade>> -where - Upgrade: Send + OutboundUpgradeSend, -{ - fn new_outbound( +impl SubstreamUpgrade { + fn new_outbound( substream: SubstreamBox, user_data: UserData, timeout: Delay, upgrade: Upgrade, version_override: Option, - ) -> Self { + ) -> Self + where + Upgrade: OutboundUpgradeSend, + { let effective_version = match version_override { Some(version_override) if version_override != upgrade::Version::default() => { log::debug!( @@ -498,45 +501,77 @@ where } _ => upgrade::Version::default(), }; + let protocols = upgrade.protocol_info(); Self { user_data: Some(user_data), timeout, - upgrade: upgrade::apply_outbound(substream, SendWrapper(upgrade), effective_version).map_ok(Stream::new), + upgrade: Box::pin(async move { + let (info, stream) = multistream_select::dialer_select_proto( + substream, + protocols, + effective_version, + ) + .await + .map_err(to_stream_upgrade_error)?; + + let output = upgrade + .upgrade_outbound(Stream::new(stream), info) + .await + .map_err(StreamUpgradeError::Apply)?; + + Ok(output) + }), } } } -impl - SubstreamUpgrade>> -where - Upgrade: Send + InboundUpgradeSend, -{ - fn new_inbound( +impl SubstreamUpgrade { + fn new_inbound( substream: SubstreamBox, protocol: SubstreamProtocol, - ) -> Self { + ) -> Self + where + Upgrade: InboundUpgradeSend, + { let timeout = *protocol.timeout(); let (upgrade, open_info) = protocol.into_upgrade(); + let protocols = upgrade.protocol_info(); Self { user_data: Some(open_info), timeout: Delay::new(timeout), - upgrade: upgrade::apply_inbound(substream, SendWrapper(upgrade)), + upgrade: Box::pin(async move { + let (info, stream) = + multistream_select::listener_select_proto(substream, protocols) + .await + .map_err(to_stream_upgrade_error)?; + + let output = upgrade + .upgrade_inbound(Stream::new(stream), info) + .await + .map_err(StreamUpgradeError::Apply)?; + + Ok(output) + }), } } } -impl Unpin for SubstreamUpgrade {} +fn to_stream_upgrade_error(e: NegotiationError) -> StreamUpgradeError { + match e { + NegotiationError::Failed => StreamUpgradeError::NegotiationFailed, + NegotiationError::ProtocolError(ProtocolError::IoError(e)) => StreamUpgradeError::Io(e), + NegotiationError::ProtocolError(other) => { + StreamUpgradeError::Io(io::Error::new(io::ErrorKind::Other, other)) + } + } +} -impl Future for SubstreamUpgrade -where - Upgrade: Future>> + Unpin, -{ - type Output = ( - UserData, - Result>, - ); +impl Unpin for SubstreamUpgrade {} + +impl Future for SubstreamUpgrade { + type Output = (UserData, Result>); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.timeout.poll_unpin(cx) { @@ -558,21 +593,7 @@ where .take() .expect("Future not to be polled again once ready."); - Poll::Ready(( - user_data, - result.map_err(|e| match e { - upgrade::UpgradeError::Select(NegotiationError::Failed) => { - StreamUpgradeError::NegotiationFailed - } - upgrade::UpgradeError::Select(NegotiationError::ProtocolError( - ProtocolError::IoError(e), - )) => StreamUpgradeError::Io(e), - upgrade::UpgradeError::Select(NegotiationError::ProtocolError(other)) => { - StreamUpgradeError::Io(io::Error::new(io::ErrorKind::Other, other)) - } - upgrade::UpgradeError::Apply(e) => StreamUpgradeError::Apply(e), - }), - )) + Poll::Ready((user_data, result)) } } diff --git a/swarm/src/handler/multi.rs b/swarm/src/handler/multi.rs index 61c357b6597..5caf611278f 100644 --- a/swarm/src/handler/multi.rs +++ b/swarm/src/handler/multi.rs @@ -27,7 +27,7 @@ use crate::handler::{ SubstreamProtocol, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, UpgradeInfoSend}; -use crate::NegotiatedSubstream; +use crate::Stream; use futures::{future::BoxFuture, prelude::*}; use rand::Rng; use std::{ @@ -373,7 +373,7 @@ where type Error = (K, ::Error); type Future = BoxFuture<'static, Result>; - fn upgrade_inbound(mut self, resource: NegotiatedSubstream, info: Self::Info) -> Self::Future { + fn upgrade_inbound(mut self, resource: Stream, info: Self::Info) -> Self::Future { let IndexedProtoName(index, info) = info; let (key, upgrade) = self.upgrades.remove(index); upgrade @@ -395,7 +395,7 @@ where type Error = (K, ::Error); type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(mut self, resource: NegotiatedSubstream, info: Self::Info) -> Self::Future { + fn upgrade_outbound(mut self, resource: Stream, info: Self::Info) -> Self::Future { let IndexedProtoName(index, info) = info; let (key, upgrade) = self.upgrades.remove(index); upgrade diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index a35e8d7b47f..d9b1bac9d3a 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -137,14 +137,13 @@ use connection::{ }; use dial_opts::{DialOpts, PeerCondition}; use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream}; -use libp2p_core::muxing::SubstreamBox; use libp2p_core::{ connection::ConnectedPoint, multiaddr, multihash::Multihash, muxing::StreamMuxerBox, transport::{self, ListenerId, TransportError, TransportEvent}, - Endpoint, Multiaddr, Negotiated, Transport, + Endpoint, Multiaddr, Transport, }; use libp2p_identity::PeerId; use registry::{AddressIntoIter, Addresses}; @@ -162,7 +161,7 @@ use std::{ /// /// Implements the [`AsyncRead`](futures::io::AsyncRead) and /// [`AsyncWrite`](futures::io::AsyncWrite) traits. -pub type NegotiatedSubstream = Negotiated; +pub type NegotiatedSubstream = Stream; /// Event generated by the [`NetworkBehaviour`] that the swarm will report back. type TBehaviourOutEvent = ::OutEvent; From f7859916af8a4aa0960691b45aab32475c4ffbde Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 11 May 2023 21:44:16 +0200 Subject: [PATCH 4/8] Deprecate the `NegotiatedSubstream` alias --- libp2p/src/lib.rs | 2 +- protocols/dcutr/src/protocol/inbound.rs | 8 ++--- protocols/dcutr/src/protocol/outbound.rs | 6 ++-- protocols/gossipsub/src/handler.rs | 14 ++++---- protocols/kad/src/handler.rs | 36 +++++++------------ protocols/ping/src/handler.rs | 8 ++--- protocols/relay/src/behaviour/handler.rs | 12 +++---- protocols/relay/src/priv_client.rs | 8 ++--- protocols/relay/src/protocol/inbound_hop.rs | 12 +++---- protocols/relay/src/protocol/inbound_stop.rs | 10 +++--- protocols/relay/src/protocol/outbound_hop.rs | 8 ++--- protocols/relay/src/protocol/outbound_stop.rs | 8 ++--- protocols/rendezvous/src/handler/inbound.rs | 12 +++---- protocols/rendezvous/src/handler/outbound.rs | 4 +-- protocols/rendezvous/src/substream_handler.rs | 7 ++-- .../request-response/src/handler/protocol.rs | 18 +++------- swarm/src/lib.rs | 1 + 17 files changed, 77 insertions(+), 97 deletions(-) diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 3a8c09a068b..f086c2872f4 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -177,7 +177,7 @@ pub use self::swarm::Swarm; pub use self::transport_ext::TransportExt; pub use libp2p_identity as identity; pub use libp2p_identity::PeerId; -pub use libp2p_swarm::StreamProtocol; +pub use libp2p_swarm::{Stream, StreamProtocol}; /// Builds a `Transport` based on TCP/IP that supports the most commonly-used features of libp2p: /// diff --git a/protocols/dcutr/src/protocol/inbound.rs b/protocols/dcutr/src/protocol/inbound.rs index 83fa926a550..d38b6f4559a 100644 --- a/protocols/dcutr/src/protocol/inbound.rs +++ b/protocols/dcutr/src/protocol/inbound.rs @@ -22,7 +22,7 @@ use crate::proto; use asynchronous_codec::Framed; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr}; -use libp2p_swarm::{NegotiatedSubstream, StreamProtocol}; +use libp2p_swarm::{Stream, StreamProtocol}; use std::convert::TryFrom; use std::iter; use thiserror::Error; @@ -38,12 +38,12 @@ impl upgrade::UpgradeInfo for Upgrade { } } -impl upgrade::InboundUpgrade for Upgrade { +impl upgrade::InboundUpgrade for Upgrade { type Output = PendingConnect; type Error = UpgradeError; type Future = BoxFuture<'static, Result>; - fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, substream: Stream, _: Self::Info) -> Self::Future { let mut substream = Framed::new( substream, quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES), @@ -92,7 +92,7 @@ impl upgrade::InboundUpgrade for Upgrade { } pub struct PendingConnect { - substream: Framed>, + substream: Framed>, remote_obs_addrs: Vec, } diff --git a/protocols/dcutr/src/protocol/outbound.rs b/protocols/dcutr/src/protocol/outbound.rs index 00b16e20617..960d98cbe66 100644 --- a/protocols/dcutr/src/protocol/outbound.rs +++ b/protocols/dcutr/src/protocol/outbound.rs @@ -24,7 +24,7 @@ use futures::{future::BoxFuture, prelude::*}; use futures_timer::Delay; use instant::Instant; use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr}; -use libp2p_swarm::{NegotiatedSubstream, StreamProtocol}; +use libp2p_swarm::{Stream, StreamProtocol}; use std::convert::TryFrom; use std::iter; use thiserror::Error; @@ -48,12 +48,12 @@ impl Upgrade { } } -impl upgrade::OutboundUpgrade for Upgrade { +impl upgrade::OutboundUpgrade for Upgrade { type Output = Connect; type Error = UpgradeError; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, substream: Stream, _: Self::Info) -> Self::Future { let mut substream = Framed::new( substream, quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES), diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 65a4a31b60c..5a5b91d00d1 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -33,7 +33,7 @@ use libp2p_swarm::handler::{ FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError, SubstreamProtocol, }; -use libp2p_swarm::NegotiatedSubstream; +use libp2p_swarm::Stream; use smallvec::SmallVec; use std::{ pin::Pin, @@ -143,9 +143,9 @@ pub enum DisabledHandler { /// State of the inbound substream, opened either by us or by the remote. enum InboundSubstreamState { /// Waiting for a message from the remote. The idle state for an inbound substream. - WaitingInput(Framed), + WaitingInput(Framed), /// The substream is being closed. - Closing(Framed), + Closing(Framed), /// An error occurred during processing. Poisoned, } @@ -153,11 +153,11 @@ enum InboundSubstreamState { /// State of the outbound substream, opened either by us or by the remote. enum OutboundSubstreamState { /// Waiting for the user to send a message. The idle state for an outbound substream. - WaitingOutput(Framed), + WaitingOutput(Framed), /// Waiting to send a message to the remote. - PendingSend(Framed, proto::RPC), + PendingSend(Framed, proto::RPC), /// Waiting to flush the substream so that the data arrives to the remote. - PendingFlush(Framed), + PendingFlush(Framed), /// An error occurred during processing. Poisoned, } @@ -185,7 +185,7 @@ impl Handler { impl EnabledHandler { fn on_fully_negotiated_inbound( &mut self, - (substream, peer_kind): (Framed, PeerKind), + (substream, peer_kind): (Framed, PeerKind), ) { // update the known kind of peer if self.peer_kind.is_none() { diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 3fa123410ee..25daa8dcd15 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -33,7 +33,7 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamUpgradeError, + ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamUpgradeError, SubstreamProtocol, }; use log::trace; @@ -116,20 +116,16 @@ pub struct KademliaHandlerConfig { /// State of an active outbound substream. enum OutboundSubstreamState { /// Waiting to send a message to the remote. - PendingSend( - KadOutStreamSink, - KadRequestMsg, - Option, - ), + PendingSend(KadOutStreamSink, KadRequestMsg, Option), /// Waiting to flush the substream so that the data arrives to the remote. - PendingFlush(KadOutStreamSink, Option), + PendingFlush(KadOutStreamSink, Option), /// Waiting for an answer back from the remote. // TODO: add timeout - WaitingAnswer(KadOutStreamSink, TUserData), + WaitingAnswer(KadOutStreamSink, TUserData), /// An error happened on the substream and we should report the error to the user. ReportError(KademliaHandlerQueryErr, TUserData), /// The substream is being closed. - Closing(KadOutStreamSink), + Closing(KadOutStreamSink), /// The substream is complete and will not perform any more work. Done, Poisoned, @@ -142,24 +138,16 @@ enum InboundSubstreamState { /// Whether it is the first message to be awaited on this stream. first: bool, connection_id: UniqueConnecId, - substream: KadInStreamSink, + substream: KadInStreamSink, }, /// Waiting for the behaviour to send a [`KademliaHandlerIn`] event containing the response. - WaitingBehaviour( - UniqueConnecId, - KadInStreamSink, - Option, - ), + WaitingBehaviour(UniqueConnecId, KadInStreamSink, Option), /// Waiting to send an answer back to the remote. - PendingSend( - UniqueConnecId, - KadInStreamSink, - KadResponseMsg, - ), + PendingSend(UniqueConnecId, KadInStreamSink, KadResponseMsg), /// Waiting to flush an answer back to the remote. - PendingFlush(UniqueConnecId, KadInStreamSink), + PendingFlush(UniqueConnecId, KadInStreamSink), /// The substream is being closed. - Closing(KadInStreamSink), + Closing(KadInStreamSink), /// The substream was cancelled in favor of a new one. Cancelled, @@ -813,7 +801,7 @@ impl Default for KademliaHandlerConfig { } } -impl Stream for OutboundSubstreamState +impl futures::Stream for OutboundSubstreamState where TUserData: Unpin, { @@ -949,7 +937,7 @@ where } } -impl Stream for InboundSubstreamState +impl futures::Stream for InboundSubstreamState where TUserData: Unpin, { diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index be05c88f7d4..94b484927b8 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -27,7 +27,7 @@ use libp2p_swarm::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol, StreamUpgradeError, SubstreamProtocol, }; use std::collections::VecDeque; @@ -390,15 +390,15 @@ impl ConnectionHandler for Handler { } } -type PingFuture = BoxFuture<'static, Result<(NegotiatedSubstream, Duration), io::Error>>; -type PongFuture = BoxFuture<'static, Result>; +type PingFuture = BoxFuture<'static, Result<(Stream, Duration), io::Error>>; +type PongFuture = BoxFuture<'static, Result>; /// The current state w.r.t. outbound pings. enum OutboundState { /// A new substream is being negotiated for the ping protocol. OpenStream, /// The substream is idle, waiting to send the next ping. - Idle(NegotiatedSubstream), + Idle(Stream), /// A ping is being sent and the response awaited. Ping(PingFuture), } diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index ff2abc65aa0..7494d055f36 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -37,8 +37,8 @@ use libp2p_swarm::handler::{ ListenUpgradeError, }; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionId, KeepAlive, NegotiatedSubstream, - StreamUpgradeError, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionId, KeepAlive, Stream, StreamUpgradeError, + SubstreamProtocol, }; use std::collections::VecDeque; use std::fmt; @@ -77,7 +77,7 @@ pub enum In { dst_peer_id: PeerId, inbound_circuit_req: inbound_hop::CircuitReq, dst_handler_notifier: oneshot::Sender<()>, - dst_stream: NegotiatedSubstream, + dst_stream: Stream, dst_pending_data: Bytes, }, } @@ -193,7 +193,7 @@ pub enum Event { src_connection_id: ConnectionId, inbound_circuit_req: inbound_hop::CircuitReq, dst_handler_notifier: oneshot::Sender<()>, - dst_stream: NegotiatedSubstream, + dst_stream: Stream, dst_pending_data: Bytes, }, /// Negotiating an outbound substream for an inbound circuit request failed. @@ -914,10 +914,10 @@ pub struct OutboundOpenInfo { pub(crate) struct CircuitParts { circuit_id: CircuitId, - src_stream: NegotiatedSubstream, + src_stream: Stream, src_pending_data: Bytes, dst_peer_id: PeerId, dst_handler_notifier: oneshot::Sender<()>, - dst_stream: NegotiatedSubstream, + dst_stream: Stream, dst_pending_data: Bytes, } diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index 656196fb9cd..8592c57d3b6 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -39,8 +39,8 @@ use libp2p_identity::PeerId; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; use libp2p_swarm::dial_opts::DialOpts; use libp2p_swarm::{ - dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NegotiatedSubstream, - NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError, THandler, THandlerInEvent, + dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour, + NotifyHandler, PollParameters, Stream, StreamUpgradeError, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use std::collections::{hash_map, HashMap, VecDeque}; @@ -391,7 +391,7 @@ enum ConnectionState { }, Operational { read_buffer: Bytes, - substream: NegotiatedSubstream, + substream: Stream, /// "Drop notifier" pattern to signal to the transport that the connection has been dropped. /// /// This is flagged as "dead-code" by the compiler because we never read from it here. @@ -425,7 +425,7 @@ impl ConnectionState { } pub(crate) fn new_outbound( - substream: NegotiatedSubstream, + substream: Stream, read_buffer: Bytes, drop_notifier: oneshot::Sender, ) -> Self { diff --git a/protocols/relay/src/protocol/inbound_hop.rs b/protocols/relay/src/protocol/inbound_hop.rs index 1af258fc25b..c5886d93ba6 100644 --- a/protocols/relay/src/protocol/inbound_hop.rs +++ b/protocols/relay/src/protocol/inbound_hop.rs @@ -26,7 +26,7 @@ use futures::{future::BoxFuture, prelude::*}; use instant::{Duration, SystemTime}; use libp2p_core::{upgrade, Multiaddr}; use libp2p_identity::PeerId; -use libp2p_swarm::{NegotiatedSubstream, StreamProtocol}; +use libp2p_swarm::{Stream, StreamProtocol}; use std::convert::TryInto; use std::iter; use thiserror::Error; @@ -46,12 +46,12 @@ impl upgrade::UpgradeInfo for Upgrade { } } -impl upgrade::InboundUpgrade for Upgrade { +impl upgrade::InboundUpgrade for Upgrade { type Output = Req; type Error = UpgradeError; type Future = BoxFuture<'static, Result>; - fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, substream: Stream, _: Self::Info) -> Self::Future { let mut substream = Framed::new( substream, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE), @@ -126,7 +126,7 @@ pub enum Req { } pub struct ReservationReq { - substream: Framed>, + substream: Framed>, reservation_duration: Duration, max_circuit_duration: Duration, max_circuit_bytes: u64, @@ -183,7 +183,7 @@ impl ReservationReq { pub struct CircuitReq { dst: PeerId, - substream: Framed>, + substream: Framed>, } impl CircuitReq { @@ -191,7 +191,7 @@ impl CircuitReq { self.dst } - pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> { + pub async fn accept(mut self) -> Result<(Stream, Bytes), UpgradeError> { let msg = proto::HopMessage { type_pb: proto::HopMessageType::STATUS, peer: None, diff --git a/protocols/relay/src/protocol/inbound_stop.rs b/protocols/relay/src/protocol/inbound_stop.rs index bfffb6a1e9c..c279c8ee601 100644 --- a/protocols/relay/src/protocol/inbound_stop.rs +++ b/protocols/relay/src/protocol/inbound_stop.rs @@ -25,7 +25,7 @@ use bytes::Bytes; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::upgrade; use libp2p_identity::PeerId; -use libp2p_swarm::{NegotiatedSubstream, StreamProtocol}; +use libp2p_swarm::{Stream, StreamProtocol}; use std::iter; use thiserror::Error; @@ -40,12 +40,12 @@ impl upgrade::UpgradeInfo for Upgrade { } } -impl upgrade::InboundUpgrade for Upgrade { +impl upgrade::InboundUpgrade for Upgrade { type Output = Circuit; type Error = UpgradeError; type Future = BoxFuture<'static, Result>; - fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, substream: Stream, _: Self::Info) -> Self::Future { let mut substream = Framed::new( substream, quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE), @@ -111,7 +111,7 @@ pub enum FatalUpgradeError { } pub struct Circuit { - substream: Framed>, + substream: Framed>, src_peer_id: PeerId, limit: Option, } @@ -125,7 +125,7 @@ impl Circuit { self.limit } - pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> { + pub async fn accept(mut self) -> Result<(Stream, Bytes), UpgradeError> { let msg = proto::StopMessage { type_pb: proto::StopMessageType::STATUS, peer: None, diff --git a/protocols/relay/src/protocol/outbound_hop.rs b/protocols/relay/src/protocol/outbound_hop.rs index 07d09157404..bec348e87db 100644 --- a/protocols/relay/src/protocol/outbound_hop.rs +++ b/protocols/relay/src/protocol/outbound_hop.rs @@ -27,7 +27,7 @@ use futures_timer::Delay; use instant::{Duration, SystemTime}; use libp2p_core::{upgrade, Multiaddr}; use libp2p_identity::PeerId; -use libp2p_swarm::{NegotiatedSubstream, StreamProtocol}; +use libp2p_swarm::{Stream, StreamProtocol}; use std::convert::TryFrom; use std::iter; use thiserror::Error; @@ -46,12 +46,12 @@ impl upgrade::UpgradeInfo for Upgrade { } } -impl upgrade::OutboundUpgrade for Upgrade { +impl upgrade::OutboundUpgrade for Upgrade { type Output = Output; type Error = UpgradeError; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, substream: Stream, _: Self::Info) -> Self::Future { let msg = match self { Upgrade::Reserve => proto::HopMessage { type_pb: proto::HopMessageType::RESERVE, @@ -269,7 +269,7 @@ pub enum Output { limit: Option, }, Circuit { - substream: NegotiatedSubstream, + substream: Stream, read_buffer: Bytes, limit: Option, }, diff --git a/protocols/relay/src/protocol/outbound_stop.rs b/protocols/relay/src/protocol/outbound_stop.rs index 782808acc57..836468a8605 100644 --- a/protocols/relay/src/protocol/outbound_stop.rs +++ b/protocols/relay/src/protocol/outbound_stop.rs @@ -25,7 +25,7 @@ use bytes::Bytes; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::upgrade; use libp2p_identity::PeerId; -use libp2p_swarm::{NegotiatedSubstream, StreamProtocol}; +use libp2p_swarm::{Stream, StreamProtocol}; use std::convert::TryInto; use std::iter; use std::time::Duration; @@ -46,12 +46,12 @@ impl upgrade::UpgradeInfo for Upgrade { } } -impl upgrade::OutboundUpgrade for Upgrade { - type Output = (NegotiatedSubstream, Bytes); +impl upgrade::OutboundUpgrade for Upgrade { + type Output = (Stream, Bytes); type Error = UpgradeError; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, substream: Stream, _: Self::Info) -> Self::Future { let msg = proto::StopMessage { type_pb: proto::StopMessageType::CONNECT, peer: Some(proto::Peer { diff --git a/protocols/rendezvous/src/handler/inbound.rs b/protocols/rendezvous/src/handler/inbound.rs index 5ed2e4052ab..bf0083780c5 100644 --- a/protocols/rendezvous/src/handler/inbound.rs +++ b/protocols/rendezvous/src/handler/inbound.rs @@ -26,7 +26,7 @@ use crate::handler::PROTOCOL_IDENT; use crate::substream_handler::{Next, PassthroughProtocol, SubstreamHandler}; use asynchronous_codec::Framed; use futures::{SinkExt, StreamExt}; -use libp2p_swarm::{NegotiatedSubstream, SubstreamProtocol}; +use libp2p_swarm::SubstreamProtocol; use std::fmt; use std::task::{Context, Poll}; @@ -35,13 +35,13 @@ use std::task::{Context, Poll}; #[allow(clippy::enum_variant_names)] pub enum Stream { /// We are in the process of reading a message from the substream. - PendingRead(Framed), + PendingRead(Framed), /// We read a message, dispatched it to the behaviour and are waiting for the response. - PendingBehaviour(Framed), + PendingBehaviour(Framed), /// We are in the process of sending a response. - PendingSend(Framed, Message), + PendingSend(Framed, Message), /// We've sent the message and are now closing down the substream. - PendingClose(Framed), + PendingClose(Framed), } impl fmt::Debug for Stream { @@ -93,7 +93,7 @@ impl SubstreamHandler for Stream { SubstreamProtocol::new(PassthroughProtocol::new(PROTOCOL_IDENT), open_info) } - fn new(substream: NegotiatedSubstream, _: Self::OpenInfo) -> Self { + fn new(substream: libp2p_swarm::Stream, _: Self::OpenInfo) -> Self { Stream::PendingRead(Framed::new(substream, RendezvousCodec::default())) } diff --git a/protocols/rendezvous/src/handler/outbound.rs b/protocols/rendezvous/src/handler/outbound.rs index d80bcdeb82a..dd44bf8c2b4 100644 --- a/protocols/rendezvous/src/handler/outbound.rs +++ b/protocols/rendezvous/src/handler/outbound.rs @@ -25,7 +25,7 @@ use crate::substream_handler::{FutureSubstream, Next, PassthroughProtocol, Subst use crate::{ErrorCode, Namespace, Registration, Ttl}; use asynchronous_codec::Framed; use futures::{SinkExt, TryFutureExt, TryStreamExt}; -use libp2p_swarm::{NegotiatedSubstream, SubstreamProtocol}; +use libp2p_swarm::SubstreamProtocol; use std::task::Context; use void::Void; @@ -43,7 +43,7 @@ impl SubstreamHandler for Stream { SubstreamProtocol::new(PassthroughProtocol::new(PROTOCOL_IDENT), open_info) } - fn new(substream: NegotiatedSubstream, info: Self::OpenInfo) -> Self { + fn new(substream: libp2p_swarm::Stream, info: Self::OpenInfo) -> Self { let mut stream = Framed::new(substream, RendezvousCodec::default()); let sent_message = match info { OpenInfo::RegisterRequest(new_registration) => Message::Register(new_registration), diff --git a/protocols/rendezvous/src/substream_handler.rs b/protocols/rendezvous/src/substream_handler.rs index e4645449795..289c6a36d7e 100644 --- a/protocols/rendezvous/src/substream_handler.rs +++ b/protocols/rendezvous/src/substream_handler.rs @@ -31,8 +31,7 @@ use instant::Instant; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound}; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamProtocol, - SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol, SubstreamProtocol, }; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -51,7 +50,7 @@ pub trait SubstreamHandler: Sized { fn upgrade(open_info: Self::OpenInfo) -> SubstreamProtocol; - fn new(substream: NegotiatedSubstream, info: Self::OpenInfo) -> Self; + fn new(substream: Stream, info: Self::OpenInfo) -> Self; fn on_event(self, event: Self::InEvent) -> Self; fn advance(self, cx: &mut Context<'_>) -> Result, Self::Error>; } @@ -541,7 +540,7 @@ impl SubstreamHandler for void::Void { type Error = void::Void; type OpenInfo = (); - fn new(_: NegotiatedSubstream, _: Self::OpenInfo) -> Self { + fn new(_: Stream, _: Self::OpenInfo) -> Self { unreachable!("we should never yield a substream") } diff --git a/protocols/request-response/src/handler/protocol.rs b/protocols/request-response/src/handler/protocol.rs index 84ef365734f..1368a3c1f98 100644 --- a/protocols/request-response/src/handler/protocol.rs +++ b/protocols/request-response/src/handler/protocol.rs @@ -28,7 +28,7 @@ use crate::RequestId; use futures::{channel::oneshot, future::BoxFuture, prelude::*}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use libp2p_swarm::NegotiatedSubstream; +use libp2p_swarm::Stream; use smallvec::SmallVec; use std::{fmt, io}; @@ -88,7 +88,7 @@ where } } -impl InboundUpgrade for ResponseProtocol +impl InboundUpgrade for ResponseProtocol where TCodec: Codec + Send + 'static, { @@ -96,11 +96,7 @@ where type Error = io::Error; type Future = BoxFuture<'static, Result>; - fn upgrade_inbound( - mut self, - mut io: NegotiatedSubstream, - protocol: Self::Info, - ) -> Self::Future { + fn upgrade_inbound(mut self, mut io: Stream, protocol: Self::Info) -> Self::Future { async move { let read = self.codec.read_request(&protocol, &mut io); let request = read.await?; @@ -163,7 +159,7 @@ where } } -impl OutboundUpgrade for RequestProtocol +impl OutboundUpgrade for RequestProtocol where TCodec: Codec + Send + 'static, { @@ -171,11 +167,7 @@ where type Error = io::Error; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound( - mut self, - mut io: NegotiatedSubstream, - protocol: Self::Info, - ) -> Self::Future { + fn upgrade_outbound(mut self, mut io: Stream, protocol: Self::Info) -> Self::Future { async move { let write = self.codec.write_request(&protocol, &mut io, self.request); write.await?; diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index d9b1bac9d3a..fd1afb0a491 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -161,6 +161,7 @@ use std::{ /// /// Implements the [`AsyncRead`](futures::io::AsyncRead) and /// [`AsyncWrite`](futures::io::AsyncWrite) traits. +#[deprecated(note = "The 'substream' terminology is deprecated. Use 'Stream' instead")] pub type NegotiatedSubstream = Stream; /// Event generated by the [`NetworkBehaviour`] that the swarm will report back. From 0416f18c401502d49407aeab7f1fcfd313307493 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 11 May 2023 21:45:15 +0200 Subject: [PATCH 5/8] Add changelog entry --- swarm/CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 14a584217ad..7def948d7f0 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -42,6 +42,9 @@ See [PR 3651]. +- Deprecate the `NegotiatedSubstream` type and replace it with `Stream`. + See [PR XXXX]. + [PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605 [PR 3651]: https://github.com/libp2p/rust-libp2p/pull/3651 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 @@ -51,6 +54,7 @@ [PR 3884]: https://github.com/libp2p/rust-libp2p/pull/3884 [PR 3885]: https://github.com/libp2p/rust-libp2p/pull/3885 [PR 3886]: https://github.com/libp2p/rust-libp2p/pull/3886 +[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX ## 0.42.2 From 17a18747859330faceaa220cd9b8ac44db80ebd0 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 11 May 2023 21:52:53 +0200 Subject: [PATCH 6/8] Fix doclink --- swarm/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index fd1afb0a491..0796a26f593 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1380,7 +1380,7 @@ where /// connection and listener status. See [`SwarmEvent`] for details. /// /// Note: This stream is infinite and it is guaranteed that -/// [`Stream::poll_next`] will never return `Poll::Ready(None)`. +/// [`futures::Stream::poll_next`] will never return `Poll::Ready(None)`. impl futures::Stream for Swarm where TBehaviour: NetworkBehaviour, From 7fd8b69e8942a61fd559ef943b2cd43785d40482 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 12 May 2023 15:57:31 +1000 Subject: [PATCH 7/8] Apply suggestions from code review Co-authored-by: Max Inden --- swarm/CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 7def948d7f0..86023f5541f 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -43,7 +43,7 @@ See [PR 3651]. - Deprecate the `NegotiatedSubstream` type and replace it with `Stream`. - See [PR XXXX]. + See [PR 3912]. [PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605 [PR 3651]: https://github.com/libp2p/rust-libp2p/pull/3651 @@ -54,7 +54,7 @@ [PR 3884]: https://github.com/libp2p/rust-libp2p/pull/3884 [PR 3885]: https://github.com/libp2p/rust-libp2p/pull/3885 [PR 3886]: https://github.com/libp2p/rust-libp2p/pull/3886 -[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX +[PR 3912]: https://github.com/libp2p/rust-libp2p/pull/3912 ## 0.42.2 From c5de69b30148753dc45bceb39a148db4de0ec4e2 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 12 May 2023 07:59:47 +0200 Subject: [PATCH 8/8] Rename internal type --- swarm/src/connection.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 6dc2b9fc38b..30f95a29317 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -120,7 +120,7 @@ where handler: THandler, /// Futures that upgrade incoming substreams. negotiating_in: FuturesUnordered< - SubstreamUpgrade< + StreamUpgrade< THandler::InboundOpenInfo, ::Output, ::Error, @@ -128,7 +128,7 @@ where >, /// Futures that upgrade outgoing substreams. negotiating_out: FuturesUnordered< - SubstreamUpgrade< + StreamUpgrade< THandler::OutboundOpenInfo, ::Output, ::Error, @@ -398,7 +398,7 @@ where Poll::Ready(substream) => { let (user_data, timeout, upgrade) = requested_substream.extract(); - negotiating_out.push(SubstreamUpgrade::new_outbound( + negotiating_out.push(StreamUpgrade::new_outbound( substream, user_data, timeout, @@ -417,7 +417,7 @@ where Poll::Ready(substream) => { let protocol = handler.listen_protocol(); - negotiating_in.push(SubstreamUpgrade::new_inbound(substream, protocol)); + negotiating_in.push(StreamUpgrade::new_inbound(substream, protocol)); continue; // Go back to the top, handler can potentially make progress again. } @@ -472,13 +472,13 @@ impl<'a> IncomingInfo<'a> { } } -struct SubstreamUpgrade { +struct StreamUpgrade { user_data: Option, timeout: Delay, upgrade: BoxFuture<'static, Result>>, } -impl SubstreamUpgrade { +impl StreamUpgrade { fn new_outbound( substream: SubstreamBox, user_data: UserData, @@ -526,7 +526,7 @@ impl SubstreamUpgrade { } } -impl SubstreamUpgrade { +impl StreamUpgrade { fn new_inbound( substream: SubstreamBox, protocol: SubstreamProtocol, @@ -568,9 +568,9 @@ fn to_stream_upgrade_error(e: NegotiationError) -> StreamUpgradeError { } } -impl Unpin for SubstreamUpgrade {} +impl Unpin for StreamUpgrade {} -impl Future for SubstreamUpgrade { +impl Future for StreamUpgrade { type Output = (UserData, Result>); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll {