Skip to content

Commit

Permalink
feat: retry on connect, retry on send
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuef committed Jul 15, 2021
1 parent 4f5cab4 commit 94bfb8b
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 118,181 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
Cargo.lock
*.*~
.idea
*.log
118,072 changes: 0 additions & 118,072 deletions logfile.log

This file was deleted.

4 changes: 2 additions & 2 deletions src/connection_deduplicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ impl ConnectionDeduplicator {
pub(crate) enum Error {
#[error("Connect error")]
Connect(#[from] quinn::ConnectError),
#[error("Connection error")]
#[error("Quinn Connection error during deduplication")]
Connection(#[from] quinn::ConnectionError),
}

impl From<Error> for crate::error::Error {
fn from(src: Error) -> Self {
match src {
Error::Connect(source) => Self::Connect(source),
Error::Connection(source) => Self::Connection(source),
Error::Connection(_) => Self::QuinnConnectionClosed,
}
}
}
81 changes: 37 additions & 44 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures::{future, stream::StreamExt};
use std::net::SocketAddr;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{timeout, Duration};
use tracing::{debug, error, trace, warn};
use tracing::{trace, warn};

/// Connection instance to a node which can be used to send messages to it
#[derive(Clone)]
Expand All @@ -28,15 +28,13 @@ pub(crate) struct Connection {
remover: ConnectionRemover,
}

pub type DisconnectSender = Sender<(SocketAddr, Result<()>)>;
/// Disconnection events: the address of each peer that has been disconnected.
pub struct DisconnectionEvents(pub Receiver<(SocketAddr, Result<()>)>);
pub struct DisconnectionEvents(pub Receiver<SocketAddr>);

/// Disconnection
impl DisconnectionEvents {
/// Blocks until there is a disconnection event and returns the address of the disconnected peer
pub async fn next(&mut self) -> Option<(SocketAddr, Result<()>)> {
pub async fn next(&mut self) -> Option<SocketAddr> {
self.0.recv().await
}
}
Expand All @@ -54,9 +52,11 @@ impl Connection {
/// Send message to peer using a uni-directional stream.
pub async fn send_uni(&self, msg: Bytes) -> Result<()> {
let mut send_stream = self.handle_error(self.quic_conn.open_uni().await).await?;
self.handle_error(send_msg(&mut send_stream, msg).await)
self.handle_error(send_msg(&mut send_stream, msg.clone()).await)
.await?;

trace!("Message sent");

// We try to make sure the stream is gracefully closed and the bytes get sent,
// but if it was already closed (perhaps by the peer) then we
// don't remove the connection from the pool.
Expand Down Expand Up @@ -154,7 +154,7 @@ pub(super) fn listen_for_incoming_connections(
connection_pool: ConnectionPool,
message_tx: Sender<(SocketAddr, Bytes)>,
connection_tx: Sender<SocketAddr>,
disconnection_tx: DisconnectSender,
disconnection_tx: Sender<SocketAddr>,
endpoint: Endpoint,
) {
let _ = tokio::spawn(async move {
Expand All @@ -180,10 +180,7 @@ pub(super) fn listen_for_incoming_connections(
);
}
Err(err) => {
error!(
"An incoming connection failed because of an error: {:?}",
err
);
warn!("An incoming connection failed because of: {:?}", err);
}
},
None => {
Expand All @@ -201,7 +198,7 @@ pub(super) fn listen_for_incoming_messages(
mut bi_streams: quinn::IncomingBiStreams,
remover: ConnectionRemover,
message_tx: Sender<(SocketAddr, Bytes)>,
disconnection_tx: DisconnectSender,
disconnection_tx: Sender<SocketAddr>,
endpoint: Endpoint,
) {
let src = *remover.remote_addr();
Expand All @@ -213,10 +210,11 @@ pub(super) fn listen_for_incoming_messages(
.await
{
Ok(_) => {
let _ = disconnection_tx.send((src, Ok(()))).await;
let _ = disconnection_tx.send(src).await;
}
Err(error) => {
let _ = disconnection_tx.send((src, Err(error))).await;
trace!("Issue on stream reading from: {:?} :: {:?}", src, error);
let _ = disconnection_tx.send(src).await;
}
}

Expand All @@ -233,34 +231,35 @@ async fn read_on_uni_streams(
) -> Result<()> {
while let Some(result) = uni_streams.next().await {
match result {
Err(quinn::ConnectionError::ApplicationClosed(frame)) => {
trace!("Connection terminated by peer {:?}.", peer_addr);
return Err(Error::from(quinn::ConnectionError::ApplicationClosed(
frame,
)));
Err(quinn::ConnectionError::ConnectionClosed(_))
| Err(quinn::ConnectionError::ApplicationClosed(_)) => {
trace!("Connection closed by peer {:?}.", peer_addr);
return Err(Error::QuinnConnectionClosed);
}
Err(err) => {
warn!(
"Failed to read incoming message on uni-stream for peer {:?} with error: {:?}",
"Failed to read incoming message on uni-stream for peer {:?} with: {:?}",
peer_addr, err
);
return Err(Error::from(err));
}
Ok(mut recv) => loop {
match read_bytes(&mut recv).await {
Ok(WireMsg::UserMsg(bytes)) => {
trace!("bytes received fine from: {:?} ", peer_addr);
let _ = message_tx.send((peer_addr, bytes)).await;
}
Ok(msg) => error!("Unexpected message type: {:?}", msg),
Ok(msg) => warn!("Unexpected message type: {:?}", msg),
Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly)) => {
return Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly))
warn!("Stream read finished early");
break;
}
Err(err) => {
error!(
"Failed reading from a uni-stream for peer {:?} with error: {:?}",
warn!(
"Failed reading from a uni-stream for peer {:?} with: {:?}",
peer_addr, err
);
return Err(err);
break;
}
}
},
Expand All @@ -278,21 +277,14 @@ async fn read_on_bi_streams(
) -> Result<()> {
while let Some(result) = bi_streams.next().await {
match result {
Err(quinn::ConnectionError::ConnectionClosed(frame)) => {
Err(quinn::ConnectionError::ConnectionClosed(_))
| Err(quinn::ConnectionError::ApplicationClosed(_)) => {
trace!("Connection closed by peer {:?}.", peer_addr);
return Err(Error::from(quinn::ConnectionError::ConnectionClosed(
frame,
)));
}
Err(quinn::ConnectionError::ApplicationClosed(frame)) => {
trace!("Connection terminated by peer {:?}.", peer_addr);
return Err(Error::from(quinn::ConnectionError::ApplicationClosed(
frame,
)));
return Err(Error::QuinnConnectionClosed);
}
Err(err) => {
warn!(
"Failed to read incoming message on bi-stream for peer {:?} with error: {:?}",
"Failed to read incoming message on bi-stream for peer {:?} with: {:?}",
peer_addr, err
);
return Err(Error::from(err));
Expand All @@ -304,8 +296,8 @@ async fn read_on_bi_streams(
}
Ok(WireMsg::EndpointEchoReq) => {
if let Err(error) = handle_endpoint_echo_req(peer_addr, &mut send).await {
error!(
"Failed to handle Echo Request for peer {:?} with error: {:?}",
warn!(
"Failed to handle Echo Request for peer {:?} with: {:?}",
peer_addr, error
);

Expand All @@ -321,26 +313,27 @@ async fn read_on_bi_streams(
)
.await
{
error!("Failed to handle Endpoint verification request for peer {:?} with error: {:?}", peer_addr, error);
warn!("Failed to handle Endpoint verification request for peer {:?} with: {:?}", peer_addr, error);

return Err(error);
}
}
Ok(msg) => {
error!(
warn!(
"Unexpected message type from peer {:?}: {:?}",
peer_addr, msg
);
}
Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly)) => {
return Err(Error::StreamRead(quinn::ReadExactError::FinishedEarly))
warn!("Stream finished early");
break;
}
Err(err) => {
error!(
"Failed reading from a bi-stream for peer {:?} with error: {:?}",
warn!(
"Failed reading from a bi-stream for peer {:?} with: {:?}",
peer_addr, err
);
return Err(err);
break;
}
}
},
Expand Down
105 changes: 72 additions & 33 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@ use super::{
connection_pool::ConnectionPool,
connections::{
listen_for_incoming_connections, listen_for_incoming_messages, Connection,
DisconnectSender, DisconnectionEvents, RecvStream, SendStream,
DisconnectionEvents, RecvStream, SendStream,
},
error::Result,
Config,
};
use bytes::Bytes;
use std::{net::SocketAddr, time::Duration};
use std::net::SocketAddr;

use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};
use tokio::time::timeout;
use tokio::time::{sleep, timeout, Duration};
use tracing::{debug, error, info, trace, warn};

/// Host name of the Quic communication certificate used by peers
Expand All @@ -40,7 +41,10 @@ const PORT_FORWARD_TIMEOUT: u64 = 30;
const ECHO_SERVICE_QUERY_TIMEOUT: u64 = 30;

/// Standard size of our channel bounds
const STANDARD_CHANNEL_SIZE: usize = 1000;
const STANDARD_CHANNEL_SIZE: usize = 10000;

/// Max number of retries made after a failed first try, used both when connecting to a peer
/// and when sending a message (so an operation is tried at most `MAX_ATTEMPTS + 1` times).
const MAX_ATTEMPTS: usize = 10;

/// Channel on which incoming messages can be listened to.
///
/// Wraps the receiving half of a tokio mpsc channel whose items are `(SocketAddr, Bytes)`
/// pairs — presumably the sending peer's address and the message payload (confirm against
/// the code that feeds the matching sender).
pub struct IncomingMessages(pub(crate) MpscReceiver<(SocketAddr, Bytes)>);
Expand Down Expand Up @@ -71,7 +75,8 @@ pub struct Endpoint {
public_addr: Option<SocketAddr>,
quic_endpoint: quinn::Endpoint,
message_tx: MpscSender<(SocketAddr, Bytes)>,
disconnection_tx: DisconnectSender,
disconnection_tx: MpscSender<SocketAddr>,

client_cfg: quinn::ClientConfig,
bootstrap_nodes: Vec<SocketAddr>,
qp2p_config: Config,
Expand Down Expand Up @@ -347,34 +352,21 @@ impl Endpoint {
None => {}
}

// This is the first attempt - proceed with establishing the connection now.
let connecting = match self.quic_endpoint.connect_with(
self.client_cfg.clone(),
node_addr,
CERT_SERVER_NAME,
) {
Ok(connecting) => connecting,
Err(error) => {
self.connection_deduplicator
.complete(node_addr, Err(error.clone().into()))
.await;
return Err(error.into());
}
};
let mut attempts: usize = 0;

let new_conn = match connecting.await {
Ok(new_conn) => new_conn,
Err(error) => {
self.connection_deduplicator
.complete(node_addr, Err(error.clone().into()))
.await;
return Err(error.into());
}
};
let mut connecting = self.attempt_connection(node_addr).await;

while connecting.is_err() && attempts < MAX_ATTEMPTS {
sleep(Duration::from_millis(500)).await;
connecting = self.attempt_connection(node_addr).await;
attempts += 1;
}

let final_conn = connecting?;

trace!("Successfully connected to peer: {}", node_addr);

self.add_new_connection_to_pool(new_conn).await;
self.add_new_connection_to_pool(final_conn).await;

self.connection_deduplicator
.complete(node_addr, Ok(()))
Expand All @@ -383,12 +375,41 @@ impl Endpoint {
Ok(())
}

/// Make a single attempt to connect to `node_addr`.
///
/// Starts a QUIC connection to the peer with this endpoint's client config and waits for it
/// to be established. No retrying happens here; callers decide whether to try again.
///
/// Whenever the attempt fails, the connection deduplicator is told about the failure for
/// `node_addr` before the error is returned.
///
/// # Errors
///
/// - The converted `quinn::ConnectError` when the attempt could not even be started.
/// - `Error::QuinnConnectionClosed` when the connection failed while being established
///   (the underlying `quinn::ConnectionError` is only forwarded to the deduplicator).
async fn attempt_connection(&self, node_addr: &SocketAddr) -> Result<quinn::NewConnection> {
    trace!("Attempting connection to {:?}", node_addr);

    let connecting = match self.quic_endpoint.connect_with(
        self.client_cfg.clone(),
        node_addr,
        CERT_SERVER_NAME,
    ) {
        Ok(connecting) => connecting,
        Err(error) => {
            // Report the failure here too instead of bailing out with `?`: the
            // establishment-failure path below notifies the deduplicator, and skipping it
            // on this path would presumably leave anything waiting on this address
            // without an answer.
            self.connection_deduplicator
                .complete(node_addr, Err(error.clone().into()))
                .await;

            return Err(error.into());
        }
    };

    match connecting.await {
        Ok(new_conn) => Ok(new_conn),
        Err(error) => {
            // `error` is not needed afterwards, so it is moved rather than cloned.
            self.connection_deduplicator
                .complete(node_addr, Err(error.into()))
                .await;

            Err(Error::QuinnConnectionClosed)
        }
    }
}

/// Verify if an address is publicly reachable. This will attempt to create
/// a new connection and use it to exchange a message and verify that the node
/// can be reached.
pub async fn is_reachable(&self, peer_addr: &SocketAddr) -> Result<()> {
trace!("Checking is reachable");
let new_connection = self.create_new_connection(peer_addr).await?;
let (mut send_stream, mut recv_stream) = new_connection.connection.open_bi().await?;
let (mut send_stream, mut recv_stream) = match new_connection.connection.open_bi().await {
Ok(cool) => cool,
Err(error) => {
error!("Reachablity check errored with: {:?}", error);
return Err(Error::QuinnConnectionClosed);
}
};

let message = WireMsg::EndpointEchoReq;
message.write_to_stream(&mut send_stream).await?;

Expand Down Expand Up @@ -422,10 +443,17 @@ impl Endpoint {
&self,
peer_addr: &SocketAddr,
) -> Result<quinn::NewConnection> {
let new_connection = self
let new_connection = match self
.quic_endpoint
.connect_with(self.client_cfg.clone(), peer_addr, CERT_SERVER_NAME)?
.await?;
.await
{
Ok(new_connection) => new_connection,
Err(error) => {
debug!("create new connection check error: {:?}", error);
return Err(Error::QuinnConnectionClosed);
}
};

trace!("Successfully created new connection to peer: {}", peer_addr);
Ok(new_connection)
Expand Down Expand Up @@ -489,7 +517,18 @@ impl Endpoint {
return Ok(());
}
self.connect_to(dest).await?;
self.try_send_message(msg, dest).await

let mut attempts: usize = 0;
let mut res = self.try_send_message(msg.clone(), dest).await;

while attempts < MAX_ATTEMPTS && res.is_err() {
trace!("send attempt # {:?}", attempts);
attempts += 1;
sleep(Duration::from_millis(500)).await;
res = self.try_send_message(msg.clone(), dest).await;
}

res
}

/// Close all the connections of this endpoint immediately and stop accepting new connections.
Expand Down
Loading

0 comments on commit 94bfb8b

Please sign in to comment.