From 678a5332dacc2556e16991ab40956fc1bfdc7e0b Mon Sep 17 00:00:00 2001 From: forcodedancing Date: Fri, 18 Oct 2024 20:38:04 +0800 Subject: [PATCH] stage execution cache flags --- Cargo.lock | 2 + bin/reth/src/commands/debug_cmd/execution.rs | 2 + crates/cli/commands/src/common.rs | 1 + crates/cli/commands/src/import.rs | 3 ++ crates/cli/commands/src/node.rs | 1 + crates/cli/commands/src/stage/dump/merkle.rs | 1 + crates/cli/commands/src/stage/run.rs | 1 + crates/cli/commands/src/stage/unwind.rs | 3 ++ crates/engine/service/src/service.rs | 11 ++++- crates/engine/tree/src/persistence.rs | 47 ++++++++++++++----- crates/engine/tree/src/tree/mod.rs | 19 +++++++- crates/node/builder/src/launch/common.rs | 1 + crates/node/builder/src/launch/engine.rs | 3 ++ crates/node/builder/src/launch/mod.rs | 2 + crates/node/builder/src/setup.rs | 5 ++ crates/node/core/src/node_config.rs | 4 ++ crates/stages/stages/Cargo.toml | 3 ++ crates/stages/stages/src/sets.rs | 32 +++++++++++-- crates/stages/stages/src/stages/execution.rs | 37 +++++++++++++-- crates/stages/stages/src/stages/mod.rs | 1 + .../src/providers/blockchain_provider.rs | 14 +++--- .../state/cache/cached_provider_ref.rs | 27 ++++++++++- 22 files changed, 186 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b23b5c058..71129b1a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9607,7 +9607,9 @@ dependencies = [ "criterion", "futures-util", "itertools 0.13.0", + "lazy_static", "num-traits", + "parking_lot 0.12.3", "paste", "pprof", "rand 0.8.5", diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index 0dbd55fc2..f199f22b7 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -105,6 +105,7 @@ impl> Command { stage_conf.clone(), prune_modes.clone(), self.env.performance_optimization.skip_state_root_validation, + self.env.performance_optimization.enable_execution_cache, ) .set(ExecutionStage::new( executor, @@ -117,6 +118,7 @@ impl> Command { stage_conf.execution_external_clean_threshold(), prune_modes, ExExManagerHandle::empty(), + self.env.performance_optimization.enable_execution_cache, )), ) .build(provider_factory, static_file_producer); diff --git a/crates/cli/commands/src/common.rs b/crates/cli/commands/src/common.rs index b720ce95f..8dd2e8fe2 100644 --- a/crates/cli/commands/src/common.rs +++ b/crates/cli/commands/src/common.rs @@ -155,6 +155,7 @@ impl> EnvironmentArgs { config.stages.clone(), prune_modes.clone(), self.performance_optimization.skip_state_root_validation, + self.performance_optimization.enable_execution_cache, )) .build(factory.clone(), StaticFileProducer::new(factory.clone(), prune_modes)); diff --git a/crates/cli/commands/src/import.rs b/crates/cli/commands/src/import.rs index ddad4aac1..ee5dcb5fa 100644 --- a/crates/cli/commands/src/import.rs +++ b/crates/cli/commands/src/import.rs @@ -108,6 +108,7 @@ impl> ImportCommand { self.no_state, executor.clone(), self.env.performance_optimization.skip_state_root_validation, + self.env.performance_optimization.enable_execution_cache, )?; // override the tip @@ -169,6 +170,7 @@ pub fn build_import_pipeline( disable_exec: bool, executor: E, skip_state_root_validation: bool, + enable_execution_cache: bool, ) -> eyre::Result<(Pipeline, impl Stream)> where N: NodeTypesWithDB, @@ -221,6 +223,7 @@ where config.stages.clone(), PruneModes::default(), skip_state_root_validation, + enable_execution_cache, ) .builder() .disable_all_if(&StageId::STATE_REQUIRED, || disable_exec), diff --git a/crates/cli/commands/src/node.rs b/crates/cli/commands/src/node.rs index 66a67bdfc..4fd375e7b 100644 --- a/crates/cli/commands/src/node.rs +++ b/crates/cli/commands/src/node.rs @@ -184,6 +184,7 @@ impl, Ext: clap::Args + fmt::Debug> No pruning, enable_prefetch, skip_state_root_validation: performance_optimization.skip_state_root_validation, + enable_execution_cache: performance_optimization.enable_execution_cache, }; // Register the prometheus recorder before creating the database, diff --git a/crates/cli/commands/src/stage/dump/merkle.rs b/crates/cli/commands/src/stage/dump/merkle.rs index bcd05ca94..92f0fe986 100644 --- a/crates/cli/commands/src/stage/dump/merkle.rs +++ b/crates/cli/commands/src/stage/dump/merkle.rs @@ -102,6 +102,7 @@ fn unwind_and_copy>( MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD, PruneModes::all(), ExExManagerHandle::empty(), + false, ); exec_stage.unwind( diff --git a/crates/cli/commands/src/stage/run.rs b/crates/cli/commands/src/stage/run.rs index 542e0a5ef..2189b66fa 100644 --- a/crates/cli/commands/src/stage/run.rs +++ b/crates/cli/commands/src/stage/run.rs @@ -257,6 +257,7 @@ impl> Command { config.stages.merkle.clean_threshold, prune_modes, ExExManagerHandle::empty(), + self.env.performance_optimization.enable_execution_cache, )), None, ), diff --git a/crates/cli/commands/src/stage/unwind.rs b/crates/cli/commands/src/stage/unwind.rs index ce52c4a65..4e48ad4f7 100644 --- a/crates/cli/commands/src/stage/unwind.rs +++ b/crates/cli/commands/src/stage/unwind.rs @@ -138,6 +138,7 @@ impl> Command { config.stages, PruneModes::default(), self.env.performance_optimization.skip_state_root_validation, + self.env.performance_optimization.enable_execution_cache, ) .builder() .disable(reth_stages::StageId::SenderRecovery), @@ -154,6 +155,7 @@ impl> Command { stage_conf.clone(), prune_modes.clone(), self.env.performance_optimization.skip_state_root_validation, + self.env.performance_optimization.enable_execution_cache, ) .set(ExecutionStage::new( executor, @@ -166,6 +168,7 @@ impl> Command { stage_conf.execution_external_clean_threshold(), prune_modes, ExExManagerHandle::empty(), + self.env.performance_optimization.enable_execution_cache, )), ) }; diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index 3ee950453..a2f71f85f 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -79,11 +79,16 @@ where invalid_block_hook: Box, sync_metrics_tx: MetricEventsSender, skip_state_root_validation: bool, + enable_execution_cache: bool, ) -> Self { let downloader = BasicBlockDownloader::new(client, consensus.clone()); - let persistence_handle = - PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx); + let persistence_handle = PersistenceHandle::spawn_service( + provider, + pruner, + sync_metrics_tx, + enable_execution_cache, + ); let payload_validator = ExecutionPayloadValidator::new(chain_spec); let canonical_in_memory_state = blockchain_db.canonical_in_memory_state(); @@ -99,6 +104,7 @@ where tree_config, invalid_block_hook, skip_state_root_validation, + enable_execution_cache, ); let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree); @@ -212,6 +218,7 @@ mod tests { Box::new(NoopInvalidBlockHook::default()), sync_metrics_tx, false, + false, ); } } diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index f7e8bf462..b18bad8d8 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -56,6 +56,8 @@ pub struct PersistenceService { metrics: PersistenceMetrics, /// Sender for sync metrics - we only submit sync metrics for persisted blocks sync_metrics_tx: MetricEventsSender, + /// Flag indicating whether to enable the state cache for persisted blocks + enable_state_cache: bool, } impl PersistenceService { @@ -65,8 +67,16 @@ impl PersistenceService { incoming: Receiver, pruner: PrunerWithFactory>, sync_metrics_tx: MetricEventsSender, + enable_state_cache: bool, ) -> Self { - Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx } + Self { + provider, + incoming, + pruner, + metrics: PersistenceMetrics::default(), + sync_metrics_tx, + enable_state_cache, + } } /// Prunes block data before the given block hash according to the configured prune @@ -131,8 +141,10 @@ impl PersistenceService { UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?; UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?; - cache_writer::clear_plain_state(); - debug!(target: "tree::persistence", "Finish to clear state cache"); + if self.enable_state_cache { + cache_writer::clear_plain_state(); + debug!(target: "tree::persistence", "Finish to clear state cache"); + } debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk"); self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed()); @@ -154,13 +166,18 @@ impl PersistenceService { let static_file_provider = self.provider.static_file_provider(); // update plain state cache - let exec_time = Instant::now(); - let provider_ro = self.provider.database_provider_ro()?; - let mut cache_writer = cache_writer::PlainCacheWriter::new(provider_ro.tx_ref()); - cache_writer.write_executed_blocks(blocks.clone()); - let exec_elapsed = exec_time.elapsed(); - update_write_cache_total(last_block_hash_num.unwrap().number, exec_elapsed.as_millis()); - debug!(target: "tree::persistence", "Finish to write state cache"); + if self.enable_state_cache { + let exec_time = Instant::now(); + let provider_ro = self.provider.database_provider_ro()?; + let mut cache_writer = cache_writer::PlainCacheWriter::new(provider_ro.tx_ref()); + cache_writer.write_executed_blocks(blocks.clone()); + let exec_elapsed = exec_time.elapsed(); + update_write_cache_total( + last_block_hash_num.unwrap().number, + exec_elapsed.as_millis(), + ); + debug!(target: "tree::persistence", "Finish to write state cache"); + } UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(&blocks)?; UnifiedStorageWriter::commit(provider_rw, static_file_provider)?; @@ -221,6 +238,7 @@ impl PersistenceHandle { provider_factory: ProviderFactory, pruner: PrunerWithFactory>, sync_metrics_tx: MetricEventsSender, + enable_state_cache: bool, ) -> Self { // create the initial channels let (db_service_tx, db_service_rx) = std::sync::mpsc::channel(); @@ -229,8 +247,13 @@ impl PersistenceHandle { let persistence_handle = Self::new(db_service_tx); // spawn the persistence service - let db_service = - PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx); + let db_service = PersistenceService::new( + provider_factory, + db_service_rx, + pruner, + sync_metrics_tx, + enable_state_cache, + ); std::thread::Builder::new() .name("Persistence Service".to_string()) .spawn(|| { diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index ca9706c98..44afee32f 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -69,7 +69,7 @@ pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook}; use lazy_static::lazy_static; use parking_lot::RwLock; pub use reth_engine_primitives::InvalidBlockHook; -use reth_provider::providers::{cache_writer, ProviderNodeTypes}; +use reth_provider::providers::{cache_writer, CachedStateProvider, ProviderNodeTypes}; use std::sync::atomic::AtomicU64; lazy_static! { @@ -83,7 +83,7 @@ pub(crate) fn update_execution_total(block: u64, inc: u128) { *current = new; if block % 100 == 0 { - info!(target: "blockchain_tree_execution", execution = ?new, block = ?block, "Total execution time"); + info!(target: "bt_live_execution", execution = ?new, block = ?block, "Total execution time"); } } @@ -516,6 +516,8 @@ pub struct EngineApiTreeHandler { invalid_block_hook: Box, /// Flag indicating whether the state root validation should be skipped. skip_state_root_validation: bool, + /// Flag indicating whether the cache for execution is enabled. + enable_execution_cache: bool, } impl std::fmt::Debug for EngineApiTreeHandler { @@ -535,6 +537,8 @@ impl std::fmt::Debug for EngineApiTr .field("config", &self.config) .field("metrics", &self.metrics) .field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook)) + .field("skip_state_root_validation", &self.skip_state_root_validation) + .field("enable_execution_cache", &self.enable_execution_cache) .finish() } } @@ -561,6 +565,7 @@ where payload_builder: PayloadBuilderHandle, config: TreeConfig, skip_state_root_validation: bool, + enable_execution_cache: bool, ) -> Self { let (incoming_tx, incoming) = std::sync::mpsc::channel(); Self { @@ -581,6 +586,7 @@ where incoming_tx, invalid_block_hook: Box::new(NoopInvalidBlockHook), skip_state_root_validation, + enable_execution_cache, } } @@ -606,6 +612,7 @@ where config: TreeConfig, invalid_block_hook: Box, skip_state_root_validation: bool, + enable_execution_cache: bool, ) -> (Sender>>, UnboundedReceiver) { let best_block_number = provider.best_block_number().unwrap_or(0); let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default(); @@ -637,6 +644,7 @@ where payload_builder, config, skip_state_root_validation, + enable_execution_cache, ); task.set_invalid_block_hook(invalid_block_hook); let incoming = task.incoming_tx.clone(); @@ -1591,6 +1599,12 @@ where trace!(target: "engine::tree", %hash, "found canonical state for block in memory"); // the block leads back to the canonical chain let historical = self.provider.state_by_block_hash(historical)?; + if self.enable_execution_cache { + return Ok(Some(Box::new(MemoryOverlayStateProvider::new( + CachedStateProvider::new(historical).boxed(), + blocks, + )))) + } return Ok(Some(Box::new(MemoryOverlayStateProvider::new(historical, blocks)))) } @@ -2750,6 +2764,7 @@ mod tests { payload_builder, TreeConfig::default(), false, + false, ); let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone()); diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index ee5ce5968..7c1793dac 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -441,6 +441,7 @@ where self.toml_config().stages.clone(), self.prune_modes(), self.node_config().skip_state_root_validation, + self.node_config().enable_execution_cache, )) .build( factory.clone(), diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 7c2744284..4945f911b 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -204,6 +204,7 @@ where ctx.components().block_executor().clone(), pipeline_exex_handle, ctx.node_config().skip_state_root_validation, + ctx.node_config().enable_execution_cache, )?; // The new engine writes directly to static files. This ensures that they're up to the tip. @@ -241,6 +242,7 @@ where ctx.invalid_block_hook()?, ctx.sync_metrics_tx(), ctx.node_config().skip_state_root_validation, + ctx.node_config().enable_execution_cache, ); eth_service } @@ -274,6 +276,7 @@ where ctx.invalid_block_hook()?, ctx.sync_metrics_tx(), ctx.node_config().skip_state_root_validation, + ctx.node_config().enable_execution_cache, ); eth_service } diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index e03f46af4..fd1a93611 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -264,6 +264,7 @@ where ctx.components().block_executor().clone(), pipeline_exex_handle, ctx.node_config().skip_state_root_validation, + ctx.node_config().enable_execution_cache, )?; let pipeline_events = pipeline.events(); @@ -286,6 +287,7 @@ where ctx.components().block_executor().clone(), pipeline_exex_handle, ctx.node_config().skip_state_root_validation, + ctx.node_config().enable_execution_cache, )?; #[cfg(feature = "bsc")] { diff --git a/crates/node/builder/src/setup.rs b/crates/node/builder/src/setup.rs index 8ee13e810..a92a0ffde 100644 --- a/crates/node/builder/src/setup.rs +++ b/crates/node/builder/src/setup.rs @@ -36,6 +36,7 @@ pub fn build_networked_pipeline( executor: Executor, exex_manager_handle: ExExManagerHandle, skip_state_root_validation: bool, + enable_execution_cache: bool, ) -> eyre::Result> where N: ProviderNodeTypes, @@ -64,6 +65,7 @@ where executor, exex_manager_handle, skip_state_root_validation, + enable_execution_cache, )?; Ok(pipeline) @@ -84,6 +86,7 @@ pub fn build_pipeline( executor: Executor, exex_manager_handle: ExExManagerHandle, skip_state_root_validation: bool, + enable_execution_cache: bool, ) -> eyre::Result> where N: ProviderNodeTypes, @@ -116,6 +119,7 @@ where stage_config.clone(), prune_modes.clone(), skip_state_root_validation, + enable_execution_cache, ) .set(ExecutionStage::new( executor, @@ -123,6 +127,7 @@ where stage_config.execution_external_clean_threshold(), prune_modes, exex_manager_handle, + enable_execution_cache, )), ) .build(provider_factory, static_file_producer); diff --git a/crates/node/core/src/node_config.rs b/crates/node/core/src/node_config.rs index 12c3d3dab..b6679c881 100644 --- a/crates/node/core/src/node_config.rs +++ b/crates/node/core/src/node_config.rs @@ -133,6 +133,9 @@ pub struct NodeConfig { /// Disable hashing stages to skip merkle tree building pub skip_state_root_validation: bool, + + /// Enable execution cache during block insertion + pub enable_execution_cache: bool, } impl NodeConfig { @@ -421,6 +424,7 @@ impl Default for NodeConfig { datadir: DatadirArgs::default(), enable_prefetch: false, skip_state_root_validation: false, + enable_execution_cache: false, } } } diff --git a/crates/stages/stages/Cargo.toml b/crates/stages/stages/Cargo.toml index ff7c34099..f60a21cc4 100644 --- a/crates/stages/stages/Cargo.toml +++ b/crates/stages/stages/Cargo.toml @@ -52,6 +52,9 @@ rayon.workspace = true num-traits = "0.2.15" tempfile = { workspace = true, optional = true } +lazy_static = "1.5.0" +parking_lot = "0.12.3" + [dev-dependencies] # reth reth-chainspec.workspace = true diff --git a/crates/stages/stages/src/sets.rs b/crates/stages/stages/src/sets.rs index 9336f2afa..5651773c6 100644 --- a/crates/stages/stages/src/sets.rs +++ b/crates/stages/stages/src/sets.rs @@ -28,7 +28,13 @@ //! StaticFileProducer::new(provider_factory.clone(), PruneModes::default()); //! // Build a pipeline with all offline stages. //! let pipeline = Pipeline::::builder() -//! .add_stages(OfflineStages::new(exec, StageConfig::default(), PruneModes::default(), false)) +//! .add_stages(OfflineStages::new( +//! exec, +//! StageConfig::default(), +//! PruneModes::default(), +//! false, +//! false, +//! )) //! .build(provider_factory, static_file_producer); //! //! # } @@ -87,6 +93,8 @@ pub struct DefaultStages { prune_modes: PruneModes, /// Disable hashing stages(`Merkle`, `AccountHashing`, `StorageHashing`) skip_state_root_validation: bool, + /// Flag indicating whether the cache for execution is enabled. + enable_execution_cache: bool, } impl DefaultStages { @@ -102,6 +110,7 @@ impl DefaultStages { stages_config: StageConfig, prune_modes: PruneModes, skip_state_root_validation: bool, + enable_execution_cache: bool, ) -> Self where E: BlockExecutorProvider, @@ -119,6 +128,7 @@ impl DefaultStages { stages_config, prune_modes, skip_state_root_validation, + enable_execution_cache, } } } @@ -134,6 +144,7 @@ where stages_config: StageConfig, prune_modes: PruneModes, skip_state_root_validation: bool, + enable_execution_cache: bool, ) -> StageSetBuilder { StageSetBuilder::default() .add_set(default_offline) @@ -142,6 +153,7 @@ where stages_config, prune_modes, skip_state_root_validation, + enable_execution_cache, )) .add_stage(FinishStage) } @@ -162,6 +174,7 @@ where self.stages_config.clone(), self.prune_modes, self.skip_state_root_validation, + self.enable_execution_cache, ) } } @@ -275,6 +288,7 @@ pub struct OfflineStages { prune_modes: PruneModes, /// Disable hashing stages(`Merkle`, `AccountHashing`, `StorageHashing`) disable_hashing: bool, + enable_execution_cache: bool, } impl OfflineStages { @@ -284,8 +298,15 @@ impl OfflineStages { stages_config: StageConfig, prune_modes: PruneModes, disable_hashing: bool, + enable_execution_cache: bool, ) -> Self { - Self { executor_factory, stages_config, prune_modes, disable_hashing } + Self { + executor_factory, + stages_config, + prune_modes, + disable_hashing, + enable_execution_cache, + } } } @@ -299,6 +320,7 @@ where self.executor_factory, self.stages_config.clone(), self.prune_modes.clone(), + self.enable_execution_cache, ) .builder() // If sender recovery prune mode is set, add the prune sender recovery stage. @@ -333,6 +355,8 @@ pub struct ExecutionStages { stages_config: StageConfig, /// Prune configuration for every segment that can be pruned prune_modes: PruneModes, + /// Enable cache or not. + enable_cache: bool, } impl ExecutionStages { @@ -341,8 +365,9 @@ impl ExecutionStages { executor_factory: E, stages_config: StageConfig, prune_modes: PruneModes, + enable_cache: bool, ) -> Self { - Self { executor_factory, stages_config, prune_modes } + Self { executor_factory, stages_config, prune_modes, enable_cache } } } @@ -359,6 +384,7 @@ where self.stages_config.execution, self.stages_config.execution_external_clean_threshold(), self.prune_modes, + self.enable_cache, )) } } diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index b6033980b..e743cac5d 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -18,7 +18,8 @@ use reth_provider::{ }, writer::UnifiedStorageWriter, BlockReader, DBProvider, DatabaseProviderRW, HeaderProvider, LatestStateProviderRef, - OriginalValuesKnown, ProviderError, StateWriter, StatsReader, TransactionVariant, + OriginalValuesKnown, ProviderError, StateProvider, StateWriter, StatsReader, + TransactionVariant, }; use reth_prune_types::PruneModes; use reth_revm::database::StateProviderDatabase; @@ -36,6 +37,25 @@ use std::{ }; use tracing::*; +use lazy_static::lazy_static; +use parking_lot::RwLock; +use std::sync::atomic::AtomicU64; + +lazy_static! { + static ref EXECUTION_TIME: RwLock = RwLock::new(AtomicU64::new(0)); +} + +pub(crate) fn update_execution_total(block: u64, inc: u128) { + let mut binding = EXECUTION_TIME.write(); + let current = binding.get_mut(); + let new = *current + inc as u64; + *current = new; + + if block % 100 == 0 { + info!(target: "bt_pipeline_execution", execution = ?new, block = ?block, "Total execution time"); + } +} + /// The execution stage executes all transactions and /// update history indexes. /// @@ -90,6 +110,8 @@ pub struct ExecutionStage { exex_manager_handle: ExExManagerHandle, /// Executor metrics. metrics: ExecutorMetrics, + /// Flag indicating whether the cache for execution is enabled. + enable_cache: bool, } impl ExecutionStage { @@ -100,6 +122,7 @@ impl ExecutionStage { external_clean_threshold: u64, prune_modes: PruneModes, exex_manager_handle: ExExManagerHandle, + enable_execution_cache: bool, ) -> Self { Self { external_clean_threshold, @@ -110,6 +133,7 @@ impl ExecutionStage { post_unwind_commit_input: None, exex_manager_handle, metrics: ExecutorMetrics::default(), + enable_cache: enable_execution_cache, } } @@ -123,6 +147,7 @@ impl ExecutionStage { MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD, PruneModes::none(), ExExManagerHandle::empty(), + false, ) } @@ -132,6 +157,7 @@ impl ExecutionStage { config: ExecutionConfig, external_clean_threshold: u64, prune_modes: PruneModes, + enable_cache: bool, ) -> Self { Self::new( executor_provider, @@ -139,6 +165,7 @@ impl ExecutionStage { external_clean_threshold, prune_modes, ExExManagerHandle::empty(), + enable_cache, ) } @@ -221,13 +248,10 @@ where None }; - // let db = StateProviderDatabase(LatestStateProviderRef::new( - // provider.tx_ref(), - // provider.static_file_provider().clone(), - // )); let db = StateProviderDatabase(CachedStateProviderRef::new( provider.tx_ref(), provider.static_file_provider().clone(), + self.enable_cache, )); let mut executor = self.executor_provider.batch_executor(db); executor.set_tip(max_block); @@ -290,6 +314,8 @@ where execution_duration += execute_start.elapsed(); + update_execution_total(block_number, execution_duration.as_millis()); + // Log execution throughput if last_log_instant.elapsed() >= log_duration { info!( @@ -705,6 +731,7 @@ mod tests { MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD, PruneModes::none(), ExExManagerHandle::empty(), + false, ) } diff --git a/crates/stages/stages/src/stages/mod.rs b/crates/stages/stages/src/stages/mod.rs index 694f3e472..8596edf28 100644 --- a/crates/stages/stages/src/stages/mod.rs +++ b/crates/stages/stages/src/stages/mod.rs @@ -161,6 +161,7 @@ mod tests { MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD, prune_modes.clone(), ExExManagerHandle::empty(), + false, ); execution_stage.execute(&provider, input).unwrap(); diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 50bb61e98..4a89e02f6 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -1,11 +1,10 @@ use super::{DatabaseProvider, ProviderNodeTypes}; use crate::{ - providers::{state::cache::cached_provider::CachedStateProvider, StaticFileProvider}, - AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, - BlockSource, CanonChainTracker, CanonStateNotifications, CanonStateSubscriptions, - ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory, DatabaseProviderRO, - EvmEnvProvider, FinalizedBlockReader, HeaderProvider, ParliaSnapshotReader, ProviderError, - ProviderFactory, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, + providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, + BlockReader, BlockReaderIdExt, BlockSource, CanonChainTracker, CanonStateNotifications, + CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory, + DatabaseProviderRO, EvmEnvProvider, FinalizedBlockReader, HeaderProvider, ParliaSnapshotReader, + ProviderError, ProviderFactory, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider, StageCheckpointReader, StateProviderBox, StateProviderFactory, StateReader, StaticFileProviderFactory, TransactionVariant, TransactionsProvider, WithdrawalsProvider, }; @@ -1118,8 +1117,7 @@ impl StateProviderFactory for BlockchainProvider2 { trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash"); if let Ok(state) = self.database.history_by_block_hash(block_hash) { // This could be tracked by a block in the database block - - Ok(CachedStateProvider::new(state).boxed()) + Ok(state) } else if let Some(state) = self.canonical_in_memory_state.state_by_hash(block_hash) { // ... or this could be tracked by the in memory state let state_provider = self.block_state_provider(state)?; diff --git a/crates/storage/provider/src/providers/state/cache/cached_provider_ref.rs b/crates/storage/provider/src/providers/state/cache/cached_provider_ref.rs index 086f82a1e..734a22553 100644 --- a/crates/storage/provider/src/providers/state/cache/cached_provider_ref.rs +++ b/crates/storage/provider/src/providers/state/cache/cached_provider_ref.rs @@ -25,18 +25,27 @@ pub struct CachedStateProviderRef<'b, TX: DbTx> { tx: &'b TX, /// Static File provider static_file_provider: StaticFileProvider, + cache_enabled: bool, } impl<'b, TX: DbTx> CachedStateProviderRef<'b, TX> { /// Create new state provider - pub const fn new(tx: &'b TX, static_file_provider: StaticFileProvider) -> Self { - Self { tx, static_file_provider } + pub const fn new( + tx: &'b TX, + static_file_provider: StaticFileProvider, + cache_enabled: bool, + ) -> Self { + Self { tx, static_file_provider, cache_enabled } } } impl<'b, TX: DbTx> AccountReader for CachedStateProviderRef<'b, TX> { /// Get basic account information. fn basic_account(&self, address: Address) -> ProviderResult> { + if !self.cache_enabled { + return self.tx.get::(address).map_err(Into::into) + } + if let Some(v) = crate::providers::state::cache::plain_state::get_account(&address) { return Ok(Some(v)) } @@ -158,6 +167,16 @@ impl<'b, TX: DbTx> StateProvider for CachedStateProviderRef<'b, TX> { account: Address, storage_key: StorageKey, ) -> ProviderResult> { + if !self.cache_enabled { + let mut cursor = self.tx.cursor_dup_read::()?; + if let Some(entry) = cursor.seek_by_key_subkey(account, storage_key)? { + if entry.key == storage_key { + return Ok(Some(entry.value)) + } + } + return Ok(None) + } + let key = (account, storage_key); if let Some(v) = crate::providers::state::cache::plain_state::get_storage(&key) { return Ok(Some(v)) @@ -175,6 +194,10 @@ impl<'b, TX: DbTx> StateProvider for CachedStateProviderRef<'b, TX> { /// Get account code by its hash fn bytecode_by_hash(&self, code_hash: B256) -> ProviderResult> { + if !self.cache_enabled { + return self.tx.get::(code_hash).map_err(Into::into) + } + if let Some(v) = crate::providers::state::cache::plain_state::get_code(&code_hash) { return Ok(Some(v)) }