Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R]Prefetch state data on mining process #803

Merged
merged 11 commits into from
Mar 28, 2022
9 changes: 9 additions & 0 deletions core/gaspool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ import (
// in a block. The zero value is a pool with zero gas available.
type GasPool uint64

// SetGas set an initial value for gaspool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set -> sets

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

func (gp *GasPool) SetGas(amount uint64) *GasPool {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
setunapo marked this conversation as resolved.
Show resolved Hide resolved
if amount > math.MaxUint64 {
panic("gas pool pushed above uint64")
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
}
*(*uint64)(gp) = amount
return gp
}

// AddGas makes gas available for execution.
func (gp *GasPool) AddGas(amount uint64) *GasPool {
if uint64(*gp) > math.MaxUint64-amount {
Expand Down
57 changes: 57 additions & 0 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,63 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
}
}

// PrefetchMining processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and snapshot clean state. Only used for mining stage
func (p *statePrefetcher) PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction) {
var signer = types.MakeSigner(p.config, header.Number)

txCh := make(chan *types.Transaction, 2*prefetchThread)
for i := 0; i < prefetchThread; i++ {
go func(txCh <-chan *types.Transaction, stopCh <-chan struct{}) {
setunapo marked this conversation as resolved.
Show resolved Hide resolved
idx := 0
newStatedb := statedb.Copy()
gaspool := new(GasPool).AddGas(gasLimit)
blockContext := NewEVMBlockContext(header, p.bc, nil)
evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
// Iterate over and process the individual transactions
for {
select {
case tx := <-txCh:
// Convert the transaction into an executable message and pre-cache its sender
msg, err := tx.AsMessage(signer)
setunapo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return // Also invalid block, bail out
}
idx++
newStatedb.Prepare(tx.Hash(), header.Hash(), idx)
precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm)
gaspool.SetGas(gasLimit)
setunapo marked this conversation as resolved.
Show resolved Hide resolved
case <-stopCh:
return
}
}
}(txCh, interruptCh)
}
go func(txs *types.TransactionsByPriceAndNonce) {
count := 0
for {
tx := txs.Peek()
if tx == nil {
return
}
select {
case <-interruptCh:
return
default:
}
if count++; count%10 == 0 {
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
if *txCurr == nil {
setunapo marked this conversation as resolved.
Show resolved Hide resolved
return
}
txs.Forward(*txCurr)
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
}
txCh <- tx
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
txs.Shift()
}
}(txs)
}

// precacheTransaction attempts to apply a transaction to the given state database
// and uses the input parameters for its environment. The goal is not to execute
// the transaction successfully, rather to warm up touched data slots.
Expand Down
2 changes: 2 additions & 0 deletions core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Prefetcher interface {
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes.
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32)
// PrefetchMining used for pre-caching transaction signatures and state trie nodes. Only used for mining stage.
PrefetchMining(txs *types.TransactionsByPriceAndNonce, header *types.Header, gasLimit uint64, statedb *state.StateDB, cfg vm.Config, interruptCh <-chan struct{}, txCurr **types.Transaction)
}

// Processor is an interface for processing blocks using a given initial state.
Expand Down
56 changes: 56 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,21 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa
}
}

