Skip to content

Commit

Permalink
fix(bootstrap): fix stalled connections w/multiple bootstrap contacts
Browse files Browse the repository at this point in the history
When multiple bootstrap contacts are used, only the first successful
connection is kept and the rest are disregarded. This leaves
unfinished connection attempts lingering in the connection
de-duplicator and connection pool. This fix introduces
Endpoint::new_connection, which creates a fresh connection
without going through connection de-duplication. The bootstrap
function uses it and then saves only the successful connection in the pool.
  • Loading branch information
lionel-faber authored and S-Coyle committed Apr 7, 2021
1 parent ddb9cd0 commit cd02b6a
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 15 deletions.
11 changes: 7 additions & 4 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use super::{
error::{Error, Result},
peer_config::{self, DEFAULT_IDLE_TIMEOUT_MSEC, DEFAULT_KEEP_ALIVE_INTERVAL_MSEC},
};
use futures::{future, TryFutureExt};
use futures::future;
use log::{debug, error, info, trace};
use std::net::{SocketAddr, UdpSocket};
use std::path::PathBuf;
Expand Down Expand Up @@ -179,20 +179,23 @@ impl QuicP2p {
return Err(Error::EmptyBootstrapNodesList);
}

// Attempt to connect to all nodes and return the first one to succeed
// Attempt to create a new connection to all nodes and return the first one to succeed
let tasks = endpoint
.bootstrap_nodes()
.iter()
.map(|addr| Box::pin(endpoint.connect_to(addr).map_ok(move |()| *addr)));
.map(|addr| Box::pin(endpoint.new_connection(addr)));

let bootstrapped_peer = future::select_ok(tasks)
let successful_connection = future::select_ok(tasks)
.await
.map_err(|err| {
error!("Failed to bootstrap to the network: {}", err);
Error::BootstrapFailure
})?
.0;

let bootstrapped_peer = successful_connection.connection.remote_address();
endpoint.add_new_connection(successful_connection);

Ok((
endpoint,
incoming_connections,
Expand Down
36 changes: 27 additions & 9 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,24 +371,42 @@ impl Endpoint {

trace!("Successfully connected to peer: {}", node_addr);

self.add_new_connection(new_conn);

self.connection_deduplicator
.complete(node_addr, Ok(()))
.await;

Ok(())
}

/// Opens a brand-new connection to `peer_addr`.
///
/// Neither the connection pool nor the connection deduplicator is consulted
/// or updated here; the caller decides what to do with the returned connection.
///
/// # Errors
///
/// Propagates the error if starting the connection attempt fails, or if the
/// attempt itself does not complete successfully.
pub(crate) async fn new_connection(
    &self,
    peer_addr: &SocketAddr,
) -> Result<quinn::NewConnection> {
    // Kick off the connection attempt; this can fail before anything is awaited.
    let connecting =
        self.quic_endpoint
            .connect_with(self.client_cfg.clone(), peer_addr, CERT_SERVER_NAME)?;

    // Drive the attempt to completion.
    let established = connecting.await?;

    trace!("Successfully created new connection to peer: {}", peer_addr);
    Ok(established)
}

pub(crate) fn add_new_connection(&self, conn: quinn::NewConnection) {
let guard = self
.connection_pool
.insert(*node_addr, new_conn.connection.clone());
.insert(conn.connection.remote_address(), conn.connection);

listen_for_incoming_messages(
new_conn.uni_streams,
new_conn.bi_streams,
conn.uni_streams,
conn.bi_streams,
guard,
self.message_tx.clone(),
self.disconnection_tx.clone(),
self.clone(),
);

self.connection_deduplicator
.complete(node_addr, Ok(()))
.await;

Ok(())
}

/// Get an existing connection for the peer address.
Expand Down
31 changes: 29 additions & 2 deletions src/tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::{new_qp2p, random_msg};
use super::{new_qp2p, new_qp2p_with_hcc, random_msg};
use crate::utils;
use anyhow::{anyhow, Result};
use futures::future;
use std::time::Duration;
use std::{collections::HashSet, time::Duration};
use tokio::time::timeout;

#[tokio::test]
Expand Down Expand Up @@ -417,3 +417,30 @@ async fn many_messages() -> Result<()> {
let _ = future::try_join_all(tasks).await?;
Ok(())
}

// Bootstrapping with several contacts keeps only the first connection that
// succeeds. This test checks that connecting to each of the remaining
// contacts afterwards still works.
#[tokio::test]
async fn connection_attempts_to_bootstrap_contacts_should_succeed() -> Result<()> {
    let qp2p = new_qp2p()?;

    // Three independent endpoints that will act as the bootstrap contacts.
    let (ep1, _, _, _) = qp2p.new_endpoint().await?;
    let (ep2, _, _, _) = qp2p.new_endpoint().await?;
    let (ep3, _, _, _) = qp2p.new_endpoint().await?;

    let contacts: HashSet<_> = [ep1.socket_addr(), ep2.socket_addr(), ep3.socket_addr()]
        .iter()
        .copied()
        .collect();

    let qp2p = new_qp2p_with_hcc(contacts.clone())?;
    let (ep, _, _, _, bootstrapped_peer) = qp2p.bootstrap().await?;

    // Every contact other than the one we bootstrapped to must still be reachable.
    let remaining = contacts
        .into_iter()
        .filter(|peer| *peer != bootstrapped_peer);
    for peer in remaining {
        ep.connect_to(&peer).await?;
    }

    Ok(())
}

0 comments on commit cd02b6a

Please sign in to comment.