Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports/tcp: Remove sleep_on_error #2849

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
target
!target/debug/examples/tokio_listen
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
FROM ubuntu:latest
COPY target/debug/examples/tokio_listen /usr/bin/tokio_listen
CMD ["bash", "-c", "tokio_listen"]
23 changes: 23 additions & 0 deletions test-low-fd-limit-tcp-listener.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/usr/bin/bash

# This file build and run the `tokio_listen` example in a docker container with a low file descriptor limit.
# You can use the `tokio_dial` example to trigger the error. You might need to run it several times before
# the limit is actually hit.
#
# `cargo run --package libp2p-tcp --example tokio_dial --features tokio -- /ip4/127.0.0.1/tcp/9999`

cargo build --example tokio_listen --package libp2p-tcp --features tokio

FILE_DESC_LIMIT=13

stop_container() {
docker rm -f "$LISTEN_CONTAINER";
}

trap stop_container SIGINT;

LISTEN_CONTAINER=$(docker run -d -p 9999:9999 --ulimit nofile="$FILE_DESC_LIMIT":50 "$(docker build . --quiet)")

docker logs -f "$LISTEN_CONTAINER";


7 changes: 6 additions & 1 deletion transports/tcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ tokio = ["tokio-crate"]
async-io = ["async-io-crate"]

[dev-dependencies]
anyhow = "1.0.65"
async-std = { version = "1.6.5", features = ["attributes"] }
tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net", "rt", "macros"] }
tokio-crate = { package = "tokio", version = "1.0.1", features = ["full", "macros"] }
env_logger = "0.9.0"

[[example]]
name = "tokio_listen"
required-features = ["tokio"]
64 changes: 64 additions & 0 deletions transports/tcp/examples/tokio_dial.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2022 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use anyhow::{Context, Result};
use futures::prelude::*;
use libp2p_core::Transport;
use libp2p_tcp::tokio::TcpStream;
use libp2p_tcp::GenTcpConfig;
use std::time::Duration;
use tokio_crate as tokio;

#[tokio::main]
async fn main() -> Result<()> {
let mut transport = libp2p_tcp::TokioTcpTransport::new(GenTcpConfig::new()).boxed();

let addr = std::env::args()
.skip(1)
.next()
.context("Missing argument")?
.parse()
.context("Failed to parse argument")?;

let stream = transport.dial(addr)?.await?;

ping_pong(stream).await?;

Ok(())
}

async fn ping_pong(mut stream: TcpStream) -> Result<()> {
let addr = stream.0.peer_addr()?;

loop {
stream.write_all(b"PING").await?;

eprintln!("Sent PING to {addr}",);

let mut buffer = [0u8; 4];
stream.read_exact(&mut buffer).await?;

anyhow::ensure!(&buffer == b"PONG");

eprintln!("Received PING from {addr}");

tokio::time::sleep(Duration::from_secs(1)).await;
}
}
85 changes: 85 additions & 0 deletions transports/tcp/examples/tokio_listen.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2022 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use anyhow::Result;
use futures::prelude::*;
use libp2p_core::transport::TransportEvent;
use libp2p_core::Transport;
use libp2p_tcp::tokio::TcpStream;
use libp2p_tcp::GenTcpConfig;
use tokio_crate as tokio;

#[tokio::main]
async fn main() -> Result<()> {
let mut transport = libp2p_tcp::TokioTcpTransport::new(GenTcpConfig::new()).boxed();

transport.listen_on("/ip4/0.0.0.0/tcp/9999".parse()?)?;

loop {
match transport.select_next_some().await {
TransportEvent::NewAddress { listen_addr, .. } => {
eprintln!("Listening on {listen_addr}");
}
TransportEvent::AddressExpired { listen_addr, .. } => {
eprintln!("No longer listening on {listen_addr}");
}
TransportEvent::Incoming {
send_back_addr,
upgrade,
..
} => {
eprintln!("Incoming connection from {send_back_addr}");

let task = tokio::spawn(async move {
let stream = upgrade.await?;
ping_pong(stream).await?;

anyhow::Ok(())
});
tokio::spawn(async move {
if let Err(e) = task.await {
eprintln!("Task failed: {e}")
}
});
}
TransportEvent::ListenerClosed { .. } => {}
TransportEvent::ListenerError { error, .. } => {
eprintln!("Listener encountered an error: {error}");
}
}
}
}

async fn ping_pong(mut stream: TcpStream) -> Result<()> {
let addr = stream.0.peer_addr()?;

loop {
let mut buffer = [0u8; 4];
stream.read_exact(&mut buffer).await?;

anyhow::ensure!(&buffer == b"PING");

eprintln!("Received PING from {addr}");

stream.write_all(b"PONG").await?;

eprintln!("Sent PONG to {addr}");
}
}
21 changes: 0 additions & 21 deletions transports/tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use futures::{
future::{self, Ready},
prelude::*,
};
use futures_timer::Delay;
use libp2p_core::{
address_translation,
multiaddr::{Multiaddr, Protocol},
Expand All @@ -61,7 +60,6 @@ use std::{
pin::Pin,
sync::{Arc, RwLock},
task::{Context, Poll},
time::Duration,
};

use provider::{Incoming, Provider};
Expand Down Expand Up @@ -651,11 +649,6 @@ where
/// as local addresses for the sockets of outgoing connections. They are
/// unregistered when the stream encounters an error or is dropped.
port_reuse: PortReuse,
/// How long to sleep after a (non-fatal) error while trying
/// to accept a new connection.
sleep_on_error: Duration,
/// The current pause, if any.
pause: Option<Delay>,
}

impl<T> TcpListenStream<T>
Expand All @@ -679,8 +672,6 @@ where
listener_id,
listen_addr,
if_watcher,
pause: None,
sleep_on_error: Duration::from_millis(100),
})
}

Expand Down Expand Up @@ -725,16 +716,6 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let me = Pin::into_inner(self);

if let Some(mut pause) = me.pause.take() {
match pause.poll_unpin(cx) {
Poll::Ready(_) => {}
Poll::Pending => {
me.pause = Some(pause);
return Poll::Pending;
}
}
}

if let Some(if_watcher) = me.if_watcher.as_mut() {
while let Poll::Ready(event) = if_watcher.poll_if_event(cx) {
match event {
Expand All @@ -757,7 +738,6 @@ where
}
}
Err(err) => {
me.pause = Some(Delay::new(me.sleep_on_error));
return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err))));
}
}
Expand All @@ -784,7 +764,6 @@ where
}
Poll::Ready(Err(e)) => {
// These errors are non-fatal for the listener stream.
me.pause = Some(Delay::new(me.sleep_on_error));
return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e))));
}
Poll::Pending => {}
Expand Down