From c9b904e9cce750cc25bfedeb81005ea0ba06f6b6 Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Fri, 28 Apr 2023 13:17:58 -0700 Subject: [PATCH] [STAB-19] Reduce the size of the critical section This change makes a copy of `runTx` and does the following: * moves transaction decoding and tx validation before lock acquisition * moves creation of the response after releasing the lock * removes support for the post handler (unused by dYdX) and extracts out the only code that is executed in `runMsgs` allowing us to avoid the creation of the `runMsgCtx` and its associated `MultiStore` * removes `consumeBlockGas` since it is only executed during `deliverTx` --- baseapp/abci.go | 5 +- baseapp/baseapp.go | 139 ++++++++++++++++++++++++++++++++++++++++ baseapp/test_helpers.go | 2 +- go.mod | 2 +- go.sum | 4 +- 5 files changed, 144 insertions(+), 8 deletions(-) diff --git a/baseapp/abci.go b/baseapp/abci.go index 98cfe9cd1e0c..3afa5c4bdc1a 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -372,9 +372,6 @@ func (app *BaseApp) ProcessProposal(req abci.RequestProcessProposal) (resp abci. // will contain relevant error information. Regardless of tx execution outcome, // the ResponseCheckTx will contain relevant gas execution context. 
func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { - app.mtx.Lock() - defer app.mtx.Unlock() - var mode runTxMode switch { @@ -388,7 +385,7 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type)) } - gInfo, result, anteEvents, priority, err := app.runTx(mode, req.Tx) + gInfo, result, anteEvents, priority, err := app.runCheckTxConcurrently(mode, req.Tx) if err != nil { return sdkerrors.ResponseCheckTxWithEvents(err, gInfo.GasWanted, gInfo.GasUsed, anteEvents, app.trace) } diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 9e4979511320..4214fe1f98cf 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -619,6 +619,141 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context return ctx.WithMultiStore(msCache), msCache } +// runCheckTxConcurrently processes a transaction with either the checkTx or recheckTx modes, encoded transaction +// bytes, and the decoded transaction itself. All state transitions occur through +// a cached Context depending on the mode provided. +// +// Note, gas execution info is always returned. A reference to a Result is +// returned if the tx does not run out of gas and if all the messages are valid +// and execute successfully. An error is returned otherwise. +func (app *BaseApp) runCheckTxConcurrently(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, priority int64, err error) { + if mode != runTxModeCheck && mode != runTxModeReCheck { + panic("runCheckTxConcurrently can only be invoked for CheckTx and RecheckTx.") + } + + // Strip out the post handler + if app.postHandler != nil { + panic("CheckTx/RecheckTx does not support a post hander.") + } + + // NOTE: GasWanted should be returned by the AnteHandler. GasUsed is + // determined by the GasMeter. We need access to the context to get the gas + // meter, so we initialize upfront. 
+ var gasWanted uint64 + + tx, err := app.txDecoder(txBytes) + if err != nil { + return sdk.GasInfo{}, nil, nil, 0, err + } + + msgs := tx.GetMsgs() + if err := validateBasicTxMsgs(msgs); err != nil { + return sdk.GasInfo{}, nil, nil, 0, err + } + + // Execute the critical section under lock. + // + // Note that careful consideration is needed in the block below to ensure that we don't redefine + // gInfo, result, anteEvents, priority, or err local variables. Also note that this function is + // embedded here to ensure that the lifetime of the mutex is limited to only this function allowing + // for the return values to be computed without holding the lock. + func() { + app.mtx.Lock() + defer app.mtx.Unlock() + + ctx := app.getContextForTx(mode, txBytes) + ms := ctx.MultiStore() + + defer func() { + if r := recover(); r != nil { + recoveryMW := newOutOfGasRecoveryMiddleware(gasWanted, ctx, app.runTxRecoveryMiddleware) + err, result = processRecovery(r, recoveryMW), nil + } + + gInfo = sdk.GasInfo{GasWanted: gasWanted, GasUsed: ctx.GasMeter().GasConsumed()} + }() + + if app.anteHandler != nil { + var ( + anteCtx sdk.Context + msCache sdk.CacheMultiStore + newCtx sdk.Context + ) + + // Branch context before AnteHandler call in case it aborts. + // This is required for both CheckTx and DeliverTx. + // Ref: https://github.com/cosmos/cosmos-sdk/issues/2772 + // + // NOTE: Alternatively, we could require that AnteHandler ensures that + // writes do not happen if aborted/failed. This may have some + // performance benefits, but it'll be more difficult to get right. + anteCtx, msCache = app.cacheTxContext(ctx, txBytes) + anteCtx = anteCtx.WithEventManager(sdk.NewEventManager()) + newCtx, err = app.anteHandler(anteCtx, tx, mode == runTxModeSimulate) + + if !newCtx.IsZero() { + // At this point, newCtx.MultiStore() is a store branch, or something else + // replaced by the AnteHandler. We want the original multistore. 
+ // + // Also, in the case of the tx aborting, we need to track gas consumed via + // the instantiated gas meter in the AnteHandler, so we update the context + // prior to returning. + ctx = newCtx.WithMultiStore(ms) + } + + events := ctx.EventManager().Events() + + // GasMeter expected to be set in AnteHandler + gasWanted = ctx.GasMeter().Limit() + + if err != nil { + // Note that we set the outputs here and return from the critical function back into + // runCheckTxConcurrently which will check `err` and return immediately. + result = nil + anteEvents = nil + priority = 0 + return + } + + priority = ctx.Priority() + msCache.Write() + anteEvents = events.ToABCIEvents() + } + + if mode == runTxModeCheck { + err = app.mempool.Insert(ctx, tx) + if err != nil { + result = nil + return + } + } + }() + if err != nil { + return gInfo, result, anteEvents, priority, err + } + + // Execute a stripped down version of runMsgs that is only used for CheckTx and RecheckTx + var msgResponses []*codectypes.Any + data, err := makeABCIData(msgResponses) + if err != nil { + return gInfo, result, anteEvents, priority, sdkerrors.Wrap(err, "failed to marshal tx data") + } + + msgLogs := make(sdk.ABCIMessageLogs, 0, len(msgs)) + result = &sdk.Result{ + Data: data, + Log: strings.TrimSpace(msgLogs.String()), + Events: sdk.EmptyEvents().ToABCIEvents(), + MsgResponses: msgResponses, + } + + // We don't support the post handler specifically to avoid creating a branched MultiStore and since dYdX + // doesn't need support for it. Once support is necessary or when we are trying to upstream these changes + // we can guard creation of the MultiStore to only occur when the post handler is specified. + + return gInfo, result, anteEvents, priority, err +} + // runTx processes a transaction within a given execution mode, encoded transaction // bytes, and the decoded transaction itself. All state transitions occur through // a cached Context depending on the mode provided. 
State only gets persisted @@ -627,6 +762,10 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context // returned if the tx does not run out of gas and if all the messages are valid // and execute successfully. An error is returned otherwise. func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, priority int64, err error) { + if mode == runTxModeCheck || mode == runTxModeReCheck { + panic("Expected CheckTx and RecheckTx to be executed via runCheckTxConcurrently") + } + // NOTE: GasWanted should be returned by the AnteHandler. GasUsed is // determined by the GasMeter. We need access to the context to get the gas // meter, so we initialize upfront. diff --git a/baseapp/test_helpers.go b/baseapp/test_helpers.go index a8ecee084d5c..8e2e2da77fab 100644 --- a/baseapp/test_helpers.go +++ b/baseapp/test_helpers.go @@ -16,7 +16,7 @@ func (app *BaseApp) SimCheck(txEncoder sdk.TxEncoder, tx sdk.Tx) (sdk.GasInfo, * if err != nil { return sdk.GasInfo{}, nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "%s", err) } - gasInfo, result, _, _, err := app.runTx(runTxModeCheck, bz) + gasInfo, result, _, _, err := app.runCheckTxConcurrently(runTxModeCheck, bz) return gasInfo, result, err } diff --git a/go.mod b/go.mod index d8b3a5b6de26..632a1f0b1954 100644 --- a/go.mod +++ b/go.mod @@ -195,4 +195,4 @@ retract ( v0.43.0 ) -replace github.com/cometbft/cometbft => github.com/dydxprotocol/cometbft v0.37.2-0.20230703183317-3b10b8dfd96a +replace github.com/cometbft/cometbft => github.com/dydxprotocol/cometbft v0.37.2-0.20230703183317-bc5c0e0243ac diff --git a/go.sum b/go.sum index aaeab17ff3c3..9cc7abf80de9 100644 --- a/go.sum +++ b/go.sum @@ -377,8 +377,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/dvsekhvalnov/jose2go v1.5.0 
h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM= github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= -github.com/dydxprotocol/cometbft v0.37.2-0.20230703183317-3b10b8dfd96a h1:OjTAQDrBzoHoRqYXCEDwrQk8RUn93dhd3uJ58W7H4Mo= -github.com/dydxprotocol/cometbft v0.37.2-0.20230703183317-3b10b8dfd96a/go.mod h1:cpghf0+1GJpJvrqpTHE6UyTcD05m/xllo0xpufL3PgA= +github.com/dydxprotocol/cometbft v0.37.2-0.20230703183317-bc5c0e0243ac h1:OJojXWgxMUhuGvn/p6fhzC1yV75JiKrO8y81cWkIlzY= +github.com/dydxprotocol/cometbft v0.37.2-0.20230703183317-bc5c0e0243ac/go.mod h1:cpghf0+1GJpJvrqpTHE6UyTcD05m/xllo0xpufL3PgA= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=