Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add connection-retry logic with exponential backoff #213

Merged
merged 2 commits into from
Mar 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/bin/oura/dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
mapper,
since: None,
intersect,
retry_policy: None,
}),
PeerMode::AsClient => DumpSource::N2C(N2CConfig {
address: AddressArg(bearer, socket),
Expand All @@ -127,6 +128,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
mapper,
since: None,
intersect,
retry_policy: None,
}),
};

Expand Down
2 changes: 2 additions & 0 deletions src/bin/oura/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
mapper,
since: None,
intersect,
retry_policy: None,
}),
PeerMode::AsClient => WatchSource::N2C(N2CConfig {
address: AddressArg(bearer, socket),
Expand All @@ -105,6 +106,7 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> {
mapper,
since: None,
intersect,
retry_policy: None,
}),
};

Expand Down
58 changes: 55 additions & 3 deletions src/sources/common.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
use core::fmt;
use std::{ops::Deref, str::FromStr};
use std::{net::TcpStream, ops::Deref, str::FromStr, time::Duration};

#[cfg(target_family = "unix")]
use std::os::unix::net::UnixStream;

use log::info;
use net2::TcpStreamExt;
use pallas::network::{
miniprotocols::{chainsync::TipFinder, run_agent, Point, MAINNET_MAGIC, TESTNET_MAGIC},
multiplexer::Channel,
multiplexer::{Channel, Multiplexer},
};
use serde::{de::Visitor, Deserializer};
use serde::{Deserialize, Serialize};

use crate::{
utils::{ChainWellKnownInfo, Utils},
utils::{retry, ChainWellKnownInfo, Utils},
Error,
};

Expand Down Expand Up @@ -141,6 +145,54 @@ where
deserializer.deserialize_any(MagicArgVisitor)
}

#[derive(Deserialize, Debug)]
pub struct RetryPolicy {
connection_max_retries: u32,
connection_max_backoff: u32,
}

pub fn setup_multiplexer_attempt(
bearer: &BearerKind,
address: &str,
protocols: &[u16],
) -> Result<Multiplexer, Error> {
match bearer {
BearerKind::Tcp => {
let tcp = TcpStream::connect(address)?;
tcp.set_nodelay(true)?;
tcp.set_keepalive_ms(Some(30_000u32))?;

Multiplexer::setup(tcp, protocols)
}
#[cfg(target_family = "unix")]
BearerKind::Unix => {
let unix = UnixStream::connect(address)?;

Multiplexer::setup(unix, protocols)
}
}
}

pub fn setup_multiplexer(
bearer: &BearerKind,
address: &str,
protocols: &[u16],
retry: &Option<RetryPolicy>,
) -> Result<Multiplexer, Error> {
match retry {
Some(policy) => retry::retry_operation(
|| setup_multiplexer_attempt(bearer, address, protocols),
&retry::Policy {
max_retries: policy.connection_max_retries,
backoff_unit: Duration::from_secs(1),
backoff_factor: 2,
max_backoff: Duration::from_secs(policy.connection_max_backoff as u64),
},
),
None => setup_multiplexer_attempt(bearer, address, protocols),
}
}

