Skip to content

Commit

Permalink
fix(publisher): Ensure OffchainDB readiness and synchronize blocks in…
Browse files Browse the repository at this point in the history
… order

This commit addresses two key issues in the publisher module:

OffchainDB synchronization:
Previously, receipts were not being populated because the OffchainDB lagged behind the OnChainDB.
This has been resolved by ensuring the OffchainDB is fully synchronized to the current height before
publishing any streams. In the future, separating streams into distinct processes could avoid delays
for streams that do not depend on OffchainDB synchronization.

Old and new blocks synchronization:
The previous implementation used a blocking while loop to process old blocks until the latest block. This
approach delayed the subscription to new blocks, causing significant block misses in the subscription stream.
To resolve this, an UnpublishedBlocks abstraction has been introduced. It allows the system to subscribe to new
blocks immediately, even while syncing old blocks. Blocks are now published in the correct order based on their
heights, ensuring no data is missed or processed out of sequence.

References: DS-124
  • Loading branch information
Jurshsmith committed Nov 30, 2024
1 parent d854490 commit 4dcb1b7
Show file tree
Hide file tree
Showing 10 changed files with 391 additions and 131 deletions.
25 changes: 25 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ fuel-core-storage = { version = "0.40" }
fuel-core-types = { version = "0.40", features = ["test-helpers", "serde"] }
fuel-core-services = "0.40"
futures-util = "0.3"
itertools = "0.13"
hex = "0.4"
pretty_assertions = "1.4"
rand = "0.8"
Expand All @@ -49,7 +50,9 @@ sha2 = "0.10"
strum = "0.26"
strum_macros = "0.26"
tokio = { version = "1.41", features = ["full"] }
tokio-stream = "0.1.16"
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-actix-web = "0.7"
thiserror = "1.0"

Expand Down
2 changes: 2 additions & 0 deletions crates/fuel-streams-publisher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fuel-core-types = { workspace = true }
fuel-streams = { workspace = true, features = ["test-helpers"] }
fuel-streams-core = { workspace = true, features = ["test-helpers"] }
futures = { workspace = true }
itertools = { workspace = true }
num_cpus = "1.16"
parking_lot = { version = "0.12", features = ["serde"] }
prometheus = { version = "0.13", features = ["process"] }
Expand All @@ -46,6 +47,7 @@ sha2 = { workspace = true }
sysinfo = { version = "0.29" }
thiserror = "1.0"
tokio = { workspace = true }
tokio-stream = { workspace = true }
tracing = { workspace = true }
tracing-actix-web = { workspace = true }
url = "2.5.2"
Expand Down
62 changes: 31 additions & 31 deletions crates/fuel-streams-publisher/src/publisher/fuel_core_like.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use fuel_core::{
combined_database::CombinedDatabase,
Expand All @@ -18,10 +18,9 @@ use fuel_core_storage::transactional::AtomicView;
use fuel_core_types::{
blockchain::consensus::{Consensus, Sealed},
fuel_types::BlockHeight,
tai64::Tai64,
};
use fuel_streams_core::types::*;
use tokio::sync::broadcast::Receiver;
use tokio::{sync::broadcast::Receiver, time::sleep};

pub type OffchainDatabase = GenericDatabase<
IterableKeyValueViewWrapper<
Expand All @@ -48,35 +47,28 @@ pub trait FuelCoreLike: Sync + Send {
Ok(Arc::new(self.database().off_chain().latest_view()?))
}

async fn await_offchain_db_sync(
&self,
block_id: &FuelCoreBlockId,
) -> anyhow::Result<()>;

fn blocks_subscription(
&self,
) -> Receiver<fuel_core_importer::ImporterResult>;

fn get_latest_block_height(&self) -> anyhow::Result<Option<u64>> {
fn get_latest_block_height(&self) -> anyhow::Result<u64> {
Ok(self
.onchain_database()
.latest_block_height()?
.map(|h| h.as_u64()))
.map(|h| h.as_u64())
.unwrap_or_default())
}

fn get_receipts(
&self,
tx_id: &FuelCoreBytes32,
) -> anyhow::Result<Option<Vec<FuelCoreReceipt>>>;

fn get_block_and_producer_by_height(
&self,
height: u32,
) -> anyhow::Result<(FuelCoreBlock, Address)> {
let sealed_block = self
.onchain_database()
.latest_view()?
.get_sealed_block_by_height(&(height).into())?
.expect("NATS Publisher: no block at height {height}");

Ok(self.get_block_and_producer(&sealed_block))
}

#[cfg(not(feature = "test-helpers"))]
fn get_consensus(
&self,
Expand All @@ -103,7 +95,7 @@ pub trait FuelCoreLike: Sync + Send {
#[cfg(not(feature = "test-helpers"))]
fn get_block_and_producer(
&self,
sealed_block: &Sealed<FuelCoreBlock>,
sealed_block: Sealed<FuelCoreBlock>,
) -> (FuelCoreBlock, Address) {
let block = sealed_block.entity.clone();
let block_producer = sealed_block
Expand All @@ -117,7 +109,7 @@ pub trait FuelCoreLike: Sync + Send {
#[cfg(feature = "test-helpers")]
fn get_block_and_producer(
&self,
sealed_block: &Sealed<FuelCoreBlock>,
sealed_block: Sealed<FuelCoreBlock>,
) -> (FuelCoreBlock, Address) {
let block = sealed_block.entity.clone();
let block_producer = sealed_block
Expand All @@ -136,17 +128,6 @@ pub trait FuelCoreLike: Sync + Send {
.expect("Failed to get latest block height")
.expect("NATS Publisher: no block at height {height}")
}

fn get_sealed_block_time_by_height(&self, height: u32) -> Tai64 {
self.onchain_database()
.latest_view()
.expect("failed to get latest db view")
.get_sealed_block_header(&height.into())
.expect("Failed to get sealed block header")
.expect("Failed to find sealed block header")
.entity
.time()
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -230,6 +211,25 @@ impl FuelCoreLike for FuelCore {
}
}

async fn await_offchain_db_sync(
&self,
block_id: &FuelCoreBlockId,
) -> anyhow::Result<()> {
loop {
if self
.offchain_database()?
.get_block_height(block_id)?
.is_some()
{
break;
};

sleep(Duration::from_millis(500)).await;
}

Ok(())
}

fn base_asset_id(&self) -> &FuelCoreAssetId {
&self.base_asset_id
}
Expand Down
Loading

0 comments on commit 4dcb1b7

Please sign in to comment.