Skip to content

Commit

Permalink
fix(SwarmBuilder): prioritize relay, then websocket, then any other
Browse files Browse the repository at this point in the history
There is a niche case where dialing a relay address does not work if the other transport is a DNS transport. By composing the relay transport first, we avoid this issue.

Pull-Request: #4672.
  • Loading branch information
thomaseizinger authored Oct 18, 2023
1 parent 407dd42 commit 8df38eb
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 132 deletions.
12 changes: 6 additions & 6 deletions libp2p/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ mod select_security;
/// .with_quic()
/// .with_other_transport(|_key| DummyTransport::<(PeerId, StreamMuxerBox)>::new())?
/// .with_dns()?
/// .with_relay_client(
/// (libp2p_tls::Config::new, libp2p_noise::Config::new),
/// libp2p_yamux::Config::default,
/// )?
/// .with_websocket(
/// (libp2p_tls::Config::new, libp2p_noise::Config::new),
/// libp2p_yamux::Config::default,
/// )
/// .await?
/// .with_relay_client(
/// (libp2p_tls::Config::new, libp2p_noise::Config::new),
/// libp2p_yamux::Config::default,
/// )?
/// .with_behaviour(|_key, relay| MyBehaviour { relay })?
/// .build();
/// #
Expand Down Expand Up @@ -307,11 +307,11 @@ mod tests {
.with_quic()
.with_dns()
.unwrap()
.with_relay_client(libp2p_tls::Config::new, libp2p_yamux::Config::default)
.unwrap()
.with_websocket(libp2p_tls::Config::new, libp2p_yamux::Config::default)
.await
.unwrap()
.with_relay_client(libp2p_tls::Config::new, libp2p_yamux::Config::default)
.unwrap()
.with_bandwidth_logging();
let _: Swarm<MyBehaviour> = builder
.with_behaviour(|_key, relay| MyBehaviour { relay })
Expand Down
20 changes: 13 additions & 7 deletions libp2p/src/builder/phase/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ impl<T: AuthenticatedMultiplexedTransport> SwarmBuilder<super::provider::AsyncSt
pub async fn with_dns(
self,
) -> Result<
SwarmBuilder<super::provider::AsyncStd, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
SwarmBuilder<
super::provider::AsyncStd,
WebsocketPhase<impl AuthenticatedMultiplexedTransport>,
>,
std::io::Error,
> {
Ok(SwarmBuilder {
keypair: self.keypair,
phantom: PhantomData,
phase: RelayPhase {
phase: WebsocketPhase {
transport: libp2p_dns::async_std::Transport::system(self.phase.transport).await?,
},
})
Expand All @@ -29,25 +32,28 @@ impl<T: AuthenticatedMultiplexedTransport> SwarmBuilder<super::provider::Tokio,
pub fn with_dns(
self,
) -> Result<
SwarmBuilder<super::provider::Tokio, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
SwarmBuilder<
super::provider::Tokio,
WebsocketPhase<impl AuthenticatedMultiplexedTransport>,
>,
std::io::Error,
> {
Ok(SwarmBuilder {
keypair: self.keypair,
phantom: PhantomData,
phase: RelayPhase {
phase: WebsocketPhase {
transport: libp2p_dns::tokio::Transport::system(self.phase.transport)?,
},
})
}
}

impl<Provider, T> SwarmBuilder<Provider, DnsPhase<T>> {
pub(crate) fn without_dns(self) -> SwarmBuilder<Provider, RelayPhase<T>> {
pub(crate) fn without_dns(self) -> SwarmBuilder<Provider, WebsocketPhase<T>> {
SwarmBuilder {
keypair: self.keypair,
phantom: PhantomData,
phase: RelayPhase {
phase: WebsocketPhase {
transport: self.phase.transport,
},
}
Expand All @@ -61,8 +67,8 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, DnsP
constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
) -> Result<SwarmBuilder<Provider, SwarmPhase<T, B>>, R::Error> {
self.without_dns()
.without_relay()
.without_websocket()
.without_relay()
.with_behaviour(constructor)
}
}
17 changes: 12 additions & 5 deletions libp2p/src/builder/phase/other_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ impl<T: AuthenticatedMultiplexedTransport>
pub async fn with_dns(
self,
) -> Result<
SwarmBuilder<super::provider::AsyncStd, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
SwarmBuilder<
super::provider::AsyncStd,
WebsocketPhase<impl AuthenticatedMultiplexedTransport>,
>,
std::io::Error,
> {
self.without_any_other_transports().with_dns().await
Expand All @@ -87,7 +90,10 @@ impl<T: AuthenticatedMultiplexedTransport>
pub fn with_dns(
self,
) -> Result<
SwarmBuilder<super::provider::Tokio, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
SwarmBuilder<
super::provider::Tokio,
WebsocketPhase<impl AuthenticatedMultiplexedTransport>,
>,
std::io::Error,
> {
self.without_any_other_transports().with_dns()
Expand All @@ -105,7 +111,7 @@ impl<T: AuthenticatedMultiplexedTransport, Provider>
) -> Result<
SwarmBuilder<
Provider,
WebsocketPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
>,
SecUpgrade::Error,
> where
Expand All @@ -132,6 +138,7 @@ impl<T: AuthenticatedMultiplexedTransport, Provider>
{
self.without_any_other_transports()
.without_dns()
.without_websocket()
.with_relay_client(security_upgrade, multiplexer_upgrade)
}
}
Expand All @@ -149,8 +156,8 @@ impl<Provider, T: AuthenticatedMultiplexedTransport>
) {
self.without_any_other_transports()
.without_dns()
.without_relay()
.without_websocket()
.without_relay()
.with_bandwidth_logging()
}
}
Expand All @@ -163,8 +170,8 @@ impl<Provider, T: AuthenticatedMultiplexedTransport>
) -> Result<SwarmBuilder<Provider, SwarmPhase<T, B>>, R::Error> {
self.without_any_other_transports()
.without_dns()
.without_relay()
.without_websocket()
.without_relay()
.without_bandwidth_logging()
.with_behaviour(constructor)
}
Expand Down
22 changes: 15 additions & 7 deletions libp2p/src/builder/phase/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Quic
) -> Result<
SwarmBuilder<
Provider,
super::websocket::WebsocketPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
>,
SecUpgrade::Error,
> where
Expand All @@ -108,6 +108,9 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Quic
<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
{
self.without_quic()
.without_any_other_transports()
.without_dns()
.without_websocket()
.with_relay_client(security_upgrade, multiplexer_upgrade)
}

Expand Down Expand Up @@ -139,8 +142,8 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Quic
self.without_quic()
.without_any_other_transports()
.without_dns()
.without_relay()
.without_websocket()
.without_relay()
.with_behaviour(constructor)
}
}
Expand All @@ -149,7 +152,10 @@ impl<T: AuthenticatedMultiplexedTransport> SwarmBuilder<super::provider::AsyncSt
pub async fn with_dns(
self,
) -> Result<
SwarmBuilder<super::provider::AsyncStd, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
SwarmBuilder<
super::provider::AsyncStd,
WebsocketPhase<impl AuthenticatedMultiplexedTransport>,
>,
std::io::Error,
> {
self.without_quic()
Expand All @@ -163,7 +169,10 @@ impl<T: AuthenticatedMultiplexedTransport> SwarmBuilder<super::provider::Tokio,
pub fn with_dns(
self,
) -> Result<
SwarmBuilder<super::provider::Tokio, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
SwarmBuilder<
super::provider::Tokio,
WebsocketPhase<impl AuthenticatedMultiplexedTransport>,
>,
std::io::Error,
> {
self.without_quic()
Expand All @@ -190,7 +199,7 @@ macro_rules! impl_quic_phase_with_websocket {
) -> Result<
SwarmBuilder<
$providerPascalCase,
BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
RelayPhase<impl AuthenticatedMultiplexedTransport>,
>,
super::websocket::WebsocketError<SecUpgrade::Error>,
>
Expand Down Expand Up @@ -218,7 +227,6 @@ macro_rules! impl_quic_phase_with_websocket {
self.without_quic()
.without_any_other_transports()
.without_dns()
.without_relay()
.with_websocket(security_upgrade, multiplexer_upgrade)
.await
}
Expand Down Expand Up @@ -250,8 +258,8 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Quic
self.without_quic()
.without_any_other_transports()
.without_dns()
.without_relay()
.without_websocket()
.without_relay()
.with_bandwidth_logging()
}
}
91 changes: 12 additions & 79 deletions libp2p/src/builder/phase/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Rela
) -> Result<
SwarmBuilder<
Provider,
WebsocketPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
>,
SecUpgrade::Error,
> where
Expand All @@ -78,20 +78,17 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Rela
{
let (relay_transport, relay_behaviour) =
libp2p_relay::client::new(self.keypair.public().to_peer_id());
let relay_transport = relay_transport
.upgrade(libp2p_core::upgrade::Version::V1Lazy)
.authenticate(security_upgrade.into_security_upgrade(&self.keypair)?)
.multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
.map(|(p, c), _| (p, StreamMuxerBox::new(c)));

Ok(SwarmBuilder {
phase: WebsocketPhase {
phase: BandwidthLoggingPhase {
relay_behaviour,
transport: self
.phase
.transport
.or_transport(
relay_transport
.upgrade(libp2p_core::upgrade::Version::V1Lazy)
.authenticate(security_upgrade.into_security_upgrade(&self.keypair)?)
.multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
.map(|(p, c), _| (p, StreamMuxerBox::new(c))),
)
transport: relay_transport
.or_transport(self.phase.transport)
.map(|either, _| either.into_inner()),
},
keypair: self.keypair,
Expand All @@ -105,11 +102,11 @@ pub struct NoRelayBehaviour;
impl<Provider, T> SwarmBuilder<Provider, RelayPhase<T>> {
pub(crate) fn without_relay(
self,
) -> SwarmBuilder<Provider, WebsocketPhase<T, NoRelayBehaviour>> {
) -> SwarmBuilder<Provider, BandwidthLoggingPhase<T, NoRelayBehaviour>> {
SwarmBuilder {
keypair: self.keypair,
phantom: PhantomData,
phase: WebsocketPhase {
phase: BandwidthLoggingPhase {
transport: self.phase.transport,
relay_behaviour: NoRelayBehaviour,
},
Expand All @@ -123,70 +120,6 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Rela
self,
constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
) -> Result<SwarmBuilder<Provider, SwarmPhase<T, B>>, R::Error> {
self.without_relay()
.without_websocket()
.with_behaviour(constructor)
self.without_relay().with_behaviour(constructor)
}
}
macro_rules! impl_relay_phase_with_websocket {
($providerKebabCase:literal, $providerPascalCase:ty, $websocketStream:ty) => {
#[cfg(all(feature = $providerKebabCase, not(target_arch = "wasm32"), feature = "websocket"))]
impl<T: AuthenticatedMultiplexedTransport> SwarmBuilder<$providerPascalCase, RelayPhase<T>> {
pub async fn with_websocket <
SecUpgrade,
SecStream,
SecError,
MuxUpgrade,
MuxStream,
MuxError,
> (
self,
security_upgrade: SecUpgrade,
multiplexer_upgrade: MuxUpgrade,
) -> Result<
SwarmBuilder<
$providerPascalCase,
BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
>,
super::websocket::WebsocketError<SecUpgrade::Error>,
>
where
SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
SecError: std::error::Error + Send + Sync + 'static,
SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
SecUpgrade::Upgrade: InboundUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
<SecUpgrade::Upgrade as InboundUpgrade<Negotiated<$websocketStream>>>::Future: Send,
<SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<$websocketStream>>>::Future: Send,
<<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,

MuxStream: StreamMuxer + Send + 'static,
MuxStream::Substream: Send + 'static,
MuxStream::Error: Send + Sync + 'static,
MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
MuxUpgrade::Upgrade: InboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
<MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
<MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
MuxError: std::error::Error + Send + Sync + 'static,
<<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
{
self.without_relay()
.with_websocket(security_upgrade, multiplexer_upgrade)
.await
}
}
}
}
impl_relay_phase_with_websocket!(
"async-std",
super::provider::AsyncStd,
rw_stream_sink::RwStreamSink<
libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>,
>
);
impl_relay_phase_with_websocket!(
"tokio",
super::provider::Tokio,
rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
);
3 changes: 1 addition & 2 deletions libp2p/src/builder/phase/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ macro_rules! impl_tcp_phase_with_websocket {
) -> Result<
SwarmBuilder<
$providerPascalCase,
BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
RelayPhase<impl AuthenticatedMultiplexedTransport>,
>,
WebsocketError<SecUpgrade::Error>,
>
Expand Down Expand Up @@ -205,7 +205,6 @@ macro_rules! impl_tcp_phase_with_websocket {
.without_quic()
.without_any_other_transports()
.without_dns()
.without_relay()
.with_websocket(security_upgrade, multiplexer_upgrade)
.await
}
Expand Down
Loading

0 comments on commit 8df38eb

Please sign in to comment.