Skip to content

Commit

Permalink
TxExecutor baseapp option, TxIndex/MsgIndex in context
Browse files Browse the repository at this point in the history
  • Loading branch information
yihuang committed Mar 13, 2024
1 parent a569ba6 commit 27cc99b
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i
* (types) [#18768](https://github.com/cosmos/cosmos-sdk/pull/18768) Add MustValAddressFromBech32 function.
* (gRPC) [#19049](https://github.com/cosmos/cosmos-sdk/pull/19049) Add debug log prints for each gRPC request.
* (x/consensus) [#19483](https://github.com/cosmos/cosmos-sdk/pull/19483) Add consensus messages registration to consensus module.
* (baseapp) [#]() Add TxExecutor option, and `TxIndex`/`MsgIndex` to context.

### Improvements

Expand Down
71 changes: 43 additions & 28 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,34 +803,10 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
//
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
// vote extensions, so skip those.
txResults := make([]*abci.ExecTxResult, 0, len(req.Txs))
for _, rawTx := range req.Txs {
var response *abci.ExecTxResult

if _, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx)
} else {
// In the case where a transaction included in a block proposal is malformed,
// we still want to return a default response to comet. This is because comet
// expects a response for each transaction included in a block proposal.
response = sdkerrors.ResponseExecTxResultWithEvents(
sdkerrors.ErrTxDecode,
0,
0,
nil,
false,
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
txResults, err := app.executeTxs(ctx, req.Txs)
if err != nil {
// usually due to cancelled
return nil, err
}

if app.finalizeBlockState.ms.TracingEnabled() {
Expand Down Expand Up @@ -861,6 +837,45 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
}, nil
}

func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) {
if app.txExecutor != nil {
return app.txExecutor(ctx, len(txs), app.finalizeBlockState.ms, func(i int, ms storetypes.MultiStore) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(txs[i], i, ms)
})
}

txResults := make([]*abci.ExecTxResult, 0, len(txs))
for i, rawTx := range txs {
var response *abci.ExecTxResult

if _, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx, i)
} else {
// In the case where a transaction included in a block proposal is malformed,
// we still want to return a default response to comet. This is because comet
// expects a response for each transaction included in a block proposal.
response = sdkerrors.ResponseExecTxResultWithEvents(
sdkerrors.ErrTxDecode,
0,
0,
nil,
false,
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
}
return txResults, nil
}

// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
// Specifically, it will execute an application's BeginBlock (if defined), followed
// by the transactions in the proposal, finally followed by the application's
Expand Down
27 changes: 22 additions & 5 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ type BaseApp struct {
// including the goroutine handling.This is experimental and must be enabled
// by developers.
optimisticExec *oe.OptimisticExecution

// Optional alternative tx executor, used for block-stm parallel transaction execution.
txExecutor TxExecutor
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down Expand Up @@ -657,7 +660,7 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter {
}

// retrieve the context for the tx w/ txBytes and other memoized values.
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context {
app.mu.Lock()
defer app.mu.Unlock()

Expand All @@ -666,7 +669,8 @@ func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
panic(fmt.Sprintf("state is nil for mode %v", mode))
}
ctx := modeState.Context().
WithTxBytes(txBytes)
WithTxBytes(txBytes).
WithTxIndex(txIndex)
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed

ctx = ctx.WithIsSigverifyTx(app.sigverifyTx)
Expand Down Expand Up @@ -746,7 +750,11 @@ func (app *BaseApp) beginBlock(req *abci.RequestFinalizeBlock) (sdk.BeginBlock,
return resp, nil
}

func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
func (app *BaseApp) deliverTx(tx []byte, txIndex int) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(tx, txIndex, nil)
}

func (app *BaseApp) deliverTxWithMultiStore(tx []byte, txIndex int, txMultiStore storetypes.MultiStore) *abci.ExecTxResult {
gInfo := sdk.GasInfo{}
resultStr := "successful"

Expand All @@ -759,7 +767,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
}()

gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx)
gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, txIndex, txMultiStore)
if err != nil {
resultStr = "failed"
resp = sdkerrors.ResponseExecTxResultWithEvents(
Expand Down Expand Up @@ -817,12 +825,19 @@ func (app *BaseApp) endBlock(ctx context.Context) (sdk.EndBlock, error) {
// returned if the tx does not run out of gas and if all the messages are valid
// and execute successfully. An error is returned otherwise.
func (app *BaseApp) runTx(mode execMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
return app.runTxWithMultiStore(mode, txBytes, -1, nil)
}

func (app *BaseApp) runTxWithMultiStore(mode execMode, txBytes []byte, txIndex int, txMultiStore storetypes.MultiStore) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
// determined by the GasMeter. We need access to the context to get the gas
// meter, so we initialize upfront.
var gasWanted uint64

ctx := app.getContextForTx(mode, txBytes)
ctx := app.getContextForTx(mode, txBytes, txIndex)
if txMultiStore != nil {
ctx = ctx.WithMultiStore(txMultiStore)
}
ms := ctx.MultiStore()

// only run the tx if there is block gas remaining
Expand Down Expand Up @@ -996,6 +1011,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Me
break
}

ctx = ctx.WithMsgIndex(i)

handler := app.msgServiceRouter.Handler(msg)
if handler == nil {
return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "no message handler found for %T", msg)
Expand Down
2 changes: 1 addition & 1 deletion baseapp/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var _ genesis.TxHandler = (*BaseApp)(nil)
// ExecuteGenesisTx implements genesis.GenesisState from
// cosmossdk.io/core/genesis to set initial state in genesis
func (ba *BaseApp) ExecuteGenesisTx(tx []byte) error {
res := ba.deliverTx(tx)
res := ba.deliverTx(tx, -1)

if res.Code != types.CodeTypeOK {
return errors.New(res.Log)
Expand Down
10 changes: 10 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ func SetOptimisticExecution(opts ...func(*oe.OptimisticExecution)) func(*BaseApp
}
}

// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
func SetTxExecutor(executor TxExecutor) func(*BaseApp) {
return func(app *BaseApp) { app.txExecutor = executor }
}

func (app *BaseApp) SetName(name string) {
if app.sealed {
panic("SetName() on sealed BaseApp")
Expand Down Expand Up @@ -395,3 +400,8 @@ func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) {
func (app *BaseApp) SetGRPCQueryRouter(grpcQueryRouter *GRPCQueryRouter) {
app.grpcQueryRouter = grpcQueryRouter
}

// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
func (app *BaseApp) SetTxExecutor(executor TxExecutor) {
app.txExecutor = executor
}
4 changes: 2 additions & 2 deletions baseapp/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func (app *BaseApp) NewUncachedContext(isCheckTx bool, header cmtproto.Header) s
}

func (app *BaseApp) GetContextForFinalizeBlock(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeFinalize, txBytes)
return app.getContextForTx(execModeFinalize, txBytes, -1)
}

func (app *BaseApp) GetContextForCheckTx(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeCheck, txBytes)
return app.getContextForTx(execModeCheck, txBytes, -1)
}
15 changes: 15 additions & 0 deletions baseapp/txexecutor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package baseapp

import (
"context"

storetypes "cosmossdk.io/store/types"
abci "github.com/cometbft/cometbft/abci/types"
)

type TxExecutor func(
ctx context.Context,
blockSize int,
cms storetypes.MultiStore,
deliverTxWithMultiStore func(int, storetypes.MultiStore) *abci.ExecTxResult,
) ([]*abci.ExecTxResult, error)
15 changes: 15 additions & 0 deletions types/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type Context struct {
streamingManager storetypes.StreamingManager
cometInfo comet.Info
headerInfo header.Info

txIndex int
msgIndex int
}

// Proposed rename, not done to avoid API breakage
Expand Down Expand Up @@ -90,6 +93,8 @@ func (c Context) TransientKVGasConfig() storetypes.GasConfig { return c.trans
func (c Context) StreamingManager() storetypes.StreamingManager { return c.streamingManager }
func (c Context) CometInfo() comet.Info { return c.cometInfo }
func (c Context) HeaderInfo() header.Info { return c.headerInfo }
func (c Context) TxIndex() int { return c.txIndex }
func (c Context) MsgIndex() int { return c.msgIndex }

// clone the header before returning
func (c Context) BlockHeader() cmtproto.Header {
Expand Down Expand Up @@ -314,6 +319,16 @@ func (c Context) WithHeaderInfo(headerInfo header.Info) Context {
return c
}

func (c Context) WithTxIndex(txIndex int) Context {
c.txIndex = txIndex
return c
}

func (c Context) WithMsgIndex(msgIndex int) Context {
c.msgIndex = msgIndex
return c
}

// TODO: remove???
func (c Context) IsZero() bool {
return c.ms == nil
Expand Down

0 comments on commit 27cc99b

Please sign in to comment.