Skip to content

Commit

Permalink
eth/filters: fix getLogs for pending block (ethereum#24949)
Browse files Browse the repository at this point in the history
* eth/filters: fix pending for getLogs

* add pending method to test backend

* fix block range validation
  • Loading branch information
s1na authored and blakehhuynh committed Oct 7, 2022
1 parent 80db4fb commit 8d2c317
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 14 deletions.
19 changes: 13 additions & 6 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type SimulatedBackend struct {
stuckTransactions types.Transactions // holds onto all txes that don't go into the pending block due to low gas
pendingBlock *types.Block // Currently pending block that will be imported on request
pendingState *state.StateDB // Currently pending state that will be the active on request
pendingReceipts types.Receipts // Currently receipts for the pending block

events *filters.EventSystem // Event system for filtering log events live

Expand All @@ -87,8 +88,8 @@ func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.Genesis
database: database,
blockchain: blockchain,
config: genesis.Config,
events: filters.NewEventSystem(&filterBackend{database, blockchain}, false),
}
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)
backend.rollback(blockchain.CurrentBlock())
return backend
}
Expand Down Expand Up @@ -690,7 +691,7 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
}
var remainingTxes types.Transactions
// Include tx in chain
blocks, _ := core.GenerateChain(b.config, block, ethash.NewFaker(), b.database, 1, func(number int, block *core.BlockGen) {
blocks, receipts := core.GenerateChain(b.config, block, ethash.NewFaker(), b.database, 1, func(number int, block *core.BlockGen) {
for _, tx := range b.pendingBlock.Transactions() {
// if market gas price is not set or tx gas price is lower than the set market gas price
if b.marketGasPrice == nil || b.marketGasPrice.Cmp(tx.GasPrice()) <= 0 {
Expand All @@ -711,6 +712,7 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil)
b.stuckTransactions = remainingTxes
b.pendingReceipts = receipts[0]

return nil
}
Expand All @@ -723,7 +725,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
var filter *filters.Filter
if query.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain}, *query.BlockHash, query.Addresses, query.Topics)
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics)
} else {
// Initialize unset filter boundaries to run from genesis to chain head
from := int64(0)
Expand All @@ -735,7 +737,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
to = query.ToBlock.Int64()
}
// Construct the range filter
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics)
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down Expand Up @@ -856,8 +858,9 @@ func (m callMsg) AccessList() types.AccessList { return m.CallMsg.AccessList }
// filterBackend implements filters.Backend to support filtering for logs without
// taking bloom-bits acceleration structures into account.
type filterBackend struct {
db ethdb.Database
bc *core.BlockChain
db ethdb.Database
bc *core.BlockChain
backend *SimulatedBackend
}

func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
Expand All @@ -874,6 +877,10 @@ func (fb *filterBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*t
return fb.bc.GetHeaderByHash(hash), nil
}

func (fb *filterBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return fb.backend.pendingBlock, fb.backend.pendingReceipts
}

func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
number := rawdb.ReadHeaderNumber(fb.db, hash)
if number == nil {
Expand Down
46 changes: 38 additions & 8 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Backend interface {
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
PendingBlockAndReceipts() (*types.Block, types.Receipts)

SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
Expand Down Expand Up @@ -128,26 +129,35 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
return f.blockLogs(ctx, header)
}
// Short-cut if all we care about is pending logs
if f.begin == rpc.PendingBlockNumber.Int64() {
if f.end != rpc.PendingBlockNumber.Int64() {
return nil, errors.New("invalid block range")
}
return f.pendingLogs()
}
// Figure out the limits of the filter range
header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if header == nil {
return nil, nil
}
head := header.Number.Uint64()

if f.begin == -1 {
var (
head = header.Number.Uint64()
end = uint64(f.end)
pending = f.end == rpc.PendingBlockNumber.Int64()
)
if f.begin == rpc.LatestBlockNumber.Int64() {
f.begin = int64(head)
}
end := uint64(f.end)
if f.end == -1 {
if f.end == rpc.LatestBlockNumber.Int64() || f.end == rpc.PendingBlockNumber.Int64() {
end = head
}
// Gather all indexed logs, and finish with non indexed ones
var (
logs []*types.Log
err error
logs []*types.Log
err error
size, sections = f.backend.BloomStatus()
)
size, sections := f.backend.BloomStatus()
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
logs, err = f.indexedLogs(ctx, end)
Expand All @@ -160,6 +170,13 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
rest, err := f.unindexedLogs(ctx, end)
logs = append(logs, rest...)
if pending {
pendingLogs, err := f.pendingLogs()
if err != nil {
return nil, err
}
logs = append(logs, pendingLogs...)
}
return logs, err
}

Expand Down Expand Up @@ -272,6 +289,19 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [
return nil, nil
}

// pendingLogs returns the logs matching the filter criteria within the pending block.
func (f *Filter) pendingLogs() ([]*types.Log, error) {
block, receipts := f.backend.PendingBlockAndReceipts()
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
var unfiltered []*types.Log
for _, r := range receipts {
unfiltered = append(unfiltered, r.Logs...)
}
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics), nil
}
return nil, nil
}

func includes(addresses []common.Address, a common.Address) bool {
for _, addr := range addresses {
if addr == a {
Expand Down
4 changes: 4 additions & 0 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types
return logs, nil
}

func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return nil, nil
}

func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.txFeed.Subscribe(ch)
}
Expand Down
1 change: 1 addition & 0 deletions internal/ethapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Backend interface {
BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Block, error)
StateAndHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*state.StateDB, *types.Header, error)
StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error)
PendingBlockAndReceipts() (*types.Block, types.Receipts)
GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error)
GetTd(ctx context.Context, hash common.Hash) *big.Int
GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header, vmConfig *vm.Config) (*vm.EVM, func() error, error)
Expand Down

0 comments on commit 8d2c317

Please sign in to comment.