From e017a7edbbd22947aac9864dd523cc679b6dca26 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 16 Jan 2024 13:28:01 +0000 Subject: [PATCH 01/12] feat(stages): recover senders for transactions in both db and static files --- crates/stages/benches/criterion.rs | 91 ++++++++----------- .../stages/benches/setup/account_hashing.rs | 40 ++++---- crates/stages/benches/setup/mod.rs | 18 ++-- crates/stages/src/stages/sender_recovery.rs | 16 ++-- 4 files changed, 74 insertions(+), 91 deletions(-) diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index e9354503d279..e3666292da4c 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -6,13 +6,13 @@ use criterion::{ use pprof::criterion::{Output, PProfProfiler}; use reth_db::{test_utils::TempDatabase, DatabaseEnv}; use reth_interfaces::test_utils::TestConsensus; -use reth_primitives::stage::StageCheckpoint; +use reth_primitives::{stage::StageCheckpoint, BlockNumber}; use reth_stages::{ stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage}, test_utils::TestStageDB, ExecInput, Stage, StageExt, UnwindInput, }; -use std::{path::PathBuf, sync::Arc}; +use std::{ops::RangeInclusive, sync::Arc}; mod setup; use setup::StageRange; @@ -20,7 +20,7 @@ use setup::StageRange; criterion_group! { name = benches; config = Criterion::default().with_profiler(PProfProfiler::new(1000, Output::Flamegraph(None))); - targets = transaction_lookup, account_hashing, senders, total_difficulty, merkle + targets = senders } criterion_main!(benches); @@ -33,16 +33,9 @@ fn account_hashing(c: &mut Criterion) { group.sample_size(10); let num_blocks = 10_000; - let (path, stage, execution_range) = setup::prepare_account_hashing(num_blocks); + let (db, stage, range) = setup::prepare_account_hashing(num_blocks); - measure_stage_with_path( - path, - &mut group, - setup::stage_unwind, - stage, - execution_range, - "AccountHashing".to_string(), - ); + measure_stage(&mut group, &db, setup::stage_unwind, stage, range, "AccountHashing".to_string()); } fn senders(c: &mut Criterion) { @@ -50,11 +43,13 @@ fn senders(c: &mut Criterion) { // don't need to run each stage for that many times group.sample_size(10); + let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS); + for batch in [1000usize, 10_000, 100_000, 250_000] { let stage = SenderRecoveryStage { commit_threshold: DEFAULT_NUM_BLOCKS }; let label = format!("SendersRecovery-batch-{batch}"); - measure_stage(&mut group, setup::stage_unwind, stage, 0..DEFAULT_NUM_BLOCKS, label); + measure_stage(&mut group, &db, setup::stage_unwind, stage, 0..=DEFAULT_NUM_BLOCKS, label); } } @@ -64,11 +59,14 @@ fn transaction_lookup(c: &mut Criterion) { group.sample_size(10); let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, None); + let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS); + measure_stage( &mut group, + &db, setup::stage_unwind, stage, - 0..DEFAULT_NUM_BLOCKS, + 0..=DEFAULT_NUM_BLOCKS, "TransactionLookup".to_string(), ); } @@ -81,11 +79,14 @@ fn total_difficulty(c: &mut Criterion) { group.sample_size(10); let stage = TotalDifficultyStage::new(Arc::new(TestConsensus::default())); + let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS); + measure_stage( &mut group, + &db, setup::stage_unwind, stage, - 0..DEFAULT_NUM_BLOCKS, + 0..=DEFAULT_NUM_BLOCKS, "TotalDifficulty".to_string(), ); } @@ -95,44 +96,58 @@ fn merkle(c: &mut Criterion) { // don't need to run each stage for that many times group.sample_size(10); + let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS); + let stage = MerkleStage::Both { clean_threshold: u64::MAX }; measure_stage( &mut group, + &db, setup::unwind_hashes, stage, - 1..DEFAULT_NUM_BLOCKS, + 1..=DEFAULT_NUM_BLOCKS, "Merkle-incremental".to_string(), ); let stage = MerkleStage::Both { clean_threshold: 0 }; measure_stage( &mut group, + &db, setup::unwind_hashes, stage, - 1..DEFAULT_NUM_BLOCKS, + 1..=DEFAULT_NUM_BLOCKS, "Merkle-fullhash".to_string(), ); } -fn measure_stage_with_path( - path: PathBuf, +fn measure_stage( group: &mut BenchmarkGroup<'_, WallTime>, + db: &TestStageDB, setup: F, stage: S, - stage_range: StageRange, + block_interval: RangeInclusive, label: String, ) where S: Clone + Stage>>, F: Fn(S, &TestStageDB, StageRange), { - let db = TestStageDB::new(&path); + let stage_range = ( + ExecInput { + target: Some(*block_interval.end()), + checkpoint: Some(StageCheckpoint::new(*block_interval.start())), + }, + UnwindInput { + checkpoint: StageCheckpoint::new(*block_interval.end()), + unwind_to: *block_interval.start(), + bad_block: None, + }, + ); let (input, _) = stage_range; group.bench_function(label, move |b| { b.to_async(FuturesExecutor).iter_with_setup( || { // criterion setup does not support async, so we have to use our own runtime - setup(stage.clone(), &db, stage_range) + setup(stage.clone(), db, stage_range) }, |_| async { let mut stage = stage.clone(); @@ -147,35 +162,3 @@ fn measure_stage_with_path( ) }); } - -fn measure_stage( - group: &mut BenchmarkGroup<'_, WallTime>, - setup: F, - stage: S, - block_interval: std::ops::Range, - label: String, -) where - S: Clone + Stage>>, - F: Fn(S, &TestStageDB, StageRange), -{ - let path = setup::txs_testdata(block_interval.end); - - measure_stage_with_path( - path, - group, - setup, - stage, - ( - ExecInput { - target: Some(block_interval.end), - checkpoint: Some(StageCheckpoint::new(block_interval.start)), - }, - UnwindInput { - checkpoint: StageCheckpoint::new(block_interval.end), - unwind_to: block_interval.start, - bad_block: None, - }, - ), - label, - ); -} diff --git a/crates/stages/benches/setup/account_hashing.rs b/crates/stages/benches/setup/account_hashing.rs index 497dce2787f4..ba532d3660e7 100644 --- a/crates/stages/benches/setup/account_hashing.rs +++ b/crates/stages/benches/setup/account_hashing.rs @@ -1,15 +1,15 @@ #![allow(unreachable_pub)] -use super::{constants, StageRange}; + +use super::constants; use reth_db::{ cursor::DbCursorRO, database::Database, tables, transaction::DbTx, DatabaseError as DbError, }; -use reth_primitives::{fs, stage::StageCheckpoint}; +use reth_primitives::{fs, stage::StageCheckpoint, BlockNumber}; use reth_stages::{ stages::{AccountHashingStage, SeedOpts}, test_utils::TestStageDB, - ExecInput, UnwindInput, }; -use std::path::{Path, PathBuf}; +use std::{ops::RangeInclusive, path::Path}; /// Prepares a database for [`AccountHashingStage`] /// If the environment variable [`constants::ACCOUNT_HASHING_DB`] is set, it will use that one and @@ -17,20 +17,22 @@ use std::path::{Path, PathBuf}; /// generate its own random data. /// /// Returns the path to the database file, stage and range of stage execution if it exists. -pub fn prepare_account_hashing(num_blocks: u64) -> (PathBuf, AccountHashingStage, StageRange) { - let (path, stage_range) = match std::env::var(constants::ACCOUNT_HASHING_DB) { +pub fn prepare_account_hashing( + num_blocks: u64, +) -> (TestStageDB, AccountHashingStage, RangeInclusive) { + let (db, stage_range) = match std::env::var(constants::ACCOUNT_HASHING_DB) { Ok(db) => { let path = Path::new(&db).to_path_buf(); let range = find_stage_range(&path); - (path, range) + (TestStageDB::new(&path), range) } Err(_) => generate_testdata_db(num_blocks), }; - (path, AccountHashingStage::default(), stage_range) + (db, AccountHashingStage::default(), stage_range) } -fn find_stage_range(db: &Path) -> StageRange { +fn find_stage_range(db: &Path) -> RangeInclusive { let mut stage_range = None; TestStageDB::new(db) .factory @@ -40,13 +42,7 @@ fn find_stage_range(db: &Path) -> StageRange { let from = cursor.first()?.unwrap().0; let to = StageCheckpoint::new(cursor.last()?.unwrap().0); - stage_range = Some(( - ExecInput { - target: Some(to.block_number), - checkpoint: Some(StageCheckpoint::new(from)), - }, - UnwindInput { unwind_to: from, checkpoint: to, bad_block: None }, - )); + stage_range = Some(from..=to.block_number); Ok::<(), DbError>(()) }) .unwrap() @@ -55,19 +51,21 @@ fn find_stage_range(db: &Path) -> StageRange { stage_range.expect("Could not find the stage range from the external DB.") } -fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) { +fn generate_testdata_db(num_blocks: u64) -> (TestStageDB, RangeInclusive) { let opts = SeedOpts { blocks: 0..=num_blocks, accounts: 0..100_000, txs: 100..150 }; let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("account-hashing-bench"); + let exists = path.exists(); + let db = TestStageDB::new(&path); - if !path.exists() { + if !exists { // create the dirs fs::create_dir_all(&path).unwrap(); println!("Account Hashing testdata not found, generating to {:?}", path.display()); - let db = TestStageDB::new(&path); let provider = db.factory.provider_rw().unwrap(); - let _accounts = AccountHashingStage::seed(&provider, opts); + let _accounts = AccountHashingStage::seed(&provider, opts.clone()); provider.commit().expect("failed to commit"); } - (path, (ExecInput { target: Some(num_blocks), ..Default::default() }, UnwindInput::default())) + + (db, opts.blocks) } diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index f6322cc50d3f..d0aa9e94fcc5 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -21,11 +21,7 @@ use reth_stages::{ ExecInput, Stage, UnwindInput, }; use reth_trie::StateRoot; -use std::{ - collections::BTreeMap, - path::{Path, PathBuf}, - sync::Arc, -}; +use std::{collections::BTreeMap, path::Path, sync::Arc}; mod constants; @@ -84,8 +80,7 @@ pub(crate) fn unwind_hashes>>>( // Helper for generating testdata for the benchmarks. // Returns the path to the database file. -pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { - let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench"); +pub(crate) fn txs_testdata(num_blocks: u64) -> TestStageDB { let txs_range = 100..150; // number of storage changes per transition @@ -101,11 +96,14 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { // rng let mut rng = generators::rng(); - if !path.exists() { + let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench"); + let exists = path.exists(); + let db = TestStageDB::new(&path); + + if !exists { // create the dirs fs::create_dir_all(&path).unwrap(); println!("Transactions testdata not found, generating to {:?}", path.display()); - let db = TestStageDB::new(&path); let accounts: BTreeMap = concat([ random_eoa_account_range(&mut rng, 0..n_eoa), @@ -177,5 +175,5 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { .unwrap(); } - path + db } diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index a758b9b6bc09..b187763ef9aa 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -83,10 +83,12 @@ impl Stage for SenderRecoveryStage { let mut senders_cursor = tx.cursor_write::()?; // Acquire the cursor over the transactions - let mut tx_cursor = tx.cursor_read::>()?; + let mut tx_cursor = tx.cursor_read::()?; + // let mut tx_cursor = tx.cursor_read::>()?; // Walk the transactions from start to end index (inclusive) - let raw_tx_range = RawKey::new(tx_range.start)..RawKey::new(tx_range.end); - let tx_walker = tx_cursor.walk_range(raw_tx_range)?; + // let raw_tx_range = RawKey::new(tx_range.start)..RawKey::new(tx_range.end); + let tx_walker = tx_cursor.walk_range(tx_range.clone())?; + // let tx_walker = tx_cursor.walk_range(raw_tx_range)?; // Iterate over transactions in chunks info!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders"); @@ -188,14 +190,16 @@ impl Stage for SenderRecoveryStage { } fn recover_sender( - entry: Result<(RawKey, RawValue), DatabaseError>, + // entry: Result<(RawKey, RawValue), DatabaseError>, + entry: Result<(TxNumber, TransactionSignedNoHash), DatabaseError>, rlp_buf: &mut Vec, ) -> Result<(u64, Address), Box> { let (tx_id, transaction) = entry.map_err(|e| Box::new(SenderRecoveryStageError::StageError(e.into())))?; - let tx_id = tx_id.key().expect("key to be formated"); + // let tx_id = tx_id.key().expect("key to be formated"); - let tx = transaction.value().expect("value to be formated"); + // let tx = transaction.value().expect("value to be formated"); + let tx = transaction; tx.transaction.encode_without_signature(rlp_buf); // We call [Signature::recover_signer_unchecked] because transactions run in the pipeline are From 3ea1def56a7c956f229f7b094457096c8a161baa Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 16 Jan 2024 13:46:02 +0000 Subject: [PATCH 02/12] query snapshots too --- crates/stages/src/stages/sender_recovery.rs | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index b187763ef9aa..90a858d4d41c 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -15,6 +15,7 @@ use reth_primitives::{ }; use reth_provider::{ BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, PruneCheckpointReader, + TransactionsProvider, }; use std::{fmt::Debug, sync::mpsc}; use thiserror::Error; @@ -82,13 +83,8 @@ impl Stage for SenderRecoveryStage { // Acquire the cursor for inserting elements let mut senders_cursor = tx.cursor_write::()?; - // Acquire the cursor over the transactions - let mut tx_cursor = tx.cursor_read::()?; - // let mut tx_cursor = tx.cursor_read::>()?; - // Walk the transactions from start to end index (inclusive) - // let raw_tx_range = RawKey::new(tx_range.start)..RawKey::new(tx_range.end); - let tx_walker = tx_cursor.walk_range(tx_range.clone())?; - // let tx_walker = tx_cursor.walk_range(raw_tx_range)?; + // Query the transactions from both database and static files + let transactions = provider.transactions_by_tx_range(tx_range.clone())?; // Iterate over transactions in chunks info!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders"); @@ -108,7 +104,7 @@ impl Stage for SenderRecoveryStage { // to gain anything from using more than 1 thread let chunk_size = chunk_size.max(16); - for chunk in &tx_walker.chunks(chunk_size) { + for chunk in &tx_range.zip(transactions).chunks(chunk_size) { // An _unordered_ channel to receive results from a rayon job let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel(); channels.push(recovered_senders_rx); @@ -190,16 +186,9 @@ impl Stage for SenderRecoveryStage { } fn recover_sender( - // entry: Result<(RawKey, RawValue), DatabaseError>, - entry: Result<(TxNumber, TransactionSignedNoHash), DatabaseError>, + (tx_id, tx): (TxNumber, TransactionSignedNoHash), rlp_buf: &mut Vec, ) -> Result<(u64, Address), Box> { - let (tx_id, transaction) = - entry.map_err(|e| Box::new(SenderRecoveryStageError::StageError(e.into())))?; - // let tx_id = tx_id.key().expect("key to be formated"); - - // let tx = transaction.value().expect("value to be formated"); - let tx = transaction; tx.transaction.encode_without_signature(rlp_buf); // We call [Signature::recover_signer_unchecked] because transactions run in the pipeline are From e6875cb06ae045d1204d97f23636d6c59fccc886 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 16 Jan 2024 13:46:51 +0000 Subject: [PATCH 03/12] return bench targets back --- crates/stages/benches/criterion.rs | 2 +- crates/stages/src/stages/sender_recovery.rs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index e3666292da4c..1e2458e50e73 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -20,7 +20,7 @@ use setup::StageRange; criterion_group! { name = benches; config = Criterion::default().with_profiler(PProfProfiler::new(1000, Output::Flamegraph(None))); - targets = senders + targets = transaction_lookup, account_hashing, senders, total_difficulty, merkle } criterion_main!(benches); diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 90a858d4d41c..3c2edc93ac41 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -1,11 +1,10 @@ use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use itertools::Itertools; use reth_db::{ - cursor::{DbCursorRO, DbCursorRW}, + cursor::DbCursorRW, database::Database, tables, transaction::{DbTx, DbTxMut}, - DatabaseError, RawKey, RawTable, RawValue, }; use reth_interfaces::consensus; use reth_primitives::{ From 0ca5eb76202f94e17d718cf210342ff6770b2ee2 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 16 Jan 2024 15:48:47 +0000 Subject: [PATCH 04/12] fix test --- crates/stages/Cargo.toml | 2 +- crates/stages/src/stages/sender_recovery.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index d3ddca5be3ae..9226d0df8e71 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -81,7 +81,7 @@ criterion = { workspace = true, features = ["async_futures"] } serde_json.workspace = true [features] -test-utils = ["reth-interfaces/test-utils", "reth-db/test-utils"] +test-utils = ["reth-interfaces/test-utils", "reth-db/test-utils", "reth-provider/test-utils"] [[bench]] name = "criterion" diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 3c2edc93ac41..5d8539823cfd 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -241,6 +241,7 @@ struct FailedSenderRecoveryError { #[cfg(test)] mod tests { use assert_matches::assert_matches; + use reth_db::cursor::DbCursorRO; use reth_interfaces::test_utils::{ generators, generators::{random_block, random_block_range}, From c2c03ca42c1293b18d8903779183b36d12c51332 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 18 Jan 2024 12:23:17 +0000 Subject: [PATCH 05/12] count entries from both db and snapshot --- crates/stages/src/stages/bodies.rs | 16 +++++++------ crates/stages/src/stages/execution.rs | 4 ++-- crates/stages/src/stages/hashing_account.rs | 10 ++++---- crates/stages/src/stages/hashing_storage.rs | 10 ++++---- crates/stages/src/stages/merkle.rs | 11 +++++---- crates/stages/src/stages/sender_recovery.rs | 6 ++--- crates/stages/src/stages/total_difficulty.rs | 14 ++++++----- crates/stages/src/stages/tx_lookup.rs | 6 ++--- .../src/providers/database/provider.rs | 16 ++++++++++++- .../src/providers/snapshot/manager.rs | 24 +++++++++++++++++-- crates/storage/provider/src/traits/mod.rs | 3 +++ crates/storage/provider/src/traits/stats.rs | 10 ++++++++ 12 files changed, 91 insertions(+), 39 deletions(-) create mode 100644 crates/storage/provider/src/traits/stats.rs diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 56001595cf76..71acd62ae24a 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -5,12 +5,14 @@ use reth_db::{ database::Database, models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals}, tables, - transaction::{DbTx, DbTxMut}, - DatabaseError, + transaction::DbTxMut, +}; +use reth_interfaces::{ + p2p::bodies::{downloader::BodyDownloader, response::BlockResponse}, + provider::ProviderResult, }; -use reth_interfaces::p2p::bodies::{downloader::BodyDownloader, response::BlockResponse}; use reth_primitives::stage::{EntitiesCheckpoint, StageCheckpoint, StageId}; -use reth_provider::DatabaseProviderRW; +use reth_provider::{DatabaseProviderRW, StatsReader}; use std::task::{ready, Context, Poll}; use tracing::*; @@ -246,10 +248,10 @@ impl Stage for BodyStage { // progress in gas as a proxy to size. Execution stage uses a similar approach. fn stage_checkpoint( provider: &DatabaseProviderRW, -) -> Result { +) -> ProviderResult { Ok(EntitiesCheckpoint { - processed: provider.tx_ref().entries::()? as u64, - total: provider.tx_ref().entries::()? as u64, + processed: provider.count_entries::()? as u64, + total: provider.count_entries::()? as u64, }) } diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 5d7134ba0688..23e537abb2f3 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -19,7 +19,7 @@ use reth_primitives::{ }; use reth_provider::{ BlockReader, DatabaseProviderRW, ExecutorFactory, HeaderProvider, LatestStateProviderRef, - OriginalValuesKnown, ProviderError, TransactionVariant, + OriginalValuesKnown, ProviderError, StatsReader, TransactionVariant, }; use std::{ ops::RangeInclusive, @@ -234,7 +234,7 @@ impl ExecutionStage { // If we're not executing MerkleStage from scratch (by threshold or first-sync), then erase // changeset related pruning configurations if !(max_block - start_block > self.external_clean_threshold || - provider.tx_ref().entries::()?.is_zero()) + provider.count_entries::()?.is_zero()) { prune_modes.account_history = None; prune_modes.storage_history = None; diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 323fd1c1b300..1627e893f6e2 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -8,7 +8,7 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, RawKey, RawTable, }; -use reth_interfaces::db::DatabaseError; +use reth_interfaces::provider::ProviderResult; use reth_primitives::{ keccak256, stage::{ @@ -16,7 +16,7 @@ use reth_primitives::{ StageId, }, }; -use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter}; +use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter, StatsReader}; use std::{ cmp::max, fmt::Debug, @@ -289,10 +289,10 @@ impl Stage for AccountHashingStage { fn stage_checkpoint_progress( provider: &DatabaseProviderRW, -) -> Result { +) -> ProviderResult { Ok(EntitiesCheckpoint { - processed: provider.tx_ref().entries::()? as u64, - total: provider.tx_ref().entries::()? as u64, + processed: provider.count_entries::()? as u64, + total: provider.count_entries::()? as u64, }) } diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index d508846a43c4..28f684f819a1 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -7,7 +7,7 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; -use reth_interfaces::db::DatabaseError; +use reth_interfaces::provider::ProviderResult; use reth_primitives::{ keccak256, stage::{ @@ -16,7 +16,7 @@ use reth_primitives::{ }, StorageEntry, }; -use reth_provider::{DatabaseProviderRW, HashingWriter, StorageReader}; +use reth_provider::{DatabaseProviderRW, HashingWriter, StatsReader, StorageReader}; use std::{collections::BTreeMap, fmt::Debug}; use tracing::*; @@ -214,10 +214,10 @@ impl Stage for StorageHashingStage { fn stage_checkpoint_progress( provider: &DatabaseProviderRW, -) -> Result { +) -> ProviderResult { Ok(EntitiesCheckpoint { - processed: provider.tx_ref().entries::()? as u64, - total: provider.tx_ref().entries::()? as u64, + processed: provider.count_entries::()? as u64, + total: provider.count_entries::()? as u64, }) } diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 7b74ca47b8fb..3ccf11b92d7a 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -13,7 +13,8 @@ use reth_primitives::{ BlockNumber, GotExpected, SealedHeader, B256, }; use reth_provider::{ - DatabaseProviderRW, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter, + DatabaseProviderRW, HeaderProvider, ProviderError, StageCheckpointReader, + StageCheckpointWriter, StatsReader, }; use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress}; use std::fmt::Debug; @@ -187,8 +188,8 @@ impl Stage for MerkleStage { } .unwrap_or(EntitiesCheckpoint { processed: 0, - total: (provider.tx_ref().entries::()? + - provider.tx_ref().entries::()?) + total: (provider.count_entries::()? + + provider.count_entries::()?) as u64, }); @@ -233,8 +234,8 @@ impl Stage for MerkleStage { .map_err(|e| StageError::Fatal(Box::new(e)))?; updates.flush(provider.tx_ref())?; - let total_hashed_entries = (provider.tx_ref().entries::()? + - provider.tx_ref().entries::()?) + let total_hashed_entries = (provider.count_entries::()? + + provider.count_entries::()?) as u64; let entities_checkpoint = EntitiesCheckpoint { diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 5d8539823cfd..f848b73a4ff3 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -14,7 +14,7 @@ use reth_primitives::{ }; use reth_provider::{ BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, PruneCheckpointReader, - TransactionsProvider, + StatsReader, TransactionsProvider, }; use std::{fmt::Debug, sync::mpsc}; use thiserror::Error; @@ -214,8 +214,8 @@ fn stage_checkpoint( // If `TxSenders` table was pruned, we will have a number of entries in it not matching // the actual number of processed transactions. To fix that, we add the number of pruned // `TxSenders` entries. - processed: provider.tx_ref().entries::()? as u64 + pruned_entries, - total: provider.tx_ref().entries::()? as u64, + processed: provider.count_entries::()? as u64 + pruned_entries, + total: provider.count_entries::()? as u64, }) } diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs index d523cf4ce850..96f00e6c2712 100644 --- a/crates/stages/src/stages/total_difficulty.rs +++ b/crates/stages/src/stages/total_difficulty.rs @@ -4,14 +4,16 @@ use reth_db::{ database::Database, tables, transaction::{DbTx, DbTxMut}, - DatabaseError, }; -use reth_interfaces::{consensus::Consensus, provider::ProviderError}; +use reth_interfaces::{ + consensus::Consensus, + provider::{ProviderError, ProviderResult}, +}; use reth_primitives::{ stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, U256, }; -use reth_provider::DatabaseProviderRW; +use reth_provider::{DatabaseProviderRW, StatsReader}; use std::sync::Arc; use tracing::*; @@ -116,10 +118,10 @@ impl Stage for TotalDifficultyStage { fn stage_checkpoint( provider: &DatabaseProviderRW, -) -> Result { +) -> ProviderResult { Ok(EntitiesCheckpoint { - processed: provider.tx_ref().entries::()? as u64, - total: provider.tx_ref().entries::()? as u64, + processed: provider.count_entries::()? as u64, + total: provider.count_entries::()? as u64, }) } diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index a741bed28582..cac0210c9b33 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -12,7 +12,7 @@ use reth_primitives::{ PruneCheckpoint, PruneMode, PruneSegment, }; use reth_provider::{ - BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, + BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, StatsReader, TransactionsProviderExt, }; use tracing::*; @@ -176,8 +176,8 @@ fn stage_checkpoint( // If `TxHashNumber` table was pruned, we will have a number of entries in it not matching // the actual number of processed transactions. To fix that, we add the number of pruned // `TxHashNumber` entries. - processed: provider.tx_ref().entries::()? as u64 + pruned_entries, - total: provider.tx_ref().entries::()? as u64, + processed: provider.count_entries::()? as u64 + pruned_entries, + total: provider.count_entries::()? as u64, }) } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 1180a755cce2..bd16c8079996 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -8,7 +8,7 @@ use crate::{ AccountReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter, Chain, EvmEnvProvider, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HeaderSyncMode, HistoryWriter, OriginalValuesKnown, ProviderError, PruneCheckpointReader, - PruneCheckpointWriter, StageCheckpointReader, StorageReader, TransactionVariant, + PruneCheckpointWriter, StageCheckpointReader, StatsReader, StorageReader, TransactionVariant, TransactionsProvider, TransactionsProviderExt, WithdrawalsProvider, }; use ahash::{AHashMap, AHashSet}; @@ -2521,6 +2521,20 @@ impl PruneCheckpointWriter for DatabaseProvider { } } +impl StatsReader for DatabaseProvider { + fn count_entries(&self) -> ProviderResult { + let db_entries = self.tx.entries::()?; + let snapshot_entries = + match self.snapshot_provider.as_ref().map(|provider| provider.count_entries::()) { + Some(Ok(entries)) => entries, + Some(Err(ProviderError::UnsupportedProvider)) | None => 0, + Some(Err(err)) => return Err(err), + }; + + Ok(db_entries + snapshot_entries) + } +} + fn range_size_hint(range: &impl RangeBounds) -> Option { let start = match range.start_bound().cloned() { Bound::Included(start) => start, diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index 5ed22ecf28c4..c825f029e635 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -1,8 +1,8 @@ use super::{LoadedJar, SnapshotJarProvider}; use crate::{ to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider, - ReceiptProvider, TransactionVariant, TransactionsProvider, TransactionsProviderExt, - WithdrawalsProvider, + ReceiptProvider, StatsReader, TransactionVariant, TransactionsProvider, + TransactionsProviderExt, WithdrawalsProvider, }; use dashmap::DashMap; use parking_lot::RwLock; @@ -10,6 +10,8 @@ use reth_db::{ codecs::CompactU256, models::StoredBlockBodyIndices, snapshot::{iter_snapshots, HeaderMask, ReceiptMask, SnapshotCursor, TransactionMask}, + table::Table, + tables, }; use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_nippy_jar::NippyJar; @@ -684,3 +686,21 @@ impl WithdrawalsProvider for SnapshotProvider { Err(ProviderError::UnsupportedProvider) } } + +impl StatsReader for SnapshotProvider { + fn count_entries(&self) -> ProviderResult { + match T::NAME { + tables::CanonicalHeaders::NAME | tables::Headers::NAME | tables::HeaderTD::NAME => { + Ok(self.get_highest_snapshot_block(SnapshotSegment::Headers).unwrap_or_default() + as usize) + } + tables::Receipts::NAME => Ok(self + .get_highest_snapshot_tx(SnapshotSegment::Receipts) + .unwrap_or_default() as usize), + tables::Transactions::NAME => Ok(self + .get_highest_snapshot_tx(SnapshotSegment::Transactions) + .unwrap_or_default() as usize), + _ => Err(ProviderError::UnsupportedProvider), + } + } +} diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index d6c023f82835..c1546bc93071 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -68,3 +68,6 @@ pub use history::HistoryWriter; mod prune_checkpoint; pub use prune_checkpoint::{PruneCheckpointReader, PruneCheckpointWriter}; + +mod stats; +pub use stats::StatsReader; \ No newline at end of file diff --git a/crates/storage/provider/src/traits/stats.rs b/crates/storage/provider/src/traits/stats.rs new file mode 100644 index 000000000000..dece75e287ba --- /dev/null +++ b/crates/storage/provider/src/traits/stats.rs @@ -0,0 +1,10 @@ +use reth_db::table::Table; +use reth_interfaces::provider::ProviderResult; + +/// The trait for fetching provider statistics. +#[auto_impl::auto_impl(&, Arc)] +pub trait StatsReader: Send + Sync { + /// Fetch the number of entries in the corresponding [Table]. Depending on the provider, it may + /// route to different data sources other than [Table]. + fn count_entries(&self) -> ProviderResult; +} From 057f9dc22a2dd3777532c104c1951cb04ce66e1e Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 18 Jan 2024 12:39:45 +0000 Subject: [PATCH 06/12] cargo fmt --- crates/storage/provider/src/traits/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index c1546bc93071..7fa3878099d6 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -70,4 +70,4 @@ mod prune_checkpoint; pub use prune_checkpoint::{PruneCheckpointReader, PruneCheckpointWriter}; mod stats; -pub use stats::StatsReader; \ No newline at end of file +pub use stats::StatsReader; From dd8740c0b742114b766b39b32f0a9c9ed2d576dc Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 25 Jan 2024 15:40:50 +0000 Subject: [PATCH 07/12] raw values --- crates/stages/src/stages/sender_recovery.rs | 7 +++-- crates/storage/db/src/tables/raw.rs | 5 ++++ .../provider/src/providers/database/mod.rs | 9 ++++++- .../src/providers/database/provider.rs | 21 ++++++++++++++- crates/storage/provider/src/providers/mod.rs | 9 ++++++- .../provider/src/providers/snapshot/jar.rs | 22 +++++++++++++++- .../src/providers/snapshot/manager.rs | 24 ++++++++++++++--- .../storage/provider/src/test_utils/mock.rs | 26 ++++++++++++++++++- .../storage/provider/src/test_utils/noop.rs | 12 ++++++++- .../provider/src/traits/transactions.rs | 7 +++++ 10 files changed, 131 insertions(+), 11 deletions(-) diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index f848b73a4ff3..259959d8cc5b 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -5,6 +5,7 @@ use reth_db::{ database::Database, tables, transaction::{DbTx, DbTxMut}, + RawValue, }; use reth_interfaces::consensus; use reth_primitives::{ @@ -83,7 +84,7 @@ impl Stage for SenderRecoveryStage { let mut senders_cursor = tx.cursor_write::()?; // Query the transactions from both database and static files - let transactions = provider.transactions_by_tx_range(tx_range.clone())?; + let transactions = provider.raw_transactions_by_tx_range(tx_range.clone())?; // Iterate over transactions in chunks info!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders"); @@ -185,9 +186,11 @@ impl Stage for SenderRecoveryStage { } fn recover_sender( - (tx_id, tx): (TxNumber, TransactionSignedNoHash), + (tx_id, tx): (TxNumber, RawValue), rlp_buf: &mut Vec, ) -> Result<(u64, Address), Box> { + let tx = tx.value().expect("value to be formated"); + tx.transaction.encode_without_signature(rlp_buf); // We call [Signature::recover_signer_unchecked] because transactions run in the pipeline are diff --git a/crates/storage/db/src/tables/raw.rs b/crates/storage/db/src/tables/raw.rs index 46dffa1db58f..dbc42daf040b 100644 --- a/crates/storage/db/src/tables/raw.rs +++ b/crates/storage/db/src/tables/raw.rs @@ -114,6 +114,11 @@ impl RawValue { Self { value: V::compress(value).into(), _phantom: std::marker::PhantomData } } + /// Create new raw value from raw compressed value + pub fn new_raw(value: impl AsRef<[u8]>) -> Self { + Self { value: value.as_ref().to_vec(), _phantom: std::marker::PhantomData } + } + /// Returns the decompressed value. pub fn value(&self) -> Result { V::decompress(&self.value) diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index fbe9b0ebdfc0..86355b1047da 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -9,7 +9,7 @@ use crate::{ PruneCheckpointReader, StageCheckpointReader, StateProviderBox, TransactionVariant, TransactionsProvider, WithdrawalsProvider, }; -use reth_db::{database::Database, init_db, models::StoredBlockBodyIndices, DatabaseEnv}; +use reth_db::{database::Database, init_db, models::StoredBlockBodyIndices, DatabaseEnv, RawValue}; use reth_interfaces::{provider::ProviderResult, RethError, RethResult}; use reth_primitives::{ snapshot::HighestSnapshots, @@ -379,6 +379,13 @@ impl TransactionsProvider for ProviderFactory { self.provider()?.transactions_by_tx_range(range) } + fn raw_transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>> { + self.provider()?.raw_transactions_by_tx_range(range) + } + fn senders_by_tx_range( &self, range: impl RangeBounds, diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 219b266dcb57..172fb774f94d 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -24,7 +24,7 @@ use reth_db::{ table::{Table, TableRow}, tables, transaction::{DbTx, DbTxMut}, - BlockNumberList, DatabaseError, + BlockNumberList, DatabaseError, RawKey, RawTable, RawValue, }; use reth_interfaces::{ p2p::headers::downloader::SyncTarget, @@ -1640,6 +1640,25 @@ impl TransactionsProvider for DatabaseProvider { ) } + fn raw_transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>> { + self.get_range_with_snapshot( + SnapshotSegment::Transactions, + to_range(range), + |snapshot, range, _| snapshot.raw_transactions_by_tx_range(range), + |range, _| { + self.tx + .cursor_read::>()? + .walk_range(RawKey::new(range.start)..RawKey::new(range.end))? + .map(|entry| Ok(entry?.1)) + .try_collect() + }, + |_| true, + ) + } + fn senders_by_tx_range( &self, range: impl RangeBounds, diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 2d0aac23defa..e8f463557ce3 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -6,7 +6,7 @@ use crate::{ ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader, StateProviderBox, StateProviderFactory, TransactionVariant, TransactionsProvider, WithdrawalsProvider, }; -use reth_db::{database::Database, models::StoredBlockBodyIndices}; +use reth_db::{database::Database, models::StoredBlockBodyIndices, RawValue}; use reth_interfaces::{ blockchain_tree::{BlockchainTreeEngine, BlockchainTreeViewer}, consensus::ForkchoiceState, @@ -355,6 +355,13 @@ where self.database.provider()?.transactions_by_tx_range(range) } + fn raw_transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>> { + self.database.provider()?.raw_transactions_by_tx_range(range) + } + fn senders_by_tx_range( &self, range: impl RangeBounds, diff --git a/crates/storage/provider/src/providers/snapshot/jar.rs b/crates/storage/provider/src/providers/snapshot/jar.rs index ee1519c9f2b8..a357c59abdd8 100644 --- a/crates/storage/provider/src/providers/snapshot/jar.rs +++ b/crates/storage/provider/src/providers/snapshot/jar.rs @@ -5,7 +5,8 @@ use crate::{ }; use reth_db::{ codecs::CompactU256, - snapshot::{HeaderMask, ReceiptMask, SnapshotCursor, TransactionMask}, + snapshot::{ColumnSelectorOne, HeaderMask, ReceiptMask, SnapshotCursor, TransactionMask}, + RawValue, }; use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_primitives::{ @@ -265,6 +266,25 @@ impl<'a> TransactionsProvider for SnapshotJarProvider<'a> { .get_one::>(num.into())? .and_then(|tx| tx.recover_signer())) } + + fn raw_transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>> { + let range = to_range(range); + let mut cursor = self.cursor()?; + let mut txes = Vec::with_capacity((range.end - range.start) as usize); + + for num in range { + if let Some(tx) = + cursor.get(num.into(), >::MASK)? + { + txes.push(RawValue::new_raw(tx[0])); + } + } + + Ok(txes) + } } impl<'a> ReceiptProvider for SnapshotJarProvider<'a> { diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index c825f029e635..aa562f536777 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -9,9 +9,11 @@ use parking_lot::RwLock; use reth_db::{ codecs::CompactU256, models::StoredBlockBodyIndices, - snapshot::{iter_snapshots, HeaderMask, ReceiptMask, SnapshotCursor, TransactionMask}, + snapshot::{ + iter_snapshots, ColumnSelectorOne, HeaderMask, ReceiptMask, SnapshotCursor, TransactionMask, + }, table::Table, - tables, + tables, RawValue, }; use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_nippy_jar::NippyJar; @@ -576,7 +578,7 @@ impl TransactionsProvider for SnapshotProvider { fn transactions_by_tx_range( &self, range: impl RangeBounds, - ) -> ProviderResult> { + ) -> ProviderResult> { self.fetch_range( SnapshotSegment::Transactions, to_range(range), @@ -587,6 +589,22 @@ impl TransactionsProvider for SnapshotProvider { ) } + fn raw_transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>> { + self.fetch_range( + SnapshotSegment::Transactions, + to_range(range), + |cursor, number| { + cursor.get(number.into(), >::MASK).map( + |result| result.map(|row| RawValue::::new_raw(row[0])), + ) + }, + |_| true, + ) + } + fn transaction_sender(&self, id: TxNumber) -> ProviderResult> { Ok(self.transaction_by_id_no_hash(id)?.and_then(|tx| tx.recover_signer())) } diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index 0c862a1dacaa..68ec4e72b3d3 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -7,7 +7,10 @@ use crate::{ TransactionVariant, TransactionsProvider, WithdrawalsProvider, }; use parking_lot::Mutex; -use reth_db::models::{AccountBeforeTx, StoredBlockBodyIndices}; +use reth_db::{ + models::{AccountBeforeTx, StoredBlockBodyIndices}, + RawValue, +}; use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_primitives::{ keccak256, trie::AccountProof, Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, @@ -306,6 +309,27 @@ impl TransactionsProvider for MockEthProvider { Ok(transactions) } + fn raw_transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>> { + let lock = self.blocks.lock(); + let transactions = lock + .values() + .flat_map(|block| &block.body) + .enumerate() + .filter_map(|(tx_number, tx)| { + if range.contains(&(tx_number as TxNumber)) { + Some(RawValue::new(tx.clone().into())) + } else { + None + } + }) + .collect(); + + Ok(transactions) + } + fn senders_by_tx_range( &self, range: impl RangeBounds, diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 4036ca99f460..b8cf86cb7d29 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -7,7 +7,10 @@ use crate::{ StateProviderFactory, StateRootProvider, TransactionVariant, TransactionsProvider, WithdrawalsProvider, }; -use reth_db::models::{AccountBeforeTx, StoredBlockBodyIndices}; +use reth_db::{ + models::{AccountBeforeTx, StoredBlockBodyIndices}, + RawValue, +}; use reth_interfaces::provider::ProviderResult; use reth_primitives::{ stage::{StageCheckpoint, StageId}, @@ -208,6 +211,13 @@ impl TransactionsProvider for NoopProvider { fn transaction_sender(&self, _id: TxNumber) -> ProviderResult> { Ok(None) } + + fn raw_transactions_by_tx_range( + &self, + _range: impl RangeBounds, + ) -> ProviderResult>> { + Ok(Vec::default()) + } } impl ReceiptProvider for NoopProvider { diff --git a/crates/storage/provider/src/traits/transactions.rs b/crates/storage/provider/src/traits/transactions.rs index 9041593b552e..935b82638c4a 100644 --- a/crates/storage/provider/src/traits/transactions.rs +++ b/crates/storage/provider/src/traits/transactions.rs @@ -5,6 +5,7 @@ use reth_primitives::{ TransactionSignedNoHash, TxHash, TxNumber, }; use std::ops::{Range, RangeBounds, RangeInclusive}; +use reth_db::RawValue; /// Client trait for fetching [TransactionSigned] related data. #[auto_impl::auto_impl(&, Arc)] @@ -55,6 +56,12 @@ pub trait TransactionsProvider: BlockNumReader + Send + Sync { range: impl RangeBounds, ) -> ProviderResult>; + /// Get raw transactions by tx range. + fn raw_transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>>; + /// Get Senders from a tx range. fn senders_by_tx_range( &self, From ed0c660f42a9a7bc1773eaa7f082915cce6986fa Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 25 Jan 2024 15:54:52 +0000 Subject: [PATCH 08/12] fmt --- crates/storage/provider/src/traits/transactions.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/provider/src/traits/transactions.rs b/crates/storage/provider/src/traits/transactions.rs index 935b82638c4a..961a16baf15c 100644 --- a/crates/storage/provider/src/traits/transactions.rs +++ b/crates/storage/provider/src/traits/transactions.rs @@ -1,11 +1,11 @@ use crate::{BlockNumReader, BlockReader}; +use reth_db::RawValue; use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_primitives::{ Address, BlockHashOrNumber, BlockNumber, TransactionMeta, TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, }; use std::ops::{Range, RangeBounds, RangeInclusive}; -use reth_db::RawValue; /// Client trait for fetching [TransactionSigned] related data. #[auto_impl::auto_impl(&, Arc)] From 98b90811c131507be7e3ccf0e8d6dd04aff61679 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 25 Jan 2024 16:00:33 +0000 Subject: [PATCH 09/12] allocate capacity --- .../provider/src/providers/database/provider.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 172fb774f94d..74be815111e7 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -144,7 +144,7 @@ impl DatabaseProvider { self.cursor_collect_with_capacity(cursor, range, capacity, |_, v| f(v)) } - fn cursor_collect_with_capacity, R>( + fn cursor_collect_with_capacity( &self, cursor: &mut impl DbCursorRO, range: impl RangeBounds, @@ -1649,11 +1649,12 @@ impl TransactionsProvider for DatabaseProvider { to_range(range), |snapshot, range, _| snapshot.raw_transactions_by_tx_range(range), |range, _| { - self.tx - .cursor_read::>()? - .walk_range(RawKey::new(range.start)..RawKey::new(range.end))? - .map(|entry| Ok(entry?.1)) - .try_collect() + self.cursor_collect_with_capacity( + &mut self.tx.cursor_read::>()?, + RawKey::new(range.start)..RawKey::new(range.end), + range_size_hint(&range).unwrap_or(0), + |_, v| Ok(v), + ) }, |_| true, ) From e9e217d6f3febee44f72fefd5d299be59336dc65 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 25 Jan 2024 16:18:54 +0000 Subject: [PATCH 10/12] comment querying from snapshots to prevent slowdown --- .../src/providers/database/provider.rs | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 74be815111e7..fc2555254ab9 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1644,20 +1644,29 @@ impl TransactionsProvider for DatabaseProvider { &self, range: impl RangeBounds, ) -> ProviderResult>> { - self.get_range_with_snapshot( - SnapshotSegment::Transactions, - to_range(range), - |snapshot, range, _| snapshot.raw_transactions_by_tx_range(range), - |range, _| { - self.cursor_collect_with_capacity( - &mut self.tx.cursor_read::>()?, - RawKey::new(range.start)..RawKey::new(range.end), - range_size_hint(&range).unwrap_or(0), - |_, v| Ok(v), - ) - }, - |_| true, + let range = to_range(range); + self.cursor_collect_with_capacity( + &mut self.tx.cursor_read::>()?, + RawKey::new(range.start)..RawKey::new(range.end), + range_size_hint(&range).unwrap_or(0), + |_, v| Ok(v), ) + + // TODO(alexey): uncomment to query from snapshots + // self.get_range_with_snapshot( + // SnapshotSegment::Transactions, + // to_range(range), + // |snapshot, range, _| snapshot.raw_transactions_by_tx_range(range), + // |range, _| { + // self.cursor_collect_with_capacity( + // &mut self.tx.cursor_read::>()?, + // RawKey::new(range.start)..RawKey::new(range.end), + // range_size_hint(&range).unwrap_or(0), + // |_, v| Ok(v), + // ) + // }, + // |_| true, + // ) } fn senders_by_tx_range( From 019d1bf87da826e7fd9309d81404bcab62b6555b Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 25 Jan 2024 16:40:26 +0000 Subject: [PATCH 11/12] Revert "comment querying from snapshots to prevent slowdown" This reverts commit e9e217d6f3febee44f72fefd5d299be59336dc65. --- .../src/providers/database/provider.rs | 35 +++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index fc2555254ab9..74be815111e7 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1644,29 +1644,20 @@ impl TransactionsProvider for DatabaseProvider { &self, range: impl RangeBounds, ) -> ProviderResult>> { - let range = to_range(range); - self.cursor_collect_with_capacity( - &mut self.tx.cursor_read::>()?, - RawKey::new(range.start)..RawKey::new(range.end), - range_size_hint(&range).unwrap_or(0), - |_, v| Ok(v), + self.get_range_with_snapshot( + SnapshotSegment::Transactions, + to_range(range), + |snapshot, range, _| snapshot.raw_transactions_by_tx_range(range), + |range, _| { + self.cursor_collect_with_capacity( + &mut self.tx.cursor_read::>()?, + RawKey::new(range.start)..RawKey::new(range.end), + range_size_hint(&range).unwrap_or(0), + |_, v| Ok(v), + ) + }, + |_| true, ) - - // TODO(alexey): uncomment to query from snapshots - // self.get_range_with_snapshot( - // SnapshotSegment::Transactions, - // to_range(range), - // |snapshot, range, _| snapshot.raw_transactions_by_tx_range(range), - // |range, _| { - // self.cursor_collect_with_capacity( - // &mut self.tx.cursor_read::>()?, - // RawKey::new(range.start)..RawKey::new(range.end), - // range_size_hint(&range).unwrap_or(0), - // |_, v| Ok(v), - // ) - // }, - // |_| true, - // ) } fn senders_by_tx_range( From 8e8a05941b20d0097f8d2f7d200eeffc0f45399d Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 31 Jan 2024 13:20:21 +0000 Subject: [PATCH 12/12] pass tx.value() error upstream --- crates/stages/src/stages/sender_recovery.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 259959d8cc5b..67b0fec5f648 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -7,7 +7,7 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, RawValue, }; -use reth_interfaces::consensus; +use reth_interfaces::{consensus, RethError}; use reth_primitives::{ keccak256, stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, @@ -189,7 +189,12 @@ fn recover_sender( (tx_id, tx): (TxNumber, RawValue), rlp_buf: &mut Vec, ) -> Result<(u64, Address), Box> { - let tx = tx.value().expect("value to be formated"); + let tx = tx + .value() + .map_err(RethError::from) + .map_err(StageError::from) + .map_err(Into::into) + .map_err(Box::new)?; tx.transaction.encode_without_signature(rlp_buf);