From c40e8cfd311c51effce232da5e8965cdf359668d Mon Sep 17 00:00:00 2001 From: "Kim, JinSan" Date: Thu, 21 Jan 2021 11:40:46 +0900 Subject: [PATCH 1/5] feat: impl `checkTxAsyncReactor()` --- consensus/mempool_test.go | 13 ++++++------ mempool/clist_mempool.go | 44 +++++++++++++++++++++++++++------------ mempool/mempool.go | 2 +- mempool/reactor.go | 10 +++++---- mock/mempool.go | 4 ++-- rpc/core/mempool.go | 6 +----- 6 files changed, 48 insertions(+), 31 deletions(-) diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index c2d9a6a5b..384ebea8e 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -155,23 +155,24 @@ func TestMempoolRmBadTx(t *testing.T) { resEndRecheckTx := app.EndRecheckTx(abci.RequestEndRecheckTx{}) assert.Equal(t, code.CodeTypeOK, resEndRecheckTx.Code) - emptyMempoolCh := make(chan struct{}) + checkTxErrorCh := make(chan error) checkTxRespCh := make(chan struct{}) + emptyMempoolCh := make(chan struct{}) go func() { // Try to send the tx through the mempool. // CheckTx should not err, but the app should return a bad abci code // and the tx should get removed from the pool - err := assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(r *abci.Response) { + assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(r *abci.Response, err error) { + checkTxErrorCh <- err + if r.GetCheckTx().Code != code.CodeTypeBadNonce { t.Errorf("expected checktx to return bad nonce, got %v", r) return } checkTxRespCh <- struct{}{} }) - if err != nil { - t.Errorf("error after CheckTx: %v", err) - return - } + + <-checkTxErrorCh // check for the tx for { diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 666fdceb8..9622e4b48 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -47,6 +47,8 @@ type CListMempool struct { updateMtx sync.RWMutex preCheck PreCheckFunc + chReqCheckTx chan *requestCheckTxAsync + wal *auto.AutoFile // a log of mempool txs txs *clist.CList // concurrent linked-list of good txs proxyAppConn proxy.AppConnMempool @@ -64,6 +66,12 @@ type CListMempool struct { metrics *Metrics } +type requestCheckTxAsync struct { + tx types.Tx + txInfo TxInfo + cb func(*abci.Response, error) +} + var _ Mempool = &CListMempool{} // CListMempoolOption sets an optional parameter on the mempool. @@ -81,6 +89,7 @@ func NewCListMempool( proxyAppConn: proxyAppConn, txs: clist.New(), height: height, + chReqCheckTx: make(chan *requestCheckTxAsync, config.Size), logger: log.NewNopLogger(), metrics: NopMetrics(), } @@ -93,6 +102,7 @@ func NewCListMempool( for _, option := range options { option(mempool) } + go mempool.checkTxAsyncReactor() return mempool } @@ -231,33 +241,41 @@ func (mem *CListMempool) CheckTxSync(tx types.Tx, txInfo TxInfo) (res *abci.Resp // CONTRACT: Either cb will get called, or err returned. // // Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci.Response)) (err error) { +func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci.Response, error)) { + mem.chReqCheckTx <- &requestCheckTxAsync{tx: tx, txInfo: txInfo, cb: cb} +} + +func (mem *CListMempool) checkTxAsyncReactor() { + for req := range mem.chReqCheckTx { + mem.checkTxAsync(req.tx, req.txInfo, req.cb) + } +} + +func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci.Response, error)) { mem.updateMtx.RLock() - // use defer to unlock mutex because application (*local client*) might panic defer func() { - if err != nil { - mem.updateMtx.RUnlock() - return - } - if r := recover(); r != nil { mem.updateMtx.RUnlock() panic(r) } }() - if err = mem.prepareCheckTx(tx, txInfo); err != nil { - return err + if err := mem.prepareCheckTx(tx, txInfo); err != nil { + cb(nil, err) + mem.updateMtx.RUnlock() + return } // CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas) reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) reqRes.SetCallback(func(res *abci.Response) { - mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, cb) - mem.updateMtx.RUnlock() + mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, func(response *abci.Response) { + if cb != nil { + cb(response, nil) + } + mem.updateMtx.RUnlock() + }) }) - - return err } // CONTRACT: `caller` should held `mem.updateMtx.RLock()` diff --git a/mempool/mempool.go b/mempool/mempool.go index 93603b7a9..ffa2ebf77 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -16,7 +16,7 @@ type Mempool interface { // CheckTx executes a new transaction against the application to determine // its validity and whether it should be added to the mempool. CheckTxSync(tx types.Tx, txInfo TxInfo) (*abci.Response, error) - CheckTxAsync(tx types.Tx, txInfo TxInfo, callback func(*abci.Response)) error + CheckTxAsync(tx types.Tx, txInfo TxInfo, callback func(*abci.Response, error)) // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes // bytes total with the condition that the total gasWanted must be less than diff --git a/mempool/reactor.go b/mempool/reactor.go index 79bd18fdc..dde5b9c68 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -9,6 +9,7 @@ import ( amino "github.com/tendermint/go-amino" + abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/libs/log" @@ -174,10 +175,11 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { if src != nil { txInfo.SenderP2PID = src.ID() } - err := memR.mempool.CheckTxAsync(msg.Tx, txInfo, nil) - if err != nil { - memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) - } + memR.mempool.CheckTxAsync(msg.Tx, txInfo, func(res *abci.Response, err error) { + if err != nil { + memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) + } + }) // broadcasting happens from go routines per peer default: memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) diff --git a/mock/mempool.go b/mock/mempool.go index ebedcc05a..b00f3ba95 100644 --- a/mock/mempool.go +++ b/mock/mempool.go @@ -18,8 +18,8 @@ func (Mempool) Size() int { return 0 } func (Mempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) { return nil, nil } -func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error { - return nil +func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, cb func(*abci.Response, error)) { + cb(nil, nil) } func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 517550d2c..a0e4fb065 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -21,11 +21,7 @@ import ( // CheckTx nor DeliverTx results. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - err := env.Mempool.CheckTxAsync(tx, mempl.TxInfo{}, nil) - - if err != nil { - return nil, err - } + env.Mempool.CheckTxAsync(tx, mempl.TxInfo{}, nil) return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil } From 22655451a57bad9703662f7a4029cf3ebbde335d Mon Sep 17 00:00:00 2001 From: "Kim, JinSan" Date: Thu, 21 Jan 2021 12:28:03 +0900 Subject: [PATCH 2/5] chore: revise comment --- mempool/clist_mempool.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 9622e4b48..a05b898b0 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -235,10 +235,8 @@ func (mem *CListMempool) CheckTxSync(tx types.Tx, txInfo TxInfo) (res *abci.Resp return res, err } -// It blocks if we're waiting on Update() or Reap(). // cb: A callback from the CheckTx command. // It gets called from another goroutine. -// CONTRACT: Either cb will get called, or err returned. // // Safe for concurrent use by multiple goroutines. func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci.Response, error)) { @@ -251,6 +249,7 @@ func (mem *CListMempool) checkTxAsyncReactor() { } } +// It blocks if we're waiting on Update() or Reap(). func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci.Response, error)) { mem.updateMtx.RLock() defer func() { From 116b02ceadee5aca703a23e55609e18139eb33ea Mon Sep 17 00:00:00 2001 From: "Kim, JinSan" Date: Thu, 21 Jan 2021 13:02:46 +0900 Subject: [PATCH 3/5] chore: divide `cb` into `prepareCb` and `checkTxCb` --- consensus/mempool_test.go | 4 ++-- mempool/bench_test.go | 4 ++-- mempool/clist_mempool.go | 24 +++++++++++++----------- mempool/mempool.go | 2 +- mempool/reactor.go | 5 ++--- mock/mempool.go | 5 +++-- rpc/core/mempool.go | 10 +++++++++- 7 files changed, 32 insertions(+), 22 deletions(-) diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 384ebea8e..8e8844015 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -162,9 +162,9 @@ func TestMempoolRmBadTx(t *testing.T) { // Try to send the tx through the mempool. // CheckTx should not err, but the app should return a bad abci code // and the tx should get removed from the pool - assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(r *abci.Response, err error) { + assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(err error) { checkTxErrorCh <- err - + }, func(r *abci.Response) { if r.GetCheckTx().Code != code.CodeTypeBadNonce { t.Errorf("expected checktx to return bad nonce, got %v", r) return diff --git a/mempool/bench_test.go b/mempool/bench_test.go index 0c2fdc7d3..eb2197e42 100644 --- a/mempool/bench_test.go +++ b/mempool/bench_test.go @@ -36,7 +36,7 @@ func BenchmarkReapWithCheckTxAsync(b *testing.B) { for i := 0; i < size; i++ { tx := make([]byte, 8) binary.BigEndian.PutUint64(tx, uint64(i)) - mempool.CheckTxAsync(tx, TxInfo{}, nil) + mempool.CheckTxAsync(tx, TxInfo{}, nil, nil) } b.ResetTimer() for i := 0; i < b.N; i++ { @@ -66,7 +66,7 @@ func BenchmarkCheckTxAsync(b *testing.B) { for i := 0; i < b.N; i++ { tx := make([]byte, 8) binary.BigEndian.PutUint64(tx, uint64(i)) - mempool.CheckTxAsync(tx, TxInfo{}, nil) + mempool.CheckTxAsync(tx, TxInfo{}, nil, nil) } } diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index a05b898b0..0a342efef 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -67,9 +67,10 @@ type CListMempool struct { } type requestCheckTxAsync struct { - tx types.Tx - txInfo TxInfo - cb func(*abci.Response, error) + tx types.Tx + txInfo TxInfo + prepareCb func(error) + checkTxCb func(*abci.Response) } var _ Mempool = &CListMempool{} @@ -239,18 +240,18 @@ func (mem *CListMempool) CheckTxSync(tx types.Tx, txInfo TxInfo) (res *abci.Resp // It gets called from another goroutine. // // Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci.Response, error)) { - mem.chReqCheckTx <- &requestCheckTxAsync{tx: tx, txInfo: txInfo, cb: cb} +func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), checkTxCb func(*abci.Response)) { + mem.chReqCheckTx <- &requestCheckTxAsync{tx: tx, txInfo: txInfo, prepareCb: prepareCb, checkTxCb: checkTxCb} } func (mem *CListMempool) checkTxAsyncReactor() { for req := range mem.chReqCheckTx { - mem.checkTxAsync(req.tx, req.txInfo, req.cb) + mem.checkTxAsync(req.tx, req.txInfo, req.prepareCb, req.checkTxCb) } } // It blocks if we're waiting on Update() or Reap(). -func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci.Response, error)) { +func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), checkTxCb func(*abci.Response)) { mem.updateMtx.RLock() defer func() { if r := recover(); r != nil { @@ -259,8 +260,9 @@ func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci. } }() - if err := mem.prepareCheckTx(tx, txInfo); err != nil { - cb(nil, err) + err := mem.prepareCheckTx(tx, txInfo) + prepareCb(err) + if err != nil { mem.updateMtx.RUnlock() return } @@ -269,8 +271,8 @@ func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci. reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) reqRes.SetCallback(func(res *abci.Response) { mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, func(response *abci.Response) { - if cb != nil { - cb(response, nil) + if checkTxCb != nil { + checkTxCb(response) } mem.updateMtx.RUnlock() }) diff --git a/mempool/mempool.go b/mempool/mempool.go index ffa2ebf77..e13def2fd 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -16,7 +16,7 @@ type Mempool interface { // CheckTx executes a new transaction against the application to determine // its validity and whether it should be added to the mempool. CheckTxSync(tx types.Tx, txInfo TxInfo) (*abci.Response, error) - CheckTxAsync(tx types.Tx, txInfo TxInfo, callback func(*abci.Response, error)) + CheckTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), checkTxCb func(*abci.Response)) // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes // bytes total with the condition that the total gasWanted must be less than diff --git a/mempool/reactor.go b/mempool/reactor.go index dde5b9c68..76dfbf2a3 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -9,7 +9,6 @@ import ( amino "github.com/tendermint/go-amino" - abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/libs/log" @@ -175,11 +174,11 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { if src != nil { txInfo.SenderP2PID = src.ID() } - memR.mempool.CheckTxAsync(msg.Tx, txInfo, func(res *abci.Response, err error) { + memR.mempool.CheckTxAsync(msg.Tx, txInfo, func(err error) { if err != nil { memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) } - }) + }, nil) // broadcasting happens from go routines per peer default: memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) diff --git a/mock/mempool.go b/mock/mempool.go index b00f3ba95..bbf6cf53a 100644 --- a/mock/mempool.go +++ b/mock/mempool.go @@ -18,8 +18,9 @@ func (Mempool) Size() int { return 0 } func (Mempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) { return nil, nil } -func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, cb func(*abci.Response, error)) { - cb(nil, nil) +func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, prepareCb func(error), checkTxCb func(*abci.Response)) { + prepareCb(nil) + checkTxCb(nil) } func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index a0e4fb065..893f08266 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -21,7 +21,15 @@ import ( // CheckTx nor DeliverTx results. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - env.Mempool.CheckTxAsync(tx, mempl.TxInfo{}, nil) + chErr := make(chan error) + env.Mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(err error) { + chErr <- err + }, nil) + err := <-chErr + if err != nil { + return nil, err + } + return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil } From a86a0da0714a85ffd9cc1820c996f017b439089a Mon Sep 17 00:00:00 2001 From: "Kim, JinSan" Date: Thu, 21 Jan 2021 21:09:21 +0900 Subject: [PATCH 4/5] chore: revise `mock/mempool.CheckTxAsync()` to do nothing --- mock/mempool.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/mock/mempool.go b/mock/mempool.go index bbf6cf53a..9a2b39999 100644 --- a/mock/mempool.go +++ b/mock/mempool.go @@ -18,9 +18,7 @@ func (Mempool) Size() int { return 0 } func (Mempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) { return nil, nil } -func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, prepareCb func(error), checkTxCb func(*abci.Response)) { - prepareCb(nil) - checkTxCb(nil) +func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(error), _ func(*abci.Response)) { } func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } From 9152451096bd1796005eae48046b7ef502ba57eb Mon Sep 17 00:00:00 2001 From: "Kim, JinSan" Date: Thu, 21 Jan 2021 21:10:34 +0900 Subject: [PATCH 5/5] fix: check whether `prepareCb` is `nil` --- mempool/clist_mempool.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 0a342efef..85f5550f1 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -261,7 +261,9 @@ func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func }() err := mem.prepareCheckTx(tx, txInfo) - prepareCb(err) + if prepareCb != nil { + prepareCb(err) + } if err != nil { mem.updateMtx.RUnlock() return