feat(stages): recover senders for both DB and static file transactions #6089

Merged (15 commits) on Jan 31, 2024
Changes from 12 commits

2 changes: 1 addition & 1 deletion crates/stages/Cargo.toml
@@ -81,7 +81,7 @@ criterion = { workspace = true, features = ["async_futures"] }
serde_json.workspace = true

[features]
test-utils = ["reth-interfaces/test-utils", "reth-db/test-utils"]
test-utils = ["reth-interfaces/test-utils", "reth-db/test-utils", "reth-provider/test-utils"]

[[bench]]
name = "criterion"
89 changes: 36 additions & 53 deletions crates/stages/benches/criterion.rs
@@ -6,13 +6,13 @@ use criterion::{
use pprof::criterion::{Output, PProfProfiler};
use reth_db::{test_utils::TempDatabase, DatabaseEnv};
use reth_interfaces::test_utils::TestConsensus;
use reth_primitives::stage::StageCheckpoint;
use reth_primitives::{stage::StageCheckpoint, BlockNumber};
use reth_stages::{
stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage},
test_utils::TestStageDB,
ExecInput, Stage, StageExt, UnwindInput,
};
use std::{path::PathBuf, sync::Arc};
use std::{ops::RangeInclusive, sync::Arc};

mod setup;
use setup::StageRange;
@@ -33,28 +33,23 @@ fn account_hashing(c: &mut Criterion) {
group.sample_size(10);

let num_blocks = 10_000;
let (path, stage, execution_range) = setup::prepare_account_hashing(num_blocks);
let (db, stage, range) = setup::prepare_account_hashing(num_blocks);

measure_stage_with_path(
path,
&mut group,
setup::stage_unwind,
stage,
execution_range,
"AccountHashing".to_string(),
);
measure_stage(&mut group, &db, setup::stage_unwind, stage, range, "AccountHashing".to_string());
}

fn senders(c: &mut Criterion) {
let mut group = c.benchmark_group("Stages");
// don't need to run each stage for that many times
group.sample_size(10);

let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);

for batch in [1000usize, 10_000, 100_000, 250_000] {
let stage = SenderRecoveryStage { commit_threshold: DEFAULT_NUM_BLOCKS };
let label = format!("SendersRecovery-batch-{batch}");

measure_stage(&mut group, setup::stage_unwind, stage, 0..DEFAULT_NUM_BLOCKS, label);
measure_stage(&mut group, &db, setup::stage_unwind, stage, 0..=DEFAULT_NUM_BLOCKS, label);
}
}

@@ -64,11 +59,14 @@ fn transaction_lookup(c: &mut Criterion) {
group.sample_size(10);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, None);

let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);

measure_stage(
&mut group,
&db,
setup::stage_unwind,
stage,
0..DEFAULT_NUM_BLOCKS,
0..=DEFAULT_NUM_BLOCKS,
"TransactionLookup".to_string(),
);
}
@@ -81,11 +79,14 @@ fn total_difficulty(c: &mut Criterion) {
group.sample_size(10);
let stage = TotalDifficultyStage::new(Arc::new(TestConsensus::default()));

let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);

measure_stage(
&mut group,
&db,
setup::stage_unwind,
stage,
0..DEFAULT_NUM_BLOCKS,
0..=DEFAULT_NUM_BLOCKS,
"TotalDifficulty".to_string(),
);
}
@@ -95,44 +96,58 @@ fn merkle(c: &mut Criterion) {
// don't need to run each stage for that many times
group.sample_size(10);

let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);

let stage = MerkleStage::Both { clean_threshold: u64::MAX };
measure_stage(
&mut group,
&db,
setup::unwind_hashes,
stage,
1..DEFAULT_NUM_BLOCKS,
1..=DEFAULT_NUM_BLOCKS,
Member: was this a bug before?

Member: or well, "bug"; I guess this is in a benchmark, so it doesn't really matter.

Collaborator (Author): the behavior didn't change: even though we were passing an exclusive range, we constructed the StageRange struct from range.start and range.end.

"Merkle-incremental".to_string(),
);

let stage = MerkleStage::Both { clean_threshold: 0 };
measure_stage(
&mut group,
&db,
setup::unwind_hashes,
stage,
1..DEFAULT_NUM_BLOCKS,
1..=DEFAULT_NUM_BLOCKS,
"Merkle-fullhash".to_string(),
);
}

fn measure_stage_with_path<F, S>(
path: PathBuf,
fn measure_stage<F, S>(
group: &mut BenchmarkGroup<'_, WallTime>,
db: &TestStageDB,
setup: F,
stage: S,
stage_range: StageRange,
block_interval: RangeInclusive<BlockNumber>,
label: String,
) where
S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>,
F: Fn(S, &TestStageDB, StageRange),
{
let db = TestStageDB::new(&path);
let stage_range = (
ExecInput {
target: Some(*block_interval.end()),
checkpoint: Some(StageCheckpoint::new(*block_interval.start())),
},
UnwindInput {
checkpoint: StageCheckpoint::new(*block_interval.end()),
unwind_to: *block_interval.start(),
bad_block: None,
},
);
let (input, _) = stage_range;

group.bench_function(label, move |b| {
b.to_async(FuturesExecutor).iter_with_setup(
|| {
// criterion setup does not support async, so we have to use our own runtime
setup(stage.clone(), &db, stage_range)
setup(stage.clone(), db, stage_range)
},
|_| async {
let mut stage = stage.clone();
@@ -147,35 +162,3 @@ fn measure_stage_with_path<F, S>(
)
});
}

fn measure_stage<F, S>(
group: &mut BenchmarkGroup<'_, WallTime>,
setup: F,
stage: S,
block_interval: std::ops::Range<u64>,
label: String,
) where
S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>,
F: Fn(S, &TestStageDB, StageRange),
{
let path = setup::txs_testdata(block_interval.end);

measure_stage_with_path(
path,
group,
setup,
stage,
(
ExecInput {
target: Some(block_interval.end),
checkpoint: Some(StageCheckpoint::new(block_interval.start)),
},
UnwindInput {
checkpoint: StageCheckpoint::new(block_interval.end),
unwind_to: block_interval.start,
bad_block: None,
},
),
label,
);
}
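Regarding the review exchange above about switching the benchmark ranges from exclusive (`1..DEFAULT_NUM_BLOCKS`) to inclusive (`1..=DEFAULT_NUM_BLOCKS`): a minimal, self-contained sketch of why the stage inputs come out the same either way. The structs below are simplified stand-ins for reth's `ExecInput` and `StageCheckpoint` (only the fields the harness touches), not the real types.

```rust
// Simplified stand-ins for reth's `StageCheckpoint` and `ExecInput`; only the
// fields used by the benchmark harness are modeled here.
#[derive(Debug, PartialEq)]
struct StageCheckpoint {
    block_number: u64,
}

#[derive(Debug, PartialEq)]
struct ExecInput {
    target: Option<u64>,
    checkpoint: Option<StageCheckpoint>,
}

fn main() {
    const DEFAULT_NUM_BLOCKS: u64 = 10_000;

    // Old harness: an exclusive range, but only its bounds were ever read.
    let old = 0..DEFAULT_NUM_BLOCKS;
    let old_input = ExecInput {
        target: Some(old.end),
        checkpoint: Some(StageCheckpoint { block_number: old.start }),
    };

    // New harness: an inclusive range, read via `*range.start()` / `*range.end()`.
    let new = 0..=DEFAULT_NUM_BLOCKS;
    let new_input = ExecInput {
        target: Some(*new.end()),
        checkpoint: Some(StageCheckpoint { block_number: *new.start() }),
    };

    // Same stage input either way, which is why the range change is purely
    // notational: the inclusive form just states the intent explicitly.
    assert_eq!(old_input, new_input);
}
```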
40 changes: 19 additions & 21 deletions crates/stages/benches/setup/account_hashing.rs
@@ -1,36 +1,38 @@
#![allow(unreachable_pub)]
use super::{constants, StageRange};

use super::constants;
use reth_db::{
cursor::DbCursorRO, database::Database, tables, transaction::DbTx, DatabaseError as DbError,
};
use reth_primitives::{fs, stage::StageCheckpoint};
use reth_primitives::{fs, stage::StageCheckpoint, BlockNumber};
use reth_stages::{
stages::{AccountHashingStage, SeedOpts},
test_utils::TestStageDB,
ExecInput, UnwindInput,
};
use std::path::{Path, PathBuf};
use std::{ops::RangeInclusive, path::Path};

/// Prepares a database for [`AccountHashingStage`]
/// If the environment variable [`constants::ACCOUNT_HASHING_DB`] is set, it will use that one and
/// will get the stage execution range from [`tables::BlockBodyIndices`]. Otherwise, it will
/// generate its own random data.
///
/// Returns the database handle, the stage, and the inclusive range of stage execution.
pub fn prepare_account_hashing(num_blocks: u64) -> (PathBuf, AccountHashingStage, StageRange) {
let (path, stage_range) = match std::env::var(constants::ACCOUNT_HASHING_DB) {
pub fn prepare_account_hashing(
num_blocks: u64,
) -> (TestStageDB, AccountHashingStage, RangeInclusive<BlockNumber>) {
let (db, stage_range) = match std::env::var(constants::ACCOUNT_HASHING_DB) {
Ok(db) => {
let path = Path::new(&db).to_path_buf();
let range = find_stage_range(&path);
(path, range)
(TestStageDB::new(&path), range)
}
Err(_) => generate_testdata_db(num_blocks),
};

(path, AccountHashingStage::default(), stage_range)
(db, AccountHashingStage::default(), stage_range)
}

fn find_stage_range(db: &Path) -> StageRange {
fn find_stage_range(db: &Path) -> RangeInclusive<BlockNumber> {
let mut stage_range = None;
TestStageDB::new(db)
.factory
@@ -40,13 +42,7 @@ fn find_stage_range(db: &Path) -> StageRange {
let from = cursor.first()?.unwrap().0;
let to = StageCheckpoint::new(cursor.last()?.unwrap().0);

stage_range = Some((
ExecInput {
target: Some(to.block_number),
checkpoint: Some(StageCheckpoint::new(from)),
},
UnwindInput { unwind_to: from, checkpoint: to, bad_block: None },
));
stage_range = Some(from..=to.block_number);
Ok::<(), DbError>(())
})
.unwrap()
@@ -55,19 +51,21 @@ fn find_stage_range(db: &Path) -> StageRange {
stage_range.expect("Could not find the stage range from the external DB.")
}

fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) {
fn generate_testdata_db(num_blocks: u64) -> (TestStageDB, RangeInclusive<BlockNumber>) {
let opts = SeedOpts { blocks: 0..=num_blocks, accounts: 0..100_000, txs: 100..150 };

let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("account-hashing-bench");
let exists = path.exists();
let db = TestStageDB::new(&path);

if !path.exists() {
if !exists {
// create the dirs
fs::create_dir_all(&path).unwrap();
println!("Account Hashing testdata not found, generating to {:?}", path.display());
let db = TestStageDB::new(&path);
let provider = db.factory.provider_rw().unwrap();
let _accounts = AccountHashingStage::seed(&provider, opts);
let _accounts = AccountHashingStage::seed(&provider, opts.clone());
provider.commit().expect("failed to commit");
}
(path, (ExecInput { target: Some(num_blocks), ..Default::default() }, UnwindInput::default()))

(db, opts.blocks)
}
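The doc comment on `prepare_account_hashing` notes that, when the benchmark is pointed at an external database, the execution range is derived from `tables::BlockBodyIndices`. A rough, self-contained illustration of that derivation, with an in-memory `BTreeMap` standing in for the database cursor that `find_stage_range` walks (`cursor.first()` / `cursor.last()`):

```rust
use std::{collections::BTreeMap, ops::RangeInclusive};

/// Illustrative only: derive the stage range from the first and last block
/// numbers that have body indices, as `find_stage_range` does with a DB cursor.
fn stage_range_from_indices(indices: &BTreeMap<u64, ()>) -> Option<RangeInclusive<u64>> {
    let from = *indices.keys().next()?;
    let to = *indices.keys().next_back()?;
    Some(from..=to)
}

fn main() {
    // Pretend the external DB has body indices for blocks 100..=10_099.
    let mut indices = BTreeMap::new();
    for block in 100u64..=10_099 {
        indices.insert(block, ());
    }

    // The external DB defines the execution range: first..=last indexed block.
    assert_eq!(stage_range_from_indices(&indices), Some(100..=10_099));
}
```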
18 changes: 8 additions & 10 deletions crates/stages/benches/setup/mod.rs
@@ -21,11 +21,7 @@ use reth_stages::{
ExecInput, Stage, UnwindInput,
};
use reth_trie::StateRoot;
use std::{
collections::BTreeMap,
path::{Path, PathBuf},
sync::Arc,
};
use std::{collections::BTreeMap, path::Path, sync::Arc};

mod constants;

@@ -84,8 +80,7 @@ pub(crate) fn unwind_hashes<S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>>(

// Helper for generating testdata for the benchmarks.
// Returns a `TestStageDB` handle to the generated data.
pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench");
pub(crate) fn txs_testdata(num_blocks: u64) -> TestStageDB {
let txs_range = 100..150;

// number of storage changes per transition
@@ -101,11 +96,14 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
// rng
let mut rng = generators::rng();

if !path.exists() {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench");
let exists = path.exists();
let db = TestStageDB::new(&path);

if !exists {
// create the dirs
fs::create_dir_all(&path).unwrap();
println!("Transactions testdata not found, generating to {:?}", path.display());
let db = TestStageDB::new(&path);

let accounts: BTreeMap<Address, Account> = concat([
random_eoa_account_range(&mut rng, 0..n_eoa),
@@ -177,5 +175,5 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
.unwrap();
}

path
db
}
16 changes: 9 additions & 7 deletions crates/stages/src/stages/bodies.rs
@@ -5,12 +5,14 @@ use reth_db::{
database::Database,
models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals},
tables,
transaction::{DbTx, DbTxMut},
DatabaseError,
transaction::DbTxMut,
};
use reth_interfaces::{
p2p::bodies::{downloader::BodyDownloader, response::BlockResponse},
provider::ProviderResult,
};
use reth_interfaces::p2p::bodies::{downloader::BodyDownloader, response::BlockResponse};
use reth_primitives::stage::{EntitiesCheckpoint, StageCheckpoint, StageId};
use reth_provider::DatabaseProviderRW;
use reth_provider::{DatabaseProviderRW, StatsReader};
use std::task::{ready, Context, Poll};
use tracing::*;

@@ -246,10 +248,10 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// progress in gas as a proxy to size. Execution stage uses a similar approach.
fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
) -> ProviderResult<EntitiesCheckpoint> {
Ok(EntitiesCheckpoint {
processed: provider.tx_ref().entries::<tables::BlockBodyIndices>()? as u64,
total: provider.tx_ref().entries::<tables::Headers>()? as u64,
processed: provider.count_entries::<tables::BlockBodyIndices>()? as u64,
total: provider.count_entries::<tables::Headers>()? as u64,
})
}

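On the `stage_checkpoint` change above: replacing `provider.tx_ref().entries::<T>()` with `provider.count_entries::<T>()` moves the counting behind the provider's `StatsReader`, presumably so the totals can also cover rows that live in static files rather than MDBX, in line with this PR's goal. The trait below is a conceptual illustration of that idea only; it is not reth's actual `StatsReader` API.

```rust
/// Conceptual illustration only: a checkpoint that counts via the provider can
/// include rows outside the database tables, e.g. static-file-backed entries.
trait CountEntries {
    fn db_entries(&self, table: &str) -> usize;
    fn static_file_entries(&self, table: &str) -> usize;

    /// Total entries visible to a stage checkpoint: database plus static files.
    fn count_entries(&self, table: &str) -> usize {
        self.db_entries(table) + self.static_file_entries(table)
    }
}

struct DummyProvider;

impl CountEntries for DummyProvider {
    fn db_entries(&self, _table: &str) -> usize {
        250 // rows still in the database
    }
    fn static_file_entries(&self, _table: &str) -> usize {
        9_750 // rows already migrated to static files
    }
}

fn main() {
    let provider = DummyProvider;
    // A checkpoint computed from the DB tables alone would under-report progress.
    assert_eq!(provider.count_entries("Headers"), 10_000);
}
```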
4 changes: 2 additions & 2 deletions crates/stages/src/stages/execution.rs
@@ -19,7 +19,7 @@ use reth_primitives::{
};
use reth_provider::{
BlockReader, DatabaseProviderRW, ExecutorFactory, HeaderProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderError, TransactionVariant,
OriginalValuesKnown, ProviderError, StatsReader, TransactionVariant,
};
use std::{
ops::RangeInclusive,
@@ -237,7 +237,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
// If we're not executing MerkleStage from scratch (by threshold or first-sync), then erase
// changeset related pruning configurations
if !(max_block - start_block > self.external_clean_threshold ||
provider.tx_ref().entries::<tables::AccountsTrie>()?.is_zero())
provider.count_entries::<tables::AccountsTrie>()?.is_zero())
{
prune_modes.account_history = None;
prune_modes.storage_history = None;
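For the execution.rs hunk above, the negated condition can be hard to read. A self-contained restatement of the boolean logic, with a plain integer standing in for `provider.count_entries::<tables::AccountsTrie>()?` (illustration only, not the real stage code):

```rust
/// Restates the guard in the hunk above: Merkle runs "from scratch" when the
/// block range exceeds the clean threshold, or when the trie table is still
/// empty (first sync).
fn merkle_runs_from_scratch(
    start_block: u64,
    max_block: u64,
    external_clean_threshold: u64,
    accounts_trie_entries: usize,
) -> bool {
    max_block - start_block > external_clean_threshold || accounts_trie_entries == 0
}

fn main() {
    // Incremental case: small range, non-empty AccountsTrie. The stage keeps the
    // changesets by erasing the history pruning configuration.
    let erase_history_pruning = !merkle_runs_from_scratch(1_000, 1_010, 100_000, 42);
    assert!(erase_history_pruning);

    // First sync: empty AccountsTrie, so Merkle rebuilds from scratch and the
    // pruning configuration is left untouched.
    assert!(merkle_runs_from_scratch(0, 10, 100_000, 0));
}
```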