Skip to content

Commit

Permalink
[R4R]Prefetch state data on mining process (#803)
Browse files Browse the repository at this point in the history
* perf(miner):add mining prefetcher

* fix ineffassign

* fix comments

* fix comments

* fix comments: add AsMessagePrefetch to skip nonce check

* fix comment:refactor check order of method Forward

* fix comments:rename variables

* fix comments: rename

* rename

* fix comments: refactor

* update
  • Loading branch information
qinglin89 authored Mar 28, 2022
1 parent 21a3b11 commit cbf433a
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 1 deletion.
58 changes: 58 additions & 0 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
)

const prefetchThread = 2
const checkInterval = 10

// statePrefetcher is a basic Prefetcher, which blindly executes a block on top
// of an arbitrary state with the goal of prefetching potentially useful state
Expand Down Expand Up @@ -88,6 +89,63 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
}
}

// PrefetchMining processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and snapshot clean state. Only used for mining stage
func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) {
var signer = types.MakeSigner(p.config, header.Number)

txCh := make(chan *types.Transaction, 2*prefetchThread)
for i := 0; i < prefetchThread; i++ {
go func(startCh <-chan *types.Transaction, stopCh <-chan struct{}) {
idx := 0
newStatedb := statedb.Copy()
gaspool := new(GasPool).AddGas(gasLimit)
blockContext := NewEVMBlockContext(header, p.bc, nil)
evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
// Iterate over and process the individual transactions
for {
select {
case tx := <-startCh:
// Convert the transaction into an executable message and pre-cache its sender
msg, err := tx.AsMessageNoNonceCheck(signer)
if err != nil {
return // Also invalid block, bail out
}
idx++
newStatedb.Prepare(tx.Hash(), header.Hash(), idx)
precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm)
gaspool = new(GasPool).AddGas(gasLimit)
case <-stopCh:
return
}
}
}(txCh, interruptCh)
}
go func(txset *types.TransactionsByPriceAndNonce) {
count := 0
for {
tx := txset.Peek()
if tx == nil {
return
}
select {
case <-interruptCh:
return
default:
}
if count++; count%checkInterval == 0 {
if *txCurr == nil {
return
}
txset.Forward(*txCurr)
}
txCh <- tx
txset.Shift()
}
}(txs)
}

// precacheTransaction attempts to apply a transaction to the given state database
// and uses the input parameters for its environment. The goal is not to execute
// the transaction successfully, rather to warm up touched data slots.
Expand Down
2 changes: 2 additions & 0 deletions core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Prefetcher interface {
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes.
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32)
// PrefetchMining used for pre-caching transaction signatures and state trie nodes. Only used for mining stage.
PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction)
}

// Processor is an interface for processing blocks using a given initial state.
Expand Down
62 changes: 62 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,21 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa
}
}

// Copy copys a new TransactionsPriceAndNonce with the same *transaction
func (t *TransactionsByPriceAndNonce) Copy() *TransactionsByPriceAndNonce {
heads := make([]*Transaction, len(t.heads))
copy(heads, t.heads)
txs := make(map[common.Address]Transactions, len(t.txs))
for acc, txsTmp := range t.txs {
txs[acc] = txsTmp
}
return &TransactionsByPriceAndNonce{
heads: heads,
txs: txs,
signer: t.signer,
}
}

