Skip to content

Commit

Permalink
mempool: rework lock discipline to mitigate callback deadlocks (backport #9030) (#9033)
Browse files Browse the repository at this point in the history

(manual cherry-pick of commit 22ed610083cb8275a954406296832149c4cc1dcd)

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
  • Loading branch information
tnasu and M. J. Fromberger committed Jul 19, 2023
1 parent 0dcca16 commit 007164a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 106 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
require (
github.com/Finschia/r2ishiguro_vrf v0.1.2
github.com/bufbuild/buf v1.25.0
github.com/creachadair/taskgroup v0.3.2
github.com/golangci/golangci-lint v1.53.3
github.com/klauspost/pgzip v1.2.6 // indirect
github.com/oasisprotocol/curve25519-voi v0.0.0-20230110094441-db37f07504ce
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creachadair/taskgroup v0.3.2 h1:zlfutDS+5XG40AOxcHDSThxKzns8Tnr9jnr6VqkYlkM=
github.com/creachadair/taskgroup v0.3.2/go.mod h1:wieWwecHVzsidg2CsUnFinW1faVN4+kq+TDlRJQ0Wbk=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down
179 changes: 73 additions & 106 deletions mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ package v1

import (
"fmt"
"reflect"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/creachadair/taskgroup"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/clist"
Expand Down Expand Up @@ -42,8 +43,7 @@ type TxMempool struct {
cache mempool.TxCache // seen transactions

// Atomically-updated fields
txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes
txRecheck int64 // atomic: the number of pending recheck calls
txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes

// Synchronized fields, protected by mtx.
mtx *sync.RWMutex
Expand Down Expand Up @@ -84,8 +84,6 @@ func NewTxMempool(
txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize)
}

proxyAppConn.SetResponseCallback(txmp.recheckTxCallback)

for _, opt := range options {
opt(txmp)
}
Expand Down Expand Up @@ -220,30 +218,23 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp
return err
}

// Initiate an ABCI CheckTx for this transaction. The callback is
// responsible for adding the transaction to the pool if it survives.
//
// N.B.: We have to issue the call outside the lock. In a local client,
// even an "async" call invokes its callback immediately which will make
// the callback deadlock trying to acquire the same lock. This isn't a
// problem with out-of-process calls, but this has to work for both.
reqRes := txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
if err := txmp.proxyAppConn.FlushSync(); err != nil {
// Invoke an ABCI CheckTx for this transaction.
rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{Tx: tx})
if err != nil {
txmp.cache.Remove(tx)
return err
}
reqRes.SetCallback(func(res *abci.Response) {
wtx := &WrappedTx{
tx: tx,
hash: tx.Key(),
timestamp: time.Now().UTC(),
height: height,
}
wtx.SetPeer(txInfo.SenderID)
txmp.initialTxCallback(wtx, res)
if cb != nil {
cb(res)
}
})
wtx := &WrappedTx{
tx: tx,
hash: tx.Key(),
timestamp: time.Now().UTC(),
height: height,
}
wtx.SetPeer(txInfo.SenderID)
txmp.addNewTransaction(wtx, rsp)
if cb != nil {
cb(&abci.Response{Value: &abci.Response_CheckTx{CheckTx: rsp}})
}
return nil
}

Expand Down Expand Up @@ -299,10 +290,6 @@ func (txmp *TxMempool) Flush() {
cur = next
}
txmp.cache.Reset()

// Discard any pending recheck calls that may be in flight. The calls will
// still complete, but will have no effect on the mempool.
atomic.StoreInt64(&txmp.txRecheck, 0)
}

