diff --git a/core/error.go b/core/error.go index 1fbd0d599b..132704ec10 100644 --- a/core/error.go +++ b/core/error.go @@ -75,3 +75,21 @@ var ( // current network configuration. ErrTxTypeNotSupported = types.ErrTxTypeNotSupported ) + +type CustomError struct { + msg string + code int +} + +func NewCustomError(msg string, code int) *CustomError { + return &CustomError{ + msg: msg, + code: code, + } +} + +func (e *CustomError) ErrorCode() int { return e.code } + +func (e *CustomError) Error() string { + return e.msg +} diff --git a/core/tx_pool.go b/core/tx_pool.go index a97ccf22e0..d2d5e4562c 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -18,6 +18,7 @@ package core import ( "errors" + "fmt" "math" "math/big" "sort" @@ -53,6 +54,9 @@ const ( // more expensive to propagate; larger transactions also take more resources // to validate whether they fit into the pool or not. txMaxSize = 4 * txSlotSize // 128KB + + // Bundle Error Code + GasPriceTooLowErrorCode = -38011 ) var ( @@ -88,8 +92,10 @@ var ( // making the transaction invalid, rather a DOS protection. ErrOversizedData = errors.New("oversized data") - // ErrorBundlePoolIsFull is returned if the number of bundle exceed the limit - ErrorBundlePoolIsFull = errors.New("bundle pool is full") + BundleAlreadyExistError = NewCustomError("bundle already exist", -38001) + BundlePoolFullError = NewCustomError("bundle pool is full", -38002) + SimulatorMissingError = NewCustomError("bundle simulator is missing", -38003) + DifferentSendersError = NewCustomError("only one tx sender is allowed within one bundle", -38010) ) var ( @@ -574,10 +580,29 @@ func (pool *TxPool) PruneBundle(bundle common.Hash) { delete(pool.mevBundles, bundle) } +// For testing +func (pool *TxPool) SetBundle(bundleHash common.Hash, bundle *types.MevBundle) { + pool.mu.Lock() + defer pool.mu.Unlock() + pool.mevBundles[bundleHash] = bundle +} + // AddMevBundle adds a mev bundle to the pool func (pool *TxPool) AddMevBundle(txs types.Transactions, maxBlockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) (common.Hash, error) { + senders := make(map[common.Address]bool) + for _, tx := range txs { + txSender, _ := types.Sender(pool.signer, tx) + senders[txSender] = true + if tx.GasPrice() == nil || tx.GasPrice().Cmp(big.NewInt(int64(pool.config.PriceLimit))) < 0 { + return common.Hash{}, NewCustomError(fmt.Sprintf("tx gas price too low, expected %d at least", pool.config.PriceLimit), GasPriceTooLowErrorCode) + } + if len(senders) > 1 { + return common.Hash{}, DifferentSendersError + } + } + if pool.simulator == nil { - return common.Hash{}, errors.New("bundle simulator is nil") + return common.Hash{}, SimulatorMissingError } bundle := types.MevBundle{ Txs: txs, @@ -600,7 +625,7 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, maxBlockNumber *big.Int pool.mu.Lock() defer pool.mu.Unlock() if _, ok := pool.mevBundles[hash]; ok { - return common.Hash{}, errors.New("bundle already exist") + return common.Hash{}, BundleAlreadyExistError } if len(pool.mevBundles) > int(pool.config.BundleSlot) { leastPrice := big.NewInt(math.MaxInt64) @@ -612,7 +637,7 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, maxBlockNumber *big.Int } } if bundle.Price.Cmp(leastPrice) < 0 { - return common.Hash{}, ErrorBundlePoolIsFull + return common.Hash{}, BundlePoolFullError } delete(pool.mevBundles, leastBundleHash) } diff --git a/eth/api_backend.go b/eth/api_backend.go index 2d7e9b8597..baea60b49e 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -249,7 +249,7 @@ func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions, func (b *EthAPIBackend) BundlePrice() (*big.Int, error) { bundles := b.eth.txPool.AllMevBundles() - if len(bundles) == 0 { + if len(bundles) < b.eth.config.Miner.MaxSimulatBundles/2 { return big.NewInt(b.eth.config.Miner.MevGasPriceFloor), nil } sort.SliceStable(bundles, func(i, j int) bool { diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index a35893e278..fd5174580f 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "math/big" - "sort" "strings" "time" @@ -55,10 +54,12 @@ import ( const ( UnHealthyTimeout = 5 * time.Second - MaxBundleBlockDelay = 1200 - MaxBundleTimeDelay = 60 * 60 // second + MaxBundleBlockDelay = 100 + MaxBundleTimeDelay = 5 * 60 // second MaxOracleBlocks = 21 DropBlocks = 3 + + InvalidBundleParamError = -38000 ) // PublicEthereumAPI provides an API to access Ethereum related information. @@ -2521,28 +2522,10 @@ func (s *PrivateTxBundleAPI) BundlePrice(ctx context.Context) (*big.Int, error) // SendBundle will add the signed transaction to the transaction pool. // The sender is responsible for signing the transaction and using the correct nonce and ensuring validity func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs) (common.Hash, error) { - gasUsedRatio := make([]int, 0, MaxOracleBlocks) - block := s.b.CurrentBlock() - var err error - for i := 0; i < MaxOracleBlocks && block.NumberU64() > 1; i++ { - gasUsedRatio = append(gasUsedRatio, int(block.GasUsed()*100/block.GasLimit())) - block, err = s.b.BlockByHash(context.Background(), block.ParentHash()) - if err != nil { - break - } - } - sort.Ints(gasUsedRatio) - validGasUsedRatio := gasUsedRatio - if len(gasUsedRatio) > DropBlocks { - validGasUsedRatio = gasUsedRatio[DropBlocks:] - } - if len(validGasUsedRatio) == 0 { - return common.Hash{}, errors.New("no enough example ratio") - } var txs types.Transactions if len(args.Txs) == 0 { - return common.Hash{}, errors.New("bundle missing txs") + return common.Hash{}, core.NewCustomError("bundle missing txs", InvalidBundleParamError) } if args.MaxBlockNumber == 0 && (args.MaxTimestamp == nil || *args.MaxTimestamp == 0) { maxTimeStamp := uint64(time.Now().Unix()) + MaxBundleTimeDelay @@ -2550,19 +2533,20 @@ func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs } currentBlock := s.b.CurrentBlock() if args.MaxBlockNumber != 0 && args.MaxBlockNumber.Int64() > int64(currentBlock.NumberU64())+MaxBundleBlockDelay { - return common.Hash{}, errors.New("the maxBlockNumber should not be lager than currentBlockNum + 1200") + return common.Hash{}, core.NewCustomError("the maxBlockNumber should not be lager than currentBlockNum + 100", InvalidBundleParamError) } if args.MaxTimestamp != nil && *args.MaxTimestamp > currentBlock.Time()+uint64(MaxBundleTimeDelay) { - return common.Hash{}, errors.New("the maxTimestamp should not be later than currentBlockTimestamp + 1 hour") + return common.Hash{}, core.NewCustomError("the maxTimestamp should not be later than currentBlockTimestamp + 5 minutes", InvalidBundleParamError) } if args.MaxTimestamp != nil && args.MinTimestamp != nil && *args.MaxTimestamp != 0 && *args.MinTimestamp != 0 { if *args.MaxTimestamp <= *args.MinTimestamp { - return common.Hash{}, errors.New("the maxTimestamp should not be less than minTimestamp") + return common.Hash{}, core.NewCustomError("the maxTimestamp should not be less than minTimestamp", InvalidBundleParamError) } } if args.MinTimestamp != nil && *args.MinTimestamp > currentBlock.Time()+uint64(MaxBundleTimeDelay) { - return common.Hash{}, errors.New("the minTimestamp should not be later than currentBlockTimestamp + 1 hour") + return common.Hash{}, core.NewCustomError("the minTimestamp should not be later than currentBlockTimestamp + 5 minutes", InvalidBundleParamError) } + for _, encodedTx := range args.Txs { tx := new(types.Transaction) if err := tx.UnmarshalBinary(encodedTx); err != nil { diff --git a/miner/worker.go b/miner/worker.go index 0348d1ef19..e0de626f46 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -79,10 +79,19 @@ const ( // staleThreshold is the maximum depth of the acceptable stale block. staleThreshold = 11 + + // bundlePruneInterval is the interval to do bundle prune check + bundlePruneInterval = 3 * time.Second + + // ErrorCode + SimulatorTxFailedErrorCode = -38006 ) var ( commitTxsTimer = metrics.NewRegisteredTimer("worker/committxs", nil) + + SimulatorReceiptFailedError = core.NewCustomError("simulate tx success, while status of receipt is failed", -38004) + BundlePriceTooLowError = core.NewCustomError("no enough gas price for the bundle", -38005) ) // environment is the worker's current environment and holds all of the current state information. @@ -241,6 +250,9 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus go worker.newWorkLoop(recommit) go worker.resultLoop() go worker.taskLoop() + if worker.config.IsFlashbots { + go worker.bundlePruneLoop() + } // Submit first work to initialize pending state. if init { @@ -547,6 +559,41 @@ func (w *worker) mainLoop() { } } +func (w *worker) bundlePruneLoop() { + ticker := time.NewTicker(bundlePruneInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + parent := w.chain.CurrentBlock() + num := parent.Number() + header := &types.Header{ + ParentHash: parent.Hash(), + Number: num.Add(num, common.Big1), + GasLimit: core.CalcGasLimit(parent, w.config.GasFloor, w.config.GasCeil), + Extra: w.extra, + Time: uint64(time.Now().Unix()), + Difficulty: big.NewInt(2), + } + pruneBundles := func() { + bundles, err := w.eth.TxPool().MevBundles(num.Add(num, common.Big1), uint64(time.Now().Unix())) + log.Info("Total bundles", "n", len(bundles)) + if err != nil { + log.Error("Failed to fetch pending transactions", "err", err) + return + } + sort.SliceStable(bundles, func(i, j int) bool { + return bundles[j].Price.Cmp(bundles[i].Price) < 0 + }) + w.simulateBundles(bundles, w.coinbase, parent, header) + } + pruneBundles() + case <-w.exitCh: + return + } + } +} + // taskLoop is a standalone goroutine to fetch sealing task from the generator and // push them to consensus engine. func (w *worker) taskLoop() { @@ -979,7 +1026,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) } bundles, err := w.eth.TxPool().MevBundles(header.Number, header.Time) - log.Error("Total bundles", "n", len(bundles)) + log.Info("Total bundles", "n", len(bundles)) if err != nil { log.Error("Failed to fetch pending transactions", "err", err) return @@ -1195,18 +1242,20 @@ func (w *worker) computeBundleGas(bundle types.MevBundle, parent *types.Block, h if err == core.ErrGasLimitReached && !pruneGasExceed { log.Warn("bundle gas limit exceed", "hash", bundle.Hash.String(), "err", err) } else { + log.Warn("Prune bundle because of err", "hash", bundle.Hash.String(), "err", err) w.eth.TxPool().PruneBundle(bundle.Hash) } } - return simulatedBundle{}, err + return simulatedBundle{}, core.NewCustomError(err.Error(), SimulatorTxFailedErrorCode) } if receipt.Status == types.ReceiptStatusFailed && !containsHash(bundle.RevertingTxHashes, receipt.TxHash) { if prune { log.Warn("Prune bundle because of failed tx", "hash", bundle.Hash.String()) + w.eth.TxPool().PruneBundle(bundle.Hash) } - return simulatedBundle{}, errors.New("failed tx") + return simulatedBundle{}, SimulatorReceiptFailedError } totalGasUsed += receipt.GasUsed @@ -1228,7 +1277,7 @@ func (w *worker) computeBundleGas(bundle types.MevBundle, parent *types.Block, h log.Warn("Prune bundle because of not enough gas price", "hash", bundle.Hash.String()) w.eth.TxPool().PruneBundle(bundle.Hash) } - return simulatedBundle{}, errors.New("no enough gas price") + return simulatedBundle{}, BundlePriceTooLowError } return simulatedBundle{ diff --git a/miner/worker_test.go b/miner/worker_test.go index 0954d879d1..9190726724 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -37,6 +37,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" ) const ( @@ -221,6 +222,70 @@ func TestGenerateBlockAndImportCliqueWithMev(t *testing.T) { testGenerateBlockAndImport(t, true, true) } +func TestBundlePrune(t *testing.T) { + var ( + engine consensus.Engine + chainConfig *params.ChainConfig + db = rawdb.NewMemoryDatabase() + ) + chainConfig = params.AllCliqueProtocolChanges + chainConfig.Clique = ¶ms.CliqueConfig{Period: 1, Epoch: 30000} + engine = clique.New(chainConfig.Clique, db) + + w, b := newTestWorker(t, chainConfig, engine, db, 0) + w.config.IsFlashbots = true + w.config.MaxSimulatBundles = 100 + w.config.MevGasPriceFloor = 3 + b.txPool.SetBundleSimulator(&dummySimulator{}) + go w.bundlePruneLoop() + defer w.close() + + db2 := rawdb.NewMemoryDatabase() + b.genesis.MustCommit(db2) + chain, _ := core.NewBlockChain(db2, nil, b.chain.Config(), engine, vm.Config{}, nil, nil) + defer chain.Stop() + + // Ignore empty commit here for less noise. + w.skipSealHook = func(task *task) bool { + return len(task.receipts) == 0 + } + + // Wait for mined blocks. + sub := w.mux.Subscribe(core.NewMinedBlockEvent{}) + defer sub.Unsubscribe() + + // Start mining! + w.start() + + falseGasPriceTx, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testBankAddress2), testUserAddress, big.NewInt(1000), params.TxGas, big.NewInt(1), nil), types.HomesteadSigner{}, testBankKey2) + falseNonceTx, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testBankAddress2)-1, testUserAddress, big.NewInt(1000), params.TxGas, big.NewInt(4), nil), types.HomesteadSigner{}, testBankKey2) + + bundleTxs := []*types.Transaction{ + falseGasPriceTx, + falseNonceTx, + } + + for _, tx := range bundleTxs { + bundle := &types.MevBundle{ + Txs: []*types.Transaction{tx}, + MaxBlockNumber: nil, + MinTimestamp: 0, + MaxTimestamp: 0, + RevertingTxHashes: nil, + } + + bz, _ := rlp.EncodeToBytes(bundle) + hash := crypto.Keccak256Hash(bz) + bundle.Hash = hash + b.txPool.SetBundle(hash, bundle) + } + time.Sleep(bundlePruneInterval + 1*time.Second) + bundles := b.txPool.AllMevBundles() + if len(bundles) != 0 { + t.Fatalf("unexpected bundle size, want %d, actaul %d", 0, len(bundles)) + } +} + func testGenerateBlockAndImport(t *testing.T, isClique, mev bool) { var ( engine consensus.Engine @@ -268,6 +333,21 @@ func testGenerateBlockAndImport(t *testing.T, isClique, mev bool) { if err != nil { t.Fatalf("add mev failed %v", err) } + // Multiple sender should fail + _, err = b.txPool.AddMevBundle([]*types.Transaction{ + b.newRandomTx(false, testBankAddress2, testBankKey2), + b.newRandomTx(false, testBankAddress, testBankKey), + }, nil, 0, 0, nil) + if err == nil || err.Error() != "only one tx sender is allowed within one bundle" { + t.Fatalf("Unexpected error %v", err) + } + // Invalid gas price + lowGastx, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testBankAddress2), testUserAddress, big.NewInt(1000), params.TxGas, big.NewInt(0), nil), types.HomesteadSigner{}, testBankKey2) + + _, err = b.txPool.AddMevBundle([]*types.Transaction{lowGastx}, nil, 0, 0, nil) + if err == nil || err.Error() != "tx gas price too low, expected 1 at least" { + t.Fatalf("Unexpected error %v", err) + } } for i := 0; i < 5; i++ { b.txPool.AddLocal(b.newRandomTx(true, testBankAddress, testBankKey))