From 1d71a9c455dbd3dc205524c9f16772b08eb56f84 Mon Sep 17 00:00:00 2001 From: ryanmorphl2 <163962984+ryanmorphl2@users.noreply.github.com> Date: Mon, 5 Aug 2024 16:45:30 +0800 Subject: [PATCH] feat(zk-trie): enable database pruning --- cmd/utils/flags.go | 10 +- core/state/statedb.go | 5 +- eth/ethconfig/config.go | 5 +- trie/database.go | 89 +++++++++++++---- trie/zk_committer.go | 205 +++++++++++++++++++++++++++++++++++++++ trie/zk_trie.go | 39 +++++++- trie/zk_trie_database.go | 33 +++++-- 7 files changed, 346 insertions(+), 40 deletions(-) create mode 100644 trie/zk_committer.go diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 72299b56e..f2c1ffe6f 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1835,11 +1835,11 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { cfg.Genesis = core.DefaultMorphHoleskyGenesisBlock() // forced for mainnet // disable pruning - if ctx.GlobalString(GCModeFlag.Name) != GCModeArchive { - log.Crit("Must use --gcmode=archive") - } - log.Info("Pruning disabled") - cfg.NoPruning = true + // if ctx.GlobalString(GCModeFlag.Name) != GCModeArchive { + // log.Crit("Must use --gcmode=archive") + // } + // log.Info("Pruning disabled") + // cfg.NoPruning = true // disable prefetch log.Info("Prefetch disabled") cfg.NoPrefetch = true diff --git a/core/state/statedb.go b/core/state/statedb.go index 6629a50ea..6aaf05a2c 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -1006,9 +1006,10 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { } // The onleaf func is called _serially_, so we can reuse the same account // for unmarshalling every time. 
- var account types.StateAccount + var account *types.StateAccount root, accountCommitted, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error { - if err := rlp.DecodeBytes(leaf, &account); err != nil { + var err error + if account, err = types.UnmarshalStateAccount(leaf); err != nil { return nil } if account.Root != s.db.TrieDB().EmptyRoot() { diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 0d98db2a6..c17b3d529 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -18,7 +18,6 @@ package ethconfig import ( - "github.com/scroll-tech/go-ethereum/consensus/l2" "math/big" "os" "os/user" @@ -26,6 +25,8 @@ import ( "runtime" "time" + "github.com/scroll-tech/go-ethereum/consensus/l2" + "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/consensus" "github.com/scroll-tech/go-ethereum/consensus/clique" @@ -79,7 +80,7 @@ var Defaults = Config{ TrieCleanCacheJournal: "triecache", TrieCleanCacheRejournal: 60 * time.Minute, TrieDirtyCache: 256, - TrieTimeout: 60 * time.Minute, + TrieTimeout: 1 * time.Minute, SnapshotCache: 102, FilterLogCacheSize: 32, Miner: miner.DefaultConfig, diff --git a/trie/database.go b/trie/database.go index 1c5b7f805..ea0c739c6 100644 --- a/trie/database.go +++ b/trie/database.go @@ -17,6 +17,7 @@ package trie import ( + "bytes" "errors" "fmt" "io" @@ -33,6 +34,8 @@ import ( "github.com/scroll-tech/go-ethereum/log" "github.com/scroll-tech/go-ethereum/metrics" "github.com/scroll-tech/go-ethereum/rlp" + zktrie "github.com/scroll-tech/zktrie/trie" + zkt "github.com/scroll-tech/zktrie/types" ) var ( @@ -92,7 +95,8 @@ type Database struct { childrenSize common.StorageSize // Storage size of the external children tracking preimages *preimageStore // The store for caching preimages - lock sync.RWMutex + lock sync.RWMutex + rawLock sync.RWMutex //For zk raw dirties } // rawNode is a simple binary blob used to differentiate between collapsed trie @@ -140,6 
+144,13 @@ type rawShortNode struct { func (n rawShortNode) cache() (hashNode, bool) { panic("this should never end up in a live trie") } func (n rawShortNode) fstring(ind string) string { panic("this should never end up in a live trie") } +type rawZkNode struct { + n *zktrie.Node +} + +func (n rawZkNode) cache() (hashNode, bool) { panic("this should never end up in a live trie") } +func (n rawZkNode) fstring(ind string) string { panic("this should never end up in a live trie") } + // cachedNode is all the information we know about a single cached trie node // in the memory database write layer. type cachedNode struct { @@ -168,6 +179,9 @@ func (n *cachedNode) rlp() []byte { if node, ok := n.node.(rawNode); ok { return node } + if node, ok := n.node.(rawZkNode); ok { + return node.n.CanonicalValue() + } blob, err := rlp.EncodeToBytes(n.node) if err != nil { panic(err) @@ -181,6 +195,10 @@ func (n *cachedNode) obj(hash common.Hash) node { if node, ok := n.node.(rawNode); ok { return mustDecodeNode(hash[:], node) } + + if node, ok := n.node.(rawZkNode); ok { + return node + } return expandNode(hash[:], n.node) } @@ -208,6 +226,16 @@ func forGatherChildren(n node, onChild func(hash common.Hash)) { } case hashNode: onChild(common.BytesToHash(n)) + case rawZkNode: + switch n.n.Type { + case zktrie.NodeTypeBranch_0, zktrie.NodeTypeBranch_1, zktrie.NodeTypeBranch_2, zktrie.NodeTypeBranch_3: + if !bytes.Equal(n.n.ChildL[:], common.Hash{}.Bytes()) { + onChild(common.BytesToHash(n.n.ChildL.Bytes())) + } + if !bytes.Equal(n.n.ChildR[:], common.Hash{}.Bytes()) { + onChild(common.BytesToHash(n.n.ChildR.Bytes())) + } + } case valueNode, nil, rawNode: default: panic(fmt.Sprintf("unknown node type: %T", n)) @@ -231,8 +259,7 @@ func simplifyNode(n node) node { } } return node - - case valueNode, hashNode, rawNode: + case valueNode, hashNode, rawNode, rawZkNode: return n default: @@ -360,12 +387,18 @@ func (db *Database) insert(hash common.Hash, size int, node node) { // node 
retrieves a cached trie node from memory, or returns nil if none can be // found in the memory cache. func (db *Database) node(hash common.Hash) node { + zkHash := zkt.NewHashFromBytes(hash[:]) + nodeKey := common.BytesToHash(BitReverse(zkHash[:])) + // Retrieve the node from the clean cache if available if db.cleans != nil { - if enc := db.cleans.Get(nil, hash[:]); enc != nil { + if enc := db.cleans.Get(nil, nodeKey[:]); enc != nil { memcacheCleanHitMeter.Mark(1) memcacheCleanReadMeter.Mark(int64(len(enc))) - return mustDecodeNode(hash[:], enc) + + if zkNode, err := zktrie.NewNodeFromBytes(enc); err == nil { + return rawZkNode{zkNode} + } } } // Retrieve the node from the dirty cache if available @@ -381,15 +414,20 @@ func (db *Database) node(hash common.Hash) node { memcacheDirtyMissMeter.Mark(1) // Content unavailable in memory, attempt to retrieve from disk - enc, err := db.diskdb.Get(hash[:]) + enc, err := db.diskdb.Get(nodeKey[:]) if err != nil || enc == nil { return nil } if db.cleans != nil { - db.cleans.Set(hash[:], enc) + db.cleans.Set(nodeKey[:], enc) memcacheCleanMissMeter.Mark(1) memcacheCleanWriteMeter.Mark(int64(len(enc))) } + + if zkNode, err := zktrie.NewNodeFromBytes(enc); err == nil { + return rawZkNode{zkNode} + } + return mustDecodeNode(hash[:], enc) } @@ -585,7 +623,11 @@ func (db *Database) Cap(limit common.StorageSize) error { for size > limit && oldest != (common.Hash{}) { // Fetch the oldest referenced node and push into the batch node := db.dirties[oldest] - rawdb.WriteTrieNode(batch, oldest, node.rlp()) + + zkHash := zkt.NewHashFromBytes(oldest[:]) + nodeKey := common.BytesToHash(BitReverse(zkHash[:])) + + rawdb.WriteTrieNode(batch, nodeKey, node.rlp()) // If we exceeded the ideal batch size, commit and reset if batch.ValueSize() >= ethdb.IdealBatchSize { @@ -654,18 +696,21 @@ func (db *Database) Commit(node common.Hash, report bool, callback func(common.H start := time.Now() batch := db.diskdb.NewBatch() - db.lock.Lock() - for _, v := 
range db.rawDirties { - batch.Put(v.K, v.V) - } - for k := range db.rawDirties { - delete(db.rawDirties, k) - } - db.lock.Unlock() - if err := batch.Write(); err != nil { - return err + if (db.newest == common.Hash{}) { + db.lock.Lock() + + for _, v := range db.rawDirties { + batch.Put(v.K, v.V) + } + for k := range db.rawDirties { + delete(db.rawDirties, k) + } + db.lock.Unlock() + if err := batch.Write(); err != nil { + return err + } + batch.Reset() } - batch.Reset() if (node == common.Hash{}) { return nil @@ -675,6 +720,7 @@ func (db *Database) Commit(node common.Hash, report bool, callback func(common.H if db.preimages != nil { db.preimages.commit(true) } + // Move the trie itself into the batch, flushing if enough data is accumulated nodes, storage := len(db.dirties), db.dirtiesSize @@ -731,7 +777,10 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleane return err } // If we've reached an optimal batch size, commit and start over - rawdb.WriteTrieNode(batch, hash, node.rlp()) + zkHash := zkt.NewHashFromBytes(hash[:]) + nodeKey := common.BytesToHash(BitReverse(zkHash[:])) + + rawdb.WriteTrieNode(batch, nodeKey, node.rlp()) if callback != nil { callback(hash) } diff --git a/trie/zk_committer.go b/trie/zk_committer.go new file mode 100644 index 000000000..35b01aa7f --- /dev/null +++ b/trie/zk_committer.go @@ -0,0 +1,205 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package trie + +import ( + "bytes" + "errors" + "sync" + + zktrie "github.com/scroll-tech/zktrie/trie" + zkt "github.com/scroll-tech/zktrie/types" + "golang.org/x/crypto/sha3" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/crypto" +) + +// ZkLeafChanSize is the buffer size of the zkLeafCh. It's a pretty arbitrary number, to allow +// some parallelism but not incur too much memory overhead. +const ZkLeafChanSize = 200 + +// ZkLeaf carries one committed zktrie node (leaf or branch) from store to commitLoop +type ZkLeaf struct { + size int // estimated size: length of the node's canonical value + hash common.Hash // hash under which the node is inserted + node node // the node to commit +} + +// zk_committer is a type used for the zktrie Commit operation. A zk_committer has some +// internal preallocated temp space, and also a callback that is invoked when +// leaves are committed. The nodes are passed through the `zkLeafCh`, to allow +// some level of parallelism. +// By 'some level' of parallelism, it's still the case that all nodes will be +// processed sequentially - onleaf will never be called in parallel or out of order. +type zk_committer struct { + tmp sliceBuffer + sha crypto.KeccakState + + onleaf LeafCallback + zkLeafCh chan *ZkLeaf +} + +// zk_committerPool is a global sync.Pool holding reusable zk_committers +var zk_committerPool = sync.Pool{ + New: func() interface{} { + return &zk_committer{ + tmp: make(sliceBuffer, 0, 550), // cap is as large as a full fullNode. + sha: sha3.NewLegacyKeccak256().(crypto.KeccakState), + } + }, +} + +// newZkCommitter creates a new zk_committer or picks one from the pool.
+func newZkCommitter() *zk_committer { + return zk_committerPool.Get().(*zk_committer) +} + +func returnZkCommitterToPool(h *zk_committer) { // clears the per-commit fields, then puts h back in the pool + h.onleaf = nil + h.zkLeafCh = nil + zk_committerPool.Put(h) +} + +// Commit stores every raw dirty node reachable from rootHash into db, then empties db.rawDirties +func (c *zk_committer) Commit(rootHash common.Hash, db *Database) (*zktrie.Node, int, error) { + if db == nil { + return nil, 0, errors.New("no db provided") + } + + db.rawLock.Lock() + defer db.rawLock.Unlock() + + n, childCommitted, err := c.commit(rootHash, db) + + for k := range db.rawDirties { // drops every entry, including any not reached from rootHash + delete(db.rawDirties, k) + } + + return n, childCommitted, err +} + +// commit recursively stores the raw dirty node found under nodeHash and its non-zero children +func (c *zk_committer) commit(nodeHash common.Hash, db *Database) (*zktrie.Node, int, error) { + nodeKey := zkt.NewHashFromBytes(nodeHash[:]) + if nodeVal, ok := db.rawDirties.Get(BitReverse(nodeKey[:])); ok { + if node, err := zktrie.NewNodeFromBytes(nodeVal); err == nil { // NOTE(review): a decode error falls through to the nil return below + switch node.Type { + case zktrie.NodeTypeEmpty_New: + return nil, 0, nil + case zktrie.NodeTypeLeaf_New: + + nodeCopy := node.Copy() + c.store(nodeCopy, nodeHash, db) + + return nodeCopy, 1, nil + case zktrie.NodeTypeBranch_0, zktrie.NodeTypeBranch_1, zktrie.NodeTypeBranch_2, zktrie.NodeTypeBranch_3: + var childCommittedL int + if !bytes.Equal(node.ChildL[:], common.Hash{}.Bytes()) { + _, childCommitted, _ := c.commit(common.BytesToHash(node.ChildL.Bytes()), db) + childCommittedL = childCommitted + childCommittedL += 1 + } + + var childCommittedR int + if !bytes.Equal(node.ChildR[:], common.Hash{}.Bytes()) { + _, childCommitted, _ := c.commit(common.BytesToHash(node.ChildR.Bytes()), db) + childCommittedR = childCommitted + childCommittedR += 1 + } + + nodeCopy := node.Copy() + c.store(nodeCopy, nodeHash, db) + + return nodeCopy, childCommittedL + childCommittedR, nil + case zktrie.NodeTypeEmpty, zktrie.NodeTypeLeaf, zktrie.NodeTypeParent: + panic("encounter
unsupported deprecated node type") + default: + panic("unreachable") + } + } + } + + return nil, 0, nil +} + +// store passes node n, keyed by nodeHash, on for insertion into db: through +// zkLeafCh when that channel is set (commitLoop then does the insert), or +// directly under db.lock otherwise. A nil n is ignored. +func (c *zk_committer) store(n *zktrie.Node, nodeHash common.Hash, db *Database) { + // size is the estimated node size handed to db.insert. + var ( + size int + ) + if n == nil { + // Nothing to store. The callers visible in this file pass node.Copy(), so + // this guard looks purely defensive; the earlier wording here described + // small nodes embedded in their parent, which presumably does not apply + // to zktrie nodes - TODO confirm Copy never yields nil. + return + } else { + // The hash is supplied by the caller; only the size is computed here. + // The size is used for mem tracking, does not need to be exact + size = zkEstimateSize(n) + } + // If we're using channel-based leaf-reporting, send to channel. + // The leaf channel will be active only when there is an active leaf-callback + if c.zkLeafCh != nil { + c.zkLeafCh <- &ZkLeaf{ + size: size, + hash: nodeHash, + node: rawZkNode{n}, + } + } else if db != nil { + // No leaf-callback used, but there's still a database. Do serial + // insertion + db.lock.Lock() + db.insert(nodeHash, size, rawZkNode{n}) + db.lock.Unlock() + } +} + +// commitLoop does the actual insert + leaf callback for nodes.
+func (c *zk_committer) commitLoop(db *Database) { + for item := range c.zkLeafCh { + var ( + hash = item.hash + size = item.size + n = item.node + ) + // We are pooling the trie nodes into an intermediate memory cache + db.lock.Lock() + db.insert(hash, size, n) + db.lock.Unlock() + + if c.onleaf != nil { + if node, ok := n.(rawZkNode); ok { + if node.n.Type == zktrie.NodeTypeLeaf_New { + c.onleaf(nil, nil, node.n.Data(), hash) // NOTE(review): onleaf's error return is dropped + } + } + } + } +} + +// zkEstimateSize reports the size recorded for a zktrie node: the byte length +// of n.CanonicalValue(). cachedNode.rlp() returns that same value for a +// rawZkNode, so the figure matches the blob later written for the node. +// NOTE(review): whether CanonicalValue allocates is not visible here. +func zkEstimateSize(n *zktrie.Node) int { + return len(n.CanonicalValue()) +} diff --git a/trie/zk_trie.go b/trie/zk_trie.go index ed266c518..f25e4b82b 100644 --- a/trie/zk_trie.go +++ b/trie/zk_trie.go @@ -18,6 +18,7 @@ package trie import ( "fmt" + "sync" zktrie "github.com/scroll-tech/zktrie/trie" zkt "github.com/scroll-tech/zktrie/types" @@ -123,13 +124,47 @@ func (t *ZkTrie) GetKey(kHashBytes []byte) []byte { // // Committing flushes nodes from memory. Subsequent Get calls will load nodes // from the database. -func (t *ZkTrie) Commit(LeafCallback) (common.Hash, int, error) { +func (t *ZkTrie) Commit(onleaf LeafCallback) (common.Hash, int, error) { if err := t.ZkTrie.Commit(); err != nil { return common.Hash{}, 0, err } // in current implmentation, every update of trie already writes into database // so Commmit does nothing - return t.Hash(), 0, nil + + rootHash := t.Hash() + + // Derive the hash for all dirty nodes first. We hold the assumption + // in the following procedure that all nodes are hashed.
+ h := newZkCommitter() + defer returnZkCommitterToPool(h) + + var wg sync.WaitGroup + if onleaf != nil { + h.onleaf = onleaf + h.zkLeafCh = make(chan *ZkLeaf, ZkLeafChanSize) + wg.Add(1) + go func() { + defer wg.Done() + h.commitLoop(t.db.db) + }() + } + + _, committed, err := h.Commit(rootHash, t.db.db) + + if onleaf != nil { + // The zkLeafCh is created above only if there was an onleaf callback + // provided. The commitLoop only _reads_ from it, and the commit + // operation was the sole writer. Therefore, it's safe to close this + // channel here. + close(h.zkLeafCh) + wg.Wait() + } + + if err != nil { + return common.Hash{}, 0, err + } + + return rootHash, committed, nil } // Hash returns the root hash of SecureBinaryTrie. It does not write to the diff --git a/trie/zk_trie_database.go b/trie/zk_trie_database.go index 0b5ae7ecf..64dc19dbf 100644 --- a/trie/zk_trie_database.go +++ b/trie/zk_trie_database.go @@ -6,6 +6,7 @@ import ( "github.com/syndtr/goleveldb/leveldb" zktrie "github.com/scroll-tech/zktrie/trie" + zkt "github.com/scroll-tech/zktrie/types" "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/ethdb" @@ -38,22 +39,29 @@ func NewZktrieDatabaseFromTriedb(db *Database) *ZktrieDatabase { // Put saves a key:value into the Storage func (l *ZktrieDatabase) Put(k, v []byte) error { k = BitReverse(k) - l.db.lock.Lock() - l.db.rawDirties.Put(Concat(l.prefix, k[:]), v) - l.db.lock.Unlock() + concatKey := Concat(l.prefix, k[:]) + + l.db.rawLock.Lock() // NOTE(review): Database.Commit touches rawDirties under db.lock, not rawLock - confirm + l.db.rawDirties.Put(concatKey, v) + l.db.rawLock.Unlock() + + if l.db.cleans != nil { + l.db.cleans.Set(concatKey[:], v) + memcacheCleanMissMeter.Mark(1) + memcacheCleanWriteMeter.Mark(int64(len(v))) + } + return nil } // Get retrieves a value from a key in the Storage func (l *ZktrieDatabase) Get(key []byte) ([]byte, error) { + // nodeKey is the un-reversed hash used below to look the node up in db.dirties + zkHash := zkt.NewHashFromBytes(key[:]) + nodeKey := common.BytesToHash(zkHash[:]) + key = BitReverse(key) concatKey := Concat(l.prefix,
key[:]) - l.db.lock.RLock() - value, ok := l.db.rawDirties.Get(concatKey) - l.db.lock.RUnlock() - if ok { - return value, nil - } if l.db.cleans != nil { if enc := l.db.cleans.Get(nil, concatKey); enc != nil { @@ -63,6 +71,13 @@ func (l *ZktrieDatabase) Get(key []byte) ([]byte, error) { } } + l.db.lock.RLock() + value, ok := l.db.dirties[nodeKey] + l.db.lock.RUnlock() + if ok { + return value.rlp(), nil + } + v, err := l.db.diskdb.Get(concatKey) if err == leveldb.ErrNotFound { return nil, zktrie.ErrKeyNotFound