From 87be0b34bcb1203d7d1eb32a0535960546194a70 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 2 Dec 2022 13:04:35 +1100 Subject: [PATCH 1/8] Don't use `task_event_buffer_size` for pending connections The task for a pending connection only ever sends one event into this channel: Either a success or a failure. Cloning a sender adds one slot to the capacity of the channel. Hence, we can start this capacity at 0 and have the `cloning` of the `Sender` take care of properly increasing the capacity. --- swarm/src/connection/pool.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 8729b2e36e1..6ee085989d4 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -314,8 +314,7 @@ where { /// Creates a new empty `Pool`. pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self { - let (pending_connection_events_tx, pending_connection_events_rx) = - mpsc::channel(config.task_event_buffer_size); + let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0); let (established_connection_events_tx, established_connection_events_rx) = mpsc::channel(config.task_event_buffer_size); let executor = match config.executor { From 0019edf7d26def0eff13746c677a5b46fca96d85 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 2 Dec 2022 13:15:04 +1100 Subject: [PATCH 2/8] Don't share event buffer between established connections --- swarm/src/connection/pool.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 6ee085989d4..b8411a583ca 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -32,6 +32,7 @@ use crate::{ use concurrent_dial::ConcurrentDial; use fnv::FnvHashMap; use futures::prelude::*; +use futures::stream::SelectAll; use futures::{ channel::{mpsc, oneshot}, future::{poll_fn, BoxFuture, Either}, @@ -117,6 +118,9 @@ where /// See [`Connection::max_negotiating_inbound_streams`]. max_negotiating_inbound_streams: usize, + /// How many [`task::EstablishedConnectionEvent`]s can be buffered before the connection is back-pressured. + task_event_buffer_size: usize, + /// The executor to use for running connection tasks. Can either be a global executor /// or a local queue. executor: ExecSwitch, @@ -128,14 +132,9 @@ where /// Receiver for events reported from pending tasks. pending_connection_events_rx: mpsc::Receiver>, - /// Sender distributed to established tasks for reporting events back - /// to the pool. - established_connection_events_tx: - mpsc::Sender>, - - /// Receiver for events reported from established tasks. - established_connection_events_rx: - mpsc::Receiver>, + /// Receivers for events reported from established connections. + established_connection_events: + SelectAll>>, } #[derive(Debug)] @@ -315,8 +314,6 @@ where /// Creates a new empty `Pool`. pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self { let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0); - let (established_connection_events_tx, established_connection_events_rx) = - mpsc::channel(config.task_event_buffer_size); let executor = match config.executor { Some(exec) => ExecSwitch::Executor(exec), None => ExecSwitch::LocalSpawn(Default::default()), @@ -331,11 +328,11 @@ where dial_concurrency_factor: config.dial_concurrency_factor, substream_upgrade_protocol_override: config.substream_upgrade_protocol_override, max_negotiating_inbound_streams: config.max_negotiating_inbound_streams, + task_event_buffer_size: config.task_event_buffer_size, executor, pending_connection_events_tx, pending_connection_events_rx, - established_connection_events_tx, - established_connection_events_rx, + established_connection_events: Default::default(), } } @@ -556,7 +553,7 @@ where // // Note that established connections are polled before pending connections, thus // prioritizing established connections over pending connections. - match self.established_connection_events_rx.poll_next_unpin(cx) { + match self.established_connection_events.poll_next_unpin(cx) { Poll::Pending => {} Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."), @@ -758,6 +755,8 @@ where let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size); + let (event_sender, event_receiver) = mpsc::channel(self.task_event_buffer_size); + conns.insert( id, EstablishedConnection { @@ -765,20 +764,21 @@ where sender: command_sender, }, ); - + self.established_connection_events.push(event_receiver); let connection = Connection::new( muxer, handler.into_handler(&obtained_peer_id, &endpoint), self.substream_upgrade_protocol_override, self.max_negotiating_inbound_streams, ); + self.spawn( task::new_for_established_connection( id, obtained_peer_id, connection, command_receiver, - self.established_connection_events_tx.clone(), + event_sender, ) .boxed(), ); From d610b852036470cc70597978717c005d54d10e42 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 2 Dec 2022 14:00:16 +1100 Subject: [PATCH 3/8] Properly handle empty stream of established connections --- swarm/src/connection/pool.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index b8411a583ca..cb3ac9e7720 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -41,6 +41,7 @@ use futures::{ }; use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint}; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; +use std::task::Waker; use std::{ collections::{hash_map, HashMap}, convert::TryFrom as _, @@ -132,6 +133,9 @@ where /// Receiver for events reported from pending tasks. pending_connection_events_rx: mpsc::Receiver>, + /// Waker in case we haven't established any connections yet. + no_established_connections_waker: Option, + /// Receivers for events reported from established connections. established_connection_events: SelectAll>>, @@ -332,6 +336,7 @@ where executor, pending_connection_events_tx, pending_connection_events_rx, + no_established_connections_waker: None, established_connection_events: Default::default(), } } @@ -555,7 +560,9 @@ where // prioritizing established connections over pending connections. match self.established_connection_events.poll_next_unpin(cx) { Poll::Pending => {} - Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."), + Poll::Ready(None) => { + self.no_established_connections_waker = Some(cx.waker().clone()); + } Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => { return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event }); @@ -765,6 +772,10 @@ where }, ); self.established_connection_events.push(event_receiver); + if let Some(waker) = self.no_established_connections_waker.take() { + waker.wake(); + } + let connection = Connection::new( muxer, handler.into_handler(&obtained_peer_id, &endpoint), From 1b6715206732b1695f353c64e0f269aebf0aab7b Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 9 Dec 2022 15:10:44 +1100 Subject: [PATCH 4/8] Document semantic change of variable --- swarm/CHANGELOG.md | 6 ++++++ swarm/Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 28504e85bde..a4293cdb756 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.41.2 + +- `SwarmBuilder::connection_event_buffer_size` now represents the size of a buffer _per connection_. See [PR 3188]. + +[PR 3188]: https://github.com/libp2p/rust-libp2p/pull/3188 + # 0.41.1 - Update to `libp2p-swarm-derive` `v0.31.0`. diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index bda9141e813..292e88ba4cb 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-swarm" edition = "2021" rust-version = "1.62.0" description = "The libp2p swarm" -version = "0.41.1" +version = "0.41.2" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From 39d02f4a427b56d425823a1d340c0d9b9066c7de Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 19 Dec 2022 16:28:32 +1100 Subject: [PATCH 5/8] Update docs --- swarm/src/lib.rs | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index cf6051e1e85..b53d0c72a4f 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1515,25 +1515,13 @@ where self } - /// Configures the number of extra events from the [`ConnectionHandler`] in - /// destination to the [`NetworkBehaviour`] that can be buffered before - /// the [`ConnectionHandler`] has to go to sleep. - /// - /// There exists a buffer of events received from [`ConnectionHandler`]s - /// that the [`NetworkBehaviour`] has yet to process. This buffer is - /// shared between all instances of [`ConnectionHandler`]. Each instance of - /// [`ConnectionHandler`] is guaranteed one slot in this buffer, meaning - /// that delivering an event for the first time is guaranteed to be - /// instantaneous. Any extra event delivery, however, must wait for that - /// first event to be delivered or for an "extra slot" to be available. + /// Configures the size of the buffer for events sent by a [`ConnectionHandler`] to the + /// [`NetworkBehaviour`]. /// - /// This option configures the number of such "extra slots" in this - /// shared buffer. These extra slots are assigned in a first-come, - /// first-served basis. + /// Each connection has its own buffer. /// - /// The ideal value depends on the executor used, the CPU speed, the - /// average number of connections, and the volume of events. If this value - /// is too low, then the [`ConnectionHandler`]s will be sleeping more often + /// The ideal value depends on the executor used, the CPU speed and the volume of events. + /// If this value is too low, then the [`ConnectionHandler`]s will be sleeping more often /// than necessary. Increasing this value increases the overall memory /// usage, and more importantly the latency between the moment when an /// event is emitted and the moment when it is received by the From f71b87a55bf041c9ce7bc91864298583494e20e3 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 17 Jan 2023 16:35:35 +1100 Subject: [PATCH 6/8] Remove bad changelog entry --- swarm/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index c5d62583c48..2bad2e8c375 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -3,7 +3,7 @@ - Update to `libp2p-core` `v0.39.0`. - Removed deprecated Swarm constructors. For transition notes see [0.41.0](#0.41.0). See [PR 3170]. -- + - Deprecate functions on `PollParameters` in preparation for `PollParameters` to be removed entirely eventually. See [PR 3153]. - Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134]. From 5d49db8d80e6eda10dab2874999c38cc9439ddcb Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 17 Jan 2023 16:36:34 +1100 Subject: [PATCH 7/8] Rename internal variable --- swarm/src/connection/pool.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 36d066e69aa..bf347209669 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -120,7 +120,7 @@ where max_negotiating_inbound_streams: usize, /// How many [`task::EstablishedConnectionEvent`]s can be buffered before the connection is back-pressured. - task_event_buffer_size: usize, + per_connection_event_buffer_size: usize, /// The executor to use for running connection tasks. Can either be a global executor /// or a local queue. @@ -332,7 +332,7 @@ where dial_concurrency_factor: config.dial_concurrency_factor, substream_upgrade_protocol_override: config.substream_upgrade_protocol_override, max_negotiating_inbound_streams: config.max_negotiating_inbound_streams, - task_event_buffer_size: config.per_connection_event_buffer_size, + per_connection_event_buffer_size: config.per_connection_event_buffer_size, executor, pending_connection_events_tx, pending_connection_events_rx, @@ -754,7 +754,7 @@ where let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size); - let (event_sender, event_receiver) = mpsc::channel(self.task_event_buffer_size); + let (event_sender, event_receiver) = mpsc::channel(self.per_connection_event_buffer_size); conns.insert( id, From e9fc37c1207d3ebef6b9d896b5dddb23c678d796 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 17 Jan 2023 16:38:55 +1100 Subject: [PATCH 8/8] Fix formatting --- swarm/src/connection/pool.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index bf347209669..9c1eed7aae1 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -754,7 +754,8 @@ where let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size); - let (event_sender, event_receiver) = mpsc::channel(self.per_connection_event_buffer_size); + let (event_sender, event_receiver) = + mpsc::channel(self.per_connection_event_buffer_size); conns.insert( id,