From 0da5e1deebb54f7d74d5609150cec02db6f0434d Mon Sep 17 00:00:00 2001 From: Sanaz Taheri <35961250+staheri14@users.noreply.github.com> Date: Tue, 10 Oct 2023 09:36:55 -0700 Subject: [PATCH] chore: ensures consistent use of Lock and Unlock to acquire write-lock on the mempool v1 (#1110) This PR refactors the mempool implementation to ensure consistent use of the `Lock` and `Unlock` methods for mutex management, promoting code reusability and traceability. --- mempool/v1/mempool.go | 34 +++++++++++++++++----------------- mempool/v1/reactor.go | 4 ++-- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index 159dca0481..a07b685477 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -24,7 +24,7 @@ var _ mempool.Mempool = (*TxMempool)(nil) // TxMempoolOption sets an optional parameter on the TxMempool. type TxMempoolOption func(*TxMempool) -// TxMempool implemements the Mempool interface and allows the application to +// TxMempool implements the Mempool interface and allows the application to // set priority values on transactions in the CheckTx response. When selecting // transactions to include in a block, higher-priority transactions are chosen // first. When evicting transactions from the mempool for size constraints, @@ -136,8 +136,8 @@ func (txmp *TxMempool) FlushAppConn() error { // We could just not require the caller to hold the lock at all, but the // semantics of the Mempool interface require the caller to hold it, and we // can't change that without disrupting existing use. - txmp.mtx.Unlock() - defer txmp.mtx.Lock() + txmp.Unlock() + defer txmp.Lock() return txmp.proxyAppConn.FlushSync() } @@ -145,8 +145,8 @@ func (txmp *TxMempool) FlushAppConn() error { // EnableTxsAvailable enables the mempool to trigger events when transactions // are available on a block by block basis. func (txmp *TxMempool) EnableTxsAvailable() { - txmp.mtx.Lock() - defer txmp.mtx.Unlock() + txmp.Lock() + defer txmp.Unlock() txmp.txsAvailable = make(chan struct{}, 1) } @@ -244,8 +244,8 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp // mempool. It reports an error if no such transaction exists. This operation // does not remove the transaction from the cache. func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error { - txmp.mtx.Lock() - defer txmp.mtx.Unlock() + txmp.Lock() + defer txmp.Unlock() return txmp.removeTxByKey(txKey) } @@ -280,8 +280,8 @@ func (txmp *TxMempool) removeTxByElement(elt *clist.CElement) { // Flush purges the contents of the mempool and the cache, leaving both empty. // The current height is not modified by this operation. func (txmp *TxMempool) Flush() { - txmp.mtx.Lock() - defer txmp.mtx.Unlock() + txmp.Lock() + defer txmp.Unlock() // Remove all the transactions in the list explicitly, so that the sizes // and indexes get updated properly. @@ -449,8 +449,8 @@ func (txmp *TxMempool) Update( // // Finally, the new transaction is added and size stats updated. func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.ResponseCheckTx) { - txmp.mtx.Lock() - defer txmp.mtx.Unlock() + txmp.Lock() + defer txmp.Unlock() var err error if txmp.postCheck != nil { @@ -613,8 +613,8 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) { // that case is handled by addNewTransaction instead. func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) { txmp.metrics.RecheckTimes.Add(1) - txmp.mtx.Lock() - defer txmp.mtx.Unlock() + txmp.Lock() + defer txmp.Unlock() // Find the transaction reported by the ABCI callback. It is possible the // transaction was evicted during the recheck, in which case the transaction @@ -699,8 +699,8 @@ func (txmp *TxMempool) recheckTransactions() { // When recheck is complete, trigger a notification for more transactions. _ = g.Wait() - txmp.mtx.Lock() - defer txmp.mtx.Unlock() + txmp.Lock() + defer txmp.Unlock() txmp.notifyTxsAvailable() }() } @@ -728,8 +728,8 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { // the txpool looped through all transactions and if so, performs a purge of any transaction // that has expired according to the TTLDuration. This is thread safe. func (txmp *TxMempool) CheckToPurgeExpiredTxs() { - txmp.mtx.Lock() - defer txmp.mtx.Unlock() + txmp.Lock() + defer txmp.Unlock() if txmp.config.TTLDuration > 0 && time.Since(txmp.lastPurgeTime) > txmp.config.TTLDuration { txmp.purgeExpiredTxs(txmp.height) } diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 37e03a8ce9..8dfa3d5c76 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -34,7 +34,7 @@ type mempoolIDs struct { activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter } -// Reserve searches for the next unused ID and assigns it to the +// ReserveForPeer searches for the next unused ID and assigns it to the // peer. func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { ids.mtx.Lock() @@ -170,7 +170,7 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { // broadcast routine checks if peer is gone and returns } -// Receive implements Reactor. +// ReceiveEnvelope implements Reactor. // It adds any received transactions to the mempool. func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)