From 2a984a83855c8081c5877fbf77bb575372f44852 Mon Sep 17 00:00:00 2001 From: Shivam Sharma Date: Mon, 27 Mar 2023 18:24:19 +0530 Subject: [PATCH 1/3] add : TestCommitInterruptExperimentBor --- core/tests/blockchain_repair_test.go | 2 +- miner/test_backend.go | 602 ++++++++++++++++++++++++++- miner/worker_test.go | 113 +++-- 3 files changed, 682 insertions(+), 35 deletions(-) diff --git a/core/tests/blockchain_repair_test.go b/core/tests/blockchain_repair_test.go index 9b166b7165..e27b376931 100644 --- a/core/tests/blockchain_repair_test.go +++ b/core/tests/blockchain_repair_test.go @@ -1815,7 +1815,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) { chainConfig.LondonBlock = big.NewInt(0) - _, back, closeFn := miner.NewTestWorker(t, chainConfig, engine, db, 0) + _, back, closeFn := miner.NewTestWorker(t, chainConfig, engine, db, 0, 0, 0) defer closeFn() genesis := back.BlockChain().Genesis() diff --git a/miner/test_backend.go b/miner/test_backend.go index 5eb8d932d1..351d4650a8 100644 --- a/miner/test_backend.go +++ b/miner/test_backend.go @@ -1,12 +1,19 @@ package miner import ( + "context" "crypto/rand" "errors" + "fmt" "math/big" + "os" + "sync/atomic" + "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" + cmath "github.com/ethereum/go-ethereum/common/math" + "github.com/ethereum/go-ethereum/common/tracing" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/bor" "github.com/ethereum/go-ethereum/consensus/clique" @@ -18,7 +25,11 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) const ( @@ -169,17 +180,600 @@ func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction { return tx } -func NewTestWorker(t TensingObject, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*worker, *testWorkerBackend, func()) { +func NewTestWorker(t TensingObject, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int, noempty uint32, delay uint) (*worker, *testWorkerBackend, func()) { backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks) backend.txPool.AddLocals(pendingTxs) - //nolint:staticcheck - w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false) + var w *worker + + if delay != 0 { + w = newWorkerWithDelay(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false, delay) + } else { + //nolint:staticcheck + w = newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false) + } w.setEtherbase(TestBankAddress) // enable empty blocks - w.noempty = 0 + w.noempty = noempty return w, backend, w.close } + +//nolint:staticcheck +func newWorkerWithDelay(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool, delay uint) *worker { + worker := &worker{ + config: config, + chainConfig: chainConfig, + engine: engine, + eth: eth, + mux: mux, + chain: eth.BlockChain(), + isLocalBlock: isLocalBlock, + localUncles: make(map[common.Hash]*types.Block), + remoteUncles: make(map[common.Hash]*types.Block), + unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth), + pendingTasks: make(map[common.Hash]*task), + txsCh: make(chan core.NewTxsEvent, txChanSize), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), + newWorkCh: make(chan *newWorkReq), + getWorkCh: make(chan *getWorkReq), + taskCh: make(chan *task), + resultCh: make(chan *types.Block, resultQueueSize), + exitCh: make(chan struct{}), + startCh: make(chan struct{}, 1), + resubmitIntervalCh: make(chan time.Duration), + resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), + noempty: 1, + } + worker.profileCount = new(int32) + // Subscribe NewTxsEvent for tx pool + worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) + // Subscribe events for blockchain + worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) + worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) + + // Sanitize recommit interval if the user-specified one is too short. + recommit := worker.config.Recommit + if recommit < minRecommitInterval { + log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval) + recommit = minRecommitInterval + } + + ctx := tracing.WithTracer(context.Background(), otel.GetTracerProvider().Tracer("MinerWorker")) + + worker.wg.Add(4) + + go worker.mainLoopWithDelay(ctx, delay) + go worker.newWorkLoop(ctx, recommit) + go worker.resultLoop() + go worker.taskLoop() + + // Submit first work to initialize pending state. + if init { + worker.startCh <- struct{}{} + } + return worker +} + +func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) { + defer w.wg.Done() + defer w.txsSub.Unsubscribe() + defer w.chainHeadSub.Unsubscribe() + defer w.chainSideSub.Unsubscribe() + defer func() { + if w.current != nil { + w.current.discard() + } + }() + + cleanTicker := time.NewTicker(time.Second * 10) + defer cleanTicker.Stop() + + for { + select { + case req := <-w.newWorkCh: + //nolint:contextcheck + + w.commitWorkWithDelay(req.ctx, req.interrupt, req.noempty, req.timestamp, delay) + + case req := <-w.getWorkCh: + //nolint:contextcheck + block, err := w.generateWork(req.ctx, req.params) + if err != nil { + req.err = err + req.result <- nil + } else { + req.result <- block + } + + case ev := <-w.chainSideCh: + // Short circuit for duplicate side blocks + if _, exist := w.localUncles[ev.Block.Hash()]; exist { + continue + } + if _, exist := w.remoteUncles[ev.Block.Hash()]; exist { + continue + } + // Add side block to possible uncle block set depending on the author. + if w.isLocalBlock != nil && w.isLocalBlock(ev.Block.Header()) { + w.localUncles[ev.Block.Hash()] = ev.Block + } else { + w.remoteUncles[ev.Block.Hash()] = ev.Block + } + // If our sealing block contains less than 2 uncle blocks, + // add the new uncle block if valid and regenerate a new + // sealing block for higher profit. + if w.isRunning() && w.current != nil && len(w.current.uncles) < 2 { + start := time.Now() + if err := w.commitUncle(w.current, ev.Block.Header()); err == nil { + commitErr := w.commit(ctx, w.current.copy(), nil, true, start) + if commitErr != nil { + log.Error("error while committing work for mining", "err", commitErr) + } + } + } + + case <-cleanTicker.C: + chainHead := w.chain.CurrentBlock() + for hash, uncle := range w.localUncles { + if uncle.NumberU64()+staleThreshold <= chainHead.NumberU64() { + delete(w.localUncles, hash) + } + } + for hash, uncle := range w.remoteUncles { + if uncle.NumberU64()+staleThreshold <= chainHead.NumberU64() { + delete(w.remoteUncles, hash) + } + } + + case ev := <-w.txsCh: + // Apply transactions to the pending state if we're not sealing + // + // Note all transactions received may not be continuous with transactions + // already included in the current sealing block. These transactions will + // be automatically eliminated. + if !w.isRunning() && w.current != nil { + // If block is already full, abort + if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas { + continue + } + + txs := make(map[common.Address]types.Transactions) + + for _, tx := range ev.Txs { + acc, _ := types.Sender(w.current.signer, tx) + txs[acc] = append(txs[acc], tx) + } + + txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, cmath.FromBig(w.current.header.BaseFee)) + tcount := w.current.tcount + + interruptCh, stopFn := getInterruptTimer(ctx, w.current, w.chain.CurrentBlock()) + w.commitTransactionsWithDelay(w.current, txset, nil, interruptCh, delay) + + // Only update the snapshot if any new transactions were added + // to the pending block + if tcount != w.current.tcount { + w.updateSnapshot(w.current) + } + + stopFn() + } else { + // Special case, if the consensus engine is 0 period clique(dev mode), + // submit sealing work here since all empty submission will be rejected + // by clique. Of course the advance sealing(empty submission) is disabled. + if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 { + w.commitWork(ctx, nil, true, time.Now().Unix()) + } + } + atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) + + // System stopped + case <-w.exitCh: + return + case <-w.txsSub.Err(): + return + case <-w.chainHeadSub.Err(): + return + case <-w.chainSideSub.Err(): + return + } + } +} + +func (w *worker) commitTransactionsWithDelay(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCh chan struct{}, delay uint) bool { + gasLimit := env.header.GasLimit + if env.gasPool == nil { + env.gasPool = new(core.GasPool).AddGas(gasLimit) + } + var coalescedLogs []*types.Log + + initialGasLimit := env.gasPool.Gas() + initialTxs := txs.GetTxs() + + var breakCause string + + defer func() { + log.OnDebug(func(lg log.Logging) { + lg("commitTransactions-stats", + "initialTxsCount", initialTxs, + "initialGasLimit", initialGasLimit, + "resultTxsCount", txs.GetTxs(), + "resultGapPool", env.gasPool.Gas(), + "exitCause", breakCause) + }) + }() + +mainloop: + for { + // case of interrupting by timeout + select { + case <-interruptCh: + commitInterruptCounter.Inc(1) + break mainloop + default: + } + // In the following three cases, we will interrupt the execution of the transaction. + // (1) new head block event arrival, the interrupt signal is 1 + // (2) worker start or restart, the interrupt signal is 1 + // (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2. + // For the first two cases, the semi-finished work will be discarded. + // For the third case, the semi-finished work will be submitted to the consensus engine. + if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { + // Notify resubmit loop to increase resubmitting interval due to too frequent commits. + if atomic.LoadInt32(interrupt) == commitInterruptResubmit { + ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit) + if ratio < 0.1 { + ratio = 0.1 + } + w.resubmitAdjustCh <- &intervalAdjust{ + ratio: ratio, + inc: true, + } + } + + breakCause = "interrupt" + return atomic.LoadInt32(interrupt) == commitInterruptNewHead + } + // If we don't have enough gas for any further transactions then we're done + if env.gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) + + breakCause = "Not enough gas for further transactions" + break + } + // Retrieve the next transaction and abort if all done + tx := txs.Peek() + if tx == nil { + breakCause = "all transactions has been included" + break + } + // Error may be ignored here. The error has already been checked + // during transaction acceptance is the transaction pool. + // + // We use the eip155 signer regardless of the current hf. + from, _ := types.Sender(env.signer, tx) + // Check whether the tx is replay protected. If we're not in the EIP155 hf + // phase, start ignoring the sender until we do. + if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { + log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) + + txs.Pop() + continue + } + // Start executing the transaction + env.state.Prepare(tx.Hash(), env.tcount) + + var start time.Time + + log.OnDebug(func(log.Logging) { + start = time.Now() + }) + + logs, err := w.commitTransaction(env, tx) + time.Sleep(time.Duration(delay) * time.Millisecond) + + switch { + case errors.Is(err, core.ErrGasLimitReached): + // Pop the current out-of-gas transaction without shifting in the next from the account + log.Trace("Gas limit exceeded for current block", "sender", from) + txs.Pop() + + case errors.Is(err, core.ErrNonceTooLow): + // New head notification data race between the transaction pool and miner, shift + log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) + txs.Shift() + + case errors.Is(err, core.ErrNonceTooHigh): + // Reorg notification data race between the transaction pool and miner, skip account = + log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) + txs.Pop() + + case errors.Is(err, nil): + // Everything ok, collect the logs and shift in the next transaction from the same account + coalescedLogs = append(coalescedLogs, logs...) + env.tcount++ + txs.Shift() + + log.OnDebug(func(lg log.Logging) { + lg("Committed new tx", "tx hash", tx.Hash(), "from", from, "to", tx.To(), "nonce", tx.Nonce(), "gas", tx.Gas(), "gasPrice", tx.GasPrice(), "value", tx.Value(), "time spent", time.Since(start)) + }) + + case errors.Is(err, core.ErrTxTypeNotSupported): + // Pop the unsupported transaction without shifting in the next from the account + log.Trace("Skipping unsupported transaction type", "sender", from, "type", tx.Type()) + txs.Pop() + + default: + // Strange error, discard the transaction and get the next in line (note, the + // nonce-too-high clause will prevent us from executing in vain). + log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) + txs.Shift() + } + } + + if !w.isRunning() && len(coalescedLogs) > 0 { + // We don't push the pendingLogsEvent while we are sealing. The reason is that + // when we are sealing, the worker will regenerate a sealing block every 3 seconds. + // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. + + // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined + // logs by filling in the block hash when the block was mined by the local miner. This can + // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. + cpy := make([]*types.Log, len(coalescedLogs)) + for i, l := range coalescedLogs { + cpy[i] = new(types.Log) + *cpy[i] = *l + } + w.pendingLogsFeed.Send(cpy) + } + // Notify resubmit loop to decrease resubmitting interval if current interval is larger + // than the user-specified one. + if interrupt != nil { + w.resubmitAdjustCh <- &intervalAdjust{inc: false} + } + return false +} + +func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noempty bool, timestamp int64, delay uint) { + + start := time.Now() + + var ( + work *environment + err error + ) + + tracing.Exec(ctx, "", "worker.prepareWork", func(ctx context.Context, span trace.Span) { + // Set the coinbase if the worker is running or it's required + var coinbase common.Address + if w.isRunning() { + if w.coinbase == (common.Address{}) { + log.Error("Refusing to mine without etherbase") + return + } + + coinbase = w.coinbase // Use the preset address as the fee recipient + } + + work, err = w.prepareWork(&generateParams{ + timestamp: uint64(timestamp), + coinbase: coinbase, + }) + }) + + if err != nil { + return + } + + var interruptCh chan struct{} + + stopFn := func() {} + defer func() { + stopFn() + }() + + if !noempty { + interruptCh, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock()) + } + + ctx, span := tracing.StartSpan(ctx, "commitWork") + defer tracing.EndSpan(span) + + tracing.SetAttributes( + span, + attribute.Int("number", int(work.header.Number.Uint64())), + ) + + // Create an empty block based on temporary copied state for + // sealing in advance without waiting block execution finished. + if !noempty && atomic.LoadUint32(&w.noempty) == 0 { + err = w.commit(ctx, work.copy(), nil, false, start) + if err != nil { + return + } + } + + // Fill pending transactions from the txpool + w.fillTransactionsWithDelay(ctx, interrupt, work, interruptCh, delay) + + err = w.commit(ctx, work.copy(), w.fullTaskHook, true, start) + if err != nil { + return + } + + // Swap out the old work with the new one, terminating any leftover + // prefetcher processes in the mean time and starting a new one. + if w.current != nil { + w.current.discard() + } + + w.current = work +} + +func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32, env *environment, interruptCh chan struct{}, delay uint) { + ctx, span := tracing.StartSpan(ctx, "fillTransactions") + defer tracing.EndSpan(span) + + // Split the pending transactions into locals and remotes + // Fill the block with all available pending transactions. + + var ( + localTxsCount int + remoteTxsCount int + localTxs = make(map[common.Address]types.Transactions) + remoteTxs map[common.Address]types.Transactions + ) + + // TODO: move to config or RPC + const profiling = false + + if profiling { + doneCh := make(chan struct{}) + + defer func() { + close(doneCh) + }() + + go func(number uint64) { + closeFn := func() error { + return nil + } + + for { + select { + case <-time.After(150 * time.Millisecond): + // Check if we've not crossed limit + if attempt := atomic.AddInt32(w.profileCount, 1); attempt >= 10 { + log.Info("Completed profiling", "attempt", attempt) + + return + } + + log.Info("Starting profiling in fill transactions", "number", number) + + dir, err := os.MkdirTemp("", fmt.Sprintf("bor-traces-%s-", time.Now().UTC().Format("2006-01-02-150405Z"))) + if err != nil { + log.Error("Error in profiling", "path", dir, "number", number, "err", err) + return + } + + // grab the cpu profile + closeFnInternal, err := startProfiler("cpu", dir, number) + if err != nil { + log.Error("Error in profiling", "path", dir, "number", number, "err", err) + return + } + + closeFn = func() error { + err := closeFnInternal() + + log.Info("Completed profiling", "path", dir, "number", number, "error", err) + + return nil + } + + case <-doneCh: + err := closeFn() + + if err != nil { + log.Info("closing fillTransactions", "number", number, "error", err) + } + + return + } + } + }(env.header.Number.Uint64()) + } + + tracing.Exec(ctx, "", "worker.SplittingTransactions", func(ctx context.Context, span trace.Span) { + + prePendingTime := time.Now() + + pending := w.eth.TxPool().Pending(ctx, true) + remoteTxs = pending + + postPendingTime := time.Now() + + for _, account := range w.eth.TxPool().Locals() { + if txs := remoteTxs[account]; len(txs) > 0 { + delete(remoteTxs, account) + localTxs[account] = txs + } + } + + postLocalsTime := time.Now() + + localTxsCount = len(localTxs) + remoteTxsCount = len(remoteTxs) + + tracing.SetAttributes( + span, + attribute.Int("len of local txs", localTxsCount), + attribute.Int("len of remote txs", remoteTxsCount), + attribute.String("time taken by Pending()", fmt.Sprintf("%v", postPendingTime.Sub(prePendingTime))), + attribute.String("time taken by Locals()", fmt.Sprintf("%v", postLocalsTime.Sub(postPendingTime))), + ) + }) + + var ( + localEnvTCount int + remoteEnvTCount int + committed bool + ) + + if localTxsCount > 0 { + var txs *types.TransactionsByPriceAndNonce + + tracing.Exec(ctx, "", "worker.LocalTransactionsByPriceAndNonce", func(ctx context.Context, span trace.Span) { + txs = types.NewTransactionsByPriceAndNonce(env.signer, localTxs, cmath.FromBig(env.header.BaseFee)) + + tracing.SetAttributes( + span, + attribute.Int("len of tx local Heads", txs.GetTxs()), + ) + }) + + tracing.Exec(ctx, "", "worker.LocalCommitTransactions", func(ctx context.Context, span trace.Span) { + committed = w.commitTransactionsWithDelay(env, txs, interrupt, interruptCh, delay) + }) + + if committed { + return + } + + localEnvTCount = env.tcount + } + + if remoteTxsCount > 0 { + var txs *types.TransactionsByPriceAndNonce + + tracing.Exec(ctx, "", "worker.RemoteTransactionsByPriceAndNonce", func(ctx context.Context, span trace.Span) { + txs = types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, cmath.FromBig(env.header.BaseFee)) + + tracing.SetAttributes( + span, + attribute.Int("len of tx remote Heads", txs.GetTxs()), + ) + }) + + tracing.Exec(ctx, "", "worker.RemoteCommitTransactions", func(ctx context.Context, span trace.Span) { + committed = w.commitTransactionsWithDelay(env, txs, interrupt, interruptCh, delay) + }) + + if committed { + return + } + + remoteEnvTCount = env.tcount + } + + tracing.SetAttributes( + span, + attribute.Int("len of final local txs ", localEnvTCount), + attribute.Int("len of final remote txs", remoteEnvTCount), + ) +} diff --git a/miner/worker_test.go b/miner/worker_test.go index 3a1dd5f8b9..31a7cc7350 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -18,11 +18,13 @@ package miner import ( "math/big" + "os" "sync/atomic" "testing" "time" "github.com/golang/mock/gomock" + "gotest.tools/assert" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" @@ -35,6 +37,7 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/tests/bor/mocks" ) @@ -63,35 +66,14 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool, isBor bool) { engine consensus.Engine chainConfig *params.ChainConfig db = rawdb.NewMemoryDatabase() + ctrl *gomock.Controller ) if isBor { chainConfig = params.BorUnittestChainConfig - ctrl := gomock.NewController(t) + engine, ctrl = getFakeBorFromConfig(t, chainConfig) defer ctrl.Finish() - - ethAPIMock := api.NewMockCaller(ctrl) - ethAPIMock.EXPECT().Call(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - - spanner := bor.NewMockSpanner(ctrl) - spanner.EXPECT().GetCurrentValidators(gomock.Any(), gomock.Any(), gomock.Any()).Return([]*valset.Validator{ - { - ID: 0, - Address: TestBankAddress, - VotingPower: 100, - ProposerPriority: 0, - }, - }, nil).AnyTimes() - - heimdallClientMock := mocks.NewMockIHeimdallClient(ctrl) - heimdallClientMock.EXPECT().Close().Times(1) - - contractMock := bor.NewMockGenesisContract(ctrl) - - db, _, _ = NewDBForFakes(t) - - engine = NewFakeBor(t, db, chainConfig, ethAPIMock, spanner, heimdallClientMock, contractMock) } else { if isClique { chainConfig = params.AllCliqueProtocolChanges @@ -107,7 +89,7 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool, isBor bool) { chainConfig.LondonBlock = big.NewInt(0) - w, b, _ := NewTestWorker(t, chainConfig, engine, db, 0) + w, b, _ := NewTestWorker(t, chainConfig, engine, db, 0, 0, 0) defer w.close() // This test chain imports the mined blocks. @@ -171,6 +153,34 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool, isBor bool) { } } +func getFakeBorFromConfig(t *testing.T, chainConfig *params.ChainConfig) (consensus.Engine, *gomock.Controller) { + ctrl := gomock.NewController(t) + + ethAPIMock := api.NewMockCaller(ctrl) + ethAPIMock.EXPECT().Call(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + spanner := bor.NewMockSpanner(ctrl) + spanner.EXPECT().GetCurrentValidators(gomock.Any(), gomock.Any(), gomock.Any()).Return([]*valset.Validator{ + { + ID: 0, + Address: TestBankAddress, + VotingPower: 100, + ProposerPriority: 0, + }, + }, nil).AnyTimes() + + heimdallClientMock := mocks.NewMockIHeimdallClient(ctrl) + heimdallClientMock.EXPECT().Close().Times(1) + + contractMock := bor.NewMockGenesisContract(ctrl) + + db, _, _ := NewDBForFakes(t) + + engine := NewFakeBor(t, db, chainConfig, ethAPIMock, spanner, heimdallClientMock, contractMock) + + return engine, ctrl +} + func TestEmptyWorkEthash(t *testing.T) { t.Skip() testEmptyWork(t, ethashChainConfig, ethash.NewFaker()) @@ -183,7 +193,7 @@ func TestEmptyWorkClique(t *testing.T) { func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { defer engine.Close() - w, _, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) + w, _, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, 0, 0) defer w.close() var ( @@ -237,7 +247,7 @@ func TestStreamUncleBlock(t *testing.T) { ethash := ethash.NewFaker() defer ethash.Close() - w, b, _ := NewTestWorker(t, ethashChainConfig, ethash, rawdb.NewMemoryDatabase(), 1) + w, b, _ := NewTestWorker(t, ethashChainConfig, ethash, rawdb.NewMemoryDatabase(), 1, 0, 0) defer w.close() var taskCh = make(chan struct{}) @@ -299,7 +309,7 @@ func TestRegenerateMiningBlockClique(t *testing.T) { func testRegenerateMiningBlock(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { defer engine.Close() - w, b, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) + w, b, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, 0, 0) defer w.close() var taskCh = make(chan struct{}, 3) @@ -370,7 +380,7 @@ func TestAdjustIntervalClique(t *testing.T) { func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { defer engine.Close() - w, _, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) + w, _, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, 0, 0) defer w.close() w.skipSealHook = func(task *task) bool { @@ -478,7 +488,7 @@ func TestGetSealingWorkPostMerge(t *testing.T) { func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, postMerge bool) { defer engine.Close() - w, b, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) + w, b, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, 0, 0) defer w.close() w.setExtra([]byte{0x01, 0x02}) @@ -614,6 +624,49 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co } } +func TestCommitInterruptExperimentBor(t *testing.T) { + // with 1 sec block time and 200 millisec tx delay we should get 5 txs per block + testCommitInterruptExperimentBor(t, 200, 5) + + // with 1 sec block time and 100 millisec tx delay we should get 10 txs per block + testCommitInterruptExperimentBor(t, 100, 10) +} + +func testCommitInterruptExperimentBor(t *testing.T, delay uint, txCount int) { + var ( + engine consensus.Engine + chainConfig *params.ChainConfig + db = rawdb.NewMemoryDatabase() + ctrl *gomock.Controller + ) + + chainConfig = params.BorUnittestChainConfig + log.Root().SetHandler(log.LvlFilterHandler(4, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + engine, ctrl = getFakeBorFromConfig(t, chainConfig) + defer func() { + engine.Close() + ctrl.Finish() + }() + + w, b, _ := NewTestWorker(t, chainConfig, engine, db, 0, 1, delay) + defer w.close() + + go func() { + for { + tx := b.newRandomTx(false) + b.TxPool().AddRemote(tx) + time.Sleep(6 * time.Millisecond) + } + }() + + // Start mining! + w.start() + time.Sleep(5 * time.Second) + w.stop() + + assert.Equal(t, txCount, w.chain.CurrentBlock().Transactions().Len()) +} + func BenchmarkBorMining(b *testing.B) { chainConfig := params.BorUnittestChainConfig @@ -645,7 +698,7 @@ func BenchmarkBorMining(b *testing.B) { chainConfig.LondonBlock = big.NewInt(0) - w, back, _ := NewTestWorker(b, chainConfig, engine, db, 0) + w, back, _ := NewTestWorker(b, chainConfig, engine, db, 0, 0, 0) defer w.close() // This test chain imports the mined blocks. From 2f73ec7866a835c916e7a282fae8d75271a1ffc5 Mon Sep 17 00:00:00 2001 From: Shivam Sharma Date: Tue, 28 Mar 2023 00:58:38 +0530 Subject: [PATCH 2/3] lint : fix lint --- miner/test_backend.go | 23 ++++++++++++++++++----- miner/worker_test.go | 12 +++++++++++- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/miner/test_backend.go b/miner/test_backend.go index 351d4650a8..3d4934e052 100644 --- a/miner/test_backend.go +++ b/miner/test_backend.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -187,6 +188,7 @@ func NewTestWorker(t TensingObject, chainConfig *params.ChainConfig, engine cons var w *worker if delay != 0 { + //nolint:staticcheck w = newWorkerWithDelay(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false, delay) } else { //nolint:staticcheck @@ -255,9 +257,11 @@ func newWorkerWithDelay(config *Config, chainConfig *params.ChainConfig, engine if init { worker.startCh <- struct{}{} } + return worker } +// nolint:gocognit func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) { defer w.wg.Done() defer w.txsSub.Unsubscribe() @@ -276,7 +280,6 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) { select { case req := <-w.newWorkCh: //nolint:contextcheck - w.commitWorkWithDelay(req.ctx, req.interrupt, req.noempty, req.timestamp, delay) case req := <-w.getWorkCh: @@ -294,15 +297,18 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) { if _, exist := w.localUncles[ev.Block.Hash()]; exist { continue } + if _, exist := w.remoteUncles[ev.Block.Hash()]; exist { continue } + // Add side block to possible uncle block set depending on the author. if w.isLocalBlock != nil && w.isLocalBlock(ev.Block.Header()) { w.localUncles[ev.Block.Hash()] = ev.Block } else { w.remoteUncles[ev.Block.Hash()] = ev.Block } + // If our sealing block contains less than 2 uncle blocks, // add the new uncle block if valid and regenerate a new // sealing block for higher profit. @@ -323,6 +329,7 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) { delete(w.localUncles, hash) } } + for hash, uncle := range w.remoteUncles { if uncle.NumberU64()+staleThreshold <= chainHead.NumberU64() { delete(w.remoteUncles, hash) @@ -369,6 +376,7 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) { w.commitWork(ctx, nil, true, time.Now().Unix()) } } + atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) // System stopped @@ -384,11 +392,13 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) { } } +// nolint:gocognit func (w *worker) commitTransactionsWithDelay(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCh chan struct{}, delay uint) bool { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) } + var coalescedLogs []*types.Log initialGasLimit := env.gasPool.Gas() @@ -427,6 +437,7 @@ mainloop: if atomic.LoadInt32(interrupt) == commitInterruptResubmit { ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit) if ratio < 0.1 { + // nolint:goconst ratio = 0.1 } w.resubmitAdjustCh <- &intervalAdjust{ @@ -434,20 +445,21 @@ mainloop: inc: true, } } - + // nolint:goconst breakCause = "interrupt" return atomic.LoadInt32(interrupt) == commitInterruptNewHead } // If we don't have enough gas for any further transactions then we're done if env.gasPool.Gas() < params.TxGas { log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) - + // nolint:goconst breakCause = "Not enough gas for further transactions" break } // Retrieve the next transaction and abort if all done tx := txs.Peek() if tx == nil { + // nolint:goconst breakCause = "all transactions has been included" break } @@ -519,7 +531,6 @@ mainloop: // We don't push the pendingLogsEvent while we are sealing. The reason is that // when we are sealing, the worker will regenerate a sealing block every 3 seconds. // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. - // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined // logs by filling in the block hash when the block was mined by the local miner. This can // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. @@ -528,6 +539,7 @@ mainloop: cpy[i] = new(types.Log) *cpy[i] = *l } + w.pendingLogsFeed.Send(cpy) } // Notify resubmit loop to decrease resubmitting interval if current interval is larger @@ -535,11 +547,11 @@ mainloop: if interrupt != nil { w.resubmitAdjustCh <- &intervalAdjust{inc: false} } + return false } func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noempty bool, timestamp int64, delay uint) { - start := time.Now() var ( @@ -614,6 +626,7 @@ func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noem w.current = work } +// nolint:gocognit func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32, env *environment, interruptCh chan struct{}, delay uint) { ctx, span := tracing.StartSpan(ctx, "fillTransactions") defer tracing.EndSpan(span) diff --git a/miner/worker_test.go b/miner/worker_test.go index 31a7cc7350..8b7fa9d1e1 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -154,6 +154,8 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool, isBor bool) { } func getFakeBorFromConfig(t *testing.T, chainConfig *params.ChainConfig) (consensus.Engine, *gomock.Controller) { + t.Helper() + ctrl := gomock.NewController(t) ethAPIMock := api.NewMockCaller(ctrl) @@ -625,6 +627,7 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co } func TestCommitInterruptExperimentBor(t *testing.T) { + t.Parallel() // with 1 sec block time and 200 millisec tx delay we should get 5 txs per block testCommitInterruptExperimentBor(t, 200, 5) @@ -633,6 +636,8 @@ func TestCommitInterruptExperimentBor(t *testing.T) { } func testCommitInterruptExperimentBor(t *testing.T, delay uint, txCount int) { + t.Helper() + var ( engine consensus.Engine chainConfig *params.ChainConfig @@ -641,7 +646,9 @@ func testCommitInterruptExperimentBor(t *testing.T, delay uint, txCount int) { ) chainConfig = params.BorUnittestChainConfig + log.Root().SetHandler(log.LvlFilterHandler(4, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + engine, ctrl = getFakeBorFromConfig(t, chainConfig) defer func() { engine.Close() @@ -654,7 +661,10 @@ func testCommitInterruptExperimentBor(t *testing.T, delay uint, txCount int) { go func() { for { tx := b.newRandomTx(false) - b.TxPool().AddRemote(tx) + if err := b.TxPool().AddRemote(tx); err != nil { + t.Error(err) + } + time.Sleep(6 * time.Millisecond) } }() From 107cb4986c7353cb2cdb764b92ce920dc0688ea1 Mon Sep 17 00:00:00 2001 From: Shivam Sharma Date: Tue, 28 Mar 2023 12:25:22 +0530 Subject: [PATCH 3/3] chg : add waitgroup --- miner/worker_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/miner/worker_test.go b/miner/worker_test.go index 8b7fa9d1e1..c4eab34f13 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -19,6 +19,7 @@ package miner import ( "math/big" "os" + "sync" "sync/atomic" "testing" "time" @@ -658,17 +659,24 @@ func testCommitInterruptExperimentBor(t *testing.T, delay uint, txCount int) { w, b, _ := NewTestWorker(t, chainConfig, engine, db, 0, 1, delay) defer w.close() + wg := new(sync.WaitGroup) + wg.Add(1) + go func() { + wg.Done() + for { tx := b.newRandomTx(false) if err := b.TxPool().AddRemote(tx); err != nil { - t.Error(err) + t.Log(err) } - time.Sleep(6 * time.Millisecond) + time.Sleep(20 * time.Millisecond) } }() + wg.Wait() + // Start mining! w.start() time.Sleep(5 * time.Second)