Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R]Prefetch state data on mining process #803

Merged
merged 11 commits into from
Mar 28, 2022
58 changes: 58 additions & 0 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
)

const prefetchThread = 2
const checkInterval = 10

// statePrefetcher is a basic Prefetcher, which blindly executes a block on top
// of an arbitrary state with the goal of prefetching potentially useful state
Expand Down Expand Up @@ -88,6 +89,63 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
}
}

// PrefetchMining processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and snapshot clean state. Only used for mining stage
func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) {
var signer = types.MakeSigner(p.config, header.Number)

txCh := make(chan *types.Transaction, 2*prefetchThread)
for i := 0; i < prefetchThread; i++ {
go func(startCh <-chan *types.Transaction, stopCh <-chan struct{}) {
idx := 0
newStatedb := statedb.Copy()
gaspool := new(GasPool).AddGas(gasLimit)
blockContext := NewEVMBlockContext(header, p.bc, nil)
evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
// Iterate over and process the individual transactions
for {
select {
case tx := <-startCh:
// Convert the transaction into an executable message and pre-cache its sender
msg, err := tx.AsMessageNoNonceCheck(signer)
if err != nil {
return // Also invalid block, bail out
}
idx++
newStatedb.Prepare(tx.Hash(), header.Hash(), idx)
precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm)
gaspool = new(GasPool).AddGas(gasLimit)
case <-stopCh:
return
}
}
}(txCh, interruptCh)
}
go func(txset *types.TransactionsByPriceAndNonce) {
count := 0
for {
tx := txset.Peek()
if tx == nil {
return
}
select {
case <-interruptCh:
return
default:
}
if count++; count%checkInterval == 0 {
if *txCurr == nil {
setunapo marked this conversation as resolved.
Show resolved Hide resolved
return
}
txset.Forward(*txCurr)
}
txCh <- tx
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
txset.Shift()
}
}(txs)
}

// precacheTransaction attempts to apply a transaction to the given state database
// and uses the input parameters for its environment. The goal is not to execute
// the transaction successfully, rather to warm up touched data slots.
Expand Down
2 changes: 2 additions & 0 deletions core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Prefetcher interface {
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes.
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32)
// PrefetchMining used for pre-caching transaction signatures and state trie nodes. Only used for mining stage.
PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction)
}

// Processor is an interface for processing blocks using a given initial state.
Expand Down
71 changes: 71 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,21 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa
}
}

