diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index b47f69d29..aa848204e 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -15,7 +15,6 @@ import ( "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/libs/clist" "github.com/tendermint/tendermint/libs/log" - tmmath "github.com/tendermint/tendermint/libs/math" "github.com/tendermint/tendermint/types" ) @@ -420,24 +419,10 @@ func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs { txmp.mtx.Lock() defer txmp.mtx.Unlock() - numTxs := txmp.priorityIndex.NumTxs() - if max < 0 { - max = numTxs - } - - cap := tmmath.MinInt(numTxs, max) - - // wTxs contains a list of *WrappedTx retrieved from the priority queue that - // need to be re-enqueued prior to returning. - wTxs := make([]*WrappedTx, 0, cap) - txs := make([]types.Tx, 0, cap) - for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max { - wtx := txmp.priorityIndex.PopTx() - txs = append(txs, wtx.tx) - wTxs = append(wTxs, wtx) - } + wTxs := txmp.priorityIndex.PeekTxs(max) + txs := make([]types.Tx, 0, len(wTxs)) for _, wtx := range wTxs { - txmp.priorityIndex.PushTx(wtx) + txs = append(txs, wtx.tx) } return txs } diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index e31997397..ad3a347a3 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -4,6 +4,8 @@ import ( "container/heap" "sort" "sync" + + tmmath "github.com/tendermint/tendermint/libs/math" ) var _ heap.Interface = (*TxPriorityQueue)(nil) @@ -106,6 +108,32 @@ func (pq *TxPriorityQueue) PopTx() *WrappedTx { return nil } +// dequeue up to `max` transactions and reenqueue while locked +func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx { + pq.mtx.Lock() + defer pq.mtx.Unlock() + + numTxs := len(pq.txs) + if max < 0 { + max = numTxs + } + + cap := tmmath.MinInt(numTxs, max) + res := make([]*WrappedTx, 0, cap) + for i := 0; i < cap; i++ { + popped := heap.Pop(pq) + if popped == nil { + break + } + res = append(res, popped.(*WrappedTx)) + } + + for _, tx := range res { + heap.Push(pq, tx) + } + return res +} + // Push implements the Heap interface. // // NOTE: A caller should never call Push. Use PushTx instead.