Skip to content

Commit

Permalink
feat: add missing leaves cache (#167)
Browse files Browse the repository at this point in the history
* feat: add missing leaves cache

* fix lint issues

* fix comments

---------

Co-authored-by: Keefe Liu <keefe.l@users.noreply.github.com>
  • Loading branch information
keefel and Keefe Liu authored Oct 30, 2024
1 parent c576b32 commit 7452e10
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 39 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/blockchain-tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ metrics.workspace = true
# misc
aquamarine.workspace = true
linked_hash_set.workspace = true
dashmap = "6.1.0"

[dev-dependencies]
reth-chainspec.workspace = true
Expand Down
26 changes: 20 additions & 6 deletions crates/blockchain-tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use super::externals::TreeExternals;
use crate::BundleStateDataRef;
use alloy_eips::ForkBlock;
use alloy_primitives::{map::HashMap, BlockHash, BlockNumber, B256, U256};
use dashmap::DashMap;
use reth_blockchain_tree_api::{
error::{BlockchainTreeError, InsertBlockErrorKind},
BlockAttachment, BlockValidationKind,
Expand All @@ -23,13 +24,17 @@ use reth_provider::{
FullExecutionDataProvider, ProviderError, StateRootProvider, TryIntoHistoricalStateProvider,
};
use reth_revm::database::StateProviderDatabase;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie::{
updates::{StorageTrieUpdates, TrieUpdates},
HashedPostState, TrieInput,
};
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use reth_trie_prefetch::TriePrefetch;
use std::{
clone::Clone,
collections::BTreeMap,
ops::{Deref, DerefMut},
sync::Arc,
time::Instant,
};

Expand Down Expand Up @@ -213,8 +218,11 @@ impl AppendableChain {

let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider);

let (prefetch_tx, interrupt_tx) =
if enable_prefetch { Self::setup_prefetch(externals) } else { (None, None) };
let (prefetch_tx, interrupt_tx, missing_leaves_cache) = if enable_prefetch {
Self::setup_prefetch(externals)
} else {
(None, None, Default::default())
};

let db = StateProviderDatabase::new(&provider);
let executor = externals.executor_factory.executor(db, prefetch_tx);
Expand Down Expand Up @@ -243,7 +251,7 @@ impl AppendableChain {
consistent_view,
TrieInput::from_state(execution_outcome.hash_state_slow()),
)
.incremental_root_with_updates()
.incremental_root_with_updates_and_cache(missing_leaves_cache)
.map(|(root, updates)| (root, Some(updates)))
.map_err(ProviderError::from)?
} else {
Expand Down Expand Up @@ -343,11 +351,13 @@ impl AppendableChain {
Ok(())
}

#[allow(clippy::type_complexity)]
fn setup_prefetch<N, E>(
externals: &TreeExternals<N, E>,
) -> (
Option<tokio::sync::mpsc::UnboundedSender<EvmState>>,
Option<tokio::sync::oneshot::Sender<()>>,
Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
)
where
N: ProviderNodeTypes,
Expand All @@ -358,13 +368,17 @@ impl AppendableChain {

let mut trie_prefetch = TriePrefetch::new();
let provider_factory = externals.provider_factory.clone();
let missing_leaves_cache = Arc::new(DashMap::new());
let missing_leaves_cache_clone = Arc::clone(&missing_leaves_cache);

tokio::spawn({
async move {
trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await;
trie_prefetch
.run(provider_factory, prefetch_rx, interrupt_rx, missing_leaves_cache_clone)
.await;
}
});

(Some(prefetch_tx), Some(interrupt_tx))
(Some(prefetch_tx), Some(interrupt_tx), missing_leaves_cache)
}
}
1 change: 1 addition & 0 deletions crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ reth-metrics = { workspace = true, features = ["common"] }

# misc
tracing.workspace = true
dashmap = "6.1.0"

# optional deps for test-utils
reth-prune-types = { workspace = true, optional = true }
Expand Down
39 changes: 29 additions & 10 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use alloy_rpc_types_engine::{
CancunPayloadFields, ExecutionPayload, ForkchoiceState, PayloadStatus, PayloadStatusEnum,
PayloadValidationError,
};
use dashmap::DashMap;
use reth_beacon_consensus::{
BeaconConsensusEngineEvent, BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache,
OnForkChoiceUpdated, MIN_BLOCKS_FOR_PIPELINE_RUN,
Expand Down Expand Up @@ -44,7 +45,10 @@ use reth_provider::{
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie::{
updates::{StorageTrieUpdates, TrieUpdates},
HashedPostState, TrieInput,
};
use reth_trie_parallel::parallel_root::{ParallelStateRoot, ParallelStateRootError};
use reth_trie_prefetch::TriePrefetch;
use std::{
Expand Down Expand Up @@ -2213,11 +2217,11 @@ where
}

trace!(target: "engine::tree", block=?block.num_hash(), "Executing block");
let (prefetch_tx, interrupt_tx) =
let (prefetch_tx, interrupt_tx, missing_leaves_cache) =
if self.enable_prefetch && !self.skip_state_root_validation {
self.setup_prefetch()
} else {
(None, None)
(None, None, Default::default())
};

let executor = self
Expand Down Expand Up @@ -2274,9 +2278,11 @@ where
// different view of the database.
let persistence_in_progress = self.persistence_state.in_progress();
if !persistence_in_progress {
state_root_result = match self
.compute_state_root_parallel(block.parent_hash, &hashed_state)
{
state_root_result = match self.compute_state_root_parallel(
block.parent_hash,
&hashed_state,
missing_leaves_cache,
) {
Ok((state_root, trie_output)) => Some((state_root, trie_output)),
Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
Expand Down Expand Up @@ -2361,6 +2367,7 @@ where
&self,
parent_hash: B256,
hashed_state: &HashedPostState,
missing_leaves_cache: Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
let mut input = TrieInput::default();
Expand All @@ -2383,7 +2390,8 @@ where
// Extend with block we are validating root for.
input.append_ref(hashed_state);

ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
ParallelStateRoot::new(consistent_view, input)
.incremental_root_with_updates_and_cache(missing_leaves_cache)
}

/// Handles an error that occurred while inserting a block.
Expand Down Expand Up @@ -2627,20 +2635,31 @@ where
Ok(())
}

fn setup_prefetch(&self) -> (Option<UnboundedSender<EvmState>>, Option<oneshot::Sender<()>>) {
#[allow(clippy::type_complexity)]
fn setup_prefetch(
&self,
) -> (
Option<UnboundedSender<EvmState>>,
Option<oneshot::Sender<()>>,
Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
) {
let (prefetch_tx, prefetch_rx) = tokio::sync::mpsc::unbounded_channel();
let (interrupt_tx, interrupt_rx) = oneshot::channel();

let mut trie_prefetch = TriePrefetch::new();
let provider_factory = self.provider.clone();
let missing_leaves_cache = Arc::new(DashMap::new());
let missing_leaves_cache_clone = Arc::clone(&missing_leaves_cache);

tokio::spawn({
async move {
trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await;
trie_prefetch
.run(provider_factory, prefetch_rx, interrupt_rx, missing_leaves_cache_clone)
.await;
}
});

(Some(prefetch_tx), Some(interrupt_tx))
(Some(prefetch_tx), Some(interrupt_tx), missing_leaves_cache)
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/trie/parallel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tracing.workspace = true
# misc
thiserror.workspace = true
derive_more.workspace = true
dashmap = "6.1.0"
rayon.workspace = true
itertools.workspace = true

Expand Down
37 changes: 26 additions & 11 deletions crates/trie/parallel/src/parallel_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::metrics::ParallelStateRootMetrics;
use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets};
use alloy_primitives::B256;
use alloy_rlp::{BufMut, Encodable};
use dashmap::DashMap;
use itertools::Itertools;
use reth_execution_errors::StorageRootError;
use reth_provider::{
Expand All @@ -12,7 +13,7 @@ use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
node_iter::{TrieElement, TrieNodeIter},
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
updates::TrieUpdates,
updates::{StorageTrieUpdates, TrieUpdates},
walker::TrieWalker,
HashBuilder, Nibbles, StorageRoot, TrieAccount, TrieInput,
};
Expand Down Expand Up @@ -61,19 +62,28 @@ where
{
/// Calculate incremental state root in parallel.
pub fn incremental_root(self) -> Result<B256, ParallelStateRootError> {
self.calculate(false).map(|(root, _)| root)
self.calculate(false, None).map(|(root, _)| root)
}

/// Calculate incremental state root with updates in parallel.
pub fn incremental_root_with_updates(
self,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
self.calculate(true)
self.calculate(true, None)
}

/// Calculate incremental state root with updates in parallel, consulting a missing leaves
/// cache.
///
/// Forwards to `calculate` with update retention enabled and `miss_leaves_cache` supplied.
/// When `calculate` meets an account whose storage root was not precomputed (a "missed
/// leaf"), it first looks up the account's hashed address in the cache; on a hit the cached
/// `(storage root, storage trie updates)` pair is used instead of recomputing the storage
/// root, and on a miss the storage root is calculated as before.
///
/// The cache is keyed by hashed address and holds `(storage root, StorageTrieUpdates)`.
/// NOTE(review): the visible callers share this map with the `TriePrefetch` task, which
/// presumably populates it — confirm in the prefetch crate.
pub fn incremental_root_with_updates_and_cache(
self,
miss_leaves_cache: Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
self.calculate(true, Some(miss_leaves_cache))
}

fn calculate(
self,
retain_updates: bool,
miss_leaves_cache: Option<Arc<DashMap<B256, (B256, StorageTrieUpdates)>>>,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let mut tracker = ParallelTrieTracker::default();
let trie_nodes_sorted = Arc::new(self.input.nodes.into_sorted());
Expand Down Expand Up @@ -171,14 +181,19 @@ where
// be a possibility of re-adding a non-modified leaf to the hash builder.
None => {
tracker.inc_missed_leaves();
StorageRoot::new_hashed(
trie_cursor_factory.clone(),
hashed_cursor_factory.clone(),
hashed_address,
#[cfg(feature = "metrics")]
self.metrics.storage_trie.clone(),
)
.calculate(retain_updates)?
match miss_leaves_cache.clone().and_then(|cache| {
cache.get(&hashed_address).map(|value| value.clone())
}) {
Some((root, updates)) => (root, 0usize, updates),
None => StorageRoot::new_hashed(
trie_cursor_factory.clone(),
hashed_cursor_factory.clone(),
hashed_address,
#[cfg(feature = "metrics")]
self.metrics.storage_trie.clone(),
)
.calculate(retain_updates)?,
}
}
};

Expand Down
1 change: 1 addition & 0 deletions crates/trie/prefetch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tracing.workspace = true
thiserror.workspace = true
derive_more.workspace = true
rayon.workspace = true
dashmap = "6.1.0"

# async
tokio = { workspace = true, default-features = false, features = ["sync", "rt", "macros"] }
Expand Down
Loading

0 comments on commit 7452e10

Please sign in to comment.