// Copy copy a new TransactionsPriceAndNonce with the same *transaction
setunapo marked this conversation as resolved.
Show resolved Hide resolved
func (t *TransactionsByPriceAndNonce) Copy() *TransactionsByPriceAndNonce {
heads := make([]*Transaction, len(t.heads))
copy(heads, t.heads)
txs := make(map[common.Address]Transactions, len(t.txs))
for acc, txsTmp := range t.txs {
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
txs[acc] = txsTmp
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
}
return &TransactionsByPriceAndNonce{
heads: heads,
txs: txs,
signer: t.signer,
setunapo marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Peek returns the next transaction by price.
func (t *TransactionsByPriceAndNonce) Peek() *Transaction {
if len(t.heads) == 0 {
Expand Down Expand Up @@ -488,6 +503,47 @@ func (t *TransactionsByPriceAndNonce) CurrentSize() int {
return len(t.heads)
}

//Forward move t to be one index behind tx, tx cant be nil
func (t *TransactionsByPriceAndNonce) Forward(tx *Transaction) {
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
if tx == nil {
txTmp := t.Peek()
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
for txTmp != nil {
t.Shift()
txTmp = t.Peek()
}
return
}

l := len(t.heads)
acc, _ := Sender(t.signer, tx)
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
setunapo marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < l; i++ {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
accTmp, _ := Sender(t.signer, t.heads[i])
if acc == accTmp {
if tx == t.heads[i] {
txTmp := t.Peek()
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
for txTmp != tx {
t.Shift()
txTmp = t.Peek()
}
t.Shift()
return
}
for _, txTmp := range t.txs[accTmp] {
if txTmp == tx {
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
txTmp = t.Peek()
for txTmp != tx {
t.Shift()
txTmp = t.Peek()
}
t.Shift()
return
}
}
return
}
}
}

// Message is a fully derived transaction and implements core.Message
//
// NOTE: In a future PR this will be removed.
Expand Down
69 changes: 69 additions & 0 deletions core/types/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,75 @@ func TestTransactionTimeSort(t *testing.T) {
}
}

func TestTransactionForward(t *testing.T) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := HomesteadSigner{}

// Generate a batch of transactions with overlapping prices, but different creation times
groups := map[common.Address]Transactions{}
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)

tx, _ := SignTx(NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)
tx2, _ := SignTx(NewTransaction(1, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)

tx.time = time.Unix(0, int64(len(keys)-start))
tx2.time = time.Unix(1, int64(len(keys)-start))

groups[addr] = append(groups[addr], tx)
groups[addr] = append(groups[addr], tx2)

}
// Sort the transactions
txset := NewTransactionsByPriceAndNonce(signer, groups)
txsetCpy := txset.Copy()

txs := Transactions{}
for tx := txsetCpy.Peek(); tx != nil; tx = txsetCpy.Peek() {
txs = append(txs, tx)
txsetCpy.Shift()
}

tmp := txset.Copy()
for j := 0; j < 10; j++ {
txset = tmp.Copy()
txsetCpy = tmp.Copy()
i := 0
for ; i < j; i++ {
txset.Shift()
}
tx := txset.Peek()
txsetCpy.Forward(tx)
txCpy := txsetCpy.Peek()
if txCpy == nil {
if tx == nil {
continue
}
txset.Shift()
if txset.Peek() != nil {
t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx)
} else {
continue
}
}
txset.Shift()
for ; i < len(txs)-1; i++ {
tx = txset.Peek()
txCpy = txsetCpy.Peek()
if txCpy != tx {
t.Errorf("forward got an incorrect result, got %v, want %v", txCpy, tx)
}
txsetCpy.Shift()
txset.Shift()
}

}
}

// TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON.
func TestTransactionCoding(t *testing.T) {
key, err := crypto.GenerateKey()
Expand Down
14 changes: 13 additions & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ type intervalAdjust struct {
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
prefetcher core.Prefetcher
config *Config
chainConfig *params.ChainConfig
engine consensus.Engine
Expand Down Expand Up @@ -196,6 +197,7 @@ type worker struct {

func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
worker := &worker{
prefetcher: core.NewStatePrefetcher(chainConfig, eth.BlockChain(), engine),
config: config,
chainConfig: chainConfig,
engine: engine,
Expand Down Expand Up @@ -778,6 +780,14 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
}
bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity)

// var interruptPrefetch uint32
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
interruptCh := make(chan struct{})
var tx *types.Transaction
txCurr := &tx
//prefetch txs from all pending txs
txsPrefetch := txs.Copy()
qinglin89 marked this conversation as resolved.
Show resolved Hide resolved
w.prefetcher.PrefetchMining(txsPrefetch, w.current.header, w.current.gasPool.Gas(), w.current.state.Copy(), *w.chain.GetVMConfig(), interruptCh, txCurr)

LOOP:
for {
// In the following three cases, we will interrupt the execution of the transaction.
Expand All @@ -798,6 +808,7 @@ LOOP:
inc: true,
}
}
close(interruptCh)
unclezoro marked this conversation as resolved.
Show resolved Hide resolved
return atomic.LoadInt32(interrupt) == commitInterruptNewHead
}
// If we don't have enough gas for any further transactions then we're done
Expand All @@ -814,7 +825,7 @@ LOOP:
}
}
// Retrieve the next transaction and abort if all done
tx := txs.Peek()
tx = txs.Peek()
if tx == nil {
break
}
Expand Down Expand Up @@ -868,6 +879,7 @@ LOOP:
txs.Shift()
}
}
close(interruptCh)
bloomProcessors.Close()

if !w.isRunning() && len(coalescedLogs) > 0 {
Expand Down