Skip to content

Commit

Permalink
Merge branch 'main' into feat/eugene/stream-utxos
Browse files Browse the repository at this point in the history
  • Loading branch information
0xterminator committed Sep 25, 2024
2 parents 26caf1e + 5895a65 commit 405d2b2
Show file tree
Hide file tree
Showing 21 changed files with 453 additions and 57 deletions.
2 changes: 1 addition & 1 deletion benches/nats-publisher/src/utils/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl TxHelper {
let mut subject: TransactionsSubject = tx.into();
subject = subject
.with_tx_index(Some(index))
.with_height(Some(BlockHeight::from(self.get_height(block))))
.with_block_height(Some(BlockHeight::from(self.get_height(block))))
.with_status(self.get_status(tx).map(Into::into));
subject
}
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-streams-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ futures = { workspace = true }
hex = "0.4.3"
pretty_assertions = { workspace = true, optional = true }
rand = { workspace = true }
serde = "1"
serde = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-streams-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub mod blocks;
pub mod inputs;
pub mod logs;
pub mod nats;
pub mod outputs;
pub mod receipts;
Expand Down
13 changes: 13 additions & 0 deletions crates/fuel-streams-core/src/logs/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
pub mod subjects;
pub mod types;

pub use subjects::*;
use types::*;

use crate::prelude::*;

impl StreamEncoder for Log {}
impl Streamable for Log {
const NAME: &'static str = "logs";
const WILDCARD_LIST: &'static [&'static str] = &[LogsSubject::WILDCARD];
}
81 changes: 81 additions & 0 deletions crates/fuel-streams-core/src/logs/subjects.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use fuel_streams_macros::subject::{IntoSubject, Subject};

use crate::types::*;

/// Represents a subject for logs related to transactions in the Fuel network.
///
/// This subject format allows for efficient querying and filtering of logs
/// based on the block height, transaction ID, the index of the receipt within the transaction,
/// and the unique log ID.
///
/// # Examples
///
/// Creating a subject for a specific log:
///
/// ```
/// # use fuel_streams_core::logs::subjects::LogsSubject;
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let subject = LogsSubject {
/// block_height: Some(1000.into()),
/// tx_id: Some(Bytes32::from([1u8; 32])),
/// receipt_index: Some(0),
/// log_id: Some(Bytes32::from([2u8; 32])),
/// };
/// assert_eq!(
/// subject.parse(),
/// "logs.1000.0x0101010101010101010101010101010101010101010101010101010101010101.0.0x0202020202020202020202020202020202020202020202020202020202020202"
/// );
/// ```
///
/// Wildcard for querying all logs:
///
/// ```
/// # use fuel_streams_core::logs::subjects::LogsSubject;
/// # use fuel_streams_macros::subject::*;
/// assert_eq!(LogsSubject::WILDCARD, "logs.>");
/// ```
///
/// Creating a subject query using the `wildcard` method:
///
/// ```
/// # use fuel_streams_core::logs::subjects::LogsSubject;
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let wildcard = LogsSubject::wildcard(
/// Some(1000.into()),
/// Some(Bytes32::from([1u8; 32])),
/// None,
/// None
/// );
/// assert_eq!(
/// wildcard,
/// "logs.1000.0x0101010101010101010101010101010101010101010101010101010101010101.*.*"
/// );
/// ```
///
/// Using the builder pattern:
///
/// ```
/// # use fuel_streams_core::logs::subjects::LogsSubject;
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let subject = LogsSubject::new()
/// .with_block_height(Some(2310.into()))
/// .with_tx_id(Some(Bytes32::from([1u8; 32])))
/// .with_receipt_index(Some(0))
/// .with_log_id(Some(Bytes32::from([2u8; 32])));
/// assert_eq!(
/// subject.parse(),
/// "logs.2310.0x0101010101010101010101010101010101010101010101010101010101010101.0.0x0202020202020202020202020202020202020202020202020202020202020202"
/// );
/// ```
#[derive(Subject, Debug, Clone, Default)]
#[subject_wildcard = "logs.>"]
#[subject_format = "logs.{block_height}.{tx_id}.{receipt_index}.{log_id}"]
pub struct LogsSubject {
pub block_height: Option<BlockHeight>,
pub tx_id: Option<Bytes32>,
pub receipt_index: Option<usize>,
pub log_id: Option<Bytes32>,
}
140 changes: 140 additions & 0 deletions crates/fuel-streams-core/src/logs/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use fuel_core_types::fuel_tx::{Bytes32, ContractId, Receipt, Word};
use serde::{Deserialize, Deserializer, Serialize, Serializer};

/// A convenient aggregate type to represent a Fuel logs to allow users
/// think about them agnostic of receipts.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum Log {
WithoutData {
id: ContractId,
ra: Word,
rb: Word,
rc: Word,
rd: Word,
pc: Word,
is: Word,
},
WithData {
id: ContractId,
ra: Word,
rb: Word,
ptr: Word,
len: Word,
digest: Bytes32,
pc: Word,
is: Word,
data: Option<Vec<u8>>,
},
}

impl From<Receipt> for Log {
fn from(value: Receipt) -> Self {
match value {
Receipt::Log {
id,
ra,
rb,
rc,
rd,
pc,
is,
} => Log::WithoutData {
id,
ra,
rb,
rc,
rd,
pc,
is,
},
Receipt::LogData {
id,
ra,
rb,
ptr,
len,
digest,
pc,
is,
data,
} => Log::WithData {
id,
ra,
rb,
ptr,
len,
digest,
pc,
is,
data,
},
_ => panic!("Invalid receipt type"),
}
}
}

/// Introduced majorly allow delegating serialization and deserialization to `fuel-core`'s Receipt
impl From<Log> for Receipt {
fn from(log: Log) -> Receipt {
match log {
Log::WithoutData {
id,
ra,
rb,
rc,
rd,
pc,
is,
} => Receipt::Log {
id,
ra,
rb,
rc,
rd,
pc,
is,
},
Log::WithData {
id,
ra,
rb,
ptr,
len,
digest,
pc,
is,
data,
} => Receipt::LogData {
id,
ra,
rb,
ptr,
len,
digest,
pc,
is,
data,
},
}
}
}

impl Serialize for Log {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let receipt: Receipt = self.clone().into();
receipt.serialize(serializer)
}
}

