Skip to content

Commit

Permalink
transports/tcp: revert Stream impl for GenTcpTransport
Browse files Browse the repository at this point in the history
With PR libp2p#2667 the `Sync` trait bound for transport::Boxed is removed.
If a tcp transport should be polled as a stream we can now do this via
`TcpTransport::new(..)::boxed` and do not need a separate impl of
`Stream` for it.
  • Loading branch information
elenaf9 committed May 29, 2022
1 parent d7f5019 commit b4164e8
Showing 1 changed file with 8 additions and 34 deletions.
42 changes: 8 additions & 34 deletions transports/tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ use futures::{
future::{self, BoxFuture, Ready},
prelude::*,
ready,
stream::FusedStream,
};
use futures_timer::Delay;
use libp2p_core::{
Expand Down Expand Up @@ -306,7 +305,7 @@ impl GenTcpConfig {
/// let listen_addr1: Multiaddr = "/ip4/127.0.0.1/tcp/9001".parse().unwrap();
/// let listen_addr2: Multiaddr = "/ip4/127.0.0.1/tcp/9002".parse().unwrap();
///
/// let mut tcp1 = TcpTransport::new(GenTcpConfig::new().port_reuse(true));
/// let mut tcp1 = TcpTransport::new(GenTcpConfig::new().port_reuse(true)).boxed();
/// tcp1.listen_on(ListenerId::new(1), listen_addr1.clone()).expect("listener");
/// match tcp1.select_next_some().await {
/// TransportEvent::NewAddress { listen_addr, .. } => {
Expand All @@ -317,7 +316,7 @@ impl GenTcpConfig {
/// _ => {}
/// }
///
/// let mut tcp2 = TcpTransport::new(GenTcpConfig::new().port_reuse(true));
/// let mut tcp2 = TcpTransport::new(GenTcpConfig::new().port_reuse(true)).boxed();
/// tcp2.listen_on(ListenerId::new(1), listen_addr2).expect("listener");
/// match tcp2.select_next_some().await {
/// TransportEvent::NewAddress { listen_addr, .. } => {
Expand Down Expand Up @@ -551,31 +550,6 @@ where
}
}

impl<T> Stream for GenTcpTransport<T>
where
T: Provider + Send + 'static,
T::Listener: Unpin,
T::IfWatcher: Unpin,
T::Stream: Unpin,
{
type Item = TransportEvent<<Self as Transport>::ListenerUpgrade, <Self as Transport>::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Transport::poll(self, cx).map(Some)
}
}
impl<T> FusedStream for GenTcpTransport<T>
where
T: Provider + Send + 'static,
T::Listener: Unpin,
T::IfWatcher: Unpin,
T::Stream: Unpin,
{
fn is_terminated(&self) -> bool {
false
}
}

#[derive(Debug)]
pub enum TcpTransportEvent<S> {
/// The transport is listening on a new additional [`Multiaddr`].
Expand Down Expand Up @@ -937,7 +911,7 @@ mod tests {
env_logger::try_init().ok();

async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new());
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new()).boxed();
tcp.listen_on(ListenerId::new(1), addr).unwrap();
loop {
match tcp.select_next_some().await {
Expand Down Expand Up @@ -1006,7 +980,7 @@ mod tests {
env_logger::try_init().ok();

async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new());
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new()).boxed();
tcp.listen_on(ListenerId::new(1), addr).unwrap();

loop {
Expand Down Expand Up @@ -1075,7 +1049,7 @@ mod tests {
env_logger::try_init().ok();

async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new());
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new()).boxed();
tcp.listen_on(ListenerId::new(1), addr).unwrap();
loop {
match tcp.select_next_some().await {
Expand All @@ -1097,7 +1071,7 @@ mod tests {

async fn dialer<T: Provider>(addr: Multiaddr, mut ready_rx: mpsc::Receiver<Multiaddr>) {
let dest_addr = ready_rx.next().await.unwrap();
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new().port_reuse(true));
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new().port_reuse(true)).boxed();
tcp.listen_on(ListenerId::new(1), addr).unwrap();
match tcp.select_next_some().await {
TransportEvent::NewAddress { .. } => {
Expand Down Expand Up @@ -1155,7 +1129,7 @@ mod tests {
T::IfWatcher: Sync,
T::Stream: Sync,
{
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new().port_reuse(true));
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new().port_reuse(true)).boxed();
tcp.listen_on(ListenerId::new(1), addr).unwrap();
match tcp.select_next_some().await {
TransportEvent::NewAddress {
Expand Down Expand Up @@ -1207,7 +1181,7 @@ mod tests {
T: Provider,
T::IfWatcher: Sync,
{
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new());
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new()).boxed();
tcp.listen_on(ListenerId::new(1), addr).unwrap();
tcp.select_next_some()
.await
Expand Down

0 comments on commit b4164e8

Please sign in to comment.