Skip to content

Commit

Permalink
stage execution cache flags
Browse files Browse the repository at this point in the history
  • Loading branch information
forcodedancing committed Oct 18, 2024
1 parent 3ce0585 commit 678a533
Show file tree
Hide file tree
Showing 22 changed files with 186 additions and 34 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions bin/reth/src/commands/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
stage_conf.clone(),
prune_modes.clone(),
self.env.performance_optimization.skip_state_root_validation,
self.env.performance_optimization.enable_execution_cache,
)
.set(ExecutionStage::new(
executor,
Expand All @@ -117,6 +118,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
stage_conf.execution_external_clean_threshold(),
prune_modes,
ExExManagerHandle::empty(),
self.env.performance_optimization.enable_execution_cache,
)),
)
.build(provider_factory, static_file_producer);
Expand Down
1 change: 1 addition & 0 deletions crates/cli/commands/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> EnvironmentArgs<C> {
config.stages.clone(),
prune_modes.clone(),
self.performance_optimization.skip_state_root_validation,
self.performance_optimization.enable_execution_cache,
))
.build(factory.clone(), StaticFileProducer::new(factory.clone(), prune_modes));

Expand Down
3 changes: 3 additions & 0 deletions crates/cli/commands/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> ImportCommand<C> {
self.no_state,
executor.clone(),
self.env.performance_optimization.skip_state_root_validation,
self.env.performance_optimization.enable_execution_cache,
)?;

// override the tip
Expand Down Expand Up @@ -169,6 +170,7 @@ pub fn build_import_pipeline<N, C, E>(
disable_exec: bool,
executor: E,
skip_state_root_validation: bool,
enable_execution_cache: bool,
) -> eyre::Result<(Pipeline<N>, impl Stream<Item = NodeEvent>)>
where
N: NodeTypesWithDB<ChainSpec = ChainSpec>,
Expand Down Expand Up @@ -221,6 +223,7 @@ where
config.stages.clone(),
PruneModes::default(),
skip_state_root_validation,
enable_execution_cache,
)
.builder()
.disable_all_if(&StageId::STATE_REQUIRED, || disable_exec),
Expand Down
1 change: 1 addition & 0 deletions crates/cli/commands/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>, Ext: clap::Args + fmt::Debug> No
pruning,
enable_prefetch,
skip_state_root_validation: performance_optimization.skip_state_root_validation,
enable_execution_cache: performance_optimization.enable_execution_cache,
};

// Register the prometheus recorder before creating the database,
Expand Down
1 change: 1 addition & 0 deletions crates/cli/commands/src/stage/dump/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ fn unwind_and_copy<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
PruneModes::all(),
ExExManagerHandle::empty(),
false,
);

exec_stage.unwind(
Expand Down
1 change: 1 addition & 0 deletions crates/cli/commands/src/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
config.stages.merkle.clean_threshold,
prune_modes,
ExExManagerHandle::empty(),
self.env.performance_optimization.enable_execution_cache,
)),
None,
),
Expand Down
3 changes: 3 additions & 0 deletions crates/cli/commands/src/stage/unwind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
config.stages,
PruneModes::default(),
self.env.performance_optimization.skip_state_root_validation,
self.env.performance_optimization.enable_execution_cache,
)
.builder()
.disable(reth_stages::StageId::SenderRecovery),
Expand All @@ -154,6 +155,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
stage_conf.clone(),
prune_modes.clone(),
self.env.performance_optimization.skip_state_root_validation,
self.env.performance_optimization.enable_execution_cache,
)
.set(ExecutionStage::new(
executor,
Expand All @@ -166,6 +168,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
stage_conf.execution_external_clean_threshold(),
prune_modes,
ExExManagerHandle::empty(),
self.env.performance_optimization.enable_execution_cache,
)),
)
};
Expand Down
11 changes: 9 additions & 2 deletions crates/engine/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,16 @@ where
invalid_block_hook: Box<dyn InvalidBlockHook>,
sync_metrics_tx: MetricEventsSender,
skip_state_root_validation: bool,
enable_execution_cache: bool,
) -> Self {
let downloader = BasicBlockDownloader::new(client, consensus.clone());

let persistence_handle =
PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx);
let persistence_handle = PersistenceHandle::spawn_service(
provider,
pruner,
sync_metrics_tx,
enable_execution_cache,
);
let payload_validator = ExecutionPayloadValidator::new(chain_spec);

