Skip to content

Commit

Permalink
[swarm] Permit configuration override for the substream upgrade proto…
Browse files Browse the repository at this point in the history
…col to use. (#1858)

* Permit global configuration of a substream upgrade protocol override.

* Revert temporary test.

* Revert temporary test.

* Update swarm changelog.
  • Loading branch information
romanb authored Nov 25, 2020
1 parent 65c4bf8 commit 83e87f7
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 9 deletions.
4 changes: 4 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# 0.25.0 [unreleased]

- Permit a configuration override for the substream upgrade protocol
to use for all (outbound) substreams.
[PR 1858](https://github.com/libp2p/rust-libp2p/pull/1858).

- Changed parameters for connection limits from `usize` to `u32`.
Connection limits are now configured via `SwarmBuilder::connection_limits()`.

Expand Down
43 changes: 35 additions & 8 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ use libp2p_core::{
NetworkConfig,
peer::ConnectedPeer,
},
upgrade::ProtocolName,
upgrade::{ProtocolName},
};
use registry::{Addresses, AddressIntoIter};
use smallvec::SmallVec;
Expand Down Expand Up @@ -286,7 +286,10 @@ where
/// Pending event to be delivered to connection handlers
/// (or dropped if the peer disconnected) before the `behaviour`
/// can be polled again.
pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>
pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>,

/// The configured override for substream protocol upgrades, if any.
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
}

impl<TBehaviour, TInEvent, TOutEvent, THandler> Deref for
Expand Down Expand Up @@ -357,8 +360,10 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,

/// Initiates a new dialing attempt to the given address.
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> {
let handler = me.behaviour.new_handler();
me.network.dial(&addr, handler.into_node_handler_builder()).map(|_id| ())
let handler = me.behaviour.new_handler()
.into_node_handler_builder()
.with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
me.network.dial(&addr, handler).map(|_id| ())
}

/// Initiates a new dialing attempt to the given peer.
Expand All @@ -375,7 +380,9 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,

let result =
if let Some(first) = addrs.next() {
let handler = me.behaviour.new_handler().into_node_handler_builder();
let handler = me.behaviour.new_handler()
.into_node_handler_builder()
.with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
me.network.peer(peer_id.clone())
.dial(first, addrs, handler)
.map(|_| ())
Expand Down Expand Up @@ -546,10 +553,12 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
});
},
Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => {
let handler = this.behaviour.new_handler();
let handler = this.behaviour.new_handler()
.into_node_handler_builder()
.with_substream_upgrade_protocol_override(this.substream_upgrade_protocol_override);
let local_addr = connection.local_addr.clone();
let send_back_addr = connection.send_back_addr.clone();
if let Err(e) = this.network.accept(connection, handler.into_node_handler_builder()) {
if let Err(e) = this.network.accept(connection, handler) {
log::warn!("Incoming connection rejected: {:?}", e);
}
return Poll::Ready(SwarmEvent::IncomingConnection {
Expand Down Expand Up @@ -962,6 +971,7 @@ pub struct SwarmBuilder<TBehaviour> {
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
network_config: NetworkConfig,
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
}

impl<TBehaviour> SwarmBuilder<TBehaviour>
Expand All @@ -980,6 +990,7 @@ where TBehaviour: NetworkBehaviour,
transport: transport,
behaviour,
network_config: Default::default(),
substream_upgrade_protocol_override: None,
}
}

Expand Down Expand Up @@ -1040,6 +1051,21 @@ where TBehaviour: NetworkBehaviour,
self
}

/// Configures an override for the substream upgrade protocol to use.
///
/// The subtream upgrade protocol is the multistream-select protocol
/// used for protocol negotiation on substreams. Since a listener
/// supports all existing versions, the choice of upgrade protocol
/// only effects the "dialer", i.e. the peer opening a substream.
///
/// > **Note**: If configured, specific upgrade protocols for
/// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
/// > are ignored.
pub fn substream_upgrade_protocol_override(mut self, v: libp2p_core::upgrade::Version) -> Self {
self.substream_upgrade_protocol_override = Some(v);
self
}

/// Builds a `Swarm` with the current configuration.
pub fn build(mut self) -> Swarm<TBehaviour> {
let supported_protocols = self.behaviour
Expand Down Expand Up @@ -1075,7 +1101,8 @@ where TBehaviour: NetworkBehaviour,
listened_addrs: SmallVec::new(),
external_addrs: Addresses::default(),
banned_peers: HashSet::new(),
pending_event: None
pending_event: None,
substream_upgrade_protocol_override: self.substream_upgrade_protocol_override,
}
}
}
Expand Down
22 changes: 21 additions & 1 deletion swarm/src/protocols_handler/node_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ use wasm_timer::{Delay, Instant};
pub struct NodeHandlerWrapperBuilder<TIntoProtoHandler> {
/// The underlying handler.
handler: TIntoProtoHandler,
/// The substream upgrade protocol override, if any.
substream_upgrade_protocol_override: Option<upgrade::Version>,
}

impl<TIntoProtoHandler> NodeHandlerWrapperBuilder<TIntoProtoHandler>
Expand All @@ -59,8 +61,17 @@ where
pub(crate) fn new(handler: TIntoProtoHandler) -> Self {
NodeHandlerWrapperBuilder {
handler,
substream_upgrade_protocol_override: None,
}
}

pub(crate) fn with_substream_upgrade_protocol_override(
mut self,
version: Option<upgrade::Version>
) -> Self {
self.substream_upgrade_protocol_override = version;
self
}
}

impl<TIntoProtoHandler, TProtoHandler> IntoConnectionHandler
Expand All @@ -79,6 +90,7 @@ where
queued_dial_upgrades: Vec::new(),
unique_dial_upgrade_id: 0,
shutdown: Shutdown::None,
substream_upgrade_protocol_override: self.substream_upgrade_protocol_override,
}
}
}
Expand Down Expand Up @@ -109,6 +121,8 @@ where
unique_dial_upgrade_id: u64,
/// The currently planned connection & handler shutdown.
shutdown: Shutdown,
/// The substream upgrade protocol override, if any.
substream_upgrade_protocol_override: Option<upgrade::Version>,
}

struct SubstreamUpgrade<UserData, Upgrade> {
Expand Down Expand Up @@ -254,7 +268,13 @@ where
}
};

let (_, (version, upgrade)) = self.queued_dial_upgrades.remove(pos);
let (_, (mut version, upgrade)) = self.queued_dial_upgrades.remove(pos);
if let Some(v) = self.substream_upgrade_protocol_override {
if v != version {
log::debug!("Substream upgrade protocol override: {:?} -> {:?}", version, v);
version = v;
}
}
let upgrade = upgrade::apply_outbound(substream, upgrade, version);
let timeout = Delay::new(timeout);
self.negotiating_out.push(SubstreamUpgrade {
Expand Down

0 comments on commit 83e87f7

Please sign in to comment.