Skip to content

Commit

Permalink
trie: revamp trie updates
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk committed Jul 2, 2024
1 parent 116d7a3 commit b731860
Show file tree
Hide file tree
Showing 15 changed files with 295 additions and 568 deletions.
9 changes: 5 additions & 4 deletions bin/reth/src/commands/debug_cmd/in_memory_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use reth_provider::{
use reth_revm::database::StateProviderDatabase;
use reth_stages::StageId;
use reth_tasks::TaskExecutor;
use reth_trie::{updates::TrieKey, StateRoot};
use reth_trie::StateRoot;
use std::{path::PathBuf, sync::Arc};
use tracing::*;

Expand Down Expand Up @@ -188,15 +188,16 @@ impl Command {
// Compare updates
let mut in_mem_mismatched = Vec::new();
let mut incremental_mismatched = Vec::new();
let mut in_mem_updates_iter = in_memory_updates.into_iter().peekable();
let mut incremental_updates_iter = incremental_trie_updates.into_iter().peekable();
let mut in_mem_updates_iter = in_memory_updates.account_nodes_ref().into_iter().peekable();
let mut incremental_updates_iter =
incremental_trie_updates.account_nodes_ref().into_iter().peekable();

while in_mem_updates_iter.peek().is_some() || incremental_updates_iter.peek().is_some() {
match (in_mem_updates_iter.next(), incremental_updates_iter.next()) {
(Some(in_mem), Some(incr)) => {
similar_asserts::assert_eq!(in_mem.0, incr.0, "Nibbles don't match");
if in_mem.1 != incr.1 &&
matches!(in_mem.0, TrieKey::AccountNode(ref nibbles) if nibbles.len() > self.skip_node_depth.unwrap_or_default())
in_mem.0.len() > self.skip_node_depth.unwrap_or_default()
{
in_mem_mismatched.push(in_mem);
incremental_mismatched.push(incr);
Expand Down
2 changes: 1 addition & 1 deletion crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl<DB: Database> Persistence<DB> {
let trie_updates = block.trie_updates().clone();
let hashed_state = block.hashed_state();
HashedStateChanges(hashed_state.clone()).write_to_db(provider_rw.tx_ref())?;
trie_updates.flush(provider_rw.tx_ref())?;
trie_updates.write_to_database(provider_rw.tx_ref())?;
}

// update history indices
Expand Down
6 changes: 5 additions & 1 deletion crates/stages/stages/benches/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,11 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> TestStageDB {
let offset = transitions.len() as u64;

db.insert_changesets(transitions, None).unwrap();
db.commit(|tx| Ok(updates.flush(tx)?)).unwrap();
db.commit(|tx| {
updates.write_to_database(tx)?;
Ok(())
})
.unwrap();

let (transitions, final_state) = random_changeset_range(
&mut rng,
Expand Down
8 changes: 4 additions & 4 deletions crates/stages/stages/src/stages/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
})?;
match progress {
StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
updates.flush(tx)?;
updates.write_to_database(tx)?;

let checkpoint = MerkleCheckpoint::new(
to_block,
Expand All @@ -237,7 +237,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
})
}
StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
updates.flush(tx)?;
updates.write_to_database(tx)?;

entities_checkpoint.processed += hashed_entries_walked as u64;

Expand All @@ -252,7 +252,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
StageError::Fatal(Box::new(e))
})?;
updates.flush(provider.tx_ref())?;
updates.write_to_database(provider.tx_ref())?;

let total_hashed_entries = (provider.count_entries::<tables::HashedAccounts>()? +
provider.count_entries::<tables::HashedStorages>()?)
Expand Down Expand Up @@ -325,7 +325,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
validate_state_root(block_root, target.seal_slow(), input.unwind_to)?;

// Validation passed, apply unwind changes to the database.
updates.flush(provider.tx_ref())?;
updates.write_to_database(provider.tx_ref())?;

// TODO(alexey): update entities checkpoint
} else {
Expand Down
17 changes: 6 additions & 11 deletions crates/storage/db-common/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,19 +464,17 @@ fn compute_state_root<DB: Database>(provider: &DatabaseProviderRW<DB>) -> eyre::
.root_with_progress()?
{
StateRootProgress::Progress(state, _, updates) => {
let updates_len = updates.len();
let updated_len = updates.write_to_database(tx)?;
total_flushed_updates += updated_len;

trace!(target: "reth::cli",
last_account_key = %state.last_account_key,
updates_len,
updated_len,
total_flushed_updates,
"Flushing trie updates"
);

intermediate_state = Some(*state);
updates.flush(tx)?;

total_flushed_updates += updates_len;

if total_flushed_updates % SOFT_LIMIT_COUNT_FLUSHED_UPDATES == 0 {
info!(target: "reth::cli",
Expand All @@ -486,15 +484,12 @@ fn compute_state_root<DB: Database>(provider: &DatabaseProviderRW<DB>) -> eyre::
}
}
StateRootProgress::Complete(root, _, updates) => {
let updates_len = updates.len();

updates.flush(tx)?;

total_flushed_updates += updates_len;
let updated_len = updates.write_to_database(tx)?;
total_flushed_updates += updated_len;

trace!(target: "reth::cli",
%root,
updates_len = updates_len,
updated_len,
total_flushed_updates,
"State root has been computed"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ mod tests {
}

let (_, updates) = StateRoot::from_tx(tx).root_with_updates().unwrap();
updates.flush(tx).unwrap();
updates.write_to_database(tx).unwrap();
})
.unwrap();

Expand Down
6 changes: 3 additions & 3 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2396,7 +2396,7 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
block_hash: end_block_hash,
})))
}
trie_updates.flush(&self.tx)?;
trie_updates.write_to_database(&self.tx)?;
}
durations_recorder.record_relative(metrics::Action::InsertMerkleTree);

