diff --git a/Cargo.lock b/Cargo.lock index bcf52877..e05fbe99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -545,12 +545,6 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" -[[package]] -name = "bech32" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9ff0bbfd639f15c74af777d81383cf53efb7c93613f6cab67c6c11e05bbf8b" - [[package]] name = "bech32" version = "0.9.1" @@ -1603,9 +1597,9 @@ checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" [[package]] name = "minicbor" -version = "0.17.1" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5e575910763b21a0db7df5e142907fe944bff84d1dfc78e2ba92e7f3bdfd36b" +checksum = "2a20020e8e2d1881d8736f64011bb5ff99f1db9947ce3089706945c8915695cb" dependencies = [ "half", "minicbor-derive", @@ -1613,9 +1607,9 @@ dependencies = [ [[package]] name = "minicbor-derive" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0a86c5f04def8fb7735ae918bb589af82f985526f4c62e0249544b668b2f456" +checksum = "8608fb1c805b5b6b3d5ab7bd95c40c396df622b64d77b2d621a5eae1eed050ee" dependencies = [ "proc-macro2", "quote", @@ -1839,7 +1833,7 @@ dependencies = [ "aws-sdk-lambda", "aws-sdk-s3", "aws-sdk-sqs", - "bech32 0.9.1", + "bech32", "clap", "config", "crossterm", @@ -1869,9 +1863,9 @@ dependencies = [ [[package]] name = "pallas" -version = "0.13.4" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ddbf43a5d4e04c9e866209ab955d46bbb14dcfa2621041a0833bb390cef0bc9" +checksum = "3aa33418b6ad94a42dde318b035af9b3ea958ca43ca77e6e8a5e9f259f44a837" dependencies = [ "pallas-addresses", "pallas-codec", @@ -1884,12 +1878,12 @@ dependencies = [ [[package]] name = "pallas-addresses" -version = "0.13.4" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddafd49d99069f345345cbb4ff56202811ea7eefb6c25043eb3e5606f90da3a3" +checksum = "33e4dbcadac1a429795eb111483b697fc848776aeb645d16aa9586849e03bcd7" dependencies = [ "base58", - "bech32 0.8.1", + "bech32", "hex", "pallas-codec", "pallas-crypto", @@ -1898,45 +1892,48 @@ dependencies = [ [[package]] name = "pallas-codec" -version = "0.13.4" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3d0856b5314a93554610d78003107478504b11285fe5072fa518f4e89176de3" +checksum = "0121cf6f8a780c073437f98f0bf35014de8f2661aa04ea63469b9360670b1263" dependencies = [ + "hex", "minicbor", + "serde", ] [[package]] name = "pallas-crypto" -version = "0.13.4" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3f47853ffb9f52bb31a599839ab0ed04a784f1bed220a637de9d7a557b6dff0" +checksum = "ea13446a83190ea3d4fac6c4d177d001eda6a91edb9e01ffeb4570ac6d5dd929" dependencies = [ "cryptoxide", "hex", "pallas-codec", "rand_core", + "serde", "thiserror", ] [[package]] name = "pallas-miniprotocols" -version = "0.13.4" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ec04d0c08ee29bb59328346c8bd7b8b3f2680d564aab2f2238106c3a197a84a" +checksum = "02feb6060d0d421e17126291cb1e87e0131028827607096063939718336a909d" dependencies = [ "hex", "itertools", - "log 0.4.17", "pallas-codec", "pallas-multiplexer", "thiserror", + "tracing", ] [[package]] name = "pallas-multiplexer" -version = "0.13.4" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93615ee89b68998231c7e42720b2e4e8eaf298ea966935dcc1678980647fc11c" +checksum = "6c62837b151a664b7844769275877ffb6645e1440b1356888a1fca6e076c3c55" dependencies = [ "byteorder 1.4.3", "hex", @@ -1944,16 +1941,17 @@ dependencies = [ "pallas-codec", "rand", "thiserror", + "tracing", ] [[package]] name = "pallas-primitives" -version = "0.13.4" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "429eba99fe854081b920f559bbe742a0e989520d914213073b83c8474d122ca5" +checksum = "a2cf19eaf7d399719bf2e70313345e0458a76efc2c9401244527419849228a55" dependencies = [ "base58", - "bech32 0.9.1", + "bech32", "hex", "log 0.4.17", "pallas-codec", @@ -1964,9 +1962,9 @@ dependencies = [ [[package]] name = "pallas-traverse" -version = "0.13.4" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2746c5996fb98cc03eb7ebea1314f616edb65d972b4d9aa5f65131d62acbf9ee" +checksum = "f2315201959af3f9ecc8ac22a37ed31d0043aa33fe9115242f8e69d8814ded8e" dependencies = [ "hex", "pallas-addresses", @@ -2067,9 +2065,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" [[package]] name = "pin-utils" @@ -3056,9 +3054,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.30" +version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d8d93354fe2a8e50d5953f5ae2e47a3fc2ef03292e7ea46e3cc38f549525fb9" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if 1.0.0", "log 0.4.17", @@ -3069,9 +3067,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.20" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e65ce065b4b5c53e73bb28912318cb8c9e9ad3921f1d669eb0e68b4c8143a2b" +checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2", "quote", @@ -3080,11 +3078,11 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.22" +version = "0.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03cfcb51380632a72d3111cb8d3447a8d908e577d31beeac006f836383d29a23" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" dependencies = [ - "lazy_static", + "once_cell", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c6809806..fa5998d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ authors = ["Santiago Carmuega "] [dependencies] -pallas = "0.13.4" +pallas = "0.17.0" # pallas = { git = "https://github.com/txpipe/pallas" } # pallas = { path = "../pallas/pallas" } hex = "0.4.3" diff --git a/src/mapper/babbage.rs b/src/mapper/babbage.rs index aa902b58..d34c6812 100644 --- a/src/mapper/babbage.rs +++ b/src/mapper/babbage.rs @@ -1,12 +1,12 @@ use pallas::codec::utils::KeepRaw; -use pallas::ledger::primitives::ToHash; use pallas::ledger::primitives::babbage::{ - AuxiliaryData, MintedBlock, NetworkId, PostAlonzoTransactionOutput, TransactionBody, - TransactionOutput, TransactionWitnessSet, + AuxiliaryData, MintedBlock, MintedWitnessSet, NetworkId, PostAlonzoTransactionOutput, + TransactionBody, TransactionOutput, }; use pallas::crypto::hash::Hash; +use pallas::ledger::traverse::OriginalHash; use crate::model::{BlockRecord, Era, TransactionRecord}; use crate::utils::time::TimeProvider; @@ -22,7 +22,7 @@ impl EventWriter { &self, body: &KeepRaw, aux_data: Option<&KeepRaw>, - witness_set: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, ) -> usize { body.raw_cbor().len() + aux_data.map(|ax| ax.raw_cbor().len()).unwrap_or(2) @@ -34,7 +34,7 @@ impl EventWriter { body: &KeepRaw, tx_hash: &str, aux_data: Option<&KeepRaw>, - witness_set: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, ) -> Result { let mut record = TransactionRecord { hash: tx_hash.to_owned(), @@ -85,9 +85,7 @@ impl EventWriter { record.collateral_output = body.collateral_return.as_ref().map(|output| match output { TransactionOutput::Legacy(x) => self.to_legacy_output_record(x).unwrap(), - TransactionOutput::PostAlonzo(x) => { - self.to_post_alonzo_output_record(x).unwrap() - } + TransactionOutput::PostAlonzo(x) => self.to_post_alonzo_output_record(x).unwrap(), }); record.metadata = match aux_data { @@ -185,7 +183,7 @@ impl EventWriter { let witness_set = block.transaction_witness_sets.get(idx); - let tx_hash = tx.to_hash().to_hex(); + let tx_hash = tx.original_hash().to_hex(); self.to_babbage_transaction_record(tx, &tx_hash, aux_data, witness_set) }) @@ -217,7 +215,10 @@ impl EventWriter { } } - fn crawl_babbage_witness_set(&self, witness_set: &TransactionWitnessSet) -> Result<(), Error> { + fn crawl_babbage_witness_set( + &self, + witness_set: &KeepRaw, + ) -> Result<(), Error> { if let Some(native) = &witness_set.native_script { for script in native.iter() { self.append_from(self.to_native_witness_record(script)?)?; @@ -250,7 +251,7 @@ impl EventWriter { tx: &KeepRaw, tx_hash: &str, aux_data: Option<&KeepRaw>, - witness_set: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, ) -> Result<(), Error> { let record = self.to_babbage_transaction_record(tx, tx_hash, aux_data, witness_set)?; @@ -331,7 +332,7 @@ impl EventWriter { let witness_set = block.transaction_witness_sets.get(idx); - let tx_hash = tx.to_hash().to_hex(); + let tx_hash = tx.original_hash().to_hex(); let child = self.child_writer(EventContext { tx_idx: Some(idx), @@ -359,10 +360,10 @@ impl EventWriter { block: &'b MintedBlock<'b>, cbor: &'b [u8], ) -> Result<(), Error> { - let hash = block.header.to_hash(); + let hash = block.header.original_hash(); let child = self.child_writer(EventContext { - block_hash: Some(hex::encode(&hash)), + block_hash: Some(hex::encode(hash)), block_number: Some(block.header.header_body.block_number), slot: Some(block.header.header_body.slot), timestamp: self.compute_timestamp(block.header.header_body.slot), diff --git a/src/mapper/byron.rs b/src/mapper/byron.rs index 6570c046..60c43936 100644 --- a/src/mapper/byron.rs +++ b/src/mapper/byron.rs @@ -6,7 +6,8 @@ use crate::model::{BlockRecord, Era, EventData, TransactionRecord, TxInputRecord use crate::{model::EventContext, Error}; use pallas::crypto::hash::Hash; -use pallas::ledger::primitives::{byron, ToHash}; +use pallas::ledger::primitives::byron; +use pallas::ledger::traverse::OriginalHash; impl EventWriter { fn to_byron_input_record(&self, source: &byron::TxIn) -> Option { @@ -40,8 +41,10 @@ impl EventWriter { } fn to_byron_output_record(&self, source: &byron::TxOut) -> Result { + let address = pallas::ledger::addresses::Address::from_bytes(&source.address.payload)?; + Ok(TxOutputRecord { - address: source.address.to_addr_string()?, + address: address.to_string(), amount: source.amount, assets: None, datum_hash: None, @@ -109,7 +112,7 @@ impl EventWriter { .tx_payload .iter() .map(|tx| { - let tx_hash = tx.transaction.to_hash().to_string(); + let tx_hash = tx.transaction.original_hash().to_hex(); self.to_byron_transaction_record(tx, &tx_hash) }) .collect() @@ -159,15 +162,20 @@ impl EventWriter { hash: &Hash<32>, cbor: &[u8], ) -> Result { + let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( + source.header.consensus_data.0.epoch, + source.header.consensus_data.0.slot, + ); + let mut record = BlockRecord { era: Era::Byron, - body_size: cbor.len() as usize, + body_size: cbor.len(), issuer_vkey: source.header.consensus_data.1.to_hex(), vrf_vkey: Default::default(), tx_count: source.body.tx_payload.len(), hash: hash.to_hex(), number: source.header.consensus_data.2[0], - slot: source.header.consensus_data.0.to_abs_slot(), + slot: abs_slot, epoch: Some(source.header.consensus_data.0.epoch), epoch_slot: Some(source.header.consensus_data.0.slot), previous_hash: source.header.prev_block.to_hex(), @@ -196,7 +204,7 @@ impl EventWriter { self.append(EventData::Block(record.clone()))?; for (idx, tx) in block.body.tx_payload.iter().enumerate() { - let tx_hash = tx.transaction.to_hash().to_string(); + let tx_hash = tx.transaction.original_hash().to_hex(); let child = self.child_writer(EventContext { tx_idx: Some(idx), @@ -216,19 +224,24 @@ impl EventWriter { pub fn to_byron_epoch_boundary_record( &self, - source: &byron::EbBlock, + source: &byron::MintedEbBlock, hash: &Hash<32>, cbor: &[u8], ) -> Result { + let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( + source.header.consensus_data.epoch_id, + 0, + ); + Ok(BlockRecord { era: Era::Byron, - body_size: cbor.len() as usize, + body_size: cbor.len(), hash: hash.to_hex(), issuer_vkey: Default::default(), vrf_vkey: Default::default(), tx_count: 0, number: source.header.consensus_data.difficulty[0], - slot: source.header.to_abs_slot(), + slot: abs_slot, epoch: Some(source.header.consensus_data.epoch_id), epoch_slot: Some(0), previous_hash: source.header.prev_block.to_hex(), @@ -242,7 +255,7 @@ impl EventWriter { fn crawl_byron_ebb_block( &self, - block: &byron::EbBlock, + block: &byron::MintedEbBlock, hash: &Hash<32>, cbor: &[u8], ) -> Result<(), Error> { @@ -267,11 +280,15 @@ impl EventWriter { block: &byron::MintedBlock, cbor: &[u8], ) -> Result<(), Error> { - let hash = block.header.to_hash(); - let abs_slot = block.header.consensus_data.0.to_abs_slot(); + let hash = block.header.original_hash(); + + let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( + block.header.consensus_data.0.epoch, + block.header.consensus_data.0.slot, + ); let child = self.child_writer(EventContext { - block_hash: Some(hex::encode(&hash)), + block_hash: Some(hex::encode(hash)), block_number: Some(block.header.consensus_data.2[0]), slot: Some(abs_slot), timestamp: self.compute_timestamp(abs_slot), @@ -297,13 +314,21 @@ impl EventWriter { /// Entry-point to start crawling a blocks for events. Meant to be used when /// we already have a decoded block (for example, N2C). The raw CBOR is also /// passed through in case we need to attach it to outbound events. - pub fn crawl_ebb_with_cbor(&self, block: &byron::EbBlock, cbor: &[u8]) -> Result<(), Error> { + pub fn crawl_ebb_with_cbor( + &self, + block: &byron::MintedEbBlock, + cbor: &[u8], + ) -> Result<(), Error> { if self.config.include_byron_ebb { - let hash = block.header.to_hash(); - let abs_slot = block.header.to_abs_slot(); + let hash = block.header.original_hash(); + + let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( + block.header.consensus_data.epoch_id, + 0, + ); let child = self.child_writer(EventContext { - block_hash: Some(hex::encode(&hash)), + block_hash: Some(hex::encode(hash)), block_number: Some(block.header.consensus_data.difficulty[0]), slot: Some(abs_slot), timestamp: self.compute_timestamp(abs_slot), @@ -321,7 +346,7 @@ impl EventWriter { /// Entry-point to start crawling a blocks for events. Meant to be used when /// we haven't decoded the CBOR yet (for example, N2N). pub fn crawl_from_ebb_cbor(&self, cbor: &[u8]) -> Result<(), Error> { - let (_, block): (u16, byron::EbBlock) = pallas::codec::minicbor::decode(cbor)?; + let (_, block): (u16, byron::MintedEbBlock) = pallas::codec::minicbor::decode(cbor)?; self.crawl_ebb_with_cbor(&block, cbor) } } diff --git a/src/mapper/collect.rs b/src/mapper/collect.rs index 784bae75..14f0d769 100644 --- a/src/mapper/collect.rs +++ b/src/mapper/collect.rs @@ -1,14 +1,17 @@ use pallas::{ - codec::utils::{KeyValuePairs, MaybeIndefArray}, - ledger::primitives::{ - alonzo::{ - AuxiliaryData, Coin, MintedBlock, Multiasset, NativeScript, PlutusData, PlutusScript, - Redeemer, RewardAccount, TransactionInput, VKeyWitness, Value, + codec::utils::{KeepRaw, KeyValuePairs, MaybeIndefArray}, + ledger::{ + primitives::{ + alonzo::{ + AuxiliaryData, Coin, MintedBlock, Multiasset, NativeScript, PlutusData, + PlutusScript, Redeemer, RewardAccount, TransactionInput, VKeyWitness, Value, + }, + babbage::{ + LegacyTransactionOutput, PlutusV2Script, PostAlonzoTransactionOutput, + TransactionOutput, + }, }, - babbage::{ - LegacyTransactionOutput, PlutusV2Script, PostAlonzoTransactionOutput, TransactionOutput, - }, - ToHash, + traverse::OriginalHash, }, }; @@ -71,7 +74,7 @@ impl EventWriter { .iter() .flat_map(|(policy, assets)| { assets.iter().map(|(asset, amount)| { - self.to_transaction_output_asset_record(policy, asset, amount.into()) + self.to_transaction_output_asset_record(policy, asset, *amount) }) }) .collect(), @@ -99,7 +102,7 @@ impl EventWriter { let hex = reward_account.to_hex(); hex.strip_prefix("e1").map(|x| x.to_string()).unwrap_or(hex) }, - coin: coin.into(), + coin: *coin, }) .collect() } @@ -125,7 +128,7 @@ impl EventWriter { pub fn collect_vkey_witness_records( &self, - witness_set: &Option>, + witness_set: &Option>, ) -> Result, Error> { match witness_set { Some(all) => all.iter().map(|i| self.to_vkey_witness_record(i)).collect(), @@ -135,7 +138,7 @@ impl EventWriter { pub fn collect_native_witness_records( &self, - witness_set: &Option>, + witness_set: &Option>, ) -> Result, Error> { match witness_set { Some(all) => all @@ -148,7 +151,7 @@ impl EventWriter { pub fn collect_plutus_v1_witness_records( &self, - witness_set: &Option>, + witness_set: &Option>, ) -> Result, Error> { match &witness_set { Some(all) => all @@ -174,7 +177,7 @@ impl EventWriter { pub fn collect_plutus_redeemer_records( &self, - witness_set: &Option>, + witness_set: &Option>, ) -> Result, Error> { match &witness_set { Some(all) => all @@ -187,7 +190,7 @@ impl EventWriter { pub fn collect_plutus_datum_records( &self, - witness_set: &Option>, + witness_set: &Option>>, ) -> Result, Error> { match &witness_set { Some(all) => all.iter().map(|i| self.to_plutus_datum_record(i)).collect(), @@ -212,7 +215,7 @@ impl EventWriter { let witness_set = block.transaction_witness_sets.get(idx); - let tx_hash = tx.to_hash().to_hex(); + let tx_hash = tx.original_hash().to_hex(); self.to_transaction_record(tx, &tx_hash, aux_data, witness_set) }) diff --git a/src/mapper/map.rs b/src/mapper/map.rs index 1db3698a..378ac2de 100644 --- a/src/mapper/map.rs +++ b/src/mapper/map.rs @@ -1,18 +1,16 @@ use std::collections::HashMap; -use pallas::{ - codec::{minicbor::bytes::ByteVec, utils::KeepRaw}, - crypto::hash::Hash, - ledger::primitives::babbage::DatumOption, -}; +use pallas::ledger::primitives::alonzo::MintedWitnessSet; +use pallas::ledger::traverse::ComputeHash; +use pallas::{codec::utils::KeepRaw, crypto::hash::Hash, ledger::primitives::babbage::DatumOption}; use pallas::ledger::primitives::{ alonzo::{ self as alonzo, AuxiliaryData, Certificate, InstantaneousRewardSource, InstantaneousRewardTarget, Metadatum, MetadatumLabel, MintedBlock, NetworkId, Relay, - TransactionBody, TransactionInput, TransactionWitnessSet, Value, + TransactionBody, TransactionInput, Value, }, - babbage, ToCanonicalJson, ToHash, + babbage, ToCanonicalJson, }; use pallas::network::miniprotocols::Point; @@ -101,8 +99,8 @@ fn metadatum_to_string_key(datum: &Metadatum) -> String { fn get_tx_output_coin_value(amount: &Value) -> u64 { match amount { - Value::Coin(x) => x.into(), - Value::Multiasset(x, _) => x.into(), + Value::Coin(x) => *x, + Value::Multiasset(x, _) => *x, } } @@ -144,7 +142,7 @@ impl EventWriter { value: &Metadatum, ) -> Result { let data = MetadataRecord { - label: u64::from(label).to_string(), + label: label.to_string(), content: match value { Metadatum::Int(x) => MetadatumRendition::IntScalar(i128::from(*x)), Metadatum::Bytes(x) => MetadatumRendition::BytesHex(hex::encode(x.as_slice())), @@ -201,8 +199,8 @@ impl EventWriter { pub fn to_transaction_output_asset_record( &self, - policy: &ByteVec, - asset: &ByteVec, + policy: &Hash<28>, + asset: &pallas::codec::utils::Bytes, amount: u64, ) -> OutputAssetRecord { OutputAssetRecord { @@ -213,7 +211,12 @@ impl EventWriter { } } - pub fn to_mint_record(&self, policy: &ByteVec, asset: &ByteVec, quantity: i64) -> MintRecord { + pub fn to_mint_record( + &self, + policy: &Hash<28>, + asset: &pallas::codec::utils::Bytes, + quantity: i64, + ) -> MintRecord { MintRecord { policy: policy.to_hex(), asset: asset.to_hex(), @@ -223,14 +226,14 @@ impl EventWriter { pub fn to_aux_native_script_event(&self, script: &alonzo::NativeScript) -> EventData { EventData::NativeScript { - policy_id: script.to_hash().to_hex(), + policy_id: script.compute_hash().to_hex(), script: script.to_json(), } } pub fn to_aux_plutus_script_event(&self, script: &alonzo::PlutusScript) -> EventData { EventData::PlutusScript { - hash: script.to_hash().to_hex(), + hash: script.compute_hash().to_hex(), data: script.0.to_hex(), } } @@ -258,7 +261,7 @@ impl EventWriter { datum: &alonzo::PlutusData, ) -> Result { Ok(PlutusDatumRecord { - datum_hash: datum.to_hash().to_hex(), + datum_hash: datum.compute_hash().to_hex(), plutus_data: datum.to_json(), }) } @@ -268,7 +271,7 @@ impl EventWriter { script: &alonzo::PlutusScript, ) -> Result { Ok(PlutusWitnessRecord { - script_hash: script.to_hash().to_hex(), + script_hash: script.compute_hash().to_hex(), script_hex: script.as_ref().to_hex(), }) } @@ -278,7 +281,7 @@ impl EventWriter { script: &babbage::PlutusV2Script, ) -> Result { Ok(PlutusWitnessRecord { - script_hash: script.to_hash().to_hex(), + script_hash: script.compute_hash().to_hex(), script_hex: script.as_ref().to_hex(), }) } @@ -288,7 +291,7 @@ impl EventWriter { script: &alonzo::NativeScript, ) -> Result { Ok(NativeWitnessRecord { - policy_id: script.to_hash().to_hex(), + policy_id: script.compute_hash().to_hex(), script_json: script.to_json(), }) } @@ -328,8 +331,8 @@ impl EventWriter { } => EventData::PoolRegistration { operator: operator.to_hex(), vrf_keyhash: vrf_keyhash.to_hex(), - pledge: pledge.into(), - cost: cost.into(), + pledge: *pledge, + cost: *cost, margin: (margin.numerator as f64 / margin.denominator as f64), reward_account: reward_account.to_hex(), pool_owners: pool_owners.iter().map(|p| p.to_hex()).collect(), @@ -353,7 +356,7 @@ impl EventWriter { _ => None, }, to_other_pot: match move_.target { - InstantaneousRewardTarget::OtherAccountingPot(x) => Some(x.into()), + InstantaneousRewardTarget::OtherAccountingPot(x) => Some(x), _ => None, }, } @@ -374,7 +377,7 @@ impl EventWriter { &self, body: &KeepRaw, aux_data: Option<&KeepRaw>, - witness_set: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, ) -> usize { body.raw_cbor().len() + aux_data.map(|ax| ax.raw_cbor().len()).unwrap_or(2) @@ -386,7 +389,7 @@ impl EventWriter { body: &KeepRaw, tx_hash: &str, aux_data: Option<&KeepRaw>, - witness_set: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, ) -> Result { let mut record = TransactionRecord { hash: tx_hash.to_owned(), @@ -513,7 +516,7 @@ impl EventWriter { }, Point::Specific(slot, hash) => EventData::RollBack { block_slot: *slot, - block_hash: hex::encode(&hash), + block_hash: hex::encode(hash), }, }; diff --git a/src/mapper/shelley.rs b/src/mapper/shelley.rs index 2bf380cd..84a42478 100644 --- a/src/mapper/shelley.rs +++ b/src/mapper/shelley.rs @@ -1,12 +1,12 @@ use pallas::codec::utils::KeepRaw; -use pallas::ledger::primitives::ToHash; use pallas::ledger::primitives::alonzo::{ - AuxiliaryData, Certificate, Metadata, MintedBlock, Multiasset, TransactionBody, - TransactionInput, TransactionOutput, TransactionWitnessSet, Value, + AuxiliaryData, Certificate, Metadata, MintedBlock, MintedWitnessSet, Multiasset, + TransactionBody, TransactionInput, TransactionOutput, Value, }; use pallas::crypto::hash::Hash; +use pallas::ledger::traverse::OriginalHash; use crate::{ model::{Era, EventContext, EventData}, @@ -21,7 +21,7 @@ impl EventWriter { let record = self.to_metadata_record(label, content)?; self.append_from(record)?; - match u64::from(label) { + match label { 721u64 => self.crawl_metadata_label_721(content)?, 61284u64 => self.crawl_metadata_label_61284(content)?, _ => (), @@ -75,11 +75,9 @@ impl EventWriter { if let Value::Multiasset(_, policies) = amount { for (policy, assets) in policies.iter() { for (asset, amount) in assets.iter() { - self.append_from(self.to_transaction_output_asset_record( - policy, - asset, - amount.into(), - ))?; + self.append_from( + self.to_transaction_output_asset_record(policy, asset, *amount), + )?; } } } @@ -132,7 +130,7 @@ impl EventWriter { pub(crate) fn crawl_witness_set( &self, - witness_set: &TransactionWitnessSet, + witness_set: &KeepRaw, ) -> Result<(), Error> { if let Some(native) = &witness_set.native_script { for script in native.iter() { @@ -166,7 +164,7 @@ impl EventWriter { tx: &KeepRaw, tx_hash: &str, aux_data: Option<&KeepRaw>, - witness_set: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, ) -> Result<(), Error> { let record = self.to_transaction_record(tx, tx_hash, aux_data, witness_set)?; @@ -248,7 +246,7 @@ impl EventWriter { let witness_set = block.transaction_witness_sets.get(idx); - let tx_hash = tx.to_hash().to_hex(); + let tx_hash = tx.original_hash().to_hex(); let child = self.child_writer(EventContext { tx_idx: Some(idx), @@ -268,10 +266,10 @@ impl EventWriter { #[deprecated(note = "use crawl_from_shelley_cbor instead")] pub fn crawl_with_cbor(&self, block: &MintedBlock, cbor: &[u8]) -> Result<(), Error> { - let hash = block.header.to_hash(); + let hash = block.header.original_hash(); let child = self.child_writer(EventContext { - block_hash: Some(hex::encode(&hash)), + block_hash: Some(hex::encode(hash)), block_number: Some(block.header.header_body.block_number), slot: Some(block.header.header_body.slot), timestamp: self.compute_timestamp(block.header.header_body.slot), @@ -283,10 +281,10 @@ impl EventWriter { #[deprecated(note = "use crawl_from_shelley_cbor instead")] pub fn crawl(&self, block: &MintedBlock) -> Result<(), Error> { - let hash = block.header.to_hash(); + let hash = block.header.original_hash(); let child = self.child_writer(EventContext { - block_hash: Some(hex::encode(&hash)), + block_hash: Some(hex::encode(hash)), block_number: Some(block.header.header_body.block_number), slot: Some(block.header.header_body.slot), timestamp: self.compute_timestamp(block.header.header_body.slot), @@ -307,10 +305,10 @@ impl EventWriter { cbor: &'b [u8], era: Era, ) -> Result<(), Error> { - let hash = block.header.to_hash(); + let hash = block.header.original_hash(); let child = self.child_writer(EventContext { - block_hash: Some(hex::encode(&hash)), + block_hash: Some(hex::encode(hash)), block_number: Some(block.header.header_body.block_number), slot: Some(block.header.header_body.slot), timestamp: self.compute_timestamp(block.header.header_body.slot), diff --git a/src/sources/common.rs b/src/sources/common.rs index fe951d4e..481ae6fa 100644 --- a/src/sources/common.rs +++ b/src/sources/common.rs @@ -1,16 +1,20 @@ use core::fmt; use std::{ops::Deref, str::FromStr, time::Duration}; -use pallas::network::{ - miniprotocols::{chainsync::TipFinder, run_agent, Point, MAINNET_MAGIC, TESTNET_MAGIC}, - multiplexer::{bearers::Bearer, StdChannelBuffer, StdPlexer}, +use pallas::{ + ledger::traverse::{probe, Era}, + network::{ + miniprotocols::{chainsync, Point, MAINNET_MAGIC, TESTNET_MAGIC}, + multiplexer::{bearers::Bearer, StdChannel, StdPlexer}, + }, }; use serde::{de::Visitor, Deserializer}; use serde::{Deserialize, Serialize}; use crate::{ - utils::{retry, ChainWellKnownInfo, Utils}, + mapper::EventWriter, + utils::{retry, SwallowResult, Utils}, Error, }; @@ -258,81 +262,109 @@ pub fn should_finalize( false } -pub(crate) fn find_end_of_chain( - channel: &mut StdChannelBuffer, - well_known: &ChainWellKnownInfo, -) -> Result { - let point = Point::Specific( - well_known.shelley_known_slot, - hex::decode(&well_known.shelley_known_hash)?, - ); - - let agent = TipFinder::initial(point); - let agent = run_agent(agent, channel)?; - log::info!("chain point query output: {:?}", agent.output); - - match agent.output { - Some(tip) => Ok(tip.0), - None => Err("failure acquiring end of chain".into()), - } -} - -pub(crate) fn define_start_point( - intersect: &Option, - since: &Option, +pub(crate) fn intersect_starting_point( + client: &mut chainsync::Client, + intersect_arg: &Option, + since_arg: &Option, utils: &Utils, - cs_channel: &mut StdChannelBuffer, -) -> Result>, Error> { +) -> Result, Error> +where + chainsync::Message: pallas::codec::Fragment, +{ let cursor = utils.get_cursor_if_any(); match cursor { Some(cursor) => { log::info!("found persisted cursor, will use as starting point"); - let points = vec![cursor.try_into()?]; + let desired = cursor.try_into()?; + let (point, _) = client.find_intersect(vec![desired])?; - Ok(Some(points)) + Ok(point) } - None => match intersect { + None => match intersect_arg { Some(IntersectArg::Fallbacks(x)) => { log::info!("found 'fallbacks' intersect argument, will use as starting point"); - let points: Result, _> = x.iter().map(|x| x.clone().try_into()).collect(); + let options: Result, _> = x.iter().map(|x| x.clone().try_into()).collect(); + + let (point, _) = client.find_intersect(options?)?; - Ok(Some(points?)) + Ok(point) } Some(IntersectArg::Origin) => { log::info!("found 'origin' intersect argument, will use as starting point"); - Ok(None) + let point = client.intersect_origin()?; + + Ok(Some(point)) } Some(IntersectArg::Point(x)) => { log::info!("found 'point' intersect argument, will use as starting point"); - let points = vec![x.clone().try_into()?]; + let options = vec![x.clone().try_into()?]; + + let (point, _) = client.find_intersect(options)?; - Ok(Some(points)) + Ok(point) } Some(IntersectArg::Tip) => { log::info!("found 'tip' intersect argument, will use as starting point"); - let tip = find_end_of_chain(cs_channel, &utils.well_known)?; - let points = vec![tip]; - Ok(Some(points)) + let point = client.intersect_tip()?; + + Ok(Some(point)) } - None => match since { + None => match since_arg { Some(x) => { log::info!("explicit 'since' argument, will use as starting point"); log::warn!("`since` value is deprecated, please use `intersect`"); - let points = vec![x.clone().try_into()?]; + let options = vec![x.clone().try_into()?]; - Ok(Some(points)) + let (point, _) = client.find_intersect(options)?; + + Ok(point) } None => { log::info!("no starting point specified, will use tip of chain"); - let tip = find_end_of_chain(cs_channel, &utils.well_known)?; - let points = vec![tip]; - Ok(Some(points)) + let point = client.intersect_tip()?; + + Ok(Some(point)) } }, }, } } + +pub fn unknown_block_to_events(writer: &EventWriter, body: &Vec) -> Result<(), Error> { + match probe::block_era(body) { + probe::Outcome::Matched(era) => match era { + Era::Byron => { + writer + .crawl_from_byron_cbor(body) + .ok_or_warn("error crawling byron block for events"); + } + Era::Allegra | Era::Alonzo | Era::Mary | Era::Shelley => { + writer + .crawl_from_shelley_cbor(body, era.into()) + .ok_or_warn("error crawling alonzo-compatible block for events"); + } + Era::Babbage => { + writer + .crawl_from_babbage_cbor(body) + .ok_or_warn("error crawling babbage block for events"); + } + x => { + return Err(format!("This version of Oura can't handle era: {}", x).into()); + } + }, + probe::Outcome::EpochBoundary => { + writer + .crawl_from_ebb_cbor(body) + .ok_or_warn("error crawling block for events"); + } + probe::Outcome::Inconclusive => { + log::error!("can't infer primitive block from cbor, inconclusive probing. CBOR hex for debugging: {}", hex::encode(body)); + } + } + + Ok(()) +} diff --git a/src/sources/n2c/blocks.rs b/src/sources/n2c/blocks.rs deleted file mode 100644 index bf8cab10..00000000 --- a/src/sources/n2c/blocks.rs +++ /dev/null @@ -1,85 +0,0 @@ -use pallas::{ - codec::minicbor::decode, - ledger::primitives::{alonzo, babbage, byron, ToHash}, - ledger::traverse::{probe, Era}, - network::miniprotocols::Point, -}; - -use crate::Error; - -pub(crate) struct CborHolder(Vec); - -impl<'b> CborHolder { - pub fn new(bytes: Vec) -> Self { - Self(bytes) - } - - pub fn parse(&'b self) -> Result, Error> { - let block = match probe::block_era(&self.0) { - probe::Outcome::Matched(era) => match era { - Era::Byron => { - let (_, block): (u16, byron::MintedBlock) = decode(&self.0)?; - MultiEraBlock::Byron(Box::new(block)) - } - Era::Shelley | Era::Allegra | Era::Mary | Era::Alonzo => { - let (_, block): (u16, alonzo::MintedBlock) = decode(&self.0)?; - MultiEraBlock::AlonzoCompatible(Box::new(block), era) - } - Era::Babbage => { - let (_, block): (u16, babbage::MintedBlock) = decode(&self.0)?; - MultiEraBlock::Babbage(Box::new(block)) - } - x => { - return Err(format!("This version of Oura can't handle era: {}", x).into()); - } - }, - probe::Outcome::EpochBoundary => { - let (_, block): (u16, byron::EbBlock) = decode(&self.0)?; - MultiEraBlock::EpochBoundary(Box::new(block)) - } - probe::Outcome::Inconclusive => { - log::error!("CBOR hex for debugging: {}", hex::encode(&self.0)); - return Err("can't infer primitive block from cbor, inconclusive probing".into()); - } - }; - - Ok(block) - } - - pub fn cbor(&'b self) -> &'b [u8] { - &self.0 - } -} - -#[derive(Debug)] -pub(crate) enum MultiEraBlock<'b> { - EpochBoundary(Box), - Byron(Box>), - AlonzoCompatible(Box>, Era), - Babbage(Box>), -} - -impl MultiEraBlock<'_> { - pub(crate) fn read_cursor(&self) -> Result { - match self { - MultiEraBlock::EpochBoundary(x) => { - let hash = x.header.to_hash(); - let slot = x.header.to_abs_slot(); - Ok(Point::Specific(slot, hash.to_vec())) - } - MultiEraBlock::Byron(x) => { - let hash = x.header.to_hash(); - let slot = x.header.consensus_data.0.to_abs_slot(); - Ok(Point::Specific(slot, hash.to_vec())) - } - MultiEraBlock::AlonzoCompatible(x, _) => { - let hash = x.header.to_hash(); - Ok(Point::Specific(x.header.header_body.slot, hash.to_vec())) - } - MultiEraBlock::Babbage(x) => { - let hash = x.header.to_hash(); - Ok(Point::Specific(x.header.header_body.slot, hash.to_vec())) - } - } - } -} diff --git a/src/sources/n2c/mod.rs b/src/sources/n2c/mod.rs index 840abc4f..0a447c1d 100644 --- a/src/sources/n2c/mod.rs +++ b/src/sources/n2c/mod.rs @@ -1,4 +1,3 @@ -mod blocks; mod run; mod setup; diff --git a/src/sources/n2c/run.rs b/src/sources/n2c/run.rs index 47b68f60..efa696ee 100644 --- a/src/sources/n2c/run.rs +++ b/src/sources/n2c/run.rs @@ -1,27 +1,28 @@ use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc, time::Duration}; -use pallas::network::{ - miniprotocols::{chainsync, handshake, run_agent, Point, MAINNET_MAGIC}, - multiplexer::StdChannelBuffer, +use pallas::{ + ledger::traverse::MultiEraBlock, + network::{ + miniprotocols::{chainsync, handshake, Point, MAINNET_MAGIC}, + multiplexer::StdChannel, + }, }; use crate::{ mapper::EventWriter, pipelining::StageSender, sources::{ - define_start_point, n2c::blocks::CborHolder, setup_multiplexer, should_finalize, + intersect_starting_point, setup_multiplexer, should_finalize, unknown_block_to_events, FinalizeConfig, }, utils::{retry, Utils}, Error, }; -use super::blocks::MultiEraBlock; - struct ChainObserver { chain_buffer: chainsync::RollbackBuffer, min_depth: usize, - blocks: HashMap, + blocks: HashMap>, event_writer: EventWriter, finalize_config: Option, block_count: u64, @@ -43,19 +44,23 @@ fn log_buffer_state(buffer: &chainsync::RollbackBuffer) { ); } -impl chainsync::Observer for ChainObserver { +enum Continuation { + Proceed, + DropOut, +} + +impl ChainObserver { fn on_roll_forward( &mut self, content: chainsync::BlockContent, tip: &chainsync::Tip, - ) -> Result> { + ) -> Result> { // parse the block and extract the point of the chain - let cbor = content.into(); - let block = CborHolder::new(cbor); - let point = block.parse()?.read_cursor()?; + let block = MultiEraBlock::decode(content.deref())?; + let point = Point::Specific(block.slot(), block.hash().to_vec()); // store the block for later retrieval - self.blocks.insert(point.clone(), block); + self.blocks.insert(point.clone(), content.into()); // track the new point in our memory buffer log::info!("rolling forward to point {:?}", point); @@ -72,26 +77,13 @@ impl chainsync::Observer for ChainObserver { .remove(&point) .expect("required block not found in memory"); - match block.parse()? { - MultiEraBlock::EpochBoundary(model) => self - .event_writer - .crawl_ebb_with_cbor(&model, block.cbor())?, - MultiEraBlock::Byron(model) => self - .event_writer - .crawl_byron_with_cbor(&model, block.cbor())?, - MultiEraBlock::AlonzoCompatible(model, era) => self - .event_writer - .crawl_shelley_with_cbor(&model, block.cbor(), era.into())?, - MultiEraBlock::Babbage(model) => self - .event_writer - .crawl_babbage_with_cbor(&model, block.cbor())?, - }; + unknown_block_to_events(&self.event_writer, &block)?; self.block_count += 1; // evaluate if we should finalize the thread according to config if should_finalize(&self.finalize_config, &point, self.block_count) { - return Ok(chainsync::Continuation::DropOut); + return Ok(Continuation::DropOut); } } @@ -100,17 +92,17 @@ impl chainsync::Observer for ChainObserver { // notify chain tip to the pipeline metrics self.event_writer.utils.track_chain_tip(tip.1); - Ok(chainsync::Continuation::Proceed) + Ok(Continuation::Proceed) } - fn on_rollback(&mut self, point: &Point) -> Result { + fn on_rollback(&mut self, point: &Point) -> Result<(), Error> { log::info!("rolling block to point {:?}", point); match self.chain_buffer.roll_back(point) { chainsync::RollbackEffect::Handled => { log::debug!("handled rollback within buffer {:?}", point); - // drain memory blocks afther the rollback slot + // drain memory blocks after the rollback slot self.blocks .retain(|x, _| x.slot_or_default() <= point.slot_or_default()); } @@ -126,18 +118,41 @@ impl chainsync::Observer for ChainObserver { log_buffer_state(&self.chain_buffer); - Ok(chainsync::Continuation::Proceed) + Ok(()) + } + + fn on_next_message( + &mut self, + msg: chainsync::NextResponse, + client: &mut chainsync::N2CClient, + ) -> Result { + match msg { + chainsync::NextResponse::RollForward(c, t) => match self.on_roll_forward(c, &t) { + Ok(x) => Ok(x), + Err(err) => Err(AttemptError::Other(err)), + }, + chainsync::NextResponse::RollBackward(x, _) => match self.on_rollback(&x) { + Ok(_) => Ok(Continuation::Proceed), + Err(err) => Err(AttemptError::Other(err)), + }, + chainsync::NextResponse::Await => { + let next = client + .recv_while_must_reply() + .map_err(|x| AttemptError::Recoverable(x.into()))?; + + self.on_next_message(next, client) + } + } } } fn observe_forever( - mut channel: StdChannelBuffer, + mut client: chainsync::N2CClient, event_writer: EventWriter, - known_points: Option>, min_depth: usize, finalize_config: Option, ) -> Result<(), AttemptError> { - let observer = ChainObserver { + let mut observer = ChainObserver { chain_buffer: Default::default(), blocks: HashMap::new(), min_depth, @@ -146,14 +161,15 @@ fn observe_forever( finalize_config, }; - let agent = chainsync::BlockConsumer::initial(known_points, observer); - - match run_agent(agent, &mut channel) { - Ok(agent) => { - log::debug!("chainsync agent final state: {:?}", agent.state); - Ok(()) + loop { + match client.request_next() { + Ok(next) => match observer.on_next_message(next, &mut client) { + Ok(Continuation::Proceed) => (), + Ok(Continuation::DropOut) => break Ok(()), + Err(err) => break Err(err), + }, + Err(err) => break Err(AttemptError::Recoverable(err.into())), } - Err(err) => Err(AttemptError::Recoverable(err.into())), } } @@ -163,12 +179,13 @@ enum AttemptError { Other(Error), } -fn do_handshake(channel: &mut StdChannelBuffer, magic: u64) -> Result<(), AttemptError> { +fn do_handshake(channel: StdChannel, magic: u64) -> Result<(), AttemptError> { + let mut client = handshake::N2CClient::new(channel); let versions = handshake::n2c::VersionTable::v1_and_above(magic); - match run_agent(handshake::Initiator::initial(versions), channel) { - Ok(agent) => match agent.output { - handshake::Output::Accepted(_, _) => Ok(()), + match client.handshake(versions) { + Ok(confirmation) => match confirmation { + handshake::Confirmation::Accepted(_, _) => Ok(()), _ => Err(AttemptError::Other( "couldn't agree on handshake version".into(), )), @@ -190,34 +207,36 @@ fn do_chainsync_attempt( let mut plexer = setup_multiplexer(&config.address.0, &config.address.1, &config.retry_policy) .map_err(|x| AttemptError::Recoverable(x))?; - let mut hs_channel = plexer.use_channel(0).into(); - let mut cs_channel = plexer.use_channel(5).into(); + let hs_channel = plexer.use_channel(0); + let cs_channel = plexer.use_channel(5); plexer.muxer.spawn(); plexer.demuxer.spawn(); - do_handshake(&mut hs_channel, magic)?; + do_handshake(hs_channel, magic)?; - let known_points = define_start_point( + let mut client = chainsync::N2CClient::new(cs_channel); + + let intersection = intersect_starting_point( + &mut client, &config.intersect, #[allow(deprecated)] &config.since, &utils, - &mut cs_channel, ) .map_err(|err| AttemptError::Recoverable(err))?; - log::info!("starting chain sync from: {:?}", &known_points); + if intersection.is_none() { + return Err(AttemptError::Other( + "Can't find chain intersection point".into(), + )); + } + + log::info!("starting chain sync from: {:?}", &intersection); let writer = EventWriter::new(output_tx.clone(), utils, config.mapper.clone()); - observe_forever( - cs_channel, - writer, - known_points, - config.min_depth, - config.finalize.clone(), - )?; + observe_forever(client, writer, config.min_depth, config.finalize.clone())?; Ok(()) } diff --git a/src/sources/n2n/headers.rs b/src/sources/n2n/headers.rs deleted file mode 100644 index 615a05b7..00000000 --- a/src/sources/n2n/headers.rs +++ /dev/null @@ -1,68 +0,0 @@ -use pallas::{ - codec::minicbor::decode, - ledger::primitives::{alonzo, babbage, byron, ToHash}, - network::miniprotocols::{chainsync::HeaderContent, Point}, -}; - -use crate::Error; - -#[derive(Debug)] -pub enum MultiEraHeader { - ByronBoundary(byron::EbbHead), - Byron(byron::BlockHead), - AlonzoCompatible(alonzo::Header), - Babbage(babbage::Header), -} - -impl TryFrom for MultiEraHeader { - type Error = Error; - - fn try_from(value: HeaderContent) -> Result { - match value.variant { - 0 => match value.byron_prefix { - Some((0, _)) => { - let header = decode(&value.cbor)?; - Ok(MultiEraHeader::ByronBoundary(header)) - } - _ => { - let header = decode(&value.cbor)?; - Ok(MultiEraHeader::Byron(header)) - } - }, - 1 | 2 | 3 | 4 => { - let header = decode(&value.cbor)?; - Ok(MultiEraHeader::AlonzoCompatible(header)) - } - 5 => { - let header = decode(&value.cbor)?; - Ok(MultiEraHeader::Babbage(header)) - } - x => Err(format!("This version of Oura can't handle era: {}", x).into()), - } - } -} - -impl MultiEraHeader { - pub fn read_cursor(&self) -> Result { - match self { - MultiEraHeader::ByronBoundary(x) => { - let hash = x.to_hash(); - let slot = x.to_abs_slot(); - Ok(Point::Specific(slot, hash.to_vec())) - } - MultiEraHeader::Byron(x) => { - let hash = x.to_hash(); - let slot = x.consensus_data.0.to_abs_slot(); - Ok(Point::Specific(slot, hash.to_vec())) - } - MultiEraHeader::AlonzoCompatible(x) => { - let hash = x.to_hash(); - Ok(Point::Specific(x.header_body.slot, hash.to_vec())) - } - MultiEraHeader::Babbage(x) => { - let hash = x.to_hash(); - Ok(Point::Specific(x.header_body.slot, hash.to_vec())) - } - } - } -} diff --git a/src/sources/n2n/mod.rs b/src/sources/n2n/mod.rs index fd10351d..0a447c1d 100644 --- a/src/sources/n2n/mod.rs +++ b/src/sources/n2n/mod.rs @@ -1,4 +1,3 @@ -mod headers; mod run; mod setup; diff --git a/src/sources/n2n/run.rs b/src/sources/n2n/run.rs index dc905e0d..a3956d8e 100644 --- a/src/sources/n2n/run.rs +++ b/src/sources/n2n/run.rs @@ -1,11 +1,8 @@ use std::{fmt::Debug, ops::Deref, sync::Arc, time::Duration}; -use pallas::{ - ledger::traverse::{probe, Era}, - network::{ - miniprotocols::{blockfetch, chainsync, handshake, run_agent, Point, MAINNET_MAGIC}, - multiplexer::StdChannelBuffer, - }, +use pallas::network::{ + miniprotocols::{blockfetch, chainsync, handshake, Point, MAINNET_MAGIC}, + multiplexer::StdChannel, }; use std::sync::mpsc::{Receiver, SyncSender}; @@ -13,60 +10,14 @@ use std::sync::mpsc::{Receiver, SyncSender}; use crate::{ mapper::EventWriter, pipelining::StageSender, - sources::{define_start_point, setup_multiplexer, should_finalize, FinalizeConfig}, - utils::{retry, SwallowResult, Utils}, + sources::{ + intersect_starting_point, setup_multiplexer, should_finalize, unknown_block_to_events, + FinalizeConfig, + }, + utils::{retry, Utils}, Error, }; -use super::headers::MultiEraHeader; - -struct Block2EventMapper(EventWriter); - -impl Debug for Block2EventMapper { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_tuple("Block2EventMapper").finish() - } -} - -impl blockfetch::Observer for Block2EventMapper { - fn on_block_received(&mut self, body: Vec) -> Result<(), Error> { - let Self(writer) = self; - - match probe::block_era(&body) { - probe::Outcome::Matched(era) => match era { - Era::Byron => { - writer - .crawl_from_byron_cbor(&body) - .ok_or_warn("error crawling byron block for events"); - } - Era::Allegra | Era::Alonzo | Era::Mary | Era::Shelley => { - writer - .crawl_from_shelley_cbor(&body, era.into()) - .ok_or_warn("error crawling alonzo-compatible block for events"); - } - Era::Babbage => { - writer - .crawl_from_babbage_cbor(&body) - .ok_or_warn("error crawling babbage block for events"); - } - x => { - return Err(format!("This version of Oura can't handle era: {}", x).into()); - } - }, - probe::Outcome::EpochBoundary => { - writer - .crawl_from_ebb_cbor(&body) - .ok_or_warn("error crawling block for events"); - } - probe::Outcome::Inconclusive => { - log::error!("can't infer primitive block from cbor, inconclusive probing. CBOR hex for debugging: {}", hex::encode(body)); - } - } - - Ok(()) - } -} - struct ChainObserver { min_depth: usize, chain_buffer: chainsync::RollbackBuffer, @@ -92,15 +43,26 @@ fn log_buffer_state(buffer: &chainsync::RollbackBuffer) { ); } -impl chainsync::Observer for &mut ChainObserver { +enum Continuation { + Proceed, + DropOut, +} + +impl ChainObserver { fn on_roll_forward( &mut self, content: chainsync::HeaderContent, tip: &chainsync::Tip, - ) -> Result { + ) -> Result { // parse the header and extract the point of the chain - let header = MultiEraHeader::try_from(content)?; - let point = header.read_cursor()?; + + let header = pallas::ledger::traverse::MultiEraHeader::decode( + content.variant, + content.byron_prefix.map(|x| x.0), + &content.cbor, + )?; + + let point = Point::Specific(header.slot(), header.hash().to_vec()); // track the new point in our memory buffer log::info!("rolling forward to point {:?}", point); @@ -118,7 +80,7 @@ impl chainsync::Observer for &mut ChainObserver { // evaluate if we should finalize the thread according to config if should_finalize(&self.finalize_config, &point, self.block_count) { - return Ok(chainsync::Continuation::DropOut); + return Ok(Continuation::DropOut); } } @@ -127,10 +89,10 @@ impl chainsync::Observer for &mut ChainObserver { // notify chain tip to the pipeline metrics self.event_writer.utils.track_chain_tip(tip.1); - Ok(chainsync::Continuation::Proceed) + Ok(Continuation::Proceed) } - fn on_rollback(&mut self, point: &Point) -> Result { + fn on_rollback(&mut self, point: &Point) -> Result<(), Error> { log::info!("rolling block to point {:?}", point); match self.chain_buffer.roll_back(point) { @@ -145,27 +107,53 @@ impl chainsync::Observer for &mut ChainObserver { log_buffer_state(&self.chain_buffer); - Ok(chainsync::Continuation::Proceed) + Ok(()) + } + + fn on_next_message( + &mut self, + msg: chainsync::NextResponse, + client: &mut chainsync::N2NClient, + ) -> Result { + match msg { + chainsync::NextResponse::RollForward(c, t) => match self.on_roll_forward(c, &t) { + Ok(x) => Ok(x), + Err(err) => Err(AttemptError::Other(err)), + }, + chainsync::NextResponse::RollBackward(x, _) => match self.on_rollback(&x) { + Ok(_) => Ok(Continuation::Proceed), + Err(err) => Err(AttemptError::Other(err)), + }, + chainsync::NextResponse::Await => { + let next = client + .recv_while_must_reply() + .map_err(|x| AttemptError::Recoverable(x.into()))?; + + self.on_next_message(next, client) + } + } } } pub(crate) fn fetch_blocks_forever( - mut channel: StdChannelBuffer, + mut client: blockfetch::Client, event_writer: EventWriter, input: Receiver, ) -> Result<(), Error> { - let observer = Block2EventMapper(event_writer); - let agent = blockfetch::OnDemandClient::initial(input.iter(), observer); - let agent = run_agent(agent, &mut channel)?; - log::debug!("blockfetch agent final state: {:?}", agent.state); + for point in input { + let body = client.fetch_single(point.clone())?; + + unknown_block_to_events(&event_writer, &body)?; + + log::debug!("blockfetch succeeded: {:?}", point); + } Ok(()) } fn observe_headers_forever( - mut channel: StdChannelBuffer, + mut client: chainsync::N2NClient, event_writer: EventWriter, - known_points: Option>, block_requests: SyncSender, min_depth: usize, finalize_config: Option, @@ -179,14 +167,15 @@ fn observe_headers_forever( finalize_config, }; - let agent = chainsync::HeaderConsumer::initial(known_points, observer); - - match run_agent(agent, &mut channel) { - Ok(agent) => { - log::debug!("chainsync agent final state: {:?}", agent.state); - Ok(()) + loop { + match client.request_next() { + Ok(next) => match observer.on_next_message(next, &mut client) { + Ok(Continuation::Proceed) => (), + Ok(Continuation::DropOut) => break Ok(()), + Err(err) => break Err(err), + }, + Err(err) => break Err(AttemptError::Recoverable(err.into())), } - Err(err) => Err(AttemptError::Recoverable(err.into())), } } @@ -196,12 +185,13 @@ enum AttemptError { Other(Error), } -fn do_handshake(channel: &mut StdChannelBuffer, magic: u64) -> Result<(), AttemptError> { - let versions = handshake::n2n::VersionTable::v6_and_above(magic); +fn do_handshake(channel: StdChannel, magic: u64) -> Result<(), AttemptError> { + let mut client = handshake::N2NClient::new(channel); + let versions = handshake::n2n::VersionTable::v4_and_above(magic); - match run_agent(handshake::Initiator::initial(versions), channel) { - Ok(agent) => match agent.output { - handshake::Output::Accepted(_, _) => Ok(()), + match client.handshake(versions) { + Ok(confirmation) => match confirmation { + handshake::Confirmation::Accepted(_, _) => Ok(()), _ => Err(AttemptError::Other( "couldn't agree on handshake version".into(), )), @@ -223,42 +213,50 @@ fn do_chainsync_attempt( let mut plexer = setup_multiplexer(&config.address.0, &config.address.1, &config.retry_policy) .map_err(|x| AttemptError::Recoverable(x))?; - let mut hs_channel = plexer.use_channel(0).into(); - let mut cs_channel = plexer.use_channel(2).into(); - let bf_channel = plexer.use_channel(3).into(); + let hs_channel = plexer.use_channel(0); + let cs_channel = plexer.use_channel(2); + let bf_channel = plexer.use_channel(3); plexer.muxer.spawn(); plexer.demuxer.spawn(); - do_handshake(&mut hs_channel, magic)?; + do_handshake(hs_channel, magic)?; + + let mut cs_client = chainsync::N2NClient::new(cs_channel); - let known_points = define_start_point( + let intersection = intersect_starting_point( + &mut cs_client, &config.intersect, #[allow(deprecated)] &config.since, &utils, - &mut cs_channel, ) .map_err(|err| AttemptError::Recoverable(err))?; - log::info!("starting chain sync from: {:?}", &known_points); + if intersection.is_none() { + return Err(AttemptError::Other( + "Can't find chain intersection point".into(), + )); + } + + log::info!("starting chain sync from: {:?}", &intersection); + let bf_client = blockfetch::Client::new(bf_channel); let writer = EventWriter::new(output_tx.clone(), utils, config.mapper.clone()); let (headers_tx, headers_rx) = std::sync::mpsc::sync_channel(100); let bf_writer = writer.clone(); std::thread::spawn(move || { - fetch_blocks_forever(bf_channel, bf_writer, headers_rx).expect("blockfetch loop failed"); + fetch_blocks_forever(bf_client, bf_writer, headers_rx).expect("blockfetch loop failed"); log::info!("block fetch thread ended"); }); // this will block observe_headers_forever( - cs_channel, + cs_client, writer, - known_points, headers_tx, config.min_depth, config.finalize.clone(),