feat: support importing blocks without merkle calculation (#123)
* feat: support importing blocks without merkle calculation

* fix: op command

* fix: env arguments

* fix: lint

* fix: params passing

* fix: clippy

* fix: skip hashed account/storages in live sync

* chore: rename flag to skip_state_root_validation

* fix: refine args

* chore: fix lint

* chore: update description of flags

* fix: op command

* fix: debug cmd

* chore: update flag name

* fix: review comment

* chore: fix lint
j75689 authored Sep 10, 2024
1 parent 7b15d77 commit 0493eac
Showing 19 changed files with 173 additions and 56 deletions.
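The diff threads a new `PerformanceOptimizationArgs` group through the CLI, but the struct's definition (in `reth_node_core::args`) is outside this excerpt. Judging from how it is consumed (`performance_optimization.skip_state_root_validation`) and from the crate's existing clap conventions, it is presumably a flattened argument group along these lines; a sketch under those assumptions, not the committed definition:

```rust
use clap::Args;

/// Assumed shape of the new argument group; only the field name
/// `skip_state_root_validation` is confirmed by the diff below.
#[derive(Debug, Clone, Default, Args)]
pub struct PerformanceOptimizationArgs {
    /// Skip state root validation when importing or executing blocks,
    /// trading consistency checks for import speed.
    /// With clap's derive this would surface as `--skip-state-root-validation`
    /// (the exact flag spelling is an assumption).
    #[arg(long, default_value_t = false)]
    pub skip_state_root_validation: bool,
}
```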
1 change: 1 addition & 0 deletions bin/reth/src/commands/debug_cmd/execution.rs
@@ -97,6 +97,7 @@ impl Command {
executor.clone(),
stage_conf.clone(),
prune_modes.clone(),
+            self.env.performance_optimization.skip_state_root_validation,
)
.set(ExecutionStage::new(
executor,
117 changes: 71 additions & 46 deletions crates/blockchain-tree/src/blockchain_tree.rs
@@ -26,7 +26,10 @@ use reth_provider::{
use reth_prune_types::PruneModes;
use reth_stages_api::{MetricEvent, MetricEventsSender};
use reth_storage_errors::provider::{ProviderResult, RootMismatch};
-use reth_trie::{hashed_cursor::HashedPostStateCursorFactory, StateRoot};
+use reth_trie::{
+    hashed_cursor::HashedPostStateCursorFactory, updates::TrieUpdates, HashedPostStateSorted,
+    StateRoot,
+};
use std::{
collections::{btree_map::Entry, BTreeMap, HashSet},
sync::Arc,
@@ -74,6 +77,8 @@ pub struct BlockchainTree<DB, E> {
metrics: TreeMetrics,
/// Whether to enable prefetch when execute block
enable_prefetch: bool,
+    /// Disable state root calculation for blocks.
+    skip_state_root_validation: bool,
}

impl<DB, E> BlockchainTree<DB, E> {
@@ -146,6 +151,7 @@ where
sync_metrics_tx: None,
metrics: Default::default(),
enable_prefetch: false,
+            skip_state_root_validation: false,
})
}

@@ -176,6 +182,14 @@
self
}

+    /// Set the state root calculation to be disabled.
+    ///
+    /// This is helpful when the state root is taking too long to calculate.
+    pub const fn skip_state_root_validation(mut self) -> Self {
+        self.skip_state_root_validation = true;
+        self
+    }

/// Check if the block is known to blockchain tree or database and return its status.
///
/// Function will check:
@@ -394,7 +408,7 @@
fn try_append_canonical_chain(
&mut self,
block: SealedBlockWithSenders,
-        block_validation_kind: BlockValidationKind,
+        mut block_validation_kind: BlockValidationKind,
) -> Result<BlockStatus, InsertBlockErrorKind> {
let parent = block.parent_num_hash();
let block_num_hash = block.num_hash();
@@ -435,6 +449,10 @@
BlockAttachment::HistoricalFork
};

+        if self.skip_state_root_validation {
+            block_validation_kind = BlockValidationKind::SkipStateRootValidation;
+        }

let chain = AppendableChain::new_canonical_fork(
block,
&parent_header,
@@ -460,7 +478,7 @@
&mut self,
block: SealedBlockWithSenders,
chain_id: BlockchainId,
-        block_validation_kind: BlockValidationKind,
+        mut block_validation_kind: BlockValidationKind,
) -> Result<BlockStatus, InsertBlockErrorKind> {
let block_num_hash = block.num_hash();
debug!(target: "blockchain_tree", ?block_num_hash, ?chain_id, "Inserting block into side chain");
@@ -481,6 +499,10 @@
let chain_tip = parent_chain.tip().hash();
let canonical_chain = self.state.block_indices.canonical_chain();

+        if self.skip_state_root_validation {
+            block_validation_kind = BlockValidationKind::SkipStateRootValidation;
+        }

// append the block if it is continuing the side chain.
let block_attachment = if chain_tip == block.parent_hash {
// check if the chain extends the currently tracked canonical head
@@ -1228,50 +1250,53 @@
        recorder: &mut MakeCanonicalDurationsRecorder,
    ) -> Result<(), CanonicalError> {
        let (blocks, state, chain_trie_updates) = chain.into_inner();
-       let hashed_state = state.hash_state_slow();
-       let prefix_sets = hashed_state.construct_prefix_sets().freeze();
-       let hashed_state_sorted = hashed_state.into_sorted();
-
-       // Compute state root or retrieve cached trie updates before opening write transaction.
-       let block_hash_numbers =
-           blocks.iter().map(|(number, b)| (number, b.hash())).collect::<Vec<_>>();
-       let trie_updates = match chain_trie_updates {
-           Some(updates) => {
-               debug!(target: "blockchain_tree", blocks = ?block_hash_numbers, "Using cached trie updates");
-               self.metrics.trie_updates_insert_cached.increment(1);
-               updates
-           }
-           None => {
-               debug!(target: "blockchain_tree", blocks = ?block_hash_numbers, "Recomputing state root for insert");
-               let provider = self
-                   .externals
-                   .provider_factory
-                   .provider()?
-                   // State root calculation can take a while, and we're sure no write transaction
-                   // will be open in parallel. See https://github.com/paradigmxyz/reth/issues/6168.
-                   .disable_long_read_transaction_safety();
-               let (state_root, trie_updates) = StateRoot::from_tx(provider.tx_ref())
-                   .with_hashed_cursor_factory(HashedPostStateCursorFactory::new(
-                       provider.tx_ref(),
-                       &hashed_state_sorted,
-                   ))
-                   .with_prefix_sets(prefix_sets)
-                   .root_with_updates()
-                   .map_err(Into::<BlockValidationError>::into)?;
-               let tip = blocks.tip();
-               if state_root != tip.state_root {
-                   return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
-                       root: GotExpected { got: state_root, expected: tip.state_root },
-                       block_number: tip.number,
-                       block_hash: tip.hash(),
-                   }))
-                   .into())
-               }
-               self.metrics.trie_updates_insert_recomputed.increment(1);
-               trie_updates
-           }
-       };
-       recorder.record_relative(MakeCanonicalAction::RetrieveStateTrieUpdates);
+       let mut hashed_state_sorted = HashedPostStateSorted::default();
+       let mut trie_updates = TrieUpdates::default();
+       if !self.skip_state_root_validation {
+           let hashed_state = state.hash_state_slow();
+           let prefix_sets = hashed_state.construct_prefix_sets().freeze();
+           hashed_state_sorted = hashed_state.into_sorted();
+           // Compute state root or retrieve cached trie updates before opening write transaction.
+           let block_hash_numbers =
+               blocks.iter().map(|(number, b)| (number, b.hash())).collect::<Vec<_>>();
+           trie_updates = match chain_trie_updates {
+               Some(updates) => {
+                   debug!(target: "blockchain_tree", blocks = ?block_hash_numbers, "Using cached trie updates");
+                   self.metrics.trie_updates_insert_cached.increment(1);
+                   updates
+               }
+               None => {
+                   debug!(target: "blockchain_tree", blocks = ?block_hash_numbers, "Recomputing state root for insert");
+                   let provider = self
+                       .externals
+                       .provider_factory
+                       .provider()?
+                       // State root calculation can take a while, and we're sure no write
+                       // transaction will be open in parallel. See https://github.com/paradigmxyz/reth/issues/6168.
+                       .disable_long_read_transaction_safety();
+                   let (state_root, trie_updates) = StateRoot::from_tx(provider.tx_ref())
+                       .with_hashed_cursor_factory(HashedPostStateCursorFactory::new(
+                           provider.tx_ref(),
+                           &hashed_state_sorted,
+                       ))
+                       .with_prefix_sets(prefix_sets)
+                       .root_with_updates()
+                       .map_err(Into::<BlockValidationError>::into)?;
+                   let tip = blocks.tip();
+                   if state_root != tip.state_root {
+                       return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
+                           root: GotExpected { got: state_root, expected: tip.state_root },
+                           block_number: tip.number,
+                           block_hash: tip.hash(),
+                       }))
+                       .into())
+                   }
+                   self.metrics.trie_updates_insert_recomputed.increment(1);
+                   trie_updates
+               }
+           };
+           recorder.record_relative(MakeCanonicalAction::RetrieveStateTrieUpdates);
+       }

let provider_rw = self.externals.provider_factory.provider_rw()?;
provider_rw
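In sum, the tree-side change is an opt-in builder switch plus a guard around the entire hash-and-verify path in canonicalization. A minimal usage sketch, assuming `externals`, `tree_config`, `prune_modes`, and the flag value are in scope (this mirrors the wiring in `crates/node/builder/src/launch/common.rs` further down):

```rust
// Sketch only: the constructor arguments are assumed to be built as
// elsewhere in this codebase.
let mut tree = BlockchainTree::new(externals, tree_config, prune_modes)?;
if skip_state_root_validation {
    // Consuming builder from the hunk above: once set, canonicalization
    // commits blocks with an empty `TrieUpdates` and a default
    // `HashedPostStateSorted` instead of recomputing and checking the root.
    tree = tree.skip_state_root_validation();
}
let blockchain_tree = std::sync::Arc::new(ShareableBlockchainTree::new(tree));
```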
7 changes: 6 additions & 1 deletion crates/cli/commands/src/common.rs
@@ -11,7 +11,7 @@ use reth_evm::noop::NoopBlockExecutorProvider;
use reth_node_core::{
args::{
utils::{chain_help, chain_value_parser, SUPPORTED_CHAINS},
-        DatabaseArgs, DatadirArgs,
+        DatabaseArgs, DatadirArgs, PerformanceOptimizationArgs,
},
dirs::{ChainPath, DataDirPath},
};
@@ -49,6 +49,10 @@ pub struct EnvironmentArgs {
/// All database related arguments
#[command(flatten)]
pub db: DatabaseArgs,

+    /// All performance optimization related arguments
+    #[command(flatten)]
+    pub performance_optimization: PerformanceOptimizationArgs,
}

impl EnvironmentArgs {
@@ -145,6 +149,7 @@ impl EnvironmentArgs {
NoopBlockExecutorProvider::default(),
config.stages.clone(),
prune_modes.clone(),
+            self.performance_optimization.skip_state_root_validation,
))
.build(factory.clone(), StaticFileProducer::new(factory.clone(), prune_modes));

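Because the group is `#[command(flatten)]`-ed into `EnvironmentArgs`, every command that embeds these environment args inherits the new flag automatically. A hedged sketch of a consuming command (`MyCommand` and its `run` are illustrative names, not part of this commit):

```rust
use clap::Parser;

/// Illustrative only: `EnvironmentArgs` and the field path read in `run`
/// come from the commit; the command itself is hypothetical.
#[derive(Debug, Parser)]
struct MyCommand {
    #[command(flatten)]
    env: EnvironmentArgs,
}

impl MyCommand {
    fn run(&self) {
        // The same field path the hunks below pass into the stage builders.
        let skip = self.env.performance_optimization.skip_state_root_validation;
        println!("skip_state_root_validation = {skip}");
    }
}
```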
4 changes: 4 additions & 0 deletions crates/cli/commands/src/import.rs
@@ -104,6 +104,7 @@ impl ImportCommand {
StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
self.no_state,
executor.clone(),
+        self.env.performance_optimization.skip_state_root_validation,
)?;

// override the tip
@@ -160,6 +161,7 @@
///
/// If configured to execute, all stages will run. Otherwise, only stages that don't require state
/// will run.
+#[allow(clippy::too_many_arguments)]
pub fn build_import_pipeline<DB, C, E>(
config: &Config,
provider_factory: ProviderFactory<DB>,
@@ -168,6 +170,7 @@ pub fn build_import_pipeline<DB, C, E>(
static_file_producer: StaticFileProducer<DB>,
disable_exec: bool,
executor: E,
+    skip_state_root_validation: bool,
) -> eyre::Result<(Pipeline<DB>, impl Stream<Item = NodeEvent>)>
where
DB: Database + Clone + Unpin + 'static,
@@ -219,6 +222,7 @@ where
executor,
config.stages.clone(),
PruneModes::default(),
+        skip_state_root_validation,
)
.builder()
.disable_all_if(&StageId::STATE_REQUIRED, || disable_exec),
9 changes: 7 additions & 2 deletions crates/cli/commands/src/node.rs
@@ -10,7 +10,7 @@ use reth_node_core::{
args::{
utils::{chain_help, chain_value_parser, SUPPORTED_CHAINS},
DatabaseArgs, DatadirArgs, DebugArgs, DevArgs, NetworkArgs, PayloadBuilderArgs,
-        PruningArgs, RpcServerArgs, TxPoolArgs,
+        PerformanceOptimizationArgs, PruningArgs, RpcServerArgs, TxPoolArgs,
},
node_config::NodeConfig,
version,
@@ -110,6 +110,10 @@ pub struct NodeCommand<Ext: clap::Args + fmt::Debug = NoArgs> {
/// Enable prefetch when executing block
#[arg(long, default_value_t = false)]
pub enable_prefetch: bool,

+    /// All performance optimization related arguments
+    #[command(flatten)]
+    pub performance_optimization: PerformanceOptimizationArgs,
}

impl NodeCommand {
@@ -157,8 +161,8 @@ impl<Ext: clap::Args + fmt::Debug> NodeCommand<Ext> {
pruning,
ext,
enable_prefetch,
+            performance_optimization,
} = self;

// set up node config
let mut node_config = NodeConfig {
datadir,
@@ -175,6 +179,7 @@
dev,
pruning,
enable_prefetch,
+            skip_state_root_validation: performance_optimization.skip_state_root_validation,
};

// Register the prometheus recorder before creating the database,
12 changes: 9 additions & 3 deletions crates/cli/commands/src/stage/unwind.rs
@@ -125,9 +125,14 @@ impl Command {

let builder = if self.offline {
Pipeline::builder().add_stages(
-            OfflineStages::new(executor, config.stages, PruneModes::default())
-                .builder()
-                .disable(reth_stages::StageId::SenderRecovery),
+            OfflineStages::new(
+                executor,
+                config.stages,
+                PruneModes::default(),
+                self.env.performance_optimization.skip_state_root_validation,
+            )
+            .builder()
+            .disable(reth_stages::StageId::SenderRecovery),
)
} else {
Pipeline::builder().with_tip_sender(tip_tx).add_stages(
@@ -140,6 +145,7 @@
executor.clone(),
stage_conf.clone(),
prune_modes.clone(),
+            self.env.performance_optimization.skip_state_root_validation,
)
.set(ExecutionStage::new(
executor,
1 change: 1 addition & 0 deletions crates/consensus/beacon/src/engine/test_utils.rs
@@ -379,6 +379,7 @@ where
executor_factory.clone(),
StageConfig::default(),
PruneModes::default(),
+            false,
))
}
};
1 change: 1 addition & 0 deletions crates/ethereum/node/src/launch.rs
@@ -143,6 +143,7 @@ where
static_file_producer,
ctx.components().block_executor().clone(),
pipeline_exex_handle,
+        ctx.node_config().skip_state_root_validation,
)?;

let pipeline_events = pipeline.events();
6 changes: 6 additions & 0 deletions crates/node/builder/src/launch/common.rs
@@ -398,6 +398,7 @@ where
NoopBlockExecutorProvider::default(),
self.toml_config().stages.clone(),
self.prune_modes(),
+            self.node_config().skip_state_root_validation,
))
.build(
factory.clone(),
@@ -640,6 +641,7 @@ where
consensus.clone(),
components.block_executor().clone(),
);

let mut tree =
BlockchainTree::new(tree_externals, *self.tree_config(), self.prune_modes())?
.with_sync_metrics_tx(self.sync_metrics_tx())
@@ -653,6 +655,10 @@
tree = tree.enable_prefetch();
}

+        if self.node_config().skip_state_root_validation {
+            tree = tree.skip_state_root_validation();
+        }

let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree));

// Replace the tree component with the actual tree
2 changes: 2 additions & 0 deletions crates/node/builder/src/launch/mod.rs
@@ -224,6 +224,7 @@ where
static_file_producer,
ctx.components().block_executor().clone(),
pipeline_exex_handle,
+            ctx.node_config().skip_state_root_validation,
)?;

let pipeline_events = pipeline.events();
@@ -245,6 +246,7 @@
static_file_producer,
ctx.components().block_executor().clone(),
pipeline_exex_handle,
+            ctx.node_config().skip_state_root_validation,
)?;
#[cfg(feature = "bsc")]
{
[The remaining 9 of the 19 changed files are not rendered in this view.]