Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(SwarmBuilder): prioritize relay, then websocket, then any other #4672

Merged
merged 6 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions libp2p/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ mod select_security;
/// .with_quic()
/// .with_other_transport(|_key| DummyTransport::<(PeerId, StreamMuxerBox)>::new())?
/// .with_dns()?
/// .with_relay_client(
/// (libp2p_tls::Config::new, libp2p_noise::Config::new),
/// libp2p_yamux::Config::default,
/// )?
/// .with_websocket(
/// (libp2p_tls::Config::new, libp2p_noise::Config::new),
/// libp2p_yamux::Config::default,
/// )
/// .await?
/// .with_relay_client(
/// (libp2p_tls::Config::new, libp2p_noise::Config::new),
/// libp2p_yamux::Config::default,
/// )?
/// .with_behaviour(|_key, relay| MyBehaviour { relay })?
/// .build();
/// #
Expand Down Expand Up @@ -307,11 +307,11 @@ mod tests {
.with_quic()
.with_dns()
.unwrap()
.with_relay_client(libp2p_tls::Config::new, libp2p_yamux::Config::default)
.unwrap()
.with_websocket(libp2p_tls::Config::new, libp2p_yamux::Config::default)
.await
.unwrap()
.with_relay_client(libp2p_tls::Config::new, libp2p_yamux::Config::default)
.unwrap()
.with_bandwidth_logging();
let _: Swarm<MyBehaviour> = builder
.with_behaviour(|_key, relay| MyBehaviour { relay })
Expand Down
20 changes: 13 additions & 7 deletions libp2p/src/builder/phase/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ impl<T: AuthenticatedMultiplexedTransport> SwarmBuilder<super::provider::AsyncSt
pub async fn with_dns(
self,
) -> Result<
SwarmBuilder<super::provider::AsyncStd, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
SwarmBuilder<
super::provider::AsyncStd,
WebsocketPhase<impl AuthenticatedMultiplexedTransport>,
>,
std::io::Error,
> {
Ok(SwarmBuilder {
keypair: self.keypair,
phantom: PhantomData,
phase: RelayPhase {
phase: WebsocketPhase {
transport: libp2p_dns::async_std::Transport::system(self.phase.transport).await?,
},
})
Expand All @@ -29,25 +32,28 @@ impl<T: AuthenticatedMultiplexedTransport> SwarmBuilder<super::provider::Tokio,
pub fn with_dns(
self,
) -> Result<
SwarmBuilder<super::provider::Tokio, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
SwarmBuilder<
super::provider::Tokio,
WebsocketPhase<impl AuthenticatedMultiplexedTransport>,
>,
std::io::Error,
> {
Ok(SwarmBuilder {
keypair: self.keypair,
phantom: PhantomData,
phase: RelayPhase {
phase: WebsocketPhase {
transport: libp2p_dns::tokio::Transport::system(self.phase.transport)?,
},
})
}
}

impl<Provider, T> SwarmBuilder<Provider, DnsPhase<T>> {
pub(crate) fn without_dns(self) -> SwarmBuilder<Provider, RelayPhase<T>> {
pub(crate) fn without_dns(self) -> SwarmBuilder<Provider, WebsocketPhase<T>> {
SwarmBuilder {
keypair: self.keypair,
phantom: PhantomData,
phase: RelayPhase {
phase: WebsocketPhase {
transport: self.phase.transport,
},
}
Expand All @@ -61,8 +67,8 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, DnsP
constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
) -> Result<SwarmBuilder<Provider, SwarmPhase<T, B>>, R::Error> {
self.without_dns()
.without_relay()
.without_websocket()
.without_relay()
.with_behaviour(constructor)
}
}
17 changes: 12 additions & 5 deletions libp2p/src/builder/phase/other_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ impl<T: AuthenticatedMultiplexedTransport>
pub async fn with_dns(
self,
) -> Result<
SwarmBuilder<super::provider::AsyncStd, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
SwarmBuilder<
super::provider::AsyncStd,
WebsocketPhase<impl AuthenticatedMultiplexedTransport>,
>,
std::io::Error,
> {
self.without_any_other_transports().with_dns().await
Expand All @@ -87,7 +90,10 @@ impl<T: AuthenticatedMultiplexedTransport>
pub fn with_dns(
self,
) -> Result<
SwarmBuilder<super::provider::Tokio, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
SwarmBuilder<
super::provider::Tokio,
WebsocketPhase<impl AuthenticatedMultiplexedTransport>,
>,
std::io::Error,
> {
self.without_any_other_transports().with_dns()
Expand All @@ -105,7 +111,7 @@ impl<T: AuthenticatedMultiplexedTransport, Provider>
) -> Result<
SwarmBuilder<
Provider,
WebsocketPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
>,
SecUpgrade::Error,
> where
Expand All @@ -132,6 +138,7 @@ impl<T: AuthenticatedMultiplexedTransport, Provider>
{
self.without_any_other_transports()
.without_dns()
.without_websocket()
.with_relay_client(security_upgrade, multiplexer_upgrade)
}
}
Expand All @@ -149,8 +156,8 @@ impl<Provider, T: AuthenticatedMultiplexedTransport>
) {
self.without_any_other_transports()
.without_dns()
.without_relay()
.without_websocket()
.without_relay()
.with_bandwidth_logging()
}
}
Expand All @@ -163,8 +170,8 @@ impl<Provider, T: AuthenticatedMultiplexedTransport>
) -> Result<SwarmBuilder<Provider, SwarmPhase<T, B>>, R::Error> {
self.without_any_other_transports()
.without_dns()
.without_relay()
.without_websocket()
.without_relay()
.without_bandwidth_logging()
.with_behaviour(constructor)
}
Expand Down
22 changes: 15 additions & 7 deletions libp2p/src/builder/phase/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Quic
) -> Result<
SwarmBuilder<
Provider,
super::websocket::WebsocketPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
>,
SecUpgrade::Error,
> where
Expand All @@ -108,6 +108,9 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Quic
<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
{
self.without_quic()
.without_any_other_transports()
.without_dns()
.without_websocket()
.with_relay_client(security_upgrade, multiplexer_upgrade)
}

Expand Down Expand Up @@ -139,8 +142,8 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Quic
self.without_quic()
.without_any_other_transports()
.without_dns()
.without_relay()
.without_websocket()
.without_relay()
.with_behaviour(constructor)
}
}
Expand All @@ -149,7 +152,10 @@ impl<T: AuthenticatedMultiplexedTransport> SwarmBuilder<super::provider::AsyncSt
pub async fn with_dns(
self,
) -> Result<
SwarmBuilder<super::provider::AsyncStd, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
SwarmBuilder<
super::provider::AsyncStd,
WebsocketPhase<impl AuthenticatedMultiplexedTransport>,
>,
std::io::Error,
> {
self.without_quic()
Expand All @@ -163,7 +169,10 @@ impl<T: AuthenticatedMultiplexedTransport> SwarmBuilder<super::provider::Tokio,
pub fn with_dns(
self,
) -> Result<
SwarmBuilder<super::provider::Tokio, RelayPhase<impl AuthenticatedMultiplexedTransport>>,
SwarmBuilder<
super::provider::Tokio,
WebsocketPhase<impl AuthenticatedMultiplexedTransport>,
>,
std::io::Error,
> {
self.without_quic()
Expand All @@ -190,7 +199,7 @@ macro_rules! impl_quic_phase_with_websocket {
) -> Result<
SwarmBuilder<
$providerPascalCase,
BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
RelayPhase<impl AuthenticatedMultiplexedTransport>,
>,
super::websocket::WebsocketError<SecUpgrade::Error>,
>
Expand Down Expand Up @@ -218,7 +227,6 @@ macro_rules! impl_quic_phase_with_websocket {
self.without_quic()
.without_any_other_transports()
.without_dns()
.without_relay()
.with_websocket(security_upgrade, multiplexer_upgrade)
.await
}
Expand Down Expand Up @@ -250,8 +258,8 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Quic
self.without_quic()
.without_any_other_transports()
.without_dns()
.without_relay()
.without_websocket()
.without_relay()
.with_bandwidth_logging()
}
}
91 changes: 12 additions & 79 deletions libp2p/src/builder/phase/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Rela
) -> Result<
SwarmBuilder<
Provider,
WebsocketPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, libp2p_relay::client::Behaviour>,
>,
SecUpgrade::Error,
> where
Expand All @@ -78,20 +78,17 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Rela
{
let (relay_transport, relay_behaviour) =
libp2p_relay::client::new(self.keypair.public().to_peer_id());
let relay_transport = relay_transport
.upgrade(libp2p_core::upgrade::Version::V1Lazy)
.authenticate(security_upgrade.into_security_upgrade(&self.keypair)?)
.multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
.map(|(p, c), _| (p, StreamMuxerBox::new(c)));

Ok(SwarmBuilder {
phase: WebsocketPhase {
phase: BandwidthLoggingPhase {
relay_behaviour,
transport: self
.phase
.transport
.or_transport(
relay_transport
.upgrade(libp2p_core::upgrade::Version::V1Lazy)
.authenticate(security_upgrade.into_security_upgrade(&self.keypair)?)
.multiplex(multiplexer_upgrade.into_multiplexer_upgrade())
.map(|(p, c), _| (p, StreamMuxerBox::new(c))),
)
transport: relay_transport
.or_transport(self.phase.transport)
.map(|either, _| either.into_inner()),
},
keypair: self.keypair,
Expand All @@ -105,11 +102,11 @@ pub struct NoRelayBehaviour;
impl<Provider, T> SwarmBuilder<Provider, RelayPhase<T>> {
pub(crate) fn without_relay(
self,
) -> SwarmBuilder<Provider, WebsocketPhase<T, NoRelayBehaviour>> {
) -> SwarmBuilder<Provider, BandwidthLoggingPhase<T, NoRelayBehaviour>> {
SwarmBuilder {
keypair: self.keypair,
phantom: PhantomData,
phase: WebsocketPhase {
phase: BandwidthLoggingPhase {
transport: self.phase.transport,
relay_behaviour: NoRelayBehaviour,
},
Expand All @@ -123,70 +120,6 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Rela
self,
constructor: impl FnOnce(&libp2p_identity::Keypair) -> R,
) -> Result<SwarmBuilder<Provider, SwarmPhase<T, B>>, R::Error> {
self.without_relay()
.without_websocket()
.with_behaviour(constructor)
self.without_relay().with_behaviour(constructor)
}
}
macro_rules! impl_relay_phase_with_websocket {
($providerKebabCase:literal, $providerPascalCase:ty, $websocketStream:ty) => {
#[cfg(all(feature = $providerKebabCase, not(target_arch = "wasm32"), feature = "websocket"))]
impl<T: AuthenticatedMultiplexedTransport> SwarmBuilder<$providerPascalCase, RelayPhase<T>> {
pub async fn with_websocket <
SecUpgrade,
SecStream,
SecError,
MuxUpgrade,
MuxStream,
MuxError,
> (
self,
security_upgrade: SecUpgrade,
multiplexer_upgrade: MuxUpgrade,
) -> Result<
SwarmBuilder<
$providerPascalCase,
BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
>,
super::websocket::WebsocketError<SecUpgrade::Error>,
>
where
SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
SecError: std::error::Error + Send + Sync + 'static,
SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
SecUpgrade::Upgrade: InboundUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
<SecUpgrade::Upgrade as InboundUpgrade<Negotiated<$websocketStream>>>::Future: Send,
<SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<$websocketStream>>>::Future: Send,
<<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,

MuxStream: StreamMuxer + Send + 'static,
MuxStream::Substream: Send + 'static,
MuxStream::Error: Send + Sync + 'static,
MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
MuxUpgrade::Upgrade: InboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
<MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
<MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
MuxError: std::error::Error + Send + Sync + 'static,
<<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
{
self.without_relay()
.with_websocket(security_upgrade, multiplexer_upgrade)
.await
}
}
}
}
impl_relay_phase_with_websocket!(
"async-std",
super::provider::AsyncStd,
rw_stream_sink::RwStreamSink<
libp2p_websocket::BytesConnection<libp2p_tcp::async_io::TcpStream>,
>
);
impl_relay_phase_with_websocket!(
"tokio",
super::provider::Tokio,
rw_stream_sink::RwStreamSink<libp2p_websocket::BytesConnection<libp2p_tcp::tokio::TcpStream>>
);
3 changes: 1 addition & 2 deletions libp2p/src/builder/phase/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ macro_rules! impl_tcp_phase_with_websocket {
) -> Result<
SwarmBuilder<
$providerPascalCase,
BandwidthLoggingPhase<impl AuthenticatedMultiplexedTransport, NoRelayBehaviour>,
RelayPhase<impl AuthenticatedMultiplexedTransport>,
>,
WebsocketError<SecUpgrade::Error>,
>
Expand Down Expand Up @@ -205,7 +205,6 @@ macro_rules! impl_tcp_phase_with_websocket {
.without_quic()
.without_any_other_transports()
.without_dns()
.without_relay()
.with_websocket(security_upgrade, multiplexer_upgrade)
.await
}
Expand Down
Loading
Loading