// Copy copys a new TransactionsPriceAndNonce with the same *transaction
func (t *TransactionsByPriceAndNonce) Copy() *TransactionsByPriceAndNonce {
heads := make([]*Transaction, len(t.heads))
copy(heads, t.heads)
txs := make(map[common.Address]Transactions, len(t.txs))
for acc, txsTmp := range t.txs {
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
txs[acc] = txsTmp
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
}
return &TransactionsByPriceAndNonce{
heads: heads,
txs: txs,
signer: t.signer,
setunapo marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Peek returns the next transaction by price.
func (t *TransactionsByPriceAndNonce) Peek() *Transaction {
if len(t.heads) == 0 {
Expand Down Expand Up @@ -488,6 +503,44 @@ func (t *TransactionsByPriceAndNonce) CurrentSize() int {
return len(t.heads)
}

//Forward moves current transaction to be the one which is one index after tx
func (t *TransactionsByPriceAndNonce) Forward(tx *Transaction) {
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
if tx == nil {
t.heads = t.heads[0:0]
return
}
//check whether target tx exists in t.heads
for _, head := range t.heads {
if tx == head {
//shift t to the position one after tx
txTmp := t.Peek()
for txTmp != tx {
t.Shift()
txTmp = t.Peek()
}
t.Shift()
return
}
}
//get the sender address of tx
acc, _ := Sender(t.signer, tx)
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
setunapo marked this conversation as resolved.
Show resolved Hide resolved
//check whether target tx exists in t.txs
if txs, ok := t.txs[acc]; ok {
for _, txTmp := range txs {
//found the same pointer in t.txs as tx and then shift t to the position one after tx
if txTmp == tx {
txTmp = t.Peek()
setunapo marked this conversation as resolved.
Show resolved Hide resolved
for txTmp != tx {
t.Shift()
txTmp = t.Peek()
}
t.Shift()
return
}
}
}
}

// Message is a fully derived transaction and implements core.Message
//
// NOTE: In a future PR this will be removed.
Expand Down Expand Up @@ -535,6 +588,24 @@ func (tx *Transaction) AsMessage(s Signer) (Message, error) {
return msg, err
}

// AsMessageNoNonceCheck returns the transaction with checkNonce field set to be false.
func (tx *Transaction) AsMessageNoNonceCheck(s Signer) (Message, error) {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
msg := Message{
nonce: tx.Nonce(),
gasLimit: tx.Gas(),
gasPrice: new(big.Int).Set(tx.GasPrice()),
to: tx.To(),
amount: tx.Value(),
data: tx.Data(),
accessList: tx.AccessList(),
checkNonce: false,
}

var err error
msg.from, err = Sender(s, tx)
return msg, err
}

func (m Message) From() common.Address { return m.from }
func (m Message) To() *common.Address { return m.to }
func (m Message) GasPrice() *big.Int { return m.gasPrice }
Expand Down
69 changes: 69 additions & 0 deletions core/types/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,75 @@ func TestTransactionTimeSort(t *testing.T) {
}
}

func TestTransactionForward(t *testing.T) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := HomesteadSigner{}

// Generate a batch of transactions with overlapping prices, but different creation times
groups := map[common.Address]Transactions{}
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)

tx, _ := SignTx(NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)
tx2, _ := SignTx(NewTransaction(1, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)

tx.time = time.Unix(0, int64(len(keys)-start))
tx2.time = time.Unix(1, int64(len(keys)-start))

groups[addr] = append(groups[addr], tx)
groups[addr] = append(groups[addr], tx2)

}
// Sort the transactions
txset := NewTransactionsByPriceAndNonce(signer, groups)
txsetCpy := txset.Copy()

txs := Transactions{}
for tx := txsetCpy.Peek(); tx != nil; tx = txsetCpy.Peek() {
txs = append(txs, tx)
txsetCpy.Shift()
}

tmp := txset.Copy()
for j := 0; j < 10; j++ {
txset = tmp.Copy()
txsetCpy = tmp.Copy()
i := 0
for ; i < j; i++ {
txset.Shift()
}
tx := txset.Peek()
txsetCpy.Forward(tx)
txCpy := txsetCpy.Peek()
if txCpy == nil {
if tx == nil {
continue
}
txset.Shift()
if txset.Peek() != nil {
t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx)
} else {
continue
}
}
txset.Shift()
for ; i < len(txs)-1; i++ {
tx = txset.Peek()
txCpy = txsetCpy.Peek()
if txCpy != tx {
t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx)
}
txsetCpy.Shift()
txset.Shift()
}

}
}

// TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON.
func TestTransactionCoding(t *testing.T) {
key, err := crypto.GenerateKey()
Expand Down
12 changes: 11 additions & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ type intervalAdjust struct {
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
prefetcher core.Prefetcher
config *Config
chainConfig *params.ChainConfig
engine consensus.Engine
Expand Down Expand Up @@ -196,6 +197,7 @@ type worker struct {

func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
worker := &worker{
prefetcher: core.NewStatePrefetcher(chainConfig, eth.BlockChain(), engine),
config: config,
chainConfig: chainConfig,
engine: engine,
Expand Down Expand Up @@ -778,6 +780,14 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
}
bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity)

interruptCh := make(chan struct{})
defer close(interruptCh)
tx := &types.Transaction{}
txCurr := &tx
//prefetch txs from all pending txs
txsPrefetch := txs.Copy()
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
w.prefetcher.PrefetchMining(txsPrefetch, w.current.header, w.current.gasPool.Gas(), w.current.state.Copy(), *w.chain.GetVMConfig(), interruptCh, txCurr)

LOOP:
for {
// In the following three cases, we will interrupt the execution of the transaction.
Expand Down Expand Up @@ -814,7 +824,7 @@ LOOP:
}
}
// Retrieve the next transaction and abort if all done
tx := txs.Peek()
tx = txs.Peek()
if tx == nil {
break
}
Expand Down