diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000000..8a340ade3b2 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +target +!target/debug/examples/tokio_listen diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000000..c94eb421cfc --- /dev/null +++ b/Dockerfile @@ -0,0 +1,3 @@ +FROM ubuntu:latest +COPY target/debug/examples/tokio_listen /usr/bin/tokio_listen +CMD ["bash", "-c", "tokio_listen"] diff --git a/test-low-fd-limit-tcp-listener.sh b/test-low-fd-limit-tcp-listener.sh new file mode 100755 index 00000000000..e6206f0c422 --- /dev/null +++ b/test-low-fd-limit-tcp-listener.sh @@ -0,0 +1,23 @@ +#!/usr/bin/bash + +# This file build and run the `tokio_listen` example in a docker container with a low file descriptor limit. +# You can use the `tokio_dial` example to trigger the error. You might need to run it several times before +# the limit is actually hit. +# +# `cargo run --package libp2p-tcp --example tokio_dial --features tokio -- /ip4/127.0.0.1/tcp/9999` + +cargo build --example tokio_listen --package libp2p-tcp --features tokio + +FILE_DESC_LIMIT=13 + +stop_container() { + docker rm -f "$LISTEN_CONTAINER"; +} + +trap stop_container SIGINT; + +LISTEN_CONTAINER=$(docker run -d -p 9999:9999 --ulimit nofile="$FILE_DESC_LIMIT":50 "$(docker build . --quiet)") + +docker logs -f "$LISTEN_CONTAINER"; + + diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 06517a84f67..974b0874aef 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -26,6 +26,11 @@ tokio = ["tokio-crate"] async-io = ["async-io-crate"] [dev-dependencies] +anyhow = "1.0.65" async-std = { version = "1.6.5", features = ["attributes"] } -tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net", "rt", "macros"] } +tokio-crate = { package = "tokio", version = "1.0.1", features = ["full", "macros"] } env_logger = "0.9.0" + +[[example]] +name = "tokio_listen" +required-features = ["tokio"] diff --git a/transports/tcp/examples/tokio_dial.rs b/transports/tcp/examples/tokio_dial.rs new file mode 100644 index 00000000000..ac452bb1b8b --- /dev/null +++ b/transports/tcp/examples/tokio_dial.rs @@ -0,0 +1,64 @@ +// Copyright 2022 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use anyhow::{Context, Result}; +use futures::prelude::*; +use libp2p_core::Transport; +use libp2p_tcp::tokio::TcpStream; +use libp2p_tcp::GenTcpConfig; +use std::time::Duration; +use tokio_crate as tokio; + +#[tokio::main] +async fn main() -> Result<()> { + let mut transport = libp2p_tcp::TokioTcpTransport::new(GenTcpConfig::new()).boxed(); + + let addr = std::env::args() + .skip(1) + .next() + .context("Missing argument")? + .parse() + .context("Failed to parse argument")?; + + let stream = transport.dial(addr)?.await?; + + ping_pong(stream).await?; + + Ok(()) +} + +async fn ping_pong(mut stream: TcpStream) -> Result<()> { + let addr = stream.0.peer_addr()?; + + loop { + stream.write_all(b"PING").await?; + + eprintln!("Sent PING to {addr}",); + + let mut buffer = [0u8; 4]; + stream.read_exact(&mut buffer).await?; + + anyhow::ensure!(&buffer == b"PONG"); + + eprintln!("Received PING from {addr}"); + + tokio::time::sleep(Duration::from_secs(1)).await; + } +} diff --git a/transports/tcp/examples/tokio_listen.rs b/transports/tcp/examples/tokio_listen.rs new file mode 100644 index 00000000000..95b57d6c78a --- /dev/null +++ b/transports/tcp/examples/tokio_listen.rs @@ -0,0 +1,85 @@ +// Copyright 2022 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use anyhow::Result; +use futures::prelude::*; +use libp2p_core::transport::TransportEvent; +use libp2p_core::Transport; +use libp2p_tcp::tokio::TcpStream; +use libp2p_tcp::GenTcpConfig; +use tokio_crate as tokio; + +#[tokio::main] +async fn main() -> Result<()> { + let mut transport = libp2p_tcp::TokioTcpTransport::new(GenTcpConfig::new()).boxed(); + + transport.listen_on("/ip4/0.0.0.0/tcp/9999".parse()?)?; + + loop { + match transport.select_next_some().await { + TransportEvent::NewAddress { listen_addr, .. } => { + eprintln!("Listening on {listen_addr}"); + } + TransportEvent::AddressExpired { listen_addr, .. } => { + eprintln!("No longer listening on {listen_addr}"); + } + TransportEvent::Incoming { + send_back_addr, + upgrade, + .. + } => { + eprintln!("Incoming connection from {send_back_addr}"); + + let task = tokio::spawn(async move { + let stream = upgrade.await?; + ping_pong(stream).await?; + + anyhow::Ok(()) + }); + tokio::spawn(async move { + if let Err(e) = task.await { + eprintln!("Task failed: {e}") + } + }); + } + TransportEvent::ListenerClosed { .. } => {} + TransportEvent::ListenerError { error, .. } => { + eprintln!("Listener encountered an error: {error}"); + } + } + } +} + +async fn ping_pong(mut stream: TcpStream) -> Result<()> { + let addr = stream.0.peer_addr()?; + + loop { + let mut buffer = [0u8; 4]; + stream.read_exact(&mut buffer).await?; + + anyhow::ensure!(&buffer == b"PING"); + + eprintln!("Received PING from {addr}"); + + stream.write_all(b"PONG").await?; + + eprintln!("Sent PONG to {addr}"); + } +} diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index f7b897c0d47..3c424686b8f 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -47,7 +47,6 @@ use futures::{ future::{self, Ready}, prelude::*, }; -use futures_timer::Delay; use libp2p_core::{ address_translation, multiaddr::{Multiaddr, Protocol}, @@ -61,7 +60,6 @@ use std::{ pin::Pin, sync::{Arc, RwLock}, task::{Context, Poll}, - time::Duration, }; use provider::{Incoming, Provider}; @@ -651,11 +649,6 @@ where /// as local addresses for the sockets of outgoing connections. They are /// unregistered when the stream encounters an error or is dropped. port_reuse: PortReuse, - /// How long to sleep after a (non-fatal) error while trying - /// to accept a new connection. - sleep_on_error: Duration, - /// The current pause, if any. - pause: Option, } impl TcpListenStream @@ -679,8 +672,6 @@ where listener_id, listen_addr, if_watcher, - pause: None, - sleep_on_error: Duration::from_millis(100), }) } @@ -725,16 +716,6 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let me = Pin::into_inner(self); - if let Some(mut pause) = me.pause.take() { - match pause.poll_unpin(cx) { - Poll::Ready(_) => {} - Poll::Pending => { - me.pause = Some(pause); - return Poll::Pending; - } - } - } - if let Some(if_watcher) = me.if_watcher.as_mut() { while let Poll::Ready(event) = if_watcher.poll_if_event(cx) { match event { @@ -757,7 +738,6 @@ where } } Err(err) => { - me.pause = Some(Delay::new(me.sleep_on_error)); return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); } } @@ -784,7 +764,6 @@ where } Poll::Ready(Err(e)) => { // These errors are non-fatal for the listener stream. - me.pause = Some(Delay::new(me.sleep_on_error)); return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e)))); } Poll::Pending => {}