From 0a1cc1007437a523645bff6f8770a1a3f5b95679 Mon Sep 17 00:00:00 2001 From: fudongbai <296179868@qq.com> Date: Mon, 7 Jun 2021 14:40:34 +0800 Subject: [PATCH] use hot cache for cachedb --- consensus/parlia/parlia.go | 9 +++- core/blockchain.go | 4 +- core/state/database.go | 78 +++++++++++++++++++++++++++++++++-- core/state/state_object.go | 8 +++- core/state/statedb.go | 10 +++-- core/state/trie_prefetcher.go | 32 +++++++++----- light/trie.go | 8 ++++ trie/secure_trie.go | 7 ++++ 8 files changed, 135 insertions(+), 21 deletions(-) diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 0fca851d6b..d55e16188d 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -1106,7 +1106,14 @@ func (p *Parlia) applyTransaction( } actualTx := (*receivedTxs)[0] if !bytes.Equal(p.signer.Hash(actualTx).Bytes(), expectedHash.Bytes()) { - return fmt.Errorf("expected tx hash %v, get %v", expectedHash.String(), actualTx.Hash().String()) + return fmt.Errorf("expected tx hash %v, get %v, nonce %d, to %s, value %s, gas %d, gasPrice %s, data %s", expectedHash.String(), actualTx.Hash().String(), + expectedTx.Nonce(), + expectedTx.To().String(), + expectedTx.Value().String(), + expectedTx.Gas(), + expectedTx.GasPrice().String(), + hex.EncodeToString(expectedTx.Data()), + ) } expectedTx = actualTx // move to next diff --git a/core/blockchain.go b/core/blockchain.go index 03db0518e4..c4a2cb5073 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -232,7 +232,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par cacheConfig: cacheConfig, db: db, triegc: prque.New(nil), - stateCache: state.NewDatabaseWithConfig(db, &trie.Config{ + stateCache: state.NewDatabaseWithConfigAndCache(db, &trie.Config{ Cache: cacheConfig.TrieCleanLimit, Journal: cacheConfig.TrieCleanJournal, Preimages: cacheConfig.Preimages, @@ -1936,7 +1936,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { bc.reportBlock(block, receipts, err) atomic.StoreUint32(&followupInterrupt, 1) - log.Error("validate state failed", err) + log.Error("validate state failed", "error", err) return it.index, err } proctime := time.Since(start) diff --git a/core/state/database.go b/core/state/database.go index 1a06e33409..2919cc618d 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -32,6 +32,12 @@ const ( // Number of codehash->size associations to keep. codeSizeCacheSize = 100000 + // Number of state trie in cache + accountTrieCacheSize = 11 + + // Number of storage Trie in cache + storageTrieCacheSize = 2000 + // Cache size granted for caching clean code. codeCacheSize = 64 * 1024 * 1024 ) @@ -55,6 +61,12 @@ type Database interface { // TrieDB retrieves the low level trie database used for data storage. TrieDB() *trie.Database + + // Cache the account trie tree + CacheAccount(root common.Hash, t Trie) + + // Cache the storage trie tree + CacheStorage(addr common.Address, root common.Hash, t Trie) } // Trie is a Ethereum Merkle Patricia trie. @@ -121,14 +133,45 @@ func NewDatabaseWithConfig(db ethdb.Database, config *trie.Config) Database { } } +func NewDatabaseWithConfigAndCache(db ethdb.Database, config *trie.Config) Database { + csc, _ := lru.New(codeSizeCacheSize) + atc, _ := lru.New(accountTrieCacheSize) + stc, _ := lru.New(storageTrieCacheSize) + + database := &cachingDB{ + db: trie.NewDatabaseWithConfig(db, config), + codeSizeCache: csc, + codeCache: fastcache.New(codeCacheSize), + accountTrieCache: atc, + storageTrieCache: stc, + } + return database +} + type cachingDB struct { - db *trie.Database - codeSizeCache *lru.Cache - codeCache *fastcache.Cache + db *trie.Database + codeSizeCache *lru.Cache + codeCache *fastcache.Cache + accountTrieCache *lru.Cache + storageTrieCache *lru.Cache +} + +type triePair struct { + root common.Hash + trie Trie } // OpenTrie opens the main account trie at a specific root hash. func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) { + if db.accountTrieCache != nil { + if tr, exist := db.accountTrieCache.Get(root); exist { + fmt.Println("hit account Trie") + + return tr.(Trie), nil + } + } + fmt.Println("miss account Trie") + tr, err := trie.NewSecure(root, db.db) if err != nil { return nil, err @@ -138,6 +181,18 @@ func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) { // OpenStorageTrie opens the storage trie of an account. func (db *cachingDB) OpenStorageTrie(addrHash, root common.Hash) (Trie, error) { + if db.storageTrieCache != nil { + if tries, exist := db.storageTrieCache.Get(root); exist { + triesPairs := tries.([3]*triePair) + for _, triePair := range triesPairs { + if triePair != nil && triePair.root == root { + return triePair.trie, nil + } + } + } + } + fmt.Println("miss storage Trie") + tr, err := trie.NewSecure(root, db.db) if err != nil { return nil, err @@ -145,6 +200,23 @@ func (db *cachingDB) OpenStorageTrie(addrHash, root common.Hash) (Trie, error) { return tr, nil } +func (db *cachingDB) CacheAccount(root common.Hash, t Trie) { + tr := t.(*trie.SecureTrie) + db.accountTrieCache.Add(root, tr.ResetCopy()) +} + +func (db *cachingDB) CacheStorage(addr common.Address, root common.Hash, t Trie) { + tr := t.(*trie.SecureTrie) + if tries, exist := db.storageTrieCache.Get(addr); exist { + triesArray := tries.([3]*triePair) + triesArray[0], triesArray[1], triesArray[2] = &triePair{root: root, trie: tr.ResetCopy()}, triesArray[0], triesArray[1] + db.storageTrieCache.Add(addr, triesArray) + } else { + triesArray := [3]*triePair{&triePair{root: root, trie: tr.ResetCopy()}, nil, nil} + db.storageTrieCache.Add(addr, triesArray) + } +} + // CopyTrie returns an independent copy of the given trie. func (db *cachingDB) CopyTrie(t Trie) Trie { switch t := t.(type) { diff --git a/core/state/state_object.go b/core/state/state_object.go index ad46b09803..164bd05c46 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -321,7 +321,7 @@ func (s *stateObject) finalise(prefetch bool) { } } if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot { - s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch) + s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch, false) } if len(s.dirtyStorage) > 0 { s.dirtyStorage = make(Storage) @@ -412,6 +412,9 @@ func (s *stateObject) updateRoot(db Database) { func (s *stateObject) CommitTrie(db Database) error { // If nothing changed, don't bother with hashing anything if s.updateTrie(db) == nil { + if s.trie != nil && s.data.Root != emptyRoot { + db.CacheStorage(s.address, s.data.Root, s.trie) + } return nil } if s.dbErr != nil { @@ -425,6 +428,9 @@ func (s *stateObject) CommitTrie(db Database) error { if err == nil { s.data.Root = root } + if s.data.Root != emptyRoot { + db.CacheStorage(s.address, s.data.Root, s.trie) + } return err } diff --git a/core/state/statedb.go b/core/state/statedb.go index d1c910911b..7ed7404742 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -878,7 +878,7 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure } if s.prefetcher != nil && len(addressesToPrefetch) > 0 { - s.prefetcher.prefetch(s.originalRoot, addressesToPrefetch) + s.prefetcher.prefetch(s.originalRoot, addressesToPrefetch, true) } // Invalidate journal because reverting across transactions is not allowed. s.clearJournalAndRefund() @@ -986,7 +986,8 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { if metrics.EnabledExpensive { defer func(start time.Time) { s.AccountHashes += time.Since(start) }(time.Now()) } - return s.trie.Hash() + root := s.trie.Hash() + return root } // Prepare sets the current transaction hash and index and block hash which is @@ -1077,7 +1078,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { // The onleaf func is called _serially_, so we can reuse the same account // for unmarshalling every time. var account Account - _, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error { + root, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error { if err := rlp.DecodeBytes(leaf, &account); err != nil { return nil } @@ -1092,6 +1093,9 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { if metrics.EnabledExpensive { s.AccountCommits += time.Since(start) } + if root != emptyRoot { + s.db.CacheAccount(root, s.trie) + } return nil }, func() error { diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 6da71cef32..296d530021 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -141,7 +141,7 @@ func (p *triePrefetcher) copy() *triePrefetcher { } // prefetch schedules a batch of trie items to prefetch. -func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte) { +func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, isAccount bool) { // If the prefetcher is an inactive one, bail out if p.fetches != nil { return @@ -149,7 +149,7 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte) { // Active fetcher, schedule the retrievals fetcher := p.fetchers[root] if fetcher == nil { - fetcher = newSubfetcher(p.db, root) + fetcher = newSubfetcher(p.db, root, isAccount) p.fetchers[root] = fetcher } fetcher.schedule(keys) @@ -213,19 +213,22 @@ type subfetcher struct { seen map[string]struct{} // Tracks the entries already loaded dups int // Number of duplicate preload tasks used [][]byte // Tracks the entries used in the end + + isAccount bool // Whether it is account trie } // newSubfetcher creates a goroutine to prefetch state items belonging to a // particular root hash. -func newSubfetcher(db Database, root common.Hash) *subfetcher { +func newSubfetcher(db Database, root common.Hash, isAccount bool) *subfetcher { sf := &subfetcher{ - db: db, - root: root, - wake: make(chan struct{}, 1), - stop: make(chan struct{}), - term: make(chan struct{}), - copy: make(chan chan Trie), - seen: make(map[string]struct{}), + db: db, + root: root, + wake: make(chan struct{}, 1), + stop: make(chan struct{}), + term: make(chan struct{}), + copy: make(chan chan Trie), + seen: make(map[string]struct{}), + isAccount: isAccount, } gopool.Submit(func() { sf.loop() @@ -283,7 +286,14 @@ func (sf *subfetcher) loop() { defer close(sf.term) // Start by opening the trie and stop processing if it fails - trie, err := sf.db.OpenTrie(sf.root) + var trie Trie + var err error + if sf.isAccount { + trie, err = sf.db.OpenTrie(sf.root) + } else { + // address is useless + trie, err = sf.db.OpenStorageTrie(common.Hash{}, sf.root) + } if err != nil { log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) return diff --git a/light/trie.go b/light/trie.go index 0516b94486..37b7513c1b 100644 --- a/light/trie.go +++ b/light/trie.go @@ -95,6 +95,14 @@ func (db *odrDatabase) TrieDB() *trie.Database { return nil } +func (db *odrDatabase) CacheAccount(_ common.Hash, _ state.Trie) { + return +} + +func (db *odrDatabase) CacheStorage(_ common.Address, _ common.Hash, _ state.Trie) { + return +} + type odrTrie struct { db *odrDatabase id *TrieID diff --git a/trie/secure_trie.go b/trie/secure_trie.go index 700d5b17fb..3b6193681d 100644 --- a/trie/secure_trie.go +++ b/trie/secure_trie.go @@ -173,6 +173,13 @@ func (t *SecureTrie) Copy() *SecureTrie { return &cpy } +func (t *SecureTrie) ResetCopy() *SecureTrie { + cpy := *t + cpy.secKeyCacheOwner = nil + cpy.secKeyCache = nil + return &cpy +} + // NodeIterator returns an iterator that returns nodes of the underlying trie. Iteration // starts at the key after the given start key. func (t *SecureTrie) NodeIterator(start []byte) NodeIterator {