let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
Expand All @@ -99,6 +104,7 @@ where
tree_config,
invalid_block_hook,
skip_state_root_validation,
enable_execution_cache,
);

let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
Expand Down Expand Up @@ -212,6 +218,7 @@ mod tests {
Box::new(NoopInvalidBlockHook::default()),
sync_metrics_tx,
false,
false,
);
}
}
47 changes: 35 additions & 12 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub struct PersistenceService<N: ProviderNodeTypes> {
metrics: PersistenceMetrics,
/// Sender for sync metrics - we only submit sync metrics for persisted blocks
sync_metrics_tx: MetricEventsSender,
/// Flag indicating whether to enable the state cache for persisted blocks
enable_state_cache: bool,
}

impl<N: ProviderNodeTypes> PersistenceService<N> {
Expand All @@ -65,8 +67,16 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
incoming: Receiver<PersistenceAction>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
sync_metrics_tx: MetricEventsSender,
enable_state_cache: bool,
) -> Self {
Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
Self {
provider,
incoming,
pruner,
metrics: PersistenceMetrics::default(),
sync_metrics_tx,
enable_state_cache,
}
}

/// Prunes block data before the given block hash according to the configured prune
Expand Down Expand Up @@ -131,8 +141,10 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;

cache_writer::clear_plain_state();
debug!(target: "tree::persistence", "Finish to clear state cache");
if self.enable_state_cache {
cache_writer::clear_plain_state();
debug!(target: "tree::persistence", "Finish to clear state cache");
}

debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
Expand All @@ -154,13 +166,18 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
let static_file_provider = self.provider.static_file_provider();

// update plain state cache
let exec_time = Instant::now();
let provider_ro = self.provider.database_provider_ro()?;
let mut cache_writer = cache_writer::PlainCacheWriter::new(provider_ro.tx_ref());
cache_writer.write_executed_blocks(blocks.clone());
let exec_elapsed = exec_time.elapsed();
update_write_cache_total(last_block_hash_num.unwrap().number, exec_elapsed.as_millis());
debug!(target: "tree::persistence", "Finish to write state cache");
if self.enable_state_cache {
let exec_time = Instant::now();
let provider_ro = self.provider.database_provider_ro()?;
let mut cache_writer = cache_writer::PlainCacheWriter::new(provider_ro.tx_ref());
cache_writer.write_executed_blocks(blocks.clone());
let exec_elapsed = exec_time.elapsed();
update_write_cache_total(
last_block_hash_num.unwrap().number,
exec_elapsed.as_millis(),
);
debug!(target: "tree::persistence", "Finish to write state cache");
}

UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(&blocks)?;
UnifiedStorageWriter::commit(provider_rw, static_file_provider)?;
Expand Down Expand Up @@ -221,6 +238,7 @@ impl PersistenceHandle {
provider_factory: ProviderFactory<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
sync_metrics_tx: MetricEventsSender,
enable_state_cache: bool,
) -> Self {
// create the initial channels
let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
Expand All @@ -229,8 +247,13 @@ impl PersistenceHandle {
let persistence_handle = Self::new(db_service_tx);

// spawn the persistence service
let db_service =
PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
let db_service = PersistenceService::new(
provider_factory,
db_service_rx,
pruner,
sync_metrics_tx,
enable_state_cache,
);
std::thread::Builder::new()
.name("Persistence Service".to_string())
.spawn(|| {
Expand Down
19 changes: 17 additions & 2 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
use lazy_static::lazy_static;
use parking_lot::RwLock;
pub use reth_engine_primitives::InvalidBlockHook;
use reth_provider::providers::{cache_writer, ProviderNodeTypes};
use reth_provider::providers::{cache_writer, CachedStateProvider, ProviderNodeTypes};
use std::sync::atomic::AtomicU64;

lazy_static! {
Expand All @@ -83,7 +83,7 @@ pub(crate) fn update_execution_total(block: u64, inc: u128) {
*current = new;

if block % 100 == 0 {
info!(target: "blockchain_tree_execution", execution = ?new, block = ?block, "Total execution time");
info!(target: "bt_live_execution", execution = ?new, block = ?block, "Total execution time");
}
}

Expand Down Expand Up @@ -516,6 +516,8 @@ pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
invalid_block_hook: Box<dyn InvalidBlockHook>,
/// Flag indicating whether the state root validation should be skipped.
skip_state_root_validation: bool,
/// Flag indicating whether the cache for execution is enabled.
enable_execution_cache: bool,
}

impl<P: Debug, E: Debug, T: EngineTypes + Debug> std::fmt::Debug for EngineApiTreeHandler<P, E, T> {
Expand All @@ -535,6 +537,8 @@ impl<P: Debug, E: Debug, T: EngineTypes + Debug> std::fmt::Debug for EngineApiTr
.field("config", &self.config)
.field("metrics", &self.metrics)
.field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook))
.field("skip_state_root_validation", &self.skip_state_root_validation)
.field("enable_execution_cache", &self.enable_execution_cache)
.finish()
}
}
Expand All @@ -561,6 +565,7 @@ where
payload_builder: PayloadBuilderHandle<T>,
config: TreeConfig,
skip_state_root_validation: bool,
enable_execution_cache: bool,
) -> Self {
let (incoming_tx, incoming) = std::sync::mpsc::channel();
Self {
Expand All @@ -581,6 +586,7 @@ where
incoming_tx,
invalid_block_hook: Box::new(NoopInvalidBlockHook),
skip_state_root_validation,
enable_execution_cache,
}
}