// Peek returns the next transaction by price.
func (t *TransactionsByPriceAndNonce) Peek() *Transaction {
if len(t.heads) == 0 {
Expand Down Expand Up @@ -488,6 +503,44 @@ func (t *TransactionsByPriceAndNonce) CurrentSize() int {
return len(t.heads)
}

//Forward moves current transaction to be the one which is one index after tx
func (t *TransactionsByPriceAndNonce) Forward(tx *Transaction) {
if tx == nil {
t.heads = t.heads[0:0]
return
}
//check whether target tx exists in t.heads
for _, head := range t.heads {
if tx == head {
//shift t to the position one after tx
txTmp := t.Peek()
for txTmp != tx {
t.Shift()
txTmp = t.Peek()
}
t.Shift()
return
}
}
//get the sender address of tx
acc, _ := Sender(t.signer, tx)
//check whether target tx exists in t.txs
if txs, ok := t.txs[acc]; ok {
for _, txTmp := range txs {
//found the same pointer in t.txs as tx and then shift t to the position one after tx
if txTmp == tx {
txTmp = t.Peek()
for txTmp != tx {
t.Shift()
txTmp = t.Peek()
}
t.Shift()
return
}
}
}
}

// Message is a fully derived transaction and implements core.Message
//
// NOTE: In a future PR this will be removed.
Expand Down Expand Up @@ -535,6 +588,15 @@ func (tx *Transaction) AsMessage(s Signer) (Message, error) {
return msg, err
}

// AsMessageNoNonceCheck returns the transaction with checkNonce field set to be false.
func (tx *Transaction) AsMessageNoNonceCheck(s Signer) (Message, error) {
msg, err := tx.AsMessage(s)
if err == nil {
msg.checkNonce = false
}
return msg, err
}

func (m Message) From() common.Address { return m.from }
func (m Message) To() *common.Address { return m.to }
func (m Message) GasPrice() *big.Int { return m.gasPrice }
Expand Down
69 changes: 69 additions & 0 deletions core/types/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,75 @@ func TestTransactionTimeSort(t *testing.T) {
}
}

func TestTransactionForward(t *testing.T) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := HomesteadSigner{}

// Generate a batch of transactions with overlapping prices, but different creation times
groups := map[common.Address]Transactions{}
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)

tx, _ := SignTx(NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)
tx2, _ := SignTx(NewTransaction(1, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)

tx.time = time.Unix(0, int64(len(keys)-start))
tx2.time = time.Unix(1, int64(len(keys)-start))

groups[addr] = append(groups[addr], tx)
groups[addr] = append(groups[addr], tx2)

}
// Sort the transactions
txset := NewTransactionsByPriceAndNonce(signer, groups)
txsetCpy := txset.Copy()

txs := Transactions{}
for tx := txsetCpy.Peek(); tx != nil; tx = txsetCpy.Peek() {
txs = append(txs, tx)
txsetCpy.Shift()
}

tmp := txset.Copy()
for j := 0; j < 10; j++ {
txset = tmp.Copy()
txsetCpy = tmp.Copy()
i := 0
for ; i < j; i++ {
txset.Shift()
}
tx := txset.Peek()
txsetCpy.Forward(tx)
txCpy := txsetCpy.Peek()
if txCpy == nil {
if tx == nil {
continue
}
txset.Shift()
if txset.Peek() != nil {
t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx)
} else {
continue
}
}
txset.Shift()
for ; i < len(txs)-1; i++ {
tx = txset.Peek()
txCpy = txsetCpy.Peek()
if txCpy != tx {
t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx)
}
txsetCpy.Shift()
txset.Shift()
}

}
}

// TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON.
func TestTransactionCoding(t *testing.T) {
key, err := crypto.GenerateKey()
Expand Down
12 changes: 11 additions & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ type intervalAdjust struct {
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
prefetcher core.Prefetcher
config *Config
chainConfig *params.ChainConfig
engine consensus.Engine
Expand Down Expand Up @@ -196,6 +197,7 @@ type worker struct {

func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
worker := &worker{
prefetcher: core.NewStatePrefetcher(chainConfig, eth.BlockChain(), engine),
config: config,
chainConfig: chainConfig,
engine: engine,
Expand Down Expand Up @@ -778,6 +780,14 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
}
bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity)

interruptCh := make(chan struct{})
defer close(interruptCh)
tx := &types.Transaction{}
txCurr := &tx
//prefetch txs from all pending txs
txsPrefetch := txs.Copy()
w.prefetcher.PrefetchMining(txsPrefetch, w.current.header, w.current.gasPool.Gas(), w.current.state.Copy(), *w.chain.GetVMConfig(), interruptCh, txCurr)

LOOP:
for {
// In the following three cases, we will interrupt the execution of the transaction.
Expand Down Expand Up @@ -814,7 +824,7 @@ LOOP:
}
}
// Retrieve the next transaction and abort if all done
tx := txs.Peek()
tx = txs.Peek()
if tx == nil {
break
}
Expand Down

0 comments on commit cbf433a

Please sign in to comment.