Skip to content

Commit

Permalink
eth, miner: add timeout for building sealing block (ethereum#25407)
Browse files Browse the repository at this point in the history
* eth, miner: add timeout for building sealing block

* eth, cmd, miner: add newpayloadtimeout flag

* eth, miner, cmd: address comments

* eth, miner: minor fixes

Co-authored-by: Martin Holst Swende <martin@swende.se>
  • Loading branch information
2 people authored and shekhirin committed Jun 6, 2023
1 parent 59fb430 commit f2e275f
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 55 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ var (
utils.MinerExtraDataFlag,
utils.MinerRecommitIntervalFlag,
utils.MinerNoVerifyFlag,
utils.MinerNewPayloadTimeout,
utils.NATFlag,
utils.NoDiscoverFlag,
utils.DiscoveryV5Flag,
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,12 @@ var (
Usage: "Disable remote sealing verification",
Category: flags.MinerCategory,
}
MinerNewPayloadTimeout = &cli.DurationFlag{
Name: "miner.newpayload-timeout",
Usage: "Specify the maximum time allowance for creating a new payload",
Value: ethconfig.Defaults.Miner.NewPayloadTimeout,
Category: flags.MinerCategory,
}

// Account settings
UnlockedAccountFlag = &cli.StringFlag{
Expand Down Expand Up @@ -1658,6 +1664,9 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) {
if ctx.IsSet(MinerNoVerifyFlag.Name) {
cfg.Noverify = ctx.Bool(MinerNoVerifyFlag.Name)
}
if ctx.IsSet(MinerNewPayloadTimeout.Name) {
cfg.NewPayloadTimeout = ctx.Duration(MinerNewPayloadTimeout.Name)
}
}

func setRequiredBlocks(ctx *cli.Context, cfg *ethconfig.Config) {
Expand Down
7 changes: 2 additions & 5 deletions eth/catalyst/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,10 +476,9 @@ func TestExchangeTransitionConfig(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(10)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
defer n.Close()
var (
api = NewConsensusAPI(ethservice)
)

// invalid ttd
api := NewConsensusAPI(ethservice)
config := beacon.TransitionConfigurationV1{
TerminalTotalDifficulty: (*hexutil.Big)(big.NewInt(0)),
TerminalBlockHash: common.Hash{},
Expand Down Expand Up @@ -812,10 +811,8 @@ func TestInvalidBloom(t *testing.T) {

func TestNewPayloadOnInvalidTerminalBlock(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(100)
fmt.Println(genesis.Config.TerminalTotalDifficulty)
genesis.Config.TerminalTotalDifficulty = preMergeBlocks[0].Difficulty() //.Sub(genesis.Config.TerminalTotalDifficulty, preMergeBlocks[len(preMergeBlocks)-1].Difficulty())

fmt.Println(genesis.Config.TerminalTotalDifficulty)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
defer n.Close()

Expand Down
16 changes: 6 additions & 10 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,12 @@ var Defaults = Config{
TrieTimeout: 60 * time.Minute,
SnapshotCache: 102,
FilterLogCacheSize: 32,
Miner: miner.Config{
GasCeil: 30000000,
GasPrice: big.NewInt(params.GWei),
Recommit: 3 * time.Second,
},
TxPool: core.DefaultTxPoolConfig,
RPCGasCap: 50000000,
RPCEVMTimeout: 5 * time.Second,
GPO: FullNodeGPO,
RPCTxFeeCap: 1, // 1 ether
Miner: miner.DefaultConfig,
TxPool: core.DefaultTxPoolConfig,
RPCGasCap: 50000000,
RPCEVMTimeout: 5 * time.Second,
GPO: FullNodeGPO,
RPCTxFeeCap: 1, // 1 ether
}

func init() {
Expand Down
10 changes: 10 additions & 0 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ type Config struct {
GasPrice *big.Int // Minimum gas price for mining a transaction
Recommit time.Duration // The time interval for miner to re-create mining work.
Noverify bool // Disable remote mining solution verification(only useful in ethash).

NewPayloadTimeout time.Duration // The maximum time allowance for creating a new payload
}

// DefaultConfig contains default settings for miner.
var DefaultConfig = Config{
GasCeil: 30000000,
GasPrice: big.NewInt(params.GWei),
Recommit: 3 * time.Second,
NewPayloadTimeout: 2 * time.Second,
}

// Miner creates blocks and searches for proof-of-work values.
Expand Down
116 changes: 80 additions & 36 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ const (
var (
errBlockInterruptedByNewHead = errors.New("new head arrived while building block")
errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block")
errBlockInterruptedByTimeout = errors.New("timeout while building block")
)

// environment is the worker's current environment and holds all
Expand Down Expand Up @@ -158,6 +159,7 @@ const (
commitInterruptNone int32 = iota
commitInterruptNewHead
commitInterruptResubmit
commitInterruptTimeout
)

// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
Expand Down Expand Up @@ -241,6 +243,13 @@ type worker struct {
// non-stop and no real transaction will be included.
noempty uint32

// newpayloadTimeout is the maximum timeout allowance for creating payload.
// The default value is 2 seconds but node operator can set it to arbitrary
// large value. A large timeout allowance may cause Geth to fail creating
// a non-empty payload within the specified time and eventually miss the slot
// in case there are some computation expensive transactions in txpool.
newpayloadTimeout time.Duration

// External functions
isLocalBlock func(header *types.Header) bool // Function used to determine whether the specified block is mined by local miner.

Expand Down Expand Up @@ -288,6 +297,16 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
recommit = minRecommitInterval
}
// Sanitize the timeout config for creating payload.
newpayloadTimeout := worker.config.NewPayloadTimeout
if newpayloadTimeout == 0 {
log.Warn("Sanitizing new payload timeout to default", "provided", newpayloadTimeout, "updated", DefaultConfig.NewPayloadTimeout)
newpayloadTimeout = DefaultConfig.NewPayloadTimeout
}
if newpayloadTimeout < time.Millisecond*100 {
log.Warn("Low payload timeout may cause high amount of non-full blocks", "provided", newpayloadTimeout, "default", DefaultConfig.NewPayloadTimeout)
}
worker.newpayloadTimeout = newpayloadTimeout

worker.wg.Add(4)
go worker.mainLoop()
Expand Down Expand Up @@ -844,42 +863,26 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
var coalescedLogs []*types.Log

for {
// In the following three cases, we will interrupt the execution of the transaction.
// (1) new head block event arrival, the interrupt signal is 1
// (2) worker start or restart, the interrupt signal is 1
// (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2.
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit)
if ratio < 0.1 {
ratio = 0.1
}
w.resubmitAdjustCh <- &intervalAdjust{
ratio: ratio,
inc: true,
}
return errBlockInterruptedByRecommit
// Check interruption signal and abort building if it's fired.
if interrupt != nil {
if signal := atomic.LoadInt32(interrupt); signal != commitInterruptNone {
return signalToErr(signal)
}
return errBlockInterruptedByNewHead
}
// If we don't have enough gas for any further transactions then we're done
// If we don't have enough gas for any further transactions then we're done.
if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
break
}
// Retrieve the next transaction and abort if all done
// Retrieve the next transaction and abort if all done.
tx := txs.Peek()
if tx == nil {
break
}
// Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool.
//
// We use the eip155 signer regardless of the current hf.
from, _ := types.Sender(env.signer, tx)

// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) {
Expand Down Expand Up @@ -926,7 +929,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
txs.Shift()
}
}

if !w.isRunning() && len(coalescedLogs) > 0 {
// We don't push the pendingLogsEvent while we are sealing. The reason is that
// when we are sealing, the worker will regenerate a sealing block every 3 seconds.
Expand All @@ -942,11 +944,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
}
w.pendingLogsFeed.Send(cpy)
}
// Notify resubmit loop to decrease resubmitting interval if current interval is larger
// than the user-specified one.
if interrupt != nil {
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
}
return nil
}

Expand Down Expand Up @@ -986,15 +983,15 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
}
timestamp = parent.Time() + 1
}
// Construct the sealing block header, set the extra field if it's allowed
num := parent.Number()
// Construct the sealing block header.
header := &types.Header{
ParentHash: parent.Hash(),
Number: num.Add(num, common.Big1),
Number: new(big.Int).Add(parent.Number(), common.Big1),
GasLimit: core.CalcGasLimit(parent.GasLimit(), w.config.GasCeil),
Time: timestamp,
Coinbase: genParams.coinbase,
}
// Set the extra field if it's allowed.
if !genParams.noExtra && len(w.extra) != 0 {
header.Extra = w.extra
}
Expand Down Expand Up @@ -1082,7 +1079,16 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
defer work.discard()

if !params.noTxs {
w.fillTransactions(nil, work)
interrupt := new(int32)
timer := time.AfterFunc(w.newpayloadTimeout, func() {
atomic.StoreInt32(interrupt, commitInterruptTimeout)
})
defer timer.Stop()

err := w.fillTransactions(interrupt, work)
if errors.Is(err, errBlockInterruptedByTimeout) {
log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout))
}
}
return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
}
Expand Down Expand Up @@ -1113,13 +1119,36 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
w.commit(work.copy(), nil, false, start)
}

