-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[R4R]add sharedStorage for prefetching to L1 #792
Changes from all commits
8bccd6f
6626e02
7abd888
57f224a
9a9b231
55349e0
e031e4e
63edb9d
85bd395
a7a785c
8a4dc60
eded677
c8425bf
76dac12
f2f75d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package state | ||
|
||
import ( | ||
"sync" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
) | ||
|
||
// sharedPool is used to store maps of originStorage of stateObjects | ||
type StoragePool struct { | ||
sync.RWMutex | ||
sharedMap map[common.Address]*sync.Map | ||
} | ||
|
||
func NewStoragePool() *StoragePool { | ||
sharedMap := make(map[common.Address]*sync.Map) | ||
return &StoragePool{ | ||
sync.RWMutex{}, | ||
sharedMap, | ||
} | ||
} | ||
|
||
// getStorage Check whether the storage exist in pool, | ||
// new one if not exist, the content of storage will be fetched in stateObjects.GetCommittedState() | ||
func (s *StoragePool) getStorage(address common.Address) *sync.Map { | ||
s.RLock() | ||
storageMap, ok := s.sharedMap[address] | ||
s.RUnlock() | ||
if !ok { | ||
s.Lock() | ||
defer s.Unlock() | ||
if storageMap, ok = s.sharedMap[address]; !ok { | ||
m := new(sync.Map) | ||
s.sharedMap[address] = m | ||
return m | ||
} | ||
} | ||
return storageMap | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ import ( | |
"fmt" | ||
"io" | ||
"math/big" | ||
"sync" | ||
"time" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
|
@@ -79,7 +80,9 @@ type StateObject struct { | |
trie Trie // storage trie, which becomes non-nil on first access | ||
code Code // contract bytecode, which gets set when code is loaded | ||
|
||
originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction | ||
sharedOriginStorage *sync.Map // Storage cache of original entries to dedup rewrites, reset for every transaction | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
originStorage Storage | ||
|
||
pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block | ||
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution | ||
fakeStorage Storage // Fake storage which constructed by caller for debugging purpose. | ||
|
@@ -120,14 +123,21 @@ func newObject(db *StateDB, address common.Address, data Account) *StateObject { | |
if data.Root == (common.Hash{}) { | ||
data.Root = emptyRoot | ||
} | ||
var storageMap *sync.Map | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rename |
||
// Check whether the storage exist in pool, new originStorage if not exist | ||
if db != nil && db.storagePool != nil { | ||
storageMap = db.GetStorage(address) | ||
} | ||
|
||
return &StateObject{ | ||
db: db, | ||
address: address, | ||
addrHash: crypto.Keccak256Hash(address[:]), | ||
data: data, | ||
originStorage: make(Storage), | ||
pendingStorage: make(Storage), | ||
dirtyStorage: make(Storage), | ||
db: db, | ||
address: address, | ||
addrHash: crypto.Keccak256Hash(address[:]), | ||
data: data, | ||
sharedOriginStorage: storageMap, | ||
originStorage: make(Storage), | ||
pendingStorage: make(Storage), | ||
dirtyStorage: make(Storage), | ||
} | ||
} | ||
|
||
|
@@ -194,6 +204,29 @@ func (s *StateObject) GetState(db Database, key common.Hash) common.Hash { | |
return s.GetCommittedState(db, key) | ||
} | ||
|
||
func (s *StateObject) getOriginStorage(key common.Hash) (common.Hash, bool) { | ||
if value, cached := s.originStorage[key]; cached { | ||
return value, true | ||
} | ||
// if L1 cache miss, try to get it from shared pool | ||
if s.sharedOriginStorage != nil { | ||
val, ok := s.sharedOriginStorage.Load(key) | ||
if !ok { | ||
return common.Hash{}, false | ||
} | ||
setunapo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
s.originStorage[key] = val.(common.Hash) | ||
return val.(common.Hash), true | ||
} | ||
return common.Hash{}, false | ||
} | ||
|
||
func (s *StateObject) setOriginStorage(key common.Hash, value common.Hash) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to add a new API |
||
if s.db.writeOnSharedStorage && s.sharedOriginStorage != nil { | ||
s.sharedOriginStorage.Store(key, value) | ||
} | ||
s.originStorage[key] = value | ||
} | ||
|
||
// GetCommittedState retrieves a value from the committed account storage trie. | ||
func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Hash { | ||
// If the fake storage is set, only lookup the state here(in the debugging mode) | ||
|
@@ -204,7 +237,8 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has | |
if value, pending := s.pendingStorage[key]; pending { | ||
return value | ||
} | ||
if value, cached := s.originStorage[key]; cached { | ||
|
||
if value, cached := s.getOriginStorage(key); cached { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can leave it unchanged; the API: |
||
return value | ||
} | ||
// If no live objects are available, attempt to use snapshots | ||
|
@@ -263,7 +297,7 @@ func (s *StateObject) GetCommittedState(db Database, key common.Hash) common.Has | |
} | ||
value.SetBytes(content) | ||
} | ||
s.originStorage[key] = value | ||
s.setOriginStorage(key, value) | ||
return value | ||
} | ||
|
||
|
@@ -320,6 +354,7 @@ func (s *StateObject) finalise(prefetch bool) { | |
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure | ||
} | ||
} | ||
|
||
if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot { | ||
s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash) | ||
} | ||
|
@@ -356,7 +391,6 @@ func (s *StateObject) updateTrie(db Database) Trie { | |
continue | ||
} | ||
s.originStorage[key] = value | ||
|
||
var v []byte | ||
if (value == common.Hash{}) { | ||
s.setError(tr.TryDelete(key[:])) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,10 +39,7 @@ import ( | |
"github.com/ethereum/go-ethereum/trie" | ||
) | ||
|
||
const ( | ||
preLoadLimit = 128 | ||
defaultNumOfSlots = 100 | ||
) | ||
const defaultNumOfSlots = 100 | ||
|
||
type revision struct { | ||
id int | ||
|
@@ -101,6 +98,8 @@ type StateDB struct { | |
stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie | ||
stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution | ||
|
||
storagePool *StoragePool // sharedPool to store L1 originStorage of stateObjects | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. name: |
||
writeOnSharedStorage bool // Write to the shared origin storage of a stateObject while reading from the underlying storage layer. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rename |
||
// DB error. | ||
// State objects are used by the consensus core and VM which are | ||
// unable to deal with database-level errors. Any error that occurs | ||
|
@@ -147,6 +146,16 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) | |
return newStateDB(root, db, snaps) | ||
} | ||
|
||
// NewWithSharedPool creates a new state with sharedStorge on layer 1.5 | ||
func NewWithSharedPool(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) { | ||
statedb, err := newStateDB(root, db, snaps) | ||
if err != nil { | ||
return nil, err | ||
} | ||
statedb.storagePool = NewStoragePool() | ||
return statedb, nil | ||
} | ||
|
||
func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) { | ||
sdb := &StateDB{ | ||
db: db, | ||
|
@@ -178,6 +187,10 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, | |
return sdb, nil | ||
} | ||
|
||
func (s *StateDB) EnableWriteOnSharedStorage() { | ||
s.writeOnSharedStorage = true | ||
} | ||
|
||
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the | ||
// state trie concurrently while the state is mutated so that when we reach the | ||
// commit phase, most of the needed data is already hot. | ||
|
@@ -591,78 +604,6 @@ func (s *StateDB) getStateObject(addr common.Address) *StateObject { | |
return nil | ||
} | ||
|
||
func (s *StateDB) TryPreload(block *types.Block, signer types.Signer) { | ||
accounts := make(map[common.Address]bool, block.Transactions().Len()) | ||
accountsSlice := make([]common.Address, 0, block.Transactions().Len()) | ||
for _, tx := range block.Transactions() { | ||
from, err := types.Sender(signer, tx) | ||
if err != nil { | ||
break | ||
} | ||
accounts[from] = true | ||
if tx.To() != nil { | ||
accounts[*tx.To()] = true | ||
} | ||
} | ||
for account := range accounts { | ||
accountsSlice = append(accountsSlice, account) | ||
} | ||
if len(accountsSlice) >= preLoadLimit && len(accountsSlice) > runtime.NumCPU() { | ||
objsChan := make(chan []*StateObject, runtime.NumCPU()) | ||
for i := 0; i < runtime.NumCPU(); i++ { | ||
start := i * len(accountsSlice) / runtime.NumCPU() | ||
end := (i + 1) * len(accountsSlice) / runtime.NumCPU() | ||
if i+1 == runtime.NumCPU() { | ||
end = len(accountsSlice) | ||
} | ||
go func(start, end int) { | ||
objs := s.preloadStateObject(accountsSlice[start:end]) | ||
objsChan <- objs | ||
}(start, end) | ||
} | ||
for i := 0; i < runtime.NumCPU(); i++ { | ||
objs := <-objsChan | ||
for _, obj := range objs { | ||
s.SetStateObject(obj) | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (s *StateDB) preloadStateObject(address []common.Address) []*StateObject { | ||
// Prefer live objects if any is available | ||
if s.snap == nil { | ||
return nil | ||
} | ||
hasher := crypto.NewKeccakState() | ||
objs := make([]*StateObject, 0, len(address)) | ||
for _, addr := range address { | ||
// If no live objects are available, attempt to use snapshots | ||
if acc, err := s.snap.Account(crypto.HashData(hasher, addr.Bytes())); err == nil { | ||
if acc == nil { | ||
continue | ||
} | ||
data := &Account{ | ||
Nonce: acc.Nonce, | ||
Balance: acc.Balance, | ||
CodeHash: acc.CodeHash, | ||
Root: common.BytesToHash(acc.Root), | ||
} | ||
if len(data.CodeHash) == 0 { | ||
data.CodeHash = emptyCodeHash | ||
} | ||
if data.Root == (common.Hash{}) { | ||
data.Root = emptyRoot | ||
} | ||
// Insert into the live set | ||
obj := newObject(s, addr, *data) | ||
objs = append(objs, obj) | ||
} | ||
// Do not enable this feature when snapshot is not enabled. | ||
} | ||
return objs | ||
} | ||
|
||
// getDeletedStateObject is similar to getStateObject, but instead of returning | ||
// nil for a deleted state object, it returns the actual object with the deleted | ||
// flag set. This is needed by the state journal to revert to the correct s- | ||
|
@@ -828,6 +769,7 @@ func (s *StateDB) Copy() *StateDB { | |
stateObjects: make(map[common.Address]*StateObject, len(s.journal.dirties)), | ||
stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)), | ||
stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)), | ||
storagePool: s.storagePool, | ||
refund: s.refund, | ||
logs: make(map[common.Hash][]*types.Log, len(s.logs)), | ||
logSize: s.logSize, | ||
|
@@ -1633,3 +1575,7 @@ func (s *StateDB) GetDirtyAccounts() []common.Address { | |
} | ||
return accounts | ||
} | ||
|
||
func (s *StateDB) GetStorage(address common.Address) *sync.Map { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to add |
||
return s.storagePool.getStorage(address) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about rename the file with:
shared_storage_pool.go
,shared_pool.go
is a bit confusing, shared what?