Skip to content

Commit

Permalink
feat: Retry whole chainsync operation when possible (txpipe#332)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored and kodemill committed Jun 24, 2022
1 parent c8d5517 commit 93190b0
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 162 deletions.
14 changes: 8 additions & 6 deletions src/sources/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ use crate::{
Error,
};

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub enum BearerKind {
Tcp,
#[cfg(target_family = "unix")]
Unix,
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct AddressArg(pub BearerKind, pub String);

impl FromStr for BearerKind {
Expand Down Expand Up @@ -147,10 +147,12 @@ where
deserializer.deserialize_any(MagicArgVisitor)
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct RetryPolicy {
connection_max_retries: u32,
connection_max_backoff: u32,
pub chainsync_max_retries: u32,
pub chainsync_max_backoff: u32,
pub connection_max_retries: u32,
pub connection_max_backoff: u32,
}

pub fn setup_multiplexer_attempt(bearer: &BearerKind, address: &str) -> Result<StdPlexer, Error> {
Expand Down Expand Up @@ -186,7 +188,7 @@ pub fn setup_multiplexer(
}
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
#[serde(tag = "type", content = "value")]
pub enum IntersectArg {
Tip,
Expand Down
121 changes: 114 additions & 7 deletions src/sources/n2c/run.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use std::{collections::HashMap, fmt::Debug};
use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc, time::Duration};

use pallas::network::{
miniprotocols::{chainsync, run_agent, Point},
miniprotocols::{chainsync, handshake, run_agent, Point, MAINNET_MAGIC},
multiplexer::StdChannel,
};

use crate::{
mapper::EventWriter,
sources::{n2c::blocks::CborHolder, should_finalize, FinalizeConfig},
pipelining::StageSender,
sources::{
define_start_point, n2c::blocks::CborHolder, setup_multiplexer, should_finalize,
FinalizeConfig,
},
utils::{retry, Utils},
Error,
};

Expand Down Expand Up @@ -119,13 +124,13 @@ impl<'b> chainsync::Observer<chainsync::BlockContent> for ChainObserver {
}
}

pub(crate) fn observe_forever(
fn observe_forever(
mut channel: StdChannel,
event_writer: EventWriter,
known_points: Option<Vec<Point>>,
min_depth: usize,
finalize_config: Option<FinalizeConfig>,
) -> Result<(), Error> {
) -> Result<(), AttemptError> {
let observer = ChainObserver {
chain_buffer: Default::default(),
blocks: HashMap::new(),
Expand All @@ -136,8 +141,110 @@ pub(crate) fn observe_forever(
};

let agent = chainsync::BlockConsumer::initial(known_points, observer);
let agent = run_agent(agent, &mut channel)?;
log::warn!("chainsync agent final state: {:?}", agent.state);

match run_agent(agent, &mut channel) {
Ok(agent) => {
log::debug!("chainsync agent final state: {:?}", agent.state);
Ok(())
}
Err(err) => Err(AttemptError::Recoverable(err.into())),
}
}

#[derive(Debug)]
enum AttemptError {
Recoverable(Error),
Other(Error),
}

fn do_handshake(channel: &mut StdChannel, magic: u64) -> Result<(), AttemptError> {
let versions = handshake::n2c::VersionTable::v1_and_above(magic);

match run_agent(handshake::Initiator::initial(versions), channel) {
Ok(agent) => match agent.output {
handshake::Output::Accepted(_, _) => Ok(()),
_ => Err(AttemptError::Other(
"couldn't agree on handshake version".into(),
)),
},
Err(err) => Err(AttemptError::Recoverable(err.into())),
}
}

fn do_chainsync_attempt(
config: &super::Config,
utils: Arc<Utils>,
output_tx: &StageSender,
) -> Result<(), AttemptError> {
let magic = match config.magic.as_ref() {
Some(m) => *m.deref(),
None => MAINNET_MAGIC,
};

let mut plexer = setup_multiplexer(&config.address.0, &config.address.1, &config.retry_policy)
.map_err(|x| AttemptError::Recoverable(x))?;

let mut hs_channel = plexer.use_channel(0);
let mut cs_channel = plexer.use_channel(5);

plexer.muxer.spawn();
plexer.demuxer.spawn();

do_handshake(&mut hs_channel, magic)?;

let known_points = define_start_point(
&config.intersect,
#[allow(deprecated)]
&config.since,
&utils,
&mut cs_channel,
)
.map_err(|err| AttemptError::Recoverable(err))?;

log::info!("starting chain sync from: {:?}", &known_points);

let writer = EventWriter::new(output_tx.clone(), utils, config.mapper.clone());

observe_forever(
cs_channel,
writer,
known_points,
config.min_depth,
config.finalize.clone(),
)?;

Ok(())
}

pub fn do_chainsync(
config: &super::Config,
utils: Arc<Utils>,
output_tx: StageSender,
) -> Result<(), Error> {
retry::retry_operation(
|| match do_chainsync_attempt(config, utils.clone(), &output_tx) {
Ok(()) => Ok(()),
Err(AttemptError::Other(msg)) => {
log::error!("N2N error: {}", msg);
log::warn!("unrecoverable error performing chainsync, will exit");
Ok(())
}
Err(AttemptError::Recoverable(err)) => Err(err),
},
&retry::Policy {
max_retries: config
.retry_policy
.as_ref()
.map(|x| x.chainsync_max_retries)
.unwrap_or(50),
backoff_unit: Duration::from_secs(1),
backoff_factor: 2,
max_backoff: config
.retry_policy
.as_ref()
.map(|x| x.chainsync_max_backoff as u64)
.map(Duration::from_secs)
.unwrap_or(Duration::from_secs(60)),
},
)
}
66 changes: 8 additions & 58 deletions src/sources/n2c/setup.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,18 @@
use std::ops::Deref;

use pallas::network::{
miniprotocols::{handshake, run_agent, MAINNET_MAGIC},
multiplexer::StdChannel,
};

use serde::Deserialize;

use crate::{
mapper::{Config as MapperConfig, EventWriter},
mapper::Config as MapperConfig,
pipelining::{new_inter_stage_channel, PartialBootstrapResult, SourceProvider},
sources::{
common::{AddressArg, MagicArg, PointArg},
define_start_point, setup_multiplexer, FinalizeConfig, IntersectArg, RetryPolicy,
FinalizeConfig, IntersectArg, RetryPolicy,
},
utils::{ChainWellKnownInfo, WithUtils},
Error,
};

use super::run::observe_forever;
use super::run::do_chainsync;

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct Config {
pub address: AddressArg,

Expand Down Expand Up @@ -53,57 +45,15 @@ pub struct Config {
pub finalize: Option<FinalizeConfig>,
}

fn do_handshake(channel: &mut StdChannel, magic: u64) -> Result<(), Error> {
let versions = handshake::n2c::VersionTable::v1_and_above(magic);
let agent = run_agent(handshake::Initiator::initial(versions), channel)?;
log::info!("handshake output: {:?}", agent.output);

match agent.output {
handshake::Output::Accepted(_, _) => Ok(()),
_ => Err("couldn't agree on handshake version for client connection".into()),
}
}

impl SourceProvider for WithUtils<Config> {
fn bootstrap(&self) -> PartialBootstrapResult {
let (output_tx, output_rx) = new_inter_stage_channel(None);

let mut plexer = setup_multiplexer(
&self.inner.address.0,
&self.inner.address.1,
&self.inner.retry_policy,
)?;

let mut hs_channel = plexer.use_channel(0);
let mut cs_channel = plexer.use_channel(5);

plexer.muxer.spawn();
plexer.demuxer.spawn();

let magic = match &self.inner.magic {
Some(m) => *m.deref(),
None => MAINNET_MAGIC,
};

let writer = EventWriter::new(output_tx, self.utils.clone(), self.inner.mapper.clone());

do_handshake(&mut hs_channel, magic)?;

let known_points = define_start_point(
&self.inner.intersect,
#[allow(deprecated)]
&self.inner.since,
&self.utils,
&mut cs_channel,
)?;

log::info!("starting chain sync from: {:?}", &known_points);

let min_depth = self.inner.min_depth;
let finalize = self.inner.finalize.clone();
let config = self.inner.clone();
let utils = self.utils.clone();
let handle = std::thread::spawn(move || {
observe_forever(cs_channel, writer, known_points, min_depth, finalize)
.expect("chainsync loop failed");
do_chainsync(&config, utils, output_tx)
.expect("chainsync process fails after max retries")
});

Ok((handle, output_rx))
Expand Down
Loading

0 comments on commit 93190b0

Please sign in to comment.