diff --git a/config/config.go b/config/config.go index d7250e00e7..19659ffac7 100644 --- a/config/config.go +++ b/config/config.go @@ -28,6 +28,7 @@ const ( // Default is v0. MempoolV0 = "v0" MempoolV1 = "v1" + MempoolV2 = "v2" ) // NOTE: Most of the structs & relevant comments + the @@ -687,9 +688,7 @@ type MempoolConfig struct { // Mempool version to use: // 1) "v0" - (default) FIFO mempool. // 2) "v1" - prioritized mempool. - // WARNING: There's a known memory leak with the prioritized mempool - // that the team are working on. Read more here: - // https://github.com/tendermint/tendermint/issues/8775 + // 3) "v2" - content addressable transaction pool Version string `mapstructure:"version"` RootDir string `mapstructure:"home"` Recheck bool `mapstructure:"recheck"` @@ -735,7 +734,7 @@ type MempoolConfig struct { // DefaultMempoolConfig returns a default configuration for the Tendermint mempool func DefaultMempoolConfig() *MempoolConfig { return &MempoolConfig{ - Version: MempoolV0, + Version: MempoolV2, Recheck: true, Broadcast: true, WalPath: "", diff --git a/config/toml.go b/config/toml.go index f6e9434d35..06d0e1b675 100644 --- a/config/toml.go +++ b/config/toml.go @@ -345,6 +345,7 @@ dial_timeout = "{{ .P2P.DialTimeout }}" # Mempool version to use: # 1) "v0" - (default) FIFO mempool. # 2) "v1" - prioritized mempool. +# 3) "v2" - content addressable transaction pool version = "{{ .Mempool.Version }}" recheck = {{ .Mempool.Recheck }} diff --git a/mempool/cat/cache.go b/mempool/cat/cache.go new file mode 100644 index 0000000000..f2b0dfea38 --- /dev/null +++ b/mempool/cat/cache.go @@ -0,0 +1,237 @@ +package cat + +import ( + "container/list" + "time" + + tmsync "github.com/tendermint/tendermint/libs/sync" + "github.com/tendermint/tendermint/types" +) + +// LRUTxCache maintains a thread-safe LRU cache of raw transactions. The cache +// only stores the hash of the raw transaction. +type LRUTxCache struct { + mtx tmsync.Mutex + size int + cacheMap map[types.TxKey]*list.Element + list *list.List +} + +func NewLRUTxCache(cacheSize int) *LRUTxCache { + return &LRUTxCache{ + size: cacheSize, + cacheMap: make(map[types.TxKey]*list.Element, cacheSize), + list: list.New(), + } +} + +// GetList returns the underlying linked-list that backs the LRU cache. Note, +// this should be used for testing purposes only! +func (c *LRUTxCache) GetList() *list.List { + return c.list +} + +func (c *LRUTxCache) Reset() { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.cacheMap = make(map[types.TxKey]*list.Element, c.size) + c.list.Init() +} + +func (c *LRUTxCache) Push(txKey types.TxKey) bool { + if c.size == 0 { + return true + } + + c.mtx.Lock() + defer c.mtx.Unlock() + + moved, ok := c.cacheMap[txKey] + if ok { + c.list.MoveToBack(moved) + return false + } + + if c.list.Len() >= c.size { + front := c.list.Front() + if front != nil { + frontKey := front.Value.(types.TxKey) + delete(c.cacheMap, frontKey) + c.list.Remove(front) + } + } + + e := c.list.PushBack(txKey) + c.cacheMap[txKey] = e + + return true +} + +func (c *LRUTxCache) Remove(txKey types.TxKey) { + c.mtx.Lock() + defer c.mtx.Unlock() + + e := c.cacheMap[txKey] + delete(c.cacheMap, txKey) + + if e != nil { + c.list.Remove(e) + } +} + +func (c *LRUTxCache) Has(txKey types.TxKey) bool { + if c.size == 0 { + return false + } + + c.mtx.Lock() + defer c.mtx.Unlock() + + _, ok := c.cacheMap[txKey] + return ok +} + +type EvictedTxInfo struct { + timeEvicted time.Time + priority int64 + gasWanted int64 + sender string + peers map[uint16]bool +} + +type EvictedTxCache struct { + mtx tmsync.Mutex + size int + cache map[types.TxKey]*EvictedTxInfo +} + +func NewEvictedTxCache(size int) *EvictedTxCache { + return &EvictedTxCache{ + size: size, + cache: make(map[types.TxKey]*EvictedTxInfo), + } +} + +func (c *EvictedTxCache) Has(txKey types.TxKey) bool { + c.mtx.Lock() + defer c.mtx.Unlock() + _, exists := c.cache[txKey] + return exists +} + +func (c *EvictedTxCache) Push(wtx *WrappedTx) { + c.mtx.Lock() + defer c.mtx.Unlock() + c.cache[wtx.key] = &EvictedTxInfo{ + timeEvicted: time.Now().UTC(), + priority: wtx.priority, + gasWanted: wtx.gasWanted, + sender: wtx.sender, + peers: wtx.peers, + } + // if cache too large, remove the oldest entry + if len(c.cache) > c.size { + oldestTxKey := wtx.key + oldestTxTime := time.Now().UTC() + for key, info := range c.cache { + if info.timeEvicted.Before(oldestTxTime) { + oldestTxTime = info.timeEvicted + oldestTxKey = key + } + } + delete(c.cache, oldestTxKey) + } +} + +func (c *EvictedTxCache) Pop(txKey types.TxKey) *EvictedTxInfo { + c.mtx.Lock() + defer c.mtx.Unlock() + info, exists := c.cache[txKey] + if !exists { + return nil + } else { + delete(c.cache, txKey) + return info + } +} + +func (c *EvictedTxCache) Prune(limit time.Time) { + c.mtx.Lock() + defer c.mtx.Unlock() + for key, info := range c.cache { + if info.timeEvicted.Before(limit) { + delete(c.cache, key) + } + } +} + +// seenTxSet records transactions that have been +// seen by other peers but not yet by us +type SeenTxSet struct { + mtx tmsync.Mutex + size int + set map[types.TxKey]timestampedPeerSet +} + +type timestampedPeerSet struct { + peers map[uint16]bool + time time.Time +} + +func NewSeenTxSet(size int) *SeenTxSet { + return &SeenTxSet{ + size: size, + set: make(map[types.TxKey]timestampedPeerSet), + } +} + +func (s *SeenTxSet) Add(txKey types.TxKey, peer uint16) { + s.mtx.Lock() + defer s.mtx.Unlock() + seenSet, exists := s.set[txKey] + if !exists { + s.set[txKey] = timestampedPeerSet{ + peers: map[uint16]bool{peer: true}, + time: time.Now().UTC(), + } + s.constrainSize() + } else { + seenSet.peers[peer] = true + } +} + +func (s *SeenTxSet) constrainSize() { + if len(s.set) > s.size { + var ( + oldestTxKey types.TxKey + oldestTime time.Time + ) + for key, set := range s.set { + if oldestTime.IsZero() || set.time.Before(oldestTime) { + oldestTxKey = key + oldestTime = set.time + } + } + delete(s.set, oldestTxKey) + } +} + +func (s *SeenTxSet) Pop(txKey types.TxKey) map[uint16]bool { + s.mtx.Lock() + defer s.mtx.Unlock() + seenSet, exists := s.set[txKey] + if !exists { + return nil + } else { + delete(s.set, txKey) + return seenSet.peers + } +} + +// Len returns the amount of cached items. Mostly used for testing. +func (s *SeenTxSet) Len() int { + s.mtx.Lock() + defer s.mtx.Unlock() + return len(s.set) +} diff --git a/mempool/cat/cache_test.go b/mempool/cat/cache_test.go new file mode 100644 index 0000000000..6f983f8183 --- /dev/null +++ b/mempool/cat/cache_test.go @@ -0,0 +1,94 @@ +package cat_test + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/mempool/cat" + "github.com/tendermint/tendermint/types" +) + +func TestSeenTxSet(t *testing.T) { + var ( + tx1Key = types.Tx("tx1").Key() + tx2Key = types.Tx("tx2").Key() + tx3Key = types.Tx("tx3").Key() + peer1 uint16 = 1 + peer2 uint16 = 2 + ) + + seenSet := cat.NewSeenTxSet(2) + require.Nil(t, seenSet.Pop(tx1Key)) + + seenSet.Add(tx1Key, peer1) + seenSet.Add(tx1Key, peer2) + seenSet.Add(tx1Key, peer1) + peers := seenSet.Pop(tx1Key) + require.NotNil(t, peers) + require.Equal(t, map[uint16]bool{peer1: true, peer2: true}, peers) + seenSet.Add(tx2Key, peer1) + seenSet.Add(tx3Key, peer1) + seenSet.Add(tx1Key, peer1) + require.Equal(t, 2, seenSet.Len()) +} + +func TestCacheRemove(t *testing.T) { + cache := cat.NewLRUTxCache(100) + numTxs := 10 + + txs := make([][32]byte, numTxs) + for i := 0; i < numTxs; i++ { + // probability of collision is 2**-256 + txBytes := make([]byte, 32) + _, err := rand.Read(txBytes) + require.NoError(t, err) + + copy(txs[i][:], txBytes) + cache.Push(txs[i]) + + // make sure its added to both the linked list and the map + require.Equal(t, i+1, cache.GetList().Len()) + } + + for i := 0; i < numTxs; i++ { + cache.Remove(txs[i]) + // make sure its removed from both the map and the linked list + require.Equal(t, numTxs-(i+1), cache.GetList().Len()) + } +} + +func TestEvictedTxCache(t *testing.T) { + var ( + tx1 = types.Tx("tx1") + tx2 = types.Tx("tx2") + tx3 = types.Tx("tx3") + wtx1 = cat.NewWrappedTx( + tx1, tx1.Key(), 10, 1, 5, "", + ) + wtx2 = cat.NewWrappedTx( + tx2, tx2.Key(), 10, 1, 5, "", + ) + wtx3 = cat.NewWrappedTx( + tx3, tx3.Key(), 10, 1, 5, "", + ) + ) + + cache := cat.NewEvictedTxCache(2) + require.False(t, cache.Has(tx1.Key())) + require.Nil(t, cache.Pop(tx1.Key())) + cache.Push(wtx1) + require.True(t, cache.Has(tx1.Key())) + require.NotNil(t, cache.Pop(tx1.Key())) + cache.Push(wtx1) + time.Sleep(1 * time.Millisecond) + cache.Push(wtx2) + time.Sleep(1 * time.Millisecond) + cache.Push(wtx3) + require.False(t, cache.Has(tx1.Key())) + cache.Prune(time.Now().UTC().Add(1 * time.Second)) + require.False(t, cache.Has(tx2.Key())) + require.False(t, cache.Has(tx3.Key())) +} diff --git a/mempool/cat/mempool.go b/mempool/cat/mempool.go new file mode 100644 index 0000000000..b73d51f45e --- /dev/null +++ b/mempool/cat/mempool.go @@ -0,0 +1,849 @@ +package cat + +import ( + "errors" + "fmt" + "runtime" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/creachadair/taskgroup" + + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/clist" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" +) + +var _ mempool.Mempool = (*TxMempool)(nil) + +const ( + evictedTxCacheSize = 100 + seenByPeerSetSize = 200 +) + +// TxMempoolOption sets an optional parameter on the TxMempool. +type TxMempoolOption func(*TxMempool) + +// TxMempool implemements the Mempool interface and allows the application to +// set priority values on transactions in the CheckTx response. When selecting +// transactions to include in a block, higher-priority transactions are chosen +// first. When evicting transactions from the mempool for size constraints, +// lower-priority transactions are evicted sooner. +// +// Within the mempool, transactions are ordered by time of arrival, and are +// gossiped to the rest of the network based on that order (gossip order does +// not take priority into account). +type TxMempool struct { + // Immutable fields + logger log.Logger + config *config.MempoolConfig + proxyAppConn proxy.AppConnMempool + metrics *mempool.Metrics + + // Atomically-updated fields + txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes + + // These fields are not synchronized. They are modified in `Update` which should never + // be called concurrently. + notifiedTxsAvailable bool + txsAvailable chan struct{} // one value sent per height when mempool is not empty + preCheck mempool.PreCheckFunc + postCheck mempool.PostCheckFunc + height int64 // the latest height passed to Update + + // Concurrent list of valid transactions (passed CheckTx) + txs *clist.CList + // Thread-safe cache of rejected transactions for quick look-up + rejectedTxCache *LRUTxCache + // Thread-safe cache of valid txs that were evicted + evictedTxs *EvictedTxCache + // Thread-safe list of transactions peers have seen that we have not yet seen + seenByPeersSet *SeenTxSet + + // Synchronized fields, protected by mtx. + mtx *sync.RWMutex + txByKey map[types.TxKey]*clist.CElement // used as a lookup table +} + +// NewTxMempool constructs a new, empty priority mempool at the specified +// initial height and using the given config and options. +func NewTxMempool( + logger log.Logger, + cfg *config.MempoolConfig, + proxyAppConn proxy.AppConnMempool, + height int64, + options ...TxMempoolOption, +) *TxMempool { + + txmp := &TxMempool{ + logger: logger, + config: cfg, + proxyAppConn: proxyAppConn, + metrics: mempool.NopMetrics(), + rejectedTxCache: NewLRUTxCache(cfg.CacheSize), + evictedTxs: NewEvictedTxCache(evictedTxCacheSize), + seenByPeersSet: NewSeenTxSet(seenByPeerSetSize), + txs: clist.New(), + mtx: new(sync.RWMutex), + height: height, + txByKey: make(map[types.TxKey]*clist.CElement), + preCheck: func(_ types.Tx) error { return nil }, + postCheck: func(_ types.Tx, _ *abci.ResponseCheckTx) error { return nil }, + } + + for _, opt := range options { + opt(txmp) + } + + return txmp +} + +// WithPreCheck sets a filter for the mempool to reject a transaction if f(tx) +// returns an error. This is executed before CheckTx. It only applies to the +// first created block. After that, Update() overwrites the existing value. +func WithPreCheck(f mempool.PreCheckFunc) TxMempoolOption { + return func(txmp *TxMempool) { txmp.preCheck = f } +} + +// WithPostCheck sets a filter for the mempool to reject a transaction if +// f(tx, resp) returns an error. This is executed after CheckTx. It only applies +// to the first created block. After that, Update overwrites the existing value. +func WithPostCheck(f mempool.PostCheckFunc) TxMempoolOption { + return func(txmp *TxMempool) { txmp.postCheck = f } +} + +// WithMetrics sets the mempool's metrics collector. +func WithMetrics(metrics *mempool.Metrics) TxMempoolOption { + return func(txmp *TxMempool) { txmp.metrics = metrics } +} + +// Lock obtains a write-lock on the mempool. A caller must be sure to explicitly +// release the lock when finished. No transactions will be added or removed +// until the lock is released +func (txmp *TxMempool) Lock() { txmp.mtx.Lock() } + +// Unlock releases a write-lock on the mempool. +func (txmp *TxMempool) Unlock() { txmp.mtx.Unlock() } + +// Size returns the number of valid transactions in the mempool. It is +// thread-safe. +func (txmp *TxMempool) Size() int { return txmp.txs.Len() } + +// SizeBytes return the total sum in bytes of all the valid transactions in the +// mempool. It is thread-safe. +func (txmp *TxMempool) SizeBytes() int64 { return atomic.LoadInt64(&txmp.txsBytes) } + +// FlushAppConn executes FlushSync on the mempool's proxyAppConn. +// +// The caller must hold an exclusive mempool lock (by calling txmp.Lock) before +// calling FlushAppConn. +func (txmp *TxMempool) FlushAppConn() error { + // N.B.: We have to issue the call outside the lock so that its callback can + // fire. It's safe to do this, the flush will block until complete. + // + // We could just not require the caller to hold the lock at all, but the + // semantics of the Mempool interface require the caller to hold it, and we + // can't change that without disrupting existing use. + txmp.mtx.Unlock() + defer txmp.mtx.Lock() + + return txmp.proxyAppConn.FlushSync() +} + +// EnableTxsAvailable enables the mempool to trigger events when transactions +// are available on a block by block basis. +func (txmp *TxMempool) EnableTxsAvailable() { + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + + txmp.txsAvailable = make(chan struct{}, 1) +} + +// TxsAvailable returns a channel which fires once for every height, and only +// when transactions are available in the mempool. It is thread-safe. +func (txmp *TxMempool) TxsAvailable() <-chan struct{} { return txmp.txsAvailable } + +func (txmp *TxMempool) Has(txKey types.TxKey) bool { + txmp.mtx.RLock() + defer txmp.mtx.RUnlock() + return txmp.has(txKey) +} + +func (txmp *TxMempool) has(txKey types.TxKey) bool { + _, exists := txmp.txByKey[txKey] + return exists +} + +func (txmp *TxMempool) GetAllTxKeys() []types.TxKey { + txmp.mtx.RLock() + defer txmp.mtx.RUnlock() + keys := make([]types.TxKey, len(txmp.txByKey)) + idx := 0 + for key := range txmp.txByKey { + keys[idx] = key + idx++ + } + return keys +} + +func (txmp *TxMempool) IsRejectedTx(txKey types.TxKey) bool { + return txmp.rejectedTxCache.Has(txKey) +} + +func (txmp *TxMempool) WasRecentlyEvicted(txKey types.TxKey) bool { + return txmp.evictedTxs.Has(txKey) +} + +func (txmp *TxMempool) TryReinsertEvictedTx(txKey types.TxKey, tx types.Tx, peer uint16) error { + info := txmp.evictedTxs.Pop(txKey) + if info == nil { + return nil + } + txmp.logger.Debug("attempting to reinsert evicted tx", "txKey", fmt.Sprintf("%X", txKey)) + wtx := NewWrappedTx( + tx, txKey, txmp.height, info.gasWanted, info.priority, info.sender, + ) + for p := range info.peers { + wtx.SetPeer(p) + } + wtx.SetPeer(peer) + checkTxResp := &abci.ResponseCheckTx{ + Code: abci.CodeTypeOK, + Priority: info.priority, + Sender: info.sender, + GasWanted: info.gasWanted, + } + return txmp.addNewTransaction(wtx, checkTxResp) +} + +// CheckTx adds the given transaction to the mempool if it fits and passes the +// application's ABCI CheckTx method. +// +// CheckTx reports an error without adding tx if: +// +// - The size of tx exceeds the configured maximum transaction size. +// - The pre-check hook reports an error for tx. +// - The transaction already exists in the transaction clist or in the rejectedTxCache. +// +// If tx passes all of the above conditions, `TryAddNewTx` is called +func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo mempool.TxInfo) error { + // Reject transactions in excess of the configured maximum transaction size. + if len(tx) > txmp.config.MaxTxBytes { + return mempool.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)} + } + + key := tx.Key() + + if txmp.IsRejectedTx(key) { + // The peer has sent us a transaction that we have marked as invalid. Since `CheckTx` can + // be non-deterministic, we don't punish the peer but instead just ignore the msg + return mempool.ErrTxInCache + } + + if txmp.WasRecentlyEvicted(key) { + // the transaction was recently evicted. If true, we attempt to re-add it to the mempool + // skipping check tx. + return txmp.TryReinsertEvictedTx(key, tx, txInfo.SenderID) + } + + // This is a new transaction that we haven't seen before. Verify it against the app and attempt + // to add it to the transaction pool. + rsp, err := txmp.TryAddNewTx(tx, key, txInfo) + if err != nil { + return err + } + + // call the callback if it is set + if cb != nil { + cb(&abci.Response{Value: &abci.Response_CheckTx{CheckTx: rsp}}) + } + return nil +} + +// TryAddNewTx attempts to add a tx that has not already been seen before. It first marks it as seen +// to avoid races with the same tx. It then call `CheckTx` so that the application can validate it. +// If it passes `CheckTx`, the new transaction is added to the mempool solong as it has +// sufficient priority and space else if evicted it will return an error +func (txmp *TxMempool) TryAddNewTx(tx types.Tx, key types.TxKey, txInfo mempool.TxInfo) (*abci.ResponseCheckTx, error) { + // reserve the key + if !txmp.reserveTx(key) { + txmp.logger.Debug("mempool already attempting to verify and add transaction", "txKey", fmt.Sprintf("%X", key)) + txmp.PeerHasTx(txInfo.SenderID, key) + return nil, errors.New("tx already added") + } + + resp, err := txmp.tryAddNewTx(tx, key, txInfo) + if err != nil { + // remove the reservation if adding failed + txmp.unreserveTx(key) + } + return resp, err +} + +func (txmp *TxMempool) tryAddNewTx(tx types.Tx, key types.TxKey, txInfo mempool.TxInfo) (*abci.ResponseCheckTx, error) { + // Reject transactions in excess of the configured maximum transaction size. + if len(tx) > txmp.config.MaxTxBytes { + return nil, mempool.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)} + } + + // If a precheck hook is defined, call it before invoking the application. + if err := txmp.preCheck(tx); err != nil { + return nil, mempool.ErrPreCheck{Reason: err} + } + + // Early exit if the proxy connection has an error. + if err := txmp.proxyAppConn.Error(); err != nil { + return nil, err + } + + // Invoke an ABCI CheckTx for this transaction. + rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{Tx: tx}) + if err != nil { + return rsp, err + } + if rsp.Code != abci.CodeTypeOK { + txmp.metrics.RejectedTxs.Add(1) + return rsp, fmt.Errorf("application rejected transaction with code %d", rsp.Code) + } + + // Create wrapped tx + wtx := NewWrappedTx( + tx, key, txmp.height, rsp.GasWanted, rsp.Priority, rsp.Sender, + ) + if txInfo.SenderID > 0 { + wtx.SetPeer(txInfo.SenderID) + } + + // Perform the post check + err = txmp.postCheck(wtx.tx, rsp) + if err != nil { + txmp.metrics.RejectedTxs.Add(1) + return rsp, fmt.Errorf("rejected bad transaction after post check: %w", err) + } + + // Now we consider the transaction to be valid. Once a transaction is valid, it + // can only become invalid if recheckTx is enabled and RecheckTx returns a non zero code + if err := txmp.addNewTransaction(wtx, rsp); err != nil { + return nil, err + } + return rsp, nil +} + +// reserveTx adds an empty element for the specified key to prevent +// a transaction with the same key from being added +func (txmp *TxMempool) reserveTx(key types.TxKey) bool { + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + if _, ok := txmp.txByKey[key]; ok { + return false // already reserved + } else { + txmp.txByKey[key] = &clist.CElement{} + } + return true +} + +// unreserveTx is called when a pending transaction failed +// to enter the mempool. The empty element and key is removed. +func (txmp *TxMempool) unreserveTx(key types.TxKey) { + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + value, ok := txmp.txByKey[key] + if ok && value.Value == nil { + delete(txmp.txByKey, key) + } +} + +// RemoveTxByKey removes the transaction with the specified key from the +// mempool. It reports an error if no such transaction exists. This operation +// does not remove the transaction from the rejectedTxCache. +func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error { + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + return txmp.removeTxByKey(txKey) +} + +// removeTxByKey removes the specified transaction key from the mempool. +// The caller must hold txmp.mtx excluxively. +func (txmp *TxMempool) removeTxByKey(key types.TxKey) error { + if elt, ok := txmp.txByKey[key]; ok { + w := elt.Value.(*WrappedTx) + delete(txmp.txByKey, key) + txmp.txs.Remove(elt) + elt.DetachPrev() + elt.DetachNext() + atomic.AddInt64(&txmp.txsBytes, -w.Size()) + return nil + } + return fmt.Errorf("transaction %x not found", key) +} + +// removeTxByElement removes the specified transaction element from the mempool. +// The caller must hold txmp.mtx exclusively. +func (txmp *TxMempool) removeTxByElement(elt *clist.CElement) { + w := elt.Value.(*WrappedTx) + delete(txmp.txByKey, w.tx.Key()) + txmp.txs.Remove(elt) + elt.DetachPrev() + elt.DetachNext() + atomic.AddInt64(&txmp.txsBytes, -w.Size()) +} + +// Flush purges the contents of the mempool and the cache, leaving both empty. +// The current height is not modified by this operation. +func (txmp *TxMempool) Flush() { + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + + // Remove all the transactions in the list explicitly, so that the sizes + // and indexes get updated properly. + cur := txmp.txs.Front() + for cur != nil { + next := cur.Next() + txmp.removeTxByElement(cur) + cur = next + } + txmp.rejectedTxCache.Reset() +} + +// PeerHasTx marks that the transaction has been seen by a peer. +// It returns true if the mempool has the transaction and has recorded the +// peer and false if the mempool has not yet seen the transaction that the +// peer has +func (txmp *TxMempool) PeerHasTx(peer uint16, txKey types.TxKey) { + // peer must be non-zero + if peer == 0 { + return + } + txmp.mtx.RLock() + defer txmp.mtx.RUnlock() + el, exists := txmp.txByKey[txKey] + if exists { + wtx := el.Value.(*WrappedTx) + wtx.SetPeer(peer) + } else { + txmp.seenByPeersSet.Add(txKey, peer) + } +} + +// allEntriesSorted returns a slice of all the transactions currently in the +// mempool, sorted in nonincreasing order by priority with ties broken by +// increasing order of arrival time. +func (txmp *TxMempool) allEntriesSorted() []*WrappedTx { + txmp.mtx.RLock() + defer txmp.mtx.RUnlock() + + all := make([]*WrappedTx, len(txmp.txByKey)) + idx := 0 + for _, tx := range txmp.txByKey { + all[idx] = tx.Value.(*WrappedTx) + idx++ + } + sort.Slice(all, func(i, j int) bool { + if all[i].priority == all[j].priority { + return all[i].timestamp.Before(all[j].timestamp) + } + return all[i].priority > all[j].priority // N.B. higher priorities first + }) + return all +} + +// ReapMaxBytesMaxGas returns a slice of valid transactions that fit within the +// size and gas constraints. The results are ordered by nonincreasing priority, +// with ties broken by increasing order of arrival. Reaping transactions does +// not remove them from the mempool.add +// +// If maxBytes < 0, no limit is set on the total size in bytes. +// If maxGas < 0, no limit is set on the total gas cost. +// +// If the mempool is empty or has no transactions fitting within the given +// constraints, the result will also be empty. +func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { + var totalGas, totalBytes int64 + + var keep []types.Tx //nolint:prealloc + for _, w := range txmp.allEntriesSorted() { + // N.B. When computing byte size, we need to include the overhead for + // encoding as protobuf to send to the application. + totalGas += w.gasWanted + totalBytes += types.ComputeProtoSizeForTxs([]types.Tx{w.tx}) + if (maxGas >= 0 && totalGas > maxGas) || (maxBytes >= 0 && totalBytes > maxBytes) { + break + } + keep = append(keep, w.tx) + } + return keep +} + +// TxsWaitChan returns a channel that is closed when there is at least one +// transaction available to be gossiped. +func (txmp *TxMempool) TxsWaitChan() <-chan struct{} { return txmp.txs.WaitChan() } + +// TxsFront returns the frontmost element of the pending transaction list. +// It will be nil if the mempool is empty. +func (txmp *TxMempool) TxsFront() *clist.CElement { return txmp.txs.Front() } + +// ReapMaxTxs returns up to max transactions from the mempool. The results are +// ordered by nonincreasing priority with ties broken by increasing order of +// arrival. Reaping transactions does not remove them from the mempool. +// +// If max < 0, all transactions in the mempool are reaped. +// +// The result may have fewer than max elements (possibly zero) if the mempool +// does not have that many transactions available. +func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs { + var keep []types.Tx //nolint:prealloc + + for _, w := range txmp.allEntriesSorted() { + if max >= 0 && len(keep) >= max { + break + } + keep = append(keep, w.tx) + } + return keep +} + +// Update removes all the given transactions from the mempool and the cache, +// and updates the current block height. The blockTxs and deliverTxResponses +// must have the same length with each response corresponding to the tx at the +// same offset. +// +// If the configuration enables recheck, Update sends each remaining +// transaction after removing blockTxs to the ABCI CheckTx method. Any +// transactions marked as invalid during recheck are also removed. +// +// The caller must hold an exclusive mempool lock (by calling txmp.Lock) before +// calling Update. +func (txmp *TxMempool) Update( + blockHeight int64, + blockTxs types.Txs, + deliverTxResponses []*abci.ResponseDeliverTx, + newPreFn mempool.PreCheckFunc, + newPostFn mempool.PostCheckFunc, +) error { + // Safety check: Transactions and responses must match in number. + if len(blockTxs) != len(deliverTxResponses) { + panic(fmt.Sprintf("mempool: got %d transactions but %d DeliverTx responses", + len(blockTxs), len(deliverTxResponses))) + } + + txmp.height = blockHeight + txmp.notifiedTxsAvailable = false + + if newPreFn != nil { + txmp.preCheck = newPreFn + } + if newPostFn != nil { + txmp.postCheck = newPostFn + } + + for _, tx := range blockTxs { + // Regardless of success, remove the transaction from the mempool. + _ = txmp.removeTxByKey(tx.Key()) + } + + txmp.purgeExpiredTxs(blockHeight) + + // If there any uncommitted transactions left in the mempool, we either + // initiate re-CheckTx per remaining transaction or notify that remaining + // transactions are left. + size := txmp.Size() + txmp.metrics.Size.Set(float64(size)) + if size > 0 { + if txmp.config.Recheck { + txmp.recheckTransactions() + } else { + txmp.notifyTxsAvailable() + } + } + return nil +} + +// addNewTransaction handles the ABCI CheckTx response for the first time a +// transaction is added to the mempool. A recheck after a block is committed +// goes to handleRecheckResult. +// +// If either the application rejected the transaction or a post-check hook is +// defined and rejects the transaction, it is discarded. +// +// Otherwise, if the mempool is full, check for lower-priority transactions +// that can be evicted to make room for the new one. If no such transactions +// exist, this transaction is logged and dropped; otherwise the selected +// transactions are evicted. +// +// Finally, the new transaction is added and size stats updated. +func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.ResponseCheckTx) error { + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + + // At this point the application has ruled the transaction valid, but the + // mempool might be full. If so, find the lowest-priority items with lower + // priority than the application assigned to this new one, and evict as many + // of them as necessary to make room for tx. If no such items exist, we + // discard tx. + if !txmp.canAddTx(wtx) { + var victims []*clist.CElement // eligible transactions for eviction + var victimBytes int64 // total size of victims + for cur := txmp.txs.Front(); cur != nil; cur = cur.Next() { + cw := cur.Value.(*WrappedTx) + if cw.priority < wtx.priority { + victims = append(victims, cur) + victimBytes += cw.Size() + } + } + + // If there are no suitable eviction candidates, or the total size of + // those candidates is not enough to make room for the new transaction, + // drop the new one. + if len(victims) == 0 || victimBytes < wtx.Size() { + txmp.metrics.EvictedTxs.Add(1) + txmp.evictedTxs.Push(wtx) + checkTxRes.MempoolError = + fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)", + wtx.tx.Hash()) + return fmt.Errorf("rejected valid incoming transaction; mempool is full (%X)", + wtx.tx.Hash()) + } + + txmp.logger.Debug("evicting lower-priority transactions", + "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "new_priority", wtx.priority, + ) + + // Sort lowest priority items first so they will be evicted first. Break + // ties in favor of newer items (to maintain FIFO semantics in a group). + sort.Slice(victims, func(i, j int) bool { + iw := victims[i].Value.(*WrappedTx) + jw := victims[j].Value.(*WrappedTx) + if iw.priority == jw.priority { + return iw.timestamp.After(jw.timestamp) + } + return iw.priority < jw.priority + }) + + // Evict as many of the victims as necessary to make room. + var evictedBytes int64 + for _, vic := range victims { + w := vic.Value.(*WrappedTx) + txmp.evictTx(w) + + // We may not need to evict all the eligible transactions. Bail out + // early if we have made enough room. + evictedBytes += w.Size() + if evictedBytes >= wtx.Size() { + break + } + } + } + + // check if the transaction has been seen by other peers before + peers := txmp.seenByPeersSet.Pop(wtx.key) + if peers != nil { + for peer := range peers { + wtx.SetPeer(peer) + } + } + + txmp.insertTx(wtx) + + txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size())) + txmp.metrics.Size.Set(float64(txmp.Size())) + txmp.logger.Debug( + "inserted new valid transaction", + "priority", wtx.priority, + "tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "height", txmp.height, + "num_txs", txmp.Size(), + ) + txmp.notifyTxsAvailable() + return nil +} + +func (txmp *TxMempool) insertTx(wtx *WrappedTx) { + elt := txmp.txs.PushBack(wtx) + txmp.txByKey[wtx.tx.Key()] = elt + // if we're reinserting an evicted transaction + // remove it from the map + txmp.evictedTxs.Pop(wtx.key) + + atomic.AddInt64(&txmp.txsBytes, wtx.Size()) +} + +func (txmp *TxMempool) evictTx(wtx *WrappedTx) { + txmp.removeTxByKey(wtx.key) + txmp.metrics.EvictedTxs.Add(1) + txmp.evictedTxs.Push(wtx) + txmp.logger.Debug( + "evicted valid existing transaction; mempool full", + "old_tx", fmt.Sprintf("%X", wtx.key), + "old_priority", wtx.priority, + ) +} + +// handleRecheckResult handles the responses from ABCI CheckTx calls issued +// during the recheck phase of a block Update. It removes any transactions +// invalidated by the application. +// +// This method is NOT executed for the initial CheckTx on a new transaction; +// that case is handled by addNewTransaction instead. +func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) { + txmp.metrics.RecheckTimes.Add(1) + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + + // Find the transaction reported by the ABCI callback. It is possible the + // transaction was evicted during the recheck, in which case the transaction + // will be gone. + elt, ok := txmp.txByKey[tx.Key()] + if !ok { + return + } + wtx := elt.Value.(*WrappedTx) + + // If a postcheck hook is defined, call it before checking the result. + var err error + if txmp.postCheck != nil { + err = txmp.postCheck(tx, checkTxRes) + } + + if checkTxRes.Code == abci.CodeTypeOK && err == nil { + // Note that we do not update the transaction with any of the values returned in + // recheck tx + return // N.B. Size of mempool did not change + } + + txmp.logger.Debug( + "existing transaction no longer valid; failed re-CheckTx callback", + "priority", wtx.priority, + "tx", fmt.Sprintf("%X", wtx.key), + "err", err, + "code", checkTxRes.Code, + ) + txmp.removeTxByElement(elt) + txmp.metrics.FailedTxs.Add(1) + txmp.metrics.Size.Set(float64(txmp.Size())) +} + +// recheckTransactions initiates re-CheckTx ABCI calls for all the transactions +// currently in the mempool. It reports the number of recheck calls that were +// successfully initiated. +// +// Precondition: The mempool is not empty. +// The caller must hold txmp.mtx exclusively. +func (txmp *TxMempool) recheckTransactions() { + if txmp.Size() == 0 { + panic("mempool: cannot run recheck on an empty mempool") + } + txmp.logger.Debug( + "executing re-CheckTx for all remaining transactions", + "num_txs", txmp.Size(), + "height", txmp.height, + ) + + // Collect transactions currently in the mempool requiring recheck. + wtxs := make([]*WrappedTx, 0, txmp.txs.Len()) + for e := txmp.txs.Front(); e != nil; e = e.Next() { + wtxs = append(wtxs, e.Value.(*WrappedTx)) + } + + // Issue CheckTx calls for each remaining transaction, and when all the + // rechecks are complete signal watchers that transactions may be available. + go func() { + g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU()) + + for _, wtx := range wtxs { + wtx := wtx + start(func() error { + // The response for this CheckTx is handled by the default recheckTxCallback. + rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{ + Tx: wtx.tx, + Type: abci.CheckTxType_Recheck, + }) + if err != nil { + txmp.logger.Error("failed to execute CheckTx during recheck", + "err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash())) + } else { + txmp.handleRecheckResult(wtx.tx, rsp) + } + return nil + }) + } + _ = txmp.proxyAppConn.FlushAsync() + + // When recheck is complete, trigger a notification for more transactions. + _ = g.Wait() + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + txmp.notifyTxsAvailable() + }() +} + +// canAddTx returns an error if we cannot insert the provided *WrappedTx into +// the mempool due to mempool configured constraints. Otherwise, nil is +// returned and the transaction can be inserted into the mempool. +func (txmp *TxMempool) canAddTx(wtx *WrappedTx) bool { + numTxs := txmp.Size() + txBytes := txmp.SizeBytes() + + if numTxs >= txmp.config.Size || wtx.Size()+txBytes > txmp.config.MaxTxsBytes { + return false + } + + return true +} + +// purgeExpiredTxs removes all transactions from the mempool that have exceeded +// their respective height or time-based limits as of the given blockHeight. +// Transactions removed by this operation are not removed from the rejectedTxCache. +// +// The caller must hold txmp.mtx exclusively. +func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { + if txmp.config.TTLNumBlocks == 0 && txmp.config.TTLDuration == 0 { + return // nothing to do + } + + now := time.Now() + cur := txmp.txs.Front() + for cur != nil { + // N.B. Grab the next element first, since if we remove cur its successor + // will be invalidated. + next := cur.Next() + + w := cur.Value.(*WrappedTx) + if txmp.config.TTLNumBlocks > 0 && (blockHeight-w.height) > txmp.config.TTLNumBlocks { + txmp.removeTxByElement(cur) + txmp.metrics.EvictedTxs.Add(1) + } else if txmp.config.TTLDuration > 0 && now.Sub(w.timestamp) > txmp.config.TTLDuration { + txmp.removeTxByElement(cur) + txmp.metrics.EvictedTxs.Add(1) + } + cur = next + } + + // purge old evicted transactions + if txmp.config.TTLDuration > 0 { + limit := now.Add(-txmp.config.TTLDuration) + txmp.evictedTxs.Prune(limit) + } +} + +func (txmp *TxMempool) notifyTxsAvailable() { + if txmp.Size() == 0 { + return // nothing to do + } + + if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable { + // channel cap is 1, so this will send once + txmp.notifiedTxsAvailable = true + + select { + case txmp.txsAvailable <- struct{}{}: + default: + } + } +} diff --git a/mempool/cat/mempool_bench_test.go b/mempool/cat/mempool_bench_test.go new file mode 100644 index 0000000000..199947dd12 --- /dev/null +++ b/mempool/cat/mempool_bench_test.go @@ -0,0 +1,32 @@ +package cat + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/mempool" +) + +func BenchmarkTxMempool_CheckTx(b *testing.B) { + txmp := setup(b, 10000) + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + b.StopTimer() + prefix := make([]byte, 20) + _, err := rng.Read(prefix) + require.NoError(b, err) + + priority := int64(rng.Intn(9999-1000) + 1000) + tx := []byte(fmt.Sprintf("%X=%d", prefix, priority)) + b.StartTimer() + + require.NoError(b, txmp.CheckTx(tx, nil, mempool.TxInfo{})) + } +} diff --git a/mempool/cat/mempool_test.go b/mempool/cat/mempool_test.go new file mode 100644 index 0000000000..75b407c232 --- /dev/null +++ b/mempool/cat/mempool_test.go @@ -0,0 +1,688 @@ +package cat + +import ( + "bytes" + "errors" + "fmt" + "math/rand" + "os" + "sort" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/abci/example/code" + "github.com/tendermint/tendermint/abci/example/kvstore" + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" +) + +// application extends the KV store application by overriding CheckTx to provide +// transaction priority based on the value in the key/value pair. +type application struct { + *kvstore.Application +} + +type testTx struct { + tx types.Tx + priority int64 +} + +func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { + var ( + priority int64 + sender string + ) + + // infer the priority from the raw transaction value (sender=key=value) + parts := bytes.Split(req.Tx, []byte("=")) + if len(parts) == 3 { + v, err := strconv.ParseInt(string(parts[2]), 10, 64) + if err != nil { + return abci.ResponseCheckTx{ + Priority: priority, + Code: 100, + GasWanted: 1, + } + } + + priority = v + sender = string(parts[0]) + } else { + return abci.ResponseCheckTx{ + Priority: priority, + Code: 101, + GasWanted: 1, + } + } + + return abci.ResponseCheckTx{ + Priority: priority, + Sender: sender, + Code: code.CodeTypeOK, + GasWanted: 1, + } +} + +func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool { + t.Helper() + + app := &application{kvstore.NewApplication()} + cc := proxy.NewLocalClientCreator(app) + + cfg := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|")) + cfg.Mempool.CacheSize = cacheSize + + appConnMem, err := cc.NewABCIClient() + require.NoError(t, err) + require.NoError(t, appConnMem.Start()) + + t.Cleanup(func() { + os.RemoveAll(cfg.RootDir) + require.NoError(t, appConnMem.Stop()) + }) + + return NewTxMempool(log.TestingLogger().With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...) +} + +// mustCheckTx invokes txmp.CheckTx for the given transaction and waits until +// its callback has finished executing. It fails t if CheckTx fails. +func mustCheckTx(t *testing.T, txmp *TxMempool, spec string) { + require.NoError(t, txmp.CheckTx([]byte(spec), nil, mempool.TxInfo{})) +} + +func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { + txs := make([]testTx, numTxs) + txInfo := mempool.TxInfo{SenderID: peerID} + + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + for i := 0; i < numTxs; i++ { + prefix := make([]byte, 20) + _, err := rng.Read(prefix) + require.NoError(t, err) + + priority := int64(rng.Intn(9999-1000) + 1000) + + txs[i] = testTx{ + tx: []byte(fmt.Sprintf("sender-%d-%d=%X=%d", i, peerID, prefix, priority)), + priority: priority, + } + require.NoError(t, txmp.CheckTx(txs[i].tx, nil, txInfo)) + } + + return txs +} + +func TestTxMempool_TxsAvailable(t *testing.T) { + txmp := setup(t, 0) + txmp.EnableTxsAvailable() + + ensureNoTxFire := func() { + timer := time.NewTimer(500 * time.Millisecond) + select { + case <-txmp.TxsAvailable(): + require.Fail(t, "unexpected transactions event") + case <-timer.C: + } + } + + ensureTxFire := func() { + timer := time.NewTimer(500 * time.Millisecond) + select { + case <-txmp.TxsAvailable(): + case <-timer.C: + require.Fail(t, "expected transactions event") + } + } + + // ensure no event as we have not executed any transactions yet + ensureNoTxFire() + + // Execute CheckTx for some transactions and ensure TxsAvailable only fires + // once. + txs := checkTxs(t, txmp, 100, 0) + ensureTxFire() + ensureNoTxFire() + + rawTxs := make([]types.Tx, len(txs)) + for i, tx := range txs { + rawTxs[i] = tx.tx + } + + responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) + for i := 0; i < len(responses); i++ { + responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} + } + + // commit half the transactions and ensure we fire an event + txmp.Lock() + require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) + txmp.Unlock() + ensureTxFire() + ensureNoTxFire() + + // Execute CheckTx for more transactions and ensure we do not fire another + // event as we're still on the same height (1). + _ = checkTxs(t, txmp, 100, 0) + ensureNoTxFire() +} + +func TestTxMempool_Size(t *testing.T) { + txmp := setup(t, 0) + txs := checkTxs(t, txmp, 100, 0) + require.Equal(t, len(txs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + + rawTxs := make([]types.Tx, len(txs)) + for i, tx := range txs { + rawTxs[i] = tx.tx + } + + responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) + for i := 0; i < len(responses); i++ { + responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} + } + + txmp.Lock() + require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) + txmp.Unlock() + + require.Equal(t, len(rawTxs)/2, txmp.Size()) + require.Equal(t, int64(2850), txmp.SizeBytes()) +} + +func TestTxMempool_Eviction(t *testing.T) { + txmp := setup(t, 1000) + txmp.config.Size = 5 + txmp.config.MaxTxsBytes = 60 + txExists := func(spec string) bool { + txmp.Lock() + defer txmp.Unlock() + key := types.Tx(spec).Key() + _, ok := txmp.txByKey[key] + return ok + } + + txEvicted := func(spec string) bool { + return txmp.evictedTxs.Has(types.Tx(spec).Key()) + } + + // A transaction bigger than the mempool should be rejected even when there + // are slots available. + err := txmp.CheckTx(types.Tx("big=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef=1"), nil, mempool.TxInfo{}) + require.Error(t, err) + require.Contains(t, err.Error(), "mempool is full") + require.Equal(t, 0, txmp.Size()) + + // Nearly-fill the mempool with a low-priority transaction, to show that it + // is evicted even when slots are available for a higher-priority tx. + const bigTx = "big=0123456789abcdef0123456789abcdef0123456789abcdef01234=2" + mustCheckTx(t, txmp, bigTx) + require.Equal(t, 1, txmp.Size()) // bigTx is the only element + require.True(t, txExists(bigTx)) + require.Equal(t, int64(len(bigTx)), txmp.SizeBytes()) + + // The next transaction should evict bigTx, because it is higher priority + // but does not fit on size. + mustCheckTx(t, txmp, "key1=0000=25") + require.True(t, txExists("key1=0000=25")) + require.False(t, txExists(bigTx)) + require.True(t, txEvicted(bigTx)) + require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes()) + + // Now fill up the rest of the slots with other transactions. + mustCheckTx(t, txmp, "key2=0001=5") + mustCheckTx(t, txmp, "key3=0002=10") + mustCheckTx(t, txmp, "key4=0003=3") + mustCheckTx(t, txmp, "key5=0004=3") + + // A new transaction with low priority should be discarded. + err = txmp.CheckTx(types.Tx("key6=0005=1"), nil, mempool.TxInfo{}) + require.Error(t, err) + require.Contains(t, err.Error(), "mempool is full") + require.False(t, txExists("key6=0005=1")) + // transactions instantly evicted should still be cached + require.True(t, txEvicted("key6=0005=1")) + + // A new transaction with higher priority should evict key5, which is the + // newest of the two transactions with lowest priority. + mustCheckTx(t, txmp, "key7=0006=7") + require.True(t, txExists("key7=0006=7")) // new transaction added + require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted + require.True(t, txExists("key4=0003=3")) // older low-priority tx retained + + // Another new transaction evicts the other low-priority element. + mustCheckTx(t, txmp, "key8=0007=20") + require.True(t, txExists("key8=0007=20")) + require.False(t, txExists("key4=0003=3")) + + // Now the lowest-priority tx is 5, so that should be the next to go. + mustCheckTx(t, txmp, "key9=0008=9") + require.True(t, txExists("key9=0008=9")) + require.False(t, txExists("k3y2=0001=5")) + + // Add a transaction that requires eviction of multiple lower-priority + // entries, in order to fit the size of the element. + mustCheckTx(t, txmp, "key10=0123456789abcdef=11") // evict 10, 9, 7; keep 25, 20, 11 + require.True(t, txExists("key1=0000=25")) + require.True(t, txExists("key8=0007=20")) + require.True(t, txExists("key10=0123456789abcdef=11")) + require.False(t, txExists("key3=0002=10")) + require.False(t, txExists("key9=0008=9")) + require.False(t, txExists("key7=0006=7")) + + // Free up some space so we can add back previously evicted txs + err = txmp.Update(1, types.Txs{types.Tx("key10=0123456789abcdef=11")}, []*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}}, nil, nil) + require.NoError(t, err) + require.False(t, txExists("key10=0123456789abcdef=11")) + mustCheckTx(t, txmp, "key3=0002=10") + require.True(t, txExists("key3=0002=10")) +} + +func TestTxMempool_Flush(t *testing.T) { + txmp := setup(t, 0) + txs := checkTxs(t, txmp, 100, 0) + require.Equal(t, len(txs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + + rawTxs := make([]types.Tx, len(txs)) + for i, tx := range txs { + rawTxs[i] = tx.tx + } + + responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) + for i := 0; i < len(responses); i++ { + responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} + } + + txmp.Lock() + require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) + txmp.Unlock() + + txmp.Flush() + require.Zero(t, txmp.Size()) + require.Equal(t, int64(0), txmp.SizeBytes()) +} + +func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { + txmp := setup(t, 0) + tTxs := checkTxs(t, txmp, 100, 0) // all txs request 1 gas unit + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + + txMap := make(map[types.TxKey]testTx) + priorities := make([]int64, len(tTxs)) + for i, tTx := range tTxs { + txMap[tTx.tx.Key()] = tTx + priorities[i] = tTx.priority + } + + sort.Slice(priorities, func(i, j int) bool { + // sort by priority, i.e. decreasing order + return priorities[i] > priorities[j] + }) + + ensurePrioritized := func(reapedTxs types.Txs) { + reapedPriorities := make([]int64, len(reapedTxs)) + for i, rTx := range reapedTxs { + reapedPriorities[i] = txMap[rTx.Key()].priority + } + + require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) + } + + // reap by gas capacity only + reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, 50) + + // reap by transaction bytes only + reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.GreaterOrEqual(t, len(reapedTxs), 16) + + // Reap by both transaction bytes and gas, where the size yields 31 reaped + // transactions and the gas limit reaps 25 transactions. + reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, 25) +} + +func TestTxMempool_ReapMaxTxs(t *testing.T) { + txmp := setup(t, 0) + tTxs := checkTxs(t, txmp, 100, 0) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + + txMap := make(map[types.TxKey]testTx) + priorities := make([]int64, len(tTxs)) + for i, tTx := range tTxs { + txMap[tTx.tx.Key()] = tTx + priorities[i] = tTx.priority + } + + sort.Slice(priorities, func(i, j int) bool { + // sort by priority, i.e. decreasing order + return priorities[i] > priorities[j] + }) + + ensurePrioritized := func(reapedTxs types.Txs) { + reapedPriorities := make([]int64, len(reapedTxs)) + for i, rTx := range reapedTxs { + reapedPriorities[i] = txMap[rTx.Key()].priority + } + + require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) + } + + // reap all transactions + reapedTxs := txmp.ReapMaxTxs(-1) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, len(tTxs)) + + // reap a single transaction + reapedTxs = txmp.ReapMaxTxs(1) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, 1) + + // reap half of the transactions + reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2) + ensurePrioritized(reapedTxs) + require.Equal(t, len(tTxs), txmp.Size()) + require.Equal(t, int64(5690), txmp.SizeBytes()) + require.Len(t, reapedTxs, len(tTxs)/2) +} + +func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) { + txmp := setup(t, 0) + + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + tx := make([]byte, txmp.config.MaxTxBytes+1) + _, err := rng.Read(tx) + require.NoError(t, err) + + err = txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: 0}) + require.Equal(t, mempool.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)}, err) + + tx = make([]byte, txmp.config.MaxTxBytes-1) + _, err = rng.Read(tx) + require.NoError(t, err) + + err = txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: 0}) + require.NotEqual(t, mempool.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)}, err) +} + +func TestTxMempool_CheckTxSamePeer(t *testing.T) { + txmp := setup(t, 100) + peerID := uint16(1) + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + prefix := make([]byte, 20) + _, err := rng.Read(prefix) + require.NoError(t, err) + + tx := []byte(fmt.Sprintf("sender-0=%X=%d", prefix, 50)) + + require.NoError(t, txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: peerID})) + require.Error(t, txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: peerID})) +} + +func TestTxMempool_ConcurrentTxs(t *testing.T) { + txmp := setup(t, 100) + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + checkTxDone := make(chan struct{}) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + for i := 0; i < 20; i++ { + _ = checkTxs(t, txmp, 100, 0) + dur := rng.Intn(1000-500) + 500 + time.Sleep(time.Duration(dur) * time.Millisecond) + } + + wg.Done() + close(checkTxDone) + }() + + wg.Add(1) + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + defer wg.Done() + + var height int64 = 1 + + for range ticker.C { + reapedTxs := txmp.ReapMaxTxs(200) + if len(reapedTxs) > 0 { + responses := make([]*abci.ResponseDeliverTx, len(reapedTxs)) + for i := 0; i < len(responses); i++ { + var code uint32 + + if i%10 == 0 { + code = 100 + } else { + code = abci.CodeTypeOK + } + + responses[i] = &abci.ResponseDeliverTx{Code: code} + } + + txmp.Lock() + require.NoError(t, txmp.Update(height, reapedTxs, responses, nil, nil)) + txmp.Unlock() + + height++ + } else { + // only return once we know we finished the CheckTx loop + select { + case <-checkTxDone: + return + default: + } + } + } + }() + + wg.Wait() + require.Zero(t, txmp.Size()) + require.Zero(t, txmp.SizeBytes()) +} + +func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) { + txmp := setup(t, 5000) + txmp.config.TTLDuration = 5 * time.Millisecond + + added1 := checkTxs(t, txmp, 10, 0) + require.Equal(t, len(added1), txmp.Size()) + + // Wait a while, then add some more transactions that should not be expired + // when the first batch TTLs out. + // + // ms: 0 1 2 3 4 5 6 + // ^ ^ ^ ^ + // | | | +-- Update (triggers pruning) + // | | +------ first batch expires + // | +-------------- second batch added + // +-------------------------- first batch added + // + // The exact intervals are not important except that the delta should be + // large relative to the cost of CheckTx (ms vs. ns is fine here). + time.Sleep(3 * time.Millisecond) + added2 := checkTxs(t, txmp, 10, 1) + + // Wait a while longer, so that the first batch will expire. + time.Sleep(3 * time.Millisecond) + + // Trigger an update so that pruning will occur. + txmp.Lock() + defer txmp.Unlock() + require.NoError(t, txmp.Update(txmp.height+1, nil, nil, nil, nil)) + + // All the transactions in the original set should have been purged. + for _, tx := range added1 { + if _, ok := txmp.txByKey[tx.tx.Key()]; ok { + t.Errorf("Transaction %X should have been purged for TTL", tx.tx.Key()) + } + if txmp.rejectedTxCache.Has(tx.tx.Key()) { + t.Errorf("Transaction %X should have been removed from the cache", tx.tx.Key()) + } + } + + // All the transactions added later should still be around. + for _, tx := range added2 { + if _, ok := txmp.txByKey[tx.tx.Key()]; !ok { + t.Errorf("Transaction %X should still be in the mempool, but is not", tx.tx.Key()) + } + } +} + +func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { + txmp := setup(t, 500) + txmp.height = 100 + txmp.config.TTLNumBlocks = 10 + + tTxs := checkTxs(t, txmp, 100, 0) + require.Equal(t, len(tTxs), txmp.Size()) + + // reap 5 txs at the next height -- no txs should expire + reapedTxs := txmp.ReapMaxTxs(5) + responses := make([]*abci.ResponseDeliverTx, len(reapedTxs)) + for i := 0; i < len(responses); i++ { + responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} + } + + txmp.Lock() + require.NoError(t, txmp.Update(txmp.height+1, reapedTxs, responses, nil, nil)) + txmp.Unlock() + + require.Equal(t, 95, txmp.Size()) + + // check more txs at height 101 + _ = checkTxs(t, txmp, 50, 1) + require.Equal(t, 145, txmp.Size()) + + // Reap 5 txs at a height that would expire all the transactions from before + // the previous Update (height 100). + // + // NOTE: When we reap txs below, we do not know if we're picking txs from the + // initial CheckTx calls or from the second round of CheckTx calls. Thus, we + // cannot guarantee that all 95 txs are remaining that should be expired and + // removed. However, we do know that that at most 95 txs can be expired and + // removed. + reapedTxs = txmp.ReapMaxTxs(5) + responses = make([]*abci.ResponseDeliverTx, len(reapedTxs)) + for i := 0; i < len(responses); i++ { + responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} + } + + txmp.Lock() + require.NoError(t, txmp.Update(txmp.height+10, reapedTxs, responses, nil, nil)) + txmp.Unlock() + + require.GreaterOrEqual(t, txmp.Size(), 45) +} + +func TestTxMempool_CheckTxPostCheckError(t *testing.T) { + cases := []struct { + name string + err error + }{ + { + name: "error", + err: errors.New("test error"), + }, + { + name: "no error", + err: nil, + }, + } + for _, tc := range cases { + testCase := tc + t.Run(testCase.name, func(t *testing.T) { + postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error { + return testCase.err + } + txmp := setup(t, 0, WithPostCheck(postCheckFn)) + tx := []byte("sender=0000=1") + err := txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: 0}) + require.True(t, errors.Is(err, testCase.err)) + }) + } +} + +func TestSeenTx(t *testing.T) { + txmp := setup(t, 500) + tx := types.Tx("sender=0000=1") + + // mark a few peers as already having seen a tx + txmp.PeerHasTx(1, tx.Key()) + txmp.PeerHasTx(2, tx.Key()) + + // now add the transaction + err := txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: 3}) + require.NoError(t, err) + require.True(t, txmp.Has(tx.Key())) + + txmp.Lock() + el := txmp.txByKey[tx.Key()] + txmp.Unlock() + + wtx := el.Value.(*WrappedTx) + require.True(t, wtx.peers[1]) + require.True(t, wtx.peers[2]) + require.True(t, wtx.peers[3]) +} + +func TestConcurrentlyAddingTx(t *testing.T) { + txmp := setup(t, 500) + tx := types.Tx("sender=0000=1") + + numTxs := 10 + errCh := make(chan error, numTxs) + wg := &sync.WaitGroup{} + for i := 0; i < numTxs; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := txmp.TryAddNewTx(tx, tx.Key(), mempool.TxInfo{SenderID: uint16(i + 1)}) + errCh <- err + }() + } + go func() { + wg.Wait() + close(errCh) + }() + + errCount := 0 + expErr := errors.New("tx already added") + for err := range errCh { + fmt.Println("received error") + if err != nil { + require.Equal(t, expErr, err) + errCount++ + } + } + require.Equal(t, numTxs-1, errCount) +} diff --git a/mempool/cat/reactor.go b/mempool/cat/reactor.go new file mode 100644 index 0000000000..759fed74f4 --- /dev/null +++ b/mempool/cat/reactor.go @@ -0,0 +1,361 @@ +package cat + +import ( + "fmt" + "time" + + "github.com/gogo/protobuf/proto" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/clist" + "github.com/tendermint/tendermint/libs/log" + tmsync "github.com/tendermint/tendermint/libs/sync" + "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/p2p" + protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" + "github.com/tendermint/tendermint/types" +) + +// Reactor handles mempool tx broadcasting amongst peers. +// It maintains a map from peer ID to counter, to prevent gossiping txs to the +// peers you received it from. +type Reactor struct { + p2p.BaseReactor + config *cfg.MempoolConfig + mempool *TxMempool + ids *mempoolIDs +} + +type mempoolIDs struct { + mtx tmsync.RWMutex + peerMap map[p2p.ID]uint16 + nextID uint16 // assumes that a node will never have over 65536 active peers + activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter +} + +// Reserve searches for the next unused ID and assigns it to the +// peer. +func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { + ids.mtx.Lock() + defer ids.mtx.Unlock() + + curID := ids.nextPeerID() + ids.peerMap[peer.ID()] = curID + ids.activeIDs[curID] = struct{}{} +} + +// nextPeerID returns the next unused peer ID to use. +// This assumes that ids's mutex is already locked. +func (ids *mempoolIDs) nextPeerID() uint16 { + if len(ids.activeIDs) == mempool.MaxActiveIDs { + panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", mempool.MaxActiveIDs)) + } + + _, idExists := ids.activeIDs[ids.nextID] + for idExists { + ids.nextID++ + _, idExists = ids.activeIDs[ids.nextID] + } + curID := ids.nextID + ids.nextID++ + return curID +} + +// Reclaim returns the ID reserved for the peer back to unused pool. +func (ids *mempoolIDs) Reclaim(peer p2p.Peer) { + ids.mtx.Lock() + defer ids.mtx.Unlock() + + removedID, ok := ids.peerMap[peer.ID()] + if ok { + delete(ids.activeIDs, removedID) + delete(ids.peerMap, peer.ID()) + } +} + +// GetForPeer returns an ID reserved for the peer. +func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 { + ids.mtx.RLock() + defer ids.mtx.RUnlock() + + return ids.peerMap[peer.ID()] +} + +func newMempoolIDs() *mempoolIDs { + return &mempoolIDs{ + peerMap: make(map[p2p.ID]uint16), + activeIDs: map[uint16]struct{}{0: {}}, + nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx + } +} + +// NewReactor returns a new Reactor with the given config and mempool. +func NewReactor(config *cfg.MempoolConfig, mempool *TxMempool) *Reactor { + memR := &Reactor{ + config: config, + mempool: mempool, + ids: newMempoolIDs(), + } + memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR) + return memR +} + +// InitPeer implements Reactor by creating a state for the peer. +func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer { + memR.ids.ReserveForPeer(peer) + return peer +} + +// SetLogger sets the Logger on the reactor and the underlying mempool. +func (memR *Reactor) SetLogger(l log.Logger) { + memR.Logger = l +} + +// OnStart implements p2p.BaseReactor. +func (memR *Reactor) OnStart() error { + if !memR.config.Broadcast { + memR.Logger.Info("Tx broadcasting is disabled") + } + return nil +} + +// GetChannels implements Reactor by returning the list of channels for this +// reactor. +func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { + largestTx := make([]byte, memR.config.MaxTxBytes) + batchMsg := protomem.Message{ + Sum: &protomem.Message_Txs{ + Txs: &protomem.Txs{Txs: [][]byte{largestTx}}, + }, + } + + return []*p2p.ChannelDescriptor{ + { + ID: mempool.MempoolChannel, + Priority: 5, + RecvMessageCapacity: batchMsg.Size(), + }, + } +} + +// AddPeer implements Reactor. +// It starts a broadcast routine ensuring all txs are forwarded to the given peer. +func (memR *Reactor) AddPeer(peer p2p.Peer) { + if memR.config.Broadcast { + memR.sendAllTxKeys(peer) + go memR.broadcastTxRoutine(peer) + } +} + +// RemovePeer implements Reactor. +func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { + memR.ids.Reclaim(peer) + // broadcast routine checks if peer is gone and returns +} + +// Receive implements Reactor. +// It adds any received transactions to the mempool. +func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { + msg := &protomem.Message{} + err := proto.Unmarshal(msgBytes, msg) + if err != nil { + panic(err) + } + memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", fmt.Sprintf("%T", msg)) + switch m := msg.Sum.(type) { + case *protomem.Message_Txs: + protoTxs := m.Txs.Txs + if len(protoTxs) == 0 { + memR.Logger.Error("received tmpty txs from peer", "src", src) + return + } + txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(src)} + if src != nil { + txInfo.SenderP2PID = src.ID() + } + + var err error + for _, tx := range protoTxs { + ntx := types.Tx(tx) + key := ntx.Key() + if memR.mempool.IsRejectedTx(key) { + // The peer has sent us a transaction that we have already rejected. Since `CheckTx` can + // be non-deterministic, we don't punish the peer but instead just ignore the msg + continue + } + if memR.mempool.WasRecentlyEvicted(key) { + // the transaction was recently evicted. If true, we attempt to re-add it to the mempool + // skipping check tx. + err := memR.mempool.TryReinsertEvictedTx(key, ntx, txInfo.SenderID) + if err != nil { + memR.Logger.Info("Unable to readd evicted tx", "tx_key", key, "err", err) + } + continue + } + if memR.mempool.Has(key) { + // We have already received this transaction. We mark the peer that send the message + // as already seeing the transaction as well and then we finish + memR.mempool.PeerHasTx(txInfo.SenderID, key) + continue + } + memR.broadcastSeenTx(key) + _, err = memR.mempool.TryAddNewTx(ntx, key, txInfo) + if err != nil { + memR.Logger.Info("Could not add tx", "tx_key", key, "err", err) + } + } + case *protomem.Message_SeenTx: + if len(m.SeenTx.TxKey) != types.TxKeySize { + memR.Logger.Error("Peer sent SeenTx with incorrect key size", "len", len(m.SeenTx.TxKey)) + return + } + var txKey [types.TxKeySize]byte + copy(txKey[:], m.SeenTx.TxKey) + memR.mempool.PeerHasTx(memR.ids.GetForPeer(src), types.TxKey(txKey)) + + default: + memR.Logger.Error("unknown message type", "src", src, "chId", chID, "msg", fmt.Sprintf("%T", msg)) + memR.Switch.StopPeerForError(src, fmt.Errorf("mempool cannot handle message of type: %T", msg)) + return + } + + // broadcasting happens from go routines per peer +} + +// PeerState describes the state of a peer. +type PeerState interface { + GetHeight() int64 +} + +// Send new mempool txs to peer. +func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { + peerID := memR.ids.GetForPeer(peer) + var next *clist.CElement + + for { + // In case of both next.NextWaitChan() and peer.Quit() are variable at the same time + if !memR.IsRunning() || !peer.IsRunning() { + return + } + + // This happens because the CElement we were looking at got garbage + // collected (removed). That is, .NextWait() returned nil. Go ahead and + // start from the beginning. + if next == nil { + select { + case <-memR.mempool.TxsWaitChan(): // Wait until a tx is available + if next = memR.mempool.TxsFront(); next == nil { + continue + } + + case <-peer.Quit(): + return + + case <-memR.Quit(): + return + } + } + + // Make sure the peer is up to date. + peerState, ok := peer.Get(types.PeerStateKey).(PeerState) + if !ok { + // Peer does not have a state yet. We set it in the consensus reactor, but + // when we add peer in Switch, the order we call reactors#AddPeer is + // different every time due to us using a map. Sometimes other reactors + // will be initialized before the consensus reactor. We should wait a few + // milliseconds and retry. + time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond) + continue + } + + // Allow for a lag of 1 block. + memTx := next.Value.(*WrappedTx) + if peerState.GetHeight() < memTx.height-1 { + time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond) + continue + } + + // NOTE: Transaction batching was disabled due to + // https://github.com/tendermint/tendermint/issues/5796 + if !memTx.HasPeer(peerID) { + msg := protomem.Message{ + Sum: &protomem.Message_Txs{ + Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}}, + }, + } + + bz, err := msg.Marshal() + if err != nil { + panic(err) + } + + success := peer.Send(mempool.MempoolChannel, bz) + if !success { + time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond) + continue + } + } + + select { + case <-next.NextWaitChan(): + // see the start of the for loop for nil check + next = next.Next() + + case <-peer.Quit(): + return + + case <-memR.Quit(): + return + } + } +} + +func (memR *Reactor) broadcastSeenTx(txKey types.TxKey) { + memR.Logger.Debug( + "broadcasting seen tx", + "tx_key", txKey, + ) + msg := protomem.Message{ + Sum: &protomem.Message_SeenTx{ + SeenTx: &protomem.SeenTx{TxKey: txKey[:]}, + }, + } + bz, err := msg.Marshal() + if err != nil { + panic(err) + } + memR.Switch.Broadcast(mempool.MempoolChannel, bz) +} + +// sendAllTxKeys loops through all txs currently in the mempool and iteratively +// sends a `SeenTx` message to the peer. This is added to a queue and will block +// when the queue becomes full. +func (memR *Reactor) sendAllTxKeys(peer p2p.Peer) { + txKeys := memR.mempool.GetAllTxKeys() + for _, txKey := range txKeys { + msg := protomem.Message{ + Sum: &protomem.Message_SeenTx{ + SeenTx: &protomem.SeenTx{TxKey: txKey[:]}, + }, + } + bz, err := msg.Marshal() + if err != nil { + panic(err) + } + + peer.Send(mempool.MempoolChannel, bz) + } +} + +//----------------------------------------------------------------------------- +// Messages + +// TxsMessage is a Message containing transactions. +type TxsMessage struct { + Txs []types.Tx +} + +// String returns a string representation of the TxsMessage. +func (m *TxsMessage) String() string { + return fmt.Sprintf("[TxsMessage %v]", m.Txs) +} diff --git a/mempool/cat/reactor_test.go b/mempool/cat/reactor_test.go new file mode 100644 index 0000000000..2ecf7f1a5c --- /dev/null +++ b/mempool/cat/reactor_test.go @@ -0,0 +1,266 @@ +package cat + +import ( + "encoding/hex" + "os" + "sync" + "testing" + "time" + + "github.com/go-kit/log/term" + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/abci/example/kvstore" + "github.com/tendermint/tendermint/crypto/ed25519" + p2pmock "github.com/tendermint/tendermint/p2p/mock" + + cfg "github.com/tendermint/tendermint/config" + + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/mocks" + protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" +) + +const ( + numTxs = 1000 + timeout = 120 * time.Second // ridiculously high because CircleCI is slow +) + +type peerState struct { + height int64 +} + +func (ps peerState) GetHeight() int64 { + return ps.height +} + +// Send a bunch of txs to the first reactor's mempool and wait for them all to +// be received in the others. +func TestReactorBroadcastTxsMessage(t *testing.T) { + config := cfg.TestConfig() + // if there were more than two reactors, the order of transactions could not be + // asserted in waitForTxsOnReactors (due to transactions gossiping). If we + // replace Connect2Switches (full mesh) with a func, which connects first + // reactor to others and nothing else, this test should also pass with >2 reactors. + const N = 2 + reactors := makeAndConnectReactors(config, N) + defer func() { + for _, r := range reactors { + if err := r.Stop(); err != nil { + assert.NoError(t, err) + } + } + }() + for _, r := range reactors { + for _, peer := range r.Switch.Peers().List() { + peer.Set(types.PeerStateKey, peerState{1}) + } + } + + txs := checkTxs(t, reactors[0].mempool, numTxs, mempool.UnknownPeerID) + transactions := make(types.Txs, len(txs)) + for idx, tx := range txs { + transactions[idx] = tx.tx + } + + waitForTxsOnReactors(t, transactions, reactors) +} + +func TestReactorSendSeenTxOnConnection(t *testing.T) { + app := kvstore.NewApplication() + cc := proxy.NewLocalClientCreator(app) + pool, cleanup := newMempoolWithApp(cc) + t.Cleanup(cleanup) + reactor := NewReactor(cfg.TestConfig().Mempool, pool) + + tx1 := types.Tx("hello") + key1 := tx1.Key() + msg1 := &protomem.Message{ + Sum: &protomem.Message_SeenTx{ + SeenTx: &protomem.SeenTx{TxKey: key1[:]}, + }, + } + msgBytes1, err := proto.Marshal(msg1) + require.NoError(t, err) + tx2 := types.Tx("world") + key2 := tx2.Key() + msg2 := &protomem.Message{ + Sum: &protomem.Message_SeenTx{ + SeenTx: &protomem.SeenTx{TxKey: key2[:]}, + }, + } + msgBytes2, err := proto.Marshal(msg2) + require.NoError(t, err) + + peer := &mocks.Peer{} + nodeKey := p2p.NodeKey{PrivKey: ed25519.GenPrivKey()} + peer.On("ID").Return(nodeKey.ID()) + peer.On("SendEnvelope", mempool.MempoolChannel, msgBytes1).Return(true) + peer.On("SendEnvelope", mempool.MempoolChannel, msgBytes2).Return(true) + peer.On("SendEnvelope", mempool.MempoolChannel, mock.AnythingOfType("[]byte")).Maybe() + + pool.CheckTx(tx1, nil, mempool.TxInfo{}) + pool.CheckTx(tx2, nil, mempool.TxInfo{}) + + reactor.InitPeer(peer) + reactor.AddPeer(peer) + + peer.AssertExpectations(t) +} + +func TestMempoolVectors(t *testing.T) { + testCases := []struct { + testName string + tx []byte + expBytes string + }{ + {"tx 1", []byte{123}, "0a030a017b"}, + {"tx 2", []byte("proto encoding in mempool"), "0a1b0a1970726f746f20656e636f64696e6720696e206d656d706f6f6c"}, + } + + for _, tc := range testCases { + tc := tc + + msg := protomem.Message{ + Sum: &protomem.Message_Txs{ + Txs: &protomem.Txs{Txs: [][]byte{tc.tx}}, + }, + } + bz, err := msg.Marshal() + require.NoError(t, err, tc.testName) + + require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName) + } +} + +func TestLegacyReactorReceiveBasic(t *testing.T) { + config := cfg.TestConfig() + // if there were more than two reactors, the order of transactions could not be + // asserted in waitForTxsOnReactors (due to transactions gossiping). If we + // replace Connect2Switches (full mesh) with a func, which connects first + // reactor to others and nothing else, this test should also pass with >2 reactors. + const N = 1 + reactors := makeAndConnectReactors(config, N) + var ( + reactor = reactors[0] + peer = p2pmock.NewPeer(nil) + ) + defer func() { + err := reactor.Stop() + assert.NoError(t, err) + }() + + reactor.InitPeer(peer) + reactor.AddPeer(peer) + + msg := &protomem.Message{ + Sum: &protomem.Message_Txs{ + Txs: &protomem.Txs{Txs: [][]byte{}}, + }, + } + m, err := proto.Marshal(msg) + assert.NoError(t, err) + + assert.NotPanics(t, func() { + reactor.Receive(mempool.MempoolChannel, peer, m) + }) +} + +func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor { + reactors := make([]*Reactor, n) + logger := mempoolLogger() + for i := 0; i < n; i++ { + app := kvstore.NewApplication() + cc := proxy.NewLocalClientCreator(app) + mempool, cleanup := newMempoolWithApp(cc) + defer cleanup() + + reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states + reactors[i].SetLogger(logger.With("validator", i)) + } + + p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("MEMPOOL", reactors[i]) + return s + + }, p2p.Connect2Switches) + return reactors +} + +// mempoolLogger is a TestingLogger which uses a different +// color for each validator ("validator" key must exist). +func mempoolLogger() log.Logger { + return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor { + for i := 0; i < len(keyvals)-1; i += 2 { + if keyvals[i] == "validator" { + return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))} + } + } + return term.FgBgColor{} + }) +} + +func newMempoolWithApp(cc proxy.ClientCreator) (*TxMempool, func()) { + conf := cfg.ResetTestRoot("mempool_test") + + mp, cu := newMempoolWithAppAndConfig(cc, conf) + return mp, cu +} + +func newMempoolWithAppAndConfig(cc proxy.ClientCreator, conf *cfg.Config) (*TxMempool, func()) { + appConnMem, _ := cc.NewABCIClient() + appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) + err := appConnMem.Start() + if err != nil { + panic(err) + } + + mp := NewTxMempool(log.TestingLogger(), conf.Mempool, appConnMem, 0) + + return mp, func() { os.RemoveAll(conf.RootDir) } +} + +func waitForTxsOnReactors(t *testing.T, txs types.Txs, reactors []*Reactor) { + // wait for the txs in all mempools + wg := new(sync.WaitGroup) + for i, reactor := range reactors { + wg.Add(1) + go func(r *Reactor, reactorIndex int) { + defer wg.Done() + waitForTxsOnReactor(t, txs, r, reactorIndex) + }(reactor, i) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + timer := time.After(timeout) + select { + case <-timer: + t.Fatal("Timed out waiting for txs") + case <-done: + } +} + +func waitForTxsOnReactor(t *testing.T, txs types.Txs, reactor *Reactor, reactorIndex int) { + mempool := reactor.mempool + for mempool.Size() < len(txs) { + time.Sleep(time.Millisecond * 100) + } + + reapedTxs := mempool.ReapMaxTxs(len(txs)) + for i, tx := range txs { + assert.Equalf(t, tx, reapedTxs[i], + "txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i]) + } +} diff --git a/mempool/cat/spec.md b/mempool/cat/spec.md new file mode 100644 index 0000000000..8dfe3e6ab2 --- /dev/null +++ b/mempool/cat/spec.md @@ -0,0 +1,85 @@ +# Content Addressable Transaction Pool Specification + +- 01.12.2022 | Initial specification (@cmwaters) + +### Outline + +This document specifies the properties, design and implementation of a content addressable transaction pool (CAT). This protocol is intended as an alternative to the FIFO and Priority mempools currently built-in to the Tendermint consensus protocol. The term content-addressable here, indicates that each transaction is identified by a smaller, unique tag (in this case a sha256 hash). These tags are broadcast among the transactions as a means of more compactly indicating which peers have which transactions. Tracking what each peer has aims at reduces the amount of duplication. In a network without content tracking, a peer may receive as many duplicate transactions as peers connected to. The tradeoff here therefore is that the transactions are significantly larger than the tag such that the sum of the data saved sending what would be duplicated transactions is larger than the sum of sending each peer a tag. + +### Purpose + +The objective of such a protocol is to transport transactions from the author (usually a client) to a proposed block, optimizing both latency and throughput i.e. how quickly can a transaction be proposed (and committed) and how many transactions can be transported into a block at once. + +Typically the mempool serves to receive inbound transactions via an RPC endpoint, gossip them to all nodes in the network (regardless of whether they are capable of proposing a block or not), and stage groups of transactions to both consensus and the application to be included in a block. + +### Assumptions + +The following are assumptions inherited from existing Tendermint mempool protocols: + +- `CheckTx` should be seen as a simple gatekeeper to what transactions enter the pool to be gossiped and staged. It is non-deterministic: one node may reject a transaction that another node keeps. +- Applications implementing `CheckTx` are responsible for replay protection (i.e. the same transaction being present in multiple blocks). The mempool ensures that within the same block, no duplicate transactions can exist. +- The underlying p2p layer guarantees eventually reliable broadcast. A transaction need only be sent once to eventually reach the target peer. + +### Messages + +The CAT protocol extends on the existing mempool implementations by introducing a new protobuf message: + +```protobuf +message SeenTx { + bytes tx_key = 1; +} +``` + +The `SeenTx` contains the sha256 hash of the raw transaction bytes. The only validation is that the byte slice MUST have a length of 32. + +> NOTE: The term `SeenTx` is used over the more common `HasTx` because the transaction pool contains sophisticated eviction logic. TTL's, higher priority transactions and reCheckTx may mean that a transaction pool *had* a transaction but does not have it any more. Semantically it's more appropriate to use `SeenTx` to imply not the presence of a transaction but that the node has seen it and dealt with it accordingly. + +#### Outbound logic + +A `SeenTx` is broadcasted to ALL nodes upon receiving a "new" transaction from a peer. The transaction pool does not need to track every unique inbound transaction, therefore "new" is identified as: + +- The node does not currently have the transaction +- The node did not recently reject the transacton (subject to the size of the cache) +- The node did not recently evict the transaction (subject to the size of the cache) + +Given this criteria, it is feasible, yet unlikely that a node receives two `SeenTx` messages from the same peer for the same transaction. + +A `SeenTx` MAY be sent for each transaction currently in the transaction pool when a connection with a peer is first established. This acts as a mechanism for syncing pool state across peers. + +The `SeenTx` message SHOULD be broadcasted before either validation or storage. It is important that the `SeenTx` be delivered to connected peers as soon as possible to decrease the likelihood of duplicate transmission. Given this, it is possible that a `SeenTx` is sent for a transaction that is rejected by the node. This is acceptable given that `CheckTx` is non-deterministic (one node may reject a transaction that another node does not). + +> NOTE: Inbound transactions submitted via the RPC do not trigger a `SeenTx` message as it is assumed that the node is the first to see the transaction and by gossiping it to others it is implied that the node has seen the transaction. + +#### Inbound logic + +Upon receiving a `SeenTx` message: + +- If the node has the transaction in its pool, it will mark that the peer has seen the transaction and MUST not broadcast that transaction to that peer. +- If the node has recently rejected that transaction, it SHOULD ignore the message +- If the node has not seen the transaction or has recently evicted it, it MAY cache the message and process it if the corresponding transaction is received. + +### Gossip + +A node MUST not broadcast a transaction to a peer after receiving a `SeenTx` for that transaction from that peer. Given the asynchronous nature of sending and receiving messages it is possible to still receive a transaction from a peer that the node has sent a `SeenTx` to therefore nodes SHOULD NOT punish peers if they *appear* to not follow this rule. + +Each transaction received and sent should be also be treated as a `SeenTx`. Receiving a transaction from a peer MUST mark that peer as having seen that transaction. Each transaction sent by the peer (under the reliable broadcast assumption) MUST also mark the peer as having seen that transaction. + +Finally, transaction pools are solely run in-memory; thus when a node stops, all transactions are discarded. To avoid the scenario where a node restarts and does not receive transactions because other nodes recorded a `SeenTx` message from their previous run, each transaction pool should track peer state based **per connection** and not per `NodeID`. + +### Cache + +The transaction pool employs a few OPTIONAL caches for performance improvements: + +- Rejected transactions can be cached to a configurable size to avoid unnecessarily burdening the application in verification. Using a large cache here implies that when `CheckTx` rejects a transaction it will not, at a later point, become valid. With this in mind, verification should veer towards being less stringent. +- `SeenTx` messages for transactions that the node has not yet encountered MAY be cached so as to avoid unnecessarily broadcasting transactions back to those peers once the node eventually receives the transaction. +- The meta data around evicted transactions MAY be cached as these have already been validated so if the transaction is received in a state where the transaction pool has capacity for it, the transaction can be quickly added. + +> NOTE: In the future, `WantTx` messages may be introduced in the case of the last bullet point to indicate that even after sending the `SeenTx` we want to receive that transaction again. + +ALL caches SHOULD be bounded in size. + +### Compatibility + +CAT has Go API compatibility with the existing two mempool implementations. It implements both the `Reactor` interface required by Tendermint's P2P layer and the `Mempool` interface used by `consensus` and `rpc`. CAT is not currently network compatible with existing implementations as the new message type will be unknown to other nodes causing them to drop the connection. + +> NOTE: p2p compatibility can be solved by implementing the message type on a different channel which is simply ignored by nodes that don't support it. This may actually be preferable as a different channel allows for a different priority. diff --git a/mempool/cat/tx.go b/mempool/cat/tx.go new file mode 100644 index 0000000000..e1c332c10e --- /dev/null +++ b/mempool/cat/tx.go @@ -0,0 +1,60 @@ +package cat + +import ( + "sync" + "time" + + "github.com/tendermint/tendermint/types" +) + +// wrappedTx defines a wrapper around a raw transaction with additional metadata +// that is used for indexing. With the exception of the map of peers who have +// seen this transaction, this struct should never be modified +type WrappedTx struct { + // these fields are immutable + tx types.Tx // the original transaction data + key types.TxKey // the transaction hash + height int64 // height when this transaction was initially checked (for expiry) + timestamp time.Time // time when transaction was entered (for TTL) + gasWanted int64 // app: gas required to execute this transaction + priority int64 // app: priority value for this transaction + sender string // app: assigned sender label + + mtx sync.Mutex + peers map[uint16]bool // peer IDs who have sent us this transaction +} + +func NewWrappedTx(tx types.Tx, key types.TxKey, height, gasWanted, priority int64, sender string) *WrappedTx { + return &WrappedTx{ + tx: tx, + key: key, + height: height, + timestamp: time.Now().UTC(), + gasWanted: gasWanted, + priority: priority, + sender: sender, + peers: map[uint16]bool{}, + } +} + +// Size reports the size of the raw transaction in bytes. +func (w *WrappedTx) Size() int64 { return int64(len(w.tx)) } + +// SetPeer adds the specified peer ID as a sender of w. +func (w *WrappedTx) SetPeer(id uint16) { + w.mtx.Lock() + defer w.mtx.Unlock() + if w.peers == nil { + w.peers = map[uint16]bool{id: true} + } else { + w.peers[id] = true + } +} + +// HasPeer reports whether the specified peer ID is a sender of w. +func (w *WrappedTx) HasPeer(id uint16) bool { + w.mtx.Lock() + defer w.mtx.Unlock() + _, ok := w.peers[id] + return ok +} diff --git a/node/node.go b/node/node.go index eb0b77e427..6d499118b5 100644 --- a/node/node.go +++ b/node/node.go @@ -30,6 +30,7 @@ import ( "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/light" mempl "github.com/tendermint/tendermint/mempool" + mempoolv2 "github.com/tendermint/tendermint/mempool/cat" mempoolv0 "github.com/tendermint/tendermint/mempool/v0" mempoolv1 "github.com/tendermint/tendermint/mempool/v1" "github.com/tendermint/tendermint/p2p" @@ -373,6 +374,26 @@ func createMempoolAndMempoolReactor( logger log.Logger, ) (mempl.Mempool, p2p.Reactor) { switch config.Mempool.Version { + case cfg.MempoolV2: + mp := mempoolv2.NewTxMempool( + logger, + config.Mempool, + proxyApp.Mempool(), + state.LastBlockHeight, + mempoolv2.WithMetrics(memplMetrics), + mempoolv2.WithPreCheck(sm.TxPreCheck(state)), + mempoolv2.WithPostCheck(sm.TxPostCheck(state)), + ) + + reactor := mempoolv2.NewReactor( + config.Mempool, + mp, + ) + if config.Consensus.WaitForTxs() { + mp.EnableTxsAvailable() + } + + return mp, reactor case cfg.MempoolV1: mp := mempoolv1.NewTxMempool( logger, diff --git a/proto/tendermint/mempool/types.pb.go b/proto/tendermint/mempool/types.pb.go index 11e259551d..f670960827 100644 --- a/proto/tendermint/mempool/types.pb.go +++ b/proto/tendermint/mempool/types.pb.go @@ -66,9 +66,55 @@ func (m *Txs) GetTxs() [][]byte { return nil } +type SeenTx struct { + TxKey []byte `protobuf:"bytes,1,opt,name=tx_key,json=txKey,proto3" json:"tx_key,omitempty"` +} + +func (m *SeenTx) Reset() { *m = SeenTx{} } +func (m *SeenTx) String() string { return proto.CompactTextString(m) } +func (*SeenTx) ProtoMessage() {} +func (*SeenTx) Descriptor() ([]byte, []int) { + return fileDescriptor_2af51926fdbcbc05, []int{1} +} +func (m *SeenTx) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SeenTx) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SeenTx.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SeenTx) XXX_Merge(src proto.Message) { + xxx_messageInfo_SeenTx.Merge(m, src) +} +func (m *SeenTx) XXX_Size() int { + return m.Size() +} +func (m *SeenTx) XXX_DiscardUnknown() { + xxx_messageInfo_SeenTx.DiscardUnknown(m) +} + +var xxx_messageInfo_SeenTx proto.InternalMessageInfo + +func (m *SeenTx) GetTxKey() []byte { + if m != nil { + return m.TxKey + } + return nil +} + type Message struct { // Types that are valid to be assigned to Sum: + // // *Message_Txs + // *Message_SeenTx Sum isMessage_Sum `protobuf_oneof:"sum"` } @@ -76,7 +122,7 @@ func (m *Message) Reset() { *m = Message{} } func (m *Message) String() string { return proto.CompactTextString(m) } func (*Message) ProtoMessage() {} func (*Message) Descriptor() ([]byte, []int) { - return fileDescriptor_2af51926fdbcbc05, []int{1} + return fileDescriptor_2af51926fdbcbc05, []int{2} } func (m *Message) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -114,8 +160,12 @@ type isMessage_Sum interface { type Message_Txs struct { Txs *Txs `protobuf:"bytes,1,opt,name=txs,proto3,oneof" json:"txs,omitempty"` } +type Message_SeenTx struct { + SeenTx *SeenTx `protobuf:"bytes,2,opt,name=seen_tx,json=seenTx,proto3,oneof" json:"seen_tx,omitempty"` +} -func (*Message_Txs) isMessage_Sum() {} +func (*Message_Txs) isMessage_Sum() {} +func (*Message_SeenTx) isMessage_Sum() {} func (m *Message) GetSum() isMessage_Sum { if m != nil { @@ -131,34 +181,46 @@ func (m *Message) GetTxs() *Txs { return nil } +func (m *Message) GetSeenTx() *SeenTx { + if x, ok := m.GetSum().(*Message_SeenTx); ok { + return x.SeenTx + } + return nil +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*Message) XXX_OneofWrappers() []interface{} { return []interface{}{ (*Message_Txs)(nil), + (*Message_SeenTx)(nil), } } func init() { proto.RegisterType((*Txs)(nil), "tendermint.mempool.Txs") + proto.RegisterType((*SeenTx)(nil), "tendermint.mempool.SeenTx") proto.RegisterType((*Message)(nil), "tendermint.mempool.Message") } func init() { proto.RegisterFile("tendermint/mempool/types.proto", fileDescriptor_2af51926fdbcbc05) } var fileDescriptor_2af51926fdbcbc05 = []byte{ - // 179 bytes of a gzipped FileDescriptorProto + // 240 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2b, 0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0xcf, 0x4d, 0xcd, 0x2d, 0xc8, 0xcf, 0xcf, 0xd1, 0x2f, 0xa9, 0x2c, 0x48, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0x42, 0xc8, 0xeb, 0x41, 0xe5, 0x95, 0xc4, 0xb9, 0x98, 0x43, 0x2a, 0x8a, 0x85, 0x04, 0xb8, 0x98, 0x4b, 0x2a, 0x8a, 0x25, - 0x18, 0x15, 0x98, 0x35, 0x78, 0x82, 0x40, 0x4c, 0x25, 0x5b, 0x2e, 0x76, 0xdf, 0xd4, 0xe2, 0xe2, - 0xc4, 0xf4, 0x54, 0x21, 0x6d, 0x98, 0x24, 0xa3, 0x06, 0xb7, 0x91, 0xb8, 0x1e, 0xa6, 0x29, 0x7a, - 0x21, 0x15, 0xc5, 0x1e, 0x0c, 0x60, 0x7d, 0x4e, 0xac, 0x5c, 0xcc, 0xc5, 0xa5, 0xb9, 0x4e, 0xc1, - 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, - 0x0c, 0x17, 0x1e, 0xcb, 0x31, 0xdc, 0x78, 0x2c, 0xc7, 0x10, 0x65, 0x99, 0x9e, 0x59, 0x92, 0x51, - 0x9a, 0xa4, 0x97, 0x9c, 0x9f, 0xab, 0x8f, 0xe4, 0x60, 0x24, 0x26, 0xd8, 0xb5, 0xfa, 0x98, 0x9e, - 0x49, 0x62, 0x03, 0xcb, 0x18, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0xca, 0xc3, 0xa0, 0xfc, 0xe9, - 0x00, 0x00, 0x00, + 0x18, 0x15, 0x98, 0x35, 0x78, 0x82, 0x40, 0x4c, 0x25, 0x79, 0x2e, 0xb6, 0xe0, 0xd4, 0xd4, 0xbc, + 0x90, 0x0a, 0x21, 0x51, 0x2e, 0xb6, 0x92, 0x8a, 0xf8, 0xec, 0xd4, 0x4a, 0x09, 0x46, 0x05, 0x46, + 0x0d, 0x9e, 0x20, 0xd6, 0x92, 0x0a, 0xef, 0xd4, 0x4a, 0xa5, 0x12, 0x2e, 0x76, 0xdf, 0xd4, 0xe2, + 0xe2, 0xc4, 0xf4, 0x54, 0x21, 0x6d, 0x98, 0x6e, 0x46, 0x0d, 0x6e, 0x23, 0x71, 0x3d, 0x4c, 0x6b, + 0xf4, 0x42, 0x2a, 0x8a, 0x3d, 0x18, 0xc0, 0x06, 0x0b, 0x99, 0x72, 0xb1, 0x17, 0xa7, 0xa6, 0xe6, + 0xc5, 0x97, 0x54, 0x48, 0x30, 0x81, 0x35, 0x48, 0x61, 0xd3, 0x00, 0xb1, 0xdb, 0x83, 0x21, 0x88, + 0xad, 0x18, 0xcc, 0x72, 0x62, 0xe5, 0x62, 0x2e, 0x2e, 0xcd, 0x75, 0x0a, 0x3e, 0xf1, 0x48, 0x8e, + 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0xb8, 0xf0, 0x58, + 0x8e, 0xe1, 0xc6, 0x63, 0x39, 0x86, 0x28, 0xcb, 0xf4, 0xcc, 0x92, 0x8c, 0xd2, 0x24, 0xbd, 0xe4, + 0xfc, 0x5c, 0x7d, 0xa4, 0x80, 0x40, 0x62, 0x82, 0x43, 0x41, 0x1f, 0x33, 0x90, 0x92, 0xd8, 0xc0, + 0x32, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x25, 0xcf, 0x38, 0xd6, 0x41, 0x01, 0x00, 0x00, } func (m *Txs) Marshal() (dAtA []byte, err error) { @@ -193,6 +255,36 @@ func (m *Txs) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *SeenTx) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SeenTx) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SeenTx) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.TxKey) > 0 { + i -= len(m.TxKey) + copy(dAtA[i:], m.TxKey) + i = encodeVarintTypes(dAtA, i, uint64(len(m.TxKey))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func (m *Message) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -246,6 +338,27 @@ func (m *Message_Txs) MarshalToSizedBuffer(dAtA []byte) (int, error) { } return len(dAtA) - i, nil } +func (m *Message_SeenTx) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Message_SeenTx) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.SeenTx != nil { + { + size, err := m.SeenTx.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + return len(dAtA) - i, nil +} func encodeVarintTypes(dAtA []byte, offset int, v uint64) int { offset -= sovTypes(v) base := offset @@ -272,6 +385,19 @@ func (m *Txs) Size() (n int) { return n } +func (m *SeenTx) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.TxKey) + if l > 0 { + n += 1 + l + sovTypes(uint64(l)) + } + return n +} + func (m *Message) Size() (n int) { if m == nil { return 0 @@ -296,6 +422,18 @@ func (m *Message_Txs) Size() (n int) { } return n } +func (m *Message_SeenTx) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.SeenTx != nil { + l = m.SeenTx.Size() + n += 1 + l + sovTypes(uint64(l)) + } + return n +} func sovTypes(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 @@ -385,6 +523,90 @@ func (m *Txs) Unmarshal(dAtA []byte) error { } return nil } +func (m *SeenTx) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SeenTx: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SeenTx: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TxKey", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TxKey = append(m.TxKey[:0], dAtA[iNdEx:postIndex]...) + if m.TxKey == nil { + m.TxKey = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTypes(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *Message) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -449,6 +671,41 @@ func (m *Message) Unmarshal(dAtA []byte) error { } m.Sum = &Message_Txs{v} iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SeenTx", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &SeenTx{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Sum = &Message_SeenTx{v} + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipTypes(dAtA[iNdEx:]) diff --git a/proto/tendermint/mempool/types.proto b/proto/tendermint/mempool/types.proto index b55d9717b1..ff24847f35 100644 --- a/proto/tendermint/mempool/types.proto +++ b/proto/tendermint/mempool/types.proto @@ -7,8 +7,13 @@ message Txs { repeated bytes txs = 1; } +message SeenTx { + bytes tx_key = 1; +} + message Message { oneof sum { - Txs txs = 1; + Txs txs = 1; + SeenTx seen_tx = 2; } } diff --git a/test/e2e/generator/generate.go b/test/e2e/generator/generate.go index f0c6fef7dd..c216474c6c 100644 --- a/test/e2e/generator/generate.go +++ b/test/e2e/generator/generate.go @@ -32,7 +32,7 @@ var ( // FIXME: v2 disabled due to flake nodeFastSyncs = uniformChoice{"v0"} // "v2" nodeStateSyncs = uniformChoice{false, true} - nodeMempools = uniformChoice{"v0", "v1"} + nodeMempools = uniformChoice{"v0", "v1", "v2"} nodePersistIntervals = uniformChoice{0, 1, 5} nodeSnapshotIntervals = uniformChoice{0, 3} nodeRetainBlocks = uniformChoice{0, 1, 5} diff --git a/test/e2e/networks/ci.toml b/test/e2e/networks/ci.toml index 7ed1049df1..e1bbc68e8b 100644 --- a/test/e2e/networks/ci.toml +++ b/test/e2e/networks/ci.toml @@ -68,7 +68,6 @@ start_at = 1005 # Becomes part of the validator set at 1010 seeds = ["seed02"] database = "cleveldb" fast_sync = "v0" -mempool_version = "v1" # FIXME: should be grpc, disabled due to https://github.com/tendermint/tendermint/issues/5439 #abci_protocol = "grpc" privval_protocol = "tcp" diff --git a/test/e2e/pkg/manifest.go b/test/e2e/pkg/manifest.go index 65c0cf84b3..3d9b966fe4 100644 --- a/test/e2e/pkg/manifest.go +++ b/test/e2e/pkg/manifest.go @@ -92,8 +92,8 @@ type ManifestNode struct { // Defaults to disabled. FastSync string `toml:"fast_sync"` - // Mempool specifies which version of mempool to use. Either "v0" or "v1" - // This defaults to v0. + // Mempool specifies which version of mempool to use. Either "v0" or "v1", or "v2" + // (cat). This defaults to v2. Mempool string `toml:"mempool_version"` // StateSync enables state sync. The runner automatically configures trusted diff --git a/test/e2e/pkg/testnet.go b/test/e2e/pkg/testnet.go index 0729bafca9..66bffdaafc 100644 --- a/test/e2e/pkg/testnet.go +++ b/test/e2e/pkg/testnet.go @@ -11,6 +11,7 @@ import ( "strconv" "strings" + "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/secp256k1" @@ -305,7 +306,7 @@ func (n Node) Validate(testnet Testnet) error { } switch n.Mempool { - case "", "v0", "v1": + case "", config.MempoolV0, config.MempoolV1, config.MempoolV2: default: return fmt.Errorf("invalid mempool version %q", n.Mempool) } diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index 1e178adb0a..39726e859f 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -32,6 +32,7 @@ import ( "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/light" mempl "github.com/tendermint/tendermint/mempool" + mempoolv2 "github.com/tendermint/tendermint/mempool/cat" mempoolv0 "github.com/tendermint/tendermint/mempool/v0" mempoolv1 "github.com/tendermint/tendermint/mempool/v1" "github.com/tendermint/tendermint/p2p" @@ -383,6 +384,26 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, state sm.State, memplMetrics *mempl.Metrics, logger log.Logger, ) (p2p.Reactor, mempl.Mempool) { switch config.Mempool.Version { + case cfg.MempoolV2: + mp := mempoolv2.NewTxMempool( + logger, + config.Mempool, + proxyApp.Mempool(), + state.LastBlockHeight, + mempoolv2.WithMetrics(memplMetrics), + mempoolv2.WithPreCheck(sm.TxPreCheck(state)), + mempoolv2.WithPostCheck(sm.TxPostCheck(state)), + ) + + reactor := mempoolv2.NewReactor( + config.Mempool, + mp, + ) + if config.Consensus.WaitForTxs() { + mp.EnableTxsAvailable() + } + + return reactor, mp case cfg.MempoolV1: mp := mempoolv1.NewTxMempool( logger,