Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(mempool)!: match server/v2/cometbft and sdk mempool interface #21744

Merged
merged 13 commits into from
Sep 18, 2024
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i
### Improvements

* (genutil) [#21701](https://github.com/cosmos/cosmos-sdk/pull/21701) Improved error messages for genesis validation.
* (sims)[#21613](https://github.com/cosmos/cosmos-sdk/pull/21613) Add sims2 framework and factory methods for simpler message factories in modules

### Bug Fixes

Expand All @@ -58,7 +59,7 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i

### API Breaking Changes

* (sims)[#21613](https://github.com/cosmos/cosmos-sdk/pull/21613) Add sims2 framework and factory methods for simpler message factories in modules
* (types/mempool) [#21744](https://github.com/cosmos/cosmos-sdk/pull/21744) Update types/mempool.Mempool interface to take decoded transactions. This avoid to decode the transaction twice.

### Deprecated

Expand Down
22 changes: 14 additions & 8 deletions baseapp/abci_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,19 +264,25 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan

defer h.txSelector.Clear()

// decode transactions
decodedTxs := make([]sdk.Tx, len(req.Txs))
for i, txBz := range req.Txs {
tx, err := h.txVerifier.TxDecode(txBz)
if err != nil {
return nil, err
}

decodedTxs[i] = tx
}

// If the mempool is nil or NoOp we simply return the transactions
// requested from CometBFT, which, by default, should be in FIFO order.
//
// Note, we still need to ensure the transactions returned respect req.MaxTxBytes.
_, isNoOp := h.mempool.(mempool.NoOpMempool)
if h.mempool == nil || isNoOp {
for _, txBz := range req.Txs {
tx, err := h.txVerifier.TxDecode(txBz)
if err != nil {
return nil, err
}

stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx, txBz)
for i, tx := range decodedTxs {
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx, req.Txs[i])
if stop {
break
}
Expand All @@ -291,7 +297,7 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
selectedTxsNums int
invalidTxs []sdk.Tx // invalid txs to be removed out of the loop to avoid dead lock
)
h.mempool.SelectBy(ctx, req.Txs, func(memTx sdk.Tx) bool {
h.mempool.SelectBy(ctx, decodedTxs, func(memTx sdk.Tx) bool {
unorderedTx, ok := memTx.(sdk.TxWithUnordered)
isUnordered := ok && unorderedTx.GetUnordered()
txSignersSeqs := make(map[string]uint64)
Expand Down
1 change: 1 addition & 0 deletions baseapp/abci_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ func (s *ABCIUtilsTestSuite) TestDefaultProposalHandler_PriorityNonceMempoolTxSe
ph := baseapp.NewDefaultProposalHandler(mp, app)

for _, v := range tc.txInputs {
app.EXPECT().TxDecode(v.bz).Return(v.tx, nil).AnyTimes()
app.EXPECT().PrepareProposalVerifyTx(v.tx).Return(v.bz, nil).AnyTimes()
s.NoError(mp.Insert(s.ctx.WithPriority(v.priority), v.tx))
tc.req.Txs = append(tc.req.Txs, v.bz)
Expand Down
7 changes: 4 additions & 3 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,10 @@ func (c *Consensus[T]) FinalizeBlock(
}

// remove txs from the mempool
err = c.mempool.Remove(decodedTxs)
if err != nil {
return nil, fmt.Errorf("unable to remove txs: %w", err)
for _, tx := range decodedTxs {
if err = c.mempool.Remove(tx); err != nil {
return nil, fmt.Errorf("unable to remove tx: %w", err)
}
}

c.lastCommittedHeight.Store(req.Height)
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/handlers/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
// check again.
_, err := app.ValidateTx(ctx, memTx)
if err != nil {
err := h.mempool.Remove([]T{memTx})
err := h.mempool.Remove(memTx)
if err != nil && !errors.Is(err, mempool.ErrTxNotFound) {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion server/v2/cometbft/internal/mock/mock_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ type MockMempool[T transaction.Tx] struct{}

func (MockMempool[T]) Insert(context.Context, T) error { return nil }
func (MockMempool[T]) Select(context.Context, []T) mempool.Iterator[T] { return nil }
func (MockMempool[T]) SelectBy(context.Context, []T, func(T) bool) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete the implementation of the SelectBy function.

The SelectBy function has been added to facilitate the selection of transactions based on a predicate. However, the current implementation is empty.

Please complete the function logic to ensure it behaves as expected when called. Consider iterating over the input transactions and applying the predicate to each transaction to determine which ones should be selected.

func (MockMempool[T]) CountTx() int { return 0 }
func (MockMempool[T]) Remove([]T) error { return nil }
func (MockMempool[T]) Remove(T) error { return nil }
11 changes: 8 additions & 3 deletions server/v2/cometbft/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ type Mempool[T transaction.Tx] interface {
Insert(context.Context, T) error

// Select returns an Iterator over the app-side mempool. If txs are specified,
// then they shall be incorporated into the Iterator. The Iterator must be
// closed by the caller.
// then they shall be incorporated into the Iterator. The Iterator is not thread-safe to use.
Select(context.Context, []T) Iterator[T]

// SelectBy use callback to iterate over the mempool, it's thread-safe to use.
SelectBy(context.Context, []T, func(T) bool)

// CountTx returns the number of transactions currently in the mempool.
CountTx() int

// Remove attempts to remove a transaction from the mempool, returning an error
// upon failure.
Remove([]T) error
Remove(T) error
}

// Iterator defines an app-side mempool iterator interface that is as minimal as
Expand Down
12 changes: 8 additions & 4 deletions server/v2/cometbft/mempool/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"

"cosmossdk.io/core/transaction"

sdk "github.com/cosmos/cosmos-sdk/types"
)

var _ Mempool[sdk.Tx] = (*NoOpMempool[sdk.Tx])(nil) // verify interface at compile time
var _ Mempool[transaction.Tx] = (*NoOpMempool[transaction.Tx])(nil)

// NoOpMempool defines a no-op mempool. Transactions are completely discarded and
Expand All @@ -16,7 +19,8 @@ var _ Mempool[transaction.Tx] = (*NoOpMempool[transaction.Tx])(nil)
// is FIFO-ordered by default.
type NoOpMempool[T transaction.Tx] struct{}

func (NoOpMempool[T]) Insert(context.Context, T) error { return nil }
func (NoOpMempool[T]) Select(context.Context, []T) Iterator[T] { return nil }
func (NoOpMempool[T]) CountTx() int { return 0 }
func (NoOpMempool[T]) Remove([]T) error { return nil }
func (NoOpMempool[T]) Insert(context.Context, T) error { return nil }
func (NoOpMempool[T]) Select(context.Context, []T) Iterator[T] { return nil }
func (NoOpMempool[T]) SelectBy(context.Context, []T, func(T) bool) {}
func (NoOpMempool[T]) CountTx() int { return 0 }
func (NoOpMempool[T]) Remove(T) error { return nil }
2 changes: 1 addition & 1 deletion simapp/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/cosmos/cosmos-db v1.0.3-0.20240911104526-ddc3f09bfc22 // indirect
// this version is not used as it is always replaced by the latest Cosmos SDK version
github.com/cosmos/cosmos-sdk v0.53.0
github.com/spf13/cast v1.7.0 // indirect
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.19.0
Expand Down Expand Up @@ -193,7 +194,6 @@ require (
github.com/sasha-s/go-deadlock v0.3.5 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/supranational/blst v0.3.13 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
Expand Down
6 changes: 2 additions & 4 deletions simapp/v2/simdv2/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,11 @@ func initCometConfig() cometbft.CfgOption {
func initCometOptions[T transaction.Tx]() cometbft.ServerOptions[T] {
serverOptions := cometbft.DefaultServerOptions[T]()

// TODO mempool interface doesn't match!

// overwrite app mempool, using max-txs option
// serverOptions.Mempool = func(cfg map[string]any) mempool.Mempool[T] {
// if maxTxs := cast.ToInt(cfg[cometbft.FlagMempoolMaxTxs]); maxTxs >= 0 {
// return mempool.NewSenderNonceMempool(
// mempool.SenderNonceMaxTxOpt(maxTxs),
// return sdkmempool.NewSenderNonceMempool(
// sdkmempool.SenderNonceMaxTxOpt(maxTxs),
// )
// }

Expand Down
4 changes: 2 additions & 2 deletions types/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ type Mempool interface {

// Select returns an Iterator over the app-side mempool. If txs are specified,
// then they shall be incorporated into the Iterator. The Iterator is not thread-safe to use.
Select(context.Context, [][]byte) Iterator
Select(context.Context, []sdk.Tx) Iterator
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the change here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As otherwise in server/v2 we would decode twice if the mempool implementation decodes the txs (ours do not, but we cannot know about users):

https://github.com/cosmos/cosmos-sdk/blob/95b8092/server/v2/cometbft/handlers/defaults.go#L52
https://github.com/cosmos/cosmos-sdk/blob/95b8092/server/v2/cometbft/handlers/defaults.go#L72

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use a generic instead of either []byte or sdk.Tx? if we use sdk.Tx how will this work with server/v2?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err.. this interface is only used for app v1 correct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, the one of server/v2 is identical but use generic


// SelectBy use callback to iterate over the mempool, it's thread-safe to use.
SelectBy(context.Context, [][]byte, func(sdk.Tx) bool)
SelectBy(context.Context, []sdk.Tx, func(sdk.Tx) bool)

// CountTx returns the number of transactions currently in the mempool.
CountTx() int
Expand Down
4 changes: 2 additions & 2 deletions types/mempool/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var _ Mempool = (*NoOpMempool)(nil)
type NoOpMempool struct{}

func (NoOpMempool) Insert(context.Context, sdk.Tx) error { return nil }
func (NoOpMempool) Select(context.Context, [][]byte) Iterator { return nil }
func (NoOpMempool) SelectBy(context.Context, [][]byte, func(sdk.Tx) bool) {}
func (NoOpMempool) Select(context.Context, []sdk.Tx) Iterator { return nil }
func (NoOpMempool) SelectBy(context.Context, []sdk.Tx, func(sdk.Tx) bool) {}
func (NoOpMempool) CountTx() int { return 0 }
func (NoOpMempool) Remove(sdk.Tx) error { return nil }
6 changes: 3 additions & 3 deletions types/mempool/priority_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,13 +361,13 @@ func (i *PriorityNonceIterator[C]) Tx() sdk.Tx {
//
// NOTE: It is not safe to use this iterator while removing transactions from
// the underlying mempool.
func (mp *PriorityNonceMempool[C]) Select(ctx context.Context, txs [][]byte) Iterator {
func (mp *PriorityNonceMempool[C]) Select(ctx context.Context, txs []sdk.Tx) Iterator {
mp.mtx.Lock()
defer mp.mtx.Unlock()
return mp.doSelect(ctx, txs)
}

func (mp *PriorityNonceMempool[C]) doSelect(_ context.Context, _ [][]byte) Iterator {
func (mp *PriorityNonceMempool[C]) doSelect(_ context.Context, _ []sdk.Tx) Iterator {
if mp.priorityIndex.Len() == 0 {
return nil
}
Expand All @@ -383,7 +383,7 @@ func (mp *PriorityNonceMempool[C]) doSelect(_ context.Context, _ [][]byte) Itera
}

// SelectBy will hold the mutex during the iteration, callback returns if continue.
func (mp *PriorityNonceMempool[C]) SelectBy(ctx context.Context, txs [][]byte, callback func(sdk.Tx) bool) {
func (mp *PriorityNonceMempool[C]) SelectBy(ctx context.Context, txs []sdk.Tx, callback func(sdk.Tx) bool) {
mp.mtx.Lock()
defer mp.mtx.Unlock()

Expand Down
6 changes: 3 additions & 3 deletions types/mempool/sender_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,13 @@ func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
//
// NOTE: It is not safe to use this iterator while removing transactions from
// the underlying mempool.
func (snm *SenderNonceMempool) Select(ctx context.Context, txs [][]byte) Iterator {
func (snm *SenderNonceMempool) Select(ctx context.Context, txs []sdk.Tx) Iterator {
snm.mtx.Lock()
defer snm.mtx.Unlock()
return snm.doSelect(ctx, txs)
}

func (snm *SenderNonceMempool) doSelect(_ context.Context, _ [][]byte) Iterator {
func (snm *SenderNonceMempool) doSelect(_ context.Context, _ []sdk.Tx) Iterator {
var senders []string

senderCursors := make(map[string]*skiplist.Element)
Expand Down Expand Up @@ -202,7 +202,7 @@ func (snm *SenderNonceMempool) doSelect(_ context.Context, _ [][]byte) Iterator
}

// SelectBy will hold the mutex during the iteration, callback returns if continue.
func (snm *SenderNonceMempool) SelectBy(ctx context.Context, txs [][]byte, callback func(sdk.Tx) bool) {
func (snm *SenderNonceMempool) SelectBy(ctx context.Context, txs []sdk.Tx, callback func(sdk.Tx) bool) {
snm.mtx.Lock()
defer snm.mtx.Unlock()

Expand Down
Loading