diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index 79b630972196..3a7b31e982c1 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -84,7 +84,7 @@ criterion = { workspace = true, features = ["async_futures"] } serde_json.workspace = true [features] -test-utils = ["reth-interfaces/test-utils", "reth-db/test-utils"] +test-utils = ["reth-interfaces/test-utils", "reth-db/test-utils", "reth-provider/test-utils"] [[bench]] name = "criterion" diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 34b2f9cb6f16..eb668ab74f9b 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -6,13 +6,13 @@ use criterion::{ use pprof::criterion::{Output, PProfProfiler}; use reth_db::{test_utils::TempDatabase, DatabaseEnv}; -use reth_primitives::stage::StageCheckpoint; +use reth_primitives::{stage::StageCheckpoint, BlockNumber}; use reth_stages::{ stages::{MerkleStage, SenderRecoveryStage, TransactionLookupStage}, test_utils::TestStageDB, ExecInput, Stage, StageExt, UnwindInput, }; -use std::{path::PathBuf, sync::Arc}; +use std::{ops::RangeInclusive, sync::Arc}; mod setup; use setup::StageRange; @@ -33,16 +33,9 @@ fn account_hashing(c: &mut Criterion) { group.sample_size(10); let num_blocks = 10_000; - let (path, stage, execution_range) = setup::prepare_account_hashing(num_blocks); + let (db, stage, range) = setup::prepare_account_hashing(num_blocks); - measure_stage_with_path( - path, - &mut group, - setup::stage_unwind, - stage, - execution_range, - "AccountHashing".to_string(), - ); + measure_stage(&mut group, &db, setup::stage_unwind, stage, range, "AccountHashing".to_string()); } fn senders(c: &mut Criterion) { @@ -50,11 +43,13 @@ fn senders(c: &mut Criterion) { // don't need to run each stage for that many times group.sample_size(10); + let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS); + for batch in [1000usize, 10_000, 100_000, 250_000] { let stage = SenderRecoveryStage { commit_threshold: DEFAULT_NUM_BLOCKS }; let label = format!("SendersRecovery-batch-{batch}"); - measure_stage(&mut group, setup::stage_unwind, stage, 0..DEFAULT_NUM_BLOCKS, label); + measure_stage(&mut group, &db, setup::stage_unwind, stage, 0..=DEFAULT_NUM_BLOCKS, label); } } @@ -64,11 +59,14 @@ fn transaction_lookup(c: &mut Criterion) { group.sample_size(10); let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, None); + let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS); + measure_stage( &mut group, + &db, setup::stage_unwind, stage, - 0..DEFAULT_NUM_BLOCKS, + 0..=DEFAULT_NUM_BLOCKS, "TransactionLookup".to_string(), ); } @@ -78,44 +76,58 @@ fn merkle(c: &mut Criterion) { // don't need to run each stage for that many times group.sample_size(10); + let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS); + let stage = MerkleStage::Both { clean_threshold: u64::MAX }; measure_stage( &mut group, + &db, setup::unwind_hashes, stage, - 1..DEFAULT_NUM_BLOCKS, + 1..=DEFAULT_NUM_BLOCKS, "Merkle-incremental".to_string(), ); let stage = MerkleStage::Both { clean_threshold: 0 }; measure_stage( &mut group, + &db, setup::unwind_hashes, stage, - 1..DEFAULT_NUM_BLOCKS, + 1..=DEFAULT_NUM_BLOCKS, "Merkle-fullhash".to_string(), ); } -fn measure_stage_with_path( - path: PathBuf, +fn measure_stage( group: &mut BenchmarkGroup<'_, WallTime>, + db: &TestStageDB, setup: F, stage: S, - stage_range: StageRange, + block_interval: RangeInclusive, label: String, ) where S: Clone + Stage>>, F: Fn(S, &TestStageDB, StageRange), { - let db = TestStageDB::new(&path); + let stage_range = ( + ExecInput { + target: Some(*block_interval.end()), + checkpoint: Some(StageCheckpoint::new(*block_interval.start())), + }, + UnwindInput { + checkpoint: StageCheckpoint::new(*block_interval.end()), + unwind_to: *block_interval.start(), + bad_block: None, + }, + ); let (input, _) = stage_range; group.bench_function(label, move |b| { b.to_async(FuturesExecutor).iter_with_setup( || { // criterion setup does not support async, so we have to use our own runtime - setup(stage.clone(), &db, stage_range) + setup(stage.clone(), db, stage_range) }, |_| async { let mut stage = stage.clone(); @@ -130,35 +142,3 @@ fn measure_stage_with_path( ) }); } - -fn measure_stage( - group: &mut BenchmarkGroup<'_, WallTime>, - setup: F, - stage: S, - block_interval: std::ops::Range, - label: String, -) where - S: Clone + Stage>>, - F: Fn(S, &TestStageDB, StageRange), -{ - let path = setup::txs_testdata(block_interval.end); - - measure_stage_with_path( - path, - group, - setup, - stage, - ( - ExecInput { - target: Some(block_interval.end), - checkpoint: Some(StageCheckpoint::new(block_interval.start)), - }, - UnwindInput { - checkpoint: StageCheckpoint::new(block_interval.end), - unwind_to: block_interval.start, - bad_block: None, - }, - ), - label, - ); -} diff --git a/crates/stages/benches/setup/account_hashing.rs b/crates/stages/benches/setup/account_hashing.rs index 497dce2787f4..ba532d3660e7 100644 --- a/crates/stages/benches/setup/account_hashing.rs +++ b/crates/stages/benches/setup/account_hashing.rs @@ -1,15 +1,15 @@ #![allow(unreachable_pub)] -use super::{constants, StageRange}; + +use super::constants; use reth_db::{ cursor::DbCursorRO, database::Database, tables, transaction::DbTx, DatabaseError as DbError, }; -use reth_primitives::{fs, stage::StageCheckpoint}; +use reth_primitives::{fs, stage::StageCheckpoint, BlockNumber}; use reth_stages::{ stages::{AccountHashingStage, SeedOpts}, test_utils::TestStageDB, - ExecInput, UnwindInput, }; -use std::path::{Path, PathBuf}; +use std::{ops::RangeInclusive, path::Path}; /// Prepares a database for [`AccountHashingStage`] /// If the environment variable [`constants::ACCOUNT_HASHING_DB`] is set, it will use that one and @@ -17,20 +17,22 @@ use std::path::{Path, PathBuf}; /// generate its own random data. /// /// Returns the path to the database file, stage and range of stage execution if it exists. -pub fn prepare_account_hashing(num_blocks: u64) -> (PathBuf, AccountHashingStage, StageRange) { - let (path, stage_range) = match std::env::var(constants::ACCOUNT_HASHING_DB) { +pub fn prepare_account_hashing( + num_blocks: u64, +) -> (TestStageDB, AccountHashingStage, RangeInclusive) { + let (db, stage_range) = match std::env::var(constants::ACCOUNT_HASHING_DB) { Ok(db) => { let path = Path::new(&db).to_path_buf(); let range = find_stage_range(&path); - (path, range) + (TestStageDB::new(&path), range) } Err(_) => generate_testdata_db(num_blocks), }; - (path, AccountHashingStage::default(), stage_range) + (db, AccountHashingStage::default(), stage_range) } -fn find_stage_range(db: &Path) -> StageRange { +fn find_stage_range(db: &Path) -> RangeInclusive { let mut stage_range = None; TestStageDB::new(db) .factory @@ -40,13 +42,7 @@ fn find_stage_range(db: &Path) -> StageRange { let from = cursor.first()?.unwrap().0; let to = StageCheckpoint::new(cursor.last()?.unwrap().0); - stage_range = Some(( - ExecInput { - target: Some(to.block_number), - checkpoint: Some(StageCheckpoint::new(from)), - }, - UnwindInput { unwind_to: from, checkpoint: to, bad_block: None }, - )); + stage_range = Some(from..=to.block_number); Ok::<(), DbError>(()) }) .unwrap() @@ -55,19 +51,21 @@ fn find_stage_range(db: &Path) -> StageRange { stage_range.expect("Could not find the stage range from the external DB.") } -fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) { +fn generate_testdata_db(num_blocks: u64) -> (TestStageDB, RangeInclusive) { let opts = SeedOpts { blocks: 0..=num_blocks, accounts: 0..100_000, txs: 100..150 }; let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("account-hashing-bench"); + let exists = path.exists(); + let db = TestStageDB::new(&path); - if !path.exists() { + if !exists { // create the dirs fs::create_dir_all(&path).unwrap(); println!("Account Hashing testdata not found, generating to {:?}", path.display()); - let db = TestStageDB::new(&path); let provider = db.factory.provider_rw().unwrap(); - let _accounts = AccountHashingStage::seed(&provider, opts); + let _accounts = AccountHashingStage::seed(&provider, opts.clone()); provider.commit().expect("failed to commit"); } - (path, (ExecInput { target: Some(num_blocks), ..Default::default() }, UnwindInput::default())) + + (db, opts.blocks) } diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index f6322cc50d3f..d0aa9e94fcc5 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -21,11 +21,7 @@ use reth_stages::{ ExecInput, Stage, UnwindInput, }; use reth_trie::StateRoot; -use std::{ - collections::BTreeMap, - path::{Path, PathBuf}, - sync::Arc, -}; +use std::{collections::BTreeMap, path::Path, sync::Arc}; mod constants; @@ -84,8 +80,7 @@ pub(crate) fn unwind_hashes>>>( // Helper for generating testdata for the benchmarks. // Returns the path to the database file. -pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { - let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench"); +pub(crate) fn txs_testdata(num_blocks: u64) -> TestStageDB { let txs_range = 100..150; // number of storage changes per transition @@ -101,11 +96,14 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { // rng let mut rng = generators::rng(); - if !path.exists() { + let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench"); + let exists = path.exists(); + let db = TestStageDB::new(&path); + + if !exists { // create the dirs fs::create_dir_all(&path).unwrap(); println!("Transactions testdata not found, generating to {:?}", path.display()); - let db = TestStageDB::new(&path); let accounts: BTreeMap = concat([ random_eoa_account_range(&mut rng, 0..n_eoa), @@ -177,5 +175,5 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { .unwrap(); } - path + db } diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 5a0ac77c35e6..f784ac830322 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -6,14 +6,16 @@ use reth_db::{ models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals}, tables, transaction::{DbTx, DbTxMut}, - DatabaseError, }; -use reth_interfaces::p2p::bodies::{downloader::BodyDownloader, response::BlockResponse}; +use reth_interfaces::{ + p2p::bodies::{downloader::BodyDownloader, response::BlockResponse}, + provider::ProviderResult, +}; use reth_primitives::{ stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, SnapshotSegment, }; -use reth_provider::{providers::SnapshotWriter, DatabaseProviderRW}; +use reth_provider::{providers::SnapshotWriter, DatabaseProviderRW, StatsReader}; use std::{ cmp::Ordering, task::{ready, Context, Poll}, @@ -342,10 +344,10 @@ impl Stage for BodyStage { // progress in gas as a proxy to size. Execution stage uses a similar approach. fn stage_checkpoint( provider: &DatabaseProviderRW, -) -> Result { +) -> ProviderResult { Ok(EntitiesCheckpoint { - processed: provider.tx_ref().entries::()? as u64, - total: provider.tx_ref().entries::()? as u64, + processed: provider.count_entries::()? as u64, + total: provider.count_entries::()? as u64, }) } diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 1d5c6260e49b..383782218258 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -20,7 +20,7 @@ use reth_primitives::{ use reth_provider::{ providers::{SnapshotProviderRWRefMut, SnapshotWriter}, BlockReader, DatabaseProviderRW, ExecutorFactory, HeaderProvider, LatestStateProviderRef, - OriginalValuesKnown, ProviderError, TransactionVariant, + OriginalValuesKnown, ProviderError, StatsReader, TransactionVariant, }; use std::{ cmp::Ordering, @@ -245,7 +245,7 @@ impl ExecutionStage { // If we're not executing MerkleStage from scratch (by threshold or first-sync), then erase // changeset related pruning configurations if !(max_block - start_block > self.external_clean_threshold || - provider.tx_ref().entries::()?.is_zero()) + provider.count_entries::()?.is_zero()) { prune_modes.account_history = None; prune_modes.storage_history = None; diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 323fd1c1b300..1627e893f6e2 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -8,7 +8,7 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, RawKey, RawTable, }; -use reth_interfaces::db::DatabaseError; +use reth_interfaces::provider::ProviderResult; use reth_primitives::{ keccak256, stage::{ @@ -16,7 +16,7 @@ use reth_primitives::{ StageId, }, }; -use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter}; +use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter, StatsReader}; use std::{ cmp::max, fmt::Debug, @@ -289,10 +289,10 @@ impl Stage for AccountHashingStage { fn stage_checkpoint_progress( provider: &DatabaseProviderRW, -) -> Result { +) -> ProviderResult { Ok(EntitiesCheckpoint { - processed: provider.tx_ref().entries::()? as u64, - total: provider.tx_ref().entries::()? as u64, + processed: provider.count_entries::()? as u64, + total: provider.count_entries::()? as u64, }) } diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index d508846a43c4..28f684f819a1 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -7,7 +7,7 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; -use reth_interfaces::db::DatabaseError; +use reth_interfaces::provider::ProviderResult; use reth_primitives::{ keccak256, stage::{ @@ -16,7 +16,7 @@ use reth_primitives::{ }, StorageEntry, }; -use reth_provider::{DatabaseProviderRW, HashingWriter, StorageReader}; +use reth_provider::{DatabaseProviderRW, HashingWriter, StatsReader, StorageReader}; use std::{collections::BTreeMap, fmt::Debug}; use tracing::*; @@ -214,10 +214,10 @@ impl Stage for StorageHashingStage { fn stage_checkpoint_progress( provider: &DatabaseProviderRW, -) -> Result { +) -> ProviderResult { Ok(EntitiesCheckpoint { - processed: provider.tx_ref().entries::()? as u64, - total: provider.tx_ref().entries::()? as u64, + processed: provider.count_entries::()? as u64, + total: provider.count_entries::()? as u64, }) } diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 7b74ca47b8fb..3ccf11b92d7a 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -13,7 +13,8 @@ use reth_primitives::{ BlockNumber, GotExpected, SealedHeader, B256, }; use reth_provider::{ - DatabaseProviderRW, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter, + DatabaseProviderRW, HeaderProvider, ProviderError, StageCheckpointReader, + StageCheckpointWriter, StatsReader, }; use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress}; use std::fmt::Debug; @@ -187,8 +188,8 @@ impl Stage for MerkleStage { } .unwrap_or(EntitiesCheckpoint { processed: 0, - total: (provider.tx_ref().entries::()? + - provider.tx_ref().entries::()?) + total: (provider.count_entries::()? + + provider.count_entries::()?) as u64, }); @@ -233,8 +234,8 @@ impl Stage for MerkleStage { .map_err(|e| StageError::Fatal(Box::new(e)))?; updates.flush(provider.tx_ref())?; - let total_hashed_entries = (provider.tx_ref().entries::()? + - provider.tx_ref().entries::()?) + let total_hashed_entries = (provider.count_entries::()? + + provider.count_entries::()?) as u64; let entities_checkpoint = EntitiesCheckpoint { diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index a758b9b6bc09..67b0fec5f648 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -1,13 +1,13 @@ use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use itertools::Itertools; use reth_db::{ - cursor::{DbCursorRO, DbCursorRW}, + cursor::DbCursorRW, database::Database, tables, transaction::{DbTx, DbTxMut}, - DatabaseError, RawKey, RawTable, RawValue, + RawValue, }; -use reth_interfaces::consensus; +use reth_interfaces::{consensus, RethError}; use reth_primitives::{ keccak256, stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, @@ -15,6 +15,7 @@ use reth_primitives::{ }; use reth_provider::{ BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, PruneCheckpointReader, + StatsReader, TransactionsProvider, }; use std::{fmt::Debug, sync::mpsc}; use thiserror::Error; @@ -82,11 +83,8 @@ impl Stage for SenderRecoveryStage { // Acquire the cursor for inserting elements let mut senders_cursor = tx.cursor_write::()?; - // Acquire the cursor over the transactions - let mut tx_cursor = tx.cursor_read::>()?; - // Walk the transactions from start to end index (inclusive) - let raw_tx_range = RawKey::new(tx_range.start)..RawKey::new(tx_range.end); - let tx_walker = tx_cursor.walk_range(raw_tx_range)?; + // Query the transactions from both database and static files + let transactions = provider.raw_transactions_by_tx_range(tx_range.clone())?; // Iterate over transactions in chunks info!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders"); @@ -106,7 +104,7 @@ impl Stage for SenderRecoveryStage { // to gain anything from using more than 1 thread let chunk_size = chunk_size.max(16); - for chunk in &tx_walker.chunks(chunk_size) { + for chunk in &tx_range.zip(transactions).chunks(chunk_size) { // An _unordered_ channel to receive results from a rayon job let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel(); channels.push(recovered_senders_rx); @@ -188,14 +186,16 @@ impl Stage for SenderRecoveryStage { } fn recover_sender( - entry: Result<(RawKey, RawValue), DatabaseError>, + (tx_id, tx): (TxNumber, RawValue), rlp_buf: &mut Vec, ) -> Result<(u64, Address), Box> { - let (tx_id, transaction) = - entry.map_err(|e| Box::new(SenderRecoveryStageError::StageError(e.into())))?; - let tx_id = tx_id.key().expect("key to be formated"); + let tx = tx + .value() + .map_err(RethError::from) + .map_err(StageError::from) + .map_err(Into::into) + .map_err(Box::new)?; - let tx = transaction.value().expect("value to be formated"); tx.transaction.encode_without_signature(rlp_buf); // We call [Signature::recover_signer_unchecked] because transactions run in the pipeline are @@ -222,8 +222,8 @@ fn stage_checkpoint( // If `TxSenders` table was pruned, we will have a number of entries in it not matching // the actual number of processed transactions. To fix that, we add the number of pruned // `TxSenders` entries. - processed: provider.tx_ref().entries::()? as u64 + pruned_entries, - total: provider.tx_ref().entries::()? as u64, + processed: provider.count_entries::()? as u64 + pruned_entries, + total: provider.count_entries::()? as u64, }) } @@ -249,6 +249,7 @@ struct FailedSenderRecoveryError { #[cfg(test)] mod tests { use assert_matches::assert_matches; + use reth_db::cursor::DbCursorRO; use reth_interfaces::test_utils::{ generators, generators::{random_block, random_block_range}, diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs new file mode 100644 index 000000000000..96f00e6c2712 --- /dev/null +++ b/crates/stages/src/stages/total_difficulty.rs @@ -0,0 +1,315 @@ +use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; +use reth_db::{ + cursor::{DbCursorRO, DbCursorRW}, + database::Database, + tables, + transaction::{DbTx, DbTxMut}, +}; +use reth_interfaces::{ + consensus::Consensus, + provider::{ProviderError, ProviderResult}, +}; +use reth_primitives::{ + stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, + U256, +}; +use reth_provider::{DatabaseProviderRW, StatsReader}; +use std::sync::Arc; +use tracing::*; + +/// The total difficulty stage. +/// +/// This stage walks over inserted headers and computes total difficulty +/// at each block. The entries are inserted into [`HeaderTD`][reth_db::tables::HeaderTD] +/// table. +#[derive(Debug, Clone)] +pub struct TotalDifficultyStage { + /// Consensus client implementation + consensus: Arc, + /// The number of table entries to commit at once + commit_threshold: u64, +} + +impl TotalDifficultyStage { + /// Create a new total difficulty stage + pub fn new(consensus: Arc) -> Self { + Self { consensus, commit_threshold: 100_000 } + } + + /// Set a commit threshold on total difficulty stage + pub fn with_commit_threshold(mut self, commit_threshold: u64) -> Self { + self.commit_threshold = commit_threshold; + self + } +} + +impl Stage for TotalDifficultyStage { + /// Return the id of the stage + fn id(&self) -> StageId { + StageId::TotalDifficulty + } + + /// Write total difficulty entries + fn execute( + &mut self, + provider: &DatabaseProviderRW, + input: ExecInput, + ) -> Result { + let tx = provider.tx_ref(); + if input.target_reached() { + return Ok(ExecOutput::done(input.checkpoint())) + } + + let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); + let (start_block, end_block) = range.clone().into_inner(); + + debug!(target: "sync::stages::total_difficulty", start_block, end_block, "Commencing sync"); + + // Acquire cursor over total difficulty and headers tables + let mut cursor_td = tx.cursor_write::()?; + let mut cursor_headers = tx.cursor_read::()?; + + // Get latest total difficulty + let last_header_number = input.checkpoint().block_number; + let last_entry = cursor_td + .seek_exact(last_header_number)? + .ok_or(ProviderError::TotalDifficultyNotFound(last_header_number))?; + + let mut td: U256 = last_entry.1.into(); + debug!(target: "sync::stages::total_difficulty", ?td, block_number = last_header_number, "Last total difficulty entry"); + + // Walk over newly inserted headers, update & insert td + for entry in cursor_headers.walk_range(range)? { + let (block_number, header) = entry?; + td += header.difficulty; + + self.consensus.validate_header_with_total_difficulty(&header, td).map_err(|error| { + StageError::Block { + block: Box::new(header.seal_slow()), + error: BlockErrorKind::Validation(error), + } + })?; + cursor_td.append(block_number, td.into())?; + } + + Ok(ExecOutput { + checkpoint: StageCheckpoint::new(end_block) + .with_entities_stage_checkpoint(stage_checkpoint(provider)?), + done: is_final_range, + }) + } + + /// Unwind the stage. + fn unwind( + &mut self, + provider: &DatabaseProviderRW, + input: UnwindInput, + ) -> Result { + let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); + + provider.unwind_table_by_num::(unwind_to)?; + + Ok(UnwindOutput { + checkpoint: StageCheckpoint::new(unwind_to) + .with_entities_stage_checkpoint(stage_checkpoint(provider)?), + }) + } +} + +fn stage_checkpoint( + provider: &DatabaseProviderRW, +) -> ProviderResult { + Ok(EntitiesCheckpoint { + processed: provider.count_entries::()? as u64, + total: provider.count_entries::()? as u64, + }) +} + +#[cfg(test)] +mod tests { + use assert_matches::assert_matches; + use reth_db::transaction::DbTx; + use reth_interfaces::test_utils::{ + generators, + generators::{random_header, random_header_range}, + TestConsensus, + }; + use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedHeader}; + use reth_provider::HeaderProvider; + + use super::*; + use crate::test_utils::{ + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestStageDB, UnwindStageTestRunner, + }; + + stage_test_suite_ext!(TotalDifficultyTestRunner, total_difficulty); + + #[tokio::test] + async fn execute_with_intermediate_commit() { + let threshold = 50; + let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold + + let mut runner = TotalDifficultyTestRunner::default(); + runner.set_threshold(threshold); + + let first_input = ExecInput { + target: Some(previous_stage), + checkpoint: Some(StageCheckpoint::new(stage_progress)), + }; + + // Seed only once with full input range + runner.seed_execution(first_input).expect("failed to seed execution"); + + // Execute first time + let result = runner.execute(first_input).await.unwrap(); + let expected_progress = stage_progress + threshold; + assert_matches!( + result, + Ok(ExecOutput { checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed, + total + })) + }, done: false }) if block_number == expected_progress && processed == 1 + threshold && + total == runner.db.table::().unwrap().len() as u64 + ); + + // Execute second time + let second_input = ExecInput { + target: Some(previous_stage), + checkpoint: Some(StageCheckpoint::new(expected_progress)), + }; + let result = runner.execute(second_input).await.unwrap(); + assert_matches!( + result, + Ok(ExecOutput { checkpoint: StageCheckpoint { + block_number, + stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint { + processed, + total + })) + }, done: true }) if block_number == previous_stage && processed == total && + total == runner.db.table::().unwrap().len() as u64 + ); + + assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed"); + } + + struct TotalDifficultyTestRunner { + db: TestStageDB, + consensus: Arc, + commit_threshold: u64, + } + + impl Default for TotalDifficultyTestRunner { + fn default() -> Self { + Self { + db: Default::default(), + consensus: Arc::new(TestConsensus::default()), + commit_threshold: 500, + } + } + } + + impl StageTestRunner for TotalDifficultyTestRunner { + type S = TotalDifficultyStage; + + fn db(&self) -> &TestStageDB { + &self.db + } + + fn stage(&self) -> Self::S { + TotalDifficultyStage { + consensus: self.consensus.clone(), + commit_threshold: self.commit_threshold, + } + } + } + + #[async_trait::async_trait] + impl ExecuteStageTestRunner for TotalDifficultyTestRunner { + type Seed = Vec; + + fn seed_execution(&mut self, input: ExecInput) -> Result { + let mut rng = generators::rng(); + let start = input.checkpoint().block_number; + let head = random_header(&mut rng, start, None); + self.db.insert_headers(std::iter::once(&head))?; + self.db.commit(|tx| { + let td: U256 = tx + .cursor_read::()? + .last()? + .map(|(_, v)| v) + .unwrap_or_default() + .into(); + tx.put::(head.number, (td + head.difficulty).into())?; + Ok(()) + })?; + + // use previous progress as seed size + let end = input.target.unwrap_or_default() + 1; + + if start + 1 >= end { + return Ok(Vec::default()) + } + + let mut headers = random_header_range(&mut rng, start + 1..end, head.hash()); + self.db.insert_headers(headers.iter())?; + headers.insert(0, head); + Ok(headers) + } + + /// Validate stored headers + fn validate_execution( + &self, + input: ExecInput, + output: Option, + ) -> Result<(), TestRunnerError> { + let initial_stage_progress = input.checkpoint().block_number; + match output { + Some(output) if output.checkpoint.block_number > initial_stage_progress => { + let provider = self.db.factory.provider()?; + + let mut header_cursor = provider.tx_ref().cursor_read::()?; + let (_, mut current_header) = header_cursor + .seek_exact(initial_stage_progress)? + .expect("no initial header"); + let mut td: U256 = provider + .header_td_by_number(initial_stage_progress)? + .expect("no initial td"); + + while let Some((next_key, next_header)) = header_cursor.next()? { + assert_eq!(current_header.number + 1, next_header.number); + td += next_header.difficulty; + assert_eq!( + provider.header_td_by_number(next_key)?.map(Into::into), + Some(td) + ); + current_header = next_header; + } + } + _ => self.check_no_td_above(initial_stage_progress)?, + }; + Ok(()) + } + } + + impl UnwindStageTestRunner for TotalDifficultyTestRunner { + fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { + self.check_no_td_above(input.unwind_to) + } + } + + impl TotalDifficultyTestRunner { + fn check_no_td_above(&self, block: BlockNumber) -> Result<(), TestRunnerError> { + self.db.ensure_no_entry_above::(block, |num| num)?; + Ok(()) + } + + fn set_threshold(&mut self, new_threshold: u64) { + self.commit_threshold = new_threshold; + } + } +} diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index a741bed28582..cac0210c9b33 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -12,7 +12,7 @@ use reth_primitives::{ PruneCheckpoint, PruneMode, PruneSegment, }; use reth_provider::{ - BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, + BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, StatsReader, TransactionsProviderExt, }; use tracing::*; @@ -176,8 +176,8 @@ fn stage_checkpoint( // If `TxHashNumber` table was pruned, we will have a number of entries in it not matching // the actual number of processed transactions. To fix that, we add the number of pruned // `TxHashNumber` entries. - processed: provider.tx_ref().entries::()? as u64 + pruned_entries, - total: provider.tx_ref().entries::()? as u64, + processed: provider.count_entries::()? as u64 + pruned_entries, + total: provider.count_entries::()? as u64, }) } diff --git a/crates/storage/db/src/tables/raw.rs b/crates/storage/db/src/tables/raw.rs index b1932f152c26..d4183dae02a0 100644 --- a/crates/storage/db/src/tables/raw.rs +++ b/crates/storage/db/src/tables/raw.rs @@ -120,7 +120,7 @@ impl RawValue { Self { value: V::compress(value).into(), _phantom: std::marker::PhantomData } } - /// Creates a raw value from an existing `Vec`. Useful when we already have the compressed + /// Creates a raw value from an existing `Vec`. Useful when we already have the encoded /// value. pub fn from_vec(vec: Vec) -> Self { Self { value: vec, _phantom: std::marker::PhantomData } diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 1588dc3b830b..3c1306c3f6e1 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -9,7 +9,7 @@ use crate::{ PruneCheckpointReader, StageCheckpointReader, StateProviderBox, TransactionVariant, TransactionsProvider, WithdrawalsProvider, }; -use reth_db::{database::Database, init_db, models::StoredBlockBodyIndices, DatabaseEnv}; +use reth_db::{database::Database, init_db, models::StoredBlockBodyIndices, DatabaseEnv, RawValue}; use reth_interfaces::{provider::ProviderResult, RethError, RethResult}; use reth_primitives::{ stage::{StageCheckpoint, StageId}, @@ -375,6 +375,13 @@ impl TransactionsProvider for ProviderFactory { self.provider()?.transactions_by_tx_range(range) } + fn raw_transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>> { + self.provider()?.raw_transactions_by_tx_range(range) + } + fn senders_by_tx_range( &self, range: impl RangeBounds, diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index e70ea82c06d8..979e046eb41e 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -8,7 +8,7 @@ use crate::{ AccountReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter, Chain, EvmEnvProvider, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HeaderSyncMode, HistoryWriter, OriginalValuesKnown, ProviderError, PruneCheckpointReader, - PruneCheckpointWriter, StageCheckpointReader, StorageReader, TransactionVariant, + PruneCheckpointWriter, StageCheckpointReader, StatsReader, StorageReader, TransactionVariant, TransactionsProvider, TransactionsProviderExt, WithdrawalsProvider, }; use ahash::{AHashMap, AHashSet}; @@ -24,7 +24,7 @@ use reth_db::{ table::{Table, TableRow}, tables, transaction::{DbTx, DbTxMut}, - BlockNumberList, DatabaseError, + BlockNumberList, DatabaseError, RawKey, RawTable, RawValue, }; use reth_interfaces::{ p2p::headers::downloader::SyncTarget, @@ -153,7 +153,7 @@ impl DatabaseProvider { self.cursor_collect_with_capacity(cursor, range, capacity, |_, v| f(v)) } - fn cursor_collect_with_capacity, R>( + fn cursor_collect_with_capacity( &self, cursor: &mut impl DbCursorRO, range: impl RangeBounds, @@ -1649,6 +1649,26 @@ impl TransactionsProvider for DatabaseProvider { ) } + fn raw_transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>> { + self.get_range_with_snapshot( + SnapshotSegment::Transactions, + to_range(range), + |snapshot, range, _| snapshot.raw_transactions_by_tx_range(range), + |range, _| { + self.cursor_collect_with_capacity( + &mut self.tx.cursor_read::>()?, + RawKey::new(range.start)..RawKey::new(range.end), + range_size_hint(&range).unwrap_or(0), + |_, v| Ok(v), + ) + }, + |_| true, + ) + } + fn senders_by_tx_range( &self, range: impl RangeBounds, @@ -2541,6 +2561,20 @@ impl PruneCheckpointWriter for DatabaseProvider { } } +impl StatsReader for DatabaseProvider { + fn count_entries(&self) -> ProviderResult { + let db_entries = self.tx.entries::()?; + let snapshot_entries = + match self.snapshot_provider.as_ref().map(|provider| provider.count_entries::()) { + Some(Ok(entries)) => entries, + Some(Err(ProviderError::UnsupportedProvider)) | None => 0, + Some(Err(err)) => return Err(err), + }; + + Ok(db_entries + snapshot_entries) + } +} + fn range_size_hint(range: &impl RangeBounds) -> Option { let start = match range.start_bound().cloned() { Bound::Included(start) => start, diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 4e2ef3cb198d..4a1cb8afbe3d 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -6,7 +6,7 @@ use crate::{ ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader, StateProviderBox, StateProviderFactory, TransactionVariant, TransactionsProvider, WithdrawalsProvider, }; -use reth_db::{database::Database, models::StoredBlockBodyIndices}; +use reth_db::{database::Database, models::StoredBlockBodyIndices, RawValue}; use reth_interfaces::{ blockchain_tree::{BlockchainTreeEngine, BlockchainTreeViewer}, consensus::ForkchoiceState, @@ -358,6 +358,13 @@ where self.database.provider()?.transactions_by_tx_range(range) } + fn raw_transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>> { + self.database.provider()?.raw_transactions_by_tx_range(range) + } + fn senders_by_tx_range( &self, range: impl RangeBounds, diff --git a/crates/storage/provider/src/providers/snapshot/jar.rs b/crates/storage/provider/src/providers/snapshot/jar.rs index ee1519c9f2b8..9f7aa30e7c87 100644 --- a/crates/storage/provider/src/providers/snapshot/jar.rs +++ b/crates/storage/provider/src/providers/snapshot/jar.rs @@ -5,7 +5,8 @@ use crate::{ }; use reth_db::{ codecs::CompactU256, - snapshot::{HeaderMask, ReceiptMask, SnapshotCursor, TransactionMask}, + snapshot::{ColumnSelectorOne, HeaderMask, ReceiptMask, SnapshotCursor, TransactionMask}, + RawValue, }; use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_primitives::{ @@ -265,6 +266,25 @@ impl<'a> TransactionsProvider for SnapshotJarProvider<'a> { .get_one::>(num.into())? .and_then(|tx| tx.recover_signer())) } + + fn raw_transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>> { + let range = to_range(range); + let mut cursor = self.cursor()?; + let mut txes = Vec::with_capacity((range.end - range.start) as usize); + + for num in range { + if let Some(tx) = + cursor.get(num.into(), >::MASK)? + { + txes.push(RawValue::from_vec(tx[0].to_vec())); + } + } + + Ok(txes) + } } impl<'a> ReceiptProvider for SnapshotJarProvider<'a> { diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index c548d3dbcc11..99f976d1bbba 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -4,15 +4,19 @@ use super::{ }; use crate::{ to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider, - ReceiptProvider, TransactionVariant, TransactionsProvider, TransactionsProviderExt, - WithdrawalsProvider, + ReceiptProvider, StatsReader, TransactionVariant, TransactionsProvider, + TransactionsProviderExt, WithdrawalsProvider, }; use dashmap::{mapref::entry::Entry as DashMapEntry, DashMap}; use parking_lot::RwLock; use reth_db::{ codecs::CompactU256, models::StoredBlockBodyIndices, - snapshot::{iter_snapshots, HeaderMask, ReceiptMask, SnapshotCursor, TransactionMask}, + snapshot::{ + iter_snapshots, ColumnSelectorOne, HeaderMask, ReceiptMask, SnapshotCursor, TransactionMask, + }, + table::Table, + tables, RawValue, }; use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_nippy_jar::NippyJar; @@ -681,7 +685,7 @@ impl TransactionsProvider for SnapshotProvider { fn transactions_by_tx_range( &self, range: impl RangeBounds, - ) -> ProviderResult> { + ) -> ProviderResult> { self.fetch_range( SnapshotSegment::Transactions, to_range(range), @@ -692,6 +696,26 @@ impl TransactionsProvider for SnapshotProvider { ) } + fn raw_transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>> { + self.fetch_range( + SnapshotSegment::Transactions, + to_range(range), + |cursor, number| { + cursor.get(number.into(), >::MASK).map( + |result| { + result.map(|row| { + RawValue::::from_vec(row[0].to_vec()) + }) + }, + ) + }, + |_| true, + ) + } + fn transaction_sender(&self, id: TxNumber) -> ProviderResult> { Ok(self.transaction_by_id_no_hash(id)?.and_then(|tx| tx.recover_signer())) } @@ -791,3 +815,21 @@ impl WithdrawalsProvider for SnapshotProvider { Err(ProviderError::UnsupportedProvider) } } + +impl StatsReader for SnapshotProvider { + fn count_entries(&self) -> ProviderResult { + match T::NAME { + tables::CanonicalHeaders::NAME | tables::Headers::NAME | tables::HeaderTD::NAME => { + Ok(self.get_highest_snapshot_block(SnapshotSegment::Headers).unwrap_or_default() + as usize) + } + tables::Receipts::NAME => Ok(self + .get_highest_snapshot_tx(SnapshotSegment::Receipts) + .unwrap_or_default() as usize), + tables::Transactions::NAME => Ok(self + .get_highest_snapshot_tx(SnapshotSegment::Transactions) + .unwrap_or_default() as usize), + _ => Err(ProviderError::UnsupportedProvider), + } + } +} diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index 0c862a1dacaa..68ec4e72b3d3 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -7,7 +7,10 @@ use crate::{ TransactionVariant, TransactionsProvider, WithdrawalsProvider, }; use parking_lot::Mutex; -use reth_db::models::{AccountBeforeTx, StoredBlockBodyIndices}; +use reth_db::{ + models::{AccountBeforeTx, StoredBlockBodyIndices}, + RawValue, +}; use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_primitives::{ keccak256, trie::AccountProof, Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, @@ -306,6 +309,27 @@ impl TransactionsProvider for MockEthProvider { Ok(transactions) } + fn raw_transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>> { + let lock = self.blocks.lock(); + let transactions = lock + .values() + .flat_map(|block| &block.body) + .enumerate() + .filter_map(|(tx_number, tx)| { + if range.contains(&(tx_number as TxNumber)) { + Some(RawValue::new(tx.clone().into())) + } else { + None + } + }) + .collect(); + + Ok(transactions) + } + fn senders_by_tx_range( &self, range: impl RangeBounds, diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 4036ca99f460..b8cf86cb7d29 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -7,7 +7,10 @@ use crate::{ StateProviderFactory, StateRootProvider, TransactionVariant, TransactionsProvider, WithdrawalsProvider, }; -use reth_db::models::{AccountBeforeTx, StoredBlockBodyIndices}; +use reth_db::{ + models::{AccountBeforeTx, StoredBlockBodyIndices}, + RawValue, +}; use reth_interfaces::provider::ProviderResult; use reth_primitives::{ stage::{StageCheckpoint, StageId}, @@ -208,6 +211,13 @@ impl TransactionsProvider for NoopProvider { fn transaction_sender(&self, _id: TxNumber) -> ProviderResult> { Ok(None) } + + fn raw_transactions_by_tx_range( + &self, + _range: impl RangeBounds, + ) -> ProviderResult>> { + Ok(Vec::default()) + } } impl ReceiptProvider for NoopProvider { diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index 1260534784d7..360fe97c06aa 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -71,3 +71,6 @@ pub use prune_checkpoint::{PruneCheckpointReader, PruneCheckpointWriter}; mod database_provider; pub use database_provider::DatabaseProviderFactory; + +mod stats; +pub use stats::StatsReader; diff --git a/crates/storage/provider/src/traits/stats.rs b/crates/storage/provider/src/traits/stats.rs new file mode 100644 index 000000000000..dece75e287ba --- /dev/null +++ b/crates/storage/provider/src/traits/stats.rs @@ -0,0 +1,10 @@ +use reth_db::table::Table; +use reth_interfaces::provider::ProviderResult; + +/// The trait for fetching provider statistics. +#[auto_impl::auto_impl(&, Arc)] +pub trait StatsReader: Send + Sync { + /// Fetch the number of entries in the corresponding [Table]. Depending on the provider, it may + /// route to different data sources other than [Table]. + fn count_entries(&self) -> ProviderResult; +} diff --git a/crates/storage/provider/src/traits/transactions.rs b/crates/storage/provider/src/traits/transactions.rs index 9041593b552e..961a16baf15c 100644 --- a/crates/storage/provider/src/traits/transactions.rs +++ b/crates/storage/provider/src/traits/transactions.rs @@ -1,4 +1,5 @@ use crate::{BlockNumReader, BlockReader}; +use reth_db::RawValue; use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_primitives::{ Address, BlockHashOrNumber, BlockNumber, TransactionMeta, TransactionSigned, @@ -55,6 +56,12 @@ pub trait TransactionsProvider: BlockNumReader + Send + Sync { range: impl RangeBounds, ) -> ProviderResult>; + /// Get raw transactions by tx range. + fn raw_transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>>; + /// Get Senders from a tx range. fn senders_by_tx_range( &self,