diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index c48c1bcfce..cb6741bace 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -548,7 +548,7 @@ var ( } MinerDelayLeftoverFlag = cli.DurationFlag{ Name: "miner.delayleftover", - Usage: "Time interval to for broadcast block", + Usage: "Time reserved to finalize a block", Value: ethconfig.Defaults.Miner.DelayLeftOver, } MinerNoVerfiyFlag = cli.BoolFlag{ diff --git a/consensus/beacon/consensus.go b/consensus/beacon/consensus.go index cd493f3d65..8282ed7cb4 100644 --- a/consensus/beacon/consensus.go +++ b/consensus/beacon/consensus.go @@ -264,7 +264,7 @@ func (beacon *Beacon) Prepare(chain consensus.ChainHeaderReader, header *types.H return nil } -func (beacon *Beacon) Delay(_ consensus.ChainReader, _ *types.Header) *time.Duration { +func (beacon *Beacon) Delay(_ consensus.ChainReader, _ *types.Header, _ *time.Duration) *time.Duration { return nil } diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 75ed916a86..a258f1fe5f 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -591,7 +591,7 @@ func (c *Clique) Authorize(signer common.Address, signFn SignerFn) { c.signFn = signFn } -func (c *Clique) Delay(chain consensus.ChainReader, header *types.Header) *time.Duration { +func (c *Clique) Delay(chain consensus.ChainReader, header *types.Header, leftOver *time.Duration) *time.Duration { return nil } diff --git a/consensus/consensus.go b/consensus/consensus.go index c3e7b4870a..87632a9d0d 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -126,7 +126,7 @@ type Engine interface { APIs(chain ChainHeaderReader) []rpc.API // Delay returns the max duration the miner can commit txs - Delay(chain ChainReader, header *types.Header) *time.Duration + Delay(chain ChainReader, header *types.Header, leftOver *time.Duration) *time.Duration // Close terminates any background threads maintained by the consensus engine. Close() error diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index be6085c713..12a69c127a 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -610,7 +610,7 @@ func (ethash *Ethash) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea return types.NewBlock(header, txs, uncles, receipts, trie.NewStackTrie(nil)), receipts, nil } -func (ethash *Ethash) Delay(_ consensus.ChainReader, _ *types.Header) *time.Duration { +func (ethash *Ethash) Delay(_ consensus.ChainReader, _ *types.Header, _ *time.Duration) *time.Duration { return nil } diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index fee0fe1293..2e544803ef 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -793,13 +793,25 @@ func (p *Parlia) Authorize(val common.Address, signFn SignerFn, signTxFn SignerT p.signTxFn = signTxFn } -func (p *Parlia) Delay(chain consensus.ChainReader, header *types.Header) *time.Duration { +// Argument leftOver is the time reserved for block finalize(calculate root, distribute income...) +func (p *Parlia) Delay(chain consensus.ChainReader, header *types.Header, leftOver *time.Duration) *time.Duration { number := header.Number.Uint64() snap, err := p.snapshot(chain, number-1, header.ParentHash, nil) if err != nil { return nil } delay := p.delayForRamanujanFork(snap, header) + + if *leftOver >= time.Duration(p.config.Period)*time.Second { + // ignore invalid leftOver + log.Error("Delay invalid argument", "leftOver", leftOver.String(), "Period", p.config.Period) + } else if *leftOver >= delay { + delay = time.Duration(0) + return &delay + } else { + delay = delay - *leftOver + } + // The blocking time should be no more than half of period half := time.Duration(p.config.Period) * time.Second / 2 if delay > half { diff --git a/miner/miner.go b/miner/miner.go index 4b20599d66..0ea2c0ea13 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -48,7 +48,7 @@ type Config struct { Notify []string `toml:",omitempty"` // HTTP URL list to be notified of new work packages (only useful in ethash). NotifyFull bool `toml:",omitempty"` // Notify with pending block headers instead of work packages ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner - DelayLeftOver time.Duration // Time for broadcast block + DelayLeftOver time.Duration // Time reserved to finalize a block(calculate root, distribute income...) GasFloor uint64 // Target gas floor for mined blocks. GasCeil uint64 // Target gas ceiling for mined blocks. GasPrice *big.Int // Minimum gas price for mining a transaction diff --git a/miner/worker.go b/miner/worker.go index 3e0d225f78..4e25d048a2 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -65,7 +65,6 @@ const ( ) var ( - commitTxsTimer = metrics.NewRegisteredTimer("worker/committxs", nil) writeBlockTimer = metrics.NewRegisteredTimer("worker/writeblock", nil) finalizeBlockTimer = metrics.NewRegisteredTimer("worker/finalizeblock", nil) ) @@ -542,33 +541,7 @@ func (w *worker) mainLoop() { } case ev := <-w.txsCh: - // Apply transactions to the pending state if we're not sealing - // - // Note all transactions received may not be continuous with transactions - // already included in the current sealing block. These transactions will - // be automatically eliminated. - if !w.isRunning() && w.current != nil { - start := time.Now() - // If block is already full, abort - if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas { - continue - } - txs := make(map[common.Address]types.Transactions) - for _, tx := range ev.Txs { - acc, _ := types.Sender(w.current.signer, tx) - txs[acc] = append(txs[acc], tx) - } - txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) - tcount := w.current.tcount - w.commitTransactions(w.current, txset, nil) - commitTxsTimer.UpdateSince(start) - - // Only update the snapshot if any new transactions were added - // to the pending block - if tcount != w.current.tcount { - w.updateSnapshot(w.current) - } - } else { + if w.isRunning() { // Special case, if the consensus engine is 0 period clique(dev mode), // submit sealing work here since all empty submission will be rejected // by clique. Of course the advance sealing(empty submission) is disabled. @@ -577,6 +550,7 @@ func (w *worker) mainLoop() { w.commitWork(nil, true, time.Now().Unix()) } } + atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) // System stopped @@ -797,7 +771,8 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece return receipt.Logs, nil } -func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interruptCh chan int32) bool { +func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, + interruptCh chan int32, stopTimer *time.Timer) bool { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -809,13 +784,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP } var coalescedLogs []*types.Log - var stopTimer *time.Timer - delay := w.engine.Delay(w.chain, env.header) - if delay != nil { - stopTimer = time.NewTimer(*delay - w.config.DelayLeftOver) - log.Debug("Time left for mining work", "left", (*delay - w.config.DelayLeftOver).String(), "leftover", w.config.DelayLeftOver) - defer stopTimer.Stop() - } // initilise bloom processors processorCapacity := 100 if txs.CurrentSize() < processorCapacity { @@ -1048,15 +1016,24 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment) { localTxs[account] = txs } } + + var stopTimer *time.Timer + delay := w.engine.Delay(w.chain, env.header, &w.config.DelayLeftOver) + if delay != nil { + stopTimer = time.NewTimer(*delay) + log.Debug("Time left for mining work", "delay", delay.String()) + defer stopTimer.Stop() + } + if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interruptCh) { + if w.commitTransactions(env, txs, interruptCh, stopTimer) { return } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interruptCh) { + if w.commitTransactions(env, txs, interruptCh, stopTimer) { return } }