Skip to content

Commit

Permalink
Paulo/fix pubsub (#3)
Browse files Browse the repository at this point in the history
* fix: use atomic operation to write cursor (txpipe#798)

* Release v1.8.6

* feat: add support for GCP pubsub emulator (txpipe#803)

* feat: make v1 compatible with Conway era (txpipe#807)

* Release v1.9.0

* fix: running pubsub on platform

* chore: formatting

* chore: cargo fmt

---------

Co-authored-by: Santiago Carmuega <santiago@carmuega.me>
Co-authored-by: Joaquin Hoyos (Clark) <joaco.hoyos@hotmail.com>
  • Loading branch information
3 people authored Aug 5, 2024
1 parent fd86874 commit 5e81661
Show file tree
Hide file tree
Showing 21 changed files with 947 additions and 280 deletions.
187 changes: 120 additions & 67 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 10 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "oura"
description = "The tail of Cardano"
version = "1.9.2"
version = "1.10.0"
edition = "2021"
repository = "https://github.com/txpipe/oura"
homepage = "https://github.com/txpipe/oura"
Expand All @@ -12,7 +12,13 @@ authors = ["Santiago Carmuega <santiago@carmuega.me>"]


[dependencies]
pallas = "0.18.2"
pallas-multiplexer = "0.18.2"
pallas-miniprotocols = "0.18.2"
pallas-primitives = "0.29.0"
pallas-traverse = "0.29.0"
pallas-addresses = "0.29.0"
pallas-codec = "0.29.0"
pallas-crypto = "0.29.0"
# pallas = { git = "https://github.com/txpipe/pallas" }
# pallas = { path = "../pallas/pallas" }
hex = "0.4.3"
Expand All @@ -30,6 +36,7 @@ strum = "0.26.3"
strum_macros = "0.26.4"
prometheus_exporter = { version = "0.8.5", default-features = false }
unicode-truncate = "1.1.0"
time = "0.3.36"

# feature logs
file-rotate = { version = "0.7.1", optional = true }
Expand Down Expand Up @@ -80,5 +87,5 @@ elasticsink = ["elasticsearch", "tokio"]
fingerprint = ["murmur3"]
aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"]
redissink = ["redis", "tokio"]
gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web" ,"google-cloud-gax"]
gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web", "google-cloud-gax"]
rabbitmqsink = ["lapin", "tokio"]
12 changes: 6 additions & 6 deletions src/mapper/babbage.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use pallas::codec::utils::KeepRaw;
use pallas_codec::utils::KeepRaw;

use pallas::ledger::primitives::babbage::{
use pallas_primitives::babbage::{
AuxiliaryData, MintedBlock, MintedDatumOption, MintedPostAlonzoTransactionOutput,
MintedTransactionBody, MintedTransactionOutput, MintedWitnessSet, NetworkId,
};

use pallas::crypto::hash::Hash;
use pallas::ledger::traverse::OriginalHash;
use pallas_crypto::hash::Hash;
use pallas_traverse::OriginalHash;

use crate::model::{BlockRecord, Era, TransactionRecord};
use crate::utils::time::TimeProvider;
Expand Down Expand Up @@ -199,7 +199,7 @@ impl EventWriter {
let record = self.to_post_alonzo_output_record(output)?;
self.append(record.into())?;

let address = pallas::ledger::addresses::Address::from_bytes(&output.address)?;
let address = pallas_addresses::Address::from_bytes(&output.address)?;

let child = &self.child_writer(EventContext {
output_address: address.to_string().into(),
Expand Down Expand Up @@ -389,7 +389,7 @@ impl EventWriter {
/// Entry-point to start crawling a blocks for events. Meant to be used when
/// we haven't decoded the CBOR yet (for example, N2N).
pub fn crawl_from_babbage_cbor(&self, cbor: &[u8]) -> Result<(), Error> {
let (_, block): (u16, MintedBlock) = pallas::codec::minicbor::decode(cbor)?;
let (_, block): (u16, MintedBlock) = pallas_codec::minicbor::decode(cbor)?;
self.crawl_babbage_with_cbor(&block, cbor)
}
}
66 changes: 33 additions & 33 deletions src/mapper/byron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use std::ops::Deref;
use super::map::ToHex;
use super::EventWriter;
use crate::model::{BlockRecord, Era, EventData, TransactionRecord, TxInputRecord, TxOutputRecord};
use crate::utils::time::TimeProvider;
use crate::{model::EventContext, Error};

use pallas::crypto::hash::Hash;
use pallas::ledger::primitives::byron;
use pallas::ledger::traverse::OriginalHash;
use pallas_crypto::hash::Hash;
use pallas_primitives::byron;
use pallas_traverse::OriginalHash;

impl EventWriter {
fn to_byron_input_record(&self, source: &byron::TxIn) -> Option<TxInputRecord> {
Expand Down Expand Up @@ -41,12 +42,9 @@ impl EventWriter {
}

fn to_byron_output_record(&self, source: &byron::TxOut) -> Result<TxOutputRecord, Error> {
let address: pallas::ledger::addresses::Address =
pallas::ledger::addresses::ByronAddress::new(
&source.address.payload.0,
source.address.crc,
)
.into();
let address: pallas_addresses::Address =
pallas_addresses::ByronAddress::new(&source.address.payload.0, source.address.crc)
.into();

Ok(TxOutputRecord {
address: address.to_string(),
Expand Down Expand Up @@ -168,10 +166,12 @@ impl EventWriter {
hash: &Hash<32>,
cbor: &[u8],
) -> Result<BlockRecord, Error> {
let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute(
source.header.consensus_data.0.epoch,
source.header.consensus_data.0.slot,
);
let abs_slot = self.utils.time.as_ref().map(|time| {
time.byron_epoch_slot_to_absolute(
source.header.consensus_data.0.epoch,
source.header.consensus_data.0.slot,
)
});

let mut record = BlockRecord {
era: Era::Byron,
Expand All @@ -181,7 +181,7 @@ impl EventWriter {
tx_count: source.body.tx_payload.len(),
hash: hash.to_hex(),
number: source.header.consensus_data.2[0],
slot: abs_slot,
slot: abs_slot.unwrap_or_default(),
epoch: Some(source.header.consensus_data.0.epoch),
epoch_slot: Some(source.header.consensus_data.0.slot),
previous_hash: source.header.prev_block.to_hex(),
Expand Down Expand Up @@ -234,10 +234,9 @@ impl EventWriter {
hash: &Hash<32>,
cbor: &[u8],
) -> Result<BlockRecord, Error> {
let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute(
source.header.consensus_data.epoch_id,
0,
);
let abs_slot = self.utils.time.as_ref().map(|time| {
time.byron_epoch_slot_to_absolute(source.header.consensus_data.epoch_id, 0)
});

Ok(BlockRecord {
era: Era::Byron,
Expand All @@ -247,7 +246,7 @@ impl EventWriter {
vrf_vkey: Default::default(),
tx_count: 0,
number: source.header.consensus_data.difficulty[0],
slot: abs_slot,
slot: abs_slot.unwrap_or_default(),
epoch: Some(source.header.consensus_data.epoch_id),
epoch_slot: Some(0),
previous_hash: source.header.prev_block.to_hex(),
Expand Down Expand Up @@ -288,16 +287,18 @@ impl EventWriter {
) -> Result<(), Error> {
let hash = block.header.original_hash();

let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute(
block.header.consensus_data.0.epoch,
block.header.consensus_data.0.slot,
);
let abs_slot = self.utils.time.as_ref().map(|time| {
time.byron_epoch_slot_to_absolute(
block.header.consensus_data.0.epoch,
block.header.consensus_data.0.slot,
)
});

let child = self.child_writer(EventContext {
block_hash: Some(hex::encode(hash)),
block_number: Some(block.header.consensus_data.2[0]),
slot: Some(abs_slot),
timestamp: self.compute_timestamp(abs_slot),
slot: abs_slot,
timestamp: abs_slot.and_then(|slot| self.compute_timestamp(slot)),
..EventContext::default()
});

Expand All @@ -311,7 +312,7 @@ impl EventWriter {
/// Entry-point to start crawling a blocks for events. Meant to be used when
/// we haven't decoded the CBOR yet (for example, N2N).
pub fn crawl_from_byron_cbor(&self, cbor: &[u8]) -> Result<(), Error> {
let (_, block): (u16, byron::MintedBlock) = pallas::codec::minicbor::decode(cbor)?;
let (_, block): (u16, byron::MintedBlock) = pallas_codec::minicbor::decode(cbor)?;
self.crawl_byron_with_cbor(&block, cbor)
}

Expand All @@ -328,16 +329,15 @@ impl EventWriter {
if self.config.include_byron_ebb {
let hash = block.header.original_hash();

let abs_slot = pallas::ledger::traverse::time::byron_epoch_slot_to_absolute(
block.header.consensus_data.epoch_id,
0,
);
let abs_slot = self.utils.time.as_ref().map(|time| {
time.byron_epoch_slot_to_absolute(block.header.consensus_data.epoch_id, 0)
});

let child = self.child_writer(EventContext {
block_hash: Some(hex::encode(hash)),
block_number: Some(block.header.consensus_data.difficulty[0]),
slot: Some(abs_slot),
timestamp: self.compute_timestamp(abs_slot),
slot: abs_slot,
timestamp: abs_slot.and_then(|slot| self.compute_timestamp(slot)),
..EventContext::default()
});

Expand All @@ -352,7 +352,7 @@ impl EventWriter {
/// Entry-point to start crawling a blocks for events. Meant to be used when
/// we haven't decoded the CBOR yet (for example, N2N).
pub fn crawl_from_ebb_cbor(&self, cbor: &[u8]) -> Result<(), Error> {
let (_, block): (u16, byron::MintedEbBlock) = pallas::codec::minicbor::decode(cbor)?;
let (_, block): (u16, byron::MintedEbBlock) = pallas_codec::minicbor::decode(cbor)?;
self.crawl_ebb_with_cbor(&block, cbor)
}
}
2 changes: 1 addition & 1 deletion src/mapper/cip15.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::model::CIP15AssetRecord;
use crate::Error;
use serde_json::Value as JsonValue;

use pallas::ledger::primitives::alonzo::Metadatum;
use pallas_primitives::alonzo::Metadatum;

fn extract_json_property<'a>(
json: &'a JsonValue,
Expand Down
2 changes: 1 addition & 1 deletion src/mapper/cip25.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde_json::Value as JsonValue;

use pallas::ledger::primitives::alonzo::Metadatum;
use pallas_primitives::alonzo::Metadatum;

use crate::{model::CIP25AssetRecord, Error};

Expand Down
26 changes: 11 additions & 15 deletions src/mapper/collect.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
use pallas::{
codec::utils::{KeepRaw, KeyValuePairs, MaybeIndefArray},
ledger::{
primitives::{
alonzo::{
AuxiliaryData, Coin, MintedBlock, Multiasset, NativeScript, PlutusData,
PlutusScript, Redeemer, RewardAccount, TransactionInput, VKeyWitness, Value,
},
babbage::{
LegacyTransactionOutput, MintedPostAlonzoTransactionOutput,
MintedTransactionOutput, PlutusV2Script,
},
},
traverse::OriginalHash,
use pallas_codec::utils::{KeepRaw, KeyValuePairs, MaybeIndefArray};
use pallas_primitives::{
alonzo::{
AuxiliaryData, Coin, MintedBlock, Multiasset, NativeScript, PlutusData, PlutusScript,
Redeemer, RewardAccount, TransactionInput, VKeyWitness, Value,
},
babbage::{
LegacyTransactionOutput, MintedPostAlonzoTransactionOutput, MintedTransactionOutput,
PlutusV2Script,
},
};
use pallas_traverse::OriginalHash;

use crate::{
model::{
Expand Down Expand Up @@ -138,7 +134,7 @@ impl EventWriter {

pub fn collect_native_witness_records(
&self,
witness_set: &Option<Vec<NativeScript>>,
witness_set: &Option<Vec<KeepRaw<'_, NativeScript>>>,
) -> Result<Vec<NativeWitnessRecord>, Error> {
match witness_set {
Some(all) => all
Expand Down
Loading

0 comments on commit 5e81661

Please sign in to comment.