diff --git a/Cargo.lock b/Cargo.lock index cb8df23d6..23eb2603b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7155,6 +7155,7 @@ dependencies = [ "alloy-primitives", "aquamarine", "assert_matches", + "dashmap 6.1.0", "linked_hash_set", "metrics", "parking_lot 0.12.3", @@ -7879,6 +7880,7 @@ version = "1.0.5" dependencies = [ "alloy-rlp", "assert_matches", + "dashmap 6.1.0", "futures", "metrics", "rand 0.8.5", @@ -9925,6 +9927,7 @@ dependencies = [ "alloy-primitives", "alloy-rlp", "criterion", + "dashmap 6.1.0", "derive_more 1.0.0", "itertools 0.13.0", "metrics", @@ -9952,6 +9955,7 @@ version = "1.0.5" dependencies = [ "alloy-rlp", "criterion", + "dashmap 6.1.0", "derive_more 1.0.0", "metrics", "proptest", diff --git a/crates/blockchain-tree/Cargo.toml b/crates/blockchain-tree/Cargo.toml index e9dd537e7..fd9fbfeb7 100644 --- a/crates/blockchain-tree/Cargo.toml +++ b/crates/blockchain-tree/Cargo.toml @@ -46,6 +46,7 @@ metrics.workspace = true # misc aquamarine.workspace = true linked_hash_set.workspace = true +dashmap = "6.1.0" [dev-dependencies] reth-chainspec.workspace = true diff --git a/crates/blockchain-tree/src/chain.rs b/crates/blockchain-tree/src/chain.rs index 97ee4c083..ca16c1301 100644 --- a/crates/blockchain-tree/src/chain.rs +++ b/crates/blockchain-tree/src/chain.rs @@ -6,6 +6,7 @@ use super::externals::TreeExternals; use crate::BundleStateDataRef; use alloy_primitives::{BlockHash, BlockNumber, U256}; +use dashmap::DashMap; use reth_blockchain_tree_api::{ error::{BlockchainTreeError, InsertBlockErrorKind}, BlockAttachment, BlockValidationKind, @@ -23,13 +24,17 @@ use reth_provider::{ FullExecutionDataProvider, ProviderError, StateRootProvider, TryIntoHistoricalStateProvider, }; use reth_revm::database::StateProviderDatabase; -use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; +use reth_trie::{ + updates::{StorageTrieUpdates, TrieUpdates}, + HashedPostState, TrieInput, +}; use reth_trie_parallel::parallel_root::ParallelStateRoot; use reth_trie_prefetch::TriePrefetch; use std::{ clone::Clone, collections::{BTreeMap, HashMap}, ops::{Deref, DerefMut}, + sync::Arc, time::Instant, }; @@ -213,8 +218,11 @@ impl AppendableChain { let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider); - let (prefetch_tx, interrupt_tx) = - if enable_prefetch { Self::setup_prefetch(externals) } else { (None, None) }; + let (prefetch_tx, interrupt_tx, missing_leaves_cache) = if enable_prefetch { + Self::setup_prefetch(externals) + } else { + (None, None, Default::default()) + }; let db = StateProviderDatabase::new(&provider); let executor = externals.executor_factory.executor(db, prefetch_tx); @@ -243,7 +251,7 @@ impl AppendableChain { consistent_view, TrieInput::from_state(execution_outcome.hash_state_slow()), ) - .incremental_root_with_updates() + .incremental_root_with_updates_and_cache(missing_leaves_cache) .map(|(root, updates)| (root, Some(updates))) .map_err(ProviderError::from)? } else { @@ -348,6 +356,7 @@ impl AppendableChain { ) -> ( Option>, Option>, + Arc>, ) where N: ProviderNodeTypes, @@ -358,13 +367,17 @@ impl AppendableChain { let mut trie_prefetch = TriePrefetch::new(); let provider_factory = externals.provider_factory.clone(); + let missing_leaves_cache = Arc::new(DashMap::new()); + let missing_leaves_cache_clone = Arc::clone(&missing_leaves_cache); tokio::spawn({ async move { - trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await; + trie_prefetch + .run(provider_factory, prefetch_rx, interrupt_rx, missing_leaves_cache_clone) + .await; } }); - (Some(prefetch_tx), Some(interrupt_tx)) + (Some(prefetch_tx), Some(interrupt_tx), missing_leaves_cache) } } diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index b938969f7..de36542a7 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -47,6 +47,7 @@ reth-metrics = { workspace = true, features = ["common"] } # misc tracing.workspace = true +dashmap = "6.1.0" # optional deps for test-utils reth-prune-types = { workspace = true, optional = true } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 3c57a74a4..415eacd56 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -4,6 +4,7 @@ use crate::{ engine::{DownloadRequest, EngineApiEvent, FromEngine}, persistence::PersistenceHandle, }; +use dashmap::DashMap; use reth_beacon_consensus::{ BeaconConsensusEngineEvent, BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated, MIN_BLOCKS_FOR_PIPELINE_RUN, @@ -40,7 +41,10 @@ use reth_rpc_types::{ ExecutionPayload, }; use reth_stages_api::ControlFlow; -use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; +use reth_trie::{ + updates::{StorageTrieUpdates, TrieUpdates}, + HashedPostState, TrieInput, +}; use reth_trie_parallel::parallel_root::ParallelStateRoot; use reth_trie_prefetch::TriePrefetch; use std::{ @@ -2163,11 +2167,11 @@ where return Err(e.into()) } - let (prefetch_tx, interrupt_tx) = + let (prefetch_tx, interrupt_tx, missing_leaves_cache) = if self.enable_prefetch && !self.skip_state_root_validation { self.setup_prefetch() } else { - (None, None) + (None, None, Default::default()) }; let executor = self @@ -2226,7 +2230,7 @@ where let persistence_in_progress = self.persistence_state.in_progress(); if !persistence_in_progress { state_root_result = match self - .compute_state_root_in_parallel(block.parent_hash, &hashed_state) + .compute_state_root_in_parallel(block.parent_hash, &hashed_state, missing_leaves_cache) { Ok((state_root, trie_output)) => Some((state_root, trie_output)), Err(ProviderError::ConsistentView(error)) => { @@ -2313,6 +2317,7 @@ where &self, parent_hash: B256, hashed_state: &HashedPostState, + missing_leaves_cache: Arc>, ) -> ProviderResult<(B256, TrieUpdates)> { let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?; let mut input = TrieInput::default(); @@ -2331,7 +2336,7 @@ where // Extend with block we are validating root for. input.append_ref(hashed_state); - Ok(ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()?) + Ok(ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates_and_cache(missing_leaves_cache)?) } /// Handles an error that occurred while inserting a block. @@ -2563,20 +2568,30 @@ where Ok(()) } - fn setup_prefetch(&self) -> (Option>, Option>) { + fn setup_prefetch( + &self, + ) -> ( + Option>, + Option>, + Arc>, + ) { let (prefetch_tx, prefetch_rx) = tokio::sync::mpsc::unbounded_channel(); let (interrupt_tx, interrupt_rx) = oneshot::channel(); let mut trie_prefetch = TriePrefetch::new(); let provider_factory = self.provider.clone(); + let missing_leaves_cache = Arc::new(DashMap::new()); + let missing_leaves_cache_clone = Arc::clone(&missing_leaves_cache); tokio::spawn({ async move { - trie_prefetch.run(provider_factory, prefetch_rx, interrupt_rx).await; + trie_prefetch + .run(provider_factory, prefetch_rx, interrupt_rx, missing_leaves_cache_clone) + .await; } }); - (Some(prefetch_tx), Some(interrupt_tx)) + (Some(prefetch_tx), Some(interrupt_tx), missing_leaves_cache) } } diff --git a/crates/trie/parallel/Cargo.toml b/crates/trie/parallel/Cargo.toml index e53d15c14..0e8cbb92e 100644 --- a/crates/trie/parallel/Cargo.toml +++ b/crates/trie/parallel/Cargo.toml @@ -31,6 +31,7 @@ tracing.workspace = true # misc thiserror.workspace = true derive_more.workspace = true +dashmap = "6.1.0" # `async` feature reth-tasks = { workspace = true, optional = true } diff --git a/crates/trie/parallel/src/parallel_root.rs b/crates/trie/parallel/src/parallel_root.rs index 369eaca2b..4fd2edec7 100644 --- a/crates/trie/parallel/src/parallel_root.rs +++ b/crates/trie/parallel/src/parallel_root.rs @@ -2,6 +2,7 @@ use crate::metrics::ParallelStateRootMetrics; use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets}; use alloy_rlp::{BufMut, Encodable}; +use dashmap::DashMap; use rayon::prelude::*; use reth_execution_errors::StorageRootError; use reth_primitives::B256; @@ -12,12 +13,12 @@ use reth_trie::{ hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory}, node_iter::{TrieElement, TrieNodeIter}, trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory}, - updates::TrieUpdates, + updates::{StorageTrieUpdates, TrieUpdates}, walker::TrieWalker, HashBuilder, Nibbles, StorageRoot, TrieAccount, TrieInput, }; use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use thiserror::Error; use tracing::*; @@ -62,19 +63,28 @@ where { /// Calculate incremental state root in parallel. pub fn incremental_root(self) -> Result { - self.calculate(false).map(|(root, _)| root) + self.calculate(false, None).map(|(root, _)| root) } /// Calculate incremental state root with updates in parallel. pub fn incremental_root_with_updates( self, ) -> Result<(B256, TrieUpdates), ParallelStateRootError> { - self.calculate(true) + self.calculate(true, None) + } + + /// Calculate incremental state root with missing leaves cache. + pub fn incremental_root_with_updates_and_cache( + self, + miss_leaves_cache: Arc>, + ) -> Result<(B256, TrieUpdates), ParallelStateRootError> { + self.calculate(true, Some(miss_leaves_cache)) } fn calculate( self, retain_updates: bool, + miss_leaves_cache: Option>>, ) -> Result<(B256, TrieUpdates), ParallelStateRootError> { let mut tracker = ParallelTrieTracker::default(); let trie_nodes_sorted = self.input.nodes.into_sorted(); @@ -153,14 +163,30 @@ where // be a possibility of re-adding a non-modified leaf to the hash builder. None => { tracker.inc_missed_leaves(); - StorageRoot::new_hashed( - trie_cursor_factory.clone(), - hashed_cursor_factory.clone(), - hashed_address, - #[cfg(feature = "metrics")] - self.metrics.storage_trie.clone(), - ) - .calculate(retain_updates)? + if let Some(cache) = miss_leaves_cache.clone() { + if let Some(value) = cache.get(&hashed_address) { + let (root, updates) = value.value(); + (*root, 0usize, updates.clone()) + } else { + StorageRoot::new_hashed( + trie_cursor_factory.clone(), + hashed_cursor_factory.clone(), + hashed_address, + #[cfg(feature = "metrics")] + self.metrics.storage_trie.clone(), + ) + .calculate(retain_updates)? + } + } else { + StorageRoot::new_hashed( + trie_cursor_factory.clone(), + hashed_cursor_factory.clone(), + hashed_address, + #[cfg(feature = "metrics")] + self.metrics.storage_trie.clone(), + ) + .calculate(retain_updates)? + } } }; diff --git a/crates/trie/prefetch/Cargo.toml b/crates/trie/prefetch/Cargo.toml index 20ed081a6..575327ef0 100644 --- a/crates/trie/prefetch/Cargo.toml +++ b/crates/trie/prefetch/Cargo.toml @@ -32,6 +32,7 @@ tracing.workspace = true thiserror.workspace = true derive_more.workspace = true rayon.workspace = true +dashmap = "6.1.0" # async tokio = { workspace = true, default-features = false, features = ["sync", "rt", "macros"] } diff --git a/crates/trie/prefetch/src/prefetch.rs b/crates/trie/prefetch/src/prefetch.rs index b27da82d4..897c57707 100644 --- a/crates/trie/prefetch/src/prefetch.rs +++ b/crates/trie/prefetch/src/prefetch.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, sync::Arc}; +use dashmap::DashMap; use rayon::prelude::*; use reth_execution_errors::StorageRootError; use reth_primitives::{revm_primitives::EvmState, B256}; @@ -12,6 +13,7 @@ use reth_trie::{ node_iter::{TrieElement, TrieNodeIter}, stats::TrieTracker, trie_cursor::TrieCursorFactory, + updates::StorageTrieUpdates, walker::TrieWalker, HashedPostState, HashedStorage, StorageRoot, }; @@ -61,6 +63,7 @@ impl TriePrefetch { provider_factory: Factory, mut prefetch_rx: UnboundedReceiver, mut interrupt_rx: Receiver<()>, + missing_leaves_cache: Arc>, ) where Factory: DatabaseProviderFactory + Clone + 'static, { @@ -77,8 +80,9 @@ impl TriePrefetch { let consistent_view = ConsistentDbView::new_with_latest_tip(provider_factory.clone()).unwrap(); let hashed_state_clone = hashed_state.clone(); let arc_tracker_clone = Arc::clone(&arc_tracker); + let missing_leaves_cache = Arc::clone(&missing_leaves_cache); join_set.spawn(async move { - if let Err(e) = self_clone.prefetch_accounts::(consistent_view, hashed_state_clone, arc_tracker_clone).await { + if let Err(e) = self_clone.prefetch_accounts::(consistent_view, hashed_state_clone, arc_tracker_clone, missing_leaves_cache).await { debug!(target: "trie::trie_prefetch", ?e, "Error while prefetching account trie storage"); }; }); @@ -166,6 +170,7 @@ impl TriePrefetch { consistent_view: ConsistentDbView, hashed_state: HashedPostState, arc_prefetch_tracker: Arc>, + missing_leaves_cache: Arc>, ) -> Result<(), TriePrefetchError> where Factory: DatabaseProviderFactory, @@ -212,19 +217,23 @@ impl TriePrefetch { TrieElement::Leaf(hashed_address, _) => { tracker.inc_leaf(); match storage_roots.remove(&hashed_address) { - Some(result) => result, + Some(_) => (), // Since we do not store all intermediate nodes in the database, there might // be a possibility of re-adding a non-modified leaf to the hash builder. - None => StorageRoot::new_hashed( - trie_cursor_factory.clone(), - hashed_cursor_factory.clone(), - hashed_address, - #[cfg(feature = "metrics")] - self.metrics.clone(), - ) - .prefetch() - .ok() - .unwrap_or_default(), + None => { + if let Ok((storage_root, _, updates)) = StorageRoot::new_hashed( + trie_cursor_factory.clone(), + hashed_cursor_factory.clone(), + hashed_address, + #[cfg(feature = "metrics")] + self.metrics.clone(), + ) + .calculate(true) + { + missing_leaves_cache + .insert(hashed_address, (storage_root, updates)); + } + } }; } }