diff --git a/builder/block_submission_rate_limiter_test.go b/builder/block_submission_rate_limiter_test.go index 9a0ce5b55d3c..4d30e75912cc 100644 --- a/builder/block_submission_rate_limiter_test.go +++ b/builder/block_submission_rate_limiter_test.go @@ -68,5 +68,4 @@ func TestLimit(t *testing.T) { t.Error("chan was not ready") } } - } diff --git a/builder/builder.go b/builder/builder.go index b7287ecaac86..32aa3716c816 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/beacon" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/flashbotsextra" "github.com/ethereum/go-ethereum/log" "github.com/flashbots/go-boost-utils/bls" @@ -43,23 +44,22 @@ type IBuilder interface { } type Builder struct { - ds IDatabaseService + ds flashbotsextra.IDatabaseService beaconClient IBeaconClient relay IRelay eth IEthereumService resubmitter Resubmitter blockSubmissionRateLimiter *BlockSubmissionRateLimiter - - builderSecretKey *bls.SecretKey - builderPublicKey boostTypes.PublicKey - builderSigningDomain boostTypes.Domain + builderSecretKey *bls.SecretKey + builderPublicKey boostTypes.PublicKey + builderSigningDomain boostTypes.Domain bestMu sync.Mutex bestAttrs BuilderPayloadAttributes bestBlockProfit *big.Int } -func NewBuilder(sk *bls.SecretKey, ds IDatabaseService, bc IBeaconClient, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService) *Builder { +func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, bc IBeaconClient, relay IRelay, builderSigningDomain boostTypes.Domain, eth IEthereumService) *Builder { pkBytes := bls.PublicKeyFromSecretKey(sk).Compress() pk := boostTypes.PublicKey{} pk.FromSlice(pkBytes) diff --git a/builder/builder_test.go b/builder/builder_test.go index 2f143bf44626..b3a3f815d926 100644 --- a/builder/builder_test.go +++ b/builder/builder_test.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/beacon" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/flashbotsextra" "github.com/flashbots/go-boost-utils/bls" boostTypes "github.com/flashbots/go-boost-utils/types" "github.com/stretchr/testify/require" @@ -74,7 +75,7 @@ func TestOnPayloadAttributes(t *testing.T) { testEthService := &testEthereumService{synced: true, testExecutableData: testExecutableData, testBlock: testBlock} - builder := NewBuilder(sk, NilDbService{}, &testBeacon, &testRelay, bDomain, testEthService) + builder := NewBuilder(sk, flashbotsextra.NilDbService{}, &testBeacon, &testRelay, bDomain, testEthService) builder.Start() defer builder.Stop() diff --git a/builder/local_relay_test.go b/builder/local_relay_test.go index b101029b4816..c30779a3fa46 100644 --- a/builder/local_relay_test.go +++ b/builder/local_relay_test.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/beacon" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/flashbotsextra" "github.com/ethereum/go-ethereum/log" "github.com/flashbots/go-boost-utils/bls" boostTypes "github.com/flashbots/go-boost-utils/types" @@ -29,7 +30,7 @@ func newTestBackend(t *testing.T, forkchoiceData *beacon.ExecutableDataV1, block beaconClient := &testBeaconClient{validator: validator} localRelay := NewLocalRelay(sk, beaconClient, bDomain, cDomain, ForkData{}, true) ethService := &testEthereumService{synced: true, testExecutableData: forkchoiceData, testBlock: block} - backend := NewBuilder(sk, NilDbService{}, beaconClient, localRelay, bDomain, ethService) + backend := NewBuilder(sk, flashbotsextra.NilDbService{}, beaconClient, localRelay, bDomain, ethService) // service := NewService("127.0.0.1:31545", backend) return backend, localRelay, validator diff --git a/builder/resubmitter_test.go b/builder/resubmitter_test.go index 516e5c731764..e60ff3ef7711 100644 --- a/builder/resubmitter_test.go +++ b/builder/resubmitter_test.go @@ -9,7 +9,6 @@ import ( ) func TestResubmitter(t *testing.T) { - resubmitter := Resubmitter{} pingCh := make(chan error) diff --git a/builder/service.go b/builder/service.go index 0d26cd6e48ae..f4b99413f306 100644 --- a/builder/service.go +++ b/builder/service.go @@ -8,7 +8,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/flashbotsextra" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rpc" @@ -170,16 +172,27 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *BuilderConfig) error return errors.New("neither local nor remote relay specified") } - ethereumService := NewEthereumService(backend) - // TODO: move to proper flags - var ds IDatabaseService - ds, err = NewDatabaseService(os.Getenv("FLASHBOTS_POSTGRES_DSN")) - if err != nil { - log.Error("could not connect to the DB", "err", err) - ds = NilDbService{} + var ds flashbotsextra.IDatabaseService + dbDSN := os.Getenv("FLASHBOTS_POSTGRES_DSN") + if dbDSN != "" { + ds, err = flashbotsextra.NewDatabaseService(dbDSN) + if err != nil { + log.Error("could not connect to the DB", "err", err) + ds = flashbotsextra.NilDbService{} + } + } else { + log.Info("db dsn is not provided, starting nil db svc") + ds = flashbotsextra.NilDbService{} } + // Bundle fetcher + mevBundleCh := make(chan []types.MevBundle) + blockNumCh := make(chan int64) + bundleFetcher := flashbotsextra.NewBundleFetcher(backend, ds, blockNumCh, mevBundleCh, true) + go bundleFetcher.Run() + + ethereumService := NewEthereumService(backend) builderBackend := NewBuilder(builderSk, ds, beaconClient, relay, builderSigningDomain, ethereumService) builderService := NewService(cfg.ListenAddr, localRelay, builderBackend) diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 5dccb489c275..6cf443781230 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -619,6 +619,15 @@ func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) ([]t return ret, nil } +// AddMevBundles adds a mev bundles to the pool +func (pool *TxPool) AddMevBundles(mevBundles []types.MevBundle) error { + pool.mu.Lock() + defer pool.mu.Unlock() + + pool.mevBundles = append(pool.mevBundles, mevBundles...) + return nil +} + // AddMevBundle adds a mev bundle to the pool func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error { bundleHasher := sha3.NewLegacyKeccak256() diff --git a/flashbotsextra/cmd/bundle_fetcher.go b/flashbotsextra/cmd/bundle_fetcher.go new file mode 100644 index 000000000000..ff6e78be9349 --- /dev/null +++ b/flashbotsextra/cmd/bundle_fetcher.go @@ -0,0 +1,37 @@ +package main + +import ( + "os" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/flashbotsextra" + "github.com/ethereum/go-ethereum/log" +) + +func main() { + // Test bundle fetcher + log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + mevBundleCh := make(chan []types.MevBundle) + blockNumCh := make(chan int64) + db, err := flashbotsextra.NewDatabaseService("postgres://postgres:postgres@localhost:5432/test?sslmode=disable") + if err != nil { + panic(err) + } + bundleFetcher := flashbotsextra.NewBundleFetcher(nil, db, blockNumCh, mevBundleCh, false) + + go bundleFetcher.Run() + log.Info("waiting for mev bundles") + go func() { + blockNum := []int64{15232009, 15232008, 15232010} + for _, num := range blockNum { + <-time.After(time.Second) + blockNumCh <- num + } + }() + for bundles := range mevBundleCh { + for _, bundle := range bundles { + log.Info("bundle info", "blockNum", bundle.BlockNumber, "txsLength", len(bundle.Txs)) + } + } +} diff --git a/builder/database.go b/flashbotsextra/database.go similarity index 75% rename from builder/database.go rename to flashbotsextra/database.go index f4799208e5d3..cd840b067a26 100644 --- a/builder/database.go +++ b/flashbotsextra/database.go @@ -1,4 +1,4 @@ -package builder +package flashbotsextra import ( "context" @@ -13,14 +13,24 @@ import ( _ "github.com/lib/pq" ) +const ( + highPrioLimitSize = 500 + lowPrioLimitSize = 100 +) + type IDatabaseService interface { ConsumeBuiltBlock(block *types.Block, bundles []types.SimulatedBundle, bidTrace *boostTypes.BidTrace) + GetPriorityBundles(ctx context.Context, blockNum int64, isHighPrio bool) ([]DbBundle, error) } type NilDbService struct{} func (NilDbService) ConsumeBuiltBlock(*types.Block, []types.SimulatedBundle, *boostTypes.BidTrace) {} +func (NilDbService) GetPriorityBundles(ctx context.Context, blockNum int64, isHighPrio bool) ([]DbBundle, error) { + return []DbBundle{}, nil +} + type DatabaseService struct { db *sqlx.DB @@ -28,6 +38,7 @@ type DatabaseService struct { insertBlockBuiltBundleNoIdStmt *sqlx.NamedStmt insertBlockBuiltBundleWithIdStmt *sqlx.NamedStmt insertMissingBundleStmt *sqlx.NamedStmt + fetchPrioBundlesStmt *sqlx.NamedStmt } func NewDatabaseService(postgresDSN string) (*DatabaseService, error) { @@ -56,17 +67,26 @@ func NewDatabaseService(postgresDSN string) (*DatabaseService, error) { return nil, err } + fetchPrioBundlesStmt, err := db.PrepareNamed("select bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase from bundles where is_high_prio = :is_high_prio and coinbase_diff*1e18/total_gas_used > 1000000000 and param_block_number = :param_block_number order by coinbase_diff/total_gas_used DESC limit :limit") + if err != nil { + return nil, err + } return &DatabaseService{ db: db, insertBuiltBlockStmt: insertBuiltBlockStmt, insertBlockBuiltBundleNoIdStmt: insertBlockBuiltBundleNoIdStmt, insertBlockBuiltBundleWithIdStmt: insertBlockBuiltBundleWithIdStmt, insertMissingBundleStmt: insertMissingBundleStmt, + fetchPrioBundlesStmt: fetchPrioBundlesStmt, }, nil } func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, bundles []types.SimulatedBundle, bidTrace *boostTypes.BidTrace) { tx, err := ds.db.Beginx() + if err != nil { + log.Error("could not insert built block", "err", err) + return + } blockData := BuiltBlock{ BlockNumber: block.NumberU64(), @@ -134,3 +154,23 @@ func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, bundles []types log.Error("could not commit DB trasnaction", "err", err) } } +func (ds *DatabaseService) GetPriorityBundles(ctx context.Context, blockNum int64, isHighPrio bool) ([]DbBundle, error) { + var bundles []DbBundle + tx, err := ds.db.Beginx() + if err != nil { + log.Error("failed to begin db tx for get priority bundles", "err", err) + return nil, err + } + arg := map[string]interface{}{"param_block_number": uint64(blockNum), "is_high_prio": isHighPrio, "limit": lowPrioLimitSize} + if isHighPrio { + arg["limit"] = highPrioLimitSize + } + if err = tx.NamedStmtContext(ctx, ds.fetchPrioBundlesStmt).SelectContext(ctx, &bundles, arg); err != nil { + return nil, err + } + err = tx.Commit() + if err != nil { + log.Error("could not commit GetPriorityBundles transaction", "err", err) + } + return bundles, nil +} diff --git a/builder/database_test.go b/flashbotsextra/database_test.go similarity index 99% rename from builder/database_test.go rename to flashbotsextra/database_test.go index 2ff2c7bb8a27..1d3a7cce3553 100644 --- a/builder/database_test.go +++ b/flashbotsextra/database_test.go @@ -1,4 +1,4 @@ -package builder +package flashbotsextra import ( "math/big" diff --git a/builder/database_types.go b/flashbotsextra/database_types.go similarity index 88% rename from builder/database_types.go rename to flashbotsextra/database_types.go index 96965616b028..58d28c8cf524 100644 --- a/builder/database_types.go +++ b/flashbotsextra/database_types.go @@ -1,4 +1,4 @@ -package builder +package flashbotsextra import ( "math/big" @@ -38,9 +38,9 @@ type DbBundle struct { ParamSignedTxs string `db:"param_signed_txs"` ParamBlockNumber uint64 `db:"param_block_number"` - ParamTimestamp uint64 `db:"param_timestamp"` + ParamTimestamp *uint64 `db:"param_timestamp"` ReceivedTimestamp time.Time `db:"received_timestamp"` - ParamRevertingTxHashes string `db:"param_reverting_tx_hashes"` + ParamRevertingTxHashes *string `db:"param_reverting_tx_hashes"` CoinbaseDiff string `db:"coinbase_diff"` TotalGasUsed uint64 `db:"total_gas_used"` @@ -54,7 +54,7 @@ func SimulatedBundleToDbBundle(bundle *types.SimulatedBundle) DbBundle { for i, rTxHash := range bundle.OriginalBundle.RevertingTxHashes { revertingTxHashes[i] = rTxHash.String() } - + paramRevertingTxHashes := strings.Join(revertingTxHashes, ",") signedTxsStrings := make([]string, len(bundle.OriginalBundle.Txs)) for i, tx := range bundle.OriginalBundle.Txs { signedTxsStrings[i] = tx.Hash().String() @@ -65,8 +65,8 @@ func SimulatedBundleToDbBundle(bundle *types.SimulatedBundle) DbBundle { ParamSignedTxs: strings.Join(signedTxsStrings, ","), ParamBlockNumber: bundle.OriginalBundle.BlockNumber.Uint64(), - ParamTimestamp: bundle.OriginalBundle.MinTimestamp, - ParamRevertingTxHashes: strings.Join(revertingTxHashes, ","), + ParamTimestamp: &bundle.OriginalBundle.MinTimestamp, + ParamRevertingTxHashes: ¶mRevertingTxHashes, CoinbaseDiff: new(big.Rat).SetFrac(bundle.TotalEth, big.NewInt(1e18)).FloatString(18), TotalGasUsed: bundle.TotalGasUsed, diff --git a/flashbotsextra/fetcher.go b/flashbotsextra/fetcher.go new file mode 100644 index 000000000000..14484e691779 --- /dev/null +++ b/flashbotsextra/fetcher.go @@ -0,0 +1,164 @@ +package flashbotsextra + +import ( + "context" + "errors" + "math/big" + "strings" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/log" + "golang.org/x/crypto/sha3" +) + +type Fetcher interface { + Run() error +} + +type bundleFetcher struct { + db IDatabaseService + backend *eth.Ethereum + blockNumCh chan int64 + bundlesCh chan []types.MevBundle + shouldPushToTxPool bool // Added for testing +} + +func NewBundleFetcher(backend *eth.Ethereum, db IDatabaseService, blockNumCh chan int64, bundlesCh chan []types.MevBundle, shouldPushToTxPool bool) *bundleFetcher { + return &bundleFetcher{ + db: db, + backend: backend, + blockNumCh: blockNumCh, + bundlesCh: bundlesCh, + shouldPushToTxPool: shouldPushToTxPool, + } +} + +func (b *bundleFetcher) Run() { + log.Info("Start bundle fetcher") + if b.shouldPushToTxPool { + eventCh := make(chan core.ChainHeadEvent) + b.backend.BlockChain().SubscribeChainHeadEvent(eventCh) + pushBlockNum := func() { + for blockNum := range eventCh { + b.blockNumCh <- blockNum.Block.Header().Number.Int64() + } + } + addMevBundle := func() { + log.Info("Start receiving mev bundles") + for bundles := range b.bundlesCh { + b.backend.TxPool().AddMevBundles(bundles) + } + } + go pushBlockNum() + go addMevBundle() + } + pushMevBundles := func(bundles []DbBundle) { + mevBundles := make([]types.MevBundle, 0) + for _, bundle := range bundles { + mevBundle, err := b.dbBundleToMevBundle(bundle) + if err != nil { + log.Error("failed to convert db bundle to mev bundle", "err", err) + continue + } + mevBundles = append(mevBundles, *mevBundle) + } + if len(mevBundles) > 0 { + b.bundlesCh <- mevBundles + } + } + go b.fetchAndPush(context.Background(), pushMevBundles) +} + +func (b *bundleFetcher) fetchAndPush(ctx context.Context, pushMevBundles func(bundles []DbBundle)) { + var blockNum int64 + lowPrioBundleTicker := time.NewTicker(time.Second * 2) + defer lowPrioBundleTicker.Stop() + + for { + select { + case blockNum = <-b.blockNumCh: + ctxH, cancelH := context.WithTimeout(ctx, time.Second*3) + bundles, err := b.db.GetPriorityBundles(ctxH, blockNum, true) + cancelH() + if err != nil { + log.Error("failed to fetch high prio bundles", "err", err) + continue + } + log.Info("Fetching High prio bundles", "size", len(bundles), "blockNum", blockNum) + if len(bundles) != 0 { + pushMevBundles(bundles) + } + + case <-lowPrioBundleTicker.C: + ctxL, cancelL := context.WithTimeout(ctx, time.Second*3) + bundles, err := b.db.GetPriorityBundles(ctxL, blockNum, false) + cancelL() + if err != nil { + log.Error("failed to fetch low prio bundles", "err", err) + continue + } + log.Info("Fetching low prio bundles", "len", len(bundles), "blockNum", blockNum) + if len(bundles) != 0 { + pushMevBundles(bundles) + } + case <-ctx.Done(): + close(b.bundlesCh) + return + } + } +} + +func (b *bundleFetcher) dbBundleToMevBundle(arg DbBundle) (*types.MevBundle, error) { + signedTxsStr := strings.Split(arg.ParamSignedTxs, ",") + if len(signedTxsStr) == 0 { + return nil, errors.New("bundle missing txs") + } + if arg.ParamBlockNumber == 0 { + return nil, errors.New("bundle missing blockNumber") + } + + var txs types.Transactions + for _, txStr := range signedTxsStr { + decodedTx, err := hexutil.Decode(txStr) + if err != nil { + log.Error("could not decode bundle tx", "id", arg.DbId, "err", err) + continue + } + tx := new(types.Transaction) + if err := tx.UnmarshalBinary(decodedTx); err != nil { + log.Error("could not unmarshal bundle decoded tx", "id", arg.DbId, "err", err) + continue + } + txs = append(txs, tx) + } + var paramRevertingTxHashes []string + if arg.ParamRevertingTxHashes != nil { + paramRevertingTxHashes = strings.Split(*arg.ParamRevertingTxHashes, ",") + } + revertingTxHashesStrings := paramRevertingTxHashes + revertingTxHashes := make([]common.Hash, len(revertingTxHashesStrings)) + for _, rTxHashStr := range revertingTxHashesStrings { + revertingTxHashes = append(revertingTxHashes, common.HexToHash(rTxHashStr)) + } + var minTimestamp uint64 + if arg.ParamTimestamp != nil { + minTimestamp = *arg.ParamTimestamp + } + bundleHasher := sha3.NewLegacyKeccak256() + for _, tx := range txs { + bundleHasher.Write(tx.Hash().Bytes()) + } + bundleHash := common.BytesToHash(bundleHasher.Sum(nil)) + return &types.MevBundle{ + Txs: txs, + BlockNumber: new(big.Int).SetUint64(arg.ParamBlockNumber), + MinTimestamp: minTimestamp, + RevertingTxHashes: revertingTxHashes, + Hash: bundleHash, + }, nil +} diff --git a/go.sum b/go.sum index 294b59dfa147..a0f0f95613d9 100644 --- a/go.sum +++ b/go.sum @@ -334,6 +334,7 @@ github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= diff --git a/miner/worker.go b/miner/worker.go index 93e80e05d45f..c98af9f2479a 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1399,7 +1399,6 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment, validatorC log.Warn("Proposer payout create tx failed due to not enough balance", "profit", profit.String()) return errors.New("proposer payout create tx failed due to not enough balance"), nil } - } return nil, blockBundles