wip: adjust commands and tests
onbjerg committed Sep 18, 2023
1 parent 4259ebc commit e47ed97
Showing 13 changed files with 141 additions and 137 deletions.
14 changes: 5 additions & 9 deletions bin/reth/src/stage/dump/execution.rs
@@ -134,15 +134,11 @@ async fn dry_run<DB: Database>(
     let provider = factory.provider_rw()?;
     let mut exec_stage = ExecutionStage::new_with_factory(Factory::new(chain.clone()));
 
-    exec_stage
-        .execute(
-            &provider,
-            reth_stages::ExecInput {
-                target: Some(to),
-                checkpoint: Some(StageCheckpoint::new(from)),
-            },
-        )
-        .await?;
+    let input =
+        reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };
+    poll_fn(|cx| exec_stage.poll_ready(cx, input))
+        .await
+        .and_then(|_| exec_stage.execute(&provider, input))?;
 
     info!(target: "reth::cli", "Success.");
 
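The hunk above is the core pattern of this commit: `Stage::execute` is no longer an async method, and callers first await the stage's readiness through the new poll-based hook. A minimal sketch of a generic driver for the split API, inferred from the signatures visible in this diff (the `ExecOutput { checkpoint, done }` field shape and the exact import paths are assumptions, not confirmed beyond what the hunks show):

use std::future::poll_fn;

use reth_db::database::Database;
use reth_provider::DatabaseProviderRW;
use reth_stages::{ExecInput, ExecOutput, Stage, StageError};

// Await readiness, run the now-synchronous `execute`, and feed the returned
// checkpoint back in until the stage reports it is done.
async fn drive_stage<DB: Database, S: Stage<DB>>(
    stage: &mut S,
    provider: &DatabaseProviderRW<'_, &DB>,
    mut input: ExecInput,
) -> Result<ExecOutput, StageError> {
    loop {
        poll_fn(|cx| stage.poll_ready(cx, input)).await?;
        let output = stage.execute(&provider, input)?;
        if output.done {
            return Ok(output);
        }
        // Resume from wherever the previous chunk stopped (assumed field).
        input.checkpoint = Some(output.checkpoint);
    }
}
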
44 changes: 20 additions & 24 deletions bin/reth/src/stage/dump/hashing_account.rs
@@ -5,7 +5,7 @@ use reth_db::{database::Database, table::TableImporter, tables, DatabaseEnv};
 use reth_primitives::{stage::StageCheckpoint, BlockNumber, ChainSpec};
 use reth_provider::ProviderFactory;
 use reth_stages::{stages::AccountHashingStage, Stage, UnwindInput};
-use std::{path::PathBuf, sync::Arc};
+use std::{future::poll_fn, path::PathBuf, sync::Arc};
 use tracing::info;
 
 pub(crate) async fn dump_hashing_account_stage<DB: Database>(
@@ -22,7 +22,7 @@ pub(crate) async fn dump_hashing_account_stage<DB: Database>(
         tx.import_table_with_range::<tables::AccountChangeSet, _>(&db_tool.db.tx()?, Some(from), to)
     })??;
 
-    unwind_and_copy(db_tool, from, tip_block_number, &output_db).await?;
+    unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
 
     if should_run {
         dry_run(db_tool.chain.clone(), output_db, to, from).await?;
@@ -32,7 +32,7 @@ pub(crate) async fn dump_hashing_account_stage<DB: Database>(
 }
 
 /// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
-async fn unwind_and_copy<DB: Database>(
+fn unwind_and_copy<DB: Database>(
     db_tool: &DbTool<'_, DB>,
     from: u64,
     tip_block_number: u64,
@@ -42,16 +42,14 @@ async fn unwind_and_copy<DB: Database>(
     let provider = factory.provider_rw()?;
     let mut exec_stage = AccountHashingStage::default();
 
-    exec_stage
-        .unwind(
-            &provider,
-            UnwindInput {
-                unwind_to: from,
-                checkpoint: StageCheckpoint::new(tip_block_number),
-                bad_block: None,
-            },
-        )
-        .await?;
+    exec_stage.unwind(
+        &provider,
+        UnwindInput {
+            unwind_to: from,
+            checkpoint: StageCheckpoint::new(tip_block_number),
+            bad_block: None,
+        },
+    )?;
     let unwind_inner_tx = provider.into_tx();
 
     output_db.update(|tx| tx.import_table::<tables::PlainAccountState, _>(&unwind_inner_tx))??;
@@ -75,17 +73,15 @@ async fn dry_run<DB: Database>(
         ..Default::default()
     };
 
-    let mut exec_output = false;
-    while !exec_output {
-        exec_output = exec_stage
-            .execute(
-                &provider,
-                reth_stages::ExecInput {
-                    target: Some(to),
-                    checkpoint: Some(StageCheckpoint::new(from)),
-                },
-            )
-            .await?
+    let mut done = false;
+    while !done {
+        let input = reth_stages::ExecInput {
+            target: Some(to),
+            checkpoint: Some(StageCheckpoint::new(from)),
+        };
+        done = poll_fn(|cx| exec_stage.poll_ready(cx, input))
+            .await
+            .and_then(|_| exec_stage.execute(&provider, input))?
             .done;
     }
 
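Both dump commands now drive their stages through `std::future::poll_fn` (stable since Rust 1.64), which wraps a `FnMut(&mut Context<'_>) -> Poll<T>` closure into a future; that is what lets the poll-style `poll_ready` hook compose with `.await`. A self-contained toy example of the mechanism:

use std::{future::poll_fn, task::Poll};

// A future that stays pending for two polls before resolving, loosely
// mimicking a stage that is not yet ready to execute.
async fn ready_after_three_polls() -> u32 {
    let mut polls = 0;
    poll_fn(|cx| {
        polls += 1;
        if polls < 3 {
            // Ask the executor to poll this future again right away.
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(polls)
        }
    })
    .await
}
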
44 changes: 20 additions & 24 deletions bin/reth/src/stage/dump/hashing_storage.rs
@@ -5,7 +5,7 @@ use reth_db::{database::Database, table::TableImporter, tables, DatabaseEnv};
 use reth_primitives::{stage::StageCheckpoint, ChainSpec};
 use reth_provider::ProviderFactory;
 use reth_stages::{stages::StorageHashingStage, Stage, UnwindInput};
-use std::{path::PathBuf, sync::Arc};
+use std::{future::poll_fn, path::PathBuf, sync::Arc};
 use tracing::info;
 
 pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
@@ -17,7 +17,7 @@ pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
 ) -> Result<()> {
     let (output_db, tip_block_number) = setup(from, to, output_db, db_tool)?;
 
-    unwind_and_copy(db_tool, from, tip_block_number, &output_db).await?;
+    unwind_and_copy(db_tool, from, tip_block_number, &output_db)?;
 
     if should_run {
         dry_run(db_tool.chain.clone(), output_db, to, from).await?;
@@ -27,7 +27,7 @@ pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
 }
 
 /// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
-async fn unwind_and_copy<DB: Database>(
+fn unwind_and_copy<DB: Database>(
     db_tool: &DbTool<'_, DB>,
     from: u64,
     tip_block_number: u64,
@@ -38,16 +38,14 @@ async fn unwind_and_copy<DB: Database>(
 
     let mut exec_stage = StorageHashingStage::default();
 
-    exec_stage
-        .unwind(
-            &provider,
-            UnwindInput {
-                unwind_to: from,
-                checkpoint: StageCheckpoint::new(tip_block_number),
-                bad_block: None,
-            },
-        )
-        .await?;
+    exec_stage.unwind(
+        &provider,
+        UnwindInput {
+            unwind_to: from,
+            checkpoint: StageCheckpoint::new(tip_block_number),
+            bad_block: None,
+        },
+    )?;
     let unwind_inner_tx = provider.into_tx();
 
     // TODO optimize we can actually just get the entries we need for both these tables
@@ -74,17 +72,15 @@ async fn dry_run<DB: Database>(
         ..Default::default()
     };
 
-    let mut exec_output = false;
-    while !exec_output {
-        exec_output = exec_stage
-            .execute(
-                &provider,
-                reth_stages::ExecInput {
-                    target: Some(to),
-                    checkpoint: Some(StageCheckpoint::new(from)),
-                },
-            )
-            .await?
+    let mut done = false;
+    while !done {
+        let input = reth_stages::ExecInput {
+            target: Some(to),
+            checkpoint: Some(StageCheckpoint::new(from)),
+        };
+        done = poll_fn(|cx| exec_stage.poll_ready(cx, input))
+            .await
+            .and_then(|_| exec_stage.execute(&provider, input))?
             .done;
     }
 
75 changes: 44 additions & 31 deletions bin/reth/src/stage/dump/merkle.rs
@@ -3,15 +3,15 @@ use crate::utils::DbTool;
 use eyre::Result;
 use reth_db::{database::Database, table::TableImporter, tables, DatabaseEnv};
 use reth_primitives::{stage::StageCheckpoint, BlockNumber, ChainSpec, PruneModes};
-use reth_provider::ProviderFactory;
+use reth_provider::{DatabaseProviderRW, ProviderFactory};
 use reth_stages::{
     stages::{
         AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage,
         StorageHashingStage, MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
     },
-    Stage, UnwindInput,
+    ExecInput, ExecOutput, Stage, StageError, UnwindInput,
 };
-use std::{path::PathBuf, sync::Arc};
+use std::{future::poll_fn, path::PathBuf, sync::Arc};
 use tracing::info;
 
 pub(crate) async fn dump_merkle_stage<DB: Database>(
@@ -31,7 +31,7 @@ pub(crate) async fn dump_merkle_stage<DB: Database>(
         tx.import_table_with_range::<tables::AccountChangeSet, _>(&db_tool.db.tx()?, Some(from), to)
     })??;
 
-    unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db).await?;
+    unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db)?;
 
     if should_run {
         dry_run(db_tool.chain.clone(), output_db, to, from).await?;
@@ -61,10 +61,10 @@
 
     // Unwind hashes all the way to FROM
 
-    StorageHashingStage::default().unwind(&provider, unwind).await.unwrap();
-    AccountHashingStage::default().unwind(&provider, unwind).await.unwrap();
+    StorageHashingStage::default().unwind(&provider, unwind).unwrap();
+    AccountHashingStage::default().unwind(&provider, unwind).unwrap();
 
-    MerkleStage::default_unwind().unwind(&provider, unwind).await?;
+    MerkleStage::default_unwind().unwind(&provider, unwind)?;
 
     // Bring Plainstate to TO (hashing stage execution requires it)
     let mut exec_stage = ExecutionStage::new(
@@ -74,27 +74,31 @@ async fn unwind_and_copy<DB: Database>(
         PruneModes::all(),
     );
 
-    exec_stage
-        .unwind(
-            &provider,
-            UnwindInput {
-                unwind_to: to,
-                checkpoint: StageCheckpoint::new(tip_block_number),
-                bad_block: None,
-            },
-        )
-        .await?;
+    exec_stage.unwind(
+        &provider,
+        UnwindInput {
+            unwind_to: to,
+            checkpoint: StageCheckpoint::new(tip_block_number),
+            bad_block: None,
+        },
+    )?;
 
     // Bring hashes to TO
 
-    AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
-        .execute(&provider, execute_input)
-        .await
-        .unwrap();
-    StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
-        .execute(&provider, execute_input)
-        .await
-        .unwrap();
+    poll_and_execute(
+        &mut AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX },
+        &provider,
+        execute_input,
+    )
+    .await
+    .unwrap();
+    poll_and_execute(
+        &mut StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX },
+        &provider,
+        execute_input,
+    )
+    .await
+    .unwrap();
 
     let unwind_inner_tx = provider.into_tx();
 
@@ -109,6 +113,15 @@ async fn unwind_and_copy<DB: Database>(
     Ok(())
 }
 
+// todo: move to test_utils in reth_stages and use it where we currently manually poll
+async fn poll_and_execute<DB: Database, S: Stage<DB>>(
+    stage: &mut S,
+    provider: &DatabaseProviderRW<'_, &DB>,
+    input: ExecInput,
+) -> Result<ExecOutput, StageError> {
+    poll_fn(|cx| stage.poll_ready(cx, input)).await.and_then(|_| stage.execute(&provider, input))
+}
+
 /// Try to re-execute the stage straightaway
 async fn dry_run<DB: Database>(
     chain: Arc<ChainSpec>,
@@ -121,12 +134,12 @@ async fn dry_run<DB: Database>(
     let provider = factory.provider_rw()?;
     let mut exec_output = false;
     while !exec_output {
-        exec_output = MerkleStage::Execution {
-            clean_threshold: u64::MAX, /* Forces updating the root instead of calculating
-                                        * from
-                                        * scratch */
-        }
-        .execute(
+        exec_output = poll_and_execute(
+            &mut MerkleStage::Execution {
+                clean_threshold: u64::MAX, /* Forces updating the root instead of calculating
+                                            * from
+                                            * scratch */
+            },
             &provider,
             reth_stages::ExecInput {
                 target: Some(to),
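The `poll_and_execute` helper introduced above collapses each call site back into a single awaited expression, and the todo comment earmarks it for `test_utils` in `reth_stages`. A hypothetical call site reusing names from this diff:

// One awaited line instead of the manual poll_fn/and_then pairing.
let output = poll_and_execute(&mut AccountHashingStage::default(), &provider, execute_input)
    .await
    .unwrap();
assert!(output.done);
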
8 changes: 4 additions & 4 deletions bin/reth/src/stage/run.rs
@@ -175,8 +175,8 @@ impl Command {
             .await?;
         let fetch_client = Arc::new(network.fetch_client().await?);
 
-        let stage = BodyStage {
-            downloader: BodiesDownloaderBuilder::default()
+        let stage = BodyStage::new(
+            BodiesDownloaderBuilder::default()
                 .with_stream_batch_size(batch_size as usize)
                 .with_request_limit(config.stages.bodies.downloader_request_limit)
                 .with_max_buffered_blocks_size_bytes(
@@ -187,8 +187,8 @@ impl Command {
                     config.stages.bodies.downloader_max_concurrent_requests,
                 )
                 .build(fetch_client, consensus.clone(), db.clone()),
-            consensus: consensus.clone(),
-        };
+            consensus.clone(),
+        );
 
         (Box::new(stage), None)
     }
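This hunk trades a struct literal for a constructor, letting `BodyStage` keep its fields private. A sketch of what `BodyStage::new` presumably looks like on the other side of the diff; the `BodyDownloader` bound and the `Arc<dyn Consensus>` field type are assumptions based on the arguments passed here:

use std::sync::Arc;

// Hypothetical shape of the constructor this call site relies on.
impl<D: BodyDownloader> BodyStage<D> {
    /// Create a new body stage from a block-body downloader and a consensus handle.
    pub fn new(downloader: D, consensus: Arc<dyn Consensus>) -> Self {
        Self { downloader, consensus }
    }
}
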
7 changes: 5 additions & 2 deletions crates/stages/benches/criterion.rs
@@ -12,7 +12,7 @@ use reth_stages::{
     test_utils::TestTransaction,
     ExecInput, Stage, UnwindInput,
 };
-use std::{path::PathBuf, sync::Arc};
+use std::{future::poll_fn, path::PathBuf, sync::Arc};
 
 mod setup;
 use setup::StageRange;
@@ -138,7 +138,10 @@ fn measure_stage_with_path<F, S>(
                 let mut stage = stage.clone();
                 let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
                 let provider = factory.provider_rw().unwrap();
-                stage.execute(&provider, input).await.unwrap();
+                poll_fn(|cx| stage.poll_ready(cx, input))
+                    .await
+                    .and_then(|_| stage.execute(&provider, input))
+                    .unwrap();
                 provider.commit().unwrap();
             },
         )
23 changes: 10 additions & 13 deletions crates/stages/benches/setup/mod.rs
@@ -47,7 +47,6 @@ pub(crate) fn stage_unwind<S: Clone + Stage<DatabaseEnv>>(
         // Clear previous run
         stage
             .unwind(&provider, unwind)
-            .await
             .map_err(|e| {
                 format!(
                     "{e}\nMake sure your test database at `{}` isn't too old and incompatible with newer stage changes.",
@@ -67,22 +66,20 @@ pub(crate) fn unwind_hashes<S: Clone + Stage<DatabaseEnv>>(
 ) {
     let (input, unwind) = range;
 
-    tokio::runtime::Runtime::new().unwrap().block_on(async {
-        let mut stage = stage.clone();
-        let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
-        let provider = factory.provider_rw().unwrap();
+    let mut stage = stage.clone();
+    let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
+    let provider = factory.provider_rw().unwrap();
 
-        StorageHashingStage::default().unwind(&provider, unwind).await.unwrap();
-        AccountHashingStage::default().unwind(&provider, unwind).await.unwrap();
+    StorageHashingStage::default().unwind(&provider, unwind).unwrap();
+    AccountHashingStage::default().unwind(&provider, unwind).unwrap();
 
-        // Clear previous run
-        stage.unwind(&provider, unwind).await.unwrap();
+    // Clear previous run
+    stage.unwind(&provider, unwind).unwrap();
 
-        AccountHashingStage::default().execute(&provider, input).await.unwrap();
-        StorageHashingStage::default().execute(&provider, input).await.unwrap();
+    AccountHashingStage::default().execute(&provider, input).unwrap();
+    StorageHashingStage::default().execute(&provider, input).unwrap();
 
-        provider.commit().unwrap();
-    });
+    provider.commit().unwrap();
 }
 
 // Helper for generating testdata for the benchmarks.