
refactor: split async/sync work in stages #4636

Merged · 13 commits · Nov 17, 2023
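For orientation: this PR splits each pipeline stage's formerly-async entry points so that `Stage::execute` and `Stage::unwind` do their database work synchronously, while any genuinely async pre-work (waiting on downloaders, channels) is polled separately. The sketch below is a self-contained illustration of that shape, not reth's exact trait; the stand-in types and the `poll_execute_ready` name are assumptions made for this example.

```rust
use std::task::{Context, Poll};

// Illustrative stand-ins for reth's stage types.
struct ExecInput {
    target: Option<u64>,
    checkpoint: Option<u64>,
}
struct ExecOutput {
    done: bool,
}
#[derive(Debug)]
struct StageError;

// Before: a single `async fn execute` mixed waiting on network-bound work
// (downloaders, channels) with synchronous database work. After: the
// awaitable part is isolated behind a poll-based readiness hook, and the
// database work itself runs synchronously.
trait Stage {
    /// Async pre-work, expressed as a poll so the pipeline can await it.
    fn poll_execute_ready(
        &mut self,
        cx: &mut Context<'_>,
        input: &ExecInput,
    ) -> Poll<Result<(), StageError>> {
        let _ = (cx, input);
        Poll::Ready(Ok(())) // most stages have no async pre-work
    }

    /// Sync body: pure DB work, so call sites no longer `.await` it.
    fn execute(&mut self, input: ExecInput) -> Result<ExecOutput, StageError>;
}

// A do-nothing stage to show the trait is usable as written.
struct Noop;
impl Stage for Noop {
    fn execute(&mut self, _input: ExecInput) -> Result<ExecOutput, StageError> {
        Ok(ExecOutput { done: true })
    }
}

fn main() {
    let mut stage = Noop;
    let out = stage.execute(ExecInput { target: None, checkpoint: None }).unwrap();
    assert!(out.done);
}
```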
17 changes: 7 additions & 10 deletions bin/reth/src/chain/import.rs
@@ -1,4 +1,8 @@
use crate::{
args::{
utils::{chain_help, genesis_value_parser, SUPPORTED_CHAINS},
DatabaseArgs,
},
dirs::{DataDirPath, MaybePlatformPath},
init::init_genesis,
node::events::{handle_events, NodeEvent},
@@ -8,12 +12,6 @@ use clap::Parser;
use eyre::Context;
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconConsensus;
use reth_provider::{ProviderFactory, StageCheckpointReader};

use crate::args::{
utils::{chain_help, genesis_value_parser, SUPPORTED_CHAINS},
DatabaseArgs,
};
use reth_config::Config;
use reth_db::{database::Database, init_db};
use reth_downloaders::{
@@ -22,12 +20,10 @@ use reth_downloaders::{
};
use reth_interfaces::consensus::Consensus;
use reth_primitives::{stage::StageId, ChainSpec, B256};
use reth_provider::{HeaderSyncMode, ProviderFactory, StageCheckpointReader};
use reth_stages::{
prelude::*,
stages::{
ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage,
TotalDifficultyStage,
},
stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage, TotalDifficultyStage},
};
use std::{path::PathBuf, sync::Arc};
use tokio::sync::watch;
@@ -164,6 +160,7 @@ impl ImportCommand {
.with_max_block(max_block)
.add_stages(
DefaultStages::new(
ProviderFactory::new(db.clone(), self.chain.clone()),
HeaderSyncMode::Tip(tip_rx),
consensus.clone(),
header_downloader,
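One visible side effect at every call site: `DefaultStages::new` now takes a `ProviderFactory` as its first argument. A hedged reading is that synchronous stages need to open database providers themselves rather than being handed one across an `.await` point. A minimal stand-in sketch of that shape (none of these types are reth's real ones):

```rust
// Stand-in types only; the real ProviderFactory and DefaultStages live in reth.
#[derive(Clone)]
struct Db;
#[derive(Clone)]
struct ChainSpec;

#[derive(Clone)]
struct ProviderFactory {
    db: Db,
    chain: ChainSpec,
}

struct DefaultStages {
    // Held by the stage set so individual stages can open database providers
    // on their own instead of receiving one across an async boundary.
    provider_factory: ProviderFactory,
}

impl DefaultStages {
    // Mirrors the call-site change above: the factory is now the first
    // constructor argument (header mode, consensus, etc. elided here).
    fn new(provider_factory: ProviderFactory) -> Self {
        Self { provider_factory }
    }
}

fn main() {
    let factory = ProviderFactory { db: Db, chain: ChainSpec };
    let stages = DefaultStages::new(factory.clone());
    let _ = (stages.provider_factory, factory);
}
```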
8 changes: 3 additions & 5 deletions bin/reth/src/debug_cmd/execution.rs
@@ -27,13 +27,10 @@ use reth_interfaces::{
use reth_network::{NetworkEvents, NetworkHandle};
use reth_network_api::NetworkInfo;
use reth_primitives::{fs, stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, B256};
use reth_provider::{BlockExecutionWriter, ProviderFactory, StageCheckpointReader};
use reth_provider::{BlockExecutionWriter, HeaderSyncMode, ProviderFactory, StageCheckpointReader};
use reth_stages::{
sets::DefaultStages,
stages::{
ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage,
TotalDifficultyStage,
},
stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage, TotalDifficultyStage},
Pipeline, StageSet,
};
use reth_tasks::TaskExecutor;
@@ -118,6 +115,7 @@ impl Command {
.with_tip_sender(tip_tx)
.add_stages(
DefaultStages::new(
ProviderFactory::new(db.clone(), self.chain.clone()),
header_mode,
Arc::clone(&consensus),
header_downloader,
57 changes: 23 additions & 34 deletions bin/reth/src/debug_cmd/merkle.rs
@@ -222,53 +222,42 @@ impl Command {
None
};

execution_stage
    .execute(
        &provider_rw,
        ExecInput {
            target: Some(block),
            checkpoint: block.checked_sub(1).map(StageCheckpoint::new),
        },
    )
    .await?;
execution_stage.execute(
    &provider_rw,
    ExecInput {
        target: Some(block),
        checkpoint: block.checked_sub(1).map(StageCheckpoint::new),
    },
)?;

let mut account_hashing_done = false;
while !account_hashing_done {
    let output = account_hashing_stage
        .execute(
            &provider_rw,
            ExecInput {
                target: Some(block),
                checkpoint: progress.map(StageCheckpoint::new),
            },
        )
        .await?;
    let output = account_hashing_stage.execute(
        &provider_rw,
        ExecInput {
            target: Some(block),
            checkpoint: progress.map(StageCheckpoint::new),
        },
    )?;
    account_hashing_done = output.done;
}

let mut storage_hashing_done = false;
while !storage_hashing_done {
    let output = storage_hashing_stage
        .execute(
            &provider_rw,
            ExecInput {
                target: Some(block),
                checkpoint: progress.map(StageCheckpoint::new),
            },
        )
        .await?;
    let output = storage_hashing_stage.execute(
        &provider_rw,
        ExecInput {
            target: Some(block),
            checkpoint: progress.map(StageCheckpoint::new),
        },
    )?;
    storage_hashing_done = output.done;
}

let incremental_result = merkle_stage
    .execute(
        &provider_rw,
        ExecInput { target: Some(block), checkpoint: progress.map(StageCheckpoint::new) },
    )
    .await;
let incremental_result = merkle_stage.execute(
    &provider_rw,
    ExecInput { target: Some(block), checkpoint: progress.map(StageCheckpoint::new) },
);

if incremental_result.is_err() {
tracing::warn!(target: "reth::cli", block, "Incremental calculation failed, retrying from scratch");
@@ -285,7 +274,7 @@

let clean_input = ExecInput { target: Some(block), checkpoint: None };
loop {
let clean_result = merkle_stage.execute(&provider_rw, clean_input).await;
let clean_result = merkle_stage.execute(&provider_rw, clean_input);
assert!(clean_result.is_ok(), "Clean state root calculation failed");
if clean_result.unwrap().done {
break
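With `execute` returning a plain `Result`, every `.await` in this file disappears. When synchronous stage work still has to be driven from async code, one common pattern is to push it onto a blocking thread; a generic sketch assuming a tokio runtime (illustrative, not reth's actual driver code):

```rust
// Generic pattern: run synchronous, DB-heavy work from async code without
// stalling the runtime's worker threads. Assumes the tokio crate.
async fn run_blocking<T, F>(work: F) -> T
where
    F: FnOnCe() -> T + Send + 'static,
    T: Send + 'static,
{
    tokio::task::spawn_blocking(work)
        .await
        .expect("blocking task panicked")
}

#[tokio::main]
async fn main() {
    // Stand-in for `stage.execute(&provider_rw, input)?.done`.
    let done = run_blocking(|| true).await;
    assert!(done);
}
```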
9 changes: 5 additions & 4 deletions bin/reth/src/node/mod.rs
@@ -61,7 +61,7 @@ use reth_primitives::{
};
use reth_provider::{
providers::BlockchainProvider, BlockHashReader, BlockReader, CanonStateSubscriptions,
HeaderProvider, ProviderFactory, StageCheckpointReader,
HeaderProvider, HeaderSyncMode, ProviderFactory, StageCheckpointReader,
};
use reth_prune::{segments::SegmentSet, Pruner};
use reth_revm::Factory;
@@ -71,9 +71,9 @@ use reth_snapshot::HighestSnapshotsTracker;
use reth_stages::{
prelude::*,
stages::{
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, HeaderSyncMode,
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TotalDifficultyStage, TransactionLookupStage,
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage,
IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
TotalDifficultyStage, TransactionLookupStage,
},
};
use reth_tasks::TaskExecutor;
@@ -896,6 +896,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
.with_metrics_tx(metrics_tx.clone())
.add_stages(
DefaultStages::new(
ProviderFactory::new(db.clone(), self.chain.clone()),
header_mode,
Arc::clone(&consensus),
header_downloader,
33 changes: 12 additions & 21 deletions bin/reth/src/stage/dump/execution.rs
@@ -100,16 +100,14 @@ async fn unwind_and_copy<DB: Database>(

let mut exec_stage = ExecutionStage::new_with_factory(Factory::new(db_tool.chain.clone()));

exec_stage
.unwind(
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)
.await?;
exec_stage.unwind(
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)?;

let unwind_inner_tx = provider.into_tx();

@@ -131,20 +129,13 @@ async fn dry_run<DB: Database>(
info!(target: "reth::cli", "Executing stage. [dry-run]");

let factory = ProviderFactory::new(&output_db, chain.clone());
let provider = factory.provider_rw()?;
let mut exec_stage = ExecutionStage::new_with_factory(Factory::new(chain.clone()));

exec_stage
.execute(
&provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?;
let input =
reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };
exec_stage.execute(&factory.provider_rw()?, input)?;

info!(target: "reth::cli", "Success.");
info!(target: "reth::cli", "Success");

Ok(())
}
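The same mechanical change runs through the dump helpers: `unwind` loses its `.await`, and `dry_run` collapses into a single sync call. Once a helper's body no longer awaits anything, the `async` qualifier itself can be dropped, which is exactly what the hashing dumps below do to `unwind_and_copy`. In miniature (stand-in types, not reth's):

```rust
struct Output {
    done: bool,
}

// Before: `async fn unwind_and_copy(...)` — async only because the stage
// methods were async. After: nothing in the body awaits, so it is a plain
// fn and callers drop their `.await` too.
fn unwind_and_copy() -> Result<Output, ()> {
    // Stand-in for `exec_stage.unwind(&provider, UnwindInput { .. })?`.
    Ok(Output { done: true })
}

fn main() -> Result<(), ()> {
    let out = unwind_and_copy()?; // no `.await`
    assert!(out.done);
    Ok(())
}
```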
44 changes: 19 additions & 25 deletions bin/reth/src/stage/dump/hashing_account.rs
@@ -22,7 +22,7 @@ pub(crate) async fn dump_hashing_account_stage<DB: Database>(
tx.import_table_with_range::<tables::AccountChangeSet, _>(&db_tool.db.tx()?, Some(from), to)
})??;

unwind_and_copy(db_tool, from, tip_block_number, &output_db).await?;
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;

if should_run {
dry_run(db_tool.chain.clone(), output_db, to, from).await?;
@@ -32,7 +32,7 @@
}

/// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
async fn unwind_and_copy<DB: Database>(
fn unwind_and_copy<DB: Database>(
db_tool: &DbTool<'_, DB>,
from: u64,
tip_block_number: u64,
@@ -42,16 +42,14 @@ async fn unwind_and_copy<DB: Database>(
let provider = factory.provider_rw()?;
let mut exec_stage = AccountHashingStage::default();

exec_stage
.unwind(
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)
.await?;
exec_stage.unwind(
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)?;
let unwind_inner_tx = provider.into_tx();

output_db.update(|tx| tx.import_table::<tables::PlainAccountState, _>(&unwind_inner_tx))??;
@@ -70,23 +68,19 @@ async fn dry_run<DB: Database>(

let factory = ProviderFactory::new(&output_db, chain);
let provider = factory.provider_rw()?;
let mut exec_stage = AccountHashingStage {
let mut stage = AccountHashingStage {
clean_threshold: 1, // Forces hashing from scratch
..Default::default()
};

let mut exec_output = false;
while !exec_output {
exec_output = exec_stage
.execute(
&provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?
.done;
loop {
let input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
if stage.execute(&provider, input)?.done {
break
}
}

info!(target: "reth::cli", "Success.");
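Both hashing dry-runs converge on the same idiom: re-execute the stage until it reports `done`, replacing the old flag-plus-`while` form. Stripped of reth types, the control flow is just the following (stand-in code; the three-pass count is invented for the example):

```rust
struct Output {
    done: bool,
}

// Pretend the stage needs three passes before reporting completion.
fn execute_step(calls: &mut u32) -> Result<Output, ()> {
    *calls += 1;
    Ok(Output { done: *calls >= 3 })
}

fn main() -> Result<(), ()> {
    let mut calls = 0;
    // Same shape as the new dry-run loops: no bookkeeping flag,
    // just break once the stage says it is finished.
    loop {
        if execute_step(&mut calls)?.done {
            break
        }
    }
    assert_eq!(calls, 3);
    Ok(())
}
```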
44 changes: 19 additions & 25 deletions bin/reth/src/stage/dump/hashing_storage.rs
@@ -17,7 +17,7 @@ pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
) -> Result<()> {
let (output_db, tip_block_number) = setup(from, to, output_db, db_tool)?;

unwind_and_copy(db_tool, from, tip_block_number, &output_db).await?;
unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;

if should_run {
dry_run(db_tool.chain.clone(), output_db, to, from).await?;
@@ -27,7 +27,7 @@
}

/// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
async fn unwind_and_copy<DB: Database>(
fn unwind_and_copy<DB: Database>(
db_tool: &DbTool<'_, DB>,
from: u64,
tip_block_number: u64,
@@ -38,16 +38,14 @@ async fn unwind_and_copy<DB: Database>(

let mut exec_stage = StorageHashingStage::default();

exec_stage
.unwind(
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)
.await?;
exec_stage.unwind(
&provider,
UnwindInput {
unwind_to: from,
checkpoint: StageCheckpoint::new(tip_block_number),
bad_block: None,
},
)?;
let unwind_inner_tx = provider.into_tx();

// TODO optimize we can actually just get the entries we need for both these tables
@@ -69,23 +67,19 @@ async fn dry_run<DB: Database>(

let factory = ProviderFactory::new(&output_db, chain);
let provider = factory.provider_rw()?;
let mut exec_stage = StorageHashingStage {
let mut stage = StorageHashingStage {
clean_threshold: 1, // Forces hashing from scratch
..Default::default()
};

let mut exec_output = false;
while !exec_output {
exec_output = exec_stage
.execute(
&provider,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?
.done;
loop {
let input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
if stage.execute(&provider, input)?.done {
break
}
}

info!(target: "reth::cli", "Success.");