Skip to content

Commit

Permalink
feat(stages): recover senders for both DB and static file transactions (
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Jan 31, 2024
1 parent 9c0838e commit d1abb98
Show file tree
Hide file tree
Showing 23 changed files with 598 additions and 139 deletions.
2 changes: 1 addition & 1 deletion crates/stages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ criterion = { workspace = true, features = ["async_futures"] }
serde_json.workspace = true

[features]
test-utils = ["reth-interfaces/test-utils", "reth-db/test-utils"]
test-utils = ["reth-interfaces/test-utils", "reth-db/test-utils", "reth-provider/test-utils"]

[[bench]]
name = "criterion"
Expand Down
84 changes: 32 additions & 52 deletions crates/stages/benches/criterion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use criterion::{
use pprof::criterion::{Output, PProfProfiler};
use reth_db::{test_utils::TempDatabase, DatabaseEnv};

use reth_primitives::stage::StageCheckpoint;
use reth_primitives::{stage::StageCheckpoint, BlockNumber};
use reth_stages::{
stages::{MerkleStage, SenderRecoveryStage, TransactionLookupStage},
test_utils::TestStageDB,
ExecInput, Stage, StageExt, UnwindInput,
};
use std::{path::PathBuf, sync::Arc};
use std::{ops::RangeInclusive, sync::Arc};

mod setup;
use setup::StageRange;
Expand All @@ -33,28 +33,23 @@ fn account_hashing(c: &mut Criterion) {
group.sample_size(10);

let num_blocks = 10_000;
let (path, stage, execution_range) = setup::prepare_account_hashing(num_blocks);
let (db, stage, range) = setup::prepare_account_hashing(num_blocks);

measure_stage_with_path(
path,
&mut group,
setup::stage_unwind,
stage,
execution_range,
"AccountHashing".to_string(),
);
measure_stage(&mut group, &db, setup::stage_unwind, stage, range, "AccountHashing".to_string());
}

fn senders(c: &mut Criterion) {
let mut group = c.benchmark_group("Stages");
// don't need to run each stage for that many times
group.sample_size(10);

let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);

for batch in [1000usize, 10_000, 100_000, 250_000] {
let stage = SenderRecoveryStage { commit_threshold: DEFAULT_NUM_BLOCKS };
let label = format!("SendersRecovery-batch-{batch}");

measure_stage(&mut group, setup::stage_unwind, stage, 0..DEFAULT_NUM_BLOCKS, label);
measure_stage(&mut group, &db, setup::stage_unwind, stage, 0..=DEFAULT_NUM_BLOCKS, label);
}
}

Expand All @@ -64,11 +59,14 @@ fn transaction_lookup(c: &mut Criterion) {
group.sample_size(10);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, None);

let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);

measure_stage(
&mut group,
&db,
setup::stage_unwind,
stage,
0..DEFAULT_NUM_BLOCKS,
0..=DEFAULT_NUM_BLOCKS,
"TransactionLookup".to_string(),
);
}
Expand All @@ -78,44 +76,58 @@ fn merkle(c: &mut Criterion) {
// don't need to run each stage for that many times
group.sample_size(10);

let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);

let stage = MerkleStage::Both { clean_threshold: u64::MAX };
measure_stage(
&mut group,
&db,
setup::unwind_hashes,
stage,
1..DEFAULT_NUM_BLOCKS,
1..=DEFAULT_NUM_BLOCKS,
"Merkle-incremental".to_string(),
);

let stage = MerkleStage::Both { clean_threshold: 0 };
measure_stage(
&mut group,
&db,
setup::unwind_hashes,
stage,
1..DEFAULT_NUM_BLOCKS,
1..=DEFAULT_NUM_BLOCKS,
"Merkle-fullhash".to_string(),
);
}