// Fill pending transactions from the txpool
// Fill pending transactions from the txpool into the block.
err = w.fillTransactions(interrupt, work)
if errors.Is(err, errBlockInterruptedByNewHead) {
switch {
case err == nil:
// The entire block is filled, decrease resubmit interval in case
// of current interval is larger than the user-specified one.
w.resubmitAdjustCh <- &intervalAdjust{inc: false}

case errors.Is(err, errBlockInterruptedByRecommit):
// Notify resubmit loop to increase resubmitting interval if the
// interruption is due to frequent commits.
gaslimit := work.header.GasLimit
ratio := float64(gaslimit-work.gasPool.Gas()) / float64(gaslimit)
if ratio < 0.1 {
ratio = 0.1
}
w.resubmitAdjustCh <- &intervalAdjust{
ratio: ratio,
inc: true,
}

case errors.Is(err, errBlockInterruptedByNewHead):
// If the block building is interrupted by newhead event, discard it
// totally. Committing the interrupted block introduces unnecessary
// delay, and possibly causes miner to mine on the previous head,
// which could result in higher uncle rate.
work.discard()
return
}
// Submit the generated block for consensus sealing.
w.commit(work.copy(), w.fullTaskHook, true, start)

// Swap out the old work with the new one, terminating any leftover
Expand Down Expand Up @@ -1231,3 +1260,18 @@ func totalFees(block *types.Block, receipts []*types.Receipt) *big.Float {
}
return new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
}

