Skip to content

Commit

Permalink
parlia: Delay() with DelayLeftOver
Browse files Browse the repository at this point in the history
Right now, DelayLeftOver is used to reserve time for block finalize, not block
broadcast. And the code does not work as expected.

The general block generation could be described as:
|- fillTransactions -|- finalize a block -|- wait until the period(3s) reached -|- broadcast -|
  • Loading branch information
setunapo committed Nov 15, 2022
1 parent d1ed977 commit 7832b73
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 18 deletions.
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ var (
}
MinerDelayLeftoverFlag = cli.DurationFlag{
Name: "miner.delayleftover",
Usage: "Time interval to for broadcast block",
Usage: "Time reserved to finalize a block",
Value: ethconfig.Defaults.Miner.DelayLeftOver,
}
MinerNoVerfiyFlag = cli.BoolFlag{
Expand Down
2 changes: 1 addition & 1 deletion consensus/beacon/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (beacon *Beacon) Prepare(chain consensus.ChainHeaderReader, header *types.H
return nil
}

func (beacon *Beacon) Delay(_ consensus.ChainReader, _ *types.Header) *time.Duration {
func (beacon *Beacon) Delay(_ consensus.ChainReader, _ *types.Header, _ *time.Duration) *time.Duration {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/clique/clique.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func (c *Clique) Authorize(signer common.Address, signFn SignerFn) {
c.signFn = signFn
}

func (c *Clique) Delay(chain consensus.ChainReader, header *types.Header) *time.Duration {
func (c *Clique) Delay(chain consensus.ChainReader, header *types.Header, leftOver *time.Duration) *time.Duration {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ type Engine interface {
APIs(chain ChainHeaderReader) []rpc.API

// Delay returns the max duration the miner can commit txs
Delay(chain ChainReader, header *types.Header) *time.Duration
// Argument leftOver is the time reserved for block finalize(calculate root, distribute income...)
Delay(chain ChainReader, header *types.Header, leftOver *time.Duration) *time.Duration

// Close terminates any background threads maintained by the consensus engine.
Close() error
Expand Down
2 changes: 1 addition & 1 deletion consensus/ethash/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ func (ethash *Ethash) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea
return types.NewBlock(header, txs, uncles, receipts, trie.NewStackTrie(nil)), receipts, nil
}

func (ethash *Ethash) Delay(_ consensus.ChainReader, _ *types.Header) *time.Duration {
func (ethash *Ethash) Delay(_ consensus.ChainReader, _ *types.Header, _ *time.Duration) *time.Duration {
return nil
}

Expand Down
14 changes: 13 additions & 1 deletion consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,13 +793,25 @@ func (p *Parlia) Authorize(val common.Address, signFn SignerFn, signTxFn SignerT
p.signTxFn = signTxFn
}

func (p *Parlia) Delay(chain consensus.ChainReader, header *types.Header) *time.Duration {
func (p *Parlia) Delay(chain consensus.ChainReader, header *types.Header, leftOver *time.Duration) *time.Duration {
number := header.Number.Uint64()
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil)
if err != nil {
return nil
}
delay := p.delayForRamanujanFork(snap, header)

// leftOver is the time reserved for root calculate and block broadcast.
if *leftOver >= time.Duration(p.config.Period)*time.Second {
// ignore invalid leftOver
log.Error("Delay invalid argument", "leftOver", leftOver.String(), "Period", p.config.Period)
} else if *leftOver >= delay {
delay = time.Duration(0)
return &delay
} else {
delay = delay - *leftOver
}

// The blocking time should be no more than half of period
half := time.Duration(p.config.Period) * time.Second / 2
if delay > half {
Expand Down
2 changes: 1 addition & 1 deletion miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Config struct {
Notify []string `toml:",omitempty"` // HTTP URL list to be notified of new work packages (only useful in ethash).
NotifyFull bool `toml:",omitempty"` // Notify with pending block headers instead of work packages
ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner
DelayLeftOver time.Duration // Time for broadcast block
DelayLeftOver time.Duration // Time reserved to finalize a block(calculate root, distribute income...)
GasFloor uint64 // Target gas floor for mined blocks.
GasCeil uint64 // Target gas ceiling for mined blocks.
GasPrice *big.Int // Minimum gas price for mining a transaction
Expand Down
25 changes: 14 additions & 11 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func (w *worker) mainLoop() {
}
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
tcount := w.current.tcount
w.commitTransactions(w.current, txset, nil)
w.commitTransactions(w.current, txset, nil, nil)
commitTxsTimer.UpdateSince(start)

// Only update the snapshot if any new transactions were added
Expand Down Expand Up @@ -797,7 +797,8 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece
return receipt.Logs, nil
}

func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interruptCh chan int32) bool {
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce,
interruptCh chan int32, stopTimer *time.Timer) bool {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -809,13 +810,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
}

var coalescedLogs []*types.Log
var stopTimer *time.Timer
delay := w.engine.Delay(w.chain, env.header)
if delay != nil {
stopTimer = time.NewTimer(*delay - w.config.DelayLeftOver)
log.Debug("Time left for mining work", "left", (*delay - w.config.DelayLeftOver).String(), "leftover", w.config.DelayLeftOver)
defer stopTimer.Stop()
}
// initilise bloom processors
processorCapacity := 100
if txs.CurrentSize() < processorCapacity {
Expand Down Expand Up @@ -1048,15 +1042,24 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment) {
localTxs[account] = txs
}
}

var stopTimer *time.Timer
delay := w.engine.Delay(w.chain, env.header, &w.config.DelayLeftOver)
if delay != nil {
stopTimer = time.NewTimer(*delay)
log.Debug("Time left for mining work", "delay", delay.String())
defer stopTimer.Stop()
}

if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interruptCh) {
if w.commitTransactions(env, txs, interruptCh, stopTimer) {
return
}
}
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interruptCh) {
if w.commitTransactions(env, txs, interruptCh, stopTimer) {
return
}
}
Expand Down

0 comments on commit 7832b73

Please sign in to comment.