#[derive(Debug, Deserialize)]
#[serde(tag = "type", content = "value")]
pub enum IntersectArg {
Expand Down
40 changes: 12 additions & 28 deletions src/sources/n2c/setup.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
#[cfg(target_family = "unix")]
use std::os::unix::net::UnixStream;
use std::{net::TcpStream, ops::Deref};

use net2::TcpStreamExt;
use std::ops::Deref;

use log::info;

use pallas::network::{
miniprotocols::{handshake::n2c, run_agent, MAINNET_MAGIC},
multiplexer::{Channel, Multiplexer},
multiplexer::Channel,
};

use serde::Deserialize;
Expand All @@ -17,8 +13,8 @@ use crate::{
mapper::{Config as MapperConfig, EventWriter},
pipelining::{new_inter_stage_channel, PartialBootstrapResult, SourceProvider},
sources::{
common::{AddressArg, BearerKind, MagicArg, PointArg},
define_start_point, IntersectArg,
common::{AddressArg, MagicArg, PointArg},
define_start_point, setup_multiplexer, IntersectArg, RetryPolicy,
},
utils::{ChainWellKnownInfo, WithUtils},
Error,
Expand Down Expand Up @@ -53,6 +49,8 @@ pub struct Config {
/// will need some time to fill up the buffer before sending the 1st event.
#[serde(default)]
pub min_depth: usize,

pub retry_policy: Option<RetryPolicy>,
}

fn do_handshake(channel: &mut Channel, magic: u64) -> Result<(), Error> {
Expand All @@ -66,30 +64,16 @@ fn do_handshake(channel: &mut Channel, magic: u64) -> Result<(), Error> {
}
}

#[cfg(target_family = "unix")]
fn setup_unix_multiplexer(path: &str) -> Result<Multiplexer, Error> {
let unix = UnixStream::connect(path)?;

Multiplexer::setup(unix, &[0, 5, 7])
}

fn setup_tcp_multiplexer(address: &str) -> Result<Multiplexer, Error> {
let tcp = TcpStream::connect(address)?;
tcp.set_nodelay(true)?;
tcp.set_keepalive_ms(Some(30_000u32))?;

Multiplexer::setup(tcp, &[0, 5])
}

impl SourceProvider for WithUtils<Config> {
fn bootstrap(&self) -> PartialBootstrapResult {
let (output_tx, output_rx) = new_inter_stage_channel(None);

let mut muxer = match self.inner.address.0 {
BearerKind::Tcp => setup_tcp_multiplexer(&self.inner.address.1)?,
#[cfg(target_family = "unix")]
BearerKind::Unix => setup_unix_multiplexer(&self.inner.address.1)?,
};
let mut muxer = setup_multiplexer(
&self.inner.address.0,
&self.inner.address.1,
&[0, 5],
&self.inner.retry_policy,
)?;

let magic = match &self.inner.magic {
Some(m) => *m.deref(),
Expand Down
40 changes: 12 additions & 28 deletions src/sources/n2n/setup.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
#[cfg(target_family = "unix")]
use std::os::unix::net::UnixStream;
use std::{net::TcpStream, ops::Deref};

use net2::TcpStreamExt;
use std::ops::Deref;

use log::info;

use pallas::network::{
miniprotocols::{handshake::n2n, run_agent, MAINNET_MAGIC},
multiplexer::{Channel, Multiplexer},
multiplexer::Channel,
};

use serde::Deserialize;
Expand All @@ -17,8 +13,8 @@ use crate::{
mapper::{Config as MapperConfig, EventWriter},
pipelining::{new_inter_stage_channel, PartialBootstrapResult, SourceProvider},
sources::{
common::{AddressArg, BearerKind, MagicArg, PointArg},
define_start_point, IntersectArg,
common::{AddressArg, MagicArg, PointArg},
define_start_point, setup_multiplexer, IntersectArg, RetryPolicy,
},
utils::{ChainWellKnownInfo, WithUtils},
Error,
Expand Down Expand Up @@ -53,6 +49,8 @@ pub struct Config {
/// will need some time to fill up the buffer before sending the 1st event.
#[serde(default)]
pub min_depth: usize,

pub retry_policy: Option<RetryPolicy>,
}

fn do_handshake(channel: &mut Channel, magic: u64) -> Result<(), Error> {
Expand All @@ -66,30 +64,16 @@ fn do_handshake(channel: &mut Channel, magic: u64) -> Result<(), Error> {
}
}

#[cfg(target_family = "unix")]
fn setup_unix_multiplexer(path: &str) -> Result<Multiplexer, Error> {
let unix = UnixStream::connect(path)?;

Multiplexer::setup(unix, &[0, 2, 3])
}

fn setup_tcp_multiplexer(address: &str) -> Result<Multiplexer, Error> {
let tcp = TcpStream::connect(address)?;
tcp.set_nodelay(true)?;
tcp.set_keepalive_ms(Some(30_000u32))?;

Multiplexer::setup(tcp, &[0, 2, 3])
}

impl SourceProvider for WithUtils<Config> {
fn bootstrap(&self) -> PartialBootstrapResult {
let (output_tx, output_rx) = new_inter_stage_channel(None);

let mut muxer = match self.inner.address.0 {
BearerKind::Tcp => setup_tcp_multiplexer(&self.inner.address.1)?,
#[cfg(target_family = "unix")]
BearerKind::Unix => setup_unix_multiplexer(&self.inner.address.1)?,
};
let mut muxer = setup_multiplexer(
&self.inner.address.0,
&self.inner.address.1,
&[0, 2, 3],
&self.inner.retry_policy,
)?;

let magic = match &self.inner.magic {
Some(m) => *m.deref(),
Expand Down
1 change: 1 addition & 0 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod metrics;
pub mod throttle;

pub(crate) mod bech32;
pub(crate) mod retry;
pub(crate) mod time;

mod facade;
Expand Down
99 changes: 99 additions & 0 deletions src/utils/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::{fmt::Debug, ops::Mul, time::Duration};

pub struct Policy {
pub max_retries: u32,
pub backoff_unit: Duration,
pub backoff_factor: u32,
pub max_backoff: Duration,
}

fn compute_backoff_delay(policy: &Policy, retry: u32) -> Duration {
let units = policy.backoff_factor.pow(retry);
let backoff = policy.backoff_unit.mul(units);
core::cmp::min(backoff, policy.max_backoff)
}

pub fn retry_operation<T, E>(op: impl Fn() -> Result<T, E>, policy: &Policy) -> Result<T, E>
where
E: Debug,
{
let mut retry = 0;

loop {
let result = op();

match result {
Ok(x) => break Ok(x),
Err(err) if retry < policy.max_retries => {
log::warn!("retryable operation error: {:?}", err);

retry += 1;

let backoff = compute_backoff_delay(policy, retry);

log::debug!(
"backoff for {}s until next retry #{}",
backoff.as_secs(),
retry
);

std::thread::sleep(backoff);
}
Err(x) => {
log::error!("max retries reached, failing whole operation");
break Err(x);
}
}
}
}

#[cfg(test)]
mod tests {
use std::{cell::RefCell, rc::Rc};

use super::*;

#[test]
fn honors_max_retries() {
let counter = Rc::new(RefCell::new(0));

let inner_counter = counter.clone();
let op = move || -> Result<(), String> {
*inner_counter.borrow_mut() += 1;
Err("very bad stuff happened".to_string())
};

let policy = Policy {
max_retries: 3,
backoff_unit: Duration::from_secs(1),
backoff_factor: 0,
max_backoff: Duration::from_secs(100),
};

assert!(retry_operation(op, &policy).is_err());

assert_eq!(*counter.borrow(), 4);
}

#[test]
fn honors_exponential_backoff() {
let op = move || -> Result<(), String> { Err("very bad stuff happened".to_string()) };

let policy = Policy {
max_retries: 10,
backoff_unit: Duration::from_millis(1),
backoff_factor: 2,
max_backoff: Duration::MAX,
};

let start = std::time::Instant::now();
let result = retry_operation(op, &policy);
let elapsed = start.elapsed();

assert!(result.is_err());

// not an exact science, should be 2046, adding +/- 10%
assert!(elapsed.as_millis() >= 1842);
assert!(elapsed.as_millis() <= 2250);
}
}