impl<'de> Deserialize<'de> for Log {
fn deserialize<D>(deserializer: D) -> Result<Log, D::Error>
where
D: Deserializer<'de>,
{
let receipt = Receipt::deserialize(deserializer)?;
Ok(Log::from(receipt))
}
}
2 changes: 1 addition & 1 deletion crates/fuel-streams-core/src/outputs/subjects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ pub struct OutputsChangeSubject {
/// .with_tx_id(Some(Bytes32::zeroed()))
/// .with_index(Some(0))
/// .with_to(Some(Address::zeroed()))
/// .with_asset_id(Some(Bytes32::from([1u8; 32])));
/// .with_asset_id(Some(AssetId::from([1u8; 32])));
/// assert_eq!(
/// subject.to_string(),
/// "outputs.variable.0x0000000000000000000000000000000000000000000000000000000000000000.0.0x0000000000000000000000000000000000000000000000000000000000000000.0x0101010101010101010101010101010101010101010101010101010101010101"
Expand Down
10 changes: 5 additions & 5 deletions crates/fuel-streams-core/src/transactions/subjects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{blocks::types::BlockHeight, types::*};
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::IntoSubject;
/// let subject = TransactionsSubject {
/// height: Some(23.into()),
/// block_height: Some(23.into()),
/// tx_index: Some(1),
/// tx_id: Some(Bytes32::zeroed()),
/// status: Some(TransactionStatus::Success),
Expand Down Expand Up @@ -52,7 +52,7 @@ use crate::{blocks::types::BlockHeight, types::*};
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let subject = TransactionsSubject::new()
/// .with_height(Some(23.into()))
/// .with_block_height(Some(23.into()))
/// .with_tx_index(Some(1))
/// .with_tx_id(Some(Bytes32::zeroed()))
/// .with_status(Some(TransactionStatus::Success))
Expand All @@ -61,9 +61,9 @@ use crate::{blocks::types::BlockHeight, types::*};
/// ```
#[derive(Subject, Debug, Clone, Default)]
#[subject_wildcard = "transactions.>"]
#[subject_format = "transactions.{height}.{tx_index}.{tx_id}.{status}.{kind}"]
#[subject_format = "transactions.{block_height}.{tx_index}.{tx_id}.{status}.{kind}"]
pub struct TransactionsSubject {
pub height: Option<BlockHeight>,
pub block_height: Option<BlockHeight>,
pub tx_index: Option<usize>,
pub tx_id: Option<Bytes32>,
pub status: Option<TransactionStatus>,
Expand Down Expand Up @@ -146,7 +146,7 @@ mod test {
fn transactions_subjects_from_transaction() {
let mock_tx = MockTransaction::build();
let subject = TransactionsSubject::from(&mock_tx);
assert!(subject.height.is_none());
assert!(subject.block_height.is_none());
assert!(subject.tx_index.is_none());
assert!(subject.status.is_none());
assert!(subject.kind.is_some());
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-streams-core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub use fuel_core_types::{
pub use crate::{
blocks::types::*,
inputs::types::*,
logs::types::*,
nats::types::*,
transactions::types::*,
};
Expand Down
6 changes: 1 addition & 5 deletions crates/fuel-streams-core/src/utxos/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
pub mod subjects;
pub mod types;

use serde::{Deserialize, Serialize};
pub use subjects::*;
use types::Utxo;

use crate::prelude::*;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Utxo(pub Option<Vec<u8>>);

impl StreamEncoder for Utxo {}
impl Streamable for Utxo {
const NAME: &'static str = "utxos";
Expand Down
37 changes: 27 additions & 10 deletions crates/fuel-streams-core/src/utxos/subjects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ use super::{types::UtxoType, Address, Bytes32, MessageId, Nonce};
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let subject = UtxosSubject {
/// tx_id: Some(Bytes32::from([1u8; 32])),
/// index: Some(0),
/// sender: Some(Address::from([2u8; 32])),
/// recipient: Some(Address::from([3u8; 32])),
/// tx_id: Some(Bytes32::from([1u8; 32]).into()),
/// utxo_type: Some(UtxoType::Message),
/// nonce: Some(Nonce::zeroed()),
/// amount: Some(100),
/// sender: Some(Address::from([2u8; 32]).into()),
/// data: Some("".to_string()),
/// recipient: Some(Address::from([3u8; 32]).into()),
/// hash: None,
/// };
/// assert_eq!(
/// subject.parse(),
/// "utxos.0x0101010101010101010101010101010101010101010101010101010101010101.0.message.0x0202020202020202020202020202020202020202020202020202020202020202.0x0303030303030303030303030303030303030303030303030303030303030303"
/// "utxos.message.*"
/// );
/// ```
///
Expand All @@ -45,8 +49,17 @@ use super::{types::UtxoType, Address, Bytes32, MessageId, Nonce};
/// # use fuel_streams_core::utxos::subjects::UtxosSubject;
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let wildcard = UtxosSubject::wildcard(Some(Bytes32::from([1u8; 32])), None, None, None);
/// assert_eq!(wildcard, "utxos.0x0101010101010101010101010101010101010101010101010101010101010101.*.message.*.*");
/// let wildcard = UtxosSubject::wildcard(
/// None,
/// Some(Bytes32::from([1u8; 32]).into()),
/// Some(UtxoType::Message),
/// Some(Address::from([2u8; 32]).into()),
/// Some(Address::from([3u8; 32]).into()),
/// Some(Nonce::zeroed()),
/// Some("".to_string()),
/// Some(100),
/// );
/// assert_eq!(wildcard, "utxos.message.*");
/// ```
///
/// Using the builder pattern:
Expand All @@ -57,10 +70,14 @@ use super::{types::UtxoType, Address, Bytes32, MessageId, Nonce};
/// # use fuel_streams_macros::subject::*;
/// let subject = UtxosSubject::new()
/// .with_tx_id(Some(Bytes32::from([1u8; 32])))
/// .with_index(Some(0))
/// .with_utxo_type(Some(UtxoType::Message))
/// .with_nonce(Some(Nonce::zeroed()))
/// .with_amount(Some(100))
/// .with_sender(Some(Address::from([2u8; 32])))
/// .with_recipient(Some(Address::from([3u8; 32])));
/// assert_eq!(subject.parse(), "utxos.0x0101010101010101010101010101010101010101010101010101010101010101.0.message.0x0202020202020202020202020202020202020202020202020202020202020202.0x0303030303030303030303030303030303030303030303030303030303030303");
/// .with_data(Some("".to_string()))
/// .with_recipient(Some(Address::from([3u8; 32])))
/// .with_computed_hash();
/// assert_eq!(subject.parse(), "utxos.message.0x954b025b24f1c1ee5bdcb4065a4d283c1e088a9a82115d2576a7203492ada721");
/// ```
#[derive(Subject, Debug, Clone, Default)]
Expand Down
Loading

0 comments on commit 405d2b2

Please sign in to comment.