Skip to content

Commit

Permalink
mempool: ensure evicted transactions are removed from the cache (back…
Browse files Browse the repository at this point in the history
…port #9000) (#9004)

This is a manual cherry-pick of commit b94470a6a42e8ffe7e7467521de5f51eb937c454.

In the original implementation transactions evicted for priority were also
removed from the cache. In addition, remove expired transactions from the
cache.

Related:

- Add Has method to cache implementations.
- Update tests to exercise this condition.

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
  • Loading branch information
tnasu and M. J. Fromberger committed Jul 13, 2023
1 parent dd26c3b commit bb05ee3
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
13 changes: 13 additions & 0 deletions mempool/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type TxCache interface {

// Remove removes the given raw transaction from the cache.
Remove(tx types.Tx)

// Has reports whether tx is present in the cache. Checking for presence is
// not treated as an access of the value.
Has(tx types.Tx) bool
}

var _ TxCache = (*LRUTxCache)(nil)
Expand Down Expand Up @@ -97,6 +101,14 @@ func (c *LRUTxCache) Remove(tx types.Tx) {
}
}

func (c *LRUTxCache) Has(tx types.Tx) bool {
c.mtx.Lock()
defer c.mtx.Unlock()

_, ok := c.cacheMap[tx.Key()]
return ok
}

// NopTxCache defines a no-op raw transaction cache.
type NopTxCache struct{}

Expand All @@ -105,3 +117,4 @@ var _ TxCache = (*NopTxCache)(nil)
func (NopTxCache) Reset() {}
func (NopTxCache) Push(types.Tx) bool { return true }
func (NopTxCache) Remove(types.Tx) {}
func (NopTxCache) Has(types.Tx) bool { return false }
3 changes: 3 additions & 0 deletions mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
"old_priority", w.priority,
)
txmp.removeTxByElement(vic)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)

// We may not need to evict all the eligible transactions. Bail out
Expand Down Expand Up @@ -767,9 +768,11 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
w := cur.Value.(*WrappedTx)
if txmp.config.TTLNumBlocks > 0 && (blockHeight-w.height) > txmp.config.TTLNumBlocks {
txmp.removeTxByElement(cur)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)
} else if txmp.config.TTLDuration > 0 && now.Sub(w.timestamp) > txmp.config.TTLDuration {
txmp.removeTxByElement(cur)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)
}
cur = next
Expand Down
12 changes: 8 additions & 4 deletions mempool/v1/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func TestTxMempool_Size(t *testing.T) {
}

func TestTxMempool_Eviction(t *testing.T) {
txmp := setup(t, 0)
txmp := setup(t, 1000)
txmp.config.Size = 5
txmp.config.MaxTxsBytes = 60
txExists := func(spec string) bool {
Expand Down Expand Up @@ -238,6 +238,7 @@ func TestTxMempool_Eviction(t *testing.T) {
mustCheckTx(t, txmp, "key1=0000=25")
require.True(t, txExists("key1=0000=25"))
require.False(t, txExists(bigTx))
require.False(t, txmp.cache.Has([]byte(bigTx)))
require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes())

// Now fill up the rest of the slots with other transactions.
Expand Down Expand Up @@ -521,10 +522,10 @@ func TestTxMempool_ConcurrentTxs(t *testing.T) {
}

func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) {
txmp := setup(t, 50)
txmp := setup(t, 5000)
txmp.config.TTLDuration = 5 * time.Millisecond

added1 := checkTxs(t, txmp, 25, 0)
added1 := checkTxs(t, txmp, 10, 0)
require.Equal(t, len(added1), txmp.Size())

// Wait a while, then add some more transactions that should not be expired
Expand All @@ -540,7 +541,7 @@ func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) {
// The exact intervals are not important except that the delta should be
// large relative to the cost of CheckTx (ms vs. ns is fine here).
time.Sleep(3 * time.Millisecond)
added2 := checkTxs(t, txmp, 25, 1)
added2 := checkTxs(t, txmp, 10, 1)

// Wait a while longer, so that the first batch will expire.
time.Sleep(3 * time.Millisecond)
Expand All @@ -555,6 +556,9 @@ func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) {
if _, ok := txmp.txByKey[tx.tx.Key()]; ok {
t.Errorf("Transaction %X should have been purged for TTL", tx.tx.Key())
}
if txmp.cache.Has(tx.tx) {
t.Errorf("Transaction %X should have been removed from the cache", tx.tx.Key())
}
}

// All the transactions added later should still be around.
Expand Down

0 comments on commit bb05ee3

Please sign in to comment.