Skip to content

Commit

Permalink
feat: add cache for missing leaves
Browse files Browse the repository at this point in the history
  • Loading branch information
Keefe Liu committed Oct 24, 2024
1 parent 176c458 commit 17843fe
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 38 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/blockchain-tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ metrics.workspace = true
# misc
aquamarine.workspace = true
linked_hash_set.workspace = true
dashmap = "6.1.0"

[dev-dependencies]
reth-chainspec.workspace = true
Expand Down
25 changes: 19 additions & 6 deletions crates/blockchain-tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use super::externals::TreeExternals;
use crate::BundleStateDataRef;
use alloy_primitives::{BlockHash, BlockNumber, U256};
use dashmap::DashMap;
use reth_blockchain_tree_api::{
error::{BlockchainTreeError, InsertBlockErrorKind},
BlockAttachment, BlockValidationKind,
Expand All @@ -23,13 +24,17 @@ use reth_provider::{
FullExecutionDataProvider, ProviderError, StateRootProvider, TryIntoHistoricalStateProvider,
};
use reth_revm::database::StateProviderDatabase;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie::{
updates::{StorageTrieUpdates, TrieUpdates},
HashedPostState, TrieInput,
};
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use reth_trie_prefetch::TriePrefetch;
use std::{
clone::Clone,
collections::{BTreeMap, HashMap},
ops::{Deref, DerefMut},
sync::Arc,
time::Instant,
};

Expand Down Expand Up @@ -213,8 +218,11 @@ impl AppendableChain {

let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider);

let (prefetch_tx, interrupt_tx) =
if enable_prefetch { Self::setup_prefetch(externals) } else { (None, None) };
let (prefetch_tx, interrupt_tx, missing_leaves_cache) = if enable_prefetch {
Self::setup_prefetch(externals)
} else {
(None, None, Default::default())
};

let db = StateProviderDatabase::new(&provider);
let executor = externals.executor_factory.executor(db, prefetch_tx);
Expand Down Expand Up @@ -243,7 +251,7 @@ impl AppendableChain {
consistent_view,
TrieInput::from_state(execution_outcome.hash_state_slow()),
)
.incremental_root_with_updates()
.incremental_root_with_updates_and_cache(missing_leaves_cache)
.map(|(root, updates)| (root, Some(updates)))
.map_err(ProviderError::from)?
} else {
Expand Down Expand Up @@ -348,6 +356,7 @@ impl AppendableChain {
) -> (
Option<tokio::sync::mpsc::UnboundedSender<EvmState>>,
Option<tokio::sync::oneshot::Sender<()>>,
Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
)
where
N: ProviderNodeTypes,
Expand All @@ -358,13 +367,17 @@ impl AppendableChain {

let mut trie_prefetch = TriePrefetch::new();
let provider_factory = externals.provider_factory.clone();
let missing_leaves_cache = Arc::new(DashMap::new());
let missing_leaves_cache_clone = Arc::clone(&missing_leaves_cache);

tokio::spawn({
async move {
trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await;
trie_prefetch
.run(provider_factory, prefetch_rx, interrupt_rx, missing_leaves_cache_clone)
.await;
}
});

(Some(prefetch_tx), Some(interrupt_tx))
(Some(prefetch_tx), Some(interrupt_tx), missing_leaves_cache)
}
}
1 change: 1 addition & 0 deletions crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ reth-metrics = { workspace = true, features = ["common"] }

# misc
tracing.workspace = true
dashmap = "6.1.0"