Expand Down Expand Up @@ -2592,7 +2592,7 @@ impl<TX: DbTxMut + DbTx> BlockExecutionWriter for DatabaseProvider<TX> {
block_hash: parent_hash,
})))
}
trie_updates.flush(&self.tx)?;
trie_updates.write_to_database(&self.tx)?;
}

// get blocks
Expand Down Expand Up @@ -2793,7 +2793,7 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
// insert hashes and intermediate merkle nodes
{
HashedStateChanges(hashed_state).write_to_db(&self.tx)?;
trie_updates.flush(&self.tx)?;
trie_updates.write_to_database(&self.tx)?;
}
durations_recorder.record_relative(metrics::Action::InsertHashes);

Expand Down
2 changes: 1 addition & 1 deletion crates/trie/parallel/benches/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub fn calculate_state_root(c: &mut Criterion) {
HashedStateChanges(db_state).write_to_db(provider_rw.tx_ref()).unwrap();
let (_, updates) =
StateRoot::from_tx(provider_rw.tx_ref()).root_with_updates().unwrap();
updates.flush(provider_rw.tx_ref()).unwrap();
updates.write_to_database(provider_rw.tx_ref()).unwrap();
provider_rw.commit().unwrap();
}

Expand Down
4 changes: 2 additions & 2 deletions crates/trie/parallel/src/async_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ where
};

if retain_updates {
trie_updates.extend(updates.into_iter());
trie_updates.insert_storage_updates(hashed_address, updates);
}

account_rlp.clear();
Expand All @@ -179,7 +179,7 @@ where

let root = hash_builder.root();

trie_updates.finalize_state_updates(
trie_updates.finalize(
account_node_iter.walker,
hash_builder,
prefix_sets.destroyed_accounts,
Expand Down
4 changes: 2 additions & 2 deletions crates/trie/parallel/src/parallel_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ where
};

if retain_updates {
trie_updates.extend(updates.into_iter());
trie_updates.insert_storage_updates(hashed_address, updates);
}

account_rlp.clear();
Expand All @@ -161,7 +161,7 @@ where

let root = hash_builder.root();

trie_updates.finalize_state_updates(
trie_updates.finalize(
account_node_iter.walker,
hash_builder,
prefix_sets.destroyed_accounts,
Expand Down
2 changes: 1 addition & 1 deletion crates/trie/trie/src/proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ mod tests {
let (root, updates) = StateRoot::from_tx(provider.tx_ref())
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;
updates.flush(provider.tx_mut())?;
updates.write_to_database(provider.tx_mut())?;

provider.commit()?;

Expand Down
Loading

0 comments on commit b731860

Please sign in to comment.