HeaderStage::new with Result (propagates all stages)
joshieDo committed Jan 24, 2024
1 parent d527fd9 commit e218e52
Showing 11 changed files with 68 additions and 67 deletions.
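The change itself is small but it fans out: `HeaderStage::new` becomes fallible because its two ETL collectors are no longer `.unwrap()`ed, and that `Result` propagates through `StageSet::builder`, `StageSet::set`, `StageSetBuilder::add_set`, and `PipelineBuilder::add_stages`. For callers the builder chain keeps its shape and only grows `?` at the fallible calls. A condensed sketch of the caller side, assuming the usual bindings (`tip_tx`, `tip_rx`, the downloaders, `consensus`, `executor_factory`, `provider_factory`, a local `commit_threshold`) are in scope and the surrounding function returns a `Result` that can absorb `StageError`:

```rust
// Sketch only: where `?` is now required when assembling a pipeline.
let pipeline = Pipeline::builder()
    .with_tip_sender(tip_tx)
    .add_stages(
        DefaultStages::new(
            provider_factory.clone(),
            HeaderSyncMode::Tip(tip_rx),
            Arc::clone(&consensus),
            header_downloader,
            body_downloader,
            executor_factory.clone(),
        )
        // StageSet::set now returns Result<StageSetBuilder<_>, StageError>.
        .set(SenderRecoveryStage { commit_threshold })?,
    )? // add_stages now returns Result<Self, StageError>.
    .build(provider_factory);
```

The test-side call sites below do the same thing with `.unwrap()`.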
4 changes: 2 additions & 2 deletions bin/reth/src/builder/mod.rs
@@ -894,7 +894,7 @@ impl NodeConfig {
)
.set(SenderRecoveryStage {
commit_threshold: stage_config.sender_recovery.commit_threshold,
})
})?
.set(
ExecutionStage::new(
factory,
@@ -934,7 +934,7 @@ impl NodeConfig {
stage_config.index_storage_history.commit_threshold,
prune_modes.storage_history,
)),
)
)?
.build(provider_factory);

Ok(pipeline)
4 changes: 2 additions & 2 deletions bin/reth/src/commands/debug_cmd/execution.rs
@@ -125,7 +125,7 @@ impl Command {
)
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
})?
.set(ExecutionStage::new(
factory,
ExecutionStageThresholds {
@@ -141,7 +141,7 @@ impl Command {
.max(stage_conf.storage_hashing.clean_threshold),
config.prune.clone().map(|prune| prune.segments).unwrap_or_default(),
)),
)
)?
.build(provider_factory);

Ok(pipeline)
4 changes: 2 additions & 2 deletions bin/reth/src/commands/import.rs
@@ -172,7 +172,7 @@ impl ImportCommand {
)
.set(SenderRecoveryStage {
commit_threshold: config.stages.sender_recovery.commit_threshold,
})
})?
.set(ExecutionStage::new(
factory,
ExecutionStageThresholds {
@@ -189,7 +189,7 @@ impl ImportCommand {
.max(config.stages.storage_hashing.clean_threshold),
config.prune.map(|prune| prune.segments).unwrap_or_default(),
)),
)
)?
.build(provider_factory);

let events = pipeline.events().map(Into::into);
1 change: 1 addition & 0 deletions crates/consensus/beacon/src/engine/sync.rs
@@ -459,6 +459,7 @@ mod tests {
let (tip_tx, _tip_rx) = watch::channel(B256::default());
let mut pipeline = Pipeline::builder()
.add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default()))
.unwrap()
.with_tip_sender(tip_tx);

if let Some(max_block) = self.max_block {
19 changes: 11 additions & 8 deletions crates/consensus/beacon/src/engine/test_utils.rs
@@ -482,6 +482,7 @@ where
let mut pipeline = match self.base_config.pipeline_config {
TestPipelineConfig::Test(outputs) => Pipeline::builder()
.add_stages(TestStages::new(outputs, Default::default()))
.unwrap()
.with_tip_sender(tip_tx),
TestPipelineConfig::Real => {
let header_downloader = ReverseHeadersDownloaderBuilder::default()
@@ -492,14 +493,16 @@ where
.build(client.clone(), consensus.clone(), provider_factory.clone())
.into_task();

Pipeline::builder().add_stages(DefaultStages::new(
ProviderFactory::new(db.clone(), self.base_config.chain_spec.clone()),
HeaderSyncMode::Tip(tip_rx.clone()),
Arc::clone(&consensus),
header_downloader,
body_downloader,
executor_factory.clone(),
))
Pipeline::builder()
.add_stages(DefaultStages::new(
ProviderFactory::new(db.clone(), self.base_config.chain_spec.clone()),
HeaderSyncMode::Tip(tip_rx.clone()),
Arc::clone(&consensus),
header_downloader,
body_downloader,
executor_factory.clone(),
))
.unwrap()
}
};

2 changes: 1 addition & 1 deletion crates/stages/src/error.rs
@@ -111,7 +111,7 @@ impl StageError {

impl From<std::io::Error> for StageError {
fn from(source: std::io::Error) -> Self {
StageError::Fatal(Box::new(source))
StageError::Fatal(Box::new(source))
}
}

8 changes: 4 additions & 4 deletions crates/stages/src/pipeline/builder.rs
@@ -1,4 +1,4 @@
use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageSet};
use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageError, StageSet};
use reth_db::database::Database;
use reth_primitives::{stage::StageId, BlockNumber, B256};
use reth_provider::ProviderFactory;
@@ -39,11 +39,11 @@ where
/// To customize the stages in the set (reorder, disable, insert a stage) call
/// [`builder`][StageSet::builder] on the set which will convert it to a
/// [`StageSetBuilder`][crate::StageSetBuilder].
pub fn add_stages<Set: StageSet<DB>>(mut self, set: Set) -> Self {
for stage in set.builder().build() {
pub fn add_stages<Set: StageSet<DB>>(mut self, set: Set) -> Result<Self, StageError> {
for stage in set.builder()?.build() {
self.stages.push(stage);
}
self
Ok(self)
}

/// Set the target block.
18 changes: 9 additions & 9 deletions crates/stages/src/pipeline/set.rs
@@ -1,4 +1,4 @@
use crate::Stage;
use crate::{Stage, StageError};
use reth_db::database::Database;
use reth_primitives::stage::StageId;
use std::{
@@ -14,15 +14,15 @@ use std::{
/// Individual stages in the set can be added, removed and overridden using [`StageSetBuilder`].
pub trait StageSet<DB: Database>: Sized {
/// Configures the stages in the set.
fn builder(self) -> StageSetBuilder<DB>;
fn builder(self) -> Result<StageSetBuilder<DB>, StageError>;

/// Overrides the given [`Stage`], if it is in this set.
///
/// # Panics
///
/// Panics if the [`Stage`] is not in this set.
fn set<S: Stage<DB> + 'static>(self, stage: S) -> StageSetBuilder<DB> {
self.builder().set(stage)
fn set<S: Stage<DB> + 'static>(self, stage: S) -> Result<StageSetBuilder<DB>, StageError> {
Ok(self.builder()?.set(stage))
}
}

@@ -119,13 +119,13 @@ where
///
/// If a stage is in both sets, it is removed from its previous place in this set. Because of
/// this, it is advisable to merge sets first and re-order stages after if needed.
pub fn add_set<Set: StageSet<DB>>(mut self, set: Set) -> Self {
for stage in set.builder().build() {
pub fn add_set<Set: StageSet<DB>>(mut self, set: Set) -> Result<Self, StageError> {
for stage in set.builder()?.build() {
let target_index = self.order.len();
self.order.push(stage.id());
self.upsert_stage_state(stage, target_index);
}
self
Ok(self)
}

/// Adds the given [`Stage`] before the stage with the given [`StageId`].
@@ -215,7 +215,7 @@ where
}

impl<DB: Database> StageSet<DB> for StageSetBuilder<DB> {
fn builder(self) -> StageSetBuilder<DB> {
self
fn builder(self) -> Result<StageSetBuilder<DB>, StageError> {
Ok(self)
}
}
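For code that implements `StageSet` itself, the migration is mechanical: `builder()` now returns `Result<StageSetBuilder<DB>, StageError>`, and the provided `set` helper wraps its result in `Ok`. A hypothetical downstream set, shown only to illustrate the new shape (the struct is made up, `add_stage` is unchanged and infallible, and the `reth_stages` items are assumed to be imported):

```rust
use reth_db::database::Database;

/// Hypothetical set bundling sender recovery and transaction lookup.
#[derive(Debug, Default)]
pub struct RecoveryAndLookup;

impl<DB: Database> StageSet<DB> for RecoveryAndLookup {
    fn builder(self) -> Result<StageSetBuilder<DB>, StageError> {
        // Only the signature changes here; the body just gains an `Ok(..)`.
        Ok(StageSetBuilder::default()
            .add_stage(SenderRecoveryStage::default())
            .add_stage(TransactionLookupStage::default()))
    }
}
```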
50 changes: 25 additions & 25 deletions crates/stages/src/sets.rs
@@ -42,7 +42,7 @@ use crate::{
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TransactionLookupStage,
},
StageSet, StageSetBuilder,
StageError, StageSet, StageSetBuilder,
};
use reth_db::database::Database;
use reth_interfaces::{
@@ -115,8 +115,8 @@ where
pub fn add_offline_stages<DB: Database>(
default_offline: StageSetBuilder<DB>,
executor_factory: EF,
) -> StageSetBuilder<DB> {
default_offline.add_set(OfflineStages::new(executor_factory)).add_stage(FinishStage)
) -> Result<StageSetBuilder<DB>, StageError> {
Ok(default_offline.add_set(OfflineStages::new(executor_factory))?.add_stage(FinishStage))
}
}

@@ -128,8 +128,8 @@ where
B: BodyDownloader + 'static,
EF: ExecutorFactory,
{
fn builder(self) -> StageSetBuilder<DB> {
Self::add_offline_stages(self.online.builder(), self.executor_factory)
fn builder(self) -> Result<StageSetBuilder<DB>, StageError> {
Self::add_offline_stages(self.online.builder()?, self.executor_factory)
}
}

@@ -185,10 +185,10 @@ where
mode: HeaderSyncMode,
header_downloader: H,
consensus: Arc<dyn Consensus>,
) -> StageSetBuilder<DB> {
StageSetBuilder::default()
.add_stage(HeaderStage::new(provider, header_downloader, mode, consensus.clone()))
.add_stage(bodies)
) -> Result<StageSetBuilder<DB>, StageError> {
Ok(StageSetBuilder::default()
.add_stage(HeaderStage::new(provider, header_downloader, mode, consensus.clone())?)
.add_stage(bodies))
}
}

@@ -199,15 +199,15 @@ where
H: HeaderDownloader + 'static,
B: BodyDownloader + 'static,
{
fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default()
fn builder(self) -> Result<StageSetBuilder<DB>, StageError> {
Ok(StageSetBuilder::default()
.add_stage(HeaderStage::new(
self.provider,
self.header_downloader,
self.header_mode,
self.consensus.clone(),
))
.add_stage(BodyStage::new(self.body_downloader))
)?)
.add_stage(BodyStage::new(self.body_downloader)))
}
}

@@ -233,10 +233,10 @@ impl<EF: ExecutorFactory> OfflineStages<EF> {
}

impl<EF: ExecutorFactory, DB: Database> StageSet<DB> for OfflineStages<EF> {
fn builder(self) -> StageSetBuilder<DB> {
fn builder(self) -> Result<StageSetBuilder<DB>, StageError> {
ExecutionStages::new(self.executor_factory)
.builder()
.add_set(HashingStages)
.builder()?
.add_set(HashingStages)?
.add_set(HistoryIndexingStages)
}
}
@@ -257,10 +257,10 @@ impl<EF: ExecutorFactory + 'static> ExecutionStages<EF> {
}

impl<EF: ExecutorFactory, DB: Database> StageSet<DB> for ExecutionStages<EF> {
fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default()
fn builder(self) -> Result<StageSetBuilder<DB>, StageError> {
Ok(StageSetBuilder::default()
.add_stage(SenderRecoveryStage::default())
.add_stage(ExecutionStage::new_with_factory(self.executor_factory))
.add_stage(ExecutionStage::new_with_factory(self.executor_factory)))
}
}

@@ -270,12 +270,12 @@ impl<EF: ExecutorFactory, DB: Database> StageSet<DB> for ExecutionStages<EF> {
pub struct HashingStages;

impl<DB: Database> StageSet<DB> for HashingStages {
fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default()
fn builder(self) -> Result<StageSetBuilder<DB>, StageError> {
Ok(StageSetBuilder::default()
.add_stage(MerkleStage::default_unwind())
.add_stage(AccountHashingStage::default())
.add_stage(StorageHashingStage::default())
.add_stage(MerkleStage::default_execution())
.add_stage(MerkleStage::default_execution()))
}
}

@@ -285,10 +285,10 @@ impl<DB: Database> StageSet<DB> for HashingStages {
pub struct HistoryIndexingStages;

impl<DB: Database> StageSet<DB> for HistoryIndexingStages {
fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default()
fn builder(self) -> Result<StageSetBuilder<DB>, StageError> {
Ok(StageSetBuilder::default()
.add_stage(TransactionLookupStage::default())
.add_stage(IndexStorageHistoryStage::default())
.add_stage(IndexAccountHistoryStage::default())
.add_stage(IndexAccountHistoryStage::default()))
}
}
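Because every built-in set above now hands back its builder as a `Result`, a failure inside any nested set surfaces once, while the pipeline is being assembled, rather than as a panic during setup. A sketch of handling that at the call site, illustrated with `OfflineStages` for brevity and assuming a surrounding function that returns `eyre::Result` as the reth binaries do (the message text is illustrative):

```rust
// Sketch: a set-construction error becomes observable at assembly time.
// Propagating it with `?` works just as well; the match only makes the point explicit.
let pipeline = match Pipeline::builder().add_stages(OfflineStages::new(executor_factory)) {
    Ok(builder) => builder.build(provider_factory),
    Err(err) => return Err(eyre::eyre!("failed to assemble stage set: {err}")),
};
```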
19 changes: 8 additions & 11 deletions crates/stages/src/stages/headers.rs
@@ -72,17 +72,17 @@ where
downloader: Downloader,
mode: HeaderSyncMode,
consensus: Arc<dyn Consensus>,
) -> Self {
Self {
) -> Result<Self, StageError> {
Ok(Self {
provider: database,
downloader,
mode,
consensus,
sync_gap: None,
hash_collector: Collector::new(100 * (1024 * 1024)).unwrap(),
header_collector: Collector::new(100 * (1024 * 1024)).unwrap(),
hash_collector: Collector::new(100 * (1024 * 1024))?,
header_collector: Collector::new(100 * (1024 * 1024))?,
is_etl_ready: false,
}
})
}

/// Write downloaded headers to the given transaction from ETL.
@@ -119,9 +119,7 @@ where
// order

let interval = total_headers / 10;
for (index, header) in
self.header_collector.iter()?.enumerate()
{
for (index, header) in self.header_collector.iter()?.enumerate() {
let (number, header_buf) = header?;

if index > 0 && index % interval == 0 {
@@ -174,9 +172,7 @@ where

// Since ETL sorts all entries by hashes, we are either appending (first sync) or inserting
// in order (further syncs).
for (index, hash_to_number) in
self.hash_collector.iter()?.enumerate()
{
for (index, hash_to_number) in self.hash_collector.iter()?.enumerate() {
let (hash, number) = hash_to_number?;

if index > 0 && index % interval == 0 {
@@ -426,6 +422,7 @@ mod tests {
HeaderSyncMode::Tip(self.channel.1.clone()),
self.consensus.clone(),
)
.unwrap()
}
}

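This file is the origin of the change: the two ETL collectors inside `HeaderStage` used to be created with `.unwrap()` and now propagate their error, so a failure while setting up either collector is reported as a `StageError` at construction time instead of a panic. A sketch of constructing the stage directly (bindings assumed in scope, enclosing function assumed to return a compatible `Result`; test code, as in the `mod tests` hunk above, simply calls `.unwrap()`):

```rust
// Sketch: HeaderStage::new now returns a Result, handled here with `?`.
let headers = HeaderStage::new(
    provider_factory.clone(),
    header_downloader,
    HeaderSyncMode::Tip(tip_rx),
    Arc::clone(&consensus),
)?;
// The constructed stage is added to a set exactly as before.
let online = StageSetBuilder::default().add_stage(headers);
```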
6 changes: 3 additions & 3 deletions crates/stages/src/test_utils/set.rs
@@ -19,11 +19,11 @@ impl TestStages {
}

impl<DB: Database> StageSet<DB> for TestStages {
fn builder(self) -> StageSetBuilder<DB> {
StageSetBuilder::default().add_stage(
fn builder(self) -> Result<StageSetBuilder<DB>, StageError> {
Ok(StageSetBuilder::default().add_stage(
TestStage::new(TEST_STAGE_ID)
.with_exec(self.exec_outputs)
.with_unwind(self.unwind_outputs),
)
))
}
}
