From ec48b0454e51f6217df77f7281eac646a3b9f683 Mon Sep 17 00:00:00 2001 From: Aaro Altonen <48052676+altonen@users.noreply.github.com> Date: Wed, 12 Jun 2024 11:08:23 +0300 Subject: [PATCH] transport: Make `TCP_NODELAY` configurable (#146) Allow configuring `TCP_NODELAY` through TCP/WebSocket configurations. Polkadot configures it to `true` btw: https://github.com/paritytech/polkadot-sdk/blob/b65313e81465dd730e48d4ce00deb76922618375/substrate/client/network/src/transport.rs#L58 --- src/transport/tcp/config.rs | 6 ++++++ src/transport/tcp/connection.rs | 6 ++++++ src/transport/tcp/listener.rs | 12 ++++++++---- src/transport/tcp/mod.rs | 15 +++++++++++++-- src/transport/websocket/config.rs | 6 ++++++ src/transport/websocket/listener.rs | 8 +++++--- src/transport/websocket/mod.rs | 16 ++++++++++++++-- 7 files changed, 58 insertions(+), 11 deletions(-) diff --git a/src/transport/tcp/config.rs b/src/transport/tcp/config.rs index cd4926b2..cfa12af1 100644 --- a/src/transport/tcp/config.rs +++ b/src/transport/tcp/config.rs @@ -41,6 +41,11 @@ pub struct Config { /// Defaults to `true`. pub reuse_port: bool, + /// Enable `TCP_NODELAY`. + /// + /// Defaults to `false`. + pub nodelay: bool, + /// Yamux configuration. pub yamux_config: crate::yamux::Config, @@ -85,6 +90,7 @@ impl Default for Config { "/ip6/::/tcp/0".parse().expect("valid address"), ], reuse_port: true, + nodelay: false, yamux_config: Default::default(), noise_read_ahead_frame_count: MAX_READ_AHEAD_FACTOR, noise_write_buffer_size: MAX_WRITE_BUFFER_SIZE, diff --git a/src/transport/tcp/connection.rs b/src/transport/tcp/connection.rs index ac19d404..d01b1889 100644 --- a/src/transport/tcp/connection.rs +++ b/src/transport/tcp/connection.rs @@ -681,6 +681,7 @@ mod tests { .with(Protocol::Tcp(address.port())), Default::default(), Duration::from_secs(10), + false, ) .await .unwrap(); @@ -775,6 +776,7 @@ mod tests { .with(Protocol::Tcp(address.port())), Default::default(), Duration::from_secs(10), + false, ) .await .unwrap(); @@ -916,6 +918,7 @@ mod tests { .with(Protocol::Tcp(address.port())), Default::default(), Duration::from_secs(10), + false, ) .await .unwrap(); @@ -961,6 +964,7 @@ mod tests { .with(Protocol::Tcp(address.port())), Default::default(), Duration::from_secs(10), + false, ) .await .unwrap(); @@ -1115,6 +1119,7 @@ mod tests { .with(Protocol::Tcp(address.port())), Default::default(), Duration::from_secs(10), + false, ) .await .unwrap(); @@ -1224,6 +1229,7 @@ mod tests { .with(Protocol::Tcp(address.port())), Default::default(), Duration::from_secs(10), + false, ) .await .unwrap(); diff --git a/src/transport/tcp/listener.rs b/src/transport/tcp/listener.rs index 40e1043e..5cfd2a77 100644 --- a/src/transport/tcp/listener.rs +++ b/src/transport/tcp/listener.rs @@ -106,6 +106,7 @@ impl TcpListener { pub fn new( addresses: Vec, reuse_port: bool, + nodelay: bool, ) -> (Self, Vec, DialAddresses) { let (listeners, listen_addresses): (_, Vec>) = addresses .into_iter() @@ -131,6 +132,7 @@ impl TcpListener { }, }; + socket.set_nodelay(nodelay).ok()?; socket.set_nonblocking(true).ok()?; socket.set_reuse_address(true).ok()?; #[cfg(unix)] @@ -339,7 +341,7 @@ mod tests { #[tokio::test] async fn no_listeners() { - let (mut listener, _, _) = TcpListener::new(Vec::new(), true); + let (mut listener, _, _) = TcpListener::new(Vec::new(), true, false); futures::future::poll_fn(|cx| match listener.poll_next_unpin(cx) { Poll::Pending => Poll::Ready(()), @@ -351,7 +353,8 @@ mod tests { #[tokio::test] async fn one_listener() { let address: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap(); - let (mut listener, listen_addresses, _) = TcpListener::new(vec![address.clone()], true); + let (mut listener, listen_addresses, _) = + TcpListener::new(vec![address.clone()], true, false); let Some(Protocol::Tcp(port)) = listen_addresses.iter().next().unwrap().clone().iter().skip(1).next() else { @@ -368,7 +371,8 @@ mod tests { async fn two_listeners() { let address1: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap(); let address2: Multiaddr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - let (mut listener, listen_addresses, _) = TcpListener::new(vec![address1, address2], true); + let (mut listener, listen_addresses, _) = + TcpListener::new(vec![address1, address2], true, false); let Some(Protocol::Tcp(port1)) = listen_addresses.iter().next().unwrap().clone().iter().skip(1).next() else { @@ -421,7 +425,7 @@ mod tests { async fn show_all_addresses() { let address1: Multiaddr = "/ip6/::/tcp/0".parse().unwrap(); let address2: Multiaddr = "/ip4/0.0.0.0/tcp/0".parse().unwrap(); - let (_, listen_addresses, _) = TcpListener::new(vec![address1, address2], true); + let (_, listen_addresses, _) = TcpListener::new(vec![address1, address2], true, false); println!("{listen_addresses:#?}"); } diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index e9dee257..06c4f0b7 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -137,6 +137,7 @@ impl TcpTransport { address: Multiaddr, dial_addresses: DialAddresses, connection_open_timeout: Duration, + nodelay: bool, ) -> crate::Result<(Multiaddr, TcpStream)> { let (socket_address, _) = TcpListener::get_socket_address(&address)?; let remote_address = match socket_address { @@ -198,6 +199,7 @@ impl TcpTransport { socket.set_only_v6(true)?; } socket.set_nonblocking(true)?; + socket.set_nodelay(nodelay)?; match dial_addresses.local_dial_address(&remote_address.ip()) { Ok(Some(dial_address)) => { @@ -261,6 +263,7 @@ impl TransportBuilder for TcpTransport { let (listener, listen_addresses, dial_addresses) = TcpListener::new( std::mem::take(&mut config.listen_addresses), config.reuse_port, + config.nodelay, ); Ok(( @@ -293,11 +296,12 @@ impl Transport for TcpTransport { let substream_open_timeout = self.config.substream_open_timeout; let dial_addresses = self.dial_addresses.clone(); let keypair = self.context.keypair.clone(); + let nodelay = self.config.nodelay; self.pending_dials.insert(connection_id, address.clone()); self.pending_connections.push(Box::pin(async move { let (_, stream) = - TcpTransport::dial_peer(address, dial_addresses, connection_open_timeout) + TcpTransport::dial_peer(address, dial_addresses, connection_open_timeout, nodelay) .await .map_err(|error| (connection_id, error))?; @@ -370,9 +374,16 @@ impl Transport for TcpTransport { .map(|address| { let dial_addresses = self.dial_addresses.clone(); let connection_open_timeout = self.config.connection_open_timeout; + let nodelay = self.config.nodelay; async move { - TcpTransport::dial_peer(address, dial_addresses, connection_open_timeout).await + TcpTransport::dial_peer( + address, + dial_addresses, + connection_open_timeout, + nodelay, + ) + .await } }) .collect(); diff --git a/src/transport/websocket/config.rs b/src/transport/websocket/config.rs index 1ec113a6..facab4c6 100644 --- a/src/transport/websocket/config.rs +++ b/src/transport/websocket/config.rs @@ -41,6 +41,11 @@ pub struct Config { /// Defaults to `true`. pub reuse_port: bool, + /// Enable `TCP_NODELAY`. + /// + /// Defaults to `false`. + pub nodelay: bool, + /// Yamux configuration. pub yamux_config: crate::yamux::Config, @@ -85,6 +90,7 @@ impl Default for Config { "/ip6/::/tcp/0/ws".parse().expect("valid address"), ], reuse_port: true, + nodelay: false, yamux_config: Default::default(), noise_read_ahead_frame_count: MAX_READ_AHEAD_FACTOR, noise_write_buffer_size: MAX_WRITE_BUFFER_SIZE, diff --git a/src/transport/websocket/listener.rs b/src/transport/websocket/listener.rs index 87dc057a..e6c1c962 100644 --- a/src/transport/websocket/listener.rs +++ b/src/transport/websocket/listener.rs @@ -105,6 +105,7 @@ impl WebSocketListener { pub fn new( addresses: Vec, reuse_port: bool, + nodelay: bool, ) -> (Self, Vec, DialAddresses) { let (listeners, listen_addresses): (_, Vec>) = addresses .into_iter() @@ -135,6 +136,7 @@ impl WebSocketListener { .ok()?, }; + socket.set_nodelay(nodelay).ok()?; socket.set_nonblocking(true).ok()?; socket.set_reuse_address(true).ok()?; #[cfg(unix)] @@ -398,7 +400,7 @@ mod tests { #[tokio::test] async fn no_listeners() { - let (mut listener, _, _) = WebSocketListener::new(Vec::new(), true); + let (mut listener, _, _) = WebSocketListener::new(Vec::new(), true, false); futures::future::poll_fn(|cx| match listener.poll_next_unpin(cx) { Poll::Pending => Poll::Ready(()), @@ -411,7 +413,7 @@ mod tests { async fn one_listener() { let address: Multiaddr = "/ip6/::1/tcp/0/ws".parse().unwrap(); let (mut listener, listen_addresses, _) = - WebSocketListener::new(vec![address.clone()], true); + WebSocketListener::new(vec![address.clone()], true, false); let Some(Protocol::Tcp(port)) = listen_addresses.iter().next().unwrap().clone().iter().skip(1).next() else { @@ -429,7 +431,7 @@ mod tests { let address1: Multiaddr = "/ip6/::1/tcp/0/ws".parse().unwrap(); let address2: Multiaddr = "/ip4/127.0.0.1/tcp/0/ws".parse().unwrap(); let (mut listener, listen_addresses, _) = - WebSocketListener::new(vec![address1, address2], true); + WebSocketListener::new(vec![address1, address2], true, false); let Some(Protocol::Tcp(port1)) = listen_addresses.iter().next().unwrap().clone().iter().skip(1).next() diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index a90db4ed..b8288270 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -182,6 +182,7 @@ impl WebSocketTransport { address: Multiaddr, dial_addresses: DialAddresses, connection_open_timeout: Duration, + nodelay: bool, ) -> crate::Result<(Multiaddr, WebSocketStream>)> { let (url, _) = Self::multiaddr_into_url(address.clone())?; let (socket_address, _) = WebSocketListener::get_socket_address(&address)?; @@ -245,6 +246,7 @@ impl WebSocketTransport { socket.set_only_v6(true)?; } socket.set_nonblocking(true)?; + socket.set_nodelay(nodelay)?; match dial_addresses.local_dial_address(&remote_address.ip()) { Ok(Some(dial_address)) => { @@ -315,6 +317,7 @@ impl TransportBuilder for WebSocketTransport { let (listener, listen_addresses, dial_addresses) = WebSocketListener::new( std::mem::take(&mut config.listen_addresses), config.reuse_port, + config.nodelay, ); Ok(( @@ -344,6 +347,8 @@ impl Transport for WebSocketTransport { let max_read_ahead_factor = self.config.noise_read_ahead_frame_count; let max_write_buffer_size = self.config.noise_write_buffer_size; let dial_addresses = self.dial_addresses.clone(); + let nodelay = self.config.nodelay; + self.pending_dials.insert(connection_id, address.clone()); tracing::debug!(target: LOG_TARGET, ?connection_id, ?address, "open connection"); @@ -353,6 +358,7 @@ impl Transport for WebSocketTransport { address.clone(), dial_addresses, connection_open_timeout, + nodelay, ) .await .map_err(|error| WebSocketError::new(error, Some(connection_id)))?; @@ -437,10 +443,16 @@ impl Transport for WebSocketTransport { .map(|address| { let connection_open_timeout = self.config.connection_open_timeout; let dial_addresses = self.dial_addresses.clone(); + let nodelay = self.config.nodelay; async move { - WebSocketTransport::dial_peer(address, dial_addresses, connection_open_timeout) - .await + WebSocketTransport::dial_peer( + address, + dial_addresses, + connection_open_timeout, + nodelay, + ) + .await } }) .collect();