// allEntriesSorted returns a slice of all the transactions currently in the
Expand Down Expand Up @@ -398,12 +385,6 @@ func (txmp *TxMempool) Update(
newPreFn mempool.PreCheckFunc,
newPostFn mempool.PostCheckFunc,
) error {
// TODO(creachadair): This would be a nice safety check but requires Go 1.18.
// // Safety check: The caller is required to hold the lock.
// if txmp.mtx.TryLock() {
// txmp.mtx.Unlock()
// panic("mempool: Update caller does not hold the lock")
// }
// Safety check: Transactions and responses must match in number.
if len(blockTxs) != len(deliverTxResponses) {
panic(fmt.Sprintf("mempool: got %d transactions but %d DeliverTx responses",
Expand Down Expand Up @@ -451,9 +432,9 @@ func (txmp *TxMempool) Update(
return nil
}

// initialTxCallback handles the ABCI CheckTx response for the first time a
// addNewTransaction handles the ABCI CheckTx response for the first time a
// transaction is added to the mempool. A recheck after a block is committed
// goes to the default callback (see recheckTxCallback).
// goes to handleRecheckResult.
//
// If either the application rejected the transaction or a post-check hook is
// defined and rejects the transaction, it is discarded.
Expand All @@ -464,31 +445,22 @@ func (txmp *TxMempool) Update(
// transactions are evicted.
//
// Finally, the new transaction is added and size stats updated.
func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
if !ok {
txmp.logger.Error("mempool: received incorrect result type in CheckTx callback",
"expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(),
"got", reflect.TypeOf(res.Value).Name(),
)
return
}

func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.ResponseCheckTx) {
txmp.mtx.Lock()
defer txmp.mtx.Unlock()

var err error
if txmp.postCheck != nil {
err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx)
err = txmp.postCheck(wtx.tx, checkTxRes)
}

if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK {
if err != nil || checkTxRes.Code != abci.CodeTypeOK {
txmp.logger.Info(
"rejected bad transaction",
"priority", wtx.Priority(),
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"peer_id", wtx.peers,
"code", checkTxRes.CheckTx.Code,
"code", checkTxRes.Code,
"post_check_err", err,
)

Expand All @@ -503,13 +475,13 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
// If there was a post-check error, record its text in the result for
// debugging purposes.
if err != nil {
checkTxRes.CheckTx.MempoolError = err.Error()
checkTxRes.MempoolError = err.Error()
}
return
}

priority := checkTxRes.CheckTx.Priority
sender := checkTxRes.CheckTx.Sender
priority := checkTxRes.Priority
sender := checkTxRes.Sender

// Disallow multiple concurrent transactions from the same sender assigned
// by the ABCI application. As a special case, an empty sender is not
Expand All @@ -523,7 +495,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
"tx", fmt.Sprintf("%X", w.tx.Hash()),
"sender", sender,
)
checkTxRes.CheckTx.MempoolError =
checkTxRes.MempoolError =
fmt.Sprintf("rejected valid incoming transaction; tx already exists for sender %q (%X)",
sender, w.tx.Hash())
txmp.metrics.RejectedTxs.Add(1)
Expand Down Expand Up @@ -558,7 +530,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"err", err.Error(),
)
checkTxRes.CheckTx.MempoolError =
checkTxRes.MempoolError =
fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)",
wtx.tx.Hash())
txmp.metrics.RejectedTxs.Add(1)
Expand Down Expand Up @@ -604,7 +576,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
}
}

wtx.SetGasWanted(checkTxRes.CheckTx.GasWanted)
wtx.SetGasWanted(checkTxRes.GasWanted)
wtx.SetPriority(priority)
wtx.SetSender(sender)
txmp.insertTx(wtx)
Expand All @@ -631,33 +603,14 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
atomic.AddInt64(&txmp.txsBytes, wtx.Size())
}

