From f900b51616b731e865d1ec4e500522bb6dea11d9 Mon Sep 17 00:00:00 2001 From: DJO <790521+Alenar@users.noreply.github.com> Date: Tue, 22 Oct 2024 18:01:18 +0200 Subject: [PATCH 1/8] feat(common): introduce `RawCardanoPoint` --- .../raw_cardano_point.rs | 151 ++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 mithril-common/src/cardano_block_scanner/raw_cardano_point.rs diff --git a/mithril-common/src/cardano_block_scanner/raw_cardano_point.rs b/mithril-common/src/cardano_block_scanner/raw_cardano_point.rs new file mode 100644 index 00000000000..7e7609a5eef --- /dev/null +++ b/mithril-common/src/cardano_block_scanner/raw_cardano_point.rs @@ -0,0 +1,151 @@ +#[cfg(feature = "fs")] +use pallas_network::miniprotocols::Point as PallasPoint; +use std::fmt::{Debug, Formatter}; + +use crate::cardano_block_scanner::ScannedBlock; +use crate::entities::{ChainPoint, SlotNumber}; + +/// Point in the chain that can be intersected. +/// +/// Internally the point used in Cardano doesn't have a block number like our [ChainPoint] +/// does, so we need to use a different struct to represent it. Else converting from one to the other +/// would be lossy. +#[derive(Clone, PartialEq)] +pub struct RawCardanoPoint { + /// The [slot number](https://docs.cardano.org/learn/cardano-node/#slotsandepochs) + pub slot_number: SlotNumber, + + /// Hex array of the block hash + pub block_hash: Vec, +} + +impl RawCardanoPoint { + /// Instantiate a new `RawCardanoPoint` + pub fn new>>(slot_number: SlotNumber, block_hash: T) -> Self { + RawCardanoPoint { + slot_number, + block_hash: block_hash.into(), + } + } + + /// Create a new origin `RawCardanoPoint` + pub fn origin() -> Self { + RawCardanoPoint { + slot_number: SlotNumber(0), + block_hash: Vec::new(), + } + } + + /// Check if origin + pub fn is_origin(&self) -> bool { + self.slot_number == 0 && self.block_hash.is_empty() + } +} + +impl From<&ChainPoint> for RawCardanoPoint { + fn from(point: &ChainPoint) -> Self { + RawCardanoPoint { + slot_number: point.slot_number, + block_hash: hex::decode(&point.block_hash).unwrap(), + } + } +} + +impl From for RawCardanoPoint { + fn from(point: ChainPoint) -> Self { + Self::from(&point) + } +} + +impl Debug for RawCardanoPoint { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut debug = f.debug_struct("RawCardanoPoint"); + debug + .field("slot_number", &self.slot_number) + .field("block_hash", &hex::encode(&self.block_hash)) + .finish() + } +} + +cfg_fs! { + impl From for PallasPoint { + fn from(raw_point: RawCardanoPoint) -> Self { + match raw_point.is_origin() { + true => Self::Origin, + false => Self::Specific( + *raw_point.slot_number, + raw_point.block_hash + ), + } + } + } + + impl From for RawCardanoPoint { + fn from(point: PallasPoint) -> Self { + match point { + PallasPoint::Specific(slot_number, block_hash) => Self { + slot_number: SlotNumber(slot_number), + block_hash, + }, + PallasPoint::Origin => Self::origin(), + } + } + } +} + +impl From<&ScannedBlock> for RawCardanoPoint { + fn from(scanned_block: &ScannedBlock) -> Self { + RawCardanoPoint { + slot_number: scanned_block.slot_number, + block_hash: hex::decode(&scanned_block.block_hash).unwrap(), + } + } +} + +impl From for RawCardanoPoint { + fn from(scanned_block: ScannedBlock) -> Self { + Self::from(&scanned_block) + } +} + +#[cfg(test)] +mod tests { + use crate::entities::BlockNumber; + + use super::*; + + #[test] + fn from_chain_point_to_raw_cardano_point_conversions() { + let expected_hash = vec![4, 2, 12, 9, 7]; + let chain_point = + ChainPoint::new(SlotNumber(8), BlockNumber(23), hex::encode(&expected_hash)); + + assert_eq!( + RawCardanoPoint::new(SlotNumber(8), expected_hash.clone()), + RawCardanoPoint::from(&chain_point) + ); + assert_eq!( + RawCardanoPoint::new(SlotNumber(8), expected_hash.clone()), + RawCardanoPoint::from(chain_point) + ); + } + + #[test] + fn from_scanned_block_to_raw_cardano_point_conversions() { + let expected_hash = vec![7, 1, 13, 7, 8]; + let scanned_block = ScannedBlock::new( + hex::encode(&expected_hash), + BlockNumber(31), + SlotNumber(4), + Vec::<&str>::new(), + ); + assert_eq!( + RawCardanoPoint::new(SlotNumber(4), expected_hash.clone()), + RawCardanoPoint::from(&scanned_block) + ); + assert_eq!( + RawCardanoPoint::new(SlotNumber(4), expected_hash.clone()), + RawCardanoPoint::from(scanned_block) + ); + } +} From 56b8351866a62b9049244190cfc23636a2263a52 Mon Sep 17 00:00:00 2001 From: DJO <790521+Alenar@users.noreply.github.com> Date: Wed, 23 Oct 2024 17:00:15 +0200 Subject: [PATCH 2/8] refactor(common): store block hash as bytes in scanned block Instead of a hex encoded string, making it closer to what the chain use. Conversion are still done but later, and not as often. --- .../raw_cardano_point.rs | 4 +-- .../cardano_block_scanner/scanned_block.rs | 34 +++++++++++++------ 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/mithril-common/src/cardano_block_scanner/raw_cardano_point.rs b/mithril-common/src/cardano_block_scanner/raw_cardano_point.rs index 7e7609a5eef..9c587c26cd7 100644 --- a/mithril-common/src/cardano_block_scanner/raw_cardano_point.rs +++ b/mithril-common/src/cardano_block_scanner/raw_cardano_point.rs @@ -97,7 +97,7 @@ impl From<&ScannedBlock> for RawCardanoPoint { fn from(scanned_block: &ScannedBlock) -> Self { RawCardanoPoint { slot_number: scanned_block.slot_number, - block_hash: hex::decode(&scanned_block.block_hash).unwrap(), + block_hash: scanned_block.block_hash.clone(), } } } @@ -134,7 +134,7 @@ mod tests { fn from_scanned_block_to_raw_cardano_point_conversions() { let expected_hash = vec![7, 1, 13, 7, 8]; let scanned_block = ScannedBlock::new( - hex::encode(&expected_hash), + expected_hash.clone(), BlockNumber(31), SlotNumber(4), Vec::<&str>::new(), diff --git a/mithril-common/src/cardano_block_scanner/scanned_block.rs b/mithril-common/src/cardano_block_scanner/scanned_block.rs index 8060cf2f7ee..2f75973e7a5 100644 --- a/mithril-common/src/cardano_block_scanner/scanned_block.rs +++ b/mithril-common/src/cardano_block_scanner/scanned_block.rs @@ -1,14 +1,13 @@ use pallas_traverse::MultiEraBlock; +use std::fmt::{Debug, Formatter}; -use crate::entities::{ - BlockHash, BlockNumber, CardanoTransaction, ChainPoint, SlotNumber, TransactionHash, -}; +use crate::entities::{BlockNumber, CardanoTransaction, ChainPoint, SlotNumber, TransactionHash}; /// A block scanned from a Cardano database -#[derive(Debug, Clone, PartialEq)] +#[derive(Clone, PartialEq)] pub struct ScannedBlock { /// Block hash - pub block_hash: BlockHash, + pub block_hash: Vec, /// Block number pub block_number: BlockNumber, /// Slot number of the block @@ -19,11 +18,11 @@ pub struct ScannedBlock { impl ScannedBlock { /// Scanned block factory - pub fn new, U: Into>( - block_hash: U, + pub fn new>, TxHash: Into>( + block_hash: BlkHash, block_number: BlockNumber, slot_number: SlotNumber, - transaction_hashes: Vec, + transaction_hashes: Vec, ) -> Self { Self { block_hash: block_hash.into(), @@ -40,7 +39,7 @@ impl ScannedBlock { } Self::new( - multi_era_block.hash().to_string(), + *multi_era_block.hash(), BlockNumber(multi_era_block.number()), SlotNumber(multi_era_block.slot()), transactions, @@ -56,6 +55,7 @@ impl ScannedBlock { /// /// Consume the block. pub fn into_transactions(self) -> Vec { + let block_hash = hex::encode(&self.block_hash); self.transactions_hashes .into_iter() .map(|transaction_hash| { @@ -63,19 +63,31 @@ impl ScannedBlock { transaction_hash, self.block_number, self.slot_number, - self.block_hash.clone(), + block_hash.clone(), ) }) .collect::>() } } +impl Debug for ScannedBlock { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut debug = f.debug_struct("ScannedBlock"); + debug + .field("block_hash", &hex::encode(&self.block_hash)) + .field("block_number", &self.block_number) + .field("slot_number", &self.slot_number) + .field("transactions_hashes", &self.transactions_hashes) + .finish() + } +} + impl From<&ScannedBlock> for ChainPoint { fn from(scanned_block: &ScannedBlock) -> Self { ChainPoint::new( scanned_block.slot_number, scanned_block.block_number, - scanned_block.block_hash.clone(), + hex::encode(&scanned_block.block_hash), ) } } From 908b5b8c0f5b87498ebe1aa5b65efed65a5ed316 Mon Sep 17 00:00:00 2001 From: DJO <790521+Alenar@users.noreply.github.com> Date: Wed, 23 Oct 2024 10:21:08 +0200 Subject: [PATCH 3/8] refactor(common): use `RawCardanoPoint` in chain reader and block scanner --- .../cardano_block_scanner/block_scanner.rs | 6 +- .../chain_reader_block_streamer.rs | 95 +++++++++---------- .../dumb_block_scanner.rs | 42 ++++---- .../src/cardano_block_scanner/interface.rs | 16 ++-- .../src/cardano_block_scanner/mod.rs | 2 + mithril-common/src/chain_reader/entity.rs | 7 +- .../src/chain_reader/fake_chain_reader.rs | 15 +-- mithril-common/src/chain_reader/interface.rs | 5 +- .../src/chain_reader/pallas_chain_reader.rs | 31 +++--- 9 files changed, 105 insertions(+), 114 deletions(-) diff --git a/mithril-common/src/cardano_block_scanner/block_scanner.rs b/mithril-common/src/cardano_block_scanner/block_scanner.rs index 0e968104f27..253789ea097 100644 --- a/mithril-common/src/cardano_block_scanner/block_scanner.rs +++ b/mithril-common/src/cardano_block_scanner/block_scanner.rs @@ -4,9 +4,9 @@ use async_trait::async_trait; use slog::Logger; use tokio::sync::Mutex; -use crate::cardano_block_scanner::{BlockScanner, BlockStreamer}; +use crate::cardano_block_scanner::{BlockScanner, BlockStreamer, RawCardanoPoint}; use crate::chain_reader::ChainBlockReader; -use crate::entities::{BlockNumber, ChainPoint}; +use crate::entities::BlockNumber; use crate::StdResult; use super::ChainReaderBlockStreamer; @@ -39,7 +39,7 @@ impl CardanoBlockScanner { impl BlockScanner for CardanoBlockScanner { async fn scan( &self, - from: Option, + from: Option, until: BlockNumber, ) -> StdResult> { Ok(Box::new( diff --git a/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs b/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs index e71a35ec4c9..67fbc2ef2db 100644 --- a/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs +++ b/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs @@ -4,10 +4,9 @@ use async_trait::async_trait; use slog::{debug, trace, Logger}; use tokio::sync::Mutex; -use crate::cardano_block_scanner::BlockStreamer; -use crate::cardano_block_scanner::ChainScannedBlocks; +use crate::cardano_block_scanner::{BlockStreamer, ChainScannedBlocks, RawCardanoPoint}; use crate::chain_reader::{ChainBlockNextAction, ChainBlockReader}; -use crate::entities::{BlockNumber, ChainPoint}; +use crate::entities::BlockNumber; use crate::logging::LoggerExtensions; use crate::StdResult; @@ -23,10 +22,10 @@ enum BlockStreamerNextAction { /// [Block streamer][BlockStreamer] that streams blocks with a [Chain block reader][ChainBlockReader] pub struct ChainReaderBlockStreamer { chain_reader: Arc>, - from: ChainPoint, + from: RawCardanoPoint, until: BlockNumber, max_roll_forwards_per_poll: usize, - last_polled_chain_point: Option, + last_polled_point: Option, logger: Logger, } @@ -43,7 +42,7 @@ impl BlockStreamer for ChainReaderBlockStreamer { Some(BlockStreamerNextAction::ChainBlockNextAction( ChainBlockNextAction::RollForward { parsed_block }, )) => { - self.last_polled_chain_point = Some(ChainPoint::from(&parsed_block)); + self.last_polled_point = Some(RawCardanoPoint::from(&parsed_block)); let parsed_block_number = parsed_block.block_number; roll_forwards.push(parsed_block); if roll_forwards.len() >= self.max_roll_forwards_per_poll @@ -54,10 +53,10 @@ impl BlockStreamer for ChainReaderBlockStreamer { } Some(BlockStreamerNextAction::ChainBlockNextAction( ChainBlockNextAction::RollBackward { - chain_point: rollback_chain_point, + point: rollback_chain_point, }, )) => { - self.last_polled_chain_point = Some(rollback_chain_point.clone()); + self.last_polled_point = Some(rollback_chain_point.clone()); let rollback_slot_number = rollback_chain_point.slot_number; let index_rollback = roll_forwards .iter() @@ -96,8 +95,8 @@ impl BlockStreamer for ChainReaderBlockStreamer { } } - fn latest_polled_chain_point(&self) -> Option { - self.last_polled_chain_point.clone() + fn last_polled_point(&self) -> Option { + self.last_polled_point.clone() } } @@ -105,12 +104,12 @@ impl ChainReaderBlockStreamer { /// Factory pub async fn try_new( chain_reader: Arc>, - from: Option, + from: Option, until: BlockNumber, max_roll_forwards_per_poll: usize, logger: Logger, ) -> StdResult { - let from = from.unwrap_or(ChainPoint::origin()); + let from = from.unwrap_or(RawCardanoPoint::origin()); { let mut chain_reader_inner = chain_reader.try_lock()?; chain_reader_inner.set_chain_point(&from).await?; @@ -120,7 +119,7 @@ impl ChainReaderBlockStreamer { from, until, max_roll_forwards_per_poll, - last_polled_chain_point: None, + last_polled_point: None, logger: logger.new_with_component_name::(), }) } @@ -148,7 +147,7 @@ impl ChainReaderBlockStreamer { } } Some(ChainBlockNextAction::RollBackward { - chain_point: rollback_chain_point, + point: rollback_chain_point, }) => { let rollback_slot_number = rollback_chain_point.slot_number; trace!( @@ -160,7 +159,7 @@ impl ChainReaderBlockStreamer { } else { BlockStreamerNextAction::ChainBlockNextAction( ChainBlockNextAction::RollBackward { - chain_point: rollback_chain_point, + point: rollback_chain_point, }, ) }; @@ -219,7 +218,7 @@ mod tests { let scanned_blocks = block_streamer.poll_next().await.expect("poll_next failed"); assert_eq!(None, scanned_blocks); - assert_eq!(None, block_streamer.latest_polled_chain_point()); + assert_eq!(None, block_streamer.last_polled_point()); let mut block_streamer = ChainReaderBlockStreamer::try_new( chain_reader, @@ -242,12 +241,8 @@ mod tests { scanned_blocks ); assert_eq!( - block_streamer.latest_polled_chain_point(), - Some(ChainPoint::new( - SlotNumber(100), - until_block_number, - "hash-2", - )) + block_streamer.last_polled_point(), + Some(RawCardanoPoint::new(SlotNumber(100), "hash-2")) ); } @@ -304,8 +299,8 @@ mod tests { assert_eq!(1, chain_reader_total_remaining_next_actions); assert_eq!( - block_streamer.latest_polled_chain_point(), - Some(ChainPoint::new(SlotNumber(20), BlockNumber(2), "hash-2")) + block_streamer.last_polled_point(), + Some(RawCardanoPoint::new(SlotNumber(20), "hash-2")) ); } @@ -350,8 +345,8 @@ mod tests { scanned_blocks, ); assert_eq!( - block_streamer.latest_polled_chain_point(), - Some(ChainPoint::new(SlotNumber(20), BlockNumber(2), "hash-2")) + block_streamer.last_polled_point(), + Some(RawCardanoPoint::new(SlotNumber(20), "hash-2")) ); } @@ -403,8 +398,8 @@ mod tests { scanned_blocks, ); assert_eq!( - block_streamer.latest_polled_chain_point(), - Some(ChainPoint::new(SlotNumber(20), BlockNumber(2), "hash-2")) + block_streamer.last_polled_point(), + Some(RawCardanoPoint::new(SlotNumber(20), "hash-2")) ); let scanned_blocks = block_streamer.poll_next().await.expect("poll_next failed"); @@ -418,15 +413,15 @@ mod tests { scanned_blocks, ); assert_eq!( - block_streamer.latest_polled_chain_point(), - Some(ChainPoint::new(SlotNumber(30), BlockNumber(3), "hash-3")) + block_streamer.last_polled_point(), + Some(RawCardanoPoint::new(SlotNumber(30), "hash-3")) ); let scanned_blocks = block_streamer.poll_next().await.expect("poll_next failed"); assert_eq!(None, scanned_blocks); assert_eq!( - block_streamer.latest_polled_chain_point(), - Some(ChainPoint::new(SlotNumber(30), BlockNumber(3), "hash-3")) + block_streamer.last_polled_point(), + Some(RawCardanoPoint::new(SlotNumber(30), "hash-3")) ); } @@ -434,16 +429,12 @@ mod tests { async fn test_parse_expected_nothing_when_rollbackward_on_same_point() { let chain_reader = Arc::new(Mutex::new(FakeChainReader::new(vec![ ChainBlockNextAction::RollBackward { - chain_point: ChainPoint::new(SlotNumber(100), BlockNumber(10), "hash-123"), + point: RawCardanoPoint::new(SlotNumber(100), "hash-123"), }, ]))); let mut block_streamer = ChainReaderBlockStreamer::try_new( chain_reader, - Some(ChainPoint::new( - SlotNumber(100), - BlockNumber(10), - "hash-123", - )), + Some(RawCardanoPoint::new(SlotNumber(100), "hash-123")), BlockNumber(1), MAX_ROLL_FORWARDS_PER_POLL, TestLogger::stdout(), @@ -453,7 +444,7 @@ mod tests { let scanned_blocks = block_streamer.poll_next().await.expect("poll_next failed"); assert_eq!(None, scanned_blocks); - assert_eq!(block_streamer.latest_polled_chain_point(), None); + assert_eq!(block_streamer.last_polled_point(), None); } #[tokio::test] @@ -461,7 +452,7 @@ mod tests { { let chain_reader = Arc::new(Mutex::new(FakeChainReader::new(vec![ ChainBlockNextAction::RollBackward { - chain_point: ChainPoint::new(SlotNumber(100), BlockNumber(10), "hash-10"), + point: RawCardanoPoint::new(SlotNumber(100), "hash-10"), }, ]))); let mut block_streamer = ChainReaderBlockStreamer::try_new( @@ -481,15 +472,15 @@ mod tests { scanned_blocks, ); assert_eq!( - block_streamer.latest_polled_chain_point(), - Some(ChainPoint::new(SlotNumber(100), BlockNumber(10), "hash-10")) + block_streamer.last_polled_point(), + Some(RawCardanoPoint::new(SlotNumber(100), "hash-10")) ); let scanned_blocks = block_streamer.poll_next().await.expect("poll_next failed"); assert_eq!(None, scanned_blocks); assert_eq!( - block_streamer.latest_polled_chain_point(), - Some(ChainPoint::new(SlotNumber(100), BlockNumber(10), "hash-10")) + block_streamer.last_polled_point(), + Some(RawCardanoPoint::new(SlotNumber(100), "hash-10")) ); } @@ -522,7 +513,7 @@ mod tests { ), }, ChainBlockNextAction::RollBackward { - chain_point: ChainPoint::new(SlotNumber(9), BlockNumber(90), "hash-9"), + point: RawCardanoPoint::new(SlotNumber(9), "hash-9"), }, ]))); let mut block_streamer = ChainReaderBlockStreamer::try_new( @@ -545,8 +536,8 @@ mod tests { scanned_blocks, ); assert_eq!( - block_streamer.latest_polled_chain_point(), - Some(ChainPoint::new(SlotNumber(9), BlockNumber(90), "hash-9",)) + block_streamer.last_polled_point(), + Some(RawCardanoPoint::new(SlotNumber(9), "hash-9")) ); } @@ -571,7 +562,7 @@ mod tests { ), }, ChainBlockNextAction::RollBackward { - chain_point: ChainPoint::new(SlotNumber(3), BlockNumber(30), "hash-3"), + point: RawCardanoPoint::new(SlotNumber(3), "hash-3"), }, ]))); let mut block_streamer = ChainReaderBlockStreamer::try_new( @@ -591,8 +582,8 @@ mod tests { scanned_blocks, ); assert_eq!( - block_streamer.latest_polled_chain_point(), - Some(ChainPoint::new(SlotNumber(3), BlockNumber(30), "hash-3",)) + block_streamer.last_polled_point(), + Some(RawCardanoPoint::new(SlotNumber(3), "hash-3")) ); } @@ -615,7 +606,7 @@ mod tests { } #[tokio::test] - async fn test_latest_polled_chain_point_is_none_if_nothing_was_polled() { + async fn test_last_polled_point_is_none_if_nothing_was_polled() { let chain_reader = Arc::new(Mutex::new(FakeChainReader::new(vec![]))); let block_streamer = ChainReaderBlockStreamer::try_new( chain_reader, @@ -627,6 +618,6 @@ mod tests { .await .unwrap(); - assert_eq!(block_streamer.latest_polled_chain_point(), None); + assert_eq!(block_streamer.last_polled_point(), None); } } diff --git a/mithril-common/src/cardano_block_scanner/dumb_block_scanner.rs b/mithril-common/src/cardano_block_scanner/dumb_block_scanner.rs index b156e1a2167..5b5b678dd4c 100644 --- a/mithril-common/src/cardano_block_scanner/dumb_block_scanner.rs +++ b/mithril-common/src/cardano_block_scanner/dumb_block_scanner.rs @@ -3,8 +3,8 @@ use std::sync::RwLock; use async_trait::async_trait; -use crate::cardano_block_scanner::ChainScannedBlocks; use crate::cardano_block_scanner::{BlockScanner, BlockStreamer, ScannedBlock}; +use crate::cardano_block_scanner::{ChainScannedBlocks, RawCardanoPoint}; use crate::entities::{BlockNumber, ChainPoint}; use crate::StdResult; @@ -34,9 +34,9 @@ impl DumbBlockScanner { self } - /// Set the latest polled chain point to return when [Self::latest_polled_chain_point] is called. - pub fn latest_polled_chain_point(self, chain_point: Option) -> Self { - self.set_latest_polled_chain_point(chain_point); + /// Set the last polled point to return when [Self::last_polled_point] is called. + pub fn last_polled_point(self, raw_point: Option) -> Self { + self.set_last_polled_point(raw_point); self } @@ -53,10 +53,10 @@ impl DumbBlockScanner { *streamer = streamer.clone().rollback(chain_point); } - /// Set the latest polled chain point to return when [Self::latest_polled_chain_point] is called. - pub fn set_latest_polled_chain_point(&self, chain_point: Option) { + /// Set the last polled point to return when [Self::last_polled_point] is called. + pub fn set_last_polled_point(&self, raw_point: Option) { let mut streamer = self.streamer.write().unwrap(); - *streamer = streamer.clone().set_latest_polled_chain_point(chain_point); + *streamer = streamer.clone().set_last_polled_point(raw_point); } } @@ -70,7 +70,7 @@ impl Default for DumbBlockScanner { impl BlockScanner for DumbBlockScanner { async fn scan( &self, - _from: Option, + _from: Option, _until: BlockNumber, ) -> StdResult> { let streamer = self.streamer.read().unwrap(); @@ -82,7 +82,7 @@ impl BlockScanner for DumbBlockScanner { #[derive(Clone)] pub struct DumbBlockStreamer { streamer_responses: VecDeque, - latest_polled_chain_point: Option, + last_polled_point: Option, } impl DumbBlockStreamer { @@ -90,13 +90,13 @@ impl DumbBlockStreamer { pub fn new() -> Self { Self { streamer_responses: VecDeque::new(), - latest_polled_chain_point: None, + last_polled_point: None, } } - /// Set the latest polled chain point to return when [Self::latest_polled_chain_point] is called - pub fn set_latest_polled_chain_point(mut self, chain_point: Option) -> Self { - self.latest_polled_chain_point = chain_point; + /// Set the last polled point to return when [Self::last_polled_point] is called + pub fn set_last_polled_point(mut self, raw_point: Option) -> Self { + self.last_polled_point = raw_point; self } @@ -132,8 +132,8 @@ impl BlockStreamer for DumbBlockStreamer { Ok(self.streamer_responses.pop_front()) } - fn latest_polled_chain_point(&self) -> Option { - self.latest_polled_chain_point.clone() + fn last_polled_point(&self) -> Option { + self.last_polled_point.clone() } } @@ -312,13 +312,13 @@ mod tests { #[tokio::test] async fn setting_last_polled_block() { let mut streamer = DumbBlockStreamer::new().forwards(vec![]); - assert_eq!(streamer.latest_polled_chain_point(), None); + assert_eq!(streamer.last_polled_point(), None); - let chain_point = ChainPoint::new(SlotNumber(10), BlockNumber(2), "block-hash"); - streamer = streamer.set_latest_polled_chain_point(Some(chain_point.clone())); - assert_eq!(streamer.latest_polled_chain_point(), Some(chain_point)); + let raw_point = RawCardanoPoint::new(SlotNumber(10), "block-hash".as_bytes()); + streamer = streamer.set_last_polled_point(Some(raw_point.clone())); + assert_eq!(streamer.last_polled_point(), Some(raw_point)); - streamer = streamer.set_latest_polled_chain_point(None); - assert_eq!(streamer.latest_polled_chain_point(), None); + streamer = streamer.set_last_polled_point(None); + assert_eq!(streamer.last_polled_point(), None); } } diff --git a/mithril-common/src/cardano_block_scanner/interface.rs b/mithril-common/src/cardano_block_scanner/interface.rs index 54268944cae..cdde80d3675 100644 --- a/mithril-common/src/cardano_block_scanner/interface.rs +++ b/mithril-common/src/cardano_block_scanner/interface.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; -use crate::cardano_block_scanner::ScannedBlock; -use crate::entities::{BlockNumber, ChainPoint, SlotNumber}; +use crate::cardano_block_scanner::{RawCardanoPoint, ScannedBlock}; +use crate::entities::{BlockNumber, SlotNumber}; use crate::StdResult; /// A scanner that can read cardano transactions in a cardano database @@ -15,8 +15,8 @@ use crate::StdResult; /// use async_trait::async_trait; /// use mockall::mock; /// -/// use mithril_common::cardano_block_scanner::{BlockScanner, BlockStreamer}; -/// use mithril_common::entities::{BlockNumber, ChainPoint}; +/// use mithril_common::cardano_block_scanner::{BlockScanner, BlockStreamer, RawCardanoPoint}; +/// use mithril_common::entities::{BlockNumber}; /// use mithril_common::StdResult; /// /// mock! { @@ -26,7 +26,7 @@ use crate::StdResult; /// impl BlockScanner for BlockScannerImpl { /// async fn scan( /// &self, -/// from: Option, +/// from: Option, /// until: BlockNumber, /// ) -> StdResult>; /// } @@ -46,7 +46,7 @@ pub trait BlockScanner: Sync + Send { /// Scan the transactions async fn scan( &self, - from: Option, + from: Option, until: BlockNumber, ) -> StdResult>; } @@ -66,8 +66,8 @@ pub trait BlockStreamer: Sync + Send { /// Stream the next available blocks async fn poll_next(&mut self) -> StdResult>; - /// Get the latest polled chain point - fn latest_polled_chain_point(&self) -> Option; + /// Get the last polled point of the chain + fn last_polled_point(&self) -> Option; } cfg_test_tools! { diff --git a/mithril-common/src/cardano_block_scanner/mod.rs b/mithril-common/src/cardano_block_scanner/mod.rs index 164f5b35231..6c1faf6d160 100644 --- a/mithril-common/src/cardano_block_scanner/mod.rs +++ b/mithril-common/src/cardano_block_scanner/mod.rs @@ -3,10 +3,12 @@ mod block_scanner; mod chain_reader_block_streamer; mod dumb_block_scanner; mod interface; +mod raw_cardano_point; mod scanned_block; pub use block_scanner::*; pub use chain_reader_block_streamer::*; pub use dumb_block_scanner::*; pub use interface::*; +pub use raw_cardano_point::*; pub use scanned_block::*; diff --git a/mithril-common/src/chain_reader/entity.rs b/mithril-common/src/chain_reader/entity.rs index 0b391423b75..778d0f9e26b 100644 --- a/mithril-common/src/chain_reader/entity.rs +++ b/mithril-common/src/chain_reader/entity.rs @@ -1,4 +1,5 @@ -use crate::{cardano_block_scanner::ScannedBlock, entities::ChainPoint}; +use crate::cardano_block_scanner::RawCardanoPoint; +use crate::cardano_block_scanner::ScannedBlock; /// The action that indicates what to do next when scanning the chain #[derive(Debug, Clone, PartialEq)] @@ -10,7 +11,7 @@ pub enum ChainBlockNextAction { }, /// RollBackward event (we are on an incorrect fork, we need to get back a point to roll forward again) RollBackward { - /// The rollback chain point in the chain to read (as a new valid chain point to read from on the main chain, which has already been seen) - chain_point: ChainPoint, + /// The rollback point in the chain to read (as a new valid chain point to read from on the main chain, which has already been seen) + point: RawCardanoPoint, }, } diff --git a/mithril-common/src/chain_reader/fake_chain_reader.rs b/mithril-common/src/chain_reader/fake_chain_reader.rs index 6286f1ccb3b..c0a67257820 100644 --- a/mithril-common/src/chain_reader/fake_chain_reader.rs +++ b/mithril-common/src/chain_reader/fake_chain_reader.rs @@ -2,7 +2,8 @@ use std::{collections::VecDeque, sync::Mutex}; use async_trait::async_trait; -use crate::{entities::ChainPoint, StdResult}; +use crate::cardano_block_scanner::RawCardanoPoint; +use crate::StdResult; use super::{ChainBlockNextAction, ChainBlockReader}; @@ -27,7 +28,7 @@ impl FakeChainReader { #[async_trait] impl ChainBlockReader for FakeChainReader { - async fn set_chain_point(&mut self, _point: &ChainPoint) -> StdResult<()> { + async fn set_chain_point(&mut self, _point: &RawCardanoPoint) -> StdResult<()> { Ok(()) } @@ -43,14 +44,6 @@ mod tests { use super::*; - fn build_chain_point(id: u64) -> ChainPoint { - ChainPoint { - slot_number: SlotNumber(id), - block_number: BlockNumber(id), - block_hash: format!("point-hash-{id}"), - } - } - #[tokio::test] async fn test_get_next_chain_block() { let expected_chain_point_next_actions = vec![ @@ -71,7 +64,7 @@ mod tests { ), }, ChainBlockNextAction::RollBackward { - chain_point: build_chain_point(1), + point: RawCardanoPoint::new(SlotNumber(1), "point-hash-1".as_bytes()), }, ]; diff --git a/mithril-common/src/chain_reader/interface.rs b/mithril-common/src/chain_reader/interface.rs index 66d66d36865..793e56c4e0c 100644 --- a/mithril-common/src/chain_reader/interface.rs +++ b/mithril-common/src/chain_reader/interface.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; -use crate::{entities::ChainPoint, StdResult}; +use crate::cardano_block_scanner::RawCardanoPoint; +use crate::StdResult; use super::ChainBlockNextAction; @@ -11,7 +12,7 @@ use super::ChainBlockNextAction; #[async_trait] pub trait ChainBlockReader: Send + Sync { /// Sets the chain point - async fn set_chain_point(&mut self, point: &ChainPoint) -> StdResult<()>; + async fn set_chain_point(&mut self, point: &RawCardanoPoint) -> StdResult<()>; /// Get the next chain block async fn get_next_chain_block(&mut self) -> StdResult>; diff --git a/mithril-common/src/chain_reader/pallas_chain_reader.rs b/mithril-common/src/chain_reader/pallas_chain_reader.rs index 4116e410bbd..6d73fd5cb13 100644 --- a/mithril-common/src/chain_reader/pallas_chain_reader.rs +++ b/mithril-common/src/chain_reader/pallas_chain_reader.rs @@ -10,7 +10,10 @@ use pallas_traverse::MultiEraBlock; use slog::{debug, Logger}; use crate::logging::LoggerExtensions; -use crate::{cardano_block_scanner::ScannedBlock, entities::ChainPoint, CardanoNetwork, StdResult}; +use crate::{ + cardano_block_scanner::{RawCardanoPoint, ScannedBlock}, + CardanoNetwork, StdResult, +}; use super::{ChainBlockNextAction, ChainBlockReader}; @@ -55,7 +58,7 @@ impl PallasChainReader { } /// Intersects the point of the chain with the given point. - async fn find_intersect_point(&mut self, point: &ChainPoint) -> StdResult<()> { + async fn find_intersect_point(&mut self, point: &RawCardanoPoint) -> StdResult<()> { let logger = self.logger.clone(); let client = self.get_client().await?; let chainsync = client.chainsync(); @@ -85,8 +88,8 @@ impl PallasChainReader { Ok(Some(ChainBlockNextAction::RollForward { parsed_block })) } NextResponse::RollBackward(rollback_point, _) => { - let chain_point = ChainPoint::from(rollback_point); - Ok(Some(ChainBlockNextAction::RollBackward { chain_point })) + let point = RawCardanoPoint::from(rollback_point); + Ok(Some(ChainBlockNextAction::RollBackward { point })) } NextResponse::Await => Ok(None), } @@ -105,7 +108,7 @@ impl Drop for PallasChainReader { #[async_trait] impl ChainBlockReader for PallasChainReader { - async fn set_chain_point(&mut self, point: &ChainPoint) -> StdResult<()> { + async fn set_chain_point(&mut self, point: &RawCardanoPoint) -> StdResult<()> { self.find_intersect_point(point).await } @@ -167,9 +170,9 @@ mod tests { BlockNumber(1337) } - /// Returns a fake chain point for testing purposes. - fn get_fake_chain_point_backwards() -> ChainPoint { - ChainPoint::from(get_fake_specific_point()) + /// Returns a fake cardano raw point for testing purposes. + fn get_fake_raw_point_backwards() -> RawCardanoPoint { + RawCardanoPoint::from(get_fake_specific_point()) } /// Creates a new work directory in the system's temporary folder. @@ -268,7 +271,7 @@ mod tests { ); chain_reader - .set_chain_point(&ChainPoint::from(known_point.clone())) + .set_chain_point(&RawCardanoPoint::from(known_point.clone())) .await .unwrap(); @@ -278,8 +281,8 @@ mod tests { let (_, client_res) = tokio::join!(server, client); let chain_block = client_res.expect("Client failed to get next chain block"); match chain_block { - ChainBlockNextAction::RollBackward { chain_point } => { - assert_eq!(chain_point, get_fake_chain_point_backwards()); + ChainBlockNextAction::RollBackward { point: chain_point } => { + assert_eq!(chain_point, get_fake_raw_point_backwards()); } _ => panic!("Unexpected chain block action"), } @@ -303,7 +306,7 @@ mod tests { ); chain_reader - .set_chain_point(&ChainPoint::from(known_point.clone())) + .set_chain_point(&RawCardanoPoint::from(known_point.clone())) .await .unwrap(); @@ -338,7 +341,7 @@ mod tests { ); chain_reader - .set_chain_point(&ChainPoint::from(known_point.clone())) + .set_chain_point(&RawCardanoPoint::from(known_point.clone())) .await .unwrap(); @@ -355,7 +358,7 @@ mod tests { // make sure that setting the chain point is harmless when the chainsync client does not have agency chain_reader - .set_chain_point(&ChainPoint::from(known_point.clone())) + .set_chain_point(&RawCardanoPoint::from(known_point.clone())) .await .unwrap(); From 00af33edcf75171f9c0dfa86fdf76d54f3166007 Mon Sep 17 00:00:00 2001 From: DJO <790521+Alenar@users.noreply.github.com> Date: Thu, 24 Oct 2024 12:55:19 +0200 Subject: [PATCH 4/8] refactor: adapt transactions importers to the new scanner api --- .../services/cardano_transactions_importer.rs | 247 +++++++++--------- .../cardano_transactions/importer/service.rs | 247 +++++++++--------- 2 files changed, 234 insertions(+), 260 deletions(-) diff --git a/mithril-aggregator/src/services/cardano_transactions_importer.rs b/mithril-aggregator/src/services/cardano_transactions_importer.rs index ccc2bd62705..72254e30c68 100644 --- a/mithril-aggregator/src/services/cardano_transactions_importer.rs +++ b/mithril-aggregator/src/services/cardano_transactions_importer.rs @@ -7,7 +7,7 @@ use async_trait::async_trait; use slog::{debug, Logger}; use tokio::{runtime::Handle, sync::Mutex, task}; -use mithril_common::cardano_block_scanner::{BlockScanner, ChainScannedBlocks}; +use mithril_common::cardano_block_scanner::{BlockScanner, ChainScannedBlocks, RawCardanoPoint}; use mithril_common::crypto_helper::{MKTree, MKTreeNode, MKTreeStoreInMemory}; use mithril_common::entities::{ BlockNumber, BlockRange, CardanoTransaction, ChainPoint, SlotNumber, @@ -56,7 +56,7 @@ pub trait TransactionStore: Send + Sync { pub struct CardanoTransactionsImporter { block_scanner: Arc, transaction_store: Arc, - latest_polled_chain_point: Arc>>, + last_polled_point: Arc>>, logger: Logger, } @@ -70,47 +70,52 @@ impl CardanoTransactionsImporter { Self { block_scanner, transaction_store, - latest_polled_chain_point: Arc::new(Mutex::new(None)), + last_polled_point: Arc::new(Mutex::new(None)), logger: logger.new_with_component_name::(), } } - async fn start_chain_point(&self) -> StdResult> { - let last_scanned_chain_point = self.latest_polled_chain_point.lock().await.clone(); - - if last_scanned_chain_point.is_none() { - self.transaction_store.get_highest_beacon().await - } else { - Ok(last_scanned_chain_point) - } + async fn start_point( + &self, + highest_stored_chain_point: &Option, + ) -> StdResult> { + let last_polled_point = self.last_polled_point.lock().await.clone(); + Ok(last_polled_point.or(highest_stored_chain_point + .as_ref() + .map(RawCardanoPoint::from))) } async fn import_transactions(&self, up_to_beacon: BlockNumber) -> StdResult<()> { - let from = self.start_chain_point().await?; - self.parse_and_store_transactions_not_imported_yet(from, up_to_beacon) - .await - } + let highest_stored_beacon = self.transaction_store.get_highest_beacon().await?; + let from = self.start_point(&highest_stored_beacon).await?; - async fn parse_and_store_transactions_not_imported_yet( - &self, - from: Option, - until: BlockNumber, - ) -> StdResult<()> { - if from.as_ref().is_some_and(|f| f.block_number >= until) { + if highest_stored_beacon + .as_ref() + .is_some_and(|f| f.block_number >= up_to_beacon) + { debug!( self.logger, - "No need to retrieve Cardano transactions, the database is up to date for block_number '{until}'", + "No need to retrieve Cardano transactions, the database is up to date for block_number '{up_to_beacon}'", + ); + + Ok(()) + } else { + debug!( + self.logger, "Retrieving Cardano transactions until block numbered '{up_to_beacon}'"; + "starting_slot_number" => ?from.as_ref().map(|c| c.slot_number), + "highest_stored_block_number" => ?highest_stored_beacon.as_ref().map(|c| c.block_number), ); - return Ok(()); + + self.parse_and_store_transactions_not_imported_yet(from, up_to_beacon) + .await } - debug!( - self.logger, - "Retrieving Cardano transactions between block_number '{}' and '{until}'", - from.as_ref() - .map(|c| c.block_number) - .unwrap_or(BlockNumber(0)) - ); + } + async fn parse_and_store_transactions_not_imported_yet( + &self, + from: Option, + until: BlockNumber, + ) -> StdResult<()> { let mut streamer = self.block_scanner.scan(from, until).await?; while let Some(blocks) = streamer.poll_next().await? { @@ -132,7 +137,8 @@ impl CardanoTransactionsImporter { } } } - *self.latest_polled_chain_point.lock().await = streamer.latest_polled_chain_point(); + + *self.last_polled_point.lock().await = streamer.last_polled_point(); Ok(()) } @@ -229,7 +235,7 @@ mod tests { impl BlockScanner for BlockScannerImpl { async fn scan( &self, - from: Option, + from: Option, until: BlockNumber, ) -> StdResult>; } @@ -446,8 +452,12 @@ mod tests { ), ]]); - let last_tx = - CardanoTransaction::new("tx-20", BlockNumber(30), SlotNumber(35), "block_hash-3"); + let last_tx = CardanoTransaction::new( + "tx-20", + BlockNumber(30), + SlotNumber(35), + hex::encode("block_hash-3"), + ); repository .store_transactions(vec![last_tx.clone()]) .await @@ -472,10 +482,13 @@ mod tests { SqliteConnectionPool::build_from_connection(connection), ))); - let highest_stored_chain_point = - ChainPoint::new(SlotNumber(134), BlockNumber(10), "block_hash-1"); + let highest_stored_chain_point = ChainPoint::new( + SlotNumber(134), + BlockNumber(10), + hex::encode("block_hash-1"), + ); let stored_block = ScannedBlock::new( - highest_stored_chain_point.block_hash.clone(), + hex::decode(highest_stored_chain_point.block_hash.clone()).unwrap(), highest_stored_chain_point.block_number, highest_stored_chain_point.slot_number, vec!["tx_hash-1", "tx_hash-2"], @@ -504,7 +517,7 @@ mod tests { scanner_mock .expect_scan() .withf(move |from, until| { - from == &Some(highest_stored_chain_point.clone()) + from == &Some(highest_stored_chain_point.clone().into()) && *until == up_to_block_number }) .return_once(move |_, _| { @@ -797,24 +810,16 @@ mod tests { mod transactions_import_start_point { use super::*; - async fn importer_with_highest_stored_transaction_and_last_polled_chain_point( - highest_stored_transaction: Option, - last_polled_chain_point: Option, + async fn importer_with_last_polled_point( + last_polled_point: Option, ) -> CardanoTransactionsImporter { let connection = cardano_tx_db_connection().unwrap(); let repository = Arc::new(CardanoTransactionRepository::new(Arc::new( SqliteConnectionPool::build_from_connection(connection), ))); - if let Some(transaction) = highest_stored_transaction { - repository - .store_transactions(vec![transaction]) - .await - .unwrap(); - } - CardanoTransactionsImporter { - latest_polled_chain_point: Arc::new(Mutex::new(last_polled_chain_point)), + last_polled_point: Arc::new(Mutex::new(last_polled_point)), ..CardanoTransactionsImporter::new_for_test( Arc::new(DumbBlockScanner::new()), repository, @@ -823,110 +828,90 @@ mod tests { } #[tokio::test] - async fn cloning_keep_last_polled_chain_point() { - let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( - None, - Some(ChainPoint::new( - SlotNumber(15), - BlockNumber(10), - "block_hash-1", - )), - ) + async fn cloning_keep_last_polled_point() { + let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new( + SlotNumber(15), + "block_hash-1", + ))) .await; let cloned_importer = importer.clone(); - let start_point = cloned_importer.start_chain_point().await.unwrap(); + let start_point = cloned_importer.start_point(&None).await.unwrap(); assert_eq!( - Some(ChainPoint::new( - SlotNumber(15), - BlockNumber(10), - "block_hash-1" - )), + Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")), start_point ); } #[tokio::test] async fn none_if_nothing_stored_nor_scanned() { - let importer = - importer_with_highest_stored_transaction_and_last_polled_chain_point(None, None) - .await; + let importer = importer_with_last_polled_point(None).await; + let highest_stored_block_number = None; - let start_point = importer.start_chain_point().await.unwrap(); + let start_point = importer + .start_point(&highest_stored_block_number) + .await + .unwrap(); assert_eq!(None, start_point); } #[tokio::test] - async fn start_at_last_stored_chain_point_if_nothing_scanned() { - let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( - Some(CardanoTransaction::new( - "tx_hash-2", - BlockNumber(20), - SlotNumber(25), - "block_hash-2", - )), - None, - ) - .await; + async fn start_at_last_stored_point_if_nothing_scanned() { + let importer = importer_with_last_polled_point(None).await; + let highest_stored_block_number = Some(ChainPoint::new( + SlotNumber(25), + BlockNumber(20), + hex::encode("block_hash-2"), + )); - let start_point = importer.start_chain_point().await.unwrap(); + let start_point = importer + .start_point(&highest_stored_block_number) + .await + .unwrap(); assert_eq!( - Some(ChainPoint::new( - SlotNumber(25), - BlockNumber(20), - "block_hash-2" - )), + Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")), start_point ); } #[tokio::test] - async fn start_at_last_scanned_chain_point_when_nothing_stored() { - let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( - None, - Some(ChainPoint::new( - SlotNumber(15), - BlockNumber(10), - "block_hash-1", - )), - ) + async fn start_at_last_scanned_point_when_nothing_stored() { + let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new( + SlotNumber(15), + "block_hash-1", + ))) .await; + let highest_stored_block_number = None; - let start_point = importer.start_chain_point().await.unwrap(); + let start_point = importer + .start_point(&highest_stored_block_number) + .await + .unwrap(); assert_eq!( - Some(ChainPoint::new( - SlotNumber(15), - BlockNumber(10), - "block_hash-1" - )), + Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")), start_point ); } #[tokio::test] - async fn start_at_last_scanned_chain_point_even_if_something_stored() { - let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( - Some(CardanoTransaction::new( - "tx_hash-2", - BlockNumber(20), - SlotNumber(25), - "block_hash-2", - )), - Some(ChainPoint::new( - SlotNumber(15), - BlockNumber(10), - "block_hash-1", - )), - ) + async fn start_at_last_scanned_point_even_if_something_stored() { + let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new( + SlotNumber(15), + "block_hash-1", + ))) .await; + let highest_stored_block_number = Some(ChainPoint::new( + SlotNumber(25), + BlockNumber(20), + hex::encode("block_hash-2"), + )); - let start_point = importer.start_chain_point().await.unwrap(); + let start_point = importer + .start_point(&highest_stored_block_number) + .await + .unwrap(); assert_eq!( - Some(ChainPoint::new( - SlotNumber(15), - BlockNumber(10), - "block_hash-1" - )), + Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")), start_point ); } @@ -935,7 +920,7 @@ mod tests { async fn importing_transactions_update_start_point_even_if_no_transactions_are_found() { let connection = cardano_tx_db_connection().unwrap(); let importer = CardanoTransactionsImporter { - latest_polled_chain_point: Arc::new(Mutex::new(None)), + last_polled_point: Arc::new(Mutex::new(None)), ..CardanoTransactionsImporter::new_for_test( Arc::new( DumbBlockScanner::new() @@ -945,9 +930,8 @@ mod tests { SlotNumber(15), Vec::<&str>::new(), )]]) - .latest_polled_chain_point(Some(ChainPoint::new( + .last_polled_point(Some(RawCardanoPoint::new( SlotNumber(25), - BlockNumber(20), "block_hash-2", ))), ), @@ -956,8 +940,12 @@ mod tests { ))), ) }; + let highest_stored_block_number = None; - let start_point_before_import = importer.start_chain_point().await.unwrap(); + let start_point_before_import = importer + .start_point(&highest_stored_block_number) + .await + .unwrap(); assert_eq!(None, start_point_before_import); importer @@ -965,13 +953,12 @@ mod tests { .await .unwrap(); - let start_point_after_import = importer.start_chain_point().await.unwrap(); + let start_point_after_import = importer + .start_point(&highest_stored_block_number) + .await + .unwrap(); assert_eq!( - Some(ChainPoint::new( - SlotNumber(25), - BlockNumber(20), - "block_hash-2" - )), + Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")), start_point_after_import ); } diff --git a/mithril-signer/src/services/cardano_transactions/importer/service.rs b/mithril-signer/src/services/cardano_transactions/importer/service.rs index ccc2bd62705..72254e30c68 100644 --- a/mithril-signer/src/services/cardano_transactions/importer/service.rs +++ b/mithril-signer/src/services/cardano_transactions/importer/service.rs @@ -7,7 +7,7 @@ use async_trait::async_trait; use slog::{debug, Logger}; use tokio::{runtime::Handle, sync::Mutex, task}; -use mithril_common::cardano_block_scanner::{BlockScanner, ChainScannedBlocks}; +use mithril_common::cardano_block_scanner::{BlockScanner, ChainScannedBlocks, RawCardanoPoint}; use mithril_common::crypto_helper::{MKTree, MKTreeNode, MKTreeStoreInMemory}; use mithril_common::entities::{ BlockNumber, BlockRange, CardanoTransaction, ChainPoint, SlotNumber, @@ -56,7 +56,7 @@ pub trait TransactionStore: Send + Sync { pub struct CardanoTransactionsImporter { block_scanner: Arc, transaction_store: Arc, - latest_polled_chain_point: Arc>>, + last_polled_point: Arc>>, logger: Logger, } @@ -70,47 +70,52 @@ impl CardanoTransactionsImporter { Self { block_scanner, transaction_store, - latest_polled_chain_point: Arc::new(Mutex::new(None)), + last_polled_point: Arc::new(Mutex::new(None)), logger: logger.new_with_component_name::(), } } - async fn start_chain_point(&self) -> StdResult> { - let last_scanned_chain_point = self.latest_polled_chain_point.lock().await.clone(); - - if last_scanned_chain_point.is_none() { - self.transaction_store.get_highest_beacon().await - } else { - Ok(last_scanned_chain_point) - } + async fn start_point( + &self, + highest_stored_chain_point: &Option, + ) -> StdResult> { + let last_polled_point = self.last_polled_point.lock().await.clone(); + Ok(last_polled_point.or(highest_stored_chain_point + .as_ref() + .map(RawCardanoPoint::from))) } async fn import_transactions(&self, up_to_beacon: BlockNumber) -> StdResult<()> { - let from = self.start_chain_point().await?; - self.parse_and_store_transactions_not_imported_yet(from, up_to_beacon) - .await - } + let highest_stored_beacon = self.transaction_store.get_highest_beacon().await?; + let from = self.start_point(&highest_stored_beacon).await?; - async fn parse_and_store_transactions_not_imported_yet( - &self, - from: Option, - until: BlockNumber, - ) -> StdResult<()> { - if from.as_ref().is_some_and(|f| f.block_number >= until) { + if highest_stored_beacon + .as_ref() + .is_some_and(|f| f.block_number >= up_to_beacon) + { debug!( self.logger, - "No need to retrieve Cardano transactions, the database is up to date for block_number '{until}'", + "No need to retrieve Cardano transactions, the database is up to date for block_number '{up_to_beacon}'", + ); + + Ok(()) + } else { + debug!( + self.logger, "Retrieving Cardano transactions until block numbered '{up_to_beacon}'"; + "starting_slot_number" => ?from.as_ref().map(|c| c.slot_number), + "highest_stored_block_number" => ?highest_stored_beacon.as_ref().map(|c| c.block_number), ); - return Ok(()); + + self.parse_and_store_transactions_not_imported_yet(from, up_to_beacon) + .await } - debug!( - self.logger, - "Retrieving Cardano transactions between block_number '{}' and '{until}'", - from.as_ref() - .map(|c| c.block_number) - .unwrap_or(BlockNumber(0)) - ); + } + async fn parse_and_store_transactions_not_imported_yet( + &self, + from: Option, + until: BlockNumber, + ) -> StdResult<()> { let mut streamer = self.block_scanner.scan(from, until).await?; while let Some(blocks) = streamer.poll_next().await? { @@ -132,7 +137,8 @@ impl CardanoTransactionsImporter { } } } - *self.latest_polled_chain_point.lock().await = streamer.latest_polled_chain_point(); + + *self.last_polled_point.lock().await = streamer.last_polled_point(); Ok(()) } @@ -229,7 +235,7 @@ mod tests { impl BlockScanner for BlockScannerImpl { async fn scan( &self, - from: Option, + from: Option, until: BlockNumber, ) -> StdResult>; } @@ -446,8 +452,12 @@ mod tests { ), ]]); - let last_tx = - CardanoTransaction::new("tx-20", BlockNumber(30), SlotNumber(35), "block_hash-3"); + let last_tx = CardanoTransaction::new( + "tx-20", + BlockNumber(30), + SlotNumber(35), + hex::encode("block_hash-3"), + ); repository .store_transactions(vec![last_tx.clone()]) .await @@ -472,10 +482,13 @@ mod tests { SqliteConnectionPool::build_from_connection(connection), ))); - let highest_stored_chain_point = - ChainPoint::new(SlotNumber(134), BlockNumber(10), "block_hash-1"); + let highest_stored_chain_point = ChainPoint::new( + SlotNumber(134), + BlockNumber(10), + hex::encode("block_hash-1"), + ); let stored_block = ScannedBlock::new( - highest_stored_chain_point.block_hash.clone(), + hex::decode(highest_stored_chain_point.block_hash.clone()).unwrap(), highest_stored_chain_point.block_number, highest_stored_chain_point.slot_number, vec!["tx_hash-1", "tx_hash-2"], @@ -504,7 +517,7 @@ mod tests { scanner_mock .expect_scan() .withf(move |from, until| { - from == &Some(highest_stored_chain_point.clone()) + from == &Some(highest_stored_chain_point.clone().into()) && *until == up_to_block_number }) .return_once(move |_, _| { @@ -797,24 +810,16 @@ mod tests { mod transactions_import_start_point { use super::*; - async fn importer_with_highest_stored_transaction_and_last_polled_chain_point( - highest_stored_transaction: Option, - last_polled_chain_point: Option, + async fn importer_with_last_polled_point( + last_polled_point: Option, ) -> CardanoTransactionsImporter { let connection = cardano_tx_db_connection().unwrap(); let repository = Arc::new(CardanoTransactionRepository::new(Arc::new( SqliteConnectionPool::build_from_connection(connection), ))); - if let Some(transaction) = highest_stored_transaction { - repository - .store_transactions(vec![transaction]) - .await - .unwrap(); - } - CardanoTransactionsImporter { - latest_polled_chain_point: Arc::new(Mutex::new(last_polled_chain_point)), + last_polled_point: Arc::new(Mutex::new(last_polled_point)), ..CardanoTransactionsImporter::new_for_test( Arc::new(DumbBlockScanner::new()), repository, @@ -823,110 +828,90 @@ mod tests { } #[tokio::test] - async fn cloning_keep_last_polled_chain_point() { - let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( - None, - Some(ChainPoint::new( - SlotNumber(15), - BlockNumber(10), - "block_hash-1", - )), - ) + async fn cloning_keep_last_polled_point() { + let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new( + SlotNumber(15), + "block_hash-1", + ))) .await; let cloned_importer = importer.clone(); - let start_point = cloned_importer.start_chain_point().await.unwrap(); + let start_point = cloned_importer.start_point(&None).await.unwrap(); assert_eq!( - Some(ChainPoint::new( - SlotNumber(15), - BlockNumber(10), - "block_hash-1" - )), + Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")), start_point ); } #[tokio::test] async fn none_if_nothing_stored_nor_scanned() { - let importer = - importer_with_highest_stored_transaction_and_last_polled_chain_point(None, None) - .await; + let importer = importer_with_last_polled_point(None).await; + let highest_stored_block_number = None; - let start_point = importer.start_chain_point().await.unwrap(); + let start_point = importer + .start_point(&highest_stored_block_number) + .await + .unwrap(); assert_eq!(None, start_point); } #[tokio::test] - async fn start_at_last_stored_chain_point_if_nothing_scanned() { - let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( - Some(CardanoTransaction::new( - "tx_hash-2", - BlockNumber(20), - SlotNumber(25), - "block_hash-2", - )), - None, - ) - .await; + async fn start_at_last_stored_point_if_nothing_scanned() { + let importer = importer_with_last_polled_point(None).await; + let highest_stored_block_number = Some(ChainPoint::new( + SlotNumber(25), + BlockNumber(20), + hex::encode("block_hash-2"), + )); - let start_point = importer.start_chain_point().await.unwrap(); + let start_point = importer + .start_point(&highest_stored_block_number) + .await + .unwrap(); assert_eq!( - Some(ChainPoint::new( - SlotNumber(25), - BlockNumber(20), - "block_hash-2" - )), + Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")), start_point ); } #[tokio::test] - async fn start_at_last_scanned_chain_point_when_nothing_stored() { - let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( - None, - Some(ChainPoint::new( - SlotNumber(15), - BlockNumber(10), - "block_hash-1", - )), - ) + async fn start_at_last_scanned_point_when_nothing_stored() { + let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new( + SlotNumber(15), + "block_hash-1", + ))) .await; + let highest_stored_block_number = None; - let start_point = importer.start_chain_point().await.unwrap(); + let start_point = importer + .start_point(&highest_stored_block_number) + .await + .unwrap(); assert_eq!( - Some(ChainPoint::new( - SlotNumber(15), - BlockNumber(10), - "block_hash-1" - )), + Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")), start_point ); } #[tokio::test] - async fn start_at_last_scanned_chain_point_even_if_something_stored() { - let importer = importer_with_highest_stored_transaction_and_last_polled_chain_point( - Some(CardanoTransaction::new( - "tx_hash-2", - BlockNumber(20), - SlotNumber(25), - "block_hash-2", - )), - Some(ChainPoint::new( - SlotNumber(15), - BlockNumber(10), - "block_hash-1", - )), - ) + async fn start_at_last_scanned_point_even_if_something_stored() { + let importer = importer_with_last_polled_point(Some(RawCardanoPoint::new( + SlotNumber(15), + "block_hash-1", + ))) .await; + let highest_stored_block_number = Some(ChainPoint::new( + SlotNumber(25), + BlockNumber(20), + hex::encode("block_hash-2"), + )); - let start_point = importer.start_chain_point().await.unwrap(); + let start_point = importer + .start_point(&highest_stored_block_number) + .await + .unwrap(); assert_eq!( - Some(ChainPoint::new( - SlotNumber(15), - BlockNumber(10), - "block_hash-1" - )), + Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")), start_point ); } @@ -935,7 +920,7 @@ mod tests { async fn importing_transactions_update_start_point_even_if_no_transactions_are_found() { let connection = cardano_tx_db_connection().unwrap(); let importer = CardanoTransactionsImporter { - latest_polled_chain_point: Arc::new(Mutex::new(None)), + last_polled_point: Arc::new(Mutex::new(None)), ..CardanoTransactionsImporter::new_for_test( Arc::new( DumbBlockScanner::new() @@ -945,9 +930,8 @@ mod tests { SlotNumber(15), Vec::<&str>::new(), )]]) - .latest_polled_chain_point(Some(ChainPoint::new( + .last_polled_point(Some(RawCardanoPoint::new( SlotNumber(25), - BlockNumber(20), "block_hash-2", ))), ), @@ -956,8 +940,12 @@ mod tests { ))), ) }; + let highest_stored_block_number = None; - let start_point_before_import = importer.start_chain_point().await.unwrap(); + let start_point_before_import = importer + .start_point(&highest_stored_block_number) + .await + .unwrap(); assert_eq!(None, start_point_before_import); importer @@ -965,13 +953,12 @@ mod tests { .await .unwrap(); - let start_point_after_import = importer.start_chain_point().await.unwrap(); + let start_point_after_import = importer + .start_point(&highest_stored_block_number) + .await + .unwrap(); assert_eq!( - Some(ChainPoint::new( - SlotNumber(25), - BlockNumber(20), - "block_hash-2" - )), + Some(RawCardanoPoint::new(SlotNumber(25), "block_hash-2")), start_point_after_import ); } From bd6d5efe1bf0467cfe770adfe91b38b500c275dc Mon Sep 17 00:00:00 2001 From: DJO <790521+Alenar@users.noreply.github.com> Date: Thu, 24 Oct 2024 17:50:13 +0200 Subject: [PATCH 5/8] fix: avoid `latest_polled_point` reset if streamer polled nothing --- .../services/cardano_transactions_importer.rs | 43 ++++++++++++++++++- .../cardano_transactions/importer/service.rs | 43 ++++++++++++++++++- 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/mithril-aggregator/src/services/cardano_transactions_importer.rs b/mithril-aggregator/src/services/cardano_transactions_importer.rs index 72254e30c68..58ade1d4b3e 100644 --- a/mithril-aggregator/src/services/cardano_transactions_importer.rs +++ b/mithril-aggregator/src/services/cardano_transactions_importer.rs @@ -80,6 +80,13 @@ impl CardanoTransactionsImporter { highest_stored_chain_point: &Option, ) -> StdResult> { let last_polled_point = self.last_polled_point.lock().await.clone(); + if last_polled_point.is_none() { + debug!( + self.logger, + "No last polled point available, falling back to the highest stored chain point" + ); + } + Ok(last_polled_point.or(highest_stored_chain_point .as_ref() .map(RawCardanoPoint::from))) @@ -138,7 +145,9 @@ impl CardanoTransactionsImporter { } } - *self.last_polled_point.lock().await = streamer.last_polled_point(); + if let Some(point) = streamer.last_polled_point() { + *self.last_polled_point.lock().await = Some(point); + } Ok(()) } @@ -962,6 +971,38 @@ mod tests { start_point_after_import ); } + + #[tokio::test] + async fn importing_transactions_dont_update_start_point_if_streamer_did_nothing() { + let connection = cardano_tx_db_connection().unwrap(); + let importer = CardanoTransactionsImporter { + last_polled_point: Arc::new(Mutex::new(Some(RawCardanoPoint::new( + SlotNumber(15), + "block_hash-1", + )))), + ..CardanoTransactionsImporter::new_for_test( + Arc::new(DumbBlockScanner::new()), + Arc::new(CardanoTransactionRepository::new(Arc::new( + SqliteConnectionPool::build_from_connection(connection), + ))), + ) + }; + let highest_stored_block_number = None; + + importer + .import_transactions(BlockNumber(1000)) + .await + .unwrap(); + + let start_point_after_import = importer + .start_point(&highest_stored_block_number) + .await + .unwrap(); + assert_eq!( + Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")), + start_point_after_import + ); + } } #[tokio::test] diff --git a/mithril-signer/src/services/cardano_transactions/importer/service.rs b/mithril-signer/src/services/cardano_transactions/importer/service.rs index 72254e30c68..58ade1d4b3e 100644 --- a/mithril-signer/src/services/cardano_transactions/importer/service.rs +++ b/mithril-signer/src/services/cardano_transactions/importer/service.rs @@ -80,6 +80,13 @@ impl CardanoTransactionsImporter { highest_stored_chain_point: &Option, ) -> StdResult> { let last_polled_point = self.last_polled_point.lock().await.clone(); + if last_polled_point.is_none() { + debug!( + self.logger, + "No last polled point available, falling back to the highest stored chain point" + ); + } + Ok(last_polled_point.or(highest_stored_chain_point .as_ref() .map(RawCardanoPoint::from))) @@ -138,7 +145,9 @@ impl CardanoTransactionsImporter { } } - *self.last_polled_point.lock().await = streamer.last_polled_point(); + if let Some(point) = streamer.last_polled_point() { + *self.last_polled_point.lock().await = Some(point); + } Ok(()) } @@ -962,6 +971,38 @@ mod tests { start_point_after_import ); } + + #[tokio::test] + async fn importing_transactions_dont_update_start_point_if_streamer_did_nothing() { + let connection = cardano_tx_db_connection().unwrap(); + let importer = CardanoTransactionsImporter { + last_polled_point: Arc::new(Mutex::new(Some(RawCardanoPoint::new( + SlotNumber(15), + "block_hash-1", + )))), + ..CardanoTransactionsImporter::new_for_test( + Arc::new(DumbBlockScanner::new()), + Arc::new(CardanoTransactionRepository::new(Arc::new( + SqliteConnectionPool::build_from_connection(connection), + ))), + ) + }; + let highest_stored_block_number = None; + + importer + .import_transactions(BlockNumber(1000)) + .await + .unwrap(); + + let start_point_after_import = importer + .start_point(&highest_stored_block_number) + .await + .unwrap(); + assert_eq!( + Some(RawCardanoPoint::new(SlotNumber(15), "block_hash-1")), + start_point_after_import + ); + } } #[tokio::test] From 7fe427af8704f399eb43acb00da18add7be6e0f1 Mon Sep 17 00:00:00 2001 From: DJO <790521+Alenar@users.noreply.github.com> Date: Thu, 24 Oct 2024 18:03:38 +0200 Subject: [PATCH 6/8] chore(common): remove now unused conversion from `ChainPoint` to pallas types --- .../src/entities/cardano_chain_point.rs | 68 ------------------- 1 file changed, 68 deletions(-) diff --git a/mithril-common/src/entities/cardano_chain_point.rs b/mithril-common/src/entities/cardano_chain_point.rs index c44c8634689..b1409dd06c8 100644 --- a/mithril-common/src/entities/cardano_chain_point.rs +++ b/mithril-common/src/entities/cardano_chain_point.rs @@ -5,10 +5,6 @@ use serde::{Deserialize, Serialize}; use crate::entities::{BlockNumber, SlotNumber}; -cfg_fs! { - use pallas_network::miniprotocols::{chainsync::Tip, Point}; -} - /// Hash of a Cardano Block pub type BlockHash = String; @@ -39,20 +35,6 @@ impl ChainPoint { } } - /// Create a new origin chain point - pub fn origin() -> ChainPoint { - ChainPoint { - slot_number: SlotNumber(0), - block_number: BlockNumber(0), - block_hash: String::new(), - } - } - - /// Check if origin chain point - pub fn is_origin(&self) -> bool { - self.slot_number == 0 && self.block_number == 0 && self.block_hash.is_empty() - } - cfg_test_tools! { /// Create a dummy ChainPoint pub fn dummy() -> Self { @@ -90,56 +72,6 @@ impl Ord for ChainPoint { } } -cfg_fs! { - impl From for Point { - fn from(chain_point: ChainPoint) -> Self { - match chain_point.is_origin() { - true => Self::Origin, - false => Self::Specific( - *chain_point.slot_number, - hex::decode(&chain_point.block_hash).unwrap(), // TODO: keep block_hash as a Vec - ), - } - } - } - - impl From for ChainPoint { - fn from(point: Point) -> Self { - match point { - Point::Specific(slot_number, block_hash) => Self { - slot_number: SlotNumber(slot_number), - block_number: BlockNumber(0), - block_hash: hex::encode(block_hash), - }, - Point::Origin => Self { - slot_number: SlotNumber(0), - block_number: BlockNumber(0), - block_hash: String::new(), - }, - } - } - } - - impl From for ChainPoint { - fn from(tip: Tip) -> Self { - let chain_point: Self = tip.0.into(); - Self { - slot_number: chain_point.slot_number, - block_number: BlockNumber(tip.1), - block_hash: chain_point.block_hash, - } - } - } - - impl From for Tip { - fn from(chain_point: ChainPoint) -> Self { - let block_number = chain_point.block_number; - let point: Point = chain_point.into(); - Tip(point, *block_number) - } - } -} - #[cfg(test)] mod tests { use std::cmp::Ordering; From 16e65ba4be3858f79959193a46ee11536223d665 Mon Sep 17 00:00:00 2001 From: DJO <790521+Alenar@users.noreply.github.com> Date: Fri, 25 Oct 2024 14:47:33 +0200 Subject: [PATCH 7/8] style(common): adjust naming and comments --- .../chain_reader_block_streamer.rs | 26 +++++++------------ .../raw_cardano_point.rs | 6 +---- .../cardano_block_scanner/scanned_block.rs | 6 ++--- mithril-common/src/chain_reader/entity.rs | 2 +- .../src/chain_reader/fake_chain_reader.rs | 22 ++++++++-------- .../src/chain_reader/pallas_chain_reader.rs | 9 ++++--- 6 files changed, 31 insertions(+), 40 deletions(-) diff --git a/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs b/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs index 67fbc2ef2db..e770d53f3f8 100644 --- a/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs +++ b/mithril-common/src/cardano_block_scanner/chain_reader_block_streamer.rs @@ -52,12 +52,10 @@ impl BlockStreamer for ChainReaderBlockStreamer { } } Some(BlockStreamerNextAction::ChainBlockNextAction( - ChainBlockNextAction::RollBackward { - point: rollback_chain_point, - }, + ChainBlockNextAction::RollBackward { rollback_point }, )) => { - self.last_polled_point = Some(rollback_chain_point.clone()); - let rollback_slot_number = rollback_chain_point.slot_number; + self.last_polled_point = Some(rollback_point.clone()); + let rollback_slot_number = rollback_point.slot_number; let index_rollback = roll_forwards .iter() .position(|block| block.slot_number == rollback_slot_number); @@ -146,10 +144,8 @@ impl ChainReaderBlockStreamer { ))) } } - Some(ChainBlockNextAction::RollBackward { - point: rollback_chain_point, - }) => { - let rollback_slot_number = rollback_chain_point.slot_number; + Some(ChainBlockNextAction::RollBackward { rollback_point }) => { + let rollback_slot_number = rollback_point.slot_number; trace!( self.logger, "Received a RollBackward({rollback_slot_number:?})" @@ -158,9 +154,7 @@ impl ChainReaderBlockStreamer { BlockStreamerNextAction::SkipToNextAction } else { BlockStreamerNextAction::ChainBlockNextAction( - ChainBlockNextAction::RollBackward { - point: rollback_chain_point, - }, + ChainBlockNextAction::RollBackward { rollback_point }, ) }; Ok(Some(block_streamer_next_action)) @@ -429,7 +423,7 @@ mod tests { async fn test_parse_expected_nothing_when_rollbackward_on_same_point() { let chain_reader = Arc::new(Mutex::new(FakeChainReader::new(vec![ ChainBlockNextAction::RollBackward { - point: RawCardanoPoint::new(SlotNumber(100), "hash-123"), + rollback_point: RawCardanoPoint::new(SlotNumber(100), "hash-123"), }, ]))); let mut block_streamer = ChainReaderBlockStreamer::try_new( @@ -452,7 +446,7 @@ mod tests { { let chain_reader = Arc::new(Mutex::new(FakeChainReader::new(vec![ ChainBlockNextAction::RollBackward { - point: RawCardanoPoint::new(SlotNumber(100), "hash-10"), + rollback_point: RawCardanoPoint::new(SlotNumber(100), "hash-10"), }, ]))); let mut block_streamer = ChainReaderBlockStreamer::try_new( @@ -513,7 +507,7 @@ mod tests { ), }, ChainBlockNextAction::RollBackward { - point: RawCardanoPoint::new(SlotNumber(9), "hash-9"), + rollback_point: RawCardanoPoint::new(SlotNumber(9), "hash-9"), }, ]))); let mut block_streamer = ChainReaderBlockStreamer::try_new( @@ -562,7 +556,7 @@ mod tests { ), }, ChainBlockNextAction::RollBackward { - point: RawCardanoPoint::new(SlotNumber(3), "hash-3"), + rollback_point: RawCardanoPoint::new(SlotNumber(3), "hash-3"), }, ]))); let mut block_streamer = ChainReaderBlockStreamer::try_new( diff --git a/mithril-common/src/cardano_block_scanner/raw_cardano_point.rs b/mithril-common/src/cardano_block_scanner/raw_cardano_point.rs index 9c587c26cd7..619e4a2421c 100644 --- a/mithril-common/src/cardano_block_scanner/raw_cardano_point.rs +++ b/mithril-common/src/cardano_block_scanner/raw_cardano_point.rs @@ -5,11 +5,7 @@ use std::fmt::{Debug, Formatter}; use crate::cardano_block_scanner::ScannedBlock; use crate::entities::{ChainPoint, SlotNumber}; -/// Point in the chain that can be intersected. -/// -/// Internally the point used in Cardano doesn't have a block number like our [ChainPoint] -/// does, so we need to use a different struct to represent it. Else converting from one to the other -/// would be lossy. +/// Point internal representation in the Cardano chain. #[derive(Clone, PartialEq)] pub struct RawCardanoPoint { /// The [slot number](https://docs.cardano.org/learn/cardano-node/#slotsandepochs) diff --git a/mithril-common/src/cardano_block_scanner/scanned_block.rs b/mithril-common/src/cardano_block_scanner/scanned_block.rs index 2f75973e7a5..17ded39a50c 100644 --- a/mithril-common/src/cardano_block_scanner/scanned_block.rs +++ b/mithril-common/src/cardano_block_scanner/scanned_block.rs @@ -18,11 +18,11 @@ pub struct ScannedBlock { impl ScannedBlock { /// Scanned block factory - pub fn new>, TxHash: Into>( - block_hash: BlkHash, + pub fn new>, T: Into>( + block_hash: B, block_number: BlockNumber, slot_number: SlotNumber, - transaction_hashes: Vec, + transaction_hashes: Vec, ) -> Self { Self { block_hash: block_hash.into(), diff --git a/mithril-common/src/chain_reader/entity.rs b/mithril-common/src/chain_reader/entity.rs index 778d0f9e26b..6409107437c 100644 --- a/mithril-common/src/chain_reader/entity.rs +++ b/mithril-common/src/chain_reader/entity.rs @@ -12,6 +12,6 @@ pub enum ChainBlockNextAction { /// RollBackward event (we are on an incorrect fork, we need to get back a point to roll forward again) RollBackward { /// The rollback point in the chain to read (as a new valid chain point to read from on the main chain, which has already been seen) - point: RawCardanoPoint, + rollback_point: RawCardanoPoint, }, } diff --git a/mithril-common/src/chain_reader/fake_chain_reader.rs b/mithril-common/src/chain_reader/fake_chain_reader.rs index c0a67257820..34bdb97a150 100644 --- a/mithril-common/src/chain_reader/fake_chain_reader.rs +++ b/mithril-common/src/chain_reader/fake_chain_reader.rs @@ -9,20 +9,20 @@ use super::{ChainBlockNextAction, ChainBlockReader}; /// [FakeChainReader] is a fake implementation of [ChainBlockReader] for testing purposes. pub struct FakeChainReader { - chain_point_next_actions: Mutex>, + chain_block_next_actions: Mutex>, } impl FakeChainReader { /// Creates a new [FakeChainReader] instance. - pub fn new(chain_point_next_actions: Vec) -> Self { + pub fn new(chain_block_next_actions: Vec) -> Self { Self { - chain_point_next_actions: Mutex::new(chain_point_next_actions.into()), + chain_block_next_actions: Mutex::new(chain_block_next_actions.into()), } } /// Total remaining next actions pub fn get_total_remaining_next_actions(&self) -> usize { - self.chain_point_next_actions.lock().unwrap().len() + self.chain_block_next_actions.lock().unwrap().len() } } @@ -33,7 +33,7 @@ impl ChainBlockReader for FakeChainReader { } async fn get_next_chain_block(&mut self) -> StdResult> { - Ok(self.chain_point_next_actions.lock().unwrap().pop_front()) + Ok(self.chain_block_next_actions.lock().unwrap().pop_front()) } } @@ -46,7 +46,7 @@ mod tests { #[tokio::test] async fn test_get_next_chain_block() { - let expected_chain_point_next_actions = vec![ + let expected_chain_block_next_actions = vec![ ChainBlockNextAction::RollForward { parsed_block: ScannedBlock::new( "hash-1", @@ -64,18 +64,18 @@ mod tests { ), }, ChainBlockNextAction::RollBackward { - point: RawCardanoPoint::new(SlotNumber(1), "point-hash-1".as_bytes()), + rollback_point: RawCardanoPoint::new(SlotNumber(1), "point-hash-1".as_bytes()), }, ]; - let mut chain_reader = FakeChainReader::new(expected_chain_point_next_actions.clone()); + let mut chain_reader = FakeChainReader::new(expected_chain_block_next_actions.clone()); - let mut chain_point_next_actions = vec![]; + let mut chain_block_next_actions = vec![]; while let Some(chain_block_next_action) = chain_reader.get_next_chain_block().await.unwrap() { - chain_point_next_actions.push(chain_block_next_action); + chain_block_next_actions.push(chain_block_next_action); } - assert_eq!(expected_chain_point_next_actions, chain_point_next_actions); + assert_eq!(expected_chain_block_next_actions, chain_block_next_actions); } } diff --git a/mithril-common/src/chain_reader/pallas_chain_reader.rs b/mithril-common/src/chain_reader/pallas_chain_reader.rs index 6d73fd5cb13..b8d32a5be53 100644 --- a/mithril-common/src/chain_reader/pallas_chain_reader.rs +++ b/mithril-common/src/chain_reader/pallas_chain_reader.rs @@ -88,8 +88,9 @@ impl PallasChainReader { Ok(Some(ChainBlockNextAction::RollForward { parsed_block })) } NextResponse::RollBackward(rollback_point, _) => { - let point = RawCardanoPoint::from(rollback_point); - Ok(Some(ChainBlockNextAction::RollBackward { point })) + Ok(Some(ChainBlockNextAction::RollBackward { + rollback_point: RawCardanoPoint::from(rollback_point), + })) } NextResponse::Await => Ok(None), } @@ -281,8 +282,8 @@ mod tests { let (_, client_res) = tokio::join!(server, client); let chain_block = client_res.expect("Client failed to get next chain block"); match chain_block { - ChainBlockNextAction::RollBackward { point: chain_point } => { - assert_eq!(chain_point, get_fake_raw_point_backwards()); + ChainBlockNextAction::RollBackward { rollback_point } => { + assert_eq!(rollback_point, get_fake_raw_point_backwards()); } _ => panic!("Unexpected chain block action"), } From ddc7211200ee577f1473451ce1f9332d3c8535d2 Mon Sep 17 00:00:00 2001 From: DJO <790521+Alenar@users.noreply.github.com> Date: Fri, 25 Oct 2024 14:55:01 +0200 Subject: [PATCH 8/8] chore: upgrade crate versions * mithril-aggregator from `0.5.90` to `0.5.91` * mithril-common from `0.4.74` to `0.4.75` * mithril-signer from `0.2.204` to `0.2.205` --- Cargo.lock | 6 +++--- mithril-aggregator/Cargo.toml | 2 +- mithril-common/Cargo.toml | 2 +- mithril-signer/Cargo.toml | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4bfa2f74b0..8cf50c2fa73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3552,7 +3552,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.5.90" +version = "0.5.91" dependencies = [ "anyhow", "async-trait", @@ -3709,7 +3709,7 @@ dependencies = [ [[package]] name = "mithril-common" -version = "0.4.74" +version = "0.4.75" dependencies = [ "anyhow", "async-trait", @@ -3867,7 +3867,7 @@ dependencies = [ [[package]] name = "mithril-signer" -version = "0.2.204" +version = "0.2.205" dependencies = [ "anyhow", "async-trait", diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index ac3d55e17b9..2c98db69f85 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.5.90" +version = "0.5.91" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-common/Cargo.toml b/mithril-common/Cargo.toml index 4140098b3b9..e1f615ce7ee 100644 --- a/mithril-common/Cargo.toml +++ b/mithril-common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-common" -version = "0.4.74" +version = "0.4.75" description = "Common types, interfaces, and utilities for Mithril nodes." authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-signer/Cargo.toml b/mithril-signer/Cargo.toml index 22ccd6a5d73..6913be66cfd 100644 --- a/mithril-signer/Cargo.toml +++ b/mithril-signer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-signer" -version = "0.2.204" +version = "0.2.205" description = "A Mithril Signer" authors = { workspace = true } edition = { workspace = true }