// signalToErr converts the interruption signal to a concrete error type for return.
// The given signal must be a valid interruption signal.
func signalToErr(signal int32) error {
switch signal {
case commitInterruptNewHead:
return errBlockInterruptedByNewHead
case commitInterruptResubmit:
return errBlockInterruptedByRecommit
case commitInterruptTimeout:
return errBlockInterruptedByTimeout
default:
panic(fmt.Errorf("undefined signal %d", signal))
}
}
8 changes: 4 additions & 4 deletions miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,21 +523,21 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co
}

func TestGetSealingWorkEthash(t *testing.T) {
testGetSealingWork(t, ethashChainConfig, ethash.NewFaker(), false)
testGetSealingWork(t, ethashChainConfig, ethash.NewFaker())
}

func TestGetSealingWorkClique(t *testing.T) {
testGetSealingWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase()), false)
testGetSealingWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase()))
}

func TestGetSealingWorkPostMerge(t *testing.T) {
local := new(params.ChainConfig)
*local = *ethashChainConfig
local.TerminalTotalDifficulty = big.NewInt(0)
testGetSealingWork(t, local, ethash.NewFaker(), true)
testGetSealingWork(t, local, ethash.NewFaker())
}

func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, postMerge bool) {
func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) {
defer engine.Close()

w, b := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0)
Expand Down

0 comments on commit f2e275f

Please sign in to comment.