Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Retry whole chainsync operation when possible #332

Merged
merged 2 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/sources/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ use crate::{
Error,
};

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub enum BearerKind {
Tcp,
#[cfg(target_family = "unix")]
Unix,
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct AddressArg(pub BearerKind, pub String);

impl FromStr for BearerKind {
Expand Down Expand Up @@ -147,10 +147,12 @@ where
deserializer.deserialize_any(MagicArgVisitor)
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct RetryPolicy {
connection_max_retries: u32,
connection_max_backoff: u32,
pub chainsync_max_retries: u32,
pub chainsync_max_backoff: u32,
pub connection_max_retries: u32,
pub connection_max_backoff: u32,
}

pub fn setup_multiplexer_attempt(bearer: &BearerKind, address: &str) -> Result<StdPlexer, Error> {
Expand Down Expand Up @@ -186,7 +188,7 @@ pub fn setup_multiplexer(
}
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
#[serde(tag = "type", content = "value")]
pub enum IntersectArg {
Tip,
Expand Down
121 changes: 114 additions & 7 deletions src/sources/n2c/run.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use std::{collections::HashMap, fmt::Debug};
use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc, time::Duration};

use pallas::network::{
miniprotocols::{chainsync, run_agent, Point},
miniprotocols::{chainsync, handshake, run_agent, Point, MAINNET_MAGIC},
multiplexer::StdChannel,
};

use crate::{
mapper::EventWriter,
sources::{n2c::blocks::CborHolder, should_finalize, FinalizeConfig},
pipelining::StageSender,
sources::{
define_start_point, n2c::blocks::CborHolder, setup_multiplexer, should_finalize,
FinalizeConfig,
},
utils::{retry, Utils},
Error,
};

Expand Down Expand Up @@ -119,13 +124,13 @@ impl<'b> chainsync::Observer<chainsync::BlockContent> for ChainObserver {
}
}

pub(crate) fn observe_forever(
fn observe_forever(
mut channel: StdChannel,
event_writer: EventWriter,
known_points: Option<Vec<Point>>,
min_depth: usize,
finalize_config: Option<FinalizeConfig>,
) -> Result<(), Error> {
) -> Result<(), AttemptError> {
let observer = ChainObserver {
chain_buffer: Default::default(),
blocks: HashMap::new(),
Expand All @@ -136,8 +141,110 @@ pub(crate) fn observe_forever(
};

let agent = chainsync::BlockConsumer::initial(known_points, observer);
let agent = run_agent(agent, &mut channel)?;
log::warn!("chainsync agent final state: {:?}", agent.state);

match run_agent(agent, &mut channel) {
Ok(agent) => {
log::debug!("chainsync agent final state: {:?}", agent.state);
Ok(())
}
Err(err) => Err(AttemptError::Recoverable(err.into())),
}
}

#[derive(Debug)]
enum AttemptError {
Recoverable(Error),
Other(Error),
}

fn do_handshake(channel: &mut StdChannel, magic: u64) -> Result<(), AttemptError> {
let versions = handshake::n2c::VersionTable::v1_and_above(magic);

match run_agent(handshake::Initiator::initial(versions), channel) {
Ok(agent) => match agent.output {
handshake::Output::Accepted(_, _) => Ok(()),
_ => Err(AttemptError::Other(
"couldn't agree on handshake version".into(),
)),
},
Err(err) => Err(AttemptError::Recoverable(err.into())),
}
}

fn do_chainsync_attempt(
config: &super::Config,
utils: Arc<Utils>,
output_tx: &StageSender,
) -> Result<(), AttemptError> {
let magic = match config.magic.as_ref() {
Some(m) => *m.deref(),
None => MAINNET_MAGIC,
};

let mut plexer = setup_multiplexer(&config.address.0, &config.address.1, &config.retry_policy)
.map_err(|x| AttemptError::Recoverable(x))?;

let mut hs_channel = plexer.use_channel(0);
let mut cs_channel = plexer.use_channel(5);

plexer.muxer.spawn();
plexer.demuxer.spawn();

do_handshake(&mut hs_channel, magic)?;

let known_points = define_start_point(
&config.intersect,
#[allow(deprecated)]
&config.since,
&utils,
&mut cs_channel,
)
.map_err(|err| AttemptError::Recoverable(err))?;

log::info!("starting chain sync from: {:?}", &known_points);

let writer = EventWriter::new(output_tx.clone(), utils, config.mapper.clone());

observe_forever(
cs_channel,
writer,
known_points,
config.min_depth,
config.finalize.clone(),
)?;

Ok(())
}

pub fn do_chainsync(
config: &super::Config,
utils: Arc<Utils>,
output_tx: StageSender,
) -> Result<(), Error> {
retry::retry_operation(
|| match do_chainsync_attempt(config, utils.clone(), &output_tx) {
Ok(()) => Ok(()),
Err(AttemptError::Other(msg)) => {
log::error!("N2N error: {}", msg);
log::warn!("unrecoverable error performing chainsync, will exit");
Ok(())
}
Err(AttemptError::Recoverable(err)) => Err(err),
},
&retry::Policy {
max_retries: config
.retry_policy
.as_ref()
.map(|x| x.chainsync_max_retries)
.unwrap_or(50),
backoff_unit: Duration::from_secs(1),
backoff_factor: 2,
max_backoff: config
.retry_policy
.as_ref()
.map(|x| x.chainsync_max_backoff as u64)
.map(Duration::from_secs)
.unwrap_or(Duration::from_secs(60)),
},
)
}
66 changes: 8 additions & 58 deletions src/sources/n2c/setup.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,18 @@
use std::ops::Deref;

use pallas::network::{
miniprotocols::{handshake, run_agent, MAINNET_MAGIC},
multiplexer::StdChannel,
};

use serde::Deserialize;

use crate::{
mapper::{Config as MapperConfig, EventWriter},
mapper::Config as MapperConfig,
pipelining::{new_inter_stage_channel, PartialBootstrapResult, SourceProvider},
sources::{
common::{AddressArg, MagicArg, PointArg},
define_start_point, setup_multiplexer, FinalizeConfig, IntersectArg, RetryPolicy,
FinalizeConfig, IntersectArg, RetryPolicy,
},
utils::{ChainWellKnownInfo, WithUtils},
Error,
};

use super::run::observe_forever;
use super::run::do_chainsync;

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct Config {
pub address: AddressArg,

Expand Down Expand Up @@ -53,57 +45,15 @@ pub struct Config {
pub finalize: Option<FinalizeConfig>,
}

fn do_handshake(channel: &mut StdChannel, magic: u64) -> Result<(), Error> {
let versions = handshake::n2c::VersionTable::v1_and_above(magic);
let agent = run_agent(handshake::Initiator::initial(versions), channel)?;
log::info!("handshake output: {:?}", agent.output);

match agent.output {
handshake::Output::Accepted(_, _) => Ok(()),
_ => Err("couldn't agree on handshake version for client connection".into()),
}
}

impl SourceProvider for WithUtils<Config> {
fn bootstrap(&self) -> PartialBootstrapResult {
let (output_tx, output_rx) = new_inter_stage_channel(None);

let mut plexer = setup_multiplexer(
&self.inner.address.0,
&self.inner.address.1,
&self.inner.retry_policy,
)?;

let mut hs_channel = plexer.use_channel(0);
let mut cs_channel = plexer.use_channel(5);

plexer.muxer.spawn();
plexer.demuxer.spawn();

let magic = match &self.inner.magic {
Some(m) => *m.deref(),
None => MAINNET_MAGIC,
};

let writer = EventWriter::new(output_tx, self.utils.clone(), self.inner.mapper.clone());

do_handshake(&mut hs_channel, magic)?;

let known_points = define_start_point(
&self.inner.intersect,
#[allow(deprecated)]
&self.inner.since,
&self.utils,
&mut cs_channel,
)?;

log::info!("starting chain sync from: {:?}", &known_points);

let min_depth = self.inner.min_depth;
let finalize = self.inner.finalize.clone();
let config = self.inner.clone();
let utils = self.utils.clone();
let handle = std::thread::spawn(move || {
observe_forever(cs_channel, writer, known_points, min_depth, finalize)
.expect("chainsync loop failed");
do_chainsync(&config, utils, output_tx)
.expect("chainsync process fails after max retries")
});

Ok((handle, output_rx))
Expand Down
Loading