Skip to content

Commit

Permalink
worker: change interrupt type from *int to chan
Browse files Browse the repository at this point in the history
channel operation is preferred than atomic value check in golang.
And it will help for the further refactor on worker.
  • Loading branch information
setunapo authored and brilliant-lx committed Nov 15, 2022
1 parent b4dcff5 commit d1ed977
Showing 1 changed file with 36 additions and 27 deletions.
63 changes: 36 additions & 27 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,15 @@ type task struct {
}

const (
commitInterruptNone int32 = iota
commitInterruptNewHead
commitInterruptResubmit
commitInterruptNewHead int32 = 1
commitInterruptResubmit int32 = 2
)

// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
type newWorkReq struct {
interrupt *int32
noempty bool
timestamp int64
interruptCh chan int32
noempty bool
timestamp int64
}

// getWorkReq represents a request for getting a new sealing work with provided parameters.
Expand Down Expand Up @@ -377,7 +376,7 @@ func (w *worker) close() {
func (w *worker) newWorkLoop(recommit time.Duration) {
defer w.wg.Done()
var (
interrupt *int32
interruptCh chan int32
minRecommit = recommit // minimal resubmit interval specified by user.
timestamp int64 // timestamp for each round of sealing.
)
Expand All @@ -387,13 +386,15 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
<-timer.C // discard the initial tick

// commit aborts in-flight transaction execution with given signal and resubmits a new one.
commit := func(noempty bool, s int32) {
if interrupt != nil {
atomic.StoreInt32(interrupt, s)
commit := func(noempty bool, reason int32) {
if interruptCh != nil {
// each commit work will have its own interruptCh to stop work with a reason
interruptCh <- reason
close(interruptCh)
}
interrupt = new(int32)
interruptCh = make(chan int32, 1)
select {
case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}:
case w.newWorkCh <- &newWorkReq{interruptCh: interruptCh, noempty: noempty, timestamp: timestamp}:
case <-w.exitCh:
return
}
Expand Down Expand Up @@ -489,7 +490,7 @@ func (w *worker) mainLoop() {
for {
select {
case req := <-w.newWorkCh:
w.commitWork(req.interrupt, req.noempty, req.timestamp)
w.commitWork(req.interruptCh, req.noempty, req.timestamp)

case req := <-w.getWorkCh:
block, err := w.generateWork(req.params)
Expand Down Expand Up @@ -796,7 +797,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece
return receipt.Logs, nil
}

func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool {
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interruptCh chan int32) bool {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -822,24 +823,32 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
}
bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity)

interruptCh := make(chan struct{})
defer close(interruptCh)
stopPrefetchCh := make(chan struct{})
defer close(stopPrefetchCh)
//prefetch txs from all pending txs
txsPrefetch := txs.Copy()
tx := txsPrefetch.Peek()
txCurr := &tx
w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), interruptCh, txCurr)
w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), stopPrefetchCh, txCurr)

LOOP:
for {
// In the following three cases, we will interrupt the execution of the transaction.
// (1) new head block event arrival, the interrupt signal is 1
// (2) worker start or restart, the interrupt signal is 1
// (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2.
// (1) new head block event arrival, the reason is 1
// (2) worker start or restart, the reason is 1
// (3) worker recreate the sealing block with any newly arrived transactions, the reason is 2.
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
return atomic.LoadInt32(interrupt) == commitInterruptNewHead
if interruptCh != nil {
select {
case reason, ok := <-interruptCh:
if !ok {
// should never be here, since interruptCh should not be read before
log.Warn("commit transactions stopped unknown")
}
return reason == commitInterruptNewHead
default:
}
}
// If we don't have enough gas for any further transactions then we're done
if env.gasPool.Gas() < params.TxGas {
Expand Down Expand Up @@ -1028,7 +1037,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
// fillTransactions retrieves the pending transactions from the txpool and fills them
// into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future.
func (w *worker) fillTransactions(interrupt *int32, env *environment) {
func (w *worker) fillTransactions(interruptCh chan int32, env *environment) {
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
pending := w.eth.TxPool().Pending(false)
Expand All @@ -1041,13 +1050,13 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) {
}
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interrupt) {
if w.commitTransactions(env, txs, interruptCh) {
return
}
}
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interrupt) {
if w.commitTransactions(env, txs, interruptCh) {
return
}
}
Expand All @@ -1068,7 +1077,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {

// commitWork generates several new sealing tasks based on the parent block
// and submit them to the sealer.
func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
func (w *worker) commitWork(interruptCh chan int32, noempty bool, timestamp int64) {
start := time.Now()

// Set the coinbase if the worker is running or it's required
Expand All @@ -1094,7 +1103,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
w.commit(work, nil, false, start)
}
// Fill pending transactions from the txpool
w.fillTransactions(interrupt, work)
w.fillTransactions(interruptCh, work)
w.commit(work, w.fullTaskHook, true, start)

// Swap out the old work with the new one, terminating any leftover
Expand Down

0 comments on commit d1ed977

Please sign in to comment.