Skip to content

Commit

Permalink
refactor(publisher): include logs and utxos
Browse files Browse the repository at this point in the history
  • Loading branch information
Jurshsmith committed Oct 1, 2024
1 parent 6939688 commit f5f46a0
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 76 deletions.
1 change: 1 addition & 0 deletions crates/fuel-streams-core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub use crate::{
logs::types::*,
nats::types::*,
transactions::types::*,
utxos::types::*,
};

// ------------------------------------------------------------------------
Expand Down
10 changes: 4 additions & 6 deletions crates/fuel-streams-core/src/utxos/subjects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::{types::UtxoType, MessageId};
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let subject = UtxosSubject {
/// tx_id: Some(Bytes32::from([1u8; 32]).into()),
/// hash: Some(MessageId::from([1u8; 32])),
/// utxo_type: Some(UtxoType::Message),
/// };
/// assert_eq!(
Expand All @@ -40,11 +40,10 @@ use super::{types::UtxoType, MessageId};
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let wildcard = UtxosSubject::wildcard(
/// Some(MessageId::from([1u8; 32])),
/// None,
/// Some(Bytes32::from([1u8; 32]).into()),
/// Some(UtxoType::Message),
/// );
/// assert_eq!(wildcard, "utxos.message.0x0101010101010101010101010101010101010101010101010101010101010101");
/// assert_eq!(wildcard, "utxos.*.0x0101010101010101010101010101010101010101010101010101010101010101");
/// ```
///
/// Using the builder pattern:
Expand All @@ -54,15 +53,14 @@ use super::{types::UtxoType, MessageId};
/// # use fuel_streams_core::types::*;
/// # use fuel_streams_macros::subject::*;
/// let subject = UtxosSubject::new()
/// .with_tx_id(Some(Bytes32::from([1u8; 32])))
/// .with_hash(Some(MessageId::from([1u8; 32])))
/// .with_utxo_type(Some(UtxoType::Message));
/// assert_eq!(subject.parse(), "utxos.message.0x0101010101010101010101010101010101010101010101010101010101010101");
/// ```
#[derive(Subject, Debug, Clone, Default)]
#[subject_wildcard = "utxos.>"]
#[subject_format = "utxos.{utxo_type}.{hash}"]
#[allow(clippy::too_many_arguments)]
pub struct UtxosSubject {
pub hash: Option<MessageId>,
pub utxo_type: Option<UtxoType>,
Expand Down
8 changes: 6 additions & 2 deletions crates/fuel-streams-publisher/src/inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use fuel_core_types::fuel_tx::{
};
use fuel_streams_core::{
inputs::{
InputsByIdSubject, InputsCoinSubject, InputsContractSubject,
InputsByIdSubject,
InputsCoinSubject,
InputsContractSubject,
InputsMessageSubject,
},
prelude::*,
Expand All @@ -19,7 +21,9 @@ use fuel_streams_core::{
};

use crate::{
build_subject_name, metrics::PublisherMetrics, publish_with_metrics,
build_subject_name,
metrics::PublisherMetrics,
publish_with_metrics,
};

fn coin_subject<T: CoinSpecification>(
Expand Down
73 changes: 34 additions & 39 deletions crates/fuel-streams-publisher/src/logs.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,49 @@
use std::sync::Arc;

use fuel_core_types::fuel_tx::Receipt;
use fuel_streams_core::{
logs::LogsSubject,
prelude::*,
types::{Transaction, UniqueIdentifier},
Stream,
};
use fuel_streams_core::{logs::LogsSubject, prelude::*, Stream};
use tracing::info;

use crate::{metrics::PublisherMetrics, publish_with_metrics, FuelCoreLike};
use crate::{
build_subject_name,
metrics::PublisherMetrics,
publish_with_metrics,
};

pub async fn publish(
metrics: &Arc<PublisherMetrics>,
fuel_core: &dyn FuelCoreLike,
logs_stream: &Stream<Log>,
transactions: &[Transaction],
block_producer: &Address,
receipts: Option<Vec<Receipt>>,
tx_id: Bytes32,
chain_id: &ChainId,
block_height: BlockHeight,
metrics: &Arc<PublisherMetrics>,
block_producer: &Address,
predicate_tag: Option<Bytes32>,
) -> anyhow::Result<()> {
let chain_id = fuel_core.chain_id();

for transaction in transactions.iter() {
let tx_id = transaction.id(chain_id);
let receipts = fuel_core.get_receipts(&tx_id)?;

if let Some(receipts) = receipts {
for (index, receipt) in receipts.iter().enumerate() {
match receipt {
receipt @ (Receipt::Log { id, .. }
| Receipt::LogData { id, .. }) => {
let subject = LogsSubject::new()
.with_block_height(Some(block_height.clone()))
.with_tx_id(Some(tx_id.into()))
.with_receipt_index(Some(index))
.with_log_id(Some((*id).into()));
let subject_wildcard = LogsSubject::WILDCARD;
if let Some(receipts) = receipts {
for (index, receipt) in receipts.iter().enumerate() {
match receipt {
Receipt::Log { id, .. } | Receipt::LogData { id, .. } => {
let subject = LogsSubject::new()
.with_block_height(Some(block_height.clone()))
.with_tx_id(Some(tx_id.clone()))
.with_receipt_index(Some(index))
.with_log_id(Some((*id).into()));
let subject_wildcard = LogsSubject::WILDCARD;

info!("NATS Publisher: Publishing Logs for 0x#{tx_id}");
publish_with_metrics!(
logs_stream
.publish(&subject, &(receipt.clone()).into()),
metrics,
chain_id,
block_producer,
subject_wildcard
);
}
_non_log_receipt => {}
info!("NATS Publisher: Publishing Logs for 0x#{tx_id}");
publish_with_metrics!(
logs_stream.publish_raw(
&build_subject_name(&predicate_tag, &subject),
&(receipt.clone()).into(),
),
metrics,
chain_id,
block_producer,
subject_wildcard
);
}
_non_log_receipt => {}
}
}
}
Expand Down
83 changes: 75 additions & 8 deletions crates/fuel-streams-publisher/src/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{sync::Arc, time::Instant};
use std::sync::Arc;

use async_nats::{jetstream::stream::State as StreamState, RequestErrorKind};
use fuel_core::database::database_description::DatabaseHeight;
Expand All @@ -9,7 +9,9 @@ use fuel_streams::types::{Log, UniqueIdentifier};
use fuel_streams_core::{
blocks::BlocksSubject,
inputs::{
InputsByIdSubject, InputsCoinSubject, InputsContractSubject,
InputsByIdSubject,
InputsCoinSubject,
InputsContractSubject,
InputsMessageSubject,
},
logs::LogsSubject,
Expand All @@ -25,11 +27,18 @@ use tokio::sync::broadcast::error::RecvError;
use tracing::warn;

use crate::{
blocks, inputs, logs,
blocks,
inputs,
logs,
metrics::PublisherMetrics,
outputs, predicates, receipts,
outputs,
predicates,
receipts,
shutdown::{StopHandle, GRACEFUL_SHUTDOWN_TIMEOUT},
transactions, utxos, FuelCore, FuelCoreLike,
transactions,
utxos,
FuelCore,
FuelCoreLike,
};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -315,6 +324,7 @@ impl Publisher {
block_producer: &Address,
) -> anyhow::Result<()> {
let transactions = block.transactions();
let block_height = block.header().consensus().height;

let mut publishing_tasks = vec![blocks::publish(
&self.metrics,
Expand All @@ -329,13 +339,14 @@ impl Publisher {
{
let chain_id = self.fuel_core.chain_id();
let tx_id = transaction.id(chain_id);
let receipts = self.fuel_core.get_receipts(&tx_id)?;

publishing_tasks.push(
transactions::publish(
&self.streams.transactions,
(transaction_index, transaction),
&*self.fuel_core,
block.header().consensus().height.into(),
block_height.into(),
&self.metrics,
block_producer,
None,
Expand All @@ -346,7 +357,7 @@ impl Publisher {
publishing_tasks.push(
receipts::publish(
&self.streams.receipts,
self.fuel_core.get_receipts(&tx_id)?,
receipts.clone(),
tx_id.into(),
*chain_id,
&self.metrics,
Expand All @@ -356,6 +367,20 @@ impl Publisher {
.boxed(),
);

publishing_tasks.push(
logs::publish(
&self.streams.logs,
receipts.clone(),
tx_id.into(),
chain_id,
block_height.into(),
&self.metrics,
block_producer,
None,
)
.boxed(),
);

publishing_tasks.push(
inputs::publish(
&self.streams.inputs,
Expand All @@ -378,6 +403,20 @@ impl Publisher {
.boxed(),
);

publishing_tasks.push(
utxos::publish(
&self.metrics,
&self.streams.utxos,
&*self.fuel_core,
transaction,
tx_id.into(),
chain_id,
block_producer,
None,
)
.boxed(),
);

for input in transaction.inputs() {
if let Some((
predicate_bytecode,
Expand All @@ -404,7 +443,7 @@ impl Publisher {
publishing_tasks.push(
receipts::publish(
&self.streams.receipts,
self.fuel_core.get_receipts(&tx_id)?,
receipts.clone(),
tx_id.into(),
*chain_id,
&self.metrics,
Expand All @@ -414,6 +453,20 @@ impl Publisher {
.boxed(),
);

publishing_tasks.push(
logs::publish(
&self.streams.logs,
receipts.clone(),
tx_id.into(),
chain_id,
block_height.into(),
&self.metrics,
block_producer,
predicate_tag.clone(),
)
.boxed(),
);

publishing_tasks.push(
inputs::publish(
&self.streams.inputs,
Expand All @@ -431,6 +484,20 @@ impl Publisher {
&self.streams.outputs,
self.fuel_core.chain_id(),
transaction,
predicate_tag.clone(),
)
.boxed(),
);

publishing_tasks.push(
utxos::publish(
&self.metrics,
&self.streams.utxos,
&*self.fuel_core,
transaction,
tx_id.into(),
chain_id,
block_producer,
predicate_tag,
)
.boxed(),
Expand Down
11 changes: 8 additions & 3 deletions crates/fuel-streams-publisher/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@ use fuel_streams_core::{
prelude::*,
transactions::TransactionsSubject,
types::{
BlockHeight, Transaction, TransactionKind, TransactionStatus,
BlockHeight,
Transaction,
TransactionKind,
TransactionStatus,
UniqueIdentifier,
},
Stream,
};
use tracing::info;

use crate::{
build_subject_name, metrics::PublisherMetrics, publish_with_metrics,
build_subject_name,
metrics::PublisherMetrics,
publish_with_metrics,
FuelCoreLike,
};

Expand Down Expand Up @@ -42,7 +47,7 @@ pub async fn publish(
.with_tx_id(Some(tx_id.into()))
.with_kind(Some(kind))
.with_status(Some(status))
.with_height(Some(block_height))
.with_block_height(Some(block_height))
.with_tx_index(Some(transaction_index));

info!("NATS Publisher: Publishing Transaction 0x#{tx_id}");
Expand Down
Loading

0 comments on commit f5f46a0

Please sign in to comment.