From e11ce2362c59eacf33d6af3b1dadb185994e5e27 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Thu, 24 Feb 2022 10:05:06 -0300 Subject: [PATCH 1/4] feat: Introduce 'intersect' argument --- src/bin/oura/dump.rs | 14 ++++---- src/bin/oura/watch.rs | 14 ++++---- src/sources/common.rs | 76 ++++++++++++++++++++++++++++++---------- src/sources/n2c/run.rs | 4 +-- src/sources/n2c/setup.rs | 16 +++++---- src/sources/n2n/run.rs | 4 +-- src/sources/n2n/setup.rs | 15 ++++---- 7 files changed, 93 insertions(+), 50 deletions(-) diff --git a/src/bin/oura/dump.rs b/src/bin/oura/dump.rs index 3f87d018..ad249ca8 100644 --- a/src/bin/oura/dump.rs +++ b/src/bin/oura/dump.rs @@ -6,7 +6,7 @@ use clap::ArgMatches; use oura::{ mapper::Config as MapperConfig, pipelining::{BootstrapResult, SinkProvider, SourceProvider, StageReceiver}, - sources::{AddressArg, BearerKind, MagicArg}, + sources::{AddressArg, BearerKind, IntersectArg, MagicArg}, utils::{ChainWellKnownInfo, Utils, WithUtils}, }; @@ -77,8 +77,8 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { false => MagicArg::default(), }; - let since = match args.is_present("since") { - true => Some(args.value_of_t("since")?), + let intersect = match args.is_present("since") { + true => Some(IntersectArg::Point(args.value_of_t("since")?)), false => None, }; @@ -116,8 +116,8 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { well_known: None, min_depth: 0, mapper, - since, - intersections: None, + since: None, + intersect, }), PeerMode::AsClient => DumpSource::N2C(N2CConfig { address: AddressArg(bearer, socket), @@ -125,8 +125,8 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { well_known: None, min_depth: 0, mapper, - since, - intersections: None, + since: None, + intersect, }), }; diff --git a/src/bin/oura/watch.rs b/src/bin/oura/watch.rs index c55b2ceb..6fd88f42 100644 --- a/src/bin/oura/watch.rs +++ b/src/bin/oura/watch.rs @@ -4,7 +4,7 @@ use clap::ArgMatches; use oura::{ mapper::Config as MapperConfig, pipelining::{SinkProvider, SourceProvider}, - sources::{AddressArg, BearerKind, MagicArg}, + sources::{AddressArg, BearerKind, IntersectArg, MagicArg}, utils::{ChainWellKnownInfo, Utils, WithUtils}, }; @@ -58,8 +58,8 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { false => MagicArg::default(), }; - let since = match args.is_present("since") { - true => Some(args.value_of_t("since")?), + let intersect = match args.is_present("since") { + true => Some(IntersectArg::Point(args.value_of_t("since")?)), false => None, }; @@ -94,8 +94,8 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { well_known: None, min_depth: 0, mapper, - since, - intersections: None, + since: None, + intersect, }), PeerMode::AsClient => WatchSource::N2C(N2CConfig { address: AddressArg(bearer, socket), @@ -103,8 +103,8 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { well_known: None, min_depth: 0, mapper, - since, - intersections: None, + since: None, + intersect, }), }; diff --git a/src/sources/common.rs b/src/sources/common.rs index c556e808..ae07bdce 100644 --- a/src/sources/common.rs +++ b/src/sources/common.rs @@ -141,6 +141,15 @@ where deserializer.deserialize_any(MagicArgVisitor) } +#[derive(Debug, Deserialize)] +#[serde(tag = "type", content = "value")] +pub enum IntersectArg { + Tip, + Origin, + Point(PointArg), + Fallbacks(Vec), +} + pub(crate) fn find_end_of_chain( channel: &mut Channel, well_known: &ChainWellKnownInfo, @@ -161,30 +170,61 @@ pub(crate) fn find_end_of_chain( } pub(crate) fn define_start_point( + intersect: &Option, since: &Option, - intersections: &Option>, utils: &Utils, cs_channel: &mut Channel, -) -> Result, Error> { +) -> Result>, Error> { let cursor = utils.get_cursor_if_any(); - match (cursor, since, intersections) { - (Some(cursor), _, _) => { + match cursor { + Some(cursor) => { log::info!("found persisted cursor, will use as starting point"); - Ok(vec![cursor.try_into()?]) - } - (None, Some(arg), _) => { - log::info!("explicit 'since' argument, will use as starting point"); - Ok(vec![arg.clone().try_into()?]) - } - (None, None, Some(args)) => { - log::info!("intersections argument, will use as starting point"); - args.iter().map(|x| x.clone().try_into()).collect() - } - _ => { - log::info!("no starting point specified, will use tip of chain"); - let point = find_end_of_chain(cs_channel, &utils.well_known)?; - Ok(vec![point]) + let points = vec![cursor.try_into()?]; + + Ok(Some(points)) } + None => match intersect { + Some(IntersectArg::Fallbacks(x)) => { + log::info!("found 'fallbacks' intersect argument, will use as starting point"); + let points: Result, _> = x.iter().map(|x| x.clone().try_into()).collect(); + + Ok(Some(points?)) + } + Some(IntersectArg::Origin) => { + log::info!("found 'origin' instersect argument, will use as starting point"); + + Ok(None) + } + Some(IntersectArg::Point(x)) => { + log::info!("found 'point' intersect argument, will use as starting point"); + let points = vec![x.clone().try_into()?]; + + Ok(Some(points)) + } + Some(IntersectArg::Tip) => { + log::info!("found 'tip' intersect argument, will use as starting point"); + let tip = find_end_of_chain(cs_channel, &utils.well_known)?; + let points = vec![tip]; + + Ok(Some(points)) + } + None => match since { + Some(x) => { + log::info!("explicit 'since' argument, will use as starting point"); + log::warn!("`since` value is deprecated, please use `intersect`"); + let points = vec![x.clone().try_into()?]; + + Ok(Some(points)) + } + None => { + log::info!("no starting point specified, will use tip of chain"); + let tip = find_end_of_chain(cs_channel, &utils.well_known)?; + let points = vec![tip]; + + Ok(Some(points)) + } + }, + }, } } diff --git a/src/sources/n2c/run.rs b/src/sources/n2c/run.rs index 5e244db2..ad8b4fb5 100644 --- a/src/sources/n2c/run.rs +++ b/src/sources/n2c/run.rs @@ -109,7 +109,7 @@ impl chainsync::Observer for ChainObserver { pub(crate) fn observe_forever( mut channel: Channel, event_writer: EventWriter, - from: Vec, + known_points: Option>, min_depth: usize, ) -> Result<(), Error> { let observer = ChainObserver { @@ -119,7 +119,7 @@ pub(crate) fn observe_forever( event_writer, }; - let agent = chainsync::BlockConsumer::initial(Some(from), observer); + let agent = chainsync::BlockConsumer::initial(known_points, observer); let agent = run_agent(agent, &mut channel)?; log::warn!("chainsync agent final state: {:?}", agent.state); diff --git a/src/sources/n2c/setup.rs b/src/sources/n2c/setup.rs index 38e3cc12..0a50f2ad 100644 --- a/src/sources/n2c/setup.rs +++ b/src/sources/n2c/setup.rs @@ -7,7 +7,7 @@ use net2::TcpStreamExt; use log::info; use pallas::network::{ - miniprotocols::{handshake::n2c, run_agent, Point, MAINNET_MAGIC}, + miniprotocols::{handshake::n2c, run_agent, MAINNET_MAGIC}, multiplexer::{Channel, Multiplexer}, }; @@ -18,7 +18,7 @@ use crate::{ pipelining::{new_inter_stage_channel, PartialBootstrapResult, SourceProvider}, sources::{ common::{AddressArg, BearerKind, MagicArg, PointArg}, - define_start_point, + define_start_point, IntersectArg, }, utils::{ChainWellKnownInfo, WithUtils}, Error, @@ -33,9 +33,10 @@ pub struct Config { #[serde(deserialize_with = "crate::sources::common::deserialize_magic_arg")] pub magic: Option, + #[deprecated(note = "use intersect value instead")] pub since: Option, - pub intersections: Option>, + pub intersect: Option, #[deprecated(note = "chain info is now pipeline-wide, use utils")] pub well_known: Option, @@ -102,18 +103,19 @@ impl SourceProvider for WithUtils { let mut cs_channel = muxer.use_channel(5); - let since: Vec = define_start_point( + let known_points = define_start_point( + &self.inner.intersect, &self.inner.since, - &self.inner.intersections, &self.utils, &mut cs_channel, )?; - info!("starting from chain point: {:?}", &since); + info!("starting chain sync from: {:?}", &known_points); let min_depth = self.inner.min_depth; let handle = std::thread::spawn(move || { - observe_forever(cs_channel, writer, since, min_depth).expect("chainsync loop failed"); + observe_forever(cs_channel, writer, known_points, min_depth) + .expect("chainsync loop failed"); }); Ok((handle, output_rx)) diff --git a/src/sources/n2n/run.rs b/src/sources/n2n/run.rs index 75873b50..03afe531 100644 --- a/src/sources/n2n/run.rs +++ b/src/sources/n2n/run.rs @@ -138,7 +138,7 @@ pub(crate) fn fetch_blocks_forever( pub(crate) fn observe_headers_forever( mut channel: Channel, event_writer: EventWriter, - from: Vec, + known_points: Option>, block_requests: SyncSender, min_depth: usize, ) -> Result<(), Error> { @@ -149,7 +149,7 @@ pub(crate) fn observe_headers_forever( block_requests, }; - let agent = chainsync::HeaderConsumer::initial(Some(from), observer); + let agent = chainsync::HeaderConsumer::initial(known_points, observer); let agent = run_agent(agent, &mut channel)?; log::warn!("chainsync agent final state: {:?}", agent.state); diff --git a/src/sources/n2n/setup.rs b/src/sources/n2n/setup.rs index d7b9ff33..c849cad9 100644 --- a/src/sources/n2n/setup.rs +++ b/src/sources/n2n/setup.rs @@ -7,7 +7,7 @@ use net2::TcpStreamExt; use log::info; use pallas::network::{ - miniprotocols::{handshake::n2n, run_agent, Point, MAINNET_MAGIC}, + miniprotocols::{handshake::n2n, run_agent, MAINNET_MAGIC}, multiplexer::{Channel, Multiplexer}, }; @@ -18,7 +18,7 @@ use crate::{ pipelining::{new_inter_stage_channel, PartialBootstrapResult, SourceProvider}, sources::{ common::{AddressArg, BearerKind, MagicArg, PointArg}, - define_start_point, + define_start_point, IntersectArg, }, utils::{ChainWellKnownInfo, WithUtils}, Error, @@ -33,9 +33,10 @@ pub struct Config { #[serde(deserialize_with = "crate::sources::common::deserialize_magic_arg")] pub magic: Option, + #[deprecated(note = "use intersect value instead")] pub since: Option, - pub intersections: Option>, + pub intersect: Option, #[deprecated(note = "chain info is now pipeline-wide, use utils")] pub well_known: Option, @@ -102,21 +103,21 @@ impl SourceProvider for WithUtils { let mut cs_channel = muxer.use_channel(2); - let since: Vec = define_start_point( + let known_points = define_start_point( + &self.inner.intersect, &self.inner.since, - &self.inner.intersections, &self.utils, &mut cs_channel, )?; - info!("starting from chain point: {:?}", &since); + info!("starting chain sync from: {:?}", &known_points); let (headers_tx, headers_rx) = std::sync::mpsc::sync_channel(100); let min_depth = self.inner.min_depth; let cs_writer = writer.clone(); let cs_handle = std::thread::spawn(move || { - observe_headers_forever(cs_channel, cs_writer, since, headers_tx, min_depth) + observe_headers_forever(cs_channel, cs_writer, known_points, headers_tx, min_depth) .expect("chainsync loop failed"); }); From 6b27b35aa03edecdc417257b2ae772eedf3b9082 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Thu, 24 Feb 2022 15:30:10 -0300 Subject: [PATCH 2/4] fix: Handle genesis block correctly --- Cargo.lock | 20 ++++++++++---------- Cargo.toml | 2 +- src/sources/n2c/blocks.rs | 6 ++++++ src/sources/n2n/run.rs | 7 +++++++ 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index da54512a..f71d9b3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1101,9 +1101,9 @@ dependencies = [ [[package]] name = "pallas" -version = "0.5.0-alpha.5" +version = "0.5.0-beta.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b816ee53329d5570aff7f2c48938ee42b32ac834b9012db523fe09a759c298db" +checksum = "174e34b47129161d48d14fdeafe8ebf8a31ef428a29f105f211cc9dc293288ad" dependencies = [ "pallas-crypto", "pallas-miniprotocols", @@ -1113,9 +1113,9 @@ dependencies = [ [[package]] name = "pallas-crypto" -version = "0.5.0-alpha.5" +version = "0.5.0-beta.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3a447560df9f123088e3f646b86ed07dd312d24bb2a94c892587e1c5381006b" +checksum = "e833b3405a1a4b61d630166f7b7f0767a23561ddb447ca97f35eb7d7f4a1deb7" dependencies = [ "cryptoxide", "hex", @@ -1126,9 +1126,9 @@ dependencies = [ [[package]] name = "pallas-miniprotocols" -version = "0.5.0-alpha.5" +version = "0.5.0-beta.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "660af7ad65741e2f62392a41edec81880293f189bce436b3e55f6df870457993" +checksum = "764722842fa893bfa8037113d883ab9430753a44cab54fd98602caa757417e62" dependencies = [ "hex", "itertools", @@ -1140,9 +1140,9 @@ dependencies = [ [[package]] name = "pallas-multiplexer" -version = "0.5.0-alpha.5" +version = "0.5.0-beta.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c0bb02bdf83153b32ce41048ca0525d2259763d28db88d45d108ac57107a84b" +checksum = "0496df45e294846b9e4d8f64fd88ba06378e9c8f631e6bfe9f45a18b310dd737" dependencies = [ "byteorder 1.4.3", "hex", @@ -1151,9 +1151,9 @@ dependencies = [ [[package]] name = "pallas-primitives" -version = "0.5.0-alpha.5" +version = "0.5.0-beta.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9cfa98d17f734e5127a684a52053f8245aa12cb3ac76a28973a849456b0e903" +checksum = "fc631fa3dba356aef4159e6665843eec0fd113ae52b3096bb9dc61a636a98c6e" dependencies = [ "base58", "hex", diff --git a/Cargo.toml b/Cargo.toml index 0c4ccb52..2aa4356b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ authors = ["Santiago Carmuega "] [dependencies] -pallas = "0.5.0-alpha.5" +pallas = "0.5.0-beta.0" # pallas = { path = "../pallas/pallas" } hex = "0.4.3" net2 = "0.2.37" diff --git a/src/sources/n2c/blocks.rs b/src/sources/n2c/blocks.rs index a35cf013..1803e7ed 100644 --- a/src/sources/n2c/blocks.rs +++ b/src/sources/n2c/blocks.rs @@ -30,6 +30,12 @@ impl TryFrom for MultiEraBlock { Ok(MultiEraBlock::AlonzoCompatible(Box::new(block), era)) } }, + // TODO: we're assuming that the genesis block is Byron-compatible. Is this a safe + // assumption? + probing::Outcome::GenesisBlock => { + let block = minicbor::decode(bytes)?; + Ok(MultiEraBlock::Byron(Box::new(block))) + } probing::Outcome::Inconclusive => { log::error!("CBOR hex for debubbing: {}", hex::encode(bytes)); Err("can't infer primitive block from cbor, inconslusive probing".into()) diff --git a/src/sources/n2n/run.rs b/src/sources/n2n/run.rs index 03afe531..092c3573 100644 --- a/src/sources/n2n/run.rs +++ b/src/sources/n2n/run.rs @@ -39,6 +39,13 @@ impl blockfetch::Observer for Block2EventMapper { .ok_or_warn("error crawling block for events"); } }, + // TODO: we're assuming that the genesis block is Byron-compatible. Is this a safe + // assumption? + probing::Outcome::GenesisBlock => { + writer + .crawl_from_byron_cbor(&body) + .ok_or_warn("error crawling block for events"); + } probing::Outcome::Inconclusive => { log::error!("can't infer primitive block from cbor, inconslusive probing. CBOR hex for debubbing: {}", hex::encode(body)); } From 27938fafb7e7b59729eaefbc953534230b5b5f44 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Thu, 24 Feb 2022 19:13:51 -0300 Subject: [PATCH 3/4] Update to Pallas v0.5.0 --- Cargo.lock | 20 ++++++++++---------- Cargo.toml | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f71d9b3c..339370e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1101,9 +1101,9 @@ dependencies = [ [[package]] name = "pallas" -version = "0.5.0-beta.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "174e34b47129161d48d14fdeafe8ebf8a31ef428a29f105f211cc9dc293288ad" +checksum = "05851f9052fa9f8cb82f17eca09731ee06fd7e71ced4410c1ba34fb1188f08fc" dependencies = [ "pallas-crypto", "pallas-miniprotocols", @@ -1113,9 +1113,9 @@ dependencies = [ [[package]] name = "pallas-crypto" -version = "0.5.0-beta.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e833b3405a1a4b61d630166f7b7f0767a23561ddb447ca97f35eb7d7f4a1deb7" +checksum = "26b3d6c207cdaf012a1216dade8e0dcd4fd3c2a1ac67502a317b3d515183b087" dependencies = [ "cryptoxide", "hex", @@ -1126,9 +1126,9 @@ dependencies = [ [[package]] name = "pallas-miniprotocols" -version = "0.5.0-beta.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "764722842fa893bfa8037113d883ab9430753a44cab54fd98602caa757417e62" +checksum = "ffb9c03313216a8d2df8aa9d028213c4b5ea73751b4bcb202989657927a38db5" dependencies = [ "hex", "itertools", @@ -1140,9 +1140,9 @@ dependencies = [ [[package]] name = "pallas-multiplexer" -version = "0.5.0-beta.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0496df45e294846b9e4d8f64fd88ba06378e9c8f631e6bfe9f45a18b310dd737" +checksum = "4c2e716075627be053d360049c715e204567721612a0220b2e52c015b8270579" dependencies = [ "byteorder 1.4.3", "hex", @@ -1151,9 +1151,9 @@ dependencies = [ [[package]] name = "pallas-primitives" -version = "0.5.0-beta.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc631fa3dba356aef4159e6665843eec0fd113ae52b3096bb9dc61a636a98c6e" +checksum = "27d58c43453f292c1ee81b8969ba4d560b7e6b29eeb40f603613fabb7404b0d3" dependencies = [ "base58", "hex", diff --git a/Cargo.toml b/Cargo.toml index 2aa4356b..a8e53280 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ authors = ["Santiago Carmuega "] [dependencies] -pallas = "0.5.0-beta.0" +pallas = "0.5.0" # pallas = { path = "../pallas/pallas" } hex = "0.4.3" net2 = "0.2.37" From 79bb0392d819127f038c2a42400c9689f90acf8a Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Thu, 24 Feb 2022 19:13:57 -0300 Subject: [PATCH 4/4] Fix clippy warnings --- src/sources/n2c/setup.rs | 1 + src/sources/n2n/setup.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/sources/n2c/setup.rs b/src/sources/n2c/setup.rs index 0a50f2ad..ee5e5368 100644 --- a/src/sources/n2c/setup.rs +++ b/src/sources/n2c/setup.rs @@ -105,6 +105,7 @@ impl SourceProvider for WithUtils { let known_points = define_start_point( &self.inner.intersect, + #[allow(deprecated)] &self.inner.since, &self.utils, &mut cs_channel, diff --git a/src/sources/n2n/setup.rs b/src/sources/n2n/setup.rs index c849cad9..fe228144 100644 --- a/src/sources/n2n/setup.rs +++ b/src/sources/n2n/setup.rs @@ -105,6 +105,7 @@ impl SourceProvider for WithUtils { let known_points = define_start_point( &self.inner.intersect, + #[allow(deprecated)] &self.inner.since, &self.utils, &mut cs_channel,