Expand All @@ -606,6 +612,7 @@ where
config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook>,
skip_state_root_validation: bool,
enable_execution_cache: bool,
) -> (Sender<FromEngine<EngineApiRequest<T>>>, UnboundedReceiver<EngineApiEvent>) {
let best_block_number = provider.best_block_number().unwrap_or(0);
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
Expand Down Expand Up @@ -637,6 +644,7 @@ where
payload_builder,
config,
skip_state_root_validation,
enable_execution_cache,
);
task.set_invalid_block_hook(invalid_block_hook);
let incoming = task.incoming_tx.clone();
Expand Down Expand Up @@ -1591,6 +1599,12 @@ where
trace!(target: "engine::tree", %hash, "found canonical state for block in memory");
// the block leads back to the canonical chain
let historical = self.provider.state_by_block_hash(historical)?;
if self.enable_execution_cache {
return Ok(Some(Box::new(MemoryOverlayStateProvider::new(
CachedStateProvider::new(historical).boxed(),
blocks,
))))
}
return Ok(Some(Box::new(MemoryOverlayStateProvider::new(historical, blocks))))
}

Expand Down Expand Up @@ -2750,6 +2764,7 @@ mod tests {
payload_builder,
TreeConfig::default(),
false,
false,
);

let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
Expand Down
1 change: 1 addition & 0 deletions crates/node/builder/src/launch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ where
self.toml_config().stages.clone(),
self.prune_modes(),
self.node_config().skip_state_root_validation,
self.node_config().enable_execution_cache,
))
.build(
factory.clone(),
Expand Down
3 changes: 3 additions & 0 deletions crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ where
ctx.components().block_executor().clone(),
pipeline_exex_handle,
ctx.node_config().skip_state_root_validation,
ctx.node_config().enable_execution_cache,
)?;

// The new engine writes directly to static files. This ensures that they're up to the tip.
Expand Down Expand Up @@ -241,6 +242,7 @@ where
ctx.invalid_block_hook()?,
ctx.sync_metrics_tx(),
ctx.node_config().skip_state_root_validation,
ctx.node_config().enable_execution_cache,
);
eth_service
}
Expand Down Expand Up @@ -274,6 +276,7 @@ where
ctx.invalid_block_hook()?,
ctx.sync_metrics_tx(),
ctx.node_config().skip_state_root_validation,
ctx.node_config().enable_execution_cache,
);
eth_service
}
Expand Down
2 changes: 2 additions & 0 deletions crates/node/builder/src/launch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ where
ctx.components().block_executor().clone(),
pipeline_exex_handle,
ctx.node_config().skip_state_root_validation,
ctx.node_config().enable_execution_cache,
)?;

let pipeline_events = pipeline.events();
Expand All @@ -286,6 +287,7 @@ where
ctx.components().block_executor().clone(),
pipeline_exex_handle,
ctx.node_config().skip_state_root_validation,
ctx.node_config().enable_execution_cache,
)?;
#[cfg(feature = "bsc")]
{
Expand Down
Loading

0 comments on commit 678a533

Please sign in to comment.