# optional deps for test-utils
reth-prune-types = { workspace = true, optional = true }
Expand Down
31 changes: 23 additions & 8 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
engine::{DownloadRequest, EngineApiEvent, FromEngine},
persistence::PersistenceHandle,
};
use dashmap::DashMap;
use reth_beacon_consensus::{
BeaconConsensusEngineEvent, BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache,
OnForkChoiceUpdated, MIN_BLOCKS_FOR_PIPELINE_RUN,
Expand Down Expand Up @@ -40,7 +41,10 @@ use reth_rpc_types::{
ExecutionPayload,
};
use reth_stages_api::ControlFlow;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie::{
updates::{StorageTrieUpdates, TrieUpdates},
HashedPostState, TrieInput,
};
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use reth_trie_prefetch::TriePrefetch;
use std::{
Expand Down Expand Up @@ -2163,11 +2167,11 @@ where
return Err(e.into())
}

let (prefetch_tx, interrupt_tx) =
let (prefetch_tx, interrupt_tx, missing_leaves_cache) =
if self.enable_prefetch && !self.skip_state_root_validation {
self.setup_prefetch()
} else {
(None, None)
(None, None, Default::default())
};

let executor = self
Expand Down Expand Up @@ -2226,7 +2230,7 @@ where
let persistence_in_progress = self.persistence_state.in_progress();
if !persistence_in_progress {
state_root_result = match self
.compute_state_root_in_parallel(block.parent_hash, &hashed_state)
.compute_state_root_in_parallel(block.parent_hash, &hashed_state, missing_leaves_cache)
{
Ok((state_root, trie_output)) => Some((state_root, trie_output)),
Err(ProviderError::ConsistentView(error)) => {
Expand Down Expand Up @@ -2313,6 +2317,7 @@ where
&self,
parent_hash: B256,
hashed_state: &HashedPostState,
missing_leaves_cache: Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
) -> ProviderResult<(B256, TrieUpdates)> {
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
let mut input = TrieInput::default();
Expand All @@ -2331,7 +2336,7 @@ where
// Extend with block we are validating root for.
input.append_ref(hashed_state);

Ok(ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()?)
Ok(ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates_and_cache(missing_leaves_cache)?)
}

/// Handles an error that occurred while inserting a block.
Expand Down Expand Up @@ -2563,20 +2568,30 @@ where
Ok(())
}

fn setup_prefetch(&self) -> (Option<UnboundedSender<EvmState>>, Option<oneshot::Sender<()>>) {
fn setup_prefetch(
&self,
) -> (
Option<UnboundedSender<EvmState>>,
Option<oneshot::Sender<()>>,
Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
) {
let (prefetch_tx, prefetch_rx) = tokio::sync::mpsc::unbounded_channel();
let (interrupt_tx, interrupt_rx) = oneshot::channel();

let mut trie_prefetch = TriePrefetch::new();
let provider_factory = self.provider.clone();
let missing_leaves_cache = Arc::new(DashMap::new());
let missing_leaves_cache_clone = Arc::clone(&missing_leaves_cache);

tokio::spawn({
async move {
trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await;
trie_prefetch
.run(provider_factory, prefetch_rx, interrupt_rx, missing_leaves_cache_clone)
.await;
}
});

(Some(prefetch_tx), Some(interrupt_tx))
(Some(prefetch_tx), Some(interrupt_tx), missing_leaves_cache)
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/trie/parallel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tracing.workspace = true
# misc
thiserror.workspace = true
derive_more.workspace = true
dashmap = "6.1.0"

# `async` feature
reth-tasks = { workspace = true, optional = true }
Expand Down
50 changes: 38 additions & 12 deletions crates/trie/parallel/src/parallel_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use crate::metrics::ParallelStateRootMetrics;
use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets};
use alloy_rlp::{BufMut, Encodable};
use dashmap::DashMap;
use rayon::prelude::*;
use reth_execution_errors::StorageRootError;
use reth_primitives::B256;
Expand All @@ -12,12 +13,12 @@ use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
node_iter::{TrieElement, TrieNodeIter},
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
updates::TrieUpdates,
updates::{StorageTrieUpdates, TrieUpdates},
walker::TrieWalker,
HashBuilder, Nibbles, StorageRoot, TrieAccount, TrieInput,
};
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};
use thiserror::Error;
use tracing::*;

Expand Down Expand Up @@ -62,19 +63,28 @@ where
{
/// Calculate incremental state root in parallel.
pub fn incremental_root(self) -> Result<B256, ParallelStateRootError> {
self.calculate(false).map(|(root, _)| root)
self.calculate(false, None).map(|(root, _)| root)
}

/// Calculate incremental state root with updates in parallel.
pub fn incremental_root_with_updates(
self,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
self.calculate(true)
self.calculate(true, None)
}

/// Calculate incremental state root with missing leaves cache.
pub fn incremental_root_with_updates_and_cache(
self,
miss_leaves_cache: Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
self.calculate(true, Some(miss_leaves_cache))
}

fn calculate(
self,
retain_updates: bool,
miss_leaves_cache: Option<Arc<DashMap<B256, (B256, StorageTrieUpdates)>>>,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let mut tracker = ParallelTrieTracker::default();
let trie_nodes_sorted = self.input.nodes.into_sorted();
Expand Down Expand Up @@ -153,14 +163,30 @@ where
// be a possibility of re-adding a non-modified leaf to the hash builder.
None => {
tracker.inc_missed_leaves();
StorageRoot::new_hashed(
trie_cursor_factory.clone(),
hashed_cursor_factory.clone(),
hashed_address,
#[cfg(feature = "metrics")]
self.metrics.storage_trie.clone(),
)
.calculate(retain_updates)?
if let Some(cache) = miss_leaves_cache.clone() {
if let Some(value) = cache.get(&hashed_address) {
let (root, updates) = value.value();
(*root, 0usize, updates.clone())
} else {
StorageRoot::new_hashed(
trie_cursor_factory.clone(),
hashed_cursor_factory.clone(),
hashed_address,
#[cfg(feature = "metrics")]
self.metrics.storage_trie.clone(),
)
.calculate(retain_updates)?
}
} else {
StorageRoot::new_hashed(
trie_cursor_factory.clone(),
hashed_cursor_factory.clone(),
hashed_address,
#[cfg(feature = "metrics")]
self.metrics.storage_trie.clone(),
)
.calculate(retain_updates)?
}
}
};

Expand Down
1 change: 1 addition & 0 deletions crates/trie/prefetch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ tracing.workspace = true
thiserror.workspace = true
derive_more.workspace = true
rayon.workspace = true
dashmap = "6.1.0"

# async
tokio = { workspace = true, default-features = false, features = ["sync", "rt", "macros"] }
Expand Down
Loading

0 comments on commit 17843fe

Please sign in to comment.