fn measure_stage_with_path<F, S>(
path: PathBuf,
fn measure_stage<F, S>(
group: &mut BenchmarkGroup<'_, WallTime>,
db: &TestStageDB,
setup: F,
stage: S,
stage_range: StageRange,
block_interval: RangeInclusive<BlockNumber>,
label: String,
) where
S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>,
F: Fn(S, &TestStageDB, StageRange),
{
let db = TestStageDB::new(&path);
let stage_range = (
ExecInput {
target: Some(*block_interval.end()),
checkpoint: Some(StageCheckpoint::new(*block_interval.start())),
},
UnwindInput {
checkpoint: StageCheckpoint::new(*block_interval.end()),
unwind_to: *block_interval.start(),
bad_block: None,
},
);
let (input, _) = stage_range;

group.bench_function(label, move |b| {
b.to_async(FuturesExecutor).iter_with_setup(
|| {
// criterion setup does not support async, so we have to use our own runtime
setup(stage.clone(), &db, stage_range)
setup(stage.clone(), db, stage_range)
},
|_| async {
let mut stage = stage.clone();
Expand All @@ -130,35 +142,3 @@ fn measure_stage_with_path<F, S>(
)
});
}

fn measure_stage<F, S>(
group: &mut BenchmarkGroup<'_, WallTime>,
setup: F,
stage: S,
block_interval: std::ops::Range<u64>,
label: String,
) where
S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>,
F: Fn(S, &TestStageDB, StageRange),
{
let path = setup::txs_testdata(block_interval.end);

measure_stage_with_path(
path,
group,
setup,
stage,
(
ExecInput {
target: Some(block_interval.end),
checkpoint: Some(StageCheckpoint::new(block_interval.start)),
},
UnwindInput {
checkpoint: StageCheckpoint::new(block_interval.end),
unwind_to: block_interval.start,
bad_block: None,
},
),
label,
);
}
40 changes: 19 additions & 21 deletions crates/stages/benches/setup/account_hashing.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,38 @@
#![allow(unreachable_pub)]
use super::{constants, StageRange};

use super::constants;
use reth_db::{
cursor::DbCursorRO, database::Database, tables, transaction::DbTx, DatabaseError as DbError,
};
use reth_primitives::{fs, stage::StageCheckpoint};
use reth_primitives::{fs, stage::StageCheckpoint, BlockNumber};
use reth_stages::{
stages::{AccountHashingStage, SeedOpts},
test_utils::TestStageDB,
ExecInput, UnwindInput,
};
use std::path::{Path, PathBuf};
use std::{ops::RangeInclusive, path::Path};

/// Prepares a database for [`AccountHashingStage`]
/// If the environment variable [`constants::ACCOUNT_HASHING_DB`] is set, it will use that one and
/// will get the stage execution range from [`tables::BlockBodyIndices`]. Otherwise, it will
/// generate its own random data.
///
/// Returns the database, the stage and the range of stage execution if it exists.
pub fn prepare_account_hashing(num_blocks: u64) -> (PathBuf, AccountHashingStage, StageRange) {
let (path, stage_range) = match std::env::var(constants::ACCOUNT_HASHING_DB) {
pub fn prepare_account_hashing(
num_blocks: u64,
) -> (TestStageDB, AccountHashingStage, RangeInclusive<BlockNumber>) {
let (db, stage_range) = match std::env::var(constants::ACCOUNT_HASHING_DB) {
Ok(db) => {
let path = Path::new(&db).to_path_buf();
let range = find_stage_range(&path);
(path, range)
(TestStageDB::new(&path), range)
}
Err(_) => generate_testdata_db(num_blocks),
};

(path, AccountHashingStage::default(), stage_range)
(db, AccountHashingStage::default(), stage_range)
}

fn find_stage_range(db: &Path) -> StageRange {
fn find_stage_range(db: &Path) -> RangeInclusive<BlockNumber> {
let mut stage_range = None;
TestStageDB::new(db)
.factory
Expand All @@ -40,13 +42,7 @@ fn find_stage_range(db: &Path) -> StageRange {
let from = cursor.first()?.unwrap().0;
let to = StageCheckpoint::new(cursor.last()?.unwrap().0);

stage_range = Some((
ExecInput {
target: Some(to.block_number),
checkpoint: Some(StageCheckpoint::new(from)),
},
UnwindInput { unwind_to: from, checkpoint: to, bad_block: None },
));
stage_range = Some(from..=to.block_number);
Ok::<(), DbError>(())
})
.unwrap()
Expand All @@ -55,19 +51,21 @@ fn find_stage_range(db: &Path) -> StageRange {
stage_range.expect("Could not find the stage range from the external DB.")
}

fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) {
fn generate_testdata_db(num_blocks: u64) -> (TestStageDB, RangeInclusive<BlockNumber>) {
let opts = SeedOpts { blocks: 0..=num_blocks, accounts: 0..100_000, txs: 100..150 };

let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("account-hashing-bench");
let exists = path.exists();
let db = TestStageDB::new(&path);

if !path.exists() {
if !exists {
// create the dirs
fs::create_dir_all(&path).unwrap();
println!("Account Hashing testdata not found, generating to {:?}", path.display());
let db = TestStageDB::new(&path);
let provider = db.factory.provider_rw().unwrap();
let _accounts = AccountHashingStage::seed(&provider, opts);
let _accounts = AccountHashingStage::seed(&provider, opts.clone());
provider.commit().expect("failed to commit");
}
(path, (ExecInput { target: Some(num_blocks), ..Default::default() }, UnwindInput::default()))

(db, opts.blocks)
}
18 changes: 8 additions & 10 deletions crates/stages/benches/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ use reth_stages::{
ExecInput, Stage, UnwindInput,
};
use reth_trie::StateRoot;
use std::{
collections::BTreeMap,
path::{Path, PathBuf},
sync::Arc,
};
use std::{collections::BTreeMap, path::Path, sync::Arc};

mod constants;

Expand Down Expand Up @@ -84,8 +80,7 @@ pub(crate) fn unwind_hashes<S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>>(

// Helper for generating testdata for the benchmarks.
// Returns the test database populated with the generated data.
pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench");
pub(crate) fn txs_testdata(num_blocks: u64) -> TestStageDB {
let txs_range = 100..150;

// number of storage changes per transition
Expand All @@ -101,11 +96,14 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
// rng
let mut rng = generators::rng();

if !path.exists() {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench");
let exists = path.exists();
let db = TestStageDB::new(&path);

if !exists {
// create the dirs
fs::create_dir_all(&path).unwrap();
println!("Transactions testdata not found, generating to {:?}", path.display());
let db = TestStageDB::new(&path);

let accounts: BTreeMap<Address, Account> = concat([
random_eoa_account_range(&mut rng, 0..n_eoa),
Expand Down Expand Up @@ -177,5 +175,5 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
.unwrap();
}

path
db
}
14 changes: 8 additions & 6 deletions crates/stages/src/stages/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ use reth_db::{
models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals},
tables,
transaction::{DbTx, DbTxMut},
DatabaseError,
};
use reth_interfaces::p2p::bodies::{downloader::BodyDownloader, response::BlockResponse};
use reth_interfaces::{
p2p::bodies::{downloader::BodyDownloader, response::BlockResponse},
provider::ProviderResult,
};
use reth_primitives::{
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
SnapshotSegment,
};
use reth_provider::{providers::SnapshotWriter, DatabaseProviderRW};
use reth_provider::{providers::SnapshotWriter, DatabaseProviderRW, StatsReader};
use std::{
cmp::Ordering,
task::{ready, Context, Poll},
Expand Down Expand Up @@ -342,10 +344,10 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// progress in gas as a proxy to size. Execution stage uses a similar approach.
fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
) -> ProviderResult<EntitiesCheckpoint> {
Ok(EntitiesCheckpoint {
processed: provider.tx_ref().entries::<tables::BlockBodyIndices>()? as u64,
total: provider.tx_ref().entries::<tables::Headers>()? as u64,
processed: provider.count_entries::<tables::BlockBodyIndices>()? as u64,
total: provider.count_entries::<tables::Headers>()? as u64,
})
}

Expand Down
4 changes: 2 additions & 2 deletions crates/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use reth_primitives::{
use reth_provider::{
providers::{SnapshotProviderRWRefMut, SnapshotWriter},
BlockReader, DatabaseProviderRW, ExecutorFactory, HeaderProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderError, TransactionVariant,
OriginalValuesKnown, ProviderError, StatsReader, TransactionVariant,
};
use std::{
cmp::Ordering,
Expand Down Expand Up @@ -245,7 +245,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
// If we're not executing MerkleStage from scratch (by threshold or first-sync), then erase
// changeset related pruning configurations
if !(max_block - start_block > self.external_clean_threshold ||
provider.tx_ref().entries::<tables::AccountsTrie>()?.is_zero())
provider.count_entries::<tables::AccountsTrie>()?.is_zero())
{
prune_modes.account_history = None;
prune_modes.storage_history = None;
Expand Down
10 changes: 5 additions & 5 deletions crates/stages/src/stages/hashing_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
RawKey, RawTable,
};
use reth_interfaces::db::DatabaseError;
use reth_interfaces::provider::ProviderResult;
use reth_primitives::{
keccak256,
stage::{
AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint, StageCheckpoint,
StageId,
},
};
use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter};
use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter, StatsReader};
use std::{
cmp::max,
fmt::Debug,
Expand Down Expand Up @@ -289,10 +289,10 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {

fn stage_checkpoint_progress<DB: Database>(
provider: &DatabaseProviderRW<DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
) -> ProviderResult<EntitiesCheckpoint> {
Ok(EntitiesCheckpoint {
processed: provider.tx_ref().entries::<tables::HashedAccount>()? as u64,
total: provider.tx_ref().entries::<tables::PlainAccountState>()? as u64,
processed: provider.count_entries::<tables::HashedAccount>()? as u64,
total: provider.count_entries::<tables::PlainAccountState>()? as u64,
})
}

Expand Down
Loading

0 comments on commit d1abb98

Please sign in to comment.