Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stages): remove static files stage, run before loop #6763

Merged
merged 5 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions bin/reth/src/commands/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ impl Command {
header_downloader,
body_downloader,
factory.clone(),
static_file_producer,
)?
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
Expand All @@ -148,7 +147,7 @@ impl Command {
config.prune.clone().map(|prune| prune.segments).unwrap_or_default(),
)),
)
.build(provider_factory);
.build(provider_factory, static_file_producer);

Ok(pipeline)
}
Expand Down
12 changes: 10 additions & 2 deletions bin/reth/src/commands/debug_cmd/replay_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ use reth_node_ethereum::{EthEngineTypes, EthEvmConfig};
#[cfg(feature = "optimism")]
use reth_node_optimism::{OptimismEngineTypes, OptimismEvmConfig};
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
use reth_primitives::{fs, ChainSpec};
use reth_primitives::{fs, ChainSpec, PruneModes};
use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions, ProviderFactory};
use reth_revm::EvmProcessorFactory;
use reth_stages::Pipeline;
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_transaction_pool::noop::NoopTransactionPool;
use std::{
Expand Down Expand Up @@ -196,7 +197,14 @@ impl Command {
let (consensus_engine_tx, consensus_engine_rx) = mpsc::unbounded_channel();
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
network_client,
Pipeline::builder().build(provider_factory),
Pipeline::builder().build(
provider_factory.clone(),
StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
),
),
Comment on lines +200 to +207
Copy link
Member

@gakonst gakonst Feb 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would like to improve the builder pattern here such that we don't have 3 mentions of provider factory in the call -- not a blocker

Copy link
Collaborator Author

@shekhirin shekhirin Feb 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a bit of an unreal example because of debug cmd. in practice, it looks like this

// configure static_file_producer
let static_file_producer = reth_static_file::StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
config.prune_config()?.unwrap_or_default().segments,
);

.build(provider_factory, static_file_producer);

but I agree that we can pass only provider_factory and prune_modes inside the StaticFileProducer::new

blockchain_db.clone(),
Box::new(ctx.task_executor.clone()),
Box::new(network),
Expand Down
3 changes: 1 addition & 2 deletions bin/reth/src/commands/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ impl ImportCommand {
header_downloader,
body_downloader,
factory.clone(),
static_file_producer,
)?
.set(SenderRecoveryStage {
commit_threshold: config.stages.sender_recovery.commit_threshold,
Expand All @@ -213,7 +212,7 @@ impl ImportCommand {
config.prune.map(|prune| prune.segments).unwrap_or_default(),
)),
)
.build(provider_factory);
.build(provider_factory, static_file_producer);

let events = pipeline.events().map(Into::into);

Expand Down
13 changes: 11 additions & 2 deletions crates/consensus/beacon/src/engine/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,13 +398,14 @@ mod tests {
use reth_interfaces::{p2p::either::EitherDownloader, test_utils::TestFullBlockClient};
use reth_primitives::{
constants::ETHEREUM_BLOCK_GAS_LIMIT, stage::StageCheckpoint, BlockBody, ChainSpecBuilder,
Header, SealedHeader, MAINNET,
Header, PruneModes, SealedHeader, MAINNET,
};
use reth_provider::{
test_utils::{create_test_provider_factory_with_chain_spec, TestExecutorFactory},
BundleStateWithReceipts,
};
use reth_stages::{test_utils::TestStages, ExecOutput, StageError};
use reth_static_file::StaticFileProducer;
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, future::poll_fn, ops::Range};
use tokio::sync::watch;
Expand Down Expand Up @@ -465,7 +466,15 @@ mod tests {
pipeline = pipeline.with_max_block(max_block);
}

pipeline.build(create_test_provider_factory_with_chain_spec(chain_spec))
let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec);

let static_file_producer = StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
);

pipeline.build(provider_factory, static_file_producer)
}
}

Expand Down
3 changes: 1 addition & 2 deletions crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,6 @@ where
header_downloader,
body_downloader,
executor_factory.clone(),
static_file_producer,
)
.expect("should build"),
)
Expand All @@ -417,7 +416,7 @@ where
pipeline = pipeline.with_max_block(max_block);
}

let pipeline = pipeline.build(provider_factory.clone());
let pipeline = pipeline.build(provider_factory.clone(), static_file_producer);

// Setup blockchain tree
let externals = TreeExternals::new(provider_factory.clone(), consensus, executor_factory);
Expand Down
3 changes: 1 addition & 2 deletions crates/node-core/src/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,6 @@ impl NodeConfig {
header_downloader,
body_downloader,
factory.clone(),
static_file_producer,
)?
.set(SenderRecoveryStage {
commit_threshold: stage_config.sender_recovery.commit_threshold,
Expand Down Expand Up @@ -888,7 +887,7 @@ impl NodeConfig {
prune_modes.storage_history,
)),
)
.build(provider_factory);
.build(provider_factory, static_file_producer);

Ok(pipeline)
}
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub enum PipelineError {
/// The pipeline encountered an error while trying to send an event.
#[error("pipeline encountered an error while trying to send an event")]
Channel(#[from] Box<SendError<PipelineEvent>>),
/// The stage encountered an internal error.
/// Internal error
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
Internal(#[from] RethError),
}
3 changes: 1 addition & 2 deletions crates/stages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,10 @@
//! headers_downloader,
//! bodies_downloader,
//! executor_factory,
//! static_file_producer,
//! )
//! .unwrap(),
//! )
//! .build(provider_factory);
//! .build(provider_factory, static_file_producer);
//! ```
//!
//! ## Feature Flags
Expand Down
8 changes: 7 additions & 1 deletion crates/stages/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageSet}
use reth_db::database::Database;
use reth_primitives::{stage::StageId, BlockNumber, B256};
use reth_provider::ProviderFactory;
use reth_static_file::StaticFileProducer;
use tokio::sync::watch;

/// Builds a [`Pipeline`].
Expand Down Expand Up @@ -67,12 +68,17 @@ where
}

