From 4dcb1b7cb4a556e58191848ad8131bcbc25aa1b8 Mon Sep 17 00:00:00 2001 From: Joshua Oladele Date: Sat, 30 Nov 2024 21:50:40 +0100 Subject: [PATCH] fix(publisher): Ensure OffchainDB readiness and synchronize blocks in order This commit addresses two key issues in the publisher module: OffchainDB synchronization: Previously, receipts were not being populated because the OffchainDB lagged behind the OnChainDB. This has been resolved by ensuring the OffchainDB is fully synchronized to the current height before publishing any streams. In the future, separating streams into distinct processes could avoid delays for streams that do not depend on OffchainDB synchronization. Old and new blocks synchronization: The previous implementation used a blocking while loop to process old blocks until the latest block. This approach delayed the subscription to new blocks, causing significant block misses in the subscription stream. To resolve this, an UnpublishedBlocks abstraction has been introduced. It allows the system to subscribe to new blocks immediately, even while syncing old blocks. Blocks are now published in the correct order based on their heights, ensuring no data is missed or processed out of sequence. References: DS-124 --- Cargo.lock | 25 ++ Cargo.toml | 3 + crates/fuel-streams-publisher/Cargo.toml | 2 + .../src/publisher/fuel_core_like.rs | 62 ++--- .../src/publisher/mod.rs | 182 +++++++------- .../src/publisher/shutdown.rs | 1 - .../src/publisher/streams.rs | 9 + .../src/publisher/unpublished_blocks.rs | 226 ++++++++++++++++++ tests/Cargo.toml | 2 + tests/tests/publisher.rs | 10 +- 10 files changed, 391 insertions(+), 131 deletions(-) create mode 100644 crates/fuel-streams-publisher/src/publisher/unpublished_blocks.rs diff --git a/Cargo.lock b/Cargo.lock index bf15f201..44cbbdc6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3796,6 +3796,7 @@ dependencies = [ "fuel-streams", "fuel-streams-core", "futures", + "itertools 0.13.0", "num_cpus", "openssl", "parking_lot", @@ -3810,6 +3811,7 @@ dependencies = [ "sysinfo", "thiserror", "tokio", + "tokio-stream", "tracing", "tracing-actix-web", "url", @@ -8951,6 +8953,8 @@ dependencies = [ "pretty_assertions", "rand", "tokio", + "tracing", + "tracing-test", ] [[package]] @@ -9855,6 +9859,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn 2.0.82", +] + [[package]] name = "trice" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 41c5c91e..643509e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ fuel-core-storage = { version = "0.40" } fuel-core-types = { version = "0.40", features = ["test-helpers", "serde"] } fuel-core-services = "0.40" futures-util = "0.3" +itertools = "0.13" hex = "0.4" pretty_assertions = "1.4" rand = "0.8" @@ -49,7 +50,9 @@ sha2 = "0.10" strum = "0.26" strum_macros = "0.26" tokio = { version = "1.41", features = ["full"] } +tokio-stream = "0.1.16" tracing = "0.1" +tracing-subscriber = "0.3" tracing-actix-web = "0.7" thiserror = "1.0" diff --git a/crates/fuel-streams-publisher/Cargo.toml b/crates/fuel-streams-publisher/Cargo.toml index 63520665..2e3debd2 100644 --- a/crates/fuel-streams-publisher/Cargo.toml +++ b/crates/fuel-streams-publisher/Cargo.toml @@ -33,6 +33,7 @@ fuel-core-types = { workspace = true } fuel-streams = { workspace = true, features = ["test-helpers"] } fuel-streams-core = { workspace = true, features = ["test-helpers"] } futures = { workspace = true } +itertools = { workspace = true } num_cpus = "1.16" parking_lot = { version = "0.12", features = ["serde"] } prometheus = { version = "0.13", features = ["process"] } @@ -46,6 +47,7 @@ sha2 = { workspace = true } sysinfo = { version = "0.29" } thiserror = "1.0" tokio = { workspace = true } +tokio-stream = { workspace = true } tracing = { workspace = true } tracing-actix-web = { workspace = true } url = "2.5.2" diff --git a/crates/fuel-streams-publisher/src/publisher/fuel_core_like.rs b/crates/fuel-streams-publisher/src/publisher/fuel_core_like.rs index e59ed6bd..8cc0ffbe 100644 --- a/crates/fuel-streams-publisher/src/publisher/fuel_core_like.rs +++ b/crates/fuel-streams-publisher/src/publisher/fuel_core_like.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use fuel_core::{ combined_database::CombinedDatabase, @@ -18,10 +18,9 @@ use fuel_core_storage::transactional::AtomicView; use fuel_core_types::{ blockchain::consensus::{Consensus, Sealed}, fuel_types::BlockHeight, - tai64::Tai64, }; use fuel_streams_core::types::*; -use tokio::sync::broadcast::Receiver; +use tokio::{sync::broadcast::Receiver, time::sleep}; pub type OffchainDatabase = GenericDatabase< IterableKeyValueViewWrapper< @@ -48,15 +47,21 @@ pub trait FuelCoreLike: Sync + Send { Ok(Arc::new(self.database().off_chain().latest_view()?)) } + async fn await_offchain_db_sync( + &self, + block_id: &FuelCoreBlockId, + ) -> anyhow::Result<()>; + fn blocks_subscription( &self, ) -> Receiver; - fn get_latest_block_height(&self) -> anyhow::Result> { + fn get_latest_block_height(&self) -> anyhow::Result { Ok(self .onchain_database() .latest_block_height()? - .map(|h| h.as_u64())) + .map(|h| h.as_u64()) + .unwrap_or_default()) } fn get_receipts( @@ -64,19 +69,6 @@ pub trait FuelCoreLike: Sync + Send { tx_id: &FuelCoreBytes32, ) -> anyhow::Result>>; - fn get_block_and_producer_by_height( - &self, - height: u32, - ) -> anyhow::Result<(FuelCoreBlock, Address)> { - let sealed_block = self - .onchain_database() - .latest_view()? - .get_sealed_block_by_height(&(height).into())? - .expect("NATS Publisher: no block at height {height}"); - - Ok(self.get_block_and_producer(&sealed_block)) - } - #[cfg(not(feature = "test-helpers"))] fn get_consensus( &self, @@ -103,7 +95,7 @@ pub trait FuelCoreLike: Sync + Send { #[cfg(not(feature = "test-helpers"))] fn get_block_and_producer( &self, - sealed_block: &Sealed, + sealed_block: Sealed, ) -> (FuelCoreBlock, Address) { let block = sealed_block.entity.clone(); let block_producer = sealed_block @@ -117,7 +109,7 @@ pub trait FuelCoreLike: Sync + Send { #[cfg(feature = "test-helpers")] fn get_block_and_producer( &self, - sealed_block: &Sealed, + sealed_block: Sealed, ) -> (FuelCoreBlock, Address) { let block = sealed_block.entity.clone(); let block_producer = sealed_block @@ -136,17 +128,6 @@ pub trait FuelCoreLike: Sync + Send { .expect("Failed to get latest block height") .expect("NATS Publisher: no block at height {height}") } - - fn get_sealed_block_time_by_height(&self, height: u32) -> Tai64 { - self.onchain_database() - .latest_view() - .expect("failed to get latest db view") - .get_sealed_block_header(&height.into()) - .expect("Failed to get sealed block header") - .expect("Failed to find sealed block header") - .entity - .time() - } } #[derive(Clone)] @@ -230,6 +211,25 @@ impl FuelCoreLike for FuelCore { } } + async fn await_offchain_db_sync( + &self, + block_id: &FuelCoreBlockId, + ) -> anyhow::Result<()> { + loop { + if self + .offchain_database()? + .get_block_height(block_id)? + .is_some() + { + break; + }; + + sleep(Duration::from_millis(500)).await; + } + + Ok(()) + } + fn base_asset_id(&self) -> &FuelCoreAssetId { &self.base_asset_id } diff --git a/crates/fuel-streams-publisher/src/publisher/mod.rs b/crates/fuel-streams-publisher/src/publisher/mod.rs index 8f6b2070..4ad3a296 100644 --- a/crates/fuel-streams-publisher/src/publisher/mod.rs +++ b/crates/fuel-streams-publisher/src/publisher/mod.rs @@ -3,14 +3,18 @@ pub mod payloads; pub mod shutdown; pub mod streams; -use std::sync::Arc; +mod unpublished_blocks; -use fuel_core_importer::ImporterResult; +use std::{cmp::max, sync::Arc}; + +use anyhow::Context; pub use fuel_core_like::{FuelCore, FuelCoreLike}; use fuel_streams_core::prelude::*; -use futures::{future::try_join_all, stream::FuturesUnordered}; +use futures::{future::try_join_all, stream::FuturesUnordered, StreamExt}; pub use streams::Streams; -use tokio::sync::{broadcast::error::RecvError, Semaphore}; +use tokio::sync::Semaphore; +use tokio_stream::wrappers::BroadcastStream; +use unpublished_blocks::UnpublishedBlocks; use super::{ payloads::blocks, @@ -69,16 +73,6 @@ impl Publisher { &self.streams } - async fn publish_block_data( - &self, - result: ImporterResult, - ) -> anyhow::Result<()> { - let (block, block_producer) = - self.fuel_core.get_block_and_producer(&result.sealed_block); - self.publish(&block, &block_producer).await?; - Ok(()) - } - async fn shutdown_services_with_timeout(&self) -> anyhow::Result<()> { tokio::time::timeout(GRACEFUL_SHUTDOWN_TIMEOUT, async { Publisher::flush_await_all_streams(&self.nats_client).await; @@ -105,100 +99,92 @@ impl Publisher { &self, shutdown_token: ShutdownToken, ) -> anyhow::Result<()> { - let last_published_block = self - .streams - .blocks - .get_last_published(BlocksSubject::WILDCARD) + tracing::info!("Publishing started..."); + + let latest_block_height = self.fuel_core.get_latest_block_height()?; + let last_published_block_height = self + .get_last_published_block_height(latest_block_height) .await?; - let last_published_height: u64 = last_published_block - .map(|block| block.height.into()) - .unwrap_or(0); - - // Catch up the streams with the FuelCore - if let Some(latest_fuel_core_height) = - self.fuel_core.get_latest_block_height()? - { - if latest_fuel_core_height > last_published_height + 1 { - tracing::warn!("Missing blocks: last block height in Node={latest_fuel_core_height}, last published block height={last_published_height}"); - } - let latest_fuel_block_time_unix = self - .fuel_core - .get_sealed_block_time_by_height(latest_fuel_core_height as u32) - .to_unix(); - - let mut height = last_published_height; - while height <= latest_fuel_core_height { - tokio::select! { - shutdown = shutdown_token.wait_for_shutdown() => { - if shutdown { - tracing::info!("Shutdown signal received during historical blocks processing. Last published block height {height}"); - self.shutdown_services_with_timeout().await?; - return Ok(()); - } - }, - (result, block_producer) = async { - - let fuel_block_time_unix = self - .fuel_core - .get_sealed_block_time_by_height(height as u32) - .to_unix(); - - if fuel_block_time_unix < latest_fuel_block_time_unix - (FUEL_BLOCK_TIME_SECS * MAX_RETENTION_BLOCKS) as i64 { - // Skip publishing for this block and move to the next height - tracing::warn!("Block {} with time: {} is more than 100 blocks behind chain tip, skipped publishing", height, fuel_block_time_unix); - return (Ok(()), None); - } - - let sealed_block = self - .fuel_core - .get_sealed_block_by_height(height as u32); - - let (block, block_producer) = - self.fuel_core.get_block_and_producer(&sealed_block); - - (self.publish(&block, &block_producer).await, Some(block_producer.clone())) - } => { - if let (Err(err), Some(block_producer)) = (result, block_producer) { - tracing::warn!("Failed to publish block data: {}", err); - self.telemetry.record_failed_publishing(self.fuel_core.chain_id(), &block_producer); - } - // Increment the height after processing - height += 1; - } - } - } - } + // TODO: Move the synchronous operations here to a dedicated Rayon pool + let old_blocks_stream = futures::stream::iter( + last_published_block_height..latest_block_height, + ) + .map(|height| self.fuel_core.get_sealed_block_by_height(height as u32)); + + let new_blocks_stream = BroadcastStream::new( + self.fuel_core.blocks_subscription(), + ) + .map(|import_result| { + import_result + .expect("Failed to get ImportResult") + .sealed_block + .clone() + }); - // publish subscribed data - loop { - let fuel_core = self.fuel_core.clone(); - let mut blocks_subscription = fuel_core.blocks_subscription(); - - tokio::select! { - result = blocks_subscription.recv() => { - match result { - Ok(result) => { - self.publish_block_data(result).await?; - } - Err(RecvError::Closed) | Err(RecvError::Lagged(_)) => { - // The sender has been dropped or has lagged, exit the loop - break; - } + let unpublished_blocks = + UnpublishedBlocks::new(last_published_block_height); + + let mut blocks_stream = + futures::stream::select(old_blocks_stream, new_blocks_stream); + + tokio::select! { + Some(block) = blocks_stream.next() => { + let unpublished_blocks = unpublished_blocks.clone(); + unpublished_blocks.add_block(block); + let next_blocks_to_publish = + unpublished_blocks.get_next_blocks_to_publish(); + + tracing::info!("Processing blocks stream"); + + for sealed_block in next_blocks_to_publish.into_iter() { + let fuel_core = &self.fuel_core; + let (block, block_producer) = + fuel_core.get_block_and_producer(sealed_block); + + // TODO: Avoid awaiting Offchain DB sync for all streams by grouping in their own service + fuel_core + .await_offchain_db_sync(&block.id()) + .await + .context("Failed to await Offchain DB sync")?; + + if let Err(err) = self.publish(&block, &block_producer).await { + tracing::error!("Failed to publish block data: {}", err); + self.telemetry.record_failed_publishing(self.fuel_core.chain_id(), &block_producer); } } - shutdown = shutdown_token.wait_for_shutdown() => { - if shutdown { - self.shutdown_services_with_timeout().await?; - break; - } + }, + shutdown = shutdown_token.wait_for_shutdown() => { + if shutdown { + tracing::info!("Shutdown signal received. Stopping services ..."); + self.shutdown_services_with_timeout().await?; + tracing::info!("Services stopped successfully!"); } - } - } + }, + }; Ok(()) } + const MAX_RETAINED_BLOCKS: u64 = 100; + async fn get_last_published_block_height( + &self, + latest_block_height: u64, + ) -> anyhow::Result { + Ok(self + .streams + .get_last_published_block() + .await? + .map(|block| block.height.into()) + .map(|block_height: u64| { + max( + block_height, + latest_block_height - Self::MAX_RETAINED_BLOCKS, + ) + }) + .unwrap_or_default()) + } + async fn publish( &self, block: &FuelCoreBlock, diff --git a/crates/fuel-streams-publisher/src/publisher/shutdown.rs b/crates/fuel-streams-publisher/src/publisher/shutdown.rs index ea2a0e0a..606fe97d 100644 --- a/crates/fuel-streams-publisher/src/publisher/shutdown.rs +++ b/crates/fuel-streams-publisher/src/publisher/shutdown.rs @@ -17,7 +17,6 @@ pub struct ShutdownToken { impl ShutdownToken { pub async fn wait_for_shutdown(&self) -> bool { - // Clone the receiver for this wait operation let mut rx = self.receiver.resubscribe(); rx.recv().await.is_ok() } diff --git a/crates/fuel-streams-publisher/src/publisher/streams.rs b/crates/fuel-streams-publisher/src/publisher/streams.rs index 68dfd869..14a7127a 100644 --- a/crates/fuel-streams-publisher/src/publisher/streams.rs +++ b/crates/fuel-streams-publisher/src/publisher/streams.rs @@ -54,6 +54,15 @@ impl Streams { ] } + pub async fn get_last_published_block( + &self, + ) -> anyhow::Result> { + Ok(self + .blocks + .get_last_published(BlocksSubject::WILDCARD) + .await?) + } + pub async fn get_consumers_and_state( &self, ) -> Result, StreamState)>, RequestErrorKind> { diff --git a/crates/fuel-streams-publisher/src/publisher/unpublished_blocks.rs b/crates/fuel-streams-publisher/src/publisher/unpublished_blocks.rs new file mode 100644 index 00000000..dd5d6c1b --- /dev/null +++ b/crates/fuel-streams-publisher/src/publisher/unpublished_blocks.rs @@ -0,0 +1,226 @@ +use std::{collections::HashMap, sync::Arc}; + +use fuel_core::database::database_description::DatabaseHeight; +use fuel_core_types::blockchain::consensus::Sealed; +use fuel_streams_core::prelude::*; +use itertools::Itertools; +use parking_lot::Mutex; + +/// Manages a collection of unpublished blocks for sequential publishing +/// +/// This was introduced to allow the publisher sequentially publish each block even though +/// it will receive them out of order from two different streams, the block importer stream and +/// the old blocks stream +/// +/// # Key Features +/// - Thread-safe block storage using `Arc>` +/// - Sequential block publishing +/// - Ability to handle blocks received out of order +/// +/// # Example +/// ``` +/// let unpublished_blocks = UnpublishedBlocks::new(10); +/// unpublished_blocks.add_block(block11); +/// unpublished_blocks.add_block(block12); +/// let next_blocks = unpublished_blocks.get_next_blocks_to_publish(); +/// ``` +#[derive(Debug, Default, Clone)] +pub struct UnpublishedBlocks { + inner: Arc>, +} + +#[derive(Debug, Default, Clone)] +struct UnpublishedBlocksInner { + blocks: HashMap>>, + next_height_to_publish: u64, +} + +impl UnpublishedBlocks { + pub fn new(last_published_block_height: u64) -> Self { + Self { + inner: Arc::new(Mutex::new(UnpublishedBlocksInner { + blocks: HashMap::new(), + next_height_to_publish: last_published_block_height, + })), + } + } + + pub fn add_block(&self, block: Sealed>) { + let height = block.entity.header().consensus().height.as_u64(); + self.inner.lock().blocks.insert(height, block); + } + + pub fn get_next_blocks_to_publish( + &self, + ) -> Vec>> { + let mut next_blocks_to_publish = vec![]; + let mut unpublished_blocks = self.inner.lock(); + + let unpublished_heights: Vec<_> = + unpublished_blocks.blocks.keys().sorted().cloned().collect(); + + for unpublished_height in unpublished_heights { + if unpublished_height == unpublished_blocks.next_height_to_publish { + let block = unpublished_blocks + .blocks + .remove(&unpublished_height) + .unwrap(); + next_blocks_to_publish.push(block); + unpublished_blocks.next_height_to_publish += 1; + } else { + break; + } + } + + next_blocks_to_publish + } +} + +#[cfg(test)] +mod tests { + use fuel_core_types::blockchain::SealedBlock; + + use super::*; + + #[test] + fn test_new_unpublished_blocks() { + let last_published_block_height = 10; + let unpublished_blocks = + UnpublishedBlocks::new(last_published_block_height); + + let inner = unpublished_blocks.inner.lock(); + assert!(inner.blocks.is_empty()); + assert_eq!(inner.next_height_to_publish, last_published_block_height); + } + + #[test] + fn test_add_block() { + let unpublished_blocks = UnpublishedBlocks::new(11); + + let block1 = create_mock_block(11); + let block2 = create_mock_block(12); + + unpublished_blocks.add_block(block1.clone()); + unpublished_blocks.add_block(block2.clone()); + + let inner = unpublished_blocks.inner.lock(); + assert_eq!(inner.blocks.len(), 2); + assert!(inner.blocks.contains_key(&11)); + assert!(inner.blocks.contains_key(&12)); + } + + #[test] + fn test_get_next_blocks_to_publish_sequential() { + let unpublished_blocks = UnpublishedBlocks::new(11); + + // Add blocks 11, 12, 13 in sequence + unpublished_blocks.add_block(create_mock_block(11)); + unpublished_blocks.add_block(create_mock_block(12)); + unpublished_blocks.add_block(create_mock_block(13)); + + // First call should return block 11 + let next_blocks = unpublished_blocks.get_next_blocks_to_publish(); + assert_eq!(next_blocks.len(), 1); + assert_eq!( + next_blocks[0].entity.header().consensus().height.as_u64(), + 11 + ); + + // Next call should return block 12 + let next_blocks = unpublished_blocks.get_next_blocks_to_publish(); + assert_eq!(next_blocks.len(), 1); + assert_eq!( + next_blocks[0].entity.header().consensus().height.as_u64(), + 12 + ); + } + + #[test] + fn test_get_next_blocks_to_publish_non_sequential() { + let unpublished_blocks = UnpublishedBlocks::new(11); + + // Add blocks out of sequence + unpublished_blocks.add_block(create_mock_block(12)); + unpublished_blocks.add_block(create_mock_block(11)); + unpublished_blocks.add_block(create_mock_block(14)); + + // First call should return only block 11 + let next_blocks = unpublished_blocks.get_next_blocks_to_publish(); + assert_eq!(next_blocks.len(), 1); + assert_eq!( + next_blocks[0].entity.header().consensus().height.as_u64(), + 11 + ); + + // Next call should return block 12 + let next_blocks = unpublished_blocks.get_next_blocks_to_publish(); + assert_eq!(next_blocks.len(), 1); + assert_eq!( + next_blocks[0].entity.header().consensus().height.as_u64(), + 12 + ); + + // Verify that block 14 is not returned yet + let next_blocks = unpublished_blocks.get_next_blocks_to_publish(); + assert_eq!(next_blocks.len(), 0); + } + + #[test] + fn test_get_next_blocks_to_returns_next_blocks_and_leaves_out_the_rest() { + let unpublished_blocks = UnpublishedBlocks::new(1); + + // Add blocks with heights 1, 2, and 11 + unpublished_blocks.add_block(create_mock_block(1)); + unpublished_blocks.add_block(create_mock_block(2)); + unpublished_blocks.add_block(create_mock_block(11)); + + // First call should return blocks with heights 1 and 2 + let next_blocks = unpublished_blocks.get_next_blocks_to_publish(); + assert_eq!(next_blocks.len(), 2); + + // Verify the heights of returned blocks + let returned_heights: Vec = next_blocks + .iter() + .map(|block| block.entity.header().consensus().height.as_u64()) + .collect(); + + assert_eq!(returned_heights, vec![1, 2]); + + // Verify remaining state + let inner = unpublished_blocks.inner.lock(); + assert_eq!(inner.next_height_to_publish, 3); + assert_eq!(inner.blocks.len(), 1); // Only block with height 11 should remain + assert!(inner.blocks.contains_key(&11)); + } + + #[test] + fn test_next_height_increments_correctly() { + let unpublished_blocks = UnpublishedBlocks::new(11); + + unpublished_blocks.add_block(create_mock_block(11)); + unpublished_blocks.add_block(create_mock_block(12)); + + let next_blocks_to_publish = + unpublished_blocks.get_next_blocks_to_publish(); + + assert_eq!(next_blocks_to_publish.len(), 2); + + let inner = unpublished_blocks.inner.lock(); + assert_eq!(inner.next_height_to_publish, 13); + assert!(inner.blocks.is_empty()); + } + + fn create_mock_block( + height: u64, + ) -> Sealed> { + let mut block = FuelCoreBlock::default(); + + block.header_mut().consensus_mut().height = + FuelCoreBlockHeight::new(height as u32); + + SealedBlock { + entity: block, + ..Default::default() + } + } +} diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 9f1c81b8..a7e42fdf 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -33,6 +33,8 @@ fuel-streams-publisher = { workspace = true, features = ["test-helpers"] } futures = { workspace = true } rand = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "test-util"] } +tracing = { workspace = true } +tracing-test = "0.2" [dev-dependencies] pretty_assertions = { workspace = true } diff --git a/tests/tests/publisher.rs b/tests/tests/publisher.rs index cf7cf69e..3f916839 100644 --- a/tests/tests/publisher.rs +++ b/tests/tests/publisher.rs @@ -52,6 +52,13 @@ impl FuelCoreLike for TestFuelCore { } async fn stop(&self) {} + async fn await_offchain_db_sync( + &self, + _block_id: &FuelCoreBlockId, + ) -> anyhow::Result<()> { + Ok(()) + } + fn base_asset_id(&self) -> &FuelCoreAssetId { &self.base_asset_id } @@ -226,7 +233,8 @@ async fn publishes_receipts() { } } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] +#[tracing_test::traced_test] async fn publishes_inputs() { let (blocks_broadcaster, _) = broadcast::channel::(1); let publisher = new_publisher(blocks_broadcaster.clone()).await;