Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm): deprecate NegotiatedSubstream in favor of Stream #3912

Merged
merged 10 commits into from
May 12, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub use self::swarm::Swarm;
pub use self::transport_ext::TransportExt;
pub use libp2p_identity as identity;
pub use libp2p_identity::PeerId;
pub use libp2p_swarm::StreamProtocol;
pub use libp2p_swarm::{Stream, StreamProtocol};

/// Builds a `Transport` based on TCP/IP that supports the most commonly-used features of libp2p:
///
Expand Down
8 changes: 4 additions & 4 deletions protocols/dcutr/src/protocol/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::proto;
use asynchronous_codec::Framed;
use futures::{future::BoxFuture, prelude::*};
use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr};
use libp2p_swarm::{NegotiatedSubstream, StreamProtocol};
use libp2p_swarm::{Stream, StreamProtocol};
use std::convert::TryFrom;
use std::iter;
use thiserror::Error;
Expand All @@ -38,12 +38,12 @@ impl upgrade::UpgradeInfo for Upgrade {
}
}

impl upgrade::InboundUpgrade<NegotiatedSubstream> for Upgrade {
impl upgrade::InboundUpgrade<Stream> for Upgrade {
type Output = PendingConnect;
type Error = UpgradeError;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
fn upgrade_inbound(self, substream: Stream, _: Self::Info) -> Self::Future {
let mut substream = Framed::new(
substream,
quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES),
Expand Down Expand Up @@ -92,7 +92,7 @@ impl upgrade::InboundUpgrade<NegotiatedSubstream> for Upgrade {
}

pub struct PendingConnect {
substream: Framed<NegotiatedSubstream, quick_protobuf_codec::Codec<proto::HolePunch>>,
substream: Framed<Stream, quick_protobuf_codec::Codec<proto::HolePunch>>,
remote_obs_addrs: Vec<Multiaddr>,
}

Expand Down
6 changes: 3 additions & 3 deletions protocols/dcutr/src/protocol/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use futures::{future::BoxFuture, prelude::*};
use futures_timer::Delay;
use instant::Instant;
use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr};
use libp2p_swarm::{NegotiatedSubstream, StreamProtocol};
use libp2p_swarm::{Stream, StreamProtocol};
use std::convert::TryFrom;
use std::iter;
use thiserror::Error;
Expand All @@ -48,12 +48,12 @@ impl Upgrade {
}
}

impl upgrade::OutboundUpgrade<NegotiatedSubstream> for Upgrade {
impl upgrade::OutboundUpgrade<Stream> for Upgrade {
type Output = Connect;
type Error = UpgradeError;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

fn upgrade_outbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
fn upgrade_outbound(self, substream: Stream, _: Self::Info) -> Self::Future {
let mut substream = Framed::new(
substream,
quick_protobuf_codec::Codec::new(super::MAX_MESSAGE_SIZE_BYTES),
Expand Down
14 changes: 7 additions & 7 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use libp2p_swarm::handler::{
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError,
SubstreamProtocol,
};
use libp2p_swarm::NegotiatedSubstream;
use libp2p_swarm::Stream;
use smallvec::SmallVec;
use std::{
pin::Pin,
Expand Down Expand Up @@ -143,21 +143,21 @@ pub enum DisabledHandler {
/// State of the inbound substream, opened either by us or by the remote.
enum InboundSubstreamState {
/// Waiting for a message from the remote. The idle state for an inbound substream.
WaitingInput(Framed<NegotiatedSubstream, GossipsubCodec>),
WaitingInput(Framed<Stream, GossipsubCodec>),
/// The substream is being closed.
Closing(Framed<NegotiatedSubstream, GossipsubCodec>),
Closing(Framed<Stream, GossipsubCodec>),
/// An error occurred during processing.
Poisoned,
}

/// State of the outbound substream, opened either by us or by the remote.
enum OutboundSubstreamState {
/// Waiting for the user to send a message. The idle state for an outbound substream.
WaitingOutput(Framed<NegotiatedSubstream, GossipsubCodec>),
WaitingOutput(Framed<Stream, GossipsubCodec>),
/// Waiting to send a message to the remote.
PendingSend(Framed<NegotiatedSubstream, GossipsubCodec>, proto::RPC),
PendingSend(Framed<Stream, GossipsubCodec>, proto::RPC),
/// Waiting to flush the substream so that the data arrives to the remote.
PendingFlush(Framed<NegotiatedSubstream, GossipsubCodec>),
PendingFlush(Framed<Stream, GossipsubCodec>),
/// An error occurred during processing.
Poisoned,
}
Expand Down Expand Up @@ -185,7 +185,7 @@ impl Handler {
impl EnabledHandler {
fn on_fully_negotiated_inbound(
&mut self,
(substream, peer_kind): (Framed<NegotiatedSubstream, GossipsubCodec>, PeerKind),
(substream, peer_kind): (Framed<Stream, GossipsubCodec>, PeerKind),
) {
// update the known kind of peer
if self.peer_kind.is_none() {
Expand Down
36 changes: 12 additions & 24 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamUpgradeError,
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamUpgradeError,
SubstreamProtocol,
};
use log::trace;
Expand Down Expand Up @@ -116,20 +116,16 @@ pub struct KademliaHandlerConfig {
/// State of an active outbound substream.
enum OutboundSubstreamState<TUserData> {
/// Waiting to send a message to the remote.
PendingSend(
KadOutStreamSink<NegotiatedSubstream>,
KadRequestMsg,
Option<TUserData>,
),
PendingSend(KadOutStreamSink<Stream>, KadRequestMsg, Option<TUserData>),
/// Waiting to flush the substream so that the data arrives to the remote.
PendingFlush(KadOutStreamSink<NegotiatedSubstream>, Option<TUserData>),
PendingFlush(KadOutStreamSink<Stream>, Option<TUserData>),
/// Waiting for an answer back from the remote.
// TODO: add timeout
WaitingAnswer(KadOutStreamSink<NegotiatedSubstream>, TUserData),
WaitingAnswer(KadOutStreamSink<Stream>, TUserData),
/// An error happened on the substream and we should report the error to the user.
ReportError(KademliaHandlerQueryErr, TUserData),
/// The substream is being closed.
Closing(KadOutStreamSink<NegotiatedSubstream>),
Closing(KadOutStreamSink<Stream>),
/// The substream is complete and will not perform any more work.
Done,
Poisoned,
Expand All @@ -142,24 +138,16 @@ enum InboundSubstreamState<TUserData> {
/// Whether it is the first message to be awaited on this stream.
first: bool,
connection_id: UniqueConnecId,
substream: KadInStreamSink<NegotiatedSubstream>,
substream: KadInStreamSink<Stream>,
},
/// Waiting for the behaviour to send a [`KademliaHandlerIn`] event containing the response.
WaitingBehaviour(
UniqueConnecId,
KadInStreamSink<NegotiatedSubstream>,
Option<Waker>,
),
WaitingBehaviour(UniqueConnecId, KadInStreamSink<Stream>, Option<Waker>),
/// Waiting to send an answer back to the remote.
PendingSend(
UniqueConnecId,
KadInStreamSink<NegotiatedSubstream>,
KadResponseMsg,
),
PendingSend(UniqueConnecId, KadInStreamSink<Stream>, KadResponseMsg),
/// Waiting to flush an answer back to the remote.
PendingFlush(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
PendingFlush(UniqueConnecId, KadInStreamSink<Stream>),
/// The substream is being closed.
Closing(KadInStreamSink<NegotiatedSubstream>),
Closing(KadInStreamSink<Stream>),
/// The substream was cancelled in favor of a new one.
Cancelled,

Expand Down Expand Up @@ -813,7 +801,7 @@ impl Default for KademliaHandlerConfig {
}
}

impl<TUserData> Stream for OutboundSubstreamState<TUserData>
impl<TUserData> futures::Stream for OutboundSubstreamState<TUserData>
where
TUserData: Unpin,
{
Expand Down Expand Up @@ -949,7 +937,7 @@ where
}
}

impl<TUserData> Stream for InboundSubstreamState<TUserData>
impl<TUserData> futures::Stream for InboundSubstreamState<TUserData>
where
TUserData: Unpin,
{
Expand Down
8 changes: 4 additions & 4 deletions protocols/ping/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, StreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol,
StreamUpgradeError, SubstreamProtocol,
};
use std::collections::VecDeque;
Expand Down Expand Up @@ -390,15 +390,15 @@ impl ConnectionHandler for Handler {
}
}

type PingFuture = BoxFuture<'static, Result<(NegotiatedSubstream, Duration), io::Error>>;
type PongFuture = BoxFuture<'static, Result<NegotiatedSubstream, io::Error>>;
type PingFuture = BoxFuture<'static, Result<(Stream, Duration), io::Error>>;
type PongFuture = BoxFuture<'static, Result<Stream, io::Error>>;

/// The current state w.r.t. outbound pings.
enum OutboundState {
/// A new substream is being negotiated for the ping protocol.
OpenStream,
/// The substream is idle, waiting to send the next ping.
Idle(NegotiatedSubstream),
Idle(Stream),
/// A ping is being sent and the response awaited.
Ping(PingFuture),
}
12 changes: 6 additions & 6 deletions protocols/relay/src/behaviour/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use libp2p_swarm::handler::{
ListenUpgradeError,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionId, KeepAlive, NegotiatedSubstream,
StreamUpgradeError, SubstreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, ConnectionId, KeepAlive, Stream, StreamUpgradeError,
SubstreamProtocol,
};
use std::collections::VecDeque;
use std::fmt;
Expand Down Expand Up @@ -77,7 +77,7 @@ pub enum In {
dst_peer_id: PeerId,
inbound_circuit_req: inbound_hop::CircuitReq,
dst_handler_notifier: oneshot::Sender<()>,
dst_stream: NegotiatedSubstream,
dst_stream: Stream,
dst_pending_data: Bytes,
},
}
Expand Down Expand Up @@ -193,7 +193,7 @@ pub enum Event {
src_connection_id: ConnectionId,
inbound_circuit_req: inbound_hop::CircuitReq,
dst_handler_notifier: oneshot::Sender<()>,
dst_stream: NegotiatedSubstream,
dst_stream: Stream,
dst_pending_data: Bytes,
},
/// Negotiating an outbound substream for an inbound circuit request failed.
Expand Down Expand Up @@ -914,10 +914,10 @@ pub struct OutboundOpenInfo {

pub(crate) struct CircuitParts {
circuit_id: CircuitId,
src_stream: NegotiatedSubstream,
src_stream: Stream,
src_pending_data: Bytes,
dst_peer_id: PeerId,
dst_handler_notifier: oneshot::Sender<()>,
dst_stream: NegotiatedSubstream,
dst_stream: Stream,
dst_pending_data: Bytes,
}
8 changes: 4 additions & 4 deletions protocols/relay/src/priv_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm};
use libp2p_swarm::dial_opts::DialOpts;
use libp2p_swarm::{
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NegotiatedSubstream,
NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError, THandler, THandlerInEvent,
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour,
NotifyHandler, PollParameters, Stream, StreamUpgradeError, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
};
use std::collections::{hash_map, HashMap, VecDeque};
Expand Down Expand Up @@ -391,7 +391,7 @@ enum ConnectionState {
},
Operational {
read_buffer: Bytes,
substream: NegotiatedSubstream,
substream: Stream,
/// "Drop notifier" pattern to signal to the transport that the connection has been dropped.
///
/// This is flagged as "dead-code" by the compiler because we never read from it here.
Expand Down Expand Up @@ -425,7 +425,7 @@ impl ConnectionState {
}

pub(crate) fn new_outbound(
substream: NegotiatedSubstream,
substream: Stream,
read_buffer: Bytes,
drop_notifier: oneshot::Sender<void::Void>,
) -> Self {
Expand Down
12 changes: 6 additions & 6 deletions protocols/relay/src/protocol/inbound_hop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use futures::{future::BoxFuture, prelude::*};
use instant::{Duration, SystemTime};
use libp2p_core::{upgrade, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::{NegotiatedSubstream, StreamProtocol};
use libp2p_swarm::{Stream, StreamProtocol};
use std::convert::TryInto;
use std::iter;
use thiserror::Error;
Expand All @@ -46,12 +46,12 @@ impl upgrade::UpgradeInfo for Upgrade {
}
}

impl upgrade::InboundUpgrade<NegotiatedSubstream> for Upgrade {
impl upgrade::InboundUpgrade<Stream> for Upgrade {
type Output = Req;
type Error = UpgradeError;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
fn upgrade_inbound(self, substream: Stream, _: Self::Info) -> Self::Future {
let mut substream = Framed::new(
substream,
quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE),
Expand Down Expand Up @@ -126,7 +126,7 @@ pub enum Req {
}

pub struct ReservationReq {
substream: Framed<NegotiatedSubstream, quick_protobuf_codec::Codec<proto::HopMessage>>,
substream: Framed<Stream, quick_protobuf_codec::Codec<proto::HopMessage>>,
reservation_duration: Duration,
max_circuit_duration: Duration,
max_circuit_bytes: u64,
Expand Down Expand Up @@ -183,15 +183,15 @@ impl ReservationReq {

pub struct CircuitReq {
dst: PeerId,
substream: Framed<NegotiatedSubstream, quick_protobuf_codec::Codec<proto::HopMessage>>,
substream: Framed<Stream, quick_protobuf_codec::Codec<proto::HopMessage>>,
}

impl CircuitReq {
pub fn dst(&self) -> PeerId {
self.dst
}

pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> {
pub async fn accept(mut self) -> Result<(Stream, Bytes), UpgradeError> {
let msg = proto::HopMessage {
type_pb: proto::HopMessageType::STATUS,
peer: None,
Expand Down
10 changes: 5 additions & 5 deletions protocols/relay/src/protocol/inbound_stop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use bytes::Bytes;
use futures::{future::BoxFuture, prelude::*};
use libp2p_core::upgrade;
use libp2p_identity::PeerId;
use libp2p_swarm::{NegotiatedSubstream, StreamProtocol};
use libp2p_swarm::{Stream, StreamProtocol};
use std::iter;
use thiserror::Error;

Expand All @@ -40,12 +40,12 @@ impl upgrade::UpgradeInfo for Upgrade {
}
}

impl upgrade::InboundUpgrade<NegotiatedSubstream> for Upgrade {
impl upgrade::InboundUpgrade<Stream> for Upgrade {
type Output = Circuit;
type Error = UpgradeError;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
fn upgrade_inbound(self, substream: Stream, _: Self::Info) -> Self::Future {
let mut substream = Framed::new(
substream,
quick_protobuf_codec::Codec::new(MAX_MESSAGE_SIZE),
Expand Down Expand Up @@ -111,7 +111,7 @@ pub enum FatalUpgradeError {
}

pub struct Circuit {
substream: Framed<NegotiatedSubstream, quick_protobuf_codec::Codec<proto::StopMessage>>,
substream: Framed<Stream, quick_protobuf_codec::Codec<proto::StopMessage>>,
src_peer_id: PeerId,
limit: Option<protocol::Limit>,
}
Expand All @@ -125,7 +125,7 @@ impl Circuit {
self.limit
}

pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> {
pub async fn accept(mut self) -> Result<(Stream, Bytes), UpgradeError> {
let msg = proto::StopMessage {
type_pb: proto::StopMessageType::STATUS,
peer: None,
Expand Down
Loading