diff --git a/Cargo.lock b/Cargo.lock index 7271b30e..4b3ac4b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -114,9 +114,9 @@ checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" [[package]] name = "asn1-rs" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ad1373757efa0f70ec53939aabc7152e1591cb485208052993070ac8d2429d" +checksum = "5493c3bedbacf7fd7382c6346bbd66687d12bbaad3a89a2d2c303ee6cf20b048" dependencies = [ "asn1-rs-derive", "asn1-rs-impl", @@ -130,9 +130,9 @@ dependencies = [ [[package]] name = "asn1-rs-derive" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7378575ff571966e99a744addeff0bff98b8ada0dedf1956d59e634db95eaac1" +checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490" dependencies = [ "proc-macro2", "quote", @@ -428,9 +428,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.20.0" +version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e89b6941c2d1a7045538884d6e760ccfffdf8e1ffc2613d8efa74305e1f3752" +checksum = "0f0e249228c6ad2d240c2dc94b714d711629d52bad946075d8e9b2f5391f0703" dependencies = [ "bindgen", "cc", @@ -913,9 +913,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.1" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" [[package]] name = "bytes-utils" @@ -1328,9 +1328,9 @@ checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" [[package]] name = "dunce" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" [[package]] name = "dyn-clone" @@ -1471,9 +1471,9 @@ checksum = "b3ea1ec5f8307826a5b71094dd91fc04d4ae75d5709b20ad351c7fb4815c86ec" [[package]] name = "flate2" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f54427cfd1c7829e2a139fcefea601bf088ebca651d2bf53ebc600eac295dae" +checksum = "7f211bbe8e69bbd0cfdea405084f128ae8b4aaa6b0b522fc8f2b009084797920" dependencies = [ "crc32fast", "miniz_oxide", @@ -1713,9 +1713,9 @@ dependencies = [ [[package]] name = "google-cloud-pubsub" -version = "0.28.0" +version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "834f472d7973f9ce14139416507dd08bb6b1d17d43e524deaa8f7f56626899eb" +checksum = "55ef73601dcec5ea144e59969e921d35d66000211603fee8023b7947af09248f" dependencies = [ "async-channel 1.9.0", "async-stream", @@ -1751,7 +1751,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.2.6", + "indexmap 2.3.0", "slab", "tokio", "tokio-util", @@ -1770,7 +1770,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.1.0", - "indexmap 2.2.6", + "indexmap 2.3.0", "slab", "tokio", "tokio-util", @@ -2112,9 +2112,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = 
"de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" dependencies = [ "equivalent", "hashbrown 0.14.5", @@ -2396,6 +2396,16 @@ dependencies = [ "minicbor-derive", ] +[[package]] +name = "minicbor" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d15f4203d71fdf90903c2696e55426ac97a363c67b218488a73b534ce7aca10" +dependencies = [ + "half", + "minicbor-derive", +] + [[package]] name = "minicbor-derive" version = "0.13.0" @@ -2605,7 +2615,7 @@ checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" [[package]] name = "oura" -version = "1.9.2" +version = "1.10.0" dependencies = [ "aws-config", "aws-sdk-lambda", @@ -2628,7 +2638,13 @@ dependencies = [ "merge", "murmur3", "openssl", - "pallas", + "pallas-addresses", + "pallas-codec 0.29.0", + "pallas-crypto", + "pallas-miniprotocols", + "pallas-multiplexer", + "pallas-primitives", + "pallas-traverse", "prometheus_exporter", "redis", "reqwest 0.12.5", @@ -2636,6 +2652,7 @@ dependencies = [ "serde_json", "strum", "strum_macros", + "time", "tokio", "unicode-truncate", "webpki 0.22.4", @@ -2663,31 +2680,18 @@ dependencies = [ "x509-parser", ] -[[package]] -name = "pallas" -version = "0.18.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94ef55b690eac7ddf43a3e7ce10d4594866c34279c424aa2ce26d757789246da" -dependencies = [ - "pallas-addresses", - "pallas-codec", - "pallas-crypto", - "pallas-miniprotocols", - "pallas-multiplexer", - "pallas-primitives", - "pallas-traverse", -] - [[package]] name = "pallas-addresses" -version = "0.18.2" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8db28c4050dea032d497555bc68c269ae8e691486d8ec83f02b090487da0d0be" +checksum = "d628ad58404ddd733e8fe46fe9986489b46258a2ab1bb7b1c4b8e406b91b7cff" dependencies = [ "base58", "bech32", + "crc", + "cryptoxide", "hex", - "pallas-codec", + "pallas-codec 0.29.0", "pallas-crypto", "thiserror", ] @@ -2699,19 +2703,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b6e03d05d42a663526d78c8b1d4f2554f09bbf4cc846e1a9e839c558bf6103c" dependencies = [ "hex", - "minicbor", + "minicbor 0.19.1", "serde", ] +[[package]] +name = "pallas-codec" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da003a7360fa032b80d38b4a15573f885f412f2b3868772d49fb072197a9d5f9" +dependencies = [ + "hex", + "minicbor 0.20.0", + "serde", + "thiserror", +] + [[package]] name = "pallas-crypto" -version = "0.18.2" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a35fc93b3613c0a628d0820f8d5d9a52709d795b59a1754a337aee0fca289dd" +checksum = "c9248ed0e594bcb0f548393264519c7adea88874d8bd7cc86f894e8ba4e918c2" dependencies = [ "cryptoxide", "hex", - "pallas-codec", + "pallas-codec 0.29.0", "rand_core", "serde", "thiserror", @@ -2725,7 +2741,7 @@ checksum = "a8a4754676d92ae351ad524d98bc32d70835856ee0623a45288bb50a5ee4b161" dependencies = [ "hex", "itertools 0.10.5", - "pallas-codec", + "pallas-codec 0.18.2", "pallas-multiplexer", "thiserror", "tracing", @@ -2740,7 +2756,7 @@ dependencies = [ "byteorder", "hex", "log", - "pallas-codec", + "pallas-codec 0.18.2", "rand", "thiserror", "tracing", @@ -2748,15 +2764,15 @@ dependencies = [ [[package]] name = "pallas-primitives" -version = "0.18.2" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"cc5fdf328f41971e0b1457e2377abeb09143fa50ab79f1a6a6ab5740bc94dc4b" +checksum = "c0fa55305212f7828651c8db024e1e286198c2fccb028bbb697c68990c044959" dependencies = [ "base58", "bech32", "hex", "log", - "pallas-codec", + "pallas-codec 0.29.0", "pallas-crypto", "serde", "serde_json", @@ -2764,15 +2780,18 @@ dependencies = [ [[package]] name = "pallas-traverse" -version = "0.18.2" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c58c353ecb175a63422386c80301493db9fc448407bc63322534522579e22879" +checksum = "49459bd0d2ba86fd909890a81e6238eaf051952d7e38ad63195301e72e8f458e" dependencies = [ "hex", + "itertools 0.13.0", "pallas-addresses", - "pallas-codec", + "pallas-codec 0.29.0", "pallas-crypto", "pallas-primitives", + "paste", + "serde", "thiserror", ] @@ -2982,9 +3001,12 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] name = "ppv-lite86" -version = "0.2.17" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] [[package]] name = "prettyplease" @@ -3184,9 +3206,9 @@ checksum = "f4ed1d73fb92eba9b841ba2aef69533a060ccc0d3ec71c90aeda5996d4afb7a9" [[package]] name = "regex" -version = "1.10.5" +version = "1.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" +checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" dependencies = [ "aho-corasick", "memchr", @@ -3281,7 +3303,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls-pemfile 2.1.2", + "rustls-pemfile 2.1.3", "serde", "serde_json", "serde_urlencoded", @@ -3454,7 +3476,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" dependencies = [ "openssl-probe", - "rustls-pemfile 2.1.2", + "rustls-pemfile 2.1.3", "rustls-pki-types", "schannel", "security-framework", @@ -3471,9 +3493,9 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "2.1.2" +version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425" dependencies = [ "base64 0.22.1", "rustls-pki-types", @@ -3620,9 +3642,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.121" +version = "1.0.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ab380d7d9f22ef3f21ad3e6c1ebe8e4fc7a2000ccba2e4d71fc96f15b2cb609" +checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da" dependencies = [ "itoa", "memchr", @@ -3918,17 +3940,18 @@ dependencies = [ "cfg-if", "p12-keystore", "rustls-connector", - "rustls-pemfile 2.1.2", + "rustls-pemfile 2.1.3", ] [[package]] name = "tempfile" -version = "3.10.1" +version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" +checksum = "b8fcd239983515c23a32fb82099f97d0b11b8c72f654ed659363a95c3dad7a53" dependencies = [ "cfg-if", "fastrand 2.1.0", + "once_cell", "rustix 0.38.34", "windows-sys 0.52.0", ] @@ -4153,7 +4176,7 @@ dependencies = [ "percent-encoding", 
"pin-project", "prost", - "rustls-pemfile 2.1.2", + "rustls-pemfile 2.1.3", "socket2 0.5.7", "tokio", "tokio-rustls 0.26.0", @@ -4499,11 +4522,11 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -4539,6 +4562,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -4723,6 +4755,27 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", +] + [[package]] name = "zeroize" version = "1.8.1" diff --git a/Cargo.toml b/Cargo.toml index a1da0669..949196e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "oura" description = "The tail of Cardano" -version = "1.9.2" +version = "1.10.0" edition = "2021" repository = "https://github.com/txpipe/oura" homepage = "https://github.com/txpipe/oura" @@ -12,7 +12,13 @@ authors = ["Santiago Carmuega "] [dependencies] -pallas = "0.18.2" +pallas-multiplexer = "0.18.2" +pallas-miniprotocols = "0.18.2" +pallas-primitives = "0.29.0" +pallas-traverse = "0.29.0" +pallas-addresses = "0.29.0" +pallas-codec = "0.29.0" +pallas-crypto = "0.29.0" # pallas = { git = "https://github.com/txpipe/pallas" } # pallas = { path = "../pallas/pallas" } hex = "0.4.3" @@ -30,6 +36,7 @@ strum = "0.26.3" strum_macros = "0.26.4" prometheus_exporter = { version = "0.8.5", default-features = false } unicode-truncate = "1.1.0" +time = "0.3.36" # feature logs file-rotate = { version = "0.7.1", optional = true } @@ -80,5 +87,5 @@ elasticsink = ["elasticsearch", "tokio"] fingerprint = ["murmur3"] aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"] redissink = ["redis", "tokio"] -gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web" ,"google-cloud-gax"] +gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web", "google-cloud-gax"] rabbitmqsink = ["lapin", "tokio"] diff --git a/src/mapper/babbage.rs b/src/mapper/babbage.rs index fda60d46..994e3ad1 100644 --- a/src/mapper/babbage.rs +++ b/src/mapper/babbage.rs @@ -1,12 +1,12 @@ -use pallas::codec::utils::KeepRaw; +use pallas_codec::utils::KeepRaw; -use pallas::ledger::primitives::babbage::{ +use pallas_primitives::babbage::{ AuxiliaryData, MintedBlock, MintedDatumOption, MintedPostAlonzoTransactionOutput, MintedTransactionBody, MintedTransactionOutput, MintedWitnessSet, NetworkId, }; -use pallas::crypto::hash::Hash; -use pallas::ledger::traverse::OriginalHash; +use 
pallas_crypto::hash::Hash; +use pallas_traverse::OriginalHash; use crate::model::{BlockRecord, Era, TransactionRecord}; use crate::utils::time::TimeProvider; @@ -199,7 +199,7 @@ impl EventWriter { let record = self.to_post_alonzo_output_record(output)?; self.append(record.into())?; - let address = pallas::ledger::addresses::Address::from_bytes(&output.address)?; + let address = pallas_addresses::Address::from_bytes(&output.address)?; let child = &self.child_writer(EventContext { output_address: address.to_string().into(), @@ -389,7 +389,7 @@ impl EventWriter { /// Entry-point to start crawling a blocks for events. Meant to be used when /// we haven't decoded the CBOR yet (for example, N2N). pub fn crawl_from_babbage_cbor(&self, cbor: &[u8]) -> Result<(), Error> { - let (_, block): (u16, MintedBlock) = pallas::codec::minicbor::decode(cbor)?; + let (_, block): (u16, MintedBlock) = pallas_codec::minicbor::decode(cbor)?; self.crawl_babbage_with_cbor(&block, cbor) } } diff --git a/src/mapper/byron.rs b/src/mapper/byron.rs index 2ed223d5..d0f58ac5 100644 --- a/src/mapper/byron.rs +++ b/src/mapper/byron.rs @@ -3,11 +3,12 @@ use std::ops::Deref; use super::map::ToHex; use super::EventWriter; use crate::model::{BlockRecord, Era, EventData, TransactionRecord, TxInputRecord, TxOutputRecord}; +use crate::utils::time::TimeProvider; use crate::{model::EventContext, Error}; -use pallas::crypto::hash::Hash; -use pallas::ledger::primitives::byron; -use pallas::ledger::traverse::OriginalHash; +use pallas_crypto::hash::Hash; +use pallas_primitives::byron; +use pallas_traverse::OriginalHash; impl EventWriter { fn to_byron_input_record(&self, source: &byron::TxIn) -> Option { @@ -41,12 +42,9 @@ impl EventWriter { } fn to_byron_output_record(&self, source: &byron::TxOut) -> Result { - let address: pallas::ledger::addresses::Address = - pallas::ledger::addresses::ByronAddress::new( - &source.address.payload.0, - source.address.crc, - ) - .into(); + let address: pallas_addresses::Address = + pallas_addresses::ByronAddress::new(&source.address.payload.0, source.address.crc) + .into(); Ok(TxOutputRecord { address: address.to_string(), @@ -168,10 +166,12 @@ impl EventWriter { hash: &Hash<32>, cbor: &[u8], ) -> Result { - let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( - source.header.consensus_data.0.epoch, - source.header.consensus_data.0.slot, - ); + let abs_slot = self.utils.time.as_ref().map(|time| { + time.byron_epoch_slot_to_absolute( + source.header.consensus_data.0.epoch, + source.header.consensus_data.0.slot, + ) + }); let mut record = BlockRecord { era: Era::Byron, @@ -181,7 +181,7 @@ impl EventWriter { tx_count: source.body.tx_payload.len(), hash: hash.to_hex(), number: source.header.consensus_data.2[0], - slot: abs_slot, + slot: abs_slot.unwrap_or_default(), epoch: Some(source.header.consensus_data.0.epoch), epoch_slot: Some(source.header.consensus_data.0.slot), previous_hash: source.header.prev_block.to_hex(), @@ -234,10 +234,9 @@ impl EventWriter { hash: &Hash<32>, cbor: &[u8], ) -> Result { - let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( - source.header.consensus_data.epoch_id, - 0, - ); + let abs_slot = self.utils.time.as_ref().map(|time| { + time.byron_epoch_slot_to_absolute(source.header.consensus_data.epoch_id, 0) + }); Ok(BlockRecord { era: Era::Byron, @@ -247,7 +246,7 @@ impl EventWriter { vrf_vkey: Default::default(), tx_count: 0, number: source.header.consensus_data.difficulty[0], - slot: abs_slot, + slot: 
abs_slot.unwrap_or_default(), epoch: Some(source.header.consensus_data.epoch_id), epoch_slot: Some(0), previous_hash: source.header.prev_block.to_hex(), @@ -288,16 +287,18 @@ impl EventWriter { ) -> Result<(), Error> { let hash = block.header.original_hash(); - let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( - block.header.consensus_data.0.epoch, - block.header.consensus_data.0.slot, - ); + let abs_slot = self.utils.time.as_ref().map(|time| { + time.byron_epoch_slot_to_absolute( + block.header.consensus_data.0.epoch, + block.header.consensus_data.0.slot, + ) + }); let child = self.child_writer(EventContext { block_hash: Some(hex::encode(hash)), block_number: Some(block.header.consensus_data.2[0]), - slot: Some(abs_slot), - timestamp: self.compute_timestamp(abs_slot), + slot: abs_slot, + timestamp: abs_slot.and_then(|slot| self.compute_timestamp(slot)), ..EventContext::default() }); @@ -311,7 +312,7 @@ impl EventWriter { /// Entry-point to start crawling a blocks for events. Meant to be used when /// we haven't decoded the CBOR yet (for example, N2N). pub fn crawl_from_byron_cbor(&self, cbor: &[u8]) -> Result<(), Error> { - let (_, block): (u16, byron::MintedBlock) = pallas::codec::minicbor::decode(cbor)?; + let (_, block): (u16, byron::MintedBlock) = pallas_codec::minicbor::decode(cbor)?; self.crawl_byron_with_cbor(&block, cbor) } @@ -328,16 +329,15 @@ impl EventWriter { if self.config.include_byron_ebb { let hash = block.header.original_hash(); - let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute( - block.header.consensus_data.epoch_id, - 0, - ); + let abs_slot = self.utils.time.as_ref().map(|time| { + time.byron_epoch_slot_to_absolute(block.header.consensus_data.epoch_id, 0) + }); let child = self.child_writer(EventContext { block_hash: Some(hex::encode(hash)), block_number: Some(block.header.consensus_data.difficulty[0]), - slot: Some(abs_slot), - timestamp: self.compute_timestamp(abs_slot), + slot: abs_slot, + timestamp: abs_slot.and_then(|slot| self.compute_timestamp(slot)), ..EventContext::default() }); @@ -352,7 +352,7 @@ impl EventWriter { /// Entry-point to start crawling a blocks for events. Meant to be used when /// we haven't decoded the CBOR yet (for example, N2N). 
pub fn crawl_from_ebb_cbor(&self, cbor: &[u8]) -> Result<(), Error> { - let (_, block): (u16, byron::MintedEbBlock) = pallas::codec::minicbor::decode(cbor)?; + let (_, block): (u16, byron::MintedEbBlock) = pallas_codec::minicbor::decode(cbor)?; self.crawl_ebb_with_cbor(&block, cbor) } } diff --git a/src/mapper/cip15.rs b/src/mapper/cip15.rs index 1d269206..84a548d6 100644 --- a/src/mapper/cip15.rs +++ b/src/mapper/cip15.rs @@ -3,7 +3,7 @@ use crate::model::CIP15AssetRecord; use crate::Error; use serde_json::Value as JsonValue; -use pallas::ledger::primitives::alonzo::Metadatum; +use pallas_primitives::alonzo::Metadatum; fn extract_json_property<'a>( json: &'a JsonValue, diff --git a/src/mapper/cip25.rs b/src/mapper/cip25.rs index 9def5b1a..813b7ef7 100644 --- a/src/mapper/cip25.rs +++ b/src/mapper/cip25.rs @@ -1,6 +1,6 @@ use serde_json::Value as JsonValue; -use pallas::ledger::primitives::alonzo::Metadatum; +use pallas_primitives::alonzo::Metadatum; use crate::{model::CIP25AssetRecord, Error}; diff --git a/src/mapper/collect.rs b/src/mapper/collect.rs index 515e11b5..1de39df9 100644 --- a/src/mapper/collect.rs +++ b/src/mapper/collect.rs @@ -1,19 +1,15 @@ -use pallas::{ - codec::utils::{KeepRaw, KeyValuePairs, MaybeIndefArray}, - ledger::{ - primitives::{ - alonzo::{ - AuxiliaryData, Coin, MintedBlock, Multiasset, NativeScript, PlutusData, - PlutusScript, Redeemer, RewardAccount, TransactionInput, VKeyWitness, Value, - }, - babbage::{ - LegacyTransactionOutput, MintedPostAlonzoTransactionOutput, - MintedTransactionOutput, PlutusV2Script, - }, - }, - traverse::OriginalHash, +use pallas_codec::utils::{KeepRaw, KeyValuePairs, MaybeIndefArray}; +use pallas_primitives::{ + alonzo::{ + AuxiliaryData, Coin, MintedBlock, Multiasset, NativeScript, PlutusData, PlutusScript, + Redeemer, RewardAccount, TransactionInput, VKeyWitness, Value, + }, + babbage::{ + LegacyTransactionOutput, MintedPostAlonzoTransactionOutput, MintedTransactionOutput, + PlutusV2Script, }, }; +use pallas_traverse::OriginalHash; use crate::{ model::{ @@ -138,7 +134,7 @@ impl EventWriter { pub fn collect_native_witness_records( &self, - witness_set: &Option>, + witness_set: &Option>>, ) -> Result, Error> { match witness_set { Some(all) => all diff --git a/src/mapper/conway.rs b/src/mapper/conway.rs new file mode 100644 index 00000000..8b07fc9e --- /dev/null +++ b/src/mapper/conway.rs @@ -0,0 +1,588 @@ +use pallas_codec::utils::{KeepRaw, NonZeroInt}; + +use pallas_primitives::conway::{ + AuxiliaryData, Certificate, MintedBlock, MintedDatumOption, MintedPostAlonzoTransactionOutput, + MintedTransactionBody, MintedTransactionOutput, MintedWitnessSet, Multiasset, NetworkId, + RedeemerTag, RedeemersKey, RedeemersValue, +}; + +use pallas_crypto::hash::Hash; +use pallas_primitives::ToCanonicalJson as _; +use pallas_traverse::OriginalHash; + +use crate::model::{ + BlockRecord, Era, MintRecord, PlutusRedeemerRecord, TransactionRecord, TxOutputRecord, +}; +use crate::utils::time::TimeProvider; +use crate::{ + model::{EventContext, EventData}, + Error, +}; + +use super::{map::ToHex, EventWriter}; + +impl EventWriter { + pub fn collect_conway_mint_records(&self, mint: &Multiasset) -> Vec { + mint.iter() + .flat_map(|(policy, assets)| { + assets + .iter() + .map(|(asset, amount)| self.to_mint_record(policy, asset, amount.into())) + }) + .collect() + } + + pub fn crawl_conway_mints(&self, mints: &Multiasset) -> Result<(), Error> { + for (policy, assets) in mints.iter() { + for (asset, quantity) in assets.iter() { + 
self.append_from(self.to_mint_record(policy, asset, quantity.into()))?; + } + } + + Ok(()) + } + + pub fn to_conway_output_record( + &self, + output: &MintedPostAlonzoTransactionOutput, + ) -> Result { + let address = pallas_addresses::Address::from_bytes(&output.address)?; + + Ok(TxOutputRecord { + address: address.to_string(), + amount: super::map::get_tx_output_coin_value(&output.value), + assets: self.collect_asset_records(&output.value).into(), + datum_hash: match &output.datum_option { + Some(MintedDatumOption::Hash(x)) => Some(x.to_string()), + Some(MintedDatumOption::Data(x)) => Some(x.original_hash().to_hex()), + None => None, + }, + inline_datum: match &output.datum_option { + Some(MintedDatumOption::Data(x)) => Some(self.to_plutus_datum_record(x)?), + _ => None, + }, + }) + } + + pub fn to_conway_redeemer_record( + &self, + key: &RedeemersKey, + value: &RedeemersValue, + ) -> Result { + Ok(PlutusRedeemerRecord { + purpose: match key.tag { + RedeemerTag::Spend => "spend".to_string(), + RedeemerTag::Mint => "mint".to_string(), + RedeemerTag::Cert => "cert".to_string(), + RedeemerTag::Reward => "reward".to_string(), + RedeemerTag::Vote => "vote".to_string(), + RedeemerTag::Propose => "propose".to_string(), + }, + ex_units_mem: value.ex_units.mem, + ex_units_steps: value.ex_units.steps, + input_idx: key.index, + plutus_data: value.data.to_json(), + }) + } + + pub fn collect_conway_output_records( + &self, + source: &[MintedTransactionOutput], + ) -> Result, Error> { + source + .iter() + .map(|x| match x { + MintedTransactionOutput::Legacy(x) => self.to_legacy_output_record(x), + MintedTransactionOutput::PostAlonzo(x) => self.to_conway_output_record(x), + }) + .collect() + } + + pub fn to_conway_tx_size( + &self, + body: &KeepRaw, + aux_data: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, + ) -> usize { + body.raw_cbor().len() + + aux_data.map(|ax| ax.raw_cbor().len()).unwrap_or(2) + + witness_set.map(|ws| ws.raw_cbor().len()).unwrap_or(1) + } + + pub fn to_conway_transaction_record( + &self, + body: &KeepRaw, + tx_hash: &str, + aux_data: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, + ) -> Result { + let mut record = TransactionRecord { + hash: tx_hash.to_owned(), + size: self.to_conway_tx_size(body, aux_data, witness_set) as u32, + fee: body.fee, + ttl: body.ttl, + validity_interval_start: body.validity_interval_start, + network_id: body.network_id.as_ref().map(|x| match x { + NetworkId::One => 1, + NetworkId::Two => 2, + }), + ..Default::default() + }; + + let outputs = self.collect_conway_output_records(body.outputs.as_slice())?; + record.output_count = outputs.len(); + record.total_output = outputs.iter().map(|o| o.amount).sum(); + + let inputs = self.collect_input_records(&body.inputs); + record.input_count = inputs.len(); + + if let Some(mint) = &body.mint { + let mints = self.collect_conway_mint_records(mint); + record.mint_count = mints.len(); + + if self.config.include_transaction_details { + record.mint = mints.into(); + } + } + + // Add Collateral Stuff + let collateral_inputs = &body.collateral.as_deref(); + record.collateral_input_count = collateral_inputs.iter().count(); + record.has_collateral_output = body.collateral_return.is_some(); + + // TODO + // TransactionBodyComponent::ScriptDataHash(_) + // TransactionBodyComponent::RequiredSigners(_) + // TransactionBodyComponent::AuxiliaryDataHash(_) + + if self.config.include_transaction_details { + record.outputs = outputs.into(); + record.inputs = inputs.into(); + + // transaction_details collateral stuff + 
record.collateral_inputs = + collateral_inputs.map(|inputs| self.collect_input_records(inputs)); + + record.collateral_output = body.collateral_return.as_ref().map(|output| match output { + MintedTransactionOutput::Legacy(x) => self.to_legacy_output_record(x).unwrap(), + MintedTransactionOutput::PostAlonzo(x) => self.to_conway_output_record(x).unwrap(), + }); + + record.metadata = match aux_data { + Some(aux_data) => self.collect_metadata_records(aux_data)?.into(), + None => None, + }; + + if let Some(witnesses) = witness_set { + record.vkey_witnesses = Some( + witnesses + .vkeywitness + .iter() + .flatten() + .map(|i| self.to_vkey_witness_record(i)) + .collect::>()?, + ); + + record.native_witnesses = Some( + witnesses + .native_script + .iter() + .flatten() + .map(|i| self.to_native_witness_record(i)) + .collect::>()?, + ); + + let mut all_plutus = vec![]; + + let plutus_v1: Vec<_> = witnesses + .plutus_v1_script + .iter() + .flatten() + .map(|i| self.to_plutus_v1_witness_record(i)) + .collect::>()?; + + all_plutus.extend(plutus_v1); + + let plutus_v2: Vec<_> = witnesses + .plutus_v2_script + .iter() + .flatten() + .map(|i| self.to_plutus_v2_witness_record(i)) + .collect::>()?; + + all_plutus.extend(plutus_v2); + + let plutus_v3: Vec<_> = witnesses + .plutus_v3_script + .iter() + .flatten() + .map(|i| self.to_plutus_v3_witness_record(i)) + .collect::>()?; + + all_plutus.extend(plutus_v3); + + record.plutus_witnesses = Some(all_plutus); + + record.plutus_redeemers = Some( + witnesses + .redeemer + .iter() + .flat_map(|i| i.iter()) + .map(|(k, v)| self.to_conway_redeemer_record(k, v)) + .collect::>()?, + ); + + record.plutus_data = Some( + witnesses + .plutus_data + .iter() + .flatten() + .map(|i| self.to_plutus_datum_record(i)) + .collect::>()?, + ); + } + + if let Some(withdrawals) = &body.withdrawals { + record.withdrawals = self.collect_withdrawal_records(withdrawals).into(); + } + } + + Ok(record) + } + + pub fn to_conway_block_record( + &self, + source: &MintedBlock, + hash: &Hash<32>, + cbor: &[u8], + ) -> Result { + let relative_epoch = self + .utils + .time + .as_ref() + .map(|time| time.absolute_slot_to_relative(source.header.header_body.slot)); + + let mut record = BlockRecord { + era: Era::Conway, + body_size: source.header.header_body.block_body_size as usize, + issuer_vkey: source.header.header_body.issuer_vkey.to_hex(), + vrf_vkey: source.header.header_body.vrf_vkey.to_hex(), + tx_count: source.transaction_bodies.len(), + hash: hex::encode(hash), + number: source.header.header_body.block_number, + slot: source.header.header_body.slot, + epoch: relative_epoch.map(|(epoch, _)| epoch), + epoch_slot: relative_epoch.map(|(_, epoch_slot)| epoch_slot), + previous_hash: source + .header + .header_body + .prev_hash + .map(hex::encode) + .unwrap_or_default(), + cbor_hex: match self.config.include_block_cbor { + true => hex::encode(cbor).into(), + false => None, + }, + transactions: None, + }; + + if self.config.include_block_details || self.config.include_transaction_details { + record.transactions = Some(self.collect_conway_tx_records(source)?); + } + + Ok(record) + } + + pub fn collect_conway_tx_records( + &self, + block: &MintedBlock, + ) -> Result, Error> { + block + .transaction_bodies + .iter() + .enumerate() + .map(|(idx, tx)| { + let aux_data = block + .auxiliary_data_set + .iter() + .find(|(k, _)| *k == (idx as u32)) + .map(|(_, v)| v); + + let witness_set = block.transaction_witness_sets.get(idx); + + let tx_hash = tx.original_hash().to_hex(); + + 
self.to_conway_transaction_record(tx, &tx_hash, aux_data, witness_set) + }) + .collect() + } + + fn crawl_conway_output(&self, output: &MintedPostAlonzoTransactionOutput) -> Result<(), Error> { + let record = self.to_conway_output_record(output)?; + self.append(record.into())?; + + let address = pallas_addresses::Address::from_bytes(&output.address)?; + + let child = &self.child_writer(EventContext { + output_address: address.to_string().into(), + ..EventContext::default() + }); + + child.crawl_transaction_output_amount(&output.value)?; + + if let Some(MintedDatumOption::Data(datum)) = &output.datum_option { + let record = self.to_plutus_datum_record(datum)?; + child.append(record.into())?; + } + + Ok(()) + } + + fn crawl_conway_transaction_output( + &self, + output: &MintedTransactionOutput, + ) -> Result<(), Error> { + match output { + MintedTransactionOutput::Legacy(x) => self.crawl_legacy_output(x), + MintedTransactionOutput::PostAlonzo(x) => self.crawl_conway_output(x), + } + } + + fn crawl_conway_witness_set( + &self, + witness_set: &KeepRaw, + ) -> Result<(), Error> { + if let Some(native) = &witness_set.native_script { + for script in native.iter() { + self.append_from(self.to_native_witness_record(script)?)?; + } + } + + if let Some(plutus) = &witness_set.plutus_v1_script { + for script in plutus.iter() { + self.append_from(self.to_plutus_v1_witness_record(script)?)?; + } + } + + if let Some(plutus) = &witness_set.plutus_v2_script { + for script in plutus.iter() { + self.append_from(self.to_plutus_v2_witness_record(script)?)?; + } + } + + if let Some(plutus) = &witness_set.plutus_v3_script { + for script in plutus.iter() { + self.append_from(self.to_plutus_v3_witness_record(script)?)?; + } + } + + if let Some(redeemers) = &witness_set.redeemer { + for (key, value) in redeemers.iter() { + self.append_from(self.to_conway_redeemer_record(key, value)?)?; + } + } + + if let Some(datums) = &witness_set.plutus_data { + for datum in datums.iter() { + self.append_from(self.to_plutus_datum_record(datum)?)?; + } + } + + Ok(()) + } + + pub fn to_conway_certificate_event(&self, certificate: &Certificate) -> Option { + match certificate { + Certificate::StakeRegistration(credential) => EventData::StakeRegistration { + credential: credential.into(), + } + .into(), + Certificate::StakeDeregistration(credential) => EventData::StakeDeregistration { + credential: credential.into(), + } + .into(), + Certificate::StakeDelegation(credential, pool) => EventData::StakeDelegation { + credential: credential.into(), + pool_hash: pool.to_hex(), + } + .into(), + Certificate::PoolRegistration { + operator, + vrf_keyhash, + pledge, + cost, + margin, + reward_account, + pool_owners, + relays, + pool_metadata, + } => EventData::PoolRegistration { + operator: operator.to_hex(), + vrf_keyhash: vrf_keyhash.to_hex(), + pledge: *pledge, + cost: *cost, + margin: (margin.numerator as f64 / margin.denominator as f64), + reward_account: reward_account.to_hex(), + pool_owners: pool_owners.iter().map(|p| p.to_hex()).collect(), + relays: relays.iter().map(super::map::relay_to_string).collect(), + pool_metadata: pool_metadata.to_owned().map(|m| m.url.clone()).into(), + pool_metadata_hash: pool_metadata + .to_owned() + .map(|m| m.hash.clone().to_hex()) + .into(), + } + .into(), + Certificate::PoolRetirement(pool, epoch) => EventData::PoolRetirement { + pool: pool.to_hex(), + epoch: *epoch, + } + .into(), + // all new Conway certs are out of scope for Oura lts/v1 + _ => None, + } + } + + fn crawl_conway_certificate(&self, 
certificate: &Certificate) -> Result<(), Error> { + if let Some(evt) = self.to_conway_certificate_event(certificate) { + self.append(evt)?; + } + + Ok(()) + } + + fn crawl_conway_transaction( + &self, + tx: &KeepRaw, + tx_hash: &str, + aux_data: Option<&KeepRaw>, + witness_set: Option<&KeepRaw>, + ) -> Result<(), Error> { + let record = self.to_conway_transaction_record(tx, tx_hash, aux_data, witness_set)?; + + self.append_from(record.clone())?; + + for (idx, input) in tx.inputs.iter().enumerate() { + let child = self.child_writer(EventContext { + input_idx: Some(idx), + ..EventContext::default() + }); + + child.crawl_transaction_input(input)?; + } + + for (idx, output) in tx.outputs.iter().enumerate() { + let child = self.child_writer(EventContext { + output_idx: Some(idx), + ..EventContext::default() + }); + + child.crawl_conway_transaction_output(output)?; + } + + if let Some(certs) = &tx.certificates { + for (idx, cert) in certs.iter().enumerate() { + let child = self.child_writer(EventContext { + certificate_idx: Some(idx), + ..EventContext::default() + }); + + child.crawl_conway_certificate(cert)?; + } + } + + if let Some(collateral) = &tx.collateral { + for collateral in collateral.iter() { + // TODO: collateral context? + + self.crawl_collateral(collateral)?; + } + } + + if let Some(mint) = &tx.mint { + self.crawl_conway_mints(mint)?; + } + + if let Some(aux_data) = aux_data { + self.crawl_auxdata(aux_data)?; + } + + if let Some(witness_set) = witness_set { + self.crawl_conway_witness_set(witness_set)?; + } + + if self.config.include_transaction_end_events { + self.append(EventData::TransactionEnd(record))?; + } + + Ok(()) + } + + fn crawl_conway_block( + &self, + block: &MintedBlock, + hash: &Hash<32>, + cbor: &[u8], + ) -> Result<(), Error> { + let record = self.to_conway_block_record(block, hash, cbor)?; + + self.append(EventData::Block(record.clone()))?; + + for (idx, tx) in block.transaction_bodies.iter().enumerate() { + let aux_data = block + .auxiliary_data_set + .iter() + .find(|(k, _)| *k == (idx as u32)) + .map(|(_, v)| v); + + let witness_set = block.transaction_witness_sets.get(idx); + + let tx_hash = tx.original_hash().to_hex(); + + let child = self.child_writer(EventContext { + tx_idx: Some(idx), + tx_hash: Some(tx_hash.to_owned()), + ..EventContext::default() + }); + + child.crawl_conway_transaction(tx, &tx_hash, aux_data, witness_set)?; + } + + if self.config.include_block_end_events { + self.append(EventData::BlockEnd(record))?; + } + + Ok(()) + } + + /// Mapper entry-point for decoded Conway blocks + /// + /// Entry-point to start crawling a blocks for events. Meant to be used when + /// we already have a decoded block (for example, N2C). The raw CBOR is also + /// passed through in case we need to attach it to outbound events. + pub fn crawl_conway_with_cbor<'b>( + &self, + block: &'b MintedBlock<'b>, + cbor: &'b [u8], + ) -> Result<(), Error> { + let hash = block.header.original_hash(); + + let child = self.child_writer(EventContext { + block_hash: Some(hex::encode(hash)), + block_number: Some(block.header.header_body.block_number), + slot: Some(block.header.header_body.slot), + timestamp: self.compute_timestamp(block.header.header_body.slot), + ..EventContext::default() + }); + + child.crawl_conway_block(block, &hash, cbor) + } + + /// Mapper entry-point for raw Conway cbor blocks + /// + /// Entry-point to start crawling a blocks for events. Meant to be used when + /// we haven't decoded the CBOR yet (for example, N2N). 
+ pub fn crawl_from_conway_cbor(&self, cbor: &[u8]) -> Result<(), Error> { + let (_, block): (u16, MintedBlock) = pallas_codec::minicbor::decode(cbor)?; + self.crawl_conway_with_cbor(&block, cbor) + } +} diff --git a/src/mapper/map.rs b/src/mapper/map.rs index d05d20a0..79be068b 100644 --- a/src/mapper/map.rs +++ b/src/mapper/map.rs @@ -1,20 +1,21 @@ use std::collections::HashMap; -use pallas::ledger::primitives::alonzo::MintedWitnessSet; -use pallas::ledger::primitives::babbage::MintedDatumOption; -use pallas::ledger::traverse::{ComputeHash, OriginalHash}; -use pallas::{codec::utils::KeepRaw, crypto::hash::Hash}; +use pallas_codec::utils::KeepRaw; +use pallas_crypto::hash::Hash; +use pallas_primitives::alonzo::MintedWitnessSet; +use pallas_primitives::babbage::MintedDatumOption; +use pallas_traverse::{ComputeHash, OriginalHash}; -use pallas::ledger::primitives::{ +use pallas_primitives::{ alonzo::{ self as alonzo, AuxiliaryData, Certificate, InstantaneousRewardSource, InstantaneousRewardTarget, Metadatum, MetadatumLabel, MintedBlock, NetworkId, Relay, TransactionBody, TransactionInput, Value, }, - babbage, ToCanonicalJson, + babbage, conway, ToCanonicalJson, }; -use pallas::network::miniprotocols::Point; +use pallas_miniprotocols::Point; use serde_json::{json, Value as JsonValue}; use crate::model::{ @@ -64,23 +65,23 @@ fn ip_string_from_bytes(bytes: &[u8]) -> String { format!("{}.{}.{}.{}", bytes[0], bytes[1], bytes[2], bytes[3]) } -fn relay_to_string(relay: &Relay) -> String { +pub fn relay_to_string(relay: &Relay) -> String { match relay { Relay::SingleHostAddr(port, ipv4, ipv6) => { let ip = match (ipv6, ipv4) { - (None, None) => "".to_string(), - (_, Some(x)) => ip_string_from_bytes(x.as_ref()), - (Some(x), _) => ip_string_from_bytes(x.as_ref()), + (_, pallas_codec::utils::Nullable::Some(x)) => ip_string_from_bytes(x.as_ref()), + (pallas_codec::utils::Nullable::Some(x), _) => ip_string_from_bytes(x.as_ref()), + _ => "".to_string(), }; match port { - Some(port) => format!("{ip}:{port}"), - None => ip, + pallas_codec::utils::Nullable::Some(port) => format!("{ip}:{port}"), + _ => ip, } } Relay::SingleHostName(port, host) => match port { - Some(port) => format!("{host}:{port}"), - None => host.clone(), + pallas_codec::utils::Nullable::Some(port) => format!("{host}:{port}"), + _ => host.clone(), }, Relay::MultiHostName(host) => host.clone(), } @@ -98,7 +99,7 @@ fn metadatum_to_string_key(datum: &Metadatum) -> String { } } -fn get_tx_output_coin_value(amount: &Value) -> u64 { +pub fn get_tx_output_coin_value(amount: &Value) -> u64 { match amount { Value::Coin(x) => *x, Value::Multiasset(x, _) => *x, @@ -169,7 +170,7 @@ impl EventWriter { &self, output: &alonzo::TransactionOutput, ) -> Result { - let address = pallas::ledger::addresses::Address::from_bytes(&output.address)?; + let address = pallas_addresses::Address::from_bytes(&output.address)?; Ok(TxOutputRecord { address: address.to_string(), @@ -184,7 +185,7 @@ impl EventWriter { &self, output: &babbage::MintedPostAlonzoTransactionOutput, ) -> Result { - let address = pallas::ledger::addresses::Address::from_bytes(&output.address)?; + let address = pallas_addresses::Address::from_bytes(&output.address)?; Ok(TxOutputRecord { address: address.to_string(), @@ -205,7 +206,7 @@ impl EventWriter { pub fn to_transaction_output_asset_record( &self, policy: &Hash<28>, - asset: &pallas::codec::utils::Bytes, + asset: &pallas_codec::utils::Bytes, amount: u64, ) -> OutputAssetRecord { OutputAssetRecord { @@ -219,7 +220,7 @@ impl EventWriter { pub fn 
to_mint_record( &self, policy: &Hash<28>, - asset: &pallas::codec::utils::Bytes, + asset: &pallas_codec::utils::Bytes, quantity: i64, ) -> MintRecord { MintRecord { @@ -291,6 +292,16 @@ impl EventWriter { }) } + pub fn to_plutus_v3_witness_record( + &self, + script: &conway::PlutusV3Script, + ) -> Result { + Ok(PlutusWitnessRecord { + script_hash: script.compute_hash().to_hex(), + script_hex: script.as_ref().to_hex(), + }) + } + pub fn to_native_witness_record( &self, script: &alonzo::NativeScript, @@ -342,8 +353,11 @@ impl EventWriter { reward_account: reward_account.to_hex(), pool_owners: pool_owners.iter().map(|p| p.to_hex()).collect(), relays: relays.iter().map(relay_to_string).collect(), - pool_metadata: pool_metadata.as_ref().map(|m| m.url.clone()), - pool_metadata_hash: pool_metadata.as_ref().map(|m| m.hash.clone().to_hex()), + pool_metadata: pool_metadata.to_owned().map(|m| m.url.clone()).into(), + pool_metadata_hash: pool_metadata + .to_owned() + .map(|m| m.hash.clone().to_hex()) + .into(), }, Certificate::PoolRetirement(pool, epoch) => EventData::PoolRetirement { pool: pool.to_hex(), diff --git a/src/mapper/mod.rs b/src/mapper/mod.rs index a176e40f..13e68677 100644 --- a/src/mapper/mod.rs +++ b/src/mapper/mod.rs @@ -3,6 +3,7 @@ mod byron; mod cip15; mod cip25; mod collect; +mod conway; mod map; mod prelude; mod shelley; diff --git a/src/mapper/prelude.rs b/src/mapper/prelude.rs index 4564bb92..806ec250 100644 --- a/src/mapper/prelude.rs +++ b/src/mapper/prelude.rs @@ -106,15 +106,15 @@ impl EventWriter { } } -impl From for Era { - fn from(other: pallas::ledger::traverse::Era) -> Self { +impl From for Era { + fn from(other: pallas_traverse::Era) -> Self { match other { - pallas::ledger::traverse::Era::Byron => Era::Byron, - pallas::ledger::traverse::Era::Shelley => Era::Shelley, - pallas::ledger::traverse::Era::Allegra => Era::Allegra, - pallas::ledger::traverse::Era::Mary => Era::Mary, - pallas::ledger::traverse::Era::Alonzo => Era::Alonzo, - pallas::ledger::traverse::Era::Babbage => Era::Babbage, + pallas_traverse::Era::Byron => Era::Byron, + pallas_traverse::Era::Shelley => Era::Shelley, + pallas_traverse::Era::Allegra => Era::Allegra, + pallas_traverse::Era::Mary => Era::Mary, + pallas_traverse::Era::Alonzo => Era::Alonzo, + pallas_traverse::Era::Babbage => Era::Babbage, _ => Era::Unknown, } } diff --git a/src/mapper/shelley.rs b/src/mapper/shelley.rs index 183ec6e7..c10a930c 100644 --- a/src/mapper/shelley.rs +++ b/src/mapper/shelley.rs @@ -1,12 +1,12 @@ -use pallas::codec::utils::KeepRaw; +use pallas_codec::utils::KeepRaw; -use pallas::ledger::primitives::alonzo::{ +use pallas_primitives::alonzo::{ AuxiliaryData, Certificate, Metadata, MintedBlock, MintedWitnessSet, Multiasset, TransactionBody, TransactionInput, TransactionOutput, Value, }; -use pallas::crypto::hash::Hash; -use pallas::ledger::traverse::OriginalHash; +use pallas_crypto::hash::Hash; +use pallas_traverse::OriginalHash; use crate::{ model::{Era, EventContext, EventData}, @@ -89,7 +89,7 @@ impl EventWriter { let record = self.to_legacy_output_record(output)?; self.append(record.into())?; - let address = pallas::ledger::addresses::Address::from_bytes(&output.address)?; + let address = pallas_addresses::Address::from_bytes(&output.address)?; let child = &self.child_writer(EventContext { output_address: address.to_string().into(), @@ -325,7 +325,7 @@ impl EventWriter { /// Shelley. In this way, we can avoid having to fork the crawling procedure /// for each different hard-fork. 
pub fn crawl_from_shelley_cbor(&self, cbor: &[u8], era: Era) -> Result<(), Error> { - let (_, block): (u16, MintedBlock) = pallas::codec::minicbor::decode(cbor)?; + let (_, block): (u16, MintedBlock) = pallas_codec::minicbor::decode(cbor)?; self.crawl_shelley_with_cbor(&block, cbor, era) } } diff --git a/src/model.rs b/src/model.rs index ad661bc3..89f894b4 100644 --- a/src/model.rs +++ b/src/model.rs @@ -25,6 +25,7 @@ pub enum Era { Mary, Alonzo, Babbage, + Conway, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] @@ -247,7 +248,7 @@ impl From for EventData { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct PlutusRedeemerRecord { pub purpose: String, - pub ex_units_mem: u32, + pub ex_units_mem: u64, pub ex_units_steps: u64, pub input_idx: u32, pub plutus_data: JsonValue, diff --git a/src/sinks/elastic/run.rs b/src/sinks/elastic/run.rs index 0ba6750d..405d2ad8 100644 --- a/src/sinks/elastic/run.rs +++ b/src/sinks/elastic/run.rs @@ -1,4 +1,4 @@ -use elasticsearch::{params::OpType, Elasticsearch, IndexParts, http::StatusCode}; +use elasticsearch::{http::StatusCode, params::OpType, Elasticsearch, IndexParts}; use serde::Serialize; use serde_json::json; use std::sync::Arc; diff --git a/src/sinks/gcp_pubsub/run.rs b/src/sinks/gcp_pubsub/run.rs index f4116126..6d7364c3 100644 --- a/src/sinks/gcp_pubsub/run.rs +++ b/src/sinks/gcp_pubsub/run.rs @@ -1,6 +1,5 @@ use std::{collections::HashMap, sync::Arc}; -use google_cloud_gax::conn::Environment; use google_cloud_googleapis::pubsub::v1::PubsubMessage; use google_cloud_pubsub::{ client::{Client, ClientConfig}, @@ -49,9 +48,6 @@ pub fn writer_loop( retry_policy: &retry::Policy, ordering_key: &str, attributes: &GenericKV, - emulator: bool, - emulator_endpoint: &Option, - emulator_project_id: &Option, utils: Arc, ) -> Result<(), crate::Error> { let rt = tokio::runtime::Builder::new_current_thread() @@ -60,16 +56,7 @@ pub fn writer_loop( .build()?; let publisher: Publisher = rt.block_on(async { - let client_config = if emulator { - ClientConfig { - project_id: Some(emulator_project_id.clone().unwrap_or_default()), - environment: Environment::Emulator(emulator_endpoint.clone().unwrap_or_default()), - ..Default::default() - } - } else { - ClientConfig::default() - }; - let client = Client::new(client_config.with_auth().await?).await?; + let client = Client::new(ClientConfig::default().with_auth().await?).await?; let topic = client.topic(topic_name); Result::<_, crate::Error>::Ok(topic.new_publisher(None)) })?; diff --git a/src/sinks/gcp_pubsub/setup.rs b/src/sinks/gcp_pubsub/setup.rs index a1cdd064..033bcc19 100644 --- a/src/sinks/gcp_pubsub/setup.rs +++ b/src/sinks/gcp_pubsub/setup.rs @@ -27,12 +27,6 @@ pub struct Config { impl SinkProvider for WithUtils { fn bootstrap(&self, input: StageReceiver) -> BootstrapResult { let topic_name = self.inner.topic.to_owned(); - let mut use_emulator = self.inner.emulator.unwrap_or(false); - let emulator_endpoint = self.inner.emulator_endpoint.to_owned(); - let emulator_project_id = self.inner.emulator_project_id.to_owned(); - if use_emulator && (emulator_endpoint.is_none() || emulator_project_id.is_none()) { - use_emulator = false; - } let error_policy = self .inner @@ -56,9 +50,6 @@ impl SinkProvider for WithUtils { &retry_policy, &ordering_key, &attributes, - use_emulator, - &emulator_endpoint, - &emulator_project_id, utils, ) .expect("writer loop failed"); diff --git a/src/sources/common.rs b/src/sources/common.rs index 4687561e..7731a555 100644 --- a/src/sources/common.rs +++ 
b/src/sources/common.rs @@ -1,13 +1,9 @@ use core::fmt; use std::{ops::Deref, str::FromStr, time::Duration}; -use pallas::{ - ledger::traverse::{probe, Era}, - network::{ - miniprotocols::{chainsync, Point, MAINNET_MAGIC, TESTNET_MAGIC}, - multiplexer::{bearers::Bearer, StdChannel, StdPlexer}, - }, -}; +use pallas_miniprotocols::{chainsync, Point, MAINNET_MAGIC, TESTNET_MAGIC}; +use pallas_multiplexer::{bearers::Bearer, StdChannel, StdPlexer}; +use pallas_traverse::{probe, Era}; use serde::{de::Visitor, Deserializer}; use serde::{Deserialize, Serialize}; @@ -72,9 +68,9 @@ impl FromStr for PointArg { } } -impl ToString for PointArg { - fn to_string(&self) -> String { - format!("{},{}", self.0, self.1) +impl std::fmt::Display for PointArg { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{},{}", self.0, self.1) } } @@ -262,77 +258,91 @@ pub fn should_finalize( false } -pub(crate) fn intersect_starting_point( - client: &mut chainsync::Client, - intersect_arg: &Option, - since_arg: &Option, - utils: &Utils, -) -> Result, Error> -where - chainsync::Message: pallas::codec::Fragment, -{ - let cursor = utils.get_cursor_if_any(); - - match cursor { - Some(cursor) => { - log::info!("found persisted cursor, will use as starting point"); - let desired = cursor.try_into()?; - let (point, _) = client.find_intersect(vec![desired])?; - - Ok(point) - } - None => match intersect_arg { - Some(IntersectArg::Fallbacks(x)) => { - log::info!("found 'fallbacks' intersect argument, will use as starting point"); - let options: Result, _> = x.iter().map(|x| x.clone().try_into()).collect(); - - let (point, _) = client.find_intersect(options?)?; - - Ok(point) - } - Some(IntersectArg::Origin) => { - log::info!("found 'origin' intersect argument, will use as starting point"); - - let point = client.intersect_origin()?; - - Ok(Some(point)) - } - Some(IntersectArg::Point(x)) => { - log::info!("found 'point' intersect argument, will use as starting point"); - let options = vec![x.clone().try_into()?]; - - let (point, _) = client.find_intersect(options)?; - - Ok(point) - } - Some(IntersectArg::Tip) => { - log::info!("found 'tip' intersect argument, will use as starting point"); - - let point = client.intersect_tip()?; - - Ok(Some(point)) - } - None => match since_arg { - Some(x) => { - log::info!("explicit 'since' argument, will use as starting point"); - log::warn!("`since` value is deprecated, please use `intersect`"); - let options = vec![x.clone().try_into()?]; - - let (point, _) = client.find_intersect(options)?; +macro_rules! 
intersect_starting_point { + ($fn:ident, $client:ty) => { + pub(crate) fn $fn( + client: &mut $client, + intersect_arg: &Option, + since_arg: &Option, + utils: &Utils, + ) -> Result, Error> { + let cursor = utils.get_cursor_if_any(); + + match cursor { + Some(cursor) => { + log::info!("found persisted cursor, will use as starting point"); + let desired = cursor.try_into()?; + let (point, _) = client.find_intersect(vec![desired])?; Ok(point) } - None => { - log::info!("no starting point specified, will use tip of chain"); + None => match intersect_arg { + Some(IntersectArg::Fallbacks(x)) => { + log::info!( + "found 'fallbacks' intersect argument, will use as starting point" + ); + let options: Result, _> = + x.iter().map(|x| x.clone().try_into()).collect(); + + let (point, _) = client.find_intersect(options?)?; + + Ok(point) + } + Some(IntersectArg::Origin) => { + log::info!("found 'origin' intersect argument, will use as starting point"); + + let point = client.intersect_origin()?; + + Ok(Some(point)) + } + Some(IntersectArg::Point(x)) => { + log::info!("found 'point' intersect argument, will use as starting point"); + let options = vec![x.clone().try_into()?]; + + let (point, _) = client.find_intersect(options)?; + + Ok(point) + } + Some(IntersectArg::Tip) => { + log::info!("found 'tip' intersect argument, will use as starting point"); + + let point = client.intersect_tip()?; + + Ok(Some(point)) + } + None => match since_arg { + Some(x) => { + log::info!("explicit 'since' argument, will use as starting point"); + log::warn!("`since` value is deprecated, please use `intersect`"); + let options = vec![x.clone().try_into()?]; + + let (point, _) = client.find_intersect(options)?; + + Ok(point) + } + None => { + log::info!("no starting point specified, will use tip of chain"); + + let point = client.intersect_tip()?; + + Ok(Some(point)) + } + }, + }, + } + } + }; +} - let point = client.intersect_tip()?; +intersect_starting_point!( + intersect_starting_point_n2n, + chainsync::N2NClient +); - Ok(Some(point)) - } - }, - }, - } -} +intersect_starting_point!( + intersect_starting_point_n2c, + chainsync::N2CClient +); pub fn unknown_block_to_events(writer: &EventWriter, body: &Vec) -> Result<(), Error> { match probe::block_era(body) { @@ -352,6 +362,11 @@ pub fn unknown_block_to_events(writer: &EventWriter, body: &Vec) -> Result<( .crawl_from_babbage_cbor(body) .ok_or_warn("error crawling babbage block for events"); } + Era::Conway => { + writer + .crawl_from_conway_cbor(body) + .ok_or_warn("error crawling conway block for events"); + } x => { return Err(format!("This version of Oura can't handle era: {x}").into()); } diff --git a/src/sources/n2c/run.rs b/src/sources/n2c/run.rs index a6aef999..2678813f 100644 --- a/src/sources/n2c/run.rs +++ b/src/sources/n2c/run.rs @@ -1,18 +1,14 @@ use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc, time::Duration}; -use pallas::{ - ledger::traverse::MultiEraBlock, - network::{ - miniprotocols::{chainsync, handshake, Point, MAINNET_MAGIC}, - multiplexer::StdChannel, - }, -}; +use pallas_miniprotocols::{chainsync, handshake, Point, MAINNET_MAGIC}; +use pallas_multiplexer::StdChannel; +use pallas_traverse::MultiEraBlock; use crate::{ mapper::EventWriter, pipelining::StageSender, sources::{ - intersect_starting_point, setup_multiplexer, should_finalize, unknown_block_to_events, + intersect_starting_point_n2c, setup_multiplexer, should_finalize, unknown_block_to_events, FinalizeConfig, }, utils::{retry, Utils}, @@ -217,7 +213,7 @@ fn 
do_chainsync_attempt( let mut client = chainsync::N2CClient::new(cs_channel); - let intersection = intersect_starting_point( + let intersection = intersect_starting_point_n2c( &mut client, &config.intersect, #[allow(deprecated)] diff --git a/src/sources/n2n/run.rs b/src/sources/n2n/run.rs index f2b83267..75936ecb 100644 --- a/src/sources/n2n/run.rs +++ b/src/sources/n2n/run.rs @@ -1,9 +1,7 @@ use std::{fmt::Debug, ops::Deref, sync::Arc, time::Duration}; -use pallas::network::{ - miniprotocols::{blockfetch, chainsync, handshake, Point, MAINNET_MAGIC}, - multiplexer::StdChannel, -}; +use pallas_miniprotocols::{blockfetch, chainsync, handshake, Point, MAINNET_MAGIC}; +use pallas_multiplexer::StdChannel; use std::sync::mpsc::{Receiver, SyncSender}; @@ -11,7 +9,7 @@ use crate::{ mapper::EventWriter, pipelining::StageSender, sources::{ - intersect_starting_point, setup_multiplexer, should_finalize, unknown_block_to_events, + intersect_starting_point_n2n, setup_multiplexer, should_finalize, unknown_block_to_events, FinalizeConfig, }, utils::{retry, Utils}, @@ -56,7 +54,7 @@ impl ChainObserver { ) -> Result { // parse the header and extract the point of the chain - let header = pallas::ledger::traverse::MultiEraHeader::decode( + let header = pallas_traverse::MultiEraHeader::decode( content.variant, content.byron_prefix.map(|x| x.0), &content.cbor, @@ -224,7 +222,7 @@ fn do_chainsync_attempt( let mut cs_client = chainsync::N2NClient::new(cs_channel); - let intersection = intersect_starting_point( + let intersection = intersect_starting_point_n2n( &mut cs_client, &config.intersect, #[allow(deprecated)] diff --git a/src/utils/mod.rs b/src/utils/mod.rs index b7c8375b..d142c883 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -7,7 +7,7 @@ use std::sync::Arc; -use pallas::network::miniprotocols::{Point, MAINNET_MAGIC, TESTNET_MAGIC}; +use pallas_miniprotocols::{Point, MAINNET_MAGIC, TESTNET_MAGIC}; // TODO: move these values to Pallas pub const PREPROD_MAGIC: u64 = 1; diff --git a/src/utils/time.rs b/src/utils/time.rs index da2bef4d..77db083a 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -9,6 +9,7 @@ pub(crate) trait TimeProvider { /// Maps between slots and wallclock fn slot_to_wallclock(&self, slot: u64) -> u64; fn absolute_slot_to_relative(&self, slot: u64) -> (u64, u64); + fn byron_epoch_slot_to_absolute(&self, epoch: u64, slot: u64) -> u64; } /// A naive, standalone implementation of a time provider @@ -66,6 +67,16 @@ fn compute_era_epoch(era_slot: u64, era_slot_length: u64, era_epoch_length: u64) (epoch, reminder) } +#[inline] +fn relative_slot_to_absolute( + epoch: u64, + sub_epoch_slot: u64, + epoch_length: u64, + slot_length: u64, +) -> u64 { + ((epoch * epoch_length) / slot_length) + sub_epoch_slot +} + impl TimeProvider for NaiveProvider { fn slot_to_wallclock(&self, slot: u64) -> u64 { let NaiveProvider { config, .. } = self; @@ -111,6 +122,15 @@ impl TimeProvider for NaiveProvider { (shelley_start_epoch + era_epoch, reminder) } } + + fn byron_epoch_slot_to_absolute(&self, epoch: u64, slot: u64) -> u64 { + relative_slot_to_absolute( + epoch, + slot, + self.config.byron_epoch_length as u64, + self.config.byron_slot_length as u64, + ) + } } #[cfg(test)]
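For reference, the Byron slot conversion that this diff introduces in src/utils/time.rs (replacing the old call into pallas::ledger::traverse::time) boils down to simple per-epoch arithmetic. The following is a minimal, self-contained sketch of that arithmetic; the genesis values used here (byron_epoch_length = 432_000 seconds per epoch, byron_slot_length = 20 seconds per slot, i.e. 21_600 slots per epoch) are assumed mainnet-style figures, not values taken from this diff.

// Minimal sketch of the conversion added as `relative_slot_to_absolute` in
// src/utils/time.rs. The 432_000 s / 20 s genesis values below are assumptions
// for a mainnet-like chain configuration.
fn relative_slot_to_absolute(
    epoch: u64,
    sub_epoch_slot: u64,
    epoch_length: u64,
    slot_length: u64,
) -> u64 {
    // epoch_length is expressed in seconds, so epoch_length / slot_length is
    // the number of slots per epoch (21_600 with the values assumed above).
    ((epoch * epoch_length) / slot_length) + sub_epoch_slot
}

fn main() {
    // Byron epoch 5, epoch-relative slot 100 -> absolute slot 5 * 21_600 + 100.
    assert_eq!(relative_slot_to_absolute(5, 100, 432_000, 20), 108_100);
}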