Skip to content

Commit

Permalink
perf(engine): migrate to AsyncStateRoot (#10927)
Browse files Browse the repository at this point in the history
Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
  • Loading branch information
fgimenez and rkrasiuk authored Sep 25, 2024
1 parent 2022dd3 commit 1e0a35e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 26 deletions.
4 changes: 4 additions & 0 deletions crates/blockchain-tree-api/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@ pub enum InsertBlockErrorKindTwo {
/// Provider error.
#[error(transparent)]
Provider(#[from] ProviderError),
/// Other errors.
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
}

impl InsertBlockErrorKindTwo {
Expand Down Expand Up @@ -365,6 +368,7 @@ impl InsertBlockErrorKindTwo {
}
}
Self::Provider(err) => Err(InsertBlockFatalError::Provider(err)),
Self::Other(err) => Err(InternalBlockExecutionError::Other(err).into()),
}
}
}
Expand Down
20 changes: 11 additions & 9 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use reth_rpc_types::{
};
use reth_stages_api::ControlFlow;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use reth_trie_parallel::async_root::{AsyncStateRoot, AsyncStateRootError};
use std::{
cmp::Ordering,
collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque},
Expand Down Expand Up @@ -549,6 +549,7 @@ where
config: TreeConfig,
) -> Self {
let (incoming_tx, incoming) = std::sync::mpsc::channel();

Self {
provider,
executor_provider,
Expand Down Expand Up @@ -2193,14 +2194,14 @@ where
let persistence_in_progress = self.persistence_state.in_progress();
if !persistence_in_progress {
state_root_result = match self
.compute_state_root_in_parallel(block.parent_hash, &hashed_state)
.compute_state_root_async(block.parent_hash, &hashed_state)
{
Ok((state_root, trie_output)) => Some((state_root, trie_output)),
Err(ProviderError::ConsistentView(error)) => {
debug!(target: "engine::tree", %error, "Parallel state root computation failed consistency check, falling back");
Err(AsyncStateRootError::Provider(ProviderError::ConsistentView(error))) => {
debug!(target: "engine", %error, "Async state root computation failed consistency check, falling back");
None
}
Err(error) => return Err(error.into()),
Err(error) => return Err(InsertBlockErrorKindTwo::Other(Box::new(error))),
};
}

Expand Down Expand Up @@ -2263,19 +2264,20 @@ where
Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid))
}

/// Compute state root for the given hashed post state in parallel.
/// Compute state root for the given hashed post state asynchronously.
///
/// # Returns
///
/// Returns `Ok(_)` if computed successfully.
/// Returns `Err(_)` if error was encountered during computation.
/// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
/// should be used instead.
fn compute_state_root_in_parallel(
fn compute_state_root_async(
&self,
parent_hash: B256,
hashed_state: &HashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
let mut input = TrieInput::default();

Expand All @@ -2297,7 +2299,7 @@ where
// Extend with block we are validating root for.
input.append_ref(hashed_state);

Ok(ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()?)
AsyncStateRoot::new(consistent_view, input).incremental_root_with_updates()
}

/// Handles an error that occurred while inserting a block.
Expand Down
2 changes: 1 addition & 1 deletion crates/trie/parallel/benches/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub fn calculate_state_root(c: &mut Criterion) {

// async root
group.bench_function(BenchmarkId::new("async root", size), |b| {
b.to_async(&runtime).iter_with_setup(
b.iter_with_setup(
|| AsyncStateRoot::new(view.clone(), TrieInput::from_state(updated_state.clone())),
|calculator| calculator.incremental_root(),
);
Expand Down
27 changes: 11 additions & 16 deletions crates/trie/parallel/src/async_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use reth_trie::{
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use std::{collections::HashMap, sync::Arc};
use thiserror::Error;
use tokio::sync::oneshot;
use tracing::*;

/// Async state root calculator.
Expand Down Expand Up @@ -63,21 +62,16 @@ where
Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + Send + Sync + 'static,
{
/// Calculate incremental state root asynchronously.
pub async fn incremental_root(self) -> Result<B256, AsyncStateRootError> {
self.calculate(false).await.map(|(root, _)| root)
pub fn incremental_root(self) -> Result<B256, AsyncStateRootError> {
self.calculate(false).map(|(root, _)| root)
}

/// Calculate incremental state root with updates asynchronously.
pub async fn incremental_root_with_updates(
self,
) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
self.calculate(true).await
pub fn incremental_root_with_updates(self) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
self.calculate(true)
}

async fn calculate(
self,
retain_updates: bool,
) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
fn calculate(self, retain_updates: bool) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
let mut tracker = ParallelTrieTracker::default();
let trie_nodes_sorted = Arc::new(self.input.nodes.into_sorted());
let hashed_state_sorted = Arc::new(self.input.state.into_sorted());
Expand All @@ -100,7 +94,7 @@ where
#[cfg(feature = "metrics")]
let metrics = self.metrics.storage_trie.clone();

let (tx, rx) = oneshot::channel();
let (tx, rx) = std::sync::mpsc::sync_channel(1);

rayon::spawn_fifo(move || {
let result = (|| -> Result<_, AsyncStateRootError> {
Expand Down Expand Up @@ -160,7 +154,7 @@ where
}
TrieElement::Leaf(hashed_address, account) => {
let (storage_root, _, updates) = match storage_roots.remove(&hashed_address) {
Some(rx) => rx.await.map_err(|_| {
Some(rx) => rx.recv().map_err(|_| {
AsyncStateRootError::StorageRootChannelClosed { hashed_address }
})??,
// Since we do not store all intermediate nodes in the database, there might
Expand Down Expand Up @@ -227,6 +221,9 @@ pub enum AsyncStateRootError {
/// The hashed address for which channel was closed.
hashed_address: B256,
},
/// Receive error
#[error(transparent)]
Receive(#[from] std::sync::mpsc::RecvError),
/// Error while calculating storage root.
#[error(transparent)]
StorageRoot(#[from] StorageRootError),
Expand Down Expand Up @@ -292,7 +289,6 @@ mod tests {
assert_eq!(
AsyncStateRoot::new(consistent_view.clone(), Default::default(),)
.incremental_root()
.await
.unwrap(),
test_utils::state_root(state.clone())
);
Expand Down Expand Up @@ -323,9 +319,8 @@ mod tests {
}

assert_eq!(
AsyncStateRoot::new(consistent_view.clone(), TrieInput::from_state(hashed_state))
AsyncStateRoot::new(consistent_view, TrieInput::from_state(hashed_state))
.incremental_root()
.await
.unwrap(),
test_utils::state_root(state)
);
Expand Down

0 comments on commit 1e0a35e

Please sign in to comment.