Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parlia: Some updates of the miner worker #1182

Merged
merged 2 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ var (
}
MinerDelayLeftoverFlag = cli.DurationFlag{
Name: "miner.delayleftover",
Usage: "Time interval to for broadcast block",
Usage: "Time reserved to finalize a block",
Value: ethconfig.Defaults.Miner.DelayLeftOver,
}
MinerNoVerfiyFlag = cli.BoolFlag{
Expand Down
2 changes: 1 addition & 1 deletion consensus/beacon/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (beacon *Beacon) Prepare(chain consensus.ChainHeaderReader, header *types.H
return nil
}

func (beacon *Beacon) Delay(_ consensus.ChainReader, _ *types.Header) *time.Duration {
func (beacon *Beacon) Delay(_ consensus.ChainReader, _ *types.Header, _ *time.Duration) *time.Duration {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/clique/clique.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func (c *Clique) Authorize(signer common.Address, signFn SignerFn) {
c.signFn = signFn
}

func (c *Clique) Delay(chain consensus.ChainReader, header *types.Header) *time.Duration {
func (c *Clique) Delay(chain consensus.ChainReader, header *types.Header, leftOver *time.Duration) *time.Duration {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type Engine interface {
APIs(chain ChainHeaderReader) []rpc.API

// Delay returns the max duration the miner can commit txs
Delay(chain ChainReader, header *types.Header) *time.Duration
Delay(chain ChainReader, header *types.Header, leftOver *time.Duration) *time.Duration

// Close terminates any background threads maintained by the consensus engine.
Close() error
Expand Down
2 changes: 1 addition & 1 deletion consensus/ethash/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ func (ethash *Ethash) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea
return types.NewBlock(header, txs, uncles, receipts, trie.NewStackTrie(nil)), receipts, nil
}

func (ethash *Ethash) Delay(_ consensus.ChainReader, _ *types.Header) *time.Duration {
func (ethash *Ethash) Delay(_ consensus.ChainReader, _ *types.Header, _ *time.Duration) *time.Duration {
return nil
}

Expand Down
14 changes: 13 additions & 1 deletion consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,13 +793,25 @@ func (p *Parlia) Authorize(val common.Address, signFn SignerFn, signTxFn SignerT
p.signTxFn = signTxFn
}

func (p *Parlia) Delay(chain consensus.ChainReader, header *types.Header) *time.Duration {
// Argument leftOver is the time reserved for block finalize(calculate root, distribute income...)
func (p *Parlia) Delay(chain consensus.ChainReader, header *types.Header, leftOver *time.Duration) *time.Duration {
number := header.Number.Uint64()
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil)
if err != nil {
return nil
}
delay := p.delayForRamanujanFork(snap, header)

if *leftOver >= time.Duration(p.config.Period)*time.Second {
// ignore invalid leftOver
log.Error("Delay invalid argument", "leftOver", leftOver.String(), "Period", p.config.Period)
} else if *leftOver >= delay {
delay = time.Duration(0)
return &delay
} else {
delay = delay - *leftOver
}

// The blocking time should be no more than half of period
half := time.Duration(p.config.Period) * time.Second / 2
if delay > half {
Expand Down
2 changes: 1 addition & 1 deletion miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Config struct {
Notify []string `toml:",omitempty"` // HTTP URL list to be notified of new work packages (only useful in ethash).
NotifyFull bool `toml:",omitempty"` // Notify with pending block headers instead of work packages
ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner
DelayLeftOver time.Duration // Time for broadcast block
DelayLeftOver time.Duration // Time reserved to finalize a block(calculate root, distribute income...)
GasFloor uint64 // Target gas floor for mined blocks.
GasCeil uint64 // Target gas ceiling for mined blocks.
GasPrice *big.Int // Minimum gas price for mining a transaction
Expand Down
53 changes: 15 additions & 38 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ const (
)

var (
commitTxsTimer = metrics.NewRegisteredTimer("worker/committxs", nil)
writeBlockTimer = metrics.NewRegisteredTimer("worker/writeblock", nil)
finalizeBlockTimer = metrics.NewRegisteredTimer("worker/finalizeblock", nil)
)
Expand Down Expand Up @@ -542,33 +541,7 @@ func (w *worker) mainLoop() {
}

case ev := <-w.txsCh:
// Apply transactions to the pending state if we're not sealing
//
// Note all transactions received may not be continuous with transactions
// already included in the current sealing block. These transactions will
// be automatically eliminated.
if !w.isRunning() && w.current != nil {
start := time.Now()
// If block is already full, abort
if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas {
continue
}
txs := make(map[common.Address]types.Transactions)
for _, tx := range ev.Txs {
acc, _ := types.Sender(w.current.signer, tx)
txs[acc] = append(txs[acc], tx)
}
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
tcount := w.current.tcount
w.commitTransactions(w.current, txset, nil)
commitTxsTimer.UpdateSince(start)

// Only update the snapshot if any new transactions were added
// to the pending block
if tcount != w.current.tcount {
w.updateSnapshot(w.current)
}
} else {
if w.isRunning() {
// Special case, if the consensus engine is 0 period clique(dev mode),
// submit sealing work here since all empty submission will be rejected
// by clique. Of course the advance sealing(empty submission) is disabled.
Expand All @@ -577,6 +550,7 @@ func (w *worker) mainLoop() {
w.commitWork(nil, true, time.Now().Unix())
}
}

atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))

// System stopped
Expand Down Expand Up @@ -797,7 +771,8 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece
return receipt.Logs, nil
}

func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interruptCh chan int32) bool {
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce,
interruptCh chan int32, stopTimer *time.Timer) bool {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -809,13 +784,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
}

var coalescedLogs []*types.Log
var stopTimer *time.Timer
delay := w.engine.Delay(w.chain, env.header)
if delay != nil {
stopTimer = time.NewTimer(*delay - w.config.DelayLeftOver)
log.Debug("Time left for mining work", "left", (*delay - w.config.DelayLeftOver).String(), "leftover", w.config.DelayLeftOver)
defer stopTimer.Stop()
}
// initilise bloom processors
processorCapacity := 100
if txs.CurrentSize() < processorCapacity {
Expand Down Expand Up @@ -1048,15 +1016,24 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment) {
localTxs[account] = txs
}
}

var stopTimer *time.Timer
delay := w.engine.Delay(w.chain, env.header, &w.config.DelayLeftOver)
if delay != nil {
stopTimer = time.NewTimer(*delay)
log.Debug("Time left for mining work", "delay", delay.String())
defer stopTimer.Stop()
}

if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interruptCh) {
if w.commitTransactions(env, txs, interruptCh, stopTimer) {
Copy link
Collaborator

@unclezoro unclezoro Nov 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

	x := time.NewTimer(time.Second)
	<-x.C
	fmt.Println("read delay 1")
	<-x.C
	fmt.Println("read delay 2")

It turns out that fmt.Println("read delay 2") is not reachable.

So if the timer trigger during the first commitTransactions for local tx, then commitTransactions will have no time delay limitation .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it do have the problem, will fix it later.

return
}
}
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interruptCh) {
if w.commitTransactions(env, txs, interruptCh, stopTimer) {
return
}
}
Expand Down