/// Builds the final [`Pipeline`] using the given database.
pub fn build(self, provider_factory: ProviderFactory<DB>) -> Pipeline<DB> {
pub fn build(
self,
provider_factory: ProviderFactory<DB>,
static_file_producer: StaticFileProducer<DB>,
) -> Pipeline<DB> {
let Self { stages, max_block, tip_tx, metrics_tx } = self;
Pipeline {
provider_factory,
stages,
max_block,
static_file_producer,
tip_tx,
listeners: Default::default(),
progress: Default::default(),
Expand Down
88 changes: 82 additions & 6 deletions crates/stages/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ use crate::{
};
use futures_util::Future;
use reth_db::database::Database;
use reth_interfaces::RethResult;
use reth_primitives::{
constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH,
stage::{StageCheckpoint, StageId},
static_file::HighestStaticFiles,
BlockNumber, B256,
};
use reth_provider::{
providers::StaticFileWriter, ProviderFactory, StageCheckpointReader, StageCheckpointWriter,
};
use reth_static_file::StaticFileProducer;
use reth_tokio_util::EventListeners;
use std::pin::Pin;
use tokio::sync::watch;
Expand Down Expand Up @@ -68,6 +71,7 @@ pub struct Pipeline<DB: Database> {
stages: Vec<BoxedStage<DB>>,
/// The maximum block number to sync to.
max_block: Option<BlockNumber>,
static_file_producer: StaticFileProducer<DB>,
/// All listeners for events the pipeline emits.
listeners: EventListeners<PipelineEvent>,
/// Keeps track of the progress of the pipeline.
Expand Down Expand Up @@ -179,6 +183,8 @@ where
/// pipeline (for example the `Finish` stage). Or [ControlFlow::Unwind] of the stage that caused
/// the unwind.
pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
self.produce_static_files()?;

let mut previous_stage = None;
for stage_index in 0..self.stages.len() {
let stage = &self.stages[stage_index];
Expand Down Expand Up @@ -214,6 +220,33 @@ where
Ok(self.progress.next_ctrl())
}

/// Run [static file producer](StaticFileProducer) and move all data from the database to static
/// files for corresponding [segments](reth_primitives::static_file::StaticFileSegment),
/// according to their [stage checkpoints](StageCheckpoint):
/// - [StaticFileSegment::Headers](reth_primitives::static_file::StaticFileSegment::Headers) ->
/// [StageId::Headers]
/// - [StaticFileSegment::Receipts](reth_primitives::static_file::StaticFileSegment::Receipts)
/// -> [StageId::Execution]
/// - [StaticFileSegment::Transactions](reth_primitives::static_file::StaticFileSegment::Transactions)
/// -> [StageId::Bodies]
fn produce_static_files(&mut self) -> RethResult<()> {
let provider = self.provider_factory.provider()?;
let targets = self.static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: provider
.get_stage_checkpoint(StageId::Headers)?
.map(|checkpoint| checkpoint.block_number),
receipts: provider
.get_stage_checkpoint(StageId::Execution)?
.map(|checkpoint| checkpoint.block_number),
transactions: provider
.get_stage_checkpoint(StageId::Bodies)?
.map(|checkpoint| checkpoint.block_number),
})?;
self.static_file_producer.run(targets)?;

Ok(())
}

/// Unwind the stages to the target block.
///
/// If the unwind is due to a bad block the number of that block should be specified.
Expand Down Expand Up @@ -508,6 +541,7 @@ mod tests {
provider::ProviderError,
test_utils::{generators, generators::random_header},
};
use reth_primitives::PruneModes;
use reth_provider::test_utils::create_test_provider_factory;
use tokio_stream::StreamExt;

Expand Down Expand Up @@ -553,7 +587,14 @@ mod tests {
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.with_max_block(10)
.build(provider_factory);
.build(
provider_factory.clone(),
StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
),
);
let events = pipeline.events();

// Run pipeline
Expand Down Expand Up @@ -613,7 +654,14 @@ mod tests {
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })),
)
.with_max_block(10)
.build(provider_factory);
.build(
provider_factory.clone(),
StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
),
);
let events = pipeline.events();

// Run pipeline
Expand Down Expand Up @@ -720,7 +768,14 @@ mod tests {
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.with_max_block(10)
.build(provider_factory);
.build(
provider_factory.clone(),
StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
),
);
let events = pipeline.events();

// Run pipeline
Expand Down Expand Up @@ -817,7 +872,14 @@ mod tests {
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.with_max_block(10)
.build(provider_factory);
.build(
provider_factory.clone(),
StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
),
);
let events = pipeline.events();

// Run pipeline
Expand Down Expand Up @@ -897,7 +959,14 @@ mod tests {
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.with_max_block(10)
.build(provider_factory);
.build(
provider_factory.clone(),
StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
),
);
let result = pipeline.run().await;
assert_matches!(result, Ok(()));

Expand All @@ -907,7 +976,14 @@ mod tests {
.add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
)))
.build(provider_factory);
.build(
provider_factory.clone(),
StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
),
);
let result = pipeline.run().await;
assert_matches!(
result,
Expand Down
Loading
Loading