Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R] state verification pipeline #795

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
// transition, such as amount of used gas, the receipt roots and the state root
// itself. ValidateState returns a database batch if the validation was a success
// otherwise nil and an error is returned.
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error {
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error {
header := block.Header()
if block.GasUsed() != usedGas {
return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas)
Expand All @@ -135,13 +135,14 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
return nil
},
}
if skipHeavyVerify {
if statedb.IsPipeCommit() {
validateFuns = append(validateFuns, func() error {
if err := statedb.WaitPipeVerification(); err != nil {
return err
}
statedb.Finalise(v.config.IsEIP158(header.Number))
statedb.AccountsIntermediateRoot()
//state verification pipeline - accounts root are not calculated here
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
statedb.PopulateSnapAccountAndStorage()
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
return nil
})
} else {
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2143,7 +2143,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
// Validate the state using the default validator
substart = time.Now()
if !statedb.IsLightProcessed() {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, bc.pipeCommit); err != nil {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
log.Error("validate state failed", "error", err)
bc.reportBlock(block, receipts, err)
return it.index, err
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func testBlockChainImport(chain types.Blocks, pipelineCommit bool, blockchain *B
blockchain.reportBlock(block, receipts, err)
return err
}
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, pipelineCommit)
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas)
if err != nil {
blockchain.reportBlock(block, receipts, err)
return err
Expand Down
9 changes: 9 additions & 0 deletions core/state/snapshot/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s
storageList: make(map[common.Hash][]common.Hash),
verifiedCh: verified,
}

switch parent := parent.(type) {
case *diskLayer:
dl.rebloom(parent)
Expand All @@ -190,6 +191,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s
default:
panic("unknown parent type")
}

// Sanity check that accounts or storage slots are never nil
for accountHash, blob := range accounts {
if blob == nil {
Expand Down Expand Up @@ -286,6 +288,13 @@ func (dl *diffLayer) Verified() bool {
}
}

func (dl *diffLayer) CorrectAccounts(accounts map[common.Hash][]byte) {
dl.lock.Lock()
defer dl.lock.Unlock()

dl.accountData = accounts
}

// Parent returns the subsequent layer of a diff layer.
func (dl *diffLayer) Parent() snapshot {
return dl.parent
Expand Down
3 changes: 3 additions & 0 deletions core/state/snapshot/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (dl *diskLayer) Verified() bool {
return true
}

func (dl *diskLayer) CorrectAccounts(map[common.Hash][]byte) {
}

// Parent always returns nil as there's no layer below the disk.
func (dl *diskLayer) Parent() snapshot {
return nil
Expand Down
6 changes: 6 additions & 0 deletions core/state/snapshot/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@ func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
if dl.Stale() {
return common.Hash{}, ErrSnapshotStale
}

// Wait the snapshot(difflayer) is verified, it means the account data also been refreshed with the correct data
if !dl.WaitAndGetVerifyRes() {
return common.Hash{}, ErrSnapshotStale
}

// Everything below was journalled, persist this layer too
if err := rlp.Encode(buffer, dl.root); err != nil {
return common.Hash{}, err
Expand Down
9 changes: 8 additions & 1 deletion core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,12 @@ type Snapshot interface {
// Verified returns whether the snapshot is verified
Verified() bool

// Store the verification result
// MarkValid stores the verification result
MarkValid()

// CorrectAccounts updates account data for storing the correct data during pipecommit
CorrectAccounts(map[common.Hash][]byte)

// Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format.
Account(hash common.Hash) (*Account, error)
Expand Down Expand Up @@ -240,6 +243,10 @@ func (t *Tree) waitBuild() {
}
}

func (t *Tree) Layers() int {
return len(t.layers)
}

// Disable interrupts any pending snapshot generator, deletes all the snapshot
// layers in memory and marks snapshots disabled globally. In order to resume
// the snapshot functionality, the caller must invoke Rebuild.
Expand Down
77 changes: 73 additions & 4 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,16 @@ func (s *StateDB) MarkLightProcessed() {

// Enable the pipeline commit function of statedb
func (s *StateDB) EnablePipeCommit() {
if s.snap != nil {
if s.snap != nil && s.snaps.Layers() > 1 {
s.pipeCommit = true
}
}

// IsPipeCommit checks whether pipecommit is enabled on the statedb or not
func (s *StateDB) IsPipeCommit() bool {
return s.pipeCommit
}

// Mark that the block is full processed
func (s *StateDB) MarkFullProcessed() {
s.fullProcessed = true
Expand Down Expand Up @@ -1023,6 +1028,47 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
return s.StateIntermediateRoot()
}

//PopulateSnapAccountAndStorage tries to populate required accounts and storages for pipecommit
func (s *StateDB) PopulateSnapAccountAndStorage() {
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; !obj.deleted {
if s.snap != nil && !obj.deleted {
s.populateSnapStorage(obj)
s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, emptyRoot, obj.data.CodeHash)
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}

//populateSnapStorage tries to populate required storages for pipecommit
func (s *StateDB) populateSnapStorage(obj *StateObject) {
for key, value := range obj.dirtyStorage {
obj.pendingStorage[key] = value
}
if len(obj.pendingStorage) == 0 {
return
}
var storage map[string][]byte
for key, value := range obj.pendingStorage {
var v []byte
if (value != common.Hash{}) {
// Encoding []byte cannot fail, ok to ignore the error.
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
}
// If state snapshotting is active, cache the data til commit
if obj.db.snap != nil {
if storage == nil {
// Retrieve the old storage map, if available, create a new one otherwise
if storage = obj.db.snapStorage[obj.address]; storage == nil {
storage = make(map[string][]byte)
obj.db.snapStorage[obj.address] = storage
}
}
storage[string(key[:])] = v // v will be nil if value is 0x00
}
}
}

func (s *StateDB) AccountsIntermediateRoot() {
tasks := make(chan func())
finishCh := make(chan struct{})
Expand Down Expand Up @@ -1108,6 +1154,7 @@ func (s *StateDB) StateIntermediateRoot() common.Hash {
}
s.trie = tr
}

usedAddrs := make([][]byte, 0, len(s.stateObjectsPending))
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; obj.deleted {
Expand All @@ -1120,6 +1167,7 @@ func (s *StateDB) StateIntermediateRoot() common.Hash {
if prefetcher != nil {
prefetcher.used(s.originalRoot, usedAddrs)
}

if len(s.stateObjectsPending) > 0 {
s.stateObjectsPending = make(map[common.Address]struct{})
}
Expand Down Expand Up @@ -1297,20 +1345,39 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er
var diffLayer *types.DiffLayer
var verified chan struct{}
var snapUpdated chan struct{}
var accountCorrected chan struct{}

if s.snap != nil {
diffLayer = &types.DiffLayer{}
}
if s.pipeCommit {
// async commit the MPT
verified = make(chan struct{})
snapUpdated = make(chan struct{})
accountCorrected = make(chan struct{})
}

commmitTrie := func() error {
commitErr := func() error {
accountData := make(map[common.Hash][]byte)
if s.pipeCommit {
// Due to state verification pipeline, the accounts roots are not updated, leading to the data in the difflayer is not correct, capture the correct data here
<-snapUpdated
s.AccountsIntermediateRoot()
for k, v := range s.snapAccounts {
accountData[crypto.Keccak256Hash(k[:])] = v
}
if parent := s.snap.Root(); parent != s.expectedRoot {
s.snaps.Snapshot(s.expectedRoot).CorrectAccounts(accountData)
}
close(accountCorrected)
}

if s.stateRoot = s.StateIntermediateRoot(); s.fullProcessed && s.expectedRoot != s.stateRoot {
fmt.Printf("Invalid merkle root (remote: %x local: %x) \n", s.expectedRoot, s.stateRoot)
return fmt.Errorf("invalid merkle root (remote: %x local: %x)", s.expectedRoot, s.stateRoot)
}

tasks := make(chan func())
taskResults := make(chan error, len(s.stateObjectsDirty))
tasksNum := 0
Expand Down Expand Up @@ -1399,16 +1466,15 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er

if s.pipeCommit {
if commitErr == nil {
<-snapUpdated
s.snaps.Snapshot(s.stateRoot).MarkValid()
} else {
// The blockchain will do the further rewind if write block not finish yet
if failPostCommitFunc != nil {
<-snapUpdated
failPostCommitFunc()
}
log.Error("state verification failed", "err", commitErr)
}
<-accountCorrected
forcodedancing marked this conversation as resolved.
Show resolved Hide resolved
close(verified)
}
return commitErr
Expand Down Expand Up @@ -1450,9 +1516,12 @@ func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() er
}
// Only update if there's a state transition (skip empty Clique blocks)
if parent := s.snap.Root(); parent != s.expectedRoot {
if err := s.snaps.Update(s.expectedRoot, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, verified); err != nil {
err := s.snaps.Update(s.expectedRoot, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, verified)

if err != nil {
log.Warn("Failed to update snapshot tree", "from", parent, "to", s.expectedRoot, "err", err)
}

// Keep n diff layers in the memory
// - head layer is paired with HEAD state
// - head-1 layer is paired with HEAD-1 state
Expand Down
2 changes: 1 addition & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty
}

// Do validate in advance so that we can fall back to full process
if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed, false); err != nil {
if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed); err != nil {
log.Error("validate state failed during diff sync", "error", err)
return nil, nil, 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Validator interface {

// ValidateState validates the given statedb and optionally the receipts and
// gas used.
ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error
ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error
}

// Prefetcher is an interface for pre-caching transaction signatures and state.
Expand Down