Skip to content

Commit

Permalink
eth/filters api remove gopool
Browse files Browse the repository at this point in the history
  • Loading branch information
0xcb9ff9 committed Sep 7, 2023
1 parent 79d237b commit 47724b4
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/gopool"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
Expand Down Expand Up @@ -115,7 +114,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub}
api.filtersMu.Unlock()

gopool.Submit(func() {
go func() {
for {
select {
case ph := <-pendingTxs:
Expand All @@ -131,7 +130,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
return
}
}
})
}()

return pendingTxSub.ID
}
Expand All @@ -146,7 +145,7 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su

rpcSub := notifier.CreateSubscription()

gopool.Submit(func() {
go func() {
txHashes := make(chan []common.Hash, 128)
pendingTxSub := api.events.SubscribePendingTxs(txHashes)

Expand All @@ -166,7 +165,7 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su
return
}
}
})
}()

return rpcSub, nil
}
Expand All @@ -185,7 +184,7 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: headerSub}
api.filtersMu.Unlock()

gopool.Submit(func() {
go func() {
for {
select {
case h := <-headers:
Expand All @@ -201,7 +200,7 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
return
}
}
})
}()

return headerSub.ID
}
Expand All @@ -215,7 +214,7 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er

rpcSub := notifier.CreateSubscription()

gopool.Submit(func() {
go func() {
headers := make(chan *types.Header)
headersSub := api.events.SubscribeNewHeads(headers)

Expand All @@ -231,7 +230,7 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
return
}
}
})
}()

return rpcSub, nil
}
Expand All @@ -253,12 +252,12 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc
return nil, err
}

gopool.Submit(func() {

go func() {
for {
select {
case logs := <-matchedLogs:
for _, log := range logs {
log := log
notifier.Notify(rpcSub.ID, &log)
}
case <-rpcSub.Err(): // client send an unsubscribe request
Expand All @@ -269,7 +268,7 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc
return
}
}
})
}()

return rpcSub, nil
}
Expand Down Expand Up @@ -302,7 +301,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(api.timeout), logs: make([]*types.Log, 0), s: logsSub}
api.filtersMu.Unlock()

gopool.Submit(func() {
go func() {
for {
select {
case l := <-logs:
Expand All @@ -318,7 +317,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
return
}
}
})
}()

return logsSub.ID, nil
}
Expand Down

0 comments on commit 47724b4

Please sign in to comment.