diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index e6b8c489d1ef..565852b7d7a5 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -127,7 +127,6 @@ impl Command { header_downloader, body_downloader, factory.clone(), - static_file_producer, )? .set(SenderRecoveryStage { commit_threshold: stage_conf.sender_recovery.commit_threshold, @@ -148,7 +147,7 @@ impl Command { config.prune.clone().map(|prune| prune.segments).unwrap_or_default(), )), ) - .build(provider_factory); + .build(provider_factory, static_file_producer); Ok(pipeline) } diff --git a/bin/reth/src/commands/debug_cmd/replay_engine.rs b/bin/reth/src/commands/debug_cmd/replay_engine.rs index 4c06f1ef2cd7..374844d1bbc6 100644 --- a/bin/reth/src/commands/debug_cmd/replay_engine.rs +++ b/bin/reth/src/commands/debug_cmd/replay_engine.rs @@ -25,10 +25,11 @@ use reth_node_ethereum::{EthEngineTypes, EthEvmConfig}; #[cfg(feature = "optimism")] use reth_node_optimism::{OptimismEngineTypes, OptimismEvmConfig}; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; -use reth_primitives::{fs, ChainSpec}; +use reth_primitives::{fs, ChainSpec, PruneModes}; use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions, ProviderFactory}; use reth_revm::EvmProcessorFactory; use reth_stages::Pipeline; +use reth_static_file::StaticFileProducer; use reth_tasks::TaskExecutor; use reth_transaction_pool::noop::NoopTransactionPool; use std::{ @@ -196,7 +197,14 @@ impl Command { let (consensus_engine_tx, consensus_engine_rx) = mpsc::unbounded_channel(); let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( network_client, - Pipeline::builder().build(provider_factory), + Pipeline::builder().build( + provider_factory.clone(), + StaticFileProducer::new( + provider_factory.clone(), + provider_factory.static_file_provider(), + PruneModes::default(), + ), + ), blockchain_db.clone(), Box::new(ctx.task_executor.clone()), Box::new(network), diff --git a/bin/reth/src/commands/import.rs b/bin/reth/src/commands/import.rs index 6bf89793fd26..f492a456d23c 100644 --- a/bin/reth/src/commands/import.rs +++ b/bin/reth/src/commands/import.rs @@ -191,7 +191,6 @@ impl ImportCommand { header_downloader, body_downloader, factory.clone(), - static_file_producer, )? .set(SenderRecoveryStage { commit_threshold: config.stages.sender_recovery.commit_threshold, @@ -213,7 +212,7 @@ impl ImportCommand { config.prune.map(|prune| prune.segments).unwrap_or_default(), )), ) - .build(provider_factory); + .build(provider_factory, static_file_producer); let events = pipeline.events().map(Into::into); diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index 6b25643e95a0..c6bd452be5da 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -398,13 +398,14 @@ mod tests { use reth_interfaces::{p2p::either::EitherDownloader, test_utils::TestFullBlockClient}; use reth_primitives::{ constants::ETHEREUM_BLOCK_GAS_LIMIT, stage::StageCheckpoint, BlockBody, ChainSpecBuilder, - Header, SealedHeader, MAINNET, + Header, PruneModes, SealedHeader, MAINNET, }; use reth_provider::{ test_utils::{create_test_provider_factory_with_chain_spec, TestExecutorFactory}, BundleStateWithReceipts, }; use reth_stages::{test_utils::TestStages, ExecOutput, StageError}; + use reth_static_file::StaticFileProducer; use reth_tasks::TokioTaskExecutor; use std::{collections::VecDeque, future::poll_fn, ops::Range}; use tokio::sync::watch; @@ -465,7 +466,15 @@ mod tests { pipeline = pipeline.with_max_block(max_block); } - pipeline.build(create_test_provider_factory_with_chain_spec(chain_spec)) + let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec); + + let static_file_producer = StaticFileProducer::new( + provider_factory.clone(), + provider_factory.static_file_provider(), + PruneModes::default(), + ); + + pipeline.build(provider_factory, static_file_producer) } } diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index cc656bac9c2a..e491b1fe33ef 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -407,7 +407,6 @@ where header_downloader, body_downloader, executor_factory.clone(), - static_file_producer, ) .expect("should build"), ) @@ -418,7 +417,7 @@ where pipeline = pipeline.with_max_block(max_block); } - let pipeline = pipeline.build(provider_factory.clone()); + let pipeline = pipeline.build(provider_factory.clone(), static_file_producer); // Setup blockchain tree let externals = TreeExternals::new(provider_factory.clone(), consensus, executor_factory); diff --git a/crates/node-core/src/node_config.rs b/crates/node-core/src/node_config.rs index 5c705d49daf6..1b5a8da2a74a 100644 --- a/crates/node-core/src/node_config.rs +++ b/crates/node-core/src/node_config.rs @@ -847,7 +847,6 @@ impl NodeConfig { header_downloader, body_downloader, factory.clone(), - static_file_producer, )? .set(SenderRecoveryStage { commit_threshold: stage_config.sender_recovery.commit_threshold, @@ -892,7 +891,7 @@ impl NodeConfig { prune_modes.storage_history, )), ) - .build(provider_factory); + .build(provider_factory, static_file_producer); Ok(pipeline) } diff --git a/crates/stages/src/error.rs b/crates/stages/src/error.rs index fa43c88fadc6..e8a5e3a71ff7 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages/src/error.rs @@ -163,7 +163,7 @@ pub enum PipelineError { /// The pipeline encountered an error while trying to send an event. #[error("pipeline encountered an error while trying to send an event")] Channel(#[from] Box>), - /// The stage encountered an internal error. + /// Internal error #[error(transparent)] - Internal(Box), + Internal(#[from] RethError), } diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 8f90b080429d..c2bceceee2bb 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -59,11 +59,10 @@ //! headers_downloader, //! bodies_downloader, //! executor_factory, -//! static_file_producer, //! ) //! .unwrap(), //! ) -//! .build(provider_factory); +//! .build(provider_factory, static_file_producer); //! ``` //! //! ## Feature Flags diff --git a/crates/stages/src/pipeline/builder.rs b/crates/stages/src/pipeline/builder.rs index 3e160577fddc..e76f76c604c8 100644 --- a/crates/stages/src/pipeline/builder.rs +++ b/crates/stages/src/pipeline/builder.rs @@ -2,6 +2,7 @@ use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageSet} use reth_db::database::Database; use reth_primitives::{stage::StageId, BlockNumber, B256}; use reth_provider::ProviderFactory; +use reth_static_file::StaticFileProducer; use tokio::sync::watch; /// Builds a [`Pipeline`]. @@ -67,12 +68,17 @@ where } /// Builds the final [`Pipeline`] using the given database. - pub fn build(self, provider_factory: ProviderFactory) -> Pipeline { + pub fn build( + self, + provider_factory: ProviderFactory, + static_file_producer: StaticFileProducer, + ) -> Pipeline { let Self { stages, max_block, tip_tx, metrics_tx } = self; Pipeline { provider_factory, stages, max_block, + static_file_producer, tip_tx, listeners: Default::default(), progress: Default::default(), diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index d1e24d14c140..40d010f48608 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -3,14 +3,17 @@ use crate::{ }; use futures_util::Future; use reth_db::database::Database; +use reth_interfaces::RethResult; use reth_primitives::{ constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH, stage::{StageCheckpoint, StageId}, + static_file::HighestStaticFiles, BlockNumber, B256, }; use reth_provider::{ providers::StaticFileWriter, ProviderFactory, StageCheckpointReader, StageCheckpointWriter, }; +use reth_static_file::StaticFileProducer; use reth_tokio_util::EventListeners; use std::pin::Pin; use tokio::sync::watch; @@ -68,6 +71,7 @@ pub struct Pipeline { stages: Vec>, /// The maximum block number to sync to. max_block: Option, + static_file_producer: StaticFileProducer, /// All listeners for events the pipeline emits. listeners: EventListeners, /// Keeps track of the progress of the pipeline. @@ -179,6 +183,8 @@ where /// pipeline (for example the `Finish` stage). Or [ControlFlow::Unwind] of the stage that caused /// the unwind. pub async fn run_loop(&mut self) -> Result { + self.produce_static_files()?; + let mut previous_stage = None; for stage_index in 0..self.stages.len() { let stage = &self.stages[stage_index]; @@ -214,6 +220,33 @@ where Ok(self.progress.next_ctrl()) } + /// Run [static file producer](StaticFileProducer) and move all data from the database to static + /// files for corresponding [segments](reth_primitives::static_file::StaticFileSegment), + /// according to their [stage checkpoints](StageCheckpoint): + /// - [StaticFileSegment::Headers](reth_primitives::static_file::StaticFileSegment::Headers) -> + /// [StageId::Headers] + /// - [StaticFileSegment::Receipts](reth_primitives::static_file::StaticFileSegment::Receipts) + /// -> [StageId::Execution] + /// - [StaticFileSegment::Transactions](reth_primitives::static_file::StaticFileSegment::Transactions) + /// -> [StageId::Bodies] + fn produce_static_files(&mut self) -> RethResult<()> { + let provider = self.provider_factory.provider()?; + let targets = self.static_file_producer.get_static_file_targets(HighestStaticFiles { + headers: provider + .get_stage_checkpoint(StageId::Headers)? + .map(|checkpoint| checkpoint.block_number), + receipts: provider + .get_stage_checkpoint(StageId::Execution)? + .map(|checkpoint| checkpoint.block_number), + transactions: provider + .get_stage_checkpoint(StageId::Bodies)? + .map(|checkpoint| checkpoint.block_number), + })?; + self.static_file_producer.run(targets)?; + + Ok(()) + } + /// Unwind the stages to the target block. /// /// If the unwind is due to a bad block the number of that block should be specified. @@ -508,6 +541,7 @@ mod tests { provider::ProviderError, test_utils::{generators, generators::random_header}, }; + use reth_primitives::PruneModes; use reth_provider::test_utils::create_test_provider_factory; use tokio_stream::StreamExt; @@ -553,7 +587,14 @@ mod tests { .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .with_max_block(10) - .build(provider_factory); + .build( + provider_factory.clone(), + StaticFileProducer::new( + provider_factory.clone(), + provider_factory.static_file_provider(), + PruneModes::default(), + ), + ); let events = pipeline.events(); // Run pipeline @@ -613,7 +654,14 @@ mod tests { .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), ) .with_max_block(10) - .build(provider_factory); + .build( + provider_factory.clone(), + StaticFileProducer::new( + provider_factory.clone(), + provider_factory.static_file_provider(), + PruneModes::default(), + ), + ); let events = pipeline.events(); // Run pipeline @@ -720,7 +768,14 @@ mod tests { .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .with_max_block(10) - .build(provider_factory); + .build( + provider_factory.clone(), + StaticFileProducer::new( + provider_factory.clone(), + provider_factory.static_file_provider(), + PruneModes::default(), + ), + ); let events = pipeline.events(); // Run pipeline @@ -817,7 +872,14 @@ mod tests { .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .with_max_block(10) - .build(provider_factory); + .build( + provider_factory.clone(), + StaticFileProducer::new( + provider_factory.clone(), + provider_factory.static_file_provider(), + PruneModes::default(), + ), + ); let events = pipeline.events(); // Run pipeline @@ -897,7 +959,14 @@ mod tests { .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .with_max_block(10) - .build(provider_factory); + .build( + provider_factory.clone(), + StaticFileProducer::new( + provider_factory.clone(), + provider_factory.static_file_provider(), + PruneModes::default(), + ), + ); let result = pipeline.run().await; assert_matches!(result, Ok(())); @@ -907,7 +976,14 @@ mod tests { .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err( StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)), ))) - .build(provider_factory); + .build( + provider_factory.clone(), + StaticFileProducer::new( + provider_factory.clone(), + provider_factory.static_file_provider(), + PruneModes::default(), + ), + ); let result = pipeline.run().await; assert_matches!( result, diff --git a/crates/stages/src/sets.rs b/crates/stages/src/sets.rs index 60394e30f12e..1b029b11e534 100644 --- a/crates/stages/src/sets.rs +++ b/crates/stages/src/sets.rs @@ -13,14 +13,22 @@ //! # use reth_stages::Pipeline; //! # use reth_stages::sets::{OfflineStages}; //! # use reth_revm::EvmProcessorFactory; -//! # use reth_primitives::MAINNET; +//! # use reth_primitives::{PruneModes, MAINNET}; //! # use reth_node_ethereum::EthEvmConfig; //! # use reth_provider::test_utils::create_test_provider_factory; +//! # use reth_static_file::StaticFileProducer; //! //! # let executor_factory = EvmProcessorFactory::new(MAINNET.clone(), EthEvmConfig::default()); //! # let provider_factory = create_test_provider_factory(); +//! # let static_file_producer = StaticFileProducer::new( +//! provider_factory.clone(), +//! provider_factory.static_file_provider(), +//! PruneModes::default(), +//! ); //! // Build a pipeline with all offline stages. -//! # let pipeline = Pipeline::builder().add_stages(OfflineStages::new(executor_factory)).build(provider_factory); +//! # let pipeline = Pipeline::builder() +//! .add_stages(OfflineStages::new(executor_factory)) +//! .build(provider_factory, static_file_producer); //! ``` //! //! ```ignore @@ -42,7 +50,7 @@ use crate::{ stages::{ AccountHashingStage, BodyStage, ExecutionStage, FinishStage, HeaderStage, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, - StaticFileStage, StorageHashingStage, TransactionLookupStage, + StorageHashingStage, TransactionLookupStage, }, StageError, StageSet, StageSetBuilder, }; @@ -52,7 +60,6 @@ use reth_interfaces::{ p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader}, }; use reth_provider::{ExecutorFactory, HeaderSyncGapProvider, HeaderSyncMode}; -use reth_static_file::StaticFileProducer; use std::sync::Arc; use tempfile::TempDir; @@ -60,13 +67,11 @@ use tempfile::TempDir; /// /// A combination of (in order) /// -/// - [`StaticFileStage`] /// - [`OnlineStages`] /// - [`OfflineStages`] /// - [`FinishStage`] /// /// This expands to the following series of stages: -/// - [`StaticFileStage`] /// - [`HeaderStage`] /// - [`BodyStage`] /// - [`SenderRecoveryStage`] @@ -80,15 +85,14 @@ use tempfile::TempDir; /// - [`IndexAccountHistoryStage`] /// - [`FinishStage`] #[derive(Debug)] -pub struct DefaultStages { +pub struct DefaultStages { /// Configuration for the online stages online: OnlineStages, /// Executor factory needs for execution stage executor_factory: EF, - static_file_producer: StaticFileProducer, } -impl DefaultStages { +impl DefaultStages { /// Create a new set of default stages with default values. pub fn new( provider: Provider, @@ -97,7 +101,6 @@ impl DefaultStages { header_downloader: H, body_downloader: B, executor_factory: EF, - static_file_producer: StaticFileProducer, ) -> Result where EF: ExecutorFactory, @@ -112,31 +115,27 @@ impl DefaultStages { Arc::new(TempDir::new()?), ), executor_factory, - static_file_producer, }) } } -impl DefaultStages +impl DefaultStages where EF: ExecutorFactory, - DB: Database + 'static, { /// Appends the default offline stages and default finish stage to the given builder. - pub fn add_offline_stages( + pub fn add_offline_stages( default_offline: StageSetBuilder, executor_factory: EF, - static_file_producer: StaticFileProducer, ) -> StageSetBuilder { StageSetBuilder::default() - .add_stage(StaticFileStage::new(static_file_producer)) .add_set(default_offline) .add_set(OfflineStages::new(executor_factory)) .add_stage(FinishStage) } } -impl StageSet for DefaultStages +impl StageSet for DefaultStages where Provider: HeaderSyncGapProvider + 'static, H: HeaderDownloader + 'static, @@ -145,11 +144,7 @@ where DB: Database + 'static, { fn builder(self) -> StageSetBuilder { - Self::add_offline_stages( - self.online.builder(), - self.executor_factory, - self.static_file_producer, - ) + Self::add_offline_stages(self.online.builder(), self.executor_factory) } } diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs index a1c0c7c5469e..c024230bcc18 100644 --- a/crates/stages/src/stages/mod.rs +++ b/crates/stages/src/stages/mod.rs @@ -18,8 +18,6 @@ mod index_storage_history; mod merkle; /// The sender recovery stage. mod sender_recovery; -/// The static file stage. -mod static_file; /// The transaction lookup stage mod tx_lookup; @@ -33,7 +31,6 @@ pub use index_account_history::*; pub use index_storage_history::*; pub use merkle::*; pub use sender_recovery::*; -pub use static_file::*; pub use tx_lookup::*; #[cfg(test)] diff --git a/crates/stages/src/stages/static_file.rs b/crates/stages/src/stages/static_file.rs deleted file mode 100644 index 7d8ca87c6241..000000000000 --- a/crates/stages/src/stages/static_file.rs +++ /dev/null @@ -1,57 +0,0 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; -use reth_db::database::Database; -use reth_primitives::{ - stage::{StageCheckpoint, StageId}, - static_file::HighestStaticFiles, -}; -use reth_provider::{DatabaseProviderRW, StageCheckpointReader}; -use reth_static_file::StaticFileProducer; - -/// The static file stage _copies_ all data from database to static files using -/// [StaticFileProducer]. The block range for copying is determined by the current highest blocks -/// contained in static files and stage checkpoints for each segment individually. -#[derive(Debug)] -pub struct StaticFileStage { - static_file_producer: StaticFileProducer, -} - -impl StaticFileStage { - /// Creates a new static file stage. - pub fn new(static_file_producer: StaticFileProducer) -> Self { - Self { static_file_producer } - } -} - -impl Stage for StaticFileStage { - fn id(&self) -> StageId { - StageId::StaticFile - } - - fn execute( - &mut self, - provider: &DatabaseProviderRW, - input: ExecInput, - ) -> Result { - let targets = self.static_file_producer.get_static_file_targets(HighestStaticFiles { - headers: provider - .get_stage_checkpoint(StageId::Headers)? - .map(|checkpoint| checkpoint.block_number), - receipts: provider - .get_stage_checkpoint(StageId::Execution)? - .map(|checkpoint| checkpoint.block_number), - transactions: provider - .get_stage_checkpoint(StageId::Bodies)? - .map(|checkpoint| checkpoint.block_number), - })?; - self.static_file_producer.run(targets)?; - Ok(ExecOutput::done(input.checkpoint())) - } - - fn unwind( - &mut self, - _provider: &DatabaseProviderRW, - input: UnwindInput, - ) -> Result { - Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) }) - } -}