Skip to content

Commit

Permalink
feat: use backoff w/ jitter for retries
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuef committed Jul 28, 2021
1 parent fef624d commit fec5d07
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 50 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ structopt = "~0.3.15"
thiserror = "1.0.23"
webpki = "~0.21.3"
tracing = "~0.1.26"
backoff = {version="0.3.0", features = ["tokio"] }

[dependencies.bytes]
version = "1.0.1"
Expand Down
3 changes: 0 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ pub struct Config {
/// Duration of a UPnP port mapping.
#[structopt(long)]
pub upnp_lease_duration: Option<u32>,
/// Duration to wait before retrying to resend / reconnect
#[structopt(long, default_value = "500")]
pub retry_interval: u64,
}

/// To be used to read and write our certificate and private key to disk esp. as a part of our
Expand Down
72 changes: 25 additions & 47 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use super::{
use bytes::Bytes;
use std::net::SocketAddr;

use backoff::{future::retry, ExponentialBackoff};
use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio::sync::mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender};
use tokio::time::{sleep, timeout, Duration};
use tokio::time::{timeout, Duration};
use tracing::{debug, error, info, trace, warn};

/// Host name of the Quic communication certificate used by peers
Expand All @@ -43,9 +44,6 @@ const ECHO_SERVICE_QUERY_TIMEOUT: u64 = 30;
/// Standard size of our channel bounds
const STANDARD_CHANNEL_SIZE: usize = 10000;

/// Max number of attempts for connection retries
const MAX_ATTEMPTS: usize = 5;

/// Channel on which incoming messages can be listened to
pub struct IncomingMessages(pub(crate) MpscReceiver<(SocketAddr, Bytes)>);

Expand Down Expand Up @@ -351,18 +349,13 @@ impl Endpoint {
Some(Err(error)) => return Err(error.into()),
None => {}
}
let final_conn = retry(ExponentialBackoff::default(), || async {
Ok(self.attempt_connection(node_addr).await?)
})
.await?;

let mut attempts: usize = 0;

let mut connecting = self.attempt_connection(node_addr).await;

while connecting.is_err() && attempts < MAX_ATTEMPTS {
sleep(Duration::from_millis(self.qp2p_config.retry_interval)).await;
connecting = self.attempt_connection(node_addr).await;
attempts += 1;
}

let final_conn = connecting?;
// self.attempt_connection(node_addr)try_with_backoff( || self.attempt_connection(node_addr)).await?;
// let final_conn = try_with_backoff( || self.attempt_connection(node_addr)).await?;

trace!("Successfully connected to peer: {}", node_addr);

Expand Down Expand Up @@ -443,30 +436,10 @@ impl Endpoint {
&self,
peer_addr: &SocketAddr,
) -> Result<quinn::NewConnection> {
let mut attempts = 0;

let mut new_connection = self
.quic_endpoint
.connect_with(self.client_cfg.clone(), peer_addr, CERT_SERVER_NAME)?
.await;

while new_connection.is_err() && attempts < MAX_ATTEMPTS {
attempts += 1;

new_connection = match self
.quic_endpoint
.connect_with(self.client_cfg.clone(), peer_addr, CERT_SERVER_NAME)?
.await
{
Ok(new_connection) => Ok(new_connection),
Err(error) => {
debug!("create new connection check error: {:?}", error);
Err(error)
}
};
}

let new_connection = new_connection?;
let new_connection = retry(ExponentialBackoff::default(), || async {
Ok(self.attempt_connection(peer_addr).await?)
})
.await?;

trace!("Successfully created new connection to peer: {}", peer_addr);
Ok(new_connection)
Expand Down Expand Up @@ -531,15 +504,20 @@ impl Endpoint {
}
self.connect_to(dest).await?;

let mut attempts: usize = 0;
let mut res = self.try_send_message(msg.clone(), dest).await;
let res = retry(ExponentialBackoff::default(), || async {
Ok(self.try_send_message(msg.clone(), dest).await?)
})
.await;

// let mut attempts: usize = 0;
// let mut res = self.try_send_message(msg.clone(), dest).await;

while attempts < MAX_ATTEMPTS && res.is_err() {
trace!("send attempt # {:?}", attempts);
attempts += 1;
sleep(Duration::from_millis(self.qp2p_config.retry_interval)).await;
res = self.try_send_message(msg.clone(), dest).await;
}
// while attempts < MAX_ATTEMPTS as usize && res.is_err() {
// trace!("send attempt # {:?}", attempts);
// attempts += 1;
// sleep(Duration::from_millis(self.qp2p_config.retry_interval)).await;
// res = self.try_send_message(msg.clone(), dest).await;
// }

res
}
Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
/// Could not connect even after retries
#[error("Connection could not be established after {0} attempts")]
ConnectionFailed(u32),
/// quinn connection closed
#[error("Connection was closed by underlying quinn library")]
QuinnConnectionClosed,
Expand Down

0 comments on commit fec5d07

Please sign in to comment.