diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 25969d67052..2bad2e8c375 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -3,6 +3,7 @@ - Update to `libp2p-core` `v0.39.0`. - Removed deprecated Swarm constructors. For transition notes see [0.41.0](#0.41.0). See [PR 3170]. + - Deprecate functions on `PollParameters` in preparation for `PollParameters` to be removed entirely eventually. See [PR 3153]. - Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134]. @@ -15,11 +16,18 @@ - Remove type parameter from `PendingOutboundConnectionError` and `PendingInboundConnectionError`. These two types are always used with `std::io::Error`. See [PR 3272]. +- Replace `SwarmBuilder::connection_event_buffer_size` with `SwarmBuilder::per_connection_event_buffer_size` . + The configured value now applies _per_ connection. + The default values remains 7. + If you have previously set `connection_event_buffer_size` you should re-evaluate what a good size for a _per connection_ buffer is. + See [PR 3188]. + [PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170 [PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134 [PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153 [PR 3264]: https://github.com/libp2p/rust-libp2p/pull/3264 [PR 3272]: https://github.com/libp2p/rust-libp2p/pull/3272 +[PR 3188]: https://github.com/libp2p/rust-libp2p/pull/3188 # 0.41.1 diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 80b464a033e..a23edadd640 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -32,6 +32,7 @@ use crate::{ use concurrent_dial::ConcurrentDial; use fnv::FnvHashMap; use futures::prelude::*; +use futures::stream::SelectAll; use futures::{ channel::{mpsc, oneshot}, future::{poll_fn, BoxFuture, Either}, @@ -41,6 +42,7 @@ use futures::{ use instant::Instant; use libp2p_core::connection::Endpoint; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; +use std::task::Waker; use std::{ collections::{hash_map, HashMap}, convert::TryFrom as _, @@ -117,6 +119,9 @@ where /// See [`Connection::max_negotiating_inbound_streams`]. max_negotiating_inbound_streams: usize, + /// How many [`task::EstablishedConnectionEvent`]s can be buffered before the connection is back-pressured. + per_connection_event_buffer_size: usize, + /// The executor to use for running connection tasks. Can either be a global executor /// or a local queue. executor: ExecSwitch, @@ -128,14 +133,12 @@ where /// Receiver for events reported from pending tasks. pending_connection_events_rx: mpsc::Receiver, - /// Sender distributed to established tasks for reporting events back - /// to the pool. - established_connection_events_tx: - mpsc::Sender>, + /// Waker in case we haven't established any connections yet. + no_established_connections_waker: Option, - /// Receiver for events reported from established tasks. - established_connection_events_rx: - mpsc::Receiver>, + /// Receivers for events reported from established connections. + established_connection_events: + SelectAll>>, } #[derive(Debug)] @@ -315,8 +318,6 @@ where /// Creates a new empty `Pool`. pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self { let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0); - let (established_connection_events_tx, established_connection_events_rx) = - mpsc::channel(config.task_event_buffer_size); let executor = match config.executor { Some(exec) => ExecSwitch::Executor(exec), None => ExecSwitch::LocalSpawn(Default::default()), @@ -331,11 +332,12 @@ where dial_concurrency_factor: config.dial_concurrency_factor, substream_upgrade_protocol_override: config.substream_upgrade_protocol_override, max_negotiating_inbound_streams: config.max_negotiating_inbound_streams, + per_connection_event_buffer_size: config.per_connection_event_buffer_size, executor, pending_connection_events_tx, pending_connection_events_rx, - established_connection_events_tx, - established_connection_events_rx, + no_established_connections_waker: None, + established_connection_events: Default::default(), } } @@ -547,9 +549,11 @@ where // // Note that established connections are polled before pending connections, thus // prioritizing established connections over pending connections. - match self.established_connection_events_rx.poll_next_unpin(cx) { + match self.established_connection_events.poll_next_unpin(cx) { Poll::Pending => {} - Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."), + Poll::Ready(None) => { + self.no_established_connections_waker = Some(cx.waker().clone()); + } Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => { return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event }); @@ -750,6 +754,9 @@ where let (command_sender, command_receiver) = mpsc::channel(self.task_command_buffer_size); + let (event_sender, event_receiver) = + mpsc::channel(self.per_connection_event_buffer_size); + conns.insert( id, EstablishedConnection { @@ -757,6 +764,10 @@ where sender: command_sender, }, ); + self.established_connection_events.push(event_receiver); + if let Some(waker) = self.no_established_connections_waker.take() { + waker.wake(); + } let connection = Connection::new( muxer, @@ -764,13 +775,14 @@ where self.substream_upgrade_protocol_override, self.max_negotiating_inbound_streams, ); + self.spawn( task::new_for_established_connection( id, obtained_peer_id, connection, command_receiver, - self.established_connection_events_tx.clone(), + event_sender, ) .boxed(), ); @@ -1069,7 +1081,7 @@ pub struct PoolConfig { /// Size of the pending connection task event buffer and the established connection task event /// buffer. - pub task_event_buffer_size: usize, + pub per_connection_event_buffer_size: usize, /// Number of addresses concurrently dialed for a single outbound connection attempt. pub dial_concurrency_factor: NonZeroU8, @@ -1088,7 +1100,7 @@ impl PoolConfig { Self { executor, task_command_buffer_size: 32, - task_event_buffer_size: 7, + per_connection_event_buffer_size: 7, dial_concurrency_factor: NonZeroU8::new(8).expect("8 > 0"), substream_upgrade_protocol_override: None, max_negotiating_inbound_streams: 128, @@ -1113,8 +1125,8 @@ impl PoolConfig { /// When the buffer is full, the background tasks of all connections will stall. /// In this way, the consumers of network events exert back-pressure on /// the network connection I/O. - pub fn with_connection_event_buffer_size(mut self, n: usize) -> Self { - self.task_event_buffer_size = n; + pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self { + self.per_connection_event_buffer_size = n; self } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 044e3dbfc37..0e2b0abc9ec 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1489,31 +1489,19 @@ where self } - /// Configures the number of extra events from the [`ConnectionHandler`] in - /// destination to the [`NetworkBehaviour`] that can be buffered before - /// the [`ConnectionHandler`] has to go to sleep. - /// - /// There exists a buffer of events received from [`ConnectionHandler`]s - /// that the [`NetworkBehaviour`] has yet to process. This buffer is - /// shared between all instances of [`ConnectionHandler`]. Each instance of - /// [`ConnectionHandler`] is guaranteed one slot in this buffer, meaning - /// that delivering an event for the first time is guaranteed to be - /// instantaneous. Any extra event delivery, however, must wait for that - /// first event to be delivered or for an "extra slot" to be available. + /// Configures the size of the buffer for events sent by a [`ConnectionHandler`] to the + /// [`NetworkBehaviour`]. /// - /// This option configures the number of such "extra slots" in this - /// shared buffer. These extra slots are assigned in a first-come, - /// first-served basis. + /// Each connection has its own buffer. /// - /// The ideal value depends on the executor used, the CPU speed, the - /// average number of connections, and the volume of events. If this value - /// is too low, then the [`ConnectionHandler`]s will be sleeping more often + /// The ideal value depends on the executor used, the CPU speed and the volume of events. + /// If this value is too low, then the [`ConnectionHandler`]s will be sleeping more often /// than necessary. Increasing this value increases the overall memory /// usage, and more importantly the latency between the moment when an /// event is emitted and the moment when it is received by the /// [`NetworkBehaviour`]. - pub fn connection_event_buffer_size(mut self, n: usize) -> Self { - self.pool_config = self.pool_config.with_connection_event_buffer_size(n); + pub fn per_connection_event_buffer_size(mut self, n: usize) -> Self { + self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n); self }