diff --git a/builder/builder.go b/builder/builder.go index 12ef6ca1177b..6f8c70716743 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -3,11 +3,11 @@ package builder import ( "context" "errors" - "math/big" _ "os" "sync" "time" + "github.com/ethereum/go-ethereum/common" blockvalidation "github.com/ethereum/go-ethereum/eth/block-validation" "golang.org/x/time/rate" @@ -236,23 +236,22 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTy var ( queueSignal = make(chan struct{}, 1) - queueMu sync.Mutex - queueLastSubmittedProfit = new(big.Int) - queueBestProfit = new(big.Int) - queueBestEntry blockQueueEntry + queueMu sync.Mutex + queueLastSubmittedHash common.Hash + queueBestEntry blockQueueEntry ) log.Debug("runBuildingJob", "slot", attrs.Slot, "parent", attrs.HeadHash) submitBestBlock := func() { queueMu.Lock() - if queueLastSubmittedProfit.Cmp(queueBestProfit) < 0 { + if queueBestEntry.block.Hash() != queueLastSubmittedHash { err := b.onSealedBlock(queueBestEntry.block, queueBestEntry.ordersCloseTime, queueBestEntry.sealedAt, queueBestEntry.commitedBundles, queueBestEntry.allBundles, proposerPubkey, vd, attrs) if err != nil { log.Error("could not run sealed block hook", "err", err) } else { - queueLastSubmittedProfit.Set(queueBestProfit) + queueLastSubmittedHash = queueBestEntry.block.Hash() } } queueMu.Unlock() @@ -271,7 +270,7 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTy queueMu.Lock() defer queueMu.Unlock() - if block.Profit.Cmp(queueBestProfit) > 0 { + if block.Hash() != queueLastSubmittedHash { queueBestEntry = blockQueueEntry{ block: block, ordersCloseTime: ordersCloseTime, @@ -279,7 +278,6 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTy commitedBundles: commitedBundles, allBundles: allBundles, } - queueBestProfit.Set(block.Profit) select { case queueSignal <- struct{}{}: diff --git a/builder/builder_test.go b/builder/builder_test.go index c1a413d3b4da..e191c7fefdbe 100644 --- a/builder/builder_test.go +++ b/builder/builder_test.go @@ -126,13 +126,20 @@ func TestOnPayloadAttributes(t *testing.T) { require.Equal(t, uint64(25), testRelay.requestedSlot) - // Clear the submitted message and check that the job will be ran again and but a new message will not be submitted since the profit is the same + // Clear the submitted message and check that the job will be ran again and but a new message will not be submitted since the hash is the same + testBlock.Profit = big.NewInt(10) testRelay.submittedMsg = nil time.Sleep(2200 * time.Millisecond) require.Nil(t, testRelay.submittedMsg) - // Up the profit, expect to get the block - testEthService.testBlock.Profit.SetInt64(11) + // Change the hash, expect to get the block + testExecutableData.ExtraData = hexutil.MustDecode("0x0042fafd") + testExecutableData.BlockHash = common.HexToHash("0x0579b1aaca5c079c91e5774bac72c7f9bc2ddf2b126e9c632be68a1cb8f3fc71") + testBlock, err = beacon.ExecutableDataToBlock(*testExecutableData) + testBlock.Profit = big.NewInt(10) + require.NoError(t, err) + testEthService.testBlock = testBlock + time.Sleep(2200 * time.Millisecond) require.NotNil(t, testRelay.submittedMsg) } diff --git a/builder/service.go b/builder/service.go index 2481bf5400d9..b49a0edf291c 100644 --- a/builder/service.go +++ b/builder/service.go @@ -185,6 +185,7 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error { mevBundleCh := make(chan []types.MevBundle) blockNumCh := make(chan int64) bundleFetcher := flashbotsextra.NewBundleFetcher(backend, ds, blockNumCh, mevBundleCh, true) + backend.RegisterBundleFetcher(bundleFetcher) go bundleFetcher.Run() } diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 38b813a9ecc8..92f2386641bd 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -17,6 +17,7 @@ package txpool import ( + "context" "errors" "fmt" "math" @@ -36,6 +37,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" + "github.com/google/uuid" "golang.org/x/crypto/sha3" ) @@ -282,6 +284,8 @@ type TxPool struct { initDoneCh chan struct{} // is closed once the pool is initialized (for tests) changesSinceReorg int // A counter for how many drops we've performed in-between reorg. + + bundleFetcher IFetcher } type txpoolResetRequest struct { @@ -345,6 +349,17 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain) return pool } +type IFetcher interface { + GetLatestUuidBundles(ctx context.Context, blockNum int64) ([]types.LatestUuidBundle, error) +} + +func (pool *TxPool) RegisterBundleFetcher(fetcher IFetcher) { + pool.mu.Lock() + defer pool.mu.Unlock() + + pool.bundleFetcher = fetcher +} + // loop is the transaction pool's main event loop, waiting for and reacting to // outside blockchain events as well as for various reporting and transaction // eviction events. @@ -582,21 +597,79 @@ func (pool *TxPool) Pending(enforceTips bool) map[common.Address]types.Transacti return pending } -/// AllMevBundles returns all the MEV Bundles currently in the pool -func (pool *TxPool) AllMevBundles() []types.MevBundle { - return pool.mevBundles +type uuidBundleKey struct { + Uuid uuid.UUID + SigningAddress common.Address +} + +func (pool *TxPool) fetchLatestCancellableBundles(ctx context.Context, blockNumber *big.Int) (chan []types.LatestUuidBundle, chan error) { + if pool.bundleFetcher == nil { + return nil, nil + } + errCh := make(chan error, 1) + lubCh := make(chan []types.LatestUuidBundle, 1) + go func(blockNum int64) { + lub, err := pool.bundleFetcher.GetLatestUuidBundles(ctx, blockNum) + errCh <- err + lubCh <- lub + }(blockNumber.Int64()) + return lubCh, errCh +} + +func resolveCancellableBundles(lubCh chan []types.LatestUuidBundle, errCh chan error, uuidBundles map[uuidBundleKey][]types.MevBundle) []types.MevBundle { + if lubCh == nil || errCh == nil { + return nil + } + + if len(uuidBundles) == 0 { + return nil + } + + err := <-errCh + if err != nil { + log.Error("could not fetch latest bundles uuid map", "err", err) + return nil + } + + currentCancellableBundles := []types.MevBundle{} + + log.Trace("Processing uuid bundles", "uuidBundles", uuidBundles) + + lubs := <-lubCh + for _, lub := range lubs { + ubk := uuidBundleKey{lub.Uuid, lub.SigningAddress} + bundles, found := uuidBundles[ubk] + if !found { + log.Trace("missing uuid bundle", "ubk", ubk) + continue + } + for _, bundle := range bundles { + if bundle.Hash == lub.BundleHash { + log.Trace("adding uuid bundle", "bundle hash", bundle.Hash.String(), "lub", lub) + currentCancellableBundles = append(currentCancellableBundles, bundle) + break + } + } + } + return currentCancellableBundles } // MevBundles returns a list of bundles valid for the given blockNumber/blockTimestamp // also prunes bundles that are outdated -func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) []types.MevBundle { +// Returns regular bundles and a function resolving to current cancellable bundles +func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) ([]types.MevBundle, chan []types.MevBundle) { pool.mu.Lock() defer pool.mu.Unlock() + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + lubCh, errCh := pool.fetchLatestCancellableBundles(ctx, blockNumber) + // returned values var ret []types.MevBundle // rolled over values var bundles []types.MevBundle + // (uuid, signingAddress) -> list of bundles + var uuidBundles = make(map[uuidBundleKey][]types.MevBundle) for _, bundle := range pool.mevBundles { // Prune outdated bundles @@ -610,14 +683,31 @@ func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) []ty continue } - // return the ones which are in time - ret = append(ret, bundle) // keep the bundles around internally until they need to be pruned bundles = append(bundles, bundle) + + // TODO: omit duplicates + + // do not append to the return quite yet, check the DB for the latest bundle for that uuid + if bundle.Uuid != types.EmptyUUID { + ubk := uuidBundleKey{bundle.Uuid, bundle.SigningAddress} + uuidBundles[ubk] = append(uuidBundles[ubk], bundle) + continue + } + + // return the ones which are in time + ret = append(ret, bundle) } pool.mevBundles = bundles - return ret + + cancellableBundlesCh := make(chan []types.MevBundle, 1) + go func() { + cancellableBundlesCh <- resolveCancellableBundles(lubCh, errCh, uuidBundles) + cancel() + }() + + return ret, cancellableBundlesCh } // AddMevBundles adds a mev bundles to the pool @@ -630,7 +720,7 @@ func (pool *TxPool) AddMevBundles(mevBundles []types.MevBundle) error { } // AddMevBundle adds a mev bundle to the pool -func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error { +func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, replacementUuid uuid.UUID, signingAddress common.Address, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error { bundleHasher := sha3.NewLegacyKeccak256() for _, tx := range txs { bundleHasher.Write(tx.Hash().Bytes()) @@ -643,6 +733,8 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m pool.mevBundles = append(pool.mevBundles, types.MevBundle{ Txs: txs, BlockNumber: blockNumber, + Uuid: replacementUuid, + SigningAddress: signingAddress, MinTimestamp: minTimestamp, MaxTimestamp: maxTimestamp, RevertingTxHashes: revertingTxHashes, diff --git a/core/txpool/txpool_test.go b/core/txpool/txpool_test.go index 237f97afe434..5bcc59c25a9e 100644 --- a/core/txpool/txpool_test.go +++ b/core/txpool/txpool_test.go @@ -17,6 +17,7 @@ package txpool import ( + "context" "crypto/ecdsa" crand "crypto/rand" "errors" @@ -36,7 +37,10 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/test_utils" "github.com/ethereum/go-ethereum/trie" + "github.com/google/uuid" + "github.com/stretchr/testify/require" ) var ( @@ -2422,6 +2426,117 @@ func TestSlotCount(t *testing.T) { } } +// TODO: test bundle cancellations +func TestBundleCancellations(t *testing.T) { + // Create the pool to test the status retrievals with + statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) + blockchain := &testBlockChain{100, statedb, new(event.Feed)} + + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + fetcher := &mockFetcher{make(map[int64]error), make(map[int64][]types.LatestUuidBundle)} + pool.RegisterBundleFetcher(fetcher) + + fetcher.resps[1] = nil + bundles, ccBundles := pool.MevBundles(big.NewInt(1), 20) + require.Equal(t, []types.MevBundle(nil), bundles) + require.Equal(t, []types.MevBundle(nil), <-ccBundles) + + bundle01_uuid1_signer1 := types.MevBundle{ + BlockNumber: big.NewInt(1), + Uuid: uuid.New(), + SigningAddress: common.Address{0x01}, + Hash: common.Hash{0xf0}, + } + bundle02_no_uuid_signer2 := types.MevBundle{ + BlockNumber: big.NewInt(1), + Uuid: types.EmptyUUID, + SigningAddress: common.Address{0x02}, + Hash: common.Hash{0xf1}, + } + bundle03_uuid1_signer1 := types.MevBundle{ + BlockNumber: big.NewInt(1), + Uuid: bundle01_uuid1_signer1.Uuid, + SigningAddress: common.Address{0x01}, + Hash: common.Hash{0xf3}, + } + bundle03_uuid1_signer2 := types.MevBundle{ + BlockNumber: big.NewInt(1), + Uuid: bundle01_uuid1_signer1.Uuid, + SigningAddress: common.Address{0x02}, + Hash: common.Hash{0xf3}, + } + + err := pool.AddMevBundles([]types.MevBundle{ + bundle01_uuid1_signer1, bundle02_no_uuid_signer2, bundle03_uuid1_signer1, bundle03_uuid1_signer2, + }) + require.NoError(t, err) + + // Ignores uuid bundle since fetcher does not return it, passes non-uuid bundle + bundles, ccBundles = pool.MevBundles(big.NewInt(1), 20) + require.Equal(t, []types.MevBundle{bundle02_no_uuid_signer2}, bundles) + cr := test_utils.RequireChan[[]types.MevBundle](ccBundles, time.Millisecond) + require.False(t, cr.Timeout) + require.Equal(t, []types.MevBundle{}, cr.Value) + + fetcher.resps[1] = append(fetcher.resps[1], types.LatestUuidBundle{ + Uuid: bundle01_uuid1_signer1.Uuid, + SigningAddress: bundle01_uuid1_signer1.SigningAddress, + BundleHash: bundle01_uuid1_signer1.Hash, + }) + + // Passes non-uuid bundle and only the bundle exactly matching the fetcher resp + bundles, ccBundles = pool.MevBundles(big.NewInt(1), 20) + require.Equal(t, []types.MevBundle{bundle02_no_uuid_signer2}, bundles) + cr = test_utils.RequireChan[[]types.MevBundle](ccBundles, time.Millisecond) + require.False(t, cr.Timeout) + require.Equal(t, []types.MevBundle{bundle01_uuid1_signer1}, cr.Value) + + fetcher.resps[1] = []types.LatestUuidBundle{ + types.LatestUuidBundle{ + Uuid: bundle03_uuid1_signer1.Uuid, + SigningAddress: bundle03_uuid1_signer1.SigningAddress, + BundleHash: bundle03_uuid1_signer1.Hash, + }, + } + + // Passes non-uuid bundle and only the bundle exactly matching the fetcher resp + bundles, ccBundles = pool.MevBundles(big.NewInt(1), 20) + require.Equal(t, []types.MevBundle{bundle02_no_uuid_signer2}, bundles) + cr = test_utils.RequireChan[[]types.MevBundle](ccBundles, time.Millisecond) + require.False(t, cr.Timeout) + require.Equal(t, []types.MevBundle{bundle03_uuid1_signer1}, cr.Value) + + fetcher.resps[1] = append(fetcher.resps[1], types.LatestUuidBundle{ + Uuid: bundle03_uuid1_signer2.Uuid, + SigningAddress: bundle03_uuid1_signer2.SigningAddress, + BundleHash: bundle03_uuid1_signer2.Hash, + }) + + // Passes non-uuid bundle and both bundles exactly matching the fetcher resp for the same hash + bundles, ccBundles = pool.MevBundles(big.NewInt(1), 20) + require.Equal(t, []types.MevBundle{bundle02_no_uuid_signer2}, bundles) + cr = test_utils.RequireChan[[]types.MevBundle](ccBundles, time.Millisecond) + require.False(t, cr.Timeout) + require.Equal(t, []types.MevBundle{bundle03_uuid1_signer1, bundle03_uuid1_signer2}, cr.Value) +} + +type mockFetcher struct { + errorResps map[int64]error + resps map[int64][]types.LatestUuidBundle +} + +func (f *mockFetcher) GetLatestUuidBundles(_ context.Context, blockNum int64) ([]types.LatestUuidBundle, error) { + if err, found := f.errorResps[blockNum]; found { + return nil, err + } + + if resp, found := f.resps[blockNum]; found { + return resp, nil + } + + return nil, errors.New("unexpected block number") +} + // Benchmarks the speed of validating the contents of the pending queue of the // transaction pool. func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) } diff --git a/core/types/transaction.go b/core/types/transaction.go index 92ce7c61ad1c..b9997dbe7c41 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" + "github.com/google/uuid" ) var ( @@ -733,9 +734,19 @@ func copyAddressPtr(a *common.Address) *common.Address { return &cpy } +var EmptyUUID uuid.UUID + +type LatestUuidBundle struct { + Uuid uuid.UUID + SigningAddress common.Address + BundleHash common.Hash +} + type MevBundle struct { Txs Transactions BlockNumber *big.Int + Uuid uuid.UUID + SigningAddress common.Address MinTimestamp uint64 MaxTimestamp uint64 RevertingTxHashes []common.Hash diff --git a/eth/api_backend.go b/eth/api_backend.go index 2bb841f4793c..6ece439e8217 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -40,6 +40,7 @@ import ( "github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" + "github.com/google/uuid" ) // EthAPIBackend implements ethapi.Backend for full nodes @@ -269,8 +270,8 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction, } } -func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error { - return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes) +func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, uuid uuid.UUID, signingAddress common.Address, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error { + return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), uuid, signingAddress, minTimestamp, maxTimestamp, revertingTxHashes) } func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { diff --git a/eth/backend.go b/eth/backend.go index 4a14e16536da..727b27f75afb 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -317,6 +317,10 @@ func (s *Ethereum) APIs() []rpc.API { }...) } +func (s *Ethereum) RegisterBundleFetcher(fetcher core.IFetcher) { + s.txPool.RegisterBundleFetcher(fetcher) +} + func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) { s.blockchain.ResetWithGenesisBlock(gb) } diff --git a/flashbotsextra/database.go b/flashbotsextra/database.go index 1375e9214a87..fbb077ad7504 100644 --- a/flashbotsextra/database.go +++ b/flashbotsextra/database.go @@ -6,6 +6,7 @@ import ( "math/big" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" boostTypes "github.com/flashbots/go-boost-utils/types" @@ -21,6 +22,7 @@ const ( type IDatabaseService interface { ConsumeBuiltBlock(block *types.Block, OrdersClosedAt time.Time, sealedAt time.Time, commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, bidTrace *boostTypes.BidTrace) GetPriorityBundles(ctx context.Context, blockNum int64, isHighPrio bool) ([]DbBundle, error) + GetLatestUuidBundles(ctx context.Context, blockNum int64) ([]types.LatestUuidBundle, error) } type NilDbService struct{} @@ -32,12 +34,17 @@ func (NilDbService) GetPriorityBundles(ctx context.Context, blockNum int64, isHi return []DbBundle{}, nil } +func (NilDbService) GetLatestUuidBundles(ctx context.Context, blockNum int64) ([]types.LatestUuidBundle, error) { + return []types.LatestUuidBundle{}, nil +} + type DatabaseService struct { db *sqlx.DB - insertBuiltBlockStmt *sqlx.NamedStmt - insertMissingBundleStmt *sqlx.NamedStmt - fetchPrioBundlesStmt *sqlx.NamedStmt + insertBuiltBlockStmt *sqlx.NamedStmt + insertMissingBundleStmt *sqlx.NamedStmt + fetchPrioBundlesStmt *sqlx.NamedStmt + fetchGetLatestUuidBundlesStmt *sqlx.NamedStmt } func NewDatabaseService(postgresDSN string) (*DatabaseService, error) { @@ -60,11 +67,18 @@ func NewDatabaseService(postgresDSN string) (*DatabaseService, error) { if err != nil { return nil, err } + + fetchGetLatestUuidBundlesStmt, err := db.PrepareNamed("select replacement_uuid, signing_address, bundle_hash from latest_uuid_bundle where target_block_number = :target_block_number") + if err != nil { + return nil, err + } + return &DatabaseService{ - db: db, - insertBuiltBlockStmt: insertBuiltBlockStmt, - insertMissingBundleStmt: insertMissingBundleStmt, - fetchPrioBundlesStmt: fetchPrioBundlesStmt, + db: db, + insertBuiltBlockStmt: insertBuiltBlockStmt, + insertMissingBundleStmt: insertMissingBundleStmt, + fetchPrioBundlesStmt: fetchPrioBundlesStmt, + fetchGetLatestUuidBundlesStmt: fetchGetLatestUuidBundlesStmt, }, nil } @@ -260,21 +274,29 @@ func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, ordersClosedAt } func (ds *DatabaseService) GetPriorityBundles(ctx context.Context, blockNum int64, isHighPrio bool) ([]DbBundle, error) { var bundles []DbBundle - tx, err := ds.db.Beginx() - if err != nil { - log.Error("failed to begin db tx for get priority bundles", "err", err) - return nil, err - } arg := map[string]interface{}{"param_block_number": uint64(blockNum), "is_high_prio": isHighPrio, "limit": lowPrioLimitSize} if isHighPrio { arg["limit"] = highPrioLimitSize } - if err = tx.NamedStmtContext(ctx, ds.fetchPrioBundlesStmt).SelectContext(ctx, &bundles, arg); err != nil { + if err := ds.fetchPrioBundlesStmt.SelectContext(ctx, &bundles, arg); err != nil { return nil, err } - err = tx.Commit() - if err != nil { - log.Error("could not commit GetPriorityBundles transaction", "err", err) - } return bundles, nil } + +func (ds *DatabaseService) GetLatestUuidBundles(ctx context.Context, blockNum int64) ([]types.LatestUuidBundle, error) { + var dstLatestBundles []DbLatestUuidBundle + kwArg := map[string]interface{}{"target_block_number": blockNum} + if err := ds.fetchGetLatestUuidBundlesStmt.SelectContext(ctx, &dstLatestBundles, kwArg); err != nil { + return nil, err + } + latestBundles := make([]types.LatestUuidBundle, 0, len(dstLatestBundles)) + for _, dbLub := range dstLatestBundles { + latestBundles = append(latestBundles, types.LatestUuidBundle{ + Uuid: dbLub.Uuid, + SigningAddress: common.HexToAddress(dbLub.SigningAddress), + BundleHash: common.HexToHash(dbLub.BundleHash), + }) + } + return latestBundles, nil +} diff --git a/flashbotsextra/database_types.go b/flashbotsextra/database_types.go index 560d61699610..bf4ebfe391f4 100644 --- a/flashbotsextra/database_types.go +++ b/flashbotsextra/database_types.go @@ -5,7 +5,10 @@ import ( "strings" "time" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/google/uuid" ) type BuiltBlock struct { @@ -51,6 +54,12 @@ type DbBundle struct { EthSentToCoinbase string `db:"eth_sent_to_coinbase"` } +type DbLatestUuidBundle struct { + Uuid uuid.UUID `db:"replacement_uuid"` + SigningAddress string `db:"signing_address"` + BundleHash string `db:"bundle_hash"` +} + type blockAndBundleId struct { BlockId uint64 `db:"block_id"` BundleId uint64 `db:"bundle_id"` @@ -64,7 +73,12 @@ func SimulatedBundleToDbBundle(bundle *types.SimulatedBundle) DbBundle { paramRevertingTxHashes := strings.Join(revertingTxHashes, ",") signedTxsStrings := make([]string, len(bundle.OriginalBundle.Txs)) for i, tx := range bundle.OriginalBundle.Txs { - signedTxsStrings[i] = tx.Hash().String() + txBytes, err := tx.MarshalBinary() + if err != nil { + log.Error("could not marshal tx bytes", "err", err) + continue + } + signedTxsStrings[i] = hexutil.Encode(txBytes) } return DbBundle{ diff --git a/flashbotsextra/fetcher.go b/flashbotsextra/fetcher.go index 484aa1a4ee56..9b966dc7e116 100644 --- a/flashbotsextra/fetcher.go +++ b/flashbotsextra/fetcher.go @@ -74,6 +74,10 @@ func (b *bundleFetcher) Run() { go b.fetchAndPush(context.Background(), pushMevBundles) } +func (b *bundleFetcher) GetLatestUuidBundles(ctx context.Context, blockNum int64) ([]types.LatestUuidBundle, error) { + return b.db.GetLatestUuidBundles(ctx, blockNum) +} + func (b *bundleFetcher) fetchAndPush(ctx context.Context, pushMevBundles func(bundles []DbBundle)) { var currentBlockNum int64 lowPrioBundleTicker := time.NewTicker(time.Second * 2) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index a94f7c317bc5..d117b6dec562 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -46,6 +46,7 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" + "github.com/google/uuid" "github.com/tyler-smith/go-bip39" "golang.org/x/crypto/sha3" ) @@ -2128,6 +2129,8 @@ func NewPrivateTxBundleAPI(b Backend) *PrivateTxBundleAPI { type SendBundleArgs struct { Txs []hexutil.Bytes `json:"txs"` BlockNumber rpc.BlockNumber `json:"blockNumber"` + ReplacementUuid *uuid.UUID `json:"replacementUuid"` + SigningAddress *common.Address `json:"signingAddress"` MinTimestamp *uint64 `json:"minTimestamp"` MaxTimestamp *uint64 `json:"maxTimestamp"` RevertingTxHashes []common.Hash `json:"revertingTxHashes"` @@ -2152,6 +2155,16 @@ func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs txs = append(txs, tx) } + var replacementUuid uuid.UUID + if args.ReplacementUuid != nil { + replacementUuid = *args.ReplacementUuid + } + + var signingAddress common.Address + if args.SigningAddress != nil { + signingAddress = *args.SigningAddress + } + var minTimestamp, maxTimestamp uint64 if args.MinTimestamp != nil { minTimestamp = *args.MinTimestamp @@ -2160,7 +2173,7 @@ func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs maxTimestamp = *args.MaxTimestamp } - return s.b.SendBundle(ctx, txs, args.BlockNumber, minTimestamp, maxTimestamp, args.RevertingTxHashes) + return s.b.SendBundle(ctx, txs, args.BlockNumber, replacementUuid, signingAddress, minTimestamp, maxTimestamp, args.RevertingTxHashes) } // BundleAPI offers an API for accepting bundled transactions diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 11ae7f973ff9..8347d70454f6 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -35,6 +35,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" + "github.com/google/uuid" ) // Backend interface provides the common API services (that are provided by @@ -75,7 +76,7 @@ type Backend interface { // Transaction pool API SendTx(ctx context.Context, signedTx *types.Transaction, private bool) error - SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error + SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, uuid uuid.UUID, signingAddress common.Address, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) GetPoolTransactions() (types.Transactions, error) GetPoolTransaction(txHash common.Hash) *types.Transaction diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index 57268bdfb874..be88d9823040 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -38,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" + "github.com/google/uuid" ) // TestSetFeeDefaults tests the logic for filling in default fee values works as expected. @@ -216,7 +217,7 @@ func (b *backendMock) SendMegabundle(ctx context.Context, txs types.Transactions return nil } -func (b *backendMock) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error { +func (b *backendMock) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, replacementUuid uuid.UUID, signingAddress common.Address, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error { return nil } diff --git a/les/api_backend.go b/les/api_backend.go index 957685e207ab..b8b78b593409 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -39,6 +39,7 @@ import ( "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" + "github.com/google/uuid" ) type LesApiBackend struct { @@ -201,8 +202,8 @@ func (b *LesApiBackend) RemoveTx(txHash common.Hash) { b.eth.txPool.RemoveTx(txHash) } -func (b *LesApiBackend) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error { - return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes) +func (b *LesApiBackend) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, uuid uuid.UUID, signingAddress common.Address, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error { + return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), uuid, signingAddress, minTimestamp, maxTimestamp, revertingTxHashes) } func (b *LesApiBackend) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error { diff --git a/light/txpool.go b/light/txpool.go index ee2e53997c61..ffd1f09c2bb5 100644 --- a/light/txpool.go +++ b/light/txpool.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" + "github.com/google/uuid" ) const ( @@ -562,6 +563,6 @@ func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) ([]t } // AddMevBundle adds a mev bundle to the pool -func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error { +func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, replacementUuid uuid.UUID, signingAddress common.Address, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error { return nil } diff --git a/miner/contract_simulator_test.go b/miner/contract_simulator_test.go index a36cc335a112..b5fdf35d05e3 100644 --- a/miner/contract_simulator_test.go +++ b/miner/contract_simulator_test.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" tu "github.com/ethereum/go-ethereum/test_utils" + "github.com/google/uuid" "github.com/stretchr/testify/require" ) @@ -266,8 +267,7 @@ func TestSimulatorState(t *testing.T) { targetBlockNumber := new(big.Int).Set(b.chain.CurrentHeader().Number) targetBlockNumber.Add(targetBlockNumber, big.NewInt(1)) - b.txPool.AddMevBundle(types.Transactions{userSwapTx, backrunTx}, targetBlockNumber, 0, 0, nil) - + b.txPool.AddMevBundle(types.Transactions{userSwapTx, backrunTx}, targetBlockNumber, uuid.UUID{}, common.Address{}, 0, 0, nil) buildBlock([]*types.Transaction{}, 3) } diff --git a/miner/worker.go b/miner/worker.go index 2a98472869d1..404e3eff0f6d 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1345,13 +1345,15 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) (error, [] var blockBundles []types.SimulatedBundle var allBundles []types.SimulatedBundle if w.flashbots.isFlashbots { - bundles := w.eth.TxPool().MevBundles(env.header.Number, env.header.Time) + bundles, ccBundleCh := w.eth.TxPool().MevBundles(env.header.Number, env.header.Time) + bundles = append(bundles, <-ccBundleCh...) var bundleTxs types.Transactions var resultingBundle simulatedBundle var mergedBundles []types.SimulatedBundle var numBundles int var err error + // Sets allBundles in outer scope bundleTxs, resultingBundle, mergedBundles, numBundles, allBundles, err = w.generateFlashbotsBundle(env, bundles, pending) if err != nil { log.Error("Failed to generate flashbots bundle", "err", err) @@ -1412,16 +1414,27 @@ func (w *worker) getSimulatedBundles(env *environment) ([]types.SimulatedBundle, return nil, nil } - bundles := w.eth.TxPool().MevBundles(env.header.Number, env.header.Time) + bundles, ccBundlesCh := w.eth.TxPool().MevBundles(env.header.Number, env.header.Time) // TODO: consider interrupt simBundles, err := w.simulateBundles(env, bundles, nil) /* do not consider gas impact of mempool txs as bundles are treated as transactions wrt ordering */ if err != nil { - log.Error("Failed to simulate flashbots bundles", "err", err) + log.Error("Failed to simulate bundles", "err", err) return nil, err } - return simBundles, nil + ccBundles := <-ccBundlesCh + if ccBundles == nil { + return simBundles, nil + } + + simCcBundles, err := w.simulateBundles(env, ccBundles, nil) /* do not consider gas impact of mempool txs as bundles are treated as transactions wrt ordering */ + if err != nil { + log.Error("Failed to simulate cc bundles", "err", err) + return simBundles, nil + } + + return append(simBundles, simCcBundles...), nil } // generateWork generates a sealing block based on the given parameters. diff --git a/miner/worker_test.go b/miner/worker_test.go index 82f01f3e88c7..13586261f391 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -741,6 +741,7 @@ func TestSimulateBundles(t *testing.T) { } func testBundles(t *testing.T) { + // TODO: test cancellations db := rawdb.NewMemoryDatabase() chainConfig := params.AllEthashProtocolChanges engine := ethash.NewFaker() @@ -819,7 +820,7 @@ func testBundles(t *testing.T) { blockNumber := big.NewInt(0).Add(w.chain.CurrentBlock().Number(), big.NewInt(1)) for _, bundle := range bundles { - err := b.txPool.AddMevBundle(bundle.Txs, blockNumber, 0, 0, nil) + err := b.txPool.AddMevBundle(bundle.Txs, blockNumber, types.EmptyUUID, common.Address{}, 0, 0, nil) require.NoError(t, err) }