This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

txpool: sync.Mutex instead of RWMutex (#735)
[simplelru](https://github.com/hashicorp/golang-lru/tree/master/simplelru)
is not thread-safe. During a `Get` operation the recentness of the accessed
item is updated, so `Get` is not a pure read operation. Therefore the LRUs in
the txpool must be protected by a full mutex; an `RLock` is not enough. See
erigontech/erigon#4679 (comment)
and ethereum/go-ethereum#26164.
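
A minimal sketch of the resulting pattern, assuming golang-lru v1's non-generic `simplelru` API and a hypothetical wrapper type (not the actual pool code): even a plain lookup has to hold the full `sync.Mutex`, because `Get` promotes the entry in the internal recency list.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/hashicorp/golang-lru/simplelru"
)

// lruCache wraps the non-thread-safe simplelru.LRU behind a full sync.Mutex.
// Get moves the accessed entry to the front of the internal recency list,
// so guarding it with only RWMutex.RLock would be a data race.
type lruCache struct {
	mu  sync.Mutex
	lru *simplelru.LRU
}

func (c *lruCache) Get(key string) (interface{}, bool) {
	c.mu.Lock() // full lock, not RLock: Get writes to the recency list
	defer c.mu.Unlock()
	return c.lru.Get(key)
}

func (c *lruCache) Add(key string, value interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.lru.Add(key, value)
}

func main() {
	lru, err := simplelru.NewLRU(128, nil)
	if err != nil {
		panic(err)
	}
	c := &lruCache{lru: lru}
	c.Add("0xdeadbeef", struct{}{})
	_, known := c.Get("0xdeadbeef")
	fmt.Println("tx known:", known)
}
```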

Also, `RWMutex` has a performance overhead compared with a plain `Mutex`
(see, for example, golang/go#38813).
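
As a rough illustration only (a hypothetical micro-benchmark sketch, not something measured in this change): comparing a plain `Mutex` against `RWMutex.RLock` around a tiny read under parallel load shows where the reader-bookkeeping cost goes; actual numbers vary with hardware and contention, and golang/go#38813 has the detailed discussion.

```go
package lockbench

import (
	"sync"
	"testing"
)

var shared int

// BenchmarkMutex: a plain sync.Mutex around a tiny read-only critical section.
func BenchmarkMutex(b *testing.B) {
	var mu sync.Mutex
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			mu.Lock()
			_ = shared
			mu.Unlock()
		}
	})
}

// BenchmarkRWMutexRLock: the same critical section behind RWMutex.RLock.
// RLock/RUnlock maintain a shared reader counter with atomic operations,
// which is the overhead the commit message refers to for short sections.
func BenchmarkRWMutexRLock(b *testing.B) {
	var mu sync.RWMutex
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			mu.RLock()
			_ = shared
			mu.RUnlock()
		}
	})
}
```

Run with, for example, `go test -bench=. -cpu=1,8` to see how the difference scales with parallelism.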

Kudos to Martin Swende for pointing out the issue.
yperbasis committed Nov 14, 2022
1 parent b810bb6 commit 10e8428
Showing 1 changed file with 29 additions and 29 deletions.
58 changes: 29 additions & 29 deletions txpool/pool.go
@@ -289,14 +289,14 @@ func calcProtocolBaseFee(baseFee uint64) uint64 {
type TxPool struct {
_chainDB kv.RoDB // remote db - use it wisely
_stateCache kvcache.Cache
- lock *sync.RWMutex
+ lock *sync.Mutex
recentlyConnectedPeers *recentlyConnectedPeers // all txs will be propagated to this peers eventually, and clear list
senders *sendersBatch
// batch processing of remote transactions
// handling works fast without batching, but batching allow:
// - reduce amount of _chainDB transactions
// - batch notifications about new txs (reduce P2P spam to other nodes about txs propagation)
- // - and as a result reducing pool.RWLock contention
+ // - and as a result reducing lock contention
unprocessedRemoteTxs *types.TxSlots
unprocessedRemoteByHash map[string]int // to reject duplicates
byHash map[string]*metaTx // tx_hash => tx : only not committed to db yet records
@@ -337,7 +337,7 @@ func New(newTxs chan types.Hashes, coreDB kv.RoDB, cfg Config, cache kvcache.Cac
tracedSenders[sender] = struct{}{}
}
return &TxPool{
- lock: &sync.RWMutex{},
+ lock: &sync.Mutex{},
byHash: map[string]*metaTx{},
isLocalLRU: localsHistory,
discardReasonsLRU: discardHistory,
@@ -539,14 +539,14 @@ func (p *TxPool) getRlpLocked(tx kv.Tx, hash []byte) (rlpTxn []byte, sender []by
return v[20:], v[:20], txn != nil && txn.subPool&IsLocal > 0, nil
}
func (p *TxPool) GetRlp(tx kv.Tx, hash []byte) ([]byte, error) {
- p.lock.RLock()
- defer p.lock.RUnlock()
+ p.lock.Lock()
+ defer p.lock.Unlock()
rlpTx, _, _, err := p.getRlpLocked(tx, hash)
return common.Copy(rlpTx), err
}
func (p *TxPool) AppendLocalHashes(buf []byte) []byte {
- p.lock.RLock()
- defer p.lock.RUnlock()
+ p.lock.Lock()
+ defer p.lock.Unlock()
for hash, txn := range p.byHash {
if txn.subPool&IsLocal == 0 {
continue
@@ -556,8 +556,8 @@ func (p *TxPool) AppendLocalHashes(buf []byte) []byte {
return buf
}
func (p *TxPool) AppendRemoteHashes(buf []byte) []byte {
- p.lock.RLock()
- defer p.lock.RUnlock()
+ p.lock.Lock()
+ defer p.lock.Unlock()

for hash, txn := range p.byHash {
if txn.subPool&IsLocal != 0 {
@@ -576,8 +576,8 @@ func (p *TxPool) AppendAllHashes(buf []byte) []byte {
return buf
}
func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) {
- p.lock.RLock()
- defer p.lock.RUnlock()
+ p.lock.Lock()
+ defer p.lock.Unlock()
if _, ok := p.discardReasonsLRU.Get(string(hash)); ok {
return true, nil
}
@@ -590,8 +590,8 @@ func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) {
return tx.Has(kv.PoolTransaction, hash)
}
func (p *TxPool) IsLocal(idHash []byte) bool {
- p.lock.RLock()
- defer p.lock.RUnlock()
+ p.lock.Lock()
+ defer p.lock.Unlock()
return p.isLocalLRU.Contains(string(idHash))
}
func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) }
@@ -610,8 +610,8 @@ func (p *TxPool) Best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf uint64) (bo
var toRemove []*metaTx

success, err := func() (bool, error) {
- p.lock.RLock()
- defer p.lock.RUnlock()
+ p.lock.Lock()
+ defer p.lock.Unlock()

best := p.pending.best
for i := 0; j < int(n) && i < len(best.ms); i++ {
@@ -647,8 +647,8 @@ }
}

func (p *TxPool) CountContent() (int, int, int) {
- p.lock.RLock()
- defer p.lock.RUnlock()
+ p.lock.Lock()
+ defer p.lock.Unlock()
return p.pending.Len(), p.baseFee.Len(), p.queued.Len()
}
func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs types.TxSlots) {
@@ -871,14 +871,14 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots,
}

func (p *TxPool) coreDB() kv.RoDB {
- p.lock.RLock()
- defer p.lock.RUnlock()
+ p.lock.Lock()
+ defer p.lock.Unlock()
return p._chainDB
}

func (p *TxPool) cache() kvcache.Cache {
- p.lock.RLock()
- defer p.lock.RUnlock()
+ p.lock.Lock()
+ defer p.lock.Unlock()
return p._stateCache
}

@@ -1072,7 +1072,7 @@ func (p *TxPool) addLocked(mt *metaTx) DiscardReason {

if replaced := p.all.replaceOrInsert(mt); replaced != nil {
if ASSERT {
panic("must neve happen")
panic("must never happen")
}
}

@@ -1094,8 +1094,8 @@ func (p *TxPool) discardLocked(mt *metaTx, reason DiscardReason) {
}

func (p *TxPool) NonceFromAddress(addr [20]byte) (nonce uint64, inPool bool) {
- p.lock.RLock()
- defer p.lock.RUnlock()
+ p.lock.Lock()
+ defer p.lock.Unlock()
senderID, found := p.senders.getID(addr[:])
if !found {
return 0, false
@@ -1700,8 +1700,8 @@ func (p *TxPool) logStats() {
return
}

- p.lock.RLock()
- defer p.lock.RUnlock()
+ p.lock.Lock()
+ defer p.lock.Unlock()

var m runtime.MemStats
common.ReadMemStats(&m)
@@ -1724,8 +1724,8 @@

// Deprecated need switch to streaming-like
func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp, sender []byte, t SubPoolType), tx kv.Tx) {
- p.lock.RLock()
- defer p.lock.RUnlock()
+ p.lock.Lock()
+ defer p.lock.Unlock()
p.all.ascendAll(func(mt *metaTx) bool {
slot := mt.Tx
slotRlp := slot.Rlp
@@ -1794,7 +1794,7 @@ var PoolPendingBaseFeeKey = []byte("pending_base_fee")
// it doesn't track if peer disconnected, it's fine
type recentlyConnectedPeers struct {
peers []types.PeerID
- lock sync.RWMutex
+ lock sync.Mutex
}

func (l *recentlyConnectedPeers) AddPeer(p types.PeerID) {
