Skip to content

Commit

Permalink
Bundle cancellations (ethereum#36)
Browse files Browse the repository at this point in the history
Introduces bundle replacement and cancellation via replacementUuid.
Since the replacement is tied to a specific sender, eth_sendBundle gets two additional optional fields: the replacement uuid and the signingAddress of the bundle submission.

The DB requests are done in the background, and cancellations are resolved while non-cancelable bundles are already being simulated to avoid waiting for DB to reply.
If anything goes wrong with the cancellations, the cancelable bundles are not considered.

Note: every block is now sent to the relay, as we can no longer rely on the highest-profit rule!
  • Loading branch information
Ruteri authored and avalonche committed Mar 9, 2023
1 parent 25aa4ba commit 76d9ef0
Show file tree
Hide file tree
Showing 19 changed files with 353 additions and 53 deletions.
16 changes: 7 additions & 9 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package builder
import (
"context"
"errors"
"math/big"
_ "os"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
blockvalidation "github.com/ethereum/go-ethereum/eth/block-validation"
"golang.org/x/time/rate"

Expand Down Expand Up @@ -236,23 +236,22 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTy
var (
queueSignal = make(chan struct{}, 1)

queueMu sync.Mutex
queueLastSubmittedProfit = new(big.Int)
queueBestProfit = new(big.Int)
queueBestEntry blockQueueEntry
queueMu sync.Mutex
queueLastSubmittedHash common.Hash
queueBestEntry blockQueueEntry
)

log.Debug("runBuildingJob", "slot", attrs.Slot, "parent", attrs.HeadHash)

submitBestBlock := func() {
queueMu.Lock()
if queueLastSubmittedProfit.Cmp(queueBestProfit) < 0 {
if queueBestEntry.block.Hash() != queueLastSubmittedHash {
err := b.onSealedBlock(queueBestEntry.block, queueBestEntry.ordersCloseTime, queueBestEntry.sealedAt, queueBestEntry.commitedBundles, queueBestEntry.allBundles, proposerPubkey, vd, attrs)

if err != nil {
log.Error("could not run sealed block hook", "err", err)
} else {
queueLastSubmittedProfit.Set(queueBestProfit)
queueLastSubmittedHash = queueBestEntry.block.Hash()
}
}
queueMu.Unlock()
Expand All @@ -271,15 +270,14 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTy

queueMu.Lock()
defer queueMu.Unlock()
if block.Profit.Cmp(queueBestProfit) > 0 {
if block.Hash() != queueLastSubmittedHash {
queueBestEntry = blockQueueEntry{
block: block,
ordersCloseTime: ordersCloseTime,
sealedAt: sealedAt,
commitedBundles: commitedBundles,
allBundles: allBundles,
}
queueBestProfit.Set(block.Profit)

select {
case queueSignal <- struct{}{}:
Expand Down
13 changes: 10 additions & 3 deletions builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,20 @@ func TestOnPayloadAttributes(t *testing.T) {

require.Equal(t, uint64(25), testRelay.requestedSlot)

// Clear the submitted message and check that the job will be ran again and but a new message will not be submitted since the profit is the same
// Clear the submitted message and check that the job will be ran again and but a new message will not be submitted since the hash is the same
testBlock.Profit = big.NewInt(10)
testRelay.submittedMsg = nil
time.Sleep(2200 * time.Millisecond)
require.Nil(t, testRelay.submittedMsg)

// Up the profit, expect to get the block
testEthService.testBlock.Profit.SetInt64(11)
// Change the hash, expect to get the block
testExecutableData.ExtraData = hexutil.MustDecode("0x0042fafd")
testExecutableData.BlockHash = common.HexToHash("0x0579b1aaca5c079c91e5774bac72c7f9bc2ddf2b126e9c632be68a1cb8f3fc71")
testBlock, err = beacon.ExecutableDataToBlock(*testExecutableData)
testBlock.Profit = big.NewInt(10)
require.NoError(t, err)
testEthService.testBlock = testBlock

time.Sleep(2200 * time.Millisecond)
require.NotNil(t, testRelay.submittedMsg)
}
1 change: 1 addition & 0 deletions builder/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error {
mevBundleCh := make(chan []types.MevBundle)
blockNumCh := make(chan int64)
bundleFetcher := flashbotsextra.NewBundleFetcher(backend, ds, blockNumCh, mevBundleCh, true)
backend.RegisterBundleFetcher(bundleFetcher)
go bundleFetcher.Run()
}

Expand Down
108 changes: 100 additions & 8 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package txpool

import (
"context"
"errors"
"fmt"
"math"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/google/uuid"
"golang.org/x/crypto/sha3"
)

Expand Down Expand Up @@ -282,6 +284,8 @@ type TxPool struct {
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

bundleFetcher IFetcher
}

type txpoolResetRequest struct {
Expand Down Expand Up @@ -345,6 +349,17 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain)
return pool
}

type IFetcher interface {
GetLatestUuidBundles(ctx context.Context, blockNum int64) ([]types.LatestUuidBundle, error)
}

func (pool *TxPool) RegisterBundleFetcher(fetcher IFetcher) {
pool.mu.Lock()
defer pool.mu.Unlock()

pool.bundleFetcher = fetcher
}

// loop is the transaction pool's main event loop, waiting for and reacting to
// outside blockchain events as well as for various reporting and transaction
// eviction events.
Expand Down Expand Up @@ -582,21 +597,79 @@ func (pool *TxPool) Pending(enforceTips bool) map[common.Address]types.Transacti
return pending
}

/// AllMevBundles returns all the MEV Bundles currently in the pool
func (pool *TxPool) AllMevBundles() []types.MevBundle {
return pool.mevBundles
type uuidBundleKey struct {
Uuid uuid.UUID
SigningAddress common.Address
}

func (pool *TxPool) fetchLatestCancellableBundles(ctx context.Context, blockNumber *big.Int) (chan []types.LatestUuidBundle, chan error) {
if pool.bundleFetcher == nil {
return nil, nil
}
errCh := make(chan error, 1)
lubCh := make(chan []types.LatestUuidBundle, 1)
go func(blockNum int64) {
lub, err := pool.bundleFetcher.GetLatestUuidBundles(ctx, blockNum)
errCh <- err
lubCh <- lub
}(blockNumber.Int64())
return lubCh, errCh
}

func resolveCancellableBundles(lubCh chan []types.LatestUuidBundle, errCh chan error, uuidBundles map[uuidBundleKey][]types.MevBundle) []types.MevBundle {
if lubCh == nil || errCh == nil {
return nil
}

if len(uuidBundles) == 0 {
return nil
}

err := <-errCh
if err != nil {
log.Error("could not fetch latest bundles uuid map", "err", err)
return nil
}

currentCancellableBundles := []types.MevBundle{}

log.Trace("Processing uuid bundles", "uuidBundles", uuidBundles)

lubs := <-lubCh
for _, lub := range lubs {
ubk := uuidBundleKey{lub.Uuid, lub.SigningAddress}
bundles, found := uuidBundles[ubk]
if !found {
log.Trace("missing uuid bundle", "ubk", ubk)
continue
}
for _, bundle := range bundles {
if bundle.Hash == lub.BundleHash {
log.Trace("adding uuid bundle", "bundle hash", bundle.Hash.String(), "lub", lub)
currentCancellableBundles = append(currentCancellableBundles, bundle)
break
}
}
}
return currentCancellableBundles
}

// MevBundles returns a list of bundles valid for the given blockNumber/blockTimestamp
// also prunes bundles that are outdated
func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) []types.MevBundle {
// Returns regular bundles and a function resolving to current cancellable bundles
func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) ([]types.MevBundle, chan []types.MevBundle) {
pool.mu.Lock()
defer pool.mu.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
lubCh, errCh := pool.fetchLatestCancellableBundles(ctx, blockNumber)

// returned values
var ret []types.MevBundle
// rolled over values
var bundles []types.MevBundle
// (uuid, signingAddress) -> list of bundles
var uuidBundles = make(map[uuidBundleKey][]types.MevBundle)

for _, bundle := range pool.mevBundles {
// Prune outdated bundles
Expand All @@ -610,14 +683,31 @@ func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) []ty
continue
}

// return the ones which are in time
ret = append(ret, bundle)
// keep the bundles around internally until they need to be pruned
bundles = append(bundles, bundle)

// TODO: omit duplicates

// do not append to the return quite yet, check the DB for the latest bundle for that uuid
if bundle.Uuid != types.EmptyUUID {
ubk := uuidBundleKey{bundle.Uuid, bundle.SigningAddress}
uuidBundles[ubk] = append(uuidBundles[ubk], bundle)
continue
}

// return the ones which are in time
ret = append(ret, bundle)
}

pool.mevBundles = bundles
return ret

cancellableBundlesCh := make(chan []types.MevBundle, 1)
go func() {
cancellableBundlesCh <- resolveCancellableBundles(lubCh, errCh, uuidBundles)
cancel()
}()

return ret, cancellableBundlesCh
}

// AddMevBundles adds a mev bundles to the pool
Expand All @@ -630,7 +720,7 @@ func (pool *TxPool) AddMevBundles(mevBundles []types.MevBundle) error {
}

// AddMevBundle adds a mev bundle to the pool
func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, replacementUuid uuid.UUID, signingAddress common.Address, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
bundleHasher := sha3.NewLegacyKeccak256()
for _, tx := range txs {
bundleHasher.Write(tx.Hash().Bytes())
Expand All @@ -643,6 +733,8 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m
pool.mevBundles = append(pool.mevBundles, types.MevBundle{
Txs: txs,
BlockNumber: blockNumber,
Uuid: replacementUuid,
SigningAddress: signingAddress,
MinTimestamp: minTimestamp,
MaxTimestamp: maxTimestamp,
RevertingTxHashes: revertingTxHashes,
Expand Down
Loading

0 comments on commit 76d9ef0

Please sign in to comment.