From d1ae825872640a06d6918b99bb27344568d7f98e Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 19 Dec 2021 20:25:04 -0300 Subject: [PATCH 1/3] feat: compute slot timestamp --- src/bin/oura/watch.rs | 2 ++ src/framework.rs | 41 +++++++++++++++++++++++++++++++++++- src/mapping.rs | 1 + src/sinks/terminal/format.rs | 6 ++++-- src/sources/common.rs | 21 ++++-------------- src/sources/n2c/mod.rs | 11 +++++++--- src/sources/n2c/setup.rs | 18 ++++++++++------ src/sources/n2n/mod.rs | 8 ++++--- src/sources/n2n/setup.rs | 22 +++++++++++++------ 9 files changed, 90 insertions(+), 40 deletions(-) diff --git a/src/bin/oura/watch.rs b/src/bin/oura/watch.rs index 96bd0381..e85210b2 100644 --- a/src/bin/oura/watch.rs +++ b/src/bin/oura/watch.rs @@ -66,10 +66,12 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { PeerMode::AsNode => WatchSource::N2N(N2NConfig { address: AddressArg(bearer, socket), magic, + well_known: None, }), PeerMode::AsClient => WatchSource::N2C(N2CConfig { address: AddressArg(bearer, socket), magic, + well_known: None, }), }; diff --git a/src/framework.rs b/src/framework.rs index ff24c84b..21f465c4 100644 --- a/src/framework.rs +++ b/src/framework.rs @@ -6,14 +6,43 @@ use std::{ use merge::Merge; +use pallas::ouroboros::network::handshake::{MAINNET_MAGIC, TESTNET_MAGIC}; use serde_derive::{Deserialize, Serialize}; pub type Error = Box; +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ChainWellKnownInfo { + pub shelley_known_slot: u64, + pub shelley_known_hash: String, + pub shelley_known_time: u64, +} + +impl ChainWellKnownInfo { + pub fn try_from_magic(magic: u64) -> Result { + match magic { + MAINNET_MAGIC => Ok(ChainWellKnownInfo { + shelley_known_slot: 4492799, + shelley_known_hash: + "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457".to_string(), + shelley_known_time: 1596059071, + }), + TESTNET_MAGIC => Ok(ChainWellKnownInfo { + shelley_known_slot: 1598399, + shelley_known_hash: + "7e16781b40ebf8b6da18f7b5e8ade855d6738095ef2f1c58c77e88b6e45997a4".to_string(), + shelley_known_time: 1506203091, + }), + _ => Err("can't infer well-known chain infro from specified magic".into()), + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, Merge, Default)] pub struct EventContext { pub block_number: Option, pub slot: Option, + pub timestamp: Option, pub tx_idx: Option, pub tx_hash: Option, pub input_idx: Option, @@ -133,13 +162,15 @@ pub trait SinkConfig { pub struct EventWriter { context: EventContext, output: Sender, + chain_info: Option, } impl EventWriter { - pub fn new(output: Sender) -> Self { + pub fn new(output: Sender, chain_info: Option) -> Self { EventWriter { context: EventContext::default(), output, + chain_info, } } @@ -160,6 +191,14 @@ impl EventWriter { EventWriter { context: extra_context, output: self.output.clone(), + chain_info: self.chain_info.clone(), + } + } + + pub fn compute_timestamp(&self, slot: u64) -> Option { + match &self.chain_info { + None => None, + Some(info) => Some(info.shelley_known_time + (slot - info.shelley_known_slot)), } } } diff --git a/src/mapping.rs b/src/mapping.rs index a948baa2..65ee8ca8 100644 --- a/src/mapping.rs +++ b/src/mapping.rs @@ -342,6 +342,7 @@ impl EventSource for Block { let writer = writer.child_writer(EventContext { block_number: Some(self.header.header_body.block_number), slot: Some(self.header.header_body.slot), + timestamp: writer.compute_timestamp(self.header.header_body.slot), ..EventContext::default() }); diff --git a/src/sinks/terminal/format.rs b/src/sinks/terminal/format.rs index 61702924..77f58129 100644 --- a/src/sinks/terminal/format.rs +++ b/src/sinks/terminal/format.rs @@ -22,8 +22,10 @@ impl LogLine { prefix: "BLOCK", color: Color::Magenta, content: format!( - "{{ body size: {}, issues vkey: {} }}", - body_size, issuer_vkey + "{{ body size: {}, issues vkey: {}, timestamp: {} }}", + body_size, + issuer_vkey, + source.context.timestamp.unwrap_or_default(), ), source, max_width, diff --git a/src/sources/common.rs b/src/sources/common.rs index 9909afbc..3b122c32 100644 --- a/src/sources/common.rs +++ b/src/sources/common.rs @@ -11,7 +11,7 @@ use pallas::ouroboros::network::{ use serde::{de::Visitor, Deserializer}; use serde_derive::Deserialize; -use crate::framework::Error; +use crate::framework::ChainWellKnownInfo; #[derive(Debug, Deserialize)] pub enum BearerKind { @@ -98,25 +98,12 @@ where deserializer.deserialize_any(MagicArgVisitor) } -pub fn get_wellknonwn_chain_point(magic: u64) -> Result { - match magic { - MAINNET_MAGIC => Ok(Point( - 4492799, - hex::decode("f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457")?, - )), - TESTNET_MAGIC => Ok(Point( - 1598399, - hex::decode("7e16781b40ebf8b6da18f7b5e8ade855d6738095ef2f1c58c77e88b6e45997a4")?, - )), - _ => Err("don't have a well-known chain point for the requested magic".into()), - } -} - pub fn find_end_of_chain( channel: &mut Channel, - wellknown_point: Point, + well_known: &ChainWellKnownInfo, ) -> Result { - let agent = TipFinder::initial(wellknown_point); + let point = Point(well_known.shelley_known_slot, hex::decode(&well_known.shelley_known_hash)?); + let agent = TipFinder::initial(point); let agent = run_agent(agent, channel)?; info!("chain point query output: {:?}", agent.output); diff --git a/src/sources/n2c/mod.rs b/src/sources/n2c/mod.rs index 8b4b1072..21b742e3 100644 --- a/src/sources/n2c/mod.rs +++ b/src/sources/n2c/mod.rs @@ -19,7 +19,7 @@ use pallas::{ use std::sync::mpsc::Sender; use crate::{ - framework::{Error, Event, EventData, EventSource, EventWriter}, + framework::{ChainWellKnownInfo, Error, Event, EventData, EventSource, EventWriter}, mapping::ToHex, }; @@ -81,8 +81,13 @@ impl Observer for ChainObserver { } } -fn observe_forever(mut channel: Channel, from: Point, output: Sender) -> Result<(), Error> { - let writer = EventWriter::new(output); +fn observe_forever( + mut channel: Channel, + chain_info: ChainWellKnownInfo, + from: Point, + output: Sender, +) -> Result<(), Error> { + let writer = EventWriter::new(output, Some(chain_info)); let observer = ChainObserver(writer); let agent = Consumer::::initial(vec![from], observer); let agent = run_agent(agent, &mut channel)?; diff --git a/src/sources/n2c/setup.rs b/src/sources/n2c/setup.rs index 89f60231..a21398db 100644 --- a/src/sources/n2c/setup.rs +++ b/src/sources/n2c/setup.rs @@ -13,10 +13,8 @@ use pallas::ouroboros::network::{ use serde_derive::Deserialize; use crate::{ - framework::{BootstrapResult, Error, Event, SourceConfig}, - sources::common::{ - find_end_of_chain, get_wellknonwn_chain_point, AddressArg, BearerKind, MagicArg, - }, + framework::{BootstrapResult, ChainWellKnownInfo, Error, Event, SourceConfig}, + sources::common::{find_end_of_chain, AddressArg, BearerKind, MagicArg}, }; use super::observe_forever; @@ -27,6 +25,8 @@ pub struct Config { #[serde(deserialize_with = "crate::sources::common::deserialize_magic_arg")] pub magic: Option, + + pub well_known: Option, } fn do_handshake(channel: &mut Channel, magic: u64) -> Result<(), Error> { @@ -66,18 +66,22 @@ impl SourceConfig for Config { None => MAINNET_MAGIC, }; + let well_known = match &self.well_known { + Some(info) => info.clone(), + None => ChainWellKnownInfo::try_from_magic(magic)?, + }; + let mut hs_channel = muxer.use_channel(0); do_handshake(&mut hs_channel, magic)?; let mut cs_channel = muxer.use_channel(5); - let wellknown_point = get_wellknonwn_chain_point(magic)?; - let node_tip = find_end_of_chain(&mut cs_channel, wellknown_point)?; + let node_tip = find_end_of_chain(&mut cs_channel, &well_known)?; info!("node tip: {:?}", &node_tip); let handle = std::thread::spawn(move || { - observe_forever(cs_channel, node_tip, output).expect("chainsync loop failed"); + observe_forever(cs_channel, well_known, node_tip, output).expect("chainsync loop failed"); }); Ok(handle) diff --git a/src/sources/n2n/mod.rs b/src/sources/n2n/mod.rs index 768f6922..5b83c320 100644 --- a/src/sources/n2n/mod.rs +++ b/src/sources/n2n/mod.rs @@ -21,7 +21,7 @@ use pallas::{ use std::sync::mpsc::{Receiver, Sender}; use crate::{ - framework::{Error, Event, EventData, EventSource, EventWriter}, + framework::{ChainWellKnownInfo, Error, Event, EventData, EventSource, EventWriter}, mapping::ToHex, }; @@ -107,10 +107,11 @@ impl Observer for ChainObserver { fn fetch_blocks_forever( mut channel: Channel, + chain_info: ChainWellKnownInfo, input: Receiver, output: Sender, ) -> Result<(), Error> { - let writer = EventWriter::new(output); + let writer = EventWriter::new(output, Some(chain_info)); let observer = Block2EventMapper(writer); let agent = BlockClient::initial(input, observer); let agent = run_agent(agent, &mut channel)?; @@ -121,11 +122,12 @@ fn fetch_blocks_forever( fn observe_headers_forever( mut channel: Channel, + chain_info: ChainWellKnownInfo, from: Point, event_output: Sender, block_requests: Sender, ) -> Result<(), Error> { - let event_writer = EventWriter::new(event_output); + let event_writer = EventWriter::new(event_output, Some(chain_info)); let observer = ChainObserver { event_writer, block_requests, diff --git a/src/sources/n2n/setup.rs b/src/sources/n2n/setup.rs index 46f37d77..cd775b1f 100644 --- a/src/sources/n2n/setup.rs +++ b/src/sources/n2n/setup.rs @@ -13,9 +13,9 @@ use pallas::ouroboros::network::{ use serde_derive::Deserialize; use crate::{ - framework::{BootstrapResult, Error, Event, SourceConfig}, + framework::{BootstrapResult, ChainWellKnownInfo, Error, Event, SourceConfig}, sources::{ - common::{find_end_of_chain, get_wellknonwn_chain_point, AddressArg, BearerKind, MagicArg}, + common::{find_end_of_chain, AddressArg, BearerKind, MagicArg}, n2n::{fetch_blocks_forever, observe_headers_forever}, }, }; @@ -26,6 +26,8 @@ pub struct Config { #[serde(deserialize_with = "crate::sources::common::deserialize_magic_arg")] pub magic: Option, + + pub well_known: Option, } fn do_handshake(channel: &mut Channel, magic: u64) -> Result<(), Error> { @@ -65,28 +67,34 @@ impl SourceConfig for Config { None => MAINNET_MAGIC, }; + let well_known = match &self.well_known { + Some(info) => info.clone(), + None => ChainWellKnownInfo::try_from_magic(magic)?, + }; + let mut hs_channel = muxer.use_channel(0); do_handshake(&mut hs_channel, magic)?; let mut cs_channel = muxer.use_channel(2); - let wellknown_point = get_wellknonwn_chain_point(magic)?; - let node_tip = find_end_of_chain(&mut cs_channel, wellknown_point)?; + let node_tip = find_end_of_chain(&mut cs_channel, &well_known)?; info!("node tip: {:?}", &node_tip); let (headers_tx, headers_rx) = std::sync::mpsc::channel(); let cs_events = output.clone(); + let cs_chain_info = well_known.clone(); let cs_handle = std::thread::spawn(move || { - observe_headers_forever(cs_channel, node_tip, cs_events, headers_tx) + observe_headers_forever(cs_channel, cs_chain_info, node_tip, cs_events, headers_tx) .expect("chainsync loop failed"); }); let bf_channel = muxer.use_channel(3); - + let bf_chain_info = well_known.clone(); let _bf_handle = std::thread::spawn(move || { - fetch_blocks_forever(bf_channel, headers_rx, output).expect("blockfetch loop failed"); + fetch_blocks_forever(bf_channel, bf_chain_info, headers_rx, output) + .expect("blockfetch loop failed"); }); Ok(cs_handle) From bc21f3a20431082b1b25f72547e19c0e96520355 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 19 Dec 2021 20:28:36 -0300 Subject: [PATCH 2/3] fix: testnet well-known timestamp --- src/framework.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/framework.rs b/src/framework.rs index 21f465c4..3bcca06f 100644 --- a/src/framework.rs +++ b/src/framework.rs @@ -31,7 +31,7 @@ impl ChainWellKnownInfo { shelley_known_slot: 1598399, shelley_known_hash: "7e16781b40ebf8b6da18f7b5e8ade855d6738095ef2f1c58c77e88b6e45997a4".to_string(), - shelley_known_time: 1506203091, + shelley_known_time: 1595967596, }), _ => Err("can't infer well-known chain infro from specified magic".into()), } From 89ba43176b60f300740d85d3a86205fce32da223 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 19 Dec 2021 20:34:42 -0300 Subject: [PATCH 3/3] style: apply fmt and clippy --- src/bin/oura/daemon.rs | 4 ++-- src/framework.rs | 7 +++---- src/sinks/elastic/run.rs | 5 ++++- src/sources/common.rs | 5 ++++- src/sources/n2c/setup.rs | 3 ++- 5 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/bin/oura/daemon.rs b/src/bin/oura/daemon.rs index 01187b04..02cefb55 100644 --- a/src/bin/oura/daemon.rs +++ b/src/bin/oura/daemon.rs @@ -40,7 +40,7 @@ enum Sink { #[cfg(feature = "kafkasink")] Kafka(KafkaConfig), - + #[cfg(feature = "elasticsink")] Elastic(ElasticConfig), } @@ -52,7 +52,7 @@ impl SinkConfig for Sink { #[cfg(feature = "kafkasink")] Sink::Kafka(c) => c.bootstrap(input), - + #[cfg(feature = "elasticsink")] Sink::Elastic(c) => c.bootstrap(input), } diff --git a/src/framework.rs b/src/framework.rs index 3bcca06f..c40c198a 100644 --- a/src/framework.rs +++ b/src/framework.rs @@ -196,10 +196,9 @@ impl EventWriter { } pub fn compute_timestamp(&self, slot: u64) -> Option { - match &self.chain_info { - None => None, - Some(info) => Some(info.shelley_known_time + (slot - info.shelley_known_slot)), - } + self.chain_info + .as_ref() + .map(|info| info.shelley_known_time + (slot - info.shelley_known_slot)) } } diff --git a/src/sinks/elastic/run.rs b/src/sinks/elastic/run.rs index a26f09e0..33a04574 100644 --- a/src/sinks/elastic/run.rs +++ b/src/sinks/elastic/run.rs @@ -16,7 +16,10 @@ async fn index_event(client: Arc, index: &str, evt: Event) -> Res if response.status_code().is_success() { debug!("pushed event to elastic"); } else { - error!("error pushing event to elastic: {:?}", response.text().await); + error!( + "error pushing event to elastic: {:?}", + response.text().await + ); } Ok(()) diff --git a/src/sources/common.rs b/src/sources/common.rs index 3b122c32..59602057 100644 --- a/src/sources/common.rs +++ b/src/sources/common.rs @@ -102,7 +102,10 @@ pub fn find_end_of_chain( channel: &mut Channel, well_known: &ChainWellKnownInfo, ) -> Result { - let point = Point(well_known.shelley_known_slot, hex::decode(&well_known.shelley_known_hash)?); + let point = Point( + well_known.shelley_known_slot, + hex::decode(&well_known.shelley_known_hash)?, + ); let agent = TipFinder::initial(point); let agent = run_agent(agent, channel)?; info!("chain point query output: {:?}", agent.output); diff --git a/src/sources/n2c/setup.rs b/src/sources/n2c/setup.rs index a21398db..c3d647e1 100644 --- a/src/sources/n2c/setup.rs +++ b/src/sources/n2c/setup.rs @@ -81,7 +81,8 @@ impl SourceConfig for Config { info!("node tip: {:?}", &node_tip); let handle = std::thread::spawn(move || { - observe_forever(cs_channel, well_known, node_tip, output).expect("chainsync loop failed"); + observe_forever(cs_channel, well_known, node_tip, output) + .expect("chainsync loop failed"); }); Ok(handle)