From 93190b0c67351375253b19e7427211878d7e4afd Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Mon, 20 Jun 2022 21:24:42 -0300 Subject: [PATCH] feat: Retry whole chainsync operation when possible (#332) --- src/sources/common.rs | 14 +++-- src/sources/n2c/run.rs | 121 +++++++++++++++++++++++++++++++++--- src/sources/n2c/setup.rs | 66 +++----------------- src/sources/n2n/run.rs | 131 ++++++++++++++++++++++++++++++++++++--- src/sources/n2n/setup.rs | 93 +++------------------------ src/utils/mod.rs | 1 + 6 files changed, 264 insertions(+), 162 deletions(-) diff --git a/src/sources/common.rs b/src/sources/common.rs index 74f98709..7eb27e86 100644 --- a/src/sources/common.rs +++ b/src/sources/common.rs @@ -13,14 +13,14 @@ use crate::{ Error, }; -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] pub enum BearerKind { Tcp, #[cfg(target_family = "unix")] Unix, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] pub struct AddressArg(pub BearerKind, pub String); impl FromStr for BearerKind { @@ -147,10 +147,12 @@ where deserializer.deserialize_any(MagicArgVisitor) } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct RetryPolicy { - connection_max_retries: u32, - connection_max_backoff: u32, + pub chainsync_max_retries: u32, + pub chainsync_max_backoff: u32, + pub connection_max_retries: u32, + pub connection_max_backoff: u32, } pub fn setup_multiplexer_attempt(bearer: &BearerKind, address: &str) -> Result { @@ -186,7 +188,7 @@ pub fn setup_multiplexer( } } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] #[serde(tag = "type", content = "value")] pub enum IntersectArg { Tip, diff --git a/src/sources/n2c/run.rs b/src/sources/n2c/run.rs index 58993ce4..03594be8 100644 --- a/src/sources/n2c/run.rs +++ b/src/sources/n2c/run.rs @@ -1,13 +1,18 @@ -use std::{collections::HashMap, fmt::Debug}; +use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc, time::Duration}; use pallas::network::{ - miniprotocols::{chainsync, run_agent, Point}, + miniprotocols::{chainsync, handshake, run_agent, Point, MAINNET_MAGIC}, multiplexer::StdChannel, }; use crate::{ mapper::EventWriter, - sources::{n2c::blocks::CborHolder, should_finalize, FinalizeConfig}, + pipelining::StageSender, + sources::{ + define_start_point, n2c::blocks::CborHolder, setup_multiplexer, should_finalize, + FinalizeConfig, + }, + utils::{retry, Utils}, Error, }; @@ -119,13 +124,13 @@ impl<'b> chainsync::Observer for ChainObserver { } } -pub(crate) fn observe_forever( +fn observe_forever( mut channel: StdChannel, event_writer: EventWriter, known_points: Option>, min_depth: usize, finalize_config: Option, -) -> Result<(), Error> { +) -> Result<(), AttemptError> { let observer = ChainObserver { chain_buffer: Default::default(), blocks: HashMap::new(), @@ -136,8 +141,110 @@ pub(crate) fn observe_forever( }; let agent = chainsync::BlockConsumer::initial(known_points, observer); - let agent = run_agent(agent, &mut channel)?; - log::warn!("chainsync agent final state: {:?}", agent.state); + + match run_agent(agent, &mut channel) { + Ok(agent) => { + log::debug!("chainsync agent final state: {:?}", agent.state); + Ok(()) + } + Err(err) => Err(AttemptError::Recoverable(err.into())), + } +} + +#[derive(Debug)] +enum AttemptError { + Recoverable(Error), + Other(Error), +} + +fn do_handshake(channel: &mut StdChannel, magic: u64) -> Result<(), AttemptError> { + let versions = handshake::n2c::VersionTable::v1_and_above(magic); + + match run_agent(handshake::Initiator::initial(versions), channel) { + Ok(agent) => match agent.output { + handshake::Output::Accepted(_, _) => Ok(()), + _ => Err(AttemptError::Other( + "couldn't agree on handshake version".into(), + )), + }, + Err(err) => Err(AttemptError::Recoverable(err.into())), + } +} + +fn do_chainsync_attempt( + config: &super::Config, + utils: Arc, + output_tx: &StageSender, +) -> Result<(), AttemptError> { + let magic = match config.magic.as_ref() { + Some(m) => *m.deref(), + None => MAINNET_MAGIC, + }; + + let mut plexer = setup_multiplexer(&config.address.0, &config.address.1, &config.retry_policy) + .map_err(|x| AttemptError::Recoverable(x))?; + + let mut hs_channel = plexer.use_channel(0); + let mut cs_channel = plexer.use_channel(5); + + plexer.muxer.spawn(); + plexer.demuxer.spawn(); + + do_handshake(&mut hs_channel, magic)?; + + let known_points = define_start_point( + &config.intersect, + #[allow(deprecated)] + &config.since, + &utils, + &mut cs_channel, + ) + .map_err(|err| AttemptError::Recoverable(err))?; + + log::info!("starting chain sync from: {:?}", &known_points); + + let writer = EventWriter::new(output_tx.clone(), utils, config.mapper.clone()); + + observe_forever( + cs_channel, + writer, + known_points, + config.min_depth, + config.finalize.clone(), + )?; Ok(()) } + +pub fn do_chainsync( + config: &super::Config, + utils: Arc, + output_tx: StageSender, +) -> Result<(), Error> { + retry::retry_operation( + || match do_chainsync_attempt(config, utils.clone(), &output_tx) { + Ok(()) => Ok(()), + Err(AttemptError::Other(msg)) => { + log::error!("N2N error: {}", msg); + log::warn!("unrecoverable error performing chainsync, will exit"); + Ok(()) + } + Err(AttemptError::Recoverable(err)) => Err(err), + }, + &retry::Policy { + max_retries: config + .retry_policy + .as_ref() + .map(|x| x.chainsync_max_retries) + .unwrap_or(50), + backoff_unit: Duration::from_secs(1), + backoff_factor: 2, + max_backoff: config + .retry_policy + .as_ref() + .map(|x| x.chainsync_max_backoff as u64) + .map(Duration::from_secs) + .unwrap_or(Duration::from_secs(60)), + }, + ) +} diff --git a/src/sources/n2c/setup.rs b/src/sources/n2c/setup.rs index f85fdb2f..c1bc5c39 100644 --- a/src/sources/n2c/setup.rs +++ b/src/sources/n2c/setup.rs @@ -1,26 +1,18 @@ -use std::ops::Deref; - -use pallas::network::{ - miniprotocols::{handshake, run_agent, MAINNET_MAGIC}, - multiplexer::StdChannel, -}; - use serde::Deserialize; use crate::{ - mapper::{Config as MapperConfig, EventWriter}, + mapper::Config as MapperConfig, pipelining::{new_inter_stage_channel, PartialBootstrapResult, SourceProvider}, sources::{ common::{AddressArg, MagicArg, PointArg}, - define_start_point, setup_multiplexer, FinalizeConfig, IntersectArg, RetryPolicy, + FinalizeConfig, IntersectArg, RetryPolicy, }, utils::{ChainWellKnownInfo, WithUtils}, - Error, }; -use super::run::observe_forever; +use super::run::do_chainsync; -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] pub struct Config { pub address: AddressArg, @@ -53,57 +45,15 @@ pub struct Config { pub finalize: Option, } -fn do_handshake(channel: &mut StdChannel, magic: u64) -> Result<(), Error> { - let versions = handshake::n2c::VersionTable::v1_and_above(magic); - let agent = run_agent(handshake::Initiator::initial(versions), channel)?; - log::info!("handshake output: {:?}", agent.output); - - match agent.output { - handshake::Output::Accepted(_, _) => Ok(()), - _ => Err("couldn't agree on handshake version for client connection".into()), - } -} - impl SourceProvider for WithUtils { fn bootstrap(&self) -> PartialBootstrapResult { let (output_tx, output_rx) = new_inter_stage_channel(None); - let mut plexer = setup_multiplexer( - &self.inner.address.0, - &self.inner.address.1, - &self.inner.retry_policy, - )?; - - let mut hs_channel = plexer.use_channel(0); - let mut cs_channel = plexer.use_channel(5); - - plexer.muxer.spawn(); - plexer.demuxer.spawn(); - - let magic = match &self.inner.magic { - Some(m) => *m.deref(), - None => MAINNET_MAGIC, - }; - - let writer = EventWriter::new(output_tx, self.utils.clone(), self.inner.mapper.clone()); - - do_handshake(&mut hs_channel, magic)?; - - let known_points = define_start_point( - &self.inner.intersect, - #[allow(deprecated)] - &self.inner.since, - &self.utils, - &mut cs_channel, - )?; - - log::info!("starting chain sync from: {:?}", &known_points); - - let min_depth = self.inner.min_depth; - let finalize = self.inner.finalize.clone(); + let config = self.inner.clone(); + let utils = self.utils.clone(); let handle = std::thread::spawn(move || { - observe_forever(cs_channel, writer, known_points, min_depth, finalize) - .expect("chainsync loop failed"); + do_chainsync(&config, utils, output_tx) + .expect("chainsync process fails after max retries") }); Ok((handle, output_rx)) diff --git a/src/sources/n2n/run.rs b/src/sources/n2n/run.rs index 5cc6f7aa..fb86f8c4 100644 --- a/src/sources/n2n/run.rs +++ b/src/sources/n2n/run.rs @@ -1,9 +1,9 @@ -use std::fmt::Debug; +use std::{fmt::Debug, ops::Deref, sync::Arc, time::Duration}; use pallas::{ ledger::primitives::{probing, Era}, network::{ - miniprotocols::{blockfetch, chainsync, run_agent, Point}, + miniprotocols::{blockfetch, chainsync, handshake, run_agent, Point, MAINNET_MAGIC}, multiplexer::StdChannel, }, }; @@ -12,8 +12,9 @@ use std::sync::mpsc::{Receiver, SyncSender}; use crate::{ mapper::EventWriter, - sources::{should_finalize, FinalizeConfig}, - utils::SwallowResult, + pipelining::StageSender, + sources::{define_start_point, setup_multiplexer, should_finalize, FinalizeConfig}, + utils::{retry, SwallowResult, Utils}, Error, }; @@ -155,14 +156,14 @@ pub(crate) fn fetch_blocks_forever( Ok(()) } -pub(crate) fn observe_headers_forever( +fn observe_headers_forever( mut channel: StdChannel, event_writer: EventWriter, known_points: Option>, block_requests: SyncSender, min_depth: usize, finalize_config: Option, -) -> Result<(), Error> { +) -> Result<(), AttemptError> { let observer = &mut ChainObserver { chain_buffer: Default::default(), min_depth, @@ -173,8 +174,122 @@ pub(crate) fn observe_headers_forever( }; let agent = chainsync::HeaderConsumer::initial(known_points, observer); - let agent = run_agent(agent, &mut channel)?; - log::debug!("chainsync agent final state: {:?}", agent.state); + + match run_agent(agent, &mut channel) { + Ok(agent) => { + log::debug!("chainsync agent final state: {:?}", agent.state); + Ok(()) + } + Err(err) => Err(AttemptError::Recoverable(err.into())), + } +} + +#[derive(Debug)] +enum AttemptError { + Recoverable(Error), + Other(Error), +} + +fn do_handshake(channel: &mut StdChannel, magic: u64) -> Result<(), AttemptError> { + let versions = handshake::n2n::VersionTable::v6_and_above(magic); + + match run_agent(handshake::Initiator::initial(versions), channel) { + Ok(agent) => match agent.output { + handshake::Output::Accepted(_, _) => Ok(()), + _ => Err(AttemptError::Other( + "couldn't agree on handshake version".into(), + )), + }, + Err(err) => Err(AttemptError::Recoverable(err.into())), + } +} + +fn do_chainsync_attempt( + config: &super::Config, + utils: Arc, + output_tx: &StageSender, +) -> Result<(), AttemptError> { + let magic = match config.magic.as_ref() { + Some(m) => *m.deref(), + None => MAINNET_MAGIC, + }; + + let mut plexer = setup_multiplexer(&config.address.0, &config.address.1, &config.retry_policy) + .map_err(|x| AttemptError::Recoverable(x))?; + + let mut hs_channel = plexer.use_channel(0); + let mut cs_channel = plexer.use_channel(2); + let bf_channel = plexer.use_channel(3); + + plexer.muxer.spawn(); + plexer.demuxer.spawn(); + + do_handshake(&mut hs_channel, magic)?; + + let known_points = define_start_point( + &config.intersect, + #[allow(deprecated)] + &config.since, + &utils, + &mut cs_channel, + ) + .map_err(|err| AttemptError::Recoverable(err))?; + + log::info!("starting chain sync from: {:?}", &known_points); + + let writer = EventWriter::new(output_tx.clone(), utils, config.mapper.clone()); + + let (headers_tx, headers_rx) = std::sync::mpsc::sync_channel(100); + + let bf_writer = writer.clone(); + std::thread::spawn(move || { + fetch_blocks_forever(bf_channel, bf_writer, headers_rx).expect("blockfetch loop failed"); + + log::info!("block fetch thread ended"); + }); + + // this will block + observe_headers_forever( + cs_channel, + writer, + known_points, + headers_tx, + config.min_depth, + config.finalize.clone(), + )?; Ok(()) } + +pub fn do_chainsync( + config: &super::Config, + utils: Arc, + output_tx: StageSender, +) -> Result<(), Error> { + retry::retry_operation( + || match do_chainsync_attempt(config, utils.clone(), &output_tx) { + Ok(()) => Ok(()), + Err(AttemptError::Other(msg)) => { + log::error!("N2N error: {}", msg); + log::warn!("unrecoverable error performing chainsync, will exit"); + Ok(()) + } + Err(AttemptError::Recoverable(err)) => Err(err), + }, + &retry::Policy { + max_retries: config + .retry_policy + .as_ref() + .map(|x| x.chainsync_max_retries) + .unwrap_or(50), + backoff_unit: Duration::from_secs(1), + backoff_factor: 2, + max_backoff: config + .retry_policy + .as_ref() + .map(|x| x.chainsync_max_backoff as u64) + .map(Duration::from_secs) + .unwrap_or(Duration::from_secs(60)), + }, + ) +} diff --git a/src/sources/n2n/setup.rs b/src/sources/n2n/setup.rs index 34c95c73..a45f8612 100644 --- a/src/sources/n2n/setup.rs +++ b/src/sources/n2n/setup.rs @@ -1,28 +1,18 @@ -use std::ops::Deref; - -use log::info; - -use pallas::network::{ - miniprotocols::{handshake, run_agent, MAINNET_MAGIC}, - multiplexer::StdChannel, -}; - use serde::Deserialize; use crate::{ - mapper::{Config as MapperConfig, EventWriter}, + mapper::Config as MapperConfig, pipelining::{new_inter_stage_channel, PartialBootstrapResult, SourceProvider}, sources::{ common::{AddressArg, MagicArg, PointArg}, - define_start_point, setup_multiplexer, FinalizeConfig, IntersectArg, RetryPolicy, + FinalizeConfig, IntersectArg, RetryPolicy, }, utils::{ChainWellKnownInfo, WithUtils}, - Error, }; -use super::run::{fetch_blocks_forever, observe_headers_forever}; +use super::run::do_chainsync; -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] pub struct Config { pub address: AddressArg, @@ -55,80 +45,17 @@ pub struct Config { pub finalize: Option, } -fn do_handshake(channel: &mut StdChannel, magic: u64) -> Result<(), Error> { - let versions = handshake::n2n::VersionTable::v6_and_above(magic); - let agent = run_agent(handshake::Initiator::initial(versions), channel)?; - info!("handshake output: {:?}", agent.output); - - match agent.output { - handshake::Output::Accepted(_, _) => Ok(()), - _ => Err("couldn't agree on handshake version".into()), - } -} - impl SourceProvider for WithUtils { fn bootstrap(&self) -> PartialBootstrapResult { let (output_tx, output_rx) = new_inter_stage_channel(None); - let mut plexer = setup_multiplexer( - &self.inner.address.0, - &self.inner.address.1, - &self.inner.retry_policy, - )?; - - let mut hs_channel = plexer.use_channel(0); - let mut cs_channel = plexer.use_channel(2); - let bf_channel = plexer.use_channel(3); - - plexer.muxer.spawn(); - plexer.demuxer.spawn(); - - let magic = match &self.inner.magic { - Some(m) => *m.deref(), - None => MAINNET_MAGIC, - }; - - let writer = EventWriter::new(output_tx, self.utils.clone(), self.inner.mapper.clone()); - - do_handshake(&mut hs_channel, magic)?; - - let known_points = define_start_point( - &self.inner.intersect, - #[allow(deprecated)] - &self.inner.since, - &self.utils, - &mut cs_channel, - )?; - - info!("starting chain sync from: {:?}", &known_points); - - let (headers_tx, headers_rx) = std::sync::mpsc::sync_channel(100); - - let min_depth = self.inner.min_depth; - let cs_writer = writer.clone(); - let finalize = self.inner.finalize.clone(); - let _cs_handle = std::thread::spawn(move || { - observe_headers_forever( - cs_channel, - cs_writer, - known_points, - headers_tx, - min_depth, - finalize, - ) - .expect("chainsync loop failed"); - - log::info!("observe headers thread ended"); - }); - - let bf_writer = writer; - let bf_handle = std::thread::spawn(move || { - fetch_blocks_forever(bf_channel, bf_writer, headers_rx) - .expect("blockfetch loop failed"); - - log::info!("block fetch thread ended"); + let config = self.inner.clone(); + let utils = self.utils.clone(); + let handle = std::thread::spawn(move || { + do_chainsync(&config, utils, output_tx) + .expect("chainsync fails after applying max retry policy") }); - Ok((bf_handle, output_rx)) + Ok((handle, output_rx)) } } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 0476e37b..f33b37da 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -172,6 +172,7 @@ impl Utils { /// and a reference to the shared utilities singleton. This is a quality-of-life /// artifact to wrap other structs (usually configs) and attach the utilities /// singleton entrypoint. +#[derive(Clone)] pub struct WithUtils { pub utils: Arc, pub inner: C,