Skip to content

Commit

Permalink
chore: ensures consistent use of Lock and Unlock to acquire write-loc…
Browse files Browse the repository at this point in the history
…k on the mempool v1 (#1110)

This PR refactors the mempool implementation to ensure consistent use of
the `Lock` and `Unlock` methods for mutex management, promoting code
reusability and traceability.
  • Loading branch information
staheri14 committed Oct 10, 2023
1 parent f7635ef commit 0da5e1d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
34 changes: 17 additions & 17 deletions mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var _ mempool.Mempool = (*TxMempool)(nil)
// TxMempoolOption sets an optional parameter on the TxMempool.
type TxMempoolOption func(*TxMempool)

// TxMempool implemements the Mempool interface and allows the application to
// TxMempool implements the Mempool interface and allows the application to
// set priority values on transactions in the CheckTx response. When selecting
// transactions to include in a block, higher-priority transactions are chosen
// first. When evicting transactions from the mempool for size constraints,
Expand Down Expand Up @@ -136,17 +136,17 @@ func (txmp *TxMempool) FlushAppConn() error {
// We could just not require the caller to hold the lock at all, but the
// semantics of the Mempool interface require the caller to hold it, and we
// can't change that without disrupting existing use.
txmp.mtx.Unlock()
defer txmp.mtx.Lock()
txmp.Unlock()
defer txmp.Lock()

return txmp.proxyAppConn.FlushSync()
}

// EnableTxsAvailable enables the mempool to trigger events when transactions
// are available on a block by block basis.
func (txmp *TxMempool) EnableTxsAvailable() {
txmp.mtx.Lock()
defer txmp.mtx.Unlock()
txmp.Lock()
defer txmp.Unlock()

txmp.txsAvailable = make(chan struct{}, 1)
}
Expand Down Expand Up @@ -244,8 +244,8 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp
// mempool. It reports an error if no such transaction exists. This operation
// does not remove the transaction from the cache.
func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {
txmp.mtx.Lock()
defer txmp.mtx.Unlock()
txmp.Lock()
defer txmp.Unlock()
return txmp.removeTxByKey(txKey)
}

Expand Down Expand Up @@ -280,8 +280,8 @@ func (txmp *TxMempool) removeTxByElement(elt *clist.CElement) {
// Flush purges the contents of the mempool and the cache, leaving both empty.
// The current height is not modified by this operation.
func (txmp *TxMempool) Flush() {
txmp.mtx.Lock()
defer txmp.mtx.Unlock()
txmp.Lock()
defer txmp.Unlock()

// Remove all the transactions in the list explicitly, so that the sizes
// and indexes get updated properly.
Expand Down Expand Up @@ -449,8 +449,8 @@ func (txmp *TxMempool) Update(
//
// Finally, the new transaction is added and size stats updated.
func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.ResponseCheckTx) {
txmp.mtx.Lock()
defer txmp.mtx.Unlock()
txmp.Lock()
defer txmp.Unlock()

var err error
if txmp.postCheck != nil {
Expand Down Expand Up @@ -613,8 +613,8 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
// that case is handled by addNewTransaction instead.
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) {
txmp.metrics.RecheckTimes.Add(1)
txmp.mtx.Lock()
defer txmp.mtx.Unlock()
txmp.Lock()
defer txmp.Unlock()

// Find the transaction reported by the ABCI callback. It is possible the
// transaction was evicted during the recheck, in which case the transaction
Expand Down Expand Up @@ -699,8 +699,8 @@ func (txmp *TxMempool) recheckTransactions() {

// When recheck is complete, trigger a notification for more transactions.
_ = g.Wait()
txmp.mtx.Lock()
defer txmp.mtx.Unlock()
txmp.Lock()
defer txmp.Unlock()
txmp.notifyTxsAvailable()
}()
}
Expand Down Expand Up @@ -728,8 +728,8 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
// the txpool looped through all transactions and if so, performs a purge of any transaction
// that has expired according to the TTLDuration. This is thread safe.
func (txmp *TxMempool) CheckToPurgeExpiredTxs() {
txmp.mtx.Lock()
defer txmp.mtx.Unlock()
txmp.Lock()
defer txmp.Unlock()
if txmp.config.TTLDuration > 0 && time.Since(txmp.lastPurgeTime) > txmp.config.TTLDuration {
txmp.purgeExpiredTxs(txmp.height)
}
Expand Down
4 changes: 2 additions & 2 deletions mempool/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type mempoolIDs struct {
activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter
}

// Reserve searches for the next unused ID and assigns it to the
// ReserveForPeer searches for the next unused ID and assigns it to the
// peer.
func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
ids.mtx.Lock()
Expand Down Expand Up @@ -170,7 +170,7 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// broadcast routine checks if peer is gone and returns
}

// Receive implements Reactor.
// ReceiveEnvelope implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
Expand Down

0 comments on commit 0da5e1d

Please sign in to comment.