
Address goroutine leak
cffls committed Sep 12, 2024
1 parent f45da7c commit d9daaca
Showing 6 changed files with 24 additions and 46 deletions.
43 changes: 2 additions & 41 deletions core/blockchain.go
@@ -43,7 +43,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/core/state/snapshot"
-	"github.com/ethereum/go-ethereum/core/stateless"
 	"github.com/ethereum/go-ethereum/core/tracing"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/core/vm"
@@ -617,15 +616,7 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_
 	processorCount++
 
 	go func() {
-		// todo: @anshalshukla || @cffls - check witness collection and requirement in start prefetchers
-		var witness *stateless.Witness
-		if bc.vmConfig.EnableWitnessCollection {
-			witness, err = stateless.NewWitness(bc, block)
-			if err != nil {
-				return
-			}
-		}
-		parallelStatedb.StartPrefetcher("chain", witness)
+		parallelStatedb.StartPrefetcher("chain", nil)
 		receipts, logs, usedGas, err := bc.parallelProcessor.Process(block, parallelStatedb, bc.vmConfig, ctx)
 		resultChan <- Result{receipts, logs, usedGas, err, parallelStatedb, blockExecutionParallelCounter}
 	}()
@@ -641,16 +632,7 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_
 	processorCount++
 
 	go func() {
-		// todo: @anshalshukla || @cffls - check witness collection and requirement in start prefetchers
-		var witness *stateless.Witness
-		if bc.vmConfig.EnableWitnessCollection {
-			witness, err = stateless.NewWitness(bc, block)
-			if err != nil {
-				return
-			}
-		}
-		statedb.StartPrefetcher("chain", witness)
-
+		statedb.StartPrefetcher("chain", nil)
 		receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig, ctx)
 		resultChan <- Result{receipts, logs, usedGas, err, statedb, blockExecutionSerialCounter}
 	}()
@@ -2319,27 +2301,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
 		parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
 	}
 
-	statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
-	if err != nil {
-		return it.index, err
-	}
-	statedb.SetLogger(bc.logger)
-
-	// If we are past Byzantium, enable prefetching to pull in trie node paths
-	// while processing transactions. Before Byzantium the prefetcher is mostly
-	// useless due to the intermediate root hashing after each transaction.
-	if bc.chainConfig.IsByzantium(block.Number()) {
-		var witness *stateless.Witness
-		if bc.vmConfig.EnableWitnessCollection {
-			witness, err = stateless.NewWitness(bc, block)
-			if err != nil {
-				return it.index, err
-			}
-		}
-		statedb.StartPrefetcher("chain", witness)
-	}
-	activeState = statedb
-
 	// If we have a followup block, run that against the current state to pre-cache
 	// transactions and probabilistically some of the account/storage trie nodes.
 	var followupInterrupt atomic.Bool
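Both deleted witness blocks contained an early return inside a goroutine whose caller presumably waits on resultChan; read against the commit title, a return before the send would leave that reader blocked for good. Below is a minimal, self-contained sketch of that failure mode; it is not the commit's code, and Result, work, and resultChan are stand-ins.

package main

import (
	"errors"
	"fmt"
	"time"
)

// Result mimics the shape of a per-block execution result.
type Result struct {
	usedGas uint64
	err     error
}

func work() (uint64, error) { return 0, errors.New("setup failed") }

func main() {
	resultChan := make(chan Result)

	go func() {
		usedGas, err := work()
		if err != nil {
			return // bug: no send, so the receiver below waits forever
		}
		resultChan <- Result{usedGas, nil}
	}()

	select {
	case res := <-resultChan:
		fmt.Println("got result:", res)
	case <-time.After(time.Second):
		fmt.Println("worker exited without sending; the reader is stuck") // the leak the fix avoids
	}
}

The patched goroutines start the prefetcher with a nil witness and have no early return, so each one always delivers its Result.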
2 changes: 0 additions & 2 deletions core/parallel_state_processor.go
@@ -363,8 +363,6 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
 	if task.shouldRerunWithoutFeeDelay {
 		shouldDelayFeeCal = false
 
-		statedb.StopPrefetcher()
-
 		// nolint
 		*statedb = *backupStateDB
 
4 changes: 4 additions & 0 deletions core/state/statedb.go
@@ -1179,6 +1179,10 @@ func (s *StateDB) Copy() *StateDB {
 	state.accessList = s.accessList.Copy()
 	state.transientStorage = s.transientStorage.Copy()
 
+	if s.prefetcher != nil {
+		state.prefetcher = s.prefetcher
+	}
+
 	if s.mvHashmap != nil {
 		state.mvHashmap = s.mvHashmap
 	}
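Copy now carries the prefetcher pointer into the new StateDB, mirroring the mvHashmap handling just below it; the StopPrefetcher removal in core/parallel_state_processor.go above appears to rely on this, since restoring from a backup copy no longer drops the running prefetcher. A toy sketch of that pointer sharing, using stripped-down stand-ins rather than the real StateDB and prefetcher types:

package main

import "fmt"

type prefetcher struct{ running bool }

type stateDB struct {
	prefetcher *prefetcher
	nonce      uint64
}

// copyState mirrors the patched Copy: the prefetcher pointer is shared, the rest is duplicated.
func (s *stateDB) copyState() *stateDB {
	cp := &stateDB{nonce: s.nonce}
	if s.prefetcher != nil {
		cp.prefetcher = s.prefetcher
	}
	return cp
}

func main() {
	statedb := &stateDB{prefetcher: &prefetcher{running: true}, nonce: 7}
	backup := statedb.copyState()

	statedb.nonce = 42 // execution diverges, then the processor decides to rerun
	*statedb = *backup // restore from the backup: the shared prefetcher pointer survives

	fmt.Println(statedb.prefetcher.running, statedb.nonce) // true 7
}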
17 changes: 17 additions & 0 deletions core/state/trie_prefetcher.go
@@ -61,6 +61,8 @@ type triePrefetcher struct {
 	storageDupWriteMeter metrics.Meter
 	storageDupCrossMeter metrics.Meter
 	storageWasteMeter metrics.Meter
+
+	lock sync.RWMutex // Use RWMutex for better read/write locking
 }
 
 func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads bool) *triePrefetcher {
@@ -94,6 +96,9 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads
 // to all of them. Depending on the async parameter, the method will either block
 // until all subfetchers spin down, or return immediately.
 func (p *triePrefetcher) terminate(async bool) {
+	p.lock.Lock()         // Lock for writing
+	defer p.lock.Unlock() // Ensure the lock is released after the function
+
 	// Short circuit if the fetcher is already closed
 	select {
 	case <-p.term:
@@ -109,6 +114,9 @@ func (p *triePrefetcher) terminate(async bool) {
 
 // report aggregates the pre-fetching and usage metrics and reports them.
 func (p *triePrefetcher) report() {
+	p.lock.RLock()         // Lock for reading
+	defer p.lock.RUnlock() // Ensure the lock is released after the function
+
 	if !metrics.Enabled {
 		return
 	}
@@ -157,6 +165,9 @@
 // repeated.
 // 2. Finalize of the main account trie. This happens only once per block.
 func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte, read bool) error {
+	p.lock.Lock()         // Lock for writing
+	defer p.lock.Unlock() // Ensure the lock is released after the function
+
 	// If the state item is only being read, but reads are disabled, return
 	if read && p.noreads {
 		return nil
@@ -181,6 +192,9 @@
 // the given trie terminates. If no fetcher exists for the request, nil will be
 // returned.
 func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
+	p.lock.RLock()         // Lock for reading
+	defer p.lock.RUnlock() // Ensure the lock is released after the function
+
 	// Bail if no trie was prefetched for this root
 	fetcher := p.fetchers[p.trieID(owner, root)]
 	if fetcher == nil {
@@ -195,6 +209,9 @@ func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
 // used marks a batch of state items used to allow creating statistics as to
 // how useful or wasteful the fetcher is.
 func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) {
+	p.lock.Lock()         // Lock for writing
+	defer p.lock.Unlock() // Ensure the lock is released after the function
+
 	if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil {
 		fetcher.wait() // ensure the fetcher's idle before poking in its internals
 		fetcher.used = used
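With the prefetcher now shared between a StateDB and its copies, concurrent goroutines can reach the same triePrefetcher, which is presumably why the mutating paths (terminate, prefetch, used) take the write lock while report and trie take the read lock. A generic sketch of that sync.RWMutex pattern over a fetcher map, with simplified stand-in types rather than the real triePrefetcher:

package main

import (
	"fmt"
	"sync"
)

type fetcher struct{ keys [][]byte }

type prefetcher struct {
	lock     sync.RWMutex // guards fetchers against concurrent readers and writers
	fetchers map[string]*fetcher
}

// prefetch mutates the map, so it takes the exclusive write lock.
func (p *prefetcher) prefetch(id string, key []byte) {
	p.lock.Lock()
	defer p.lock.Unlock()
	f, ok := p.fetchers[id]
	if !ok {
		f = &fetcher{}
		p.fetchers[id] = f
	}
	f.keys = append(f.keys, key)
}

// trie only reads the map, so a shared read lock lets lookups run in parallel.
func (p *prefetcher) trie(id string) *fetcher {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.fetchers[id]
}

func main() {
	p := &prefetcher{fetchers: make(map[string]*fetcher)}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			p.prefetch("accounts", []byte{byte(i)}) // safe: serialized by the write lock
			_ = p.trie("accounts")
		}(i)
	}
	wg.Wait()
	fmt.Println("keys prefetched:", len(p.trie("accounts").keys)) // 4
}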
2 changes: 0 additions & 2 deletions eth/handler.go
@@ -525,8 +525,6 @@ func (h *handler) Start(maxPeers int) {
 
 func (h *handler) Stop() {
 	h.txsSub.Unsubscribe() // quits txBroadcastLoop
-	h.txFetcher.Stop()
-	h.downloader.Terminate()
 	h.minedBlockSub.Unsubscribe()
 
 	// Quit chainSync and txsync64.
2 changes: 1 addition & 1 deletion eth/sync.go
@@ -94,7 +94,7 @@ func (cs *chainSyncer) loop() {
 	cs.handler.txFetcher.Start()
 
 	defer cs.handler.blockFetcher.Stop()
-	// defer cs.handler.txFetcher.Stop()
+	defer cs.handler.txFetcher.Stop()
 	defer cs.handler.downloader.Terminate()
 
 	// The force timer lowers the peer count threshold down to one when it fires.
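Here the previously commented-out defer is re-enabled, and the matching calls are dropped from handler.Stop in eth/handler.go above, so the transaction fetcher and downloader are torn down by the loop that drives them. A small sketch of that ownership pattern, with chainSyncer and txFetcher reduced to stand-ins:

package main

import (
	"fmt"
	"sync"
)

type txFetcher struct{}

func (f *txFetcher) Start() { fmt.Println("txFetcher started") }
func (f *txFetcher) Stop()  { fmt.Println("txFetcher stopped") }

type chainSyncer struct {
	fetcher *txFetcher
	quit    chan struct{}
	wg      sync.WaitGroup
}

// loop owns the fetcher's lifetime: whatever it starts, its own defers stop,
// so no exit path can leave the fetcher's goroutines running.
func (cs *chainSyncer) loop() {
	defer cs.wg.Done()

	cs.fetcher.Start()
	defer cs.fetcher.Stop()

	<-cs.quit // the sync loop proper would live here
}

func main() {
	cs := &chainSyncer{fetcher: &txFetcher{}, quit: make(chan struct{})}
	cs.wg.Add(1)
	go cs.loop()

	close(cs.quit) // Stop() elsewhere only signals the loop; the loop's defers do the cleanup
	cs.wg.Wait()
}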