// recheckTxCallback handles the responses from ABCI CheckTx calls issued
// during the recheck phase of a block Update. It updates the recheck counter
// and removes any transactions invalidated by the application.
// handleRecheckResult handles the responses from ABCI CheckTx calls issued
// during the recheck phase of a block Update. It removes any transactions
// invalidated by the application.
//
// This callback is NOT executed for the initial CheckTx on a new transaction;
// that case is handled by initialTxCallback instead.
func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response) {
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
if !ok {
// Don't log this; this is the default callback and other response types
// can safely be ignored.
return
}

// Check whether we are expecting recheck responses at this point.
// If not, we will ignore the response, this usually means the mempool was Flushed.
// If this is the "last" pending recheck, trigger a notification when it's been processed.
numLeft := atomic.AddInt64(&txmp.txRecheck, -1)
if numLeft == 0 {
defer txmp.notifyTxsAvailable() // notify waiters on return, if mempool is non-empty
} else if numLeft < 0 {
return
}

// This method is NOT executed for the initial CheckTx on a new transaction;
// that case is handled by addNewTransaction instead.
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) {
txmp.metrics.RecheckTimes.Add(1)
tx := types.Tx(req.GetCheckTx().Tx)

txmp.mtx.Lock()
defer txmp.mtx.Unlock()

Expand All @@ -673,11 +626,11 @@ func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response)
// If a postcheck hook is defined, call it before checking the result.
var err error
if txmp.postCheck != nil {
err = txmp.postCheck(tx, checkTxRes.CheckTx)
err = txmp.postCheck(tx, checkTxRes)
}

if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
wtx.SetPriority(checkTxRes.CheckTx.Priority)
if checkTxRes.Code == abci.CodeTypeOK && err == nil {
wtx.SetPriority(checkTxRes.Priority)
return // N.B. Size of mempool did not change
}

Expand All @@ -686,7 +639,7 @@ func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response)
"priority", wtx.Priority(),
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"err", err,
"code", checkTxRes.CheckTx.Code,
"code", checkTxRes.Code,
)
txmp.removeTxByElement(elt)
txmp.metrics.FailedTxs.Add(1)
Expand All @@ -711,29 +664,43 @@ func (txmp *TxMempool) recheckTransactions() {
"num_txs", txmp.Size(),
"height", txmp.height,
)
// N.B.: We have to issue the calls outside the lock. In a local client,
// even an "async" call invokes its callback immediately which will make the
// callback deadlock trying to acquire the same lock. This isn't a problem
// with out-of-process calls, but this has to work for both.
txmp.mtx.Unlock()
defer txmp.mtx.Lock()

atomic.StoreInt64(&txmp.txRecheck, int64(txmp.txs.Len()))
// Collect transactions currently in the mempool requiring recheck.
wtxs := make([]*WrappedTx, 0, txmp.txs.Len())
for e := txmp.txs.Front(); e != nil; e = e.Next() {
wtx := e.Value.(*WrappedTx)

// The response for this CheckTx is handled by the default recheckTxCallback.
_ = txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{
Tx: wtx.tx,
Type: abci.CheckTxType_Recheck,
})
if err := txmp.proxyAppConn.FlushSync(); err != nil {
atomic.AddInt64(&txmp.txRecheck, -1)
txmp.logger.Error("mempool: error flushing re-CheckTx", "key", wtx.tx.Key(), "err", err)
wtxs = append(wtxs, e.Value.(*WrappedTx))
}

// Issue CheckTx calls for each remaining transaction, and when all the
// rechecks are complete signal watchers that transactions may be available.
go func() {
g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU())

for _, wtx := range wtxs {
wtx := wtx
start(func() error {
// The response for this CheckTx is handled by handleRecheckResult below.
rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{
Tx: wtx.tx,
Type: abci.CheckTxType_Recheck,
})
if err != nil {
txmp.logger.Error("failed to execute CheckTx during recheck",
"err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash()))
} else {
txmp.handleRecheckResult(wtx.tx, rsp)
}
return nil
})
}
}
_ = txmp.proxyAppConn.FlushAsync()

txmp.proxyAppConn.FlushAsync()
// When recheck is complete, trigger a notification for more transactions.
_ = g.Wait()
txmp.mtx.Lock()
defer txmp.mtx.Unlock()
txmp.notifyTxsAvailable()
}()
}

// canAddTx returns an error if we cannot insert the provided *WrappedTx into
Expand Down

0 comments on commit 007164a

Please sign in to comment.