Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add missing leaves cache #167

Merged
merged 3 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/blockchain-tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ metrics.workspace = true
# misc
aquamarine.workspace = true
linked_hash_set.workspace = true
dashmap = "6.1.0"

[dev-dependencies]
reth-chainspec.workspace = true
Expand Down
26 changes: 20 additions & 6 deletions crates/blockchain-tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use super::externals::TreeExternals;
use crate::BundleStateDataRef;
use alloy_eips::ForkBlock;
use alloy_primitives::{map::HashMap, BlockHash, BlockNumber, B256, U256};
use dashmap::DashMap;
use reth_blockchain_tree_api::{
error::{BlockchainTreeError, InsertBlockErrorKind},
BlockAttachment, BlockValidationKind,
Expand All @@ -23,13 +24,17 @@ use reth_provider::{
FullExecutionDataProvider, ProviderError, StateRootProvider, TryIntoHistoricalStateProvider,
};
use reth_revm::database::StateProviderDatabase;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie::{
updates::{StorageTrieUpdates, TrieUpdates},
HashedPostState, TrieInput,
};
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use reth_trie_prefetch::TriePrefetch;
use std::{
clone::Clone,
collections::BTreeMap,
ops::{Deref, DerefMut},
sync::Arc,
time::Instant,
};

Expand Down Expand Up @@ -213,8 +218,11 @@ impl AppendableChain {

let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider);

let (prefetch_tx, interrupt_tx) =
if enable_prefetch { Self::setup_prefetch(externals) } else { (None, None) };
let (prefetch_tx, interrupt_tx, missing_leaves_cache) = if enable_prefetch {
Self::setup_prefetch(externals)
} else {
(None, None, Default::default())
};

let db = StateProviderDatabase::new(&provider);
let executor = externals.executor_factory.executor(db, prefetch_tx);
Expand Down Expand Up @@ -243,7 +251,7 @@ impl AppendableChain {
consistent_view,
TrieInput::from_state(execution_outcome.hash_state_slow()),
)
.incremental_root_with_updates()
.incremental_root_with_updates_and_cache(missing_leaves_cache)
.map(|(root, updates)| (root, Some(updates)))
.map_err(ProviderError::from)?
} else {
Expand Down Expand Up @@ -343,11 +351,13 @@ impl AppendableChain {
Ok(())
}

#[allow(clippy::type_complexity)]
fn setup_prefetch<N, E>(
externals: &TreeExternals<N, E>,
) -> (
Option<tokio::sync::mpsc::UnboundedSender<EvmState>>,
Option<tokio::sync::oneshot::Sender<()>>,
Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
)
where
N: ProviderNodeTypes,
Expand All @@ -358,13 +368,17 @@ impl AppendableChain {

let mut trie_prefetch = TriePrefetch::new();
let provider_factory = externals.provider_factory.clone();
let missing_leaves_cache = Arc::new(DashMap::new());
let missing_leaves_cache_clone = Arc::clone(&missing_leaves_cache);

tokio::spawn({
async move {
trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await;
trie_prefetch
.run(provider_factory, prefetch_rx, interrupt_rx, missing_leaves_cache_clone)
.await;
}
});

(Some(prefetch_tx), Some(interrupt_tx))
(Some(prefetch_tx), Some(interrupt_tx), missing_leaves_cache)
}
}
1 change: 1 addition & 0 deletions crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ reth-metrics = { workspace = true, features = ["common"] }

# misc
tracing.workspace = true
dashmap = "6.1.0"

