
refactor(swarm)!: don't share event buffer for established connections #3188

Merged · 11 commits · Jan 19, 2023
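In essence, the pool used to funnel events from every established connection through a single shared bounded channel, so back-pressure on that one buffer applied to all connections at once. With this change each established connection gets its own bounded channel and the pool merges the receiver halves with `futures::stream::SelectAll`, so a full buffer only slows down the connection that filled it. A minimal sketch of that pattern (illustrative only, not the actual swarm code; the `Event` type and buffer sizes are made up):

```rust
// Minimal sketch of the new pattern, not the actual swarm code: every
// established connection gets its own bounded event channel, and the pool
// merges the receiver halves with `SelectAll`.
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::stream::{SelectAll, StreamExt};

#[derive(Debug)]
struct Event(&'static str);

fn main() {
    block_on(async {
        let mut merged: SelectAll<mpsc::Receiver<Event>> = SelectAll::new();

        // One channel per connection instead of one channel shared by all of them.
        let (mut tx_a, rx_a) = mpsc::channel(8);
        let (mut tx_b, rx_b) = mpsc::channel(8);
        merged.push(rx_a);
        merged.push(rx_b);

        tx_a.try_send(Event("from connection A")).unwrap();
        tx_b.try_send(Event("from connection B")).unwrap();
        drop(tx_a); // dropping a sender ends that connection's stream
        drop(tx_b);

        // The pool polls a single merged stream; a full buffer on one
        // connection only back-pressures that connection's task.
        while let Some(event) = merged.next().await {
            println!("{event:?}");
        }
    });
}
```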
42 changes: 26 additions & 16 deletions swarm/src/connection/pool.rs
@@ -32,6 +32,7 @@ use crate::{
use concurrent_dial::ConcurrentDial;
use fnv::FnvHashMap;
use futures::prelude::*;
use futures::stream::SelectAll;
use futures::{
channel::{mpsc, oneshot},
future::{poll_fn, BoxFuture, Either},
@@ -40,6 +41,7 @@ use futures::{
};
use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint};
use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
use std::task::Waker;
use std::{
collections::{hash_map, HashMap},
convert::TryFrom as _,
@@ -117,6 +119,9 @@ where
/// See [`Connection::max_negotiating_inbound_streams`].
max_negotiating_inbound_streams: usize,

/// How many [`task::EstablishedConnectionEvent`]s can be buffered before the connection is back-pressured.
task_event_buffer_size: usize,

/// The executor to use for running connection tasks. Can either be a global executor
/// or a local queue.
executor: ExecSwitch,
@@ -128,14 +133,12 @@ where
/// Receiver for events reported from pending tasks.
pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent<TTrans>>,

/// Sender distributed to established tasks for reporting events back
/// to the pool.
established_connection_events_tx:
mpsc::Sender<task::EstablishedConnectionEvent<THandler::Handler>>,
/// Waker in case we haven't established any connections yet.
no_established_connections_waker: Option<Waker>,

/// Receiver for events reported from established tasks.
established_connection_events_rx:
mpsc::Receiver<task::EstablishedConnectionEvent<THandler::Handler>>,
/// Receivers for events reported from established connections.
established_connection_events:
SelectAll<mpsc::Receiver<task::EstablishedConnectionEvent<THandler::Handler>>>,
}

#[derive(Debug)]
@@ -314,10 +317,7 @@ where
{
/// Creates a new empty `Pool`.
pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self {
let (pending_connection_events_tx, pending_connection_events_rx) =
mpsc::channel(config.task_event_buffer_size);
let (established_connection_events_tx, established_connection_events_rx) =
mpsc::channel(config.task_event_buffer_size);
let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0);
let executor = match config.executor {
Some(exec) => ExecSwitch::Executor(exec),
None => ExecSwitch::LocalSpawn(Default::default()),
@@ -332,11 +332,12 @@
dial_concurrency_factor: config.dial_concurrency_factor,
substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
max_negotiating_inbound_streams: config.max_negotiating_inbound_streams,
task_event_buffer_size: config.task_event_buffer_size,
executor,
pending_connection_events_tx,
pending_connection_events_rx,
established_connection_events_tx,
established_connection_events_rx,
no_established_connections_waker: None,
established_connection_events: Default::default(),
}
}
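A detail worth noting in the constructor above: the pending-connection channel is now created with `mpsc::channel(0)`. In the `futures` crate a bounded channel's capacity is the buffer size plus one guaranteed slot per sender, so a zero buffer still lets every pending-connection task have one event in flight, which is presumably sufficient since such a task only reports a single outcome. A standalone illustration of those semantics (not swarm code):

```rust
// Standalone illustration of `futures::channel::mpsc::channel(0)` semantics:
// capacity is `buffer + number of senders`, so a zero buffer still guarantees
// one in-flight message per sender.
use futures::channel::mpsc;

fn main() {
    let (mut tx, mut rx) = mpsc::channel::<u32>(0);

    assert!(tx.try_send(1).is_ok()); // uses the sender's guaranteed slot
    assert!(tx.try_send(2).is_err()); // buffer is 0 and the slot is occupied

    assert_eq!(rx.try_next().unwrap(), Some(1));
    assert!(tx.try_send(2).is_ok()); // slot is free again once the receiver drained it
}
```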

@@ -557,9 +558,11 @@ where
//
// Note that established connections are polled before pending connections, thus
// prioritizing established connections over pending connections.
match self.established_connection_events_rx.poll_next_unpin(cx) {
match self.established_connection_events.poll_next_unpin(cx) {
Poll::Pending => {}
Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."),
Poll::Ready(None) => {
self.no_established_connections_waker = Some(cx.waker().clone());
}

Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => {
return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event });
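The `Poll::Ready(None)` arm changes meaning here: a `SelectAll` that currently holds no streams yields `Ready(None)`, which could never happen with the old shared channel because the pool itself kept a sender alive (hence the previous `unreachable!`). Storing the waker lets the code path that pushes the first receiver wake this poll loop again. A condensed sketch of the pattern (simplified types, not the real `Pool`):

```rust
// Condensed sketch (not the real `Pool`): poll a `SelectAll` of per-connection
// receivers, and when it is empty, park by stashing the waker so that pushing
// the next receiver can wake the poll loop again.
use std::task::{Context, Poll, Waker};

use futures::channel::mpsc;
use futures::stream::{SelectAll, StreamExt};

struct EstablishedEvents<T> {
    merged: SelectAll<mpsc::Receiver<T>>,
    empty_waker: Option<Waker>,
}

impl<T> EstablishedEvents<T> {
    fn add_connection(&mut self, receiver: mpsc::Receiver<T>) {
        self.merged.push(receiver);
        // Re-poll now that there is something to poll.
        if let Some(waker) = self.empty_waker.take() {
            waker.wake();
        }
    }

    fn poll_event(&mut self, cx: &mut Context<'_>) -> Poll<T> {
        match self.merged.poll_next_unpin(cx) {
            Poll::Ready(Some(event)) => Poll::Ready(event),
            // `SelectAll` is empty: remember the waker instead of treating
            // this as unreachable, then report `Pending`.
            Poll::Ready(None) => {
                self.empty_waker = Some(cx.waker().clone());
                Poll::Pending
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
```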
@@ -759,27 +762,34 @@ where

let (command_sender, command_receiver) =
mpsc::channel(self.task_command_buffer_size);
let (event_sender, event_receiver) = mpsc::channel(self.task_event_buffer_size);

conns.insert(
id,
EstablishedConnection {
endpoint: endpoint.clone(),
sender: command_sender,
},
);
self.established_connection_events.push(event_receiver);
if let Some(waker) = self.no_established_connections_waker.take() {
waker.wake();
}

let connection = Connection::new(
muxer,
handler.into_handler(&obtained_peer_id, &endpoint),
self.substream_upgrade_protocol_override,
self.max_negotiating_inbound_streams,
);

self.spawn(
task::new_for_established_connection(
id,
obtained_peer_id,
connection,
command_receiver,
self.established_connection_events_tx.clone(),
event_sender,
)
.boxed(),
);
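On the task side, the only difference is which sender the event loop holds: the `event_sender` created above with `task_event_buffer_size`, rather than a clone of a pool-wide sender. A rough sketch of what that buys, with illustrative names that are not the real `task` module:

```rust
// Rough sketch of a connection task under the per-connection scheme; the names
// here are illustrative and not the real `task` module.
use futures::channel::mpsc;
use futures::SinkExt;

async fn connection_task(mut events: mpsc::Sender<String>) {
    for i in 0..100 {
        // If this connection's own buffer is full, `send` waits here until the
        // pool drains it; other connections keep making progress independently.
        if events.send(format!("event {i}")).await.is_err() {
            return; // the pool dropped the receiver, so shut down
        }
    }
}
```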