From 3c3fb8f3838d5462cd3e31a7bf3bac578e6e9bef Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi <1591639+s1na@users.noreply.github.com> Date: Tue, 7 Jun 2022 08:31:19 +0200 Subject: [PATCH] eth/filters: fix getLogs for pending block (#24949) * eth/filters: fix pending for getLogs * add pending method to test backend * fix block range validation --- accounts/abi/bind/backends/simulated.go | 28 ++++++++------- eth/filters/filter.go | 46 ++++++++++++++++++++----- eth/filters/filter_system_test.go | 4 +++ internal/ethapi/backend.go | 1 + 4 files changed, 59 insertions(+), 20 deletions(-) diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 48415802b36f..85efcf30158c 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -63,10 +63,10 @@ type SimulatedBackend struct { database ethdb.Database // In memory database to store our testing data blockchain *core.BlockChain // Ethereum blockchain to handle the consensus - mu sync.Mutex - stuckTransactions types.Transactions // holds onto all txes that don't go into the pending block due to low gas - pendingBlock *types.Block // Currently pending block that will be imported on request - pendingState *state.StateDB // Currently pending state that will be the active on request + mu sync.Mutex + pendingBlock *types.Block // Currently pending block that will be imported on request + pendingState *state.StateDB // Currently pending state that will be the active on request + pendingReceipts types.Receipts // Currently receipts for the pending block events *filters.EventSystem // Event system for filtering log events live @@ -87,8 +87,8 @@ func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.Genesis database: database, blockchain: blockchain, config: genesis.Config, - events: filters.NewEventSystem(&filterBackend{database, blockchain}, false), } + backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false) backend.rollback(blockchain.CurrentBlock()) return backend } @@ -690,7 +690,7 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa } var remainingTxes types.Transactions // Include tx in chain - blocks, _ := core.GenerateChain(b.config, block, ethash.NewFaker(), b.database, 1, func(number int, block *core.BlockGen) { + blocks, receipts := core.GenerateChain(b.config, block, ethash.NewFaker(), b.database, 1, func(number int, block *core.BlockGen) { for _, tx := range b.pendingBlock.Transactions() { // if market gas price is not set or tx gas price is lower than the set market gas price if b.marketGasPrice == nil || b.marketGasPrice.Cmp(tx.GasPrice()) <= 0 { @@ -710,8 +710,7 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa b.pendingBlock = blocks[0] b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil) - b.stuckTransactions = remainingTxes - + b.pendingReceipts = receipts[0] return nil } @@ -723,7 +722,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter var filter *filters.Filter if query.BlockHash != nil { // Block filter requested, construct a single-shot filter - filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain}, *query.BlockHash, query.Addresses, query.Topics) + filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics) } else { // Initialize unset filter boundaries to run from genesis to chain head from := int64(0) @@ -735,7 +734,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter to = query.ToBlock.Int64() } // Construct the range filter - filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics) + filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics) } // Run the filter and return all the logs logs, err := filter.Logs(ctx) @@ -856,8 +855,9 @@ func (m callMsg) AccessList() types.AccessList { return m.CallMsg.AccessList } // filterBackend implements filters.Backend to support filtering for logs without // taking bloom-bits acceleration structures into account. type filterBackend struct { - db ethdb.Database - bc *core.BlockChain + db ethdb.Database + bc *core.BlockChain + backend *SimulatedBackend } func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db } @@ -874,6 +874,10 @@ func (fb *filterBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*t return fb.bc.GetHeaderByHash(hash), nil } +func (fb *filterBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) { + return fb.backend.pendingBlock, fb.backend.pendingReceipts +} + func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) { number := rawdb.ReadHeaderNumber(fb.db, hash) if number == nil { diff --git a/eth/filters/filter.go b/eth/filters/filter.go index f64e84abb86c..9ff7ab7f55e1 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -36,6 +36,7 @@ type Backend interface { HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) + PendingBlockAndReceipts() (*types.Block, types.Receipts) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription @@ -128,26 +129,35 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { } return f.blockLogs(ctx, header) } + // Short-cut if all we care about is pending logs + if f.begin == rpc.PendingBlockNumber.Int64() { + if f.end != rpc.PendingBlockNumber.Int64() { + return nil, errors.New("invalid block range") + } + return f.pendingLogs() + } // Figure out the limits of the filter range header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) if header == nil { return nil, nil } - head := header.Number.Uint64() - - if f.begin == -1 { + var ( + head = header.Number.Uint64() + end = uint64(f.end) + pending = f.end == rpc.PendingBlockNumber.Int64() + ) + if f.begin == rpc.LatestBlockNumber.Int64() { f.begin = int64(head) } - end := uint64(f.end) - if f.end == -1 { + if f.end == rpc.LatestBlockNumber.Int64() || f.end == rpc.PendingBlockNumber.Int64() { end = head } // Gather all indexed logs, and finish with non indexed ones var ( - logs []*types.Log - err error + logs []*types.Log + err error + size, sections = f.backend.BloomStatus() ) - size, sections := f.backend.BloomStatus() if indexed := sections * size; indexed > uint64(f.begin) { if indexed > end { logs, err = f.indexedLogs(ctx, end) @@ -160,6 +170,13 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { } rest, err := f.unindexedLogs(ctx, end) logs = append(logs, rest...) + if pending { + pendingLogs, err := f.pendingLogs() + if err != nil { + return nil, err + } + logs = append(logs, pendingLogs...) + } return logs, err } @@ -272,6 +289,19 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [ return nil, nil } +// pendingLogs returns the logs matching the filter criteria within the pending block. +func (f *Filter) pendingLogs() ([]*types.Log, error) { + block, receipts := f.backend.PendingBlockAndReceipts() + if bloomFilter(block.Bloom(), f.addresses, f.topics) { + var unfiltered []*types.Log + for _, r := range receipts { + unfiltered = append(unfiltered, r.Logs...) + } + return filterLogs(unfiltered, nil, nil, f.addresses, f.topics), nil + } + return nil, nil +} + func includes(addresses []common.Address, a common.Address) bool { for _, addr := range addresses { if addr == a { diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 8d145d960432..82d626ef127e 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -106,6 +106,10 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types return logs, nil } +func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) { + return nil, nil +} + func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { return b.txFeed.Subscribe(ch) } diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index bc60fb2a64f6..c11d842df7d5 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -65,6 +65,7 @@ type Backend interface { BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Block, error) StateAndHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*state.StateDB, *types.Header, error) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) + PendingBlockAndReceipts() (*types.Block, types.Receipts) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) GetTd(ctx context.Context, hash common.Hash) *big.Int GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header, vmConfig *vm.Config) (*vm.EVM, func() error, error)