Skip to content

Commit

Permalink
prefetch state by apply transactions within one block
Browse files Browse the repository at this point in the history
  • Loading branch information
unclezoro committed Jan 7, 2022
1 parent 74f6b61 commit 9dc6456
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 34 deletions.
22 changes: 17 additions & 5 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ const (
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
maxBeyondBlocks = 2048
prefetchTxNumber = 100

diffLayerFreezerRecheckInterval = 3 * time.Second
diffLayerPruneRecheckInterval = 1 * time.Second // The interval to prune unverified diff layers
Expand Down Expand Up @@ -233,10 +234,11 @@ type BlockChain struct {
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing

engine consensus.Engine
validator Validator // Block and state validator interface
processor Processor // Block transaction processor interface
vmConfig vm.Config
engine consensus.Engine
prefetcher Prefetcher
validator Validator // Block and state validator interface
processor Processor // Block transaction processor interface
vmConfig vm.Config

shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
Expand Down Expand Up @@ -295,6 +297,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
diffNumToBlockHashes: make(map[uint64]map[common.Hash]struct{}),
diffPeersToDiffHashes: make(map[string]map[common.Hash]struct{}),
}
bc.prefetcher = NewStatePrefetcher(chainConfig, bc, engine)
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.processor = NewStateProcessor(chainConfig, bc, engine)

Expand Down Expand Up @@ -2051,13 +2054,20 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")

var followupInterrupt uint32
if len(block.Transactions()) >= prefetchTxNumber {
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
}(time.Now(), block, throwaway, &followupInterrupt)
}
//Process block using the parent state as reference point
substart := time.Now()
statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
activeState = statedb
if err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, err
}
// Update the metrics touched during block processing
Expand All @@ -2076,6 +2086,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
log.Error("validate state failed", "error", err)
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, err
}
}
Expand All @@ -2093,6 +2104,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
substart = time.Now()
status, err := bc.writeBlockWithState(block, receipts, logs, statedb, false)
if err != nil {
atomic.StoreUint32(&followupInterrupt, 1)
return it.index, err
}
// Update the metrics touched during block commit
Expand Down
72 changes: 44 additions & 28 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"runtime"
"sync/atomic"

"github.com/ethereum/go-ethereum/consensus"
Expand All @@ -35,42 +36,57 @@ type statePrefetcher struct {
engine consensus.Engine // Consensus engine used for block rewards
}

// newStatePrefetcher initialises a new statePrefetcher.
func NewStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *statePrefetcher {
return &statePrefetcher{
config: config,
bc: bc,
engine: engine,
}
}

// Prefetch processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes.
func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) {
var (
header = block.Header()
gaspool = new(GasPool).AddGas(block.GasLimit())
blockContext = NewEVMBlockContext(header, p.bc, nil)
evm = vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
signer = types.MakeSigner(p.config, header.Number)
header = block.Header()
signer = types.MakeSigner(p.config, header.Number)
)
// Iterate over and process the individual transactions
byzantium := p.config.IsByzantium(block.Number())
for i, tx := range block.Transactions() {
// If block precaching was interrupted, abort
if interrupt != nil && atomic.LoadUint32(interrupt) == 1 {
return
}
// Convert the transaction into an executable message and pre-cache its sender
msg, err := tx.AsMessage(signer)
if err != nil {
return // Also invalid block, bail out
}
statedb.Prepare(tx.Hash(), block.Hash(), i)
if err := precacheTransaction(msg, p.config, gaspool, statedb, header, evm); err != nil {
return // Ugh, something went horribly wrong, bail out
}
// If we're pre-byzantium, pre-load trie nodes for the intermediate root
if !byzantium {
statedb.IntermediateRoot(true)
transactions := block.Transactions()
threads := runtime.NumCPU()
batch := len(transactions) / (threads + 1)

for i := 1; i <= threads; i++ {
start := i * batch
end := (i + 1) * batch
if i == threads {
end = len(transactions)
}
go func(start, end int) {
newStatedb := statedb.Copy()
gaspool := new(GasPool).AddGas(block.GasLimit())
blockContext := NewEVMBlockContext(header, p.bc, nil)
evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
// Iterate over and process the individual transactions
for i, tx := range transactions[start:end] {
// If block precaching was interrupted, abort
if interrupt != nil && atomic.LoadUint32(interrupt) == 1 {
return
}
// Convert the transaction into an executable message and pre-cache its sender
msg, err := tx.AsMessage(signer)
if err != nil {
return // Also invalid block, bail out
}
newStatedb.Prepare(tx.Hash(), block.Hash(), i)
if err := precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm); err != nil {
return // Ugh, something went horribly wrong, bail out
}
}
}(start, end)
}
// If were post-byzantium, pre-load trie nodes for the final root hash
if byzantium {
statedb.IntermediateRoot(true)
}

}

// precacheTransaction attempts to apply a transaction to the given state database
Expand Down
1 change: 0 additions & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
gp = new(GasPool).AddGas(block.GasLimit())
)
signer := types.MakeSigner(p.bc.chainConfig, block.Number())
statedb.TryPreload(block, signer)
var receipts = make([]*types.Receipt, 0)
// Mutate the block and state according to any hard-fork specs
if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {
Expand Down

0 comments on commit 9dc6456

Please sign in to comment.