# optional deps for test-utils
reth-prune-types = { workspace = true, optional = true }
Expand Down
39 changes: 29 additions & 10 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use alloy_rpc_types_engine::{
CancunPayloadFields, ExecutionPayload, ForkchoiceState, PayloadStatus, PayloadStatusEnum,
PayloadValidationError,
};
use dashmap::DashMap;
use reth_beacon_consensus::{
BeaconConsensusEngineEvent, BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache,
OnForkChoiceUpdated, MIN_BLOCKS_FOR_PIPELINE_RUN,
Expand Down Expand Up @@ -44,7 +45,10 @@ use reth_provider::{
};
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie::{
updates::{StorageTrieUpdates, TrieUpdates},
HashedPostState, TrieInput,
};
use reth_trie_parallel::parallel_root::{ParallelStateRoot, ParallelStateRootError};
use reth_trie_prefetch::TriePrefetch;
use std::{
Expand Down Expand Up @@ -2213,11 +2217,11 @@ where
}

trace!(target: "engine::tree", block=?block.num_hash(), "Executing block");
let (prefetch_tx, interrupt_tx) =
let (prefetch_tx, interrupt_tx, missing_leaves_cache) =
if self.enable_prefetch && !self.skip_state_root_validation {
self.setup_prefetch()
} else {
(None, None)
(None, None, Default::default())
};

let executor = self
Expand Down Expand Up @@ -2274,9 +2278,11 @@ where
// different view of the database.
let persistence_in_progress = self.persistence_state.in_progress();
if !persistence_in_progress {
state_root_result = match self
.compute_state_root_parallel(block.parent_hash, &hashed_state)
{
state_root_result = match self.compute_state_root_parallel(
block.parent_hash,
&hashed_state,
missing_leaves_cache,
) {
Ok((state_root, trie_output)) => Some((state_root, trie_output)),
Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
Expand Down Expand Up @@ -2361,6 +2367,7 @@ where
&self,
parent_hash: B256,
hashed_state: &HashedPostState,
missing_leaves_cache: Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
let mut input = TrieInput::default();
Expand All @@ -2383,7 +2390,8 @@ where
// Extend with block we are validating root for.
input.append_ref(hashed_state);

ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
ParallelStateRoot::new(consistent_view, input)
.incremental_root_with_updates_and_cache(missing_leaves_cache)
}

/// Handles an error that occurred while inserting a block.
Expand Down Expand Up @@ -2627,20 +2635,31 @@ where
Ok(())
}

fn setup_prefetch(&self) -> (Option<UnboundedSender<EvmState>>, Option<oneshot::Sender<()>>) {
#[allow(clippy::type_complexity)]
fn setup_prefetch(
&self,
) -> (
Option<UnboundedSender<EvmState>>,
Option<oneshot::Sender<()>>,
Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
) {
let (prefetch_tx, prefetch_rx) = tokio::sync::mpsc::unbounded_channel();
let (interrupt_tx, interrupt_rx) = oneshot::channel();

let mut trie_prefetch = TriePrefetch::new();
let provider_factory = self.provider.clone();
let missing_leaves_cache = Arc::new(DashMap::new());
let missing_leaves_cache_clone = Arc::clone(&missing_leaves_cache);

tokio::spawn({
async move {
trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await;
trie_prefetch
.run(provider_factory, prefetch_rx, interrupt_rx, missing_leaves_cache_clone)
.await;
}
});

(Some(prefetch_tx), Some(interrupt_tx))
(Some(prefetch_tx), Some(interrupt_tx), missing_leaves_cache)
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/trie/parallel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tracing.workspace = true
# misc
thiserror.workspace = true
derive_more.workspace = true
dashmap = "6.1.0"
rayon.workspace = true
itertools.workspace = true

Expand Down
37 changes: 26 additions & 11 deletions crates/trie/parallel/src/parallel_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::metrics::ParallelStateRootMetrics;
use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets};
use alloy_primitives::B256;
use alloy_rlp::{BufMut, Encodable};
use dashmap::DashMap;
use itertools::Itertools;
use reth_execution_errors::StorageRootError;
use reth_provider::{
Expand All @@ -12,7 +13,7 @@ use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
node_iter::{TrieElement, TrieNodeIter},
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
updates::TrieUpdates,
updates::{StorageTrieUpdates, TrieUpdates},
walker::TrieWalker,
HashBuilder, Nibbles, StorageRoot, TrieAccount, TrieInput,
};
Expand Down Expand Up @@ -61,19 +62,28 @@ where
{
/// Calculate incremental state root in parallel.
pub fn incremental_root(self) -> Result<B256, ParallelStateRootError> {
self.calculate(false).map(|(root, _)| root)
self.calculate(false, None).map(|(root, _)| root)
}

/// Calculate incremental state root with updates in parallel.
pub fn incremental_root_with_updates(
self,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
self.calculate(true)
self.calculate(true, None)
}

/// Calculate incremental state root with missing leaves cache.
pub fn incremental_root_with_updates_and_cache(
self,
miss_leaves_cache: Arc<DashMap<B256, (B256, StorageTrieUpdates)>>,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
self.calculate(true, Some(miss_leaves_cache))
}

fn calculate(
self,
retain_updates: bool,
miss_leaves_cache: Option<Arc<DashMap<B256, (B256, StorageTrieUpdates)>>>,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let mut tracker = ParallelTrieTracker::default();
let trie_nodes_sorted = Arc::new(self.input.nodes.into_sorted());
Expand Down Expand Up @@ -171,14 +181,19 @@ where
// be a possibility of re-adding a non-modified leaf to the hash builder.
None => {
tracker.inc_missed_leaves();
StorageRoot::new_hashed(
trie_cursor_factory.clone(),
hashed_cursor_factory.clone(),
hashed_address,
#[cfg(feature = "metrics")]
self.metrics.storage_trie.clone(),
)
.calculate(retain_updates)?
match miss_leaves_cache.clone().and_then(|cache| {
cache.get(&hashed_address).map(|value| value.clone())
}) {
Some((root, updates)) => (root, 0usize, updates),
None => StorageRoot::new_hashed(
trie_cursor_factory.clone(),
hashed_cursor_factory.clone(),
hashed_address,
#[cfg(feature = "metrics")]
self.metrics.storage_trie.clone(),
)
.calculate(retain_updates)?,
}
}
};

Expand Down
1 change: 1 addition & 0 deletions crates/trie/prefetch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tracing.workspace = true
thiserror.workspace = true
derive_more.workspace = true
rayon.workspace = true
dashmap = "6.1.0"

# async
tokio = { workspace = true, default-features = false, features = ["sync", "rt", "macros"] }
Expand Down
Loading