From 6e0291d980dd514ca042471098f917b17b477ee7 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 26 Aug 2022 16:25:26 +0900 Subject: [PATCH 1/3] transports/tcp: Remove sleep_on_error The `sleep_on_error` mechanism in `libp2p-tcp` would delay the next poll on the listener stream when an error happens. This mechanism was introduced in https://github.com/libp2p/rust-libp2p/pull/402 based on the [`tk_listen`](https://docs.rs/tk-listen/latest/tk_listen/) crate also referenced in the tokio documentation. When running out of file descriptors, the listening socket would return an `EMFILE`. Instead of polling the socket again, thus likely receiving another `EMFILE`, one would instead wait for 100ms. Modern operating systems should run with high file descriptor limits and thus this error should not happen in the wild. In addition, delaying the next poll only covers up the issue, but does not solve it. With the above in mind, this pull request removes the daly. (The mention of `tk_listen` has since been removed from tokio.) --- transports/tcp/src/lib.rs | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 981c896bcb5..98e4ebecd67 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -47,7 +47,6 @@ use futures::{ prelude::*, ready, }; -use futures_timer::Delay; use libp2p_core::{ address_translation, multiaddr::{Multiaddr, Protocol}, @@ -61,7 +60,6 @@ use std::{ pin::Pin, sync::{Arc, RwLock}, task::{Context, Poll}, - time::Duration, }; use provider::{IfEvent, Provider}; @@ -650,11 +648,6 @@ where /// as local addresses for the sockets of outgoing connections. They are /// unregistered when the stream encounters an error or is dropped. port_reuse: PortReuse, - /// How long to sleep after a (non-fatal) error while trying - /// to accept a new connection. - sleep_on_error: Duration, - /// The current pause, if any. - pause: Option, } impl TcpListenStream @@ -695,8 +688,6 @@ where listener_id, listen_addr, in_addr, - pause: None, - sleep_on_error: Duration::from_millis(100), }) } @@ -756,7 +747,6 @@ where err }; *if_watch = IfWatch::Pending(T::if_watcher()); - me.pause = Some(Delay::new(me.sleep_on_error)); return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); } }, @@ -793,7 +783,6 @@ where "Failure polling interfaces: {:?}. Scheduling retry.", err }; - me.pause = Some(Delay::new(me.sleep_on_error)); return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); } } @@ -810,16 +799,6 @@ where } } - if let Some(mut pause) = me.pause.take() { - match Pin::new(&mut pause).poll(cx) { - Poll::Ready(_) => {} - Poll::Pending => { - me.pause = Some(pause); - return Poll::Pending; - } - } - } - // Take the pending connection from the backlog. let incoming = match T::poll_accept(&mut me.listener, cx) { Poll::Pending => return Poll::Pending, @@ -827,7 +806,6 @@ where Poll::Ready(Err(e)) => { // These errors are non-fatal for the listener stream. log::error!("error accepting incoming connection: {}", e); - me.pause = Some(Delay::new(me.sleep_on_error)); return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e)))); } }; From decdffc978fcf4a96572ceb31f022d673e045a3a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Sep 2022 12:32:22 +1000 Subject: [PATCH 2/3] Add infrastructure to test low file system limit --- .dockerignore | 2 + Dockerfile | 3 + test-low-fd-limit-tcp-listener.sh | 21 ++++++ transports/tcp/Cargo.toml | 7 +- transports/tcp/examples/tokio_dial.rs | 64 +++++++++++++++++++ transports/tcp/examples/tokio_listen.rs | 85 +++++++++++++++++++++++++ 6 files changed, 181 insertions(+), 1 deletion(-) create mode 100644 .dockerignore create mode 100644 Dockerfile create mode 100755 test-low-fd-limit-tcp-listener.sh create mode 100644 transports/tcp/examples/tokio_dial.rs create mode 100644 transports/tcp/examples/tokio_listen.rs diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000000..8a340ade3b2 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +target +!target/debug/examples/tokio_listen diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000000..c94eb421cfc --- /dev/null +++ b/Dockerfile @@ -0,0 +1,3 @@ +FROM ubuntu:latest +COPY target/debug/examples/tokio_listen /usr/bin/tokio_listen +CMD ["bash", "-c", "tokio_listen"] diff --git a/test-low-fd-limit-tcp-listener.sh b/test-low-fd-limit-tcp-listener.sh new file mode 100755 index 00000000000..84f6130c6d5 --- /dev/null +++ b/test-low-fd-limit-tcp-listener.sh @@ -0,0 +1,21 @@ +#!/usr/bin/bash + +# This file build and run the `tokio_listen` example in a docker container with a low file descriptor limit. +# You can use the `tokio_dial` example to trigger the error. You might need to run it several times before +# the limit is actually hit. +# +# `cargo run --package libp2p-tcp --example tokio_dial --features tokio -- /ip4/127.0.0.1/tcp/9999` + +FILE_DESC_LIMIT=12 + +stop_container() { + docker rm -f "$LISTEN_CONTAINER"; +} + +trap stop_container SIGINT; + +LISTEN_CONTAINER=$(docker run -d -p 9999:9999 --ulimit nofile="$FILE_DESC_LIMIT":50 "$(docker build . --quiet)") + +docker logs -f "$LISTEN_CONTAINER"; + + diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 7273db58c51..5a86aaea5a5 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -29,6 +29,11 @@ tokio = ["tokio-crate", "if-addrs"] async-io = ["async-io-crate", "if-watch"] [dev-dependencies] +anyhow = "1.0.65" async-std = { version = "1.6.5", features = ["attributes"] } -tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net", "rt"] } +tokio-crate = { package = "tokio", version = "1.0.1", features = ["full"] } env_logger = "0.9.0" + +[[example]] +name = "tokio_listen" +required-features = ["tokio"] diff --git a/transports/tcp/examples/tokio_dial.rs b/transports/tcp/examples/tokio_dial.rs new file mode 100644 index 00000000000..ac452bb1b8b --- /dev/null +++ b/transports/tcp/examples/tokio_dial.rs @@ -0,0 +1,64 @@ +// Copyright 2022 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use anyhow::{Context, Result}; +use futures::prelude::*; +use libp2p_core::Transport; +use libp2p_tcp::tokio::TcpStream; +use libp2p_tcp::GenTcpConfig; +use std::time::Duration; +use tokio_crate as tokio; + +#[tokio::main] +async fn main() -> Result<()> { + let mut transport = libp2p_tcp::TokioTcpTransport::new(GenTcpConfig::new()).boxed(); + + let addr = std::env::args() + .skip(1) + .next() + .context("Missing argument")? + .parse() + .context("Failed to parse argument")?; + + let stream = transport.dial(addr)?.await?; + + ping_pong(stream).await?; + + Ok(()) +} + +async fn ping_pong(mut stream: TcpStream) -> Result<()> { + let addr = stream.0.peer_addr()?; + + loop { + stream.write_all(b"PING").await?; + + eprintln!("Sent PING to {addr}",); + + let mut buffer = [0u8; 4]; + stream.read_exact(&mut buffer).await?; + + anyhow::ensure!(&buffer == b"PONG"); + + eprintln!("Received PING from {addr}"); + + tokio::time::sleep(Duration::from_secs(1)).await; + } +} diff --git a/transports/tcp/examples/tokio_listen.rs b/transports/tcp/examples/tokio_listen.rs new file mode 100644 index 00000000000..95b57d6c78a --- /dev/null +++ b/transports/tcp/examples/tokio_listen.rs @@ -0,0 +1,85 @@ +// Copyright 2022 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use anyhow::Result; +use futures::prelude::*; +use libp2p_core::transport::TransportEvent; +use libp2p_core::Transport; +use libp2p_tcp::tokio::TcpStream; +use libp2p_tcp::GenTcpConfig; +use tokio_crate as tokio; + +#[tokio::main] +async fn main() -> Result<()> { + let mut transport = libp2p_tcp::TokioTcpTransport::new(GenTcpConfig::new()).boxed(); + + transport.listen_on("/ip4/0.0.0.0/tcp/9999".parse()?)?; + + loop { + match transport.select_next_some().await { + TransportEvent::NewAddress { listen_addr, .. } => { + eprintln!("Listening on {listen_addr}"); + } + TransportEvent::AddressExpired { listen_addr, .. } => { + eprintln!("No longer listening on {listen_addr}"); + } + TransportEvent::Incoming { + send_back_addr, + upgrade, + .. + } => { + eprintln!("Incoming connection from {send_back_addr}"); + + let task = tokio::spawn(async move { + let stream = upgrade.await?; + ping_pong(stream).await?; + + anyhow::Ok(()) + }); + tokio::spawn(async move { + if let Err(e) = task.await { + eprintln!("Task failed: {e}") + } + }); + } + TransportEvent::ListenerClosed { .. } => {} + TransportEvent::ListenerError { error, .. } => { + eprintln!("Listener encountered an error: {error}"); + } + } + } +} + +async fn ping_pong(mut stream: TcpStream) -> Result<()> { + let addr = stream.0.peer_addr()?; + + loop { + let mut buffer = [0u8; 4]; + stream.read_exact(&mut buffer).await?; + + anyhow::ensure!(&buffer == b"PING"); + + eprintln!("Received PING from {addr}"); + + stream.write_all(b"PONG").await?; + + eprintln!("Sent PONG to {addr}"); + } +} From 6960ab42a44268d5b78fe4fd60f7b4369f9d16e8 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Sep 2022 12:41:10 +1000 Subject: [PATCH 3/3] Adjust to new implementation --- test-low-fd-limit-tcp-listener.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test-low-fd-limit-tcp-listener.sh b/test-low-fd-limit-tcp-listener.sh index 84f6130c6d5..e6206f0c422 100755 --- a/test-low-fd-limit-tcp-listener.sh +++ b/test-low-fd-limit-tcp-listener.sh @@ -6,7 +6,9 @@ # # `cargo run --package libp2p-tcp --example tokio_dial --features tokio -- /ip4/127.0.0.1/tcp/9999` -FILE_DESC_LIMIT=12 +cargo build --example tokio_listen --package libp2p-tcp --features tokio + +FILE_DESC_LIMIT=13 stop_container() { docker rm -f "$LISTEN_CONTAINER";