Skip to content

Commit

Permalink
feat(publisher): Publish predicates (#243)
Browse files Browse the repository at this point in the history
* refactor(core): Allow fetching inputs and outputs for fuel-streams transactions

* feat(publisher): publish predicate-related subjects

* refactor(publisher): extract predicates tag fn

* refactor(publisher): run publishing tasks concurrently

* refactor(publisher): include logs and utxos

* refactor(publisher): include publishing interval metric per block
  • Loading branch information
Jurshsmith authored Oct 7, 2024
1 parent 1ed8daf commit 94b90fb
Show file tree
Hide file tree
Showing 16 changed files with 577 additions and 404 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pretty_assertions = "1.4.1"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10"
strum = "0.26"
strum_macros = "0.26"
tokio = { version = "1.40", features = ["full"] }
Expand Down
9 changes: 9 additions & 0 deletions crates/fuel-streams-core/src/stream/stream_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ impl<S: Streamable> Stream<S> {
payload: &S,
) -> Result<usize, StreamError> {
let subject_name = &subject.parse();
self.publish_raw(subject_name, payload).await
}

/// Publish with subject name with no static guarantees of the subject
pub async fn publish_raw(
&self,
subject_name: &str,
payload: &S,
) -> Result<usize, StreamError> {
let data = payload.encode(subject_name).await;
let data_size = data.len();
let result = self.store.create(subject_name, data.into()).await;
Expand Down
38 changes: 38 additions & 0 deletions crates/fuel-streams-core/src/transactions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod subjects;
pub mod types;

use fuel_core_types::fuel_tx::{field::Outputs, Output};
pub use subjects::*;
use types::*;

Expand All @@ -14,3 +15,40 @@ impl Streamable for Transaction {
TransactionsByIdSubject::WILDCARD,
];
}

pub trait WithTxInputs {
fn inputs(&self) -> &[Input];
}

pub trait WithTxOutputs {
fn outputs(&self) -> &Vec<Output>;
}

impl WithTxInputs for Transaction {
fn inputs(&self) -> &[Input] {
match self {
Transaction::Mint(_) => &[],
Transaction::Script(tx) => tx.inputs(),
Transaction::Blob(tx) => tx.inputs(),
Transaction::Create(tx) => tx.inputs(),
Transaction::Upload(tx) => tx.inputs(),
Transaction::Upgrade(tx) => tx.inputs(),
}
}
}

impl WithTxOutputs for Transaction {
fn outputs(&self) -> &Vec<Output> {
match self {
Transaction::Mint(_) => {
static NO_OUTPUTS: Vec<Output> = Vec::new();
&NO_OUTPUTS
}
Transaction::Script(tx) => tx.outputs(),
Transaction::Blob(tx) => tx.outputs(),
Transaction::Create(tx) => tx.outputs(),
Transaction::Upload(tx) => tx.outputs(),
Transaction::Upgrade(tx) => tx.outputs(),
}
}
}
1 change: 1 addition & 0 deletions crates/fuel-streams-core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub use crate::{
logs::types::*,
nats::types::*,
transactions::types::*,
utxos::types::*,
};

// ------------------------------------------------------------------------
Expand Down
10 changes: 4 additions & 6 deletions crates/fuel-streams-core/src/utxos/subjects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::{types::UtxoType, MessageId};
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let subject = UtxosSubject {
/// tx_id: Some(Bytes32::from([1u8; 32]).into()),
/// hash: Some(MessageId::from([1u8; 32])),
/// utxo_type: Some(UtxoType::Message),
/// };
/// assert_eq!(
Expand All @@ -40,11 +40,10 @@ use super::{types::UtxoType, MessageId};
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let wildcard = UtxosSubject::wildcard(
/// Some(MessageId::from([1u8; 32])),
/// None,
/// Some(Bytes32::from([1u8; 32]).into()),
/// Some(UtxoType::Message),
/// );
/// assert_eq!(wildcard, "utxos.message.0x0101010101010101010101010101010101010101010101010101010101010101");
/// assert_eq!(wildcard, "utxos.*.0x0101010101010101010101010101010101010101010101010101010101010101");
/// ```
///
/// Using the builder pattern:
Expand All @@ -54,15 +53,14 @@ use super::{types::UtxoType, MessageId};
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let subject = UtxosSubject::new()
/// .with_tx_id(Some(Bytes32::from([1u8; 32])))
/// .with_hash(Some(MessageId::from([1u8; 32])))
/// .with_utxo_type(Some(UtxoType::Message));
/// assert_eq!(subject.parse(), "utxos.message.0x0101010101010101010101010101010101010101010101010101010101010101");
/// ```
#[derive(Subject, Debug, Clone, Default)]
#[subject_wildcard = "utxos.>"]
#[subject_format = "utxos.{utxo_type}.{hash}"]
#[allow(clippy::too_many_arguments)]
pub struct UtxosSubject {
pub hash: Option<MessageId>,
pub utxo_type: Option<UtxoType>,
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-streams-publisher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ rand = { workspace = true }
rust_decimal = { version = "1.13" }
serde = { workspace = true }
serde_prometheus = { version = "0.2.5" }
sha2 = { workspace = true }
sysinfo = { version = "0.29.2" }
thiserror = "1.0.63"
tokio = { workspace = true }
Expand Down
Loading

0 comments on commit 94b90fb

Please sign in to comment.