Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

common/pool: fix pool abuse #27

Merged
merged 4 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions common/gopool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package gopool

import (
"runtime"
"time"

"github.com/panjf2000/ants/v2"
)

var (
// Init a instance pool when importing ants.
defaultPool, _ = ants.NewPool(ants.DefaultAntsPoolSize, ants.WithExpiryDuration(10*time.Second))
defaultPool, _ = ants.NewPool(ants.DefaultAntsPoolSize, ants.WithDisablePurge(true))
minNumberPerTask = 5
)

Expand Down
6 changes: 2 additions & 4 deletions core/asm/lexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"strings"
"unicode"
"unicode/utf8"

"github.com/ethereum/go-ethereum/common/gopool"
)

// stateFn is used through the lifetime of the
Expand Down Expand Up @@ -105,14 +103,14 @@ func Lex(source []byte, debug bool) <-chan token {
state: lexLine,
debug: debug,
}
gopool.Submit(func() {
go func() {
l.emit(lineStart)
for l.state != nil {
l.state = l.state(l)
}
l.emit(eof)
close(l.tokens)
})
}()

return ch
}
Expand Down
27 changes: 12 additions & 15 deletions core/bloombits/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"time"

"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/common/gopool"
"github.com/ethereum/go-ethereum/crypto"
)

Expand Down Expand Up @@ -84,7 +83,7 @@ type Matcher struct {
retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
deliveries chan *Retrieval // Retriever processes waiting for task response deliveries

running uint32 // Atomic flag whether a session is live or not
running atomic.Bool // Atomic flag whether a session is live or not
}

// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
Expand Down Expand Up @@ -147,10 +146,10 @@ func (m *Matcher) addScheduler(idx uint) {
// channel is closed.
func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
// Make sure we're not creating concurrent sessions
if atomic.SwapUint32(&m.running, 1) == 1 {
if m.running.Swap(true) {
return nil, errors.New("matcher already running")
}
defer atomic.StoreUint32(&m.running, 0)
defer m.running.Store(false)

// Initiate a new matching round
session := &MatcherSession{
Expand All @@ -165,7 +164,7 @@ func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uin

// Read the output from the result sink and deliver to the user
session.pend.Add(1)
gopool.Submit(func() {
go func() {
defer session.pend.Done()
defer close(results)

Expand Down Expand Up @@ -211,7 +210,7 @@ func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uin
}
}
}
})
}()
return session, nil
}

Expand All @@ -227,7 +226,7 @@ func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) ch
source := make(chan *partialMatches, buffer)

session.pend.Add(1)
gopool.Submit(func() {
go func() {
defer session.pend.Done()
defer close(source)

Expand All @@ -238,7 +237,7 @@ func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) ch
case source <- &partialMatches{i, bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}:
}
}
})
}()
// Assemble the daisy-chained filtering pipeline
next := source
dist := make(chan *request, buffer)
Expand All @@ -248,9 +247,7 @@ func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) ch
}
// Start the request distribution
session.pend.Add(1)
gopool.Submit(func() {
m.distributor(dist, session)
})
go m.distributor(dist, session)

return next
}
Expand All @@ -276,7 +273,7 @@ func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloo
results := make(chan *partialMatches, cap(source))

session.pend.Add(2)
gopool.Submit(func() {
go func() {
// Tear down the goroutine and terminate all source channels
defer session.pend.Done()
defer close(process)
Expand Down Expand Up @@ -317,9 +314,9 @@ func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloo
}
}
}
})
}()

gopool.Submit(func() {
go func() {
// Tear down the goroutine and terminate the final sink channel
defer session.pend.Done()
defer close(results)
Expand Down Expand Up @@ -375,7 +372,7 @@ func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloo
}
}
}
})
}()
return results
}

Expand Down
10 changes: 2 additions & 8 deletions core/bloombits/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package bloombits

import (
"sync"

"github.com/ethereum/go-ethereum/common/gopool"
)

// request represents a bloom retrieval task to prioritize and pull from the local
Expand Down Expand Up @@ -65,12 +63,8 @@ func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []by

// Start the pipeline schedulers to forward between user -> distributor -> user
wg.Add(2)
gopool.Submit(func() {
s.scheduleRequests(sections, dist, pend, quit, wg)
})
gopool.Submit(func() {
s.scheduleDeliveries(pend, done, quit, wg)
})
go s.scheduleRequests(sections, dist, pend, quit, wg)
go s.scheduleDeliveries(pend, done, quit, wg)
}

// reset cleans up any leftovers from previous runs. This is required before a
Expand Down
5 changes: 1 addition & 4 deletions core/rawdb/chain_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/gopool"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
Expand Down Expand Up @@ -169,9 +168,7 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool
}
go lookup() // start the sequential db accessor
for i := 0; i < int(threads); i++ {
gopool.Submit(func() {
process()
})
go process()
}
return hashesCh
}
Expand Down
7 changes: 2 additions & 5 deletions core/state/snapshot/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common/gopool"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
Expand Down Expand Up @@ -317,8 +315,7 @@ func generateTrieRoot(db ethdb.KeyValueWriter, it Iterator, account common.Hash,
if err != nil {
return stop(err)
}
hash := it.Hash()
gopool.Submit(func() {
go func(hash common.Hash) {
subroot, err := leafCallback(db, hash, common.BytesToHash(account.CodeHash), stats)
if err != nil {
results <- err
Expand All @@ -329,7 +326,7 @@ func generateTrieRoot(db ethdb.KeyValueWriter, it Iterator, account common.Hash,
return
}
results <- nil
})
}(it.Hash())
fullData, err = rlp.EncodeToBytes(account)
if err != nil {
return stop(err)
Expand Down
5 changes: 2 additions & 3 deletions eth/bloombits.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"time"

"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/common/gopool"
"github.com/ethereum/go-ethereum/core/rawdb"
)

Expand All @@ -46,7 +45,7 @@ const (
// retrievals from possibly a range of filters and serving the data to satisfy.
func (eth *Ethereum) startBloomHandlers(sectionSize uint64) {
for i := 0; i < bloomServiceThreads; i++ {
gopool.Submit(func() {
go func() {
for {
select {
case <-eth.closeBloomHandler:
Expand All @@ -70,6 +69,6 @@ func (eth *Ethereum) startBloomHandlers(sectionSize uint64) {
request <- task
}
}
})
}()
}
}
5 changes: 2 additions & 3 deletions eth/downloader/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common/gopool"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
)
Expand Down Expand Up @@ -99,7 +98,7 @@ func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription,

rpcSub := notifier.CreateSubscription()

gopool.Submit(func() {
go func() {
statuses := make(chan interface{})
sub := api.SubscribeSyncStatus(statuses)

Expand All @@ -115,7 +114,7 @@ func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription,
return
}
}
})
}()

return rpcSub, nil
}
Expand Down
5 changes: 2 additions & 3 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/gopool"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -795,14 +794,14 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
f.requests[peer] = &txRequest{hashes: hashes, time: f.clock.Now()}
txRequestOutMeter.Mark(int64(len(hashes)))
p := peer
gopool.Submit(func() {
go func() {
// Try to fetch the transactions, but in case of a request
// failure (e.g. peer disconnected), reschedule the hashes.
if err := f.fetchTxs(p, hashes); err != nil {
txRequestFailMeter.Mark(int64(len(hashes)))
f.Drop(p)
}
})
}()
}
})
// If a new request was fired, schedule a timeout timer
Expand Down
Loading