diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index c2d9a6a5b..8e8844015 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -155,23 +155,24 @@ func TestMempoolRmBadTx(t *testing.T) { resEndRecheckTx := app.EndRecheckTx(abci.RequestEndRecheckTx{}) assert.Equal(t, code.CodeTypeOK, resEndRecheckTx.Code) - emptyMempoolCh := make(chan struct{}) + checkTxErrorCh := make(chan error) checkTxRespCh := make(chan struct{}) + emptyMempoolCh := make(chan struct{}) go func() { // Try to send the tx through the mempool. // CheckTx should not err, but the app should return a bad abci code // and the tx should get removed from the pool - err := assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(r *abci.Response) { + assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(err error) { + checkTxErrorCh <- err + }, func(r *abci.Response) { if r.GetCheckTx().Code != code.CodeTypeBadNonce { t.Errorf("expected checktx to return bad nonce, got %v", r) return } checkTxRespCh <- struct{}{} }) - if err != nil { - t.Errorf("error after CheckTx: %v", err) - return - } + + <-checkTxErrorCh // check for the tx for { diff --git a/mempool/bench_test.go b/mempool/bench_test.go index 0c2fdc7d3..eb2197e42 100644 --- a/mempool/bench_test.go +++ b/mempool/bench_test.go @@ -36,7 +36,7 @@ func BenchmarkReapWithCheckTxAsync(b *testing.B) { for i := 0; i < size; i++ { tx := make([]byte, 8) binary.BigEndian.PutUint64(tx, uint64(i)) - mempool.CheckTxAsync(tx, TxInfo{}, nil) + mempool.CheckTxAsync(tx, TxInfo{}, nil, nil) } b.ResetTimer() for i := 0; i < b.N; i++ { @@ -66,7 +66,7 @@ func BenchmarkCheckTxAsync(b *testing.B) { for i := 0; i < b.N; i++ { tx := make([]byte, 8) binary.BigEndian.PutUint64(tx, uint64(i)) - mempool.CheckTxAsync(tx, TxInfo{}, nil) + mempool.CheckTxAsync(tx, TxInfo{}, nil, nil) } } diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 666fdceb8..85f5550f1 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -47,6 +47,8 @@ type CListMempool struct { updateMtx sync.RWMutex preCheck PreCheckFunc + chReqCheckTx chan *requestCheckTxAsync + wal *auto.AutoFile // a log of mempool txs txs *clist.CList // concurrent linked-list of good txs proxyAppConn proxy.AppConnMempool @@ -64,6 +66,13 @@ type CListMempool struct { metrics *Metrics } +type requestCheckTxAsync struct { + tx types.Tx + txInfo TxInfo + prepareCb func(error) + checkTxCb func(*abci.Response) +} + var _ Mempool = &CListMempool{} // CListMempoolOption sets an optional parameter on the mempool. @@ -81,6 +90,7 @@ func NewCListMempool( proxyAppConn: proxyAppConn, txs: clist.New(), height: height, + chReqCheckTx: make(chan *requestCheckTxAsync, config.Size), logger: log.NewNopLogger(), metrics: NopMetrics(), } @@ -93,6 +103,7 @@ func NewCListMempool( for _, option := range options { option(mempool) } + go mempool.checkTxAsyncReactor() return mempool } @@ -225,39 +236,49 @@ func (mem *CListMempool) CheckTxSync(tx types.Tx, txInfo TxInfo) (res *abci.Resp return res, err } -// It blocks if we're waiting on Update() or Reap(). // cb: A callback from the CheckTx command. // It gets called from another goroutine. -// CONTRACT: Either cb will get called, or err returned. // // Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci.Response)) (err error) { +func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), checkTxCb func(*abci.Response)) { + mem.chReqCheckTx <- &requestCheckTxAsync{tx: tx, txInfo: txInfo, prepareCb: prepareCb, checkTxCb: checkTxCb} +} + +func (mem *CListMempool) checkTxAsyncReactor() { + for req := range mem.chReqCheckTx { + mem.checkTxAsync(req.tx, req.txInfo, req.prepareCb, req.checkTxCb) + } +} + +// It blocks if we're waiting on Update() or Reap(). +func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), checkTxCb func(*abci.Response)) { mem.updateMtx.RLock() - // use defer to unlock mutex because application (*local client*) might panic defer func() { - if err != nil { - mem.updateMtx.RUnlock() - return - } - if r := recover(); r != nil { mem.updateMtx.RUnlock() panic(r) } }() - if err = mem.prepareCheckTx(tx, txInfo); err != nil { - return err + err := mem.prepareCheckTx(tx, txInfo) + if prepareCb != nil { + prepareCb(err) + } + if err != nil { + mem.updateMtx.RUnlock() + return } // CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas) reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) reqRes.SetCallback(func(res *abci.Response) { - mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, cb) - mem.updateMtx.RUnlock() + mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, func(response *abci.Response) { + if checkTxCb != nil { + checkTxCb(response) + } + mem.updateMtx.RUnlock() + }) }) - - return err } // CONTRACT: `caller` should held `mem.updateMtx.RLock()` diff --git a/mempool/mempool.go b/mempool/mempool.go index 93603b7a9..e13def2fd 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -16,7 +16,7 @@ type Mempool interface { // CheckTx executes a new transaction against the application to determine // its validity and whether it should be added to the mempool. CheckTxSync(tx types.Tx, txInfo TxInfo) (*abci.Response, error) - CheckTxAsync(tx types.Tx, txInfo TxInfo, callback func(*abci.Response)) error + CheckTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), checkTxCb func(*abci.Response)) // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes // bytes total with the condition that the total gasWanted must be less than diff --git a/mempool/reactor.go b/mempool/reactor.go index 79bd18fdc..76dfbf2a3 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -174,10 +174,11 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { if src != nil { txInfo.SenderP2PID = src.ID() } - err := memR.mempool.CheckTxAsync(msg.Tx, txInfo, nil) - if err != nil { - memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) - } + memR.mempool.CheckTxAsync(msg.Tx, txInfo, func(err error) { + if err != nil { + memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) + } + }, nil) // broadcasting happens from go routines per peer default: memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) diff --git a/mock/mempool.go b/mock/mempool.go index ebedcc05a..9a2b39999 100644 --- a/mock/mempool.go +++ b/mock/mempool.go @@ -18,8 +18,7 @@ func (Mempool) Size() int { return 0 } func (Mempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) { return nil, nil } -func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error { - return nil +func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(error), _ func(*abci.Response)) { } func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 517550d2c..893f08266 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -21,11 +21,15 @@ import ( // CheckTx nor DeliverTx results. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - err := env.Mempool.CheckTxAsync(tx, mempl.TxInfo{}, nil) - + chErr := make(chan error) + env.Mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(err error) { + chErr <- err + }, nil) + err := <-chErr if err != nil { return nil, err } + return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil }