Skip to content

Commit

Permalink
all: remove database commit callback, rework noderesolver (#26637)
Browse files Browse the repository at this point in the history
This change ports some changes from the main PBSS PR:

  - get rid of callback function in `trie.Database.Commit` which is not required anymore
  - rework the `nodeResolver` in `trie.Iterator` to make it compatible with multiple state scheme
  - some other shallow changes in tests and typo-fixes
  • Loading branch information
rjl493456442 authored Feb 8, 2023
1 parent 8c18b48 commit 9842301
Show file tree
Hide file tree
Showing 23 changed files with 127 additions and 120 deletions.
4 changes: 3 additions & 1 deletion cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ block is used.
}
)

// Deprecation: this command should be deprecated once the hash-based
// scheme is deprecated.
func pruneState(ctx *cli.Context) error {
stack, config := makeConfigNode(ctx)
defer stack.Close()
Expand Down Expand Up @@ -433,7 +435,7 @@ func traverseRawState(ctx *cli.Context) error {
nodes += 1
node := storageIter.Hash()

// Check the present for non-empty hash node(embedded node doesn't
// Check the presence for non-empty hash node(embedded node doesn't
// have their own hash).
if node != (common.Hash{}) {
blob := rawdb.ReadLegacyTrieNode(chaindb, node)
Expand Down
8 changes: 4 additions & 4 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,14 +945,14 @@ func (bc *BlockChain) Stop() {
recent := bc.GetBlockByNumber(number - offset)

log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root())
if err := triedb.Commit(recent.Root(), true, nil); err != nil {
if err := triedb.Commit(recent.Root(), true); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
}
}
}
if snapBase != (common.Hash{}) {
log.Info("Writing snapshot state to disk", "root", snapBase)
if err := triedb.Commit(snapBase, true, nil); err != nil {
if err := triedb.Commit(snapBase, true); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
}
}
Expand Down Expand Up @@ -1343,7 +1343,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
return bc.triedb.Commit(root, false, nil)
return bc.triedb.Commit(root, false)
}
// Full but not archive node, do proper garbage collection
bc.triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
Expand Down Expand Up @@ -1379,7 +1379,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/TriesInMemory)
}
// Flush an entire trie and restart the counters
bc.triedb.Commit(header.Root, true, nil)
bc.triedb.Commit(header.Root, true)
bc.lastWrite = chosen
bc.gcproc = 0
}
Expand Down
6 changes: 3 additions & 3 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1803,7 +1803,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
if tt.commitBlock > 0 {
chain.stateCache.TrieDB().Commit(canonblocks[tt.commitBlock-1].Root(), true, nil)
chain.stateCache.TrieDB().Commit(canonblocks[tt.commitBlock-1].Root(), false)
if snapshots {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
Expand Down Expand Up @@ -1918,7 +1918,7 @@ func TestIssue23496(t *testing.T) {
if _, err := chain.InsertChain(blocks[:1]); err != nil {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
chain.stateCache.TrieDB().Commit(blocks[0].Root(), true, nil)
chain.stateCache.TrieDB().Commit(blocks[0].Root(), false)

// Insert block B2 and commit the snapshot into disk
if _, err := chain.InsertChain(blocks[1:2]); err != nil {
Expand All @@ -1932,7 +1932,7 @@ func TestIssue23496(t *testing.T) {
if _, err := chain.InsertChain(blocks[2:3]); err != nil {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
chain.stateCache.TrieDB().Commit(blocks[2].Root(), true, nil)
chain.stateCache.TrieDB().Commit(blocks[2].Root(), false)

// Insert the remaining blocks
if _, err := chain.InsertChain(blocks[3:]); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_sethead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2004,7 +2004,7 @@ func testSetHead(t *testing.T, tt *rewindTest, snapshots bool) {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
if tt.commitBlock > 0 {
chain.stateCache.TrieDB().Commit(canonblocks[tt.commitBlock-1].Root(), true, nil)
chain.stateCache.TrieDB().Commit(canonblocks[tt.commitBlock-1].Root(), false)
if snapshots {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
startPoint = point

if basic.commitBlock > 0 && basic.commitBlock == point {
chain.stateCache.TrieDB().Commit(blocks[point-1].Root(), true, nil)
chain.stateCache.TrieDB().Commit(blocks[point-1].Root(), false)
}
if basic.snapshotBlock > 0 && basic.snapshotBlock == point {
// Flushing the entire snap tree into the disk, the
Expand Down
2 changes: 1 addition & 1 deletion core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
if err != nil {
panic(fmt.Sprintf("state write error: %v", err))
}
if err := statedb.Database().TrieDB().Commit(root, false, nil); err != nil {
if err := statedb.Database().TrieDB().Commit(root, false); err != nil {
panic(fmt.Sprintf("trie write error: %v", err))
}
return block, b.receipts
Expand Down
8 changes: 4 additions & 4 deletions core/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
if _, err := bc.InsertChain(blocks); err != nil {
t.Fatalf("failed to import contra-fork chain for expansion: %v", err)
}
if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil {
if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, false); err != nil {
t.Fatalf("failed to commit contra-fork head for expansion: %v", err)
}
bc.Stop()
Expand All @@ -106,7 +106,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
if _, err := bc.InsertChain(blocks); err != nil {
t.Fatalf("failed to import pro-fork chain for expansion: %v", err)
}
if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil {
if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, false); err != nil {
t.Fatalf("failed to commit pro-fork head for expansion: %v", err)
}
bc.Stop()
Expand All @@ -131,7 +131,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
if _, err := bc.InsertChain(blocks); err != nil {
t.Fatalf("failed to import contra-fork chain for expansion: %v", err)
}
if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil {
if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, false); err != nil {
t.Fatalf("failed to commit contra-fork head for expansion: %v", err)
}
blocks, _ = GenerateChain(&proConf, conBc.CurrentBlock(), ethash.NewFaker(), genDb, 1, func(i int, gen *BlockGen) {})
Expand All @@ -149,7 +149,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
if _, err := bc.InsertChain(blocks); err != nil {
t.Fatalf("failed to import pro-fork chain for expansion: %v", err)
}
if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, true, nil); err != nil {
if err := bc.stateCache.TrieDB().Commit(bc.CurrentHeader().Root, false); err != nil {
t.Fatalf("failed to commit pro-fork head for expansion: %v", err)
}
blocks, _ = GenerateChain(&conConf, proBc.CurrentBlock(), ethash.NewFaker(), genDb, 1, func(i int, gen *BlockGen) {})
Expand Down
2 changes: 1 addition & 1 deletion core/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (ga *GenesisAlloc) flush(db ethdb.Database, triedb *trie.Database) error {
}
// Commit newly generated states into disk if it's not empty.
if root != types.EmptyRootHash {
if err := triedb.Commit(root, true, nil); err != nil {
if err := triedb.Commit(root, true); err != nil {
return err
}
}
Expand Down
63 changes: 44 additions & 19 deletions core/state/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
package state

import (
"bytes"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
)

// Tests that the node iterator indeed walks over the entire database contents.
func TestNodeIteratorCoverage(t *testing.T) {
// Create some arbitrary test state to iterate
db, sdb, root, _ := makeTestState()
sdb.TrieDB().Commit(root, false, nil)
sdb.TrieDB().Commit(root, false)

state, err := New(root, sdb, nil)
if err != nil {
Expand All @@ -40,29 +40,54 @@ func TestNodeIteratorCoverage(t *testing.T) {
hashes[it.Hash] = struct{}{}
}
}
// Cross check the iterated hashes and the database/nodepool content
for hash := range hashes {
if _, err = sdb.TrieDB().Node(hash); err != nil {
_, err = sdb.ContractCode(common.Hash{}, hash)
}
if err != nil {
t.Errorf("failed to retrieve reported node %x", hash)
}
}
for _, hash := range sdb.TrieDB().Nodes() {
if _, ok := hashes[hash]; !ok {
t.Errorf("state entry not reported %x", hash)
// Check in-disk nodes
var (
seenNodes = make(map[common.Hash]struct{})
seenCodes = make(map[common.Hash]struct{})
)
it := db.NewIterator(nil, nil)
for it.Next() {
ok, hash := isTrieNode(sdb.TrieDB().Scheme(), it.Key(), it.Value())
if !ok {
continue
}
seenNodes[hash] = struct{}{}
}
it := db.NewIterator(nil, nil)
it.Release()

// Check in-disk codes
it = db.NewIterator(nil, nil)
for it.Next() {
key := it.Key()
if bytes.HasPrefix(key, []byte("secure-key-")) {
ok, hash := rawdb.IsCodeKey(it.Key())
if !ok {
continue
}
if _, ok := hashes[common.BytesToHash(key)]; !ok {
t.Errorf("state entry not reported %x", key)
if _, ok := hashes[common.BytesToHash(hash)]; !ok {
t.Errorf("state entry not reported %x", it.Key())
}
seenCodes[common.BytesToHash(hash)] = struct{}{}
}
it.Release()

// Cross check the iterated hashes and the database/nodepool content
for hash := range hashes {
_, ok := seenNodes[hash]
if !ok {
_, ok = seenCodes[hash]
}
if !ok {
t.Errorf("failed to retrieve reported node %x", hash)
}
}
}

// isTrieNode is a helper function which reports if the provided
// database entry belongs to a trie node or not.
func isTrieNode(scheme string, key, val []byte) (bool, common.Hash) {
if scheme == rawdb.HashScheme {
if len(key) == common.HashLength {
return true, common.BytesToHash(key)
}
}
return false, common.Hash{}
}
2 changes: 1 addition & 1 deletion core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) err
}
headBlock := rawdb.ReadHeadBlock(db)
if headBlock == nil {
return errors.New("Failed to load head block")
return errors.New("failed to load head block")
}
// Initialize the snapshot tree in recovery mode to handle this special case:
// - Users run the `prune-state` command multiple times
Expand Down
21 changes: 12 additions & 9 deletions core/state/snapshot/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,19 +359,22 @@ func (dl *diskLayer) generateRange(ctx *generatorContext, trieId *trie.ID, prefi
}
// We use the snap data to build up a cache which can be used by the
// main account trie as a primary lookup when resolving hashes
var snapNodeCache ethdb.Database
var resolver trie.NodeResolver
if len(result.keys) > 0 {
snapNodeCache = rawdb.NewMemoryDatabase()
snapTrieDb := trie.NewDatabase(snapNodeCache)
snapTrie := trie.NewEmpty(snapTrieDb)
mdb := rawdb.NewMemoryDatabase()
tdb := trie.NewDatabase(mdb)
snapTrie := trie.NewEmpty(tdb)
for i, key := range result.keys {
snapTrie.Update(key, result.vals[i])
}
root, nodes, _ := snapTrie.Commit(false)
if nodes != nil {
snapTrieDb.Update(trie.NewWithNodeSet(nodes))
root, nodes, err := snapTrie.Commit(false)
if err == nil && nodes != nil {
tdb.Update(trie.NewWithNodeSet(nodes))
tdb.Commit(root, false)
}
resolver = func(owner common.Hash, path []byte, hash common.Hash) []byte {
return rawdb.ReadTrieNode(mdb, owner, path, hash, tdb.Scheme())
}
snapTrieDb.Commit(root, false, nil)
}
// Construct the trie for state iteration, reuse the trie
// if it's already opened with some nodes resolved.
Expand Down Expand Up @@ -400,7 +403,7 @@ func (dl *diskLayer) generateRange(ctx *generatorContext, trieId *trie.ID, prefi
start = time.Now()
internal time.Duration
)
nodeIt.AddResolver(snapNodeCache)
nodeIt.AddResolver(resolver)

for iter.Next() {
if last != nil && bytes.Compare(iter.Key, last) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions core/state/snapshot/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (t *testHelper) Commit() common.Hash {
t.nodes.Merge(nodes)
}
t.triedb.Update(t.nodes)
t.triedb.Commit(root, false, nil)
t.triedb.Commit(root, false)
return root
}

Expand Down Expand Up @@ -391,7 +391,7 @@ func TestGenerateCorruptAccountTrie(t *testing.T) {
root := helper.Commit() // Root: 0xa04693ea110a31037fb5ee814308a6f1d76bdab0b11676bdf4541d2de55ba978

// Delete an account trie leaf and ensure the generator chokes
helper.triedb.Commit(root, false, nil)
helper.triedb.Commit(root, false)
helper.diskdb.Delete(common.HexToHash("0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7").Bytes())

snap := generateSnapshot(helper.diskdb, helper.triedb, 16, root)
Expand Down
8 changes: 4 additions & 4 deletions core/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestUpdateLeaks(t *testing.T) {
}

root := state.IntermediateRoot(false)
if err := state.Database().TrieDB().Commit(root, false, nil); err != nil {
if err := state.Database().TrieDB().Commit(root, false); err != nil {
t.Errorf("can not commit trie %v to persistent database", root.Hex())
}

Expand Down Expand Up @@ -106,15 +106,15 @@ func TestIntermediateLeaks(t *testing.T) {
if err != nil {
t.Fatalf("failed to commit transition state: %v", err)
}
if err = transState.Database().TrieDB().Commit(transRoot, false, nil); err != nil {
if err = transState.Database().TrieDB().Commit(transRoot, false); err != nil {
t.Errorf("can not commit trie %v to persistent database", transRoot.Hex())
}

finalRoot, err := finalState.Commit(false)
if err != nil {
t.Fatalf("failed to commit final state: %v", err)
}
if err = finalState.Database().TrieDB().Commit(finalRoot, false, nil); err != nil {
if err = finalState.Database().TrieDB().Commit(finalRoot, false); err != nil {
t.Errorf("can not commit trie %v to persistent database", finalRoot.Hex())
}

Expand Down Expand Up @@ -948,7 +948,7 @@ func TestFlushOrderDataLoss(t *testing.T) {
if err := statedb.TrieDB().Cap(1024); err != nil {
t.Fatalf("failed to cap trie dirty cache: %v", err)
}
if err := statedb.TrieDB().Commit(root, false, nil); err != nil {
if err := statedb.TrieDB().Commit(root, false); err != nil {
t.Fatalf("failed to commit state trie: %v", err)
}
// Reopen the state trie from flushed disk and verify it
Expand Down
2 changes: 1 addition & 1 deletion core/state/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func testIterativeStateSync(t *testing.T, count int, commit bool, bypath bool) {
// Create a random state to copy
_, srcDb, srcRoot, srcAccounts := makeTestState()
if commit {
srcDb.TrieDB().Commit(srcRoot, false, nil)
srcDb.TrieDB().Commit(srcRoot, false)
}
srcTrie, _ := trie.New(trie.StateTrieID(srcRoot), srcDb.TrieDB())

Expand Down
2 changes: 1 addition & 1 deletion eth/protocols/eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func newTestBackendWithGenerator(blocks int, shanghai bool, generator func(int,
panic(err)
}
for _, block := range bs {
chain.StateCache().TrieDB().Commit(block.Root(), false, nil)
chain.StateCache().TrieDB().Commit(block.Root(), false)
}
txconfig := txpool.DefaultConfig
txconfig.Journal = "" // Don't litter the disk with test journals
Expand Down
4 changes: 2 additions & 2 deletions light/postprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (c *ChtIndexerBackend) Commit() error {
if err := c.triedb.Update(trie.NewWithNodeSet(nodes)); err != nil {
return err
}
if err := c.triedb.Commit(root, false, nil); err != nil {
if err := c.triedb.Commit(root, false); err != nil {
return err
}
}
Expand Down Expand Up @@ -467,7 +467,7 @@ func (b *BloomTrieIndexerBackend) Commit() error {
if err := b.triedb.Update(trie.NewWithNodeSet(nodes)); err != nil {
return err
}
if err := b.triedb.Commit(root, false, nil); err != nil {
if err := b.triedb.Commit(root, false); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/fuzzers/stacktrie/trie_fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (f *fuzzer) fuzz() int {
dbA.Update(trie.NewWithNodeSet(nodes))
}
// Flush memdb -> disk (sponge)
dbA.Commit(rootA, false, nil)
dbA.Commit(rootA, false)

// Stacktrie requires sorted insertion
sort.Sort(vals)
Expand Down
Loading

0 comments on commit 9842301

Please sign in to comment.