From eacf53db36719a4b347c5d935e0c9bdc2ec16703 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 6 Dec 2022 21:17:24 +0100 Subject: [PATCH 1/5] eth/filters, eth/tracers: add request cancellation checks This ensures that RPC method handlers will react to a timeout or cancelled request soon after the event occurs. --- eth/filters/filter.go | 3 +++ eth/tracers/api.go | 12 +++++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 26e85a6f1a42..611fc8630144 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -234,6 +234,9 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e var logs []*types.Log for ; f.begin <= int64(end); f.begin++ { + if f.begin%10 == 0 && ctx.Err() != nil { + return logs, ctx.Err() + } header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) if header == nil || err != nil { return logs, err diff --git a/eth/tracers/api.go b/eth/tracers/api.go index a9b51c507807..d6c892eac70b 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -547,6 +547,9 @@ func (api *API) IntermediateRoots(ctx context.Context, hash common.Hash, config deleteEmptyObjects = chainConfig.IsEIP158(block.Number()) ) for i, tx := range block.Transactions() { + if err := ctx.Err(); err != nil { + return nil, err + } var ( msg, _ = tx.AsMessage(signer, block.BaseFee()) txContext = core.NewEVMTxContext(msg) @@ -638,12 +641,19 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac } }() } + // Feed the transactions into the tracers and return var failed error blockCtx := core.NewEVMBlockContext(block.Header(), api.chainContext(ctx), nil) for i, tx := range txs { // Send the trace task over for execution - jobs <- &txTraceTask{statedb: statedb.Copy(), index: i} + task := &txTraceTask{statedb: statedb.Copy(), index: i} + select { + case jobs <- task: + case <-ctx.Done(): + failed = ctx.Err() + break + } // Generate the next state snapshot fast without tracing msg, _ := tx.AsMessage(signer, block.BaseFee()) From 455f2f4b25704fab24a3d459e3eb45ca66e27274 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 7 Dec 2022 15:56:55 +0100 Subject: [PATCH 2/5] Update api.go --- eth/tracers/api.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/eth/tracers/api.go b/eth/tracers/api.go index d6c892eac70b..9b04f186ebeb 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -645,6 +645,7 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac // Feed the transactions into the tracers and return var failed error blockCtx := core.NewEVMBlockContext(block.Header(), api.chainContext(ctx), nil) +txloop: for i, tx := range txs { // Send the trace task over for execution task := &txTraceTask{statedb: statedb.Copy(), index: i} @@ -652,7 +653,7 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac case jobs <- task: case <-ctx.Done(): failed = ctx.Err() - break + break txloop } // Generate the next state snapshot fast without tracing @@ -661,7 +662,7 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac vmenv := vm.NewEVM(blockCtx, core.NewEVMTxContext(msg), statedb, api.backend.ChainConfig(), vm.Config{}) if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas())); err != nil { failed = err - break + break txloop } // Finalize the state so any modifications are written to the trie // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect From 2b87faa2a9e01a6b5684b1d1a84c5372f2630078 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 7 Dec 2022 19:40:17 +0100 Subject: [PATCH 3/5] eth/tracers: reduce block tracing channel buffer --- eth/tracers/api.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/eth/tracers/api.go b/eth/tracers/api.go index 9b04f186ebeb..adc406418baa 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -610,14 +610,13 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac signer = types.MakeSigner(api.backend.ChainConfig(), block.Number()) txs = block.Transactions() results = make([]*txTraceResult, len(txs)) - - pend = new(sync.WaitGroup) - jobs = make(chan *txTraceTask, len(txs)) + pend sync.WaitGroup ) threads := runtime.NumCPU() if threads > len(txs) { threads = len(txs) } + jobs := make(chan *txTraceTask, len(txs)) blockHash := block.Hash() for th := 0; th < threads; th++ { pend.Add(1) @@ -650,10 +649,10 @@ txloop: // Send the trace task over for execution task := &txTraceTask{statedb: statedb.Copy(), index: i} select { - case jobs <- task: case <-ctx.Done(): failed = ctx.Err() break txloop + case jobs <- task: } // Generate the next state snapshot fast without tracing @@ -668,6 +667,7 @@ txloop: // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number())) } + close(jobs) pend.Wait() From 125954de4caa361d4b118236ab49e42f74c2cadf Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Fri, 9 Dec 2022 10:51:25 +0100 Subject: [PATCH 4/5] fix jobs chan length --- eth/tracers/api.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/tracers/api.go b/eth/tracers/api.go index adc406418baa..db635239cca4 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -616,7 +616,7 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac if threads > len(txs) { threads = len(txs) } - jobs := make(chan *txTraceTask, len(txs)) + jobs := make(chan *txTraceTask, threads) blockHash := block.Hash() for th := 0; th < threads; th++ { pend.Add(1) From 54a5986987a143bfa1a9bde67072a34a92b9c50f Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Thu, 15 Dec 2022 13:02:17 +0100 Subject: [PATCH 5/5] eth: add cancel checks to StateAt* methods --- eth/api.go | 4 ++-- eth/api_backend.go | 4 ++-- eth/state_accessor.go | 13 ++++++++++--- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/eth/api.go b/eth/api.go index e480dde8f64f..0cb5c1fdc40a 100644 --- a/eth/api.go +++ b/eth/api.go @@ -405,13 +405,13 @@ type storageEntry struct { } // StorageRangeAt returns the storage at the given block height and transaction index. -func (api *DebugAPI) StorageRangeAt(blockHash common.Hash, txIndex int, contractAddress common.Address, keyStart hexutil.Bytes, maxResult int) (StorageRangeResult, error) { +func (api *DebugAPI) StorageRangeAt(ctx context.Context, blockHash common.Hash, txIndex int, contractAddress common.Address, keyStart hexutil.Bytes, maxResult int) (StorageRangeResult, error) { // Retrieve the block block := api.eth.blockchain.GetBlockByHash(blockHash) if block == nil { return StorageRangeResult{}, fmt.Errorf("block %#x not found", blockHash) } - _, _, statedb, release, err := api.eth.stateAtTransaction(block, txIndex, 0) + _, _, statedb, release, err := api.eth.stateAtTransaction(ctx, block, txIndex, 0) if err != nil { return StorageRangeResult{}, err } diff --git a/eth/api_backend.go b/eth/api_backend.go index fad88e801865..3055bae75efa 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -365,9 +365,9 @@ func (b *EthAPIBackend) StartMining(threads int) error { } func (b *EthAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, tracers.StateReleaseFunc, error) { - return b.eth.StateAtBlock(block, reexec, base, readOnly, preferDisk) + return b.eth.StateAtBlock(ctx, block, reexec, base, readOnly, preferDisk) } func (b *EthAPIBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) { - return b.eth.stateAtTransaction(block, txIndex, reexec) + return b.eth.stateAtTransaction(ctx, block, txIndex, reexec) } diff --git a/eth/state_accessor.go b/eth/state_accessor.go index 778f88ab35d1..3bb1464952a0 100644 --- a/eth/state_accessor.go +++ b/eth/state_accessor.go @@ -17,6 +17,7 @@ package eth import ( + "context" "errors" "fmt" "time" @@ -56,7 +57,7 @@ var noopReleaser = tracers.StateReleaseFunc(func() {}) // - preferDisk: this arg can be used by the caller to signal that even though the 'base' is // provided, it would be preferable to start from a fresh state, if we have it // on disk. -func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (statedb *state.StateDB, release tracers.StateReleaseFunc, err error) { +func (eth *Ethereum) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (statedb *state.StateDB, release tracers.StateReleaseFunc, err error) { var ( current *types.Block database state.Database @@ -111,6 +112,9 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state } // Database does not have the state for the given block, try to regenerate for i := uint64(0); i < reexec; i++ { + if err := ctx.Err(); err != nil { + return nil, nil, err + } if current.NumberU64() == 0 { return nil, nil, errors.New("genesis state is missing") } @@ -142,6 +146,9 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state parent common.Hash ) for current.NumberU64() < origin { + if err := ctx.Err(); err != nil { + return nil, nil, err + } // Print progress logs if long enough time elapsed if time.Since(logged) > 8*time.Second && report { log.Info("Regenerating historical state", "block", current.NumberU64()+1, "target", origin, "remaining", origin-current.NumberU64()-1, "elapsed", time.Since(start)) @@ -182,7 +189,7 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state } // stateAtTransaction returns the execution environment of a certain transaction. -func (eth *Ethereum) stateAtTransaction(block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) { +func (eth *Ethereum) stateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) { // Short circuit if it's genesis block. if block.NumberU64() == 0 { return nil, vm.BlockContext{}, nil, nil, errors.New("no transaction in genesis") @@ -194,7 +201,7 @@ func (eth *Ethereum) stateAtTransaction(block *types.Block, txIndex int, reexec } // Lookup the statedb of parent block from the live database, // otherwise regenerate it on the flight. - statedb, release, err := eth.StateAtBlock(parent, reexec, nil, true, false) + statedb, release, err := eth.StateAtBlock(ctx, parent, reexec, nil, true, false) if err != nil { return nil, vm.BlockContext{}, nil, nil, err }