From 14e4f7f6e62c12a772ab2539cbb0f75131b8c23d Mon Sep 17 00:00:00 2001 From: qinglin89 <316032931@qq.com> Date: Tue, 15 Mar 2022 11:07:39 +0800 Subject: [PATCH] perf(miner):add mining prefetcher --- core/gaspool.go | 9 +++++ core/state_prefetcher.go | 57 ++++++++++++++++++++++++++++ core/types.go | 2 + core/types/transaction.go | 56 +++++++++++++++++++++++++++ core/types/transaction_test.go | 69 ++++++++++++++++++++++++++++++++++ miner/worker.go | 12 ++++++ 6 files changed, 205 insertions(+) diff --git a/core/gaspool.go b/core/gaspool.go index e3795c1ee9..96194349fa 100644 --- a/core/gaspool.go +++ b/core/gaspool.go @@ -25,6 +25,15 @@ import ( // in a block. The zero value is a pool with zero gas available. type GasPool uint64 +// SetGas set an initial value for gaspool +func (gp *GasPool) SetGas(amount uint64) *GasPool { + if amount > math.MaxUint64 { + panic("gas pool pushed above uint64") + } + *(*uint64)(gp) = amount + return gp +} + // AddGas makes gas available for execution. func (gp *GasPool) AddGas(amount uint64) *GasPool { if uint64(*gp) > math.MaxUint64-amount { diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index d559a03a0f..b5fbad7899 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -88,6 +88,63 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c } } +// PrefetchMining processes the state changes according to the Ethereum rules by running +// the transaction messages using the statedb, but any changes are discarded. The +// only goal is to pre-cache transaction signatures and snapshot clean state. Only used for mining stage +func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) { + var signer = types.MakeSigner(p.config, header.Number) + + txCh := make(chan *types.Transaction, 2*prefetchThread) + for i := 0; i < prefetchThread; i++ { + go func(txCh <-chan *types.Transaction, stopCh <-chan struct{}) { + idx := 0 + newStatedb := statedb.Copy() + gaspool := new(GasPool).AddGas(gasLimit) + blockContext := NewEVMBlockContext(header, p.bc, nil) + evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) + // Iterate over and process the individual transactions + for { + select { + case tx := <-txCh: + // Convert the transaction into an executable message and pre-cache its sender + msg, err := tx.AsMessage(signer) + if err != nil { + return // Also invalid block, bail out + } + idx++ + newStatedb.Prepare(tx.Hash(), header.Hash(), idx) + precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm) + gaspool.SetGas(gasLimit) + case <-stopCh: + return + } + } + }(txCh, interruptCh) + } + go func(txs *types.TransactionsByPriceAndNonce) { + count := 0 + for { + tx := txs.Peek() + if tx == nil { + return + } + select { + case <-interruptCh: + return + default: + } + if count++; count%10 == 0 { + if *txCurr == nil { + return + } + txs.Forward(*txCurr) + } + txCh <- tx + txs.Shift() + } + }(txs) +} + // precacheTransaction attempts to apply a transaction to the given state database // and uses the input parameters for its environment. The goal is not to execute // the transaction successfully, rather to warm up touched data slots. diff --git a/core/types.go b/core/types.go index 5ed4817e68..61722aea74 100644 --- a/core/types.go +++ b/core/types.go @@ -40,6 +40,8 @@ type Prefetcher interface { // the transaction messages using the statedb, but any changes are discarded. The // only goal is to pre-cache transaction signatures and state trie nodes. Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) + // PrefetchMining used for pre-caching transaction signatures and state trie nodes. Only used for mining stage. + PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) } // Processor is an interface for processing blocks using a given initial state. diff --git a/core/types/transaction.go b/core/types/transaction.go index 74c011544b..500a9d211b 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -458,6 +458,21 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa } } +// Copy copy a new TransactionsPriceAndNonce with the same *transaction +func (t *TransactionsByPriceAndNonce) Copy() *TransactionsByPriceAndNonce { + heads := make([]*Transaction, len(t.heads)) + copy(heads, t.heads) + txs := make(map[common.Address]Transactions, len(t.txs)) + for acc, txsTmp := range t.txs { + txs[acc] = txsTmp + } + return &TransactionsByPriceAndNonce{ + heads: heads, + txs: txs, + signer: t.signer, + } +} + // Peek returns the next transaction by price. func (t *TransactionsByPriceAndNonce) Peek() *Transaction { if len(t.heads) == 0 { @@ -488,6 +503,47 @@ func (t *TransactionsByPriceAndNonce) CurrentSize() int { return len(t.heads) } +//Forward move t to be one index behind tx, tx cant be nil +func (t *TransactionsByPriceAndNonce) Forward(tx *Transaction) { + if tx == nil { + txTmp := t.Peek() + for txTmp != nil { + t.Shift() + txTmp = t.Peek() + } + return + } + + l := len(t.heads) + acc, _ := Sender(t.signer, tx) + for i := 0; i < l; i++ { + accTmp, _ := Sender(t.signer, t.heads[i]) + if acc == accTmp { + if tx == t.heads[i] { + txTmp := t.Peek() + for txTmp != tx { + t.Shift() + txTmp = t.Peek() + } + t.Shift() + return + } + for _, txTmp := range t.txs[accTmp] { + if txTmp == tx { + txTmp = t.Peek() + for txTmp != tx { + t.Shift() + txTmp = t.Peek() + } + t.Shift() + return + } + } + return + } + } +} + // Message is a fully derived transaction and implements core.Message // // NOTE: In a future PR this will be removed. diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index 3cece9c235..c81b7b8647 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -358,6 +358,75 @@ func TestTransactionTimeSort(t *testing.T) { } } +func TestTransactionForward(t *testing.T) { + // Generate a batch of accounts to start with + keys := make([]*ecdsa.PrivateKey, 5) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + } + signer := HomesteadSigner{} + + // Generate a batch of transactions with overlapping prices, but different creation times + groups := map[common.Address]Transactions{} + for start, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + + tx, _ := SignTx(NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) + tx2, _ := SignTx(NewTransaction(1, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key) + + tx.time = time.Unix(0, int64(len(keys)-start)) + tx2.time = time.Unix(1, int64(len(keys)-start)) + + groups[addr] = append(groups[addr], tx) + groups[addr] = append(groups[addr], tx2) + + } + // Sort the transactions + txset := NewTransactionsByPriceAndNonce(signer, groups) + txsetCpy := txset.Copy() + + txs := Transactions{} + for tx := txsetCpy.Peek(); tx != nil; tx = txsetCpy.Peek() { + txs = append(txs, tx) + txsetCpy.Shift() + } + + tmp := txset.Copy() + for j := 0; j < 10; j++ { + txset = tmp.Copy() + txsetCpy = tmp.Copy() + i := 0 + for ; i < j; i++ { + txset.Shift() + } + tx := txset.Peek() + txsetCpy.Forward(tx) + txCpy := txsetCpy.Peek() + if txCpy == nil { + if tx == nil { + continue + } + txset.Shift() + if txset.Peek() != nil { + t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx) + } else { + continue + } + } + txset.Shift() + for ; i < len(txs)-1; i++ { + tx = txset.Peek() + txCpy = txsetCpy.Peek() + if txCpy != tx { + t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx) + } + txsetCpy.Shift() + txset.Shift() + } + + } +} + // TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON. func TestTransactionCoding(t *testing.T) { key, err := crypto.GenerateKey() diff --git a/miner/worker.go b/miner/worker.go index 28ef170e40..8c14fb5775 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -130,6 +130,7 @@ type intervalAdjust struct { // worker is the main object which takes care of submitting new work to consensus engine // and gathering the sealing result. type worker struct { + prefetcher core.Prefetcher config *Config chainConfig *params.ChainConfig engine consensus.Engine @@ -196,6 +197,7 @@ type worker struct { func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker { worker := &worker{ + prefetcher: core.NewStatePrefetcher(chainConfig, eth.BlockChain(), engine), config: config, chainConfig: chainConfig, engine: engine, @@ -778,6 +780,13 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin } bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity) + // var interruptPrefetch uint32 + interruptCh := make(chan struct{}) + var txCurr **types.Transaction + //prefetch txs from all pending txs + txsPrefetch := txs.Copy() + w.prefetcher.PrefetchMining(txsPrefetch, w.current.header, w.current.gasPool.Gas(), w.current.state.Copy(), *w.chain.GetVMConfig(), interruptCh, txCurr) + LOOP: for { // In the following three cases, we will interrupt the execution of the transaction. @@ -798,6 +807,7 @@ LOOP: inc: true, } } + close(interruptCh) return atomic.LoadInt32(interrupt) == commitInterruptNewHead } // If we don't have enough gas for any further transactions then we're done @@ -815,6 +825,7 @@ LOOP: } // Retrieve the next transaction and abort if all done tx := txs.Peek() + txCurr = &tx if tx == nil { break } @@ -868,6 +879,7 @@ LOOP: txs.Shift() } } + close(interruptCh) bloomProcessors.Close() if !w.isRunning() && len(coalescedLogs) > 0 {